In [10]:
import os
import shutil
import json
import zipfile
import pyspark
import logging
import pandas as pd
from database_script import get_connection
from sqlalchemy import create_engine
from sqlalchemy import text
from sqlalchemy.exc import SQLAlchemyError
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
# from selenium import webdriver
# from selenium.webdriver import chrome
# from selenium.webdriver.common.by import By
# from selenium.webdriver.chrome.service import Service
# from webdriver_manager.chrome import ChromeDriverManager
# from selenium.webdriver.common.keys import Keys
# from selenium.webdriver.chrome.options import Options
from data_ingestion_script import *
logging.basicConfig(format='%(asctime)s - %(message)s', level=logging.INFO)
logger = logging.getLogger('Data_Processing')
logger.info('Data_Processing Script started')

spark = SparkSession.builder.getOrCreate()

2024-02-26 23:06:48,592 - Data_Processing Script started


In [11]:
logger.info("Running the requirements.txt file")
!pip install -r requirements.txt
logger.info("requirements.txt runned successfully")

2024-02-26 23:06:49,065 - Running the requirements.txt file
2024-02-26 23:06:58,230 - requirements.txt runned successfully





[notice] A new release of pip is available: 23.0.1 -> 24.0
[notice] To update, run: python.exe -m pip install --upgrade pip


In [12]:
def start_ingesting_data(origin, target_directory):
    try:
        # Data Ingestion  to correct path
        origin = origin
        target_directory = target_directory
        download_required_files(logger)
        logger.info("Ingestion of Data Completed successfully")

        file_in_origin = os.listdir(origin)
        
        while ("odis_female_json.zip" not in file_in_origin) and ("odis_male_json.zip" not in file_in_origin):
            file_in_origin = os.listdir(origin)
            if ("odis_female_json.zip" in file_in_origin) and ("odis_male_json.zip" in file_in_origin):
                logger.info("waiting for file to get downloaded")
                break
        if ('LANDING' in os.listdir(f'{target_directory}')) and ('DOWNLOAD_PATH' in os.listdir(f'{target_directory}')):
            logger.info("LANDING PATH IS THERE")
            logger.info("DOWNLOAD PATH IS THERE")
        else :
            os.makedirs(f'{target_directory}LANDING')
            logger.info("LANDING path created successfully")
            os.makedirs(f'{target_directory}DOWNLOAD_PATH')
            logger.info("DOWNLOAD_PATH created created successfully")

        files_in_target_directory = os.listdir(target_directory+'DOWNLOAD_PATH')
        needed_files = ['odis_female_json.zip', "odis_male_json.zip"]
        file_in_origin = os.listdir(origin)
        for file in file_in_origin:
            if (file.startswith("odis_female_json") or file.startswith("odis_male_json")) and (('odis_female_json.json' not in files_in_target_directory) and ('odis_male_json.json' not in files_in_target_directory)):
                shutil.copy(origin+file, target_directory+'DOWNLOAD_PATH')
        logger.info("Data Loaded to Download path Successfully")

        #extracting all the files in landing folder
        for file_name in needed_files:
            if (file_name == "odis_female_json.zip") or (file_name == "odis_male_json.zip"):
              logger.info(f"Started Extraction of file {file_name}")
              with zipfile.ZipFile(f'{target_directory}/DOWNLOAD_PATH/{file_name}') as f:
                      # f.extractall()/
                      f.extractall(f'{target_directory}LANDING/')
        logger.info("Data Extraction Completed Successfully")
        
    except Exception as e:
        logging.error("Exception occurred", exc_info=True)

origin = 'C:/Users/Admin/Downloads/'
target_directory = 'C:/Users/Admin/Downloads/procesing/Batch_pipeline_DataEngineering_task/'
start_ingesting_data(origin, target_directory)



2024-02-26 23:07:00,127 - Get LATEST chromedriver version for google-chrome
2024-02-26 23:07:00,470 - Get LATEST chromedriver version for google-chrome
2024-02-26 23:07:00,642 - Driver [C:\Users\Admin\.wdm\drivers\chromedriver\win64\122.0.6261.69\chromedriver-win32/chromedriver.exe] found in cache
2024-02-26 23:07:18,582 - Download of Male_Dataset Started
2024-02-26 23:07:21,046 - Download of Female_Dataset Started
2024-02-26 23:07:23,091 - Ingestion of Data Completed successfully
2024-02-26 23:07:23,096 - LANDING PATH IS THERE
2024-02-26 23:07:23,099 - DOWNLOAD PATH IS THERE
2024-02-26 23:07:23,607 - Data Loaded to Download path Successfully
2024-02-26 23:07:23,610 - Started Extraction of file odis_female_json.zip
2024-02-26 23:07:25,345 - Started Extraction of file odis_male_json.zip
2024-02-26 23:07:34,685 - Data Extraction Completed Successfully


In [13]:
#Necessay Function
def get_batter_bowler_striker(column, column_need):
  try:
    if column_need.strip() == 'batter':
      return column['batter']
 
    elif column_need.strip() == 'bowler':
      return column['bowler']

    elif column_need.strip() == 'non_striker':
      return column['non_striker']
  except Exception as e:
    logging.error("Exception occurred", exc_info=True)



#Necessay Function
def get_run_ball_by_ball(column):
  try:

    runs_list = [-1]*3
    for scores in column.keys():
      if scores == 'runs':
        return column[scores]
      
  except Exception as e:
    logging.error("Exception occurred", exc_info=True)


def get_info_and_meta_data(all_information):
  try:
      for data in all_information.keys():
        if data == 'meta':
          meta_information = all_information['meta']
        elif data == 'info':
          info_information = all_information["info"]
      return meta_information, info_information
  except Exception as e:
    logging.error("Exception occurred", exc_info=True)

def get_necessary_information(data):
  try:
      date = data['dates'][0]

      if "city" in data.keys():
        city = data['city']
      else:
        city = "NULL"
      if 'event' in data['event'].keys():
        event_name = data['event']['name']
        if 'match_number' in data['event'].keys():
          match_number = data['event']['match_number']
        else:
          match_number = "NULL"
      else:
        event_name = "NULL"
        match_number = "NULL"

      gender = data['gender']

      if "winner" in data['outcome'].keys():

        winner_team = data['outcome']['winner']
      elif 'result' in data['outcome'].keys():

        winner_team = data['outcome']['result']

      if "winner" in data['outcome'].keys():
        if 'wickets' in data['outcome']['by'].keys():
          winned_by = f"{data['outcome']['by']['wickets']} wickets"
        elif 'runs' in data['outcome']['by'].keys():
          winned_by = f"{data['outcome']['by']['runs']} runs"
      else:
        winned_by = "NULL"
      team_1 = data['teams'][0]
      team_2 = data['teams'][1]
      return date, city, event_name, match_number, gender, winner_team, winned_by, team_1, team_2
  except Exception as e:
    logging.error("Exception occurred", exc_info=True)

#UDF created to get the run scored ball-by-ball
get_run_ball_by_ball_udf =  udf(lambda column: get_run_ball_by_ball(column), StringType())
#UDF created to get the batter name, striker name, bowler name
get_batter_bowler_striker_udf =  udf(lambda column, column_need: get_batter_bowler_striker(column, column_need), StringType())

In [17]:
try:
  path = target_directory+'LANDING/'
  all_the_files = os.listdir(path)
  # print(all_the_files)
  necessary_columns = []
  for i in all_the_files:
    # print(f"/female_dataset/{i}")
    # print(i)
    if i.endswith(".json"):
      with open(f"{path}/{i}","r") as file_obj:
        file_content = file_obj.read()
        # print(file_content)
        details = json.loads(file_content)
        meta_information, info_information = get_info_and_meta_data(details)
        date, city, event_name, match_number, gender, winner_team, winned_by, team_1, team_2 = get_necessary_information(info_information)
        necessary_columns = []
        temp_dict = {}
        for i in range(len(details['innings'][0]['overs'])):
          temp_dict['overs'] = i
          temp_dict['balls_per_over'] = details['innings'][0]['overs'][i]['deliveries']
          # print(student_details['innings'][0]['overs'][i]['deliveries'])
          necessary_columns.append(temp_dict)
          temp_dict = {}
        dataframe = spark.createDataFrame(necessary_columns)
        # dataframe.show(1000, False)
        dataframe = dataframe.select("overs", posexplode_outer(dataframe.balls_per_over))

        dataframe_new = dataframe.withColumn("BATTER", get_batter_bowler_striker_udf(col("col"), lit("batter")))
        dataframe_new = dataframe_new.withColumn("BOWLER", get_batter_bowler_striker_udf(col("col"), lit("bowler")))
        dataframe_new = dataframe_new.withColumn("NON_STRIKER", get_batter_bowler_striker_udf(col("col"), lit("non_striker")))
        dataframe_new = dataframe_new.withColumn("runs_scored_per_ball", get_run_ball_by_ball_udf(col("col")))
        dataframe_new = dataframe_new.select('*', lit(date).alias("MATCH_DATE"), lit(city).alias("MATCH_CITY"),\
                                            lit(event_name).alias("EVENT_NAME"),lit(match_number).alias("MATCH_NUMBER"),lit(gender).alias("GENDER"),\
                                            lit(winner_team).alias("WINNER_TEAM"),lit(winned_by).alias("WINNED_BY"),lit(team_1).alias("FIRST_TEAM"),lit(team_2).alias("SECOND_TEAM"))

        dataframe_new = dataframe_new.withColumn("runs_scored_per_ball", regexp_replace(col("runs_scored_per_ball"), "(\{extras=)|(total=)|(batter=)|(\})", "")).withColumn("EXTRAS_EARNED_PER_BALL", trim(split(col("runs_scored_per_ball"), ',').getItem(0))).withColumn("TOTAL_RUNS_PER_BALL", trim(split(col("runs_scored_per_ball"), ',').getItem(1))).withColumn("BATTER_SCORED_RUNS_PER_BALL", trim(split(col("runs_scored_per_ball"), ',').getItem(2)))
        dataframe_new = dataframe_new.select('overs','FIRST_TEAM', 'SECOND_TEAM','EVENT_NAME','MATCH_DATE','MATCH_CITY','MATCH_NUMBER','GENDER','WINNER_TEAM','WINNED_BY','BATTER','BOWLER','NON_STRIKER','BATTER_SCORED_RUNS_PER_BALL','TOTAL_RUNS_PER_BALL', 'EXTRAS_EARNED_PER_BALL')
        # dataframe_new.write.parquet("/output_folder/")\
        # dataframe_new.write.mode("append").format("parquet").save("/output_folder/")
        dataframe_new.show(10, False)
        # dataframe_new = dataframe_new.toPandas()
        # dataframe_new.to_sql(con = my_conn, name = 'male', if_exists='replace')
        # dataframe_new.show(10, Fals
        # result = my_conn.execute(text("select * from male where WINNED_BY NOT like '%runs%'"))
        # print(result.all())
        print(f"{path}/{i}")
        break
except Exception as e:
  logging.error("Exception occurred", exc_info=True)
   


+-----+----------+-----------+----------+----------+----------+------------+------+-----------+---------+---------+---------------+-----------+---------------------------+-------------------+----------------------+
|overs|FIRST_TEAM|SECOND_TEAM|EVENT_NAME|MATCH_DATE|MATCH_CITY|MATCH_NUMBER|GENDER|WINNER_TEAM|WINNED_BY|BATTER   |BOWLER         |NON_STRIKER|BATTER_SCORED_RUNS_PER_BALL|TOTAL_RUNS_PER_BALL|EXTRAS_EARNED_PER_BALL|
+-----+----------+-----------+----------+----------+----------+------------+------+-----------+---------+---------+---------------+-----------+---------------------------+-------------------+----------------------+
|0    |Australia |Pakistan   |NULL      |2017-01-13|Brisbane  |NULL        |male  |Australia  |92 runs  |DA Warner|Mohammad Amir  |TM Head    |0                          |extras=0           |{0                    |
|0    |Australia |Pakistan   |NULL      |2017-01-13|Brisbane  |NULL        |male  |Australia  |92 runs  |DA Warner|Mohammad Amir  |TM Head  

In [15]:
print(i)

49


In [16]:
conn = get_connection()
print(conn)
create_table = "CREATE TABLE IF NOT EXISTS ODI_CRICKET_RESULT(overs INTEGER NOT NULL, FIRST_TEAM VARCHAR(30), SECOND_TEAM VARCHAR(30), EVENT_NAME VARCHAR(60), MATCH_DATE DATE , MATCH_CITY VARCHAR(50), MATCH_NUMBER INTEGER, GENDER VARCHAR(50), WINNER_TEAM VARCHAR(50), WINNED_BY VARCHAR(50), BATTER VARCHAR(50), BOWLER VARCHAR(50), NON_STRIKER VARCHAR(50), BATTER_SCORED_RUNS_PER_BALL VARCHAR(50), TOTAL_RUNS_PER_BALL VARCHAR(50), EXTRAS_EARNED_PER_BALL VARCHAR(50))"
# my_conn = my_conn.connect()
# result = my_conn.execute(text(create_table))

TypeError: get_connection() missing 2 required positional arguments: 'logger' and 'path_of_database'

In [None]:
result = conn.execute(text(create_table))


In [None]:
result = conn.execute(text("select name from sqlite_master"))
for tables in result.all():
    if tables[0] != "ODI_CRICKET_RESULT":
        create_table = "CREATE TABLE IF NOT EXISTS ODI_CRICKET_RESULT(overs INTEGER NOT NULL, \
        FIRST_TEAM VARCHAR(30), SECOND_TEAM VARCHAR(30), EVENT_NAME VARCHAR(60), MATCH_DATE DATE , \
        MATCH_CITY VARCHAR(50), MATCH_NUMBER INTEGER, GENDER VARCHAR(50), WINNER_TEAM VARCHAR(50), WINNED_BY VARCHAR(50),\
        BATTER VARCHAR(50), BOWLER VARCHAR(50), NON_STRIKER VARCHAR(50), BATTER_SCORED_RUNS_PER_BALL VARCHAR(50), TOTAL_RUNS_PER_BALL VARCHAR(50), EXTRAS_EARNED_PER_BALL VARCHAR(50))"
        conn.execute(text(create_table))
        logger.info("ODI_CRICKET_RESULT table created successfully")



ODI_CRICKET_RESULT


In [None]:
# DROP TABLE table_name;
result = conn.execute(text("DROP TABLE male"))
# print(print(result.all()))

ResourceClosedError: This result object does not return rows. It has been closed automatically.

In [None]:
working_directory = os.getcwd()
print(working_directory)

c:\Users\Admin\Downloads\procesing\Batch_pipeline_DataEngineering_task


In [None]:
conn = get_connection(logger, working_directory)
print(conn)

NameError: name 'conn' is not defined

In [None]:
and (('odis_female_json.json' not in files_in_target_directory) and ('odis_male_json.json' not in files_in_target_directory))

In [None]:
path = ("sqlite:///c:/Users/Admin/Downloads/procesing/Batch_pipeline_DataEngineering_task/DATABASE/cricket.db")
my_conn = create_engine(path)

In [None]:
result = my_conn.execute(text("select name from sqlite_master"))
print(result.all())

AttributeError: 'Engine' object has no attribute 'execute'