In [33]:
import zstandard
import os
import json
import sys
from datetime import datetime
import logging.handlers

In [34]:
log = logging.getLogger("bot")
log.setLevel(logging.DEBUG)
log.addHandler(logging.StreamHandler())


def read_and_decode(reader, chunk_size, max_window_size, previous_chunk=None, bytes_read=0):
	chunk = reader.read(chunk_size)
	bytes_read += chunk_size
	if previous_chunk is not None:
		chunk = previous_chunk + chunk
	try:
		return chunk.decode()
	except UnicodeDecodeError:
		if bytes_read > max_window_size:
			raise UnicodeError(f"Unable to decode frame after reading {bytes_read:,} bytes")
		log.info(f"Decoding error with {bytes_read:,} bytes, reading another chunk")
		return read_and_decode(reader, chunk_size, max_window_size, chunk, bytes_read)


In [35]:
def read_lines_zst(file_name):
	with open(file_name, 'rb') as file_handle:
		buffer = ''
		reader = zstandard.ZstdDecompressor(max_window_size=2**31).stream_reader(file_handle)
		while True:
			chunk = read_and_decode(reader, 2**27, (2**29) * 2)

			if not chunk:
				break
			lines = (buffer + chunk).split("\n")

			for line in lines[:-1]:
				yield line, file_handle.tell()

			buffer = lines[-1]

		reader.close()


In [36]:
import json
import zstandard as zstd
import csv
from tqdm import tqdm
from datetime import datetime

def extract_posts_after_date(input_file, output_file, min_date="2022-01-01"):
    output_rows = []
    min_dt = datetime.strptime(min_date, "%Y-%m-%d")
    
    fieldnames = [
        "id", "title", "selftext", "text",
        "score", "upvote_ratio", "num_comments", "created_utc"
    ]

    with open(input_file, 'rb') as fh:
        dctx = zstd.ZstdDecompressor()
        with dctx.stream_reader(fh) as reader:
            for chunk in tqdm(iter(lambda: reader.read(2**24), b''), desc=f"Processing {input_file}", unit="chunk"):
                lines = chunk.decode("utf-8", errors="ignore").split("\n")
                for line in lines:
                    if not line.strip():
                        continue
                    try:
                        obj = json.loads(line)
                        created_raw = obj.get("created_utc", 0)
                        try:
                            created_dt = datetime.utcfromtimestamp(int(float(created_raw)))
                        except Exception:
                            continue

                        # ✅ Filter here
                        if created_dt < min_dt:
                            continue

                        output_rows.append({
                            "id": obj.get("id"),
                            "title": obj.get("title", ""),
                            "selftext": obj.get("selftext", ""),
                            "text": f"{obj.get('title', '')} {obj.get('selftext', '')}",
                            "score": obj.get("score", 0),
                            "upvote_ratio": obj.get("upvote_ratio", ""),
                            "num_comments": obj.get("num_comments", 0),
                            "created_utc": created_dt.strftime("%Y-%m-%d %H:%M:%S")
                        })
                    except json.JSONDecodeError:
                        continue

    # Save to CSV
    if output_rows:
        with open(output_file, "w", newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(output_rows)
        print(f"✅ Saved {len(output_rows)} posts to {output_file}")
    else:
        print("⚠️ No posts after", min_date)

In [37]:
extract_posts_after_date("wallstreetbets__submissions.zst", "wsb_posts_2022_onward.csv")
extract_posts_after_date("stocks_submissions.zst", "stocks_posts_2022_onward.csv")

Processing wallstreetbets__submissions.zst: 3chunk [00:00,  6.23chunk/s]


✅ Saved 1245 posts to wsb_posts_2022_onward.csv


Processing stocks_submissions.zst: 57chunk [00:08,  6.84chunk/s]


✅ Saved 107129 posts to stocks_posts_2022_onward.csv


In [38]:
extract_posts_after_date("investing_submissions.zst", "investing_posts_2022_onward.csv")

Processing investing_submissions.zst: 55chunk [00:07,  7.23chunk/s]


✅ Saved 103729 posts to investing_posts_2022_onward.csv


In [40]:
extract_posts_after_date("StocksAndTrading_submissions.zst", "SAT_posts_2022_onward.csv")
extract_posts_after_date("StocksInFocus_submissions.zst", "SIF_posts_2022_onward.csv")
extract_posts_after_date("stockstobuytoday_submissions.zst", "STB_2022_onward.csv")
extract_posts_after_date("investingforbeginners_submissions.zst", "IFB_2022_onward.csv")

Processing StocksAndTrading_submissions.zst: 0chunk [00:00, ?chunk/s]

Processing StocksAndTrading_submissions.zst: 9chunk [00:01,  6.51chunk/s]


✅ Saved 15525 posts to SAT_posts_2022_onward.csv


Processing StocksInFocus_submissions.zst: 20chunk [00:02,  7.89chunk/s]


⚠️ No posts after 2022-01-01


Processing stockstobuytoday_submissions.zst: 6chunk [00:00,  7.60chunk/s]


✅ Saved 25221 posts to STB_2022_onward.csv


Processing investingforbeginners_submissions.zst: 4chunk [00:00,  6.97chunk/s]


✅ Saved 12477 posts to IFB_2022_onward.csv


In [43]:
extract_posts_after_date("wallstreetbets_submissions.zst", "WSB_2022_onward.csv")
extract_posts_after_date("wallstreetbetscontest_submissions.zst", "WSBC_2022_onward.csv")

Processing wallstreetbets_submissions.zst: 423chunk [00:59,  7.06chunk/s]


✅ Saved 544513 posts to WSB_2022_onward.csv


Processing wallstreetbetscontest_submissions.zst: 1chunk [00:00, 1441.34chunk/s]

⚠️ No posts after 2022-01-01



