## Project: Open Payment

In [1]:
import re
import numpy as np
import pandas as pd
# Display every column when rendering wide DataFrames.
pd.set_option("display.max_columns", None)

from pyspark import SparkContext, SparkConf

import plotly.express as px 
from jupyter_dash import JupyterDash 
import dash_core_components as dcc 
import dash_html_components as html 
from dash.dependencies import Input, Output
# Dash application object; its layout is assigned and the server is started in the Dash section of this notebook.
app = JupyterDash(__name__) 

import os
import warnings
# Suppress all Python warnings for the rest of the session.
warnings.filterwarnings('ignore')

# NOTE(review): wildcard import hides which names enter the namespace.
# AWS_PUBLIC and AWS_SECRET (used for the S3A configuration) are not defined
# anywhere else in this notebook, so they presumably come from here — confirm.
from keys.sadaminghkeys import *

In [2]:
# ! pip install dash
# ! pip install jupyter-dash

## 1. Load Data

In [3]:
# Ask PySpark to pull in the hadoop-aws package at launch. This is set before
# the SparkContext is requested below.
# NOTE(review): hadoop-aws 3.3.1 presumably has to match the Hadoop version bundled with Spark — confirm.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages "org.apache.hadoop:hadoop-aws:3.3.1" pyspark-shell'
# Spark configuration
# Reuse an already-running SparkContext if there is one, otherwise create it.
sc = SparkContext.getOrCreate()
# Turn Spark's log output off entirely.
sc.setLogLevel("OFF")

In [4]:
# Configure S3 access on the Hadoop configuration behind the SparkContext.
hadoopConf = sc._jsc.hadoopConfiguration()
# Route the plain "s3://" scheme to the S3A filesystem implementation.
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
# NOTE(review): AWS_PUBLIC / AWS_SECRET are not defined in this notebook;
# presumably they are provided by `from keys.sadaminghkeys import *` — confirm.
hadoopConf.set('fs.s3a.access.key', AWS_PUBLIC)
hadoopConf.set('fs.s3a.secret.key', AWS_SECRET)

In [5]:
# Location of the CSV file on S3.
file = 's3://usf-msds694-openpayments/BigSets/OP_DTL_GNRL_PGYR2019_P06302021.csv'
# Read the file as an RDD of raw text lines (one element per line, not yet split into fields).
rdd = sc.textFile(file)

In [6]:
# Work on a small sample: pull the first 2000 lines to the driver,
# then redistribute them as a fresh RDD under the same name.
first_lines = rdd.take(2000)
rdd = sc.parallelize(first_lines)

## 2. Pre-Defined Functions

In [7]:
def to_DataFrame(data):
    """Convert a list of rows into a DataFrame, using the first row as header.

    Parameters
    ----------
    data : list of list
        Row-major records; ``data[0]`` supplies the column names.

    Returns
    -------
    pandas.DataFrame
        The rows that follow the header. The positional index of the input
        is kept, so the first data row has index 1. When ``data`` holds no
        rows at all, an empty DataFrame is returned.
    """
    df = pd.DataFrame(data)
    # With no rows there is no header to promote; return early instead of
    # letting ``iloc[0]`` raise IndexError.
    if df.empty:
        return df
    df.columns = df.iloc[0]
    return df.iloc[1:]

In [8]:
def isfloat(element: str) -> bool:
    """Check whether a value can be converted to ``float``.

    Parameters
    ----------
    element : str
        Candidate value, normally a string read from the CSV.

    Returns
    -------
    bool
        ``True`` when ``float(element)`` succeeds, ``False`` otherwise.
        Values of an unsupported type (e.g. ``None``) yield ``False``
        rather than raising.
    """
    try:
        float(element)
    except (TypeError, ValueError):
        # ValueError: unparseable text; TypeError: not a string/number at all.
        return False
    return True

## 3. Data Processing

### 3.1 Print and view the original data

In [9]:
# Split each CSV line on commas that sit outside double-quoted fields: a comma
# is a separator only when an even number of `"` characters follows it.
# Only `"` is treated as a quote character. Counting `'` as well would leave
# commas unsplit whenever a value contains an apostrophe (e.g. O'BRIEN),
# shifting every later column of that row.
COMMA_MATCHER = re.compile(r',(?=(?:[^"]*"[^"]*")*[^"]*$)')
rdd = rdd.map(lambda x: COMMA_MATCHER.split(x))
# The fields are separated now, so the enclosing double quotes can be dropped.
rdd = rdd.map(lambda x: [i.replace("\"", "") for i in x])
# Bring the parsed rows to the driver and build a pandas view of them;
# the first row supplies the column names.
data = rdd.collect()
df = to_DataFrame(data)
# Column names in file order; later cells look up column positions here.
header = list(df.columns)
df.head()

Unnamed: 0,Change_Type,Covered_Recipient_Type,Teaching_Hospital_CCN,Teaching_Hospital_ID,Teaching_Hospital_Name,Physician_Profile_ID,Physician_First_Name,Physician_Middle_Name,Physician_Last_Name,Physician_Name_Suffix,Recipient_Primary_Business_Street_Address_Line1,Recipient_Primary_Business_Street_Address_Line2,Recipient_City,Recipient_State,Recipient_Zip_Code,Recipient_Country,Recipient_Province,Recipient_Postal_Code,Physician_Primary_Type,Physician_Specialty,Physician_License_State_code1,Physician_License_State_code2,Physician_License_State_code3,Physician_License_State_code4,Physician_License_State_code5,Submitting_Applicable_Manufacturer_or_Applicable_GPO_Name,Applicable_Manufacturer_or_Applicable_GPO_Making_Payment_ID,Applicable_Manufacturer_or_Applicable_GPO_Making_Payment_Name,Applicable_Manufacturer_or_Applicable_GPO_Making_Payment_State,Applicable_Manufacturer_or_Applicable_GPO_Making_Payment_Country,Total_Amount_of_Payment_USDollars,Date_of_Payment,Number_of_Payments_Included_in_Total_Amount,Form_of_Payment_or_Transfer_of_Value,Nature_of_Payment_or_Transfer_of_Value,City_of_Travel,State_of_Travel,Country_of_Travel,Physician_Ownership_Indicator,Third_Party_Payment_Recipient_Indicator,Name_of_Third_Party_Entity_Receiving_Payment_or_Transfer_of_Value,Charity_Indicator,Third_Party_Equals_Covered_Recipient_Indicator,Contextual_Information,Delay_in_Publication_Indicator,Record_ID,Dispute_Status_for_Publication,Related_Product_Indicator,Covered_or_Noncovered_Indicator_1,Indicate_Drug_or_Biological_or_Device_or_Medical_Supply_1,Product_Category_or_Therapeutic_Area_1,Name_of_Drug_or_Biological_or_Device_or_Medical_Supply_1,Associated_Drug_or_Biological_NDC_1,Covered_or_Noncovered_Indicator_2,Indicate_Drug_or_Biological_or_Device_or_Medical_Supply_2,Product_Category_or_Therapeutic_Area_2,Name_of_Drug_or_Biological_or_Device_or_Medical_Supply_2,Associated_Drug_or_Biological_NDC_2,Covered_or_Noncovered_Indicator_3,Indicate_Drug_or_Biological_or_Device_or_Medical_Supp
ly_3,Product_Category_or_Therapeutic_Area_3,Name_of_Drug_or_Biological_or_Device_or_Medical_Supply_3,Associated_Drug_or_Biological_NDC_3,Covered_or_Noncovered_Indicator_4,Indicate_Drug_or_Biological_or_Device_or_Medical_Supply_4,Product_Category_or_Therapeutic_Area_4,Name_of_Drug_or_Biological_or_Device_or_Medical_Supply_4,Associated_Drug_or_Biological_NDC_4,Covered_or_Noncovered_Indicator_5,Indicate_Drug_or_Biological_or_Device_or_Medical_Supply_5,Product_Category_or_Therapeutic_Area_5,Name_of_Drug_or_Biological_or_Device_or_Medical_Supply_5,Associated_Drug_or_Biological_NDC_5,Program_Year,Payment_Publication_Date
1,UNCHANGED,Covered Recipient Physician,,,,24604,JAMES,E,BROWN,JR.,729 1/2 N MAIN ST,,NORTH SYRACUSE,NY,13212-1651,United States,,,Medical Doctor,Allopathic & Osteopathic Physicians|Obstetrics...,NY,,,,,Mission Pharmacal Company,100000000186,Mission Pharmacal Company,TX,United States,12.3,11/20/2019,1,In-kind items and services,Food and Beverage,,,,No,No Third Party Payment,,No,,,No,625331753,No,Yes,Covered,Drug,Prenatal Vitamin & Mineral,CitraNatal,0178-0796-30,,,,,,,,,,,,,,,,,,,,,2019,06/30/2021
2,UNCHANGED,Covered Recipient Physician,,,,145724,EARLANDO,O,THOMAS,,2337 RIDGEWAY AVE,,ROCHESTER,NY,14626,United States,,,Medical Doctor,Allopathic & Osteopathic Physicians|Obstetrics...,NY,,,,,Mission Pharmacal Company,100000000186,Mission Pharmacal Company,TX,United States,15.8,11/20/2019,1,In-kind items and services,Food and Beverage,,,,No,No Third Party Payment,,No,,,No,625331755,No,Yes,Covered,Drug,Prenatal Vitamin & Mineral,CitraNatal,0178-0796-30,,,,,,,,,,,,,,,,,,,,,2019,06/30/2021
3,UNCHANGED,Covered Recipient Physician,,,,216332,KATHERINE,S,LAMMERS,,2337 RIDGEWAY AVE,,ROCHESTER,NY,14626,United States,,,Medical Doctor,Allopathic & Osteopathic Physicians|Obstetrics...,NY,,,,,Mission Pharmacal Company,100000000186,Mission Pharmacal Company,TX,United States,15.8,11/20/2019,1,In-kind items and services,Food and Beverage,,,,No,No Third Party Payment,,No,,,No,625331757,No,Yes,Covered,Drug,Prenatal Vitamin & Mineral,CitraNatal,0178-0796-30,,,,,,,,,,,,,,,,,,,,,2019,06/30/2021
4,UNCHANGED,Covered Recipient Physician,,,,676113,ROBIN,B,BONE,,2700 NAPOLEON AVE,SUITE 560,NEW ORLEANS,LA,70115-6914,United States,,,Medical Doctor,Allopathic & Osteopathic Physicians|Obstetrics...,LA,,,,,Mission Pharmacal Company,100000000186,Mission Pharmacal Company,TX,United States,17.28,06/14/2019,1,In-kind items and services,Food and Beverage,,,,No,No Third Party Payment,,No,,,No,625331759,No,Yes,Covered,Drug,Prenatal Vitamin & Mineral,CitraNatal,0178-0796-30,,,,,,,,,,,,,,,,,,,,,2019,06/30/2021
5,UNCHANGED,Covered Recipient Physician,,,,424082,ARCHANA,R,PAINE,,2700 NAPOLEON AVE,SUITE 560,NEW ORLEANS,LA,70115-6914,United States,,,Medical Doctor,Allopathic & Osteopathic Physicians|Obstetrics...,LA,,,,,Mission Pharmacal Company,100000000186,Mission Pharmacal Company,TX,United States,17.28,06/14/2019,1,In-kind items and services,Food and Beverage,,,,No,No Third Party Payment,,No,,,No,625331761,No,Yes,Covered,Drug,Prenatal Vitamin & Mineral,CitraNatal,0178-0796-30,,,,,,,,,,,,,,,,,,,,,2019,06/30/2021


### 3.2 How many recipient records in each state

In [15]:
# Position of the recipient-state column within each parsed row.
col_Recipient_State = header.index('Recipient_State')
# Drop the header row and any record without a recipient state.
rdd_no_header = rdd.filter(lambda x: x[col_Recipient_State] != 'Recipient_State' and x[col_Recipient_State] != '')
rdd_Recipient_State = rdd_no_header.map(lambda x: x[col_Recipient_State])
# countByValue() returns a {state: count} mapping on the driver. Passing the
# column names to the constructor keeps this working when the mapping is empty.
Recipient_State_count = pd.DataFrame(
    list(rdd_Recipient_State.countByValue().items()),
    columns=['Code', 'Count'])

# Choropleth of record counts keyed by two-letter state code.
# `labels` maps the DataFrame column names to the text shown on the figure.
fig1 = px.choropleth(Recipient_State_count,
                    locations='Code',
                    color='Count',
                    color_continuous_scale='oranges',
                    locationmode='USA-states',
                    labels={'Code': 'State', 'Count': 'Number of Records'},
                    scope='usa')

# update_layout() returns the figure itself; re-assigning keeps the cell from
# echoing the figure without introducing a throwaway variable.
fig1 = fig1.update_layout(
    title={'text':'Number of Recipient Records per State',
           'xanchor':'center',
           'yanchor':'top',
           'x':0.5})

### 3.3 How much money in total for each state by manufacturers

In [16]:
# Positions of the payment amount and of the paying manufacturer's state.
col_Payment_State = header.index('Total_Amount_of_Payment_USDollars')
state_code = header.index('Applicable_Manufacturer_or_Applicable_GPO_Making_Payment_State')
# Drop the header row and records lacking an amount or a manufacturer state.
# NOTE(review): 'No Third Party Payment' is a value of a different column in
# the sample output; presumably this guards against misaligned rows — confirm.
rdd_no_header = rdd.filter(lambda x: x[col_Payment_State] != 'Total_Amount_of_Payment_USDollars'
                                 and x[col_Payment_State] != ''
                                 and x[state_code] != ''
                                 and x[state_code] != 'No Third Party Payment')
# (state, amount) pairs; amounts that do not parse as a number contribute 0.
rdd_Payment_State = rdd_no_header.map(
    lambda x: (x[state_code],
               float(x[col_Payment_State]) if isfloat(x[col_Payment_State]) else 0.0))
# Total per state, as the section heading asks for. The previous
# sum(x)/len(x) produced a mean, which the zero placeholders above would
# also have biased. reduceByKey adds amounts up without materialising a
# per-state list first.
rdd_Payment_State = rdd_Payment_State.reduceByKey(lambda a, b: a + b)
# Passing the column names to the constructor keeps this working when no
# record survives the filter.
Payment_State_count = pd.DataFrame(rdd_Payment_State.collect(),
                                   columns=['Code', 'Payment'])

# Choropleth of total payments keyed by two-letter state code.
# `labels` maps the DataFrame column names to the text shown on the figure.
fig2 = px.choropleth(Payment_State_count,
                    locations='Code',
                    color='Payment',
                    color_continuous_scale='oranges',
                    locationmode='USA-states',
                    labels={'Code': 'State', 'Payment': 'Total Payment (USD)'},
                    scope='usa')

# update_layout() returns the figure itself; re-assigning keeps the cell from
# echoing the figure without introducing a throwaway variable.
fig2 = fig2.update_layout(
    title={'text':'Manufacturer amount of payments',
           'xanchor':'center',
           'yanchor':'top',
           'x':0.5})


## 4. Dash Example

In [19]:
# One dropdown entry per colorscale name that plotly express knows about.
colorscale_options = [
    dict(label=scale_name, value=scale_name)
    for scale_name in px.colors.named_colorscales()
]

# NOTE(review): no callback consuming 'colorscale-dropdown' is visible in this
# notebook, so changing the selection presumably has no effect — confirm.
colorscale_picker = dcc.Dropdown(
    id='colorscale-dropdown',
    clearable=False,
    value='plasma',
    options=colorscale_options,
)

# Static explanatory paragraphs shown under the two maps.
explanation = html.Div([
    html.P('Dash converts Python classes into HTML'),
    html.P("This conversion happens behind the scenes by Dash's JavaScript front-end"),
])

# Page structure: heading, both choropleths, the explanation, then the picker.
app.layout = html.Div([
    html.H1("JupyterDash Demo"),
    dcc.Graph(id='graph1', figure=fig1),
    dcc.Graph(id='graph2', figure=fig2),
    explanation,
    html.Label(["colorscale", colorscale_picker]),
])


In [20]:
# Start the Dash app in 'jupyterlab' mode with debug enabled.
app.run_server(mode='jupyterlab', debug=True)

In [11]:
# Shut down the SparkContext; `sc` and RDDs built from it cannot be used after this.
sc.stop()