In [None]:
import csv
import json
import logging.handlers
import os
import sys
from datetime import datetime, timezone

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import zstandard

# Notebook-wide logger. logging.getLogger returns the same object on every
# execution of this cell, so only attach a handler when none is present;
# otherwise each re-run would add another StreamHandler and every message
# would be emitted once per handler.
log = logging.getLogger("bot")
log.setLevel(logging.DEBUG)
if not log.handlers:
    log.addHandler(logging.StreamHandler())

In [None]:
def read_and_decode(reader, chunk_size, max_window_size, previous_chunk=None, bytes_read=0):
    """Read from ``reader`` until the accumulated bytes decode as text.

    A read of ``chunk_size`` bytes can end in the middle of a multi-byte
    character. When decoding fails, another chunk is read and appended to the
    bytes gathered so far, and decoding is attempted again on the whole.

    Args:
        reader: Object exposing ``read(size)`` that returns bytes.
        chunk_size: Number of bytes requested on every read.
        max_window_size: Retry budget; once the running total exceeds this
            value after a failed decode, the function gives up.
        previous_chunk: Undecoded bytes to prepend to the first read, if any.
        bytes_read: Starting value for the running total. The total grows by
            ``chunk_size`` per read (the requested size, not the number of
            bytes actually returned).

    Returns:
        The decoded string; empty when the reader has no more data and
        nothing was pending.

    Raises:
        UnicodeError: If the data still cannot be decoded once the running
            total exceeds ``max_window_size``.
    """
    pending = previous_chunk
    total_requested = bytes_read
    while True:
        data = reader.read(chunk_size)
        total_requested += chunk_size
        if pending is not None:
            data = pending + data
        try:
            return data.decode()
        except UnicodeDecodeError:
            if total_requested > max_window_size:
                raise UnicodeError(f"Unable to decode frame after reading {total_requested:,} bytes")
            # Keep everything gathered so far and extend it on the next pass.
            pending = data


def read_lines_zst(file_name):
    """Stream the lines of a zstandard-compressed text file.

    The file is decompressed and decoded chunk by chunk via
    ``read_and_decode`` and split on ``"\\n"``; a partial line at the end of a
    chunk is carried over and completed by the next one.

    Args:
        file_name: Path of the ``.zst`` file to read.

    Yields:
        ``(line, position)`` tuples. ``line`` is the text without its newline
        and ``position`` is ``file_handle.tell()`` on the compressed file at
        the time of the yield, which callers can compare with the file size
        to estimate progress.

    Raises:
        UnicodeError: Propagated from ``read_and_decode`` when a chunk cannot
            be decoded within its retry budget.
    """
    decompressor = zstandard.ZstdDecompressor(max_window_size=2**31)
    # Both the file and the stream reader are context-managed so they are
    # released even when the consumer stops iterating early or an error
    # propagates out of the generator.
    with open(file_name, 'rb') as file_handle, decompressor.stream_reader(file_handle) as reader:
        buffer = ''
        while True:
            # Request 2**27 bytes per read; allow up to 2**30 requested bytes
            # of retries when a chunk ends inside a multi-byte character.
            chunk = read_and_decode(reader, 2**27, (2**29) * 2)
            if not chunk:
                break
            lines = (buffer + chunk).split("\n")

            for line in lines[:-1]:
                yield line, file_handle.tell()

            # The last piece may be an incomplete line; finish it next round.
            buffer = lines[-1]

        # Input that does not end with a newline leaves its final line in the
        # buffer; emit it instead of silently dropping the record.
        if buffer:
            yield buffer, file_handle.tell()

def process_json_to_parquet(input_file_path, output_file_path, fields, row_group_size):
    """Convert a zstandard-compressed, line-delimited JSON file to Parquet.

    Each line is parsed as JSON and the requested ``fields`` are extracted and
    stringified. All rows are collected in memory and written with a single
    ``DataFrame.to_parquet`` call after the input is exhausted; nothing is
    written when no valid row was found.

    A line is skipped, counted as bad and logged as a warning when it is not
    valid JSON, lacks one of ``fields`` or ``created_utc``, is not a JSON
    object, or carries a ``created_utc`` that cannot be converted to a
    timestamp. One malformed record therefore never aborts the conversion,
    and the bad-line count matches the records left out of the output.

    Args:
        input_file_path: Path of the ``.zst`` input, read via ``read_lines_zst``.
        output_file_path: Destination passed to ``DataFrame.to_parquet``.
        fields: Keys to extract from every JSON object; they also become the
            column names, in the same order.
        row_group_size: Forwarded unchanged to ``DataFrame.to_parquet``.

    Returns:
        None. Any other failure (for example while reading the input or
        writing the output) is logged with its traceback and not re-raised.
    """
    # Compressed size on disk, compared against the reader position for the
    # percentage shown in progress messages.
    file_size = os.stat(input_file_path).st_size
    file_lines = 0
    file_bytes_processed = 0
    bad_lines = 0
    created = None

    # NOTE: every row is held in memory until the final write.
    rows = []

    try:
        for line, file_bytes_processed in read_lines_zst(input_file_path):
            try:
                obj = json.loads(line)

                # Round-trip through UTF-8 with errors='replace' so characters
                # that cannot be encoded do not break the write later on.
                row = [
                    str(obj[field]).encode("utf-8", errors='replace').decode()
                    for field in fields
                ]

                # Timestamp of the record, used only for progress messages.
                record_time = datetime.fromtimestamp(int(obj['created_utc']), tz=timezone.utc)

            except json.JSONDecodeError as err:
                # Must precede the ValueError clause: JSONDecodeError is a subclass.
                bad_lines += 1
                log.warning(f"JSON decode error on line {file_lines}: {err}")

            except KeyError as err:
                bad_lines += 1
                log.warning(f"Missing expected key {err} on line {file_lines}")
                log.warning(f"Problematic line: {line}")

            except (TypeError, ValueError, OverflowError, OSError) as err:
                # Non-object JSON value, or a created_utc that is null,
                # non-numeric or out of range for a timestamp.
                bad_lines += 1
                log.warning(f"Invalid record on line {file_lines}: {err}")
                log.warning(f"Problematic line: {line}")

            else:
                # Only fully valid records reach the output.
                rows.append(row)
                created = record_time

            file_lines += 1

            # Log progress every million lines
            if file_lines % 1000000 == 0:
                timestamp = created.strftime('%Y-%m-%d %H:%M:%S') if created else 'Unknown Time'
                percent = (file_bytes_processed / file_size) * 100
                log.info(f"{timestamp} : {file_lines:,} : {bad_lines:,} : {percent:.0f}%")

        if rows:
            frame = pd.DataFrame(rows, columns=fields)
            # Drop the Python-level copies before writing to limit peak memory.
            rows.clear()
            frame.to_parquet(output_file_path, row_group_size=row_group_size)
            log.info(f"Parquet file written to: {output_file_path}")

    except Exception as err:
        # Best effort: report the failure with its traceback instead of raising.
        log.exception(f"Unexpected error: {err}")

    # Final completion log
    log.info(f"Complete: {file_lines:,} lines processed. {bad_lines:,} bad lines.")



def unzip(kind, data_dir='/Users/jm7581/Documents/Projects/Active/AITA/data'):
    """Convert one of the AmItheAsshole ``.zst`` dumps to a Parquet file.

    Args:
        kind: ``'comments'`` selects the comments dump and its field list.
            Any other value selects the submissions dump; values other than
            ``'submissions'`` are reported with a warning first.
        data_dir: Directory holding ``AmItheAsshole_<kind>.zst``; the Parquet
            file is written next to it. Defaults to the original project path.

    Returns:
        None.
    """
    if kind == 'comments':
        stem = 'AmItheAsshole_comments'
        fields = ['id', 'body', 'author', 'created_utc', 'score', 'link_id', 'parent_id', 'distinguished', 'author_flair_text', 'author_flair_css_class']
    else:
        if kind != 'submissions':
            # Keep the historical fallback, but make a typo visible.
            log.warning(f"Unrecognised kind {kind!r}; processing submissions")
        stem = 'AmItheAsshole_submissions'
        fields = ['id', 'selftext', 'title', 'author', 'created_utc', 'num_comments', 'score', 'distinguished', 'link_flair_text', 'link_flair_css_class', 'author_flair_text', 'author_flair_css_class']

    input_file_path = os.path.join(data_dir, stem + '.zst')
    output_file_path = os.path.join(data_dir, stem + '.parquet')

    process_json_to_parquet(input_file_path, output_file_path, fields, row_group_size=64*1024*1024)

    return None




In [None]:
# Convert the comments dump to Parquet; this is the only call that runs
# when the cell is executed.
unzip('comments')
# Disabled: uncomment the next line to convert the submissions dump as well.
#unzip('submissions')