In [1]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
import numpy as np
import pandas as pd
import os
# Turn off the macOS Objective-C runtime's fork-safety initialisation check.
# NOTE(review): presumably set so Spark's forked Python workers are not aborted on macOS
# (the run log below shows a MacBook host) — confirm it is still required.
os.environ["OBJC_DISABLE_INITIALIZE_FORK_SAFETY"] = "YES"


In [2]:
# Build connection: a local session on all cores. The driver gets a large heap
# because full result sets are collected into pandas on the driver later on.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("PySpark")
    .config("spark.driver.memory", "16g")
    .config("spark.driver.maxResultSize", "4g")
    .getOrCreate()
)

23/03/18 21:31:52 WARN Utils: Your hostname, tiancaimeishaonvs-MacBook.local resolves to a loopback address: 127.0.0.1; using 10.249.155.12 instead (on interface en0)
23/03/18 21:31:52 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/18 21:31:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Load Dataset as an RDD of Rows; without schema inference every column is read as a string.
data_path = 'data/convincing_data.csv'
data = spark.read.csv(data_path, header=True).rdd

In [4]:
# Peek at one record to check the column layout (every field comes back as a string).
data.take(1)

                                                                                

[Row(type='comment', id='imlcpab', subreddit.id='2qh1i', subreddit.name='askreddit', subreddit.nsfw='False', created_utc='1661990065', permalink='https://old.reddit.com/r/AskReddit/comments/x2fj3g/whats_a_controversial_topic_no_one_wants_to/imlcpab/', sentiment='0.469', score='2', body_cleaned="['need', 'chang', 'law', 'worth', 'sell', 'agricultur', 'product', 'us', 'rather', 'export', 'also', 'need', 'chang', 'law', 'monetari', 'penalti', 'grow', 'crop', 'particular', 'viabl', 'area', 'natur', 'climat', 'stand', 'right', 'neighbor', 'make', 'doubl', 'price', 'per', 'head', 'cattl', 'export', 'countri', 'would', 'sell', 'right', 'peopl', 'complain', 'climat', 'chang', 'probabl', 'complain']", climate_count='2', change_count='3', body_length='403', climate_proportion='0.004962779156327543', change_proportion='0.007444168734491315')]

In [5]:
def score_label(score):
    """Collapse a signed numeric score into a three-way class label.

    Returns 1 for a positive score and -1 for a negative one. Zero, and any value
    that compares false both ways (e.g. NaN), maps to 0.
    """
    label = 0
    if score > 0:
        label = 1
    elif score < 0:
        label = -1
    return label
        
# (id, reference label, cleaned body) per comment. The label is derived from the
# dataset's own `sentiment` column. `body_cleaned` is a *string* that looks like a
# Python list literal, e.g. "['need', 'chang', ...]", not an actual list.
id_label_content = data.map(
    lambda row: (row['id'], score_label(float(row['sentiment'])), row['body_cleaned'])
)

In [6]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
def evaluation(y, y_pred):
    """Score predicted sentiment labels against the reference labels.

    Precision, recall and F1 use scikit-learn's support-weighted averaging
    across the classes.

    Returns:
        Tuple ``(accuracy, precision, recall, f1)``.
    """
    weighted = {'average': 'weighted'}
    return (
        accuracy_score(y, y_pred),
        precision_score(y, y_pred, **weighted),
        recall_score(y, y_pred, **weighted),
        f1_score(y, y_pred, **weighted),
    )

### AFINN lexicon

In [7]:
# !pip install afinn
from afinn import Afinn
afinn = Afinn()
# Attach AFINN's numeric score for each body, then bucket it into -1/0/1.
id_label_score_afinn = id_label_content.map(
    lambda rec: (rec[0], rec[1], afinn.score(rec[2]))
)
id_true_pred_afinn = id_label_score_afinn.map(
    lambda rec: (rec[0], rec[1], score_label(rec[2]))
)


In [8]:
%%time
res_afinn = pd.DataFrame(id_true_pred_afinn.collect())
# Column 1 holds the reference label, column 2 the AFINN prediction.
print('Accuracy, Precision, Recall, F1:',
      evaluation(y=res_afinn.iloc[:, 1], y_pred=res_afinn.iloc[:, 2]))

                                                                                

Accuracy, Precision, Recall, F1: (0.7044006446364192, 0.7524181936953306, 0.7044006446364192, 0.7237871614428109)
CPU times: user 850 ms, sys: 90.7 ms, total: 940 ms
Wall time: 4min 35s


### VADER lexicon

In [9]:
# !pip install vaderSentiment
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
analyzer = SentimentIntensityAnalyzer()
# Keep only VADER's 'compound' score per body, then bucket it into -1/0/1.
id_label_score_vader = id_label_content.map(
    lambda rec: (rec[0], rec[1], analyzer.polarity_scores(rec[2])['compound'])
)
id_true_pred_vader = id_label_score_vader.map(
    lambda rec: (rec[0], rec[1], score_label(rec[2]))
)


In [10]:
%%time
res_vader = pd.DataFrame(id_true_pred_vader.collect())
# Column 1 holds the reference label, column 2 the VADER prediction.
print('Accuracy, Precision, Recall, F1:',
      evaluation(y=res_vader.iloc[:, 1], y_pred=res_vader.iloc[:, 2]))

                                                                                

Accuracy, Precision, Recall, F1: (0.7603382226251592, 0.7711465291335221, 0.7603382226251592, 0.7628042003837002)
CPU times: user 813 ms, sys: 92.1 ms, total: 905 ms
Wall time: 2min 52s


### Hu and Liu Lexicon

In [11]:
# Hu & Liu opinion lexicon: one word per line, header/comment lines start with ';'.
HULIU_LEXICON_URL = ('https://raw.githubusercontent.com/jeffreybreen/twitter-sentiment-analysis-tutorial-201107/'
                     'master/data/opinion-lexicon-English/{}-words.txt')


def _read_huliu_words(polarity):
    """Download one polarity list ('positive' or 'negative') of the lexicon as a list of str.

    ``keep_default_na=False`` and ``dtype=str`` stop pandas from turning an entry
    that happens to look like a missing value (e.g. 'null', 'nan') into NaN, or
    from coercing an entry to a number. Every lexicon entry stays a plain string.
    """
    return pd.read_csv(HULIU_LEXICON_URL.format(polarity), names=['word'], comment=';',
                       encoding='latin-1', keep_default_na=False, dtype=str)['word'].tolist()


positive_words = _read_huliu_words('positive')
negative_words = _read_huliu_words('negative')

def get_sentiment_huliu(words, positive_lexicon=None, negative_lexicon=None):
    """Classify a comment as 1 / -1 / 0 by counting Hu & Liu lexicon hits.

    Args:
        words: the comment's tokens. This is either a list of str, or the
            stringified list stored in ``body_cleaned`` (e.g.
            "['need', 'chang', ...]"), which is parsed back into tokens.
            ``None`` is treated as no tokens.
        positive_lexicon: iterable of positive words; defaults to the
            module-level ``positive_words``.
        negative_lexicon: iterable of negative words; defaults to the
            module-level ``negative_words``.

    Returns:
        1 if positive hits outnumber negative hits, -1 if the reverse, else 0.
        A word present in both lexicons counts as positive only.

    Note: ``body_cleaned`` tokens are stemmed ('chang', 'agricultur'), so many
    will not match the unstemmed lexicon entries.
    """
    import ast

    if words is None:
        words = []
    elif isinstance(words, str):
        # Iterating the raw string would test single characters, not words.
        try:
            parsed = ast.literal_eval(words)
        except (ValueError, SyntaxError):
            parsed = None
        words = parsed if isinstance(parsed, (list, tuple)) else words.split()

    if positive_lexicon is None:
        positive_lexicon = positive_words
    if negative_lexicon is None:
        negative_lexicon = negative_words
    # Hash-set membership instead of scanning the lexicon lists once per token.
    positive = positive_lexicon if isinstance(positive_lexicon, (set, frozenset)) else frozenset(positive_lexicon)
    negative = negative_lexicon if isinstance(negative_lexicon, (set, frozenset)) else frozenset(negative_lexicon)

    pos_score = 0
    neg_score = 0
    for word in words:
        if word in positive:
            pos_score += 1
        elif word in negative:
            neg_score += 1

    if pos_score > neg_score:
        return 1
    if pos_score < neg_score:
        return -1
    return 0

# (id, reference label, Hu & Liu prediction) for every comment.
id_true_pred_huliu = id_label_content.map(
    lambda rec: (rec[0], rec[1], get_sentiment_huliu(rec[2]))
)


In [18]:
%%time
res_huliu = pd.DataFrame(id_true_pred_huliu.collect())
# Compare the reference labels (column 1) with Hu & Liu's own predictions (column 2).
print('Accuracy, Precision, Recall, F1:',
      evaluation(res_huliu.iloc[:, 1], res_huliu.iloc[:, 2]))

[Stage 7:>                                                          (0 + 8) / 8]

TypeError: _log() got an unexpected keyword argument 'exc_info'

### SentiWordNet Lexicon

In [54]:
from nltk.corpus import wordnet as wn
from nltk.corpus import sentiwordnet as swn
from nltk.tokenize import word_tokenize

def get_sentiment_nltk(words):
    """Classify a comment as 1 / -1 / 0 using SentiWordNet.

    For each token, the first WordNet synset (if any) is looked up in
    SentiWordNet. Its positive and negative scores are summed over the comment.
    Tokens with no synset contribute nothing.

    Args:
        words: list of str tokens, or the stringified list stored in
            ``body_cleaned`` (e.g. "['need', 'chang', ...]"), which is parsed
            back into tokens. ``None`` is treated as no tokens.

    Returns:
        1 if the summed positive score exceeds the negative one, -1 if the
        reverse, else 0. The raw sums are compared directly; nothing is normalised.

    Note: tokens are stemmed ('chang', 'agricultur'), so many will have no
    WordNet synset.
    """
    import ast

    if words is None:
        words = []
    elif isinstance(words, str):
        # Iterating the raw string would look up single characters, not words.
        try:
            parsed = ast.literal_eval(words)
        except (ValueError, SyntaxError):
            parsed = None
        words = parsed if isinstance(parsed, (list, tuple)) else words.split()

    pos_score = 0
    neg_score = 0
    for word in words:
        synsets = wn.synsets(word)
        if synsets:
            swn_synset = swn.senti_synset(synsets[0].name())
            pos_score += swn_synset.pos_score()
            neg_score += swn_synset.neg_score()

    if pos_score > neg_score:
        return 1
    if pos_score < neg_score:
        return -1
    return 0

# (id, reference label, SentiWordNet prediction) for every comment.
id_true_pred_nltk = id_label_content.map(
    lambda rec: (rec[0], rec[1], get_sentiment_nltk(rec[2]))
)


In [None]:
# NOTE(review): this rebinds `data` (the Spark RDD loaded above) to a pandas
# DataFrame; a distinct name would keep earlier cells re-runnable.
data = pd.read_csv(data_path)[['id', 'sentiment', 'body_cleaned']]

In [None]:
# Shut down the session and its SparkContext; PySpark's SparkSession exposes stop(), not close().
spark.stop()