In [1]:
# Import required packages/libraries:
import os
import sys
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import isnan, when, count, col

# Dependencies for Regression Algorithms:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import PCA
from pyspark.ml import Pipeline
from pyspark.ml import PipelineModel
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.regression import IsotonicRegression

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
17,application_1606590675617_0018,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
# Build a SQLContext on top of the session's SparkContext (`sc`).
from pyspark.sql import SQLContext
sqlContext = SQLContext(sparkContext=sc)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
# Load every processed-songs CSV under the S3 prefix into one Spark DataFrame.
# The first row of each file is treated as the header and column types are inferred.
songs_csv_reader = spark.read.format("csv").options(inferSchema="true", header="true")
df = songs_csv_reader.load("s3://projectmlldsongs/processed_songs/*.csv")

# Caching across the workers is currently disabled:
#df.cache()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
# Show the session's 'spark.dynamicAllocation.enabled' setting.
dynamic_allocation_flag = sc.getConf().get('spark.dynamicAllocation.enabled')
print(dynamic_allocation_flag)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

true

In [5]:
# Number of rows loaded from the CSV files.
row_count = df.count()
print(row_count)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

453250

In [6]:
# Use a regex to keep only rows whose target column 'song_hotttnesss' begins with
# a numeric literal: optional sign, optional "<digits>." prefix, then digits.
# The pattern is anchored at the start only, so trailing characters are not rejected here.
expr = "(^[+-]?([0-9]*[.])?[0-9]+)"
hotness = col("song_hotttnesss")

# Apply the regex filter first, then keep strictly positive targets.
df = df.filter(hotness.rlike(expr)).filter(hotness > 0.0)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
# Columns holding numeric entries; each one is converted to double precision below.
numeric_columns = [
    'artist_familiarity',
    'artist_hotttnesss',
    'duration',
    'end_of_fade_in',
    'energy',
    'mode',
    'key',
    'key_confidence',
    'loudness',
    'song_hotttnesss',
    'tempo',
    'time_signature',
    'time_signature_confidence',
    'year',
]

# Replace every listed column with its double-typed version under the same name.
for column_name in numeric_columns:
    df = df.withColumn(column_name, col(column_name).cast("double"))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
# Discard rows that contain a null/NA value in any column, then discard duplicate rows.
# NOTE(review): both steps look at ALL columns of `df`, not only the ones selected
# for modelling later — confirm that this is the intended scope.
df = df.dropna().dropDuplicates()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [9]:
# Select the feature columns plus the target ('song_hotttnesss') for the regression models:
select_df = df.select(['artist_familiarity', 'duration', 'loudness', 'key_confidence', 'key', 'end_of_fade_in',
                       'time_signature_confidence', 'tempo', 'mode', 'song_hotttnesss'])

# Split the dataset 80/20 into train and validation sets.
# A fixed seed is passed so the row assignment does not change from run to run;
# without it every execution draws a different split and the metrics below are
# not comparable between runs.
SPLIT_SEED = 42
train_df, test_df_model = select_df.randomSplit([0.8, 0.2], seed=SPLIT_SEED)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
# Persist both splits as Parquet under partition-style directories (type=train / type=test).
# The default save mode raises when the target path already exists, which makes this
# cell fail on every run after the first; overwrite mode keeps the notebook re-runnable.
train_df.write.mode("overwrite").parquet("s3://projectmlldsongs/songs_data_split/type=train/")
test_df_model.write.mode("overwrite").parquet("s3://projectmlldsongs/songs_data_split/type=test/")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
## Linear Regression Model

In [None]:
# Read the training split back from Parquet and keep only rows tagged type = 'train'.
# When the leaf directory ".../type=train/" is passed directly, Spark does not treat
# `type` as a partition column, so the filter below could not resolve it. Setting
# `basePath` to the parent directory makes partition discovery add the `type` column.
# NOTE(review): the splits are written under "s3://projectmlldsongs/songs_data_split/"
# elsewhere in this notebook, while this cell reads from "s3://whatever/" — confirm the
# intended location.
spark.read.option("basePath", "s3://whatever/").parquet("s3://whatever/type=train/").where("type = 'train'")

In [None]:
# Input columns for the model; the target 'song_hotttnesss' is deliberately not listed.
feature_columns = ['artist_familiarity', 'duration', 'loudness', 'key_confidence', 'key', 'end_of_fade_in',
                   'time_signature_confidence', 'tempo', 'mode']

# Assemble the feature columns of each row into a single vector column named 'features'.
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')

# Linear regression estimator that reads 'features' and predicts 'song_hotttnesss'.
lr = LinearRegression(featuresCol='features', labelCol='song_hotttnesss')

# Pipeline that runs the assembler first and the regressor second.
pipeline = Pipeline(stages=[assembler, lr])

In [None]:
# 4-fold cross validation over a linear-regression parameter grid.
# Currently DISABLED: `crossval` is never built, so the model below is a plain fit.
#paramGrid = ParamGridBuilder()\
#    .addGrid(lr.regParam, [0.1, 0.01]) \
#    .addGrid(lr.fitIntercept, [False, True])\
#    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])\
#    .build()

#crossval = CrossValidator(estimator=pipeline,
#                          estimatorParamMaps=paramGrid,
#                          evaluator=RegressionEvaluator(metricName="rmse",labelCol="song_hotttnesss",predictionCol="prediction"),
#                          numFolds=4)

# Fit the pipeline currently bound to `pipeline` on the training split (no cross validation).
lrmodel = pipeline.fit(dataset=train_df)

In [None]:
# Score the held-out split with the fitted linear-regression pipeline.
predictions = lrmodel.transform(test_df_model)

# Preview five rows: the prediction, the true label and the assembled feature vector.
preview_columns = ["prediction", "song_hotttnesss", "features"]
predictions.select(preview_columns).show(n=5)

# Root-mean-squared error between 'prediction' and the true label 'song_hotttnesss'.
evaluator = RegressionEvaluator(metricName="rmse",
                                predictionCol="prediction",
                                labelCol="song_hotttnesss")
rmse = evaluator.evaluate(predictions)

# Report the test RMSE.
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

In [None]:
# r2 reported by the summary of the fitted linear-regression stage
# (the second and last stage of the pipeline model).
lr_r2 = lrmodel.stages[-1].summary.r2
print(f" The r2 value is {lr_r2}")

In [None]:
# Gradient-boosted-trees regressor (Spark ML GBTRegressor) limited to 10 boosting iterations.
# It reads the 'features' vector and predicts 'song_hotttnesss'.
xgb = GBTRegressor(maxIter=10, labelCol="song_hotttnesss", featuresCol="features")

# Pipeline of the existing `assembler` followed by the GBT regressor.
pipeline = Pipeline(stages=[assembler, xgb])

# Fit both stages on the training split.
model_xgb = pipeline.fit(dataset=train_df)

In [None]:
# Score the held-out split with the fitted GBT pipeline.
predictions_xgb = model_xgb.transform(test_df_model)

# Preview five rows: the prediction, the true label and the assembled feature vector.
preview_columns_xgb = ["prediction", "song_hotttnesss", "features"]
predictions_xgb.select(preview_columns_xgb).show(n=5)

# Root-mean-squared error between 'prediction' and the true label 'song_hotttnesss'.
evaluator_xgb = RegressionEvaluator(metricName="rmse",
                                    predictionCol="prediction",
                                    labelCol="song_hotttnesss")
rmse = evaluator_xgb.evaluate(predictions_xgb)

# Report the test RMSE.
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

In [None]:
# Isotonic regression estimator reading the 'features' vector and predicting 'song_hotttnesss'.
# featureIndex=0 is the library default, spelled out here because isotonic regression
# uses a single element of a vector column: the one at that index.
# NOTE(review): index 0 should correspond to the first assembler input column —
# confirm that a single-feature model is intended.
isoreg = IsotonicRegression(labelCol="song_hotttnesss", featuresCol="features", featureIndex=0)

# Pipeline of the existing `assembler` followed by the isotonic regressor.
pipeline = Pipeline(stages=[assembler, isoreg])

In [None]:
# Display the schema (column names and types) of the training split.
train_df.schema

In [None]:
# 4-fold cross validation over a parameter grid.
# Currently DISABLED: `crossval` is never built, so the model below is a plain fit.
# NOTE(review): the grid below tunes parameters of `lr` (the linear-regression
# estimator); it would need isotonic-regression parameters before being re-enabled here.
#paramGrid = ParamGridBuilder()\
#    .addGrid(lr.regParam, [0.1, 0.01]) \
#    .addGrid(lr.fitIntercept, [False, True])\
#    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])\
#    .build()

#crossval = CrossValidator(estimator=pipeline,
#                          estimatorParamMaps=paramGrid,
#                          evaluator=RegressionEvaluator(metricName ='rmse',labelCol="song_hotttnesss"),
#                          numFolds=4)

# Fit the pipeline currently bound to `pipeline` on the training split (no cross validation).
model_isoreg = pipeline.fit(dataset=train_df)

In [None]:
# Score the held-out split with the fitted isotonic-regression pipeline.
predictions_isoreg = model_isoreg.transform(test_df_model)

# Preview five rows: the prediction, the true label and the assembled feature vector.
preview_columns_isoreg = ["prediction", "song_hotttnesss", "features"]
predictions_isoreg.select(preview_columns_isoreg).show(n=5)

# Root-mean-squared error between 'prediction' and the true label 'song_hotttnesss'.
evaluator_isoreg = RegressionEvaluator(metricName="rmse",
                                       predictionCol="prediction",
                                       labelCol="song_hotttnesss")
rmse = evaluator_isoreg.evaluate(predictions_isoreg)

# Report the test RMSE.
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

In [None]:
# Print the `boundaries` attribute of the fitted isotonic-regression stage
# (the second and last stage of the pipeline model).
isotonic_boundaries = model_isoreg.stages[-1].boundaries
print(isotonic_boundaries)

In [None]:
# Persist the fitted isotonic pipeline model.
# A plain `save()` raises when the target path already exists, so re-running the
# notebook would fail here; going through the writer with `overwrite()` replaces
# the previously saved model instead.
model_isoreg.write().overwrite().save("s3://whatever/models/isotonic/")

In [None]:
# Load the persisted isotonic pipeline model back from storage.
# `PipelineModel` comes from `pyspark.ml` (imported in the first cell). The loaded
# model is bound to a name so it can actually be used, and echoed as the cell output.
loaded_isoreg_model = PipelineModel.load("s3://whatever/models/isotonic/")
loaded_isoreg_model

In [None]:
# Print r2 value:
# The fitted isotonic-regression stage has no training `summary` (unlike the
# linear-regression model), so `.summary.r2` raises AttributeError. r2 is computed
# explicitly with a RegressionEvaluator instead. It is evaluated on the training
# split, matching what a training summary reports.
r2_evaluator_isoreg = RegressionEvaluator(
    labelCol="song_hotttnesss", predictionCol="prediction", metricName="r2")
r2_isoreg = r2_evaluator_isoreg.evaluate(model_isoreg.transform(train_df))
print(f" The r2 value is {r2_isoreg}")

In [None]:
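# PCA snippet (not active yet; to be included later):
# NOTE: the PCA output column must differ from its input column, because the
# 'features' column already exists after the assembler stage.

#pca = PCA(k=2, inputCol="features",outputCol="pca_features")

# Add the pca object to the Pipeline (after the assembler, before the regressor).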