In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
!pip install pyspark
!pip install afinn

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/45/b0/9d6860891ab14a39d4bddf80ba26ce51c2f9dc4805e5c6978ac0472c120a/pyspark-3.1.1.tar.gz (212.3MB)
[K     |████████████████████████████████| 212.3MB 68kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 20.2MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.1.1-py2.py3-none-any.whl size=212767604 sha256=88fec3885f825325ce67088117ba5f0b864d30a36ba45875de233018bcc67635
  Stored in directory: /root/.cache/pip/wheels/0b/90/c0/01de724414ef122bd05f056541fb6a0ecf47c7ca655f8b3c0f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.1
Coll

In [None]:
from pyspark.sql import SparkSession as ss
from pyspark.sql.functions import udf
from pyspark.ml.feature import MaxAbsScaler
# from pyspark.ml.linalg import Vectors
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import FloatType,StringType
from nltk.sentiment.vader import SentimentIntensityAnalyzer
from afinn import Afinn
from textblob import TextBlob
import nltk
# SentimentIntensityAnalyzer needs the VADER lexicon on disk; an up-to-date copy is not re-fetched.
nltk.download(info_or_id='vader_lexicon')
# Reuse the kernel's Spark session if one already exists, otherwise create it.
spark = ss.builder.getOrCreate()

[nltk_data] Downloading package vader_lexicon to /root/nltk_data...
[nltk_data]   Package vader_lexicon is already up-to-date!


In [None]:
# Cache of sentiment analyzers used by the scoring UDFs of lex_anal.
# The UDFs previously constructed a new analyzer for every row; this lets each
# analyzer be built once and reused for subsequent rows.
# NOTE(review): the dict is shipped to Spark workers together with the pickled
# UDFs, so reuse is presumably per worker task/process rather than global.
_LEXICON_CACHE = {}


def _get_analyzer(name, factory):
    """Return the analyzer cached under ``name``, creating it with ``factory()`` on first use."""
    analyzer = _LEXICON_CACHE.get(name)
    if analyzer is None:
        analyzer = factory()
        _LEXICON_CACHE[name] = analyzer
    return analyzer


class lex_anal:
    """Lexicon-based sentiment scoring of a CSV dataset with Spark.

    Reads ``path_read + file`` (header row, multi-line records, inferred schema)
    through the module-level ``spark`` session. ``process()`` then:

    * adds ``vader_score`` (from ``pre_text_vader``), ``afinn_score`` and
      ``blob_score`` (both from ``pre_text_all_upd``);
    * rescales ``afinn_score`` by its maximum absolute value;
    * adds ``vader_class`` / ``afinn_class`` / ``blob_class`` labels;
    * writes the result as CSV to ``path_write + file + '_res'``.

    Null input texts yield null scores and null class labels instead of failing the job.
    """

    def __init__(self, file, path_read='/content/drive/My Drive/', path_write='/content/drive/My Drive/'):
        """Load ``path_read + file`` into ``self.df``.

        Args:
            file: CSV file or Spark output directory name, relative to ``path_read``;
                also used to name the output directory.
            path_read: directory prefix for the input (concatenated as-is, so keep the trailing '/').
            path_write: directory prefix for the output (concatenated as-is, so keep the trailing '/').
        """
        self.f = file
        self.path_read = path_read
        self.path_write = path_write
        self.df = spark.read.option("header", "true").csv(
            self.path_read + self.f, inferSchema=True, multiLine=True
        )

    def process(self):
        """Run the whole pipeline in order: score, scale, classify, save."""
        print("PROCESSING SCORES")
        self.process_score()
        print("SCALING")
        self.scale()
        print("PROCESSING CLASS")
        self.process_class()
        print("SAVING")
        self.save()

    @staticmethod
    @udf(returnType=FloatType())
    def vader_pol(text):
        """VADER ``compound`` polarity of ``text``; null text gives a null score."""
        if text is None:
            return None
        vader = _get_analyzer('vader', SentimentIntensityAnalyzer)
        return vader.polarity_scores(text)['compound']

    @staticmethod
    @udf(returnType=FloatType())
    def afinn_pol(text):
        """Raw (unscaled) AFINN score of ``text``; null text gives a null score."""
        if text is None:
            return None
        afinn = _get_analyzer('afinn', Afinn)
        # float() so the value always matches the declared FloatType.
        return float(afinn.score(text))

    @staticmethod
    @udf(returnType=FloatType())
    def blob_pol(text):
        """TextBlob polarity of ``text``; null text gives a null score."""
        if text is None:
            return None
        return float(TextBlob(text).polarity)

    def process_score(self):
        """Add the ``vader_score``, ``afinn_score`` and ``blob_score`` columns."""
        self.df = self.df.withColumn('vader_score', lex_anal.vader_pol('pre_text_vader'))
        self.df = self.df.withColumn('afinn_score', lex_anal.afinn_pol('pre_text_all_upd'))
        self.df = self.df.withColumn('blob_score', lex_anal.blob_pol('pre_text_all_upd'))

    def get_df(self):
        """Return the current (possibly partially processed) Spark DataFrame."""
        return self.df

    def process_class(self):
        """Add ``*_class`` label columns derived from each ``*_score`` column."""
        self.df = self.df.withColumn('vader_class', lex_anal.classify('vader_score'))
        self.df = self.df.withColumn('afinn_class', lex_anal.classify('afinn_score'))
        self.df = self.df.withColumn('blob_class', lex_anal.classify('blob_score'))

    def save(self):
        """Write ``self.df`` as CSV with a header to ``path_write + file + '_res'``, overwriting it."""
        self.df.write.mode("overwrite").option("header", "true").csv(self.path_write + self.f + '_res')

    def scale(self):
        """Rescale ``afinn_score`` into [-1, 1] by dividing by its maximum absolute value.

        Uses one aggregation instead of a VectorAssembler/MaxAbsScaler pipeline, so
        null scores stay null rather than failing vector assembly. When the max-abs
        is 0 (or every score is null) values are left unchanged. As before, the
        rescaled ``afinn_score`` (FloatType) ends up as the last column.
        """
        from pyspark.sql import functions as F

        max_abs = self.df.agg(F.max(F.abs(F.col('afinn_score')))).first()[0]
        # max_abs is None for an empty/all-null column and 0.0 when all scores are 0.
        divisor = max_abs if max_abs else 1.0
        self.df = (
            self.df
            .withColumn('afinn_score_scaled_f', (F.col('afinn_score') / divisor).cast(FloatType()))
            .drop('afinn_score')
            .withColumnRenamed('afinn_score_scaled_f', 'afinn_score')
        )

    @staticmethod
    @udf(returnType=StringType())
    def classify(score):
        """Bucket a polarity score into a label.

        > 0.5 -> 'VPos', (0, 0.5] -> 'Pos', [-0.5, 0) -> 'Neg', < -0.5 -> 'VNeg',
        anything else (0 or NaN) -> 'Neu'. A null score gives a null label.
        """
        if score is None:
            return None
        if score > 0.5:
            return 'VPos'
        if score > 0:
            return 'Pos'
        if score < -0.5:
            return 'VNeg'
        if score < 0:
            return 'Neg'
        return 'Neu'



In [None]:
import os

# List the Drive root directly rather than os.chdir-ing into it: the chdir only
# added hidden working-directory state, and the next cell chdirs elsewhere anyway.
# os.listdir returns entries in arbitrary order, so sort for a stable listing.
sorted(os.listdir('/content/drive/My Drive/'))

['Untitled0.ipynb',
 'Colab Notebooks',
 'part-00005-acd68a15-c739-4cf0-a97b-c470b4925346-c000.csv',
 'part-5',
 'total_combined_text_and_ids_preprocessed.csv',
 'total_combined_text_and_ids_preprocessed_upd']

In [None]:
# Work inside the preprocessed sentiment140 export and collect its CSV part files.
os.chdir('/content/drive/My Drive/sentiment140_cleaned_csv_preprocessed')
# os.listdir order is arbitrary; sorting keeps part-00000, part-00001, ... in
# sequence so anything iterating over `files` is deterministic.
files = sorted(file for file in os.listdir() if file.endswith('.csv'))
files

['part-00000-acd68a15-c739-4cf0-a97b-c470b4925346-c000.csv',
 'part-00001-acd68a15-c739-4cf0-a97b-c470b4925346-c000.csv',
 'part-00003-acd68a15-c739-4cf0-a97b-c470b4925346-c000.csv',
 'part-00002-acd68a15-c739-4cf0-a97b-c470b4925346-c000.csv',
 'part-00004-acd68a15-c739-4cf0-a97b-c470b4925346-c000.csv',
 'part-00005-acd68a15-c739-4cf0-a97b-c470b4925346-c000.csv']

In [None]:
import pandas as pd

# Load the combined, preprocessed text records into pandas.
df = pd.read_csv(filepath_or_buffer='/content/drive/My Drive/total_combined_text_and_ids_preprocessed.csv')

In [None]:
# Score, scale, classify and save the Spark copy of the combined dataset.
# NOTE(review): this reads the 'total_combined_text_and_ids_preprocessed_upd'
# directory that the df_sp.write cell further down produces; on a fresh Drive
# that cell must run first.
l = lex_anal(file='total_combined_text_and_ids_preprocessed_upd')
l.process()

PROCESSING SCORES
SCALING
PROCESSING CLASS
SAVING


In [None]:
# Keep one row per distinct preprocessed text (first occurrence wins).
# Rebinding instead of inplace=True avoids in-place mutation of the frame.
df = df.drop_duplicates(subset='pre_text_all_upd', keep='first')

In [None]:
# Overwrite the source CSV with the current frame (header row, no index column).
# NOTE(review): this replaces the input file that the pandas load cell reads.
df.to_csv(
    path_or_buf='/content/drive/My Drive/total_combined_text_and_ids_preprocessed.csv',
    header=True,
    index=False,
)

In [None]:
from pyspark.sql.types import *

# Three nullable string columns, in the order the pandas frame is converted.
mySchema = StructType([
    StructField(column_name, StringType(), True)
    for column_name in ("text", "pre_text_vader", "pre_text_all_upd")
])

In [None]:
# Convert the pandas frame to Spark using the explicit all-string schema.
df_sp = spark.createDataFrame(data=df, schema=mySchema)

In [None]:
# Write the Spark copy that lex_anal reads. mode('overwrite') makes the cell
# re-runnable: Spark's default save mode errors when the target directory
# already exists, which it does after the first run.
(
    df_sp.write
    .format('csv')
    .mode('overwrite')
    .option("header", "true")
    .save("/content/drive/My Drive/total_combined_text_and_ids_preprocessed_upd")
)

In [None]:
# Load the output directory written by lex_anal.save() for inspection.
df_sp1 = spark.read.csv(
    "/content/drive/My Drive/total_combined_text_and_ids_preprocessed_upd_res",
    header=True,
    inferSchema=True,
    multiLine=True,
)

In [None]:
# Preview the first 20 rows with long cell values truncated.
df_sp1.show(n=20, truncate=True)

+--------------------+--------------------+--------------------+
|                text|      pre_text_vader|    pre_text_all_upd|
+--------------------+--------------------+--------------------+
|Unfortunately the...|Unfortunately the...|unfortunately the...|
|@Russo_Brothers #...|best way to stay ...|best way to stay ...|
|#NewYorkCity
WE A...|WE ARE OPEN MONDA...|we are open monda...|
|It’s a blessing w...|Its a blessing wh...|its a blessing wh...|
|@rejectedjokes @n...|Are you sure the ...|are you sure the ...|
|@didikins4life tr...|trumps wants to c...|trumps wants to c...|
|“To be a function...|To be a function ...|to be a function ...|
|@Iheartnoise @Bad...|It lives. For worse.|  it lives for worse|
|I think we can al...|I think we can al...|i think we can al...|
|@XAn0nXm0uSX @ber...|ppl got to b dens...|ppl got to b dens...|
|@BoozyBadger Well...|Well I use Steam ...|well i use steam ...|
|https://t.co/5WyK...|all different typ...|all different typ...|
|Thank you to all ...|han