# **1. Import libraries and Create Spark Session** #

In [85]:
import datetime

from pyspark.conf import SparkConf
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import LinearRegression
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import max, udf
from pyspark.sql.types import DateType


# Obtain the notebook's Spark session (reuses an existing one if already running)
spark = (
    SparkSession.builder
    .appName("LinearRegressio")
    .getOrCreate()
)

# **2. Read and show data** #

In [86]:
# Location of the input CSV (first row is a header).
# NOTE(review): this is an absolute, machine-specific path — adjust before running elsewhere.
VCB_CSV_PATH = "/home/thanhphat/Download/part-00000-c4693ad4-e240-46cd-aa9a-32d5b0bf423d-c000.csv"

# Read with a header row and let Spark infer the column types, then preview 10 rows.
df_vcb = spark.read.options(header=True, inferSchema=True).csv(VCB_CSV_PATH)
df_vcb.show(10)

+----------+--------+-------+-------+----+----+
|      Date|Ajusting|Closing|Opening|High| Low|
+----------+--------+-------+-------+----+----+
|31/12/2015|   23.45|   43.9|   43.9|44.5|43.8|
|30/12/2015|   23.66|   44.3|   44.3|44.9|43.9|
|29/12/2015|    23.5|   44.0|   42.5|44.1|42.4|
|28/12/2015|    22.7|   42.5|   42.8|43.0|42.5|
|25/12/2015|    22.7|   42.5|   42.3|42.9|42.2|
|24/12/2015|   22.54|   42.2|   42.6|42.8|42.2|
|23/12/2015|   22.65|   42.4|   42.1|42.6|42.0|
|22/12/2015|    22.7|   42.5|   42.7|42.9|42.3|
|21/12/2015|    22.7|   42.5|   42.0|42.5|42.0|
|18/12/2015|    22.0|   41.2|   43.5|44.1|41.2|
+----------+--------+-------+-------+----+----+
only showing top 10 rows



In [87]:
# Inspect the column types Spark inferred while reading the CSV.
df_vcb.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Ajusting: double (nullable = true)
 |-- Closing: double (nullable = true)
 |-- Opening: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)



# **3. Model Definition** #

## 3.1. Create features, target and train/test data ##

In [88]:
# Numeric input columns that are packed into the single "features" vector column.
feature_columns = ["Ajusting", "Opening", "High", "Low"]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Append the assembled vector to every row of the raw frame.
data_vcb = assembler.transform(df_vcb)

# Keep only the feature vector and the regression target.
final_vcb = data_vcb.select("features", "Closing")

In [89]:
# Random split with weights 0.8 (train) / 0.2 (test); the seed is fixed at 42 for repeatability.
train_data, test_data = final_vcb.randomSplit([0.8, 0.2], seed=42)

In [90]:
# Preview the first 10 rows of the training split.
train_data.show(10)

+--------------------+-------+
|            features|Closing|
+--------------------+-------+
|[16.67,31.7,32.1,...|   31.9|
|[17.3,32.5,33.3,3...|   33.1|
|[17.56,34.1,34.6,...|   33.6|
|[17.77,31.9,34.0,...|   34.0|
|[18.09,33.7,35.0,...|   34.6|
|[18.3,34.9,35.7,3...|   35.0|
|[18.35,35.1,35.5,...|   35.1|
|[18.35,35.1,35.6,...|   35.1|
|[18.4,35.3,36.2,3...|   35.2|
|[18.45,35.0,35.4,...|   35.3|
+--------------------+-------+
only showing top 10 rows



In [91]:
# Preview the first 10 rows of the test split.
test_data.show(10)

+--------------------+-------+
|            features|Closing|
+--------------------+-------+
|[17.41,35.6,35.8,...|   33.3|
|[18.24,33.5,35.0,...|   34.9|
|[18.3,35.1,35.3,3...|   35.0|
|[18.45,35.0,35.6,...|   35.3|
|[18.56,35.6,35.6,...|   35.5|
|[18.66,35.5,36.1,...|   35.7|
|[18.71,35.6,35.9,...|   35.8|
|[18.92,35.8,36.7,...|   36.2|
|[19.08,36.8,36.9,...|   36.5|
|[19.13,36.3,36.7,...|   36.6|
+--------------------+-------+
only showing top 10 rows



In [92]:
# Configure the linear regression estimator: it reads the assembled
# "features" vector, learns the "Closing" label, and writes its output
# to a "predicted_closing" column.
lr = LinearRegression(
    featuresCol="features",
    labelCol="Closing",
    predictionCol="predicted_closing",
)

# Fit on the training split only.
lr_model = lr.fit(train_data)

## 3.2. Predict ##

In [93]:
# Apply the fitted model to the test split; the model was configured to
# write its output to the "predicted_closing" column.
predictions = lr_model.transform(test_data)

In [94]:
# Preview the first 10 scored test rows (actual Closing next to predicted_closing).
predictions.show(10)

+--------------------+-------+------------------+
|            features|Closing| predicted_closing|
+--------------------+-------+------------------+
|[17.41,35.6,35.8,...|   33.3| 34.21137375701764|
|[18.24,33.5,35.0,...|   34.9| 34.53260019587356|
|[18.3,35.1,35.3,3...|   35.0|  34.9158037417087|
|[18.45,35.0,35.6,...|   35.3| 35.25365279752606|
|[18.56,35.6,35.6,...|   35.5| 35.25627667660775|
|[18.66,35.5,36.1,...|   35.7| 35.75230840569804|
|[18.71,35.6,35.9,...|   35.8|35.494671077255646|
|[18.92,35.8,36.7,...|   36.2|  36.1700556863158|
|[19.08,36.8,36.9,...|   36.5| 36.47246522659555|
|[19.13,36.3,36.7,...|   36.6| 36.45227339106082|
+--------------------+-------+------------------+
only showing top 10 rows



## 3.3. Visualize predictions on test data ##

# **4. Calculate Performance** #

In [95]:
# Evaluate the test-set predictions against the true "Closing" values.
# NOTE: RegressionEvaluator comes from pyspark.ml.evaluation and has to be
# imported in the notebook's import cell; without that import this cell
# fails with NameError after Restart Kernel -> Run All.
evaluator = RegressionEvaluator(
    labelCol="Closing",
    predictionCol="predicted_closing",
    metricName="rmse",
)
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data: {:.3f}".format(rmse))

# Same label/prediction columns, scored with the R-squared metric instead.
evaluator_r2 = RegressionEvaluator(
    labelCol="Closing",
    predictionCol="predicted_closing",
    metricName="r2",
)
r2 = evaluator_r2.evaluate(predictions)
print("R-squared (R2) on test data: {:.3f}".format(r2))

Root Mean Squared Error (RMSE) on test data: 0.649
R-squared (R2) on test data: 0.999
