In [1]:
from pyspark.sql import SparkSession
import sparknlp
from sparknlp.base import DocumentAssembler
from pyspark.ml import Pipeline
from sparknlp.annotator import *
from sparknlp.base import *
from pyspark.sql import types as T
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import IDF
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType
from pyspark.sql.types import ArrayType
from pyspark.sql.types import StringType
from pyspark.sql import functions as F
from sparknlp.base import Finisher
import itertools
from pyspark.sql.functions import col, when, least, greatest, lit

spark = SparkSession \
        .builder \
        .appName("network") \
        .getOrCreate()

24/12/10 14:04:54 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [9]:
data = spark.read.csv("../data/cleaned_moral_scores.csv", header= True).select('id', 'cleaned_text', 'emo_pos', 'emo_neg', 
                                                                          'emo_anx','emo_anger','emo_sad', 'moral')

In [4]:
data.show(10)

+-----+--------------------+-------+-------+-------+---------+-------+-----+
|   id|        cleaned_text|emo_pos|emo_neg|emo_anx|emo_anger|emo_sad|moral|
+-----+--------------------+-------+-------+-------+---------+-------+-----+
|hk5r2|i had an appointm...|    0.0|    0.0|    0.0|      0.0|    0.0|  0.0|
|iqimz|i created this si...|   2.56|    0.0|    0.0|      0.0|    0.0| 1.71|
|pfzt5|hello everyone  i...|   2.06|    0.0|    0.0|      0.0|    0.0| 0.52|
|pk714|i grew up with bo...|   1.71|    1.2|   0.34|      0.0|   0.51| 0.68|
|q0q8x|i have to ask whe...|   1.25|   1.61|   0.18|     0.18|    0.9| 0.18|
|q412v|nothing but oppor...|   1.05|   3.16|    0.0|      0.0|   3.16|  0.0|
|q5mqk|im getting out of...|   3.27|   1.96|   1.31|      0.0|    0.0|  0.0|
|q70xe|hey everyone firs...|    0.0|   1.96|    0.0|      0.0|    0.0|  0.0|
|q7mrn|facebook is great...|   0.96|    0.0|    0.0|      0.0|    0.0|  0.0|
|qcsyp|okay so im 18 yea...|   0.74|   0.74|    0.0|      0.0|    0.0|  0.0|

Data preprocessing

In [6]:
#Define stopwords
english = [
    "a", "about", "above", "after", "again", "against", "all", "am", "an", "and", "any", "are", "as", "at", "be", 
    "because", "been", "before", "being", "below", "between", "both", "but", "by", "can", "cannot", "could", "did", 
    "do", "does", "doing", "down", "during", "each", "few", "for", "from", "further", "had", "has", "have", "having", 
    "he", "her", "here", "hers", "herself", "him", "himself", "his", "how", "i", "if", "in", "into", "is", "it", 
    "its", "itself", "let", "me", "more", "most", "must", "my", "myself", "no", "nor", "not", "of", "off", "on", 
    "once", "only", "or", "other", "our", "ours", "ourselves", "out", "over", "own", "same", "she", "some", "such", 
    "than", "that", "the", "their", "theirs", "them", "themselves", "then", "there", "these", "they", "this", "those", 
    "through", "to", "too", "under", "until", "up", "very", "was", "we", "were", "what", "when", "where", "which", 
    "while", "who", "whom", "why", "with", "would", "you", "your", "yours", "yourself", "yourselves", "will", "ll", 
    "re", "ve", "d", "s", "m", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", 
    "s", "t", "u", "v", "w", "x", "y", "z", "many", "us", "ok", "hows", "ive", "ill", "im", "cant", "topics", "topic",
    "discuss", "thoughts", "yo", "thats", "whats", "lets", "nothing", "oh", "omg", 
         "things", "stuff", "yall", "haha", "yes", "no", "wo", "like", 'good', 
         'work', 'got', 'going', 'dont', 'really', 'want', 'make', 'think', 
         'know', 'feel', 'people', 'life', "getting", "lot" "great", "i", "me", 
         "my", "myself", "we", "our", "ours", "ourselves", 
        "you", "your", "yours", "yourself", "yourselves", "he", "him", "his", 
        "himself", "she", "her", "hers", "herself", "it", "its", "itself", 
        "they", "them", "their", "theirs","themselves", "what", "which", "who", 
        "whom", "this", "that", "these", "those", "am", "is", "are", "was", 
        "were", "be", "been", "being", "have", "has", "had", "having", "do", 
        "does", "did", "doing", "will", "would", "can", "could", "may",
        "might", "shall", "ought", "about", "above", "across", "after", 
        "against", "along", "amid", "among", "around", "as", "at", "before", "behind",
        "below", "beneath", "beside", "between", "beyond", "but", "by", 
        "considering", "despite", "down", "during", "except", "for",
        "from", "in", "inside", "into", "like", "near", "next", "notwithstanding",
        "of", "off", "on", "onto", "opposite", "out", "outside", "over", "past",
        "regarding", "round", "since", "than", "through", "throughout", "till", 
        "to", "toward", "towards", "under", "underneath", "unlike", "until", "up",
        "upon", "versus", "via", "with", "within", "without", "cant", "cannot", 
        "couldve", "couldnt", "didnt", "doesnt", "dont", "hadnt", "hasnt", 
        "havent", "hed", "hell", "hes", "howd", "howll", "hows", "id", "ill", 
        "im", "ive", "isnt", "itd", "itll", "its", "lets", "mightve", 
        "shant", "shed", "shell", "shes", 
        "thatll", "thats", "thered", "therell", "therere", "theres", "theyd", 
        "theyll", "theyre", "theyve", "wed", "well", "were", "weve", "werent", 
        "whatd", "whatll", "whatre", "whats", "whatve", "whend", "whenll", 
        "whens", "whered", "wherell", "wheres", "whichd", "whichll", "whichre", 
        "whichs", "whod", "wholl", "whore", "whos", "whove", "whyd", "whyll", 
        "whys", "wont", "wouldve", "wouldnt", "youd", "youll", "youre", "youve",
        "f", "m", "because", "go", "lot", "get", "still", "way", "something", "much",
        "thing", "someone", "person", "anything", "goes", "ok", "so", "just", "mostly", 
        "put", "also", "lots", "yet", "ha", "etc", "even", "one", "bye", "take", "wasnt"]

time = ["monday", "tuesday", "wednesday", "thursday", "friday", "saturday", 
        "sunday", "morning", "noon", "afternoon", "evening", "night", "midnight",
        "dawn", "dusk", "week", "weekend", "weekends","weekly", "today", 
        "yesterday", "tomorrow", "yesterdays", "todays", "mondays", "tuesdays",
        "wednesdays", "thursdays", "fridays", "saturdays", "sundays", "day",
        "everyday", "daily", "workday", 'time', 'month', 'year', 'pm', 'am', "ago",
        "year", "now"]

reddit = ["welcome", "hi", "hello", "sub", "reddit", "thanks", "thank", "maybe",
          "wo30", "mods", "mod", "moderators", "subreddit", "btw", "aw", "aww", 
          "aww", "hey", "hello", "join", "joined", "post", "rselfimprovement", "blah"]

topic_specific = ["self", "improvement", "change", "action",
    'change', 'start', 'goal', 'habit', 'new', 'old', 
    'care', 'world', 'everyone', 'love', 'u', 'right', 'mean', 'matter',
    'best', 'step', 'focus', 'hard', 'small',
    'bad', 'help', 'time', 'problem', 'issue', 'advice',
    'bit', 'experience', 'different',
    'point', 'situation', 'negative', 'control', 'positive',
    'use', 'question', 'idea', 'amp', 'medium', 'hour', 'day', 'minute',
    'aaaaloot', "selfimprovement", "_", "ampxb"]

stopwords = english + time + reddit + topic_specific

In [7]:
documentAssembler = DocumentAssembler()\
     .setInputCol("cleaned_text")\
     .setOutputCol('document')

tokenizer = Tokenizer() \
            .setInputCols(['document'])\
            .setOutputCol('tokenized')

normalizer = Normalizer() \
     .setInputCols(['tokenized']) \
     .setOutputCol('normalized')

lemmatizer = LemmatizerModel.load("../models/lemma_ewt_en_3.4.3_3.0_1651416655397/")\
      .setInputCols("normalized")\
      .setOutputCol("lemmatized")

stopwords_cleaner = StopWordsCleaner() \
     .setInputCols(['lemmatized']) \
     .setOutputCol('words') \
     .setStopWords(stopwords)

finisher = Finisher().setInputCols(['words'])

my_pipeline = Pipeline(
      stages = [
          documentAssembler,
          tokenizer,
          normalizer,
          lemmatizer,
          stopwords_cleaner,
          finisher
      ])

In [10]:
pipelineModel = my_pipeline.fit(data)
processed_data = pipelineModel.transform(data)
processed_data.persist()
processed_data.show(10)

[Stage 5:>                                                          (0 + 1) / 1]

+-----+--------------------+-------+-------+-------+---------+-------+-----+--------------------+
|   id|        cleaned_text|emo_pos|emo_neg|emo_anx|emo_anger|emo_sad|moral|      finished_words|
+-----+--------------------+-------+-------+-------+---------+-------+-----+--------------------+
|hk5r2|i had an appointm...|    0.0|    0.0|    0.0|      0.0|    0.0|  0.0|[appointment, den...|
|iqimz|i created this si...|   2.56|    0.0|    0.0|      0.0|    0.0| 1.71|[create, site, se...|
|pfzt5|hello everyone  i...|   2.06|    0.0|    0.0|      0.0|    0.0| 0.52|[recently, made, ...|
|pk714|i grew up with bo...|   1.71|    1.2|   0.34|      0.0|   0.51| 0.68|[grow, body, dysm...|
|q0q8x|i have to ask whe...|   1.25|   1.61|   0.18|     0.18|    0.9| 0.18|[content, never, ...|
|q412v|nothing but oppor...|   1.05|   3.16|    0.0|      0.0|   3.16|  0.0|[butt, opportunit...|
|q5mqk|im getting out of...|   3.27|   1.96|   1.31|      0.0|    0.0|  0.0|[comfort, zone, t...|
|q70xe|hey everyone 

                                                                                

In [11]:
#Apply TF-IDF filtering
tfizer = CountVectorizer(inputCol='finished_words', outputCol='tf_features', minDF=0.01, vocabSize=1000)
tf_model = tfizer.fit(processed_data)
tf_result = tf_model.transform(processed_data)
vocabulary = tf_model.vocabulary


idfizer = IDF(inputCol='tf_features', outputCol='tf_idf_features')
idf_model = idfizer.fit(tf_result)
tfidf_result = idf_model.transform(tf_result)

processed_data.unpersist()
tfidf_result.persist()

                                                                                

DataFrame[id: string, cleaned_text: string, emo_pos: string, emo_neg: string, emo_anx: string, emo_anger: string, emo_sad: string, moral: string, finished_words: array<string>, tf_features: vector, tf_idf_features: vector]

In [12]:
# Define a function to filter words by their TF-IDF score
# UDF to map indices to words using the vocabulary
def filter_tfidf(features, threshold=1, vocabulary=None):
    if features is not None:
        # Filter based on TF-IDF score and map indices to actual words
        return [vocabulary[features.indices[i]] for i in range(len(features.values)) if features.values[i] >= threshold]
    return []

# Register the UDF
filter_udf = udf(lambda features: filter_tfidf(features, threshold=1, vocabulary=vocabulary), ArrayType(StringType()))

# Apply the filtering function
df_filtered_tfidf = tfidf_result.withColumn("filtered_words_tfidf", filter_udf("tf_idf_features"))

df_filtered_tfidf.show()
tfidf_result.unpersist()

[Stage 13:>                                                         (0 + 1) / 1]

+-----+--------------------+-------+-------+-------+---------+-------+-----+--------------------+--------------------+--------------------+--------------------+
|   id|        cleaned_text|emo_pos|emo_neg|emo_anx|emo_anger|emo_sad|moral|      finished_words|         tf_features|     tf_idf_features|filtered_words_tfidf|
+-----+--------------------+-------+-------+-------+---------+-------+-----+--------------------+--------------------+--------------------+--------------------+
|hk5r2|i had an appointm...|    0.0|    0.0|    0.0|      0.0|    0.0|  0.0|[appointment, den...|(805,[13,37,44,77...|(805,[13,37,44,77...|[never, happen, s...|
|iqimz|i created this si...|   2.56|    0.0|    0.0|      0.0|    0.0| 1.71|[create, site, se...|(805,[0,3,58,135,...|(805,[0,3,58,135,...|[find, hope, futu...|
|pfzt5|hello everyone  i...|   2.06|    0.0|    0.0|      0.0|    0.0| 0.52|[recently, made, ...|(805,[6,9,11,19,2...|(805,[6,9,11,19,2...|[look, learn, kee...|
|pk714|i grew up with bo...|   1.7

                                                                                

DataFrame[id: string, cleaned_text: string, emo_pos: string, emo_neg: string, emo_anx: string, emo_anger: string, emo_sad: string, moral: string, finished_words: array<string>, tf_features: vector, tf_idf_features: vector]

Generate pairs of words that co-occur on the same documents

In [13]:
def generate_edges(tokens):
    return [list(pair) for pair in itertools.combinations(tokens, 2)]

generate_edges_udf = udf(generate_edges, ArrayType(ArrayType(StringType())))

In [14]:
df_edges = df_filtered_tfidf.withColumn("edges", generate_edges_udf(F.col("filtered_words_tfidf")))
df_edges.show(10)

[Stage 14:>                                                         (0 + 1) / 1]

+-----+--------------------+-------+-------+-------+---------+-------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|   id|        cleaned_text|emo_pos|emo_neg|emo_anx|emo_anger|emo_sad|moral|      finished_words|         tf_features|     tf_idf_features|filtered_words_tfidf|               edges|
+-----+--------------------+-------+-------+-------+---------+-------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|hk5r2|i had an appointm...|    0.0|    0.0|    0.0|      0.0|    0.0|  0.0|[appointment, den...|(805,[13,37,44,77...|(805,[13,37,44,77...|[never, happen, s...|[[never, happen],...|
|iqimz|i created this si...|   2.56|    0.0|    0.0|      0.0|    0.0| 1.71|[create, site, se...|(805,[0,3,58,135,...|(805,[0,3,58,135,...|[find, hope, futu...|[[find, hope], [f...|
|pfzt5|hello everyone  i...|   2.06|    0.0|    0.0|      0.0|    0.0| 0.52|[recently, mad

                                                                                

In [15]:
df_explode = df_edges.select(
    F.col("id"),
    F.explode(F.col("edges")).alias("edge"), F.col('emo_pos'),
    F.col('emo_neg'), F.col('emo_anx'), F.col('emo_anger'), 
    F.col('emo_sad'), F.col('moral'))

df_explode.show(10)

[Stage 15:>                                                         (0 + 1) / 1]

+-----+-------------------+-------+-------+-------+---------+-------+-----+
|   id|               edge|emo_pos|emo_neg|emo_anx|emo_anger|emo_sad|moral|
+-----+-------------------+-------+-------+-------+---------+-------+-----+
|hk5r2|    [never, happen]|    0.0|    0.0|    0.0|      0.0|    0.0|  0.0|
|hk5r2|      [never, sure]|    0.0|    0.0|    0.0|      0.0|    0.0|  0.0|
|hk5r2|      [never, last]|    0.0|    0.0|    0.0|      0.0|    0.0|  0.0|
|hk5r2|       [never, two]|    0.0|    0.0|    0.0|      0.0|    0.0|  0.0|
|hk5r2|      [never, call]|    0.0|    0.0|    0.0|      0.0|    0.0|  0.0|
|hk5r2|[never, completely]|    0.0|    0.0|    0.0|      0.0|    0.0|  0.0|
|hk5r2|     [never, phone]|    0.0|    0.0|    0.0|      0.0|    0.0|  0.0|
|hk5r2|    [never, forget]|    0.0|    0.0|    0.0|      0.0|    0.0|  0.0|
|hk5r2|     [never, smoke]|    0.0|    0.0|    0.0|      0.0|    0.0|  0.0|
|hk5r2|     [never, three]|    0.0|    0.0|    0.0|      0.0|    0.0|  0.0|
+-----+-----

                                                                                

Create edges df

In [16]:
edges_df = df_explode.select(
    F.col("edge")[0].alias("node1"),
    F.col("edge")[1].alias("node2"))

edges_df = edges_df.withColumn("weight", lit(1))

In [17]:
# Normalize the pairs: ensure node1 is always less than node2, so they can be always on the same order
edges_df = edges_df.withColumn("node1_norm", least(col("node1"), col("node2"))) \
             .withColumn("node2_norm", greatest(col("node1"), col("node2"))) \
             .select('node1_norm', 'node2_norm', 'weight')

In [18]:
edges_df = edges_df.groupBy("node1_norm", "node2_norm").sum("weight") \
                        .withColumnRenamed("sum(weight)", "weight")

Create nodes df

In [19]:
nodes_df = df_explode.select(
    F.col("id"),
    F.explode(F.col("edge")).alias("node"), F.col('emo_pos'),
    F.col('emo_neg'), F.col('emo_anx'), F.col('emo_anger'), 
    F.col('emo_sad'), F.col('moral'))

nodes_df.show(20) 

nodes_df_g = nodes_df.groupBy("id", "node").agg(
    F.first('emo_pos').alias('emo_pos'),
    F.first('emo_neg').alias('emo_neg'),
    F.first('emo_anx').alias('emo_anx'),
    F.first('emo_anger').alias('emo_anger'),
    F.first('emo_sad').alias('emo_sad'),
    F.first('moral').alias('moral'))

nodes_df_g.show(20)

                                                                                

+-----+----------+-------+-------+-------+---------+-------+-----+
|   id|      node|emo_pos|emo_neg|emo_anx|emo_anger|emo_sad|moral|
+-----+----------+-------+-------+-------+---------+-------+-----+
|hk5r2|     never|    0.0|    0.0|    0.0|      0.0|    0.0|  0.0|
|hk5r2|    happen|    0.0|    0.0|    0.0|      0.0|    0.0|  0.0|
|hk5r2|     never|    0.0|    0.0|    0.0|      0.0|    0.0|  0.0|
|hk5r2|      sure|    0.0|    0.0|    0.0|      0.0|    0.0|  0.0|
|hk5r2|     never|    0.0|    0.0|    0.0|      0.0|    0.0|  0.0|
|hk5r2|      last|    0.0|    0.0|    0.0|      0.0|    0.0|  0.0|
|hk5r2|     never|    0.0|    0.0|    0.0|      0.0|    0.0|  0.0|
|hk5r2|       two|    0.0|    0.0|    0.0|      0.0|    0.0|  0.0|
|hk5r2|     never|    0.0|    0.0|    0.0|      0.0|    0.0|  0.0|
|hk5r2|      call|    0.0|    0.0|    0.0|      0.0|    0.0|  0.0|
|hk5r2|     never|    0.0|    0.0|    0.0|      0.0|    0.0|  0.0|
|hk5r2|completely|    0.0|    0.0|    0.0|      0.0|    0.0|  



+-------+----------+-------+-------+-------+---------+-------+-----+
|     id|      node|emo_pos|emo_neg|emo_anx|emo_anger|emo_sad|moral|
+-------+----------+-------+-------+-------+---------+-------+-----+
|1001497|     first|   0.53|   4.79|   2.13|     0.53|    1.6|  0.0|
|1001497|    friend|   0.53|   4.79|   2.13|     0.53|    1.6|  0.0|
|10018jv|   attract|   1.36|   1.82|    0.0|     1.82|    0.0| 0.45|
|1001diz|      rule|   0.78|   3.13|   1.56|      0.0|   0.78|  0.0|
|1001mlo|       two|    1.0|   1.34|   0.67|     0.33|    0.0| 0.67|
|1001vtv|  continue|    2.1|    2.1|    0.0|      1.4|    0.0|  0.7|
|10021rc|    effect|   1.01|   1.01|    0.0|      0.0|    0.0|  0.0|
|10023zp|     crazy|   1.14|    0.0|    0.0|      0.0|    0.0| 0.57|
|100268m|      club|   0.93|   2.78|   0.93|      0.0|   0.93|  0.0|
|100268m|      room|   0.93|   2.78|   0.93|      0.0|   0.93|  0.0|
|1002gvr|      true|   5.46|   1.09|    0.0|     0.55|    0.0| 1.09|
|1002lda|     grade|   0.82|   2.8

                                                                                

Now, aggregate all words and average their emotions and morality scores for all documents in which they appear

In [20]:
final_nodes = nodes_df_g.groupBy("node").agg(
    F.avg('emo_pos').alias('emo_pos'),
    F.avg('emo_neg').alias('emo_neg'),
    F.avg('emo_anx').alias('emo_anx'),
    F.avg('emo_anger').alias('emo_anger'),
    F.avg('emo_sad').alias('emo_sad'),
    F.avg('moral').alias('moral'))

In [22]:
final_nodes.sort(final_nodes.moral.desc()).show(50)



+--------------+------------------+------------------+-------------------+-------------------+-------------------+-------------------+
|          node|           emo_pos|           emo_neg|            emo_anx|          emo_anger|            emo_sad|              moral|
+--------------+------------------+------------------+-------------------+-------------------+-------------------+-------------------+
|       deserve|1.5325340000000003|          1.209236|0.22293399999999997| 0.2511029999999999|           0.215441| 1.3315949999999999|
|         shame|1.0600765843385027|2.2937583764120237|  0.322152019911928|0.23373348650201034|0.27635075627034267|  1.323130384836301|
|         blame|0.9059750435287285| 1.304735925710969|0.24512768427161924| 0.2946967498549042|0.22843441671503195| 1.3038073128264656|
|         judge|1.0985308109304128| 1.176126783936276|0.38571412766898994|0.20677066047129108|0.15007854851200353| 1.3013209425821444|
|         fault|0.9347229891404379|1.3532210565065341|0

                                                                                