In [16]:
!wget -q https://dlcdn.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz 
!tar xf spark-3.1.2-bin-hadoop2.7.tgz
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/default-java"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"
!pip install -q findspark
import findspark
findspark.init()

In [17]:
import findspark
findspark.init()
findspark.find()
import pyspark
from pyspark.sql import *
from pyspark.sql.functions import length, col
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
sql = SQLContext(sc)


In [18]:
import numpy as np
import pandas as pd
import pyspark.ml as ml
from functools import reduce
import pandas_datareader as pdr
import matplotlib.pyplot as plt
from pyspark.sql.types import *
from pyspark.ml.feature import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.ml.linalg import Vectors
from pyspark.ml.stat import Correlation
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.mllib.evaluation import  BinaryClassificationMetrics,MulticlassMetrics
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

In [75]:
import pyspark
from pyspark.sql.types import StructField
from pyspark.sql.types import *

custom_schema = StructType([
        StructField("date", StringType(), True),
        StructField("Open", FloatType(), True),
        StructField("High", FloatType(), True),
        StructField("Low", FloatType(), True),
        StructField("Close", FloatType(), True),
    ])

In [27]:
from pyspark.sql import SparkSession
spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

In [76]:
# Financial Data
manufacturing_stock = spark.read.csv("/content/drive/MyDrive/data/Manufacturing_avg_prices.csv", sep=',',schema= custom_schema, header=True)

# News sentement data
sentiment_news = spark.read.csv("/content/drive/MyDrive/data/full_score.csv", sep=',', header=True)

# Join two dataframes
manufacturing_df = manufacturing_stock.join(sentiment_news,"date")

In [77]:
manufacturing_df =manufacturing_df.dropna()

In [78]:
manufacturing_df.show()

+----------+----+----+----+-----+---+--------------------+--------------------+----------+----------+
|      date|Open|High| Low|Close|_c0|            headline|             snippet|positivity|negativity|
+----------+----+----+----+-----+---+--------------------+--------------------+----------+----------+
|2020-04-02|54.8|54.9|54.8| 54.8| 75|Gerald Freedman, ...|Among his credits...|     0.211|       0.0|
|2020-04-02|54.8|54.9|54.8| 54.8| 76|Review: In Apple’...|Brooklynn Prince ...|     0.069|       0.0|
|2020-04-03|56.2|56.2|56.2| 56.2| 77|Crisis Gives Fake...|Police impersonat...|     0.068|     0.155|
|2020-04-07|59.2|59.3|59.2| 59.2| 79|Trump Ousts Pande...|The official had ...|     0.148|       0.0|
|2020-04-08|59.4|59.4|59.3| 59.4| 80|What’s on TV Wedn...|“Parasite” hits H...|       0.0|       0.0|
|2020-04-09|60.3|60.3|60.2| 60.3| 81|In Britain, Summe...|The losses of Wim...|       0.0|     0.101|
|2020-04-13|59.9|59.9|59.8| 59.9| 85|    What Is a Tribe?|Human culture as ...|   

In [79]:
manufacturing_df_drop = manufacturing_df.drop('_c0','headline','snippet')

In [80]:
manufacturing_df_drop.show()

+----------+----+----+----+-----+----------+----------+
|      date|Open|High| Low|Close|positivity|negativity|
+----------+----+----+----+-----+----------+----------+
|2020-04-02|54.8|54.9|54.8| 54.8|     0.211|       0.0|
|2020-04-02|54.8|54.9|54.8| 54.8|     0.069|       0.0|
|2020-04-03|56.2|56.2|56.2| 56.2|     0.068|     0.155|
|2020-04-07|59.2|59.3|59.2| 59.2|     0.148|       0.0|
|2020-04-08|59.4|59.4|59.3| 59.4|       0.0|       0.0|
|2020-04-09|60.3|60.3|60.2| 60.3|       0.0|     0.101|
|2020-04-13|59.9|59.9|59.8| 59.9|       0.0|       0.0|
|2020-04-16|60.5|60.6|60.5| 60.5|       0.0|     0.228|
|2020-04-16|60.5|60.6|60.5| 60.5|      0.04|      0.14|
|2020-04-16|60.5|60.6|60.5| 60.5|       0.0|     0.254|
|2020-04-17|61.1|61.2|61.1| 61.1|       0.0|     0.141|
|2020-04-17|61.1|61.2|61.1| 61.1|     0.412|       0.0|
|2020-04-17|61.1|61.2|61.1| 61.1|     0.281|       0.0|
|2020-04-21|60.4|60.4|60.4| 60.4|       0.0|       0.0|
|2020-04-21|60.4|60.4|60.4| 60.4|     0.204|    

In [85]:
from pyspark.mllib.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [82]:
vector_assmebler= VectorAssembler(inputCols=['Open','High','Low','Close'] 
                                ,outputCol="features")
df_transformed= vector_assmebler.transform(manufacturing_df_drop) 
df_transformed.show()

+----------+----+----+----+-----+----------+----------+--------------------+
|      date|Open|High| Low|Close|positivity|negativity|            features|
+----------+----+----+----+-----+----------+----------+--------------------+
|2020-04-02|54.8|54.9|54.8| 54.8|     0.211|       0.0|[54.7999992370605...|
|2020-04-02|54.8|54.9|54.8| 54.8|     0.069|       0.0|[54.7999992370605...|
|2020-04-03|56.2|56.2|56.2| 56.2|     0.068|     0.155|[56.2000007629394...|
|2020-04-07|59.2|59.3|59.2| 59.2|     0.148|       0.0|[59.2000007629394...|
|2020-04-08|59.4|59.4|59.3| 59.4|       0.0|       0.0|[59.4000015258789...|
|2020-04-09|60.3|60.3|60.2| 60.3|       0.0|     0.101|[60.2999992370605...|
|2020-04-13|59.9|59.9|59.8| 59.9|       0.0|       0.0|[59.9000015258789...|
|2020-04-16|60.5|60.6|60.5| 60.5|       0.0|     0.228|[60.5,60.59999847...|
|2020-04-16|60.5|60.6|60.5| 60.5|      0.04|      0.14|[60.5,60.59999847...|
|2020-04-16|60.5|60.6|60.5| 60.5|       0.0|     0.254|[60.5,60.59999847...|

In [86]:
df_pos_model=df_transformed.select('features','Positivity')
df_pos_model.show()

+--------------------+----------+
|            features|Positivity|
+--------------------+----------+
|[54.7999992370605...|     0.211|
|[54.7999992370605...|     0.069|
|[56.2000007629394...|     0.068|
|[59.2000007629394...|     0.148|
|[59.4000015258789...|       0.0|
|[60.2999992370605...|       0.0|
|[59.9000015258789...|       0.0|
|[60.5,60.59999847...|       0.0|
|[60.5,60.59999847...|      0.04|
|[60.5,60.59999847...|       0.0|
|[61.0999984741210...|       0.0|
|[61.0999984741210...|     0.412|
|[61.0999984741210...|     0.281|
|[60.4000015258789...|       0.0|
|[60.4000015258789...|     0.204|
|[59.2999992370605...|       0.0|
|[61.2999992370605...|       0.0|
|[61.2999992370605...|       0.0|
|[63.0999984741210...|       0.0|
|[62.9000015258789...|       0.0|
+--------------------+----------+
only showing top 20 rows



In [87]:
# Cast Course_Fees from integer type to float type
df_pos_model_cast = df_pos_model.withColumn("Positivity", 
                                  df_pos_model["Positivity"]
                                  .cast('float'))
df_pos_model_cast.printSchema()

root
 |-- features: vector (nullable = true)
 |-- Positivity: float (nullable = true)



In [42]:
df_neg_model=df_transformed.select('features','Negativity')
df_neg_model.show(truncate= False)

+----------------------------------------------------------------------------+----------+
|features                                                                    |Negativity|
+----------------------------------------------------------------------------+----------+
|[54.83000183105469,54.869998931884766,54.790000915527344,54.83000183105469] |0.0       |
|[54.83000183105469,54.869998931884766,54.790000915527344,54.83000183105469] |0.0       |
|[56.20000076293945,56.2400016784668,56.16999816894531,56.20000076293945]    |0.155     |
|[59.2400016784668,59.279998779296875,59.189998626708984,59.2400016784668]   |0.0       |
|[59.369998931884766,59.400001525878906,59.33000183105469,59.369998931884766]|0.0       |
|[60.2599983215332,60.29999923706055,60.220001220703125,60.2599983215332]    |0.101     |
|[59.849998474121094,59.880001068115234,59.81999969482422,59.849998474121094]|0.0       |
|[60.52000045776367,60.560001373291016,60.4900016784668,60.529998779296875]  |0.228     |
|[60.52000

In [57]:
# Cast Course_Fees from integer type to float type
df_neg_model_cast = df_neg_model.withColumn("Negativity", 
                                  df_neg_model["Negativity"]
                                  .cast('float'))
df_neg_model_cast.printSchema()


root
 |-- features: vector (nullable = true)
 |-- Negativity: float (nullable = true)



In [59]:
# split the data, %80 train %20 test
(training, test) = df_neg_model_cast.randomSplit([0.8, 0.2], seed = 1234)
print (training.first())

Row(features=DenseVector([45.6, 45.62, 45.58, 45.6]), Negativity=0.0)


In [98]:
training_no_null =training.dropna()
test_no_null = test.dropna()

In [90]:
#Linear regression model
from pyspark.ml.regression import LinearRegression
Linear_Regression=LinearRegression(labelCol='Negativity')
regression = Linear_Regression.fit(training_no_null)

In [91]:
print(f"intercept: {regression.intercept}")
print(f"coefficiencts: {regression.coefficients}")

intercept: 0.07415187493515459
coefficiencts: [0.0010495503535216289,-0.05019627353940324,0.06760333314063317,-0.018477891022934526]


In [93]:
training_predictions=regression.evaluate(training_no_null)

print("Training R squared:", training_predictions.r2)

Training R squared: 0.0012274194760139023


In [99]:
test_prediction  = regression.evaluate(test_no_null)

In [100]:
print("Testing MSE: ", test_prediction.meanSquaredError)

Testing MSE:  0.009504081772197541


In [101]:
#Predictions 
df_prediction = test_prediction.predictions
df_prediction.orderBy('prediction',ascending=False).show(10, truncate = False)


+-----------------------------------------------------------------------------+----------+-------------------+
|features                                                                     |Negativity|prediction         |
+-----------------------------------------------------------------------------+----------+-------------------+
|[45.599998474121094,45.599998474121094,45.599998474121094,45.599998474121094]|0.0       |0.07318145825848406|
|[45.599998474121094,45.599998474121094,45.599998474121094,45.599998474121094]|0.0       |0.07318145825848406|
|[45.599998474121094,45.599998474121094,45.599998474121094,45.599998474121094]|0.0       |0.07318145825848406|
|[45.599998474121094,45.599998474121094,45.599998474121094,45.599998474121094]|0.0       |0.07318145825848406|
|[45.599998474121094,45.599998474121094,45.599998474121094,45.599998474121094]|0.0       |0.07318145825848406|
|[45.599998474121094,45.599998474121094,45.599998474121094,45.599998474121094]|0.0       |0.07318145825848406|
|

In [102]:
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator
dt = DecisionTreeRegressor(featuresCol ='features', labelCol = 'Negativity')
dt_model = dt.fit(training_no_null)
dt_predictions = dt_model.transform(test_no_null)
dt_evaluator = RegressionEvaluator(labelCol="Negativity", predictionCol="prediction", metricName="rmse")
rmse = dt_evaluator.evaluate(dt_predictions)
print("Root Mean Squared Error (RMSE) on negative test data = %g" % rmse)

Root Mean Squared Error (RMSE) on negative test data = 0.0971791


In [103]:
from pyspark.ml.regression import GBTRegressor
gbt = GBTRegressor(featuresCol = 'features', labelCol = 'Negativity', maxIter=10)
gbt_model = gbt.fit(training_no_null)
gbt_predictions = gbt_model.transform(test_no_null)
gbt_predictions.select('prediction', 'Negativity', 'features').show(5)

+-------------------+----------+--------------------+
|         prediction|Negativity|            features|
+-------------------+----------+--------------------+
|0.06802302487545628|       0.0|[45.5999984741210...|
|0.06802302487545628|       0.0|[45.5999984741210...|
|0.06802302487545628|       0.0|[45.5999984741210...|
|0.06802302487545628|       0.0|[45.5999984741210...|
|0.06802302487545628|       0.0|[45.5999984741210...|
+-------------------+----------+--------------------+
only showing top 5 rows



In [104]:
gbt_evaluator = RegressionEvaluator(
    labelCol="Negativity", predictionCol="prediction", metricName="rmse")
rmse = gbt_evaluator.evaluate(gbt_predictions)
print("Root Mean Squared Error (RMSE) on negative test data = %g" % rmse)

Root Mean Squared Error (RMSE) on negative test data = 0.0971925


In [113]:
(training_pos, test_pos) = df_pos_model_cast.randomSplit([0.8, 0.2], seed = 1234)
print (training_pos.first())

Row(features=DenseVector([45.6, 45.6, 45.6, 45.6]), Positivity=0.0)


In [114]:
training_pos_null = training_pos.dropna()
test_pos_null = test_pos.dropna()

In [115]:
#Linear regression model
from pyspark.ml.regression import LinearRegression
Linear_Regression=LinearRegression(labelCol='Positivity')
regression = Linear_Regression.fit(training_pos_null)

In [116]:
print(f"intercept: {regression.intercept}")
print(f"coefficiencts: {regression.coefficients}")

intercept: 0.07205059081704651
coefficiencts: [0.047360323278074164,-0.07835544312932988,0.11093956891857094,-0.07981963490867372]


In [117]:
training_predictions=regression.evaluate(training_pos_null)

print("Training R squared:", training_predictions.r2)

Training R squared: 0.003033291094148294


In [118]:
test_prediction  = regression.evaluate(test_pos_null)

In [119]:
print("Testing MSE: ", test_prediction.meanSquaredError)

Testing MSE:  0.009702271030605081


In [120]:
#Predictions 
df_prediction = test_prediction.predictions
df_prediction.orderBy('prediction',ascending=False).show(10, truncate = False)

+-----------------------------------------------------------------------------+----------+------------------+
|features                                                                     |Positivity|prediction        |
+-----------------------------------------------------------------------------+----------+------------------+
|[132.39999389648438,132.39999389648438,132.39999389648438,132.39999389648438]|0.0       |0.0885759846593756|
|[132.39999389648438,132.39999389648438,132.39999389648438,132.39999389648438]|0.114     |0.0885759846593756|
|[132.39999389648438,132.39999389648438,132.39999389648438,132.39999389648438]|0.0       |0.0885759846593756|
|[132.39999389648438,132.39999389648438,132.39999389648438,132.39999389648438]|0.0       |0.0885759846593756|
|[132.39999389648438,132.39999389648438,132.39999389648438,132.39999389648438]|0.0       |0.0885759846593756|
|[132.39999389648438,132.39999389648438,132.39999389648438,132.39999389648438]|0.0       |0.0885759846593756|
|[132.3999

In [121]:
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator
dt = DecisionTreeRegressor(featuresCol ='features', labelCol = 'Positivity')
dt_model = dt.fit(training_pos_null)
dt_predictions = dt_model.transform(test_pos_null)
dt_evaluator = RegressionEvaluator(labelCol="Positivity", predictionCol="prediction", metricName="rmse")
rmse = dt_evaluator.evaluate(dt_predictions)
print("Root Mean Squared Error (RMSE) on positive test data = %g" % rmse)

Root Mean Squared Error (RMSE) on positive test data = 0.0982396


In [124]:
from pyspark.ml.regression import GBTRegressor
gbt = GBTRegressor(featuresCol = 'features', labelCol = 'Positivity', maxIter=10)
gbt_model = gbt.fit(training_pos_null)
gbt_predictions = gbt_model.transform(test_pos_null)
gbt_predictions.select('prediction', 'Positivity', 'features').show(5)

+-------------------+----------+--------------------+
|         prediction|Positivity|            features|
+-------------------+----------+--------------------+
|0.07617717172183193|       0.0|[45.5999984741210...|
|0.07617717172183193|       0.0|[45.5999984741210...|
|0.07617717172183193|       0.0|[45.5999984741210...|
|0.07617717172183193|       0.0|[45.5999984741210...|
|0.07617717172183193|       0.0|[45.5999984741210...|
+-------------------+----------+--------------------+
only showing top 5 rows



In [125]:
gbt_evaluator = RegressionEvaluator(
    labelCol="Positivity", predictionCol="prediction", metricName="rmse")
rmse = gbt_evaluator.evaluate(gbt_predictions)
print("Root Mean Squared Error (RMSE) on positive test data = %g" % rmse)

Root Mean Squared Error (RMSE) on positive test data = 0.0983112
