# Create DB

In order to integrate with the API and for the purposes of code modularity, we're using SQLAlchemy here. The process of learning how to set up a database has been enlightening, and I'm motivated to continue learning about databases; both working in concert with APIs; and as a means of producing a Data Warehouse of tangible business insights using CRON jobs and ETL.  

Whether we use a sqlite database or a postgres one, SQLAlchemy provides an abstraction layer which will make our code more modular should we decide to migrate to a new tech stack. 

For this project I'll be using a sqlite3 database, to focus more on local ETL processes and analytics/modelling. Since the data comes in a form which could be made into a relational database (some tables missing primary keys), I've decided to showcase my continued curiosity and interest in managing data; from ingestion, through processing, to a reader-friendly output which can answer business questions. 

In a previous notebook on dynamic programming and algorithms, I noticed that passing the python built-in list() as a default argument (for a function) resulted in all calls to that function utilizing the same list (in the functions definition) within the same instance of the interpreter/notebook.

The Base class (generated from declarative_base()) is the parent of all tables we'll create for this project, and holds the shared metadata from which they will each inherit and map their relations. 

When our table models have been declared (in the models.py file), we can create_all() models, and bind them to the engine. 

In [2]:
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session
import pandas as pd
from fastapi import Depends
import models
from my_url import _SQLALCHEMY_DATABASE_URL
from table_loader import TableLoader
#from database import Base


### creating a database
SQLALCHEMY_DATABASE_URL = _SQLALCHEMY_DATABASE_URL

# this engine has a special 'check_same_thread' argument for sqlite3
# engine = create_engine(
#     SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
# )
engine = create_engine(
    SQLALCHEMY_DATABASE_URL)

# the sessionmaker function creates a new Session for our database interactions, and is the preferred method of transacting with the db when using SQLAlchemy
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)


# this Base is the parent class of our Tables; they can be found in models.py. 
Base = declarative_base()

# # when we create all of the Table models from within models.py, each child class will have an equivalent metadata object (shared by the parent)
models.Base.metadata.create_all(bind=engine)


print(engine.table_names())

['campaign_desc', 'campaign_table', 'causal_data', 'coupon', 'coupon_redempt', 'hh_demographic', 'product', 'transaction_data']


  print(engine.table_names())


In [3]:
# for table_name in a.table_names:
#     db=next(get_db())
#     db.execute(f'DROP TABLE {table_name} CASCADE')
#     db.commit()
#     print(table_name)

In [4]:
a = TableLoader()
import csv 
chunksize = 1000000
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


for table_name in a.table_names:
    # db.execute(f'DELETE FROM {table_name}')
    print(table_name)
    try:
        with open('../data/'+table_name+'.csv') as f:
            reader = csv.reader(f)
            fields = [x.casefold() for x in next(reader)]
            chunk = []
            for idx, row in enumerate(reader):
                obj = a.name_model_map[table_name](**dict(zip(fields, row)))
                chunk.append(obj)

                if idx % chunksize == 0:
                    print(f'row {idx}', end='\r')
                    db=next(get_db())

                    db.add_all(chunk)
                    db.commit()
                    chunk.clear()

    except StopIteration:  
            db=next(get_db())      
            db.add_all(chunk)
            db.commit()   

    
    except BaseException as e:
        print('failed', e)
        db.rollback()
        
    finally:
        db.close()
        continue

{'campaign_desc': 0,
 'campaign_table': 0,
 'causal_data': 0,
 'coupon': 0,
 'coupon_redempt': 0,
 'hh_demographic': 0,
 'product': 0,
 'transaction_data': 0}
campaign_desc
campaign_table
causal_data


failed 0000
coupon
coupon_redempt
hh_demographic
failed (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "hh_demographic_household_key_key"
DETAIL:  Key (household_key)=(1) already exists.

[SQL: INSERT INTO hh_demographic (age_desc, hh_comp_desc, homeowner_desc, household_key, household_size_desc, income_desc, kid_category_desc, marital_status_code) VALUES (%(age_desc)s, %(hh_comp_desc)s, %(homeowner_desc)s, %(household_key)s, %(household_size_desc)s, %(income_desc)s, %(kid_category_desc)s, %(marital_status_code)s) RETURNING hh_demographic.index]
[parameters: {'age_desc': '65+', 'hh_comp_desc': '2 Adults No Kids', 'homeowner_desc': 'Homeowner', 'household_key': '1', 'household_size_desc': '2', 'income_desc': '35-49K', 'kid_category_desc': 'None/Unknown', 'marital_status_code': 'A'}]
(Background on this error at: https://sqlalche.me/e/14/gkpj)
product
failed (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "product_product

In [None]:
obj = SessionLocal().query(models.TransactionData).first()

In [None]:
SessionLocal().

AttributeError: 'Session' object has no attribute 'Table'



    note: within the *schemas.py* file we can find schemas, which inherit from 'pydantic.BaseModel'. 
    These are for use with fastAPI, and not to be confused with the SQLAlchemy database abstraction.

    Those pydantic dataclass models define some additional parameters which might be necessary in a development database; for example for specific types of transactions. Account creation information like passwords or emails; or additional, mapped information which might be available in the database already for example previous purchases, or queries from an altogether new database ("gold" data tables).

    These schemas add an additional level of complexity on top of our basic database, but they integrate well with my tech stack, and I believe they offer a significant upside in terms of data accessibility. 
    
    By running some sort of local data API; or automating weekly reports of accounts performance
    an API such as this one could serve a variety of data analytics and reporting use-cases. This automation could save time and money for any company requiring data analytics.


# CRUD UTILITIES


Below we define some basic 'CRUD'-style procedures for our local database. First and foremost, we define a dependency -- this wrapper around our sessionmaker() call which we populated as SessionLocal.

In [None]:

# a wrapper to instantiate a session allows us to ensure the database connection is closed after our transaction; that a new Session is generated each time we interact with the db.
# Dependency
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


# abstracted data write
def post_data(model,
              db:Session):
    db.add(model)
    db.commit()
    db.refresh(model)
    return model

def read_table(model: models.Base,
               db: Session = Depends(get_db)):
    return db.query(models.model).all()

def delete_row(model_type,
               id_col,
               id,
              db: Session):
    db.query(models.model_type).filter(models.model_type.id_col==id).delete()
    db.commit()



def write_transactions(db:Session):
    df = pd.read_csv('../data/transaction_data.csv')
    fields = [x.casefold() for x in df.columns]
    for row in df.values:
        data = dict(zip(fields, row))
        obj = models.TransactionData(**data)
        post_data(model=obj, db=db)
        #print(data)


    # To abstract the local --> db data ingestion step, we could map the Models to their respective file locations...




The cell below runs a check on the tables in our database, dunnhumby.db. In this case, we're going to store the index (skip/offset) for each data source.

# HELPER FUNCTIONS

For the purposes of inspecting the database... 

1. Existing Data Index (allows for restart on data ingestion fail or sequential data ingestion)

    - by finding the existing rowcount/index, we can define a skip/offset to any new CRUD operations, be they through the API or locally.
        - Considerations related to the 'front-end'; be it an App, a customer-facing website, or an internal API would define how our data gets to us.

        - The steps I want to focus on are the creation of new, clean tables -- the exploratory data analysis and required ETL transactions -- and so we'll move on quickly past the data ingestion step.



In [None]:
tables = ['campaign_desc',
'campaign_table',
'causal_data',
'coupon',
'coupon_redempt',
'hh_demographic',
'product',
'transaction_data']

con = engine.raw_connection()
cursor = con.cursor()

existing_rowcounts = dict()

for x in tables:
    cursor.execute(f'select count(1) from {x}')
    # cursor.execute(f'drop table {x}')
    print(x)
    res = cursor.fetchall()
    existing_rowcounts[x] = res[0][0]
    print(res[0][0])


    

campaign_desc
30
campaign_table
0
causal_data
0
coupon
0
coupon_redempt
0
hh_demographic
0
product
0
transaction_data
0


Now that we've instantiated our connection to the database and are satisfied with the status of our tables (, pydantic schemas,) and models, we can examine the task of data ingestion from a raw data source. 

The functions below describe some templates for crud activities from within the database/SQLAlchemy.

I noticed that postgresql seems to have a better bulk insert methodology than sqlite3; but depending on the tech stack/use case, these database migration procedures could vary widely. 

Since this is not, in fact, streaming data, I've opted for 

In [None]:


#### local database transaction functions
def read_hh(db: Session = Depends(get_db)):
    return db.query(models.HHDemographic).all()

def post_hh(hh_object:models.HHDemographic,
            db: Session = Depends(get_db)):
    db.add(hh_object)
    db.commit()
    db.refresh(hh_object)
    return hh_object

def delete_hh(hh_id
              ,db: Session):
    # try:
    db.query(models.HHDemographic).filter(models.HHDemographic.household_key==hh_id).delete()
    db.commit()
    # except:
    #     print('that household id was not found')


def write_demo(db:SessionLocal=SessionLocal()
               ,):
    df = pd.read_csv('../data/hh_demographic.csv')
    fields = [x.casefold() for x in df.columns]
    for x in df.values:
        post_hh(hh_object=models.HHDemographic(**dict(zip(fields, x))), db=db) 



def extract_table(table_source_path
        ,db:SessionLocal=SessionLocal()
               ):
    
    # in this case use pandas...
    df = pd.read_csv(table_source_path)
    fields = [x.casefold() for x in df.columns]
    for x in df.values:
        hh_object=models.HHDemographic(**dict(zip(fields, x)))
        db.add(hh_object)
        db.commit()
        db.refresh(hh_object)



In [None]:
import pprint

class TableLoader:
    """simulated cron job processing class.
        depends on models.py file being loaded with necessary table models present
    - accepts:
        - data folder filepath
                - todo: some sort of qualification of which tables to "update"
        
        - could incorporate logging tools of your choice?   
        
        """

    def __init__(self,
                 data_source_prefix:str="../data/",
                 skip:int = 0,
                 offset:int = 0,
                 db:Session = SessionLocal()):

        # self.skip = 0
        # self.offset = 0
        self.chunksize = 10**7
        self.db=db
        self.data_folder = data_source_prefix

        ### specific data sources/endpoints using your prefix; source URL (or disk data)
        self.table_names = ['campaign_desc',
                        'campaign_table',
                        'causal_data',
                        'coupon',
                        'coupon_redempt',
                        'hh_demographic',
                        'product',
                        'transaction_data']

        ### along with a map of your table models (the abstraction layer of SQLALchemy)
        self.table_models = [models.CampaignDesc,
                            models.CampaignTable,
                            models.CausalData,
                            models.Coupon,
                            models.CouponRedempt,
                            models.HHDemographic,
                            models.Product,
                            models.TransactionData]

        ### map the two together for reference
        self.name_model_map = dict(zip(self.table_names, self.table_models)) 
        self.existing_rowcounts = dict()

        ### instantiate logging...
        self._log = ""

        ### ping db to find existing rowcount/index for known tables
        self.update_existing_rowcounts()
        
        # run the auto-updating feature to ensure all rows are accounted for...
        # for x in self.table_names:
        #     self.run_update(x)



    def get_existing_rowcount(self, table_name):
        con = engine.raw_connection()
        cursor = con.cursor()
        cursor.execute(f'select count(1) from {table_name}')
        res = cursor.fetchall()
        return res[0][0]
        #self.existing_rowcounts[x] = res[0][0]


    def update_existing_rowcounts(self):
        con = engine.raw_connection()
        cursor = con.cursor()
        for x in self.table_names:
            self.existing_rowcounts[x] = self.get_existing_rowcount(x)


    def delete_known_table(self, table_name):
        con = engine.raw_connection()
        cursor = con.cursor()
        cursor.execute(f'drop table {table_name}')
    

    def delete_known_tables(self):
        for x in self.table_names:
            self.delete_known_table(x)


    ### logger
    @property
    def log(self):
        return self._log
    
    @log.setter
    def log(self, new):
        print(new, flush=True)
        self._log += "\n " + new
        

    def print_log(self):
        pprint.pprint(self.log)


    #### chunker
    # def start_stop(self, offset=0, limit=10**7, chunk_size=10**7):
    #     start=offset
    #     stop=limit
    #     while start < limit:
    #         try: 
    #             yield start, stop
    #         except BaseException as e:
    #             self.log = f"Chunker failed with {e}"
    #         finally:
    #             start = stop
    #             stop = stop + chunk_size

    #### clunker
    # def update_table(self, table_name):
    #     '''cron job'''

    #     #### start index
    #     start_index = self.existing_rowcounts[table_name] ### the offset

    #     #### EXTRACT DATA 
    #     df = pd.read_csv(self.data_folder+table_name+'.csv').reset_index().set_index('index')
    #     if start_index == 0:
    #         df.to_sql(name =table_name, con=engine, chunksize=5000000, if_exists='append', method='multi')


    #     else:
            
    #         fields = [x.casefold() for x in df.columns]
    #         stop_index = df.shape[0] ### assume this value is the limit (length) of the data we need to pull from the .csv.
            

    #         #### DATA VERIFICATION ASSERTIONS...ADD REAL INDEX COMPARISON?
    #         assert start_index < stop_index, f'start {start_index} < {stop_index} stop'

    #         self.log = f"Beginning '{table_name}' Update..."
    #         self.log = f"Starting at 0-index {start_index}, going up to but not including {stop_index}..."
    #         self.log = f"Shape is {df.shape}. Fields are {fields}..."
            
    #         rows_to_rip = stop_index - start_index
    #         try:
    #             #### automatic check for filesize --> do we need chunking?
    #             if rows_to_rip < 10**7:
    #                 self.log = f"no chunking necessary..."
    #                 #### regular row-level upload?
    #                 for x in df.values[start_index:stop_index]:
    #                     hh_object=self.name_model_map[table_name](**dict(zip(fields, x)))
    #                     self.db.add(hh_object)
    #                     self.db.commit()
    #                     self.db.refresh(hh_object)
    #             else:
    #                 #### enter chunking logic
    #                 self.log = f'entering chunking logic;'
    #                 start_stopper = self.start_stop(offset=start_index
    #                                                 , limit=stop_index
    #                                                 , chunk_size=10**7)

    #                 while start_index <= stop_index:
    #                     start_index, stop_index = next(start_stopper)
    #                     self.log = f'rows {start_index} through {stop_index} of {len(df.values)}'
    #                     chunk = list(df.values[start_index:stop_index])

    #                     self.db.add_all([self.name_model_map[table_name](**dict(zip(fields, x))) for x in chunk])
    #                     self.db.commit()

    #         except BaseException as e:
    #             #print(f'process failed on table {file}: {e}')
    #             self.log += f'process failed on table {table_name}: {e}'

    
    def insert_table(self, table_name):
        """for our use case, we just need the whole data to go in.
         only use this if the table models have been created, but the table is empty """
        
        try:
            self.log = f" inserting {x}"
            assert self.existing_rowcounts[table_name] == 0, f'{table_name} already has data populated. Use update_table'
            df = pd.read_csv(self.data_folder+table_name+'.csv')
            df.to_sql(name =table_name, con=engine, chunksize=500000, if_exists='append', method='multi')
        except AssertionError as e:
            self.log = str(e)


    def insert_all(self, all_names:list=None):
        if all_names == None:
            all_names = self.table_names
        for x in all_names:
            self.log = f'reading table {x}'
            try:
                self.insert_table(x)
            except:
                continue
                

In [None]:
a = TableLoader(db=SessionLocal())

# for x in a.table_names:
#     if x not in  ['causal_data', 'coupon', 'transaction_data', 'product']:
#         try:
#             a.run_update(x)
#         except AssertionError as e:
#             print(x, "ok", e)
#             continue
# first_log = a.log

# b = TableLoader(db=SessionLocal())

# for x in b.table_names:
#     try:
#         a.run_update(x)
#     except AssertionError as e:
#         print(x, "ok", e)
#         continue

# second_log = a.log



In [None]:
a.insert_all()

 inserting transaction_data
campaign_desc already has data populated. Use update_table
 inserting transaction_data
campaign_table already has data populated. Use update_table
 inserting transaction_data
 inserting transaction_data
 inserting transaction_data
coupon_redempt already has data populated. Use update_table
 inserting transaction_data
hh_demographic already has data populated. Use update_table
 inserting transaction_data
 inserting transaction_data


In [None]:
pprint.pprint(a.log)


('\n'
 '  inserting transaction_data\n'
 ' campaign_desc already has data populated. Use update_table\n'
 '  inserting transaction_data\n'
 ' campaign_table already has data populated. Use update_table\n'
 '  inserting transaction_data\n'
 '  inserting transaction_data\n'
 '  inserting transaction_data\n'
 ' coupon_redempt already has data populated. Use update_table\n'
 '  inserting transaction_data\n'
 ' hh_demographic already has data populated. Use update_table\n'
 '  inserting transaction_data\n'
 '  inserting transaction_data')


In [None]:
a.get_existing_rowcount('campaign_desc')

30

In [None]:
a.log

'\n  inserting transaction_data\n campaign_desc already has data populated. Use update_table\n  inserting transaction_data\n campaign_table already has data populated. Use update_table\n  inserting transaction_data\n  inserting transaction_data\n  inserting transaction_data\n coupon_redempt already has data populated. Use update_table\n  inserting transaction_data\n hh_demographic already has data populated. Use update_table\n  inserting transaction_data\n  inserting transaction_data'

In [None]:
import pprint
pprint.pprint(a.log)

('\n'
 '  inserting transaction_data\n'
 ' campaign_desc already has data populated. Use update_table\n'
 '  inserting transaction_data\n'
 ' campaign_table already has data populated. Use update_table\n'
 '  inserting transaction_data\n'
 '  inserting transaction_data\n'
 '  inserting transaction_data\n'
 ' coupon_redempt already has data populated. Use update_table\n'
 '  inserting transaction_data\n'
 ' hh_demographic already has data populated. Use update_table\n'
 '  inserting transaction_data\n'
 '  inserting transaction_data')


In [None]:
a.run_update()

AttributeError: 'TableLoader' object has no attribute 'run_update'

In [None]:
# resulting_objects = SessionLocal().query(models.HHDemographic).all()
# len(resulting_objects)

In [None]:
a.run_update()

In [None]:
import pprint 
pprint.pprint(a.log)

In [None]:
df = pd.read_csv('../data/causal_data.csv')

In [None]:
df.columns

In [None]:

    # print(x)
    # df = pd.read_csv(f'{filepath}{x}'+".csv")
    # df.columns = [x.casefold() for x in df.columns]
    # df.reset_index(inplace=True)
    # df.to_sql(x, con=engine, if_exists='append')
    # print(df.info())

        # for row in df.values:
        #     #
        #     data = dict(zip(fields, row))
        #     # create row object with values
        #     obj = name_model_map[x](**data)
        #     # 
        #     post_data(model=obj, db=db)
    
    # ### verify rowcount
    # con = engine.raw_connection()
    # cursor = con.cursor()
    # cursor.execute(f'select count(1) from {x}')
    # res = cursor.fetchall()
    # print(f'{res[0][0]} rows loaded into Table {x}')
    # #assert res[0][0] == df.shape[0]


    # ### verify column names? note that some models have a new, generated index/primary key column...
    # con = engine.raw_connection()
    # cursor = con.cursor()
    # cursor.execute(f'select * from {x} limit 1')
    # res = cursor.fetchall()
    # print(f'{len(res)} columns loaded to Table {x}')
    # print(df.shape)
        #assert len(res[0]) == df.shape[1]


In [None]:
write_demo()

In [None]:
read_hh(next(get_db()))

SQLAlchemy recommends using its engine to import data. In this method we try a shortcut. After establishing our engine, and binding the Parent class for our table models using the models.Base.metadata.create_all(bind=engine) call, we've instantiated the 'image' of our row instances into the database. 

In [None]:
next(get_db()).query(models.HHDemographic).filter(models.HHDemographic.household_key == 1).first()

In [None]:
read_table(models.HHDemographic, db=next(get_db()))

In [None]:
from sqlalchemy import inspect
import models
mapper = inspect(engine)

In [None]:
mapper.__dict__

In [None]:
df = pd.read_csv('../data/campaign_desc.csv')
df = df.reset_index()

In [None]:
df

# Method 1: Importing using pandas.DataFrame.to_sql

In [None]:
import pandas as pd
df = pd.read_csv('../data/transaction_data.csv')
df

In [None]:
con = engine.raw_connection()
cursor = con.cursor()

In [None]:
df.to_sql("transaction_data", con, if_exists='append')

In [None]:
cursor.execute('drop table transaction_data')
res = cursor.fetchall()
print(res)

In [None]:
from sqlalchemy import MetaData
metadata_obj = MetaData()

In [None]:
from sqlalchemy.schema import MetaData

In [None]:
# for schema in schemas:
#     print("schema: %s" % schema)
#     for table_name in inspector.get_table_names(schema=schema):
#         for column in inspector.get_columns(table_name, schema=schema):
#             print("Column: %s" % column)

In [None]:
# write_demo(next(get_db()))

In [None]:
import sqlite3
import glob
import os
import pandas as pd

con = sqlite3.connect("src/dunnhumby.db")

cur = con.cursor()

In [None]:
# checking existing tables
cur.execute("SELECT name FROM sqlite_master WHERE type='table';")
output_tables = [x[0] for x in cur.fetchall()]

output_tables

In [None]:
for x in output_tables:
    
    cur.execute(f"SELECT count(1) FROM {x}")
    res = cur.fetchall()
    print(res)
#     cur.execute(f"DROP TABLE IF EXISTS RAW_{x}")
    cur.execute(f"SELECT * FROM {x} LIMIT 1;")
    res = cur.fetchall()
    print(res)

# Writing data to db from .csv files 
## using pandas

glob module to get a list of filenames.

pandas built-in function DataFrame.to_sql() using the connection to dunnhumby.db

- if_exists='fail' parameter to force an error if the table exists already. 

In [None]:
for file in glob.glob('data/*.csv'):
    print('*'*50)
    filename = file.split("\\")[1][:-4]
    print(filename)
    df = pd.read_csv(file)
    #display(df.head(5))
    #print(df.info())
    df.to_sql(filename, con, index=False)
    print(f'{filename} written to database')
    print('*'*50)


#   non-pandas data imputation    

In [None]:

for file in glob.glob('data/*.csv'):
    
    
    columns = ",".join(df.columns)
    #cur.execute(f"CREATE TABLE IF NOT EXISTS RAW_{filename}({columns})")
    data = [tuple(x) for x in df.values]
    cur.executemany(f"INSERT INTO {filename} VALUES({', '.join(['?' for x in df.columns])})", data)

# Checking the tables exist

In [None]:
# checking existing tables
cur.execute("SELECT name FROM sqlite_master WHERE type='table';")
output_tables = [x[0] for x in cur.fetchall()]

output_tables

# Checking the rowcounts match the .csv rowcounts

In [None]:
 
cur.execute(f"SELECT * from hh_demographic where household_key=1")
res = cur.fetchall()
res

In [None]:
for x in output_tables:
    
    cur.execute(f"SELECT count(1) FROM {x}")
    res = cur.fetchall()
    print(res)
#     cur.execute(f"DROP TABLE IF EXISTS RAW_{x}")

# Verifying rows look as they should

In [None]:
for x in output_tables:
    cur.execute(f"SELECT * FROM {x} LIMIT 5;")
    res = cur.fetchall()
    display(pd.DataFrame(res))
    # note column names don't come back out of the database

Despite the coupon tables being stored in bytecode in the db, pandas can read them without issue.

# Joining Household Data onto aggregate sales data



In [None]:
cur.execute(f"SELECT SUM(SALES_VALUE) FROM transaction_data GROUP BY household_key")

In [None]:
res = cur.fetchall()


In [None]:

pd.DataFrame(res)