In [1]:
# Spark Session, Pipeline, Functions, and Metrics
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import StringIndexer, StandardScaler, VectorAssembler, VectorIndexer
from pyspark.ml.regression import LinearRegression, GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator

Using TensorFlow backend.


In [2]:
# Spark Session
conf = SparkConf().setAppName('Spark DL Project').setMaster('local[2]')
sc = SparkContext(conf=conf)
sql_context = SQLContext(sc)

In [3]:
sc

In [151]:
# Load Data to Spark Dataframe
textblob_data = sql_context.read.csv('/Users/lincong/Downloads/textblob_data.csv',
                    header=True,
                    inferSchema=True,
                    multiLine=True)

In [154]:
# View Schema
textblob_data.printSchema()

root
 |-- timestamp: string (nullable = true)
 |-- textblob_score: double (nullable = true)
 |-- textblob_category: string (nullable = true)
 |-- cleaned_eng_text: string (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Open: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: double (nullable = true)
 |-- Marketcap: double (nullable = true)
 |-- High_pre: double (nullable = true)
 |-- Low_pre: double (nullable = true)
 |-- Open_pre: double (nullable = true)
 |-- Close_pre: double (nullable = true)
 |-- Volume_pre: double (nullable = true)
 |-- Marketcap_pre: double (nullable = true)



In [15]:
# Preview Dataframe (Pandas Preview is Cleaner)
textblob_data.limit(5).toPandas()

Unnamed: 0,_c0,timestamp,textblob_score,textblob_category,cleaned_text,High,Low,Open,Close,Volume,Marketcap
0,0,2019-05-27,0.0,neutral,another test tweet caught stream bitcoin,8907.174441,8668.70492,8674.072403,8805.778377,27949840000.0,156093800000.0
1,1,2019-05-27,0.0,neutral,current crypto prices btc usd eth usd ltc usd ...,8907.174441,8668.70492,8674.072403,8805.778377,27949840000.0,156093800000.0
2,2,2019-05-27,0.0,neutral,spiv nosar baz bitcoin is an asset amp not a c...,8907.174441,8668.70492,8674.072403,8805.778377,27949840000.0,156093800000.0
3,3,2019-05-27,-0.1,negative,we building real bitcoin sv what building brok...,8907.174441,8668.70492,8674.072403,8805.778377,27949840000.0,156093800000.0
4,4,2019-05-27,0.2,positive,change is coming get ready boom another cb jab...,8907.174441,8668.70492,8674.072403,8805.778377,27949840000.0,156093800000.0


In [86]:
textblob_data.count()

52652

In [153]:
# Drop Unnessary Features (Day and Month)
textblob_data = textblob_data.drop('_c0')

In [155]:
#pipeline the word embedding

# tokenize the sentence to get words
tokenizer = Tokenizer(inputCol="cleaned_eng_text", outputCol="words")

# word2vec word embedding
word2Vec = Word2Vec(vectorSize = 100, minCount=1, inputCol="words", outputCol="feature")

pipeline = Pipeline(stages=[tokenizer, word2Vec])

In [156]:
model = pipeline.fit(textblob_data)
result = model.transform(textblob_data)

In [157]:
result.printSchema()

root
 |-- timestamp: string (nullable = true)
 |-- textblob_score: double (nullable = true)
 |-- textblob_category: string (nullable = true)
 |-- cleaned_eng_text: string (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Open: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: double (nullable = true)
 |-- Marketcap: double (nullable = true)
 |-- High_pre: double (nullable = true)
 |-- Low_pre: double (nullable = true)
 |-- Open_pre: double (nullable = true)
 |-- Close_pre: double (nullable = true)
 |-- Volume_pre: double (nullable = true)
 |-- Marketcap_pre: double (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- feature: vector (nullable = true)



In [108]:
score_features = ['High_pre', 'Low_pre', 'Open_pre', 'Close_pre', 'Volume_pre', 'Marketcap_pre', 'textblob_score']
text_features = ['High_pre', 'Low_pre', 'Open_pre', 'Close_pre', 'Volume_pre', 'Marketcap_pre', 'feature']

In [119]:
# Split the dataset randomly into 70% for training and 30% for testing. Passing a seed for deterministic behavior
train1, test1 = score_lr_data.randomSplit([0.7, 0.3], seed = 1234)
print("There are %d training examples and %d test examples." % (train1.count(), test1.count()))

There are 36853 training examples and 15799 test examples.


In [120]:
vectorAssembler1 = VectorAssembler(inputCols = score_features, outputCol = 'features')
score_lr_df = vectorAssembler1.transform(result)
score_lr_df = score_lr_df.select(['features', 'Close'])
score_lr_df.show(3)

+--------------------+-----------------+
|            features|            Close|
+--------------------+-----------------+
|[8687.52079924,79...|8805.778376600001|
|[8687.52079924,79...|8805.778376600001|
|[8687.52079924,79...|8805.778376600001|
+--------------------+-----------------+
only showing top 3 rows



In [122]:
splits1 = score_lr_df.randomSplit([0.7, 0.3], seed = 1234)
train_score_lr_df = splits1[0]
test_score_lr_df = splits1[1]

In [123]:
# linear model using sentiment score
lr1 = LinearRegression(featuresCol = 'features', labelCol='Close', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model1 = lr1.fit(train_score_lr_df)
print("Coefficients: " + str(lr_model1.coefficients))
print("Intercept: " + str(lr_model1.intercept))

Coefficients: [0.15476622014734134,0.5303207617312827,0.49600687324413983,-0.07439739895179205,7.924133813753182e-08,-5.243269042880468e-09,4.720183709655843]
Intercept: -845.1784893131276


In [124]:
trainingSummary1 = lr_model1.summary
print("RMSE: %f" % trainingSummary1.rootMeanSquaredError)
print("r2: %f" % trainingSummary1.r2)

RMSE: 254.847128
r2: 0.807795


In [126]:
lr_predictions1 = lr_model1.transform(test_score_lr_df)
lr_predictions1.select("prediction","Close","features").show(5)

lr_evaluator1 = RegressionEvaluator(predictionCol="prediction", labelCol="Close",metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator1.evaluate(lr_predictions1))

+-------------------+------------------+--------------------+
|         prediction|             Close|            features|
+-------------------+------------------+--------------------+
| -728.6287785951586|108.99400329589844|[113.245002746582...|
| -611.7609249104727|257.32101440429693|[240.259002685546...|
|-255.71305683399896|     594.916015625|[586.234985351562...|
|   3077.69176764182|3413.7677182999996|[3478.14811641,34...|
|   3085.07009313032|     3521.06078726|[3501.95413455,34...|
+-------------------+------------------+--------------------+
only showing top 5 rows

R Squared (R2) on test data = 0.795829


In [127]:
test_score_lr_result = lr_model1.evaluate(test_score_lr_df)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_score_lr_result.rootMeanSquaredError)

Root Mean Squared Error (RMSE) on test data = 257.676


In [None]:
lr_evaluator2 = RegressionEvaluator(predictionCol="prediction", labelCol="Close",metricName="mae")
print("MAE on test data = %g" % lr_evaluator2.evaluate(lr_predictions1))

In [129]:
# gradient boosting using sentiment score
gbt1 = GBTRegressor(featuresCol = 'features', labelCol = 'Close', maxIter=10)
gbt_model1 = gbt1.fit(train_score_lr_df)
gbt_predictions1 = gbt_model1.transform(test_score_lr_df)
gbt_predictions1.select('prediction', 'Close', 'features').show(5)

+-----------------+------------------+--------------------+
|       prediction|             Close|            features|
+-----------------+------------------+--------------------+
|5226.917884376136|108.99400329589844|[113.245002746582...|
|5087.323810970841|257.32101440429693|[240.259002685546...|
|5087.323810970841|     594.916015625|[586.234985351562...|
|5158.948128214426|3413.7677182999996|[3478.14811641,34...|
|5158.948128214426|     3521.06078726|[3501.95413455,34...|
+-----------------+------------------+--------------------+
only showing top 5 rows



In [130]:
gbt_evaluator1 = RegressionEvaluator(labelCol="Close", predictionCol="prediction", metricName="rmse")
rmse1 = gbt_evaluator1.evaluate(gbt_predictions1)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse1)

Root Mean Squared Error (RMSE) on test data = 148.237


In [None]:
gbt_evaluator2 = RegressionEvaluator(labelCol="Close", predictionCol="prediction", metricName="mae")
mae1 = gbt_evaluator2.evaluate(gbt_predictions1)
print("MAE on test data = %g" % mae1)

In [None]:
gbt_evaluator3 = RegressionEvaluator(labelCol="Close", predictionCol="prediction", metricName="r2")
r21 = gbt_evaluator3.evaluate(gbt_predictions1)
print("R Square on test data = %g" % r21)

In [136]:
vectorAssembler2 = VectorAssembler(inputCols = text_features, outputCol = 'features')
text_lr_df = vectorAssembler2.transform(result)
text_lr_df = text_lr_df.select(['features', 'Close'])
text_lr_df.show(3)

+--------------------+-----------------+
|            features|            Close|
+--------------------+-----------------+
|[8687.52079924,79...|8805.778376600001|
|[8687.52079924,79...|8805.778376600001|
|[8687.52079924,79...|8805.778376600001|
+--------------------+-----------------+
only showing top 3 rows



In [137]:
splits2 = text_lr_df.randomSplit([0.7, 0.3], seed = 1234)
train_text_lr_df = splits2[0]
test_text_lr_df = splits2[1]

In [139]:
lr2 = LinearRegression(featuresCol = 'features', labelCol='Close', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model2 = lr2.fit(train_text_lr_df)

In [140]:
trainingSummary2 = lr_model2.summary
print("RMSE: %f" % trainingSummary2.rootMeanSquaredError)
print("r2: %f" % trainingSummary2.r2)

RMSE: 259.725498
r2: 0.800366


In [142]:
lr_predictions2 = lr_model2.transform(test_text_lr_df)
lr_predictions2.select("prediction","Close","features").show(5)

lr_evaluator3 = RegressionEvaluator(predictionCol="prediction", labelCol="Close",metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator3.evaluate(lr_predictions2))

+-------------------+------------------+--------------------+
|         prediction|             Close|            features|
+-------------------+------------------+--------------------+
| -900.3987966208587|108.99400329589844|[113.245002746582...|
| -705.7609811483867|257.32101440429693|[240.259002685546...|
|-404.53777180987413|     594.916015625|[586.234985351562...|
| 2989.6676733029085|3413.7677182999996|[3478.14811641,34...|
| 3036.3718617219884|     3521.06078726|[3501.95413455,34...|
+-------------------+------------------+--------------------+
only showing top 5 rows

R Squared (R2) on test data = 0.787573


In [143]:
test_text_lr_result = lr_model2.evaluate(test_text_lr_df)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_text_lr_result.rootMeanSquaredError)

Root Mean Squared Error (RMSE) on test data = 262.834


In [144]:
lr_evaluator4 = RegressionEvaluator(predictionCol="prediction", labelCol="Close",metricName="mae")
print("R Squared (R2) on test data = %g" % lr_evaluator4.evaluate(lr_predictions2))

R Squared (R2) on test data = 223.77


In [145]:
gbt2 = GBTRegressor(featuresCol = 'features', labelCol = 'Close', maxIter=10)
gbt_model2 = gbt2.fit(train_text_lr_df)
gbt_predictions2 = gbt_model2.transform(test_text_lr_df)
gbt_predictions2.select('prediction', 'Close', 'features').show(5)

+-----------------+------------------+--------------------+
|       prediction|             Close|            features|
+-----------------+------------------+--------------------+
|5367.287915364713|108.99400329589844|[113.245002746582...|
|4992.690272769945|257.32101440429693|[240.259002685546...|
|4281.569510292039|     594.916015625|[586.234985351562...|
|5266.479161137053|3413.7677182999996|[3478.14811641,34...|
|5204.299533460629|     3521.06078726|[3501.95413455,34...|
+-----------------+------------------+--------------------+
only showing top 5 rows



In [146]:
gbt_evaluator4 = RegressionEvaluator(labelCol="Close", predictionCol="prediction", metricName="rmse")
rmse2 = gbt_evaluator4.evaluate(gbt_predictions2)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse2)

Root Mean Squared Error (RMSE) on test data = 155.41


In [147]:
gbt_evaluator5 = RegressionEvaluator(labelCol="Close", predictionCol="prediction", metricName="mae")
mae2 = gbt_evaluator5.evaluate(gbt_predictions2)
print("Root Mean Squared Error (RMSE) on test data = %g" % mae2)

Root Mean Squared Error (RMSE) on test data = 14.5282


In [148]:
gbt_evaluator6 = RegressionEvaluator(labelCol="Close", predictionCol="prediction", metricName="r2")
r22 = gbt_evaluator6.evaluate(gbt_predictions2)
print("Root Mean Squared Error (RMSE) on test data = %g" % r22)

Root Mean Squared Error (RMSE) on test data = 0.925732


In [158]:
result.printSchema()

root
 |-- timestamp: string (nullable = true)
 |-- textblob_score: double (nullable = true)
 |-- textblob_category: string (nullable = true)
 |-- cleaned_eng_text: string (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Open: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: double (nullable = true)
 |-- Marketcap: double (nullable = true)
 |-- High_pre: double (nullable = true)
 |-- Low_pre: double (nullable = true)
 |-- Open_pre: double (nullable = true)
 |-- Close_pre: double (nullable = true)
 |-- Volume_pre: double (nullable = true)
 |-- Marketcap_pre: double (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- feature: vector (nullable = true)



In [160]:
rnn_data = result.select(['timestamp', 'textblob_score', 'High', 'Low', 'Open', 'Close', 'Volume', 'Marketcap', 'feature'])

In [170]:
rnn_lstm_data = rnn_data.groupBy("timestamp") \
    .avg('textblob_score', 'High', 'Low', 'Open', 'Close', 'Volume', 'Marketcap')\
    .withColumnRenamed("avg(textblob_score)","textblob_score")\
    .withColumnRenamed("avg(High)","High")\
    .withColumnRenamed("avg(Low)","Low")\
    .withColumnRenamed("avg(Open)","Open")\
    .withColumnRenamed("avg(Close)","Close")\
    .withColumnRenamed("avg(Volume)","Volume")\
    .withColumnRenamed("avg(Marketcap)","Marketcap")

In [174]:
rnn_lstm_data.repartition(1).write.option("header", "true").csv('/Users/lincong/Downloads/lstm_data.csv')