In [28]:
from core import *
from udf_open_payments import *

sc = SparkContext.getOrCreate()

In [2]:
### Functions (created with team) ###

def split_comma(x):
    return list(reader([x], delimiter=',', quotechar='"'))[0]

def is_na(x):  
    """
    To be wrapped with udf.
    Returns 1 if the element in the column is null and 0 otherwise. 
    """
    if x is not None and x is not '':
        return 1
    else:
        return 0

isNa = F.udf(is_na)  

def str_to_bool(x):
    """
    To be wrapped with udf.
    Applied to a colum, maps yes/no and transform them to 1/0 respectively. Nones mapped to 0.
    """
    try:
        if x.lower()=='yes':
            return 1
        else:
            return 0
    except:
        return 0

strToBool = F.udf(str_to_bool)

def max_to_one(x):
    """
    To be wrapped with udf.
    Applied to a colum, maps yes/no and transform them to 1/0 respectively. Nones mapped to 0.
    """
    try:
        if x>1:
            return 1
        else:
            return x
    except:
        return x

maxToOne = F.udf(max_to_one)

def replace_na(x):
    """
    To be wrapped with udf.
    Applied to a colum, maps yes/no and transform them to 1/0 respectively. Nones mapped to 0.
    """
    if x is None:
        return 'blank'
    else:
        return x

replaceNA = F.udf(replace_na)

def check_blanks(x):
    try:
        if len(x)==0:
            return 'blank'
        else: return x
    except:
        return x
    
checkBlankUdf = F.udf(check_blanks)


############ DF COLUMNS FUNCTIONS #################


def TransformColumn(df, cols, userDefinedFunction, newCol=None):
    """
    Gets columns `cols` of `df` and applies a `userDefinedFunction`.
    
    If newCol dictionary is specified:
            - Won't drop cols and will add the new ones with the new name 
                Format ex:
                
                    {'oldCol1':'newCol1','oldCol2':'newCol2',..,'oldColN':'newColN'} (N cols given)
    
    If newCol dictionary is not specified:
            - Will drop the old columns and substitute them by the result of the transformation.
    """
    if newCol is None:
        
        dfNew = df
        for col in cols:
            dfNew = dfNew.withColumn('%s_idx'%col, F.lit(userDefinedFunction(dfNew[col])))\
                         .drop(col)\
                         .withColumnRenamed('%s_idx'%col, col)
        return dfNew
    
    else:
        
        dfNew = df
        for col in cols:
            dfNew = dfNew.withColumn(newCol[col], F.lit(userDefinedFunction(dfNew[col])))
        return dfNew
    
def indexStringColumns(df, cols):
    """
    Modified from ex2 of Lesson5 in https://github.com/dianewoodbridge/2019-msds697-example/
    """
 
    newdf = df
    
    labels_mapping = {}
    for c in cols:
        # For each given column, fits StringIndexerModel.
        sm = StringIndexer(inputCol=c, outputCol=c+"-num").setHandleInvalid("keep")\
                .fit(newdf)
        # Creates a DataFame by putting the transformed values in the new colum with suffix "-num" 
        # then drops the original columns and drops the "-num" suffix. 
        labels_mapping[c] = sm.labels
        newdf = sm.transform(newdf).drop(c)
        newdf = newdf.withColumnRenamed(c+"-num", c)
    return newdf, labels_mapping



def oneHotEncodeColumns(df, cols, dropLast=False):
    """
    Taken from ex2 of Lesson5 in https://github.com/dianewoodbridge/2019-msds697-example/
    """
    newdf = df
    for c in cols:
        # For each given colum, create OneHotEncoder. 
        # dropLast : Whether to drop the last category in the encoded vector (default: true)
        onehotenc = OneHotEncoder(inputCol=c, outputCol=c+"-onehot", dropLast=dropLast)
        newdf = onehotenc.transform(newdf).drop(c)
        newdf = newdf.withColumnRenamed(c+"-onehot", c)

    return newdf


# Data Loading

In [3]:
#### Open Payments Research Data Loading ####

# Files downloaded from https://www.cms.gov/OpenPayments/Explore-the-Data/Dataset-Downloads.html and put on s3

file_2017 = "s3://msds697-openpayments/OP_DTL_RSRCH_PGYR2017_P06292018.csv"
file_2016 = "s3://msds697-openpayments/OP_DTL_RSRCH_PGYR2016_P06292018.csv"
file_2015 = "s3://msds697-openpayments/OP_DTL_RSRCH_PGYR2015_P06292018.csv"
file_2014 = "s3://msds697-openpayments/OP_DTL_RSRCH_PGYR2014_P06292018.csv"
file_2013 = "s3://msds697-openpayments/OP_DTL_RSRCH_PGYR2013_P06292018.csv"

text_2017 = sc.textFile(file_2017)
columns_2017 = text_2017.map(split_comma).take(1)
df_text_2017 = text_2017.map(split_comma).toDF(columns_2017[0])

text_2013 = sc.textFile(file_2013)
columns_2013 = text_2013.map(split_comma).take(1)
df_text_2013 = text_2013.map(split_comma).toDF(columns_2013[0])

common_cols = list(set(df_text_2017.columns).intersection(set(df_text_2013.columns)))

files = [file_2016, file_2015, file_2014, file_2013]

df_research = df_text_2017.select([c for c in common_cols])

for file in files:
    text_research = sc.textFile(file)
    columns_research = text_research.map(split_comma).take(1)
    df_text = text_research.map(split_comma).toDF(columns_research[0])
    df_text =  df_text.select([c for c in common_cols])
    df_research = df_research.union(df_text)

In [4]:
df_research = df_research.withColumn("research_or_general", lit(1.0))

In [5]:
#### Open Payments General Data Loading ####

g_file_2017 = "s3n://msds697-openpayments/OP_DTL_GNRL_PGYR2017_P06292018.csv"
g_file_2016 = "s3n://msds697-openpayments/OP_DTL_GNRL_PGYR2016_P06292018.csv"
g_file_2015 = "s3n://msds697-openpayments/OP_DTL_GNRL_PGYR2015_P06292018.csv"
g_file_2014 = "s3n://msds697-openpayments/OP_DTL_GNRL_PGYR2014_P06292018.csv"
g_file_2013 = "s3n://msds697-openpayments/OP_DTL_GNRL_PGYR2013_P06292018.csv"

g_text_2017 = sc.textFile(g_file_2017)
g_columns_2017 = g_text_2017.map(split_comma).take(1)
g_df_text_2017 = g_text_2017.map(split_comma).toDF(g_columns_2017[0])

g_text_2013 = sc.textFile(g_file_2013)
g_columns_2013 = g_text_2013.map(split_comma).take(1)
g_df_text_2013 = g_text_2013.map(split_comma).toDF(g_columns_2013[0])

g_common_cols = list(set(g_df_text_2017.columns).intersection(set(g_df_text_2013.columns)))

g_files = [g_file_2016, g_file_2015, g_file_2014, g_file_2013]

df_general = g_df_text_2017.select([c for c in g_common_cols])

for file in g_files:
    text_general = sc.textFile(file)
    columns_general = text_general.map(split_comma).take(1)
    df_text = text_general.map(split_comma).toDF(columns_general[0])
    df_text =  df_text.select([c for c in g_common_cols])
    df_general = df_general.union(df_text)

In [6]:
df_general = df_general.withColumn("research_or_general", lit(0.0))

In [7]:
## Combining Data from Research and General ##

all_columns = set(df_general.columns).intersection(set(df_research.columns))

df_general = df_general.select([c for c in all_columns])
df_research = df_research.select([c for c in all_columns])
df_together = df_research.union(df_general)

# EDA

In [9]:
df_together.groupBy(df_together["Form_of_Payment_or_Transfer_of_Value"]).count().orderBy("count",ascending=False).show(50, False)

+----------------------------------------------------+--------+
|Form_of_Payment_or_Transfer_of_Value                |count   |
+----------------------------------------------------+--------+
|In-kind items and services                          |41778528|
|Cash or cash equivalent                             |11210767|
|Dividend, profit or other return on investment      |1728    |
|Stock, stock option, or any other ownership interest|726     |
|Stock                                               |256     |
|Any other ownership interest                        |222     |
|Stock option                                        |166     |
|Form_of_Payment_or_Transfer_of_Value                |10      |
+----------------------------------------------------+--------+



In [15]:
Unique_IDs = df1.select('Physician_Profile_ID').distinct()

In [16]:
Unique_IDs.count()

976208

# Processing Data for Modeling

In [16]:
%%time
##### DATA PROCESSING ######

df1 = df_together.filter("Total_Amount_of_Payment_USDollars != 0")
df1 = df1.filter("Total_Amount_of_Payment_USDollars == Total_Amount_of_Payment_USDollars")

df2 = df1.filter(df1.Covered_Recipient_Type == 'Covered Recipient Physician')\
            .drop('Covered_Recipient_Type','Teaching_Hospital_ID')

Unique_IDs = df2.select('Physician_Profile_ID').distinct()

Row_Counts = df2.groupby(df2.Physician_Profile_ID).count().withColumnRenamed("count", "Number_of_Columns")

Row_Counts = Unique_IDs.join(Row_Counts, "Physician_Profile_ID", 'left').fillna(0.0)

Avg_Payments_By_Type = df2.groupBy(df2["Physician_Specialty"])\
                       .agg(F.avg("Total_Amount_of_Payment_USDollars").alias("Avg_Amount_of_Payment_USDollars_By_Type"))

Number_of_Cash_Payments = df2.filter(df2.Form_of_Payment_or_Transfer_of_Value == "Cash or cash equivalent")\
           .groupBy(df2.Physician_Profile_ID).count().withColumnRenamed("count", "Number_of_Cash_Payments")

Number_of_Cash_Payments = Unique_IDs.join(Number_of_Cash_Payments, "Physician_Profile_ID", 'left').fillna(10.0)


Number_of_Years = df2.groupby(df2.Physician_Profile_ID)\
                .agg(F.countDistinct('Program_Year'))\
                .withColumnRenamed('count(DISTINCT Program_Year)', 'Number_of_Years')

Number_of_Years = Unique_IDs.join(Number_of_Years, "Physician_Profile_ID", 'left').fillna(10.0)


Distinct_Manufacturers = df2.groupby(df2.Physician_Profile_ID)\
                .agg(F.countDistinct('Applicable_Manufacturer_or_Applicable_GPO_Making_Payment_ID'))\
                .withColumnRenamed('count(DISTINCT Applicable_Manufacturer_or_Applicable_GPO_Making_Payment_ID)', 'Distinct_Manufacturers')

Distinct_Manufacturers = Unique_IDs.join(Distinct_Manufacturers, "Physician_Profile_ID", 'left').fillna(10.0)


yesNo_cols = ['Delay_in_Publication_Indicator']

df2 = TransformColumn(df=df1,
                      cols=yesNo_cols,
                      userDefinedFunction=strToBool)


first_cols = ['Recipient_City', 'Recipient_State',
              'Physician_Specialty',
              'Physician_Primary_Type',
              'Physician_License_State_code1','Program_Year',
             ]
sum_cols = ['Total_Amount_of_Payment_USDollars', 
            'Delay_in_Publication_Indicator',
            "research_or_general"]

aggregations = ([F.first(col).alias(col) for col in first_cols ] + [F.sum(col).alias(col) for col in sum_cols ])


df3_2_p = df2.groupBy('Physician_Profile_ID').agg(*aggregations)  

df4_p_na = TransformColumn(df=df3_2_p, cols=df3_2_p.columns, userDefinedFunction=replace_na)

df5_p_aux = TransformColumn(df=df4_p_na, cols=['Physician_Primary_Type',
                                               'Physician_Specialty',
                                               'Physician_License_State_code1'], userDefinedFunction=checkBlankUdf)

df_6 = df5_p_aux.join(Number_of_Years, "Physician_Profile_ID", 'left')\
                .join(Number_of_Cash_Payments, "Physician_Profile_ID", 'left')\
                .join(Row_Counts, "Physician_Profile_ID", 'left')\
                .join(Distinct_Manufacturers, "Physician_Profile_ID", 'left')

df_6 = df_6.join(Avg_Payments_By_Type, "Physician_Specialty", 'left')

new_df = df_6.withColumn("Total_Amount_of_Payment_USDollars_int", df_6["Total_Amount_of_Payment_USDollars"]\
                       .cast("integer")).drop("Total_Amount_of_Payment_USDollars")\
                       .withColumnRenamed("Total_Amount_of_Payment_USDollars_int", "Total_Amount_of_Payment_USDollars")


new_df = new_df.withColumn('avg_Total_Amount_of_Payment_USDollars', new_df.Total_Amount_of_Payment_USDollars/new_df.Number_of_Columns)

new_df_cache = new_df.cache()


CPU times: user 47.1 ms, sys: 10.2 ms, total: 57.4 ms
Wall time: 4.47 s


In [40]:
new_df_cache.head()

Row(Physician_Specialty=u'Allopathic & Osteopathic Physicians|Pathology|Clinical Informatics', Physician_Profile_ID=u'1397054', Recipient_City=u'PROVIDENCE', Recipient_State=u'RI', Program_Year=u'2015', Delay_in_Publication_Indicator=0.0, research_or_general=0.0, Physician_Primary_Type=u'Medical Doctor', Physician_License_State_code1=u'RI', Number_of_Years=2, Number_of_Cash_Payments=1, Number_of_Columns=3, Distinct_Manufacturers=2, Avg_Amount_of_Payment_USDollars_By_Type=458.6328571428572, Total_Amount_of_Payment_USDollars=124, avg_Total_Amount_of_Payment_USDollars=41.333333333333336)

In [13]:
new_df_cache.count()  

976208

In [46]:
def quantiles_payments(value):

    if value < 258:  
        return 0
    else:
        return 1

quantilesPayments = F.udf(quantiles_payments)

df_quantiles_float = TransformColumn(df=new_df_cache, 
                                     cols=["avg_Total_Amount_of_Payment_USDollars"], 
                                     userDefinedFunction=quantilesPayments)

df_quantiles_float_cache = df_quantiles_float.cache()

In [47]:
index_cols = [ 'Recipient_City',
 'Recipient_State',
 'Physician_Primary_Type',
 'Physician_Specialty',
 'Physician_License_State_code1',
 'Physician_Specialty']


dfindex, labels_mapping = indexStringColumns(df_quantiles_float_cache.drop('Physician_Profile_ID', 'Program_Year'), 
                                             index_cols)

dfindex_na = TransformColumn(df=dfindex, cols=dfindex.columns, userDefinedFunction=replace_na)

dfhot = oneHotEncodeColumns(dfindex_na, ['Recipient_City',
                                         'Physician_Primary_Type',
                                         'Physician_Specialty',
                                         'Physician_License_State_code1'])

input_cols = ['Number_of_Years',
 'Number_of_Columns',
 'Avg_Amount_of_Payment_USDollars_By_Type',
 'Number_of_Cash_Payments',
 'Recipient_City', 
 'Physician_Primary_Type',
 'Physician_Specialty',
 'Physician_License_State_code1',
 'Distinct_Manufacturers',
 "research_or_general"]

va = VectorAssembler(outputCol="features", inputCols=input_cols)

lpoints = va.setHandleInvalid("skip").transform(dfhot).select("features", "avg_Total_Amount_of_Payment_USDollars")\
                             .withColumnRenamed("avg_Total_Amount_of_Payment_USDollars", "label")

lpoints_int = lpoints.withColumn("int_label", lpoints["label"].cast("integer"))\
                            .drop("label").withColumnRenamed("int_label", "label")

splits = lpoints_int.randomSplit([0.8, 0.2])

payments_train = splits[0].cache()
payments_valid = splits[1].cache()

# Final Model 

In [13]:
%%time

dt = DecisionTreeClassifier(maxDepth=20, maxBins= 217, minInstancesPerNode=1, minInfoGain = 0, impurity='gini')
dtmodel = dt.fit(payments_train)

CPU times: user 701 ms, sys: 164 ms, total: 866 ms
Wall time: 52min 43s


In [30]:
print(dtmodel._call_java('toDebugString'))

DecisionTreeClassificationModel (uid=DecisionTreeClassifier_bf6f45f77692) of depth 20 with 23867 nodes
  If (feature 20268 <= 0.5)
   If (feature 3 <= 23.5)
    If (feature 2 <= 222.56308482335868)
     If (feature 2 <= 152.9139084590851)
      If (feature 3 <= 9.5)
       If (feature 20267 <= 4.5)
        If (feature 3 <= 2.5)
         If (feature 20267 <= 1.5)
          If (feature 1 <= 2.5)
           If (feature 2 <= 136.8641030926666)
            If (feature 11 in {1.0})
             If (feature 19694 in {1.0})
              Predict: 0.0
             Else (feature 19694 not in {1.0})
              If (feature 2 <= 40.055673947170746)
               Predict: 1.0
              Else (feature 2 > 40.055673947170746)
               If (feature 1 <= 1.5)
                If (feature 19705 in {1.0})
                 Predict: 0.0
                Else (feature 19705 not in {1.0})
                 If (feature 19742 in {1.0})
                  Predict: 1.0
                 Else (feature 19742

In [31]:
dtpredicts = dtmodel.transform(payments_valid)

In [32]:
dtpredicts.show()

+--------------------+-----+---------------+--------------------+----------+
|            features|label|  rawPrediction|         probability|prediction|
+--------------------+-----+---------------+--------------------+----------+
|(20269,[0,1,2,3,4...|    0| [5776.0,130.0]|[0.97798848628513...|       0.0|
|(20269,[0,1,2,3,6...|    0|      [7.0,0.0]|           [1.0,0.0]|       0.0|
|(20269,[0,1,2,3,7...|    0| [6500.0,819.0]|[0.88809946714031...|       0.0|
|(20269,[0,1,2,3,9...|    0| [5679.0,272.0]|[0.95429339606788...|       0.0|
|(20269,[0,1,2,3,1...|    0|     [26.0,0.0]|           [1.0,0.0]|       0.0|
|(20269,[0,1,2,3,1...|    0|[32763.0,594.0]|[0.98219264322331...|       0.0|
|(20269,[0,1,2,3,1...|    0| [5326.0,142.0]|[0.97403072421360...|       0.0|
|(20269,[0,1,2,3,1...|    0|  [1286.0,65.0]|[0.95188749074759...|       0.0|
|(20269,[0,1,2,3,2...|    0|[32763.0,594.0]|[0.98219264322331...|       0.0|
|(20269,[0,1,2,3,2...|    0| [5326.0,142.0]|[0.97403072421360...|       0.0|

In [33]:

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(dtpredicts)
print "Test Error : " 
error = 1.0 - accuracy
print error

Test Error : 
0.0824877604287


In [34]:
print accuracy

0.917512239571


In [35]:
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
f_one = evaluator.evaluate(dtpredicts)
print "F1 Score : "
print f_one

F1 Score : 
0.906199658466


In [36]:
dtpredicts.select('label','prediction').groupBy('label','prediction').count().show(100)

+-----+----------+------+
|label|prediction| count|
+-----+----------+------+
|    1|       0.0| 12456|
|    0|       0.0|171374|
|    1|       1.0|  6850|
|    0|       1.0|  3567|
+-----+----------+------+

