In [3]:
import logging
import os
import warnings
from urllib.parse import quote_plus

import pandas as pd
import psycopg2
from sqlalchemy import create_engine

# Suppress every UserWarning for the rest of the session.
# NOTE(review): presumably this hides the notice pandas emits when it is handed
# a raw DB-API connection instead of a SQLAlchemy connectable — confirm.
warnings.filterwarnings("ignore", category=UserWarning)
# Set up logging: INFO and above, prefixed with timestamp and level.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Configuration
# Keyword arguments passed straight to psycopg2.connect() for the source database.
# The password may be supplied through the NORTHWIND_DB_PASSWORD environment
# variable so real credentials never have to be saved in the notebook; the
# fallback is the original local value.
postgres_connection_string = {
    "host": "localhost",
    "database": "northwind",
    "user": "postgres",
    "password": os.environ.get("NORTHWIND_DB_PASSWORD", "thewindisblowing"),
    "port": 5432
}

# Paths are assembled with os.path.join instead of '.\data\...' literals:
# '\d' and '\o' are invalid escape sequences (Python warns about them), and a
# hard-coded backslash is only a path separator on Windows. On Windows the
# resulting values are identical to the original strings.
output_directory = os.path.join('.', 'data')
csv_file_path = os.path.join(output_directory, 'order_details.csv')

# Connect to the Postgres database
logging.info('Connecting to the Postgres database...')
conn = psycopg2.connect(**postgres_connection_string)
# A single cursor is opened here and reused for the metadata queries that follow.
cursor = conn.cursor()


2023-07-11 14:15:50,679 - INFO - Connecting to the Postgres database...


In [4]:
# Discover which tables live in the source database's "public" schema.
logging.info('Listing tables...')
query = (
    "SELECT table_name FROM information_schema.tables "
    "WHERE table_schema = 'public'"
)
cursor.execute(query)
tables = cursor.fetchall()

# Each fetched row is a one-element tuple holding the table name; log them one per line.
logging.info('Tables found:')
for (table_name,) in tables:
    logging.info(table_name)

2023-07-11 14:15:51,129 - INFO - Listing tables...
2023-07-11 14:15:51,141 - INFO - Tables found:
2023-07-11 14:15:51,145 - INFO - us_states
2023-07-11 14:15:51,146 - INFO - customers
2023-07-11 14:15:51,147 - INFO - orders
2023-07-11 14:15:51,148 - INFO - employees
2023-07-11 14:15:51,149 - INFO - shippers
2023-07-11 14:15:51,150 - INFO - categories
2023-07-11 14:15:51,151 - INFO - products
2023-07-11 14:15:51,152 - INFO - suppliers
2023-07-11 14:15:51,153 - INFO - region
2023-07-11 14:15:51,154 - INFO - territories
2023-07-11 14:15:51,155 - INFO - employee_territories
2023-07-11 14:15:51,156 - INFO - customer_demographics
2023-07-11 14:15:51,157 - INFO - customer_customer_demo


In [5]:
# Source tables to dump, one CSV file per table.
tables = [
    'us_states', 'customers', 'orders', 'employees', 'shippers',
    'categories', 'products', 'suppliers', 'region', 'territories',
    'employee_territories', 'customer_demographics', 'customer_customer_demo',
]

execution_day = '2021-01-01'  # Replace with the desired execution day

for table in tables:
    logging.info(f'Extracting data from the table: {table}')
    # Pull the complete table into memory through the open source connection.
    table_frame = pd.read_sql_query(f'SELECT * FROM {table};', conn)

    # On-disk layout: <output_directory>/postgres/<table>/<execution_day>/<table>.csv
    table_directory = os.path.join(output_directory, "postgres", table, execution_day)
    output_path = os.path.join(table_directory, f'{table}.csv')
    os.makedirs(table_directory, exist_ok=True)

    # index=False keeps the DataFrame's positional index out of the file.
    table_frame.to_csv(output_path, index=False)
    logging.info(f'Data from the table {table} extracted and saved to {output_path}')


2023-07-11 14:15:51,180 - INFO - Extracting data from the table: us_states
2023-07-11 14:15:51,197 - INFO - Data from the table us_states extracted and saved to .\data\postgres\us_states\2021-01-01\us_states.csv
2023-07-11 14:15:51,201 - INFO - Extracting data from the table: customers
2023-07-11 14:15:51,213 - INFO - Data from the table customers extracted and saved to .\data\postgres\customers\2021-01-01\customers.csv
2023-07-11 14:15:51,218 - INFO - Extracting data from the table: orders
2023-07-11 14:15:51,245 - INFO - Data from the table orders extracted and saved to .\data\postgres\orders\2021-01-01\orders.csv
2023-07-11 14:15:51,249 - INFO - Extracting data from the table: employees
2023-07-11 14:15:51,260 - INFO - Data from the table employees extracted and saved to .\data\postgres\employees\2021-01-01\employees.csv
2023-07-11 14:15:51,261 - INFO - Extracting data from the table: shippers
2023-07-11 14:15:51,274 - INFO - Data from the table shippers extracted and saved to .\dat

In [6]:
# Log the row count of every source table.
# The "integrity" check here is informational only: counts are logged, not
# compared against the extracted files.
logging.info('Checking table integrity...')
for table in tables:
    cursor.execute(f"SELECT COUNT(*) FROM {table}")
    # COUNT(*) comes back as a single-column row; unpack its only value.
    (count,) = cursor.fetchone()
    logging.info(f"Table {table} has {count} rows.")

2023-07-11 14:15:51,457 - INFO - Checking table integrity...
2023-07-11 14:15:51,464 - INFO - Table us_states has 51 rows.
2023-07-11 14:15:51,467 - INFO - Table customers has 91 rows.
2023-07-11 14:15:51,468 - INFO - Table orders has 830 rows.
2023-07-11 14:15:51,470 - INFO - Table employees has 9 rows.
2023-07-11 14:15:51,471 - INFO - Table shippers has 6 rows.
2023-07-11 14:15:51,473 - INFO - Table categories has 8 rows.
2023-07-11 14:15:51,475 - INFO - Table products has 77 rows.
2023-07-11 14:15:51,478 - INFO - Table suppliers has 29 rows.
2023-07-11 14:15:51,479 - INFO - Table region has 4 rows.
2023-07-11 14:15:51,481 - INFO - Table territories has 53 rows.
2023-07-11 14:15:51,482 - INFO - Table employee_territories has 49 rows.
2023-07-11 14:15:51,484 - INFO - Table customer_demographics has 0 rows.
2023-07-11 14:15:51,485 - INFO - Table customer_customer_demo has 0 rows.


In [7]:
# Extract data from the CSV file
# The CSV step runs inside try/finally so the source-database cursor and
# connection are released even when reading or writing the file raises;
# previously a failure here left the connection open.
try:
    logging.info('Extracting data from the CSV file...')
    df = pd.read_csv(csv_file_path)
    # On-disk layout: <output_directory>/csv/<execution_day>/orders_details.csv
    output_path = os.path.join(output_directory, 'csv', execution_day, 'orders_details.csv')
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    df.to_csv(output_path, index=False)
    logging.info(f'Data from the CSV file extracted and saved to {output_path}')
finally:
    # Close the Postgres connection
    cursor.close()
    conn.close()

2023-07-11 14:15:51,552 - INFO - Extracting data from the CSV file...
2023-07-11 14:15:51,591 - INFO - Data from the CSV file extracted and saved to .\data\csv\2021-01-01\orders_details.csv


In [8]:
# Load data from the local disk into the final database
logging.info('Connecting to the final database...')
# Single source of truth for the target-database settings. The password may be
# supplied through the NORTHWIND_V2_DB_PASSWORD environment variable so real
# credentials never have to be saved in the notebook; the fallback is the
# original local value.
final_database_connection_string = {
    'host': 'localhost',
    'database': 'northwind_v2',
    'user': 'postgres',
    'password': os.environ.get('NORTHWIND_V2_DB_PASSWORD', '123456')
}

# Build the SQLAlchemy URL from the settings above instead of repeating the
# credentials in a second literal. User and password are URL-quoted so that
# characters such as '@' or '/' cannot corrupt the URL.
final_database_url = 'postgresql://{user}:{password}@{host}/{database}'.format(
    user=quote_plus(final_database_connection_string['user']),
    password=quote_plus(final_database_connection_string['password']),
    host=final_database_connection_string['host'],
    database=final_database_connection_string['database'],
)

# Connect to the final database
# From here on `conn` is a SQLAlchemy engine rather than a psycopg2 connection.
conn = create_engine(final_database_url)


2023-07-11 14:15:51,622 - INFO - Connecting to the final database...


In [9]:
# Load data from the Postgres extracts
# Each table's CSV is read back from
# <output_directory>/postgres/<table>/<execution_day>/<table>.csv and written
# to a table of the same name in the final database.
# NOTE(review): read_csv re-infers column types from the text, so values that
# merely look numeric (e.g. codes with leading zeros) may not round-trip
# unchanged — confirm against the source schema.
for table in tables:
    extract_file_path = os.path.join(output_directory, 'postgres', table, execution_day, f'{table}.csv')
    df = pd.read_csv(extract_file_path)
    # if_exists='replace' drops any existing table first, so re-running this
    # cell does not append duplicate rows.
    df.to_sql(table, conn, if_exists='replace', index=False)
    logging.info(f'Data from {table} loaded into the final database')

2023-07-11 14:15:52,455 - INFO - Data from us_states loaded into the final database
2023-07-11 14:15:52,540 - INFO - Data from customers loaded into the final database
2023-07-11 14:15:52,707 - INFO - Data from orders loaded into the final database
2023-07-11 14:15:52,782 - INFO - Data from employees loaded into the final database
2023-07-11 14:15:52,843 - INFO - Data from shippers loaded into the final database
2023-07-11 14:15:52,907 - INFO - Data from categories loaded into the final database
2023-07-11 14:15:52,992 - INFO - Data from products loaded into the final database
2023-07-11 14:15:53,070 - INFO - Data from suppliers loaded into the final database
2023-07-11 14:15:53,140 - INFO - Data from region loaded into the final database
2023-07-11 14:15:53,207 - INFO - Data from territories loaded into the final database
2023-07-11 14:15:53,262 - INFO - Data from employee_territories loaded into the final database
2023-07-11 14:15:53,347 - INFO - Data from customer_demographics loade

In [10]:
# Read the order-details extract back from disk and publish it as the
# `order_details` table of the final database.
csv_extract_directory = os.path.join(output_directory, 'csv', execution_day)
csv_extract_file_path = os.path.join(csv_extract_directory, 'orders_details.csv')

order_details_frame = pd.read_csv(csv_extract_file_path)
# Any existing `order_details` table is replaced; the DataFrame index is not stored.
order_details_frame.to_sql(
    name='order_details',
    con=conn,
    if_exists='replace',
    index=False,
)
logging.info('Data from CSV loaded into the final database')

2023-07-11 14:15:53,709 - INFO - Data from CSV loaded into the final database


In [11]:
# Join every order with its detail rows in the final database and export the
# result to <output_directory>/final_result/<execution_day>/final_result.csv.
logging.info('Running the query to retrieve orders and details...')
# Both o.* and od.* are selected, so the order_id column appears twice in the output.
final_query = """
SELECT o.*, od.*
FROM orders o
JOIN order_details od ON o.order_id = od.order_id;
"""

# Create the dated result folder before running the query.
result_directory = os.path.join(output_directory, 'final_result', execution_day)
os.makedirs(result_directory, exist_ok=True)
output_file_path = os.path.join(result_directory, "final_result.csv")

joined_frame = pd.read_sql_query(final_query, conn)
joined_frame.to_csv(output_file_path, index=False)
logging.info(f'Final query result saved to {output_file_path}')

logging.info('Pipeline completed successfully.')


2023-07-11 14:15:53,743 - INFO - Running the query to retrieve orders and details...
2023-07-11 14:15:53,850 - INFO - Final query result saved to .\data\final_result\2021-01-01\final_result.csv
2023-07-11 14:15:53,852 - INFO - Pipeline completed successfully.
