In [3]:
# Session configuration with Spark Standalone
if False:
    from pyspark.sql import SparkSession
    from operator import add

    # New API
    spark_session = SparkSession\
            .builder\
            .master("spark://192.168.2.207:7077") \
            .appName("sentiment_analysis_group19")\
            .config("spark.executor.cores",2)\
            .config("spark.dynamicAllocation.enabled", False)\
            .config("spark.dynamicAllocation.shuffleTracking.enabled", False)\
            .config("spark.shuffle.service.enabled", False)\
            .config("spark.dynamicAllocation.executorIdleTimeout","30s")\
            .config("spark.driver.port",9998)\
            .config("spark.blockManager.port",10005)\
            .getOrCreate()

    # Old API (RDD)
    spark_context = spark_session.sparkContext

    spark_context.setLogLevel("ERROR")

In [2]:
# Session configuration with Spark on Yarn with shared modules built with conda
import os
import pyspark
from pyspark.sql import SparkSession
os.environ['PYSPARK_PYTHON'] = "./environment/bin/python"
spark_session = SparkSession.builder.master("yarn") \
                    .appName('spark-yarn-conda_env') \
                    .config(
                        "spark.archives",  # 'spark.yarn.dist.archives' in YARN.
                        "hdfs://192.168.2.250:9000/user/ubuntu/share/envs/pyspark_conda_env.tar.gz#environment") \
                    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
# necessary imports
from operator import add
import time
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
import pyspark.sql.functions as F

# sentiment analysis libraries
import nltk
from nltk.sentiment.vader import SentimentIntensityAnalyzer
nltk.download('vader_lexicon')

[nltk_data] Downloading package vader_lexicon to
[nltk_data]     /home/ubuntu/nltk_data...
[nltk_data]   Package vader_lexicon is already up-to-date!


True

### Define sentiment analysis function

In [4]:
def analyze_sentiment(text):
    """Determines the sentiment of a given text. 
    Returns 0 if text sentiment is neither positive nor negative.
    Returns 1 if text sentiment is mainly positive
    Returns -1 if text sentiment is mainly negative. """
    try:
        sia = SentimentIntensityAnalyzer()
    except:
        nltk.download('vader_lexicon')
        sia = SentimentIntensityAnalyzer()
    try:
        scores = sia.polarity_scores(text)
    except: 
        pass
    pos = scores['pos']
    neg = scores['neg']
    if pos == neg:
        return 0
    elif pos > neg:
        return 1
    else:
        return -1

In [5]:
# check function
print(analyze_sentiment('I love you!'))
print(analyze_sentiment('I hate you..'))

1
-1


### Check how the sentiment of a post relates to it's score:
- Do posts with positive sentiment have a higher average score than negative posts?

In [6]:
def apply_sentiment_analysis(path):
    # read in data from path
    df = spark_session.read.json(path).cache()
    # select columns
    df = df.select('body', 'score').cache()
    # instantiate user defined function
    analyze_sentiment_fct = udf(analyze_sentiment, IntegerType())
    # apply 'analyze_sentiment' on 'body and save result in column 'sentiment'
    df = df.withColumn('sentiment', analyze_sentiment_fct(df.body)).cache()
    
    # only consider posts with score larger than 10
    df = df.filter(df.score > 15).cache()
    
    # posts with positive sentiment
    df_neg = df.filter(df.sentiment == -1)
    
    # posts with negative sentiment
    df_pos = df.filter(df.sentiment == 1)
    
    # num of posts with positive / negative sentiment
    num_neg = df_neg.count()
    num_pos = df_pos.count()
    print('num_neg ', num_neg)
    print('num_pos ', num_pos)
    
    # total score
    sum_score_neg = df_neg.groupBy().agg(F.sum('score')).collect()
    sum_score_pos = df_pos.groupBy().agg(F.sum('score')).collect()
    
    print('sum_score_neg ', sum_score_neg[0][0])
    print('sum_score_pos ', sum_score_pos[0][0])

    # average score per post for negative / positive sentiment
    avg_neg = abs(sum_score_neg[0][0])/num_neg
    avg_pos = abs(sum_score_pos[0][0])/num_pos
     
    return avg_neg, avg_pos

### Measure runtime

In [7]:
start_time = time.time()

path = "hdfs://192.168.2.207:9000/input/RC_2008-07"
avg_neg, avg_pos = apply_sentiment_analysis(path)

print("--- %s seconds ---" % (time.time() - start_time))


print('average score negative sentiment: ', avg_neg)
print('average score pos sentiment: ', avg_pos)

                                                                                

num_neg  8370
num_pos  9710




sum_score_neg  319235
sum_score_pos  364263
--- 449.6372423171997 seconds ---
average score negative sentiment:  38.140382317801674
average score pos sentiment:  37.514212152420185


                                                                                

In [None]:
file_1_path = "hdfs://192.168.2.207:9000/input/RC_2008-07"
file_2_path = "hdfs://192.168.2.207:9000/input/RC_2009-05"
file_3_path = "hdfs://192.168.2.207:9000/input/RC_2010-11"
#file_4_path = "hdfs://192.168.2.207:9000/input/RC_2011-08"

#files = [file_1_path, file_2_path, file_3_path, file_4_path]
files = [file_1_path, file_2_path, file_3_path]

for filepath in files:
    start_time = time.time()
    avg_neg, avg_pos = apply_sentiment_analysis(filepath)
    print('average score negative sentiment: ', avg_neg)
    print('average score pos sentiment: ', avg_pos)    
    print("--- %s seconds ---" % (time.time() - start_time))
    print("\n")

2022-03-22 22:38:26,934 WARN execution.CacheManager: Asked to cache already cached data.
                                                                                

num_neg  8370
num_pos  9710


                                                                                

sum_score_neg  319235
sum_score_pos  364263
average score negative sentiment:  38.140382317801674
average score pos sentiment:  37.514212152420185
--- 427.6989998817444 seconds ---




                                                                                

num_neg  15694
num_pos  19632


                                                                                

sum_score_neg  656164
sum_score_pos  795217
average score negative sentiment:  41.80986364215624
average score pos sentiment:  40.506163406682965
--- 784.625257730484 seconds ---




[Stage 40:>                                                        (0 + 2) / 25]

In [None]:
spark_context.stop()