In [1]:
# Import statements
from pyspark.sql import SparkSession
from pyspark.sql.functions import max, col, to_date, date_format
import csv
import json

import pandas as pd

In [2]:
# Initialise Spark session
spark = SparkSession.builder.appName("PowerliftingData").getOrCreate()

# Load CSV file
input_file = "../openipf-2024-11-23-9813bb5f.csv"
df = spark.read.csv(input_file, header=True, inferSchema=True)

In [3]:
# Set relevant columns
columns_of_interest = [
    "Name",
    "Sex",
    "Country",
    "Event",
    "Equipment",
    "Age",
    "AgeClass",
    "BirthYearClass",
    "Division",
    "BodyweightKg",
    "WeightClassKg",
    "Federation",
    "Squat1Kg",
    "Squat2Kg",
    "Squat3Kg",
    "Best3SquatKg",
    "Bench1Kg",
    "Bench2Kg",
    "Bench3Kg",
    "Best3BenchKg",
    "Deadlift1Kg",
    "Deadlift2Kg",
    "Deadlift3Kg",
    "Best3DeadliftKg",
    "TotalKg",
    "Place",
    "Goodlift",
    "Tested",
    "Country",
    "Date",
]

In [4]:
# Select DF with relevant columns
df_cleaned = df.select(columns_of_interest)

In [5]:
# Filter rows where 'Federation' is 'MaltaPA'
df_filtered = df_cleaned.filter(
    (col("Federation").isin("MaltaPA", "IPF", "EPF")) & 
    ((col("Federation").isin("MaltaPA")) | (col("Country") == "Malta"))
)

In [6]:
# Cast 'Goodlift' to float
df_casted = df_filtered.withColumn("Goodlift", col("Goodlift").cast("float"))

# Group by 'Name' and retain the maximum 'Goodlift'
df_max_goodlift = df_casted.groupBy("Name").agg(max("Goodlift").alias("Goodlift"))

# Join back to retain other columns for the rows with the highest 'Goodlift' per 'Name'
df_sorted = df_casted.join(
    df_max_goodlift,
    on=["Name", "Goodlift"],  # Join on both 'Name' and the maximum 'Goodlift'
    how="inner",
).orderBy(
    col("Goodlift").desc()
)  # Final sorting by 'Goodlift' descending

In [7]:
# Validation
df_sorted.show(n=df_sorted.count(), truncate=False)

+---------------------+--------+---+-------+-----+---------+----+--------+--------------+-----------+------------+-------------+----------+--------+--------+--------+------------+--------+--------+--------+------------+-----------+-----------+-----------+---------------+-------+-----+------+-------+----------+
|Name                 |Goodlift|Sex|Country|Event|Equipment|Age |AgeClass|BirthYearClass|Division   |BodyweightKg|WeightClassKg|Federation|Squat1Kg|Squat2Kg|Squat3Kg|Best3SquatKg|Bench1Kg|Bench2Kg|Bench3Kg|Best3BenchKg|Deadlift1Kg|Deadlift2Kg|Deadlift3Kg|Best3DeadliftKg|TotalKg|Place|Tested|Country|Date      |
+---------------------+--------+---+-------+-----+---------+----+--------+--------------+-----------+------------+-------------+----------+--------+--------+--------+------------+--------+--------+--------+------------+-----------+-----------+-----------+---------------+-------+-----+------+-------+----------+
|Evita Otigbah        |101.36  |F  |Malta  |SBD  |Raw      |29.5

In [8]:
# Get columns that actually matter by selecting wanted columns
df_final = df_sorted.select(
    "Name", 
    "Goodlift", 
    col("Sex").alias("Gender"),
    # Cast 'Date' to DateType and format it to DD-MM-YYYY
    date_format(to_date(col("Date"), "yyyy-MM-dd"), "dd-MM-yyyy").alias("Date")
)

# # Rename some columns
# columns_to_rename = {
#     "Sex": "Gender",
#     "BodyweightKg": "Bodyweight",
#     "WeightClassKg": "WeightClass",
#     "TotalKg": "Total",
# }

# for old_name, new_name in columns_to_rename.items():
#     df_final = df_final.withColumnRenamed(old_name, new_name)

In [9]:
# Final validation
df_final.show(n=df_final.count(), truncate=False)

+---------------------+--------+------+----------+
|Name                 |Goodlift|Gender|Date      |
+---------------------+--------+------+----------+
|Evita Otigbah        |101.36  |F     |13-09-2024|
|Neil Bezzina         |94.26   |M     |09-11-2024|
|Nikola Vuksanovic    |94.21   |M     |09-11-2024|
|Jurgen Dalli         |93.66   |M     |09-11-2024|
|Benjamin Sacco       |92.35   |M     |15-06-2024|
|Matthew Mifsud       |91.0    |M     |28-08-2024|
|Shawn Farrugia       |90.84   |M     |04-12-2023|
|Wayne Gregoraci      |90.0    |M     |04-12-2023|
|Ian Pace             |89.18   |M     |09-11-2024|
|Lorna Cachia         |89.06   |F     |06-04-2024|
|Joseph Abela #1      |87.91   |M     |13-09-2024|
|Paul Gauci           |87.63   |M     |06-04-2024|
|Alessandro Gatt      |86.26   |M     |28-08-2024|
|Daryl Ruggier        |86.13   |M     |06-04-2024|
|Nicolas Azzopardi    |85.63   |M     |16-09-2023|
|Mario Mifsud         |85.49   |M     |11-06-2023|
|Nicholas Azzopardi   |84.84   

In [10]:
# Convert Spark DataFrame to Pandas DataFrame
df_pandas = df_final.toPandas()

In [11]:
# Write the Pandas DataFrame to CSV file
output_file = "../updated-rankings.csv"
df_pandas.to_csv(output_file, index=False)

In [12]:
# Convert CSV to JSON for frontend, split by gender
file = "../updated-rankings.csv"
json_file_male = "../malta-national-rankings/json/updated-rankings-male.json"
json_file_female = "../malta-national-rankings/json/updated-rankings-female.json"

# Read CSV File
def read_CSV(file, json_file_male, json_file_female):
    male_rows = []
    female_rows = []
    with open(file) as csvfile:
        reader = csv.DictReader(csvfile)
        field = reader.fieldnames
        for row in reader:
            if row['Gender'] == 'M':
                male_rows.append({field[i]: row[field[i]] for i in range(len(field))})
            elif row['Gender'] == 'F':
                female_rows.append({field[i]: row[field[i]] for i in range(len(field))})
        convert_write_json(male_rows, json_file_male)
        convert_write_json(female_rows, json_file_female)

# Convert csv data into json
def convert_write_json(data, json_file):
    with open(json_file, "w") as f:
        json.dump(data, f, sort_keys=False, indent=4, separators=(",", ": "))

read_CSV(file, json_file_male, json_file_female)
