# Fraud Detection By Using Open Source Medicare Data - part 1: pig + sklearn


```python
class author(object):
    
    name = "Xinyu (Max) Liu"

    email = "xinyulrsm@gmail.com"

    create =  "01/16/2016"

    addreess = "Waltham, MA 02453"
```


This topic is inspired by ref-1 and ref-2. Here I will show how do I use the same data resource by different techeniques for data preparation, feature engineering and model fitting. I divide this work to four parts:

+ part-1: use hadoop + pig + sklearn ( this work )

+ part-2: use hive + spark

+ part-3: use hive + R

+ part-4: use flink 


References:

1) http://www.dataiku.com/blog/2015/08/12/Medicare_Fraud.html by Pierre Gutierrez @ Dataiku

    Very interesting feature engineering. 
    
2) http://nbviewer.jupyter.org/github/ofermend/IPython-notebooks/blob/master/blog-part-1.ipynb  by Ofer Mendelevitch 

    This is very good and "classic" tutorial for data science by using hadoop, pig, hive, spark and other techniques. 


## Data Source

1) 2013 Part D Prescriber data:

https://www.cms.gov/Research-Statistics-Data-and-Systems/Statistics-Trends-and-Reports/Medicare-Provider-Charge-Data/Part-D-Prescriber.html

2) 2013 payment data:

https://www.cms.gov/OpenPayments/Explore-the-Data/Dataset-Downloads.html

3) npi exclusions data:

http://oig.hhs.gov/exclusions/exclusions_list.asp#instruct


## Copy data to hadoop

In [None]:
%%bash
$HADOOP_HOME/bin/hdfs dfs -mkdir -p /data/
$HADOOP_HOME/bin/hdfs dfs -put /home/max/data/PARTD_PRESCRIBER_PUF_NPI_DRUG_13.tab    /data
$HADOOP_HOME/bin/hdfs dfs -put /home/max/data/OP_DTL_GNRL_PGYR2013_P01152016.csv      /data

## pig script for data preparison

In [None]:
-

In [None]:
%%bash  
pig dataPrep.pig

## Model and Prediction

In [None]:
import os 

import zipfile 

from dbfread import DBF

from sklearn.naive_bayes import GaussianNB 
from sklearn.svm import SVC 
from sklearn.linear_model import LogisticRegression 
from sklearn import ensemble 
from sklearn.ensemble import RandomForestClassifier 
from sklearn.metrics import brier_score_loss, precision_score, recall_score,f1_score, roc_auc_score, accuracy_score 
from sklearn.metrics import confusion_matrix, roc_curve
from sklearn.cross_validation import train_test_split 
from sklearn.preprocessing import StandardScaler 
from sklearn.feature_extraction import DictVectorizer
from sklearn.cluster import KMeans

import pydoop.hdfs as hdfs 
from pydoop.hdfs import path as hpath 

import pandas as pd

from joblib import Parallel, delayed

import random

import scipy
from scipy.stats import ttest_ind

import bokeh
from bokeh.plotting import figure, output_file, show
from bokeh.charts import Bar
from bokeh.io import output_notebook, hplot, vplot

import numpy as np

### Read fraud npi list

In [None]:
dataDir = "/home/hduser/T4/data/healthCare/"

In [None]:
%%bash
cd  /home/hduser/T4/data/healthCare/
ls *.DBF

In [None]:
# read all the npis in the DBF files
npis = []
for fnx in os.listdir(dataDir):
    if fnx.endswith('EXCL.DBF'):
        fn = os.path.join(dataDir, fnx)
        for record in DBF(fn):
            #print(record)
            npi = record['NPI']
            if npi != '0000000000':
                #print npi
                npis.append(int(npi))

In [None]:
# build a DataFrame for 'is_fraud' npis
df_npi_fraud = pd.DataFrame([[npi, 1.0] for npi in npis], columns=['npi', 'is_fraud'])

In [None]:
df_npi_fraud.head()

# Read Pard_D_13 data from hdfs generated by pig

In [None]:
# Read files from HDFS
# Note that the results generated by pig script is a folder instead of a file
# we need need to conbine all the parts in the folder to a pandas DataFrame

def hdfs_read_csv_from_hdfs(path, sep=',', cols=None, col_types=None):
    files = hdfs.ls(path);
    file_parts = []

    for f in files:
        print f
        print os.path.basename(f)
        if os.path.basename(f).startswith('_'):
            continue
        with hdfs.open(f) as fh:
            file_parts.append(pd.read_csv(fh, sep=sep, header=None, error_bad_lines=False))

    df = pd.concat(file_parts, ignore_index=True)
    if cols is not  None:
        if len(cols) == len(df.columns):
            df.columns = cols
    return df

In [None]:
cols1 = ['npi']
cols2 = ['count','specialty']
cols3 = ['last_name' , 'first_name', 'city', 'state']
cols4 = ['claim_min','claim_max','claim_sum','supply_min','supply_max','supply_sum','drug_min',
'drug_max','drug_sum']
cols5 = ['last_name_1' , 'first_name_1', 'city_1', 'state_1']
cols6 = ["payment_count", "total_payment"]

cols = cols1 + cols2 + cols3 + cols4 + cols5 +cols6

In [None]:

data_path = '/data/HealthCare/npi_payment_drugs_all.csv'

df_partD = hdfs_read_csv_from_hdfs(data_path, sep='\t', cols=cols)

In [None]:
# drop 'last_name' , 'first_name', 'city', and 'state' from DataFrame
# we don't need them in the model
df_partD.drop(cols3 + cols5,axis=1, inplace=True)

In [None]:
df_partD.head()

# NPI and drug group

In [None]:
cols_npi_drug = ["npi", "drug", "count", "total_claim_count", "total_day_supply", "total_drug_cost"]

# the file below is generated by pig
data_path = '/data/HealthCare/partd_13_npi_drug_all.csv'

npi_drug = hdfs_read_csv_from_hdfs(data_path, sep='\t', cols=cols_npi_drug)

In [None]:
npi_drug['total_claim_count'] = npi_drug['total_claim_count'].map(lambda x: np.log10(x + 1.0))

In [None]:
npi_drug['total_day_supply'] = npi_drug['total_day_supply'].map(lambda x: np.log10(x + 1.0))

In [None]:
npi_drug['total_drug_cost'] = npi_drug['total_drug_cost'].map(lambda x: np.log10(x + 1.0))

In [None]:
npi_drug.head()

In [None]:
len(npi_drug)

# Merge Pard_D data and is_fraud data



In [None]:
df = pd.merge(df_partD, df_npi_fraud, how='left', on='npi')

In [None]:
print len(df_partD), len(df_partD.columns)
print len(df), len(df.columns)
del(df_partD)

In [None]:
# fill nans by 0
df.fillna(0, inplace=True)

In [None]:
df['total_payment'].describe()

In [None]:
# we found 427 record we could use as fraud
len(df[df['is_fraud']==1])

In [None]:
df.head()

In [None]:
# add three more features
df['claim_max-min'] = df['claim_max']-df['claim_min']
df['supply_max-min'] = df['supply_max']-df['supply_min']
df['drug_max-min'] = df['drug_max']-df['drug_min']

# Feature description

## Divide data to training set and validation set¶

### we weill divide the data to training set and validation set. We will only use training set to select feature to avoid data leak.

In [None]:
#TODO add code for five-fold cross-validation

from sklearn.cross_validation import KFold


ix_ran = df.index.values
random.shuffle(ix_ran)

df_len = len(df)
train_len = int(df_len * 0.8)  # 80% for training

#kf = KFold(df_len, n_folds=5)
#for ix_train, ix_valid in kf:

ix_train = ix_ran[:train_len]
ix_valid = ix_ran[train_len:]

df_train = df.ix[ix_train]
df_valid = df.ix[ix_valid]

print  len(ix_train), len(ix_valid)

In [None]:
npi_drug_w_flag_train= pd.merge(npi_drug, df_train[['npi','is_fraud']], how='inner', on=['npi'])

npi_drug_w_flag_all= pd.merge(npi_drug, df[['npi','is_fraud']], how='inner', on=['npi'])

In [None]:
len(npi_drug_w_flag_train[npi_drug_w_flag_train['is_fraud']==1])

In [None]:
cols = ['total_claim_count' , 'total_day_supply' , 'total_drug_cost' ]

In [None]:
# get unique drug names
drugs = set([ drugx for drugx in npi_drug_w_flag_train['drug'].values if isinstance(drugx, str)])
print len(drugs)

In [None]:
print "Total records in npi_drug train set : ", len(npi_drug_w_flag_train)
print "is_fraud recodes :  ", len(npi_drug_w_flag_train[npi_drug_w_flag_train['is_fraud']==1])
npi_drug_w_flag_train.head()

In [None]:
ng_train = npi_drug_w_flag_train.groupby(['drug', 'is_fraud'])
ng_all = npi_drug_w_flag_all.groupby(['drug', 'is_fraud'])

In [None]:
ngkeys = ng_train.groups.keys()
print len(ngkeys)
print ngkeys[0:5]

In [None]:
drug_has_both_01 = [drugx for drugx in drugs if ((drugx,0.0) in ngkeys ) & ( (drugx,1.0) in ngkeys)]

In [None]:
#TODO use agg to replace the code below
re_drug_tt = dict()
for drugx in drug_has_both_01:
    for colx in cols:
        fraud_0 = ng_train.get_group((drugx,0.0))[colx].values
        fraud_1 = ng_train.get_group((drugx,1.0))[colx].values
        # print len(fraud_0), len(fraud_1)
        if (len(fraud_0)>2) & (len(fraud_1)>2) :
            tt = ttest_ind(fraud_0, fraud_1)
            re_drug_tt[(drugx, colx)] = tt

In [None]:
p005 = [(key, p) for (key, (t, p)) in re_drug_tt.items() if p <=0.05]  # p = 0.1 or 0.05
print len(p005)

In [None]:
from bokeh.charts import BoxPlot,Histogram, output_file, show, hplot,vplot
output_notebook()

In [None]:
mm=100
drug_name = p005[mm][0][0]
print drug_name
df_bar = pd.concat([ng_train.get_group((p005[mm][0][0],0.0)), ng_train.get_group((p005[mm][0][0],1.0))])
df_bar.head()

In [None]:
box = [
    BoxPlot(df_bar, values=col, label='is_fraud', title= "p-value: " + "%0.2e"%(re_drug_tt[(drug_name, col)][1]), 
               color='is_fraud', plot_width=300, plot_height=500)
       for col in ["total_claim_count", "total_day_supply", "total_drug_cost"]
      ]
#output_file(drug_name + '.html')
print drug_name
show(hplot(*box))

In [None]:
hist = [Histogram(df_bar, values=col, label='is_fraud',  color='is_fraud')
        for col  in ["total_claim_count", "total_day_supply", "total_drug_cost"]
      ]
show(vplot(*hist))

# Featuer engineering

## Drug score
### select drug and catagory from T-test by p value

### logistic regreassion to for a "drug score" calculation

In [None]:
ng_list = []
new_col_all =[]
for i, p005x in enumerate(p005):
    #if i>4:
    #   break
    drug_name = p005x[0][0]
    cat_name = p005x[0][1] 
    
    new_col = drug_name+'_'+cat_name
    new_col_all.append(new_col)

    ng_0 = ng_all.get_group((drug_name,0.0))[['npi', cat_name]]
    ng_1 = ng_all.get_group((drug_name,1.0))[['npi', cat_name]]

    ng_01 = pd.concat([ng_0, ng_1])
    ng_01.rename(columns={cat_name: new_col}, inplace=True)
    ng_list.append(ng_01)


In [None]:
df.head()

In [None]:
npi_col = df[['npi']]

In [None]:
w_npi = []

for n, nx in enumerate(ng_list):
    
    nggx = pd.merge(npi_col, nx.drop_duplicates(['npi']), on='npi', how='left')
    #print n, len(nggx), len(npi_col)
    w_npi.append(nggx)

In [None]:
for wx in w_npi:
    col_n = wx.columns[1]
    df[col_n] = wx[col_n].values

In [None]:
wx = w_npi[0]
wx.columns[1]
col_n = wx.columns[1]

In [None]:
len(wx[col_n].values)

In [None]:
del(wx)

In [None]:
df = df.fillna(0)

In [None]:
df_train = df.ix[ix_train]
df_valid = df.ix[ix_valid]

In [None]:
df_train.columns

In [None]:
df_valid.columns

In [None]:
# the code below will cause a memory issue, why?

#for ng_list_x in ng_list:
#    df=pd.merge(df, ng_list_x, on='npi', how='left')

In [None]:
X= df_train[new_col_all].values
Y = df_train['is_fraud'].values
clf =  LogisticRegression(C=1e5, class_weight={0:1, 1:4000}, n_jobs=3)
clf.fit(X,Y)
y_p=clf.predict_proba(X)


In [None]:
X = df[new_col_all].values
y_p = clf.predict_proba(X)
df['drug_score'] = y_p[:,1]

df_train = df.ix[ix_train]
df_valid = df.ix[ix_valid]

## speciaty score

In [None]:
spec_dict =[]
spec_fraud_1 = df_train[df_train['is_fraud']==1]['specialty']

In [None]:
from collections import Counter
counts = Counter(spec_fraud_1)

In [None]:
spec_dict =  dict(counts)

In [None]:
spec_dict.get('343',0)

In [None]:
df['spec_score'] = df['specialty'].map(lambda x: spec_dict.get(x, 0))

In [None]:
df_train = df.ix[ix_train]
df_valid = df.ix[ix_valid]

# Select features for modeling

In [None]:
numerical_feas_sel = ['count',
                     'claim_min','claim_max', 'claim_sum',
                     'supply_min','supply_max','supply_sum',
                     'drug_min','drug_max','drug_sum',
                     'spec_score','drug_score',
                     'payment_count', 'total_payment']
# 'claim_max-min','supply_max-min','drug_max-min', 
target = 'is_fraud'

# Single classifier

In [None]:
params_0 = {'n_estimators': 100, 'max_depth': 8, 'min_samples_split': 1, 'learning_rate': 0.01}
params_1 = {'n_estimators': 500, 'max_depth': 10, 'min_samples_split': 1, 'class_weight' : {0:1, 1:4000}, 'n_jobs':3}

scaler = StandardScaler()
    
clfs = [
    LogisticRegression(C=1e5,class_weight={0:1, 1:4000}, n_jobs=3),
    
    GaussianNB(),

    ensemble.RandomForestClassifier(**params_1),

    ensemble.ExtraTreesClassifier(**params_1),
    
    ensemble.GradientBoostingClassifier(**params_0)
    
    ]
    
      

In [None]:
X_train = df_train[numerical_feas_sel].values

y_train = df_train['is_fraud'].values
    
X_train = scaler.fit_transform(X_train)

X_valid = df_valid[numerical_feas_sel].values
y_valid = df_valid['is_fraud'].values
X_valid_x= scaler.transform(X_valid)
    

In [None]:
prob_result = []
df_m = []
clfs_fited = []
for clf in clfs:
    print("%s:" %  clf.__class__.__name__)
    clf.fit(X_train,y_train)
    clfs_fited.append(clf)
    y_pred = clf.predict(X_valid_x)
    prob_pos  = clf.predict_proba(X_valid_x)[:, 1]
    prob_result.append(prob_pos)
    m = confusion_matrix(y_valid, y_pred)
    clf_score = brier_score_loss(y_valid, prob_pos, pos_label=y_valid.max())
    print("\tBrier: %1.5f" % (clf_score))
    print("\tPrecision: %1.5f" % precision_score(y_valid, y_pred))
    print("\tRecall: %1.5f" % recall_score(y_valid, y_pred))
    print("\tF1: %1.5f" % f1_score(y_valid, y_pred))
    print("\tauc: %1.5f" % roc_auc_score(y_valid, prob_pos))
    print("\tAccuracy: %1.5f\n" % accuracy_score(y_valid, y_pred))
    df_m.append(
        pd.DataFrame(m, index=['True Negative', 'True Positive'], columns=['Pred. Negative', 'Pred. Positive'])
        )

In [None]:
fpr, tpr, thresholds = roc_curve(y_valid, prob_result[2])

In [None]:
TOOLS = 'pan, wheel_zoom,box_zoom,box_select,crosshair,resize,reset, hover'
p = figure(tools=TOOLS)
p.circle(fpr,tpr, size=4)
p.title = "ROC"
p.xaxis.axis_label  = "FP rate"
p.yaxis.axis_label  = "TP rate"
show(p)

In [None]:
feature_importance = clfs_fited[2].feature_importances_
# make importances relative to max importance
feature_importance = 100.0 * (feature_importance / feature_importance.max())
sorted_idx = np.argsort(feature_importance)

In [None]:
numerical_feas_sel

In [None]:
feature_importance[sorted_idx]

In [None]:
feas = [numerical_feas_sel[ix] for ix in sorted_idx]
bardata = {"name":feas[::-1], "importance percent":feature_importance[sorted_idx][::-1]}

In [None]:
from bokeh.charts import Bar, output_file, show, hplot
from bokeh.charts.attributes import ColorAttr, CatAttr

In [None]:
bar = Bar(bardata, values="importance percent", label=CatAttr(columns=["name"], sort=False))

In [None]:
show(bar)

## Model Blend