In [0]:
# Put your access key, secret key and region below.
# NOTE(review): avoid saving real credentials in the notebook; prefer an
# instance profile or a secrets manager where one is available.
access_key = ''
secret_key = ''
aws_region = ''

# Only override the S3A settings when values were actually supplied. With the
# blank placeholders an unconditional set() would store empty credentials and
# the malformed endpoint "s3..amazonaws.com" in the Hadoop configuration.
if access_key and secret_key:
    spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", access_key)
    spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", secret_key)

if aws_region:
    spark._jsc.hadoopConfiguration().set("fs.s3a.endpoint", f"s3.{aws_region}.amazonaws.com")

In [0]:
# Location of the raw JSON files that have not been transformed yet.
raw_data_path = "s3://cricket-data-pipeline/raw_data/to_processed/"
# Load the JSON found under that path into a DataFrame; every transform
# function below selects its columns from this `raw_df`.
raw_df = spark.read.json(raw_data_path)


In [0]:
from pyspark.sql.functions import col, to_timestamp, date_format, current_timestamp, to_date, col, split
from pyspark.sql.types import DecimalType



In [0]:
def summary():
    """Build the match summary DataFrame from ``raw_df``.

    Selects the series, venue, match, toss and team-id fields of
    ``match_details`` and adds a ``Timestamp`` column holding the current
    timestamp formatted as ``yyyy-MM-dd HH:mm:ss``.

    Returns:
        DataFrame with the columns Timestamp, Series, Series_Id, Series_Type,
        Venue, Venue_Id, Match, Match_Id, Match_Timestamp, Match_Date,
        Match_Time, Match_Type, Toss, Balling_Team_Id and Batting_Team_Id.
    """
    # Build the parsed timestamp once and reuse the expression for the date.
    # Referring to the "Match_Timestamp" alias from inside the same select()
    # only resolves on Spark versions with lateral column alias support.
    match_timestamp = to_timestamp(col("match_details.date_time"), 'yyyy-MM-dd HH:mm:ss')

    summary_df = raw_df.select(
        date_format(current_timestamp(), 'yyyy-MM-dd HH:mm:ss').alias("Timestamp"),
        col("match_details.series").alias("Series"),
        col("match_details.series_id").alias("Series_Id"),
        col("match_details.series_type").alias("Series_Type"),
        col("match_details.venue").alias("Venue"),
        col("match_details.venue_id").alias("Venue_Id"),
        col("match_details.matchs").alias("Match"),
        col("match_details.match_id").alias("Match_Id"),
        match_timestamp.alias("Match_Timestamp"),
        to_date(match_timestamp).alias("Match_Date"),
        col("match_details.match_time").alias("Match_Time"),
        col("match_details.match_type").alias("Match_Type"),
        col("match_details.toss").alias("Toss"),
        col("match_details.balling_team").cast("long").alias("Balling_Team_Id"),
        col("match_details.batting_team").cast("long").alias("Batting_Team_Id"),
    )
    return summary_df


In [0]:
def team_a():
  """Build the team A DataFrame from ``raw_df``.

  Selects the team A id, image, name, short form, scoreboard string and overs
  from ``match_details``. The scoreboard string is split on "-"; the first
  piece is cast to long as ``Team_A_Score`` and the second as
  ``Team_A_Wicket``.

  Returns:
      DataFrame with the columns Timestamp, Team_A_Id, Team_A_Image,
      Team_A_Name, Team_A_Shortform, Team_A_Scoreboard, Team_A_Over,
      Team_A_Score and Team_A_Wicket.
  """
  # Split the source field directly. Referring to the "Team_A_Scoreboard"
  # alias from inside the same select() only resolves on Spark versions with
  # lateral column alias support.
  scoreboard = col("match_details.team_a_scores")
  score_parts = split(scoreboard, "-")

  team_a_df = raw_df.select(
    date_format(current_timestamp(), 'yyyy-MM-dd HH:mm:ss').alias("Timestamp"),
    col("match_details.team_a_id").alias("Team_A_Id"),
    col("match_details.team_a_img").alias("Team_A_Image"),
    col("match_details.team_a").alias("Team_A_Name"),
    col("match_details.team_a_short").alias("Team_A_Shortform"),
    scoreboard.alias("Team_A_Scoreboard"),
    col("match_details.team_a_over").cast(DecimalType(10, 1)).alias("Team_A_Over"),
    score_parts.getItem(0).cast("long").alias("Team_A_Score"),
    score_parts.getItem(1).cast("long").alias("Team_A_Wicket")
  )
  return team_a_df



In [0]:
def team_b():
  """Build the team B DataFrame from ``raw_df``.

  Selects the team B id, image, name, short form, scoreboard string and overs
  from ``match_details``. The scoreboard string is split on "-"; the first
  piece is cast to long as ``Team_B_Score`` and the second as
  ``Team_B_Wicket``.

  Returns:
      DataFrame with the columns Timestamp, Team_B_Id, Team_B_Image,
      Team_B_Name, Team_B_Shortform, Team_B_Scoreboard, Team_B_Over,
      Team_B_Score and Team_B_Wicket.
  """
  # Split the source field directly. Referring to the "Team_B_Scoreboard"
  # alias from inside the same select() only resolves on Spark versions with
  # lateral column alias support.
  scoreboard = col("match_details.team_b_scores")
  score_parts = split(scoreboard, "-")

  team_b_df = raw_df.select(
    date_format(current_timestamp(), 'yyyy-MM-dd HH:mm:ss').alias("Timestamp"),
    col("match_details.team_b_id").alias("Team_B_Id"),
    col("match_details.team_b_img").alias("Team_B_Image"),
    col("match_details.team_b").alias("Team_B_Name"),
    col("match_details.team_b_short").alias("Team_B_Shortform"),
    scoreboard.alias("Team_B_Scoreboard"),
    col("match_details.team_b_over").cast(DecimalType(10, 1)).alias("Team_B_Over"),
    score_parts.getItem(0).cast("long").alias("Team_B_Score"),
    score_parts.getItem(1).cast("long").alias("Team_B_Wicket")
  )
  return team_b_df

In [0]:
def status():
    """Build the match status DataFrame from ``raw_df``.

    Selects the match status, both teams' id, name and overs, plus the
    trail/lead, session, need-run-ball and result fields of ``match_details``.

    Returns:
        DataFrame with the columns Timestamp, Match_Status, Team_A_Id,
        Team_A_Name, Team_a_Over, Team_B_Id, Team_B_Name, Team_B_Over,
        Trail_Lead, Session, Need_Run_Ball and Result.
    """
    status_df = raw_df.select(
        date_format(current_timestamp(), 'yyyy-MM-dd HH:mm:ss').alias("Timestamp"),
        col("match_details.match_status").alias("Match_Status"),
        col("match_details.team_a_id").alias("Team_A_Id"),
        col("match_details.team_a").alias("Team_A_Name"),
        col("match_details.team_a_over").cast(DecimalType(10, 1)).alias("Team_a_Over"),
        col("match_details.team_b_id").alias("Team_B_Id"),
        col("match_details.team_b").alias("Team_B_Name"),
        col("match_details.team_b_over").cast(DecimalType(10, 1)).alias("Team_B_Over"),
        col("match_details.trail_lead").alias("Trail_Lead"),
        col("match_details.session").alias("Session"),
        # Previously aliased "Match_Status" as well, which produced two columns
        # with the same name in the output; give this field its own name.
        col("match_details.need_run_ball").alias("Need_Run_Ball"),
        col("match_details.result").alias("Result")
    )
    return status_df

In [0]:
def commentry():
    """Return the commentary view of ``raw_df``.

    Columns, in order: Timestamp (current timestamp formatted as
    ``yyyy-MM-dd HH:mm:ss``), Team_a_Over and Team_b_Over (both cast to
    DecimalType(10, 1)) and Commentry (``player_details.commentary``).
    """
    overs_type = DecimalType(10, 1)
    load_time = date_format(current_timestamp(), 'yyyy-MM-dd HH:mm:ss')

    selected = [
        load_time.alias("Timestamp"),
        col("match_details.team_a_over").cast(overs_type).alias("Team_a_Over"),
        col("match_details.team_b_over").cast(overs_type).alias("Team_b_Over"),
        col("player_details.commentary").alias("Commentry"),
    ]
    return raw_df.select(*selected)

In [0]:
def batsmen():
    """Return the batsmen view of ``raw_df``.

    Columns, in order: Timestamp, Team_A_Over, Team_B_Over, Batsmen_Name
    (``player_details.0``), then Runs, Balls, Four and Six
    (``player_details.1`` to ``player_details.4``, cast to long) and
    Strike_Rate (``player_details.5``, cast to DecimalType(10, 2)).
    """
    overs_type = DecimalType(10, 1)
    load_time = date_format(current_timestamp(), 'yyyy-MM-dd HH:mm:ss')

    # (source field, output column) pairs that are all cast to long.
    long_fields = [
        ("player_details.1", "Runs"),
        ("player_details.2", "Balls"),
        ("player_details.3", "Four"),
        ("player_details.4", "Six"),
    ]

    selected = [
        load_time.alias("Timestamp"),
        col("match_details.team_a_over").cast(overs_type).alias("Team_A_Over"),
        col("match_details.team_b_over").cast(overs_type).alias("Team_B_Over"),
        col("player_details.0").alias("Batsmen_Name"),
    ]
    for source_field, output_name in long_fields:
        selected.append(col(source_field).cast("long").alias(output_name))
    selected.append(col("player_details.5").cast(DecimalType(10, 2)).alias("Strike_Rate"))

    return raw_df.select(*selected)

In [0]:
def bowler():
    """Return the bowler view of ``raw_df``.

    Columns, in order: Timestamp, Team_A_Over, Team_B_Over, Bowler_Name
    (``player_details.16``), Bowler_Overs (``player_details.17``, cast to
    DecimalType(10, 1)), then Maiden, Bowler_Runs and Wickets
    (``player_details.18`` to ``player_details.20``, cast to long) and
    Economy (``player_details.21``, cast to DecimalType(10, 2)).
    """
    overs_type = DecimalType(10, 1)
    load_time = date_format(current_timestamp(), 'yyyy-MM-dd HH:mm:ss')

    # (source field, output column) pairs that are all cast to long.
    long_fields = [
        ("player_details.18", "Maiden"),
        ("player_details.19", "Bowler_Runs"),
        ("player_details.20", "Wickets"),
    ]

    selected = [
        load_time.alias("Timestamp"),
        col("match_details.team_a_over").cast(overs_type).alias("Team_A_Over"),
        col("match_details.team_b_over").cast(overs_type).alias("Team_B_Over"),
        col("player_details.16").alias("Bowler_Name"),
        col("player_details.17").cast(overs_type).alias("Bowler_Overs"),
    ]
    for source_field, output_name in long_fields:
        selected.append(col(source_field).cast("long").alias(output_name))
    selected.append(col("player_details.21").cast(DecimalType(10, 2)).alias("Economy"))

    return raw_df.select(*selected)

In [0]:
# Run each transform once and keep the resulting DataFrames under the names
# that the write cell passes to write_to_s3.
Summary_data_transformed = summary()
Team_A_data_transformed = team_a()
Team_B_data_transformed = team_b()
Status_data_transformed = status()
Commentry_data_transformed = commentry()
Batsmen_data_transformed = batsmen()
Bowler_data_transformed = bowler()

In [0]:
from datetime import datetime

def write_to_s3(df, path_suffix, format_type="csv", mode="append",
                base_path="s3://cricket-data-pipeline/transformed_data"):
    """Write ``df`` to a date-stamped folder below ``base_path``.

    The target is ``{base_path}/{path_suffix}_{YYYY-MM-DD}/`` where the date
    is today's local date, so every run on the same day targets the same
    folder.

    Args:
        df: DataFrame to write.
        path_suffix: Folder path (relative to ``base_path``) that the date
            stamp is appended to, e.g. ``"summary_data/summary_transformed"``.
        format_type: Output format handed to the DataFrame writer.
        mode: Save mode handed to the DataFrame writer. The default
            ``"append"`` adds new files to an existing folder.
        base_path: Root location of the transformed data; a trailing slash is
            ignored.
    """
    date_str = datetime.now().strftime("%Y-%m-%d")
    path = f"{base_path.rstrip('/')}/{path_suffix}_{date_str}/"

    df.write \
        .format(format_type) \
        .option("header", "true") \
        .mode(mode) \
        .save(path)


In [0]:
# Each transformed DataFrame paired with the folder prefix it is written under.
# The order matches the order in which the datasets are written.
outputs_to_write = [
    (Summary_data_transformed, "summary_data/summary_transformed"),
    (Commentry_data_transformed, "commentry_data/commentry_transformed"),
    (Team_A_data_transformed, "team_a_data/team_a_transformed"),
    (Team_B_data_transformed, "team_b_data/team_b_transformed"),
    (Status_data_transformed, "status_data/status_transformed"),
    (Batsmen_data_transformed, "batsmen_data/batsmen_transformed"),
    (Bowler_data_transformed, "bowler_data/bowler_transformed"),
]

for transformed_df, output_suffix in outputs_to_write:
    write_to_s3(transformed_df, output_suffix, format_type="csv")



In [0]:
import boto3
# Move every raw file from the "to_processed" prefix to the "processed" prefix
# (copy, then delete the source) so it is not picked up again on the next run.
#
# Blank placeholders are passed as None so boto3 falls back to its default
# credential/region resolution instead of using empty strings.
s3_client = boto3.client('s3',
                         aws_access_key_id=access_key or None,
                         aws_secret_access_key=secret_key or None,
                         region_name=aws_region or None)
source_bucket = 'cricket-data-pipeline'
source_prefix = 'raw_data/to_processed/'
destination_prefix = 'raw_data/processed/'

# A single list_objects_v2 call returns at most 1000 keys; paginate so that
# larger backlogs are moved completely. Keys are collected before anything is
# deleted so the listing is not affected by the moves.
# NOTE(review): the keys are listed at move time, not when raw_df was read, so
# an object that arrived in between would be moved without having been
# transformed — confirm whether the producer can write while this notebook runs.
source_file_keys = []
paginator = s3_client.get_paginator('list_objects_v2')
for response in paginator.paginate(Bucket=source_bucket, Prefix=source_prefix):
    for file in response.get('Contents', []):
        # Skip zero-byte "folder" placeholder keys such as the prefix itself.
        if file['Key'].endswith('/'):
            continue
        source_file_keys.append(file['Key'])

if source_file_keys:
    for source_file_key in source_file_keys:
        # Swap only the leading prefix; str.replace would also rewrite any
        # later occurrence of the prefix text inside the key.
        destination_file_key = destination_prefix + source_file_key[len(source_prefix):]

        try:
            s3_client.copy_object(
                Bucket=source_bucket,
                CopySource={'Bucket': source_bucket, 'Key': source_file_key},
                Key=destination_file_key
            )
            print(f"Successfully copied: {source_file_key} to {destination_file_key}")

            # Delete only after the copy succeeded; a failed copy raises and
            # leaves the source object in place.
            s3_client.delete_object(Bucket=source_bucket, Key=source_file_key)
            print(f"Successfully deleted: {source_file_key}")

        except Exception as e:
            # Best effort: report the failure and continue with the next file.
            print(f"Error moving {source_file_key}: {e}")
else:
    print("No files found in the source folder.")


Successfully copied: raw_data/to_processed/cricket_data_raw_2024-11-08_09-26-36.json to raw_data/processed/cricket_data_raw_2024-11-08_09-26-36.json
Successfully deleted: raw_data/to_processed/cricket_data_raw_2024-11-08_09-26-36.json
Successfully copied: raw_data/to_processed/cricket_data_raw_2024-11-08_09-26-45.json to raw_data/processed/cricket_data_raw_2024-11-08_09-26-45.json
Successfully deleted: raw_data/to_processed/cricket_data_raw_2024-11-08_09-26-45.json
