## Data cleaning 

In [42]:
# Import Modules
import os
import pyspark.sql.types as typ
import pyspark.sql.functions as F
import numpy as np # linear algebra
import pandas as pd

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

In [43]:
# Read csv file into pyspark dataframe
filename = 'data.csv'
df = spark.read.csv(filename, inferSchema=True, header = True)

In [44]:
type(df)

pyspark.sql.dataframe.DataFrame

In [45]:
# Converting columns to appropriate types
from pyspark.sql.types import IntegerType
from pyspark.sql.types import FloatType

floats = ["acousticness", "danceability", "energy", "instrumentalness", "liveness", "loudness",
         "speechiness", "tempo", "valence"]
for i in floats:
        df = df.withColumn(i, df[i].cast(FloatType()))

ints = ["duration_ms", "explicit", "key", "mode", "popularity", "year"]
for i in ints:
    df = df.withColumn(i, df[i].cast(IntegerType()))

In [46]:
# Check Process: whether it works
df

DataFrame[acousticness: float, artists: string, danceability: float, duration_ms: int, energy: float, explicit: int, id: string, instrumentalness: float, key: int, liveness: float, loudness: float, mode: int, name: string, popularity: int, release_date: string, speechiness: float, tempo: float, valence: float, year: int]

In [47]:
# After converitn, create a New Dataset called df.
df.createOrReplaceTempView("df")
spark.sql("select * from df").show(5)

+------------+--------------------+------------+-----------+------+--------+--------------------+----------------+---+--------+--------+----+--------------------+----------+------------+-----------+-------+-------+----+
|acousticness|             artists|danceability|duration_ms|energy|explicit|                  id|instrumentalness|key|liveness|loudness|mode|                name|popularity|release_date|speechiness|  tempo|valence|year|
+------------+--------------------+------------+-----------+------+--------+--------------------+----------------+---+--------+--------+----+--------------------+----------+------------+-----------+-------+-------+----+
|       0.991|     ['Mamie Smith']|       0.598|     168333| 0.224|       0|0cS0A1fUEUd1EW3Fc...|         5.22E-4|  5|   0.379| -12.628|   0|Keep A Song In Yo...|        12|        1920|     0.0936|149.976|  0.634|1920|
|       0.643|"[""Screamin' Jay...|       0.852|     150200| 0.517|       0|0hbkKFIJm7Z05H8Zl...|          0.0264|  5|  

In [48]:
df.printSchema()

root
 |-- acousticness: float (nullable = true)
 |-- artists: string (nullable = true)
 |-- danceability: float (nullable = true)
 |-- duration_ms: integer (nullable = true)
 |-- energy: float (nullable = true)
 |-- explicit: integer (nullable = true)
 |-- id: string (nullable = true)
 |-- instrumentalness: float (nullable = true)
 |-- key: integer (nullable = true)
 |-- liveness: float (nullable = true)
 |-- loudness: float (nullable = true)
 |-- mode: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- popularity: integer (nullable = true)
 |-- release_date: string (nullable = true)
 |-- speechiness: float (nullable = true)
 |-- tempo: float (nullable = true)
 |-- valence: float (nullable = true)
 |-- year: integer (nullable = true)



In [49]:
# Cleaning
# Variables id, name, release_date are not related to our question.
# Thus, droping all three.
df_drop = df.drop("id", "name", "release_date")

In [50]:
# Artists is the only remaining categorical feature
df_drop.printSchema()

root
 |-- acousticness: float (nullable = true)
 |-- artists: string (nullable = true)
 |-- danceability: float (nullable = true)
 |-- duration_ms: integer (nullable = true)
 |-- energy: float (nullable = true)
 |-- explicit: integer (nullable = true)
 |-- instrumentalness: float (nullable = true)
 |-- key: integer (nullable = true)
 |-- liveness: float (nullable = true)
 |-- loudness: float (nullable = true)
 |-- mode: integer (nullable = true)
 |-- popularity: integer (nullable = true)
 |-- speechiness: float (nullable = true)
 |-- tempo: float (nullable = true)
 |-- valence: float (nullable = true)
 |-- year: integer (nullable = true)



In [51]:
# Count rows and unique rows
print('Rows = {}'.format(df_drop.count()))
print('Distinct Rows = {}'.format(df_drop.distinct().count()))

# Check
# Drop all duplicates 
df1 = df_drop.dropDuplicates()
print('The number of rows with duplicate data removed = {}'.format(df1.count()))

Rows = 174389
Distinct Rows = 170930
The number of rows with duplicate data removed = 170930


In [52]:
# Check for Null
from pyspark.sql.functions import *
df1.select([count(when(isnan(c), c)).alias(c) for c in df1.columns]).show()

+------------+-------+------------+-----------+------+--------+----------------+---+--------+--------+----+----------+-----------+-----+-------+----+
|acousticness|artists|danceability|duration_ms|energy|explicit|instrumentalness|key|liveness|loudness|mode|popularity|speechiness|tempo|valence|year|
+------------+-------+------------+-----------+------+--------+----------------+---+--------+--------+----+----------+-----------+-----+-------+----+
|           0|      0|           0|          0|     0|       0|               0|  0|       0|       0|   0|         0|          0|    0|      0|   0|
+------------+-------+------------+-----------+------+--------+----------------+---+--------+--------+----+----------+-----------+-----+-------+----+



In [53]:
# Numeric Variables & Categorical Variables
num_cols = []
cat_cols = []

for s in df1.schema:
    data_type = str(s.dataType)
    if data_type == "StringType":
        cat_cols.append(s.name)
    
    #if data_type == "FloatType" or data_type == "IntType" or data_type =="String":
    else:
        num_cols.append(s.name)

In [54]:
num_cols

['acousticness',
 'danceability',
 'duration_ms',
 'energy',
 'explicit',
 'instrumentalness',
 'key',
 'liveness',
 'loudness',
 'mode',
 'popularity',
 'speechiness',
 'tempo',
 'valence',
 'year']

In [55]:
cat_cols

['artists']

In [56]:
df1.describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
acousticness,170930,0.5018294484191316,0.37894631454438643,0.0,0.996
artists,170930,,,"""[""""'In The Heights' Original Broadway Company""""",['조정현']
danceability,170351,0.5364033642625393,0.17540137804044542,0.0,0.988
duration_ms,170730,231982.6593920225,147474.7744185314,0,5338302
energy,170853,525.2470461099238,13301.393685683328,0.0,1622000.0
explicit,170888,167.60111300969055,7335.756577177498,0,900000
instrumentalness,170546,32.79207922065262,2945.6281195531965,0.0,475880.0
key,170731,21.722950137936287,2266.708310766973,0,503320
liveness,170891,1.6928761315791787,607.9804295652749,0.0,251333.0


## Linear Regression Model

**SCALING**

In [57]:
from pyspark.sql.functions import col
import pyspark.sql.functions as func
df1 = df1.withColumn('popularity_final',
                     func.round(df1['popularity']/100, 2))

**SELECT AND STANDARDIZE FEATURES**

In [60]:
vars_to_keep = ['popularity_final','acousticness']

# subset the dataframe on these predictors
df2 = df1.select(vars_to_keep)
df2.show(1)

#,'energy','loudness','year'

+----------------+------------+
|popularity_final|acousticness|
+----------------+------------+
|             0.0|       0.276|
+----------------+------------+
only showing top 1 row



In [61]:
from pyspark.ml.linalg import DenseVector
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler

In [62]:
features = ['acousticness']

assembler = VectorAssembler(
    inputCols = features, 
    outputCol = "features") 
df2 = assembler.transform(df2)

In [63]:
df2 = df2.select(["popularity_final",'features']) \
         .withColumnRenamed("popularity_final", 'label')
df2.show(5)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|[0.2759999930858612]|
|  0.0|[0.9909999966621399]|
| 0.01|[0.9729999899864197]|
|  0.0|[0.9959999918937683]|
| 0.09|[0.9919999837875366]|
+-----+--------------------+
only showing top 5 rows



In [78]:
## Feature scaling
# Initialize the `standardScaler`
standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled", 
                                withStd=True, withMean=False)

# Fit the DataFrame to the scaler; this computes the mean, standard deviation of each feature
scaler = standardScaler.fit(df2)

# Transform the data in `df2` with the scaler
scaled_df2 = scaler.transform(df2)

In [79]:
# Split data into train set (80%), test set (20%) 
splits = df2.randomSplit([0.8, 0.2])
train_df = splits[0]
test_df = splits[1]

In [80]:
train_df.show(5)

+-----+--------------------+
|label|            features|
+-----+--------------------+
| null|[0.0333000011742115]|
| null|[0.7649999856948853]|
| null|[0.9200000166893005]|
| null|[0.9399999976158142]|
| null|[0.9670000076293945]|
+-----+--------------------+
only showing top 5 rows



In [76]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol = 'features', labelCol='label', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_df)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Py4JJavaError: An error occurred while calling o832.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 69.0 failed 1 times, most recent failure: Lost task 0.0 in stage 69.0 (TID 2660, jupyter-ruiqi, executor driver): scala.MatchError: [null,1.0,[0.0333000011742115]] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
	at org.apache.spark.ml.PredictorParams.$anonfun$extractInstances$1(Predictor.scala:80)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:162)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:160)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1429)
	at scala.collection.TraversableOnce.aggregate(TraversableOnce.scala:219)
	at scala.collection.TraversableOnce.aggregate$(TraversableOnce.scala:219)
	at scala.collection.AbstractIterator.aggregate(Iterator.scala:1429)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$3(RDD.scala:1204)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$5(RDD.scala:1205)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:837)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:837)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2023)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1972)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1971)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1971)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:950)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:950)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:950)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2203)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2152)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2141)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:752)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2093)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2188)
	at org.apache.spark.rdd.RDD.$anonfun$fold$1(RDD.scala:1157)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.RDD.fold(RDD.scala:1151)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$1(RDD.scala:1220)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1196)
	at org.apache.spark.ml.optim.WeightedLeastSquares.fit(WeightedLeastSquares.scala:107)
	at org.apache.spark.ml.regression.LinearRegression.$anonfun$train$1(LinearRegression.scala:340)
	at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
	at org.apache.spark.ml.regression.LinearRegression.train(LinearRegression.scala:319)
	at org.apache.spark.ml.regression.LinearRegression.train(LinearRegression.scala:176)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:150)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: scala.MatchError: [null,1.0,[0.0333000011742115]] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
	at org.apache.spark.ml.PredictorParams.$anonfun$extractInstances$1(Predictor.scala:80)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:162)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:160)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1429)
	at scala.collection.TraversableOnce.aggregate(TraversableOnce.scala:219)
	at scala.collection.TraversableOnce.aggregate$(TraversableOnce.scala:219)
	at scala.collection.AbstractIterator.aggregate(Iterator.scala:1429)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$3(RDD.scala:1204)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$5(RDD.scala:1205)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:837)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:837)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [29]:
from pyspark.sql.functions import col
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

import pyspark.sql.functions as func

In [None]:
# Scaling the duration_ms to duration while changing it from milliseconds to minutes.
df1 = df1.withColumn('duration_min', 
                      func.round(df1['duration_ms']/6000, 2))

# Retain these predictors
vars_to_keep = ["duration_min",
                'acousticness',
                "danceability",
                'energy',
                'liveness',
                "loudness",
                'popularity',
                "speechiness",
                'year'
                ]
# subset the dataframe on these predictors
df2 = df1.select(vars_to_keep)
df2.show(1)

# Setting features
features = ['acousticness',
            "danceability",
            'energy',
            'liveness',
            "loudness",
            'popularity',
            "speechiness",
            'year']

# Adding one additional feature
assembler = VectorAssembler(
    inputCols = features, outputCol = "features") 
df2 = assembler.transform(df2)

df2 = df2.select(["duration_min",'features']) \
         .withColumnRenamed("duration_min", 'label')
df2.show(5)

# Split data into train set (80%), test set (20%) 
splits = df2.randomSplit([0.8, 0.2])
train_df = splits[0]
test_df = splits[1]

# Linear Regression
lr = LinearRegression(featuresCol = 'features', labelCol='label',
                      maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_df)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))