In [1]:
#load Spark
import findspark
findspark.init()

from pyspark import SparkContext
from pyspark.sql import SparkSession

sc = SparkContext()
ss = SparkSession.builder.appName("project1").getOrCreate()

In [2]:
import pandas as pd
import boto3
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.sql.functions import *
from pyspark.ml.evaluation import BinaryClassificationEvaluator, RegressionEvaluator
from pyspark.ml.feature import Imputer


In [3]:
#df1 = pd.read_csv("s3://502projectsong/cleaned-subset.csv")
#df1.head(5)

In [30]:
df=ss.read.load("s3://502projectsong/clean_big_subset.csv",format="csv",header="true")
#df.na.drop()

In [31]:
from pyspark.sql.types import FloatType,IntegerType
from pyspark.sql.functions import when

In [32]:
#drop meaning less columns
df = df.drop('_c0','analysis_sample_rate' , 'artist_id', 'artist_location', 'artist_name', 'title', 'song_id', 'release')

In [33]:
# convert artist terms 
df = df.withColumn('artist_terms', length(df.artist_terms))

#extract value in paranthesis
df = df.withColumn("segments_loudness_max_time",regexp_extract(df['segments_loudness_max_time'], '[0-9]+',0))
df = df.withColumn("similar_artists",regexp_extract(df['similar_artists'], '[0-9]+',0))
split_col = split(df['segments_pitches'], ',')
df = df.withColumn('segments_pitches_1', split_col.getItem(0))
df = df.withColumn('segments_pitches', split_col.getItem(1))
df = df.withColumn("segments_pitches_1",regexp_extract(df['segments_pitches_1'], '[0-9]+',0))
df = df.withColumn("segments_pitches", substring_index(df.segments_pitches, ')', 1))

split_col = split(df['segments_timbre'], ',')
df = df.withColumn('segments_timbre_1', split_col.getItem(0))
df = df.withColumn('segments_timbre', split_col.getItem(1))
df = df.withColumn("segments_timbre_1",regexp_extract(df['segments_timbre_1'], '[0-9]+',0))
df = df.withColumn("segments_timbre", substring_index(df.segments_pitches, ')', 1))

df.show(1)

+------------------+-----------------+---------------+----------------+------------+-----------------+---------------+-----------+----------------+-----------+------------+--------+--------------+------+---+--------------+--------+----+---------------+-------------------+--------------+-------------------+---------------------+--------------------------+-----------------------+----------------+--------------+---------------+---------------+---------------+-----------------+-----------------+------------+-------+--------------+-------------------------+----+------------------+-----------------+
|artist_familiarity|artist_hotttnesss|artist_latitude|artist_longitude|artist_terms|artist_terms_freq|bars_confidence| bars_start|beats_confidence|beats_start|danceability|duration|end_of_fade_in|energy|key|key_confidence|loudness|mode|mode_confidence|sections_confidence|sections_start|segments_confidence|segments_loudness_max|segments_loudness_max_time|segments_loudness_start|segments_pitches|se

In [34]:
#df.show(5)
df=df.filter(df.song_hotttnesss.isNotNull())
df = df.withColumn('song_hotttnesss',when(df.song_hotttnesss <= 0.5, 0).otherwise(1))

#convert field type  df.call_time.cast('float')
for item in df.columns:
    df = df.withColumn(item, df[item].cast('float'))
df = df.dropna(how='any')

In [35]:
inputs = [i for i in df.schema.names if i != 'song_hotttnesss']

In [36]:
vectorAssembler = VectorAssembler(inputCols =inputs, outputCol = 'features')
v_df = vectorAssembler.transform(df)
v_df = v_df.select(['features', 'song_hotttnesss'])
v_df.show(3)

+--------------------+---------------+
|            features|song_hotttnesss|
+--------------------+---------------+
|[0.64793360233306...|            0.0|
|[0.60753244161605...|            0.0|
|[0.62143224477767...|            0.0|
+--------------------+---------------+
only showing top 3 rows



In [37]:
Scalerizer = StandardScaler().setInputCol("features").setOutputCol("Scaled_features")
v_df = Scalerizer.fit(v_df).transform(v_df)
v_df.show(3)

+--------------------+---------------+--------------------+
|            features|song_hotttnesss|     Scaled_features|
+--------------------+---------------+--------------------+
|[0.64793360233306...|            0.0|[5.46873131801131...|
|[0.60753244161605...|            0.0|[5.12773481450917...|
|[0.62143224477767...|            0.0|[5.24505283689671...|
+--------------------+---------------+--------------------+
only showing top 3 rows



In [38]:
splits = v_df.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]

In [39]:
#lr = LinearRegression(featuresCol = 'Scaled_features', labelCol='song_hotttnesss', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr = LogisticRegression(featuresCol = 'features', labelCol='song_hotttnesss', maxIter=10, regParam=0.3, elasticNetParam=0.8)

lr_model = lr.fit(train_df)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Coefficients: (38,[],[])
Intercept: -0.8404071242823876


In [40]:
trainingSummary = lr_model.summary

# Obtain the objective per iteration
objectiveHistory = trainingSummary.objectiveHistory
print("objectiveHistory:")
for objective in objectiveHistory:
    print(objective)

# Obtain the receiver-operating characteristic as a dataframe and areaUnderROC.
trainingSummary.roc.show()
print("areaUnderROC: " + str(trainingSummary.areaUnderROC))

objectiveHistory:
0.6120870807191139
+---+---+
|FPR|TPR|
+---+---+
|0.0|0.0|
|1.0|1.0|
|1.0|1.0|
+---+---+

areaUnderROC: 0.5


In [41]:
accuracy = trainingSummary.accuracy
falsePositiveRate = trainingSummary.weightedFalsePositiveRate
truePositiveRate = trainingSummary.weightedTruePositiveRate
fMeasure = trainingSummary.weightedFMeasure()
precision = trainingSummary.weightedPrecision
recall = trainingSummary.weightedRecall
print("Accuracy: %s\nFPR: %s\nTPR: %s\nF-measure: %s\nPrecision: %s\nRecall: %s"
      % (accuracy, falsePositiveRate, truePositiveRate, fMeasure, precision, recall))

Accuracy: 0.6985509541531396
FPR: 0.6985509541531396
TPR: 0.6985509541531396
F-measure: 0.574576151931285
Precision: 0.4879734355482618
Recall: 0.6985509541531396


In [42]:
lr_predictions = lr_model.transform(test_df)
lr_predictions.select("prediction","song_hotttnesss","Scaled_features").show(5)

+----------+---------------+--------------------+
|prediction|song_hotttnesss|     Scaled_features|
+----------+---------------+--------------------+
|       0.0|            0.0|[1.16634011027594...|
|       0.0|            0.0|[2.45154370162199...|
|       0.0|            0.0|[2.63583738566256...|
|       0.0|            0.0|[2.74900073014320...|
|       0.0|            0.0|[2.86055120335552...|
+----------+---------------+--------------------+
only showing top 5 rows



In [43]:
#test_result = lr_model.evaluate(test_df)
#print("Root Mean Squared Error (RMSE) on test data = %g" % test_result.rootMeanSquaredError)

evaluator = BinaryClassificationEvaluator( labelCol = 'song_hotttnesss')
print('Test Area Under ROC', evaluator.evaluate(lr_predictions))


Test Area Under ROC 0.5


In [44]:
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))

numIterations: 1
objectiveHistory: [0.6120870807191139]


In [53]:
from pyspark.ml.classification import RandomForestClassifier,GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

rf = RandomForestClassifier(labelCol="song_hotttnesss", featuresCol="features", numTrees=20)
model = rf.fit(train_df)
predictions = model.transform(test_df)
rf_evaluator = MulticlassClassificationEvaluator(
    labelCol="song_hotttnesss", predictionCol="prediction", metricName="accuracy")
accuracy = rf_evaluator.evaluate(predictions)
print("Accuracy: ",accuracy)

Accuracy:  0.7660748781160887


In [58]:
model.featureImportances

SparseVector(38, {0: 0.3704, 1: 0.2377, 2: 0.0088, 3: 0.0042, 4: 0.0122, 5: 0.098, 6: 0.0007, 7: 0.0002, 8: 0.0005, 9: 0.0, 11: 0.0001, 12: 0.0008, 14: 0.0001, 15: 0.0007, 16: 0.0274, 17: 0.0002, 18: 0.0005, 19: 0.0012, 21: 0.0216, 22: 0.0228, 23: 0.0019, 24: 0.0262, 26: 0.0005, 29: 0.0006, 30: 0.001, 31: 0.0017, 32: 0.0001, 33: 0.0006, 35: 0.1525, 36: 0.0042, 37: 0.0027})

In [59]:
def ExtractFeatureImp(featureImp, dataset, featuresCol):
    list_extract = []
    for i in dataset.schema[featuresCol].metadata["ml_attr"]["attrs"]:
        list_extract = list_extract + dataset.schema[featuresCol].metadata["ml_attr"]["attrs"][i]
    varlist = pd.DataFrame(list_extract)
    varlist['score'] = varlist['idx'].apply(lambda x: featureImp[x])
    return(varlist.sort_values('score', ascending = False))


In [63]:
ExtractFeatureImp(model.featureImportances, train_df, "features")

Unnamed: 0,idx,name,score
0,0,artist_familiarity,0.37037
1,1,artist_hotttnesss,0.237743
35,35,year,0.152502
5,5,artist_terms_freq,0.097953
16,16,loudness,0.027439
24,24,segments_loudness_start,0.026197
22,22,segments_loudness_max,0.02277
21,21,segments_confidence,0.021577
4,4,artist_terms,0.012151
2,2,artist_latitude,0.008826
