# MADlib Flow 

## Credit & Debit Transaction Fraud Model

Use case:
Using available credit and debit card transaction data build a classification model to predict whether or not a new transaction is fraudulent.

This notebook demonstrate the model development, feature engineering, and operationalize the model for batch and low-latency operations.

MADlibFlow is used to deploy the model, and feature engineering. This particular model workflow requires features lookup feature to do the feature engineering. The MADlibFlow will automatically populates the cache elements needed in redis based on the input manifest.

### Pre-Requisites
1. A running instance of Greenplum with MADlib
2. A running instance of Redis
3. Python Libraries: pandas, geopy, numpy, enlighten, ipython-sql, psycopg2, psycopg2-binary
4. In tasklet 1; 
   - Modify database connection parameters
   - Modify the root_dir to MADlibFlowClient install folder.
5. Modify MADlibflow model deployment JSON file        $MADLIBFLOW_CLIENT/samples/credit_transactions/credit_transaction_flow.json
     1. Replace {DATABASE_HOST}, {PORT}, {DATABASE}, {USER}, {PASSWORD} with Greenplum connection information.
     2. Replace {REDIST_HOST}

### The notebook perform the below tasks

1. Connect to greenplum and setup the session
2. Create the schema and create all the tables needed
3. Run the data creator
4. Load the data to Greenpulm
5. Feature Engineering
6. Build and train model
7. Test Batch Score the model .
8. Operationalize model with MADlibflow


# Connect to Greenplum

In [None]:
import psycopg2               # Python-PostgreSQL Database Adapter - https://pypi.python.org/pypi/psycopg2
import pandas as pd           # Python Data Analysis Library - https://pandas.pydata.org/
import math  

%load_ext sql

# PLEASE MODIFY THE BELOW AS PER YOUR GREENPLUM CLUSTER SETTINGS
database_host = '127.0.0.1'
database_databasename = 'gpadmin'
database_username = 'gpadmin'
database_password = 'pivotal'
database_port = '9432'

# PLEASE MODIFY "root_dir" PATH TO YOUR MADlibFlowClient INSTALL DIRECTORY
root_dir="/Users/sridharpaladugu/MADlibFlowClient"
data_generator_path=root_dir+"/samples/credit_transactions/CreditCardTransactionGenerator"
transactions_path=data_generator_path+"/data/"

try:
    connString = "host='{}' dbname='{}' user='{}' password='{}' port={}".format(database_host,database_databasename,database_username,database_password,database_port)
    # print connString
    conn = psycopg2.connect(connString)
    cur = conn.cursor()
    conn.autocommit = True
        
    connection = 'postgresql://{}:{}@{}:{}/{}'.format(database_username,database_password,database_host,database_port,database_databasename)
    %sql $connection

    message = "<span style='color:green'>**Connection successful!**</span>"
    print(message)
except Exception as e:
    message = "<span style='color:red'>**ERROR: Unable to connect to the database ({})**</span>".format(e)
    print(message) 

# Create Schema

In [None]:
%%sql

-- create working schema
DO $$
BEGIN

    IF NOT EXISTS(
        SELECT schema_name
          FROM information_schema.schemata
          WHERE schema_name = 'credit_trans'
      )
    THEN
      EXECUTE 'CREATE SCHEMA credit_trans';
    ELSE
      EXECUTE 'DROP SCHEMA credit_trans CASCADE';
      EXECUTE 'CREATE SCHEMA credit_trans';  
    END IF;

END
$$;


DROP TABLE IF EXISTS credit_trans.raw_accounts;

CREATE TABLE credit_trans.raw_accounts (
    raw_accounts json
);

DROP TABLE IF EXISTS credit_trans.accounts;

CREATE TABLE credit_trans.accounts (
    account_number text,
    expiration_date text,
    cvv text,
    card_type text,
    account_city text,
    account_city_alias text,
    account_state text,
    account_long float,
    account_lat float,
    account_transaction_radius integer,
    trxn_mean float,
    trxn_std float,
    account_id integer
);

DROP TABLE IF EXISTS credit_trans.raw_locations;

CREATE TABLE credit_trans.raw_locations (
    raw_locations json
);

DROP TABLE IF EXISTS credit_trans.locations;
CREATE TABLE credit_trans.locations (
    rlb_location_key text,
    merchant_name text,
    merchant_trxn_mean float,
    merchant_trxn_std float,
    merchant_city text,
    merchant_state varchar(2),
    merchant_long float,
    merchant_lat float,
    merchant_city_alias text,
    transaction_id integer,
    location_id integer
);

DROP TABLE IF EXISTS credit_trans.raw_transactions;

CREATE TABLE credit_trans.raw_transactions (
    account_id integer,
    account_number text,
    card_type text,
    fraud_flag boolean,
    location_id integer,
    merchant_city text,
    merchant_city_alias text,
    merchant_lat double precision,
    merchant_long double precision,
    merchant_name text,
    merchant_state text,
    posting_date text,
    rlb_location_key text,
    transaction_amount double precision,
    transaction_date text,
    transaction_id integer
);


# Generate Test data

In [None]:
! cd $MADLIBFLOW_HOME/samples/credit_transactions/CreditCardTransactionGenerator; python Generator.py

# Load data from files

In [None]:
import psycopg2
import os

cur = conn.cursor()

with open(data_generator_path+"/accounts.json", 'r') as accountsFile:
#   accountsFile.readline()  # Skip the header row.
  cur.copy_from(accountsFile, 'credit_trans.raw_accounts')
  conn.commit()

with open(data_generator_path+"/locations.json", 'r') as locationsFile:
#   locationsFile.readline()  # Skip the header row.
  cur.copy_from(locationsFile, 'credit_trans.raw_locations')
  conn.commit()

datafiles = os.listdir(transactions_path)
for datafile in datafiles:
    print (datafile)
    if datafile.startswith("transactions_"):
        print ("loading file" + datafile +"............")
        with open(transactions_path+datafile, 'r') as f:
            f.readline()  # Skip the header row.
            cur.copy_from(f, 'credit_trans.raw_transactions', sep=',')
            conn.commit()


# ELT

In [None]:
%%sql

INSERT INTO credit_trans.accounts (
    SELECT raw_accounts->>'account_number' AS account_number
          ,raw_accounts->>'expiration_date' AS expiration_date
          ,raw_accounts->>'cvv' AS cvv
          ,raw_accounts->>'card_type' AS card_type
          ,raw_accounts->>'city' AS account_city
          ,raw_accounts->>'city_alias' AS account_city_alias
          ,(raw_accounts->>'state')::varchar(2) AS account_state 
          ,(raw_accounts->>'long')::float AS account_long
          ,(raw_accounts->>'lat')::float AS account_lat
          ,(raw_accounts->>'transaction_radius')::integer AS account_transaction_radius
          ,(raw_accounts->>'trxn_mean')::float AS trxn_mean
          ,(raw_accounts->>'trxn_std')::float AS trxn_std
          ,(raw_accounts->>'account_id')::integer AS account_id
    FROM (
        SELECT json_array_elements(raw_accounts)AS raw_accounts
        FROM credit_trans.raw_accounts
    ) foo
);
SELECT count(*) FROM credit_trans.accounts;

INSERT INTO credit_trans.locations (
    SELECT raw_locations->>'rlb_location_key' AS rlb_location_key
          ,raw_locations->>'merchant_name' AS merchant_name
          ,(raw_locations->>'merchant_trxn_mean')::float AS merchant_trxn_mean
          ,(raw_locations->>'merchant_trxn_std')::float AS merchant_trxn_std
          ,raw_locations->>'merchant_city' AS merchant_city
          ,(raw_locations->>'merchant_state')::varchar(2) AS merchant_state
          ,(raw_locations->>'merchant_long')::float AS merchant_long 
          ,(raw_locations->>'merchant_lat')::float AS merchant_lat
          ,raw_locations->>'merchant_city_alias' AS merchant_city_alias
          ,(raw_locations->>'transaction_id')::integer AS transaction_id
          ,(raw_locations->>'location_id')::integer AS location_id
    FROM (
        SELECT json_array_elements(raw_locations) AS raw_locations
        FROM credit_trans.raw_locations
    ) foo
);

SELECT count(*) FROM credit_trans.locations;

DROP TABLE IF EXISTS credit_trans.transactions;

CREATE TABLE credit_trans.transactions AS
SELECT 
    account_id,
    account_number,
    card_type,
    fraud_flag,
    location_id,
    merchant_city,
    merchant_city_alias,
    merchant_lat,
    merchant_long,
    merchant_name,
    merchant_state,
    to_timestamp(posting_date::float) AT TIME ZONE 'EST' AS posting_date,
    rlb_location_key,
    CASE WHEN transaction_amount < 0 THEN 0 ELSE transaction_amount END AS transaction_amount,
    to_timestamp(transaction_date::float) AT TIME ZONE 'EST' AS transaction_date,
    transaction_id
FROM credit_trans.raw_transactions
DISTRIBUTED RANDOMLY;

select count(*) from credit_trans.transactions


# Data Audit

## Summary Statistics

https://madlib.apache.org/docs/latest/group__grp__summary.html

In [None]:
%%sql

--drop existing table & run madlib summary stats function

DROP TABLE IF EXISTS credit_trans.transactions_summary;
SELECT madlib.summary('credit_trans.transactions','credit_trans.transactions_summary');

-- grab results from gpdb
SELECT * FROM credit_trans.transactions_summary;

--drop existing table & run madlib summary stats function
DROP TABLE IF EXISTS credit_trans.locations_summary;
SELECT madlib.summary('credit_trans.locations','credit_trans.locations_summary');

-- grab results from gpdb
SELECT * FROM credit_trans.locations_summary;

-- create join table for exploration
DROP TABLE IF EXISTS credit_trans.transactions_accounts;
CREATE TABLE credit_trans.transactions_accounts AS
SELECT t.*
      ,a.account_city
      ,a.account_city_alias 
      ,a.account_state 
      ,a.account_long
      ,a.account_lat
      ,a.account_transaction_radius
FROM credit_trans.transactions AS t
JOIN credit_trans.accounts AS a
USING (account_id);


# Data Exploration

### Create Joined Table For Exploration

In [None]:
%%sql

DROP TABLE IF EXISTS credit_trans.transactions_accounts;
CREATE TABLE credit_trans.transactions_accounts AS
SELECT t.*
      ,a.account_city
      ,a.account_city_alias 
      ,a.account_state 
      ,a.account_long
      ,a.account_lat
      ,a.account_transaction_radius
FROM credit_trans.transactions AS t
JOIN credit_trans.accounts AS a
USING (account_id);

SELECT * FROM credit_trans.transactions_accounts LIMIT 5;


## View Merchant Locations

In [None]:
# grab data
df = %sql SELECT merchant_lat, merchant_long FROM madlibflowdemo.locations;

lat = df.DataFrame()['merchant_lat'].values
lon = df.DataFrame()['merchant_long'].values

# Draw map
fig = plt.figure(figsize=(10, 10))
m = Basemap(projection='lcc', 
            resolution='c',
            width=0.7E6, 
            height=0.7E6 ,
            lat_0=33, 
            lon_0=-83)


m.drawcoastlines(color='gray')
m.drawcountries(color='gray')
m.drawstates(color='white')
m.drawcounties(color='gray')

try:
    m.scatter(lon, lat, latlon=True, cmap='Reds', alpha=0.6, marker='.')
except Exception as e:
    print(e)
    

try:
    m.shadedrelief()
except Exception as e:
    print(e)

# Feature Engineering

In [None]:
%%sql

-- transaction features

DROP TABLE IF EXISTS credit_trans.transactions_features;
CREATE TABLE credit_trans.transactions_features AS
SELECT transaction_id
      ,transaction_amount
      ,account_id
      ,location_id
      ,fraud_flag
FROM credit_trans.transactions
WHERE transaction_date > now() - interval '30 days';

-- merchant features

DROP TABLE IF EXISTS credit_trans.merchant_features;
CREATE TABLE credit_trans.merchant_features AS
SELECT l.merchant_name
      ,t.*
FROM credit_trans.locations AS l
JOIN (
  SELECT location_id
        ,merchant_state
        ,count(CASE WHEN fraud_flag = True THEN 1 ELSE null END) AS m_fraud_cases
        ,min(extract('hour' from transaction_date)) AS m_min_hour
        ,max(extract('hour' from transaction_date)) AS m_max_hour
        ,avg(transaction_amount) AS m_avg_transaction_amount
        ,min(transaction_amount) AS m_min_transaction_amount
        ,max(transaction_amount) AS m_max_transaction_amount
        ,stddev(transaction_amount) AS m_stddev_transaction_amount
        ,count(*) AS m_number_transactions
        ,count(distinct account_id) AS m_unique_accounts
        ,coalesce(count(CASE WHEN card_type = 'Discover' THEN 1 ELSE null END) / (count(*))::float,0) AS m_prop_discover_transactions
        ,coalesce(count(CASE WHEN card_type = 'Amex' THEN 1 ELSE null END) / (count(*))::float,0) AS m_prop_amex_transactions
        ,coalesce(count(CASE WHEN card_type = 'Diners' THEN 1 ELSE null END) / (count(*))::float,0) AS m_prop_diners_transactions
        ,coalesce(count(CASE WHEN card_type = 'MasterCard' THEN 1 ELSE null END) / (count(*))::float,0) AS m_prop_mastercard_transactions
        ,coalesce(count(CASE WHEN card_type = 'Visa' THEN 1 ELSE null END) / (count(*))::float,0) AS m_prop_visa_transactions
  FROM credit_trans.transactions
  WHERE transaction_date > now() - interval '30 days'
  GROUP BY 1, 2
) AS t
USING (location_id);

-- Account features

DROP TABLE IF EXISTS credit_trans.account_features;
CREATE TABLE credit_trans.account_features AS
SELECT a.account_state
      ,a.card_type
      ,t.*
FROM credit_trans.accounts AS a
JOIN (
  SELECT account_id
        ,min(extract('hour' from transaction_date)) AS a_min_hour
        ,max(extract('hour' from transaction_date)) AS a_max_hour
        ,avg(transaction_amount) AS a_avg_transaction_amount
        ,min(transaction_amount) AS a_min_transaction_amount
        ,max(transaction_amount) AS a_max_transaction_amount
        ,stddev(transaction_amount) AS a_stddev_transaction_amount
        ,count(*) AS a_number_transactions
  FROM credit_trans.transactions
  WHERE transaction_date > now() - interval '30 days'
  GROUP BY 1
) AS t
USING (account_id);


-- combined features

DROP TABLE IF EXISTS credit_trans.all_features;
CREATE TABLE credit_trans.all_features AS
SELECT *
      ,abs(t.transaction_amount - m.m_avg_transaction_amount) AS m_transaction_delta
      ,abs(t.transaction_amount - a.a_avg_transaction_amount) AS a_transaction_delta
FROM credit_trans.transactions_features AS t
JOIN credit_trans.merchant_features AS m
USING (location_id)
JOIN credit_trans.account_features AS a
USING (account_id);


## One Hot Encode Transaction Features

    https://madlib.apache.org/docs/latest/group__grp__encode__categorical.html

In [None]:
categoricalFeatures = ['merchant_name','merchant_state','m_min_hour','m_max_hour','account_state','card_type','a_min_hour','a_max_hour']
continuosFeatues = ['transaction_amount','m_fraud_cases','m_avg_transaction_amount','m_min_transaction_amount','m_max_transaction_amount','m_stddev_transaction_amount','m_number_transactions','m_unique_accounts','m_prop_discover_transactions','m_prop_amex_transactions','m_prop_diners_transactions','m_prop_mastercard_transactions','m_prop_visa_transactions','a_avg_transaction_amount','a_min_transaction_amount','a_max_transaction_amount','a_stddev_transaction_amount','a_number_transactions','m_transaction_delta','a_transaction_delta']
idColumns = ['account_id', 'location_id', 'transaction_id', 'fraud_flag']


# encode categorical features
query = """
    DROP TABLE IF EXISTS credit_trans.all_features_onehot, credit_trans.all_features_onehot_dictionary;
    SELECT madlib.encode_categorical_variables (
        'credit_trans.all_features',
        'credit_trans.all_features_onehot',
        '{}',
        NULL,
        '{}',
        NULL,
        'merchant_name=TravelCenters Of America, card_type=Diners',
        NULL,
        NULL,
        TRUE
    );
""".format(','.join(categoricalFeatures),','.join(continuosFeatues + idColumns))
cur.execute(query)

query = """
    SELECT *
    FROM credit_trans.all_features_onehot
    LIMIT 5
"""
cur.execute(query)

colnames = [desc[0] for desc in cur.description]

pivotFeatures = [c for c in colnames if c not in categoricalFeatures + continuosFeatues + idColumns] 

print(pivotFeatures)

pd.DataFrame(cur.fetchall(), columns=colnames)


# Model Training

    ## Training & Validation Sample Split
       
       https://madlib.apache.org/docs/latest/group__grp__train__test__split.html

In [None]:
%%sql
-- split training and validation set
-- we are careful not to include the same customer in both sets

DROP TABLE IF EXISTS credit_trans.model
                    ,credit_trans.model_train
                    ,credit_trans.model_test;                        
SELECT madlib.train_test_split(
    'credit_trans.all_features_onehot',
    'credit_trans.model',
    0.3,
    NULL,
    NULL,
    '*',
    FALSE,
    TRUE
);

SELECT fraud_flag
      ,count(*)
FROM credit_trans.model_train
GROUP BY 1
ORDER BY 1;

# Random Forest (MADlib)

    ## Train model
    
    https://madlib.apache.org/docs/latest/group__grp__random__forest.html

In [None]:
%%sql

-- train random forest model
DROP TABLE IF EXISTS credit_trans.rf_model, credit_trans.rf_model_summary, credit_trans.rf_model_group;
SELECT madlib.forest_train(
            'credit_trans.model_train',
            'credit_trans.rf_model',
            'transaction_id',
            'fraud_flag',
            'transaction_amount, m_fraud_cases, m_transaction_delta, a_transaction_delta, merchant_state_2',
            null,
            null,
            10::integer,
            4::integer,
            true::boolean,
            5::integer,
            10::integer,
            3::integer,
            1::integer,
            10::integer
        );



## Score Validation Data

https://madlib.apache.org/docs/latest/group__grp__random__forest.html

In [None]:
%%sql

-- Score out-of-sample
DROP TABLE IF EXISTS credit_trans.model_test_scored;
SELECT madlib.forest_predict('credit_trans.rf_model',
                             'credit_trans.model_test',
                             'credit_trans.model_test_scored',
                             'prob');
                
DROP TABLE IF EXISTS credit_trans.model_test_scored_tmp;
CREATE TABLE credit_trans.model_test_scored_tmp AS
SELECT *
FROM credit_trans.model_test_scored
JOIN credit_trans.model_test
USING (transaction_id);
DROP TABLE credit_trans.model_test_scored;
ALTER TABLE credit_trans.model_test_scored_tmp RENAME TO model_test_scored;


# MADlibFlow

## Operationalize the Credit Fraud model 

The required input files for model, feature, feature cache are in the $MADLIBFLOW_CLIENT/samples/credit_transactions folder.
The file "credit_transaction_flow.json" defines the deployment workflow.

In [1]:
! madlibflow --help

usage: madlibflow [-h] [--name NAME] [--action {deploy,undeploy}]
                  [--type {flow,model,featuresengine,featurecache,featuregenerator}]
                  [--target {docker,kubernetes,pks}] [--inputJson [INPUTJSON]]

optional arguments:
  -h, --help            show this help message and exit
  --name NAME           unique name for module
  --action {deploy,undeploy}
  --type {flow,model,featuresengine,featurecache,featuregenerator}
  --target {docker,kubernetes,pks}
  --inputJson [INPUTJSON]
                        input for corresponding module


In [2]:
! echo "Deploying Credit Model to Docker......"
! madlibflow --name credit_fraud --type flow --action deploy --target docker --inputJson $MADLIBFLOW_HOME/samples/credit_transactions/credit_transaction_flow.json

Deploying Credit Model to Docker......
deployName => credit_fraud
processing deployment package =>model
Deploying credit_fraud_madlibrest_1 to Docker ..........
docker-compose -p credit_fraud -f /Users/sridharpaladugu/MADlibFlowClient/docker/madlibrest/docker-compose.yml up -d --build --quiet-pull
Creating credit_fraud_madlibrest_1 ... 
[1Bting credit_fraud_madlibrest_1 ... [32mdone[0mRunning post continer script in credit_fraud_madlibrest_1 ..........
Finished the provisioning the madlibrest  container! 
service end point => http://172.17.0.3:5432/predict
processing deployment package =>featuresengine
Deploying credit_fraud_madlibfeaturesengine_1 to Docker ..........
docker-compose -p credit_fraud -f /Users/sridharpaladugu/MADlibFlowClient/docker/madlibfeaturesengine/docker-compose.yml up -d --build --quiet-pull
Creating credit_fraud_madlibfeaturesengine_1 ... 
[1Bting credit_fraud_madlibfeaturesengine_1 ... [32mdone[0mRunning post continer script in credit_fraud_madlibfeaturese

In [4]:
! madlibflow --name credit_fraud --type flow --action undeploy --target docker 

deployName => credit_fraud
un-deploying container credit_fraud_madlibrest_1
fd76b0da0465
fd76b0da0465
un-deploying container credit_fraud_madlibfeaturesengine_1
9bba57bdf211
9bba57bdf211
un-deploying container credit_fraud_madlibflow_1
6236f80d6780
6236f80d6780
