# Reddit Mental Health Data Using Spark NLP

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[*]').appName('spark_ml').config("spark.driver.memory","8G").getOrCreate()

### Load the data

First, we load the data.

In [2]:
reddit = spark.read.options(delimiter=',', header=True, inferSchema=True, multiLine = True, escape = '\"').csv('mental_disorders_reddit.csv')

In [3]:
reddit.printSchema()

root
 |-- title: string (nullable = true)
 |-- selftext: string (nullable = true)
 |-- created_utc: integer (nullable = true)
 |-- over_18: boolean (nullable = true)
 |-- subreddit: string (nullable = true)



In [4]:
reddit.show(30)

+--------------------+--------------------+-----------+-------+---------+
|               title|            selftext|created_utc|over_18|subreddit|
+--------------------+--------------------+-----------+-------+---------+
|Life is so pointl...|Does anyone else ...| 1650356960|  false|      BPD|
|          Cold rage?|Hello fellow frie...| 1650356660|  false|      BPD|
|I don’t know who ...|My [F20] bf [M20]...| 1650355379|  false|      BPD|
|HELP! Opinions! A...|Okay, I’m about t...| 1650353430|  false|      BPD|
|                help|           [removed]| 1650350907|  false|      BPD|
|My ex got diagnos...|Without going int...| 1650350635|  false|      BPD|
|Is misdiagnosis o...|(Reposting here o...| 1650349446|  false|      BPD|
|I have trouble id...|I grew up mostly ...| 1650349125|  false|      BPD|
|     Needing advice…|I posted on this ...| 1650349094|  false|      BPD|
|      Do I have BPD?|           [removed]| 1650349072|   true|      BPD|
|How do you deal w...|If they were to 

In [5]:
from pyspark.sql.functions import regexp_replace

In [6]:
reddit = reddit.withColumn("subreddit", regexp_replace(reddit["subreddit"], "bipolar", "BPD"))

In [7]:
reddit = reddit.repartition(30)

In [8]:
import time
start_time = time.time()
print(reddit.count())
print("--- %s seconds ---" % (time.time() - start_time))

701787
--- 70.41638016700745 seconds ---


In [9]:
pip install nltk

Note: you may need to restart the kernel to use updated packages.


In [10]:
pip install textblob

Note: you may need to restart the kernel to use updated packages.


In [11]:
import nltk
import textblob
nltk.download('stopwords')

[nltk_data] Downloading package stopwords to /home/jovyan/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [12]:
import pyspark.sql.functions as F
from pyspark.sql.functions import col
from pyspark.sql.types import StringType
from pyspark.sql import SparkSession
from nltk.corpus import stopwords
from textblob import TextBlob
import matplotlib.pyplot as plt
from pyspark.sql.functions import col, udf
import re

In [13]:
# handle null values in clean_freetext column
def replace_null(x):
    if x is None:
        return ''
    else:
        return x

In [14]:
replace_null_udf = udf(replace_null, StringType())
reddit = reddit.withColumn('selftext', replace_null_udf(col('selftext')))

In [15]:
# remove stopwords from freetext column
stop_words = set(stopwords.words('english'))
filter_spam = F.udf(lambda x: " ".join([word for word in x.lower().split() if word not in stop_words]), StringType())
reddit = reddit.withColumn('clean_selftext', filter_spam(col('selftext')))

In [16]:
# remove stopwords from Title column
stop_words = set(stopwords.words('english'))
filter_spam = F.udf(lambda x: " ".join([word for word in x.lower().split() if word not in stop_words]), StringType())
reddit = reddit.withColumn('clean_title', filter_spam(col('title')))

In [17]:
reddit.show(10)

+--------------------+--------------------+-----------+-------+----------+--------------------+--------------------+
|               title|            selftext|created_utc|over_18| subreddit|      clean_selftext|         clean_title|
+--------------------+--------------------+-----------+-------+----------+--------------------+--------------------+
|Anxiety is random...|Hi everyone. I've...| 1628004889|  false|   Anxiety|hi everyone. i've...|anxiety randomly ...|
|              help ¿|hii I'm 16 (f)\nS...| 1649141333|  false|   Anxiety|hii i'm 16 (f) ne...|              help ¿|
|DAE track their m...|My therapist sugg...| 1542860454|  false|       BPD|therapist suggest...|     dae track mood?|
|Latuda is expensi...|So I have been on...| 1649687216|  false|       BPD|latuda years comp...|latuda expensive!...|
|I’m realizing how...|My perspective of...| 1648447783|  false|   Anxiety|perspective stutt...|i’m realizing oth...|
|Mental health com...|Some of us can’t ...| 1635742205|  false| 

In [18]:
#reddit = reddit.filter(F.col("clean_title").isNotNull())

### Word Counts of the 'SelfText' Column

In [19]:
import pyspark.sql.functions as fn
import time

start_time = time.time()

#Split the lines into words
words = reddit.select(fn.explode(fn.split(fn.concat_ws(" ", reddit.clean_selftext), ' ')).alias('word'))

#Generate word count
word_counts = words.groupBy('word').count()

# Sort by count in descending order and take the top 10
top10 = word_counts.orderBy('count', ascending=False).limit(10)

top10.show()

print("--- %s seconds ---" % (time.time() - start_time))

+------+------+
|  word| count|
+------+------+
|  like|720840|
|  feel|578344|
|   i'm|521733|
|   i’m|471377|
|  know|404265|
|   get|372578|
|  want|346615|
|really|313670|
|  even|308379|
|people|234724|
+------+------+

--- 82.79292392730713 seconds ---


### Word Counts of the 'Title' Column

In [20]:
from pyspark.sql import functions as fn
reddit = reddit.filter(fn.col("title").isNotNull())

In [21]:
reddit.count()

701741

In [22]:
import pyspark.sql.functions as fn
import time

start_time = time.time()

#Split the lines into words
words2 = reddit.select(fn.explode(fn.split(fn.concat_ws(" ", reddit.clean_title), ' ')).alias('word2'))

#Generate word count
word_counts2 = words2.groupBy('word2').count()

# Sort by count in descending order and take the top 10
top10_2 = word_counts2.orderBy('count', ascending=False).limit(10)

top10_2.show()

print("--- %s seconds ---" % (time.time() - start_time))

+-------+-----+
|  word2|count|
+-------+-----+
|   feel|46177|
|anxiety|45834|
|   like|37183|
| anyone|35617|
|    i'm|32209|
|    i’m|31356|
|    bpd|27480|
|    get|25525|
|   help|24560|
|   need|21865|
+-------+-----+

--- 71.45522737503052 seconds ---


### Sentiment Analysis of 'Self Text' grouped by Subreddit

In [23]:
!pip install pyspark
!pip install nltk
!pip install textblob

Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 kB[0m [31m3.6 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hInstalling collected packages: py4j
Successfully installed py4j-0.10.9.5


In [24]:
# calculate sentiment score of clean_freetext column
get_sentiment = F.udf(lambda x: TextBlob(x).sentiment.polarity, StringType())
reddit = reddit.withColumn('sentiment', get_sentiment(col('clean_selftext')))

In [25]:
reddit.printSchema()

root
 |-- title: string (nullable = true)
 |-- selftext: string (nullable = true)
 |-- created_utc: integer (nullable = true)
 |-- over_18: boolean (nullable = true)
 |-- subreddit: string (nullable = true)
 |-- clean_selftext: string (nullable = true)
 |-- clean_title: string (nullable = true)
 |-- sentiment: string (nullable = true)



In [26]:
# calculate sentiment score and label of clean_freetext column
def get_sentiment(x):
    if x:
        score = TextBlob(x).sentiment.polarity
        if score > 0:
            return 'Positive'
        elif score < 0:
            return 'Negative'
        else:
            return 'Neutral'
    else:
        return None

In [27]:
get_sentiment_udf = udf(get_sentiment, StringType())
reddit = reddit.withColumn('sentiment_category', get_sentiment_udf(col('clean_selftext')))

In [28]:
reddit.show(10)

+--------------------+--------------------+-----------+-------+----------+--------------------+--------------------+--------------------+------------------+
|               title|            selftext|created_utc|over_18| subreddit|      clean_selftext|         clean_title|           sentiment|sentiment_category|
+--------------------+--------------------+-----------+-------+----------+--------------------+--------------------+--------------------+------------------+
|Anxiety is random...|Hi everyone. I've...| 1628004889|  false|   Anxiety|hi everyone. i've...|anxiety randomly ...|0.020278679653679653|          Positive|
|              help ¿|hii I'm 16 (f)\nS...| 1649141333|  false|   Anxiety|hii i'm 16 (f) ne...|              help ¿|  0.2233977978543196|          Positive|
|DAE track their m...|My therapist sugg...| 1542860454|  false|       BPD|therapist suggest...|     dae track mood?| 0.16319444444444445|          Positive|
|Latuda is expensi...|So I have been on...| 1649687216|  f

In [29]:
# group by channel and sentiment and count the number of records
sentiment_count = reddit.groupBy('subreddit', 'sentiment_category').count()

# pivot the sentiment column and fill null values with 0
sentiment_count = sentiment_count.groupBy('subreddit').pivot('sentiment_category', ['Negative', 'Neutral', 'Positive']).sum('count').fillna(0)

In [30]:
sentiment_count.show()

+-------------+--------+-------+--------+
|    subreddit|Negative|Neutral|Positive|
+-------------+--------+-------+--------+
|          BPD|  101856|  47748|  130120|
|   depression|   57201|  41203|   58302|
|mentalillness|   17627|   8605|   18009|
|      Anxiety|   77670|  12857|   76491|
|schizophrenia|    4122|  10227|    5926|
+-------------+--------+-------+--------+



### Topic Extraction

In [54]:
from pyspark.sql.functions import explode, split, regexp_extract, col, lower

In [63]:
# Replace special characters and punctuation with spaces and change text to lowercase
reddit = reddit.withColumn("selftext_cleaned", lower(regexp_replace(col("selftext"), "[^a-zA-Z0-9\\s.,?]+", " ")))

In [64]:
# Define a regular expression pattern to match topics
pattern = r"(?i)\b(anxiety|depression|bipolar|schizophrenia|PTSD|OCD|ADHD|autism|panic|stress)\b"

In [65]:
# Split the selftext column by whitespace and explode the resulting array
exploded = reddit.select("subreddit", explode(split(col("selftext"), " ")).alias("word"))

In [66]:
# Extract topics using the regular expression pattern and group by subreddit and topic
topics = exploded.filter(regexp_extract(col("word"), pattern, 0) != "").groupBy("subreddit", "word").count()

In [67]:
# Rename the "word" column to "topic" and sort by subreddit and count
topics = topics.withColumnRenamed("word", "topic").orderBy("subreddit", "count", ascending=False)

In [68]:
# Show the resulting DataFrame
topics.show()

+-------------+--------------+-----+
|    subreddit|         topic|count|
+-------------+--------------+-----+
|schizophrenia| schizophrenia| 2062|
|schizophrenia|       anxiety|  594|
|schizophrenia|schizophrenia.|  568|
|schizophrenia|schizophrenia,|  338|
|schizophrenia|    depression|  302|
|schizophrenia|        stress|  266|
|schizophrenia|Schizophrenia”|  257|
|schizophrenia|       bipolar|  241|
|schizophrenia|         panic|  216|
|schizophrenia|schizophrenia?|  182|
|schizophrenia| Schizophrenia|  163|
|schizophrenia|      anxiety,|  136|
|schizophrenia|      anxiety.|  105|
|schizophrenia|   depression,|  100|
|schizophrenia|           OCD|   96|
|schizophrenia|           ocd|   95|
|schizophrenia|        autism|   83|
|schizophrenia|   depression.|   78|
|schizophrenia|          ADHD|   75|
|schizophrenia|          PTSD|   63|
+-------------+--------------+-----+
only showing top 20 rows

