## Data Transformation Level 2


##### Load all the associated tables from the Silver container

In [0]:
# Load the silver bowling table. Delta tables carry their own schema, so the
# CSV-only `header` option is not passed to the reader.
df_bowling_silver = spark.read.format("delta").load("/mnt/saprojectone/silver-project1/bowling")

In [0]:
# Load the silver batting table. Delta tables carry their own schema, so the
# CSV-only `header` option is not passed to the reader.
df_bating_silver = spark.read.format("delta").load("/mnt/saprojectone/silver-project1/bating")

In [0]:
# Load the silver tournament table. Delta tables carry their own schema, so the
# CSV-only `header` option is not passed to the reader.
df_tour_silver = spark.read.format("delta").load("/mnt/saprojectone/silver-project1/tour")

##### Build the dimension and fact tables in normalised form for storage in the data warehouse

In [0]:
# Batting facts: every silver batting column except the descriptive
# player_name and country attributes.
df_facts_bating = df_bating_silver.drop("player_name", "country")

In [0]:
# Player dimension: the identifying columns taken from batting silver, with a
# missing country replaced by "IND".
# NOTE(review): rows are not de-duplicated here — confirm that batting silver
# holds exactly one row per player.
df_dim_player = (
    df_bating_silver
    .select("player_id", "player_name", "country", "ingestion_date")
    .na.fill({"country": "IND"})
)

In [0]:
# Bowling facts: the silver bowling data without the player_name column.
df_facts_bolwing = df_bowling_silver.drop(*["player_name"])

In [0]:
from pyspark.sql.functions import monotonically_increasing_id, lit

In [0]:
from pyspark.sql.functions import col


##### Transform the tournament table to replace team and player names with their respective cont_id and player_id

In [0]:
# Expose the player dimension and the silver tournament data to Spark SQL
# under the view names used by the query that builds joined_player_tour.
df_dim_player.createOrReplaceTempView("temp_view_player")
df_tour_silver.createOrReplaceTempView("temp_view_tour")

In [0]:
# Resolve the three player-name columns of the tournament table
# (player_top_scorer, player_top_wkts, player_series) to player ids.
# A name with no match in the player dimension maps to -1.
#
# The player view is first collapsed to one row per player_name (keeping the
# smallest player_id for that name). Without this, a name that occurs more than
# once in temp_view_player would multiply the tournament rows in each LEFT JOIN.
joined_player_tour = spark.sql("""
    WITH player_lookup AS (
        SELECT
            player_name,
            MIN(player_id) AS player_id
        FROM
            temp_view_player
        GROUP BY
            player_name
    )
    SELECT
        tour.final_id,
        tour.date_final,
        tour.nbr_teams,
        tour.game_format,
        tour.venue,
        tour.venue_choice,
        tour.winner,
        tour.runner_up,
        tour.toss_win,
        tour.bowling_select,
        tour.player_top_scorer,
        tour.player_top_wkts,
        tour.player_series,
        tour.ingestion_date,
        COALESCE(top_scorer.player_id, -1) AS player_top_scorer_id,
        COALESCE(top_wkts.player_id, -1) AS player_top_wkts_id,
        COALESCE(series_best.player_id, -1) AS player_series_id
    FROM
        temp_view_tour tour
    LEFT JOIN
        player_lookup top_scorer
    ON
        tour.player_top_scorer = top_scorer.player_name
    LEFT JOIN
        player_lookup top_wkts
    ON
        tour.player_top_wkts = top_wkts.player_name
    LEFT JOIN
        player_lookup series_best
    ON
        tour.player_series = series_best.player_name
""")

In [0]:
from pyspark.sql.functions import monotonically_increasing_id

In [0]:
# Contestant dimension: one row per distinct team name that appears as a
# winner or a runner-up, with a surrogate key cont_id starting at 1.
#
# The key is ROW_NUMBER() over the alphabetically sorted names rather than
# monotonically_increasing_id(): this DataFrame is lazy and is evaluated once
# when the dimension is written and again inside the fact-table join, and a
# non-deterministic id could differ between those two evaluations. The cast
# keeps cont_id a BIGINT, the type monotonically_increasing_id() produced.
# Null team names are dropped because they can never match in a join.
# NOTE(review): the fact query also looks up venue_choice and toss_win in this
# dimension; values that are never a winner/runner-up resolve to -1 — confirm
# that is intended.
df_dim_contestant = (
    joined_player_tour.selectExpr("winner AS contestant")
    .union(joined_player_tour.selectExpr("runner_up AS contestant"))
    .where("contestant IS NOT NULL")
    .distinct()
    .selectExpr(
        "contestant",
        "CAST(ROW_NUMBER() OVER (ORDER BY contestant) AS BIGINT) AS cont_id",
    )
)

In [0]:
# Expose the contestant dimension and the player-resolved tournament data to
# Spark SQL under the view names used by the query that builds df_fact_tour.
df_dim_contestant.createOrReplaceTempView("temp_view_cont")
joined_player_tour.createOrReplaceTempView("temp_view_join_tbl")

In [0]:
# Build the tournament fact table from temp_view_join_tbl.
# Kept as-is: final_id, date_final, nbr_teams, game_format, venue,
# bowling_select, ingestion_date and the three player id columns.
# Replaced by ids: venue_choice, winner, runner_up and toss_win are each
# looked up by name in temp_view_cont and emitted as the matching cont_id
# (venue_choice_cont, winner_cont, runnerup_cont, toss_win_cont); a name with
# no match in the contestant view becomes -1.
df_fact_tour = spark.sql("""
    SELECT 
        joined_player_tour.final_id,
        joined_player_tour.date_final,
        joined_player_tour.nbr_teams,
        joined_player_tour.game_format,
        joined_player_tour.venue,
        COALESCE(df_dim_contestant_vc.cont_id, -1) AS venue_choice_cont,
        COALESCE(df_dim_contestant_w.cont_id, -1) AS winner_cont,
        COALESCE(df_dim_contestant_ru.cont_id, -1) AS runnerup_cont,
        COALESCE(df_dim_contestant_tw.cont_id, -1) AS toss_win_cont,
        joined_player_tour.bowling_select,
        joined_player_tour.ingestion_date,
        joined_player_tour.player_top_scorer_id,
        joined_player_tour.player_top_wkts_id,
        joined_player_tour.player_series_id
    FROM 
        temp_view_join_tbl joined_player_tour
    LEFT JOIN 
        temp_view_cont df_dim_contestant_vc
    ON 
        joined_player_tour.venue_choice = df_dim_contestant_vc.contestant
       LEFT JOIN 
        temp_view_cont df_dim_contestant_w
    ON 
        joined_player_tour.winner = df_dim_contestant_w.contestant
       LEFT JOIN 
        temp_view_cont df_dim_contestant_ru
    ON 
        joined_player_tour.runner_up = df_dim_contestant_ru.contestant
       LEFT JOIN 
        temp_view_cont df_dim_contestant_tw
    ON 
        joined_player_tour.toss_win = df_dim_contestant_tw.contestant
    

""")

In [0]:
# Persist every gold table as Delta, replacing whatever is already stored at
# its target path. The tables are written in the order listed.
gold_outputs = [
    (df_facts_bating, "/mnt/saprojectone/gold-project1/ckt/bating"),
    (df_facts_bolwing, "/mnt/saprojectone/gold-project1/ckt/bowling"),
    (df_dim_player, "/mnt/saprojectone/gold-project1/ckt/players"),
    (df_dim_contestant, "/mnt/saprojectone/gold-project1/ckt/cont"),
    (df_fact_tour, "/mnt/saprojectone/gold-project1/ckt/tour"),
]
for gold_frame, gold_path in gold_outputs:
    gold_frame.write.format("delta").mode("overwrite").save(gold_path)