In [16]:
import pandas as pd
import json
from sqlalchemy import create_engine,text
import gc
import os

In [None]:
data_dir = "data/"
files = {
    "business": "yelp_academic_dataset_business.json",
    "checkin":  "yelp_academic_dataset_checkin.json",
    "review":   "yelp_academic_dataset_review.json",
    "tip":      "yelp_academic_dataset_tip.json",
    "user":     "yelp_academic_dataset_user.json"
}

# Only keep whatâ€™s needed *after* reading
def read_yelp_json(file_path, cols=None, chunksize=None):
    # Use chunks for very large datasets
    if chunksize:
        chunks = pd.read_json(file_path, lines=True, encoding='utf-8', chunksize=chunksize)
        df = pd.concat([chunk[cols] for chunk in chunks], ignore_index=True)
    else:
        df = pd.read_json(file_path, lines=True, encoding='utf-8')
        if cols:
            df = df[cols]
    return df.convert_dtypes(dtype_backend="pyarrow")

cols = {
    "business": ["business_id", "name", "city", "state", "stars", "review_count", "categories"],
    "checkin":  ["business_id", "date"],
    "review":   ["review_id", "user_id", "business_id", "stars", "date"],
    "tip":      ["user_id", "business_id", "text", "date"],
    "user":     ["user_id", "name", "review_count", "yelping_since", "fans"]
}

business_df = read_yelp_json(data_dir + files["business"], cols["business"])
checkin_df  = read_yelp_json(data_dir + files["checkin"], cols["checkin"])
review_df   = read_yelp_json(data_dir + files["review"], cols["review"], chunksize=100_000)
user_df     = read_yelp_json(data_dir + files["user"], cols["user"], chunksize=100_000)
tip_df      = read_yelp_json(data_dir + files["tip"], cols["tip"])

gc.collect()

In [10]:
print(business_df.shape)
print(checkin_df.shape)
print(review_df.shape)
print(tip_df.shape)
print(user_df.shape)

(150346, 7)
(131930, 2)
(6990280, 5)
(908915, 4)
(1987897, 5)


In [11]:

# --- Configuration ---
USER = "root"
PASSWORD = "1234"
HOST = "localhost"
PORT = 3306
DB_NAME = "yelp"

# --- Step 1. Connect to MySQL server (no database yet) ---
# Note: Don't include a database name in this connection
engine_root = create_engine(f"mysql+pymysql://{USER}:{PASSWORD}@{HOST}:{PORT}/", echo=False)

# --- Step 2. Create database if not exists ---
with engine_root.connect() as conn:
    conn.execute(text(f"CREATE DATABASE IF NOT EXISTS {DB_NAME} CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;"))
    print(f"Database '{DB_NAME}' checked/created successfully.")

# --- Step 3. Connect to the newly created database ---
engine = create_engine(f"mysql+pymysql://{USER}:{PASSWORD}@{HOST}:{PORT}/{DB_NAME}", echo=False)
print(f"Connected to MySQL database '{DB_NAME}'.")

Database 'yelp' checked/created successfully.
Connected to MySQL database 'yelp'.


In [15]:
import pandas as pd
import sqlalchemy
import gc
import os

# -----------------------------
# SQLAlchemy engine
# -----------------------------
# SQLite
# engine = sqlalchemy.create_engine('sqlite:///yelp.db')
# MySQL
# engine = sqlalchemy.create_engine('mysql+pymysql://user:password@host:port/dbname')

# -----------------------------
# Memory-efficient JSON loader
# -----------------------------
def load_json_in_chunks(file_path, table_name, engine, chunksize=50_000, if_exists='replace'):
    """
    Reads a JSON Lines file in chunks and writes to SQL efficiently.
    """
    total_rows = 0
    # Use iterator to read JSON lines
    for i, chunk in enumerate(pd.read_json(file_path, lines=True, chunksize=chunksize)):
        chunk.to_sql(
            table_name,
            con=engine,
            if_exists='append' if i > 0 else if_exists,
            index=False,
            method='multi',
            chunksize=chunksize
        )
        total_rows += len(chunk)
        gc.collect()
        print(f"Chunk {i+1}: Loaded {len(chunk):,} rows (Total: {total_rows:,}) into '{table_name}'.")

    print(f" Finished loading '{table_name}' with {total_rows:,} rows.")

# -----------------------------
# Load all files
# -----------------------------
data_dir = "data/"
files = {
    "business": "yelp_academic_dataset_business.json",
    "checkin":  "yelp_academic_dataset_checkin.json",
    "review":   "yelp_academic_dataset_review.json",
    "tip":      "yelp_academic_dataset_tip.json",
    "user":     "yelp_academic_dataset_user.json"
}

# Iterate over the dictionary properly
for table_name, file_name in files.items():
    file_path = os.path.join(data_dir, file_name)
    if os.path.exists(file_path):
        load_json_in_chunks(file_path, table_name, engine, chunksize=50_000)
    else:
        print(f" File not found: {file_path}")


TypeError: dict can not be used as parameter

In [17]:

def load_dataframe_safe(df, table_name, engine, chunksize=50_000):
    """
    Safe + efficient loader: handles partial failures and commits cleanly.
    """
    with engine.begin() as conn:
        # SQLite tuning (safe for local ETL builds)
        conn.execute(text("PRAGMA synchronous = OFF;"))
        conn.execute(text("PRAGMA journal_mode = MEMORY;"))
        conn.execute(text("PRAGMA temp_store = MEMORY;"))

    try:
        df.to_sql(
            table_name,
            con=engine,
            if_exists='replace',
            index=False,
            method='multi',
            chunksize=chunksize,
        )
        gc.collect()
        print(f"Loaded {len(df):,} rows into table '{table_name}'.")
    except Exception as e:
        print(f" Error while loading {table_name}: {e}")
        with engine.connect() as conn:
            conn.rollback()  # ensure rollback on failure
        raise

In [18]:
def load_and_reset(df, table_name, engine):
    load_dataframe_safe(df, table_name, engine)
    engine.dispose()

In [19]:
load_and_reset(business_df, 'business', engine)
load_and_reset(review_df, 'review', engine)
load_and_reset(user_df, 'user', engine)
load_and_reset(tip_df, 'tip', engine)
load_and_reset(checkin_df, 'checkin', engine)

ProgrammingError: (pymysql.err.ProgrammingError) (1064, "You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'PRAGMA synchronous = OFF' at line 1")
[SQL: PRAGMA synchronous = OFF;]
(Background on this error at: https://sqlalche.me/e/20/f405)