In [None]:
from google.colab import drive
# Mount Google Drive inside the Colab runtime at /content/drive/.
drive.mount('/content/drive/')

Mounted at /content/drive/


In [None]:
# Location of the input CSV data handed to spark.read...csv(path).
# NOTE(review): this is a relative path although Drive is mounted at
# /content/drive/ — confirm the working directory it is resolved against.
path = 'GME_with_comments_groupped_text'

In [None]:
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark import  SparkContext
from pyspark.sql.types import StructType

import pyspark.sql.types as Ts
from pyspark.sql import functions as F 

In [None]:
# Local Spark master using all available cores ('local[*]').
conf = SparkConf().setAppName('appName').setMaster('local[*]')
# getOrCreate() hands back the context/session that is already running, so this
# cell can be re-executed; constructing SparkContext(conf=conf) a second time
# in the same kernel raises because only one context may be active.
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession.builder.config(conf=conf).getOrCreate()

In [None]:
# Read the comma-separated input with a header row. Quoted fields may span
# several lines, and '"' is both the quote and the escape character (quotes
# inside a field are doubled). Only one escape character can be in effect, so
# the option is set exactly once.
data = (
    spark.read
    .option("delimiter", ",")
    .option("header", "true")
    .option("multiline", "true")
    .option("escape", '"')
    .option("quote", '"')
    .csv(path)
)
# NOTE(review): only a single row is kept, and every later cell builds on
# `data`, so the file saved at the end also holds one row. This looks like a
# debugging limit — confirm before a full run.
data = data.limit(1)
data.show()

+---+----------+-----+------------+---------+--------------------+--------------------+--------------------+-----+-----+----------+----------+----------+---------+
|_c0|      date|likes|comments_num|posts_num|              titles|            comments|            selftext| Open| High|       Low|     Close| Adj Close|   Volume|
+---+----------+-----+------------+---------+--------------------+--------------------+--------------------+-----+-----+----------+----------+----------+---------+
|318|2021-12-25|    6|           0|        6|To my fellow Apes...| I am a bot from ...| So, while we're ...|154.0|155.0|146.020004|152.139999|152.139999|1055500.0|
+---+----------+-----+------------+---------+--------------------+--------------------+--------------------+-----+-----+----------+----------+----------+---------+



# Cleaning

In [None]:
import nltk
import re

from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize

# Fetch the NLTK data packages used by this notebook.
# 'words': only needed by the commented-out `eng_words` line in the next cell.
nltk.download('words')
# 'stopwords': backs stopwords.words('english') used to build `stop_words`.
nltk.download('stopwords')
# 'vader_lexicon': presumably for the VADER sentiment scoring step — TODO confirm.
nltk.download('vader_lexicon')

[nltk_data] Downloading package words to /root/nltk_data...
[nltk_data]   Package words is already up-to-date!
[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package vader_lexicon to /root/nltk_data...
[nltk_data]   Package vader_lexicon is already up-to-date!


True

In [None]:
# Porter stemmer applied to every kept token in cleaning().
stemmer = nltk.PorterStemmer()
# WordNet lemmatizer; not used by any active code in this section.
lemm = nltk.WordNetLemmatizer()

# eng_words = set(nltk.corpus.words.words())

# English stop words, stored as a set for membership tests in cleaning().
stop_words = set(stopwords.words('english'))

In [None]:
def cleaning(x):
  """Turn raw comment text into a string of lower-cased, stemmed tokens.

  Steps, in order: remove URLs and the "[removed]" placeholder, remove the
  "/r", "/n", "/t" sequences and literal backslash-n/t/r sequences, replace
  every remaining non-letter with a space, lower-case, drop stop words and
  stem what is left.

  URLs and "[removed]" are handled before the non-letter pass on purpose:
  once "://", "." and the square brackets have been turned into spaces,
  those patterns can no longer match.

  Args:
    x: Raw text, or None (e.g. a null column value).

  Returns:
    The surviving stems joined by single spaces; '' for None or empty input.
  """
  if x is None:
    return ''

  # Structured patterns first, while their punctuation is still intact.
  x = re.sub(r"(http|ftp|https)://([\w_-]+(?:(?:\.[\w_-]+)+))([\w.,@?^=%&:/~+#-]*[\w@?^=%&/~+#-])?", ' ', x)
  x = x.replace("[removed]", '')
  x = re.sub(r"\/r|\/n|\/t", '', x)
  x = re.sub(r"\\n|\\t|\\r", '', x)
  # Anything that is not an ASCII letter (digits, punctuation, real
  # newlines and tabs) becomes a token separator.
  x = re.sub('[^a-zA-Z]', ' ', x)

  tokens = x.lower().split()
  return ' '.join(stemmer.stem(word) for word in tokens if word not in stop_words)

# Spark UDF wrapper so cleaning() can be applied to a string column.
cleaning_udf = F.udf(lambda text: cleaning(text), Ts.StringType())

In [None]:
# Add the cleaned comment text next to the raw column and preview it.
# NOTE(review): the LM scoring cell rebuilds `data2` from `data`, so
# `proc_comments` is not carried into the scoring steps — confirm intent.
data2 = data.withColumn('proc_comments', cleaning_udf(F.col('comments')))
data2.show()

+---+----------+-----+------------+---------+--------------------+--------------------+--------------------+-----+-----+----------+----------+----------+---------+-----------+--------------------+
|_c0|      date|likes|comments_num|posts_num|              titles|            comments|            selftext| Open| High|       Low|     Close| Adj Close|   Volume|       test|       proc_comments|
+---+----------+-----+------------+---------+--------------------+--------------------+--------------------+-----+-----+----------+----------+----------+---------+-----------+--------------------+
|318|2021-12-25|    6|           0|        6|To my fellow Apes...| I am a bot from ...| So, while we're ...|154.0|155.0|146.020004|152.139999|152.139999|1055500.0|{a -> null}|bot wallstreetbet...|
+---+----------+-----+------------+---------+--------------------+--------------------+--------------------+-----+-----+----------+----------+----------+---------+-----------+--------------------+



# Processing

In [None]:
!pip install pysentiment2

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pysentiment2
  Downloading pysentiment2-0.1.1-py3-none-any.whl (1.9 MB)
[K     |████████████████████████████████| 1.9 MB 5.7 MB/s 
Installing collected packages: pysentiment2
Successfully installed pysentiment2-0.1.1


In [None]:
import pysentiment2 as ps2
import datetime

In [None]:
# pysentiment2 analyzers used by the scoring functions below; each one
# provides tokenize() and get_score().
# LM: presumably the Loughran-McDonald financial dictionary — see pysentiment2 docs.
lm = ps2.LM()
# HIV4: presumably the Harvard IV-4 general-purpose dictionary — see pysentiment2 docs.
hiv4 = ps2.HIV4()

In [None]:
# Return type of the LM scoring UDF: a map from metric name (string) to float.
lm_schema = Ts.MapType(Ts.StringType(), Ts.FloatType())

def lm_scoring(x):
  """Score a text with the module-level `lm` analyzer.

  Args:
    x: Text to score, or None (e.g. a null column value).

  Returns:
    A dict with float values under 'Positive', 'Negative', 'Polarity' and
    'Subjectivity', or None when `x` is None.
  """
  # Null column values reach a Python UDF as None; answer with None (a null
  # map) instead of handing None to the tokenizer.
  if x is None:
    return None

  score = lm.get_score(lm.tokenize(x))
  # Cast explicitly so every value is a built-in float, matching the float
  # value type declared for the UDF's map.
  return {
      key: float(score[key])
      for key in ('Positive', 'Negative', 'Polarity', 'Subjectivity')
  }

# Spark UDF wrapper around lm_scoring(); the returned dict is typed by lm_schema.
lm_scor_udf = F.udf(lambda text: lm_scoring(text), returnType=lm_schema)


In [None]:
def hiv4_scoring(x):
  """Score a text with the module-level `hiv4` analyzer.

  Tokenizing and scoring both go through `hiv4`, so the result reflects that
  dictionary rather than repeating the `lm` scores.

  Args:
    x: Text to score, or None (e.g. a null column value).

  Returns:
    A dict with float values under 'Positive', 'Negative', 'Polarity' and
    'Subjectivity', or None when `x` is None.
  """
  # Null column values reach a Python UDF as None; answer with None (a null
  # map) instead of handing None to the tokenizer.
  if x is None:
    return None

  score = hiv4.get_score(hiv4.tokenize(x))
  # Cast explicitly so every value is a built-in float, matching the float
  # value type declared for the UDF's map.
  return {
      key: float(score[key])
      for key in ('Positive', 'Negative', 'Polarity', 'Subjectivity')
  }

# Spark UDF wrapper around hiv4_scoring(); returns a string -> float map.
h4_scor_udf = F.udf(
    lambda text: hiv4_scoring(text),
    returnType=Ts.MapType(keyType=Ts.StringType(), valueType=Ts.FloatType()),
)


In [None]:
# Shared VADER analyzer, created on first use so the lexicon is loaded once
# per Python process rather than once per scored text.
_vader_analyzer = None

def vader_scoring(x):
  """Score a text with NLTK's VADER sentiment analyzer.

  The result keeps the same four keys as the other scoring functions so the
  downstream column extraction works unchanged. VADER's own outputs are
  mapped as follows:
    'Positive'     <- 'pos'      (proportion of the text rated positive)
    'Negative'     <- 'neg'      (proportion of the text rated negative)
    'Polarity'     <- 'compound' (normalized overall score in [-1, 1])
    'Subjectivity' <- 'pos' + 'neg' (share of the text that is not neutral)
  NOTE(review): VADER has no native subjectivity measure; the 'pos' + 'neg'
  stand-in is a modelling choice — confirm it suits the analysis.

  Args:
    x: Text to score, or None (e.g. a null column value).

  Returns:
    A dict with float values under the four keys above, or None when `x`
    is None.
  """
  global _vader_analyzer

  # Null column values reach a Python UDF as None; answer with None (a null
  # map) instead of handing None to the analyzer.
  if x is None:
    return None

  if _vader_analyzer is None:
    # Imported here so the cell stays self-contained; needs the
    # 'vader_lexicon' NLTK data package to be available.
    from nltk.sentiment.vader import SentimentIntensityAnalyzer
    _vader_analyzer = SentimentIntensityAnalyzer()

  scores = _vader_analyzer.polarity_scores(x)
  return {
      'Positive': float(scores['pos']),
      'Negative': float(scores['neg']),
      'Polarity': float(scores['compound']),
      'Subjectivity': float(scores['pos'] + scores['neg']),
  }
  
# Spark UDF wrapper around vader_scoring(); returns a string -> double map.
vader_scor_udf = F.udf(
    lambda text: vader_scoring(text),
    Ts.MapType(Ts.StringType(), Ts.DoubleType()),
)


In [None]:
# Score the raw comments with the LM UDF, spread the returned map into one
# column per metric, then drop the temporary map column.
data2 = (
    data
    .withColumn('comments_lm_scoring_dict', lm_scor_udf(F.col('comments')))
    .withColumn('lm_scor_Positive', F.col('comments_lm_scoring_dict').getItem('Positive'))
    .withColumn('lm_scor_Negative', F.col('comments_lm_scoring_dict').getItem('Negative'))
    .withColumn('lm_scor_Polarity', F.col('comments_lm_scoring_dict').getItem('Polarity'))
    .withColumn('lm_scor_Subjectivity', F.col('comments_lm_scoring_dict').getItem('Subjectivity'))
    .drop('comments_lm_scoring_dict')
)

data2.show()

+---+----------+-----+------------+---------+--------------------+--------------------+--------------------+-----+-----+----------+----------+----------+---------+-----------+----------------+----------------+----------------+--------------------+
|_c0|      date|likes|comments_num|posts_num|              titles|            comments|            selftext| Open| High|       Low|     Close| Adj Close|   Volume|       test|lm_scor_Positive|lm_scor_Negative|lm_scor_Polarity|lm_scor_Subjectivity|
+---+----------+-----+------------+---------+--------------------+--------------------+--------------------+-----+-----+----------+----------+----------+---------+-----------+----------------+----------------+----------------+--------------------+
|318|2021-12-25|    6|           0|        6|To my fellow Apes...| I am a bot from ...| So, while we're ...|154.0|155.0|146.020004|152.139999|152.139999|1055500.0|{a -> null}|          1694.0|          4354.0|      -0.4398148|          0.07018195|
+---+---

In [None]:
# Append the h4_* columns: score the raw comments with the HIV4 UDF, spread
# the returned map into one column per metric, then drop the temporary map.
data2 = (
    data2
    .withColumn('comments_h4_scoring_dict', h4_scor_udf(F.col('comments')))
    .withColumn('h4_scor_Positive', F.col('comments_h4_scoring_dict').getItem('Positive'))
    .withColumn('h4_scor_Negative', F.col('comments_h4_scoring_dict').getItem('Negative'))
    .withColumn('h4_scor_Polarity', F.col('comments_h4_scoring_dict').getItem('Polarity'))
    .withColumn('h4_scor_Subjectivity', F.col('comments_h4_scoring_dict').getItem('Subjectivity'))
    .drop('comments_h4_scoring_dict')
)

data2.show()

+---+----------+-----+------------+---------+--------------------+--------------------+--------------------+-----+-----+----------+----------+----------+---------+-----------+----------------+----------------+----------------+--------------------+----------------+----------------+----------------+--------------------+
|_c0|      date|likes|comments_num|posts_num|              titles|            comments|            selftext| Open| High|       Low|     Close| Adj Close|   Volume|       test|lm_scor_Positive|lm_scor_Negative|lm_scor_Polarity|lm_scor_Subjectivity|h4_scor_Positive|h4_scor_Negative|h4_scor_Polarity|h4_scor_Subjectivity|
+---+----------+-----+------------+---------+--------------------+--------------------+--------------------+-----+-----+----------+----------+----------+---------+-----------+----------------+----------------+----------------+--------------------+----------------+----------------+----------------+--------------------+
|318|2021-12-25|    6|           0|     

In [None]:
# Append the vader_* columns. The temporary map column is produced by
# vader_scor_udf (not the HIV4 UDF), so these columns come from the function
# they are named after.
data2 = data2.withColumn( 'comments_vader_scoring_dict', vader_scor_udf(F.col('comments')) )\
            .withColumn( 'vader_scor_Positive', F.col('comments_vader_scoring_dict')['Positive'])\
            .withColumn( 'vader_scor_Negative', F.col('comments_vader_scoring_dict')['Negative'])\
            .withColumn( 'vader_scor_Polarity', F.col('comments_vader_scoring_dict')['Polarity'])\
            .withColumn( 'vader_scor_Subjectivity', F.col('comments_vader_scoring_dict')['Subjectivity'])\
            .drop('comments_vader_scoring_dict')

data2.show()

+---+----------+-----+------------+---------+--------------------+--------------------+--------------------+-----+-----+----------+----------+----------+---------+-----------+----------------+----------------+----------------+--------------------+----------------+----------------+----------------+--------------------+-------------------+-------------------+-------------------+-----------------------+
|_c0|      date|likes|comments_num|posts_num|              titles|            comments|            selftext| Open| High|       Low|     Close| Adj Close|   Volume|       test|lm_scor_Positive|lm_scor_Negative|lm_scor_Polarity|lm_scor_Subjectivity|h4_scor_Positive|h4_scor_Negative|h4_scor_Polarity|h4_scor_Subjectivity|vader_scor_Positive|vader_scor_Negative|vader_scor_Polarity|vader_scor_Subjectivity|
+---+----------+-----+------------+---------+--------------------+--------------------+--------------------+-----+-----+----------+----------+----------+---------+-----------+----------------+

In [None]:
# Spark writes a directory of part files at this path, not a single CSV file.
# header=true keeps the column names in the output (the input is read with a
# header as well); mode("overwrite") lets the cell be re-run instead of
# failing because the path already exists.
# NOTE(review): confirm downstream readers expect a header row.
data2.write.format("csv")\
     .option("header", "true")\
     .mode("overwrite")\
     .save("storage/text_processed.csv")
