In [1]:
import pandas as pd
import os
import json
from sqlalchemy import create_engine
import logging
import time

logging.basicConfig(
    filename="yelpy/insert.db.log",
    level=logging.DEBUG,
    format="%(asctime)s - %(levelname)s - %(message)s",
    filemode="a"
)

engine = create_engine('sqlite:///yelper.db')

def ingest_db(df, table_name, engine, if_exists="append"):
    """Ingest DataFrame into database table."""
    df.to_sql(table_name, con=engine, if_exists=if_exists, index=False)

def load_raw_data():
    """Load JSON files in chunks, preprocess, and ingest into DB."""
    start = time.time()
    folder_path = r'C:\Users\hp\Desktop\Data Analyst\Data Analyst Real project\Tech Classes\New folder\AnotherData'

    for file in os.listdir(folder_path):
        if file.endswith('.json'):
            file_path = os.path.join(folder_path, file)
            table_name = file[:-5]  # remove .json for table name

            logging.info(f'Starting ingestion for {file}')

            # Process JSON in chunks
            chunk_iter = pd.read_json(file_path, lines=True, chunksize=50000)  # adjust chunk size
            first_chunk = True

            for chunk_df in chunk_iter:
                # Convert dict columns into JSON strings before ingestion
                for col in chunk_df.columns:
                    if chunk_df[col].apply(lambda x: isinstance(x, dict)).any():
                        chunk_df[col] = chunk_df[col].apply(lambda x: json.dumps(x) if isinstance(x, dict) else x)

                ingest_db(chunk_df, table_name, engine, if_exists="replace" if first_chunk else "append")
                first_chunk = False

            logging.info(f'Finished ingestion for {file}')
    
    end = time.time()
    total_time = (end - start) / 60
    logging.info('-------------Ingestion Complete---------------')
    logging.info(f'\nTotal Time Taken : {total_time:.2f} minutes')

if __name__ == '__main__':
    load_raw_data()