# Project Option 1
## Members: Rolian Tan, Micah Baldonado

# Configuration

In [1]:
import os
from datetime import datetime, timedelta

import feedparser
import numpy as np
import pyspark
from pyspark import SparkContext, SQLContext 
from pyspark.sql import SparkSession
# NOTE: this `round` is pyspark's Column function and shadows Python's builtin round.
from pyspark.sql.functions import desc, concat, lit, avg, asc, round

import findspark
# Locate the local Spark installation for pyspark.
findspark.init()
findspark.find()

# Create the Spark session
appName = "project"
master = "local"
# NOTE(review): machine-specific absolute path to the PostgreSQL JDBC driver jar; adjust per machine.
jdbc_driver_path = "/opt/homebrew/Cellar/apache-spark/spark-3.5.2-bin-hadoop3/jars/postgresql-42.7.4.jar"

# Spark configuration: bind the driver to localhost and put the PostgreSQL JDBC jar on the classpath.
# ChatGPT was used to help debug the Spark-to-JDBC connection error; it suggested the
# additional .set(...) entries below.
conf = (
    pyspark.SparkConf()
    .set('spark.driver.host', '127.0.0.1')
    .set("spark.jars", jdbc_driver_path)
    .set("spark.executor.extraClassPath", jdbc_driver_path)
    .setAppName(appName)
    .setMaster(master)
)

# Build the SparkContext from this explicit configuration rather than the defaults.
sc = SparkContext.getOrCreate(conf=conf)
# SQL context for the DataFrame / database operations used in the tasks below.
# NOTE(review): SQLContext is deprecated since Spark 3.0; SparkSession.builder is the modern entry point.
sqlContext = SQLContext(sc)
# Obtain the SparkSession through the SQL context.
spark = sqlContext.sparkSession.builder.getOrCreate()
# Only log errors, keeping Spark warnings out of the notebook output.
spark.sparkContext.setLogLevel("ERROR")

24/10/08 16:41:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [2]:
# Database connection configuration.
# Credentials come from the standard libpq environment variables when they are set, so they
# never need to be hard-coded in the notebook; the defaults reproduce the local setup.
database_property = {
    "user" : os.environ.get("PGUSER", "postgres"),
    "password" : os.environ.get("PGPASSWORD", ""),
    "driver" : "org.postgresql.Driver"
}
jdbc = "jdbc:postgresql://localhost:5431/postgres"
# Folder holding the yearly players_XX.csv / female_players_XX.csv exports.
data_folder = "data_folder"

In [4]:
# Check for column mismatches between consecutive yearly exports of the same gender.
def report_column_mismatches(prefix, first_year, last_year):
    """Print column differences between consecutive `{prefix}_{yy}.csv` files.

    Compares `{data_folder}/{prefix}_{yy}.csv` with the previous year's file for every
    two-digit year yy in (first_year, last_year]. Column order is checked as well as
    membership. The files are later stacked into one frame, and a pure reordering would
    otherwise print two empty lists with no explanation.
    """
    def read_columns(yy):
        # Only the header row is needed, so inferSchema (an extra pass over the data) is skipped.
        return spark.read.csv(f"{data_folder}/{prefix}_{yy}.csv", header=True).columns

    prev_cols = read_columns(first_year)  # each file is read once, not twice
    for yy in range(first_year + 1, last_year + 1):
        cur_cols = read_columns(yy)
        if prev_cols != cur_cols:
            only_prev = [c for c in prev_cols if c not in cur_cols]
            only_cur = [c for c in cur_cols if c not in prev_cols]
            print(f"columns in {prefix}_{yy-1}.csv but not in {prefix}_{yy}.csv: {only_prev}")
            print(f"columns in {prefix}_{yy}.csv but not in {prefix}_{yy-1}.csv: {only_cur}")
            if not only_prev and not only_cur:
                print(f"{prefix}_{yy-1}.csv and {prefix}_{yy}.csv have the same columns in a different order")
        prev_cols = cur_cols

# Male: players_15.csv ... players_22.csv
report_column_mismatches("players", 15, 22)
# Female: female_players_16.csv ... female_players_22.csv
report_column_mismatches("female_players", 16, 22)

# Task 01: Load and Process Data

In [5]:
def load_players_csv(path, year, gender):
    """Read one yearly FIFA export and tag every row with its snapshot year and gender."""
    return (spark.read.csv(path, header=True, inferSchema=True)
            .withColumn("year", lit(year))
            .withColumn("gender", lit(gender)))

# (path, year, gender) for every export: male files are read for 15-22, female files for 16-22.
player_sources = (
    [(f"{data_folder}/players_{yy}.csv", 2000 + yy, "male") for yy in range(15, 23)]
    + [(f"{data_folder}/female_players_{yy}.csv", 2000 + yy, "female") for yy in range(16, 23)]
)

# unionByName aligns columns by name instead of by position. Male and female files are never
# compared against each other, so a column-order difference between them would silently
# shift values into the wrong columns with a positional union.
df_total = None
for path, year, gender in player_sources:
    df_year = load_players_csv(path, year, gender)
    df_total = df_year if df_total is None else df_total.unionByName(df_year)

#### Unique ID

In [6]:
# Row key = "<sofifa_id>_<year>", presumably because a player's sofifa_id recurs across yearly snapshots.
# NOTE(review): male and female rows share this scheme; confirm sofifa_ids never collide across genders.
df_total = df_total.withColumn("id", concat(df_total["sofifa_id"], lit("_"), df_total["year"]))
# Move the new key to the front, keeping every other column in its existing order.
df_total = df_total.select("id", *[name for name in df_total.columns if name != "id"])

#### Datatype Casting

In [7]:
# Target Spark SQL types for columns whose inferred type is not the one wanted in the database.
cast_targets = {
    "value_eur": "double",
    "wage_eur": "double",
    "club_team_id": "integer",
    "club_jersey_number": "integer",
    "club_contract_valid_until": "integer",
    "club_joined": "date",
}
# withColumn on an existing name replaces that column in place, so column order is unchanged.
for column_name, spark_type in cast_targets.items():
    df_total = df_total.withColumn(column_name, df_total[column_name].cast(spark_type))

In [8]:
# Persist the combined dataset into PostgreSQL; "overwrite" replaces the table if it already exists.
table_name = "fifa.playerdata"
mode = "overwrite"
df_total.write.jdbc(
    url=jdbc,
    table=table_name,
    mode=mode,
    properties=database_property,
)

                                                                                

In [9]:
# feature summary
# df_total.describe().show(vertical=True)

In [10]:
# df_total.printSchema()

In [11]:
# df_total.columns

In [12]:
# df_total.show(1, vertical=True)

# Task 02

In [13]:
# Task 02 works on male players only: reload the persisted table and keep the male rows.
table_name = "fifa.playerdata"
df = spark.read.jdbc(url=jdbc, table=table_name, properties=database_property)
is_male = df["gender"] == "male"
df_total = df.filter(is_male)

### Q1

In [14]:
def club_contracts_players(df_total, X, Z, Y):
    """Show the Y clubs with the most players, in year X, whose contracts run until year Z or later.

    Clubs tied with the Y-th club on the player count are shown as well (matching
    average_age), so the output can contain more than Y rows. Ties are listed
    alphabetically by club name.

    Parameters
    ----------
    df_total : pyspark.sql.DataFrame
        Player snapshots with `year`, `club_name` and `club_contract_valid_until` columns.
    X : int
        Snapshot year to analyse.
    Z : int
        Minimum contract end year (inclusive).
    Y : int
        Number of clubs to show; must be positive.

    Raises
    ------
    ValueError
        If Y is not positive.
    """
    if Y <= 0:
        raise ValueError(f"Y should be positive, but got: Y = {Y}")
    counts_df = (df_total
                 .filter((df_total["year"] == X)
                         & (df_total["club_contract_valid_until"] >= Z)
                         & df_total["club_name"].isNotNull())  # rows without a club are not a club
                 .groupBy("club_name").count()
                 .sort(desc("count"), asc("club_name")))

    # Only the first Y rows are needed to find the cutoff count.
    top_rows = counts_df.take(Y)
    if not top_rows:
        counts_df.show()
        return
    cutoff = top_rows[-1]["count"]
    result_df = counts_df.filter(counts_df["count"] >= cutoff)
    # Pass the row count explicitly: show() would otherwise silently stop at 20 rows.
    result_df.show(result_df.count())

In [15]:
club_contracts_players(df_total, X=2016, Z=2021, Y=4)  # 2016 snapshot, contracts valid until 2021+, top 4 clubs

+-----------------+-----+
|        club_name|count|
+-----------------+-----+
|     Al Qadisiyah|   20|
|      Envigado FC|   18|
|Crucero del Norte|   16|
|  Boyacá Chicó FC|   15|
+-----------------+-----+



### Q2

In [16]:
def average_age(df, X, Y, mode):
    """Show the X clubs with the highest or lowest average player age in year Y.

    The average age is rounded to 2 decimals before ranking. Every club tied (on the
    rounded value) with the X-th club is also shown, so the output can contain more
    than X rows. If fewer than X clubs exist, all of them are shown. Ties are listed
    alphabetically by club name.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Player snapshots with `year`, `club_name` and `age` columns.
    X : int
        Number of clubs to show; must be positive.
    Y : int
        Snapshot year to analyse.
    mode : str
        "highest" for the oldest clubs, "lowest" for the youngest.

    Raises
    ------
    ValueError
        If X is not positive or mode is neither "highest" nor "lowest".
    """
    if (X <= 0):
        raise ValueError(f"X should be possitive, but get: X = {X}")
    if mode not in ("highest", "lowest"):
        raise ValueError(f"Mode should be highest or lowest, but get: {mode}")

    avg_age_df = (df.filter((df["year"] == Y) & df["club_name"].isNotNull())  # rows without a club are not a club
                    .groupBy("club_name")
                    .agg(round(avg("age"), 2).alias("avg_age"))  # round to 2 decimals
                    .dropna(subset=["avg_age"]))  # a null average would otherwise sort first in "lowest" mode
    if mode == "highest":
        avg_age_df = avg_age_df.sort(desc("avg_age"), asc("club_name"))
    else:
        avg_age_df = avg_age_df.sort(asc("avg_age"), asc("club_name"))

    # The X-th value is the cutoff that keeps every tied club. take() fetches only X rows,
    # unlike collect(), and does not raise when fewer than X clubs exist.
    top_rows = avg_age_df.take(X)
    if not top_rows:
        avg_age_df.show()
        return
    last_val = top_rows[-1]["avg_age"]
    if mode == "highest":
        counts_df = avg_age_df.filter(avg_age_df["avg_age"] >= last_val)
    else:
        counts_df = avg_age_df.filter(avg_age_df["avg_age"] <= last_val)
    # Pass the row count explicitly: show() would otherwise silently stop at 20 rows.
    counts_df.show(counts_df.count())

In [17]:
average_age(df_total, X=4, Y=2022, mode="highest")  # 4 oldest clubs in the 2022 snapshot

+--------------------+-------+
|           club_name|avg_age|
+--------------------+-------+
|         Guaireña FC|  30.25|
|    12 de Octubre FC|  30.05|
| Shanghai Shenhua FC|  29.89|
|Demir Grup Sivasspor|  29.33|
+--------------------+-------+



In [18]:
average_age(df_total, X=4, Y=2022, mode="lowest")  # 4 youngest clubs in the 2022 snapshot

+--------------------+-------+
|           club_name|avg_age|
+--------------------+-------+
|     FC Nordsjælland|  20.12|
|      SC Freiburg II|  20.86|
|     Real Sociedad B|  21.16|
|Borussia Dortmund II|  21.46|
+--------------------+-------+



### Q3

In [19]:
def mostPopularNationality(df_total, start_year=2015, end_year=2022):
    """Print the most common `nationality_name` for every year in [start_year, end_year].

    All years are counted in a single Spark job. The resulting (year, nationality) count
    table is collected to the driver; this presumes it stays small (one row per pair).
    If several nationalities tie for first place, they are all printed in alphabetical
    order, comma-separated. A year with no rows prints "no data" instead of raising.

    Parameters
    ----------
    df_total : pyspark.sql.DataFrame
        Player snapshots with `year` and `nationality_name` columns.
    start_year, end_year : int
        Inclusive year range to report (defaults match the loaded 2015-2022 data).
    """
    count_rows = (df_total
                  .filter((df_total["year"] >= start_year)
                          & (df_total["year"] <= end_year)
                          & df_total["nationality_name"].isNotNull())
                  .groupBy("year", "nationality_name").count()
                  .collect())

    counts_by_year = {}
    for row in count_rows:
        counts_by_year.setdefault(row["year"], []).append((row["count"], row["nationality_name"]))

    for year in range(start_year, end_year + 1):
        entries = counts_by_year.get(year)
        if not entries:
            print(f"Most popular nationality in the year {year}: no data")
            continue
        best = max(count for count, _ in entries)
        winners = sorted(name for count, name in entries if count == best)
        print(f"Most popular nationality in the year {year}:", ", ".join(winners))

In [20]:
mostPopularNationality(df_total=df_total)  # one line per year, 2015-2022

Most popular nationality in the year 2015: England
Most popular nationality in the year 2016: England
Most popular nationality in the year 2017: England
Most popular nationality in the year 2018: England
Most popular nationality in the year 2019: England
Most popular nationality in the year 2020: England
Most popular nationality in the year 2021: England
Most popular nationality in the year 2022: England
