Data ingestion

In [None]:
import os
from urllib.parse import quote

import pandas as pd
from dotenv import load_dotenv
from pandas.io.sql import get_schema
from sqlalchemy import create_engine, Column, Integer, String, MetaData, Table, text, inspect

In [None]:
# Names of the target database table and of the source CSV (without extension).
# Both are example values; adjust them to your data set.
table_name = "customer_features_test"
file_name = "customer_features"

# Pull the variables from the local .env file into the process environment,
# then read the database password from it (None when the variable is unset).
load_dotenv()
password = os.environ.get("POSTGRES_PASSWORD")

In [None]:
# Define database connection.
# The password is percent-encoded before being embedded in the URL: characters
# such as '@', ':', '/' or '%' would otherwise be read as URL delimiters and
# corrupt the parsed credentials/host. safe='' makes quote() escape '/' too.
# When POSTGRES_PASSWORD is unset, the password part is left out entirely
# instead of sending the literal string "None" as the password.
credentials = "user" if password is None else f"user:{quote(password, safe='')}"
db_uri = f"postgresql+psycopg2://{credentials}@localhost:4321/mydb"
engine = create_engine(db_uri)

In [None]:
# Load the customer features CSV from the sibling data directory into a DataFrame
csv_path = f'../data/{file_name}.csv'
df = pd.read_csv(csv_path)

In [None]:
# Experimental, currently disabled: helper that builds a date from a month number (part of the data-insertion experiments)
# def get_date_by_month(month: int, year: int = 2025, day: int = 1):
#     return pd.to_datetime(f"{year}-{month:02d}-{day:02d}")

# # Example usage:
# print(get_date_by_month(10))

In [None]:
# Preview the first five rows as a quick sanity check of the load
df.head(n=5)

In [None]:
# Parse the 'date' column into timezone-aware (UTC) datetimes and store it back
utc_dates = pd.to_datetime(df['date'], utc=True)
df['date'] = utc_dates

In [None]:
# Show the dtype of the 'date' column to verify the conversion above
df['date'].dtype

In [None]:
# Generate the CREATE TABLE statement pandas would use for this DataFrame.
# The `table_name` variable is passed (not the literal string 'table_name'),
# so the DDL targets the same table that is checked for and written to later.
schema_sql = get_schema(df, name=table_name, con=engine)
print(schema_sql)

# Make the DDL safe to re-run. The count of 1 limits the replacement to the
# first 'CREATE TABLE' occurrence, leaving the rest of the statement untouched.
schema_sql = schema_sql.replace(
    'CREATE TABLE',
    'CREATE TABLE IF NOT EXISTS',
    1,
)
print(schema_sql)

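Compare the two printed statements: the second one now starts with "CREATE TABLE IF NOT EXISTS", so executing it when the table is already present does not raise an error.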

In [None]:
# Step 1: build an inspector on the engine to look up existing tables
inspector = inspect(engine)

# Step 2: run the generated DDL only when the target table is missing
if inspector.has_table(table_name):
    print(f"Table {table_name} already exists.")
else:
    print(f'Table {table_name} does not exist. Creating it now...')
    with engine.connect() as conn:
        conn.execute(text(schema_sql))
        # Persist the CREATE TABLE before the connection is released
        conn.commit()
    print(f"Table {table_name} created successfully.")

# Step 3: append the DataFrame rows to the table (the index is not written)
print(f'Appending data to {table_name} table...')
with engine.connect() as conn:
    df.to_sql(table_name, con=conn, if_exists='append', index=False)
    # Explicit commit on this connection; presumably a no-op if pandas has
    # already committed its own transaction — kept so the write is persisted
    # either way.
    conn.commit()

print("Data successfully written to the database.")
