In [1]:
%%configure -f
{
    "conf": {
           "spark.pyspark.python": "python3",
           "spark.pyspark.virtualenv.enabled": "true",
           "spark.pyspark.virtualenv.type":"native",
           "spark.pyspark.virtualenv.bin.path":"/usr/bin/virtualenv",
           "spark.jars" : "s3://sagemaker-coherehealth-1/spark/elasticsearch-hadoop-7.10.1.jar,s3://sagemaker-coherehealth-1/spark/org.apache.commons.httpclient.jar"
    }
}

In [2]:
%%HTML
<style>
pre {white-space:pre !important;}
</style>

In [3]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('test').getOrCreate()

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1623244855533_0002,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
sc.install_pypi_package('boto3==1.17.74')
sc.install_pypi_package('biopython')
sc.install_pypi_package('matplotlib')
sc.install_pypi_package('pandas')
sc.install_pypi_package('ipython')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Collecting boto3==1.17.74
  Downloading https://files.pythonhosted.org/packages/63/ec/ce099cd67c8c8a60f2666d97d91e993c2dde968ec6fbaf1d06024591a38f/boto3-1.17.74-py2.py3-none-any.whl (131kB)
Collecting s3transfer<0.5.0,>=0.4.0 (from boto3==1.17.74)
  Downloading https://files.pythonhosted.org/packages/63/d0/693477c688348654ddc21dcdce0817653a294aa43f41771084c25e7ff9c7/s3transfer-0.4.2-py2.py3-none-any.whl (79kB)
Collecting botocore<1.21.0,>=1.20.74 (from boto3==1.17.74)
  Downloading https://files.pythonhosted.org/packages/4a/ac/617d3ac25ea905279deb06edd82d6c19ca272006d6dcf232b837b75c3dde/botocore-1.20.90-py2.py3-none-any.whl (7.6MB)
Collecting urllib3<1.27,>=1.25.4 (from botocore<1.21.0,>=1.20.74->boto3==1.17.74)
  Downloading https://files.pythonhosted.org/packages/0c/cd/1e2ec680ec7b09846dc6e605f5a7709dfb9d7128e51a026e7154e18a234e/urllib3-1.26.5-py2.py3-none-any.whl (138kB)
Collecting python-dateutil<3.0.0,>=2.1 (from botocore<1.21.0,>=1.20.74->boto3==1.17.74)
  Downloading https://fil

In [5]:
# df = spark.read.option('header',"true").option("delimeter", "|").csv("s3a://sagemaker-coherehealth-1/HUMANA_MEDCLAIM_201811.txt", sep="|")
df = spark.read.load('s3a://emr-cohere-data-management/humana/med_claims/transformed/')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
df = df.select('member_ssn', 'claim_id', 'service_from_date', 'diagnosis_code1', 'procedure_code')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
df.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+---------------+-----------------+---------------+--------------+
|member_ssn|       claim_id|service_from_date|diagnosis_code1|procedure_code|
+----------+---------------+-----------------+---------------+--------------+
| 400230095|820210610274543|       2021-01-30|           N186|         90999|
| 400230095|820210610274543|       2021-01-29|           N186|         J1756|
| 400230095|820210610274543|       2021-01-12|           N186|         J0887|
| 400230095|820210610274543|       2021-01-21|           N186|         84132|
| 400230095|820210610274543|       2021-01-08|           N186|         84520|
| 400230095|820210610274543|       2021-01-23|           N186|         A4657|
| 916625860|820210630035627|       2020-09-15|          M1711|         20610|
| 419620303|820210640298427|       2021-02-24|           R339|         A4332|
| 402172381|820210630023678|       2021-03-02|          F3181|         90792|
| 272489348|820210641422799|       2021-03-01|          C3411|  

In [8]:
df.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

587809621

In [9]:
df = df.filter(df.diagnosis_code1.isNotNull())
df.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

587808672

In [10]:
from pyspark.sql import functions as F
import operator

# primary grouping - 
# goal here is to form groups of members and diagnosis codes (i.e. member 1, diag 1 / member 2, diag 1 / member 1 diag 2 ...)
# then for each diag we collect the SET of claim id's and the ordered LIST of procedure codes.
# since spark does not guarantee order in F.collect_list, we form a struct and then sort by service_from_date

# sorter, stolen from StackOverflow
def sorter(l):
    res = sorted(l, key=operator.itemgetter(0))
    ord_ret = ''
    for struct in res:
        ord_ret += str(struct[1]) + ' '
    return ord_ret
    
sort_udf = F.udf(sorter)

grp = (
        df
         .groupby('member_ssn', 'diagnosis_code1')
         .agg(
              F.collect_list(
                             F.struct('service_from_date', 
                                      'procedure_code')
                            ).alias('list_col')
             )
      )
    
list_df = grp.select('member_ssn', 
                     'diagnosis_code1',
                     sort_udf('list_col').alias('ord_procedure_codes'))

list_df.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+---------------+--------------------+
|member_ssn|diagnosis_code1| ord_procedure_codes|
+----------+---------------+--------------------+
|      null|          D4862|99244 77065 99214...|
|      null|           F900|99214 99213 99213...|
|      null|        H4031X0|              99213 |
|      null|         L97818|11043 11046 11042...|
|      null|          M5031|J0702 20610 99213...|
|      null|         M67432|99213 20612 J0702...|
|      null|         O99815|        36415 82947 |
|      null|           P761|74021 74018 71045...|
|      null|         R41841|92523 99214 92523...|
|      null|           R791|71275 93010 93010...|
|      null|        S2001XA|99283 00400 76642...|
|      null|        S32312A|73700 99283 99213...|
|      null|        S3692XA|        99221 99231 |
|      null|        S61219A|99213 12002 90471...|
|      null|        S8251XB|              01480 |
|      null|        S86111D|97161 97010 97110...|
|      null|        S98132D|73630 99203 99213...|


In [11]:
list_df.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

78874665

In [12]:
# %pip install biopython
from Bio import pairwise2

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [13]:
# # # # this function is iterative! we need a pyspark udf    
# def seq_align_adherence(df):
#     with open('carepaths.json', 'r') as js:
#         data = json.load(js)
    
#     adherence_arr = np.zeros(df.shape[0])
#     for i, row in df.iterrows():
#         if i % 500000 == 0:
#             print(i)
            
#         diag_cd = row['PRIMARY DIAG CD']

#         try:
#             carepaths = data[diag_cd][0]
#             best_adh = 0
#             for cp in carepaths:
#                 target, mapping = get_target_sequence(cp)
#                 pcd_seq, frac = construct_pcd_seq(ast.literal_eval(row['HCPCS CPT4 BASE CD 1']), cp, mapping)
#                 reduced_pcd_seq = get_reduced_str(pcd_seq)
#                 if frac != 0:
#                     adherence = sequence_alignment(reduced_pcd_seq, target) * frac
#                 else:
#                     adherence = 0
#                 if adherence > best_adh:
#                     best_adh = adherence
#             adherence_arr[i] = best_adh
#         except:
#             adherence_arr[i] = -1
            
#     df['sa_adherence'] = adherence_arr

# function to construct scoring dict
def make_scoring_dict(target_str):
    sdict = {}
    for c1 in target_str:
        for c2 in target_str:
            if c1 == c2:
                if c1 != '0' and c2 != '0':
                    sdict[(c1, c2)] = 10
                else:
                    sdict[(c1, c2)] = 1000
            elif c1 == '0' or c2 == '0':
                sdict[(c1, c2)] = -1000
            elif abs(int(c1) - int(c2)) == 1:
                sdict[(c1, c2)] = 5
            else:
                sdict[(c1, c2)] = 0
    return sdict

# function to get target sequence from a list
def get_target_sequence(carepath):
    mapping = {}
    key_list = list(carepath.keys())
    target = '0'
    for k in range(len(key_list)):
        mapping[key_list[k]] = str(k+1)
        target += str(k+1)
        
    return target, mapping


# function to only have unique characters
def get_reduced_str(input_str):
    new = ''
    for c in input_str:
        if c not in new:
            new += c
    return new


# FLAG -- we could roll up on both ends here
# function to query pcd codes and get their service cateogry / construct pcd sequence
def construct_pcd_seq(procedures, carepath, mapping):
    pcd_seq = '0'
    for procedure in procedures:
        for key in carepath:
            if procedure in carepath[key]:
                pcd_seq += mapping[key]
                break
                
    return pcd_seq, (len(pcd_seq)-1) / len(procedures)


def sequence_alignment(pcd_seq, cp_seq):
    scoring = make_scoring_dict(cp_seq)
    a = pairwise2.align.globaldd(cp_seq, 
                                 pcd_seq,
                                 scoring,
                                 -10,
                                 0,
                                 -5,
                                 -1,
                                 penalize_end_gaps=False,
                                 score_only=True)
    
    return ((a-1000) / (len(pcd_seq)-1) / 10)

sequence_alignment('012', '012345')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

1.0

In [14]:
from pyspark.sql.types import FloatType, DoubleType
import os
import ast
import json

def seq_alignment_adherence(diag_cd, procedure_cds):
    try:
        carepaths = data[diag_cd][0]
        best_adh = 0.0
        for cp in carepaths:
            target, mapping = get_target_sequence(cp)
            code_list = procedure_cds.split(' ')
            pcd_seq, frac = construct_pcd_seq(code_list, cp, mapping)
            reduced_pcd_seq = get_reduced_str(pcd_seq)
            if frac != 0:
                adherence = sequence_alignment(reduced_pcd_seq, target) * frac
                print(adherence)
            else:
                adherence = 0
            if adherence > best_adh:
                best_adh = adherence
        return best_adh
    except:
        return -1.0
    
adherence_udf = F.udf(seq_alignment_adherence, DoubleType())

def generous_adherence(diag_cd, procedure_cds):
    try:
        carepaths = data[diag_cd][0]
        best_adh = 0.0
        for cp in carepaths:
            target, mapping = get_target_sequence(cp)
            code_list = procedure_cds.split(' ')
            pcd_seq, frac = construct_pcd_seq(code_list, cp, mapping)
            reduced_pcd_seq = get_reduced_str(pcd_seq)
            if frac != 0:
                adherence = sequence_alignment(reduced_pcd_seq, target)
            else:
                best_adh = -1.0
            if adherence > best_adh:
                best_adh = adherence
        return best_adh
    except:
        return -1.0
    
gen_adherence_udf = F.udf(generous_adherence, DoubleType())

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [15]:
import boto3
import json

s3 = boto3.resource('s3')
json_str = s3.Object('sagemaker-coherehealth-1', 'claims-data/carepaths.json').get()['Body'].read().decode('utf-8')
data = json.loads(json_str)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [16]:
adh_df = list_df.withColumn('seq_align_adh', adherence_udf(list_df.diagnosis_code1, list_df.ord_procedure_codes))
adh_df = adh_df.withColumn('generous_adh', gen_adherence_udf(list_df.diagnosis_code1, list_df.ord_procedure_codes))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [17]:
adh_df.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+---------------+--------------------+-------------------+------------+
|member_ssn|diagnosis_code1| ord_procedure_codes|      seq_align_adh|generous_adh|
+----------+---------------+--------------------+-------------------+------------+
|      null|          D4862|99244 77065 99214...|               -1.0|        -1.0|
|      null|           F900|99213 99214 99213...|               -1.0|        -1.0|
|      null|        H4031X0|              99213 |               -1.0|        -1.0|
|      null|         L97818|11043 11046 11042...|               -1.0|        -1.0|
|      null|          M5031|20610 99213 J0702...|0.18617021276595744|         0.5|
|      null|         M67432|L3808 20612 J0702...|               -1.0|        -1.0|
|      null|         O99815|        36415 82947 |               -1.0|        -1.0|
|      null|           P761|74021 71045 74018...|               -1.0|        -1.0|
|      null|         R41841|92523 99214 92507...|               -1.0|        -1.0|
|   

In [18]:
adh_df.write.parquet("s3a://sagemaker-coherehealth-1/claims-data/adherence_data/", mode="overwrite")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# ------------------------------

## Start here to read adherence df in most current state

In [5]:
adh_df = spark.read.load('s3a://sagemaker-coherehealth-1/claims-data/adherence_data/')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
adh_df.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

78874665

In [7]:
seq_align_df = adh_df.filter(adh_df.seq_align_adh != -1.0)
seq_align_df.describe(['seq_align_adh']).show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+-------------------+
|summary|      seq_align_adh|
+-------+-------------------+
|  count|            6855000|
|   mean|0.10138786287137741|
| stddev|0.21047998251714428|
|    min|                0.0|
|    max| 0.9991680532445923|
+-------+-------------------+

In [8]:
gen_adh_df = adh_df.filter(adh_df.generous_adh != -1.0)
gen_adh_df.describe(['generous_adh']).show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+-------------------+
|summary|       generous_adh|
+-------+-------------------+
|  count|            2171379|
|   mean| 0.6153987995647674|
| stddev|0.26922224025707525|
|    min|                0.1|
|    max|                1.0|
+-------+-------------------+

In [9]:
import matplotlib
from matplotlib import pyplot as plt 

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
# fig, ax = plt.subplots()
# ax.hist(list(gen_adh_df.select('generous_adh').toPandas()['generous_adh']), bins=50, color='orange')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
# %matplot plt

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [12]:
# fig, ax = plt.subplots()
# ax.hist(list(seq_align_df.select('seq_align_adh').toPandas()['seq_align_adh']), bins=50, color='blue')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [13]:
# %matplot plt

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [14]:
query = (
    gen_adh_df
    .filter(
    (gen_adh_df.generous_adh > 0.5) &
    (gen_adh_df.generous_adh < 0.75) &
    (gen_adh_df.seq_align_adh > 0))
)
query.show(truncate=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+------------+
|member_ssn|diagnosis_code1|ord_procedure_codes                                                                                                                                                                                                                    

In [15]:
rollup_analysis = (
    seq_align_df
    .filter(
        (seq_align_df.seq_align_adh == 0) &
        (seq_align_df.generous_adh != 0) &
        (seq_align_df.diagnosis_code1 == 'M545')
    )
)

rollup_analysis.show(100, truncate=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+------------+
|member_ssn|diagnosis_code1|ord_procedure_codes                                                                                                                                                                                                                                                                                   |seq_align_adh|generous_adh|
+----------+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [16]:
rollup_analysis.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

370613

In [None]:
# step 1 - list col
# step 2 - explode :)


# an idea - prcd code blacklist? anesthetics, local doctor visits, chiropractor visits?