# NLP

In [2]:
# Setup - Run only once per Kernel App
%conda install https://anaconda.org/conda-forge/openjdk/11.0.1/download/linux-64/openjdk-11.0.1-hacce0ff_1021.tar.bz2

# install PySpark
!pip install sagemaker_pyspark
%pip install pyspark==3.4.0

# restart kernel
from IPython.core.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")

Retrieving notices: ...working... done

Downloading and Extracting Packages:
                                                                                

Downloading and Extracting Packages:

Preparing transaction: done
Verifying transaction: done
Executing transaction: done

Note: you may need to restart the kernel to use updated packages.
Collecting sagemaker_pyspark
  Using cached sagemaker_pyspark-1.4.5-py3-none-any.whl
Collecting pyspark==3.3.0 (from sagemaker_pyspark)
  Using cached pyspark-3.3.0-py2.py3-none-any.whl
Collecting py4j==0.10.9.5 (from pyspark==3.3.0->sagemaker_pyspark)
  Using cached py4j-0.10.9.5-py2.py3-none-any.whl.metadata (1.5 kB)
Using cached py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
Installing collected packages: py4j, pyspark, sagemaker_pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0 sagemaker_pyspark-1.4.5
[0mCollecting pyspark==3.4.0
  Using cached pyspark-3.4.0-py2.py3-none-any.whl
Collecting py4j==0.10.9.7 (from pyspark==3.4.0)
  Using

In [3]:
import sagemaker

# Look up the default S3 bucket of the current SageMaker session; every S3
# path built from `bucket` below points into it.
sess = sagemaker.Session()
bucket = sess.default_bucket()
print(f"the default SageMaker region specific bucket for this account is {bucket}")

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
the default SageMaker region specific bucket for this account is sagemaker-us-east-1-165729782536


In [4]:
from pyspark.sql import SparkSession

In [5]:
# Local Spark session. The extra packages add Spark NLP and the hadoop-aws
# connector (needed for s3a:// paths); credentials come from the container.
spark = (
    SparkSession.builder
    .appName("sagemaker-spark")
    .master("local[*]")
    .config("spark.driver.memory", "8G")
    .config("spark.driver.maxResultSize", "0")
    .config("spark.kryoserializer.buffer.max", "2000M")
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:5.1.3,org.apache.hadoop:hadoop-aws:3.2.2")
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "com.amazonaws.auth.ContainerCredentialsProvider")
    .getOrCreate()
)

print(spark.version)

:: loading settings :: url = jar:file:/opt/conda/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
com.johnsnowlabs.nlp#spark-nlp_2.12 added as a dependency
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-f449963c-4c35-4ced-af4e-14203e0e4c85;1.0
	confs: [default]
	found com.johnsnowlabs.nlp#spark-nlp_2.12;5.1.3 in central
	found com.typesafe#config;1.4.2 in central
	found org.rocksdb#rocksdbjni;6.29.5 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.828 in central
	found com.github.universal-automata#liblevenshtein;3.0.0 in central
	found com.google.protobuf#protobuf-java-util;3.0.0-beta-3 in central
	found com.google.protobuf#protobuf-java;3.0.0-beta-3 in central
	found com.google.code.gson#gson;2.3 in central
	found it.unimi.dsi#fastutil;7.0.12 in central
	found org.projectlombok#lombok;1.16.8 in central
	found com.google.cloud#google-cloud-storage;2.20.1 in central
	found com.google.guava#guava;31.1-jre in c

3.4.0


In [6]:
import sagemaker
session = sagemaker.Session()
bucket = session.default_bucket()

# All yearly partitions of the submissions dataset
output_prefix_data_submissions = "project/submissions/yyyy=*"
s3_path = f"s3a://{bucket}/{output_prefix_data_submissions}"
print(f"reading submissions from {s3_path}")

# Parquet files carry their own schema, so no `header` option is passed
# (that option only applies to the CSV reader).
posts = spark.read.parquet(s3_path)

# Keep only the two news subreddits analysed in this notebook
posts = posts.filter(posts.subreddit.isin("news", "worldnews"))


reading submissions from s3a://sagemaker-us-east-1-165729782536/project/submissions/yyyy=*


24/05/01 16:50:44 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
24/05/01 16:50:52 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [7]:
# Read all yearly partitions of the comments dataset
output_prefix_data_comments = "project/comments/yyyy=*"
s3_path = f"s3a://{bucket}/{output_prefix_data_comments}"

# Parquet files carry their own schema, so no `header` option is passed
# (that option only applies to the CSV reader).
comments = spark.read.parquet(s3_path)

# Keep only the two news subreddits analysed in this notebook
comments = comments.filter(comments.subreddit.isin("news", "worldnews"))

In [8]:
#import packages
import pyspark.sql.functions as f
from sklearn.decomposition import LatentDirichletAllocation
from sklearn.feature_extraction.text import TfidfVectorizer
import nltk
import pandas as pd

## Adding variables to comments

In [9]:
## Clean the comments body content
def clean_text(df):
    """Return `df` with its `body` column normalised for keyword matching.

    The text is lower-cased, every character that is not a letter, digit or
    whitespace is removed, and leading/trailing spaces are trimmed.
    """
    lowered = f.lower(f.col("body"))
    alnum_only = f.regexp_replace(lowered, "[^a-zA-Z0-9\\s]", "")
    return df.withColumn("body", f.trim(alnum_only))


# Apply the cleaning function
comments = clean_text(comments)

In [10]:
# Keywords that mark a comment as alleging misinformation. Besides the correct
# spellings, likely typos (e.g. "propoganda") are included on purpose.
misinfo_pattern = r'fake news|bullshit|misinfo|clickbait|unreliable|propoganda|propaganda|fraud|deceptive|fabricated|deep state|wake up|truth about'
matches_keyword = f.col('body').rlike(misinfo_pattern)

# Boolean flag: True when the cleaned body contains any of the keywords
comments = comments.withColumn('misinfo_class', f.when(matches_keyword, True).otherwise(False))

## Part 1 LDA

### run topic modeling

In [None]:
from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml import Pipeline
from nltk.corpus import stopwords
from pyspark.ml.feature import CountVectorizer , IDF
from pyspark.ml.clustering import LDA
import pyspark.sql.functions as f
from pyspark.sql.types import StringType, ArrayType, FloatType
from itertools import chain

In [None]:
#create small df to use for LDA
small_df = posts.select('title', 'id')

In [None]:
# Tokenizer stage: splits each post title into the `words` column
tokenizer = Tokenizer(inputCol="title", outputCol="words")
tokenizer

In [None]:
# Stop-word removal stage
StopWords = stopwords.words("english")

# Extra tokens to drop: stop words from other languages, source handles,
# punctuation tokens and other very common words.
# (A missing comma previously fused '–' and '&amp;' into one bogus token.)
additional = ['@reuters:', '–', '&amp;', '@ap:', 'rt', ':', 'از', 'آهنگ', 'دانلود', 'در', 'به', 'جدید', '@apentertainment:',
             '|', 'के', 'में', 'و', 'في', 'من', '@bbcworld:', 'de', 'la', 'di', 'की', 'से', 'bio', 'many','know', 'age', 'says', 'one',
             'net', 'user]', '[deleted', 'look']
StopWords = StopWords + additional

# Reads the tokenizer output (`words`) and writes the kept tokens to `filtered`
remover = StopWordsRemover(stopWords=StopWords)
remover.setInputCol("words")
remover.setOutputCol("filtered")

In [None]:
#count vectorizer
cv = CountVectorizer(inputCol="filtered", outputCol="raw_features", vocabSize=5000, minDF=25)
# IDF
idf = IDF(inputCol="raw_features", outputCol="features")

In [None]:
# LDA stage: 8 topics, 10 iterations, fixed seed for reproducible topics
lda = LDA(k=8, maxIter=10, seed=13)
lda

In [None]:
pipeline = Pipeline(stages=[tokenizer, remover, cv, idf, lda])

In [None]:
model = pipeline.fit(small_df)

In [None]:
topics = model.stages[-1].describeTopics()
terms = model.stages[-3].vocabulary

In [None]:
def indices_to_terms(indices, terms=terms):
    """Translate vocabulary indices into their terms.

    `terms` defaults to the CountVectorizer vocabulary captured when this
    function is defined.
    """
    return [terms[i] for i in indices]


# Spark UDF wrapper so the lookup can run on the `termIndices` column
udf_indices_to_terms = f.udf(indices_to_terms, ArrayType(StringType()))

# Attach the readable terms to every topic description
topics = topics.withColumn("terms", udf_indices_to_terms(f.col("termIndices")))

In [None]:
#topics.take(20)

In [None]:
#naming topics 
topic_dict = {0: 'economics/russia&ukraine', 1: 'presidental news', 2: 'supreme court/law', 3: 'global politics', 4: 'us politics', 
              5: 'covid/russia&ukraine', 6: 'crime/protest', 7: 'tv shows'}
              

In [None]:
small_df.cache()

In [None]:
small_transform = model.transform(small_df)

In [None]:
small_df.unpersist()


In [None]:
#map to topics
mapping_expr = f.create_map([f.lit(x) for x in chain(*topic_dict.items())])

In [None]:
# UDF returning the index of the most probable topic for a document.
# It returns an int so the value has the same type as the integer keys of
# `mapping_expr` instead of relying on an implicit float -> int cast.
max_topic = f.udf(lambda v: int(v.argmax()), "int")

# Use the map and the UDF to create a readable `topic` column per post id
topic = (
    small_transform
    .withColumn('topic_num', max_topic("topicDistribution"))
    .withColumn("topic", mapping_expr[f.col("topic_num")])
    .select('id', 'topic')
)

In [None]:
mini_posts = posts.select('created_utc', 'title', 'id')

In [None]:
#merging relevant columns wuth the topic column
merged_df = mini_posts.join(topic, 'id')

In [None]:
# Confirm the joined columns (method name was misspelled as `prinSchema`)
merged_df.printSchema()

### read in saved topic

In [11]:
# Load the previously saved topic-labelled data instead of re-running LDA.
# NOTE(review): the bucket name is hard-coded and differs from the session's
# default `bucket` used elsewhere in this notebook — presumably the file lives
# in another account's bucket; confirm read access.
# NOTE(review): this reuses the name `output_prefix_data_submissions` for a
# different prefix than the submissions one.
output_prefix_data_submissions = "project/clean_topic_data.parquet"
s3_path = f"s3a://sagemaker-us-east-1-562166416351/{output_prefix_data_submissions}"
topic_df = spark.read.parquet(s3_path, header=True)
topic_df.printSchema()

root
 |-- id: string (nullable = true)
 |-- created_utc: timestamp (nullable = true)
 |-- title: string (nullable = true)
 |-- topic: string (nullable = true)
 |-- comment_created: timestamp (nullable = true)
 |-- body: string (nullable = true)
 |-- misinfo_class: boolean (nullable = true)
 |-- comment_id: string (nullable = true)



In [12]:


# Keep only id, title, topic and comment_id. The comment-level fields are
# dropped here because they are re-attached from `comments` in the join below.
# (The post timestamp was previously renamed and then dropped right away; it
# is now simply dropped.)
topic_df = topic_df.drop('body', 'created_utc', 'misinfo_class', 'comment_created')

# Show the updated schema to confirm changes
topic_df.printSchema()

root
 |-- id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- topic: string (nullable = true)
 |-- comment_id: string (nullable = true)



In [13]:
# Select the comment fields needed downstream, copy the timestamp and the
# comment's own id into explicitly named columns, then overwrite `id` with the
# parent post id (the part of `link_id` after the "t3_" prefix).
mini_comments = (
    comments
    .select('created_utc', 'body', 'misinfo_class', 'link_id', 'id')
    .withColumn('comment_created', f.col('created_utc'))
    .withColumn('comment_id', f.col('id'))
    .withColumn('id', f.regexp_extract('link_id', 't3_(.*)$', 1))
)
mini_comments.printSchema()

root
 |-- created_utc: timestamp (nullable = true)
 |-- body: string (nullable = true)
 |-- misinfo_class: boolean (nullable = false)
 |-- link_id: string (nullable = true)
 |-- id: string (nullable = true)
 |-- comment_created: timestamp (nullable = true)
 |-- comment_id: string (nullable = true)



In [14]:
#merging dataframes
total_df = topic_df.join(mini_comments, ['comment_id', 'id'])

In [15]:
total_df.printSchema()

root
 |-- comment_id: string (nullable = true)
 |-- id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- topic: string (nullable = true)
 |-- created_utc: timestamp (nullable = true)
 |-- body: string (nullable = true)
 |-- misinfo_class: boolean (nullable = false)
 |-- link_id: string (nullable = true)
 |-- comment_created: timestamp (nullable = true)



### Counting by topic

In [None]:
topic_misinfo_counts = total_df.filter(f.col('misinfo_class') == True).groupBy('topic').count().toPandas()

In [None]:
# Total number of flagged comments across all topics. Only the numeric
# `count` column is summed; summing the whole frame would also try to "add up"
# the string `topic` column.
topic_misinfo_counts['count'].sum()

In [None]:
topic_misinfo_counts.to_csv('../data/csv/topic_misinfo_true_count.csv', index = False)

In [None]:
topic_counts = total_df.groupBy('topic').count().toPandas()

In [None]:
topic_counts

In [None]:
topic_counts.to_csv('../data/csv/topic_counts.csv', index = False)

In [None]:
topic_misinfo_total = total_df.groupBy(['topic', 'misinfo_class']).count().toPandas()

In [None]:
topic_misinfo_total

In [None]:
topic_misinfo_total.to_csv('../data/csv/topic_misinfo_total.csv', index = False)

## Part 2 Sentiment Analysis

In [17]:
!pip install vaderSentiment textblob
# reference for VADER:
# https://medium.com/@tom.bailey.courses/sentiment-analysis-in-snowflake-using-python-31d7296abe1a
# https://github.com/cjhutto/vaderSentiment

Collecting vaderSentiment
  Using cached vaderSentiment-3.3.2-py2.py3-none-any.whl.metadata (572 bytes)
Collecting textblob
  Using cached textblob-0.18.0.post0-py3-none-any.whl.metadata (4.5 kB)
Using cached vaderSentiment-3.3.2-py2.py3-none-any.whl (125 kB)
Using cached textblob-0.18.0.post0-py3-none-any.whl (626 kB)
Installing collected packages: vaderSentiment, textblob
Successfully installed textblob-0.18.0.post0 vaderSentiment-3.3.2
[0m

In [18]:
total_df.printSchema()

root
 |-- comment_id: string (nullable = true)
 |-- id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- topic: string (nullable = true)
 |-- created_utc: timestamp (nullable = true)
 |-- body: string (nullable = true)
 |-- misinfo_class: boolean (nullable = false)
 |-- link_id: string (nullable = true)
 |-- comment_created: timestamp (nullable = true)



### VADER and TextBlob

In [19]:
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
import pyspark.sql.types as T
from textblob import TextBlob


In [20]:
# Use two libraries comparing the sentiment result
# Shared analyzer instance, created on first use so that it is not rebuilt
# for every single row the UDF processes.
_VADER_ANALYZER = None


def vader_sentiment(text):
    """Return the VADER compound sentiment score of `text`.

    Returns None when `text` is None, so null comment bodies yield a null
    score instead of being passed to the analyzer.
    """
    global _VADER_ANALYZER
    if text is None:
        return None
    if _VADER_ANALYZER is None:
        _VADER_ANALYZER = SentimentIntensityAnalyzer()
    return _VADER_ANALYZER.polarity_scores(text)['compound']


def textblob_sentiment(text):
    """Return the TextBlob polarity score of `text`.

    Returns None when `text` is None; TextBlob only accepts strings, so a null
    comment body would otherwise raise inside the UDF.
    """
    if text is None:
        return None
    return TextBlob(text).sentiment.polarity

In [21]:
# UDF
vader_udf = f.udf(vader_sentiment, T.FloatType())
textblob_udf = f.udf(textblob_sentiment, T.FloatType())

In [22]:
# Add vader score to total df
total_df = total_df.withColumn("vader_score", vader_udf(f.col("body")))

# Add textblob score to total df
total_df = total_df.withColumn("textblob_score", textblob_udf(f.col("body")))


### output the result

In [None]:
total_df.printSchema()

In [None]:
# Add a year column
total_df = total_df.withColumn("year", f.year("comment_created"))

In [None]:
# Get distinct years
year_list = [2023, 2021] 

In [None]:
def save_parquet_by_year(df, year, bucket):
    """Write the rows of `df` for one `year` to S3 as Parquet and return the path.

    NOTE(review): the original cell held only the truncated stub `def save_par`
    (a syntax error). This completion mirrors the per-year save loop in this
    notebook; the function name and signature are a guess — confirm intent.

    Parameters:
        df: DataFrame with a `year` column.
        year: year value to select.
        bucket: name of the target S3 bucket.
    """
    output_path = f"s3a://{bucket}/project/output/year={year}"
    # The year is encoded in the path, so the column itself is not stored
    df.filter(df.year == year).drop("year").write.mode('overwrite').parquet(output_path)
    return output_path

In [None]:
# Save each requested year into its own `year=<y>` folder on S3
for y in year_list:
    print(f"--- start saving {y} ------")

    # Target folder for this particular year
    output_path = f"s3a://{bucket}/project/output/year={y}"

    # Rows of this year only; the year is already encoded in the path, so the
    # column itself is dropped before writing
    df_year = total_df.filter(total_df.year == y)
    df_year.drop("year").write.mode('overwrite').parquet(output_path)

    print(f"{y}'s data now saved in {output_path} ------")


In [None]:
total_df.count()

In [None]:
comments.count()

In [None]:
output_path = f"s3a://{bucket}/project/output/total.parquet"

# Write the DataFrame to Parquet on S3
total_df.write.mode('overwrite').parquet(output_path)
# total_df.save(path=output_path, source='parquet', mode='overwrite')

### Comparison

In [None]:
total_df.cache()

In [None]:
# display a sample to manually inspect differences
total_df.select("body", "vader_score", "textblob_score").show()


In [None]:
total_df.unpersist()

In [None]:
total_df.describe(['vader_score', 'textblob_score']).show()


In [None]:
# Compute the correlation between the two sentiment scores.
# (`total_df` has no `sentiment_score` column; the VADER column is `vader_score`.)
total_df.stat.corr("vader_score", "textblob_score")


### Top topics whose comments allege fake news

In [None]:
misinfo_comments = total_df.filter((f.col("vader_score") < 0) & (f.col("misinfo_class") == True))

In [None]:
misinfo_comments.cache()

In [None]:
# Aggregate the negative, misinformation-flagged comments by topic
# (grouping by the continuous `vader_score` would give one row per score value)
misinfo_comments_count = misinfo_comments.groupBy("topic").count().toPandas()


In [None]:
# save to csv file
misinfo_comments_count.to_csv('../data/csv/misinfo_comments_count.csv', index = False)

In [None]:
misinfo_comments.printSchema()

In [None]:
# Per topic: number of misinformation-flagged comments and the share of them
# with a negative VADER score.
# The base must be *all* flagged comments: `misinfo_comments` is already
# restricted to vader_score < 0, so computing the share on it always gives 100%.
misinfo_flagged = (
    total_df
    .filter(f.col("misinfo_class"))
    .withColumn("vader_neg", (f.col("vader_score") < 0).cast("int"))
)

# Group by 'topic' to get the total and the negative counts
neg_comments_count = misinfo_flagged.groupBy("topic").agg(
    f.count("comment_id").alias("total_comments"),
    f.sum("vader_neg").alias("negative_vader_count"),
)

# Calculate the percentage of negative VADER scores
neg_comments_count = neg_comments_count.withColumn(
    "percentage_neg_vader",
    (f.col("negative_vader_count") / f.col("total_comments")) * 100,
)

neg_comments_count.show()

In [None]:
misinfo_comments.cache()

In [None]:
# Share of strongly negative comments (VADER score below -0.8) per topic,
# among the negative misinformation-flagged comments in `misinfo_comments`.
# (`f` is already imported earlier in this notebook, so it is not re-imported.)
misinfo_comments = misinfo_comments.withColumn(
    "vader_below_neg_0_8", (f.col("vader_score") < -0.8).cast("int")
)

# Group by 'topic' to get the total comments and the strongly negative count
neg_comments_count = misinfo_comments.groupBy("topic").agg(
    f.count("comment_id").alias("total_comments"),
    f.sum("vader_below_neg_0_8").alias("count_below_neg_0_8"),
)

# Percentage of comments below the -0.8 threshold
neg_comments_count = neg_comments_count.withColumn(
    "percentage_below_neg_0_8",
    (f.col("count_below_neg_0_8") / f.col("total_comments")) * 100,
)

# Display the results
neg_comments_count.show()


In [None]:
# save to csv file
neg_comments_count_pd = neg_comments_count.toPandas()
neg_comments_count_pd.to_csv('../data/csv/neg_comments_count.csv', header=True, index=False)


In [None]:
misinfo_comments.unpersist()

## Part 3

#### read data

In [None]:
# Epidemiology CSV in the project bucket
s3_path = f"s3a://{bucket}/project/covid_data/epidemiology.csv"

# First row is the header; column types are inferred from the data
epi_data = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(s3_path)
)

# Show the DataFrame to verify the content
epi_data.show()


In [None]:
# Google search trends CSV in the project bucket
file = "google-search-trends.csv"
s3_path = f"s3a://{bucket}/project/covid_data/{file}"

# First row is the header; column types are inferred from the data
search_data = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(s3_path)
)

# Show a sample and the inferred schema to verify the content
search_data.show()
search_data.printSchema()

In [None]:
# Global vaccination search insights CSV in the project bucket
file = "Global_vaccination_search_insights.csv"
s3_path = f"s3a://{bucket}/project/covid_data/{file}"

# First row is the header; column types are inferred from the data
vaccine_search = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(s3_path)
)

# Check the inferred schema
vaccine_search.printSchema()

In [None]:
# Vaccinations CSV in the project bucket
s3_path = f"s3a://{bucket}/project/covid_data/vaccinations.csv"

# First row is the header; column types are inferred from the data
vac_data = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(s3_path)
)

# Check the inferred schema
vac_data.printSchema()


#### functions

In [23]:
def add_time_columns(df, date_col='date'):
    """Return `df` with `year`, `month` and `week` columns derived from `date_col`.

    NOTE(review): `week` comes from `weekofyear`, which Spark documents as the
    ISO 8601 week number, while `year` is the calendar year. Dates in the first
    days of January can therefore carry week 52/53 together with the new year —
    confirm this is acceptable for the weekly groupings.
    """
    with_year = df.withColumn("year", f.year(date_col))
    with_month = with_year.withColumn("month", f.month(date_col))
    return with_month.withColumn("week", f.weekofyear(date_col))

In [24]:
def group_by_weekly(df, cols):
    """Sum `cols` per (`year`, `week`).

    Parameters:
        df: DataFrame that already has `year` and `week` columns.
        cols: names of the columns to sum.

    Returns:
        One row per (year, week) with a `sum_<col>` column for every entry of
        `cols`, ordered by year and week.
    """
    sums = [f.sum(c).alias(f"sum_{c}") for c in cols]
    return df.groupBy("year", "week").agg(*sums).orderBy("year", "week")

In [25]:
def group_by_monthly(df, cols):
    """Sum `cols` per (`year`, `month`).

    Parameters:
        df: DataFrame that already has `year` and `month` columns.
        cols: names of the columns to sum.

    Returns:
        One row per (year, month) with a `sum_<col>` column for every entry of
        `cols`, ordered by year and month.
    """
    sums = [f.sum(c).alias(f"sum_{c}") for c in cols]
    return df.groupBy("year", "month").agg(*sums).orderBy("year", "month")


In [26]:
def write_df_to_csv(df, fname, out_dir='../data/csv'):
    """Collect `df` to pandas and write it to `<out_dir>/<fname>.csv`.

    Parameters:
        df: object exposing `toPandas()` (e.g. a Spark DataFrame). The whole
            frame is collected, so it should be small.
        fname: file name without the `.csv` extension.
        out_dir: target directory; created if it does not exist yet.
    """
    import os

    # Create the directory up front so a fresh checkout does not fail on write
    os.makedirs(out_dir, exist_ok=True)
    output_path = os.path.join(out_dir, f'{fname}.csv')
    df.toPandas().to_csv(output_path, index=False)

#### preprocess

In [None]:
# Extract the month from the date and group by it
epi_data = add_time_columns(epi_data)

epi_agg_cols =  ["new_confirmed", "new_deceased"]
monthly_epi_global = group_by_monthly(epi_data, epi_agg_cols)
weekly_epi_global = group_by_weekly(epi_data, epi_agg_cols)


In [None]:
# Extract the month from the date and group by it
vac_data = add_time_columns(vac_data)

vac_agg_cols =  ["new_persons_vaccinated", "new_persons_fully_vaccinated", "new_vaccine_doses_administered"]
monthly_vac_global = group_by_monthly(vac_data, vac_agg_cols)
weekly_vac_global = group_by_weekly(vac_data, vac_agg_cols)


In [None]:
write_df_to_csv(weekly_vac_global, "weekly_vac_global")
write_df_to_csv(monthly_vac_global, "monthly_vac_global")
write_df_to_csv(weekly_epi_global, "weekly_epi_global")
write_df_to_csv(monthly_epi_global, "monthly_epi_global")

In [None]:
weekly_covid = weekly_epi_global.join(weekly_vac_global, ["year", "week"], 'outer')
monthly_covid = monthly_epi_global.join(monthly_vac_global, ["year", "month"], 'outer')
weekly_covid.show(5)

In [None]:
write_df_to_csv(weekly_covid, "weekly_covid")
write_df_to_csv(monthly_covid, "monthly_covid")

#### agg total

In [27]:
total_df.cache()

total_df = total_df.withColumnRenamed("created_utc", "time_code")

total_df.printSchema()

root
 |-- comment_id: string (nullable = true)
 |-- id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- topic: string (nullable = true)
 |-- time_code: timestamp (nullable = true)
 |-- body: string (nullable = true)
 |-- misinfo_class: boolean (nullable = false)
 |-- link_id: string (nullable = true)
 |-- comment_created: timestamp (nullable = true)
 |-- vader_score: float (nullable = true)
 |-- textblob_score: float (nullable = true)



In [28]:
total_df = total_df.drop("link_id")
total_df = add_time_columns(total_df, "time_code")


In [29]:
# String keys "YYYY/MM" and "YYYY/WW" (zero-padded) used for grouping
year_str = f.col("year").cast("string")
total_df = (
    total_df
    .withColumn("year_month", f.concat(year_str, f.lit("/"), f.format_string("%02d", "month")))
    .withColumn("year_week", f.concat(year_str, f.lit("/"), f.format_string("%02d", "week")))
)

total_df.printSchema()

root
 |-- comment_id: string (nullable = true)
 |-- id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- topic: string (nullable = true)
 |-- time_code: timestamp (nullable = true)
 |-- body: string (nullable = true)
 |-- misinfo_class: boolean (nullable = false)
 |-- comment_created: timestamp (nullable = true)
 |-- vader_score: float (nullable = true)
 |-- textblob_score: float (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- year_month: string (nullable = true)
 |-- year_week: string (nullable = true)



In [30]:
total_df = total_df.repartition("year_month")
total_df.cache()

DataFrame[comment_id: string, id: string, title: string, topic: string, time_code: timestamp, body: string, misinfo_class: boolean, comment_created: timestamp, vader_score: float, textblob_score: float, year: int, month: int, week: int, year_month: string, year_week: string]

In [None]:
monthly_misinfo_counts = total_df.filter(f.col('misinfo_class') == True).groupBy('year_month').count().toPandas()
monthly_misinfo_counts.to_csv('../data/csv/monthly_misinfo_counts.csv', index = False)

In [None]:
weekly_misinfo_counts = total_df.filter(f.col('misinfo_class') == True).groupBy('year_week').count().toPandas()
weekly_misinfo_counts.to_csv('../data/csv/weekly_misinfo_counts.csv', index = False)

In [None]:
# Per-week totals: sentiment sums, distinct posts and comments, and the number
# of misinformation-flagged comments.
# `weekly_summary` is kept as a Spark DataFrame because later cells call Spark
# methods (`.cache()`, `.coalesce()`, `.toPandas()`) on it; only the CSV export
# goes through pandas.
weekly_summary = total_df.groupBy("year_week").agg(
    f.sum("vader_score").alias("sum_vader_score"),
    f.sum("textblob_score").alias("sum_textblob_score"),
    f.countDistinct("id").alias("posts_count"),
    f.countDistinct("comment_id").alias("comments_count"),
    f.sum(f.when(f.col("misinfo_class"), 1).otherwise(0)).alias("misinfo_count")
)
weekly_summary.toPandas().to_csv('../data/csv/weekly_summary.csv', index = False)

In [None]:
# Distinct comment ids per month, built lazily first ...
monthly_posts_counts = total_df.groupBy('year_month').agg(f.countDistinct("comment_id"))
print("start output")
# ... then collected to pandas (this is the step that triggers the Spark job)
monthly_posts_counts = monthly_posts_counts.toPandas()

start output


[Stage 5:>                                                        (0 + 4) / 200]

In [None]:
monthly_counts = total_df.groupBy('year_month').count().toPandas()


In [None]:
monthly_misinfo_counts.head()

In [None]:
monthly_counts.rename(columns={'count': 'total_posts'}, inplace=True)
monthly_misinfo_counts.rename(columns={'count': 'misinfo_posts'}, inplace=True)
monthly_df = pd.merge(monthly_counts, monthly_misinfo_counts, on='year_month', how='outer')
monthly_df.head()

In [None]:
weekly_counts = total_df.groupBy('year_week').count().toPandas()
weekly_counts.to_csv('../data/csv/weekly_counts.csv', index = False)

In [None]:
# weekly_posts = total_df.groupBy("year", "week").agg(f.countDistinct("id")).toPandas()
weekly_misinfo = total_df.filter(f.col('misinfo_class') == True).groupBy("year", "week").count().toPandas()
weekly_misinfo

In [None]:
monthly_misinfo = total_df.filter(f.col('misinfo_class') == True).groupBy("year", "month").count().toPandas()
monthly_misinfo

In [None]:
weekly_comments = total_df.groupBy("year", "week").agg(f.countDistinct("comment_id")).toPandas()
weekly_comments

In [None]:
# NOTE(review): `.cache()` is a Spark DataFrame method, so this cell requires
# `weekly_summary` to be a Spark DataFrame — confirm it has not already been
# converted with `.toPandas()` when this runs.
weekly_summary.cache()
weekly_summary.count()

In [None]:
# Spark CSV export into the folder ../data/csv/weekly_summary; `coalesce(1)`
# reduces the frame to a single partition before writing.
# NOTE(review): requires `weekly_summary` to be a Spark DataFrame — confirm.
weekly_summary.coalesce(1).write.csv(path=f'../data/csv/weekly_summary', mode='overwrite', header=True)

In [None]:
# NOTE(review): this cell looks like leftover template code — the output path
# below is the literal placeholder "path/to/output/directory". Replace it with
# a real location or delete the cell before running the notebook end to end.
# Repartition the DataFrame to a reasonable number before writing
repartitioned_df = weekly_summary.repartition(10)  # Adjust the number of partitions based on your cluster size and data volume

# Then write to CSV
repartitioned_df.write.csv(path="path/to/output/directory", mode='overwrite', header=True)


In [None]:
# Collect the weekly summary to pandas.
# NOTE(review): `.toPandas()` only exists on Spark DataFrames — confirm
# `weekly_summary` is still a Spark DataFrame at this point.
weekly_summary_df = weekly_summary.toPandas()

In [None]:
monthly_summary = total_df.groupBy("year", "month").agg(
    # f.sum("vader_score").alias("sum_vader_score"),
    # f.sum("textblob_score").alias("sum_textblob_score"),
    f.countDistinct("id").alias("posts_count"),
    f.countDistinct("comment_id").alias("comments_count"),
    f.sum(f.when(f.col("misinfo_class"), 1).otherwise(0)).alias("misinfo_count")
)
# weekly_summary.show(5)


In [None]:
monthly_summary.cache()


In [None]:
monthly_summary.count()

In [None]:
# Apply the same for the misinfo_class=True subset
misinfo_weekly = total_df.filter("misinfo_class = True").groupBy("year", "week").agg(
    f.sum("vader_score").alias("sum_vader_score"),
    f.sum("textblob_score").alias("sum_textblob_score"),
    f.countDistinct("id").alias("posts_count"),
    f.countDistinct("comment_id").alias("comments_count")
)

# Show the results

misinfo_weekly.show(5)
write_df_to_csv(misinfo_weekly, "misinfo_weekly")


In [None]:
# Aggregations shared by the overall and the misinformation-only summaries
summary_aggs = [
    f.sum("vader_score").alias("sum_vader_score"),
    f.sum("textblob_score").alias("sum_textblob_score"),
    f.countDistinct("id").alias("posts_count"),
    f.countDistinct("comment_id").alias("comments_count"),
]

# All comments per month, plus the number of flagged comments
monthly_summary = total_df.groupBy("year", "month").agg(
    *summary_aggs,
    f.sum(f.when(f.col("misinfo_class"), 1).otherwise(0)).alias("misinfo_count"),
)

# Same aggregations for the misinfo_class=True subset
misinfo_monthly = (
    total_df
    .filter("misinfo_class = True")
    .groupBy("year", "month")
    .agg(*summary_aggs)
)

# Show the results
monthly_summary.show(5)
misinfo_monthly.show(5)

In [None]:
write_df_to_csv(monthly_summary, "monthly_summary")


write_df_to_csv(misinfo_monthly, "misinfo_monthly")
