# DocGraph Neo4j Graph DB Creation

### Imports

In [1]:
from pyspark import SparkContext, SparkConf
import graphlab as gl

A newer version of GraphLab Create (v1.4.0) is available! Your current version is v1.3.0.
You can use pip to upgrade the graphlab-create package. For more information see https://dato.com/products/create/upgrade.

### Setting Up Spark Instance

In [2]:
# Assemble the Spark settings in a single chained expression.
conf = (
    SparkConf()
    .set("spark.executor.memory", "9g")
    .set("spark.cores.max", "1")
    .setAppName("My App")
)
# Left disabled: adding GraphLab's Spark integration jar to the driver classpath.
#conf.set("spark.driver.extraClassPath", gl.get_spark_integration_jar_path())

# Start the SparkContext on the 'local[*]' master using the settings above.
sc = SparkContext('local[*]', conf=conf)

### Functions

In [3]:
# function to type cast variables
def safe_cast(val, to_type, default=0):
    """Convert ``val`` with ``to_type``, falling back to ``default`` on failure.

    Args:
        val: The value to convert.
        to_type: A callable (for example ``int`` or ``float``) applied to ``val``.
        default: Value returned when the conversion is rejected. Defaults to 0.

    Returns:
        ``to_type(val)`` when the conversion succeeds, otherwise ``default``.
    """
    try:
        return to_type(val)
    except (ValueError, TypeError):
        # ValueError: right kind of object, unparsable content (int('abc')).
        # TypeError: wrong kind of object altogether (int(None), int([])).
        return default

In [4]:
# function to filter out all doctors not in Chicago
def chicago_filter(line):
    """Return True when the row's zip code is one of the entries in ``zip_list``.

    The first five characters of cell 10 are taken, double quotes are removed,
    and the result is cast to ``int`` via ``safe_cast`` (0 when it cannot be
    parsed) before the membership test against the module-level ``zip_list``.
    """
    zip_prefix = line[10][:5].replace('\"', '')
    return safe_cast(zip_prefix, int) in zip_list

In [5]:
def clean(line):
    """Return a new list with every double quote removed from each cell.

    Same behaviour as ``quote_filter`` defined further down in this notebook.
    """
    return [cell.replace('\"', '') for cell in line]

In [6]:
def encode_filter(line):
    """Flatten a row into individually cleaned, comma-separated pieces.

    Every cell of ``line`` is split on ',', and each resulting piece is
    encoded as UTF-8 and stripped of surrounding whitespace. The pieces are
    returned as one flat list in their original order.
    """
    return [
        piece.encode('utf-8').strip()
        for chunk in line
        for piece in chunk.split(',')
    ]

In [7]:
# function to filter out the quotes in every line
def quote_filter(line):
    """Build a copy of ``line`` in which each cell has its double quotes removed."""
    unquoted = [field.replace('\"', '') for field in line]
    return unquoted

In [8]:
def filter_npis(line):
    """Return True only when both of the row's first two cells are keys of ``npi_dict``."""
    first_npi, second_npi = line[0], line[1]
    return first_npi in npi_dict and second_npi in npi_dict

## Trimming to Chicago Zip Codes

### Loading the data

In [9]:
# DocGraph Node Data: read the file line by line and split each line on tabs.
dg_node_raw = (
    sc.textFile('../../../../data/DocGraph_Procedure.csv')
      .map(lambda row: row.split('\t'))
)

In [10]:
# DocGraph Edge Data: read the file line by line and split each line on tabs.
# (encode_filter later splits every resulting cell on commas.)
dg_edge_raw = (
    sc.textFile('../../../../data/DocGraph-2012-2013-Days365.csv')
      .map(lambda row: row.split('\t'))
)

In [11]:
# Chicago Zip Codes
chicago_zips = gl.SFrame.read_csv(
    '../../../../data/chicago_metro_zip.csv',
    verbose=False,
)
# Plain Python list of the 'zip' column; chicago_filter tests membership against it.
zip_list = list(chicago_zips['zip'])

[INFO] Start server at: ipc:///tmp/graphlab_server-59357 - Server binary: /Users/astuckey002/anaconda/lib/python2.7/site-packages/graphlab/unity_server - Server log: /tmp/graphlab_server_1431613435.log
[INFO] GraphLab Server Version: 1.3.0


------------------------------------------------------
Inferred types from first line of file as 
column_type_hints=[int,str,str,str,str,str]
If parsing fails due to incorrect types, you can correct
the inferred type list above and pass it to read_csv in
the column_type_hints argument
------------------------------------------------------
PROGRESS: Finished parsing file /Users/astuckey002/Documents/data/chicago_metro_zip.csv
PROGRESS: Parsing completed. Parsed 804 lines in 0.012352 secs.


### Filter the data

In [12]:
# Take the first record as the header and keep only the records that differ from it.
# NOTE(review): this cell rebinds dg_node_raw, so it should not be re-run
# without first reloading the raw RDD.
dg_header = dg_node_raw.first()
dg_node_raw = dg_node_raw.filter(lambda row: row != dg_header)

In [13]:
# Strip the double quotes from every cell of every row.
dg_node_updated = dg_node_raw.map(quote_filter)

In [14]:
# Keep only the rows whose zip code passes chicago_filter.
dg_node_chicago = dg_node_updated.filter(chicago_filter)

### Convert the RDD to GraphLab SFrame

In [15]:
# Turn the Chicago-only node RDD into a GraphLab SFrame.
dg_node_sf = gl.SFrame.from_rdd(dg_node_chicago)

In [16]:
# Expand column 'X1' into separate columns (addressed as 'X1.<n>' when they are renamed below).
dg_node_sf_u = dg_node_sf.unpack('X1')

In [17]:
# Map each unpacked column 'X1.<position>' to its field name; the list order is the column order.
col_names = {
    'X1.%d' % position: field
    for position, field in enumerate([
        'npi',
        'nppes_provider_last_org_name',
        'nppes_provider_first_name',
        'nppes_provider_mi',
        'nppes_credentials',
        'nppes_provider_gender',
        'nppes_entity_code',
        'nppes_provider_street1',
        'nppes_provider_street2',
        'nppes_provider_city',
        'nppes_provider_zip',
        'nppes_provider_state',
        'nppes_provider_country',
        'provider_type',
        'medicare_participation_indicator',
        'place_of_Service',
        'hcpcs_code',
        'line_srvc_cnt',
        'bene_unique_cnt',
        'bene_day_srvc_cnt',
        'average_Medicare_allowed_amt',
        'stdev_Medicare_allowed_amt',
        'average_submitted_chrg_amt',
        'stdev_submitted_chrg_amt',
        'average_Medicare_payment_amt',
        'stdev_Medicare_payment_amt',
    ])
}
dg_node_sf_u = dg_node_sf_u.rename(col_names)

In [18]:
# Target dtype for each numeric node column: the three *_cnt columns become int,
# the six *_amt columns become float.
col_types = {
    count_col: int
    for count_col in ('line_srvc_cnt', 'bene_unique_cnt', 'bene_day_srvc_cnt')
}
col_types.update({
    amount_col: float
    for amount_col in (
        'average_Medicare_allowed_amt',
        'stdev_Medicare_allowed_amt',
        'average_submitted_chrg_amt',
        'stdev_submitted_chrg_amt',
        'average_Medicare_payment_amt',
        'stdev_Medicare_payment_amt',
    )
})

In [19]:
for name in dg_node_sf_u.column_names():
    if name in col_types.keys():
        print name, 'starting...'
        if col_types[name] is int:
            dg_node_sf_u[name] = dg_node_sf_u[name].astype(int)
        elif col_types[name] is float:
            dg_node_sf_u[name] = dg_node_sf_u[name].astype(float)
        else:
            print 'Something went wrong with column:', name
        print name, 'ending...'

line_srvc_cnt starting...
line_srvc_cnt ending...
bene_unique_cnt starting...
bene_unique_cnt ending...
bene_day_srvc_cnt starting...
bene_day_srvc_cnt ending...
average_Medicare_allowed_amt starting...
average_Medicare_allowed_amt ending...
stdev_Medicare_allowed_amt starting...
stdev_Medicare_allowed_amt ending...
average_submitted_chrg_amt starting...
average_submitted_chrg_amt ending...
stdev_submitted_chrg_amt starting...
stdev_submitted_chrg_amt ending...
average_Medicare_payment_amt starting...
average_Medicare_payment_amt ending...
stdev_Medicare_payment_amt starting...
stdev_Medicare_payment_amt ending...


In [20]:
# Distinct values of the 'npi' column of the Chicago node table, as a Python list.
chi_npi_list = list(dg_node_sf_u['npi'].unique())

In [21]:
# Lookup table keyed by Chicago NPI (every value is 0); filter_npis only tests key membership.
npi_dict = dict.fromkeys(chi_npi_list, 0)

In [22]:
# Split each edge row on commas and clean the resulting pieces (see encode_filter).
dg_edge_updated = dg_edge_raw.map(encode_filter)

In [23]:
# Keep only the edges whose first two cells are both keys of npi_dict.
dg_edge_chicago = dg_edge_updated.filter(filter_npis)

In [24]:
# Turn the filtered edge RDD into a GraphLab SFrame.
dg_edge_sf = gl.SFrame.from_rdd(dg_edge_chicago)

In [25]:
# Expand column 'X1' into separate columns (addressed as 'X1.<n>' when they are renamed below).
dg_edge_sf_u = dg_edge_sf.unpack('X1')

In [26]:
# Map each unpacked column 'X1.<position>' to its edge field name; the list order is the column order.
col_names = {
    'X1.%d' % position: field
    for position, field in enumerate([
        'FirstNPI',
        'SecondNPI',
        'SharedTransactionCount',
        'PatientTotal',
        'SameDayTotal',
    ])
}
dg_edge_sf_u = dg_edge_sf_u.rename(col_names)

In [27]:
col_types = {'FirstNPI':str,'SecondNPI':str,'SharedTransactionCount':int,'PatientTotal':int,'SameDayTotal':int}
for name in dg_edge_sf_u.column_names():
    if name in col_types.keys():
        print name, 'starting...'
        if col_types[name] is int:
            dg_edge_sf_u[name] = dg_edge_sf_u[name].astype(int)
        elif col_types[name] is float:
            dg_edge_sf_u[name] = dg_edge_sf_u[name].astype(float)
        elif col_types[name] is str:
            dg_edge_sf_u[name] = dg_edge_sf_u[name].astype(str)
        else:
            print 'Something went wrong with column:', name
        print name, 'ending...'

FirstNPI starting...
FirstNPI ending...
SecondNPI starting...
SecondNPI ending...
SharedTransactionCount starting...
SharedTransactionCount ending...
PatientTotal starting...
PatientTotal ending...
SameDayTotal starting...
SameDayTotal ending...


In [31]:
# Write the Chicago node table, the Chicago edge table, and the distinct NPIs to the data directory.
# NOTE(review): the '.csv' suffix presumably makes SFrame.save write CSV rather than
# GraphLab's binary format — confirm against the SFrame.save documentation.
dg_node_sf_u.save('../../../../data/dg_nodes_chicago.csv')
dg_edge_sf_u.save('../../../../data/dg_edges_chicago.csv')
# Single-column SFrame of 'npi', de-duplicated before saving.
dg_node_sf_u[['npi']].unique().save('../../../../data/unique_npis.csv')