In [1]:
import pandas as pd
import numpy as np
import scipy as sp
import seaborn as sns
import matplotlib
import matplotlib.pyplot as plt
%matplotlib inline

import findspark
findspark.init()

import pyspark
from pyspark.sql import *
import pyspark.sql.functions as func
from pyspark.sql.types import *

# Language processing
import nltk
from nltk.sentiment.vader import SentimentIntensityAnalyzer
from nltk.stem.snowball import SnowballStemmer
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from nltk.tokenize import TweetTokenizer

# Language processing with TextBlob
from textblob import TextBlob
from textblob.sentiments import NaiveBayesAnalyzer

from collections import Counter

In [2]:
# Create spark session
# Reuse the active Spark session if there is one, otherwise start a new one,
# and keep a handle on its SparkContext for RDD-level APIs.
spark = (SparkSession
         .builder
         .getOrCreate())
sc = spark.sparkContext

# Load and preprocess sample data

In [3]:
# Load the message sample with Spark's default data source format.
messages = spark.read.load(path='../sample_parquet/first_1000/')

In [174]:
# Prepare stopwords, stemmer and lemmatizer for messages preprocessing.
# English stop words, stemmer and lemmatizer that can be handed to process_body
# when preprocessing message bodies.
en_stopwords = stopwords.words('english')
en_stemmer = SnowballStemmer(language='english')
en_lemmatizer = WordNetLemmatizer()

In [175]:
# Drop messages whose body is one of the '[removed]' / '[deleted]' placeholders
# (null bodies are dropped as well, as with the SQL `!=` comparison).
cleaned_messages = messages.filter(~func.col('body').isin('[removed]', '[deleted]'))

In [176]:
def process_body(body, n_grams=1, left_pad_symbol=None, right_pad_symbol=None, lemmatizer=None, stemmer=None,
                 stop_words=None, lemmatize_stop_words=False, stem_stop_words=False, remove_stop_words=False):
    """
    Tokenize a message body, optionally normalize its tokens, and group them into n-grams.

    Tokens come from NLTK's TweetTokenizer. A token that is not a stop word is
    stemmed (if a stemmer is given) and the result is then lemmatized (if a
    lemmatizer is given). Stop words are either dropped (remove_stop_words), or
    kept and only stemmed / lemmatized when stem_stop_words / lemmatize_stop_words
    ask for it.

    Parameters:
        body:
            string message body. None is treated as an empty body.
        n_grams:
            size of the n-grams in the output; must be at least 1.
        left_pad_symbol:
            if not None, left-pad the tokens with this symbol before building n-grams.
        right_pad_symbol:
            if not None, right-pad the tokens with this symbol before building n-grams.
        lemmatizer:
            lemmatizer to use on the message words. If None, words are not lemmatized.
        stemmer:
            stemmer to use on the message words. If None, words are not stemmed.
        stop_words:
            collection of words to consider as stop words.
        lemmatize_stop_words:
            boolean to also lemmatize the stop words that are kept.
        stem_stop_words:
            boolean to also stem the stop words that are kept.
        remove_stop_words:
            boolean to remove stop words from the tokens; takes precedence over
            lemmatize_stop_words and stem_stop_words.

    Returns:
        list of n-grams, each n-gram being a list of n_grams strings.

    Raises:
        ValueError: if n_grams is smaller than 1.
    """
    if n_grams < 1:
        raise ValueError("n_grams should be at least 1")

    tokens = TweetTokenizer().tokenize(body if body is not None else '')
    if stop_words is None:
        stop_words = []

    def normalize(token, stem=True, lemmatize=True):
        # Stem first, then lemmatize the stem.
        if stem and stemmer is not None:
            token = stemmer.stem(token)
        if lemmatize and lemmatizer is not None:
            token = lemmatizer.lemmatize(token)
        return token

    if remove_stop_words:
        tokens = [normalize(token) for token in tokens if token not in stop_words]
    else:
        # Stop words are kept; they only get the normalizations explicitly requested for them.
        tokens = [
            normalize(token, stem=stem_stop_words, lemmatize=lemmatize_stop_words)
            if token in stop_words else normalize(token)
            for token in tokens
        ]

    # Padding is enabled exactly on the sides that received a pad symbol.
    grams = nltk.ngrams(tokens, n_grams,
                        pad_left=left_pad_symbol is not None,
                        pad_right=right_pad_symbol is not None,
                        left_pad_symbol=left_pad_symbol,
                        right_pad_symbol=right_pad_symbol)
    return [list(gram) for gram in grams]

In [177]:
# Return type shared by the DataFrame UDF and the SQL registration: a list of
# n-grams, each n-gram being a list of non-null strings.
process_body_return_type = ArrayType(ArrayType(StringType(), containsNull=False), containsNull=False)
process_body_udf = func.udf(process_body, process_body_return_type)
spark.udf.register('process_body', process_body, process_body_return_type)

<function __main__.process_body(body, n_grams=1, left_pad_symbol=None, right_pad_symbol=None, lemmatizer=None, stemmer=None, stop_words=None, lemmatize_stop_words=False, stem_stop_words=False, remove_stop_words=False)>

# Sentence polarity using NLTK

In [178]:
def compute_nltk_polarity(msg_body):
    """
    Score a message body with NLTK's VADER SentimentIntensityAnalyzer.

    Returns the dict produced by SentimentIntensityAnalyzer.polarity_scores;
    its 'neg', 'neu' and 'pos' entries are selected downstream.
    """
    analyzer = SentimentIntensityAnalyzer()
    return analyzer.polarity_scores(msg_body)

# Expose the scorer to Spark SQL so it can be used inside selectExpr.
compute_nltk_polarity_udf = func.udf(
    compute_nltk_polarity,
    MapType(StringType(), FloatType(), valueContainsNull=False),
)
spark.udf.register('compute_nltk_polarity', compute_nltk_polarity_udf)

<function __main__.compute_nltk_polarity(msg_body)>

In [179]:
# VADER scores per message, flattened into one column per score.
sent_bodies = cleaned_messages.selectExpr('id', 'created_utc', "compute_nltk_polarity(body) as scores")
sent_nltk_scores = sent_bodies.select(
    'id',
    'created_utc',
    func.col('scores.neg').alias('nltk_negativity'),
    func.col('scores.neu').alias('nltk_neutrality'),
    func.col('scores.pos').alias('nltk_positivity'),
)

In [180]:
# Preview the first 20 rows (Spark's default).
sent_nltk_scores.show(n=20)

+-------+-----------+---------------+---------------+---------------+
|     id|created_utc|nltk_negativity|nltk_neutrality|nltk_positivity|
+-------+-----------+---------------+---------------+---------------+
|c595rma| 1341368093|          0.065|          0.935|            0.0|
|c595rqe| 1341368108|            0.0|            1.0|            0.0|
|c595rwc| 1341368128|            0.0|            1.0|            0.0|
|c595sdr| 1341368193|            0.0|          0.878|          0.122|
|c595sop| 1341368236|            0.0|            1.0|            0.0|
|c595sui| 1341368256|            0.0|           0.84|           0.16|
|c595t5u| 1341368301|           0.36|          0.443|          0.197|
|c595ti7| 1341368347|            0.0|            1.0|            0.0|
|c595u4r| 1341368440|            0.0|            1.0|            0.0|
|c595ule| 1341368505|          0.377|          0.623|            0.0|
|c595up4| 1341368519|            0.0|          0.408|          0.592|
|c595v6i| 1341368588

# Sentence polarity using TextBlob

### Using simple sentence polarity analysis

In [181]:
def compute_blob_polarity(msg_body):
    """
    Score a message body with TextBlob's default sentiment analyzer.

    Returns a dict holding the message's 'polarity' and 'subjectivity'.
    """
    scores = TextBlob(msg_body).sentiment
    return dict(polarity=scores.polarity, subjectivity=scores.subjectivity)

# Expose the scorer to Spark SQL so it can be used inside selectExpr.
compute_blob_polarity_udf = func.udf(
    compute_blob_polarity,
    MapType(StringType(), FloatType(), valueContainsNull=False),
)
spark.udf.register('compute_blob_polarity', compute_blob_polarity_udf)

<function __main__.compute_blob_polarity(msg_body)>

In [182]:
# TextBlob scores per message, flattened into one column per score.
sent_blob_bodies = cleaned_messages.selectExpr('id', 'created_utc', "compute_blob_polarity(body) as scores")
sent_blob_scores = sent_blob_bodies.select(
    'id',
    'created_utc',
    func.col('scores.polarity').alias('text_blob_polarity'),
    func.col('scores.subjectivity').alias('text_blob_subjectivity'),
)

In [183]:
# Preview the first 20 rows (Spark's default).
sent_blob_scores.show(n=20)

+-------+-----------+------------------+----------------------+
|     id|created_utc|text_blob_polarity|text_blob_subjectivity|
+-------+-----------+------------------+----------------------+
|c595rma| 1341368093|      -0.041666668|                 0.425|
|c595rqe| 1341368108|               0.0|                   0.0|
|c595rwc| 1341368128|        0.20454545|             0.6818182|
|c595sdr| 1341368193|               0.5|                   0.5|
|c595sop| 1341368236|               0.0|                   0.0|
|c595sui| 1341368256|        0.06666667|                   0.3|
|c595t5u| 1341368301|               1.0|                   1.0|
|c595ti7| 1341368347|               0.0|                   0.0|
|c595u4r| 1341368440|               0.0|                   0.5|
|c595ule| 1341368505|       -0.15982144|            0.60892856|
|c595up4| 1341368519|        0.43333334|             0.8333333|
|c595v6i| 1341368588|       -0.41666666|             0.6666667|
|c595w8b| 1341368743|               0.0|

### Using TextBlob's positive/negative Naive Bayes classifier (trained on the NLTK movie-reviews corpus)

In [184]:
# A single analyzer shared by every call. TextBlob's NaiveBayesAnalyzer trains its
# classifier the first time it analyzes a text, so constructing a new analyzer per
# message retrains the classifier on every row (likely why the show() below never
# finished). Spark ships a copy of this analyzer with the UDF, so training presumably
# happens once per shipped copy instead of once per message.
_blob_nb_analyzer = NaiveBayesAnalyzer()

def compute_blob_class_polarity(msg_body, analyzer=_blob_nb_analyzer):
    """
    Classify a message body as positive or negative with a Naive Bayes sentiment analyzer.

    Parameters:
        msg_body: string message body.
        analyzer: sentiment analyzer handed to TextBlob; defaults to a shared
            NaiveBayesAnalyzer so its classifier is trained once and reused.

    Returns:
        dict with 'classification' (-1.0 for 'neg', 1.0 otherwise) and the class
        probabilities 'p_pos' and 'p_neg'.
    """
    pol_class = TextBlob(msg_body, analyzer=analyzer).sentiment
    # Float literals so every map value matches the declared FloatType.
    return {'classification': -1.0 if pol_class.classification == 'neg' else 1.0,
            'p_pos': pol_class.p_pos,
            'p_neg': pol_class.p_neg}

compute_blob_class_polarity_udf = func.udf(compute_blob_class_polarity, MapType(StringType(), FloatType(), False))
spark.udf.register('compute_blob_class_polarity', compute_blob_class_polarity_udf)

<function __main__.compute_blob_class_polarity(msg_body)>

In [185]:
# Naive Bayes classification per message; the scores map is flattened into one column per key.
sent_blob_class_bodies = cleaned_messages.selectExpr('id', 'created_utc', "compute_blob_class_polarity(body) as scores")
blob_class_score_fields = ['scores.classification', 'scores.p_pos', 'scores.p_neg']
sent_blob_class_scores = sent_blob_class_bodies.select('id', 'created_utc', *blob_class_score_fields)

In [186]:
# This does not finish, classifier takes too long
# sent_blob_class_scores.show()

# Other metrics (Vulgarity, hate speech)

In [187]:
# Unigram tokens of every cleaned message (process_body with its default settings).
tokens = cleaned_messages.select('id', 'created_utc', func.expr('process_body(body) as tokens'))
tokens.show()

+-------+-----------+--------------------+
|     id|created_utc|              tokens|
+-------+-----------+--------------------+
|c595rma| 1341368093|[[The], [fear], [...|
|c595rqe| 1341368108|     [[Upvote], [!]]|
|c595rwc| 1341368128|[[It's], [real], ...|
|c595sdr| 1341368193|[[What's], [he], ...|
|c595sop| 1341368236|[[does], [it], [e...|
|c595sui| 1341368256|[[Your], [user], ...|
|c595t5u| 1341368301|[[nope], [:D], [b...|
|c595ti7| 1341368347|[[Along], [with],...|
|c595u4r| 1341368440|[[Those], [both],...|
|c595ule| 1341368505|[[If], [you], [ar...|
|c595up4| 1341368519|[[Because], [easy...|
|c595v6i| 1341368588|[[Does], [no], [o...|
|c595w8b| 1341368743|[[Has], [the], [t...|
|c595whq| 1341368782|[[Just], [because...|
|c595xbt| 1341368907|[[Also], [,], [do...|
|c595yos| 1341369104|[[Meh], [,], [it'...|
|c595ypd| 1341369107|[[You], [enjoy], ...|
|c595z4z| 1341369174|[[[], [Finnish], ...|
|c595zjd| 1341369231|[[>], [*], [It's]...|
|c595zrv| 1341369266|[[Good], [insight...|
+-------+--

In [24]:
def count_matches(msg_grams, ref_grams_counter, ref_grams_intensity=None):
    """
    Count how many of a message's n-grams occur in a reference lexicon.

    Parameters:
        msg_grams:
            iterable of n-grams, each n-gram being a sequence of strings (e.g. the
            output of process_body). None is treated as an empty message.
        ref_grams_counter:
            mapping from space-joined reference n-gram to the maximum number of
            occurrences to count for it (np.inf counts every occurrence).
            Plain dicts and collections.Counter are both accepted.
        ref_grams_intensity:
            optional mapping from reference n-gram to an intensity weight; it must
            contain every key of ref_grams_counter that can be matched.

    Returns:
        float number of matched occurrences when ref_grams_intensity is None,
        otherwise {'count': float, 'intensity': float} where intensity is the sum
        of occurrences * weight over the matched n-grams.
    """
    msg_grams_counter = Counter(' '.join(msg_gram) for msg_gram in (msg_grams or []))

    # Same semantics as `Counter & Counter` (per-gram minimum, positive counts only),
    # but it also accepts a plain dict: `Counter & dict` raises TypeError, and the
    # lexicons built in this notebook are plain dicts.
    matched = {}
    for gram, occ in msg_grams_counter.items():
        capped = min(occ, ref_grams_counter.get(gram, 0))
        if capped > 0:
            matched[gram] = capped

    # Cast to float so the values match the FloatType declared by the UDFs below.
    count = float(sum(matched.values()))
    if ref_grams_intensity is None:
        return count
    intensity = float(sum(occ * ref_grams_intensity[gram] for gram, occ in matched.items()))
    return {'count': count, 'intensity': intensity}

def df_count_matches(gram_counter):
    """Build a Spark UDF mapping an n-gram array column to its match count against gram_counter."""
    return func.udf(lambda c: count_matches(c, gram_counter), FloatType())

def df_count_matches_intensity(gram_counter, intensity_dict):
    """Build a Spark UDF mapping an n-gram array column to {'count', 'intensity'} against gram_counter."""
    return func.udf(lambda c: count_matches(c, gram_counter, intensity_dict), MapType(StringType(), FloatType()))

def df_count_matches_sql(gram_counter, sql_fun_name):
    """Register df_count_matches(gram_counter) as the Spark SQL function sql_fun_name."""
    spark.udf.register(sql_fun_name, df_count_matches(gram_counter))

def df_count_matches_intensity_sql(gram_counter, intensity_dict, sql_fun_name):
    """Register df_count_matches_intensity(gram_counter, intensity_dict) as the Spark SQL function sql_fun_name."""
    spark.udf.register(sql_fun_name, df_count_matches_intensity(gram_counter, intensity_dict))

## Vulgarity

In [5]:
bad_words = spark.read.csv('../bad_words_lexicon/en.csv', header=True)
# Number of space-separated words in each lexicon entry (1 for unigrams, 2 for bigrams, ...).
bw_word_count_udf = func.udf(lambda gram: len(gram.split()), IntegerType())
bw_gram_rank = bad_words.withColumn('gram_rank', bw_word_count_udf(func.col('en_bad_words')))
bw_gram_rank.show()

+------------------+---------+
|      en_bad_words|gram_rank|
+------------------+---------+
|              2g1c|        1|
|     2 girls 1 cup|        4|
|    acrotomophilia|        1|
|alabama hot pocket|        3|
|  alaskan pipeline|        2|
|              anal|        1|
|         anilingus|        1|
|              anus|        1|
|           apeshit|        1|
|          arsehole|        1|
|               ass|        1|
|           asshole|        1|
|          assmunch|        1|
|       auto erotic|        2|
|        autoerotic|        1|
|          babeland|        1|
|       baby batter|        2|
|        baby juice|        2|
|          ball gag|        2|
|        ball gravy|        2|
+------------------+---------+
only showing top 20 rows



In [7]:
# Unigram bad words, each mapped to an unlimited match count (np.inf).
bw_1_grams = dict.fromkeys(
    (row.en_bad_words for row in bw_gram_rank.filter('gram_rank == 1').select('en_bad_words').collect()),
    np.inf,
)
bw_1_grams

{'2g1c': inf,
 'acrotomophilia': inf,
 'anal': inf,
 'anilingus': inf,
 'anus': inf,
 'apeshit': inf,
 'arsehole': inf,
 'ass': inf,
 'asshole': inf,
 'assmunch': inf,
 'autoerotic': inf,
 'babeland': inf,
 'bangbros': inf,
 'bareback': inf,
 'barenaked': inf,
 'bastard': inf,
 'bastardo': inf,
 'bastinado': inf,
 'bbw': inf,
 'bdsm': inf,
 'beaner': inf,
 'beaners': inf,
 'bestiality': inf,
 'bimbos': inf,
 'birdlock': inf,
 'bitch': inf,
 'bitches': inf,
 'blowjob': inf,
 'blumpkin': inf,
 'bollocks': inf,
 'bondage': inf,
 'boner': inf,
 'boob': inf,
 'boobs': inf,
 'bukkake': inf,
 'bulldyke': inf,
 'bullshit': inf,
 'bunghole': inf,
 'busty': inf,
 'butt': inf,
 'buttcheeks': inf,
 'butthole': inf,
 'camgirl': inf,
 'camslut': inf,
 'camwhore': inf,
 'carpetmuncher': inf,
 'circlejerk': inf,
 'clit': inf,
 'clitoris': inf,
 'clusterfuck': inf,
 'cock': inf,
 'cocks': inf,
 'coprolagnia': inf,
 'coprophilia': inf,
 'cornhole': inf,
 'coon': inf,
 'coons': inf,
 'creampie': inf,
 'c

In [191]:
# Replace each message's tokens by its number of unigram bad-word matches.
bw_match_udf = df_count_matches(bw_1_grams)
bw_counter = (tokens
              .withColumn("tokens", bw_match_udf(func.col("tokens")))
              .withColumnRenamed('tokens', 'nb_bw_matches'))
bw_counter.show()

+-------+-----------+-------------+
|     id|created_utc|nb_bw_matches|
+-------+-----------+-------------+
|c595rma| 1341368093|          0.0|
|c595rqe| 1341368108|          0.0|
|c595rwc| 1341368128|          0.0|
|c595sdr| 1341368193|          0.0|
|c595sop| 1341368236|          0.0|
|c595sui| 1341368256|          0.0|
|c595t5u| 1341368301|          0.0|
|c595ti7| 1341368347|          0.0|
|c595u4r| 1341368440|          0.0|
|c595ule| 1341368505|          0.0|
|c595up4| 1341368519|          0.0|
|c595v6i| 1341368588|          0.0|
|c595w8b| 1341368743|          0.0|
|c595whq| 1341368782|          0.0|
|c595xbt| 1341368907|          0.0|
|c595yos| 1341369104|          0.0|
|c595ypd| 1341369107|          0.0|
|c595z4z| 1341369174|          0.0|
|c595zjd| 1341369231|          0.0|
|c595zrv| 1341369266|          0.0|
+-------+-----------+-------------+
only showing top 20 rows



## Hate speech

### Raw hate words (basic)

In [31]:
# NOTE(review): with header=True Spark turns the file's first line into the column name
# ("uncivilised',"), so that lexicon entry is presumably missing from the word list — confirm.
hate_words = spark.read.csv('../hatespeech_lexicon/hatebase_dict.csv', header=True)
# Entries presumably look like `'word',`: drop the first character and the last two.
strip_entry_udf = func.udf(lambda d: d[1:-2])
hate_words = (hate_words
              .withColumnRenamed("uncivilised',", 'hate_words')
              .withColumn('hate_words', strip_entry_udf(func.col('hate_words'))))
# Number of space-separated words in each entry (1 for unigrams, 2 for bigrams, ...).
hw_word_count_udf = func.udf(lambda gram: len(gram.split()), IntegerType())
hw_gram_rank = hate_words.withColumn('gram_rank', hw_word_count_udf(func.col('hate_words')))
hw_gram_rank.show()

+------------+---------+
|  hate_words|gram_rank|
+------------+---------+
|        gypo|        1|
|       gypos|        1|
|        cunt|        1|
|       cunts|        1|
|  peckerwood|        1|
| peckerwoods|        1|
|     raghead|        1|
|    ragheads|        1|
|     cripple|        1|
|    cripples|        1|
|      niggur|        1|
|     niggurs|        1|
| yellow bone|        2|
|yellow bones|        2|
|      muzzie|        1|
|     muzzies|        1|
|      niggar|        1|
|     niggars|        1|
|      nigger|        1|
|     niggers|        1|
+------------+---------+
only showing top 20 rows



In [32]:
# Unigram hate words, each mapped to an unlimited match count (np.inf).
hw_1_grams = dict.fromkeys(
    (row.hate_words for row in hw_gram_rank.filter('gram_rank == 1').select('hate_words').collect()),
    np.inf,
)
hw_1_grams

{'gypo': inf,
 'gypos': inf,
 'cunt': inf,
 'cunts': inf,
 'peckerwood': inf,
 'peckerwoods': inf,
 'raghead': inf,
 'ragheads': inf,
 'cripple': inf,
 'cripples': inf,
 'niggur': inf,
 'niggurs': inf,
 'muzzie': inf,
 'muzzies': inf,
 'niggar': inf,
 'niggars': inf,
 'nigger': inf,
 'niggers': inf,
 'greaseball': inf,
 'greaseballs': inf,
 'faggot': inf,
 'faggots': inf,
 'darkie': inf,
 'darkies': inf,
 'hoser': inf,
 'hosers': inf,
 'Jihadi': inf,
 'Jihadis': inf,
 'retard': inf,
 'retards': inf,
 'hillbilly': inf,
 'hillbillies': inf,
 'fag': inf,
 'fags': inf,
 'pikey': inf,
 'pikies': inf,
 'nicca': inf,
 'niccas': inf,
 'tranny': inf,
 'trannies': inf,
 'wigger': inf,
 'wiggers': inf,
 'wetback': inf,
 'wetbacks': inf,
 'nigglet': inf,
 'nigglets': inf,
 'wigga': inf,
 'wiggas': inf,
 'dhimmi': inf,
 'dhimmis': inf,
 'honkey': inf,
 'honkies': inf,
 'eurotrash': inf,
 'eurotrashes': inf,
 'yardie': inf,
 'yardies': inf,
 'niggah': inf,
 'niggahes': inf,
 'yokel': inf,
 'yokels':

In [194]:
# Replace each message's tokens by its number of unigram hate-word matches.
hw_match_udf = df_count_matches(hw_1_grams)
hw_counter = (tokens
              .withColumn("tokens", hw_match_udf(func.col("tokens")))
              .withColumnRenamed('tokens', 'nb_hw_matches'))
hw_counter.show()

+-------+-----------+-------------+
|     id|created_utc|nb_hw_matches|
+-------+-----------+-------------+
|c595rma| 1341368093|          0.0|
|c595rqe| 1341368108|          0.0|
|c595rwc| 1341368128|          0.0|
|c595sdr| 1341368193|          0.0|
|c595sop| 1341368236|          0.0|
|c595sui| 1341368256|          0.0|
|c595t5u| 1341368301|          0.0|
|c595ti7| 1341368347|          0.0|
|c595u4r| 1341368440|          0.0|
|c595ule| 1341368505|          1.0|
|c595up4| 1341368519|          0.0|
|c595v6i| 1341368588|          0.0|
|c595w8b| 1341368743|          0.0|
|c595whq| 1341368782|          0.0|
|c595xbt| 1341368907|          0.0|
|c595yos| 1341369104|          0.0|
|c595ypd| 1341369107|          0.0|
|c595z4z| 1341369174|          0.0|
|c595zjd| 1341369231|          0.0|
|c595zrv| 1341369266|          0.0|
+-------+-----------+-------------+
only showing top 20 rows



### Refined hate words

In [38]:
# Refined lexicon: one n-gram per row with its hate-speech intensity.
hw_ref_schema = StructType([
    StructField('hate_words_ref', StringType(), nullable=False),
    StructField('intensity', FloatType(), nullable=False),
])
hate_words_ref = spark.read.csv('../hatespeech_lexicon/refined_ngram_dict.csv', header=True, schema=hw_ref_schema)
# Number of space-separated words in each entry (1 for unigrams, 2 for bigrams, ...).
hw_ref_word_count_udf = func.udf(lambda gram: len(gram.split()), IntegerType())
hw_ref_gram_rank = hate_words_ref.withColumn('gram_rank', hw_ref_word_count_udf(func.col('hate_words_ref')))
hw_ref_gram_rank.show()

+--------------+---------+---------+
|hate_words_ref|intensity|gram_rank|
+--------------+---------+---------+
|   allah akbar|     0.87|        2|
|        blacks|    0.583|        1|
|         chink|    0.467|        1|
|        chinks|    0.542|        1|
|         dykes|    0.602|        1|
|        faggot|    0.489|        1|
|       faggots|    0.675|        1|
|          fags|    0.543|        1|
|          homo|    0.667|        1|
|        inbred|    0.583|        1|
|        nigger|    0.584|        1|
|       niggers|    0.672|        1|
|        queers|      0.5|        1|
|         raped|    0.717|        1|
|       savages|    0.778|        1|
|         slave|    0.667|        1|
|          spic|     0.75|        1|
|       wetback|    0.667|        1|
|      wetbacks|    0.688|        1|
|        whites|    0.556|        1|
+--------------+---------+---------+
only showing top 20 rows



In [35]:
# Collect the refined unigrams once and derive both lookups from the same rows, so the
# Spark job runs a single time and both dicts are guaranteed to share the same keys
# (count_matches looks up the intensity of every matched gram).
hw_ref_1_rows = hw_ref_gram_rank.filter('gram_rank == 1').select('hate_words_ref', 'intensity').collect()
# Unlimited match count (np.inf) per refined unigram.
hw_ref_1_grams = {row.hate_words_ref: np.inf for row in hw_ref_1_rows}
# Intensity weight per refined unigram.
hw_ref_1_intensity = {row.hate_words_ref: row.intensity for row in hw_ref_1_rows}
hw_ref_1_grams, hw_ref_1_intensity

({'blacks': inf,
  'chink': inf,
  'chinks': inf,
  'dykes': inf,
  'faggot': inf,
  'faggots': inf,
  'fags': inf,
  'homo': inf,
  'inbred': inf,
  'nigger': inf,
  'niggers': inf,
  'queers': inf,
  'raped': inf,
  'savages': inf,
  'slave': inf,
  'spic': inf,
  'wetback': inf,
  'wetbacks': inf,
  'whites': inf},
 {'blacks': 0.5830000042915344,
  'chink': 0.46700000762939453,
  'chinks': 0.5419999957084656,
  'dykes': 0.6019999980926514,
  'faggot': 0.48899999260902405,
  'faggots': 0.675000011920929,
  'fags': 0.5429999828338623,
  'homo': 0.6669999957084656,
  'inbred': 0.5830000042915344,
  'nigger': 0.5839999914169312,
  'niggers': 0.671999990940094,
  'queers': 0.5,
  'raped': 0.7170000076293945,
  'savages': 0.777999997138977,
  'slave': 0.6669999957084656,
  'spic': 0.75,
  'wetback': 0.6669999957084656,
  'wetbacks': 0.6880000233650208,
  'whites': 0.5559999942779541})

In [197]:
# Replace each message's tokens by its refined hate-word {count, intensity} map,
# then flatten the map into two columns.
hw_ref_match_udf = df_count_matches_intensity(hw_ref_1_grams, hw_ref_1_intensity)
hw_ref_counter = (tokens
                  .withColumn("tokens", hw_ref_match_udf(func.col("tokens")))
                  .withColumnRenamed('tokens', 'nb_hw_ref_matches'))
hw_ref_scores = hw_ref_counter.select(
    'id',
    'created_utc',
    func.col('nb_hw_ref_matches.intensity').alias('hate_ref_intensity'),
    func.col('nb_hw_ref_matches.count').alias('nb_hw_ref_matches'),
)
hw_ref_scores.show()

+-------+-----------+------------------+-----------------+
|     id|created_utc|hate_ref_intensity|nb_hw_ref_matches|
+-------+-----------+------------------+-----------------+
|c595rma| 1341368093|               0.0|              0.0|
|c595rqe| 1341368108|               0.0|              0.0|
|c595rwc| 1341368128|               0.0|              0.0|
|c595sdr| 1341368193|               0.0|              0.0|
|c595sop| 1341368236|               0.0|              0.0|
|c595sui| 1341368256|               0.0|              0.0|
|c595t5u| 1341368301|               0.0|              0.0|
|c595ti7| 1341368347|               0.0|              0.0|
|c595u4r| 1341368440|               0.0|              0.0|
|c595ule| 1341368505|               0.0|              0.0|
|c595up4| 1341368519|               0.0|              0.0|
|c595v6i| 1341368588|               0.0|              0.0|
|c595w8b| 1341368743|               0.0|              0.0|
|c595whq| 1341368782|               0.0|              0.

In [198]:
# Register the lexicon matchers under the SQL names used by the nlp_metrics query.
for lexicon, sql_name in ((bw_1_grams, 'bw_count_matches'), (hw_1_grams, 'hw_count_matches')):
    df_count_matches_sql(lexicon, sql_name)
df_count_matches_intensity_sql(hw_ref_1_grams, hw_ref_1_intensity, 'hw_ref_count_matches')

In [199]:
# All per-message NLP metrics in one frame: tokenize, score, then flatten the score maps.
nlp_metrics_df = (
    cleaned_messages
    .selectExpr('id', 'created_utc', 'body', 'process_body(body) as tokens')
    .selectExpr('id', 'created_utc', 'body',
                "compute_nltk_polarity(body) as nltk_scores",
                "compute_blob_polarity(body) as blob_scores",
                "bw_count_matches(tokens) as nb_bw_matches",
                "hw_count_matches(tokens) as nb_hw_matches",
                "hw_ref_count_matches(tokens) as hw_ref_matches")
    .selectExpr('id', 'created_utc', 'body',
                'nltk_scores.neg as nltk_negativity',
                'nltk_scores.neu as nltk_neutrality',
                'nltk_scores.pos as nltk_positivity',
                'blob_scores.polarity as text_blob_polarity',
                'blob_scores.subjectivity as text_blob_subjectivity',
                'nb_bw_matches',
                'nb_hw_matches',
                'hw_ref_matches.intensity as hw_ref_intensity',
                'hw_ref_matches.count as nb_hw_ref_matches')
)
nlp_metrics_df.show()

+-------+-----------+--------------------+---------------+---------------+---------------+------------------+----------------------+-------------+-------------+----------------+-----------------+
|     id|created_utc|                body|nltk_negativity|nltk_neutrality|nltk_positivity|text_blob_polarity|text_blob_subjectivity|nb_bw_matches|nb_hw_matches|hw_ref_intensity|nb_hw_ref_matches|
+-------+-----------+--------------------+---------------+---------------+---------------+------------------+----------------------+-------------+-------------+----------------+-----------------+
|c595rma| 1341368093|The fear that it'...|          0.065|          0.935|            0.0|      -0.041666668|                 0.425|          0.0|          0.0|             0.0|              0.0|
|c595rqe| 1341368108|             Upvote!|            0.0|            1.0|            0.0|               0.0|                   0.0|          0.0|          0.0|             0.0|              0.0|
|c595rwc| 1341368128

In [200]:
# Turn the Unix-epoch `created_utc` column into a date named `creation_date` (rendered in
# the Spark session's time zone) and expose the frame to Spark SQL as `nlp_metrics`.
# The guard keeps the cell re-runnable: after the first run `created_utc` no longer exists.
if 'created_utc' in nlp_metrics_df.columns:
    nlp_metrics_df = nlp_metrics_df.withColumn('created_utc', func.from_unixtime(nlp_metrics_df['created_utc'], 'yyyy-MM-dd HH:mm:ss.SS').cast(DateType())) \
                                   .withColumnRenamed('created_utc', 'creation_date')

# createOrReplaceTempView supersedes registerTempTable, deprecated since Spark 2.0.
nlp_metrics_df.createOrReplaceTempView("nlp_metrics")

In [None]:
# Local pandas copy of the metrics without the raw message text.
# NOTE(review): this collects every row to the driver; check memory on the full dataset.
nlp_pandas = nlp_metrics_df.select([c for c in nlp_metrics_df.columns if c != 'body']).toPandas()

In [71]:
# Daily totals of every NLP metric (inner query), smoothed by a centred rolling average
# of those daily totals (outer query).
# The RANGE frame on the DATE ordering covers creation_date - 30 to creation_date + 30
# (integer offsets on a date are days), i.e. 61 calendar days despite the `_60d_avg`
# suffix; dates without messages have no row at all.
# NOTE(review): it averages daily *sums*, so the values mix message volume with
# per-message scores — confirm that is the intended metric.
# NOTE(review): the window has no PARTITION BY, so Spark evaluates it in a single partition.
daily_nlp_metrics = spark.sql("""
SELECT
    creation_date,
    
    AVG(sum_nltk_negativity) OVER (
        ORDER BY creation_date
        RANGE BETWEEN 30 PRECEDING AND 30 FOLLOWING
    ) AS nltk_negativity_60d_avg,
    
    AVG(sum_nltk_neutrality) OVER (
        ORDER BY creation_date
        RANGE BETWEEN 30 PRECEDING AND 30 FOLLOWING
    ) AS nltk_neutrality_60d_avg,
    
    AVG(sum_nltk_positivity) OVER (
        ORDER BY creation_date
        RANGE BETWEEN 30 PRECEDING AND 30 FOLLOWING
    ) AS nltk_positivity_60d_avg,
    
    AVG(sum_text_blob_polarity) OVER (
        ORDER BY creation_date
        RANGE BETWEEN 30 PRECEDING AND 30 FOLLOWING
    ) AS text_blob_polarity_60d_avg,
    
    AVG(sum_text_blob_subjectivity) OVER (
        ORDER BY creation_date
        RANGE BETWEEN 30 PRECEDING AND 30 FOLLOWING
    ) AS text_blob_subjectivity_60d_avg,
    
    AVG(sum_nb_bw_matches) OVER (
        ORDER BY creation_date
        RANGE BETWEEN 30 PRECEDING AND 30 FOLLOWING
    ) AS nb_bw_matches_60d_avg,
    
    AVG(sum_nb_hw_matches) OVER (
        ORDER BY creation_date
        RANGE BETWEEN 30 PRECEDING AND 30 FOLLOWING
    ) AS nb_hw_matches_60d_avg,
    
    AVG(sum_hw_ref_intensity) OVER (
        ORDER BY creation_date
        RANGE BETWEEN 30 PRECEDING AND 30 FOLLOWING
    ) AS hw_ref_intensity_60d_avg,
    
    AVG(sum_nb_hw_ref_matches) OVER (
        ORDER BY creation_date
        RANGE BETWEEN 30 PRECEDING AND 30 FOLLOWING
    ) AS nb_hw_ref_matches_60d_avg
    
FROM (
    SELECT
        creation_date,
        SUM(nltk_negativity) AS sum_nltk_negativity,
        SUM(nltk_neutrality) AS sum_nltk_neutrality,
        SUM(nltk_positivity) AS sum_nltk_positivity,
        SUM(text_blob_polarity) AS sum_text_blob_polarity,
        SUM(text_blob_subjectivity) AS sum_text_blob_subjectivity, 
        SUM(nb_bw_matches) AS sum_nb_bw_matches,
        SUM(nb_hw_matches) AS sum_nb_hw_matches,
        SUM(hw_ref_intensity) AS sum_hw_ref_intensity,
        SUM(nb_hw_ref_matches) AS sum_nb_hw_ref_matches
    FROM nlp_metrics
    GROUP BY creation_date
    ORDER BY creation_date
)
""")

# ANALYSIS

In [169]:
# First and last message dates in the dataset (a single result row).
date_extrema = spark.sql("""
SELECT MIN(creation_date), MAX(creation_date)
FROM nlp_metrics
""").collect()
min_date, max_date = date_extrema[0]

print("The dataset starts on the {} and ends on the {}.".format(min_date, max_date))

The dataset starts on the 2005-12-12 and ends on the 2017-04-01.


In [170]:
# Number of messages in the dataset (a single one-column result row).
total_messages = spark.sql("""
SELECT COUNT(*)
FROM nlp_metrics
""").collect()

(tot_msg,) = total_messages[0]

print("The dataset contains {} messages".format(tot_msg))

The dataset contains 5757823 messages


In [171]:
# Lexicon match totals over the whole dataset (a single three-column result row).
total_no_respect = spark.sql("""
SELECT SUM(nb_bw_matches), SUM(nb_hw_matches), SUM(nb_hw_ref_matches)
FROM nlp_metrics
""").collect()

sum_bw, sum_hw, sum_ref_hw = total_no_respect[0]

print("Over all the messages, there are at least {} bad words, {} hate speech words, {} refined hate speech words".format(sum_bw, sum_hw, sum_ref_hw))

Over all the messages, there are at least 589284.0 bad words, 106483.0 hate speech words, 15617.0 refined hate speech words


In [None]:
# Sentiment score totals over the whole dataset (a single five-column result row).
total_polarity = spark.sql("""
SELECT SUM(nltk_negativity), SUM(nltk_neutrality), SUM(nltk_positivity), SUM(text_blob_polarity), SUM(text_blob_subjectivity)
FROM nlp_metrics
""").collect()

sum_nltk_neg, sum_nltk_neu, sum_nltk_pos, sum_txt_blob_pol, sum_txt_blob_subj = total_polarity[0]

# Print the sums just computed (the previous version referenced undefined nltk_* names).
print("The total pos/neu/neg score of the dataset is: negativity: {}, neutrality:{}, positivity: {}".format(sum_nltk_neg, sum_nltk_neu, sum_nltk_pos))
print("The total polarity and subjectivity score of the dataset is: polarity: {}, subjectivity: {}".format(sum_txt_blob_pol, sum_txt_blob_subj))