
## Setup

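```python
# Setup (Google Colab only): mount Google Drive and move into the case-study folder
import os
from google.colab import drive
drive.mount('/content/drive')
# Adjust this path to the folder in your Drive that holds the notebook
os.chdir('/content/drive/My Drive/Colab Notebooks/..directory/')
os.getcwd()
```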

# Case Study - Data ingestion

The goal of this case study is to put into practice the important concepts from module 1.  We will go through the basic process that begins with refining the business opportunity and ensuring that it is articulated using a scientific thought process.

The business opportunity and case study were first mentioned in Unit 2 of module 1, and, like the AAVAIL company itself, these data were created for learning purposes. We will be using the AAVAIL example as the basis for this case study. You will gather data from several provided sources, stage it for quality assurance, and save it in the most appropriate target destination.

Watch the video to review the important concepts from the units you just covered and to see an overview of the objectives for this case study.

## Case study overall objectives

1. Gather all relevant data from the sources of provided data
2. Implement several checks for quality assurance 
3. Take the initial steps towards automation of the ingestion pipeline

## Getting started

Download this notebook and open it locally using a Jupyter server. Alternatively, you may use Watson Studio. To make using Watson Studio easier, we have provided a zip archive containing the files needed to complete this case study in Watson Studio. See the [Getting started with Watson Studio](m1-u5-5-watson-studio.rst) page.

**You will need the following files to complete this case study**

* [m1-u6-case-study.ipynb](m1-u6-case-study.ipynb)
* [m1-u6-case-study-solution.ipynb](./notebooks/m1-u6-case-study-solution.ipynb)
* [aavail-customers.db](./data/aavail-customers.db)
* [aavail-streams.csv](./data/aavail-streams.csv)

1. Fill in all of the places in this notebook marked with ***YOUR CODE HERE*** or ***YOUR ANSWER HERE***
2. When you have finished the case study there will be a short quiz

You may review the rest of this content as part of the notebook, but once you are ready to get started, make sure that you are working with a *live* version, either in Watson Studio or locally.

## Data Sources

The data you will be working with come from two sources.

1. A database ([SQLite](https://www.sqlite.org/index.html)) of `customer` data
2. A [CSV file](https://en.wikipedia.org/wiki/Comma-separated_values) of `stream` level data

> You will create a simple data pipeline that (1) simplifies the data for future analysis and (2) performs quality assurance checks.

The process of building *the data ingestion pipeline* entails extracting data, transforming it, and loading it into an appropriate data storage technology. When constructing a pipeline, keep in mind that pipelines generally process data in batches. For example, data may be compiled during the day and the batch processed overnight. Alternatively, a pipeline may be built to run as a streaming computation, in which every event is handled as it occurs.
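Batch processing can be illustrated with the `chunksize` option of `pd.read_csv`, which yields a file a few rows at a time so each batch can be aggregated as it arrives. This is a minimal sketch on an inline CSV whose columns mirror the streams file; the values and the chunk size of 2 are made up for illustration and are not part of the case-study data:

```python
import io
import pandas as pd

# Illustrative stand-in for a streams file; in practice this would be a path on disk
csv_text = """customer_id,stream_id,subscription_stopped
1,1420,0
1,1343,0
2,1756,1
3,1250,0
3,1324,0
"""

# Process the file in batches of 2 rows and accumulate per-customer stream counts
stream_counts = {}
for chunk in pd.read_csv(io.StringIO(csv_text), chunksize=2):
    # number of stream rows for each customer within this batch
    for customer_id, n in chunk.groupby("customer_id").size().items():
        stream_counts[int(customer_id)] = stream_counts.get(int(customer_id), 0) + int(n)

print(stream_counts)
```

Customer 3 is split across two batches, so the running total is what makes the per-batch results combine correctly.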

## PART 1: Gathering the data

The following is an [Entity Relationship Diagram (ERD)](https://en.wikipedia.org/wiki/Entity%E2%80%93relationship_model) that details the tables and contents of the database.

```python
## all the imports needed for this case study
import os
import pandas as pd
import numpy as np
import sqlite3

## specify the directory you saved the data in
data_dir = os.path.join("..","data")
```

Much of the data is stored in a database. You can connect to it using Python's built-in `sqlite3` module with the function shown below. Note that it is good practice to wrap your connect functions in a [try-except statement](https://docs.python.org/3/tutorial/errors.html) to handle exceptions cleanly.

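```python
def connect_db(file_path):
    """Open a connection to the SQLite database at file_path."""
    try:
        conn = sqlite3.connect(file_path)
        print("...successfully connected to db\n")
    except sqlite3.Error as e:
        # sqlite3.Error is the base class of the module's exceptions;
        # re-raise so the caller never receives an undefined connection
        print("...unsuccessful connection\n", e)
        raise
    return conn
```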
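One caveat: `sqlite3.connect` silently creates a new, empty database when the path does not exist, so a try-except alone will not catch a mistyped file name. A minimal sketch that checks the path first and uses `contextlib.closing` to guarantee the connection is closed (the helper name `list_tables` is hypothetical):

```python
import os
import sqlite3
from contextlib import closing

def list_tables(file_path):
    """Return the table names in an existing SQLite database file."""
    # Guard against sqlite3.connect creating an empty db for a bad path
    if not os.path.isfile(file_path):
        raise FileNotFoundError(f"database not found: {file_path}")
    # closing() calls conn.close() even if the query raises
    with closing(sqlite3.connect(file_path)) as conn:
        rows = conn.execute(
            "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name;"
        ).fetchall()
    return [r[0] for r in rows]
```

Note that using the connection object itself as a context manager (`with sqlite3.connect(...) as conn:`) only commits or rolls back a transaction; it does not close the connection.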

```python
## make the connection to the database
conn = connect_db(os.path.join(data_dir,"aavail-customers.db"))

## print the table names
tables = [t[0] for t in conn.execute("SELECT name FROM sqlite_master WHERE type='table';")]
print(tables)
```

```
...successfully connected to db

['CUSTOMER', 'INVOICE', 'INVOICE_ITEM', 'COUNTRY']
```


### QUESTION 1:

**Extract the relevant data from the DB**

Query the database and extract the following data into a [Pandas DataFrame](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html).
 
* Customer ID (integer)
* Last name
* First name
* DOB (Date Of Birth)
* City
* State
* Country (the name NOT the country_id)
* Gender

Remember that SQL keywords are case-insensitive, but it is traditional to write them in ALL CAPS. It is also a convention to end SQL statements with a semicolon.

#### Resources

* [W3 schools SQL tutorial](https://www.w3schools.com/sql)
* [W3 schools SQL joins](https://www.w3schools.com/sql/sql_join.asp)

```python
# first query

for c in conn.execute('SELECT customer_id, last_name, first_name, DOB, city, state, gender FROM CUSTOMER'):
    print(c)
```

```python
# joining the tables

for c in conn.execute('SELECT customer_id, last_name, first_name, DOB, city, state, country_name, gender \
FROM CUSTOMER \
INNER JOIN COUNTRY ON CUSTOMER.country_id = COUNTRY.country_id'):
    print(c)
```

```python
## YOUR CODE HERE

# the query to DataFrame

cursor = conn.execute('SELECT customer_id, last_name, first_name, DOB, city, state, country_name, gender \
FROM CUSTOMER \
INNER JOIN COUNTRY ON CUSTOMER.country_id = COUNTRY.country_id;')

columns_name = ['customer_id', 'last_name', 'first_name', 'DOB', 'city', 'state', 'country_name', 'gender']

# build the DataFrame from the rows returned by the cursor
customer_df = pd.DataFrame(cursor.fetchall(), columns=columns_name)
customer_df.head(15)
```


|   | customer_id | last_name | first_name | DOB | city | state | country_name | gender |
|---|---|---|---|---|---|---|---|---|
| 0 | 1 | Todd | Kasen | 07/30/98 | Rock Hill | South Carolina | united_states | m |
| 1 | 2 | Garza | Ensley | 04/12/89 | singapore |  | singapore | f |
| 2 | 3 | Carey | Lillian | 09/12/97 | Auburn | Alabama | united_states | f |
| 3 | 4 | Christensen | Beau | 01/28/99 | Hempstead | New York | united_states | m |
| 4 | 5 | Gibson | Ernesto | 03/23/98 | singapore |  | singapore | m |
| 5 | 6 | Murray | Deshawn | 09/18/97 | Portland | Maine | united_states | m |
| 6 | 7 | Tate | Daxton | 12/23/70 | singapore |  | singapore | m |
| 7 | 8 | Small | Tenley | 07/21/72 | Paterson | New Jersey | united_states | f |
| 8 | 9 | Chase | Kyra | 05/19/98 | Temple | Texas | united_states | f |
| 9 | 10 | Barber | London | 07/17/93 | Somerville | Massachusetts | united_states | f |
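The cursor-to-DataFrame step can also be done in a single call with `pd.read_sql_query`, which takes the column names from the query itself. A self-contained sketch using a small in-memory database; the `CUSTOMER` and `COUNTRY` schemas are assumed from the query above and the two rows are copied from the output above. The connection is deliberately named `demo_conn` so it does not replace the notebook's `conn`:

```python
import sqlite3
import pandas as pd

# Tiny in-memory stand-in for aavail-customers.db; schema assumed from the query above
demo_conn = sqlite3.connect(":memory:")
demo_conn.executescript("""
CREATE TABLE COUNTRY (country_id INTEGER PRIMARY KEY, country_name TEXT);
CREATE TABLE CUSTOMER (customer_id INTEGER PRIMARY KEY, last_name TEXT, first_name TEXT,
                       DOB TEXT, city TEXT, state TEXT, country_id INTEGER, gender TEXT);
INSERT INTO COUNTRY VALUES (1, 'united_states'), (2, 'singapore');
INSERT INTO CUSTOMER VALUES (1, 'Todd', 'Kasen', '07/30/98', 'Rock Hill', 'South Carolina', 1, 'm');
INSERT INTO CUSTOMER VALUES (2, 'Garza', 'Ensley', '04/12/89', 'singapore', NULL, 2, 'f');
""")

query = """
SELECT cu.customer_id, cu.last_name, cu.first_name, cu.DOB,
       cu.city, cu.state, co.country_name, cu.gender
FROM CUSTOMER cu
INNER JOIN COUNTRY co ON cu.country_id = co.country_id
ORDER BY cu.customer_id;
"""
# Column names come from the SELECT list, so no separate columns list is needed
demo_df = pd.read_sql_query(query, demo_conn)
demo_conn.close()
print(demo_df)
```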


### QUESTION 2:

**Extract the relevant data from the CSV file**

For each ```customer_id```, determine whether the customer has stopped their subscription and save the result in a dictionary or another data container.

```python
df_streams = pd.read_csv(os.path.join(data_dir,"aavail-streams.csv"))
df_streams.head()
```

|   | customer_id | stream_id | date | invoice_item_id | subscription_stopped |
|---|---|---|---|---|---|
| 0 | 1 | 1420.0 | 2018-10-21 | 2.0 | 0 |
| 1 | 1 | 1343.0 | 2018-10-23 | 2.0 | 0 |
| 2 | 1 | 1756.0 | 2018-11-05 | 2.0 | 0 |
| 3 | 1 | 1250.0 | 2018-11-06 | 2.0 | 0 |
| 4 | 1 | 1324.0 | 2018-11-12 | 2.0 | 0 |


```python
## YOUR CODE HERE
# DataFrame info.
df_streams.info()
```

```
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 18859 entries, 0 to 18858
Data columns (total 5 columns):
 #   Column                Non-Null Count  Dtype  
---  ------                --------------  -----  
 0   customer_id           18859 non-null  int64  
 1   stream_id             17695 non-null  float64
 2   date                  18859 non-null  object 
 3   invoice_item_id       18859 non-null  float64
 4   subscription_stopped  18859 non-null  int64  
dtypes: float64(2), int64(2), object(1)
memory usage: 736.8+ KB
```


```python
# number of unique "customer_id" values
df_streams.customer_id.nunique()
```

```
1000
```

```python
# "subscription_stopped" values: 0 for no, 1 for yes.
df_streams.subscription_stopped.value_counts()
```

```
0    18570
1      289
Name: subscription_stopped, dtype: int64
```

```python
# Per-customer summary of "subscription_stopped"; a max of 1 means the customer stopped at some point.
df_streams.groupby("customer_id").subscription_stopped.describe()
```

| customer_id | count | mean | std | min | 25% | 50% | 75% | max |
|---|---|---|---|---|---|---|---|---|
| 1 | 24.0 | 0.000000 | 0.000000 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| 2 | 13.0 | 0.076923 | 0.277350 | 0.0 | 0.0 | 0.0 | 0.0 | 1.0 |
| 3 | 22.0 | 0.045455 | 0.213201 | 0.0 | 0.0 | 0.0 | 0.0 | 1.0 |
| 4 | 19.0 | 0.000000 | 0.000000 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| 5 | 24.0 | 0.000000 | 0.000000 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 996 | 15.0 | 0.066667 | 0.258199 | 0.0 | 0.0 | 0.0 | 0.0 | 1.0 |
| 997 | 24.0 | 0.000000 | 0.000000 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| 998 | 20.0 | 0.000000 | 0.000000 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| 999 | 16.0 | 0.000000 | 0.000000 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |


```python
# For each customer_id, True if the customer stopped their subscription at any point.
has_stopped = df_streams.groupby("customer_id").apply(lambda x: x['subscription_stopped'].max() > 0)
has_stopped
```

```
customer_id
1       False
2        True
3        True
4       False
5       False
        ...  
996      True
997     False
998     False
999     False
1000    False
Length: 1000, dtype: bool
```
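Question 2 asks for a dictionary or a similar container. A minimal sketch, on a few made-up rows with the same column names as `aavail-streams.csv`, of a vectorized equivalent that avoids the per-group lambda and converts the result into a plain `dict`:

```python
import pandas as pd

# Made-up rows with the same columns as aavail-streams.csv
streams = pd.DataFrame({
    "customer_id": [1, 1, 2, 2, 3],
    "subscription_stopped": [0, 0, 0, 1, 0],
})

# A customer has churned if any of their rows has subscription_stopped == 1
churned = streams.groupby("customer_id")["subscription_stopped"].max().gt(0)
# Convert NumPy scalars to plain Python types for a portable dictionary
churned_dict = {int(k): bool(v) for k, v in churned.items()}
print(churned_dict)
```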

## PART 2: Checks for quality assurance

Sometimes it is known in advance which types of data integrity issues to expect, but other times these issues are only identified during Exploratory Data Analysis (EDA). After extracting data, it is important to include checks for quality assurance, even on the first pass through the AI workflow. Here you will combine the data into a single structure and provide a couple of checks for quality assurance.

### QUESTION 3: 

**Implement checks for quality assurance**

1. In the customer dataframe loaded in Question 1, remove any repeat customers based on ```customer_id```
2. In the streams dataset, remove stream data that do not have an associated ```stream_id```
3. Check for missing values in both datasets.

```python
## YOUR CODE HERE
# count occurrences of each "customer_id" in the customer DF; a count of 2 means a repeat
customer_df.customer_id.value_counts()
```

```
1      2
11     2
21     2
31     2
401    2
      ..
661    1
660    1
659    1
658    1
500    1
Name: customer_id, Length: 1000, dtype: int64
```

```python
# Drop rows with a repeated customer_id, keeping the first occurrence
customer_df.drop_duplicates(subset='customer_id', keep='first', inplace=True)
```

```python
# new "customer_id" counts in customer DF
customer_df.customer_id.value_counts()
```

```
1000    1
329     1
342     1
341     1
340     1
       ..
662     1
661     1
660     1
659     1
1       1
Name: customer_id, Length: 1000, dtype: int64
```

```python
# missing "stream_id" values in streams DF
df_streams.stream_id.isna().value_counts()
```

```
False    17695
True      1164
Name: stream_id, dtype: int64
```

```python
# drop only the rows that have no stream_id
df_streams.dropna(subset=['stream_id'], inplace=True)
```

```python
# remaining missing "stream_id" values in streams DF
df_streams.stream_id.isna().sum()
```

```
0
```
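Item 3 of Question 3 asks for a missing-value check on both datasets. A minimal sketch of a reusable report that counts duplicate keys and missing values per column, shown on made-up frames (the function name `qa_report` is hypothetical). Not every missing value is an error: in the customer output above, `state` is empty for customers in Singapore.

```python
import numpy as np
import pandas as pd

def qa_report(df, key):
    """Summarize row count, duplicate keys and missing values for a DataFrame."""
    return {
        "n_rows": len(df),
        "duplicate_keys": int(df[key].duplicated().sum()),
        # per-column count of missing values, only for columns that have any
        "missing": {col: int(n) for col, n in df.isna().sum().items() if n > 0},
    }

customers = pd.DataFrame({"customer_id": [1, 2, 2], "state": ["Texas", None, None]})
streams = pd.DataFrame({"customer_id": [1, 1, 2], "stream_id": [1420.0, np.nan, 1756.0]})

print(qa_report(customers, "customer_id"))
print(qa_report(streams, "stream_id"))
```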

### QUESTION 4: 

**Combine the data into a single data structure**

For this example, the two most convenient structures for this task are Pandas dataframes and NumPy arrays.  At a minimum ensure that your structure accommodates the following.

1. A column for `customer_id`
2. A column for `country`
3. A column for ```age``` that is created from ```DOB```
4. A column ```customer_name``` that is created from ```first_name``` and ```last_name```
5. A column to indicate churn called ```is_subscriber```
6. A column that indicates ```subscriber_type``` that comes from ```invoice_item```
7. A column to indicate the total ```num_streams```

> HINT: For the subscriber type, use the most frequent invoice_item_id and link it to the relevant invoice_item using the INVOICE table in the database.

#### Resources

* [Python's datetime library](https://docs.python.org/3/library/datetime.html)
* [NumPy's datetime data type](https://docs.scipy.org/doc/numpy/reference/arrays.datetime.html)


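```python
now = pd.Timestamp('now')
# DOB is stored as MM/DD/YY strings, so parse it with an explicit format
dob = pd.to_datetime(customer_df.DOB, format='%m/%d/%y')
# Two-digit years 00-68 parse as 20xx; shift any date in the future back a century
dob = dob.where(dob < now, dob - pd.DateOffset(years=100))
# Age in whole years, using the average Gregorian year of 365.2425 days
customer_age = np.floor((now - dob).dt.days / 365.2425)
customer_age
```

```
0      22.0
1      32.0
2      23.0
3      22.0
4      23.0
       ... 
995    56.0
996    24.0
997    26.0
998    41.0
999    21.0
Name: DOB, Length: 1000, dtype: float64
```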
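Dividing by an average year length, as the cell above does, can be off by a day around a customer's birthday. A minimal pure-Python sketch of a calendar-exact age (the function name `age_on` is hypothetical):

```python
from datetime import date

def age_on(dob, today):
    """Whole years between dob and today, counting a birthday only once it has been reached."""
    # (month, day) tuples compare lexicographically: True if this year's birthday is still ahead
    before_birthday = (today.month, today.day) < (dob.month, dob.day)
    return today.year - dob.year - int(before_birthday)

print(age_on(date(1998, 7, 30), date(2021, 7, 29)))  # 22 (day before the birthday)
print(age_on(date(1998, 7, 30), date(2021, 7, 30)))  # 23 (on the birthday)
```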

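```python
## YOUR CODE HERE
# Combined DataFrame: one row per customer. Per-customer stream aggregates are
# looked up by customer_id with .map(), so the result does not depend on row order.
stream_groups = df_streams.groupby("customer_id")

df_comb = pd.DataFrame()
df_comb['customer_id'] = customer_df.customer_id
df_comb['customer_name'] = customer_df.first_name + ' ' + customer_df.last_name
df_comb['customer_age'] = customer_age.astype(int)
df_comb['country'] = customer_df.country_name
# 1 = still subscribed, 0 = stopped the subscription (churned)
df_comb['is_subscriber'] = 1 - customer_df.customer_id.map(stream_groups['subscription_stopped'].max())
# NOTE: this takes the highest invoice_item_id; the hint asks for the most frequent one
df_comb['subscriber_type'] = customer_df.customer_id.map(stream_groups['invoice_item_id'].max()).astype(int)
df_comb['num_streams'] = customer_df.customer_id.map(stream_groups.size())
```

```python
df_comb
```

|   | customer_id | customer_name | customer_age | country | is_subscriber | subscriber_type | num_streams |
|---|---|---|---|---|---|---|---|
| 0 | 1 | Kasen Todd | 22 | united_states | 1 | 2 | 23 |
| 1 | 2 | Ensley Garza | 32 | singapore | 0 | 3 | 12 |
| 2 | 3 | Lillian Carey | 23 | united_states | 0 | 2 | 22 |
| 3 | 4 | Beau Christensen | 22 | united_states | 1 | 1 | 19 |
| 4 | 5 | Ernesto Gibson | 23 | singapore | 1 | 2 | 23 |
| ... | ... | ... | ... | ... | ... | ... | ... |
| 995 | 996 | Peyton Enriquez | 56 | singapore | 0 | 3 | 14 |
| 996 | 997 | Amina Manning | 24 | united_states | 1 | 1 | 24 |
| 997 | 998 | Brooks Ventura | 26 | united_states | 1 | 3 | 17 |
| 998 | 999 | Nayeli Mathis | 41 | united_states | 1 | 3 | 16 |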


```python
df_comb.info()
```

```
<class 'pandas.core.frame.DataFrame'>
Int64Index: 1000 entries, 0 to 999
Data columns (total 7 columns):
 #   Column           Non-Null Count  Dtype 
---  ------           --------------  ----- 
 0   customer_id      1000 non-null   int64 
 1   customer_name    1000 non-null   object
 2   customer_age     1000 non-null   int64 
 3   country          1000 non-null   object
 4   is_subscriber    1000 non-null   int64 
 5   subscriber_type  1000 non-null   int64 
 6   num_streams      1000 non-null   int64 
dtypes: int64(5), object(2)
memory usage: 62.5+ KB
```
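The hint asks for the *most frequent* `invoice_item_id` per customer, linked to its name. A minimal sketch of a mode-based version on made-up rows; the `item_names` lookup and its values are hypothetical stand-ins for the item names stored in the database (presumably in the `INVOICE_ITEM` table listed earlier; check the actual table and column names against the ERD):

```python
import pandas as pd

# Made-up stream rows
streams = pd.DataFrame({
    "customer_id": [1, 1, 1, 2, 2, 2],
    "invoice_item_id": [2.0, 2.0, 3.0, 1.0, 3.0, 3.0],
})
# Hypothetical id -> name lookup; in the case study this would be queried from the database
item_names = {1: "basic", 2: "premium", 3: "deluxe"}

# Most frequent invoice_item_id per customer; mode() is sorted, so ties resolve to the smallest id
most_frequent = (
    streams.groupby("customer_id")["invoice_item_id"]
    .agg(lambda s: s.mode().iloc[0])
    .astype(int)
)
subscriber_type = most_frequent.map(item_names)
print(subscriber_type)
```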


## PART 3: Automating the process

To ensure that your code can be used to automate this process, you will first save your dataframe or NumPy array as a CSV file.

### QUESTION 5:

**Take the initial steps towards automation**

1. Save your cleaned, combined data as a CSV file.
2. From the code above create a function or class that performs all of the steps given a database file and a streams CSV file.
3. Run the function in batches and write a check to ensure you got the same result that you did in the code above.

There will be some logic involved to ensure that you do not write the same data twice to the target CSV file.

Shown below is some code that will split your streams file into two batches. 

```python
## code to split the streams csv into batches
df_all = pd.read_csv(os.path.join(data_dir,"aavail-streams.csv"))
customers_arr = df_all['customer_id'].unique()
# split the data set by customer ID.
customer_batches = customers_arr.reshape(2, int(customers_arr.shape[0]/2))
df_part1 = df_all[df_all['customer_id'].isin(customer_batches[0])]
df_part2 = df_all[df_all['customer_id'].isin(customer_batches[1])]
# save the batches as csv.
df_part1.to_csv(os.path.join(data_dir, "aavail-streams-1.csv"), index=False)
df_part2.to_csv(os.path.join(data_dir, "aavail-streams-2.csv"), index=False)
```

```python
## YOUR CODE HERE

# Saving the cleaned, combined data as a CSV file.
df_comb.to_csv(os.path.join(data_dir, "aavail-subscribers.csv"), index=False)
```
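Question 5 notes that some logic is needed so that the same data is not written twice to the target CSV. A minimal sketch of one approach, assuming `customer_id` uniquely identifies a row in the target file and that the appended columns are in the same order as the existing header (the function name `append_new_rows` is hypothetical):

```python
import os
import tempfile
import pandas as pd

def append_new_rows(target_file, df, key="customer_id"):
    """Append rows of df whose key is not yet in target_file; return the number written."""
    if os.path.exists(target_file):
        # only the key column is needed to decide what is already stored
        existing = pd.read_csv(target_file, usecols=[key])[key]
        df = df[~df[key].isin(existing)]
        df.to_csv(target_file, mode="a", header=False, index=False)
    else:
        df.to_csv(target_file, index=False)
    return len(df)

# Usage: writing the same batch twice stores it only once
target = os.path.join(tempfile.mkdtemp(), "subscribers.csv")
batch = pd.DataFrame({"customer_id": [1, 2], "num_streams": [23, 12]})
first = append_new_rows(target, batch)
second = append_new_rows(target, batch)
print(first, second, len(pd.read_csv(target)))  # 2 0 2
```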

You will need to save your function as a .py file.  The following cell demonstrates how to do this from within a notebook. 

```python
%%writefile aavail-data-ingestor.py
# The line above creates a file called "aavail-data-ingestor.py" in the current working
# directory and writes the rest of the cell to that file.

import os
import sys
import getopt
import pandas as pd
import numpy as np
import sqlite3

DATA_DIR = os.path.join("..","data")

pass
```

```
Overwriting aavail-data-ingestor.py
```


You will also need to be able to pass the file names to your function without hardcoding them into the script itself. This is an important step towards automation. Two standard-library modules are commonly used for this in Python:

* [getopt](https://docs.python.org/3/library/getopt.html)
* [argparse](https://docs.python.org/3/library/argparse.html)

You may run the script you just created directly from the command line, or from within this notebook using:

```
!python aavail-data-ingestor.py -d aavail-customers.db -s aavail-streams.csv
```

```python
%%writefile aavail-data-ingestor.py


import os
import sys
import getopt
import pandas as pd
import numpy as np
import sqlite3

DATA_DIR = os.path.join("..","data")


def connect_db(file_path):
    """
    function to connect to the aavail database
    INPUT : the file path of the database
    OUTPUT : the sqlite connection to the database
    """
    ## YOUR CODE HERE
    # sqlite3.connect would silently create an empty db for a bad path
    if not os.path.isfile(file_path):
        raise FileNotFoundError("database not found: {}".format(file_path))
    try:
        conn = sqlite3.connect(file_path)
        print("...successfully connected to db\n")
    except sqlite3.Error as e:
        print("...unsuccessful connection\n", e)
        raise
    return conn

def ingest_db_data(conn):
    """
    load and clean the db data
    INPUT : the sqlite connection to the database
    OUTPUT : the customer dataframe
    """
    ## YOUR CODE HERE
    query = """
        SELECT customer_id, last_name, first_name, DOB, city, state, country_name, gender
        FROM CUSTOMER
        INNER JOIN COUNTRY ON CUSTOMER.country_id = COUNTRY.country_id;
    """
    # column names are taken from the SELECT list
    customer_df = pd.read_sql_query(query, conn)

    # Remove repeated customers based on customer_id
    size_before = len(customer_df)
    customer_df = customer_df.drop_duplicates(subset='customer_id')
    size_after = len(customer_df)
    print("... removed {} duplicate rows in customer data".format(size_before-size_after))
    return customer_df


def ingest_stream_data(file_path):
    """
    load and clean the stream data
    INPUT : the file path of the stream csv
    OUTPUT : the streams dataframe and a mapping of the customers that churned
    """
    ## YOUR CODE HERE
    df_streams = pd.read_csv(file_path)
    size_before = len(df_streams)
    df_streams = df_streams[df_streams['stream_id'].notna()]
    size_after = len(df_streams)
    print("... removed {} missing stream ids".format(size_before-size_after))

    # True if the customer stopped their subscription at any point
    has_stopped = df_streams.groupby("customer_id")["subscription_stopped"].max() > 0

    return df_streams, has_stopped


def process_dataframes(customer_df, df_streams, has_stopped, conn):
    """
    create the target dataset from the different data imported
    INPUT : the customer dataframe, the stream data frame, the map of churned customers and the connection to the database
    OUTPUT : the cleaned data set as described in question 4
    """
    ## YOUR CODE HERE
    # Keep only the customers present in this (possibly partial) batch of stream data
    customer_df = customer_df[customer_df.customer_id.isin(has_stopped.index)]
    stream_groups = df_streams.groupby("customer_id")

    df_comb = pd.DataFrame()
    df_comb['customer_id'] = customer_df.customer_id
    df_comb['customer_name'] = customer_df.first_name + ' ' + customer_df.last_name

    # Create age column from DOB (stored as MM/DD/YY strings)
    now = pd.Timestamp('now')
    dob = pd.to_datetime(customer_df.DOB, format='%m/%d/%y')
    # two-digit years can parse into the future; shift those back a century
    dob = dob.where(dob < now, dob - pd.DateOffset(years=100))
    df_comb['customer_age'] = np.floor((now - dob).dt.days / 365.2425).astype(int)

    df_comb['country'] = customer_df.country_name
    # 1 = still subscribed, 0 = churned; looked up by customer_id, not by position
    df_comb['is_subscriber'] = 1 - customer_df.customer_id.map(has_stopped).astype(int)
    # NOTE: highest invoice_item_id; the case study hint asks for the most frequent one
    df_comb['subscriber_type'] = customer_df.customer_id.map(stream_groups['invoice_item_id'].max()).astype(int)
    df_comb['num_streams'] = customer_df.customer_id.map(stream_groups.size())

    return df_comb


def update_target(target_file, df_comb, overwrite=False):
    """
    write the clean data to the target file.
    Replace the target file if overwrite is True (or if it does not exist yet),
    otherwise append the clean data to the target file
    INPUT : the name of the target file, the cleaned dataframe and an overwrite flag
    OUTPUT : None
    """
    ## YOUR CODE HERE
    if overwrite or not os.path.exists(target_file):
        df_comb.to_csv(target_file, index=False)
    else:
        df_comb.to_csv(target_file, mode='a', header=False, index=False)


if __name__ == "__main__":

    ## collect and handle arguments with getopt or argparse
    ## YOUR CODE HERE

    ## collect args
    arg_string = "%s -d db_filepath -s streams_filepath"%sys.argv[0]
    try:
        optlist, args = getopt.getopt(sys.argv[1:],'d:s:')
    except getopt.GetoptError as err:
        print(err)
        raise Exception(arg_string)

    ## handle args
    streams_file = None
    db_file = None
    for o, a in optlist:
        if o == '-d':
            db_file = a
        if o == '-s':
            streams_file = a
    # both file names are required
    if db_file is None or streams_file is None:
        raise Exception(arg_string)
    # file names are resolved relative to DATA_DIR
    streams_file = os.path.join(DATA_DIR,streams_file)
    db_file = os.path.join(DATA_DIR,db_file)
    target_file = os.path.join(DATA_DIR, "aavail-subscribers.csv")

    ## make the connection to the database
    ## YOUR CODE HERE
    conn = connect_db(db_file)

    ## ingest the data and transform it
    ## YOUR CODE HERE
    customer_df = ingest_db_data(conn)
    df_streams, df_churn = ingest_stream_data(streams_file)
    df_comb = process_dataframes(customer_df, df_streams, df_churn, conn)
    conn.close()

    ## write the transformed data to a csv
    ## YOUR CODE HERE
    update_target(target_file, df_comb, overwrite=False)
    print("done")
```

```
Overwriting aavail-data-ingestor.py
```


Run the script once for each batch that you created and then load both the original and batch versions back into the notebook to check that they are the same. 

```python
## YOUR CODE HERE

# -f: do not fail if the target file does not exist yet
!rm -f ../data/aavail-subscribers.csv
!python aavail-data-ingestor.py -d aavail-customers.db -s aavail-streams.csv
!wc -l ../data/aavail-subscribers.csv
```

```
...successfully connected to db

... removed 7 duplicate rows in customer data
... removed 1164 missing stream ids
done
1001 ../data/aavail-subscribers.csv
```
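Question 5 also asks you to run the script in batches and check that the result matches a single run. A minimal sketch of that comparison using made-up frames; in the notebook, `full_run` and `batched` would instead be read with `pd.read_csv` from the target file produced by each approach. Rows are sorted by `customer_id` and the index is reset first, because batches need not be written in the same order as the full run:

```python
import pandas as pd

full_run = pd.DataFrame({"customer_id": [1, 2, 3, 4], "num_streams": [23, 12, 22, 19]})
# Two batches written one after the other, in a different order than the full run
batched = pd.concat([full_run.iloc[2:], full_run.iloc[:2]])

def normalize(df):
    """Sort by customer_id and drop the old index so only the content is compared."""
    return df.sort_values("customer_id").reset_index(drop=True)

# Raises AssertionError describing the first difference, if there is one
pd.testing.assert_frame_equal(normalize(full_run), normalize(batched))
print("batch output matches the single run")
```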


### QUESTION 6:

**How can you improve the process?**

In paragraph form or using bullets write down some of the ways that you could improve this pipeline.

YOUR ANSWER HERE


