In [1]:
from dask.diagnostics import ProgressBar
from dask.distributed import Client
from nltk.corpus import stopwords
from nltk.sentiment import SentimentIntensityAnalyzer
from nltk.stem import WordNetLemmatizer, PorterStemmer

import dask.dataframe as dd
import html
import psutil
import pandas as pd
import pickle as pkl
import re
import time

In [2]:
# Read the data
df_comments = pd.read_pickle('./pickle_dataframes/comments_typecasted.pkl')
df_posts = pd.read_pickle('./pickle_dataframes/posts_typecasted.pkl')

df_postlinks = pd.read_pickle('./pickle_dataframes/post_links_typecasted.pkl')
df_tags = pd.read_pickle('./pickle_dataframes/tags_typecasted.pkl')
df_users = pd.read_pickle('./pickle_dataframes/users_typecasted.pkl')

### Filter active users (>50 posts/comments)

In [3]:
# Filtering posts to include only questions
questions_df = df_posts[df_posts['PostTypeId'] == 1]

In [4]:
# Removing entries with -1 in UserId and OwnerUserId columns
df_comments = df_comments[df_comments['UserId'] != -1]
df_posts = df_posts[df_posts['OwnerUserId'] != -1]

In [None]:
# Calculating post and comment counts for each user
user_posts_count = df_posts.groupby('OwnerUserId').size().rename('PostCount')
user_comments_count = df_comments.groupby('UserId').size().rename('CommentCount')

In [None]:
# Merging counts with user data and filling missing values
user_data = df_users.merge(user_posts_count, left_on='Id', right_index=True, how='left') \
                    .merge(user_comments_count, left_on='Id', right_index=True, how='left') \
                    .fillna({'PostCount': 0, 'CommentCount': 0})

In [None]:
# Adding a column for total activity and filtering for active users
active_users = user_data.assign(TotalActivity=lambda x: x['PostCount'] + x['CommentCount'])
active_users = active_users[active_users['TotalActivity'] > 50]

In [None]:
# Creating a set of active user IDs
active_user_ids = set(active_users['Id'])

# Filtering dataframes for active user activity
filtered_questions_df = questions_df[questions_df['OwnerUserId'].isin(active_user_ids)]

# Combining filters for comments related to active users
active_user_post_ids = set(df_posts[df_posts['OwnerUserId'].isin(active_user_ids)]['Id'])

# Filtering for answers by active users
active_user_answers = df_posts[(df_posts['PostTypeId'] == 2) & (df_posts['OwnerUserId'].isin(active_user_ids))]

# Getting ParentIds of answers by active users
active_user_answer_parent_ids = set(active_user_answers['ParentId'])

In [None]:
# Adjusting the logic for filtered comments
filtered_comments = df_comments[(df_comments['UserId'].isin(active_user_ids)) |
                                (df_comments['PostId'].isin(active_user_post_ids)) |
                                (df_comments['PostId'].isin(active_user_answer_parent_ids))].drop_duplicates()


In [None]:
# Printing sizes for comparison
print("Questions DataFrame size:", questions_df.size)
print("Filtered Questions DataFrame size:", filtered_questions_df.size, '\n')

print("Comments DataFrame size:", df_comments.size)
print("Filtered Comments DataFrame size:", filtered_comments.size, '\n')

### Preprocess text

In [11]:
# Modify preprocess_text function
def preprocess_text(text, remove_stopwords=False, use_lemmatize=True, use_stemmer=False):
    # Handle None or non-string inputs
    if not isinstance(text, str):
        return ""
        
    # Decode HTML entities
    text = html.unescape(text)

    # Remove HTML tags
    text = re.sub(r'<[^>]+>', '', text)

    # Remove non-alphanumeric characters and convert to lowercase
    text = re.sub(r'[^a-zA-Z0-9]', ' ', text.lower())

    words = text.split()
    if remove_stopwords:
        words = [word for word in words if word not in stopwords.words('english')]
    if use_lemmatize:
        lemmatizer = WordNetLemmatizer()
        words = [lemmatizer.lemmatize(word) for word in words]
    elif use_stemmer:  # Apply stemming only if use_stemmer is True
        stemmer = PorterStemmer()
        words = [stemmer.stem(word) for word in words]

    text = ' '.join(words)
    
    return text

In [12]:
# Preprocess Title, Body, and Tags using Dask DataFrames
# Convert the DFs to Dask DataFrames
ddf_comments = dd.from_pandas(filtered_questions_df, npartitions=8)
ddf_posts = dd.from_pandas(filtered_questions_df, npartitions=8)

In [14]:
# Preprocess Columns
ddf_comments['Text_Processed'] = ddf_comments['Text'].map_partitions(lambda x: x.apply(lambda y: preprocess_text(y, remove_stopwords=True, use_lemmatize=True, use_stemmer=False)))

# Convert back to pandas DataFrame and save as pickle
df_comments_processed = ddf_comments.compute()
df_comments_processed.to_pickle('df_comments_processed.pkl')

In [18]:
# Preprocess Posts
ddf_posts['Title_Processed'] = ddf_posts['Title'].map_partitions(lambda x: x.apply(lambda y: preprocess_text(y, remove_stopwords=True, use_lemmatize=True, use_stemmer=False)))
ddf_posts['Body_Processed'] = ddf_posts['Body'].map_partitions(lambda x: x.apply(lambda y: preprocess_text(y, remove_stopwords=True, use_lemmatize=True, use_stemmer=False)))

In [19]:
# Convert back to pandas DataFrame and save as pickle
df_posts_processed = ddf_posts.compute()
df_posts_processed.to_pickle('df_posts_processed.pkl')

### Estimate memory usage

In [20]:
# Initialize SentimentIntensityAnalyzer once
sia = SentimentIntensityAnalyzer()

# Function to apply sentiment analysis
def analyze_sentiment(text):
    # Check if the text is missing or NaN, return 0.0 in such cases
    if pd.isna(text):
        return 0.0
    # Ensure the text is encoded as a string
    text = str(text)
    return sia.polarity_scores(text)['compound']

In [21]:
# def process_data(df, npartitions=None):
#     start_time = time.time()
#     
#     # If npartitions is not specified, default to 1 (suitable for sequential processing)
#     if npartitions is None:
#         npartitions = 1
# 
#     # Convert DataFrame to Dask DataFrame
#     ddf = dd.from_pandas(df, npartitions=npartitions)
#     # Apply sentiment analysis
#     ddf['sentiment'] = ddf['Body'].map(analyze_sentiment, meta=('Body', 'float64'))
# 
#     # Compute result and monitor memory usage
#     result = ddf.compute()
#     memory_usage = psutil.virtual_memory()
#     
#     end_time = time.time()
#     return result, end_time - start_time, memory_usage.used

# Sequential processing (no parallelism)
# print("Running sequentially...")
# seq_result, seq_time, _ = process_data(df_posts)
# print(f"Sequential processing time: {seq_time} seconds")
# 
# # Parallel processing with multiple cores
# core_counts = [2, 4, 6, 7, 8, 10]
# for cores in core_counts:
#     print(f"Running with {cores} cores...")
#     with Client(n_workers=cores, threads_per_worker=2) as client:  # Adjust threads_per_worker as needed
#         _, parallel_time, mem_usage = process_data(df_posts, npartitions=cores)
#         efficiency = seq_time / (cores * parallel_time)
#         print(f"Time with {cores} cores: {parallel_time} seconds, Efficiency: {efficiency}, Memory used: {mem_usage} bytes")

### Sentiment Analysis Time

In [22]:
# Convert pandas DataFrame to Dask DataFrame
df_comments_dask = dd.from_pandas(df_comments_processed, npartitions=8)  # Adjust npartitions based on memory usage results
df_posts_dask = dd.from_pandas(df_posts_processed, npartitions=8)  # Adjust npartitions based on memory usage results

# Apply sentiment analysis
df_comments_dask['Text_sentiment'] = df_comments_dask['Text'].map(analyze_sentiment)

In [23]:
df_posts_dask['body_sentiment'] = df_posts_dask['Body'].map(analyze_sentiment)

In [24]:
df_posts_dask['title_sentiment'] = df_posts_dask['Title'].map(analyze_sentiment)

In [25]:
# Compute the results with progress bar
with ProgressBar():
    df_comments_result = ddf_comments.compute()

[########################################] | 100% Completed | 27m 22s


In [26]:
with ProgressBar():
    df_posts_result = ddf_posts.compute()

[########################################] | 100% Completed | 21m 39s


In [27]:
df_comments_result.to_pickle('df_comments_result.pkl')
df_posts_result.to_pickle('df_posts_result.pkl')

### Save Results