# Notebook to run lake ingestion from File(s3), RDS(MySql), NoSQL(DynamoDB)
### This notebook will load lake tables player_info, matches, and deliveries

## Assumptions
- Ideally we should be doing incremental loads fetching new data since the last lake load using watermark, but in this example we will doing full load everytime.

In [6]:
%idle_timeout 20
%glue_version 3.0
%worker_type G.1X
%number_of_workers 2

Current idle_timeout is 2800 minutes.
idle_timeout has been set to 20 minutes.
Setting Glue version to: 3.0
Previous worker type: G.1X
Setting new worker type to: G.1X
Previous number of workers: 5
Setting new number of workers to: 2


In [7]:
# Run this cell if you want to write data in delta lake format
# %%configure
# {
#     "--datalake-formats":"delta",
#     "--conf":"spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
# }

####  Run this cell to set up and start your interactive session.


In [4]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)




In [5]:
import boto3

# Create a Glue client
glue_client = boto3.client('glue')




### Variables

In [6]:
####################
##### Change variables
####################
# Replace with your IAM role ARN to be used by crawler
crawler_role = 'arn:aws:iam::687003041478:role/orka-glue-role'

# Change variable for MySQL deliveries source
host = 'mydbinstance.ctf19flbptnt.us-east-1.rds.amazonaws.com'
port = 3306
user = 'admin'
password = 'MyPassword123'
mssql_database = 'ipl'
mssql_table='deliveries'

# Change variables for s3 player info source
player_info_input_file_s3_path = "s3://iplcricketinfo/input_files/Players_info.csv" # Change to the path where you have your Players_info.csv

# Change variable for dynamodb matches source
macthes_dynamodb_table_name = "MatchesTable"

# DB variables
bronze_database_name = "orka_warehouse_bronze" # Specify the lake/bronze glue catalog database name
bronze_database_loc = "s3://iplcricketinfo/datalake/bronze" # S3 location for bronze table

# Target Player info table
player_info_table_name = "player_info" # Specify player info target table name in lake
# Target Deliveries table
deliveries_table_name = "deliveries"
# Target matches table
matches_table_name = "matches"




### Utility functions

In [12]:
import boto3

# Function to read data from MYSQL database
def read_from_mysql(host, port, user, password, database, table):
    mysql_url = f"jdbc:mysql://{host}:{port}"
    mysql_properties = {
        "user": user,
        "password": password,
        "driver": "com.mysql.jdbc.Driver"
    }

    return spark.read \
        .format("jdbc") \
        .option("url", mysql_url) \
        .option("dbtable", f"{database}.{table}") \
        .options(**mysql_properties) \
        .load()

# Function to crawl data stored in lake
def crawl_tables(glue_client, df_dic, data_loc, database_name):
    for table, data_df in df_dic.items():
        crawler_name = f"{table}_crwl"
        # Specify the S3 target for the crawler
        s3_target_path = f"{data_loc}/{table}"

        # Create the crawler
        response = glue_client.create_crawler(
            Name=crawler_name,
            Role=crawler_role,
            Targets={
                'S3Targets': [
                    {
                        'Path': s3_target_path
                    }
                ]
            },
            DatabaseName= database_name,
            SchemaChangePolicy={
                'UpdateBehavior': 'UPDATE_IN_DATABASE',
                'DeleteBehavior': 'DEPRECATE_IN_DATABASE'
            }
        )

        # Start the crawler
        glue_client.start_crawler(Name=crawler_name)


def read_from_dynamodb(table_name):
    # Read data from the DynamoDB table
    dynamodb_options = {
        "dynamodb.input.tableName": table_name,
        "dynamodb.throughput.read.percent": "1.0"
    }
    dynamic_frame = glueContext.create_dynamic_frame.from_options(
        connection_type="dynamodb",
        connection_options=dynamodb_options
    )

    # Convert the dynamic frame to a Spark DataFrame and return
    return dynamic_frame.toDF()




## Create database in catalog

In [7]:
# Create the database
glue_client.create_database(
    DatabaseInput={
        'Name': database_name
    }
)

{'ResponseMetadata': {'RequestId': '7c34261c-eced-4bcd-96ed-2191951b32ee', 'HTTPStatusCode': 200, 'HTTPHeaders': {'date': 'Sun, 09 Jul 2023 05:57:44 GMT', 'content-type': 'application/x-amz-json-1.1', 'content-length': '2', 'connection': 'keep-alive', 'x-amzn-requestid': '7c34261c-eced-4bcd-96ed-2191951b32ee'}, 'RetryAttempts': 0}}


# ********************************************************************
# File ingestion example - Loading csv file in s3 to lake/silver layer
# ********************************************************************

Ingesting table from s3 into df

In [8]:
# Created my own clean header from the csv
player_info_header_clean = "index,id,name,country,full_name,birthdate,birthplace,died,date_of_death,age,major_teams,batting_style,bowling_style,other,awards,batting_tests_mat,batting_tests_inns,batting_tests_no,batting_tests_runs,batting_tests_hs,batting_tests_ave,batting_tests_bf,batting_tests_sr,batting_tests_100,batting_tests_50,batting_tests_4s,batting_tests_6s,batting_tests_ct,batting_tests_st,batting_odis_mat,batting_odis_inns,batting_odis_no,batting_odis_runs,batting_odis_hs,batting_odis_ave,batting_odis_bf,batting_odis_sr,batting_odis_100,batting_odis_50,batting_odis_4s,batting_odis_6s,batting_odis_ct,batting_odis_st,batting_t20is_mat,batting_t20is_inns,batting_t20is_no,batting_t20is_runs,batting_t20is_hs,batting_t20is_ave,batting_t20is_bf,batting_t20is_sr,batting_t20is_100,batting_t20is_50,batting_t20is_4s,batting_t20is_6s,batting_t20is_ct,batting_t20is_st,batting_first_class_mat,batting_first_class_inns,batting_first_class_no,batting_first_class_runs,batting_first_class_hs,batting_first_class_ave,batting_first_class_bf,batting_first_class_sr,batting_first_class_100,batting_first_class_50,batting_first_class_4s,batting_first_class_6s,batting_first_class_ct,batting_first_class_st,batting_list_a_mat,batting_list_a_inns,batting_list_a_no,batting_list_a_runs,batting_list_a_hs,batting_list_a_ave,batting_list_a_bf,batting_list_a_sr,batting_list_a_100,batting_list_a_50,batting_list_a_4s,batting_list_a_6s,batting_list_a_ct,batting_list_a_st,batting_t20s_mat,batting_t20s_inns,batting_t20s_no,batting_t20s_runs,batting_t20s_hs,batting_t20s_ave,batting_t20s_bf,batting_t20s_sr,batting_t20s_100,batting_t20s_50,batting_t20s_4s,batting_t20s_6s,batting_t20s_ct,batting_t20s_st,bowling_tests_mat,bowling_tests_inns,bowling_tests_balls,bowling_tests_runs,bowling_tests_wkts,bowling_tests_bbi,bowling_tests_bbm,bowling_tests_ave,bowling_tests_econ,bowling_tests_sr,bowling_tests_4w,bowling_tests_5w,bowling_tests_10,bowling_odis_mat,bowling_odis_inns,bowling_odis_balls,bowling_odis_runs,bowling_odis_wkts,bowling_odis_bbi,bowling_odis_bbm,bowling_odis_ave,bowling_odis_econ,bowling_odis_sr,bowling_odis_4w,bowling_odis_5w,bowling_odis_10,bowling_t20is_mat,bowling_t20is_inns,bowling_t20is_balls,bowling_t20is_runs,bowling_t20is_wkts,bowling_t20is_bbi,bowling_t20is_bbm,bowling_t20is_ave,bowling_t20is_econ,bowling_t20is_sr,bowling_t20is_4w,bowling_t20is_5w,bowling_t20is_10,bowling_first_class_mat,bowling_first_class_inns,bowling_first_class_balls,bowling_first_class_runs,bowling_first_class_wkts,bowling_first_class_bbi,bowling_first_class_bbm,bowling_first_class_ave,bowling_first_class_econ,bowling_first_class_sr,bowling_first_class_4w,bowling_first_class_5w,bowling_first_class_10,bowling_list_a_mat,bowling_list_a_inns,bowling_list_a_balls,bowling_list_a_runs,bowling_list_a_wkts,bowling_list_a_bbi,bowling_list_a_bbm,bowling_list_a_ave,bowling_list_a_econ,bowling_list_a_sr,bowling_list_a_4w,bowling_list_a_5w,bowling_list_a_10,bowling_t20s_mat,bowling_t20s_inns,bowling_t20s_balls,bowling_t20s_runs,bowling_t20s_wkts,bowling_t20s_bbi,bowling_t20s_bbm,bowling_t20s_ave,bowling_t20s_econ,bowling_t20s_sr,bowling_t20s_4w,bowling_t20s_5w,bowling_t20s_10"

# Read the CSV file and into a DataFrame
player_info_raw = spark.read.csv(player_info_input_file_s3_path, header=True, inferSchema=True)

# Use the custom header
player_info_raw = player_info_raw.toDF(*player_info_header_clean.split(','))

# Show the dataframe
player_info_raw.show(5)

+-----+------+----------------+-------+--------------------+----------+--------------------+-----+-------------+---+--------------------+--------------+--------------------+-----+------+-----------------+------------------+----------------+------------------+----------------+-----------------+----------------+----------------+-----------------+----------------+----------------+----------------+----------------+----------------+----------------+-----------------+---------------+-----------------+---------------+----------------+---------------+---------------+----------------+---------------+---------------+---------------+---------------+---------------+-----------------+------------------+----------------+------------------+----------------+-----------------+----------------+----------------+-----------------+----------------+----------------+----------------+----------------+----------------+-----------------------+------------------------+----------------------+---------------------

Check the schema

Loading the dataframe to lake

In [12]:
player_info_raw.write.format("parquet").mode("overwrite").save(f"{bronze_database_loc}/{player_info_table_name}")




# ********************************************************************
# Relational database example - Loading tablle from MySQL DB to lake/silver layer
# ********************************************************************

Reading deliveries from mysql

In [17]:
deliveries_df = read_from_mysql(host, port, user, password, mssql_database, mssql_table)
deliveries_df.show()

+--------+------+--------------------+--------------------+----+----+--------------+--------------+------------+-------------+---------+--------+-----------+-----------+------------+------------+----------+----------+----------------+--------------+-------+
|match_id|inning|        batting_team|        bowling_team|over|ball|       batsman|   non_striker|      bowler|is_super_over|wide_runs|bye_runs|legbye_runs|noball_runs|penalty_runs|batsman_runs|extra_runs|total_runs|player_dismissed|dismissal_kind|fielder|
+--------+------+--------------------+--------------------+----+----+--------------+--------------+------------+-------------+---------+--------+-----------+-----------+------------+------------+----------+----------+----------------+--------------+-------+
|     187|     2|Kolkata Knight Ri...|    Rajasthan Royals|  11|   6|    SC Ganguly|      BJ Hodge|  SK Trivedi|            0|        0|       0|          0|          0|           0|           0|         0|         0|         

Storing deliveries in lake

In [23]:
deliveries_df.write.format("parquet").mode("overwrite").save(f"{bronze_database_loc}/{deliveries_table_name}")




# ********************************************************************
# NOSQL ingestion example - Loading DynamoDB Table in s3 to lake/silver layer
# ********************************************************************

Loading matches from dynamodb

In [20]:
matches_df = read_from_dynamodb(macthes_dynamodb_table_name)
matches_df.show()

+-------------+--------------------+----------+----------------+--------------------+--------+------+--------------+--------------+-----------+----------+---------------+-----+----------+--------------------+--------------------+--------------------+--------------------+
|toss_decision|              winner|      city| player_of_match|             umpire3|  Season|result|       umpire2|win_by_wickets|win_by_runs|dl_applied|        umpire1|   id|      date|               team1|               team2|               venue|         toss_winner|
+-------------+--------------------+----------+----------------+--------------------+--------+------+--------------+--------------+-----------+----------+---------------+-----+----------+--------------------+--------------------+--------------------+--------------------+
|        field| Chennai Super Kings|    Mumbai|        SK Raina|                    |IPL-2011|normal|    SJA Taufel|             6|          0|         0|      Asad Rauf|  304|24-05-20

Loading matches in lake

In [21]:
matches_df.write.format("parquet").mode("overwrite").save(f"{bronze_database_loc}/{matches_table_name}")




## Creating crawlers to generate metadata for ingested lake tables

In [23]:
bronze_tables_to_load = {player_info_table_name:player_info_raw,
                        deliveries_table_name:deliveries_df,
                        matches_table_name:matches_df}

crawl_tables(glue_client, bronze_tables_to_load, bronze_database_loc, bronze_database_name)




# ********************************************************************
# Verifying lake ingestion
# ********************************************************************

In [8]:
loaded_player_info_dyf = glueContext.create_dynamic_frame.from_catalog(database=bronze_database_name, table_name=player_info_table_name)
loaded_deliveries_dyf = glueContext.create_dynamic_frame.from_catalog(database=bronze_database_name, table_name=deliveries_table_name)
loaded_matches_dyf = glueContext.create_dynamic_frame.from_catalog(database=bronze_database_name, table_name=matches_table_name)

# Convert the dynamic frame to a Spark DataFrame
lplayerinfo_df = loaded_player_info_dyf.toDF()
ldeliveries_df = loaded_deliveries_dyf.toDF()
lmatches_df = loaded_matches_dyf.toDF()




# For player info

In [10]:
lplayerinfo_df.count()

90308


In [11]:
lplayerinfo_df.printSchema()

root
 |-- index: integer (nullable = true)
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- country: string (nullable = true)
 |-- full_name: string (nullable = true)
 |-- birthdate: string (nullable = true)
 |-- birthplace: string (nullable = true)
 |-- died: string (nullable = true)
 |-- date_of_death: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- major_teams: string (nullable = true)
 |-- batting_style: string (nullable = true)
 |-- bowling_style: string (nullable = true)
 |-- other: string (nullable = true)
 |-- awards: string (nullable = true)
 |-- batting_tests_mat: string (nullable = true)
 |-- batting_tests_inns: string (nullable = true)
 |-- batting_tests_no: string (nullable = true)
 |-- batting_tests_runs: string (nullable = true)
 |-- batting_tests_hs: string (nullable = true)
 |-- batting_tests_ave: string (nullable = true)
 |-- batting_tests_bf: string (nullable = true)
 |-- batting_tests_sr: string (nullable = true)
 |-- bat

# For deliveries

In [12]:
ldeliveries_df.count()

179078


In [13]:
ldeliveries_df.printSchema()

root
 |-- match_id: integer (nullable = true)
 |-- inning: integer (nullable = true)
 |-- batting_team: string (nullable = true)
 |-- bowling_team: string (nullable = true)
 |-- over: integer (nullable = true)
 |-- ball: integer (nullable = true)
 |-- batsman: string (nullable = true)
 |-- non_striker: string (nullable = true)
 |-- bowler: string (nullable = true)
 |-- is_super_over: integer (nullable = true)
 |-- wide_runs: integer (nullable = true)
 |-- bye_runs: integer (nullable = true)
 |-- legbye_runs: integer (nullable = true)
 |-- noball_runs: integer (nullable = true)
 |-- penalty_runs: integer (nullable = true)
 |-- batsman_runs: integer (nullable = true)
 |-- extra_runs: integer (nullable = true)
 |-- total_runs: integer (nullable = true)
 |-- player_dismissed: string (nullable = true)
 |-- dismissal_kind: string (nullable = true)
 |-- fielder: string (nullable = true)


In [11]:
# For matches

In [14]:
lmatches_df.count()

756


In [15]:
lmatches_df.printSchema()

root
 |-- toss_decision: string (nullable = true)
 |-- winner: string (nullable = true)
 |-- city: string (nullable = true)
 |-- player_of_match: string (nullable = true)
 |-- umpire3: string (nullable = true)
 |-- Season: string (nullable = true)
 |-- result: string (nullable = true)
 |-- umpire2: string (nullable = true)
 |-- win_by_wickets: long (nullable = true)
 |-- win_by_runs: long (nullable = true)
 |-- dl_applied: long (nullable = true)
 |-- umpire1: string (nullable = true)
 |-- id: long (nullable = true)
 |-- date: string (nullable = true)
 |-- team1: string (nullable = true)
 |-- team2: string (nullable = true)
 |-- venue: string (nullable = true)
 |-- toss_winner: string (nullable = true)
