In [1]:
!pip install pyspark findspark


Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Collecting py4j==0.10.9.7 (from pyspark)
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m200.5/200.5 kB[0m [31m497.7 kB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hInstalling collected packages: py4j, findspark
Successfully installed findspark-2.0.1 py4j-0.10.9.7


In [3]:
def warn(*args, **kwargs):
    """Do-nothing stand-in for ``warnings.warn``.

    Accepts any positional and keyword arguments, ignores them, and returns
    ``None``.
    """
    return None
# Silence warnings for the rest of the session in two ways: install a blanket
# "ignore" filter, and replace warnings.warn itself with the no-op `warn`
# defined at the top of this cell.
import warnings
warnings.filterwarnings('ignore')
warnings.warn = warn

# Initialise findspark before the pyspark imports in the next cell.
import findspark
findspark.init()

In [4]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer
from pyspark.sql.functions import when, col,sqrt, cbrt,log, expr,sum
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer

In [5]:

# Create (or reuse) the Spark session for this notebook, named
# "MusicClassifier" and attached to the master at spark-master:7077.
spark = (
    SparkSession.builder
    .appName("MusicClassifier")
    .master("spark://spark-master:7077")
    .getOrCreate()
)



In [4]:
# Load the training CSV from HDFS: the first row is the header and column
# types are inferred from the data.
df = spark.read.csv(
    "hdfs://namenode:9000/data/train.csv",
    header=True,
    inferSchema=True,
)

# Preview the first five rows.
df.show(5)


+--------------------+--------------------+----------+------------+------+----+--------+----+-----------+------------+----------------+--------+-------+-------+------------------+--------------+-----+
|         Artist Name|          Track Name|Popularity|danceability|energy| key|loudness|mode|speechiness|acousticness|instrumentalness|liveness|valence|  tempo|duration_in min/ms|time_signature|Class|
+--------------------+--------------------+----------+------------+------+----+--------+----+-----------+------------+----------------+--------+-------+-------+------------------+--------------+-----+
|          Bruno Mars|That's What I Lik...|      60.0|       0.854| 0.564| 1.0|  -4.964| 1.0|     0.0485|      0.0171|            null|  0.0849|  0.899|134.071|          234596.0|           4.0|  5.0|
|              Boston|        Hitch a Ride|      54.0|       0.382| 0.814| 3.0|   -7.23| 1.0|     0.0406|      0.0011|         0.00401|   0.101|  0.569|116.454|          251733.0|           4.0| 1

In [1]:
# Mark the loaded DataFrame for caching. `df` is created by the read cell
# above, so that cell must have been run first.
df.cache()

NameError: name 'df' is not defined

In [5]:
# Normalise the column names: lower-case them and replace spaces with
# underscores (e.g. "Artist Name" -> "artist_name"). The loop variable is
# called `name` so it does not shadow the imported pyspark `col` function.
new_columns = [name.lower().replace(" ", "_") for name in df.columns]
df = df.toDF(*new_columns)

In [6]:
# One-row summary of the number of NULLs in every column.
# `sum` here is pyspark.sql.functions.sum (imported above), not the builtin.
null_counts = [
    sum(col(name).isNull().cast("int")).alias(name)
    for name in df.columns
]
df.select(null_counts).show()

+-----------+----------+----------+------------+------+----+--------+----+-----------+------------+----------------+--------+-------+-----+------------------+--------------+-----+
|artist_name|track_name|popularity|danceability|energy| key|loudness|mode|speechiness|acousticness|instrumentalness|liveness|valence|tempo|duration_in_min/ms|time_signature|class|
+-----------+----------+----------+------------+------+----+--------+----+-----------+------------+----------------+--------+-------+-----+------------------+--------------+-----+
|          0|         0|       428|           0|     0|2014|       0|   0|          0|           0|            4377|       0|      0|    0|                 0|             0|    0|
+-----------+----------+----------+------------+------+----+--------+----+-----------+------------+----------------+--------+-------+-----+------------------+--------------+-----+



In [7]:
# Durations below 30 are multiplied by 60000 (presumably values recorded in
# minutes being converted to milliseconds, per the column name); all other
# values are kept as they are.
duration = col("duration_in_min/ms")
df = df.withColumn(
    "duration_in_min/ms",
    when(duration < 30, duration * 60000).otherwise(duration),
)

# Drop rows that are duplicates on every column except the label.
columns_to_check = [name for name in df.columns if name != "class"]
df = df.dropDuplicates(subset=columns_to_check)

# Cast popularity to float, then compute approximate medians
# (relative error 0.001) for the two columns that are imputed with them.
df = df.withColumn("popularity", col("popularity").cast("float"))
popularity_median = df.approxQuantile("popularity", [0.5], 0.001)[0]
instrumentalness_median = df.approxQuantile("instrumentalness", [0.5], 0.001)[0]

# Fill missing values: medians for popularity / instrumentalness, and the
# sentinel -1 for a missing key.
df = df.fillna(
    {
        "popularity": popularity_median,
        "instrumentalness": instrumentalness_median,
    }
)
df = df.fillna({"key": -1})

In [8]:
# Encode the two text columns as numeric indices. Each StringIndexer writes
# its result to "<column>_index"; the indexers run together as one Pipeline.
columns = ["artist_name", "track_name"]
indexers = [
    StringIndexer(inputCol=name, outputCol=name + "_index")
    for name in columns
]
pipeline = Pipeline(stages=indexers)
df = pipeline.fit(df).transform(df)

In [9]:
# Small offset added before taking a log or a reciprocal.
epsilon = 1e-6

# Transform columns to reduce skew. Each transformed feature is stored in a
# new "<feature>_trans" column; the source columns are left in place here.
df = (
    df
    .withColumn("duration_in_ms_trans", sqrt(col("duration_in_min/ms")))
    .withColumn("loudness_trans", cbrt(col("loudness")))
    .withColumn("speechiness_trans", expr(f"1 / (speechiness + {epsilon})"))
    .withColumn("acousticness_trans", cbrt(col("acousticness")))
    .withColumn("instrumentalness_trans", log(col("instrumentalness") + epsilon))
    .withColumn("liveness_trans", log(col("liveness") + epsilon))
    .withColumn("tempo_trans", cbrt(col("tempo")))
)

In [12]:
# Store danceability as a float column, then print the resulting schema.
df = df.withColumn(
    "danceability",
    col("danceability").cast("float"),
)
df.printSchema()


root
 |-- artist_name: string (nullable = true)
 |-- track_name: string (nullable = true)
 |-- popularity: float (nullable = false)
 |-- danceability: float (nullable = true)
 |-- energy: string (nullable = true)
 |-- key: double (nullable = false)
 |-- loudness: double (nullable = true)
 |-- mode: double (nullable = true)
 |-- speechiness: double (nullable = true)
 |-- acousticness: double (nullable = true)
 |-- instrumentalness: double (nullable = false)
 |-- liveness: double (nullable = true)
 |-- valence: double (nullable = true)
 |-- tempo: double (nullable = true)
 |-- duration_in_min/ms: double (nullable = true)
 |-- time_signature: double (nullable = true)
 |-- class: double (nullable = true)
 |-- artist_name_index: double (nullable = false)
 |-- track_name_index: double (nullable = false)
 |-- duration_in_ms_trans: double (nullable = true)
 |-- loudness_trans: double (nullable = true)
 |-- speechiness_trans: double (nullable = true)
 |-- acousticness_trans: double (nullable = 

In [13]:

# Number of rows per label, smallest class first.
(
    df.groupBy("class")
    .count()
    .orderBy("count")
    .show()
)



+-------+-----+
|  class|count|
+-------+-----+
|117.017|    1|
|    4.0|  387|
|    3.0|  402|
|    7.0|  574|
|    0.0|  625|
|    1.0|  951|
|    2.0| 1220|
|    5.0| 1409|
|    8.0| 1738|
|    6.0| 2152|
|    9.0| 2392|
|   10.0| 4468|
+-------+-----+



In [14]:
# Remove the single stray row whose label is 117.017; the class counts shown
# by the previous cell list it once, next to the labels 0.0-10.0.
df_class = df.filter(df["class"] != 117.017)

# This cell and every later one work on `df_final`, which was never assigned
# before (NameError on a fresh kernel). Bind it to the filtered frame;
# `df_class` is kept so any existing reference to it still resolves.
df_final = df_class

# Confirm the class distribution after filtering.
df_final.groupBy("class").count().orderBy("count").show()

+-----+-----+
|class|count|
+-----+-----+
|  4.0|  387|
|  3.0|  402|
|  7.0|  574|
|  0.0|  625|
|  1.0|  951|
|  2.0| 1220|
|  5.0| 1409|
|  8.0| 1738|
|  6.0| 2152|
|  9.0| 2392|
| 10.0| 4468|
+-----+-----+



In [17]:
# Drop the raw versions of the features that now have a "*_trans" counterpart,
# then print the remaining schema.
df_final = df_final.drop(
    "duration_in_min/ms",
    "loudness",
    "speechiness",
    "acousticness",
    "instrumentalness",
    "liveness",
    "tempo",
)
df_final.printSchema()

root
 |-- artist_name: string (nullable = true)
 |-- track_name: string (nullable = true)
 |-- popularity: float (nullable = false)
 |-- danceability: float (nullable = true)
 |-- energy: string (nullable = true)
 |-- key: double (nullable = false)
 |-- mode: double (nullable = true)
 |-- valence: double (nullable = true)
 |-- time_signature: double (nullable = true)
 |-- class: double (nullable = true)
 |-- artist_name_index: double (nullable = false)
 |-- track_name_index: double (nullable = false)
 |-- duration_in_ms_trans: double (nullable = true)
 |-- loudness_trans: double (nullable = true)
 |-- speechiness_trans: double (nullable = true)
 |-- acousticness_trans: double (nullable = true)
 |-- instrumentalness_trans: double (nullable = true)
 |-- liveness_trans: double (nullable = true)
 |-- tempo_trans: double (nullable = true)



In [18]:
# Write the cleaned data to HDFS as CSV with a header row. coalesce(1) reduces
# the frame to one partition before writing, and mode("overwrite") replaces
# any existing output at that path.
(
    df_final.coalesce(1)
    .write
    .mode("overwrite")
    .option("header", "true")
    .csv("hdfs://namenode:9000/data/data_clean_train.csv")
)

In [21]:
# Print the schema of the cleaned DataFrame again, after it has been written out.
df_final.printSchema()

root
 |-- artist_name: string (nullable = true)
 |-- track_name: string (nullable = true)
 |-- popularity: float (nullable = false)
 |-- danceability: float (nullable = true)
 |-- energy: string (nullable = true)
 |-- key: double (nullable = false)
 |-- mode: double (nullable = true)
 |-- valence: double (nullable = true)
 |-- time_signature: double (nullable = true)
 |-- class: double (nullable = true)
 |-- artist_name_index: double (nullable = false)
 |-- track_name_index: double (nullable = false)
 |-- duration_in_ms_trans: double (nullable = true)
 |-- loudness_trans: double (nullable = true)
 |-- speechiness_trans: double (nullable = true)
 |-- acousticness_trans: double (nullable = true)
 |-- instrumentalness_trans: double (nullable = true)
 |-- liveness_trans: double (nullable = true)
 |-- tempo_trans: double (nullable = true)



In [24]:
# `energy` is still a string column (see the schema printed above); cast it to
# float so it can be used as a numeric feature.
df_final = df_final.withColumn(
    "energy",
    col("energy").cast("float"),
)

# The raw text columns are not used for training; their "*_index" encodings
# remain in the frame.
df_train = df_final.drop("artist_name", "track_name")
df_train.printSchema()

root
 |-- popularity: float (nullable = false)
 |-- danceability: float (nullable = true)
 |-- energy: float (nullable = true)
 |-- key: double (nullable = false)
 |-- mode: double (nullable = true)
 |-- valence: double (nullable = true)
 |-- time_signature: double (nullable = true)
 |-- class: double (nullable = true)
 |-- artist_name_index: double (nullable = false)
 |-- track_name_index: double (nullable = false)
 |-- duration_in_ms_trans: double (nullable = true)
 |-- loudness_trans: double (nullable = true)
 |-- speechiness_trans: double (nullable = true)
 |-- acousticness_trans: double (nullable = true)
 |-- instrumentalness_trans: double (nullable = true)
 |-- liveness_trans: double (nullable = true)
 |-- tempo_trans: double (nullable = true)



In [25]:
from pyspark.ml.feature import VectorAssembler, StandardScaler 
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import col
# Every column except the label is a feature.
# The comparison must be `!=`: the previous `not in "class"` was a substring
# test against the string "class", so it would also have silently dropped any
# column whose name is a substring of "class". The loop variable is named
# `name` so it does not shadow the imported pyspark `col` function.
feature_cols = [name for name in df_train.columns if name != "class"]

# Pack the feature columns into a single vector column for the scaler.
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features_unscaled")
df_vec = assembler.transform(df_train)

In [26]:
# Standardise the assembled feature vector (centre on the mean and scale by
# the standard deviation) into the "features" column.
# NOTE(review): the scaler is fitted on the full data set, before the
# train/test split performed in a later cell -- confirm this is intended.
scaler = StandardScaler(
    inputCol="features_unscaled",
    outputCol="features",
    withMean=True,
    withStd=True,
)
scaler_model = scaler.fit(df_vec)
df_scaled = scaler_model.transform(df_vec)

In [27]:
# Preview the scaled feature vectors of the first five rows.
df_scaled.select("features").show(5)


+--------------------+
|            features|
+--------------------+
|[-0.1173827908468...|
|[0.51996184121788...|
|[0.34614057792750...|
|[-0.1753232119436...|
|[-2.3191187925250...|
+--------------------+
only showing top 5 rows



In [11]:
# Class distribution of the scaled data, smallest class first.
df_scaled.groupBy("class").count().orderBy("count").show()
# Check the distinct values in the class column
df_scaled.select("class").distinct().show()
# Print the schema of the scaled DataFrame.
df_scaled.printSchema()

+-----+-----+
|class|count|
+-----+-----+
|  4.0|  387|
|  3.0|  402|
|  7.0|  574|
|  0.0|  625|
|  1.0|  951|
|  2.0| 1220|
|  5.0| 1409|
|  8.0| 1738|
|  6.0| 2152|
|  9.0| 2392|
| 10.0| 4468|
+-----+-----+

+-----+
|class|
+-----+
|  8.0|
|  0.0|
|  7.0|
|  1.0|
|  4.0|
|  3.0|
|  2.0|
| 10.0|
|  6.0|
|  5.0|
|  9.0|
+-----+

root
 |-- popularity: double (nullable = true)
 |-- key: double (nullable = true)
 |-- mode: double (nullable = true)
 |-- valence: double (nullable = true)
 |-- time_signature: double (nullable = true)
 |-- class: double (nullable = true)
 |-- duration_in_ms_trans: double (nullable = true)
 |-- loudness_trans: double (nullable = true)
 |-- speechiness_trans: double (nullable = true)
 |-- acousticness_trans: double (nullable = true)
 |-- instrumentalness_trans: double (nullable = true)
 |-- liveness_trans: double (nullable = true)
 |-- tempo_trans: double (nullable = true)
 |-- danceability_index: double (nullable = false)
 |-- features_unscaled: vector (nulla

In [28]:
# Random 70/30 train/test split; the seed is fixed so the split is repeatable.
training_data, testing_data = df_scaled.randomSplit([0.7, 0.3], seed=42)

In [29]:

# Mark the training split for caching. As the last expression of the cell,
# the returned DataFrame is also displayed.
training_data.cache()

DataFrame[popularity: float, danceability: float, energy: float, key: double, mode: double, valence: double, time_signature: double, class: double, artist_name_index: double, track_name_index: double, duration_in_ms_trans: double, loudness_trans: double, speechiness_trans: double, acousticness_trans: double, instrumentalness_trans: double, liveness_trans: double, tempo_trans: double, features_unscaled: vector, features: vector]

In [31]:
# Print the schema of the training split.
training_data.printSchema()

root
 |-- popularity: float (nullable = false)
 |-- danceability: float (nullable = true)
 |-- energy: float (nullable = true)
 |-- key: double (nullable = false)
 |-- mode: double (nullable = true)
 |-- valence: double (nullable = true)
 |-- time_signature: double (nullable = true)
 |-- class: double (nullable = true)
 |-- artist_name_index: double (nullable = false)
 |-- track_name_index: double (nullable = false)
 |-- duration_in_ms_trans: double (nullable = true)
 |-- loudness_trans: double (nullable = true)
 |-- speechiness_trans: double (nullable = true)
 |-- acousticness_trans: double (nullable = true)
 |-- instrumentalness_trans: double (nullable = true)
 |-- liveness_trans: double (nullable = true)
 |-- tempo_trans: double (nullable = true)
 |-- features_unscaled: vector (nullable = true)
 |-- features: vector (nullable = true)



 Random Forest classifier 


In [None]:
 # train
from pyspark.ml.classification import RandomForestClassifier

# Random forest on the scaled feature vector: 80 trees, maximum depth 10,
# fixed seed.
rf_params = {
    "labelCol": "class",
    "featuresCol": "features",
    "numTrees": 80,
    "maxDepth": 10,
    "seed": 42,
}
rf = RandomForestClassifier(**rf_params)

# Fit on the training split, then predict on the held-out test split.
rf_model = rf.fit(training_data)
rf_predictions = rf_model.transform(testing_data)

In [15]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Accuracy of the random-forest predictions on the test split.
rf_evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="class",
    predictionCol="prediction",
)
rf_accuracy = rf_evaluator.evaluate(rf_predictions)
print(f"Random Forest Accuracy = {rf_accuracy}")

Random Forest Accuracy = 0.460559265442404


In [16]:
# F1 score of the same random-forest predictions.
rf_f1_evaluator = MulticlassClassificationEvaluator(
    metricName="f1",
    labelCol="class",
    predictionCol="prediction",
)
rf_f1 = rf_f1_evaluator.evaluate(rf_predictions)
print(f"Random Forest F1 Score = {rf_f1}")

Random Forest F1 Score = 0.4098084333302436


In [17]:
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression, LinearSVC, NaiveBayes, OneVsRest
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [18]:
# Disabled hyper-parameter search, kept for reference: a 3-fold CrossValidator
# over numTrees (50/100/200) x maxDepth (5/10/15) for a RandomForestClassifier,
# scored by accuracy. The best model is then evaluated on testing_data using
# the rf_evaluator defined in an earlier cell. Uncomment to run.
# rf1 = RandomForestClassifier(labelCol="class", featuresCol="features")
# paramGridrf = ParamGridBuilder() \
#         .addGrid(rf1.numTrees, [50, 100, 200]) \
#         .addGrid(rf1.maxDepth, [5, 10, 15]) \
#         .build()
# cv1 = CrossValidator(
#         estimator=rf1,
#         estimatorParamMaps=paramGridrf,
#         evaluator=MulticlassClassificationEvaluator(labelCol="class", predictionCol="prediction", metricName="accuracy"),
#         numFolds=3
#     )

# cv_model = cv1.fit(training_data)
# best_rf_model = cv_model.bestModel
# best_predictions = best_rf_model.transform(testing_data)
# best_accuracy = rf_evaluator.evaluate(best_predictions)
# print(f"Random Forest tối ưu - Accuracy: {best_accuracy:.4f}")