In [1]:
import os

import pandas as pd
from sqlalchemy import create_engine, text

In [3]:
# Connection string for the PostgreSQL server. The DATABASE_URL environment
# variable takes precedence so credentials do not have to live in the notebook;
# the literal below is only the fallback used when that variable is unset.
conn_url = os.environ.get(
    'DATABASE_URL',
    'postgresql://postgres:123@localhost/project_test',
)

# Create an engine that connects to PostgreSQL server
engine = create_engine(conn_url)

# Open a connection and keep it open for any later cell that executes through
# `connection`. The previous bare `connection.close` (no parentheses) never
# called the method, it only echoed the bound-method repr, so it is removed
# rather than "fixed" into a real close that would break those later cells.
connection = engine.connect()

In [3]:
# Schema (re)creation script, executed as a single multi-statement string.
# Tables are dropped child-first with CASCADE and then created parent-first so
# every FOREIGN KEY target already exists when it is referenced.
# purchase_order.store_id carries a foreign key to store(id), matching
# orders.store_id; the increase_inventory trigger looks up that column to find
# the inventory row to update, so it must name a real store.
drop_and_create_table_statement = """
DROP TABLE IF EXISTS product_promo CASCADE;
DROP TABLE IF EXISTS inventory_purchase CASCADE;
DROP TABLE IF EXISTS purchase_order CASCADE;
DROP TABLE IF EXISTS order_sale CASCADE;
DROP TABLE IF EXISTS order_delivery CASCADE;
DROP TABLE IF EXISTS orders CASCADE;
DROP TABLE IF EXISTS inventory CASCADE;
DROP TABLE IF EXISTS employment CASCADE;
DROP TABLE IF EXISTS product CASCADE;
DROP TABLE IF EXISTS category CASCADE;
DROP TABLE IF EXISTS supplier CASCADE;
DROP TABLE IF EXISTS employee CASCADE;
DROP TABLE IF EXISTS customer CASCADE;
DROP TABLE IF EXISTS store CASCADE;
DROP TABLE IF EXISTS address CASCADE;

CREATE TABLE address(
    id          SERIAL NOT NULL,
    street      VARCHAR(50),
    city        VARCHAR(50),
    state       VARCHAR(50),
    zip_code    VARCHAR(10),
    PRIMARY KEY (id)
);

CREATE TABLE store(
    id          SERIAL NOT NULL,
    name        VARCHAR(50),
    size   DOUBLE PRECISION,
    address_id  INT NOT NULL,
    PRIMARY KEY (id),
    FOREIGN KEY (address_id) REFERENCES address(id)
);

CREATE TABLE customer(
    id              SERIAL NOT NULL,
    phone_number    VARCHAR(20),
    email           VARCHAR(50),
    membership      VARCHAR(20),
    address_id      INT,
    PRIMARY KEY (id),
    FOREIGN KEY (address_id) REFERENCES address (id)
);

CREATE TABLE employee(
    id          SERIAL NOT NULL,
    first_name  VARCHAR(100),
    last_name   VARCHAR(100),
    PRIMARY KEY (id)
);

CREATE TABLE supplier(
    id              SERIAL NOT NULL,
    name            VARCHAR(255),
    contact_name    VARCHAR(255),
    contact_phone   VARCHAR(20),
    PRIMARY KEY (id)
);

CREATE TABLE category(
    id        SERIAL NOT NULL,
    name      VARCHAR(255),
    PRIMARY KEY (id)
);

CREATE TABLE product( 
    id              SERIAL NOT NULL,
    name            VARCHAR(255),
    category_id     INT NOT NULL,
    PRIMARY KEY (id),
    FOREIGN KEY (category_id) REFERENCES category(id)
);

CREATE TABLE employment(
    employee_id INT NOT NULL,
    store_id    INT NOT NULL,
    salary      DECIMAL(10, 2),
    PRIMARY KEY (employee_id,store_id),
    FOREIGN KEY (employee_id) REFERENCES employee(id),
    FOREIGN KEY (store_id) REFERENCES store(id)
);

CREATE TABLE inventory(
    store_id    INT NOT NULL,
    product_id  INT NOT NULL,
    quantity    INT,
    PRIMARY KEY (store_id,product_id),
    FOREIGN KEY (store_id) REFERENCES store(id),
    FOREIGN KEY (product_id) REFERENCES product(id)
);

CREATE TABLE orders(
    id              SERIAL NOT NULL,
    store_id        INT,
    customer_id     INT,
    payment_type    VARCHAR(20),
    amount          DECIMAL(10,2),
    datetime        TIMESTAMP,
    PRIMARY KEY (id),
    FOREIGN KEY (store_id) REFERENCES store(id),
    FOREIGN KEY (customer_id) REFERENCES customer(id)
);

CREATE TABLE order_delivery(
    order_id        INT NOT NULL,
    address_id      INT NOT NULL,
    delivery_time   TIMESTAMP,
    PRIMARY KEY (order_id),
    FOREIGN KEY (order_id) REFERENCES orders(id),
    FOREIGN KEY (address_id) REFERENCES address(id)
);

CREATE TABLE order_sale(
    order_id        INT NOT NULL,
    product_id      INT NOT NULL,
    quantity        INT,
    price           DECIMAL(10, 2),
    PRIMARY KEY (order_id, product_id),
    FOREIGN KEY (order_id) REFERENCES orders(id),
    FOREIGN KEY (product_id) REFERENCES product(id)
);

CREATE TABLE purchase_order(
    id                  SERIAL NOT NULL,
    supplier_id         INT,
    store_id            INT,
    amount              Decimal(10,2),
    payment_type        VARCHAR(20),
    datetime            TIMESTAMP,
    PRIMARY KEY (id),
    FOREIGN KEY (supplier_id) REFERENCES supplier(id),
    FOREIGN KEY (store_id) REFERENCES store(id)
);

CREATE TABLE inventory_purchase(
    purchase_order_id   INT NOT NULL,
    product_id          INT NOT NULL,
    quantity            INT,
    price               DECIMAL(10, 2),
    PRIMARY KEY (purchase_order_id, product_id),
    FOREIGN KEY (purchase_order_id) REFERENCES purchase_order(id),
    FOREIGN KEY (product_id) REFERENCES product(id)
);

CREATE TABLE product_promo(
    id                      VARCHAR(20) NOT NULL,
    product_id              INT NOT NULL,
    store_id                INT NOT NULL,
    discount_percentage     DECIMAL(5, 2),
    start_date              DATE,
    end_date                DATE,
    PRIMARY KEY (id),
    FOREIGN KEY (product_id) REFERENCES product(id),
    FOREIGN KEY (store_id) REFERENCES store(id)
);

"""

# Trigger script for sales: after each row inserted into order_sale, subtract
# the sold quantity from the inventory row of the store that owns the order
# (looked up through orders.store_id) for the same product.
# The function uses CREATE OR REPLACE; the trigger is dropped first so this
# script can also be executed again without recreating the tables.
trigger_order_sale_inventory = """
CREATE OR REPLACE FUNCTION decrease_inventory()
RETURNS TRIGGER AS $$
BEGIN
  UPDATE inventory
  SET quantity = quantity - NEW.quantity
  WHERE store_id = (
          SELECT store_id FROM orders WHERE id = NEW.order_id
        )
        AND product_id = NEW.product_id;

  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

DROP TRIGGER IF EXISTS trigger_decrease_inventory ON order_sale;

CREATE TRIGGER trigger_decrease_inventory
AFTER INSERT ON order_sale
FOR EACH ROW EXECUTE FUNCTION decrease_inventory();
"""

# Trigger script for restocking: after each row inserted into
# inventory_purchase, add the purchased quantity to the inventory row of the
# store named on the purchase order (purchase_order.store_id) for the same
# product.
# The function uses CREATE OR REPLACE; the trigger is dropped first so this
# script can also be executed again without recreating the tables.
trigger_purchase_order_inventory = """
CREATE OR REPLACE FUNCTION increase_inventory()
RETURNS TRIGGER AS $$
BEGIN
  UPDATE inventory
  SET quantity = quantity + NEW.quantity
  WHERE store_id = (
          SELECT store_id FROM purchase_order WHERE id = NEW.purchase_order_id
        )
        AND product_id = NEW.product_id;

  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

DROP TRIGGER IF EXISTS trigger_increase_inventory ON inventory_purchase;

CREATE TRIGGER trigger_increase_inventory
AFTER INSERT ON inventory_purchase
FOR EACH ROW EXECUTE FUNCTION increase_inventory();
"""

In [4]:
# Build the schema first, then attach the two inventory triggers, which need
# their target tables to exist.
# - text() wraps each script because passing a plain str to execute() is
#   deprecated in SQLAlchemy 1.4 and rejected in 2.0. text() treats ":name" as
#   a bind parameter; none of the scripts contain a colon.
# - engine.begin() runs all three scripts in one transaction that commits on
#   success and rolls back if any statement fails, so a half-built schema is
#   never left behind.
with engine.begin() as ddl_connection:
    for ddl_script in (
        drop_and_create_table_statement,
        trigger_order_sale_inventory,
        trigger_purchase_order_inventory,
    ):
        ddl_connection.execute(text(ddl_script))

In [5]:
# Column names every incoming DataFrame must carry, keyed by destination table.
# Each entry is written as one whitespace-separated string and split into the
# list of column names; key order follows the order of the pairs below.
expected_table_columns = {
    table: columns.split()
    for table, columns in (
        ('address', 'id street city state zip_code'),
        ('store', 'id name size address_id'),
        ('customer', 'id phone_number email membership address_id'),
        ('employee', 'id first_name last_name'),
        ('supplier', 'id name contact_name contact_phone'),
        ('category', 'id name'),
        ('product', 'id name category_id'),
        ('employment', 'employee_id store_id salary'),
        ('inventory', 'store_id product_id quantity'),
        ('orders', 'id store_id customer_id payment_type amount datetime'),
        ('order_delivery', 'order_id address_id delivery_time'),
        ('order_sale', 'order_id product_id quantity price'),
        ('purchase_order', 'id supplier_id store_id amount payment_type datetime'),
        ('inventory_purchase', 'purchase_order_id product_id quantity price'),
        ('product_promo', 'id product_id store_id discount_percentage start_date end_date'),
    )
}

def insert_df_sql(df, table_name, engine, expected_columns=None):
    """Append ``df`` to ``table_name`` if its columns match the expected set.

    The comparison ignores column order. Nothing is written when the table is
    unknown or the column sets differ. Database errors are reported and
    swallowed so one failing table does not abort the whole load; use the
    return value to detect them.

    Parameters
    ----------
    df : pandas.DataFrame
        Rows to append.
    table_name : str
        Destination table; must be a key of the expected-columns mapping.
    engine
        Connectable handed to ``DataFrame.to_sql`` as ``con``.
    expected_columns : mapping of str to list of str, optional
        Table name -> required column names. Defaults to the module-level
        ``expected_table_columns``.

    Returns
    -------
    bool
        ``True`` if the rows were inserted, ``False`` otherwise.
    """
    if expected_columns is None:
        expected_columns = expected_table_columns

    # Report an unknown table by name instead of surfacing a bare KeyError.
    if table_name not in expected_columns:
        print(f"Unknown table {table_name}. Data not inserted.")
        return False

    expected = set(expected_columns[table_name])
    actual = set(df.columns)
    if actual != expected:
        print(f"Column mismatch for {table_name}. Data not inserted.")
        # key=str keeps the sort safe if a CSV produced non-string labels.
        missing = sorted(expected - actual, key=str)
        unexpected = sorted(actual - expected, key=str)
        if missing:
            print(f"  missing columns: {missing}")
        if unexpected:
            print(f"  unexpected columns: {unexpected}")
        return False

    try:
        df.to_sql(table_name, con=engine, index=False, if_exists='append')
    except Exception as e:
        # Deliberately best-effort: report with the table name and carry on.
        print(f"Insert into {table_name} failed: {e}")
        return False

    print(f"Inserted data into {table_name} with expected columns")
    return True

In [6]:
# Load one CSV per expected table into table_df, keyed by table name. Each file
# is looked up as '<table>.csv' relative to the working directory.
# A failed read is reported together with the file name and skipped, so the
# remaining files still load; the table is then simply absent from table_df.
table_df = dict()
for key in expected_table_columns:
    csv_path = f'{key}.csv'
    try:
        table_df[key] = pd.read_csv(csv_path)
    except Exception as e:
        print(f'Failed to read {csv_path}: {e}')
    else:
        print(f'Read in {key}.csv correctly')

Read in address.csv correctly
Read in store.csv correctly
Read in customer.csv correctly
Read in employee.csv correctly
Read in supplier.csv correctly
Read in category.csv correctly
Read in product.csv correctly
Read in employment.csv correctly
Read in inventory.csv correctly
Read in orders.csv correctly
Read in order_delivery.csv correctly
Read in order_sale.csv correctly
Read in purchase_order.csv correctly
Read in inventory_purchase.csv correctly
Read in product_promo.csv correctly


In [7]:
# Convert the text date/time columns read from CSV into pandas datetimes.
# Each frame is rebuilt with .assign() and stored back under the same key.

# TIMESTAMP columns: orders.datetime is parsed without an explicit format;
# order_delivery.delivery_time and purchase_order.datetime each use their own
# explicit format string (four-digit vs two-digit year).
table_df['orders'] = table_df['orders'].assign(
    datetime=lambda frame: pd.to_datetime(frame['datetime']),
)
table_df['order_delivery'] = table_df['order_delivery'].assign(
    delivery_time=lambda frame: pd.to_datetime(
        frame['delivery_time'], format='%m/%d/%Y %H:%M'
    ),
)
table_df['purchase_order'] = table_df['purchase_order'].assign(
    datetime=lambda frame: pd.to_datetime(
        frame['datetime'], format='%m/%d/%y %H:%M'
    ),
)

# DATE columns: product_promo.start_date and product_promo.end_date.
table_df['product_promo'] = table_df['product_promo'].assign(
    start_date=lambda frame: pd.to_datetime(frame['start_date']),
    end_date=lambda frame: pd.to_datetime(frame['end_date']),
)

In [8]:
# First load the tables that declare no foreign keys; the tables inserted in
# later cells reference them.
tables_list = [
    'address',
    'employee',
    'supplier',
    'category',
]
for table in tables_list:
    insert_df_sql(df=table_df[table], table_name=table, engine=engine)

Inserted data into address with expected columns
Inserted data into employee with expected columns
Inserted data into supplier with expected columns
Inserted data into category with expected columns


In [9]:
# Next load the static tables. The list order keeps each table after the ones
# it references within this list (store before employment and inventory,
# product before inventory).
tables_list = [
    'store',
    'customer',
    'employment',
    'product',
    'inventory',
]
for table in tables_list:
    insert_df_sql(df=table_df[table], table_name=table, engine=engine)

Inserted data into store with expected columns
Inserted data into customer with expected columns
Inserted data into employment with expected columns
Inserted data into product with expected columns
Inserted data into inventory with expected columns


In [10]:
# Insert dynamic tables
tables_list = ['product_promo','purchase_order','inventory_purchase','orders','order_delivery','order_sale']
for table in tables_list:
    insert_df_sql(table_df[table], table, engine)

Inserted data into product_promo with expected columns
Inserted data into purchase_order with expected columns
Inserted data into inventory_purchase with expected columns
Inserted data into orders with expected columns
Inserted data into order_delivery with expected columns
Inserted data into order_sale with expected columns
