In [0]:
# Name of the DBFS mount holding the YouTube comment dumps; every path below is derived from it.
mount_name = "youtube-news-comments"
mount_point = "/mnt/" + mount_name

In [0]:
# Pick the comment dump to process: the lexicographically greatest *.json file in the folder.
comments_dir = mount_point + "/youtube-comments/"
all_items = dbutils.fs.ls(comments_dir)
json_files = [item.path for item in all_items if item.path.endswith(".json")]
if not json_files:
    # Fail with a clear message instead of an IndexError from indexing an empty list.
    raise FileNotFoundError(f"No .json files found under {comments_dir}")
# NOTE(review): "greatest name" is only the newest file if names embed a sortable
# timestamp; otherwise consider FileInfo.modificationTime — TODO confirm naming scheme.
new_file = max(json_files)

In [0]:
# Load the selected JSON dump into a DataFrame and print the schema Spark inferred.
df = spark.read.format("json").load(new_file)
df.printSchema()

root
 |-- author: string (nullable = true)
 |-- comment_id: string (nullable = true)
 |-- like_count: long (nullable = true)
 |-- published_at: string (nullable = true)
 |-- text: string (nullable = true)
 |-- video_id: string (nullable = true)



In [0]:
# preprocessing
from pyspark.sql.functions import col, lower

df = (
    df
    # exact duplicate rows (identical in every column) are collapsed to one
    .dropDuplicates()
    # discard comments whose text is null or the empty string
    .filter(col("text").isNotNull() & (col("text") != ""))
    # normalise case before the regex-based cleaning below
    .withColumn("text", lower(col("text")))
)

In [0]:
# special character removal
from pyspark.sql.functions import regexp_replace

from pyspark.sql.functions import trim

# Strip URLs first so their letters are not kept by the character filter below.
url_pattern = r'http\S+'
df = df.withColumn("text", regexp_replace(df["text"], url_pattern, ''))
# Keep only ASCII letters and whitespace; digits, punctuation, emoji and accented
# letters are all removed.
df = df.withColumn("text", regexp_replace(df["text"], r'[^a-zA-Z\s]', ''))
# The removals above leave runs of whitespace (including newlines) and leading/trailing
# spaces behind. Collapse them to a single space and trim; a tokenizer that splits on
# each single whitespace character would otherwise emit "" tokens.
df = df.withColumn("text", trim(regexp_replace(df["text"], r'\s+', ' ')))
# Comments made only of URLs/emoji/digits are now empty; drop them, consistent with
# the empty-text filter applied before cleaning.
df = df.filter(df["text"] != "")

In [0]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover

from pyspark.ml.feature import RegexTokenizer

# Split text into lowercase tokens on runs of whitespace. The plain Tokenizer splits on
# every single whitespace character, so double or leading spaces (common after URL and
# special-character removal) became "" tokens that StopWordsRemover does not drop.
# minTokenLength=1 also discards any empty token that survives the split.
tokenizer = RegexTokenizer(
    inputCol="text",
    outputCol="words",
    pattern=r"\s+",
    gaps=True,
    minTokenLength=1,
    toLowercase=True,
)
df = tokenizer.transform(df)

# Remove stop words using StopWordsRemover's default word list.
remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
df = remover.transform(df)

In [0]:
# Preview the first 20 rows with long values truncated (Spark's defaults, spelled out).
# NOTE(review): the preview includes the `author` column (commenter display names);
# consider selecting fewer columns before sharing the rendered notebook.
df.show(n=20, truncate=True)

+--------------------+--------------------+----------+--------------------+--------------------+-----------+--------------------+--------------------+
|              author|          comment_id|like_count|        published_at|                text|   video_id|               words|      filtered_words|
+--------------------+--------------------+----------+--------------------+--------------------+-----------+--------------------+--------------------+
|       Gloria Waslyn|Ugw3k-ZTd_0tMiJN8...|         1|2023-08-30T01:59:33Z|ginny thomas was ...|VIf1802ihuo|[ginny, thomas, w...|[ginny, thomas, c...|
|       US Asiri News|UgyGOnAC1pFmVWHXG...|         0|2023-08-30T06:15:47Z|the chief of staf...|VIf1802ihuo|[the, chief, of, ...|[chief, staff, pr...|
|         angel smith|UgwX7a4_qrCpgA44E...|         2|2023-08-30T03:59:05Z|oh it must be tue...|K3vd9Ln-pcA|[oh, it, must, be...|[oh, must, tuesda...|
|        Kal Palnicki|UgwLk0572XtCHreKa...|         1|2023-08-30T00:54:11Z|in politics truth..

In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def array_to_string(my_list):
    """Join a list of tokens into a single comma-separated string.

    Wrapped as a Spark UDF below, where a null array cell arrives as ``None``.

    Args:
        my_list: iterable of strings, or None.

    Returns:
        The tokens joined with ',' (None elements skipped), or None when
        ``my_list`` itself is None, so a null array maps to a null string
        instead of raising TypeError and failing the Spark task.
    """
    if my_list is None:
        return None
    return ','.join(token for token in my_list if token is not None)

from pyspark.sql.functions import array_join

# Python-UDF wrapper, still defined so any later cell that references it keeps working.
array_to_string_udf = udf(array_to_string, StringType())

# Flatten the token arrays into comma-separated strings, presumably so the CSV writer in
# the next cell can serialise them. The built-in array_join runs inside the JVM (no
# Python worker round-trip), returns null for a null array instead of raising as
# ','.join(None) would, and skips null elements.
df = df.withColumn("words", array_join(df["words"], ","))
df = df.withColumn("filtered_words", array_join(df["filtered_words"], ","))

In [0]:
# save processed json file as a csv
import os

# Derive "<name>_processed.csv" from the source "<name>.json". Only the trailing extension
# is replaced; str.replace would also rewrite any ".json" appearing earlier in the name.
filename = new_file.split("/")[-1]
stem = filename[:-len(".json")] if filename.endswith(".json") else filename
processed_filename = stem + "_processed.csv"

# The processed CSV goes next to the source JSON.
processed_path = os.path.join(os.path.dirname(new_file), processed_filename)
# Spark writes a directory of part files, so write to a temp directory first and
# promote the single part file in the next cell.
temp_path = processed_path + "_temp"

# coalesce(1) yields a single part file; "|" is used as the delimiter, so the
# comma-joined token columns need no quoting. mode("overwrite") keeps the cell
# re-runnable if a previous run left temp_path behind; the default mode raises
# when the path already exists.
(
    df.coalesce(1)
    .write.mode("overwrite")
    .option("header", "true")
    .option("delimiter", "|")
    .csv(temp_path)
)

In [0]:
# Spark wrote a directory under temp_path (a part- data file plus marker files). Find
# the single data file so it can be promoted to a plain CSV at processed_path.
files = dbutils.fs.ls(temp_path)
part_file = next((f for f in files if f.name.startswith("part-")), None)
if part_file is None:
    # Fail loudly; otherwise the cell would finish without producing any CSV.
    raise FileNotFoundError(
        f"No part- file found under {temp_path}; leaving it in place for inspection"
    )

old_path = part_file.path
# Use the destination derived in the previous cell (the source JSON's directory) rather
# than a second, hardcoded copy of the mount path that can drift from mount_point.
new_path = processed_path
if dbutils.fs.cp(old_path, new_path):
    dbutils.fs.rm(temp_path, recurse=True)
else:
    raise RuntimeError(
        f"Copying {old_path} to {new_path} failed; {temp_path} was not removed"
    )