In [104]:
!pip install findspark
!pip install pyspark



In [105]:
from google.colab import drive

# Google Drive is mounted at a fixed absolute location; every project path below
# is derived from it (not from the working directory) so they stay valid even if
# the kernel's cwd is not /content.
DRIVE_MOUNT_POINT = "/content/gdrive"
# Flush/unmount any existing mount first so a re-run starts from a fresh mount.
drive.flush_and_unmount()
drive.mount(DRIVE_MOUNT_POINT)

import os, json
cwd = os.getcwd()  

# Variables to change
LANG = "en"
SOCIAL_MEDIA = "Reddit" 

PROJECT_DIR = os.path.join(DRIVE_MOUNT_POINT, "MyDrive", "CS4225", "ML team")

INPUT_FOLDER = "reddit/reddit2/"
OUTPUT_FOLDER = os.path.join(PROJECT_DIR, "lexicon", "test")

# <PROJECT_DIR>/pre-processed data/<INPUT_FOLDER>/<LANG>.csv
INPUT_PATH = os.path.join(PROJECT_DIR, "pre-processed data", INPUT_FOLDER, LANG + ".csv")
# <PROJECT_DIR>/languages/AFINN-<LANG>.json
AFINN_PATH = os.path.join(PROJECT_DIR, "languages", "AFINN-" + LANG + ".json")
OUTPUT_PATH = os.path.join(OUTPUT_FOLDER, "reddit2")

Mounted at /content/gdrive


In [106]:
import findspark
findspark.init()
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import lower, col 
from pyspark.sql.types import StringType, ArrayType, IntegerType

# getOrCreate() returns the already-running session if there is one.
spark = (
    SparkSession.builder
    .appName("Lexicon Sentiment")
    .getOrCreate()
)

# Read the pre-processed posts and keep only the columns the pipeline needs.
# The unnamed index column (_c0) is cast to int so it can serve as the post key
# for the later group-by and join.
data = (
    spark.read.csv(INPUT_PATH, header=True)
    .select(col("_c0").cast(IntegerType()).alias("_c0"), "text", "lang", "created_at")
)

# English text is lower-cased before lexicon lookups.
# NOTE(review): presumably because the AFINN-en keys are lowercase — confirm.
if LANG == "en":
  data = data.withColumn("text", lower("text"))

data.show()

# Load the AFINN lexicon as a plain Python dict of word -> valence
# (presumably integers; the scoring code compares them against 0).
# The file is small and only used for lookups inside UDFs, so read it directly
# rather than running a Spark job and collecting a one-row DataFrame. This also
# sidesteps Spark's case-insensitive column resolution, which can reject JSON
# keys that differ only in case.
import json
with open(AFINN_PATH, encoding="utf-8") as afinn_file:
  AFINN_DICT = json.load(afinn_file)

+---+--------------------+----+-------------------+
|_c0|                text|lang|         created_at|
+---+--------------------+----+-------------------+
|  0|       ['badumtsss']| ceb|2022-03-18 21:00:59|
|  1|             ['mha']| ceb|2022-03-20 10:09:03|
|  2|     ['afghanistan']| ceb|2022-03-13 00:20:50|
|  3|     ['ibrahimovic']| ceb|2022-03-13 04:04:29|
|  4|  ['ailing', 'love']| ceb|2022-03-13 07:12:34|
|  5|     ['afghanistan']| ceb|2022-03-13 04:11:58|
|  6|           ['bonza']| crs|2022-03-24 03:43:28|
|  7|             ['man']| crs|2022-03-25 13:14:17|
|  8|['c', 'c', 'c', '...| crs|2022-03-27 16:33:31|
|  9|['whataboutwhatab...| crs|2022-03-17 11:08:07|
| 10|['nice', 'autocor...| crs|2022-03-13 19:23:58|
| 11|['esteemed', 'cou...|  en|2022-03-13 05:33:39|
| 12|['scomo', 'hand',...|  en|2022-03-20 01:10:45|
| 13|['article', 'says...|  en|2022-03-20 00:32:10|
| 14|['news', 'univers...|  en|2022-03-19 23:16:11|
| 15|['okay', 'mr', 'p...|  en|2022-03-20 03:46:14|
| 16|['price

In [107]:
from pyspark.sql.functions import udf, explode, first
from ast import literal_eval

def _parse_token_list(text):
  """Parse a stringified Python list of tokens, e.g. "['ailing', 'love']".

  A null cell yields an empty list instead of letting literal_eval raise
  ValueError, which would abort the whole Spark job over one missing value.
  """
  if text is None:
    return []
  # literal_eval only evaluates Python literals, never arbitrary code.
  return literal_eval(text)

format_text = udf(_parse_token_list, ArrayType(StringType()))
formatted_data = data.withColumn("tokens", format_text(col("text")))

def calculate_score(word, lexicon=None):
  """Split one token's lexicon valence into (positive, negative, neutral) counts.

  Args:
    word: a single token.
    lexicon: mapping of word -> numeric valence. Defaults to the global
      AFINN_DICT loaded earlier in the notebook.

  Returns:
    A 3-tuple (pos, neg, neu):
      - a positive valence v gives (v, 0, 0);
      - a negative valence v gives (0, -v, 0);
      - a word missing from the lexicon, or scored 0, gives (0, 0, 1).
  """
  if lexicon is None:
    lexicon = AFINN_DICT
  # Unknown words are treated exactly like words with valence 0.
  score = lexicon[word] if word in lexicon else 0
  if score > 0:
    return score, 0, 0
  if score < 0:
    return 0, -score, 0
  return 0, 0, 1

def _score_tokens(tokens):
  """Return one [pos, neg, neu] triple per token (see calculate_score).

  explode() drops rows whose array is empty or null, which would silently
  remove token-less posts from the final output. For such posts, emit a single
  all-zero triple so they survive into the per-post aggregation with
  pos == neg == 0.
  """
  if not tokens:
    return [[0, 0, 0]]
  return [calculate_score(token) for token in tokens]

word_score = udf(_score_tokens, ArrayType(ArrayType(IntegerType())))

# One output row per (post, token), carrying that token's score triple.
formatted_data = (
    formatted_data
    .withColumn("word_score", explode(word_score(col("tokens"))))
    .drop("tokens")
)
formatted_data.show(10)

+---+--------------------+----+-------------------+----------+
|_c0|                text|lang|         created_at|word_score|
+---+--------------------+----+-------------------+----------+
|  0|       ['badumtsss']| ceb|2022-03-18 21:00:59| [0, 0, 1]|
|  1|             ['mha']| ceb|2022-03-20 10:09:03| [0, 0, 1]|
|  2|     ['afghanistan']| ceb|2022-03-13 00:20:50| [0, 0, 1]|
|  3|     ['ibrahimovic']| ceb|2022-03-13 04:04:29| [0, 0, 1]|
|  4|  ['ailing', 'love']| ceb|2022-03-13 07:12:34| [0, 0, 1]|
|  4|  ['ailing', 'love']| ceb|2022-03-13 07:12:34| [3, 0, 0]|
|  5|     ['afghanistan']| ceb|2022-03-13 04:11:58| [0, 0, 1]|
|  6|           ['bonza']| crs|2022-03-24 03:43:28| [0, 0, 1]|
|  7|             ['man']| crs|2022-03-25 13:14:17| [0, 0, 1]|
|  8|['c', 'c', 'c', '...| crs|2022-03-27 16:33:31| [0, 0, 1]|
+---+--------------------+----+-------------------+----------+
only showing top 10 rows



In [108]:
from pyspark.ml.functions import array_to_vector
from pyspark.ml.stat import Summarizer

# Turn each token's [pos, neg, neu] array into an ML vector so Summarizer can
# add them element-wise, giving one summed score vector per post (_c0).
vectorised_scores = formatted_data.withColumn("word_score", array_to_vector("word_score"))
per_post_sum = Summarizer.sum(vectorised_scores["word_score"]).alias("score")
with_vec = vectorised_scores.groupBy("_c0").agg(per_post_sum)

with_vec.show(5)

+---+-------------+
|_c0|        score|
+---+-------------+
|  1|[0.0,0.0,1.0]|
|  3|[0.0,0.0,1.0]|
|  5|[0.0,0.0,1.0]|
|  6|[0.0,0.0,1.0]|
|  9|[0.0,0.0,1.0]|
+---+-------------+
only showing top 5 rows



In [109]:
from pyspark.sql.functions import lit

def calculate_sentiment(arr):
  """Label a post from its aggregated [pos, neg, neu] score.

  Only the positive (index 0) and negative (index 1) totals are compared; the
  neutral count is ignored. Anything that is neither strictly more positive
  nor strictly more negative is "neutral".
  """
  positive_total, negative_total = arr[0], arr[1]
  label = "neutral"
  if positive_total > negative_total:
    label = "positive"
  elif negative_total > positive_total:
    label = "negative"
  return label

sentence_sentiment = udf(calculate_sentiment, StringType())

# Attach each post's summed score (inner join on the post key), then project
# the output schema: original text, sentiment label, language, timestamp and
# the constant source platform.
sentiments = (
    data.join(with_vec, on="_c0")
    .select(
        col("text").alias("Text"),
        sentence_sentiment(col("score")).alias("Sentiment"),
        col("lang").alias("Lang"),
        col("created_at").alias("Created_at"),
        lit(SOCIAL_MEDIA).alias("Social_media"),
    )
)
sentiments.show(5)

+------------------+---------+----+-------------------+------------+
|              Text|Sentiment|Lang|         Created_at|Social_media|
+------------------+---------+----+-------------------+------------+
|     ['badumtsss']|  neutral| ceb|2022-03-18 21:00:59|      Reddit|
|           ['mha']|  neutral| ceb|2022-03-20 10:09:03|      Reddit|
|   ['afghanistan']|  neutral| ceb|2022-03-13 00:20:50|      Reddit|
|   ['ibrahimovic']|  neutral| ceb|2022-03-13 04:04:29|      Reddit|
|['ailing', 'love']| positive| ceb|2022-03-13 07:12:34|      Reddit|
+------------------+---------+----+-------------------+------------+
only showing top 5 rows



In [110]:
# Stage the result as a multi-part CSV, then read it back and coalesce it into a
# single part file under OUTPUT_PATH.
# NOTE(review): presumably the staging copy keeps the upstream UDF/join work
# parallel instead of letting coalesce(1) squeeze it into one task — confirm.
TEMP_FOLDER = os.path.join(OUTPUT_FOLDER, "temp")

# TEMP_FOLDER is scratch space, so overwrite it. Otherwise a failure before the
# cleanup step leaves it behind, and every re-run aborts here with
# "path already exists". OUTPUT_PATH keeps Spark's default error-if-exists mode
# so an existing result is never silently replaced.
sentiments.write.format("csv").mode("overwrite").option("header", "true").save(TEMP_FOLDER)
combined_files = spark.read.option('header', 'true').csv(TEMP_FOLDER)
combined_files.coalesce(1).write.format('csv').option('header', 'true').save(OUTPUT_PATH)

In [111]:
import shutil

# OUTPUT_PATH is a Spark output directory holding the single coalesced data
# file (part-*.csv) plus bookkeeping files (presumably _SUCCESS / .crc markers).
# Rename only the Spark part file to <LANG>.csv and delete the non-CSV
# bookkeeping files. Any other .csv already present is left alone rather than
# being renamed over the new result.
for filename in os.listdir(OUTPUT_PATH):
  file_path = os.path.join(OUTPUT_PATH, filename)
  if not os.path.isfile(file_path):
    continue  # os.remove() cannot delete directories; leave them untouched
  if filename.startswith("part-") and filename.endswith(".csv"):
    os.rename(file_path, os.path.join(OUTPUT_PATH, LANG + ".csv"))
  elif not filename.endswith(".csv"):
    os.remove(file_path)

# The multi-part staging copy is no longer needed.
shutil.rmtree(TEMP_FOLDER)