# DATA PREPROCESSING
##### Required files in the same directory 

# trumpfinal.csv, kamalafinal.csv, data.csv

In [2]:
from pyspark.sql import SparkSession


spark = SparkSession.builder.appName('CSVHeadExample').getOrCreate()
data_df = spark.read.csv('data.csv', header=True, inferSchema=True)
trump2024_df = spark.read.csv('trumpfinal.csv', header=True, inferSchema=True)
print("Head of data.csv:")
data_df.show(5)

print("Head of trump2024.csv:")
trump2024_df.show(5)

print("Schema of data.csv:")
data_df.printSchema()

print("Schema of trump2024.csv:")
trump2024_df.printSchema()

print("Summary of data.csv:")
data_df.describe().show()

print("Summary of trump2024.csv:")
trump2024_df.describe().show()

data_row_count = data_df.count()
trump2024_row_count = trump2024_df.count()

print(f"Number of rows in data.csv: {data_row_count}")
print(f"Number of rows in trumpfinal.csv: {trump2024_row_count}")


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/16 23:53:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

Head of data.csv:
+-------+--------------------+--------------------+-----+--------------+-------------------+---------+--------------------+-----------+--------------------+
|     id|               title|            selftext|score|comments_count|        created_utc|subreddit|                 url|      topic|            comments|
+-------+--------------------+--------------------+-----+--------------+-------------------+---------+--------------------+-----------+--------------------+
| bfht1b|What I think abou...|                NULL| 3184|           169|2019-04-20 22:16:30| Firearms|https://i.redd.it...|Gun Control|"['Australia has ...|
| o5lhvu|CNN accidentally ...|                NULL| 3024|           408|2021-06-22 12:02:59| Firearms|https://i.imgur.c...|Gun Control|"[""It's almost l...|
| mbviya|     Gun control now|                NULL| 2991|           132|2021-03-24 02:38:34| Firearms|https://i.redd.it...|Gun Control|"['You had me in ...|
| j2kopd|"The only kind of...|          

24/12/16 23:53:36 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+-------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+--------------------+--------------------+--------------------+
|summary|                  id|               title|            selftext|               score|      comments_count|       created_utc|           subreddit|                 url|               topic|            comments|
+-------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+--------------------+--------------------+--------------------+
|  count|               41298|               25822|               13653|               15628|               13768|             12540|               11702|               11145|               10578|               10237|
|   mean|            Infinity|   960.6665745856354|   622.5316129032258|  2114.2751404151404|  303.96983135540285| 884.908847184

                                                                                

+-------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|summary|          created_at|            tweet_id|               tweet|               likes|       retweet_count|              source|             user_id|           user_name|    user_screen_name|    user_description|      user_join_date|user_followers_count|       user_location|                 lat|                long|                city|             country|           continent|               state|          state_code|        collected_at|
+-------+--------------------+--------------------+--------------------+--------------------+-----

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, unix_timestamp, coalesce, when

# Initialize Spark session
spark = SparkSession.builder \
    .appName("RedditToTwitterConversion") \
    .getOrCreate()


data_df = spark.read.option("header", "true").csv("data.csv")
kamala2024_df = spark.read.option("header", "true").csv("kamalafinal.csv")
data_df = data_df.withColumn("created_at", unix_timestamp(col("created_utc"), "yyyy-MM-dd HH:mm:ss").cast("timestamp"))
data_df = data_df.withColumnRenamed("id", "tweet_id")
data_df = data_df.withColumn(
    "tweet", 
    coalesce(col("title"), lit("")) + " " + coalesce(col("selftext"), lit(""))
)

data_df = data_df.withColumn("likes", col("score"))
data_df = data_df.withColumn("retweet_count", col("comments_count"))
data_df = data_df.withColumn("source", lit("Reddit"))


data_df = data_df.withColumn("user_id", lit(None).cast("string"))
data_df = data_df.withColumn("user_name", lit(None).cast("string"))
data_df = data_df.withColumn("user_screen_name", lit(None).cast("string"))
data_df = data_df.withColumn("user_description", lit(None).cast("string"))
data_df = data_df.withColumn("user_join_date", lit(None).cast("string"))
data_df = data_df.withColumn("user_followers_count", lit(None).cast("integer"))
data_df = data_df.withColumn("user_location", lit(None).cast("string"))


data_df = data_df.withColumn("lat", lit(None).cast("double"))
data_df = data_df.withColumn("long", lit(None).cast("double"))
data_df = data_df.withColumn("city", lit("Unknown"))
data_df = data_df.withColumn("country", lit("Unknown"))
data_df = data_df.withColumn("continent", lit("Unknown"))
data_df = data_df.withColumn("state", lit("Unknown"))
data_df = data_df.withColumn("state_code", lit("Unknown"))

data_df = data_df.withColumn("collected_at", unix_timestamp(lit("now")).cast("timestamp"))

final_df = data_df.select(
    "created_at",
    "tweet_id",
    "tweet",
    "likes",
    "retweet_count",
    "source",
    "user_id",
    "user_name",
    "user_screen_name",
    "user_description",
    "user_join_date",
    "user_followers_count",
    "user_location",
    "lat",
    "long",
    "city",
    "country",
    "continent",
    "state",
    "state_code",
    "collected_at"
)

final_df.show(5)

spark.stop()


24/12/16 23:54:03 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


+-------------------+--------+-----+-----+-------------+------+-------+---------+----------------+----------------+--------------+--------------------+-------------+----+----+-------+-------+---------+-------+----------+------------+
|         created_at|tweet_id|tweet|likes|retweet_count|source|user_id|user_name|user_screen_name|user_description|user_join_date|user_followers_count|user_location| lat|long|   city|country|continent|  state|state_code|collected_at|
+-------------------+--------+-----+-----+-------------+------+-------+---------+----------------+----------------+--------------+--------------------+-------------+----+----+-------+-------+---------+-------+----------+------------+
|2019-04-20 22:16:30|  bfht1b| NULL| 3184|          169|Reddit|   NULL|     NULL|            NULL|            NULL|          NULL|                NULL|         NULL|NULL|NULL|Unknown|Unknown|  Unknown|Unknown|   Unknown|        NULL|
|2021-06-22 12:02:59|  o5lhvu| NULL| 3024|          408|Reddit| 

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, unix_timestamp, coalesce, when

# Initialize Spark session
spark = SparkSession.builder \
    .appName("RedditToTwitterConversion") \
    .getOrCreate()


data_df = spark.read.option("header", "true").csv("data.csv")
trump2024_df = spark.read.option("header", "true").csv("trumpfinal.csv")
data_df = data_df.withColumn("created_at", unix_timestamp(col("created_utc"), "yyyy-MM-dd HH:mm:ss").cast("timestamp"))
data_df = data_df.withColumnRenamed("id", "tweet_id")
data_df = data_df.withColumn(
    "tweet", 
    coalesce(col("title"), lit("")) + " " + coalesce(col("selftext"), lit(""))
)

data_df = data_df.withColumn("likes", col("score"))
data_df = data_df.withColumn("retweet_count", col("comments_count"))
data_df = data_df.withColumn("source", lit("Reddit"))


data_df = data_df.withColumn("user_id", lit(None).cast("string"))
data_df = data_df.withColumn("user_name", lit(None).cast("string"))
data_df = data_df.withColumn("user_screen_name", lit(None).cast("string"))
data_df = data_df.withColumn("user_description", lit(None).cast("string"))
data_df = data_df.withColumn("user_join_date", lit(None).cast("string"))
data_df = data_df.withColumn("user_followers_count", lit(None).cast("integer"))
data_df = data_df.withColumn("user_location", lit(None).cast("string"))


data_df = data_df.withColumn("lat", lit(None).cast("double"))
data_df = data_df.withColumn("long", lit(None).cast("double"))
data_df = data_df.withColumn("city", lit("Unknown"))
data_df = data_df.withColumn("country", lit("Unknown"))
data_df = data_df.withColumn("continent", lit("Unknown"))
data_df = data_df.withColumn("state", lit("Unknown"))
data_df = data_df.withColumn("state_code", lit("Unknown"))

data_df = data_df.withColumn("collected_at", unix_timestamp(lit("now")).cast("timestamp"))

final_df = data_df.select(
    "created_at",
    "tweet_id",
    "tweet",
    "likes",
    "retweet_count",
    "source",
    "user_id",
    "user_name",
    "user_screen_name",
    "user_description",
    "user_join_date",
    "user_followers_count",
    "user_location",
    "lat",
    "long",
    "city",
    "country",
    "continent",
    "state",
    "state_code",
    "collected_at"
)

final_df.show(5)

spark.stop()


+-------------------+--------+-----+-----+-------------+------+-------+---------+----------------+----------------+--------------+--------------------+-------------+----+----+-------+-------+---------+-------+----------+------------+
|         created_at|tweet_id|tweet|likes|retweet_count|source|user_id|user_name|user_screen_name|user_description|user_join_date|user_followers_count|user_location| lat|long|   city|country|continent|  state|state_code|collected_at|
+-------------------+--------+-----+-----+-------------+------+-------+---------+----------------+----------------+--------------+--------------------+-------------+----+----+-------+-------+---------+-------+----------+------------+
|2019-04-20 22:16:30|  bfht1b| NULL| 3184|          169|Reddit|   NULL|     NULL|            NULL|            NULL|          NULL|                NULL|         NULL|NULL|NULL|Unknown|Unknown|  Unknown|Unknown|   Unknown|        NULL|
|2021-06-22 12:02:59|  o5lhvu| NULL| 3024|          408|Reddit| 

In [5]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName('CSVHeadExample').getOrCreate()

data_df = spark.read.csv('data.csv', header=True, inferSchema=True)
trump2024_df = spark.read.csv('trumpfinal.csv', header=True, inferSchema=True)

data_df_cleaned = data_df.withColumn('score', F.col('score').cast('double'))
data_df_cleaned = data_df_cleaned.withColumn('comments_count', F.col('comments_count').cast('double'))

data_df_cleaned = data_df_cleaned.fillna({'title': 'Unknown', 'selftext': 'Unknown'})

q1 = data_df_cleaned.approxQuantile("score", [0.25], 0.05)[0]
q3 = data_df_cleaned.approxQuantile("score", [0.75], 0.05)[0]
iqr = q3 - q1
lower_bound = q1 - 1.5 * iqr
upper_bound = q3 + 1.5 * iqr
data_df_cleaned = data_df_cleaned.filter((data_df_cleaned.score >= lower_bound) & (data_df_cleaned.score <= upper_bound))

q1_comments = data_df_cleaned.approxQuantile("comments_count", [0.25], 0.05)[0]
q3_comments = data_df_cleaned.approxQuantile("comments_count", [0.75], 0.05)[0]
iqr_comments = q3_comments - q1_comments
lower_bound_comments = q1_comments - 1.5 * iqr_comments
upper_bound_comments = q3_comments + 1.5 * iqr_comments
data_df_cleaned = data_df_cleaned.filter((data_df_cleaned.comments_count >= lower_bound_comments) & (data_df_cleaned.comments_count <= upper_bound_comments))

data_df_cleaned.write.csv('preprocessed.csv', header=True, mode ="overwrite")

print(f"Number of rows in the cleaned data: {data_df_cleaned.count()}")


                                                                                

Number of rows in the cleaned data: 5228
