In [22]:
from pyspark import SparkConf,SparkContext
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler
from pyspark.sql import functions as f
from pyspark.ml.feature import StringIndexer
from pyspark.ml.tuning import TrainValidationSplit
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import DecisionTreeRegressor, LinearRegression

In [2]:
conf=SparkConf().setAppName('Regression')
sc=SparkContext(conf=conf)

In [3]:
sparksession=SparkSession.builder.appName('DataFrames').config('"spark.some.config.option","some-value"').getOrCreate()

In [4]:
data=sparksession.read.csv('MSD.txt',header=False,inferSchema=True)

In [5]:
#renaming column names to integers 
new_names=range(0,91)
new_names=list(map(str,new_names))
data=data.toDF(*new_names)
print(data.show(40))

+----+--------+----------+---------+---------+---------+---------+---------+---------+--------+--------+---------+---------+--------+----------+----------+----------+----------+----------+---------+----------+---------+---------+---------+---------+----------+-----------+----------+----------+----------+----------+----------+---------+----------+---------+---------+----------+----------+----------+----------+----------+----------+----------+----------+---------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+---------+----------+---------+----------+----------+----------+----------+----------+---------+---------+----------+----------+----------+----------+----------+---------+----------+----------+----------+----------+---------+----------+----------+----------+---------+---------+----------+--------+----------+---------+
|   0|       1|         2|        3|  

In [6]:
#Min Max scaler to normalize values between 0 and 1
#Vector Assembler to have data in form of list
cols=range(1,91)
cols=list(map(str,cols))
data=VectorAssembler(inputCols=cols,outputCol="features").transform(data)

In [7]:
#Now Min Max Scaler
scaler=MinMaxScaler(inputCol='features',outputCol='scaledfeatures')
scalerModel=scaler.fit(data)
scaledData=scalerModel.transform(data)
scaledData.show()

+----+--------+----------+---------+---------+---------+---------+---------+---------+--------+--------+---------+---------+--------+----------+----------+----------+----------+----------+---------+----------+---------+---------+---------+---------+----------+-----------+----------+----------+----------+----------+---------+---------+----------+---------+---------+----------+----------+----------+----------+----------+----------+----------+----------+---------+----------+----------+----------+---------+----------+----------+---------+----------+----------+----------+----------+----------+----------+----------+----------+----------+---------+----------+---------+----------+----------+----------+----------+----------+---------+---------+----------+----------+---------+----------+----------+---------+----------+---------+---------+----------+---------+----------+----------+----------+---------+---------+----------+--------+----------+---------+--------------------+--------------------+
| 

In [8]:
#shifting the year label so that it starts from zero
print(scaledData.select(f.max('0')).show())
print(scaledData.select(f.min('0')).show())

+------+
|max(0)|
+------+
|  2011|
+------+

None
+------+
|min(0)|
+------+
|  1922|
+------+

None


In [9]:
#subtract the min from the year
scaledData=scaledData.withColumn('Year_new',data['0']-1922)

In [10]:
print(scaledData.select(f.max('Year_new')).show())
print(scaledData.select(f.min('Year_new')).show())

+-------------+
|max(Year_new)|
+-------------+
|           89|
+-------------+

None
+-------------+
|min(Year_new)|
+-------------+
|            0|
+-------------+

None


In [11]:
#StringInder to have labels for the data
df=StringIndexer(inputCol='Year_new',outputCol='labels').setHandleInvalid('skip').fit(scaledData).transform(scaledData)
print(df.columns)

['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14', '15', '16', '17', '18', '19', '20', '21', '22', '23', '24', '25', '26', '27', '28', '29', '30', '31', '32', '33', '34', '35', '36', '37', '38', '39', '40', '41', '42', '43', '44', '45', '46', '47', '48', '49', '50', '51', '52', '53', '54', '55', '56', '57', '58', '59', '60', '61', '62', '63', '64', '65', '66', '67', '68', '69', '70', '71', '72', '73', '74', '75', '76', '77', '78', '79', '80', '81', '82', '83', '84', '85', '86', '87', '88', '89', '90', 'features', 'scaledfeatures', 'Year_new', 'labels']


In [12]:
df=df.drop('0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14', '15', '16', '17', '18', '19', '20', '21', '22', '23', '24', '25', '26', '27', '28', '29', '30', '31', '32', '33', '34', '35', '36', '37', '38', '39', '40', '41', '42', '43', '44', '45', '46', '47', '48', '49', '50', '51', '52', '53', '54', '55', '56', '57', '58', '59', '60', '61', '62', '63', '64', '65', '66', '67', '68', '69', '70', '71', '72', '73', '74', '75', '76', '77', '78', '79', '80', '81', '82', '83', '84', '85', '86', '87', '88', '89', '90', 'features', 'Year_new')
print(df.columns)

['scaledfeatures', 'labels']


In [13]:
#train test split
train,test=df.randomSplit([0.7,0.3],seed=123)
train_set,validation_set=train.randomSplit([0.9,0.1],seed=45)

In [17]:
#Decision Tree Regressor

dt=DecisionTreeRegressor(labelCol='labels',featuresCol='scaledfeatures')
dt_model=dt.fit(train_set)

In [18]:
#predictions
dt_predictions=dt_model.transform(validation_set)
dt_evaluator=RegressionEvaluator(labelCol='labels',predictionCol='prediction',metricName='rmse')
rmse=dt_evaluator.evaluate(dt_predictions)
print('RMSE of validation data:',rmse)

RMSE of validation data: 10.230438972817744


In [21]:
#prediciton on test data
test_preds=dt_model.transform(test)
test_evaluator=RegressionEvaluator(labelCol='labels',predictionCol='prediction',metricName='rmse')
rmse=test_evaluator.evaluate(test_preds)
print('RMSE of test data:',rmse)

Exception ignored in: <object repr() failed>
Traceback (most recent call last):
  File "/anaconda3/lib/python3.6/site-packages/pyspark/ml/wrapper.py", line 40, in __del__
    if SparkContext._active_spark_context and self._java_obj is not None:
AttributeError: 'DecisionTreeRegressor' object has no attribute '_java_obj'


RMSE of test data: 10.328978900697248


#### Linear Regression

In [24]:
lr=LinearRegression(labelCol='labels',featuresCol='scaledfeatures',maxIter=10,regParam=0.3, elasticNetParam=0.8)

In [25]:
#fit model on train
lr_model=lr.fit(train)
print("Coefficients: %s" % str(lr_model.coefficients))
print("Intercept: %s" % str(lr_model.intercept))

Coefficients: [-39.83403435344032,24.383501569989672,22.443361505034677,0.0,0.0,38.42780000572209,0.0,0.0,6.452368643647816,0.0,1.6849352614172282,3.070920944745042,-10.758807119209173,-16.68795615650741,0.0,-13.567705298563988,-7.1276284532014795,-0.5894842152491102,-10.437248245652574,-31.66127401971618,0.0,0.0,-24.790203564636354,0.0,0.0,0.0,-10.345193674652416,-0.9784164222917396,0.0,0.0,0.0,0.0,2.614034997540717,0.0,0.0,10.508944104850906,0.0,-11.968252023364894,-8.219776054779642,22.901831607483157,22.624918571163484,0.0,0.0,0.0,0.0,0.0,-3.7297973108091536,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,19.67332671260229,-2.122226487075044,6.418913906468236,0.0,0.0,-0.0,0.0,0.0,0.0,0.0,0.0,0.0,11.802653727205314,0.0,0.0,0.0,0.0,-7.027151324616384,-3.992993425400477,0.0,0.0,9.876279009407828,0.0,0.0,0.0,0.0,-8.480238097217182,0.0,-7.225957462652023,0.0,0.0,0.0,0.0,0.0]
Intercept: -7.119948345647558


In [26]:
trainingSummary = lr_model.summary
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
trainingSummary.residuals.show()
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

numIterations: 11
objectiveHistory: [0.5, 0.4857745322803967, 0.46168364080494534, 0.45306377925191216, 0.4427183388800197, 0.44110685448659087, 0.43824526616779014, 0.4371597508469618, 0.43619905985974455, 0.4359538935384754, 0.43580452749956106]
+-------------------+
|          residuals|
+-------------------+
| -29.06632837309877|
| -30.60345678477819|
|-18.983485481495578|
| -36.73514332075453|
| -44.45354123784723|
| -9.122140725915884|
| -4.038155578017442|
|-17.812598844875303|
| -39.42448321663122|
| -30.29384395771072|
|-17.082565821776505|
| -5.150067339614022|
|-12.788359111707486|
|-6.4491454670982264|
|-26.498026762502512|
|-26.895026230622356|
|-17.836998235160983|
|-17.939414538146067|
| -32.72636125899554|
| -11.70317863500878|
+-------------------+
only showing top 20 rows

RMSE: 10.041855
r2: 0.191967


In [28]:
#predictions on the test set
lr_preds=lr_model.transform(test)
lr_evaluator=RegressionEvaluator(predictionCol='prediction',labelCol='labels',metricName='r2')
test_res=lr_model.evaluate(test)

In [29]:
print('Rmse of test set:',test_res.rootMeanSquaredError)

Rmse of test set: 10.093692966635448
