In [1]:
# Create Spark Job
import os

from pyspark import SparkConf
from pyspark.sql import SparkSession, DataFrame

conf = (
    SparkConf()
    .setAppName("Social_Media_Application")
    .setMaster("spark://172.29.15.3:7077")
    .set("spark.executor.instances", "4")
    .set("spark.executor.cores", "4")
    .set("spark.executor.memory", "16g")
    .set("spark.cores.max", "8")
)

spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext

abs_folder_path = os.path.abspath("./data/crawl_data")

df = spark.read.option("multiLine", True).json(f"file://{abs_folder_path}")
df.show()
print("Total crawl posts: ", df.count())

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/22 16:49:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

+-------------------+-------------+-----+------+---------+-------------------+----------+-------+----------+-----------+--------------------+---------+------------+--------------+-----------+--------------------+------+--------------------+--------------------+------+------------+--------------------+
|             author|  created_utc|downs|    id|is_locked|is_original_content|is_over_18|is_self|is_spoiler|is_stickied|     link_flair_text|     name|num_comments|num_crossposts|num_reports|           permalink| score|            selftext|               title|   ups|upvote_ratio|                 url|
+-------------------+-------------+-----+------+---------+-------------------+----------+-------+----------+-----------+--------------------+---------+------------+--------------+-----------+--------------------+------+--------------------+--------------------+------+------------+--------------------+
|   ScienceModerator|1.521000374E9|    0|84aiy6|    false|              false|     false|  

In [2]:
QUOTES = (
    r"\u0022"  # quotation mark (")
    r"\u0027"  # apostrophe (')
    r"\u00ab"  # left-pointing double-angle quotation mark
    r"\u00bb"  # right-pointing double-angle quotation mark
    r"\u2018"  # left single quotation mark
    r"\u2019"  # right single quotation mark
    r"\u201a"  # single low-9 quotation mark
    r"\u201b"  # single high-reversed-9 quotation mark
    r"\u201c"  # left double quotation mark
    r"\u201d"  # right double quotation mark
    r"\u201e"  # double low-9 quotation mark
    r"\u201f"  # double high-reversed-9 quotation mark
    r"\u2039"  # single left-pointing angle quotation mark
    r"\u203a"  # single right-pointing angle quotation mark
    r"\u300c"  # left corner bracket
    r"\u300d"  # right corner bracket
    r"\u300e"  # left white corner bracket
    r"\u300f"  # right white corner bracket
    r"\u301d"  # reversed double prime quotation mark
    r"\u301e"  # double prime quotation mark
    r"\u301f"  # low double prime quotation mark
    r"\ufe41"  # presentation form for vertical left corner bracket
    r"\ufe42"  # presentation form for vertical right corner bracket
    r"\ufe43"  # presentation form for vertical left corner white bracket
    r"\ufe44"  # presentation form for vertical right corner white bracket
    r"\uff02"  # fullwidth quotation mark
    r"\uff07"  # fullwidth apostrophe
    r"\uff62"  # halfwidth left corner bracket
    r"\uff63"  # halfwidth right corner bracket
)
QUOTES_PATTERN = r"".join(QUOTES)
print(QUOTES_PATTERN)

\u0022\u0027\u00ab\u00bb\u2018\u2019\u201a\u201b\u201c\u201d\u201e\u201f\u2039\u203a\u300c\u300d\u300e\u300f\u301d\u301e\u301f\ufe41\ufe42\ufe43\ufe44\uff02\uff07\uff62\uff63


In [3]:
from pyspark.sql.functions import from_unixtime, regexp_replace, col

# Process Data
process_df = df.dropDuplicates(["id"])
process_df = process_df.withColumn(
    "created_utc", from_unixtime("created_utc", "yyyy-MM-dd HH:mm:ss")
)

# Drop columns that have all Null data
not_null_cols = [
    col
    for col in process_df.columns
    if process_df.select(col).dropna(how="all").count() > 0
]
process_df = process_df.select(not_null_cols)

# Remove Quotation Mark value
# process_df = process_df.withColumn(
#     "title", regexp_replace(col("title"), QUOTES_PATTERN, "")
# )
# process_df = process_df.withColumn(
#     "selftext", regexp_replace(col("selftext"), QUOTES_PATTERN, "")
# )

print("Total Posts after process: ", process_df.count())
print(process_df.select(["title", "selftext", "link_flair_text"]).show(truncate=False))

Total Posts after process:  5532
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+----------------+
|title                                                                                                                                                                                                                                               |selftext|link_flair_text |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+----------------+
|India set an ‘incredibly important precedent’ by banning TikTok, FCC Commissioner says, 'don't see a path forward for anything other than a blanket

In [12]:
import pickle

pandas_df = process_df.toPandas()

with open("data/processed_data/data.pkl", "wb") as file:
    pickle.dump(pandas_df, file)

# Write all available labels
label = (
    process_df.select("link_flair_text")
    .dropDuplicates()
    .orderBy("link_flair_text", ascending=True)
)
label.coalesce(1).write.mode("overwrite").text(
    "file:///home/hduser/Khang/social_media/data/processed_data/label.txt"
)

In [55]:
spark.stop()

In [53]:
df.where(df.id == "1g7mj3n").select("title").show(truncate=False)

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|title                                                                                                                                                                                                                                                        |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Human scientists are still better than AI ones – for now | A simulator for the process of scientific discovery shows that AI models still fall short of human scientists and engineers in coming up with hypotheses and carrying out ex