In [2]:
import pandas as pd
import pyarrow.parquet as pq
import re
import json
import boto3


In [3]:
!pip install pandas pyarrow s3fs boto3 smart_open



Collecting smart_open
  Downloading smart_open-7.1.0-py3-none-any.whl.metadata (24 kB)
Downloading smart_open-7.1.0-py3-none-any.whl (61 kB)
Installing collected packages: smart_open
Successfully installed smart_open-7.1.0


In [4]:
s3 = boto3.client("s3")
bucket_name = "bigdatateaching"
sub_prefix = "reddit-project/reddit/parquet/submissions/yyyy=2024/mm=01/"
com_prefix = "reddit-project/reddit/parquet/comments/yyyy=2024/mm=01/"

submissions = []
comments = []

sub_response = s3.list_objects_v2(Bucket=bucket_name, Prefix=sub_prefix)
com_response = s3.list_objects_v2(Bucket=bucket_name, Prefix=com_prefix)
if "Contents" in sub_response:
    for obj in sub_response["Contents"]:
        submissions.append(obj["Key"])  
else:
    print("No files found at this location.")
#print("submissions:", submissions)
if "Contents" in com_response:
    for obj in com_response["Contents"]:
        comments.append(obj["Key"])  
else:
    print("No files found at this location.")
#print("comments:", comments)



In [17]:
s3_submissions_path = f"s3://{bucket_name}/{submissions[0]}"  # First submission file
print(s3_submissions_path)
s3_comments_path = f"s3://{bucket_name}/{comments[0]}"  # First comment file
print(s3_comments_path)

# Read data directly from S3
submissions_df = pd.read_parquet(s3_submissions_path, storage_options={"anon": True})
comments_df = pd.read_parquet(s3_comments_path, storage_options={"anon": True})

# Display first few rows
#print(submissions_df.head())
#print(comments_df.head())

s3://bigdatateaching/reddit-project/reddit/parquet/submissions/yyyy=2024/mm=01/submissions_RS_2024-01.zst_1.parquet
s3://bigdatateaching/reddit-project/reddit/parquet/comments/yyyy=2024/mm=01/comments_RC_2024-01.zst_1.parquet


In [23]:
submissions_data = []

# Loop through files 1 to 42
for i in range(1, 42):
    s3_submissions_path = f"s3://{bucket_name}/{submissions[i]}"
    file_name = f"submissions_RS_2024-01.zst_{i}.parquet"
    file_path = sub_prefix + file_name 
    try:
        # Use pushdown predicates to filter only the desired subreddits
        df = pd.read_parquet(s3_submissions_path, 
                             filters=[("subreddit", "in", ["AskNYC", "washingtondc"])])
        submissions_data.append(df)
        print(f"Successfully loaded: {file_name}")
    except FileNotFoundError:
        print(f"File not found: {file_name}")
    except Exception as e:
        print(f"Error reading {file_name}: {e}")

# Combine all the data into one DataFrame
submissions_df = pd.concat(submissions_data, ignore_index=True)
print(submissions_df.head())

Successfully loaded: submissions_RS_2024-01.zst_1.parquet
Successfully loaded: submissions_RS_2024-01.zst_2.parquet
Successfully loaded: submissions_RS_2024-01.zst_3.parquet
Successfully loaded: submissions_RS_2024-01.zst_4.parquet
Successfully loaded: submissions_RS_2024-01.zst_5.parquet
Successfully loaded: submissions_RS_2024-01.zst_6.parquet
Successfully loaded: submissions_RS_2024-01.zst_7.parquet
Successfully loaded: submissions_RS_2024-01.zst_8.parquet
Successfully loaded: submissions_RS_2024-01.zst_9.parquet
Successfully loaded: submissions_RS_2024-01.zst_10.parquet
Successfully loaded: submissions_RS_2024-01.zst_11.parquet
Successfully loaded: submissions_RS_2024-01.zst_12.parquet
Successfully loaded: submissions_RS_2024-01.zst_13.parquet
Successfully loaded: submissions_RS_2024-01.zst_14.parquet
Successfully loaded: submissions_RS_2024-01.zst_15.parquet
Successfully loaded: submissions_RS_2024-01.zst_16.parquet
Successfully loaded: submissions_RS_2024-01.zst_17.parquet
Succes

In [24]:
comments_data = []


# Loop through files 1 to 101
for i in range(1, 101):
    s3_comments_path = f"s3://{bucket_name}/{comments[i]}"
    file_name = f"comments_RC_2024-01.zst_{i}.parquet"  # Adjust the file name pattern if necessary
    file_path = com_prefix + file_name 
    
    try:
        # Use pushdown predicates to filter only the desired subreddits
        df = pd.read_parquet(s3_comments_path, 
                             filters=[("subreddit", "in", ["AskNYC", "washingtondc"])])
        comments_data.append(df)
        print(f"Successfully loaded: {file_name}")
    except FileNotFoundError:
        print(f"File not found: {file_name}")
    except Exception as e:
        print(f"Error reading {file_name}: {e}")

# Combine all the data into one DataFrame
comments_df = pd.concat(comments_data, ignore_index=True)
comments_df["parent_id"] = comments_df["parent_id"].astype(str).map(lambda x: re.sub(r'^t\d_', '', x))
#print(comments_df.head())

Successfully loaded: comments_RC_2024-01.zst_1.parquet
Successfully loaded: comments_RC_2024-01.zst_2.parquet
Successfully loaded: comments_RC_2024-01.zst_3.parquet
Successfully loaded: comments_RC_2024-01.zst_4.parquet
Successfully loaded: comments_RC_2024-01.zst_5.parquet
Successfully loaded: comments_RC_2024-01.zst_6.parquet
Successfully loaded: comments_RC_2024-01.zst_7.parquet
Successfully loaded: comments_RC_2024-01.zst_8.parquet
Successfully loaded: comments_RC_2024-01.zst_9.parquet
Successfully loaded: comments_RC_2024-01.zst_10.parquet
Successfully loaded: comments_RC_2024-01.zst_11.parquet
Successfully loaded: comments_RC_2024-01.zst_12.parquet
Successfully loaded: comments_RC_2024-01.zst_13.parquet
Successfully loaded: comments_RC_2024-01.zst_14.parquet
Successfully loaded: comments_RC_2024-01.zst_15.parquet
Successfully loaded: comments_RC_2024-01.zst_16.parquet
Successfully loaded: comments_RC_2024-01.zst_17.parquet
Successfully loaded: comments_RC_2024-01.zst_18.parquet
S

In [36]:
import json

# Perform a LEFT JOIN on id (submissions) and parent_id (comments)
merged_df = submissions_df.merge(comments_df, left_on="id", right_on="parent_id", how="left")

# Convert the merged DataFrame into a JSON array
json_output = merged_df.to_json(orient="records", lines=False)

# Print the first 500 characters to check the structure
print(json_output[:500])  

# Optional: Save the JSON to a file
with open("merged_data.json", "w") as f:
    f.write(json_output)


[{"author_x":"Puzzleheaded-Farm431","author_flair_css_class_x":null,"author_flair_text_x":null,"created_utc_x":1704655142,"distinguished_x":null,"domain":"self.AskNYC","edited_x":null,"id_x":"190zx7t","is_self":true,"locked":false,"num_comments":1,"over_18":false,"quarantine":false,"retrieved_on_x":1704655160,"score_x":1,"selftext":"Hi all, I work for a small company with very few employees and no form of office really just a fabricating shop for local 28,\nI will be applying for paid family lea


In [37]:
print(merged_df.columns)

# Group comments by submission id
grouped_comments = merged_df.groupby("id_x").apply(
    lambda x: x[["author_y", "body", "score_y", "created_utc_y", "id_y"]].dropna().to_dict(orient="records")
).to_dict()

merged_df["created_utc_x"] = pd.to_datetime(merged_df["created_utc_x"], unit="s")
merged_df["created_utc_y"] = pd.to_datetime(merged_df["created_utc_y"], unit="s")

# Extract year and month
merged_df["year_x"] = merged_df["created_utc_x"].dt.year
merged_df["month_x"] = merged_df["created_utc_x"].dt.month
merged_df["year_y"] = merged_df["created_utc_y"].dt.year
merged_df["month_y"] = merged_df["created_utc_y"].dt.month

final_json = {}

for _, row in merged_df.iterrows():
    submission_id = row["id_x"]

    # Base submission data
    final_json[submission_id] = {
        "subreddit": row["subreddit_x"],
        "author": row["author_x"],
        "title": row["title"],
        "selftext": row["selftext"],
        "year": row["year_x"],  # Use extracted year
        "month": row["month_x"],  # Use extracted month
        "score": row["score_x"],
        "num_comments": row["num_comments"],
        "comments": grouped_comments.get(submission_id, [])  # Attach comments or empty list
    }

# Save JSON to a file
import json

json_filename = "reddit_AskNYC_washingtondc.json"
with open(json_filename, "w") as f:
    json.dump(final_json, f, indent=4)

print(f"JSON saved to {json_filename}")

Index(['author_x', 'author_flair_css_class_x', 'author_flair_text_x',
       'created_utc_x', 'distinguished_x', 'domain', 'edited_x', 'id_x',
       'is_self', 'locked', 'num_comments', 'over_18', 'quarantine',
       'retrieved_on_x', 'score_x', 'selftext', 'stickied_x', 'subreddit_x',
       'subreddit_id_x', 'title', 'url', 'yyyy_x', 'mm_x', 'author_y',
       'author_flair_css_class_y', 'author_flair_text_y', 'body',
       'controversiality', 'created_utc_y', 'distinguished_y', 'edited_y',
       'gilded', 'id_y', 'link_id', 'parent_id', 'retrieved_on_y', 'score_y',
       'stickied_y', 'subreddit_y', 'subreddit_id_y', 'yyyy_y', 'mm_y'],
      dtype='object')


  arr, tz_parsed = tslib.array_with_unit_to_datetime(arg, unit, errors=errors)


JSON saved to reddit_AskNYC_washingtondc.json
