In [62]:
import json
from pyspark.sql import SparkSession, functions as f, DataFrame as DF
from pyspark.sql.types import *
# Single-threaded local Spark session shared by every cell below.
spark = (
    SparkSession.builder
    .master("local[1]")
    .appName("BD_final_project")
    .getOrCreate()
)

In [2]:
# Raw listing data; every column is read as a string (no schema inference).
# "_c0" / "Unnamed: 0" look like leftover index columns from an earlier export — presumably safe to drop.
df = (
    spark.read
    .csv("./data/used_cars_data_combine.csv", header=True)
    .drop("_c0", "Unnamed: 0")
)
# Expose the raw frame to Spark SQL under the name "df".
df.createOrReplaceTempView("df")

In [3]:
# Columns whose share of missing values exceeds this ratio are dropped outright.
NULL_RATIO_THRESHOLD = 0.4

# One row, one column per input column, holding that column's NULL count.
# count() skips NULLs, so emit a non-null literal exactly for the NULL rows.
null_cnt = df.select(
    [f.count(f.when(f.isnull(c), f.lit(1))).alias(c) for c in df.columns]
).toPandas()
threshold = null_cnt > NULL_RATIO_THRESHOLD * df.count()
df_drop = df.drop(*threshold.columns[threshold.iloc[0]])

# Additional columns judged irrelevant are listed by name in useless.json.
with open("./data/useless.json", "r", encoding="utf-8") as useless:
    df_drop = df_drop.drop(*json.load(useless))

In [4]:
def performance(engine: str):
    """Extract the two integers from a string such as "200 hp @ 5,000 RPM".

    Thousands separators (",") are removed first, then every whitespace-separated
    token consisting only of decimal digits is converted to int, e.g.
    "200 hp @ 5,000 RPM" -> [200, 5000]. Tokens with a decimal point ("5.5")
    or attached units ("200hp") are ignored.

    Returns:
        ``[first, second]`` when exactly two integer tokens are found, otherwise
        ``[None, None]`` (also for ``engine=None``), which the struct UDF in
        ``hp_rpm`` turns into two null fields.
    """
    if engine is None:
        return [None, None]
    # isdecimal (not isdigit): isdigit also accepts characters like "²" that
    # int() cannot parse, which would raise ValueError inside the Spark UDF.
    numbers = [int(tok) for tok in engine.replace(",", "").split() if tok.isdecimal()]
    return numbers if len(numbers) == 2 else [None, None]

def hp_rpm(df: DF, col: str) -> DF:
    """Split string column ``col`` into integer columns ``{col}_hp`` and ``{col}_rpm``.

    Each value is parsed with ``performance`` through a struct-returning UDF; the
    struct is written to a temporary "result" column and then flattened. All of
    ``df``'s existing columns are kept (including ``col`` itself).
    """
    keep = list(df.columns)
    parsed_schema = StructType(
        [StructField(f"{col}_{part}", IntegerType(), True) for part in ("hp", "rpm")]
    )
    parse = f.udf(performance, parsed_schema)
    with_struct = df.withColumn("result", parse(df[col]))
    return with_struct.select(*keep, "result.*")

# Replace each raw "<n> ... @ <rpm> RPM" column by its two parsed integer parts.
for parsed_col in ("power", "torque"):
    df_drop = hp_rpm(df_drop, parsed_col).drop(parsed_col)

# Only power_rpm is kept from "power"; power_hp is dropped
# (presumably redundant with the existing horsepower column — confirm).
df_drop = df_drop.drop("power_hp")

In [5]:
# Transmission labels that carry no gear count are mapped to "6", so the
# numeric extraction later applied to transmission_display still yields a value.
# NOTE(review): treating these as 6-speed is a modelling assumption — confirm.
GEARLESS_TRANSMISSIONS = ["Automatic", "Continuously Variable Transmission", "Manual"]


def transmission(column):
    """Return ``column`` with the labels in GEARLESS_TRANSMISSIONS replaced by "6".

    Built from native Spark expressions (no Python UDF round-trip). NULLs and
    every other label pass through unchanged. Accepts a Column or a column name.
    """
    column = f.col(column) if isinstance(column, str) else column
    return f.when(column.isin(GEARLESS_TRANSMISSIONS), "6").otherwise(column)


# Reference the column by name on df_drop itself: `df[...]` belongs to another
# DataFrame and only resolves while df_drop's lineage still holds that attribute.
df_extract = df_drop.withColumn("transmission_display", transmission("transmission_display"))

In [6]:
# Cast by name on df_extract itself; `df["is_new"]` belongs to the raw frame and
# would only resolve through shared lineage (it breaks if `df` is re-created).
df_extract = df_extract.withColumn("is_new", f.col("is_new").cast(BooleanType()))

In [7]:
def extract_val(df: DF, cols: list, pattern: str = r"(\d+(?:\.\d+)?)") -> DF:
    """Replace each column in ``cols`` by the first number found in it (as a string).

    Args:
        df: input DataFrame.
        cols: names of the string columns to rewrite in place.
        pattern: regex whose first group is kept; defaults to an integer or
            decimal number such as "18" or "18.5".

    Returns:
        ``df`` with every listed column holding the matched text. Values with no
        match become NULL.
    """
    for col in cols:
        extracted = f.regexp_extract(col, pattern, 1)
        # regexp_extract yields "" (not NULL) when nothing matches; turn that into
        # NULL so later isnull-based imputation and dropna() treat it as missing.
        df = df.withColumn(col, f.when(extracted != "", extracted))
    return df

df_extract = extract_val(df_extract, ["fuel_tank_volume", "maximum_seating", "transmission_display"])

df_extract = extract_val(df_extract, ["fuel_tank_volume", "maximum_seating", "transmission_display"])

In [8]:
# Rows without a mileage value are discarded; mileage is never imputed.
df_extract = df_extract.na.drop(subset=["mileage"])

In [9]:
def get_mean(df: DF, col: str):
    """Return the mean of column ``col`` over all rows of ``df`` as a Python value.

    Triggers one Spark job. The result is None when the column has no non-null
    values. The aggregate is aliased explicitly instead of relying on Spark's
    auto-generated "avg(<col>)" name.
    """
    return df.select(f.mean(col).alias("mean")).first()["mean"]

# Dataset-wide means, used below as fill values for electric cars.
hp_mean = get_mean(df_extract, "horsepower")
engine_displacement_mean = get_mean(df_extract, "engine_displacement")

In [20]:
# For every row whose fuel_type is "Electric", these columns are overwritten with
# a fixed label / gear count and the dataset-wide horsepower / displacement means.
# NOTE(review): existing values on electric rows are overwritten, not just NULLs —
# confirm that is intended (f.coalesce would fill only missing values).
electric = {
    "engine_type": "Electric_Motor",
    "transmission_display": float(6),
    "horsepower": hp_mean,
    "engine_displacement": engine_displacement_mean,
}

is_electric = f.col("fuel_type") == "Electric"
df_fill = df_extract
for column_name, fill_value in electric.items():
    replaced = f.when(is_electric, fill_value).otherwise(f.col(column_name))
    df_fill = df_fill.withColumn(column_name, replaced)

In [21]:
def group_agg(df: DF, cols: dict[tuple, list], op: str, how: str = "left") -> DF:
    """Fill NULLs in target columns with an aggregate computed per group.

    For each ``(targets, keys)`` entry of ``cols`` — processed in dict order, so
    later entries see values filled by earlier ones — the aggregate of every
    target column is computed per group of ``keys`` and joined back. A target
    value is replaced by its group aggregate only where it is NULL.

    Args:
        df: input DataFrame.
        cols: maps a tuple of target column names to the list of grouping columns.
        op: "avg" (mean) or "max". "max" on string columns is the lexicographic
            maximum, not the most frequent value.
        how: join type used to attach the group aggregates. Defaults to "left" so
            rows whose grouping keys contain NULL are kept (left unfilled) rather
            than silently dropped by an inner join; pass "inner" for the old behaviour.

    Returns:
        ``df`` with the target columns filled. The USING-join moves the grouping
        columns to the front of the column order.

    Raises:
        ValueError: if ``op`` is not "avg" or "max".
    """
    aggregators = {"avg": f.mean, "max": f.max}
    if op not in aggregators:
        raise ValueError(f"unsupported op {op!r}; expected one of {sorted(aggregators)}")
    func = aggregators[op]
    for targets, keys in cols.items():
        group_stats = df.groupby(keys).agg(*[func(c) for c in targets])
        df = df.join(group_stats, on=keys, how=how)
        for target in targets:
            agg_col = f"{op}({target})"  # Spark's default name for func(target)
            filled = f.when(f.isnull(target), df[agg_col]).otherwise(df[target])
            df = df.withColumn(target, filled).drop(agg_col)
    return df

# Numeric columns imputed with their per-group mean. group_agg walks the entries
# in order, so horsepower / engine_displacement are filled before the fourth
# entry uses them as grouping keys.
fill_avg_cols = {}
fill_avg_cols[("maximum_seating",)] = ["make_name", "model_name", "body_type"]
fill_avg_cols[("fuel_tank_volume",)] = ["make_name", "model_name", "year"]
fill_avg_cols[("horsepower", "engine_displacement")] = [
    "engine_type", "fuel_type", "make_name", "model_name", "year",
]
fill_avg_cols[("power_rpm", "torque_hp", "torque_rpm")] = [
    "engine_type", "engine_displacement", "horsepower", "fuel_type", "make_name",
]
fill_avg_cols[("city_fuel_economy", "highway_fuel_economy")] = [
    "body_type", "engine_displacement", "engine_type", "fuel_type",
    "horsepower", "make_name", "model_name", "year",
]

# Categorical columns imputed with their per-(make, model) maximum value.
fill_max_cols = {}
fill_max_cols[("body_type", "transmission", "fuel_type", "engine_type", "wheel_system")] = [
    "make_name", "model_name",
]

In [22]:
# Fill the categorical columns first: several mean-imputation groupings use
# body_type, engine_type and fuel_type as keys, so imputing those up front lets
# more numeric NULLs find a matching group.
df_fill = group_agg(df_fill, fill_max_cols, "max")
df_fill = group_agg(df_fill, fill_avg_cols, "avg")

In [67]:
# Discard every row that still has a NULL in any column after imputation.
df_final = df_fill.na.drop(how="any")

In [68]:
# Target Spark type for each numeric column, chosen as the narrowest type assumed
# to fit its values. NOTE(review): confirm the ranges (ByteType tops out at 127,
# ShortType at 32767).
convert = {
    ByteType:   ["city_fuel_economy", "highway_fuel_economy", "fuel_tank_volume", "maximum_seating", "transmission_display", "is_new"],
    ShortType:  ["engine_displacement", "horsepower", "power_rpm", "torque_hp", "torque_rpm", "year"],
    IntegerType:["mileage", "price"]
}

for t, cols in convert.items():
    for col in cols:
        df_final = df_final.withColumn(col, df_final[col].cast(t()))

# The dropna() above ran before these casts. With ANSI mode off (Spark 3.x default),
# a value that cannot be parsed or overflows the target type is cast to NULL,
# so drop those rows again to keep the converted columns NULL-free.
converted_cols = [c for cols in convert.values() for c in cols]
df_final = df_final.dropna(subset=converted_cols)