In [1]:
import pandas as pd
import numpy as np
import logging 
import os
import requests
import json

# Create a logger 
logger = logging.getLogger('etl_logger') 
logger.setLevel(logging.INFO) 
file_handler = logging.FileHandler('etl_log.log') 
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') 
file_handler.setFormatter(formatter) 
logger.addHandler(file_handler) 


In [2]:
data= pd.read_csv(r'C:\Users\tevin_solutech\Desktop\Tems\Bootcamp\q2-data-engineering-workshop\04-data-warehousing\cleaned_sample_agric_commodities.csv')
data.shape

(2000, 9)

In [3]:
# Example: Handling missing values
data.dropna(inplace=True)

# Example: Converting data types
data['date'] = pd.to_datetime(data['date'])


In [4]:
# Create dimension tables
dim_commodity = data[['commodity', 'classification']].drop_duplicates().reset_index(drop=True)
dim_commodity['commodity_id'] = dim_commodity.index + 1

dim_market = data[['market', 'county']].drop_duplicates().reset_index(drop=True)
dim_market['market_id'] = dim_market.index + 1

In [5]:
data.head()

Unnamed: 0,id,commodity,classification,market,wholesale,retail,supply_volume,county,date
0,1,Wheat,Not Classified,Kibiok,75.0,80.0,450.0,Nandi,2024-03-10
1,2,Wheat,Not Classified,Kabiyet Market,75.0,80.0,1000.0,Nandi,2024-03-08
2,3,Wheat,Not Classified,Nakuru Wakulima,75.0,80.0,0.0,Nakuru,2024-03-08
3,4,Wheat,Not Classified,Daraja Mbili,75.0,80.0,250.0,Kisii,2024-03-08
4,5,Wheat,Not Classified,Maua,75.0,80.0,1980.0,Meru,2024-03-07


In [6]:
# Create the fact table
fact_commodity_market = data.merge(dim_commodity, on=['commodity', 'classification']) \
                            .merge(dim_market, on=['market', 'county']) \
                            [['commodity_id', 'market_id', 'wholesale', 'retail', 'supply_volume', 'date']]

In [7]:
fact_commodity_market = data.merge(dim_commodity, on=['commodity', 'classification']) \
                            .merge(dim_market, on=['market', 'county'])

In [8]:
fact_commodity_market

Unnamed: 0,id,commodity,classification,market,wholesale,retail,supply_volume,county,date,commodity_id,market_id
0,1,Wheat,Not Classified,Kibiok,75.0,80.0,450.0,Nandi,2024-03-10,1,1
1,2,Wheat,Not Classified,Kabiyet Market,75.0,80.0,1000.0,Nandi,2024-03-08,1,2
2,3,Wheat,Not Classified,Nakuru Wakulima,75.0,80.0,0.0,Nakuru,2024-03-08,1,3
3,4,Wheat,Not Classified,Daraja Mbili,75.0,80.0,250.0,Kisii,2024-03-08,1,4
4,5,Wheat,Not Classified,Maua,75.0,80.0,1980.0,Meru,2024-03-07,1,5
...,...,...,...,...,...,...,...,...,...,...,...
1995,1996,Dry Maize,White Maize,Kerugoya,75.0,80.0,5000.0,Kirinyaga,2024-03-01,12,82
1996,1997,Red Sorghum,Not Classified,Kagio,75.0,80.0,0.0,Kirinyaga,2024-01-15,2,7
1997,1998,Red Sorghum,Not Classified,Kangemi Market,75.0,80.0,0.0,Nairobi,2024-01-15,2,21
1998,1999,Rice,Sindano,Nakuru Wakulima,75.0,80.0,0.0,Nakuru,2024-02-08,3,3


In [9]:
dim_commodity.rename(columns={'commodity': 'Commodity_Name', 'classification': 'Classification','commodity_id':'Commodity_ID'}, inplace=True)
dim_market.rename(columns={'market': 'Market_Name', 'county': 'County','market_id':'Market_ID'}, inplace=True)
fact_commodity_market.rename(columns={'wholesale': 'Wholesale_Price', 
                                      'retail': 'Retail_Price',
                                      'supply_volume':'Supply_Volume',
                                      'date':'Date' , 
                                      'commodity_id':'Commodity_ID',
                                      'market_id':'Market_ID'}, inplace=True)

In [10]:
import psycopg2 as db

# Create a connection with the database
dbase = db.connect(
    host="aws-0-eu-central-1.pooler.supabase.com",
    database="postgres",
    user="postgres.jqanxzpkmpzbmlufiazd",
    password="",
    port=6543
)

# Create a cursor object
cursor = dbase.cursor()

try:
    # Insert data into dim_commodity
    for _, row in dim_commodity.iterrows():
        cursor.execute(
            """
            INSERT INTO dim_commodity (Commodity_ID, Commodity_Name, Classification)
            VALUES (%s, %s, %s)
            ON CONFLICT (Commodity_ID) 
            DO UPDATE SET Commodity_Name = EXCLUDED.Commodity_Name, Classification = EXCLUDED.Classification
            """,
            (row['Commodity_ID'], row['Commodity_Name'], row['Classification'])
        )

    # Insert data into dim_market
    for _, row in dim_market.iterrows():
        cursor.execute(
            """
            INSERT INTO dim_market (Market_ID, County, Market_Name)
            VALUES (%s, %s, %s)
            ON CONFLICT (Market_ID)
            DO UPDATE SET County = EXCLUDED.County, Market_Name = EXCLUDED.Market_Name
            """,
            (row['Market_ID'], row['County'], row['Market_Name'])
        )

    # Insert data into fact_commodity_market
    for _, row in fact_commodity_market.iterrows():
        cursor.execute(
            """
            INSERT INTO fact_commodity_market (Commodity_ID, Market_ID, Wholesale_Price, Retail_Price, Supply_Volume, Date_ID)
            VALUES (%s, %s, %s, %s, %s, %s)
            ON CONFLICT (Commodity_ID, Market_ID)
            DO UPDATE SET Wholesale_Price = EXCLUDED.Wholesale_Price, Retail_Price = EXCLUDED.Retail_Price, Supply_Volume = EXCLUDED.Supply_Volume, Date_ID = EXCLUDED.Date_ID
            """,
            (row['Commodity_ID'], row['Market_ID'], row['Wholesale_Price'], row['Retail_Price'], row['Supply_Volume'], row['Date_ID'])
        )

    # Commit the transaction
    dbase.commit()

except Exception as e:
    print(f"Error occurred: {str(e)}")
    dbase.rollback()

finally:
    # Close the cursor and connection
    cursor.close()
    dbase.close()


Error occurred: column "commodity_id" of relation "dim_commodity" does not exist
LINE 2:             INSERT INTO dim_commodity (Commodity_ID, Commodi...
                                               ^

