In [None]:
import time

import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression, LinearSVC, DecisionTreeClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.functions import vector_to_array
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Window
from sklearn.metrics import roc_curve, auc
# Collision-proof alias: `auc` is also used as a variable name for a float score in this notebook.
from sklearn.metrics import auc as sk_auc

In [None]:
# Create (or reuse) the Spark session for this notebook, requesting 2g of
# memory for both the driver and the executors.
spark = (
    SparkSession.builder
    .appName("team_comp_and_analysis")
    .config("spark.driver.memory", "2g")
    .config("spark.executor.memory", "2g")
    .getOrCreate()
)

In [None]:
#df = spark.read.csv("*.csv", header=True, inferSchema=True)

In [None]:
# null_rows_count = df.filter(
#     F.expr(" OR ".join([f"{col} IS NULL" for col in df.columns]))
# ).count()

# players_per_team_round = (
#     df.groupBy("matchid", "roundnumber", "team")
#       .agg(F.count("*").alias("player_count"))
# )

# matchids_over_5 = (
#     players_per_team_round.filter(F.col("player_count") > 5)
#                           .select("matchid")
#                           .distinct()
# )

# df_filtered = df.join(matchids_over_5, on="matchid", how="left_anti")

# first_round_counts = (
#     df_filtered.filter(F.col("roundnumber") == 1)
#                .groupBy("matchid", "team")
#                .agg(F.count("*").alias("player_count"))
# )

# match_teams = df_filtered.select("matchid", "team").distinct()

# missing_in_first_round = (
#     match_teams.join(first_round_counts, on=["matchid", "team"], how="left_anti")
#                 .select("matchid").distinct()
# )

# df_clear = df_filtered.join(missing_in_first_round, on="matchid", how="left_anti")

In [None]:
#df_selected = df_clear.select('gamemode', 'mapname', 'operator', 'skillrank', 'haswon', 'matchid', 'roundnumber', 'role', 'clearancelevel')

In [None]:
# Load the modelling dataset from the local "df_selected" Parquet path.
# NOTE(review): presumably the persisted result of the commented-out
# preprocessing/select cells above -- confirm where this file is written.
df_selected = (
    spark.read
    .format("parquet")
    .load("df_selected")
)

In [None]:
def analysis(model_name, train_data, test_data):
    """Fit a one-hot-encoding + classifier pipeline and score it on held-out data.

    The categorical columns ``gamemode``, ``mapname``, ``operator``, ``role`` and
    ``skillrank`` are string-indexed and one-hot encoded, assembled together with
    ``clearancelevel`` into a ``features`` vector, and passed to the classifier
    selected by ``model_name``. The label column is ``haswon``.

    Args:
        model_name: One of ``"RandomForest"``, ``"LogisticRegression"``,
            ``"LinearSVC"`` or ``"DecisionTree"``.
        train_data: DataFrame the pipeline is fitted on.
        test_data: DataFrame the fitted pipeline is applied to.

    Returns:
        Tuple ``(predictions, auc_score)``: the transformed ``test_data`` and the
        score from a default-configured ``BinaryClassificationEvaluator``
        (presumably area under ROC, its documented default metric).

    Raises:
        ValueError: If ``model_name`` is not one of the supported models.
    """
    label_column = "haswon"

    # Choose the classifier first so an unsupported name fails immediately with
    # a clear error instead of leaving `classifier` unbound further down.
    if model_name == "RandomForest":
        classifier = RandomForestClassifier(labelCol=label_column, featuresCol="features")
    elif model_name == "LogisticRegression":
        classifier = LogisticRegression(labelCol=label_column, featuresCol="features")
    elif model_name == "LinearSVC":
        classifier = LinearSVC(labelCol=label_column, featuresCol="features")
    elif model_name == "DecisionTree":
        classifier = DecisionTreeClassifier(labelCol=label_column, featuresCol="features")
    else:
        raise ValueError(
            f"Unknown model_name {model_name!r}; expected one of "
            "'RandomForest', 'LogisticRegression', 'LinearSVC', 'DecisionTree'"
        )

    # Every categorical column goes through the same index -> one-hot treatment.
    categorical_cols = ["gamemode", "mapname", "operator", "role", "skillrank"]
    indexers = [
        StringIndexer(inputCol=name, outputCol=f"{name}_idx", handleInvalid="keep")
        for name in categorical_cols
    ]
    encoders = [
        OneHotEncoder(inputCol=f"{name}_idx", outputCol=f"{name}_vec")
        for name in categorical_cols
    ]
    feature_cols = [f"{name}_vec" for name in categorical_cols] + ["clearancelevel"]
    assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

    pipeline = Pipeline(stages=indexers + encoders + [assembler, classifier])
    model = pipeline.fit(train_data)
    predictions = model.transform(test_data)

    evaluator = BinaryClassificationEvaluator(labelCol=label_column)
    # Named `auc_score` (not `auc`) to avoid shadowing sklearn's `auc` import.
    auc_score = evaluator.evaluate(predictions)
    return predictions, auc_score
    
def metrics(predictions):
    """Print classification metrics and plot the confusion matrix.

    Computes accuracy with ``MulticlassClassificationEvaluator``, builds a 2x2
    confusion matrix from the ``haswon`` and ``prediction`` columns, prints
    recall, precision, false-positive rate, true-negative rate and accuracy,
    and shows the matrix as a seaborn heatmap.

    Args:
        predictions: DataFrame with a ``haswon`` label column and a
            ``prediction`` column; labels are assumed to be encoded as 0/1.

    Returns:
        None. Results are printed and plotted.
    """
    label_column = "haswon"
    evaluator = MulticlassClassificationEvaluator(
        labelCol=label_column, predictionCol="prediction", metricName="accuracy")
    accuracy = evaluator.evaluate(predictions)

    conf_matrix = predictions.groupBy(label_column, "prediction").count().toPandas()

    # A (label, prediction) pair that never occurs is absent from the grouped
    # counts. Force the full 2x2 layout and treat missing cells as zero so the
    # lookups below cannot raise KeyError and the integer heatmap format works.
    conf_matrix_pivot = (
        conf_matrix.pivot(index=label_column, columns='prediction', values='count')
        .reindex(index=[0, 1], columns=[0, 1])
        .fillna(0)
        .astype(int)
    )

    # Rows are true labels, columns are predicted labels.
    tp = int(conf_matrix_pivot.loc[1, 1])
    fp = int(conf_matrix_pivot.loc[0, 1])
    tn = int(conf_matrix_pivot.loc[0, 0])
    fn = int(conf_matrix_pivot.loc[1, 0])

    def _ratio(numerator, denominator):
        # Report NaN instead of failing when a rate is undefined (zero denominator).
        return numerator / denominator if denominator else float("nan")

    recall = _ratio(tp, tp + fn)
    precision = _ratio(tp, tp + fp)
    fpr = _ratio(fp, fp + tn)
    tnr = _ratio(tn, tn + fp)

    print("Recall (Sensitivity): {:.4f}".format(recall))
    print("Positive Predictive Value (Precision): {:.4f}".format(precision))
    print("False Positive Rate (1 - Specificity): {:.4f}".format(fpr))
    print("True Negative Rate (Specificity): {:.4f}".format(tnr))
    print(f"Accuracy: {accuracy}")

    plt.figure(figsize=(8, 6))
    sns.heatmap(conf_matrix_pivot, annot=True, fmt='d', cmap='Blues',cbar= False)
    plt.title("Confusion Matrix Heatmap")
    plt.xlabel("Predicted Label")
    plt.ylabel("True Label")
    plt.show()

In [None]:
# Random split of the dataset with weights 0.8 (train) and 0.2 (test), using a fixed seed.
train, test = df_selected.randomSplit(weights=[0.8, 0.2], seed=42)

In [None]:
# Fit and evaluate the random forest. The Spark-side score gets its own name:
# unpacking it into `auc` would rebind sklearn's `auc` function to a float and
# make the ROC-area call below fail with "'float' object is not callable".
predictions, spark_auc = analysis("RandomForest", train, test)
metrics(predictions)
# Element 1 of the probability vector is used as the positive-class score.
predictions = predictions.withColumn("prob_pos", vector_to_array("probability")[1])

# Bring labels and scores to the driver so sklearn can build the ROC curve.
pdf = predictions.select("haswon", "prob_pos").toPandas()

fpr, tpr, thresholds = roc_curve(pdf['haswon'], pdf['prob_pos'])
roc_auc = sk_auc(fpr, tpr)
plt.figure(figsize=(8,6))
plt.plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC curve (AUC = {roc_auc:.2f})')
# Diagonal reference line: the ROC of a random classifier.
plt.plot([0,1], [0,1], color='navy', lw=2, linestyle='--')
plt.xlim([0.0,1.0])
plt.ylim([0.0,1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic (ROC) Curve')
plt.legend(loc="lower right")
plt.grid(True)
plt.show()

In [None]:
# Fit and evaluate logistic regression. The Spark-side score gets its own name:
# unpacking it into `auc` would rebind sklearn's `auc` function to a float and
# make the ROC-area call below fail with "'float' object is not callable".
predictions, spark_auc = analysis("LogisticRegression", train, test)
metrics(predictions)
# Element 1 of the probability vector is used as the positive-class score.
predictions = predictions.withColumn("prob_pos", vector_to_array("probability")[1])

# Bring labels and scores to the driver so sklearn can build the ROC curve.
pdf = predictions.select("haswon", "prob_pos").toPandas()

fpr, tpr, thresholds = roc_curve(pdf['haswon'], pdf['prob_pos'])
roc_auc = sk_auc(fpr, tpr)
plt.figure(figsize=(8,6))
plt.plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC curve (AUC = {roc_auc:.2f})')
# Diagonal reference line: the ROC of a random classifier.
plt.plot([0,1], [0,1], color='navy', lw=2, linestyle='--')
plt.xlim([0.0,1.0])
plt.ylim([0.0,1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic (ROC) Curve')
plt.legend(loc="lower right")
plt.grid(True)
plt.show()

In [None]:
# Fit and evaluate the linear SVM. The Spark-side score gets its own name:
# unpacking it into `auc` would rebind sklearn's `auc` function to a float and
# make the ROC-area call below fail with "'float' object is not callable".
predictions, spark_auc = analysis("LinearSVC", train, test)
metrics(predictions)
# LinearSVC is not a probabilistic classifier and emits no "probability"
# column, so rank rows by the raw margin for class 1 instead. A ROC curve only
# depends on the ordering of the scores, so an uncalibrated margin is fine.
predictions = predictions.withColumn("score_pos", vector_to_array("rawPrediction")[1])

# Bring labels and scores to the driver so sklearn can build the ROC curve.
pdf = predictions.select("haswon", "score_pos").toPandas()

fpr, tpr, thresholds = roc_curve(pdf['haswon'], pdf['score_pos'])
roc_auc = sk_auc(fpr, tpr)
plt.figure(figsize=(8,6))
plt.plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC curve (AUC = {roc_auc:.2f})')
# Diagonal reference line: the ROC of a random classifier.
plt.plot([0,1], [0,1], color='navy', lw=2, linestyle='--')
plt.xlim([0.0,1.0])
plt.ylim([0.0,1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic (ROC) Curve')
plt.legend(loc="lower right")
plt.grid(True)
plt.show()

In [None]:
# Fit and evaluate the decision tree. The Spark-side score gets its own name:
# unpacking it into `auc` would rebind sklearn's `auc` function to a float and
# make the ROC-area call below fail with "'float' object is not callable".
predictions, spark_auc = analysis("DecisionTree", train, test)
metrics(predictions)
# Element 1 of the probability vector is used as the positive-class score.
predictions = predictions.withColumn("prob_pos", vector_to_array("probability")[1])

# Bring labels and scores to the driver so sklearn can build the ROC curve.
pdf = predictions.select("haswon", "prob_pos").toPandas()

fpr, tpr, thresholds = roc_curve(pdf['haswon'], pdf['prob_pos'])
roc_auc = sk_auc(fpr, tpr)
plt.figure(figsize=(8,6))
plt.plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC curve (AUC = {roc_auc:.2f})')
# Diagonal reference line: the ROC of a random classifier.
plt.plot([0,1], [0,1], color='navy', lw=2, linestyle='--')
plt.xlim([0.0,1.0])
plt.ylim([0.0,1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic (ROC) Curve')
plt.legend(loc="lower right")
plt.grid(True)
plt.show()