In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import types as T
from pyspark.sql import functions as F

In [2]:
def show(df, limit = 5):
    return df.toPandas().head(limit)

In [3]:
spark = SparkSession.builder.master("local[*]").getOrCreate()

# Events

In [10]:
events = spark.read.csv(r"C:\Development\ultimateNakMuay\data\raw\wiki_events_onefc.csv", header=True)

In [13]:
events = events.withColumn(
    "date",
    F.when(
        F.to_date(F.col("date"), "MMMM d, yyyy").isNotNull(), F.to_date(F.col("date"), "MMMM d, yyyy")
        ).otherwise(F.to_date(F.col("date"), "d MMMM yyyy"))
        )\
    .withColumn("attendance", F.regexp_replace(F.col("attendance"), ",", ""))\
    .withColumn("attendance", F.col("attendance").cast(T.IntegerType()))\
    .withColumn("location", F.when(F.col("location") == "—", F.lit(None)).otherwise(F.col("location")))

In [16]:
events = events.withColumn("venue", F.when(F.col("venue") == "—", F.lit(None)).otherwise(F.col("venue")))

In [19]:
events = events.withColumn("location", F.split(F.col("location"), ","))\
    .withColumn("city", F.when(F.size(F.col("location")) == 3, F.element_at(F.col("location"), 1)))\
    .withColumn("state", F.when(
        F.size(F.col("location")) == 3, F.element_at(F.col("location"), 2)
        ).otherwise(F.element_at(F.col("location"), 1))
        )\
    .withColumn("country", F.element_at(F.col("location"), -1))\
    .withColumn("country", F.regexp_replace(F.col("country"), "[^a-zA-Z0-9 ]", ""))\
    .drop("location")

In [21]:
events = events.withColumnRenamed("#", "event_num")
for _ in events.columns:
    events = events.withColumnRenamed(_, _.lower())

In [23]:
cols = ["event_num", "event", "date", "venue", "city", "state", "country", "attendance"]

events = events.select(*cols)

# Results

In [4]:
results = spark.read.csv(r"C:\Development\ultimateNakMuay\data\raw\wiki_results_onefc.csv", header=True)

In [8]:
results = results.withColumn("event_name", F.when(
    F.col("event").isNotNull(), F.col("event")
).otherwise(F.col("event_name"))
)

In [10]:
results = results.drop("event")

In [12]:
def remove_poisoned_rows(df):
    hash_rows = lambda col_list: F.sha2(F.concat_ws("|", *col_list), 256)
    test_cols = [F.col(_) for _ in df.columns[:7]]

    incorrect_data = df.filter(
        ~(F.col("time").contains(":") | F.col("time").contains("."))
        ).select(*test_cols)\
        .distinct()
    
    incorrect_data = incorrect_data.withColumn("poison", hash_rows(incorrect_data.columns))

    df = df.withColumn("poison", hash_rows(test_cols))\
        .join(incorrect_data, ["poison"], how="left_anti")\
        .drop("poison")
    
    return df

In [15]:
results = remove_poisoned_rows(results)

In [21]:
results = results.withColumn("time", F.regexp_replace(F.col("time"), "\\.", ":"))\
    .withColumn("time_parts", F.split(F.col("time"), ":"))\
    .withColumn("time", F.element_at(F.col("time_parts"), 1)*60 + F.element_at(F.col("time_parts"), 2))\
    .drop("time_parts")

In [23]:
results = results.withColumn("time", F.col("time").cast(T.DoubleType()))\
    .withColumn("round", F.col("round").cast(T.IntegerType()))

In [24]:
show(results)

Unnamed: 0,weight_class,winner,loser,method,round,time,notes,fight_card,event_name
0,Strawweight 57 kg,Yosuke Saruta,Joshua Pacio (c),Decision (Split),5.0,300.0,For the ONE Strawweight Championship,Main card,ONE Championship: Eternal Glory
1,Flyweight 61 kg,Mongkolpetch Petchyindee,Alexi Serepisos,Decision (Unanimous),3.0,180.0,Muay Thai,Main card,ONE Championship: Eternal Glory
2,Featherweight 70 kg,Christian Lee,Edward Kelly,TKO (Punches),1.0,173.0,,Main card,ONE Championship: Eternal Glory
3,Atomweight 52 kg,Puja Tomar,Priscilla Gaol,Decision (Split),3.0,300.0,,Main card,ONE Championship: Eternal Glory
4,Flyweight 61 kg,Jonathan Haggerty,Joseph Lasiri,Decision (Unanimous),3.0,180.0,Muay Thai,Main card,ONE Championship: Eternal Glory
