In [49]:
spark

In [1]:
#Reading Files From HDFS Local
resultfile_path = "hdfs://localhost:9000/data/international_football_DS/results.csv"
goalscorersfile_path = "hdfs://localhost:9000/data/international_football_DS/goalscorers.csv"
shootoutsfile_path = "hdfs://localhost:9000/data/international_football_DS/shootouts.csv"


In [2]:
from pyspark.sql.functions import col, monotonically_increasing_id
from pyspark.sql.functions import col, sum


In [3]:
#Read result_file
df_result=spark.read.csv(resultfile_path,header=True,inferSchema=True)

                                                                                

In [4]:
df_result.show(5)

+-------------------+---------+---------+----------+----------+----------+-------+--------+-------+
|               date|home_team|away_team|home_score|away_score|tournament|   city| country|neutral|
+-------------------+---------+---------+----------+----------+----------+-------+--------+-------+
|1872-11-30 00:00:00| Scotland|  England|         0|         0|  Friendly|Glasgow|Scotland|  false|
|1873-03-08 00:00:00|  England| Scotland|         4|         2|  Friendly| London| England|  false|
|1874-03-07 00:00:00| Scotland|  England|         2|         1|  Friendly|Glasgow|Scotland|  false|
|1875-03-06 00:00:00|  England| Scotland|         2|         2|  Friendly| London| England|  false|
|1876-03-04 00:00:00| Scotland|  England|         3|         0|  Friendly|Glasgow|Scotland|  false|
+-------------------+---------+---------+----------+----------+----------+-------+--------+-------+
only showing top 5 rows



In [5]:
#Read goalscorers_file
df_goalscorers=spark.read.csv(goalscorersfile_path,header=True,inferSchema=True)

In [6]:
df_goalscorers.show(5)

+-------------------+---------+---------+---------+----------------+------+--------+-------+
|               date|home_team|away_team|     team|          scorer|minute|own_goal|penalty|
+-------------------+---------+---------+---------+----------------+------+--------+-------+
|1916-07-02 00:00:00|    Chile|  Uruguay|  Uruguay| José Piendibene|    44|   false|  false|
|1916-07-02 00:00:00|    Chile|  Uruguay|  Uruguay|Isabelino Gradín|    55|   false|  false|
|1916-07-02 00:00:00|    Chile|  Uruguay|  Uruguay|Isabelino Gradín|    70|   false|  false|
|1916-07-02 00:00:00|    Chile|  Uruguay|  Uruguay| José Piendibene|    75|   false|  false|
|1916-07-06 00:00:00|Argentina|    Chile|Argentina|   Alberto Ohaco|     2|   false|  false|
+-------------------+---------+---------+---------+----------------+------+--------+-------+
only showing top 5 rows



In [7]:
df_goalscorers.count()

44362

In [8]:
#Read shootouts_file
df_shootouts=spark.read.csv(shootoutsfile_path,header=True,inferSchema=True)

In [9]:
#check null vals
df_shootouts.select([sum(col(c).isNull().cast("int")).alias(c) for c in df_shootouts.columns]).show()


+----+---------+---------+------+-------------+
|date|home_team|away_team|winner|first_shooter|
+----+---------+---------+------+-------------+
|   0|        0|        0|     0|          415|
+----+---------+---------+------+-------------+



In [10]:
#drop null column
df_shootouts=df_shootouts.drop("first_shooter")
df_shootouts.show(5)

+-------------------+-----------+----------------+-----------+
|               date|  home_team|       away_team|     winner|
+-------------------+-----------+----------------+-----------+
|1967-08-22 00:00:00|      India|          Taiwan|     Taiwan|
|1971-11-14 00:00:00|South Korea|Vietnam Republic|South Korea|
|1972-05-07 00:00:00|South Korea|            Iraq|       Iraq|
|1972-05-17 00:00:00|   Thailand|     South Korea|South Korea|
|1972-05-19 00:00:00|   Thailand|        Cambodia|   Thailand|
+-------------------+-----------+----------------+-----------+
only showing top 5 rows



In [12]:
#check null vals
df_result.select([sum(col(c).isNull().cast("int")).alias(c) for c in df_result.columns]).show()


+----+---------+---------+----------+----------+----------+----+-------+-------+
|date|home_team|away_team|home_score|away_score|tournament|city|country|neutral|
+----+---------+---------+----------+----------+----------+----+-------+-------+
|   0|        0|        0|         0|         0|         0|   0|      0|      0|
+----+---------+---------+----------+----------+----------+----+-------+-------+



In [13]:
#check null vals
df_goalscorers.select([sum(col(c).isNull().cast("int")).alias(c) for c in df_goalscorers.columns]).show()

+----+---------+---------+----+------+------+--------+-------+
|date|home_team|away_team|team|scorer|minute|own_goal|penalty|
+----+---------+---------+----+------+------+--------+-------+
|   0|        0|        0|   0|     0|     0|       0|      0|
+----+---------+---------+----+------+------+--------+-------+



In [14]:
 # create Dim_Teams
df_teams_dim = df_result.selectExpr("home_team as team").union(df_result.selectExpr("away_team as team")).distinct()
df_teams_dim = df_teams_dim.withColumn("team_id", monotonically_increasing_id())

In [15]:
df_teams_dim.show(5)

[Stage 24:>                                                         (0 + 2) / 2]

+--------+-------+
|    team|team_id|
+--------+-------+
| Kabylia|      0|
|  Kernow|      1|
|    Chad|      2|
|Provence|      3|
|  Russia|      4|
+--------+-------+
only showing top 5 rows



                                                                                

In [16]:
#create dim date
from pyspark.sql.functions import date_format

df_date_dim = df_result.select("date").distinct()
df_date_dim = df_date_dim.withColumn("date_id", date_format(col("date"), "yyyyMMdd").cast("int"))

In [17]:
df_date_dim.show(5)

+-------------------+--------+
|               date| date_id|
+-------------------+--------+
|1880-03-27 00:00:00|18800327|
|1906-10-07 00:00:00|19061007|
|1924-01-20 00:00:00|19240120|
|1924-04-21 00:00:00|19240421|
|1925-03-26 00:00:00|19250326|
+-------------------+--------+
only showing top 5 rows



In [18]:
# Dim_tournament
df_tournaments_dim = df_result.selectExpr("tournament as tournament").distinct()
df_tournaments_dim= df_tournaments_dim.withColumn("tournament_id",monotonically_increasing_id())

In [19]:
df_tournaments_dim.show(5)

+--------------------+-------------+
|          tournament|tournament_id|
+--------------------+-------------+
|South Pacific Min...|            0|
|United Arab Emira...|            1|
|          Balkan Cup|            2|
|           UEFA Euro|            3|
| Nordic Championship|            4|
+--------------------+-------------+
only showing top 5 rows



In [20]:
# Dim_locations
df_locations_dim = df_result.select("country","city").distinct()
df_locations_dim = df_locations_dim.withColumn("location_id",monotonically_increasing_id())

In [21]:
df_locations_dim.show(5)

+------------+-------------+-----------+
|     country|         city|location_id|
+------------+-------------+-----------+
|     Romania|     Ploiești|          0|
|    Malaysia|  George Town|          1|
|    Cameroon|       Garoua|          2|
|Saudi Arabia|         Abha|          3|
|    Botswana|Selebi-Phikwe|          4|
+------------+-------------+-----------+
only showing top 5 rows



In [47]:
#create players dim
df_players_dim = df_goalscorers.selectExpr("scorer as player_name").distinct()
df_players_dim = df_players_dim.withColumn("player_id",monotonically_increasing_id())

In [48]:
df_players_dim.show(5)

+-----------------+---------+
|      player_name|player_id|
+-----------------+---------+
|       Davy Walsh|        0|
|   Herbert Martin|        1|
|    Gianni Rivera|        2|
|Walt Schmotolocha|        3|
|      Matar Niang|        4|
+-----------------+---------+
only showing top 5 rows



In [24]:
#create matches Fact

from pyspark.sql.functions import col, sha2, concat_ws

#  match_id Hash 
df_fact_matches = df_result.withColumn(
    "match_id",
    sha2(concat_ws("_", col("date"), col("home_team"), col("away_team"), col("tournament")), 256)
)

# Dim_Teamر
df_fact_matches = df_fact_matches \
    .join(df_teams_dim.withColumnRenamed("team", "home_team").withColumnRenamed("team_id", "home_team_id"), "home_team", "left") \
    .join(df_teams_dim.withColumnRenamed("team", "away_team").withColumnRenamed("team_id", "away_team_id"), "away_team", "left")

# join with Dim_Tournaments
df_fact_matches = df_fact_matches.join(df_tournaments_dim,"tournament","left")
# join with Dim_Locations
df_fact_matches = df_fact_matches.join(df_locations_dim, ["city", "country"] ,"left")
# join with Dim_date
df_fact_matches = df_fact_matches.join(df_date_dim,"date" ,"left")
#select cols
df_fact_matches=df_fact_matches.select("match_id","date_id","home_team_id","away_team_id","tournament_id","home_score","away_score","location_id","neutral")



In [25]:
df_fact_matches.printSchema()

root
 |-- match_id: string (nullable = true)
 |-- date_id: integer (nullable = true)
 |-- home_team_id: long (nullable = true)
 |-- away_team_id: long (nullable = true)
 |-- tournament_id: long (nullable = true)
 |-- home_score: integer (nullable = true)
 |-- away_score: integer (nullable = true)
 |-- location_id: long (nullable = true)
 |-- neutral: boolean (nullable = true)



In [37]:
df_fact_matches.show(5)

+--------------------+--------+------------+------------+-------------+----------+----------+-----------+-------+
|            match_id| date_id|home_team_id|away_team_id|tournament_id|home_score|away_score|location_id|neutral|
+--------------------+--------+------------+------------+-------------+----------+----------+-----------+-------+
|65eaa3137a7049cef...|18721130|         308|         279|           80|         0|         0|       1264|  false|
|15d05aa97f15593df...|18730308|         279|         308|           80|         4|         2|       1057|  false|
|ac44c311321beb7cf...|18740307|         308|         279|           80|         2|         1|       1264|  false|
|f9b50f6106904ea55...|18750306|         279|         308|           80|         2|         2|       1057|  false|
|1cb0b458c67035d5a...|18760304|         308|         279|           80|         3|         0|       1264|  false|
+--------------------+--------+------------+------------+-------------+----------+------

In [33]:
#create fact goalscorers

df_fact_goalscorers = df_goalscorers \
    .join(df_teams_dim.withColumnRenamed("team", "home_team")
                      .withColumnRenamed("team_id", "home_team_id"), "home_team", "left") \
    .join(df_teams_dim.withColumnRenamed("team", "away_team")
                      .withColumnRenamed("team_id", "away_team_id"), "away_team", "left") \
    .join(df_players_dim.withColumnRenamed("player_name","scorer").withColumnRenamed("player_id","player_scorer_id"),"scorer","left") \


df_fact_goalscorers=df_fact_goalscorers.join(df_date_dim,"date","left")

df_fact_goalscorers=df_fact_goalscorers.select("date_id", "home_team_id", "away_team_id","team", "player_scorer_id", "minute", "own_goal", "penalty")


In [34]:
df_fact_goalscorers.show(5)



+--------+------------+------------+---------+----------------+------+--------+-------+
| date_id|home_team_id|away_team_id|     team|player_scorer_id|minute|own_goal|penalty|
+--------+------------+------------+---------+----------------+------+--------+-------+
|19160702|         116|         192|  Uruguay|            6020|    44|   false|  false|
|19160702|         116|         192|  Uruguay|            null|    55|   false|  false|
|19160702|         116|         192|  Uruguay|            null|    70|   false|  false|
|19160702|         116|         192|  Uruguay|            6020|    75|   false|  false|
|19160706|          80|         116|Argentina|            null|     2|   false|  false|
+--------+------------+------------+---------+----------------+------+--------+-------+
only showing top 5 rows



                                                                                

In [44]:
# create shootouts fact

df_fact_shootouts = df_shootouts \
.join(df_teams_dim.withColumnRenamed("team","home_team").withColumnRenamed("team_id","home_team_id"),"home_team","left") \
.join(df_teams_dim.withColumnRenamed("team","away_team").withColumnRenamed("team_id","away_team_id"),"away_team","left") \
.join(df_date_dim ,"date","left")


df_fact_shootouts=df_fact_shootouts.select("date_id","home_team_id","away_team_id","winner")

In [45]:
df_fact_shootouts.show(5)

+--------+------------+------------+-----------+
| date_id|home_team_id|away_team_id|     winner|
+--------+------------+------------+-----------+
|19670822|         106|          69|     Taiwan|
|19711114|         187|         249|South Korea|
|19720507|         187|          37|       Iraq|
|19720517|         165|         187|South Korea|
|19720519|         165|          45|   Thailand|
+--------+------------+------------+-----------+
only showing top 5 rows



In [46]:
spark.conf.set("spark.sql.parquet.int96RebaseModeInWrite", "LEGACY")

df_fact_matches.write.mode("overwrite").parquet("hdfs://localhost:9000/parquet_data/fact_matches.parquet")
df_fact_goalscorers.write.mode("overwrite").parquet("hdfs://localhost:9000/parquet_data/fact_goalscorers.parquet")
df_fact_shootouts.write.mode("overwrite").parquet("hdfs://localhost:9000/parquet_data/fact_shootouts.parquet")
df_teams_dim.write.mode("overwrite").parquet("hdfs://localhost:9000/parquet_data/dim_teams.parquet")
df_date_dim.write.mode("overwrite").parquet("hdfs://localhost:9000/parquet_data/dim_date.parquet")
df_players_dim.write.mode("overwrite").parquet("hdfs://localhost:9000/parquet_data/dim_players.parquet")
df_tournaments_dim.write.mode("overwrite").parquet("hdfs://localhost:9000/parquet_data/dim_tournaments.parquet")
df_locations_dim.write.mode("overwrite").parquet("hdfs://localhost:9000/parquet_data/dim_locations.parquet")


                                                                                