# Importing Libraries

In [1]:
import requests
import io
import zipfile
import re
import pandas as pd
import numpy as np
import json
import modules.psql as psql
from sqlalchemy import types as altypes

# Postgres Configuration

In [2]:
# Run the companion configuration notebook before any database work.
# NOTE(review): presumably this is what defines `engine`, which later cells use
# but which is never created anywhere in this notebook — confirm.
%run config_psql.ipynb

# Settings Configuration

In [3]:
# Display settings: remove pandas' column and row display limits
# (None means "no limit") so frames are shown in full.
for display_option in ('display.max_columns', 'display.max_rows'):
    pd.set_option(display_option, None)

# Initializing parameters

In [4]:
# Archive to download and the extension of the per-match files inside it.
filetype = ".json"
url = "https://cricsheet.org/downloads/recently_played_30_json.zip"

# One empty frame per target table. Each name gets its own DataFrame object
# (the generator builds a new one per name), so none of them are aliases.
(
    df_meta,
    df_match,
    df_official,
    df_registry,
    df_player,
    df_innings,
    df_deliveries,
    df_powerplay,
    df_absent_hurt,
    df_miscounted_overs,
) = (pd.DataFrame() for _ in range(10))

# Read the downloaded data

In [5]:
# Download the archive and open it in memory; `zip_file` stays open because the
# next cell reads the per-match files from it.
#
# The timeout keeps the cell from hanging indefinitely on a stalled connection.
# raise_for_status() makes an HTTP error fail here, with the status in the
# message, instead of leaving `zip_file` / `ids` undefined and surfacing later
# as a NameError.
response = requests.get(url, timeout=60)
response.raise_for_status()

content = response.content
zip_file = zipfile.ZipFile(io.BytesIO(content))

# README.txt holds one index line per match, six fields separated by " - ":
# a yyyy-mm-dd date, three further fields, a numeric id, and free text.
# Lines that do not match this shape are skipped.
pattern = re.compile(r'(\d{4}-\d{2}-\d{2}) - ([^-]+) - ([^-]+) - (\w+) - (\d+) - (.+)')

# The `with` block closes the archive member; no explicit close() is needed.
with zip_file.open('README.txt') as f:
    lines = [line.decode('utf-8') for line in f.readlines()]

# Group 5 is the numeric id; it doubles as the file stem of the match's JSON file.
ids = [match.group(5) for line in lines if (match := pattern.match(line))]

# Building individual DataFrames for different tables

In [6]:
print(len(ids), " files present")


def _combine(frames):
    """Stack per-match frames row-wise; an empty list gives an empty DataFrame.

    pd.concat raises on an empty list, so that case is handled explicitly.
    """
    return pd.concat(frames) if frames else pd.DataFrame()


# `info` fields copied as-is into the match table when present. A fixed list
# (rather than a set intersection) keeps the column order stable between runs.
match_info_columns = ['balls_per_over', 'season', 'gender', 'city', 'venue',
                      'match_type', 'match_type_number', 'overs', 'team_type']

# Per-table lists of per-match frames. They are concatenated once after the
# loop: calling pd.concat inside the loop re-copies the whole growing table on
# every file, and re-running the cell would append duplicates to the df_* frames.
meta_frames = []
match_frames = []
official_frames = []
registry_frames = []
player_frames = []
innings_frames = []
delivery_frames = []
powerplay_frames = []
absent_hurt_frames = []
miscounted_frames = []

for file in ids:
    with zip_file.open(file + filetype) as jsonfile:
        data = json.load(jsonfile)

    innings = data['innings']

    # -----------------------------
    # Metadata
    meta_frames.append(pd.DataFrame([data["meta"]]).assign(filename=file, filetype=filetype))

    # ----------------------------------
    # Match details: one row per match, assembled side by side from `info`
    df_info = pd.DataFrame([data["info"]])
    df_match_temp = pd.concat([
        pd.json_normalize(df_info['event'], sep='_').add_prefix('event_'),
        df_info[[column for column in match_info_columns if column in df_info.columns]],
        # First and last entry of `dates` become the start and end date.
        df_info['dates'].apply(lambda x: [x[0], x[-1]]).apply(pd.Series).rename(columns={0: 'start_date', 1: 'end_date'}),
        df_info['teams'].apply(lambda x: [x[0], x[1]]).apply(pd.Series).rename(columns={0: 'team_host', 1: 'team_visitor'}),
        pd.json_normalize(df_info['toss'], sep='_').add_prefix('toss_'),
        pd.json_normalize(df_info['outcome'], sep='_').add_prefix('outcome_')
    ], axis=1).assign(match_id=file)
    if 'player_of_match' in df_info.columns:
        df_match_temp['player_of_match'] = df_info['player_of_match'].apply(lambda x: ','.join(x))
    match_frames.append(df_match_temp)

    # -----------------------------------
    # Officials: one row per person, one boolean column per role
    df_umpire = pd.json_normalize(df_info['officials'], sep='_')
    role_members = {column: df_umpire[column].explode().dropna() for column in df_umpire.columns}
    # dict.fromkeys de-duplicates while keeping first-seen order.
    official_names = list(dict.fromkeys(
        name for members in role_members.values() for name in members
    ))
    df_umpire2 = pd.DataFrame(index=official_names, columns=df_umpire.columns)
    for column, members in role_members.items():
        df_umpire2[column] = df_umpire2.index.isin(members)
    df_umpire2 = df_umpire2.reset_index().rename(columns={'index': 'name'}).assign(match_id=file)
    official_frames.append(df_umpire2)

    # -------------------------------------
    # Registry: name -> identifier pairs for everyone listed in this match
    registry_frames.append(
        pd.DataFrame(list(data["info"]["registry"]["people"].items()), columns=['people', 'identifier']).assign(match_id=file)
    )

    # -----------------------------------------
    # Match players: one row per (team, player)
    player_frames.append(
        pd.json_normalize(df_info['players']).melt(var_name='team', value_name='player').explode('player').assign(match_id=file)
    )

    # -------------------------------------------------
    # Innings-level fields (the nested overs are handled separately below)
    innings_frames.append(
        pd.json_normalize(innings, sep='_').drop('overs', axis=1).assign(match_id=file)
    )

    # -------------------------------------------------
    # Per-innings tables. enumerate gives the innings number directly;
    # list.index() would search by equality and return the first equal innings.
    for inning_number, inning in enumerate(innings, start=1):
        # Deliveries, ball by ball
        delivery_frames.append(
            pd.json_normalize(inning,
                              record_path=['overs', 'deliveries'],
                              meta=['team', ['overs', 'over']],
                              sep='_')
            .assign(match_id=file, inning=inning_number)
        )

        # Powerplays (optional key). Testing the dict avoids normalising the
        # whole innings just to look for a column name.
        if 'powerplays' in inning:
            powerplay_frames.append(
                pd.json_normalize(inning, record_path=['powerplays'], meta=['team'], sep='_').assign(match_id=file)
            )

        # Absent-hurt players (optional key), one row per player
        if 'absent_hurt' in inning:
            absent_hurt_frames.append(
                pd.json_normalize(inning)[['team', 'absent_hurt']].explode('absent_hurt').assign(match_id=file)
            )

    # --------------------------------------
    # Miscounted overs (optional key), one row per miscounted over
    miscounted_frames.append(pd.DataFrame([
        {
            "team": inning.get("team", ""),
            "miscounted_over": over_number,
            "balls": over_data.get("balls", ""),
            "umpire": over_data.get("umpire", "")
        }
        for inning in innings
        for over_number, over_data in inning.get("miscounted_overs", {}).items()
    ]).assign(match_id=file))

    print(file + " executed!")

# Build each table once from its per-match pieces.
df_meta = _combine(meta_frames)
df_match = _combine(match_frames)
df_official = _combine(official_frames)
df_registry = _combine(registry_frames)
df_player = _combine(player_frames)
df_innings = _combine(innings_frames)
df_deliveries = _combine(delivery_frames)
df_powerplay = _combine(powerplay_frames)
df_absent_hurt = _combine(absent_hurt_frames)
df_miscounted_overs = _combine(miscounted_frames)

172  files present
1406911 executed!
1418188 executed!
1419511 executed!
1375848 executed!
1392681 executed!
1406909 executed!
1406910 executed!
1418186 executed!
1418187 executed!
1419510 executed!
1375872 executed!
1391789 executed!
1392679 executed!
1392680 executed!
1406907 executed!
1406908 executed!
1412292 executed!
1412293 executed!
1418184 executed!
1418185 executed!
1375847 executed!
1389400 executed!
1392678 executed!
1406906 executed!
1412290 executed!
1412291 executed!
1415987 executed!
1416073 executed!
1418182 executed!
1418183 executed!
1392677 executed!
1406905 executed!
1415986 executed!
1418181 executed!
1392676 executed!
1406904 executed!
1375871 executed!
1392675 executed!
1406903 executed!
1412288 executed!
1412289 executed!
1415985 executed!
1392674 executed!
1406902 executed!
1412286 executed!
1412287 executed!
1375870 executed!
1392673 executed!
1406900 executed!
1406901 executed!
1409505 executed!
1409537 executed!
1415984 executed!
1418177 executed!
1375869 e

## Adding/Modifying additional fields

In [7]:
# Quoted, comma-separated ids, interpolated into the
# "DELETE ... WHERE match_id IN (...)" statements in later cells.
match_id_list = ", ".join([f"'{match_id}'" for match_id in ids])

df_meta['created'] = pd.to_datetime(df_meta['created'])


def _registry_identifier(frame):
    """Return the registry identifier for each (match_id, name) row of `frame`.

    The result comes from a left merge and carries a fresh 0..n-1 index, so the
    caller must reset `frame`'s index before assigning it as a column.
    Assumes (match_id, people) is unique in df_registry; duplicate keys would
    add rows to the merge and shift the identifiers out of line.
    """
    merged = frame.merge(
        df_registry,
        how='left',
        left_on=['match_id', 'name'],
        right_on=['match_id', 'people'],
    )
    return merged['identifier']


# Frames are rebound rather than mutated in place, so re-running this cell
# gives the same result as running it once.
df_registry = df_registry.reset_index(drop=True)

# Attach registry identifiers to the match-player rows.
df_player = df_player.reset_index(drop=True).rename(columns={'player': 'name'})
df_player['player_id'] = _registry_identifier(df_player)

df_official = df_official.reset_index(drop=True)
df_official['official_id'] = _registry_identifier(df_official)

df_miscounted_overs = df_miscounted_overs.reset_index(drop=True)

# These nested fields are loaded through their own tables, so drop them here.
df_innings = df_innings.drop(columns=['powerplays', 'absent_hurt'], errors='ignore')
df_innings = df_innings.drop(columns=df_innings.filter(like='miscounted_overs_').columns)

if not df_absent_hurt.empty:
    df_absent_hurt = df_absent_hurt.reset_index(drop=True).rename(columns={'absent_hurt': 'name'})
    df_absent_hurt['player_id'] = _registry_identifier(df_absent_hurt)

# Expose the row position as an `index` column. Any `index` column left by an
# earlier run is dropped first: otherwise a re-run would add a second positional
# column, because pandas falls back to the name `level_0` once `index` is taken.
# NOTE(review): assumes the raw delivery data has no field of its own named `index`.
df_deliveries = (
    df_deliveries
    .drop(columns=['index'], errors='ignore')
    .reset_index(drop=True)
    .reset_index()
)

## Load data into Database

#### 1. Metadata

In [8]:
# Upsert the metadata rows into dwh.meta through the project helper.
# The full column list is passed as both `pk_col` and `update_col`
# (as two separate list objects).
meta_columns = list(df_meta.columns)

query = psql.upsert(
    engine,
    dataFrame=df_meta,
    table="meta",
    schema="dwh",
    pk_col=meta_columns,
    update_col=list(meta_columns),
)

#### 2. Officials(umpires)

In [9]:
# Reload dwh.official for this batch: delete the rows of the matches being
# loaded, append the fresh rows, then run two follow-up UPDATEs.
with engine.connect() as connection:
    connection.execute(f"DELETE FROM dwh.official WHERE match_id IN ({match_id_list})")

count_rows = df_official.to_sql(
    name='official',
    con=engine,
    schema='dwh',
    if_exists='append',
    index=False,
    method='multi',
)

with engine.connect() as connection:
    # NOTE(review): this WHERE clause needs P.identifier to equal OF.name and be
    # NULL at the same time, which no row can satisfy, so the statement appears
    # to update nothing. It also compares a name to an identifier. Confirm the
    # intended rule before changing it.
    connection.execute("""
        UPDATE dwh.official OF
        SET is_registered = FALSE
        FROM dwh.people P
        WHERE OF.name = P.identifier AND P.identifier IS NULL;
    """)

    # Fill in the numeric people id for officials that do not have one yet,
    # matching on the registry identifier.
    connection.execute("""
        UPDATE dwh.official off
        SET official_id_num = p.id
        FROM dwh.people p
        WHERE off.official_id = p.identifier
        AND official_id_num IS NULL;
    """)

#### 3. Player-match (players who played a particular match) & Team

In [10]:
# Stage the match-player rows: empty stg.match_player, then append this batch.
with engine.connect() as connection:
    connection.execute("TRUNCATE TABLE stg.match_player")

count_rows = df_player.to_sql(
    name='match_player',
    con=engine,
    schema='stg',
    if_exists='append',
    index=False,
    method='multi',
)

In [11]:
# Call the dwh.LoadMatchPlayerAndTeam() procedure on a connection switched to
# AUTOCOMMIT (presumably it moves the staged player/team rows into dwh — the
# procedure body is not part of this notebook).
with engine.connect() as connection:
    connection.execution_options(isolation_level="AUTOCOMMIT")
    transaction = connection.begin()
    with transaction:
        connection.execute("CALL dwh.LoadMatchPlayerAndTeam()")

#### 4. Match details

In [12]:
# Stage the match rows: empty stg.match, then append this batch.
with engine.connect() as connection:
    connection.execute("TRUNCATE TABLE stg.match")

count_rows = df_match.to_sql(
    name='match',
    con=engine,
    schema='stg',
    if_exists='append',
    index=False,
    method='multi',
)

In [89]:
# Call the dwh.LoadMatch() procedure on a connection switched to AUTOCOMMIT
# (presumably it moves the staged match rows into dwh — the procedure body is
# not part of this notebook).
with engine.connect() as connection:
    connection.execution_options(isolation_level="AUTOCOMMIT")
    transaction = connection.begin()
    with transaction:
        connection.execute("CALL dwh.LoadMatch()")

#### 5. absent hurt details

In [90]:
# Insert the absent-hurt rows into dwh.absent_hurt through the project helper
# `psql.insert_without_duplicate`, using every column as the conflict key.
#
# df_absent_hurt has no rows and no columns when no innings in the batch lists
# an absent-hurt player. The call is skipped in that case, mirroring the
# `.empty` guard used when the frame is post-processed, so the helper is never
# given an empty `conflict_col` list.
query = None
if not df_absent_hurt.empty:
    query = psql.insert_without_duplicate(
        engine,
        dataFrame = df_absent_hurt,
        table = "absent_hurt",
        schema = "dwh",
        conflict_col = list(df_absent_hurt.columns))

#### 6. miscounted overs

In [91]:
# Reload dwh.miscounted_over for this batch: delete the rows of the matches
# being loaded, append the fresh rows, then tidy up the umpire columns.
with engine.connect() as connection:
    connection.execute(f"DELETE FROM dwh.miscounted_over WHERE match_id IN ({match_id_list})")

count_rows = df_miscounted_overs.to_sql(
    name='miscounted_over',
    con=engine,
    schema='dwh',
    if_exists='append',
    index=False,
    method='multi',
)

with engine.connect() as connection:
    # The frame uses '' when an over has no umpire recorded; store that as NULL.
    connection.execute("""
        UPDATE dwh.miscounted_over MO
        SET umpire = NULL
        WHERE umpire = ''
    """)

    # Copy the umpire's numeric id from dwh.official, matching on match and
    # name, for rows that do not have one yet.
    connection.execute("""
        UPDATE dwh.miscounted_over MO
        SET umpire_id_num = official_id_num
        FROM dwh.official OFF
        WHERE MO.match_id = OFF.match_id AND MO.umpire = OFF.name AND MO.umpire_id_num IS NULL;
    """)

#### 7. innings

In [92]:
# Stage the innings rows: empty stg.inning, then append this batch.
# The to_sql call is left as the cell's last expression so its return value
# is displayed.
with engine.connect() as connection:
    connection.execute("TRUNCATE TABLE stg.inning")

df_innings.to_sql(
    name='inning',
    con=engine,
    schema='stg',
    if_exists='append',
    index=False,
    method='multi',
)

264

In [93]:
# Call the dwh.LoadInning() procedure on a connection switched to AUTOCOMMIT
# (presumably it moves the staged innings rows into dwh — the procedure body is
# not part of this notebook).
with engine.connect() as connection:
    connection.execution_options(isolation_level="AUTOCOMMIT")
    transaction = connection.begin()
    with transaction:
        connection.execute("CALL dwh.LoadInning()")

#### 8. powerplay

In [94]:
# Stage the powerplay rows: empty stg.powerplay, then append this batch.
# The to_sql call is left as the cell's last expression so its return value
# is displayed.
with engine.connect() as connection:
    connection.execute("TRUNCATE TABLE stg.powerplay")

df_powerplay.to_sql(
    name='powerplay',
    con=engine,
    schema='stg',
    if_exists='append',
    index=False,
    method='multi',
)

318

In [95]:
# Call the dwh.LoadPowerplay() procedure on a connection switched to AUTOCOMMIT
# (presumably it moves the staged powerplay rows into dwh — the procedure body
# is not part of this notebook).
with engine.connect() as connection:
    connection.execution_options(isolation_level="AUTOCOMMIT")
    transaction = connection.begin()
    with transaction:
        connection.execute("CALL dwh.LoadPowerplay()")

#### 9. delivery

In [96]:
# Stage the ball-by-ball rows: empty stg.delivery, then append in chunks.
with engine.connect() as connection:
    connection.execute("TRUNCATE TABLE stg.delivery")

# These columns are written with SQLAlchemy's JSON type. none_as_null=True
# stores a Python None as SQL NULL instead of the JSON value `null`.
# Each column gets its own type instance.
delivery_dtypes = {
    column: altypes.JSON(none_as_null=True)
    for column in ("wickets", "replacements_match", "replacements_role")
}

# Insert throughput noted for different chunk sizes:
##  692 records per second | chunksize = 10k
#  1076 records per second | chunksize = 2k
#  1050 records per second | chunksize = 1024
#  1015 records per second | chunksize = 1k
#  1005 records per second | chunksize = 750

# Left as the cell's last expression so its return value is displayed.
df_deliveries.to_sql(
    name='delivery',
    con=engine,
    schema='stg',
    if_exists='append',
    index=False,
    chunksize=5000,
    dtype=delivery_dtypes,
    method='multi',
)

41684

In [97]:
# Call the dwh.LoadDelivery() procedure on a connection switched to AUTOCOMMIT
# (presumably it moves the staged delivery rows into dwh — the procedure body
# is not part of this notebook).
with engine.connect() as connection:
    connection.execution_options(isolation_level="AUTOCOMMIT")
    transaction = connection.begin()
    with transaction:
        connection.execute("CALL dwh.LoadDelivery()")