In [27]:
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, CrossValidatorModel
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler, MinMaxScaler
from pyspark.sql.functions import col, when, expr,avg,udf, lit, min as spark_min, max as spark_max, mean
from pyspark.sql import SparkSession
from pyspark.ml.clustering import GaussianMixture
import numpy as np
from pyspark.sql.types import IntegerType
from pyspark.ml.linalg import Vectors, VectorUDT

In [2]:
ADDICTION_CUTOFF = 1280
RANDOM_STATE = 42
TARGET_VARIABLE = "EngagementLevel"

Tải dữ liệu

In [3]:

cvModel_loaded = CrossValidatorModel.load("c:\\Python\\BTLBD\\models\\cv_pipeline_model")
spark = SparkSession.builder.appName("GamingBehavior").getOrCreate()
test_df = spark.read.csv("data/train/*", header=True, inferSchema=True)
test_processed = test_df.select(
    col("PlayerID"),
    col("Age"),
    col("Gender"),
    col("Location"),
    col("GameGenre"),
    col("InGamePurchases"),
    col("SessionsPerWeek"),
    col("AvgSessionDurationMinutes"),
    col("PlayerLevel"),
    col("AchievementsUnlocked"),
    when(col("GameDifficulty") == "Hard", "true").otherwise("false").alias("IsStressed"),
    when(col("GameDifficulty") == "Easy", 1)
        .when(col("GameDifficulty") == "Medium", 4)
        .when(col("GameDifficulty") == "Hard", 8).alias("GameDifficultyQuantified"),
        
)
test_processed = test_processed.withColumn(
    "isAddicted",
    when((col("AvgSessionDurationMinutes") * col("SessionsPerWeek")) > ADDICTION_CUTOFF, 1).otherwise(0)
)

Tải model và test

In [4]:
predictions = cvModel_loaded.transform(test_processed)
results = predictions.select(
    col("PlayerID"),
    col("prediction").cast("int").alias("PredictedEngagement")
)

results.coalesce(1).write.mode("overwrite").option("header", "true").csv("data/results")

print("Kết quả lưu ở data/results/")

Kết quả lưu ở data/results/


In [5]:
results2 = predictions.select(
    col("PlayerID"),
    col("Age"),
    col("Gender"),
    col("Location"),
    col("GameGenre"),
    col("InGamePurchases"),
    col("SessionsPerWeek"),
    col("AvgSessionDurationMinutes"),
    col("PlayerLevel"),
    col("AchievementsUnlocked"),
    col("IsStressed"),
    col("GameDifficultyQuantified"),
    col("isAddicted"),
    col("prediction").cast("int").alias("PredictedEngagement")

)

In [6]:
results2.head(10)

[Row(PlayerID=9000, Age=43, Gender='Male', Location='Other', GameGenre='Strategy', InGamePurchases=0, SessionsPerWeek=6, AvgSessionDurationMinutes=108, PlayerLevel=79, AchievementsUnlocked=25, IsStressed='false', GameDifficultyQuantified=4, isAddicted=0, PredictedEngagement=0),
 Row(PlayerID=9001, Age=29, Gender='Female', Location='USA', GameGenre='Strategy', InGamePurchases=0, SessionsPerWeek=5, AvgSessionDurationMinutes=144, PlayerLevel=11, AchievementsUnlocked=10, IsStressed='false', GameDifficultyQuantified=4, isAddicted=0, PredictedEngagement=0),
 Row(PlayerID=9002, Age=22, Gender='Female', Location='USA', GameGenre='Sports', InGamePurchases=0, SessionsPerWeek=16, AvgSessionDurationMinutes=142, PlayerLevel=35, AchievementsUnlocked=41, IsStressed='false', GameDifficultyQuantified=1, isAddicted=1, PredictedEngagement=1),
 Row(PlayerID=9003, Age=35, Gender='Male', Location='USA', GameGenre='Action', InGamePurchases=1, SessionsPerWeek=9, AvgSessionDurationMinutes=85, PlayerLevel=57, A

# Phân cụm

Tìm số cụm

In [7]:

clusteringCol = [
    "Age",
    "SessionsPerWeek",
    "AvgSessionDurationMinutes",
    "PlayerLevel",
    "AchievementsUnlocked",
    "GameDifficultyQuantified",
    "isAddicted",
    "PredictedEngagement"
]

df_high = results2.filter(col("PredictedEngagement") == 2)

assembler = VectorAssembler(
    inputCols=clusteringCol,
    outputCol="features"
)
df_high_vector = assembler.transform(df_high)

scaler = StandardScaler(
    inputCol="features",
    outputCol="scaled_features",
    withMean=True,
    withStd=True
)
scaler_model = scaler.fit(df_high_vector)
df_high_scaled = scaler_model.transform(df_high_vector)

first_row = df_high_scaled.select("scaled_features").first()
numFeatures = len(first_row["scaled_features"])
n_high = df_high_scaled.count()
results = []
for k in range(2, 10):
    gmm = GaussianMixture(k=k, featuresCol="scaled_features", predictionCol="cluster")
    model = gmm.fit(df_high_scaled)
    
    logL = model.summary.logLikelihood
    numParams = k * numFeatures * 2 + (k - 1) 
    aic = 2 * numParams - 2 * logL
    bic = numParams * np.log(n_high) - 2 * logL
    
    results.append((k, aic, bic))
    print(f"k={k} -> AIC={aic}, BIC={bic}")

best_k = min(results, key=lambda x: x[2])[0]
print("Số cụm tốt nhất theo BIC cho nhóm engagementLv=2:", best_k)



k=2 -> AIC=170612.3066162117, BIC=170847.57865243847
k=3 -> AIC=168203.59927907356, BIC=168560.07206123535
k=4 -> AIC=65260.80688569715, BIC=65738.48041379395
k=5 -> AIC=67981.07237551625, BIC=68579.94664954804
k=6 -> AIC=119169.85387701329, BIC=119889.9288969801
k=7 -> AIC=93416.51980368585, BIC=94257.79556958766
k=8 -> AIC=61058.342934509514, BIC=62020.81944634634
k=9 -> AIC=119136.61113561368, BIC=120220.2883933855
Số cụm tốt nhất theo BIC cho nhóm engagementLv=2: 8


In [13]:
gmm_final = GaussianMixture(k=best_k, featuresCol="scaled_features", predictionCol="cluster")
model_final = gmm_final.fit(df_high_scaled)
df_high_clustered = model_final.transform(df_high_scaled)

gaussians = model_final.gaussiansDF.collect()
means = [row['mean'] for row in gaussians]
sc = spark.sparkContext
broadcast_means = sc.broadcast(means)

def assign_nearest_cluster(vec):
    # Convert input vec to ml.Vector if needed
    vec_ml = Vectors.dense(vec.toArray()) if hasattr(vec, 'toArray') else vec
    dists = []
    for mean in broadcast_means.value:
        # Explicitly convert each mean to ml.Vector
        mean_ml = Vectors.dense(mean.toArray()) if hasattr(mean, 'toArray') else mean
        dists.append(Vectors.squared_distance(vec_ml, mean_ml))
    return dists.index(min(dists))

udf_assign = udf(assign_nearest_cluster, IntegerType())

df_normal = results2.filter(col("PredictedEngagement") != 2)

df_normal_vector = assembler.transform(df_normal)
df_normal_scaled = scaler_model.transform(df_normal_vector)

df_normal_clustered = df_normal_scaled.withColumn("cluster", udf_assign(col("scaled_features")))
df_clustered = df_high_clustered.unionByName(df_normal_clustered, allowMissingColumns=True)
df_clustered.select("PlayerID", "cluster").show()

+--------+-------+
|PlayerID|cluster|
+--------+-------+
|    9005|      5|
|    9006|      3|
|    9012|      0|
|    9013|      5|
|    9017|      4|
|    9019|      5|
|    9022|      4|
|    9029|      5|
|    9030|      5|
|    9032|      4|
|    9034|      4|
|    9047|      7|
|    9053|      5|
|    9064|      5|
|    9076|      3|
|    9077|      5|
|    9079|      0|
|    9080|      5|
|    9087|      6|
|    9091|      5|
+--------+-------+
only showing top 20 rows



In [20]:
df_clustered.show()

+--------+---+------+--------+----------+---------------+---------------+-------------------------+-----------+--------------------+----------+------------------------+----------+-------------------+--------------------+--------------------+--------------------+-------+
|PlayerID|Age|Gender|Location| GameGenre|InGamePurchases|SessionsPerWeek|AvgSessionDurationMinutes|PlayerLevel|AchievementsUnlocked|IsStressed|GameDifficultyQuantified|isAddicted|PredictedEngagement|            features|     scaled_features|         probability|cluster|
+--------+---+------+--------+----------+---------------+---------------+-------------------------+-----------+--------------------+----------+------------------------+----------+-------------------+--------------------+--------------------+--------------------+-------+
|    9005| 37|  Male|  Europe|       RPG|              0|              2|                       81|         74|                  22|     false|                       1|         0|        

In [22]:
df_fn=df_clustered.select(
    col("PlayerID"),
    col("Age"),
    col("Gender"),
    col("Location"),
    col("GameGenre"),
    col("PlayerLevel"),
    col("AchievementsUnlocked"),
    col("GameDifficultyQuantified"),
    col("cluster"),
    col("predictedEngagement")
    
).orderBy("PlayerID")

In [23]:
df_fn.show()

+--------+---+------+--------+----------+-----------+--------------------+------------------------+-------+-------------------+
|PlayerID|Age|Gender|Location| GameGenre|PlayerLevel|AchievementsUnlocked|GameDifficultyQuantified|cluster|predictedEngagement|
+--------+---+------+--------+----------+-----------+--------------------+------------------------+-------+-------------------+
|    9000| 43|  Male|   Other|  Strategy|         79|                  25|                       4|      4|                  0|
|    9001| 29|Female|     USA|  Strategy|         11|                  10|                       4|      4|                  0|
|    9002| 22|Female|     USA|    Sports|         35|                  41|                       1|      0|                  1|
|    9003| 35|  Male|     USA|    Action|         57|                  47|                       1|      0|                  0|
|    9004| 33|  Male|  Europe|    Action|         95|                  37|                       4|     

In [24]:
numeric_cols = [
    "PlayerLevel",
    "AchievementsUnlocked",
    "GameDifficultyQuantified",
]

min_max_dict = {}
for col_name in numeric_cols:
    min_val = df_fn.select(spark_min(col(col_name))).first()[0]
    max_val = df_fn.select(spark_max(col(col_name))).first()[0]
    min_max_dict[col_name] = (min_val, max_val)
for col_name in numeric_cols:
    min_val, max_val = min_max_dict[col_name]
    if min_val == max_val:
        df_fn = df_fn.withColumn(f"scaled_{col_name}", lit(5.0))
    else:
        df_fn = df_fn.withColumn(
            f"scaled_{col_name}",
            ((col(col_name) - lit(min_val)) / (lit(max_val) - lit(min_val))) * 10
        )

In [25]:
df_fn.show()

+--------+---+------+--------+----------+-----------+--------------------+------------------------+-------+-------------------+------------------+---------------------------+-------------------------------+
|PlayerID|Age|Gender|Location| GameGenre|PlayerLevel|AchievementsUnlocked|GameDifficultyQuantified|cluster|predictedEngagement|scaled_PlayerLevel|scaled_AchievementsUnlocked|scaled_GameDifficultyQuantified|
+--------+---+------+--------+----------+-----------+--------------------+------------------------+-------+-------------------+------------------+---------------------------+-------------------------------+
|    9000| 43|  Male|   Other|  Strategy|         79|                  25|                       4|      4|                  0| 7.959183673469388|         5.1020408163265305|              4.285714285714286|
|    9001| 29|Female|     USA|  Strategy|         11|                  10|                       4|      4|                  0|1.0204081632653061|         2.040816326530612

In [29]:
df_eng2 = df_fn.filter(col("predictedEngagement") == 2)

df_cluster_means = df_eng2.groupBy("cluster").agg(
    mean("scaled_PlayerLevel").alias("mean_scaled_PlayerLevel"),
    mean("scaled_AchievementsUnlocked").alias("mean_scaled_AchievementsUnlocked"),
    mean("scaled_GameDifficultyQuantified").alias("mean_scaled_GameDifficultyQuantified")
)

df_fn_with_means = df_fn.join(df_cluster_means, on="cluster", how="left")

df_fn_with_means.select(
    "PlayerID", "cluster", "predictedEngagement",
    "scaled_PlayerLevel", "mean_scaled_PlayerLevel",
    "scaled_AchievementsUnlocked", "mean_scaled_AchievementsUnlocked",
    "scaled_GameDifficultyQuantified", "mean_scaled_GameDifficultyQuantified"
).orderBy("PlayerID").show(10)

+--------+-------+-------------------+------------------+-----------------------+---------------------------+--------------------------------+-------------------------------+------------------------------------+
|PlayerID|cluster|predictedEngagement|scaled_PlayerLevel|mean_scaled_PlayerLevel|scaled_AchievementsUnlocked|mean_scaled_AchievementsUnlocked|scaled_GameDifficultyQuantified|mean_scaled_GameDifficultyQuantified|
+--------+-------+-------------------+------------------+-----------------------+---------------------------+--------------------------------+-------------------------------+------------------------------------+
|    9000|      4|                  0| 7.959183673469388|      4.969803305937764|         5.1020408163265305|               4.691845969156895|              4.285714285714286|                   6.664511958629693|
|    9001|      4|                  0|1.0204081632653061|      4.969803305937764|         2.0408163265306123|               4.691845969156895|          

In [30]:
output_path = "df_fn_with_means.csv"

df_fn_with_means.coalesce(1).write.option("header", True).mode("overwrite").csv(output_path)