In [None]:
import pyspark
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.ml.linalg import *
from pyspark.ml.feature import *
from pyspark.ml.regression import *
from pyspark.ml.evaluation import *
from pyspark.ml import *

import matplotlib.dates as mdates
import matplotlib.pyplot as plt
import statistics as stat
import pandas as pd

In [None]:
# Reuse the active Spark session if there is one, otherwise start one named "appSpark".
spark = (
    SparkSession.builder
    .appName("appSpark")
    .getOrCreate()
)

In [4]:
# Load the regression dataset: the first row holds column names, and column types are inferred.
df = (
    spark.read
    .option("inferSchema", True)
    .option('header', True)
    .csv("./csv/Linear_regression_dataset.csv")
)

                                                                                

In [10]:
# First ten rows, with cell values shown untruncated.
df.show(n=10, truncate=False)

+-----+-----+-----+-----+-----+------+
|var_1|var_2|var_3|var_4|var_5|output|
+-----+-----+-----+-----+-----+------+
|734  |688  |81   |0.328|0.259|0.418 |
|700  |600  |94   |0.32 |0.247|0.389 |
|712  |705  |93   |0.311|0.247|0.417 |
|734  |806  |69   |0.315|0.26 |0.415 |
|613  |759  |61   |0.302|0.24 |0.378 |
|748  |676  |85   |0.318|0.255|0.422 |
|669  |588  |97   |0.315|0.251|0.411 |
|667  |845  |68   |0.324|0.251|0.381 |
|758  |890  |64   |0.33 |0.274|0.436 |
|726  |670  |88   |0.335|0.268|0.422 |
+-----+-----+-----+-----+-----+------+


In [11]:
# Inspect the column types produced by inferSchema before building features.
df.printSchema()

root
 |-- var_1: integer (nullable = true)
 |-- var_2: integer (nullable = true)
 |-- var_3: integer (nullable = true)
 |-- var_4: double (nullable = true)
 |-- var_5: double (nullable = true)
 |-- output: double (nullable = true)


In [12]:
# Per-column count/mean/stddev/min/max; highlights that the features live on very different scales.
df.describe().show()

23/07/13 00:47:36 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 7:>                                                          (0 + 1) / 1]

+-------+-----------------+-----------------+------------------+--------------------+--------------------+-------------------+
|summary|            var_1|            var_2|             var_3|               var_4|               var_5|             output|
+-------+-----------------+-----------------+------------------+--------------------+--------------------+-------------------+
|  count|             1232|             1232|              1232|                1232|                1232|               1232|
|   mean|715.0819805194806|715.0819805194806| 80.90422077922078|  0.3263311688311693| 0.25927272727272715|0.39734172077922014|
| stddev| 91.5342940441652|93.07993263118064|11.458139049993724|0.015012772334166148|0.012907228928000298|0.03326689862173776|
|    min|              463|              472|                40|               0.277|               0.214|              0.301|
|    max|             1009|             1103|               116|               0.373|               0.294|     

                                                                                

## Linear regression

In [19]:
# The CSV's target column is called "output" (see the schema above). Spark ML estimators
# read labelCol="label" by default, so expose the target under that name.
# withColumnRenamed is a silent no-op when the source column does not exist, so a
# misspelled source name would leave no "label" column; the assert catches that.
df = df.withColumnRenamed("output", "label")
assert "label" in df.columns, f"expected a 'label' column, got {df.columns}"

In [20]:
# Confirm the target column is now exposed as `label`.
df.show(n=20, truncate=True)

+-----+-----+-----+-----+-----+-----+
|var_1|var_2|var_3|var_4|var_5|label|
+-----+-----+-----+-----+-----+-----+
|  734|  688|   81|0.328|0.259|0.418|
|  700|  600|   94| 0.32|0.247|0.389|
|  712|  705|   93|0.311|0.247|0.417|
|  734|  806|   69|0.315| 0.26|0.415|
|  613|  759|   61|0.302| 0.24|0.378|
|  748|  676|   85|0.318|0.255|0.422|
|  669|  588|   97|0.315|0.251|0.411|
|  667|  845|   68|0.324|0.251|0.381|
|  758|  890|   64| 0.33|0.274|0.436|
|  726|  670|   88|0.335|0.268|0.422|
|  583|  794|   55|0.302|0.236|0.371|
|  676|  746|   72|0.317|0.265|  0.4|
|  767|  699|   89|0.332|0.274|0.433|
|  637|  597|   86|0.317|0.252|0.374|
|  609|  724|   69|0.308|0.244|0.382|
|  776|  733|   83|0.325|0.259|0.437|
|  701|  832|   66|0.325| 0.26| 0.39|
|  650|  709|   74|0.316|0.249|0.386|
|  804|  668|   95|0.337|0.265|0.453|
|  713|  614|   94| 0.31|0.238|0.404|
+-----+-----+-----+-----+-----+-----+


In [21]:
# Total row count of the dataset, before any train/test split.
df.count()

1232

In [22]:
# Pearson correlation between the first feature and the target.
df.select(corr(col("var_1"), col("label"))).show()

+------------------+
|corr(var_1, label)|
+------------------+
|0.9187399607627283|
+------------------+


In [24]:
# Feature column names var_1 .. var_5, in the order they are assembled into the vector.
cols = [f"var_{i}" for i in range(1, 6)]

In [30]:
# Pack the feature columns into one vector column named "features" (LinearRegression's default featuresCol).
vector = VectorAssembler(outputCol='features', inputCols=cols)

In [31]:
# Apply the assembler: all original columns are kept and a "features" vector column is appended.
features_df = vector.transform(df)
features_df.printSchema()

root
 |-- var_1: integer (nullable = true)
 |-- var_2: integer (nullable = true)
 |-- var_3: integer (nullable = true)
 |-- var_4: double (nullable = true)
 |-- var_5: double (nullable = true)
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)


In [32]:
# Preview the raw columns alongside the assembled vector (select("*") was a redundant projection).
features_df.show()

+-----+-----+-----+-----+-----+-----+--------------------+
|var_1|var_2|var_3|var_4|var_5|label|            features|
+-----+-----+-----+-----+-----+-----+--------------------+
|  734|  688|   81|0.328|0.259|0.418|[734.0,688.0,81.0...|
|  700|  600|   94| 0.32|0.247|0.389|[700.0,600.0,94.0...|
|  712|  705|   93|0.311|0.247|0.417|[712.0,705.0,93.0...|
|  734|  806|   69|0.315| 0.26|0.415|[734.0,806.0,69.0...|
|  613|  759|   61|0.302| 0.24|0.378|[613.0,759.0,61.0...|
|  748|  676|   85|0.318|0.255|0.422|[748.0,676.0,85.0...|
|  669|  588|   97|0.315|0.251|0.411|[669.0,588.0,97.0...|
|  667|  845|   68|0.324|0.251|0.381|[667.0,845.0,68.0...|
|  758|  890|   64| 0.33|0.274|0.436|[758.0,890.0,64.0...|
|  726|  670|   88|0.335|0.268|0.422|[726.0,670.0,88.0...|
|  583|  794|   55|0.302|0.236|0.371|[583.0,794.0,55.0...|
|  676|  746|   72|0.317|0.265|  0.4|[676.0,746.0,72.0...|
|  767|  699|   89|0.332|0.274|0.433|[767.0,699.0,89.0...|
|  637|  597|   86|0.317|0.252|0.374|[637.0,597.0,86.0..

In [33]:
# Keep only what the estimator needs: the target and the assembled feature vector.
model_df = features_df.select(["label", "features"])

In [34]:
# Preview the two-column (label, features) frame that is split and fed to the model.
model_df.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|0.418|[734.0,688.0,81.0...|
|0.389|[700.0,600.0,94.0...|
|0.417|[712.0,705.0,93.0...|
|0.415|[734.0,806.0,69.0...|
|0.378|[613.0,759.0,61.0...|
|0.422|[748.0,676.0,85.0...|
|0.411|[669.0,588.0,97.0...|
|0.381|[667.0,845.0,68.0...|
|0.436|[758.0,890.0,64.0...|
|0.422|[726.0,670.0,88.0...|
|0.371|[583.0,794.0,55.0...|
|  0.4|[676.0,746.0,72.0...|
|0.433|[767.0,699.0,89.0...|
|0.374|[637.0,597.0,86.0...|
|0.382|[609.0,724.0,69.0...|
|0.437|[776.0,733.0,83.0...|
| 0.39|[701.0,832.0,66.0...|
|0.386|[650.0,709.0,74.0...|
|0.453|[804.0,668.0,95.0...|
|0.404|[713.0,614.0,94.0...|
+-----+--------------------+


In [35]:
# 75/25 train/test split. Without a seed, randomSplit draws a different partition on every
# run, so the counts, coefficients and metrics below would not reproduce after a kernel restart.
train_df, test_df = model_df.randomSplit([0.75, 0.25], seed=42)

In [36]:
# Size of the training partition (roughly 75% of the rows).
train_df.count()

                                                                                

913

In [37]:
# Size of the held-out partition; together with the training count it should equal df.count().
test_df.count()

319

In [39]:
# Unregularised linear regression on the assembled vector; regParam stays at its default,
# which is why Spark warns at fit time that it is zero.
lin_reg = LinearRegression(featuresCol='features', labelCol='label')

In [40]:
# Fit on the training split only; the "regParam is zero" warning is expected because no regularisation is set.
lr_model = lin_reg.fit(train_df)

23/07/13 01:05:34 WARN Instrumentation: [68e94545] regParam is zero, which might cause numerical instability and overfitting.
23/07/13 01:05:35 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
                                                                                

In [41]:
# One coefficient per assembled feature, in `cols` order. The features are on very different scales
# (var_1/var_2 in the hundreds, var_4/var_5 below 1), so coefficient magnitudes are not directly comparable.
lr_model.coefficients

DenseVector([0.0003, 0.0001, 0.0002, -0.6845, 0.5173])

In [42]:
# Fitted intercept (bias) term.
lr_model.intercept

0.18930968207761767

In [43]:
# Despite the name, this is an evaluation summary on the training data, not a DataFrame of
# predictions; it exposes metrics such as .r2.
training_predictions = lr_model.evaluate(train_df)

In [45]:
# R^2 on the training data.
training_predictions.r2

0.8685026096285458

In [46]:
# Evaluation summary on the held-out test split.
test_results = lr_model.evaluate(test_df)

In [47]:
# R^2 on unseen data; a value close to the training R^2 suggests little overfitting.
test_results.r2

0.8701743121243751

In [48]:
# Root-mean-squared error on the test split, in the same units as `label`.
test_results.rootMeanSquaredError

0.012410168801895074

## Pipeline

In [5]:
def read_csv(file):
    """Load a headered CSV file into a Spark DataFrame with inferred column types.

    Args:
        file: Path to the CSV file.

    Returns:
        A DataFrame whose column names come from the file's first row.
    """
    reader = spark.read.option('header', True).option('inferSchema', True)
    return reader.csv(file)

In [6]:
file = "./csv/Linear_regression_dataset.csv"
# LinearRegression reads labelCol="label" by default, so expose the "output" target under that name.
df = read_csv(file).withColumnRenamed('output', 'label')
assert "label" in df.columns, f"expected a 'label' column, got {df.columns}"
# Fixed seed so the split, and every metric computed from it, reproduces across kernel restarts.
train_df, test_df = df.randomSplit([.75, .25], seed=42)

In [9]:
# Assemble -> scale -> regress inside one Pipeline: the scaler is fitted on train_df during
# pipeline.fit, and the fitted PipelineModel replays the same preprocessing on test_df.
features = ['var_1', 'var_2', 'var_3', 'var_4', 'var_5']
stage_1 = VectorAssembler(outputCol="out_features", inputCols=features)
stage_2 = StandardScaler(outputCol="features", inputCol="out_features")
stage_3 = LinearRegression(featuresCol="features", labelCol="label")
stages = [stage_1, stage_2, stage_3]
pipeline = Pipeline(stages=stages)

model = pipeline.fit(train_df)
pred_result = model.transform(test_df)
pred_result.show(n=5)

23/07/13 02:02:16 WARN Instrumentation: [6da8d134] regParam is zero, which might cause numerical instability and overfitting.
23/07/13 02:02:16 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
                                                                                

+-----+-----+-----+-----+-----+-----+--------------------+--------------------+-------------------+
|var_1|var_2|var_3|var_4|var_5|label|        out_features|            features|         prediction|
+-----+-----+-----+-----+-----+-----+--------------------+--------------------+-------------------+
|  514|  549|   81|0.307|0.252|0.339|[514.0,549.0,81.0...|[5.53744168224839...|0.32780702590812316|
|  516|  504|   86|0.293|0.234|0.327|[516.0,504.0,86.0...|[5.55898814793807...|0.32829470637095276|
|  527|  569|   75|0.297|0.239|0.341|[527.0,569.0,75.0...|[5.67749370923132...| 0.3328365442264376|
|  532|  690|   69|0.303|0.245|0.351|[532.0,690.0,69.0...|[5.73135987345553...| 0.3391130402933826|
|  541|  830|   60|0.302|0.229| 0.33|[541.0,830.0,60.0...|[5.82831896905910...| 0.3421441353390216|
+-----+-----+-----+-----+-----+-----+--------------------+--------------------+-------------------+


In [10]:
reg_eval = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
# R^2 is a goodness-of-fit measure, not an accuracy, so it is named r2. The param map
# overrides metricName for this single call only; reg_eval itself still computes RMSE.
r2 = reg_eval.evaluate(pred_result, {reg_eval.metricName: "r2"})
print(f"R2:   {r2}")
rmse = reg_eval.evaluate(pred_result)
print(f"RMSE: {rmse}")

0.8472125275249509
0.01235997204833508
