In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, expr, rand, when, count, col

spark = SparkSession.builder.appName("FIFA 21 Challenge") \
    .master("local[3]") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:0.7.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .enableHiveSupport() \
    .getOrCreate()


from delta.tables import *

spark.conf.set("spark.sql.shuffle.partitions", 6)
spark.conf.set("spark.default.parallelism", 3)
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.fallback.enabled", "true")

In [2]:
inputData = "/home/oliver/Fifa21/fifa21_male2.csv" 

In [3]:
fifa21DF = (
    spark.read
    .format("csv")
    .option("header","true")
    .load(inputData)
)
fifa21DF.show(5)

+---+-----------+---+---+--------------------+-------------+---+---+------------+--------------------+--------------------+--------------------+---+--------------------+------+------+-----+------+------------+-------------+-----+----+--------------+-----------+---------+--------+---------+----------------+-------------+-------+-----+---------+-----+-----------+------------+------------+--------+------------+------------+-------+---------+-------+-----+----------+-------+-------+--------+----------+---------+----------+-------------+-----------+------+---------+---------+---------+-------+---------------+--------------+-----------+---------+-----------+----------+--------------+-----------+-----------+----------+---+---+------+------+---+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+-----+-----+-----+----+----+----+----+----+-----+-----+-----+-----+-----+-----+----+----+----+-----+----+------+
| ID|       Name|Age|OVA|         Nationality|         Club|BOV| BP|   

In [4]:
from pyspark.sql.window import Window
from pyspark.sql.functions import desc, dense_rank, countDistinct

windowSpec = Window.partitionBy("Position").orderBy(desc("POT"))
POTDenseRank = dense_rank().over(windowSpec)

top10Players_Position = fifa21DF.select(
    col("Position"),
    col("Name"),
    col("Club"),
    col("POT"),
    POTDenseRank.alias("top10")
).where("Position IS NOT NULL AND top10 <= 10")

In [5]:
nationalitiesPath = "Nationalities"
fifa21DF.write.partitionBy("Nationality").mode("overwrite").format("orc").save(nationalitiesPath)

In [6]:
top10Path = "TOP10"
top10Players_Position.write.partitionBy("Position").mode("overwrite").format("orc").save(top10Path)

In [7]:
from pyspark.sql.functions import countDistinct

fifa21DF.groupBy("Club", "Position").agg(
    countDistinct("Name").alias("Number of players per Club")
).where("Position IS NOT NULL AND Club IS NOT NULL").orderBy("Club", "Position").show()

+--------------------+---------+--------------------------+
|                Club| Position|Number of players per Club|
+--------------------+---------+--------------------------+
|1. FC Heidenheim ...|       CB|                         2|
|1. FC Heidenheim ...|    CB LB|                         1|
|1. FC Heidenheim ...|      CDM|                         1|
|1. FC Heidenheim ...|   CDM CM|                         1|
|1. FC Heidenheim ...|       CF|                         1|
|1. FC Heidenheim ...|       CM|                         1|
|1. FC Heidenheim ...|   CM CDM|                         1|
|1. FC Heidenheim ...|    CM LM|                         1|
|1. FC Heidenheim ...| CM RM RB|                         1|
|1. FC Heidenheim ...|       GK|                         3|
|1. FC Heidenheim ...|       LB|                         2|
|1. FC Heidenheim ...|   LB CDM|                         1|
|1. FC Heidenheim ...|    LM RM|                         1|
|1. FC Heidenheim ...|    LM ST|        

In [8]:
from pyspark.sql.functions import avg

fifa21DF.groupBy("Club").agg(
    avg("Sprint Speed").alias("Avg sprint speed per club")
).orderBy(desc("Avg sprint speed per club")).show(10)

+------------------+-------------------------+
|              Club|Avg sprint speed per club|
+------------------+-------------------------+
|  Alemannia Aachen|                     91.0|
|     United States|                     91.0|
|    UCAM Murcia CF|                     90.0|
|     GIF Sundsvall|                     89.0|
|      Terek Grozny|                     89.0|
|        Córdoba CF|                     88.0|
|         Dundee FC|                     86.0|
|Dorados de Sinaloa|                     85.0|
|       Netherlands|                     84.0|
|           Palermo|                     83.0|
+------------------+-------------------------+
only showing top 10 rows



In [10]:
from pyspark.sql.functions import regexp_extract

OverweightPath = "Overweight"
fifa21DF = fifa21DF.withColumn("Weight", regexp_extract("Weight", "\\d+",0))
fifa21DF = fifa21DF.withColumn("Weight", fifa21DF.Weight.cast("float"))

fifa21DF = fifa21DF.withColumn("Feet", regexp_extract("Height", "\\d+",0))
fifa21DF = fifa21DF.withColumn("Feet", fifa21DF.Feet.cast("float"))


fifa21DF = fifa21DF.withColumn("inches", regexp_extract("Height", "'\\d+",0))
fifa21DF = fifa21DF.withColumn("inches", regexp_extract("inches", "\\d+",0))
fifa21DF = fifa21DF.withColumn("inches", fifa21DF.inches.cast("float"))

fifa21DF = fifa21DF.withColumn("IMC", (col("Weight") / ((col("Feet") * 12) + col("inches"))**2 * 703) )
OverweightPlayers = fifa21DF.where("IMC > 25")
OverweightPlayers.write.mode("overwrite").format("orc").save(OverweightPath)