In [24]:
from dask.diagnostics import ProgressBar
from dask.distributed import Client
from nltk.corpus import stopwords
from nltk.sentiment import SentimentIntensityAnalyzer
from nltk.stem import WordNetLemmatizer, PorterStemmer

import dask.dataframe as dd
import html
import psutil
import pandas as pd
import pickle as pkl
import re
import time

In [25]:
# Read the data
df_comments1 = pd.read_pickle('./pickle_dataframes/comments1.pkl')
df_comments2 = pd.read_pickle('./pickle_dataframes/comments2.pkl')
df_comments = pd.concat([df_comments1,df_comments2])
df_comments.reset_index(drop=True, inplace=True)

df_posts1 = pd.read_pickle('./pickle_dataframes/posts1.pkl')
df_posts2 = pd.read_pickle('./pickle_dataframes/posts2.pkl')
df_posts3 = pd.read_pickle('./pickle_dataframes/posts3.pkl')
df_posts = pd.concat([df_posts1, df_posts2, df_posts3])
df_posts.reset_index(drop=True, inplace=True)

df_postlinks = pd.read_pickle('./pickle_dataframes/posts_links.pkl')
df_tags = pd.read_pickle('./pickle_dataframes/tags.pkl')
df_users = pd.read_pickle('./pickle_dataframes/users.pkl')

### Optionally filter only active users

### Preprocess text

In [26]:
# Modify preprocess_text function
def preprocess_text(text, remove_stopwords=False, use_lemmatize=True, use_stemmer=False):
    # Decode HTML entities
    text = html.unescape(text)

    # Remove HTML tags
    text = re.sub(r'<[^>]+>', '', text)

    # Remove non-alphanumeric characters and convert to lowercase
    text = re.sub(r'[^a-zA-Z0-9]', ' ', text.lower())

    words = text.split()
    if remove_stopwords:
        words = [word for word in words if word not in stopwords.words('english')]
    if use_lemmatize:
        lemmatizer = WordNetLemmatizer()
        words = [lemmatizer.lemmatize(word) for word in words]
    elif use_stemmer:  # Apply stemming only if use_stemmer is True
        stemmer = PorterStemmer()
        words = [stemmer.stem(word) for word in words]

    text = ' '.join(words)
    
    return text

In [None]:
# Preprocess Title, Body, and Tags using Dask DataFrames
# Convert the DFs to Dask DataFrames
ddf_comments = dd.from_pandas(df_comments, npartitions=8)  # adjust the number of partitions as needed
ddf_posts = dd.from_pandas(df_posts, npartitions=8)  # adjust the number of partitions as needed

In [None]:
# Preprocess Columns
ddf_comments['Text'] = ddf_comments['Text'].map_partitions(lambda x: x.apply(lambda y: preprocess_text(y, remove_stopwords=True, use_lemmatize=True, use_stemmer=False)))
ddf_comments.to_pickle('df_comments_processed.pkl')

In [None]:
# Preprocess Posts
ddf_posts['Title'] = ddf_posts['Title'].map_partitions(lambda x: x.apply(lambda y: preprocess_text(y, remove_stopwords=True, use_lemmatize=True, use_stemmer=False)))
ddf_posts['Body'] = ddf_posts['Body'].map_partitions(lambda x: x.apply(lambda y: preprocess_text(y, remove_stopwords=True, use_lemmatize=True, use_stemmer=False)))
ddf_posts.to_pickle('df_posts_processed.pkl')

### Estimate memory usage

In [8]:
# Initialize SentimentIntensityAnalyzer once
sia = SentimentIntensityAnalyzer()

# Function to apply sentiment analysis
def analyze_sentiment(text):
    # Check if the text is missing or NaN, return 0.0 in such cases
    if pd.isna(text):
        return 0.0
    # Ensure the text is encoded as a string
    text = str(text)
    return sia.polarity_scores(text)['compound']

In [9]:
# def process_data(df, npartitions=None):
#     start_time = time.time()
#     
#     # If npartitions is not specified, default to 1 (suitable for sequential processing)
#     if npartitions is None:
#         npartitions = 1
# 
#     # Convert DataFrame to Dask DataFrame
#     ddf = dd.from_pandas(df, npartitions=npartitions)
#     # Apply sentiment analysis
#     ddf['sentiment'] = ddf['Body'].map(analyze_sentiment, meta=('Body', 'float64'))
# 
#     # Compute result and monitor memory usage
#     result = ddf.compute()
#     memory_usage = psutil.virtual_memory()
#     
#     end_time = time.time()
#     return result, end_time - start_time, memory_usage.used

# Sequential processing (no parallelism)
# print("Running sequentially...")
# seq_result, seq_time, _ = process_data(df_posts)
# print(f"Sequential processing time: {seq_time} seconds")
# 
# # Parallel processing with multiple cores
# core_counts = [2, 4, 6, 7, 8, 10]
# for cores in core_counts:
#     print(f"Running with {cores} cores...")
#     with Client(n_workers=cores, threads_per_worker=2) as client:  # Adjust threads_per_worker as needed
#         _, parallel_time, mem_usage = process_data(df_posts, npartitions=cores)
#         efficiency = seq_time / (cores * parallel_time)
#         print(f"Time with {cores} cores: {parallel_time} seconds, Efficiency: {efficiency}, Memory used: {mem_usage} bytes")

### Sentiment Analysis Time

In [10]:
# Convert pandas DataFrame to Dask DataFrame
df_comments_dask = dd.from_pandas(df_comments, npartitions=8)  # Adjust npartitions based on memory usage results
df_posts_dask = dd.from_pandas(df_posts, npartitions=8)  # Adjust npartitions based on memory usage results

# Apply sentiment analysis
df_comments_dask['sentiment'] = df_comments_dask['Text'].map(analyze_sentiment)
df_posts_dask['body_sentiment'] = df_posts_dask['Body'].map(analyze_sentiment)
df_posts_dask['title_sentiment'] = df_posts_dask['Title'].map(analyze_sentiment)

# Compute the results with progress bar
with ProgressBar():
    df_comments_result = ddf_comments.compute()
    df_posts_result = ddf_posts.compute()

df_comments_result.to_pickle('df_comments_result.pkl')
df_posts_result.to_pickle('df_posts_result.pkl')



[########################################] | 100% Completed | 43.63 ss
[########################################] | 100% Completed | 12.50 ss


### Save Results