# Purpose
This notebook describes the typical activities carried out at the beginning of a project or thread, when the customer shares new data. We try to understand the tables, columns and information flow. Typically we also look for data issues and confirm their resolution with the respective owners. At the end of this activity, the data sources and their treatment are finalized. Code in this notebook will not be part of the production code.

This data can be downloaded from [here](https://drive.google.com/file/d/14FaUvJuhQLZevvqQcnOZZxvkvKbdPhZ7/view?usp=sharing)

# Initialization

In [0]:
%load_ext autoreload
%autoreload 2 

In [0]:
from ta_lib.core.api import get_package_path
PACKAGE_PATH=get_package_path()

In [0]:
import sys
import os.path as op
CURR_LOC_IN_DBFS=op.join(PACKAGE_PATH,"..","databricks/notebooks/03_reference_notebooks/python/")
sys.path.append(CURR_LOC_IN_DBFS)

In [0]:
import os
import os.path as op
import pandas as pd 
import great_expectations as ge
from dateutil.relativedelta import relativedelta
os.environ['TA_DEBUG'] = "False"
os.environ['TA_ALLOW_EXCEPTIONS'] = "True"

In [0]:
import warnings

warnings.filterwarnings('ignore', message="The sklearn.metrics.classification module", category=FutureWarning)
warnings.filterwarnings('ignore', message=".*title_format is deprecated. Please use title instead.*")

In [0]:
%%time
from ta_lib.core.api import (
    create_context,
    display_as_tabs,
    initialize_environment,
    string_cleaning,
    setanalyse,
    merge_expectations
)
import ta_lib.core.api as dataset
import ta_lib.eda.api as analysis
import ta_lib.reports.api as health

In [0]:
# Initialization
initialize_environment(debug=False, hide_warnings=True)

In [0]:
REPORT_LOC = op.join(PACKAGE_PATH,'..','databricks')

# Data

## Background

The customer is a distributor of consumption goods. We will use a variety of data files; for more details, refer to the readme in the data archive. The business wants to use machine learning to predict whether a particular customer will churn.

Below is an overview of each of the keys in the dataset:

| | Key | Dataset | Details |
|--|--|--|--|
| 1 | pri_bpm | sales.csv | The monthly primary sales from the manufacturer to each distributor. The column pri_sales_amount indicates the sales. We will use this column for defining churn of a distributor, i.e. whenever the sales turn 0 or negative, the distributor would be defined as churned. The cutoff or reference date we'll use to predict future churn is 2013-05-01 |
| 2 | sec_bpm | net_value.csv | The monthly secondary business, with column sec_netvalues identifying the total sales from each distributor |
| 3 | retail_program | retail_outlet.csv | Details on retail programs organized by the client. The column wholesale_net_value indicates the amount that the distributor sold on wholesale |
| 4 | order_alloc | orders.csv | Details of orders placed by retailers from the distributor. Column allocated_value indicates the total order amount to be fulfilled by the distributor |
| 5 | orders_and_allocated_reason | orders_extended.csv | If the distributor was not able to fulfil the retailer's order completely, the reason is provided here, under column reason_code_description_new_new |
| 6 | ordered_with_app | order_with_app.csv | The distributors use an app to order from the client's sales manager. This file contains information on whether the order was placed on the app | 
| 7 | ordered_without_app | order_without_app.csv | Information about distributors who order without app | 
| 8 | doj | date_of_joining.csv | The exact date on which the distributor joined | 
| 9 | returns | returns.csv | The distributor's profits/returns |
| 10 | coverage | monthly_coverage.csv | Details about how much of the retailers' orders the distributor is covering |
| 11 | dist_retail_invoice | invoice.csv | Details about how many unique retailers are there for each distributor and how many times retailers bought from distributor | 
| 12 | no_obj | no_objection.csv | Quarterly data on policy related information for the distributor | 
| 13 | ec | business.csv | Information on the number of retailers for each distributor |
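
The churn definition in row 1 of the table can be sketched in plain pandas. This is a minimal illustration, not the project's production logic. The toy frame, the helper name `label_churn`, and the column layout (`customer_code`, `year`, `month`, `pri_sales_amount`) are assumptions, as is the choice to flag a distributor as churned when any month on or after the cutoff has sales at or below zero.

```python
import pandas as pd

CUTOFF = pd.Timestamp("2013-05-01")  # reference date quoted in the table


def label_churn(sales: pd.DataFrame, cutoff: pd.Timestamp = CUTOFF) -> pd.Series:
    """Flag distributors with sales <= 0 in any month on/after the cutoff (assumed rule)."""
    # Build a month-start date from the year and month columns
    dates = pd.to_datetime(
        sales["year"].astype(str) + "-" + sales["month"].astype(str).str.zfill(2) + "-01"
    )
    future = sales.loc[dates >= cutoff]
    churned = future["pri_sales_amount"].le(0).groupby(future["customer_code"]).any()
    return churned.rename("churned")


# Toy data, for illustration only
toy_sales = pd.DataFrame({
    "customer_code": ["A", "A", "B", "B"],
    "year": [2013, 2013, 2013, 2013],
    "month": [4, 5, 4, 5],
    "pri_sales_amount": [100.0, 0.0, 50.0, 75.0],
})
churn_flags = label_churn(toy_sales)
```

Distributors with no rows on or after the cutoff do not appear in the result, so how to treat them remains a separate decision.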

In [0]:
config_path = op.join(CURR_LOC_IN_DBFS,'conf', 'config.yml')
context = create_context(config_path)
dataset.list_datasets(context)

In [0]:
# Loading all datasets in a loop
data = dict()
for i in dataset.list_datasets(context):
    if '/raw/' in i:
        key_ = i.replace('/raw/','')+'_df'
        data[key_] = dataset.load_dataset(context,i)
        # Standardize column names
        data[key_].columns = string_cleaning(data[key_].columns,lower=True)

## Exploratory Data Analysis

### Shape of Data

In [0]:
(
    pd.DataFrame({x:data[x].shape for x in data.keys()})
    .T
    .rename(columns={0:'rows',1:'columns'})
    .sort_values('rows',ascending=False)
)

## Variable summary

In [0]:
summaries = [analysis.get_variable_summary(data[x]) for x in data.keys()]
displayHTML(display_as_tabs([(x, summaries[idx]) for idx, x in enumerate(data.keys())], cloud_env='Databricks'))

## Merging

### Expected data validation rules

1. Month should be between 1 and 12.
2. One customer is mapped to only one area code in all tables.
3. One area name is mapped to only one area code in all tables.
4. Every area name in returns_df should also appear in sec_bpm_df or retail_program_df.
5. The primary-key columns of each table should uniquely identify a row.
6. There should be no gaps in the monthly time series of any table.
7. customer_code in all tables should be a subset of customer_code in doj_df.
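
Rules 2 and 3 are both "one key maps to exactly one value" checks. A minimal pandas sketch of that idea is shown here; the helper name `find_multi_mapped` and the toy frame are hypothetical and only illustrate the check.

```python
import pandas as pd


def find_multi_mapped(df: pd.DataFrame, key_col: str, value_col: str) -> pd.Series:
    """Return the keys that map to more than one distinct value, with their counts."""
    counts = df.groupby(key_col)[value_col].nunique()
    return counts[counts > 1]


# Toy data: C2 is mapped to two different area codes
toy = pd.DataFrame({
    "customer_code": ["C1", "C1", "C2", "C2"],
    "asm_area_code": ["A10", "A10", "A20", "A30"],
})
violations = find_multi_mapped(toy, "customer_code", "asm_area_code")
```

An empty result means the rule holds for that table. Returning the offending keys, rather than a single pass/fail flag, makes it easier to raise specific cases with the data owners.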

#### Great Expectations 

We will leverage the [Great Expectations](https://docs.greatexpectations.io/en/latest/) library for exploring the data quality & validating whether it conforms to expected business rules.

In [0]:
# Great Expectation dataframes
data = {key: ge.from_pandas(df) for key, df in data.items()}

#### Rule 1 verification

In [0]:
verification_dict = {}

# Rule tests
rule_1_dict = {}
test_success_flag = True
for key_ in data.keys():
    # Rule 1 verification
    if 'month' in data[key_].columns:
        rule_val = data[key_].expect_column_values_to_be_between('month', min_value=1, max_value=12, mostly=None, 
                                                            result_format="BASIC", include_config=True).to_json_dict()
        if not rule_val["success"]:
            test_success_flag = False
            print(("Rule 1 failed for table: {}").format(key_))
                  
        rule_1_dict[key_] = rule_val

# Rule 1 - verification append
verification_dict["rule_1_test"] = rule_1_dict
if test_success_flag:
    print(('Rule {} passed').format(1))
else:
    print(('Rule {} failed').format(1))

#### Rule 2, 3 verification

In [0]:
dict_key_cols = {2: ['customer_code','asm_area_code'], 3: ['asm_area_name','asm_area_code']}

# Rule tests
for rule_num, key_cols in dict_key_cols.items():
    rule_dict = {}
    for key_ in data.keys():
        col1, col2 = key_cols
        if len(set(key_cols)-set(data[key_].columns)) == 0:
            df_temp = data[key_][key_cols].drop_duplicates()
            # Use the maximum so that a single key mapped to several values fails the rule
            rule_dict[key_] = df_temp.groupby(col1)[col2].nunique().max()

    # Rule - verification append
    test_name = "rule_{}_test".format(rule_num)
    df_temp = ge.from_pandas(pd.DataFrame(rule_dict.items(), columns=["keys", "nunique"]))
    verification_dict[test_name] = df_temp.expect_column_values_to_be_between('nunique', min_value=0, max_value=1, mostly=None, 
                                                            result_format="BASIC", include_config=True).to_json_dict()
    
    if verification_dict[test_name]["success"]:
        print(('Rule {} passed').format(rule_num))
    else:
        print(('Rule {} failed').format(rule_num))

#### Rule 4 verification

In [0]:
area_name_super = data['sec_bpm_df']['asm_area_name'].tolist()
area_name_super = area_name_super+ (
    data['retail_program_df']['asm_area_name']
    .tolist()
)

verification_dict["rule_4_test"] = data['returns_df'].expect_column_values_to_be_in_set('asm_area_name', area_name_super, mostly=None, 
                                                      result_format="BASIC", include_config=True).to_json_dict()

if verification_dict["rule_4_test"]["success"]:
    print(('Rule {} passed').format(4))
else:
    print(('Rule {} failed').format(4))

#### Rule 5 verification

In [0]:
pk_rules = {
    'doj_df': ['customer_code'],
    'no_obj_df': ['claim_year', 'quarter', 'customer_code',
                  'calendar_quarter', 'calendar_year'],
    'pri_bpm_df': ['asm_area_code', 'customer_code', 'month', 'year'],
    'sec_bpm_df': ['customer_code', 'asm_area_name', 'channel_desc',
                   'month', 'year'],
    'order_alloc_df': ['customer_code', 'year', 'month'],
    'orders_and_allocated_reason_df': ['customer_code', 'year', 'month'
            ,'reason_code_description_new_new'],
    'returns_df': ['customer_code', 'asm_area_name', 'month', 'year'],
    'coverage_df': ['asm_area_code', 'customer_code', 'month', 'year'],
    'retail_program_df': ['customer_code', 'asm_area_name',
                          'asm_area_code', 'month', 'year'],
    'ec_df': ['asm_area_code', 'customer_code', 'year', 'month'],
    'ordered_with_app_df': ['asm_area_code', 'customer_code', 'year',
                            'month'],
    'ordered_without_app_df': ['asm_area_code', 'customer_code', 'year'
                               , 'month'],
    'dist_retail_invoice_df': ['asm_area_code', 'customer_code', 'year'
                               , 'month'],
    }

rule_5_dict = {}
for table_, primary_keys in pk_rules.items():
    rule_val = data[table_].expect_compound_columns_to_be_unique(primary_keys, mostly=None, 
                                                      result_format="BASIC", include_config=True).to_json_dict()
    if not rule_val["success"]:
        print(("Rule {0} failed for table: {1}").format(5, table_))

    rule_5_dict[table_] = rule_val
    
verification_dict["rule_5_test"] = rule_5_dict

#### Rule 6 verification

In [0]:
gap_cols = ['year','month']
rule_6_dict = {}
df_validation = pd.DataFrame()
for key_ in data.keys():
    if len(set(gap_cols)-set(data[key_].columns)) == 0:
        df_temp = ge.from_pandas(data[key_][gap_cols].drop_duplicates())
        df_temp['date'] = pd.to_datetime(df_temp['year'].astype(str)+'-'+df_temp['month'].astype(str).str.zfill(2)+'-'+'01')
        start_date, end_date = df_temp['date'].min(), df_temp['date'].max()
        expected_size = (end_date.year - start_date.year) * 12 + end_date.month - start_date.month + 1
        rule_6_dict[key_] = df_temp.expect_table_row_count_to_equal(expected_size, 
                                                                         result_format="BASIC", include_config=True).to_json_dict()
        if not rule_6_dict[key_]["success"]:
            print(("Rule {0} failed for table: {1}").format(6, key_))

verification_dict["rule_6_test"] = rule_6_dict

Any time gaps found by this check need to be highlighted to the customer. No action is required in the data pipeline.
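
When reporting gaps, it helps to list the missing months rather than only flag that a table fails. A minimal sketch, assuming a table with integer `year` and `month` columns; the helper name `missing_months` and the toy frame are hypothetical.

```python
import pandas as pd


def missing_months(df: pd.DataFrame) -> pd.PeriodIndex:
    """List the months between the first and last observed month that have no rows."""
    dates = pd.to_datetime(
        df["year"].astype(str) + "-" + df["month"].astype(str).str.zfill(2) + "-01"
    )
    observed = pd.PeriodIndex(dates.dt.to_period("M").unique())
    expected = pd.period_range(observed.min(), observed.max(), freq="M")
    return expected.difference(observed)


# Toy data: January 2013 is absent
toy = pd.DataFrame({"year": [2012, 2012, 2013], "month": [11, 12, 2]})
gaps = missing_months(toy)
```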

#### Rule 7 verification

In [0]:
rule_7_dict = {}
expected_set = data['doj_df']['customer_code'].tolist()
for key_ in data.keys():
    if 'customer_code' in data[key_].columns:
        rule_7_dict[key_] = data[key_].expect_column_values_to_be_in_set('customer_code', expected_set, mostly=None, 
                                                      result_format="BASIC", include_config=True).to_json_dict()
        if not rule_7_dict[key_]["success"]:
            print(("Rule 7 failed for tables: {}").format(key_))

verification_dict["rule_7_test"] = rule_7_dict

In [0]:
# Convert the Great Expectations datasets back to plain pandas DataFrames
data = {key: pd.DataFrame(df) for key, df in data.items()}

### Fixes for failed rules

#### Rule 5

In [0]:
df_temp = (
    data['no_obj_df']
    .groupby(pk_rules['no_obj_df'])
    .size()
    .reset_index()
    .rename(columns={0:'size'})
    .query('size > 1')
    .sort_values('size')
)

verification_dict["merge_1"] = merge_expectations(data['no_obj_df'], df_temp, pk_rules['no_obj_df'])
print(verification_dict["merge_1"]["actionable_warnings"])
sample_duplicates = (
    data['no_obj_df']
    .merge(df_temp,on=pk_rules['no_obj_df'])
    .sort_values(['size']+pk_rules['no_obj_df'])
    .drop('size',axis=1)
)
displayHTML(display_as_tabs([
    ('freq',str(df_temp['size'].value_counts())), 
    ('head',df_temp.head()), 
    ('tail',df_temp.tail()),
    ('sample_duplicates_head',sample_duplicates.head()),
    ('sample_duplicates_tail',sample_duplicates.tail())
], cloud_env='Databricks'))

##### Duplicate rows seem to be the problem. Try dropping exact duplicates

In [0]:
(
    data['no_obj_df']
    .drop_duplicates()
    .groupby(pk_rules['no_obj_df'])
    .size()
    .reset_index()
    .rename(columns={0:'size'})
    .query('size > 1')
    .sort_values('size')
).shape
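
If dropping exact duplicates does not clear every repeated key, the remaining rows share a primary key but differ in some other column. A minimal sketch for separating the two cases; the helper name `split_key_duplicates` and the toy frame are hypothetical.

```python
import pandas as pd


def split_key_duplicates(df: pd.DataFrame, keys: list) -> tuple:
    """Split repeated-key rows into exact duplicates and conflicting rows."""
    # Rows that repeat an earlier row in every column
    exact_dupes = df[df.duplicated(keep="first")]
    # After removing those, any key that still repeats has conflicting values
    deduped = df.drop_duplicates()
    conflicts = deduped[deduped.duplicated(subset=keys, keep=False)]
    return exact_dupes, conflicts


toy = pd.DataFrame({
    "customer_code": ["C1", "C1", "C2", "C2"],
    "quarter": [1, 1, 1, 1],
    "value": [10, 10, 5, 7],
})
exact_dupes, conflicts = split_key_duplicates(toy, ["customer_code", "quarter"])
```

Exact duplicates can usually be dropped safely, whereas conflicting rows need a decision from the data owner on which value to keep.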

#### Rule 7

Ideally, all customers present in any data source should also be present in doj_df (the date of joining data). However, there seem to be 284 customers who have data in the business data but do not have a date of joining.

This has to be reported to the client.

However, based on the EDA, we can choose either to keep only the customers present in both doj_df and ec_df, or to use ec_df and fill in a proxy date of joining for the missing ones.
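
The two options can be sketched as follows. This is an illustration on toy frames, not the chosen treatment. The `doj` column name and the proxy rule (the first month in which the customer appears in the business data) are assumptions.

```python
import pandas as pd

# Toy data: C3 has business data but no date of joining
doj = pd.DataFrame({
    "customer_code": ["C1", "C2"],
    "doj": pd.to_datetime(["2010-01-15", "2011-06-01"]),
})
ec = pd.DataFrame({
    "customer_code": ["C1", "C2", "C3", "C3"],
    "year": [2012, 2012, 2012, 2012],
    "month": [1, 1, 3, 4],
})

# Option 1: keep only the customers present in both tables
ec_known = ec[ec["customer_code"].isin(doj["customer_code"])]

# Option 2: keep everyone and fill a proxy joining date (assumed: first observed month)
first_seen = (
    pd.to_datetime(ec["year"].astype(str) + "-" + ec["month"].astype(str).str.zfill(2) + "-01")
    .groupby(ec["customer_code"])
    .min()
    .rename("first_seen")
    .reset_index()
)
doj_filled = first_seen.merge(doj, on="customer_code", how="left")
doj_filled["doj_is_proxy"] = doj_filled["doj"].isna()  # keep track of imputed dates
doj_filled["doj"] = doj_filled["doj"].fillna(doj_filled["first_seen"])
```

Keeping a flag such as `doj_is_proxy` lets later steps distinguish real joining dates from imputed ones.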

## Health Analysis

Get an overview of the overall health of your dataset. This is usually quick to compute and hopefully highlights some problems to focus on.

### Summary Plot

Provides a high-level summary of the health of your dataset.

**Watch out for:**

* too few numeric values
* high % of missing values
* high % of duplicate values
* high % of duplicate columns

In [0]:
summaries_and_plots = [analysis.get_data_health_summary(data[x], return_plot=True) for x in data.keys()]
plots = [x[1] for x in summaries_and_plots]
displayHTML(display_as_tabs([(x, plots[idx]) for idx, x in enumerate(data.keys())], cloud_env='Databricks'))

**Dev NOTES**

<details>
1. Datatypes : We have both numeric and other types. The bulk of them seem to be numeric. `Numeric` is defined to be one of [float|int|date] and the rest are categorized as `Others`. A column is assumed to have `date` values if it has the string `date` in the column name.

**[TODO]** We prob. need more types: integral, float, bool, dates/timestamps, strings. We have this functionality in Dataprocessor.

2. The missing value plot seems to indicate missing values are not present but we do have them. 

**[TODO]** The plot can be improved to better display small values

3. We are looking for duplicate observations (rows in the data). The plot shows the % of rows that are an exact replica of another row (using `df.duplicated`)

4. We are looking for duplicate features (columns in the data).

**[TODO]** The tigerml code seems complicated but it looks like we compare each column against all other similar columns (numeric/categoric) after dropping nans, infs


**[TODO]** We need better data inspectors. The current data inspectors show columns from the dataframe used to construct the plot and **not** the original data. This does not make sense for an end-user who didn't explicitly construct the intermediate data used for the plot. It would be more meaningful to have labels that match the legends (e.g. unique_columns:100%, duplicate_columns:0). Also, the y-axis label doesn't tell anything. The x-axis prob. needs an axis (0 to 100%).

</details>
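
Notes 3 and 4 above describe the duplicate-row and duplicate-column checks. A rough plain-pandas equivalent is sketched here for intuition only. It is not the tigerml or ta_lib implementation, and the toy frame is hypothetical.

```python
import pandas as pd

toy = pd.DataFrame({
    "a": [1, 2, 2, 3],
    "b": [1, 2, 2, 3],  # same values as "a"
    "c": [5, 6, 6, 7],
})

# Share of rows that are an exact replica of an earlier row
pct_duplicate_rows = toy.duplicated().mean() * 100

# Columns whose values repeat an earlier column (transpose, then reuse duplicated)
duplicate_columns = toy.columns[toy.T.duplicated()].tolist()
```

Transposing is simple but can be slow on wide or very long tables, so this approach is best kept to exploratory checks.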

### Missing Values summary

This provides an overall view focusing on the amount of missing values in the dataset.

**Watch out for:**
* A few columns have a significant number of missing values
* Most columns have a significant number of missing values

In [0]:
summaries_and_plots = [analysis.get_missing_values_summary(data[x], return_plot=True) for x in data.keys()]
plots = [x[1] for x in summaries_and_plots]
displayHTML(display_as_tabs([(x, plots[idx]) for idx, x in enumerate(data.keys())], cloud_env='Databricks'))

**Dev notes:**

<details>
    
    * By default, the following are considered missing/NA values : `[np.nan, pd.NaT, 'NA', None]`
    * additional values can be passed to tigerml (add_additional_na_values)
    * these are applied to all columns.
    
    * some of the above information can be learnt from the data discovery step (see discussion below)
    
</details>
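
The effect of treating extra tokens as missing can be illustrated in plain pandas. This is a sketch of the idea only, not the tigerml implementation; the helper name `missing_share` and its `extra_na_values` parameter are hypothetical.

```python
import numpy as np
import pandas as pd


def missing_share(df: pd.DataFrame, extra_na_values=("NA",)) -> pd.Series:
    """Percentage of missing values per column, treating extra tokens as missing."""
    # Blank out the extra tokens in every column, then count NaN/None/NaT as usual
    cleaned = df.mask(df.isin(list(extra_na_values)))
    return cleaned.isna().mean().mul(100).sort_values(ascending=False)


toy = pd.DataFrame({
    "sales": [10.0, np.nan, 5.0, 7.0],
    "region": ["north", "NA", None, "south"],
})
shares = missing_share(toy)
```

Because the extra tokens apply to every column, a token that is a legitimate value in one column would also be counted as missing there.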

### Duplicate Columns

In [0]:
summaries = [analysis.get_duplicate_columns(data[x]) for x in data.keys()]
displayHTML(display_as_tabs([(x, summaries[idx]) for idx, x in enumerate(data.keys())], cloud_env='Databricks'))

### Outlier Checks

In [0]:
summaries = [analysis.get_outliers(data[x]) for x in data.keys()]
displayHTML(display_as_tabs([(x, summaries[idx]) for idx, x in enumerate(data.keys())], cloud_env='Databricks'))

## Health Analysis report

Generate a report that collects all of the above in a single HTML file. This could be useful to share with a client.

In [0]:
os.environ['DEBUG'] = 'true'
# Write one HTML health report per dataset into REPORT_LOC
health_reports_gen = [
    health.summary_report(data[x], save_path=op.join(REPORT_LOC, 'health_report_' + x + '.html'))
    for x in data.keys()
]