### Case study objectives

1. Gather all relevant data from the sources 
2. Implement several checks for quality assurance 
3. Automating of the ingestion pipeline



#### Gathering the data


In [1]:
## import libraries
import os
from datetime import datetime
import scipy.stats as stats
import pandas as pd
import numpy as np
import sqlite3

# specify the directory you saved the data in
data_dir = os.path.join(".","data")

In [1]:
def connect_db(file_path):
    try:
        conn = sqlite3.connect(file_path)
        print("...successfully connected to db\n")
    except Error as e:
        print("...unsuccessful connection\n",e)
    
    return(conn)

In [2]:
## connect to the database
conn = connect_db(os.path.join(data_dir,"aavail-customers.db"))

## take a look of the tables when connecting the database
tables = [t[0] for t in conn.execute("SELECT name FROM sqlite_master WHERE type='table';")]
print(tables)

['CUSTOMER', 'INVOICE', 'INVOICE_ITEM', 'COUNTRY']


In [3]:
# sql query to retrieve the needed data
query = """
SELECT cu.customer_id, cu.last_name, cu.first_name, cu.DOB,
       cu.city, cu.state, co.country_name, cu.gender
FROM CUSTOMER cu
INNER JOIN COUNTRY co
ON cu.country_id = co.country_id;
"""

_data = [d for d in conn.execute(query)]
columns = ["customer_id","last_name","first_name","DOB","city","state","country","gender"]
df_db = pd.DataFrame(_data,columns=columns)
##
df_db.head()

Unnamed: 0,customer_id,last_name,first_name,DOB,city,state,country,gender
0,1,Todd,Kasen,07/30/98,Rock Hill,South Carolina,united_states,m
1,2,Garza,Ensley,04/12/89,singapore,,singapore,f
2,3,Carey,Lillian,09/12/97,Auburn,Alabama,united_states,f
3,4,Christensen,Beau,01/28/99,Hempstead,New York,united_states,m
4,5,Gibson,Ernesto,03/23/98,singapore,,singapore,m



Extracting the relevant data from the CSV file as needed.


In [4]:
## retrieve the relevant data from a csv file
df_streams = pd.read_csv(os.path.join(".",r"Data/aavail-streams.csv"))
print(df_streams.shape)
df_streams.head()

(18859, 5)


Unnamed: 0,customer_id,stream_id,date,invoice_item_id,subscription_stopped
0,1,1420.0,2018-10-21,2.0,0
1,1,1343.0,2018-10-23,2.0,0
2,1,1756.0,2018-11-05,2.0,0
3,1,1250.0,2018-11-06,2.0,0
4,1,1324.0,2018-11-12,2.0,0


In [5]:
## creating a churn table 
customer_ids = df_streams['customer_id'].values
unique_ids = np.unique(df_streams['customer_id'].values)
streams = df_streams['subscription_stopped'].values
has_churned = [0 if streams[customer_ids==uid].max() > 0 else 1 for uid in unique_ids]
df_churn = pd.DataFrame({"customer_id": unique_ids,"is_subscriber": has_churned})

#df_churn.head(4)

### Checks for quality assurance

Sometimes it is known in advance which types of data integrity issues to expect, but other times it is during the Exploratory Data Analysis (EDA) process that these issues are identified.  After extracting data it is important to include checks for quality assurance even on the first pass through the AI workflow. 


In [6]:
## check & display the results
print("\nCleaning Summary\n{}".format("-"*35))
duplicate_rows = df_db.duplicated()

if True in duplicate_rows:
    df_db = df_db[~duplicate_rows]
print("Removed {} duplicate rows".format(np.where(duplicate_rows==True)[0].size))

missing_stream_ids = np.isnan(df_streams['stream_id'])    
if True in missing_stream_ids:
    df_streams = df_streams[~missing_stream_ids]
    
    
print("Removed {} missing stream ids".format(np.where(missing_stream_ids==True)[0].size))

print("\nMissing Value Summary\n{}".format("-"*35))
print("\ndf_db\n{}".format("-"*15))
print(df_db.isnull().sum(axis = 0))
print("\ndf_streams\n{}".format("-"*15))
print(df_streams.isnull().sum(axis = 0))


Cleaning Summary
-----------------------------------
Removed 7 duplicate rows
Removed 1164 missing stream ids

Missing Value Summary
-----------------------------------

df_db
---------------
customer_id      0
last_name        0
first_name       0
DOB              0
city             0
state          300
country          0
gender           0
dtype: int64

df_streams
---------------
customer_id             0
stream_id               0
date                    0
invoice_item_id         0
subscription_stopped    0
dtype: int64


### Combining the data into a single data structure

In [None]:
df_clean = df_churn.copy()
df_clean = df_clean[np.in1d(df_clean['customer_id'].values,df_db['customer_id'].values)]
unique_ids = df_clean['customer_id'].values

## ensure we are working with correctly ordered customer_ids df_db
if not np.array_equal(df_clean['customer_id'],df_db['customer_id']): 
    raise Exception("indexes are out of order or unmatched---needs to fix")

## query the db to create a invoice item map
query = """
SELECT i.invoice_item_id, i.invoice_item
FROM INVOICE_ITEM i;
"""

## variables for new df creation
invoice_item_map = {d[0]:d[1] for d in conn.execute(query)}
streams_stopped = df_streams['subscription_stopped'].values
streams_cid = df_streams['customer_id'].values
streams_iid = df_streams['invoice_item_id'].values
subscriber_invoice_mode = [stats.mode(streams_iid[streams_cid==uid])[0][0] for uid in unique_ids]

## create the new df
df_clean['country'] = df_db['country']
df_clean['age'] = np.datetime64('today') - df_db['DOB'].astype('datetime64')
df_clean['customer_name'] = df_db['first_name'] + " " + df_db['last_name']
df_clean['subscriber_type'] = [invoice_item_map[int(sim)] for sim in subscriber_invoice_mode]
df_clean['num_streams'] = [streams_stopped[streams_cid==uid].size for uid in unique_ids]

## convert age to days
df_clean['age'] = [a.astype('timedelta64[Y]').astype(int) for a in df_clean['age'].values]

#df_clean.head()

### Automating the process

1. Save the cleaned, combined data as a CSV file.
2. Create a function or class that performs all of the steps given a database file and a streams CSV file.
3. Run the function in batches and write a check to ensure the same result.

There would be some logic involved in the automation process. 

In [14]:
## code to split the streams csv into batches
df_all = pd.read_csv(os.path.join(".","data/aavail-streams.csv"))
half = int(round(df_all.shape[0] * 0.5))
df_part1 = df_all[:half]
df_part2 = df_all[half:]
df_part1.to_csv(os.path.join(".","data/aavail-streams-1.csv"),index=False)
df_part2.to_csv(os.path.join(".","data/aavail-streams-2.csv"),index=False)

Automating the process

In [85]:
%%writefile aavail-data-ingestor.py
#!/usr/bin/env python

import os
import sys
import getopt
import scipy.stats as stats
import pandas as pd
import numpy as np
import sqlite3

DATA_DIR = os.path.join("..","data")

def connect_db(file_path):
    """
    function to connection to aavail database
    """
    try:
        conn = sqlite3.connect(file_path)
        print("...successfully connected to db")
    except Error as e:
        print("...unsuccessful connection",e)
    
    return(conn)

## passing the parameter "conn" from last output
def ingest_db_data(conn):
    """
    load and clean the db data
    """
    
    query = """
            SELECT cu.customer_id, cu.last_name, cu.first_name, cu.DOB,
            cu.city, cu.state, co.country_name, cu.gender
            FROM CUSTOMER cu
            INNER JOIN COUNTRY co
            ON cu.country_id = co.country_id;
            """
    _data = [d for d in conn.execute(query)]
    columns = ["customer_id","last_name","first_name","DOB","city","state","country","gender"]
    df_db = pd.DataFrame(_data,columns=columns)
    ## the duplicated rows:
    duplicate_rows = df_db.duplicated()
    if True in duplicate_rows:
        df_db = df_db[~duplicate_rows]
        df_db.reset_index()
    print("... removed {} duplicate rows in db data".format(np.where(duplicate_rows==True)[0].size))
    return(df_db)


def ingest_stream_data(file_path):
    """
    load and clean the stream data
    """
    
    df_streams = pd.read_csv(file_path)
    customer_ids = df_streams['customer_id'].values
    unique_ids = np.unique(df_streams['customer_id'].values)
    streams = df_streams['subscription_stopped'].values
    has_churned = [0 if streams[customer_ids==uid].max() > 0 else 1 for uid in unique_ids]
    df_churn = pd.DataFrame({"customer_id": unique_ids,"is_subscriber": has_churned})
    
    missing_stream_ids = np.isnan(df_streams['stream_id'])    
    if True in missing_stream_ids:
        df_streams = df_streams[~missing_stream_ids]
        df_streams.reset_index()
    print("... removed {} missing stream ids".format(np.where(missing_stream_ids==True)[0].size))
    
    return(df_streams,df_churn)


def process_dataframes(df_streams,df_db,df_churn):
    """
    add data to target csv
    """

    df_clean = df_churn.copy()
    df_db = df_db[np.in1d(df_db['customer_id'].values,df_clean['customer_id'].values)]
    df_db.reset_index()
    unique_ids = df_clean['customer_id'].values

    ## ensure we are working with correctly ordered customer_ids df_db
    if not np.array_equal(df_clean['customer_id'],df_db['customer_id']):
        raise Exception("indexes are out of order or unmatched---needs to fix")

    ## query the db t create a invoice item map
    query = """
    SELECT i.invoice_item_id, i.invoice_item
    FROM INVOICE_ITEM i;
    """

    ## variables for new df creation
    invoice_item_map = {d[0]:d[1] for d in conn.execute(query)}
    streams_stopped = df_streams['subscription_stopped'].values
    streams_cid = df_streams['customer_id'].values
    streams_iid = df_streams['invoice_item_id'].values
    subscriber_invoice_mode = [stats.mode(streams_iid[streams_cid==uid])[0][0] for uid in unique_ids]

    ## create the new df
    df_clean['country'] = df_db['country']
    df_clean['age'] = np.datetime64('today') - df_db['DOB'].astype('datetime64')
    df_clean['age'] = [a.astype('timedelta64[Y]').astype(int) for a in df_clean['age'].values]
    df_clean['customer_name'] = df_db['first_name'] + " " + df_db['last_name']
    df_clean['subscriber_type'] = [invoice_item_map[int(sim)] for sim in subscriber_invoice_mode]
    df_clean['num_streams'] = [streams_stopped[streams_cid==uid].size for uid in unique_ids]
    
    return(df_clean)

## upate the target file    
def update_target(target_file,dat,overwrite=False):
    """
    update line by line in case data are large
    """

    if overwrite or not os.path.exists(target_file):
        dat.to_csv(target_file,index=False)   
    else:
        df_target = pd.read_csv(target_file)
        df_target.to_csv(target_file, mode='a',index=False)
        
        
         
if __name__ == "__main__":
  
    ## collect args
    arg_string = "%s -d db_filepath -s streams_filepath"%sys.argv[0]
    try:
        optlist, args = getopt.getopt(sys.argv[1:],'d:s:')
    except getopt.GetoptError:
        print(getopt.GetoptError)
        raise Exception(arg_string)

    ## handle args
    streams_file = None
    db_file = None
    for o, a in optlist:
        if o == '-d':
            db_file = a
        if o == '-s':
            streams_file = a
    streams_file = os.path.join(".",streams_file)
    db_file = os.path.join(".",db_file)
    target_file = os.path.join(".","aavail-target.csv")
    
    
    ## make the connection to the database
    conn = connect_db(db_file)

    ## ingest data base data
    df_db = ingest_db_data(conn)
    df_streams, df_churn = ingest_stream_data(streams_file)
    df_clean = process_dataframes(df_streams, df_db, df_churn)
    
    ## write
    update_target(target_file,dat,overwrite=False)
    print("done")

Overwriting aavail-data-ingestor.py


Let's run the script created from the commandline directly or from within this notebook using:

In [19]:
!python aavail-data-ingestor.py aavail-customers.db aavail-streams-1.csv

...successfully connected to db
... removed 7 duplicate rows in db data
... removed 1164 missing stream ids
   customer_id  is_subscriber  ...   subscriber_type  num_streams
0            1              1  ...    aavail_premium           23
1            2              0  ...  aavail_unlimited           12
2            3              0  ...    aavail_premium           22
3            4              1  ...      aavail_basic           19
4            5              1  ...    aavail_premium           23

[5 rows x 7 columns]


Run the script once for each batch and then load both the original and batch versions back into the notebook to check if they are the same. 

In [86]:
!rm ../data/aavail-target.csv
!python aavail-data-ingestor.py -d aavail-customers.db -s aavail-streams.csv
!wc -l ../data/aavail-target.csv

...successfully connected to db
... removed 7 duplicate rows in db data
... removed 1164 missing stream ids
done
    1001 ../data/aavail-target.csv


In [87]:
!rm ../data/aavail-target.csv
!python aavail-data-ingestor.py -d aavail-customers.db -s aavail-streams-1.csv
!wc -l ../data/aavail-target.csv
!python aavail-data-ingestor.py -d aavail-customers.db -s aavail-streams-2.csv
!wc -l ../data/aavail-target.csv

...successfully connected to db
... removed 7 duplicate rows in db data
... removed 577 missing stream ids
done
     507 ../data/aavail-target.csv
...successfully connected to db
... removed 7 duplicate rows in db data
... removed 587 missing stream ids
done
    1014 ../data/aavail-target.csv
