### Task 1: Data Ingestion, Union & Ticker Extraction

This notebook implements a scalable pipeline to ingest, union, and extract stock tickers from the WallStreetBets subreddit data (posts and comments).  

- **Purpose:** Read the compressed Reddit submission and comment archives in parallel, unify them into a single DataFrame, and extract valid stock tickers for downstream analysis.  

- **Environment:** Developed and tested interactively in JupyterHub on an AWS EMR cluster using PySpark.  

- **Data Location:** Raw `.zst` files were initially stored in an S3 bucket (`wsb-research-data-shef-20250522`). All data read and generated by this notebook, as well as all subsequent processing, has been migrated to Midway, where the remainder of the workflow runs. 
 
- **Workflow Details:** See the project README for a full description of data sources, directory structure, and end-to-end processing steps.  

#### 1.1 Spark session setup

In [None]:
from pyspark.sql import SparkSession, functions as F, types as T
import site, os

# Interpreter used on the driver and the executors; the executors also get
# this (driver-side) process's first site-packages directory on PYTHONPATH.
JUPY_PY  = "/usr/bin/python3"
SITE_DIR = site.getsitepackages()[0]
# Hadoop zstd jar placed on both the driver and executor JVM classpaths.
ZSTD_JAR = "/usr/lib/hadoop-yarn/share/hadoop/common/lib/hadoop-zstd.jar"

# All session settings in one table; keys are distinct, so the order in which
# they are applied to the builder does not matter.
_SPARK_CONF = {
    # Python interpreter / module search path
    "spark.pyspark.python":               JUPY_PY,
    "spark.pyspark.driver.python":        JUPY_PY,
    "spark.executorEnv.PYTHONPATH":       SITE_DIR,
    # JVM classpath and zstd codec registration
    "spark.driver.extraClassPath":        ZSTD_JAR,
    "spark.executor.extraClassPath":      ZSTD_JAR,
    "spark.hadoop.io.compression.codecs": "org.apache.hadoop.io.compress.ZStandardCodec",
    # Resources: 3 executors x 4 cores, 20g heap + 4g overhead each; 8g driver
    "spark.executor.instances":           "3",
    "spark.executor.cores":               "4",
    "spark.executor.memory":              "20g",
    "spark.executor.memoryOverhead":      "4g",
    "spark.driver.memory":                "8g",
    # Shuffle sizing and adaptive query execution
    "spark.sql.adaptive.enabled":         "true",
    "spark.sql.shuffle.partitions":       "120",
}

_builder = SparkSession.builder.appName("WSB_ETL_JVM_Zstd").master("yarn")
for _key, _value in _SPARK_CONF.items():
    _builder = _builder.config(_key, _value)
spark = _builder.getOrCreate()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### 1.2 Extraction helper functions

In [None]:
# ─── Cell 2 : Reader & Extraction ────────────────────────────────────────────
import io, json
from pyspark.sql import functions as F, types as T
from pyspark.storagelevel import StorageLevel          

def _schema(kind: str) -> T.StructType:
    """
    Build the all-string Spark schema used to parse one archive type.

    Parameters
    ----------
    kind : str
        "post" selects the submission text fields ('title', 'selftext');
        any other value selects the comment text field ('body').

    Returns
    -------
    StructType
        Nullable StringType fields id, created_utc, subreddit, followed by
        title and selftext (for "post") or body (otherwise).
    """
    text_fields = ("title", "selftext") if kind == "post" else ("body",)
    return T.StructType([
        T.StructField(name, T.StringType(), True)
        for name in ("id", "created_utc", "subreddit") + text_fields
    ])

def _standardize(df, kind: str):
    """
    Project a parsed archive DataFrame onto the shared five-column layout.

    Output columns, in order: id, created_utc (cast to long), subreddit,
    title, selftext.

    Parameters
    ----------
    df : DataFrame
        Parsed rows; must contain 'body' when kind == "comment", otherwise
        'title' and 'selftext'.
    kind : str
        Exactly "comment" maps 'body' to 'selftext' and emits a null string
        'title'; any other value passes 'title' and 'selftext' through.

    Returns
    -------
    DataFrame
        The projected DataFrame with the uniform column set.
    """
    if kind == "comment":
        title_col = F.lit(None).cast("string").alias("title")
        text_col = F.col("body").alias("selftext")
    else:
        title_col = F.col("title")
        text_col = F.col("selftext")

    return df.select(
        F.col("id"),
        F.col("created_utc").cast("long").alias("created_utc"),
        F.col("subreddit"),
        title_col,
        text_col,
    )

def read_zst(path: str, kind: str):
    """
    Read a .zst-compressed JSON lines file into a standardized Spark DataFrame.

    The function first attempts to use Spark's built-in Text reader. 
    If that fails, it falls back to Python-side streaming decompression using the
    zstandard library.

    Parameters
    ----------
    path : str
        URI of the .zst file or files (e.g. "s3://bucket/file.zst").
    kind : str
        Either "post" (for submissions) or "comment".

    Returns
    -------
    DataFrame
        A Spark DataFrame with columns [id, created_utc, subreddit, title, selftext].
    """
    kind = kind.lower()
    schema = _schema(kind)

    # ------------ fast path : JVM unzip via spark.read.text ---------------
    try:
        # Read each line as raw text, parse JSON, and apply the schema
        df = (spark.read
                    .text(path)                             # Hadoop will decompress if codec is present
                    .select(F.from_json("value", schema).alias("j"))
                    .select("j.*"))
    except Exception:
        # ---------- fallback : Python zstandard streaming decompression ----------
        import zstandard as zstd, contextlib
        need_cols = schema.names

        def _decode(iterator):
            """
            Decompress and parse each .zst file's bytes into JSON records.
            Runs within each Spark partition.
            """
            dctx = zstd.ZstdDecompressor()
            for _, pds in iterator:
                # 'pds' is PortableDataStream; open it as a file-like object
                with contextlib.closing(pds.open()) as raw:
                    # Stream-decompress and decode to text
                    rdr = dctx.stream_reader(raw)
                    txt = io.TextIOWrapper(rdr, encoding="utf-8")
                    # Parse each line of JSON
                    for ln in txt:
                        rec = json.loads(ln)
                        # Return only the columns defined in the schema
                        yield {k: rec.get(k) for k in need_cols}

        # Read raw bytes in parallel, decode, and then create DataFrame with schema
        rdd = (spark.sparkContext
                 .binaryFiles(path, minPartitions=60)  # partition count for parallelism
                 .mapPartitions(_decode))
        df = spark.createDataFrame(rdd, schema)

    # Standardize column names, types, and select only necessary columns
    return _standardize(df, kind)



FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### 1.3 Appending Reddit WSB posts and comments

In [None]:
# ─── Cell 3: Append posts and comments ────────────────────────
# Location of the raw Pushshift-style archives: s3://<BUCKET>/<SRC_PRE>/
BUCKET  = "wsb-research-data-shef-20250522"
SRC_PRE = "raw/wsb24"

# Both archives go through the same reader. A boolean is_comment flag records
# where each row came from once they are unioned.
posts = read_zst(f"s3://{BUCKET}/{SRC_PRE}/wallstreetbets_submissions.zst", "post")
posts = posts.withColumn("is_comment", F.lit(False))

coms = read_zst(f"s3://{BUCKET}/{SRC_PRE}/wallstreetbets_comments.zst", "comment")
coms = coms.withColumn("is_comment", F.lit(True))

# unionByName aligns the two frames by column name. repartition(200) shuffles
# the result into exactly 200 partitions. Whether that is a fan-out or a
# reduction depends on how many partitions the .zst reads produced; whole
# .zst files are typically read as very few partitions, which you can confirm
# with union_df.rdd.getNumPartitions(). DISK_ONLY persistence lets the Parquet
# write below reuse the rows materialized by count() instead of recomputing
# them from the archives.
union_df = posts.unionByName(coms).repartition(200)
union_df = union_df.persist(StorageLevel.DISK_ONLY)

print("Total rows:", union_df.count())

# Stage-1 output: the unified rows as Parquet
stage1 = f"s3a://{BUCKET}/stage01_union_parquet/"
union_df.write.mode("overwrite").parquet(stage1)
print("Parquet saved to", stage1)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Total rows: 88951309
Parquet saved to s3a://wsb-research-data-shef-20250522/stage01_union_parquet/

#### 1.4 Read CRSP (stock ticker information)

In [None]:
# ─── Cell 4: Read CRSP & Broadcast ────────────────────────
from pyspark.sql.functions import unix_timestamp, upper, col, broadcast

def _yyyymmdd_to_utc_epoch(c):
    """
    Convert a yyyyMMdd string column to epoch seconds at 00:00 UTC (long).

    This counts whole days since 1970-01-01 instead of calling
    unix_timestamp(c, "yyyyMMdd"), because unix_timestamp interprets the
    parsed date in the Spark session time zone. Reddit's created_utc, which
    these bounds are compared with, is a UTC epoch.

    The day count is cast to long *before* multiplying. datediff returns an
    int, and far-future sentinels such as 99991231 (~2.93 million days)
    would overflow 32 bits after multiplying by 86400.

    Unparseable or null input is expected to yield null, as with
    unix_timestamp when ANSI mode is off (confirm for your Spark version).
    """
    days = F.datediff(F.to_date(c, "yyyyMMdd"), F.to_date(F.lit("1970-01-01")))
    return days.cast("long") * 86400


# Load the CRSP ticker list from S3 into a Spark DataFrame:
# - Read the CSV with a header row (no inferSchema, so every column is a string)
# - Uppercase "Ticker" for consistency with the extracted tokens
# - Convert SecInfoStartDt / SecInfoEndDt ("yyyyMMdd") to UTC-midnight epoch
#   seconds as start_ts / end_ts
# - Drop rows with no ticker symbol
crsp_df = (
    spark.read
         .option("header", True)
         .csv(f"s3a://{BUCKET}/crsp_ticker_list_unique.csv")
         .select(
             upper(col("Ticker")).alias("ticker"),
             _yyyymmdd_to_utc_epoch(col("SecInfoStartDt")).alias("start_ts"),
             _yyyymmdd_to_utc_epoch(col("SecInfoEndDt")).alias("end_ts"),
         )
         .na.drop(subset=["ticker"])
)

# Mark the (small) CRSP table for a broadcast join
crsp_b = broadcast(crsp_df)

# Row count of the CRSP table. A ticker can appear on several rows with
# different validity windows, so this is not necessarily a distinct-ticker count.
print("Broadcasted CRSP tickers:", crsp_df.count())



FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Broadcasted CRSP tickers: 152518

#### 1.5 Identifying the stock tickers mentioned in each text

In [None]:
from pyspark.sql.functions import expr, explode, length, col, broadcast

# ---------- Reddit ----------

# Candidate-ticker pattern (Java regex syntax):
#   (?<![\w$])  the token must not start inside a longer word or after another
#               '$'. Without this, "ABCDEFG" yields "CDEFG" and "eBAY" yields "BAY".
#   \$?         optional cashtag prefix
#   [A-Z]{1,5}  one to five upper-case letters
#   \b          the token must end at a word boundary
TICKER_RE = r"(?<![\w$])\$?[A-Z]{1,5}\b"

# Spark's SQL parser un-escapes backslashes inside string literals (default
# spark.sql.parser.escapedStringLiterals=false). Inside expr(), '\b' would
# become a backspace character and '\$' a bare '$' anchor, so the pattern is
# embedded with every backslash doubled. If that setting is ever switched to
# true, embed TICKER_RE unchanged instead.
TICKER_RE_SQL = TICKER_RE.replace("\\", "\\\\")

# Source text: the unified submissions + comments DataFrame persisted in
# Cell 3. Using `posts` dropped every comment, which left is_comment always
# False. The names posts2/posts3 are kept for any later cells that use them.
# Combine title and selftext into 'raw_txt'. concat_ws skips nulls, so for
# comments (null title) this is just the comment body.
posts2 = union_df.withColumn(
    "raw_txt", expr("concat_ws(' ', title, selftext)")
)

# Extract every candidate ticker-like token into the array column 'cand'
posts3 = posts2.withColumn(
    "cand", expr(f"regexp_extract_all(raw_txt, '{TICKER_RE_SQL}', 0)")
)

# Explode the candidates so each row carries one 'tok_raw'
exploded = posts3.select(
    "id", "created_utc", "subreddit", "is_comment",
    explode("cand").alias("tok_raw")
)

# Strip a leading '$' and uppercase to get the normalized 'ticker'.
# The DataFrame-API regexp_replace receives the pattern as a plain Python
# string, so no SQL un-escaping applies. The earlier expr("... '^\\$' ...")
# reached the regex engine as '^$' and never removed the '$'.
cleaned = exploded.withColumn(
    "ticker", upper(F.regexp_replace("tok_raw", r"^\$", ""))
)

# ---------- join + filter -------

# Inner join with the broadcast CRSP table on 'ticker' keeps only known symbols.
# NOTE(review): if CRSP has overlapping validity rows for the same ticker, a
# mention can survive the filter more than once; consider dropDuplicates if
# that matters downstream.
joined = cleaned.join(crsp_b, on="ticker", how="inner")

# start_ts / end_ts are epoch seconds at 00:00 of the first / last valid date
# (see Cell 4 for the time zone used). SecInfoEndDt is treated as inclusive,
# so the window closes at the *end* of that day. This is an assumption;
# confirm it against the CRSP field definition.
# Single-letter tickers are only accepted when written as a cashtag ('$X').
SECONDS_PER_DAY = 86_400
filtered = joined.filter(
    (col("created_utc") >= col("start_ts")) &
    (col("created_utc") < col("end_ts") + SECONDS_PER_DAY) &
    (
        (length("ticker") >= 2) |
        ((length("ticker") == 1) & expr("tok_raw LIKE '$%'"))
    )
).drop("start_ts", "end_ts", "tok_raw")

# Define output path for per-ticker Parquet files
stage2 = f"s3a://{BUCKET}/stage02_by_ticker/"

# Write out the filtered data:
# - Repartition by 'ticker' so each ticker's rows are co-located before writing
# - Partition files by 'ticker' so each symbol lands in its own folder
(filtered
   .repartition("ticker")
   .write.mode("overwrite")
   .partitionBy("ticker")
   .parquet(stage2)
)

print("√ Stage-2 Parquet written:", stage2)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

√ Stage-2 Parquet written: s3a://wsb-research-data-shef-20250522/stage02_by_ticker/