In [1]:
import warnings
warnings.filterwarnings('ignore')
import pandas as pd
import os
from sqlalchemy import create_engine
import time
import logging
import psycopg2

# ----------- Logging Setup -----------
logging.basicConfig(
    filename="logs/postgres_ingestion.log",
    level=logging.DEBUG,
    format="%(asctime)s - %(levelname)s - %(message)s",
    filemode="a"
)

# ----------- PostgreSQL Connection Setup -----------
DB_USER = 'postgres'      # Change to your PostgreSQL username
DB_PASSWORD = '2526'      # Change to your password
DB_HOST = 'localhost'
DB_PORT = '5432'
DB_NAME = 'blinkit_db'    # Change to your DB name

# ----------- Auto Create Database if not exists -----------
try:
    # Connect to default 'postgres' database first
    conn = psycopg2.connect(
        dbname='postgres',
        user=DB_USER,
        password=DB_PASSWORD,
        host=DB_HOST,
        port=DB_PORT
    )
    conn.autocommit = True
    cursor = conn.cursor()

    # Check if DB exists, if not create
    cursor.execute(f"SELECT 1 FROM pg_database WHERE datname = '{DB_NAME}'")
    exists = cursor.fetchone()
    if not exists:
        cursor.execute(f'CREATE DATABASE {DB_NAME}')
        logging.info(f"Database '{DB_NAME}' created successfully.")
    else:
        logging.info(f"Database '{DB_NAME}' already exists.")

    cursor.close()
    conn.close()
except Exception as e:
    logging.error(f"Error creating database: {e}")
    raise

# ----------- SQLAlchemy Engine Setup -----------
engine = create_engine(f'postgresql+psycopg2://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}')

# ----------- Function to Ingest Data -----------
def ingest_db(df, table_name, engine):
    df.to_sql(table_name, con=engine, if_exists='replace', index=False)
    logging.info(f"Successfully ingested {table_name} into database.")

# ----------- Load Files and Ingest -----------
def load_raw_data():
    start = time.time()
    logging.info("------ Starting Ingestion ------")

    for file in os.listdir('data'):
        file_path = os.path.join('data', file)

        try:
            if file.endswith('.csv'):
                df = pd.read_csv(file_path)
                table_name = file[:-4]
            elif file.endswith('.xlsx') or file.endswith('.xls'):
                df = pd.read_excel(file_path)
                table_name = file[:-5]
            elif file.endswith('.json'):
                df = pd.read_json(file_path)
                table_name = file[:-5]
            else:
                logging.warning(f"Skipped unsupported file: {file}")
                continue

            # Clean column names for PostgreSQL (max length 63)
            df.columns = [col.strip().replace('"', '').replace(";", "")[:63] for col in df.columns]

            logging.info(f"Ingesting file: {file}")
            ingest_db(df, table_name, engine)

        except Exception as e:
            logging.error(f"Error processing file {file}: {e}")

    end = time.time()
    total_time = (end - start) / 60
    logging.info("------ Ingestion Complete ------")
    logging.info(f"Total Time Taken: {total_time:.2f} minutes")

# ----------- Run the Script -----------
if __name__ == '__main__':
    load_raw_data()

In [2]:
# PostgreSQL connection 
conn = psycopg2.connect(
    host="localhost",
    database="blinkit_db",   
    user="postgres",       
    password="2526", 
    port="5432"
)

In [3]:
cur = conn.cursor()


# Table names fetch
cur.execute("""
    SELECT table_name 
    FROM information_schema.tables
    WHERE table_schema = 'public';
""")

tables = cur.fetchall()

# Print table names
print("Tables in database:")
for table in tables:
    print(table[0])

Tables in database:
BlinkIT
Blinkit Grocery Data


In [4]:
# BlinkIT table fetch
df1 = pd.read_sql('SELECT * FROM "BlinkIT";', conn)
df1.head()

Unnamed: 0,Item_Fat_Content,Item_Identifier,Item_Type,Outlet_Establishment_Year,Outlet_Identifier,Outlet_Location_Type,Outlet_Size,Outlet_Type,Item_Visibility,Item_Weight,Sales,Rating
0,Regular,FDX32,Fruits and Vegetables,2012,OUT049,Tier 1,Medium,Supermarket Type1,0.100014,15.1,145.4786,5.0
1,Low Fat,NCB42,Health and Hygiene,2022,OUT018,Tier 3,Medium,Supermarket Type2,0.008596,11.8,115.3492,5.0
2,Regular,FDR28,Frozen Foods,2016,OUT046,Tier 1,Small,Supermarket Type1,0.025896,13.85,165.021,5.0
3,Regular,FDL50,Canned,2014,OUT013,Tier 3,High,Supermarket Type1,0.042278,12.15,126.5046,5.0
4,Low Fat,DRI25,Soft Drinks,2015,OUT045,Tier 2,Small,Supermarket Type1,0.03397,19.6,55.1614,5.0


In [5]:
# Blinkit Grocery Data table fetch
df2 = pd.read_sql('SELECT * FROM "Blinkit Grocery Data";', conn)
df2.head()

Unnamed: 0,Item_Fat_Content,Item_Identifier,Item_Type,Outlet_Establishment_Year,Outlet_Identifier,Outlet_Location_Type,Outlet_Size,Outlet_Type,Item_Visibility,Item_Weight,Sales,Rating
0,Regular,FDX32,Fruits and Vegetables,2012,OUT049,Tier 1,Medium,Supermarket Type1,0.100014,15.1,145.4786,5.0
1,Low Fat,NCB42,Health and Hygiene,2022,OUT018,Tier 3,Medium,Supermarket Type2,0.008596,11.8,115.3492,5.0
2,Regular,FDR28,Frozen Foods,2016,OUT046,Tier 1,Small,Supermarket Type1,0.025896,13.85,165.021,5.0
3,Regular,FDL50,Canned,2014,OUT013,Tier 3,High,Supermarket Type1,0.042278,12.15,126.5046,5.0
4,Low Fat,DRI25,Soft Drinks,2015,OUT045,Tier 2,Small,Supermarket Type1,0.03397,19.6,55.1614,5.0
