### Import required packages and Create spark session with delta 

In [92]:
from delta.tables import *
import pyspark.sql.functions as F
import pyspark
import os
from delta import configure_spark_with_delta_pip
from pyspark.sql.types import IntegerType

In [4]:
builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .enableHiveSupport()
spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [5]:
spark

### Prepare a dictionary with file name as key and table name as value

In [21]:
path = "C://Users//mmoha//Documents//LinkedIn//"
file_dict = {}
for (root, dirs, file) in os.walk(path):
    for f in file:
        if '.csv' in f:
           file_dict[f] = f[0:-4]+"_table"

In [22]:
file_dict

{'appearances.csv': 'appearances_table',
 'clubs.csv': 'clubs_table',
 'club_games.csv': 'club_games_table',
 'competitions.csv': 'competitions_table',
 'games.csv': 'games_table',
 'game_events.csv': 'game_events_table',
 'game_lineups.csv': 'game_lineups_table',
 'players.csv': 'players_table',
 'player_valuations.csv': 'player_valuations_table',
 'transfers.csv': 'transfers_table'}

### Create tables

In [23]:
for i in file_dict:
    print(i)
    df = spark.read.format("csv").option("inferSchema","true").option("header", "true").load(f"{path}{i}")
    df.write.format("delta").mode("overwrite").saveAsTable(file_dict[i])

appearances.csv
clubs.csv
club_games.csv
competitions.csv
games.csv
game_events.csv
game_lineups.csv
players.csv
player_valuations.csv
transfers.csv


### Read players_table and analyze the data

In [38]:
players_df = spark.read.table("players_table")

In [39]:
players_df.describe()

DataFrame[summary: string, player_id: string, first_name: string, last_name: string, name: string, last_season: string, current_club_id: string, player_code: string, country_of_birth: string, city_of_birth: string, country_of_citizenship: string, sub_position: string, position: string, foot: string, height_in_cm: string, agent_name: string, image_url: string, url: string, current_club_domestic_competition_id: string, current_club_name: string, market_value_in_eur: string, highest_market_value_in_eur: string]

In [40]:
players_df.count()

32601

### Read player_valuations_table and analyze the data

In [41]:
player_valuations_df = spark.read.table("player_valuations_table")

In [42]:
player_valuations_df.describe()

DataFrame[summary: string, player_id: string, market_value_in_eur: string, current_club_id: string, player_club_domestic_competition_id: string]

In [43]:
player_valuations_df.count()

496606

### Read clubs_table and analyze the data

In [44]:
clubs_df = spark.read.table("clubs_table")

In [45]:
clubs_df.describe()

DataFrame[summary: string, club_id: string, club_code: string, name: string, domestic_competition_id: string, total_market_value: string, squad_size: string, average_age: string, foreigners_number: string, foreigners_percentage: string, national_team_players: string, stadium_name: string, stadium_seats: string, net_transfer_record: string, coach_name: string, last_season: string, filename: string, url: string]

In [46]:
clubs_df.count()

439

### Read appearances_table and analyze the data

In [47]:
appearances_df = spark.read.table("appearances_table")

In [48]:
appearances_df.describe()

DataFrame[summary: string, appearance_id: string, game_id: string, player_id: string, player_club_id: string, player_current_club_id: string, player_name: string, competition_id: string, yellow_cards: string, red_cards: string, goals: string, assists: string, minutes_played: string]

In [49]:
appearances_df.count()

1706806

### Read club_games_table and analyze the data

In [50]:
club_games_df = spark.read.table("club_games_table")

In [51]:
club_games_df.describe()

DataFrame[summary: string, game_id: string, club_id: string, own_goals: string, own_position: string, own_manager_name: string, opponent_id: string, opponent_goals: string, opponent_position: string, opponent_manager_name: string, hosting: string, is_win: string]

In [52]:
club_games_df.count()

148052

### Read competitions_table and analyze the data

In [53]:
competitions_df = spark.read.table("competitions_table")

In [54]:
competitions_df.describe()

DataFrame[summary: string, competition_id: string, competition_code: string, name: string, sub_type: string, type: string, country_id: string, country_name: string, domestic_league_code: string, confederation: string, url: string]

In [55]:
competitions_df.count()

44

### Read games_table and analyze the data

In [56]:
games_df = spark.read.table("games_table")

In [57]:
games_df.describe()

DataFrame[summary: string, game_id: string, competition_id: string, season: string, round: string, home_club_id: string, away_club_id: string, home_club_goals: string, away_club_goals: string, home_club_position: string, away_club_position: string, home_club_manager_name: string, away_club_manager_name: string, stadium: string, attendance: string, referee: string, url: string, home_club_formation: string, away_club_formation: string, home_club_name: string, away_club_name: string, competition_type: string]

In [58]:
games_df.count()

74026

### Read game_lineups_table and analyze the data

In [59]:
game_lineups_df = spark.read.table("game_lineups_table")

game_lineups_df.describe()

In [60]:
game_lineups_df.count()

2285289

### Read transfers_table and analyze the data

In [61]:
transfers_df = spark.read.table("transfers_table")

In [62]:
transfers_df.describe()

DataFrame[summary: string, player_id: string, transfer_season: string, from_club_id: string, to_club_id: string, from_club_name: string, to_club_name: string, transfer_fee: string, market_value_in_eur: string, player_name: string]

In [63]:
transfers_df.count()

79646

### Read game_events and analyze the data

In [64]:
game_events_df = spark.read.table("game_events_table")

In [65]:
game_events_df.describe()

DataFrame[summary: string, game_event_id: string, game_id: string, minute: string, type: string, club_id: string, player_id: string, description: string, player_in_id: string, player_assist_id: string]

In [66]:
game_events_df.count()

1035043

### NARROW TRANSFORMATIONS
### 1. Filter / Where

In [113]:
france_player_df = players_df.filter(col("country_of_citizenship")=="France").select("first_name", "last_name")
france_player_df.explain("formatted")

== Physical Plan ==
* Project (4)
+- * Filter (3)
   +- * ColumnarToRow (2)
      +- Scan parquet spark_catalog.default.players_table (1)


(1) Scan parquet spark_catalog.default.players_table
Output [3]: [first_name#5383, last_name#5384, country_of_citizenship#5391]
Batched: true
Location: PreparedDeltaFileIndex [file:/C:/Users/mmoha/linkedin/transformations/spark-warehouse/players_table]
PushedFilters: [IsNotNull(country_of_citizenship), EqualTo(country_of_citizenship,France)]
ReadSchema: struct<first_name:string,last_name:string,country_of_citizenship:string>

(2) ColumnarToRow [codegen id : 1]
Input [3]: [first_name#5383, last_name#5384, country_of_citizenship#5391]

(3) Filter [codegen id : 1]
Input [3]: [first_name#5383, last_name#5384, country_of_citizenship#5391]
Condition : (isnotnull(country_of_citizenship#5391) AND (country_of_citizenship#5391 = France))

(4) Project [codegen id : 1]
Output [2]: [first_name#5383, last_name#5384]
Input [3]: [first_name#5383, last_name#5384, c

### 2. Select / Drop

In [112]:
select_df = players_df.select(F.col("name").alias("player_name"),F.col("city_of_birth").alias("player_nationality"),F.when(F.col("position") == "Defender", "Unknown").otherwise(F.col("position")).alias("player_position"),F.col("position"))

drop_df = select_df.drop(F.col("position"))

select_df.explain("formatted")
drop_df.explain("formatted")

== Physical Plan ==
* Project (3)
+- * ColumnarToRow (2)
   +- Scan parquet spark_catalog.default.players_table (1)


(1) Scan parquet spark_catalog.default.players_table
Output [3]: [name#5385, city_of_birth#5390, position#5394]
Batched: true
Location: PreparedDeltaFileIndex [file:/C:/Users/mmoha/linkedin/transformations/spark-warehouse/players_table]
ReadSchema: struct<name:string,city_of_birth:string,position:string>

(2) ColumnarToRow [codegen id : 1]
Input [3]: [name#5385, city_of_birth#5390, position#5394]

(3) Project [codegen id : 1]
Output [4]: [name#5385 AS player_name#13285, city_of_birth#5390 AS player_nationality#13286, CASE WHEN (position#5394 = Defender) THEN Unknown ELSE position#5394 END AS player_position#13287, position#5394]
Input [3]: [name#5385, city_of_birth#5390, position#5394]


== Physical Plan ==
* Project (3)
+- * ColumnarToRow (2)
   +- Scan parquet spark_catalog.default.players_table (1)


(1) Scan parquet spark_catalog.default.players_table
Output [3]: [n

### WIDE TRANSFORMATIONS
### 1. GroupBy / Aggregations

In [102]:
club_stats = appearances_db.groupBy("player_club_id").agg(
    F.sum(F.col("goals")).cast(IntegerType()).alias("total_goals"),
    F.sum(F.col("assists")).cast(IntegerType()).alias("total_assists"),
    F.avg(F.col("minutes_played")).cast(IntegerType()).alias("avg_minutes"),
    F.count(F.col("appearance_id")).cast(IntegerType()).alias("total_appearances"),
    F.max(F.col("yellow_cards")).cast(IntegerType()).alias("max_yellow_cards"),
    F.min(F.col("red_cards")).cast(IntegerType()).alias("min_red_cards"),
    F.stddev(F.col("goals")).cast(IntegerType()).alias("goals_stddev"),
    F.variance(F.col("assists")).cast(IntegerType()).alias("assists_variance")
)

club_stats.explain("formatted")

== Physical Plan ==
AdaptiveSparkPlan (5)
+- HashAggregate (4)
   +- Exchange (3)
      +- HashAggregate (2)
         +- Scan parquet spark_catalog.default.appearances_table (1)


(1) Scan parquet spark_catalog.default.appearances_table
Output [7]: [appearance_id#1797, player_club_id#1800, yellow_cards#1805, red_cards#1806, goals#1807, assists#1808, minutes_played#1809]
Batched: true
Location: PreparedDeltaFileIndex [file:/C:/Users/mmoha/linkedin/transformations/spark-warehouse/appearances_table]
ReadSchema: struct<appearance_id:string,player_club_id:int,yellow_cards:int,red_cards:int,goals:int,assists:int,minutes_played:int>

(2) HashAggregate
Input [7]: [appearance_id#1797, player_club_id#1800, yellow_cards#1805, red_cards#1806, goals#1807, assists#1808, minutes_played#1809]
Keys [1]: [player_club_id#1800]
Functions [8]: [partial_sum(goals#1807), partial_sum(assists#1808), partial_avg(minutes_played#1809), partial_count(appearance_id#1797), partial_max(yellow_cards#1805), partial_min

### 2. Join Operations

In [111]:
# Inner Join
player_appearances = players_df.join(appearances_df, players_df.player_id == appearances_df.player_id, "inner")
player_appearances.explain("formatted")

# Left Join
players_with_stats = players_df.join(
    appearances_df.groupBy("player_id").agg(
        F.sum(F.col("goals")).alias("career_goals"),
        F.sum(F.col("assists")).alias("career_assists")
    ),
    "player_id",
    "left"
)
players_with_stats.explain("formatted")

# Broadcast Join (hint for small tables)
player_club_info = players_df.join(
    F.broadcast(clubs_df),
    players_df.current_club_id == clubs_df.club_id,
    "left"
)

player_appearances.explain("formatted")

== Physical Plan ==
AdaptiveSparkPlan (7)
+- BroadcastHashJoin Inner BuildLeft (6)
   :- BroadcastExchange (3)
   :  +- Filter (2)
   :     +- Scan parquet spark_catalog.default.players_table (1)
   +- Filter (5)
      +- Scan parquet spark_catalog.default.appearances_table (4)


(1) Scan parquet spark_catalog.default.players_table
Output [23]: [player_id#5382, first_name#5383, last_name#5384, name#5385, last_season#5386, current_club_id#5387, player_code#5388, country_of_birth#5389, city_of_birth#5390, country_of_citizenship#5391, date_of_birth#5392, sub_position#5393, position#5394, foot#5395, height_in_cm#5396, contract_expiration_date#5397, agent_name#5398, image_url#5399, url#5400, current_club_domestic_competition_id#5401, current_club_name#5402, market_value_in_eur#5403, highest_market_value_in_eur#5404]
Batched: true
Location: PreparedDeltaFileIndex [file:/C:/Users/mmoha/linkedin/transformations/spark-warehouse/players_table]
PushedFilters: [IsNotNull(player_id)]
ReadSchema: st