# Parallelization of sentiment analysis

Example run with a dataset of 10 videos.

### While initializing the cluster, run the following bash script to install TextBlob on all nodes:

In [7]:
from pyspark.sql import SparkSession

BUCKET_NAME = 'st446-w9-stine'

# Obtain a Spark session (an already-running one is reused).
spark = SparkSession.builder.appName("SimpleGCSLoad").getOrCreate()

# Read the 10-video sample CSV from the bucket: the first row supplies the
# column names and Spark infers the column types from the data.
csv_path = f'gs://{BUCKET_NAME}/test_10_videos.csv'
df = spark.read.csv(csv_path, header=True, inferSchema=True)

# Print the first rows as a quick check of the load.
df.show()

+--------------------+-------------------+--------+--------------------+--------------------+
|               Title|               Date|Duration|                 URL|                text|
+--------------------+-------------------+--------+--------------------+--------------------+
|Why has Israel la...|2024-04-19 00:00:00| 0:01:01|https://www.youtu...|according to offi...|
|US restricts trav...|2024-04-12 00:00:00| 0:03:41|https://www.youtu...|we begin the prog...|
|Israel's military...|2024-04-06 00:00:00| 0:00:43|https://www.youtu...|alarm as a day nu...|
|McDonald's to buy...|2024-04-05 00:00:00| 0:00:59|https://www.youtu...|McDonald's says i...|
|Gaza evacuation w...|2024-04-05 00:00:00| 0:03:14|https://www.youtu...|evacuation warnin...|
|What happened to ...|2024-04-09 00:00:00| 0:01:00|https://www.youtu...|what happened to ...|
|On board Royal Na...|2024-03-25 00:00:00| 0:03:24|https://www.youtu...|captain of a Roya...|
|Israel says it’s ...|2024-04-14 00:00:00| 0:00:59|https://w

In [8]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType, DoubleType, StructType, StructField
import string
from textblob import TextBlob

# Obtain the Spark session for this cell. getOrCreate() returns the session
# that is already active when there is one (the previous cell started one).
# NOTE(review): in that case the "TextProcessing" app name presumably does not
# replace the earlier "SimpleGCSLoad" name — confirm if the name matters.
spark = SparkSession.builder.appName("TextProcessing").getOrCreate()

# text preprocessing function
# Translation table that deletes every ASCII punctuation character except '!'.
# Built once here rather than on every call, since it never changes.
# NOTE(review): '!' is presumably kept because it carries sentiment signal —
# confirm against the analyser's behaviour.
_PUNCT_TRANSLATOR = str.maketrans('', '', string.punctuation.replace('!', ''))


def preprocess_text_textblob(text):
    """Lower-case ``text`` and strip ASCII punctuation, keeping '!'.

    Args:
        text: The raw string to clean, or ``None`` (for example a null cell
            passed in when this function runs as a Spark UDF).

    Returns:
        The cleaned string, or ``None`` when ``text`` is ``None``.
    """
    # A null cell arrives as None; pass it through instead of raising
    # AttributeError on None.lower().
    if text is None:
        return None
    return text.lower().translate(_PUNCT_TRANSLATOR)

# Sentiment analysis function
def get_sentiment(text):
    """Clean ``text`` and score it with TextBlob.

    Args:
        text: The string to analyse, or ``None`` (for example a null cell
            passed in when this function runs as a Spark UDF).

    Returns:
        A ``(polarity, subjectivity)`` tuple taken from TextBlob's sentiment
        for the cleaned text, or ``None`` when ``text`` is ``None``.
    """
    # A null cell arrives as None; return a null result rather than failing
    # inside the cleaning step.
    if text is None:
        return None
    cleaned = preprocess_text_textblob(text)
    sentiment = TextBlob(cleaned).sentiment
    return sentiment.polarity, sentiment.subjectivity

# Wrap the Python helpers as Spark UDFs.
preprocess_udf = udf(preprocess_text_textblob, StringType())

# The sentiment UDF returns a struct with two double fields.
sentiment_schema = StructType([
    StructField("polarity", DoubleType(), nullable=False),
    StructField("subjectivity", DoubleType(), nullable=False),
])
sentiment_udf = udf(get_sentiment, sentiment_schema)

# 1. add the cleaned text, 2. score it, 3. flatten the struct into top-level
#    "polarity" and "subjectivity" columns (the struct column is kept as well).
# Note: get_sentiment cleans its input again, so the text is cleaned twice here.
df = (
    df
    .withColumn("processed_text", preprocess_udf(col("text")))
    .withColumn("sentiment", sentiment_udf(col("processed_text")))
    .select("*", "sentiment.*")
)

# Display the DataFrame with the new columns.
df.show()

+--------------------+-------------------+--------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+
|               Title|               Date|Duration|                 URL|                text|      processed_text|           sentiment|            polarity|       subjectivity|
+--------------------+-------------------+--------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+
|Why has Israel la...|2024-04-19 00:00:00| 0:01:01|https://www.youtu...|according to offi...|according to offi...|[0.25769230769230...|  0.2576923076923077|0.39038461538461544|
|US restricts trav...|2024-04-12 00:00:00| 0:03:41|https://www.youtu...|we begin the prog...|we begin the prog...|[0.08032945736434...| 0.08032945736434108| 0.3919573643410853|
|Israel's military...|2024-04-06 00:00:00| 0:00:43|https://www.youtu...|alarm as a day nu...|alarm as a day nu...| 