Combining the data from the CSV files into a single file. For each fight we want one row containing Fighter A with their respective data, Fighter B with their respective data, the winner of the fight, and the method of winning. The data will be used to train a model that predicts the winner of a fight.

imports

In [1]:
import config.ConnectionConfig as cc
import pandas as pd

# Prepare the environment and start the Spark session that every later cell uses.
# NOTE(review): `config.ConnectionConfig` is project-local; what setupEnvironment()
# actually configures is not visible from this notebook — confirm before changing.
cc.setupEnvironment()
# `spark` is the handle used below for spark.read.csv / spark.sql / spark.createDataFrame.
spark = cc.startLocalCluster("UFC_Fighter_Stats")
# Last expression of the cell: its value is rendered as the cell output.
spark.getActiveSession()

In [5]:
# Load the processed fighter details into pandas; the first CSV column becomes the index.
# NOTE(review): `fight_results` is reassigned from the Spark results table further down
# before this pandas frame is read again, so this cell appears to be inspection only.
fight_results = pd.read_csv('../processed_data/all_fighter_details.csv', sep=',', index_col=0)
fight_results

In [2]:
# Load the comma-separated predictions for upcoming fights into pandas and display them.
predictions = pd.read_csv('../processed_data/upcoming_fights_predictions.csv', sep=',')
predictions

First we calculate per-fighter averages of certain fight statistics (strikes, takedowns, etc.). We want to add these averages to a combined fighter CSV file.

In [3]:
# Load the three scraped tables as Spark DataFrames; the first row of each file is the header.
# Base per-fighter attributes (FIGHTER, Height, Weight, Reach are read from it below).
df_operational_base_stats = spark.read.csv('../scraped_data/ufc_fighter_tott.csv', header=True)
# Per-fight statistics that are averaged per fighter further down.
df_operational_to_calc_stats = spark.read.csv('../scraped_data/ufc_fight_stats.csv', header=True)
# Fight results; the BOUT and outcome columns are used later.
df_operational_to_calc_result = spark.read.csv('../scraped_data/ufc_fight_results.csv', header=True)

# Preview each table (show() prints the first rows).
df_operational_base_stats.show()
df_operational_to_calc_stats.show()
df_operational_to_calc_result.show()


In [4]:
# Give the stat columns whose names contain spaces, dots or '%' plain SQL identifiers.
df_operational_to_calc_stats = (
    df_operational_to_calc_stats
    .withColumnRenamed("TD %", "TD_Percentage")
    .withColumnRenamed("SIG.STR. %", "Significant_Strike_Percentage")
    .withColumnRenamed("SUB.ATT", "SUB_ATT")
)

# Expose both tables to Spark SQL under the names the aggregation query expects.
for view_name, frame in (
    ("fighter_info", df_operational_base_stats),
    ("fight_stats", df_operational_to_calc_stats),
):
    frame.createOrReplaceTempView(view_name)

In [5]:
# Build one row per fighter: physical attributes from `fighter_info` plus averages of the
# per-fight rows in `fight_stats`. The JOIN is an inner join on FIGHTER, so fighters
# without any fight_stats row do not appear in the result.
#
# Derived columns:
#   AVG_TD_Percentage / AVG_Significant_Strike_Percentage: '%' is stripped before averaging.
#   AVG_TOTAL_STR: for a "X of Y" string, averages (X + Y) / 2 per fight.
#       NOTE(review): presumably X = landed and Y = attempted; averaging their mean looks
#       unusual — confirm this is the intended feature.
#   AVG_ROUND: averages the second space-separated token of Round cast to INT.
#   AVG_CTRL_SECONDS: values containing ':' are treated as minutes:seconds and converted
#       to seconds; anything else is cast to INT from its first space-separated token.
updated_fighter_info = spark.sql("""
    SELECT 
    fighter_info.FIGHTER, fighter_info.Height, fighter_info.Weight, fighter_info.Reach, 
    AVG(fight_stats.KD) AS AVG_KD,
    AVG(fight_stats.SUB_ATT) AS AVG_SUB_ATT,
    AVG(regexp_replace(fight_stats.TD_Percentage, '%', '')) AS AVG_TD_Percentage,
    AVG(regexp_replace(fight_stats.Significant_Strike_Percentage, '%', '')) AS AVG_Significant_Strike_Percentage,
    AVG((CAST(SUBSTRING_INDEX(fight_stats.`TOTAL STR.`, ' of ', 1) AS INT) + 
             CAST(SUBSTRING_INDEX(SUBSTRING_INDEX(fight_stats.`TOTAL STR.`, ' of ', -1), '%', 1) AS INT)) / 2) AS AVG_TOTAL_STR,
    AVG(CAST(SPLIT(fight_stats.Round, ' ')[1] AS INT)) AS AVG_ROUND,
    AVG(
            CASE 
                WHEN fight_stats.CTRL LIKE '%:%' THEN
                    CAST(SPLIT(fight_stats.CTRL, ':')[0] AS INT) * 60 + CAST(SPLIT(fight_stats.CTRL, ':')[1] AS INT)
                ELSE 
                    CAST(SPLIT(fight_stats.CTRL, ' ')[0] AS INT)
            END
        ) AS AVG_CTRL_SECONDS
    FROM 
        fighter_info JOIN fight_stats ON fighter_info.FIGHTER = fight_stats.FIGHTER 
    GROUP BY 
        fighter_info.FIGHTER, fighter_info.Height, fighter_info.Weight, fighter_info.Reach
""")

# Preview the aggregated per-fighter table.
updated_fighter_info.show()

Adding result table to the data

In [7]:
from pyspark.sql.functions import split, expr

# A bout reads "<fighter1> vs. <fighter2>"; split it once and reuse both halves.
bout_parts = split(df_operational_to_calc_result["BOUT"], " vs. ")
fight_results = (
    df_operational_to_calc_result
    .withColumn("fighter1", bout_parts[0])
    .withColumn("fighter2", bout_parts[1])
)

# Register the enriched table, then keep only the two fighters and the outcome.
fight_results.createOrReplaceTempView("fight_results")
fight_results = spark.sql("""
    SELECT 
        fighter1,
        fighter2,
        outcome
        from fight_results
""")
# The outcome is written as <fighter1 result>/<fighter2 result>, e.g. 'W/L' = fighter1 won.

In [8]:
# Persist the fighter1/fighter2/outcome rows first; the saved CSV therefore does NOT
# contain the Accuracy column that is added afterwards.
fight_results.write.csv('../processed_data/fight_results.csv', header=True, mode='overwrite')
# Round-trip through pandas to add a constant Accuracy column with value 1.
fight_results = fight_results.toPandas()
fight_results['Accuracy'] = 1
df = spark.createDataFrame(fight_results)
# Note the view name has no underscore ("fightresults"), unlike "fight_results".
df.createOrReplaceTempView("fightresults")

In [9]:
# Re-register the results (now including the Accuracy column) under the name the query uses.
df.createOrReplaceTempView("fight_results")

# Tally wins / losses / draws per fighter. Every bout contributes two rows: one from
# fighter1's point of view and one from fighter2's, with the outcome mirrored.
# The outcome is encoded as <fighter1 result>/<fighter2 result> ('W/L', 'L/W').
# NOTE(review): draws are assumed to follow the same paired pattern ('D/D'); the bare 'D'
# that was matched before is still accepted. Verify against the scraped outcome values.
# Any other outcome counts as neither win, loss nor draw.
# No trailing ';' in the statement: older Spark SQL parsers reject it inside spark.sql().
win_loss_percentage = spark.sql("""
    SELECT fighter,
           SUM(wins)   AS total_wins,
           SUM(losses) AS total_losses,
           SUM(draws)  AS total_draws
    FROM (
        SELECT fighter1 AS fighter,
               CASE WHEN outcome = 'W/L' THEN 1 ELSE 0 END AS wins,
               CASE WHEN outcome = 'L/W' THEN 1 ELSE 0 END AS losses,
               CASE WHEN outcome IN ('D', 'D/D') THEN 1 ELSE 0 END AS draws
        FROM fight_results

        UNION ALL

        SELECT fighter2 AS fighter,
               CASE WHEN outcome = 'L/W' THEN 1 ELSE 0 END AS wins,
               CASE WHEN outcome = 'W/L' THEN 1 ELSE 0 END AS losses,
               CASE WHEN outcome IN ('D', 'D/D') THEN 1 ELSE 0 END AS draws
        FROM fight_results
    ) AS all_results
    GROUP BY fighter
""")

# Win percentage over decided bouts. NULLIF makes the "no decided bouts" case an explicit
# NULL instead of a division by zero (which raises when ANSI mode is enabled).
win_loss_percentage = win_loss_percentage.withColumn(
    "Win_Percentage",
    expr("total_wins / NULLIF(total_wins + total_losses + total_draws, 0) * 100"),
)
# rename the column fighter to fighter_name so it does not clash with FIGHTER in the join
win_loss_percentage = win_loss_percentage.withColumnRenamed("fighter", "fighter_name")
win_loss_percentage.show()

Joining the data

In [10]:
from pyspark.sql.functions import expr

# Match each fighter's averaged stats ("a") with their win/loss record ("b").
# Names are compared after trimming surrounding whitespace on both sides.
fighter_stats = updated_fighter_info.alias("a")
fighter_records = win_loss_percentage.alias("b")
name_match = expr("trim(a.fighter) = trim(b.fighter_name)")
joined_df = fighter_stats.join(fighter_records, name_match)

# Preview up to 100 joined rows.
joined_df.show(100)

In [11]:

# Register the joined DataFrame as a temporary view
joined_df.createOrReplaceTempView("joined_df")

# Convert the imperial text columns to numeric columns:
#   Height_CM  = feet * 30.48 + inches * 2.54, parsed from a <feet>'<inches>" string
#   Reach_Conv = Reach with the inch mark removed (the value stays in inches)
#   Weight_KG  = pounds * 0.453592, parsed from a "<n> lbs." string
# Each piece is TRIMmed before the CAST so that surrounding blanks (e.g. the space after
# the foot mark) do not depend on the engine trimming strings implicitly during CAST.
# The dot in ' lbs.' is matched literally via [.] rather than as "any character".
# Values that cannot be parsed come out as NULL.
converted_height_df = spark.sql("""
    SELECT *,
           (CAST(TRIM(SPLIT(Height, "'")[0]) AS INT) * 30.48 +
            CAST(TRIM(SPLIT(SUBSTR(Height, INSTR(Height, "'") + 1), '"')[0]) AS INT) * 2.54) AS Height_CM,
           CAST(TRIM(REGEXP_REPLACE(Reach, '["]', '')) AS INT) AS Reach_Conv,
           CAST(TRIM(REGEXP_REPLACE(Weight, ' lbs[.]', '')) AS INT) * 0.453592 AS Weight_KG
    FROM joined_df
""")

# Drop the raw text columns now that numeric versions exist, plus the duplicate name
# column that came from the win/loss side of the join.
converted_height_df = converted_height_df.drop("Height", "Weight", "Reach", "fighter_name")
# Preview only the first rows instead of dumping up to 1000 rows into the notebook.
converted_height_df.show()

In [12]:
from sklearn.impute import KNNImputer
# Bring the fighter table into pandas so scikit-learn can fill in the missing values.
converted_height_df_pd = converted_height_df.toPandas()

# Numeric feature columns that take part in the nearest-neighbour imputation.
numerical_cols = [
    'AVG_KD',
    'AVG_SUB_ATT',
    'AVG_TD_Percentage',
    'AVG_Significant_Strike_Percentage',
    'AVG_TOTAL_STR',
    'AVG_ROUND',
    'AVG_CTRL_SECONDS',
    'total_wins',
    'total_losses',
    'total_draws',
    'Win_Percentage',
    'Height_CM',
    'Reach_Conv',
    'Weight_KG',
]

# Fill each gap from the 5 nearest neighbours; the result is shown as the cell output.
knn_imputer = KNNImputer(n_neighbors=5)
converted_height_df_pd_imputed = knn_imputer.fit_transform(converted_height_df_pd[numerical_cols])
converted_height_df_pd_imputed

In [13]:
# Overwrite the numeric columns of the pandas frame with the imputed values, then rebuild
# the Spark DataFrame `converted_height_df` from it.
# NOTE(review): this assumes the imputed array has exactly one column per entry of
# `numerical_cols`, in the same order — confirm the imputer never drops a column.
converted_height_df_pd[numerical_cols] = converted_height_df_pd_imputed
converted_height_df = spark.createDataFrame(converted_height_df_pd)
converted_height_df.show()

In [14]:
from pyspark.ml.feature import StringIndexer

# Map every fighter name to a numeric label stored in a new `fighter_index` column.
fighter_indexer = StringIndexer(inputCol="FIGHTER", outputCol="fighter_index")
fitted_indexer = fighter_indexer.fit(converted_height_df)
converted_height_df = fitted_indexer.transform(converted_height_df)
converted_height_df.show(1000)

In [14]:
# Save the per-fighter feature table (with header); mode='overwrite' replaces any
# existing output at this path. It is read back below as `fighter_details`.
converted_height_df.write.csv('../processed_data/fighter_details.csv', header=True, mode='overwrite')

### Joining the result with the fighters

In [15]:
# Reload the saved fighter1/fighter2/outcome rows from disk (no schema inference is
# requested here), replacing the pandas frame previously bound to `fight_results`.
fight_results = spark.read.csv("../processed_data/fight_results.csv", header=True)
fight_results.show()

In [16]:
# Reload the per-fighter feature table; inferSchema=True asks Spark to infer column
# types from the data instead of leaving them untyped.
fighter_details = spark.read.csv("../processed_data/fighter_details.csv", header=True, inferSchema=True)
fighter_details.show()

In [17]:
fight_results.createOrReplaceTempView("match_results")
fighter_details.createOrReplaceTempView("fighters")

# One row per fight: fighter1's features (suffix 1), fighter2's features (suffix 2) and
# the outcome. `fighters` is joined twice, once per corner. Both joins are inner joins,
# so a fight is dropped when either fighter has no row in `fighters`.
# Names are compared after TRIM, consistent with the earlier join that built the fighter
# table, so stray surrounding whitespace cannot silently drop fights.
# No trailing ';' in the statement: older Spark SQL parsers reject it inside spark.sql().
total_df = spark.sql("""SELECT
    mr.fighter1,
    fr1.fighter_index as fighter1_index,
    fr1.Height_CM AS height1,
    fr1.Weight_KG AS weight_kg1,
    fr1.AVG_KD AS avg_kd1,
    fr1.AVG_SUB_ATT AS avg_sub_att1,
    fr1.AVG_TD_Percentage AS avg_td_percentage1,
    fr1.AVG_Significant_Strike_Percentage AS avg_significant_strike_percentage1,
    fr1.AVG_TOTAL_STR AS avg_total_str1,
    fr1.AVG_ROUND AS avg_round1,
    fr1.AVG_CTRL_SECONDS AS avg_ctrl_seconds1,
    fr1.total_wins AS total_wins1,
    fr1.total_losses AS total_losses1,
    fr1.total_draws AS total_draws1,
    fr1.Win_Percentage AS win_percentage1,
    fr1.Reach_Conv AS reach_conv1,
    mr.fighter2,
    fr2.fighter_index as fighter2_index,
    fr2.Height_CM AS height2,
    fr2.Weight_KG AS weight_kg2,
    fr2.AVG_KD AS avg_kd2,
    fr2.AVG_SUB_ATT AS avg_sub_att2,
    fr2.AVG_TD_Percentage AS avg_td_percentage2,
    fr2.AVG_Significant_Strike_Percentage AS avg_significant_strike_percentage2,
    fr2.AVG_TOTAL_STR AS avg_total_str2,
    fr2.AVG_ROUND AS avg_round2,
    fr2.AVG_CTRL_SECONDS AS avg_ctrl_seconds2,
    fr2.total_wins AS total_wins2,
    fr2.total_losses AS total_losses2,
    fr2.total_draws AS total_draws2,
    fr2.Win_Percentage AS win_percentage2,
    fr2.Reach_Conv AS reach_conv2,
    mr.outcome
FROM
    match_results mr
JOIN
    fighters fr1 ON TRIM(mr.fighter1) = TRIM(fr1.FIGHTER)
JOIN
    fighters fr2 ON TRIM(mr.fighter2) = TRIM(fr2.FIGHTER)
""")


# Persist the combined training table, replacing any previous export.
total_df.write.csv("../processed_data/fight_total.csv", header=True, mode="overwrite")


total_df.show()

### upcoming events and fights

In [18]:
# Load the scraped upcoming events (the event, date and location columns are used below).
upcoming_events_df = spark.read.csv("../scraped_data/upcoming_event_details.csv", header=True)

upcoming_events_df.show()


In [19]:
# Load the scraped upcoming fights (the event and bout columns are used below).
upcoming_fights_df = spark.read.csv("../scraped_data/upcoming_fight_details.csv", header=True)

upcoming_fights_df.show()

Now we combine the upcoming fights with their event details, so that each row contains the event, its date and location, fighter1 and fighter2. The fighter names are extracted from the bout string.

In [20]:
# Expose both upcoming tables to Spark SQL.
upcoming_events_df.createOrReplaceTempView("upcoming_events")
upcoming_fights_df.createOrReplaceTempView("upcoming_fights")

# Attach each upcoming fight to its event (inner join on the event name) and split the
# bout string on ' vs. ' into fighter1 / fighter2. Fights whose event has no match in
# upcoming_events are not returned.
upcoming_events_fights_df = spark.sql(
    """
    SELECT
    uf.event,
    ue.date AS event_date,
    ue.location AS event_location,
    SPLIT(uf.bout, ' vs. ')[0] AS fighter1,
    SPLIT(uf.bout, ' vs. ')[1] AS fighter2
FROM
    upcoming_fights uf
JOIN
    upcoming_events ue
ON
    uf.event = ue.event

    """
)

# Preview, then persist the combined table (replacing any previous export).
upcoming_events_fights_df.show()
upcoming_events_fights_df.write.csv("../processed_data/upcoming_events_fights.csv", header=True, mode="overwrite")

In [21]:
# Preview the combined upcoming events/fights table again.
upcoming_events_fights_df.show()

In [22]:
# Register the upcoming events/fights table as the "upcomingevents" view, then preview
# `df` — the historical fight results with the Accuracy column, built in an earlier cell.
upcoming_events_fights_df.createOrReplaceTempView("upcomingevents")
df.show()

In [23]:

# List the events of upcoming bouts whose exact fighter1/fighter2 pairing (same order)
# already exists in the historical fight results.
pairing_query = (
    "SELECT ue.event "
    "FROM upcomingevents ue "
    "JOIN fight_results fr "
    "ON fr.fighter1 == ue.fighter1 "
    "AND fr.fighter2 == ue.fighter2"
)
combined = spark.sql(pairing_query)
combined.show()

In [24]:
# DISABLED: the code below is commented out and does not run. It would reformat
# event_date (e.g. "June 22, 2024" -> "2024-06-22") and re-export upcoming_events.csv
# with ';' as separator.
# upcoming_events = spark.read.csv("processed_data/upcoming_events.csv", header=True)
# 
# #change event_date from June 22, 2024 to 2024-06-22
# 
# upcoming_events = upcoming_events.withColumn("event_date", expr("date_format(to_date(event_date, 'MMMM dd, yyyy'), 'yyyy-MM-dd')"))
# 
# upcoming_events.show()
# 
# upcoming_events.toPandas().to_csv("processed_data/upcoming_events.csv", index=False, sep=";")
# 


In [25]:
# Re-export the final fights table in place with ';' as the separator.
fights_path = "../processed_data/final_table_fights.csv"

# This cell overwrites its own input. After the first run the file is already
# ';'-separated, so reading it as ',' again would collapse every row into a single
# column. Detect the separator from the header line so the cell can be re-run safely.
with open(fights_path, encoding="utf-8", errors="replace") as fights_file:
    header_line = fights_file.readline()
input_sep = ";" if header_line.count(";") > header_line.count(",") else ","

fights = spark.read.csv(fights_path, header=True, sep=input_sep)

# Pull everything into pandas before writing, so the source file has been read
# completely by the time it is overwritten.
fights = fights.toPandas()

fights.to_csv(fights_path, index=False, sep=";")

fights.head()



In [26]:
# Read the per-fighter table back through Spark and hand it straight to pandas.
fighter_details = (
    spark.read
    .csv("../processed_data/fighter_details.csv", header=True)
    .toPandas()
)

# Export a single ';'-separated file without the pandas index.
fighter_details.to_csv("../processed_data/fighter_details_final.csv", index=False, sep=";")

fighter_details.head()