# Phase 1b: Load to MongoDB (HDFS Version)
This notebook loads the processed Parquet data from HDFS into MongoDB.

In [None]:
import os
import pymongo
from pyspark.sql import SparkSession
from tqdm import tqdm

# --- Tunable settings -------------------------------------------------------

# Number of documents handed to each bulk insert when uploading.
BATCH_SIZE = 5_000

# Row cap applied to every dataset before upload; 0 means "load everything".
# WARNING: a full load (0) goes through toPandas(), which pulls the whole
# dataset into driver memory and may crash the driver on large data.
LIMIT = 1_000

In [None]:
# Connect to Mongo
client = pymongo.MongoClient("mongodb://127.0.0.1:27017")
# MongoClient connects lazily: constructing it succeeds even when no server is
# listening. Round-trip a cheap "ping" so the success message below is only
# printed once the server has actually answered (this raises otherwise).
client.admin.command("ping")
db = client["goodreads"]
print("‚úÖ Connected to MongoDB.")

In [None]:
# Create (or reuse) the Spark session used to read the Parquet data from HDFS.
# The driver limits are raised because the loader collects data onto the
# driver via toPandas().
spark = (
    SparkSession.builder
    .appName("Goodreads_Mongo_Loader")
    .config("spark.driver.memory", "8g")
    .config("spark.driver.maxResultSize", "4g")
    .getOrCreate()
)

print("‚úÖ Spark Session active.")

In [None]:
def push_to_mongo(spark_df, collection_name, db, batch_size=2000, limit=0):
    """Copy a Spark DataFrame into a MongoDB collection in batches.

    The frame is collected onto the driver with ``toPandas()``, converted to a
    list of dicts, and written slice by slice with
    ``insert_many(..., ordered=False)``. Problems are reported with ``print``
    and are never re-raised, so one failing batch does not stop the rest.

    Args:
        spark_df: DataFrame to upload; must provide ``limit()`` and ``toPandas()``.
        collection_name: Target collection, looked up as ``db[collection_name]``.
        db: Database handle that supports item access by collection name.
        batch_size: Maximum number of documents per ``insert_many`` call.
            Must be positive.
        limit: When greater than 0, only that many rows are uploaded.
            ``0`` or ``None`` uploads every row.

    Returns:
        None.
    """
    # `limit and ...` also tolerates limit=None instead of raising TypeError.
    if limit and limit > 0:
        print(f"‚ö†Ô∏è Limiting {collection_name} to {limit} rows for testing.")
        spark_df = spark_df.limit(limit)

    print(f"\nüì• Converting {collection_name} to Pandas...")
    try:
        # A negative step would make range() below yield nothing and the
        # function would claim success without uploading a single document.
        if batch_size <= 0:
            raise ValueError(f"batch_size must be a positive integer, got {batch_size!r}")

        # Convert to Pandas (collects the whole frame onto the driver)
        pdf = spark_df.toPandas()

        # Missing values arrive from pandas as NaN/NaT. Cast to object and
        # replace them with None so MongoDB stores a real null instead of a
        # NaN double or a NaT placeholder.
        pdf = pdf.astype(object).where(pdf.notna(), None)

        # Convert to Dictionary records
        records = pdf.to_dict(orient='records')
        total = len(records)
        print(f"üöÄ Uploading {total} docs to '{collection_name}'...")

        collection = db[collection_name]

        # Smaller batches to prevent crashing the DB
        failed_batches = 0
        for start in tqdm(range(0, total, batch_size)):
            chunk = records[start : start + batch_size]
            try:
                collection.insert_many(chunk, ordered=False)
            except Exception as e:
                # If a single batch fails, print error but keep going
                failed_batches += 1
                print(f"‚ö†Ô∏è Batch error: {e}")

        # Only announce success when every batch went through cleanly.
        if failed_batches:
            print(
                f"‚ö†Ô∏è {collection_name} upload finished with errors in "
                f"{failed_batches} batch(es)."
            )
        else:
            print(f"‚úÖ {collection_name} Upload Complete!")

    except Exception as e:
        print(f"‚ùå CRITICAL ERROR uploading {collection_name}: {e}")

In [None]:
# Where the processed Parquet datasets live on HDFS
hdfs_base = "hdfs:///user/ubuntu/goodreads_data/processed"
interactions_path = hdfs_base + "/master_interactions"
reviews_path = hdfs_base + "/master_reviews"

# Interactions dataset -> "user_activity" collection
print(f"Reading interactions from {interactions_path}")
try:
    df_inter = spark.read.parquet(interactions_path)
    push_to_mongo(df_inter, "user_activity", db, batch_size=BATCH_SIZE, limit=LIMIT)
except Exception as err:
    print(f"‚ö†Ô∏è Failed to read interactions: {err}")

# Reviews dataset -> "enriched_reviews" collection
print(f"Reading reviews from {reviews_path}")
try:
    df_rev = spark.read.parquet(reviews_path)
    push_to_mongo(df_rev, "enriched_reviews", db, batch_size=BATCH_SIZE, limit=LIMIT)
except Exception as err:
    print(f"‚ö†Ô∏è Failed to read reviews: {err}")

print("\nüéâ Load Complete!")