### Import the necessary libraries

In [0]:
import nltk
import string
from pyspark.sql.column import Column
from pyspark.sql import functions as F
from nltk.stem import WordNetLemmatizer
from pyspark.sql.functions import array_contains
from pyspark.ml.feature import Tokenizer, StopWordsRemover, StringIndexer
from pyspark.sql.types import StringType, ArrayType, StructType, StructField

### Configure NLTK Data Path

In [0]:
def setup_nltk_data_path():
    """Make sure '/dbfs/nltk_data' is on NLTK's data search path.

    The directory is appended to ``nltk.data.path`` only when it is not
    already listed, so calling this repeatedly never adds duplicates.
    """
    extra_data_dir = '/dbfs/nltk_data'

    # Already registered: nothing to do.
    if extra_data_dir in nltk.data.path:
        return

    nltk.data.path.append(extra_data_dir)

### Load and Filter Reddit Comments Data from CSV File

In [0]:
# Single-column schema: every CSV record is read as one nullable string field.
schema = StructType([StructField('comment', StringType(), True)])

# Read the raw comments, discard rows whose comment is null, then de-duplicate.
# NOTE(review): no header/quote/multiLine options are passed, so the CSV
# reader's defaults apply — confirm they match the layout of this file.
df = (
    spark.read.schema(schema)
    .csv('/mnt/2024-team20/reddit comments/politics_comments.csv')
    .where(F.col('comment').isNotNull())
    .distinct()
)

### Define Regular Expressions for Text Cleaning

In [0]:
def remove_pattern(col: Column, pattern: str) -> Column:
    """Return a column expression in which every match of *pattern* in *col* is deleted."""
    replacement = ""
    return F.regexp_replace(col, pattern, replacement)

# Regular expressions used by the text-cleaning step; each one is handed to
# regexp_replace and every match is replaced with the empty string.
url_pattern = r'http\S+|www\S+|https\S+'      # links starting with http / https / www
handle_pattern = r'@\w+'                      # @mentions
hashtag_pattern = r'#\w+'                     # #hashtags
numerical_pattern = r'\d+'                    # runs of digits
# Anchored with ^ and $: strips punctuation only at the very start and very end
# of the whole string, not around individual words.
punctuation_pattern = r'^[^\w\s]+|[^\w\s]+$'

# Character class of emoji / pictographic code points.
# The previous class contained the range U+24C2-U+1F251, which spans most of
# the Basic Multilingual Plane (CJK, Hangul, ...) and therefore deleted ordinary
# non-emoji text. It is replaced by the individual symbol blocks below; every
# code point listed here was also matched by the old class, so nothing that was
# previously kept is now removed.
emoji_pattern = (
    "["
    "\U0001F600-\U0001F64F"  # emoticons
    "\U0001F300-\U0001F5FF"  # miscellaneous symbols and pictographs
    "\U0001F680-\U0001F6FF"  # transport and map symbols
    "\U0001F700-\U0001F77F"  # alchemical symbols
    "\U0001F780-\U0001F7FF"  # geometric shapes extended
    "\U0001F800-\U0001F8FF"  # supplemental arrows-C
    "\U0001F900-\U0001F9FF"  # supplemental symbols and pictographs
    "\U0001FA70-\U0001FAFF"  # symbols and pictographs extended-A
    "\U00002702-\U000027B0"  # dingbats
    "\U000024C2"             # circled latin capital letter M
    "\U00002600-\U000026FF"  # miscellaneous symbols
    "\U00002B00-\U00002BFF"  # miscellaneous symbols and arrows
    "\U0000FE0F"             # variation selector-16 (emoji presentation)
    "\U0001F170-\U0001F251"  # enclosed alphanumeric / ideographic supplements
    "\U0001F1E0-\U0001F1FF"  # regional indicator symbols (flags)
    "]+"
)

### Clean Text Data by Removing URLs, Handles, Hashtags, and Other Patterns

In [0]:
# Seed 'clean_comments' with the raw text, then strip each pattern family from
# it in a fixed order (URLs, handles, hashtags, digits, edge punctuation, emoji).
df_clean = df.withColumn('clean_comments', F.col('comment'))

for _pattern in (
    url_pattern,
    handle_pattern,
    hashtag_pattern,
    numerical_pattern,
    punctuation_pattern,
    emoji_pattern,
):
    df_clean = df_clean.withColumn(
        'clean_comments',
        remove_pattern(F.col('clean_comments'), _pattern),
    )

### Filter Reddit Comments with Insufficient Word Count

In [0]:
# Keep only comments whose cleaned text splits on whitespace runs into more
# than three pieces.
whitespace_pieces = F.split(F.col('clean_comments'), '\\s+')
df_clean = df_clean.where(F.size(whitespace_pieces) > 3)

### Tokenize and Clean Reddit Comments Tokens

In [0]:
# Tokenize 'clean_comments' into the array column 'tokens'.
tokenizer = Tokenizer(inputCol='clean_comments', outputCol='tokens')

# 'tokens_cleaned' is 'tokens' with null and empty-string entries removed.
non_empty_tokens = F.expr("filter(tokens, x -> x IS NOT NULL AND x != '')")

df_tokens = tokenizer.transform(df_clean).withColumn('tokens_cleaned', non_empty_tokens)

### Remove Unnecessary Comments Produced by the Reddit Team

In [0]:
# Remove unnecessary comments produced by the reddit team: drop every row
# that has a "bot" token.
# Tokens at this stage can still carry attached punctuation (e.g. "bot,"),
# because the punctuation regex applied to the comment text is anchored to the
# start and end of the whole string only. An exact array_contains(..., "bot")
# match would let those rows through, so the pattern tolerates
# non-alphanumeric characters on either side of the word.
# Matching lowercase only assumes the Tokenizer stage lowercased the tokens.
bot_token_present = F.expr(
    "exists(tokens_cleaned, x -> x rlike '^[^a-z0-9]*bot[^a-z0-9]*$')"
)
df_tokens = df_tokens.filter(~bot_token_present)

### Remove Stop Words and Filter Empty Tokens

In [0]:
# Stop-word removal reads 'tokens_cleaned' and writes 'filtered_tokens'.
remover = StopWordsRemover(inputCol='tokens_cleaned', outputCol='filtered_tokens')

# Discard rows whose token array is empty once stop words are gone.
has_tokens_left = F.size(F.col('filtered_tokens')) > 0
df_filtered = remover.transform(df_tokens).where(has_tokens_left)

### Refine Tokens by Stripping Punctuation

In [0]:
def clean_array(arr):
    """Strip surrounding punctuation from each token and drop emptied tokens.

    Args:
        arr: Iterable of token strings, or None.

    Returns:
        A new list in which ``string.punctuation`` characters have been
        stripped from both ends of every token and tokens that became empty
        are omitted. Returns None when *arr* is None, so a null input yields
        a null output instead of raising inside the UDF.
    """
    if arr is None:
        return None

    stripped_tokens = (token.strip(string.punctuation) for token in arr)
    return [token for token in stripped_tokens if token]
  
# Wrap clean_array as a Spark UDF returning array<string>.
# F.udf is used because pyspark.sql.functions is imported only as F; a bare
# `udf` name is not imported anywhere in the visible code.
clean_array_udf = F.udf(clean_array, ArrayType(StringType()))

# Overwrite 'filtered_tokens' with its punctuation-stripped version.
df_filtered = df_filtered.withColumn('filtered_tokens', clean_array_udf(F.col('filtered_tokens')))

### Filter Comments to Retain Only Those with Sufficient Content

In [0]:
# Retain only comments that still have at least four tokens.
has_enough_tokens = F.size(F.col('filtered_tokens')) >= 4
df_filtered = df_filtered.where(has_enough_tokens)

### Apply Lemmatization to Standardize Token Forms

In [0]:
def lemmatize_words(words):
    """Return the WordNet lemma of every word in *words*, in the same order.

    The NLTK data path is set up on every call before the lemmatizer is
    created.
    """
    setup_nltk_data_path()

    lemmatize = WordNetLemmatizer().lemmatize
    return list(map(lemmatize, words))

# Wrap lemmatize_words as a Spark UDF returning array<string>.
# F.udf is used because pyspark.sql.functions is imported only as F; a bare
# `udf` name is not imported anywhere in the visible code.
lemmatize_words_udf = F.udf(lemmatize_words, ArrayType(StringType()))

# Add 'lemmatized_tokens', computed from 'filtered_tokens'.
df_lemmatized = df_filtered.withColumn('lemmatized_tokens', lemmatize_words_udf(F.col('filtered_tokens')))

### Filter Out Empty Token Lists After Lemmatization

In [0]:
# Drop rows whose lemmatized token array is empty.
has_lemmas = F.size(F.col('lemmatized_tokens')) > 0
df_lemmatized = df_lemmatized.where(has_lemmas)

### Remove Unnecessary Columns

In [0]:
# Intermediate columns created by the earlier steps; none of them is kept in
# the exported dataset.
columns_to_drop = [
    'clean_comments',
    'tokens',
    'tokens_cleaned',
    'filtered_tokens',
]

final_df = df_lemmatized.drop(*columns_to_drop)

### Export the Final Dataset to Parquet Format

In [0]:
# Write the result as Parquet, replacing any existing output at this path.
parquet_writer = final_df.write.mode('overwrite')
parquet_writer.parquet('/mnt/2024-team20/cleaned_reddit_comments_parquet')