In [3]:
import pandas as pd
import numpy as np
import psycopg2 as pg
import matplotlib.pyplot as plt
import seaborn as sns
from io import StringIO
from sqlalchemy import create_engine
from sqlalchemy import Table, Column, String, MetaData
import os

In [7]:
os.chdir('data')

In [8]:
# load local files
df1 = pd.read_csv("clean_sales_1.csv", index_col=0)
df2 = pd.read_csv("clean_sales_2.csv", index_col=0)
df_markets = pd.read_csv("aggregate_market_total_sales.csv", index_col=0)

In [3]:
# connect to PostgreSQL database on azure
# connection string information
host = "tutorial-db.postgres.database.azure.com"
dbname = "postgres"
user = "GTM_admin@tutorial-db"
password = "Enablement1"
sslmode = "require"
port = "5432"

In [20]:
# Construct connection string
conn_string = (
    f"host={host} user={user} dbname={dbname} password={password} sslmode={sslmode}"
)
conn = pg.connect(conn_string)
print("Connection established")

cursor = conn.cursor()

Connection established


In [5]:
# postgres+psycop2://myuser:[email protected]ample.com:5432/mydatabase

## check for duplicate invoice numbers across the data sets

In [6]:
print("min: ", df1["invoice"].min())
print("max: ", df1["invoice"].max())

min:  1005
max:  23137


In [7]:
print("min: ", df2["invoice"].min())
print("min: ", df2["invoice"].max())

min:  2
min:  47519


In [8]:
i1 = set(df1["invoice"].unique().tolist())
i2 = set(df2["invoice"].unique().tolist())
same_invoice = i1.intersection(i2)
len(same_invoice)

11715

### There are a large number of duplicates. These will need to be changed as this will be used as a key to join in other data once the data is split into separate tables

In [9]:
# offset the second data set information
df2["invoice"] = df2["invoice"] + 31343

In [10]:
i1 = set(df1["invoice"].unique().tolist())
i2 = set(df2["invoice"].unique().tolist())
same_invoice = i1.intersection(i2)
len(same_invoice)

0

In [11]:
# check the package_id for redundancies between stores
# this would be fine if I had used identical mapping to change the original item names for both stores, but I didn't
df2["package_id"] = df2["package_id"].map(lambda x: "{:.0f}".format(x))
p1 = set(df1["package_id"].unique().tolist())
p2 = set(df2["package_id"].unique().tolist())
same_package = p1.intersection(p2)
len(same_package)

0

## Join the sales transactions data sets

In [12]:
df = pd.concat((df1, df2), axis=0)

## Separate the transaction basket and item information. Package ID can be a key to join it back

In [13]:
transaction_contents = ["invoice", "package_id", "qty", "unit_price", "sale_amount"]
transaction_status = [
    "invoice",
    "date",
    "day",
    "time",
    "transaction",
    "category",
    "store_category",
    "campaign",
]
item_columns = ["package_id", "item_name"]

df_contents = df[transaction_contents].copy().reset_index(drop=True)
df_status = df[transaction_status].copy().reset_index(drop=True)
df_items = pd.DataFrame(df.groupby("package_id")[item_columns].max()).reset_index(
    drop=True
)  # in case there are some non-unique package_id / item_name pairs, take the most common one

In [14]:
df_contents.head()

Unnamed: 0,invoice,package_id,qty,unit_price,sale_amount
0,23137,A000006690,0.28,3.7,1.04
1,23136,A000006815,0.47,2.33,1.1
2,23135,A000006608,1.66,2.07,3.44
3,23134,A000006877,1.66,1.29,2.14
4,23134,A000006799,1.66,1.29,2.14


In [15]:
df_status.head()

Unnamed: 0,invoice,date,day,time,transaction,category,store_category,campaign
0,23137,2019-05-21,Tue,14:35:00,Good,Retail,Type_A,discount_type_1
1,23136,2019-05-21,Tue,14:28:00,Good,Retail,Type_A,discount_type_2
2,23135,2019-05-21,Tue,14:24:00,Good,Retail,Type_A,discount_type_1
3,23134,2019-05-21,Tue,14:21:00,Good,Retail,Type_A,discount_type_3
4,23134,2019-05-21,Tue,14:21:00,Good,Retail,Type_A,discount_type_3


In [16]:
df_items.head()

Unnamed: 0,package_id,item_name
0,1700000002,Snickers
1,1700000003,Snickers Crisper
2,1700000004,Hersheys Krackel
3,1700000005,3 Musketeers
4,1700000006,Reeses Peanut Butter cup


In [17]:
df_markets.head()

Unnamed: 0,Month,category_type,metro_area,county_1,county_2,county_3,county_4,county_5,county_6,total_market_sales
0,2015-10-01,Type_A,20546418.0,16398610.0,1743830.0,1881358.0,435580.0,,87040.0,41092836.0
1,2015-10-01,Type_B,25067072.0,19085375.0,,3091341.0,2446592.0,443764.0,,50134144.0
2,2015-03-01,Type_A,18393590.0,14406613.0,1562845.0,1861813.0,479975.0,,82344.0,36787180.0
3,2015-03-01,Type_B,3997657.0,862396.0,,1020552.0,1497796.0,616913.0,,7995314.0
4,2014-05-01,Type_A,18265403.0,14150620.0,1511922.0,2037535.0,467977.0,,97349.0,36530806.0


# Write the dataframes to the Azure PostgreSQL database as tables

In [18]:
# sqlalchemy can be used to write a pandas dataframe into a SQL table, one row at a time
# create the sql-write engine
engine = create_engine(f"postgresql+psycopg2://{user}:{password}@{host}/{dbname}")

In [19]:
# df_status had an error, delete the table and start over
# define the bad_table and SQL statement
bad_table = "demand_forecasting_transactions_contents"
delete_statement = "DROP TABLE IF EXISTS %s;" % bad_table

# Execute the drop table command
cursor.execute(delete_statement)
conn.commit()
# conn.close()

In [21]:
# create a psql table from df_contents
# use the pandas .to_sql function with the sqlalchemy engine
df_contents.to_sql(
    "demand_forecasting_transactions_contents", engine
)  # this process will take some time

In [96]:
# create a psql table from df_items
df_items.to_sql("demand_forecasting_items", engine)  # this process will take some time

In [367]:
# create a psql table from df_status
df_status.to_sql(
    "demand_forecasting_transactions_status", engine
)  # this process will take some time

ERROR:root:Internal Python error in the inspect module.
Below is the traceback from this internal error.



Traceback (most recent call last):
  File "/opt/anaconda3/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 3326, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-367-6c1f5385b75c>", line 1, in <module>
    df_status.to_sql('demand_forecasting_transactions_status', engine)
  File "/opt/anaconda3/lib/python3.7/site-packages/pandas/core/generic.py", line 2712, in to_sql
    method=method,
  File "/opt/anaconda3/lib/python3.7/site-packages/pandas/io/sql.py", line 518, in to_sql
    method=method,
  File "/opt/anaconda3/lib/python3.7/site-packages/pandas/io/sql.py", line 1319, in to_sql
    table.create()
  File "/opt/anaconda3/lib/python3.7/site-packages/pandas/io/sql.py", line 644, in create
    "Table '{name}' already exists.".format(name=self.name)
ValueError: Table 'demand_forecasting_transactions_status' already exists.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  F

ValueError: Table 'demand_forecasting_transactions_status' already exists.

### This table-creation had an error (connection interrupted) and did not complete. 

### Unfortunately, this operation will not restart partially through a table creation (at least how it is configured here), and must be restarted.

### The failed-table must be dropped, and the connection commited for the drop to take effect

In [None]:
# get all table names
cursor.execute(
    "select relname from pg_class where relkind='r' and relname !~ '^(pg_|sql_)';"
)
print(cursor.fetchall())

In [90]:
# df_status had an error, delete the table and start over
# define the bad_table and SQL statement
bad_table = "demand_forecasting_items"
delete_statement = "DROP TABLE IF EXISTS %s;" % bad_table

# Execute the drop table command
cursor.execute(delete_statement)
conn.commit()
# conn.close()

In [91]:
# reconnect the psycopg2 cursor
conn = pg.connect(conn_string)
print("Connection established")

cursor = conn.cursor()

Connection established


In [92]:
# get all table names
cursor.execute(
    "select relname from pg_class where relkind='r' and relname !~ '^(pg_|sql_)';"
)
print(cursor.fetchall())

[('demand_forecasting_transaction_status',), ('demand_forecasting_transactions_contents',), ('demand_forecasting_market_data',)]


In [44]:
# re-create a psql table from df_status
df_status.to_sql("demand_forecasting_transaction_status_", engine)

In [51]:
# create a psql table from df_markets
df_markets.to_sql(
    "demand_forecasting_market_data", engine
)  # this process will take some time

In [131]:
# get all table names
cursor.execute(
    "select relname from pg_class where relkind='r' and relname !~ '^(pg_|sql_)';"
)
print(cursor.fetchall())

[('demand_forecasting_items',), ('demand_forecasting_transaction_status',), ('demand_forecasting_transactions_contents',), ('demand_forecasting_market_data',)]


In [217]:
# check and see if column names were included in tables
cursor.execute(
    """
SELECT COLUMN_NAME
FROM information_schema.COLUMNS
WHERE TABLE_NAME = 'demand_forecasting_items';"""
)  #
print(cursor.fetchall())

[('index',), ('package_id',), ('item_name',)]


In [105]:
# everything looks good!
# close the connection for the day
conn.close()

# Query from the Postgres databases

In [274]:
test = "demand_forecasting_transaction_status"
statement = f"""
SELECT * 
from {test};"""
cursor.execute(statement)
out1 = cursor.fetchall()

statement2 = f"""
SELECT COLUMN_NAME
FROM information_schema.COLUMNS
WHERE TABLE_NAME = '{test}';"""
cursor.execute(statement2)
out2 = cursor.fetchall()
names = [x[0] for x in out2]

# create the dataframe
df_test = pd.DataFrame.from_records(
    out1, index="index", columns=names
)  # set_index('index')
df_test.head()

Unnamed: 0_level_0,invoice,date,day,time,transaction,category,store_category,campaign
index,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
0,23137,2019-05-21,Tue,14:35:00,Good,Retail,Type_A,discount_type_1
1,23136,2019-05-21,Tue,14:28:00,Good,Retail,Type_A,discount_type_2
2,23135,2019-05-21,Tue,14:24:00,Good,Retail,Type_A,discount_type_1
3,23134,2019-05-21,Tue,14:21:00,Good,Retail,Type_A,discount_type_3
4,23134,2019-05-21,Tue,14:21:00,Good,Retail,Type_A,discount_type_3


In [275]:
df_test.columns

Index(['invoice', 'date', 'day', 'time', 'transaction', 'category',
       'store_category', 'campaign'],
      dtype='object')

# Next:
### - Now this data is ready for the next steps in the process.
### - Continue with the 'Demand_forecasting_data_prep_from_SQL.ipynb' notebook