In [1]:
import os
import sys
import json
import zstandard as zstd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import from_unixtime, col

from pyspark.sql.types import (
    StructType, StructField, TimestampType,
    StringType, IntegerType, BooleanType, LongType
)

In [2]:
def decompress_and_parse_file(path):
    """Stream-decompress a zstd file of newline-delimited JSON and parse it.

    The file is read in 128 MB decompressed chunks. Chunks are split on the
    newline *byte* before any decoding, so a multi-byte UTF-8 character that
    straddles a chunk boundary stays intact (0x0A never occurs inside a
    multi-byte UTF-8 sequence). Each complete line is then decoded (invalid
    bytes are ignored) and parsed with ``json.loads``.

    Args:
        path: Path of the zstd-compressed file to read.

    Returns:
        list: One parsed JSON value per non-blank, valid line, in file order.
        Blank lines and lines that are not valid JSON are skipped. A last
        line without a trailing newline is parsed as well.

    Notes:
        Best effort: any exception raised while opening, decompressing or
        reading is printed, and whatever was parsed up to that point is
        returned (possibly an empty list). Nothing is re-raised.
    """
    results = []

    def _collect(raw_line):
        # Decode and parse one complete line; skip blank or malformed lines.
        text = raw_line.decode("utf-8", errors="ignore")
        if not text.strip():
            return
        try:
            results.append(json.loads(text))
        except json.JSONDecodeError:
            pass

    try:
        # Raise the decompressor's window limit to 2**31 bytes.
        dctx = zstd.ZstdDecompressor(max_window_size=2**31)

        with open(path, "rb") as fh:
            with dctx.stream_reader(fh) as reader:
                pending = b""  # bytes of the line still being assembled
                while True:
                    # read 128 MB chunk
                    chunk = reader.read(2**27)
                    if not chunk:
                        break

                    # Everything before the last newline is complete; the
                    # remainder is carried over to the next chunk.
                    *complete_lines, pending = (pending + chunk).split(b"\n")
                    for raw_line in complete_lines:
                        _collect(raw_line)

                # The stream may end without a final newline; do not drop
                # that last record.
                if pending:
                    _collect(pending)
    except Exception as e:
        print(f"[{path}] Decompression error: {e}")
    return results

In [3]:
# Spark schema applied to the parsed comment records. Every column is
# nullable. `year` and `month` are intentionally not declared here.
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("controversiality", IntegerType()),
        ("body", StringType()),
        ("subreddit_id", StringType()),
        ("link_id", StringType()),
        ("stickied", BooleanType()),
        ("subreddit", StringType()),
        ("score", IntegerType()),
        ("ups", IntegerType()),
        ("author_flair_css_class", StringType()),
        ("created_utc", LongType()),
        ("author_flair_text", StringType()),
        ("author", StringType()),
        ("id", StringType()),
        ("edited", BooleanType()),
        ("parent_id", StringType()),
        ("gilded", IntegerType()),
        ("distinguished", StringType()),
        ("retrieved_on", LongType()),
    )
])

In [4]:
# Obtain the Spark session for this notebook (application name "project-zstd").
spark = (
    SparkSession.builder
    .appName("project-zstd")
    .getOrCreate()
)

In [5]:
input_path = "../data/comments/"

# Convert every dump in `input_path` into one parquet directory per month.
# File names are expected to look like PREFIX_YEAR-MONTH.EXT; year and month
# come from the file name, not from the records themselves.
# Sorted so that reruns process the dumps in a stable order.
for file in sorted(os.listdir(input_path)):
    file_path = os.path.join(input_path, file)
    if not os.path.isfile(file_path):
        # sub-directories cannot be decompressed
        continue

    # year & month -- parsed before the expensive decompression so that stray
    # files (hidden files, notes, ...) are skipped instead of crashing the loop
    try:
        year = file.split('-')[0].split('_')[1]
        month = file.split('-')[1].split('.')[0]
    except IndexError:
        print(f"[{file_path}] Skipped: file name is not of the form PREFIX_YEAR-MONTH.EXT")
        continue
    if not (year.isdigit() and month.isdigit()):
        print(f"[{file_path}] Skipped: could not read a numeric year/month from the file name")
        continue

    decompressed_results = decompress_and_parse_file(file_path)
    if not decompressed_results:
        # Nothing was parsed (e.g. the decompression failed): writing now
        # would overwrite any existing output for this month with an empty one.
        print(f"[{file_path}] Skipped: no records parsed, existing output left untouched")
        continue

    # `edited` must be a bool for the schema, but is not always one in the
    # data. The Reddit API documents it as `false` or the edit timestamp
    # (TODO confirm for these dumps), so any truthy value means "edited".
    # Updated in place to avoid holding a second copy of every record.
    for row in decompressed_results:
        row["edited"] = bool(row.get("edited"))

    # create dataframe
    df = spark.createDataFrame(decompressed_results, schema=schema)

    # epoch seconds -> timestamp columns; the raw epoch columns are dropped
    df = (
        df
        .withColumn("created_utc_ts", from_unixtime(col("created_utc")).cast("timestamp"))
        .withColumn("retrieved_on_ts", from_unixtime(col("retrieved_on")).cast("timestamp"))
        .drop("retrieved_on")
        .drop("created_utc")
    )

    df = (
        df
        .withColumn("month", F.lit(month))
        .withColumn("year", F.lit(year))
    )

    # coalesce(1): a single output partition per month
    (
        df.coalesce(1)
        .write.parquet(f"../data/raw/pq/{year}/{month}", mode="overwrite")
    )