## Configure PySpark environment

In [1]:
%%configure -f
{
    "conf": {
        "spark.pyspark.python": "python3",
        "spark.pyspark.virtualenv.enabled": "true",
        "spark.pyspark.virtualenv.type":"native",
        "spark.pyspark.virtualenv.bin.path":"/usr/bin/virtualenv"
    }
}

In [2]:
sc.install_pypi_package("boto3==1.19.2")
sc.install_pypi_package("pandas==1.0.5")
sc.install_pypi_package("scipy==1, key.4.1")
sc.install_pypi_package("matplotlib==3.2.1")
sc.install_pypi_package("seaborn==0.10.1")

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1685143379847_0002,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

An error was encountered:
Invalid package name: Package name and version must contain valid characters
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/context.py", line 1178, in install_pypi_package
    pypi_package = self._validate_package(pypi_package)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/context.py", line 1251, in _validate_package
    raise ValueError("Invalid package name: Package name and version must contain valid characters")
ValueError: Invalid package name: Package name and version must contain valid characters



## Connect to S3 bucket with the scraped data

In [3]:
import boto3
import json
import pandas as pd
import numpy as np

from pyspark.ml.feature import Tokenizer, Word2Vec
from pyspark.ml.feature import ElementwiseProduct, VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
s3 = boto3.resource('s3')
bucket = 'scraped-data-zh'
bucket_resource = s3.Bucket(bucket)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Cleaning function: convert json to Pandas dataframe

In [5]:
# Function takes in a JSON and returns a Pandas DataFrame for easier operation. 
def stocktwits_json_to_df(data, key):
    
    columns = ['id','created_at','username','name','user_id','body','basic_sentiment','reshare_count']
    df = pd.DataFrame(index=range(len(data)),columns=columns)
    for i, message in enumerate(data):
        df.loc[i,'id'] = message['id']
        df.loc[i,'created_at'] = message['created_at']
        df.loc[i,'username'] = message['user']['username']
        df.loc[i,'name'] = message['user']['name']
        df.loc[i,'user_id'] = message['user']['id']
        df.loc[i,'body'] = message['body']
        #We'll classify bullish as +1 and bearish as -1 to make it ready for classification training
        try:
            if (message['entities']['sentiment']['basic'] == 'Bullish'):
                df.loc[i,'basic_sentiment'] = 1
            elif (message['entities']['sentiment']['basic'] == 'Bearish'):
                df.loc[i,'basic_sentiment'] = -1
            else:
                df.loc[i,'basic_sentiment'] = 0
        except:
                df.loc[i,'basic_sentiment'] = 0
        try: 
            df.loc[i,'reshare_count'] = message['reshares']['reshared_count']
        except:
             df.loc[i,'reshare_count'] = 0
        df["symbol"] = key
        
    return df

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Read in json files and convert to pandas dataframe

In [6]:
list_dfs = []
for i in bucket_resource.objects.all():
    if 'json' in i.key:
        obj = s3.Object(bucket, i.key)
        string = obj.get()['Body'].read().decode('utf-8')
        data = json.loads(string)
        df = stocktwits_json_to_df(data, i.key[:-5])
        list_dfs.append(df)
        
df = pd.concat(list_dfs)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Convert to spark dataframe

In [7]:
sparkDF = spark.createDataFrame(df) 

print('Total Columns: %d' % len(sparkDF.dtypes))
print('Total Rows: %d' % sparkDF.count())
sparkDF.printSchema()
sparkDF.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Total Columns: 9
Total Rows: 269990
root
 |-- id: long (nullable = true)
 |-- created_at: string (nullable = true)
 |-- username: string (nullable = true)
 |-- name: string (nullable = true)
 |-- user_id: long (nullable = true)
 |-- body: string (nullable = true)
 |-- basic_sentiment: long (nullable = true)
 |-- reshare_count: long (nullable = true)
 |-- symbol: string (nullable = true)

+---------+--------------------+--------------------+--------------------+-------+--------------------+---------------+-------------+------+
|       id|          created_at|            username|                name|user_id|                body|basic_sentiment|reshare_count|symbol|
+---------+--------------------+--------------------+--------------------+-------+--------------------+---------------+-------------+------+
|529591661|2023-05-26T17:58:05Z|          Austin2980|          Austin2980|4401868|$AAPL really no m...|              0|            0|  AAPL|
|529591424|2023-05-26T17:56:48Z|WolverineVcap

In [8]:
sentiment_cnt = (sparkDF.groupBy('basic_sentiment').count())
sentiment_cnt.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------------+------+
|basic_sentiment| count|
+---------------+------+
|              1| 55528|
|              0|191772|
|             -1| 22690|
+---------------+------+

## Classification Pipeline

In [9]:
# 1 = bullish, -1 = bearish
training_df = sparkDF.filter("basic_sentiment != 0")
sentiment_cnt = (training_df.groupBy('basic_sentiment').count())
sentiment_cnt.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------------+-----+
|basic_sentiment|count|
+---------------+-----+
|              1|55528|
|             -1|22690|
+---------------+-----+

In [10]:
training_df = training_df.withColumn('is_bullish', (training_df.basic_sentiment == 1).cast("integer"))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
tokenizer = Tokenizer(inputCol="body", outputCol="body_tokens")
w2v = Word2Vec(vectorSize=50, minCount=0, inputCol="body_tokens", outputCol="body_vec")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [12]:
indexer = StringIndexer(inputCol="symbol", outputCol="symbol_index")
indexed = indexer.fit(training_df).transform(training_df)
indexed['symbol', 'symbol_index'].show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------+------------+
|symbol|symbol_index|
+------+------------+
|  AAPL|         7.0|
|  AAPL|         7.0|
|  AAPL|         7.0|
|  AAPL|         7.0|
|  AAPL|         7.0|
|  AAPL|         7.0|
|  AAPL|         7.0|
|  AAPL|         7.0|
|  AAPL|         7.0|
|  AAPL|         7.0|
|  AAPL|         7.0|
|  AAPL|         7.0|
|  AAPL|         7.0|
|  AAPL|         7.0|
|  AAPL|         7.0|
|  AAPL|         7.0|
|  AAPL|         7.0|
|  AAPL|         7.0|
|  AAPL|         7.0|
|  AAPL|         7.0|
+------+------------+
only showing top 20 rows

In [13]:
encoder = OneHotEncoder(inputCols=["symbol_index"], outputCols=["symbol_vec"])
encoded = encoder.fit(indexed).transform(indexed)
encoded[["symbol_index", "symbol_vec"]].show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------+--------------+
|symbol_index|    symbol_vec|
+------------+--------------+
|         7.0|(44,[7],[1.0])|
|         7.0|(44,[7],[1.0])|
|         7.0|(44,[7],[1.0])|
|         7.0|(44,[7],[1.0])|
|         7.0|(44,[7],[1.0])|
|         7.0|(44,[7],[1.0])|
|         7.0|(44,[7],[1.0])|
|         7.0|(44,[7],[1.0])|
|         7.0|(44,[7],[1.0])|
|         7.0|(44,[7],[1.0])|
|         7.0|(44,[7],[1.0])|
|         7.0|(44,[7],[1.0])|
|         7.0|(44,[7],[1.0])|
|         7.0|(44,[7],[1.0])|
|         7.0|(44,[7],[1.0])|
|         7.0|(44,[7],[1.0])|
|         7.0|(44,[7],[1.0])|
|         7.0|(44,[7],[1.0])|
|         7.0|(44,[7],[1.0])|
|         7.0|(44,[7],[1.0])|
+------------+--------------+
only showing top 20 rows

In [14]:
features = ['symbol_vec', 'body_vec']
assembler = VectorAssembler(inputCols = features, outputCol = 'features')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [15]:
my_pipeline = Pipeline(stages=[indexer, 
                               encoder, 
                               tokenizer, 
                               w2v,
                               assembler])
train = my_pipeline.fit(training_df).transform(training_df)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [16]:
lr = LogisticRegression(featuresCol='features', labelCol='is_bullish')
model = lr.fit(train)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [18]:
# Training Summary Data
trainingSummary = model.summary

print("Training AUC: " + str(trainingSummary.areaUnderROC))
print("\nTraining Accuracy: " + str(trainingSummary.accuracy))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Training AUC: 0.7278661243742433

Training Accuracy: 0.7363906006290112

## Get Predictions

In [21]:
prediction_df = sparkDF.filter("basic_sentiment == 0")
prediction = my_pipeline.fit(prediction_df).transform(prediction_df)
predicted_df = model.transform(prediction)
out_df = predicted_df.toPandas()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [22]:
from io import StringIO

csv_buffer = StringIO()
out_df.to_csv(csv_buffer)
s3.Object(bucket, 'predictions.csv').put(Body=csv_buffer.getvalue())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

{'ResponseMetadata': {'RequestId': 'PT2YMRXC5J729Q3M', 'HostId': 'PukbAPYjii4uxfMG4K4gfdDuVZCTKttu8PJ5KRVpLdabhLCYgmYoSHqmv8Ulx3QVwykSwbCeR1BAA7sNRKMv8g==', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amz-id-2': 'PukbAPYjii4uxfMG4K4gfdDuVZCTKttu8PJ5KRVpLdabhLCYgmYoSHqmv8Ulx3QVwykSwbCeR1BAA7sNRKMv8g==', 'x-amz-request-id': 'PT2YMRXC5J729Q3M', 'date': 'Fri, 26 May 2023 23:49:10 GMT', 'x-amz-server-side-encryption': 'AES256', 'etag': '"9752be78fbc137a7a0fbc06172cccf20"', 'server': 'AmazonS3', 'content-length': '0'}, 'RetryAttempts': 0}, 'ETag': '"9752be78fbc137a7a0fbc06172cccf20"', 'ServerSideEncryption': 'AES256'}