## Intro to ML using PySpark

#### Getting our data ready

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
# Create (or reuse) the Spark session used by every later cell.
# NOTE(review): no .master() is set here, so the master comes from the launch
# environment. If this runs in local mode, executors live inside the driver JVM
# and spark.executor.memory likely has no effect. Also, getOrCreate() may hand
# back an already-running session, in which case these settings may not apply
# — confirm.
spark = (
    SparkSession.builder
    .appName("ML DEMO")
    .config("spark.driver.memory", "2g")
    .config("spark.executor.memory", "4g")
    .getOrCreate()
)

25/08/04 08:09:35 WARN Utils: Your hostname, ardent resolves to a loopback address: 127.0.1.1; using 192.168.122.50 instead (on interface enp1s0)
25/08/04 08:09:35 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


25/08/04 08:09:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [1]:
import test  # NOTE(review): nothing in this notebook references `test` (only `test_df`); this imports a local test.py if present, otherwise CPython's regression-test package — likely a leftover, confirm and remove

In [2]:
import os

# Location of the stage-3 track metadata parquet dataset. Defaults to the
# original author's local path; set the TRACK_METADATA_PATH environment
# variable to run this notebook on another machine without editing the cell.
TRACK_METADATA_PATH = os.environ.get(
    "TRACK_METADATA_PATH", "/home/ardent/Data/transform/stage3/track_metadata"
)
df = spark.read.parquet(TRACK_METADATA_PATH)
df.printSchema()

                                                                                

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- popularity: integer (nullable = true)
 |-- duration_ms: integer (nullable = true)
 |-- danceability: float (nullable = true)
 |-- energy: float (nullable = true)
 |-- tempo: float (nullable = true)



In [3]:
# Discard rows missing the label (popularity) or any model feature.
# DataFrame.na.drop is the same operation as DataFrame.dropna (how="any").
df = df.na.drop(subset=["popularity", "danceability", "energy", "tempo"])

In [4]:
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression, RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Model inputs; popularity (the label) is selected separately later.
feature_cols = ["danceability", "energy", "tempo"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="raw_features")
# VectorAssembler refuses to overwrite an existing output column, so re-running
# this cell would fail. Dropping raw_features first makes the cell safe to
# re-execute; DataFrame.drop is a no-op when the column does not exist yet.
df = assembler.transform(df.drop("raw_features"))

In [5]:
# Preview the first 20 rows (Spark's default) with full cell contents.
df.show(n=20, truncate=False)

+----------------------+-----------------------------------------------+----------+-----------+------------+------+-------+-----------------------------------------------------------+
|id                    |name                                           |popularity|duration_ms|danceability|energy|tempo  |raw_features                                               |
+----------------------+-----------------------------------------------+----------+-----------+------------+------+-------+-----------------------------------------------------------+
|00105Q1NbnHkf8R5eXXeXm|Es un Secreto                                  |44        |234067     |0.777       |0.832 |94.964 |[0.7770000100135803,0.8320000171661377,94.96399688720703]  |
|001GxQGaFwTjxM7tmKbMF3|La Falla Fue Tuya                              |30        |214493     |0.547       |0.731 |159.801|[0.546999990940094,0.7310000061988831,159.80099487304688]  |
|001LvKFwYbfKYPQF2Fiv77|Yeh Lo Main Haari Piya                         |34      

                                                                                

In [6]:

# Rescale the assembled vector with StandardScaler (default settings), then
# keep only the columns the models need, exposing popularity as "label".
# NOTE(review): the scaler is fitted on the whole dataset before the
# train/test split, so test-set statistics leak into the feature scaling.
# Consider splitting first and fitting the scaler only on the training split
# (e.g. with a Pipeline).
# NOTE(review): in the printed output, danceability 0.777 scales to ~0.0212,
# implying a danceability std of ~37. That is far larger than a 0-1 feature
# should have; check the source data for out-of-range values.
scaler = StandardScaler(inputCol="raw_features", outputCol="features")
scaler_model = scaler.fit(df)
df = (
    scaler_model.transform(df)
    .select("id", "name", "features", F.col("popularity").alias("label"))
)

                                                                                

In [7]:
# Inspect the model-ready frame: scaled feature vector plus label.
df.show(n=20, truncate=False)

+----------------------+-----------------------------------------------+------------------------------------------------------------+-----+
|id                    |name                                           |features                                                    |label|
+----------------------+-----------------------------------------------+------------------------------------------------------------+-----+
|00105Q1NbnHkf8R5eXXeXm|Es un Secreto                                  |[0.021192901782353193,3.3105930406460367,3.1840039702302647]|44   |
|001GxQGaFwTjxM7tmKbMF3|La Falla Fue Tuya                              |[0.01491958421305411,2.9087061097391613,5.357893715518947]  |30   |
|001LvKFwYbfKYPQF2Fiv77|Yeh Lo Main Haari Piya                         |[0.015137786987395138,2.120848592547762,2.7721395250770353] |34   |
|001gx41rQo0bKh063TrC1I|Camino a Camagüey                              |[0.013719472205646581,0.927125176879668,4.333697566045485]  |22   |
|002CcxKpBE1tfKOy2CR

In [8]:
# 80/20 train/test split; the fixed seed keeps the split repeatable.
train_df, test_df = df.randomSplit(weights=[0.8, 0.2], seed=42)

In [9]:
# Baseline model: linear regression on the scaled features.
# regParam is left at its default, which is why Spark logs a
# "regParam is zero" warning when fitting.
lr = LinearRegression(labelCol="label", featuresCol="features")
lr_model = lr.fit(train_df)
lr_predictions = lr_model.transform(test_df)

25/08/04 08:11:55 WARN Instrumentation: [019fc8ae] regParam is zero, which might cause numerical instability and overfitting.


[Stage 6:>                                                          (0 + 8) / 8]

25/08/04 08:11:57 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
25/08/04 08:11:57 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS


                                                                                

25/08/04 08:11:58 WARN InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


                                                                                

In [10]:
# Build one evaluator per metric; both are reused for the random forest later.
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse"
)
r2_evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="r2"
)

# Score the linear baseline on the held-out split.
lr_rmse = evaluator.evaluate(dataset=lr_predictions)
print(f"Linear Regression RMSE on test data: {lr_rmse}")
lr_r2 = r2_evaluator.evaluate(dataset=lr_predictions)
print(f"Linear Regression R-squared on test data: {lr_r2}")

                                                                                

Linear Regression RMSE on test data: 17.471510846385318
Linear Regression R-squared on test data: 0.0930268659166521


In [11]:

# Non-linear comparison model: a random forest regressor.
RF_NUM_TREES = 20
RF_MAX_DEPTH = 10
# seed is pinned (to the same value as the train/test split) because PySpark's
# default seed is derived from Python's hash() of the estimator class name.
# String hashing is randomized per interpreter process unless PYTHONHASHSEED
# is set, so without an explicit seed the forest's random sampling, and
# therefore its metrics, change after every kernel restart.
rf = RandomForestRegressor(
    featuresCol="features",
    labelCol="label",
    numTrees=RF_NUM_TREES,
    maxDepth=RF_MAX_DEPTH,
    seed=42,
)
rf_model = rf.fit(train_df)
rf_predictions = rf_model.transform(test_df)

                                                                                

25/08/04 08:12:25 WARN DAGScheduler: Broadcasting large task binary with size 1143.0 KiB


                                                                                

25/08/04 08:12:28 WARN DAGScheduler: Broadcasting large task binary with size 1872.7 KiB




In [12]:
# Score the forest with the same RMSE and R² evaluators used for the linear
# baseline, so the two sets of numbers are directly comparable.
rf_rmse = evaluator.evaluate(dataset=rf_predictions)
print(f"Random Forest RMSE on test data: {rf_rmse}")

rf_r2 = r2_evaluator.evaluate(dataset=rf_predictions)
print(f"Random Forest R-squared on test data: {rf_r2}")

                                                                                

Random Forest RMSE on test data: 17.02067351390865
Random Forest R-squared on test data: 0.13923028009120952


                                                                                

In [13]:
# Spot-check ten held-out tracks: true popularity (label) vs. the forest's prediction.
rf_predictions.select(["id", "name", "label", "prediction"]).show(n=10)

+--------------------+--------------------+-----+------------------+
|                  id|                name|label|        prediction|
+--------------------+--------------------+-----+------------------+
|001LvKFwYbfKYPQF2...|Yeh Lo Main Haari...|   34|31.527948163055157|
|002TGKi4LBwxYodlf...|Cunnamulla Fella ...|   32|36.737411988187745|
|00309sp8DDeN07RtA...|          Workaholic|   34| 30.21518557551919|
|004TP6xsBixiK3zqO...|Kapitel 17 - Vene...|    2|15.420232075170878|
|005xgk7TSc0gKBEEn...|      Waltz For Ruth|    5|22.911190365274074|
|006dsLKWlQBtBPdhy...|      Limonádový Joe|    5|28.955630982641214|
|008MKSfbnj5cY8AT8...|Spaziergang - Tei...|    2|14.046157179314374|
|00An04LOF36saBLHZ...|           Aí Já Era|   58|27.217085151114198|
|00ENwiGXMl2NpX2pI...|Qu'est-ce qu'on v...|   42| 31.66315831540793|
|00EeJGg0rQWFYsREp...|     Armando Aguirre|   44| 32.03518033882743|
+--------------------+--------------------+-----+------------------+
only showing top 10 rows

