In [0]:
%python 

#### Use this code to configure your accoutn before running SQL in Databricks

storage_account_name = 'rgdevglobalreahl'
storage_account_key =  'RjnFSNg2IDzQ5bN/8aTZOFK1lnpL6zytdMnuQIrJQMw6psa7++18fCoIfaLkq5DEFV3Hp0b7WZjR+AStQuhR+A=='

spark.conf.set(f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net", storage_account_key)


from pyspark.sql.functions import col, lit, round 
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.window import Window
import logging
import pandas as pd
from pyspark.sql.functions import expr, md5, concat_ws, coalesce, when




In [0]:
####### Set Claims Data
claims_source = "wasbs://marketscan@rgdevglobalreahl.blob.core.windows.net/marketscan/medicaid/marketscan-preprod-step2"
df = spark.read.parquet(claims_source)


### TOB mapping
ip_tob_prefixes = ["11", "12", "17", "21", "22", "81", "82", "84", "85", "86"]
snf_tob_prefixes = ["17", "21", "22"]
hospice_tob_prefixes = ["81", "82", "65"]

#### DRG Mapping (Upload after revising yearly)
drg_mapping = pd.read_csv('DRG_Mapping.csv', dtype={"DRG": str})
drg_map_spark = spark.createDataFrame(drg_mapping)
drg_map_spark = drg_map_spark.withColumn("DRG", col("DRG").cast("string"))

#### BETOS Mapping (Upload after revising yearly) - for Prof Mapping HCPCS & CPT Codes
betos_mapping = pd.read_csv('CMS_BETOS.csv', dtype={"procedure_code": str})
betos_mapping_spark = spark.createDataFrame(betos_mapping)
betos_mapping_spark = betos_mapping_spark.withColumn("procedure_code", col("procedure_code").cast("string"))


#########__________________Major Service Cat_____________________#####################


### Major Service Category
df = df.withColumn("major_service_category", F.when((col("DRG").isNull() & col("bill_type").isNull()), "Professional").otherwise(
    F.when(col("DRG").isNotNull(), "Inpatient Facility").otherwise(F.when(F.col("bill_type").substr(1, 2).isin(ip_tob_prefixes), "Inpatient Facility").otherwise("Outpatient Facility"))))


#########__________________IP Detailed Service Cat_____________________#####################
#### Detailed Service Categories
### IP Step 1: Hospice & SNF based on TOB
df = df.withColumn("detailed_service_category_ip_HospSNF", 
                   F.when((col("major_service_category") == "Inpatient Facility") & (col("bill_type").substr(1,2).isin(snf_tob_prefixes)), "SNF").otherwise(
                       F.when((col("major_service_category") == "Inpatient Facility") & (col("bill_type").substr(1,2).isin(hospice_tob_prefixes)), "Hospice").otherwise(
                        None)))

### IP Step 2: Join with mapped DRGS
df = df.join(
    drg_map_spark.select("DRG","Level_II", "Level_III"),
    on="DRG",
    how="left"
)

# Use DRG mapping or previously determined categories
df = df.withColumn("IP_level_II", F.when(col("major_service_category") == "Inpatient Facility", F.coalesce( col("detailed_service_category_ip_HospSNF"), col("Level_II"), lit("Other"))).otherwise(None))
df = df.withColumn("IP_level_III", F.when(col("major_service_category") == "Inpatient Facility", F.coalesce(col("detailed_service_category_ip_HospSNF"), col("Level_III"), lit("Other Inpatient Facility"))).otherwise(None))

df = df.drop("detailed_service_category_ip_HospSNF", "Level_II", "Level_III")


##########Note ... redo level II as level IV - more detail and map acute IP to level II ###########


#########__________________Prof Detailed Service Cat_____________________#####################
### Prof Step 1: Join the BETOS Mapping
df = df.join(
    betos_mapping_spark.select("procedure_code","RBCS_Cat_Desc", "RBCS_SubCat_Desc"),
    on="procedure_code",
    how="left"
)

### Prof Step 2: Null out everything prof for level II and level III
df = df.withColumn("Prof_level_II", F.when(col("major_service_category") == "Professional", F.coalesce(col("RBCS_Cat_Desc"), lit("Other Professional"))).otherwise(None))
df = df.withColumn("Prof_level_III", F.when(col("major_service_category") == "Professional", F.coalesce(col("RBCS_SubCat_Desc"), lit("Other Professional"))).otherwise(None))

df = df.drop("RBCS_Cat_Desc", "RBCS_SubCat_Desc")

#########__________________OP Detailed Service Cat_____________________#####################

df = df.withColumn(
    "OP_Level_III",
    expr("""
        CASE
            WHEN major_service_category = 'Outpatient Facility' AND bill_type LIKE '3%' THEN 'Home Health'
            
            WHEN major_service_category = 'Outpatient Facility' AND revenue_code LIKE '037%' THEN 'Anesthesia'
            
            WHEN major_service_category = 'Outpatient Facility' AND revenue_code LIKE '054%' THEN 'Ambulance'
            
            WHEN major_service_category = 'Outpatient Facility' AND revenue_code LIKE '090%' THEN 'Behavioral Health'
            WHEN major_service_category = 'Outpatient Facility' AND revenue_code LIKE '091%' THEN 'Behavioral Health'
            WHEN major_service_category = 'Outpatient Facility' AND revenue_code LIKE '100%' THEN 'Behavioral Health'
            
            WHEN major_service_category = 'Outpatient Facility' AND revenue_code LIKE '080%' THEN 'Dialysis'
            WHEN major_service_category = 'Outpatient Facility' AND revenue_code LIKE '082%' THEN 'Dialysis'
            WHEN major_service_category = 'Outpatient Facility' AND revenue_code LIKE '083%' THEN 'Dialysis'
            WHEN major_service_category = 'Outpatient Facility' AND revenue_code LIKE '084%' THEN 'Dialysis'
            WHEN major_service_category = 'Outpatient Facility' AND revenue_code LIKE '085%' THEN 'Dialysis'
            WHEN major_service_category = 'Outpatient Facility' AND revenue_code LIKE '088%' THEN 'Dialysis'
            
            WHEN major_service_category = 'Outpatient Facility' AND revenue_code LIKE '073%' THEN 'EKG/ECG/EEG'
            WHEN major_service_category = 'Outpatient Facility' AND revenue_code LIKE '074%' THEN 'EKG/ECG/EEG'
            
            WHEN major_service_category = 'Outpatient Facility' AND revenue_code LIKE '045%' THEN 'Emergency Room'
            
            WHEN major_service_category = 'Outpatient Facility' AND revenue_code LIKE '025%' THEN 'Facility Dispensed Pharmacy'
            WHEN major_service_category = 'Outpatient Facility' AND revenue_code LIKE '063%' THEN 'Facility Dispensed Pharmacy'
            WHEN major_service_category = 'Outpatient Facility' AND revenue_code LIKE '089%' THEN 'Facility Dispensed Pharmacy'
            
            WHEN major_service_category = 'Outpatient Facility' AND revenue_code LIKE '030%' THEN 'Lab/Pathology'
            WHEN major_service_category = 'Outpatient Facility' AND revenue_code LIKE '031%' THEN 'Facility Dispensed Pharmacy'
            
            WHEN major_service_category = 'Outpatient Facility' AND revenue_code LIKE '076%' THEN 'Observation'
            
            WHEN major_service_category = 'Outpatient Facility' AND revenue_code LIKE '042%' THEN 'PT/OT/ST'
            WHEN major_service_category = 'Outpatient Facility' AND revenue_code LIKE '043%' THEN 'PT/OT/ST'
            WHEN major_service_category = 'Outpatient Facility' AND revenue_code LIKE '044%' THEN 'PT/OT/ST'

            
            WHEN major_service_category = 'Outpatient Facility' AND revenue_code LIKE '051%' THEN 'Outpatient Clinic'
            WHEN major_service_category = 'Outpatient Facility' AND revenue_code LIKE '052%' THEN 'Outpatient Clinic'
        
            WHEN major_service_category = 'Outpatient Facility' AND revenue_code LIKE '026%' THEN 'Outpatient IV Therapy'

            WHEN major_service_category = 'Outpatient Facility' AND revenue_code in('0360','0361','0362','0367','0369','0481','0490', '0499', '0790', '0799')  THEN 'Outpatient Surgery'
            
            -- WHEN major_service_category = 'Outpatient Facility' AND revenue_code LIKE '087%' THEN 'Cell/Gene Therapy'
            
            WHEN major_service_category = 'Outpatient Facility' AND revenue_code LIKE '032%' THEN 'Radiology - Diagnostic'
            WHEN major_service_category = 'Outpatient Facility' AND revenue_code LIKE '033%' THEN 'Radiology - Therapeutic'
            WHEN major_service_category = 'Outpatient Facility' AND revenue_code LIKE '035%' THEN 'Radiology - CT'
            WHEN major_service_category = 'Outpatient Facility' AND revenue_code LIKE '061%' THEN 'Radiology - MRI/MRA/MRT'
            WHEN major_service_category = 'Outpatient Facility' AND revenue_code LIKE '034%' THEN 'Radiology - Nuclear Medicine'
            WHEN major_service_category = 'Outpatient Facility' AND revenue_code LIKE '040%' THEN 'Radiology - Other'

            WHEN major_service_category = 'Outpatient Facility' AND revenue_code LIKE '072%' THEN 'Labor and Delivery'

            WHEN major_service_category = 'Outpatient Facility' AND revenue_code LIKE '029%' THEN 'DME/Prosthetics/Supplies'
            WHEN major_service_category = 'Outpatient Facility' AND revenue_code LIKE '027%' THEN 'DME/Prosthetics/Supplies'
            WHEN major_service_category = 'Outpatient Facility' AND revenue_code LIKE '062%' THEN 'DME/Prosthetics/Supplies'

            WHEN major_service_category = 'Outpatient Facility' THEN 'Other Outpatient Facility'
            
            ELSE NULL
        END
    """))


### V1 of detailed svc cat, will get recategorized later
df = df.withColumn("detailed_service_category_temp", expr("""
    case when major_service_category = 'Inpatient Facility' Then  IP_Level_III
    when major_service_category = 'Outpatient Facility' Then  OP_Level_III
    when major_service_category = 'Professional' Then  Prof_level_III
    else NULL end
    """))


### V1 of service cat ID, will get recategorized later
df = df.withColumn(
    "hash_key_temp",
    md5(concat_ws("||", "member_id", "detailed_service_category_temp", "Incurred_Date"))
)



### Bill type 12X found to be re-billed as OP rules in an IP stay - this hierarchy ensures that the one billed with DRG comes first in the window function if there is a tie with incurred date
df = df.withColumn("IP_DSC_Hierarchy", expr("""
    case when IP_level_III = 'Other Inpatient Facility' Then  3
    when IP_level_III in ('SNF', 'Hospice') Then  2
    else 1 end
    """))




#### Write and Save
target_path = "wasbs://ms-etl-preprod-servicecat@rgdevglobalreahl.blob.core.windows.net/medicaid/medical/ms_medicaid_medical.parquet" 
df.write.partitionBy("YEAR", "MS_Source_File").parquet(target_path, mode="overwrite")


In [0]:
#########__________________Relabeling IP Adjacent Admissions (IP Acute)_____________________#####################


####### Set Claims Data, Filter to IP Acute
claims_source = "wasbs://ms-etl-preprod-servicecat@rgdevglobalreahl.blob.core.windows.net/medicaid/medical/ms_medicaid_medical.parquet" 
df = spark.read.parquet(claims_source)

df = df.filter((col("major_service_category") == "Inpatient Facility") & 
               (~col("IP_level_III").isin(["SNF", "Hospice"])))


### Inucrred date needs to be changed to admit date for SNF/Hospice
# List of columns to group by

#hash key temp is concat of mbr ID, detailed svc cat tmp, inc date


dedup_df = df
'''.select("member_id", "service_to_date", "incurred_date", "IP_level_II", "detailed_service_category_temp","hash_key_temp","major_service_category", "admit_date" ,"IP_DSC_Hierarchy").distinct().orderBy("member_id", "incurred_date")
'''

# Define window by member_id ordered by admit_date
window_spec = Window.partitionBy("member_id").orderBy("incurred_date", "IP_DSC_Hierarchy", "service_to_date")

# Add lagged discharge_date
'''dedup_df = df.withColumn("prev_service_to_date", F.lag("service_to_date").over(window_spec)).withColumn("prev_IP_level_III", F.lag("detailed_service_category_temp").over(window_spec)).withColumn("prev_IP_level_II", F.lag("IP_level_II").over(window_spec)).withColumn("prev_hash_key", F.lag("hash_key_temp").over(window_spec))
'''


# Sort by incurred_date
window_group = Window.partitionBy("member_id").orderBy("incurred_date","IP_DSC_Hierarchy" )

# Track running max of service_to_date
## Becomes temp new discharge date & we 1st aggregate / regroup all svcs that fall under this
dedup_df = dedup_df.withColumn("running_max_service_to", F.max("service_to_date").over(window_group))

############### _________ handling overlapping IP _________________#####

### create this key to handle overlapping IP svcs
dedup_df = dedup_df.withColumn(
    "hash_key_temp_step2",
    md5(concat_ws("||", "member_id", "running_max_service_to"))
)


window_spec = Window.partitionBy("member_id", "hash_key_temp_step2")

### not used later??
dedup_df = dedup_df.withColumn("min_incurred_date_over_hash_step2", F.min("incurred_date").over(window_spec))
dedup_df = dedup_df.withColumn("min_IP_Hierarch_over_hash_step2", F.min("IP_DSC_Hierarchy").over(window_spec))



#####___________________________

#### do unique to handle multiple incurred dates and look at only one date at a time, not multiple ... for the purpose of then counting row numbers 1-1 with each encounter
IP_svc_hash = dedup_df.select("hash_key_temp_step2", "member_id", "detailed_service_category_temp", "IP_level_II", "incurred_date", "IP_DSC_Hierarchy").orderBy("incurred_date", "IP_DSC_Hierarchy").distinct()


### Keep on the 1st occuring value -- deduped
window_spec = Window.partitionBy("hash_key_temp_step2").orderBy("incurred_date", "IP_DSC_Hierarchy")
IP_svc_hash = IP_svc_hash.withColumn("row_num", F.row_number().over(window_spec))
IP_svc_hash = IP_svc_hash.filter(col("row_num") == 1).distinct()


dedup_df = dedup_df.join (
    IP_svc_hash.select("hash_key_temp_step2", "detailed_service_category_temp", "IP_level_II").withColumnRenamed("detailed_service_category_temp", "detailed_service_category_override").withColumnRenamed("IP_level_II", "IP_level_II_override"), 
    on="hash_key_temp_step2", 
    how="left"
)

### create new key to aggregate over - with revised svc categories after condensing overlapping dates by member
dedup_df = dedup_df.withColumn(
    "svc_overide_hash",
    md5(concat_ws("||", "member_id", "detailed_service_category_override", "min_incurred_date_over_hash_step2")))


############### _________ handling adjacent IP _________________#####

dedup_df = dedup_df.withColumn(
    "hash_key_temp_step3",
    md5(concat_ws("||", "member_id", "min_incurred_date_over_hash_step2"))
)

IP_svc_hash = dedup_df.select("member_id", "running_max_service_to", "min_incurred_date_over_hash_step2","hash_key_temp_step3", "detailed_service_category_override", "IP_level_II_override", "svc_overide_hash" ).orderBy("incurred_date").distinct()


# Sort by Start_date
window_spec = Window.partitionBy("member_id").orderBy("min_incurred_date_over_hash_step2")

# Get previous end_date
IP_svc_hash = IP_svc_hash.withColumn("prev_end", F.lag("running_max_service_to").over(window_spec))

# Flag new episode when not overlapping or adjacent (i.e., Start_date > prev_end + 1)
IP_svc_hash = IP_svc_hash.withColumn(
    "new_episode_flag",
    F.when(
        (F.col("prev_end").isNull()) |
        (F.datediff("min_incurred_date_over_hash_step2", "prev_end") >= 1),
        1
    ).otherwise(0)
)

# Assign episode ID using cumulative sum
IP_svc_hash = IP_svc_hash.withColumn("episode_id", F.sum("new_episode_flag").over(window_spec))

IP_svc_hash = IP_svc_hash.withColumn("episode_id_hash",  md5(concat_ws("||", "member_id", "episode_id")))



####__________________
window_spec = Window.partitionBy("episode_id_hash").orderBy("min_incurred_date_over_hash_step2")

IP_svc_hash = IP_svc_hash.withColumn("row_num_adjacent_IP", F.row_number().over(window_spec))

IP_svc_hash_dedup = IP_svc_hash.filter(col("row_num_adjacent_IP") == 1).distinct()

IP_svc_hash_dedup = IP_svc_hash_dedup.select("member_id", "detailed_service_category_override", "IP_level_II_override","episode_id_hash", "svc_overide_hash")


IP_svc_hash = IP_svc_hash.join (
    IP_svc_hash_dedup.select( "episode_id_hash", "detailed_service_category_override", "IP_level_II_override").withColumnRenamed("detailed_service_category_override", "detailed_service_category_override_final").withColumnRenamed("IP_level_II_override", "IP_level_II_override_final"), 
    on="episode_id_hash", 
    how="left"
)


### Calc final admit / discharge
window_spec = Window.partitionBy("episode_id_hash")

IP_svc_hash = IP_svc_hash.withColumn("final_IP_admit", F.min("min_incurred_date_over_hash_step2").over(window_spec))

IP_svc_hash = IP_svc_hash.withColumn("final_IP_discharge", F.max("running_max_service_to").over(window_spec))

#### join back to full claims
#### LEFT OFF CHECK HERE --- svc hash formbr vs. claims


dedup_df = dedup_df.join(
    IP_svc_hash.select("svc_overide_hash", "detailed_service_category_override_final", "IP_level_II_override_final", "final_IP_admit", "final_IP_discharge"),
    on="svc_overide_hash",
    how="left"
)

### drop extra columns

dedup_df = dedup_df.drop("hash_key_temp_step3", "IP_level_II_override", "detailed_service_category_override", "min_IP_Hierarch_over_hash_step2", "min_incurred_date_over_hash_step2", "running_max_service_to", "hash_key_temp_step2","svc_overide_hash", "detailed_service_category_temp", "hash_key_temp", "detailed_service_category_temp	hash_key_temp	IP_DSC_Hierarchy")


## clean up steps
### there is a very small amount of rows that sometime get categorized as NULL -- not sure why; but it was ~ < 0.1% of IP acute medical spend after runnning a number of times. 
dedup_df = (
    dedup_df
    .withColumn(
        "detailed_service_category_override_final",
        coalesce(col("detailed_service_category_override_final"), lit("Medical"))
    )
    .withColumn(
        "IP_level_II_override_final",
        coalesce(col("IP_level_II_override_final"), lit("Ungrouped"))
    )
    .withColumn(
        "final_IP_admit",
        coalesce(col("final_IP_admit"), col("incurred_date"))
    )
    .withColumn(
        "final_IP_discharge",
        coalesce(col("final_IP_discharge"), col("service_to_date"))
    )
    .withColumn(
        "IP_detailed_svccat_errorflag",
        when(col("detailed_service_category_override_final").isNull(), lit(True)).otherwise(lit(False))
    )
)



target_path = "wasbs://ms-etl-preprod-servicecat@rgdevglobalreahl.blob.core.windows.net/medicaid/medical/ms_medicaid_medclaims_IPAcute.parquet" 
dedup_df.write.parquet(target_path, mode="overwrite")


In [0]:
#########__________________Relabeling IP Adjacent Admissions (SNF)_____________________#####################


####### Set Claims Data, Filter to IP Acute
claims_source = "wasbs://ms-etl-preprod-servicecat@rgdevglobalreahl.blob.core.windows.net/medicaid/medical/ms_medicaid_medical.parquet" 
df = spark.read.parquet(claims_source)

df = df.filter((col("major_service_category") == "Inpatient Facility") & 
               (col("IP_level_III").isin(["SNF"])))


### Inucrred date needs to be changed to admit date for SNF/Hospice
# List of columns to group by

#hash key temp is concat of mbr ID, detailed svc cat tmp, inc date


dedup_df = df
'''.select("member_id", "service_to_date", "incurred_date", "IP_level_II", "detailed_service_category_temp","hash_key_temp","major_service_category", "admit_date" ,"IP_DSC_Hierarchy").distinct().orderBy("member_id", "incurred_date")
'''

# Define window by member_id ordered by admit_date
window_spec = Window.partitionBy("member_id").orderBy("incurred_date", "IP_DSC_Hierarchy", "service_to_date")

# Add lagged discharge_date
'''dedup_df = df.withColumn("prev_service_to_date", F.lag("service_to_date").over(window_spec)).withColumn("prev_IP_level_III", F.lag("detailed_service_category_temp").over(window_spec)).withColumn("prev_IP_level_II", F.lag("IP_level_II").over(window_spec)).withColumn("prev_hash_key", F.lag("hash_key_temp").over(window_spec))
'''


# Sort by incurred_date
window_group = Window.partitionBy("member_id").orderBy("incurred_date","IP_DSC_Hierarchy" )

# Track running max of service_to_date
dedup_df = dedup_df.withColumn("running_max_service_to", F.max("service_to_date").over(window_group))

############### _________ handling overlapping IP _________________#####
dedup_df = dedup_df.withColumn(
    "hash_key_temp_step2",
    md5(concat_ws("||", "member_id", "running_max_service_to"))
)


window_spec = Window.partitionBy("member_id", "hash_key_temp_step2")

dedup_df = dedup_df.withColumn("min_incurred_date_over_hash_step2", F.min("incurred_date").over(window_spec))

dedup_df = dedup_df.withColumn("min_IP_Hierarch_over_hash_step2", F.min("IP_DSC_Hierarchy").over(window_spec))



#####___________________________

IP_svc_hash = dedup_df.select("hash_key_temp_step2", "member_id", "detailed_service_category_temp", "IP_level_II", "incurred_date", "IP_DSC_Hierarchy").orderBy("incurred_date", "IP_DSC_Hierarchy").distinct()

window_spec = Window.partitionBy("hash_key_temp_step2").orderBy("incurred_date", "IP_DSC_Hierarchy")

IP_svc_hash = IP_svc_hash.withColumn("row_num", F.row_number().over(window_spec))

IP_svc_hash = IP_svc_hash.filter(col("row_num") == 1).distinct()


dedup_df = dedup_df.join (
    IP_svc_hash.select("hash_key_temp_step2", "detailed_service_category_temp", "IP_level_II").withColumnRenamed("detailed_service_category_temp", "detailed_service_category_override").withColumnRenamed("IP_level_II", "IP_level_II_override"), 
    on="hash_key_temp_step2", 
    how="left"
)

dedup_df = dedup_df.withColumn(
    "svc_overide_hash",
    md5(concat_ws("||", "member_id", "detailed_service_category_override", "min_incurred_date_over_hash_step2")))


############### _________ handling adjacent IP _________________#####

dedup_df = dedup_df.withColumn(
    "hash_key_temp_step3",
    md5(concat_ws("||", "member_id", "min_incurred_date_over_hash_step2"))
)

IP_svc_hash = dedup_df.select("member_id", "running_max_service_to", "min_incurred_date_over_hash_step2","hash_key_temp_step3", "detailed_service_category_override", "IP_level_II_override", "svc_overide_hash" ).orderBy("incurred_date").distinct()


# Sort by Start_date
window_spec = Window.partitionBy("member_id").orderBy("min_incurred_date_over_hash_step2")

# Get previous end_date
IP_svc_hash = IP_svc_hash.withColumn("prev_end", F.lag("running_max_service_to").over(window_spec))

# Flag new episode when not overlapping or adjacent (i.e., Start_date > prev_end + 1)
IP_svc_hash = IP_svc_hash.withColumn(
    "new_episode_flag",
    F.when(
        (F.col("prev_end").isNull()) |
        (F.datediff("min_incurred_date_over_hash_step2", "prev_end") >= 1),
        1
    ).otherwise(0)
)

# Assign episode ID using cumulative sum
IP_svc_hash = IP_svc_hash.withColumn("episode_id", F.sum("new_episode_flag").over(window_spec))

IP_svc_hash = IP_svc_hash.withColumn("episode_id_hash",  md5(concat_ws("||", "member_id", "episode_id")))


####__________________
window_spec = Window.partitionBy("episode_id_hash").orderBy("min_incurred_date_over_hash_step2")

IP_svc_hash = IP_svc_hash.withColumn("row_num_adjacent_IP", F.row_number().over(window_spec))

IP_svc_hash_dedup = IP_svc_hash.filter(col("row_num_adjacent_IP") == 1).distinct()
IP_svc_hash_dedup = IP_svc_hash_dedup.select("member_id", "detailed_service_category_override", "IP_level_II_override","episode_id_hash", "svc_overide_hash")


IP_svc_hash = IP_svc_hash.join (
    IP_svc_hash_dedup.select( "episode_id_hash", "detailed_service_category_override", "IP_level_II_override").withColumnRenamed("detailed_service_category_override", "detailed_service_category_override_final").withColumnRenamed("IP_level_II_override", "IP_level_II_override_final"), 
    on="episode_id_hash", 
    how="left"
)

### Calc final admit / discharge
window_spec = Window.partitionBy("episode_id_hash")

IP_svc_hash = IP_svc_hash.withColumn("final_IP_admit", F.min("min_incurred_date_over_hash_step2").over(window_spec))

IP_svc_hash = IP_svc_hash.withColumn("final_IP_discharge", F.max("running_max_service_to").over(window_spec))

dedup_df = dedup_df.join(
    IP_svc_hash.select("svc_overide_hash", "detailed_service_category_override_final", "IP_level_II_override_final", "final_IP_admit", "final_IP_discharge"),
    on="svc_overide_hash",
    how="left"
)

### drop extra columns
dedup_df = dedup_df.drop("hash_key_temp_step3", "IP_level_II_override", "detailed_service_category_override", "min_IP_Hierarch_over_hash_step2", "min_incurred_date_over_hash_step2", "running_max_service_to", "hash_key_temp_step2","svc_overide_hash", "detailed_service_category_temp", "hash_key_temp", "detailed_service_category_temp	hash_key_temp	IP_DSC_Hierarchy")


## clean up steps
### there is a very small amount of rows that sometime get categorized as NULL -- not sure why; but it was ~ < 0.1% of IP acute medical spend after runnning a number of times. 
dedup_df = (
    dedup_df
    .withColumn(
        "detailed_service_category_override_final",
        coalesce(col("detailed_service_category_override_final"), lit("Medical"))
    )
    .withColumn(
        "IP_level_II_override_final",
        coalesce(col("IP_level_II_override_final"), lit("Ungrouped"))
    )
    .withColumn(
        "final_IP_admit",
        coalesce(col("final_IP_admit"), col("incurred_date"))
    )
    .withColumn(
        "final_IP_discharge",
        coalesce(col("final_IP_discharge"), col("service_to_date"))
    )
    .withColumn(
        "IP_detailed_svccat_errorflag",
        when(col("detailed_service_category_override_final").isNull(), lit(True)).otherwise(lit(False))
    )
)


target_path = "wasbs://ms-etl-preprod-servicecat@rgdevglobalreahl.blob.core.windows.net/medicaid/medical/ms_medicaid_medclaims_SNF.parquet" 
dedup_df.write.parquet(target_path, mode="overwrite")



In [0]:
#########__________________Relabeling IP Adjacent Admissions (Hospice)_____________________#####################


####### Set Claims Data, Filter to IP Acute
claims_source = "wasbs://ms-etl-preprod-servicecat@rgdevglobalreahl.blob.core.windows.net/medicaid/medical/ms_medicaid_medical.parquet" 
df = spark.read.parquet(claims_source)

df = df.filter((col("major_service_category") == "Inpatient Facility") & 
               (col("IP_level_III").isin(["Hospice"])))


### Inucrred date needs to be changed to admit date for SNF/Hospice
# List of columns to group by

#hash key temp is concat of mbr ID, detailed svc cat tmp, inc date


dedup_df = df
'''.select("member_id", "service_to_date", "incurred_date", "IP_level_II", "detailed_service_category_temp","hash_key_temp","major_service_category", "admit_date" ,"IP_DSC_Hierarchy").distinct().orderBy("member_id", "incurred_date")
'''

# Define window by member_id ordered by admit_date
window_spec = Window.partitionBy("member_id").orderBy("incurred_date", "IP_DSC_Hierarchy", "service_to_date")

# Add lagged discharge_date
'''dedup_df = df.withColumn("prev_service_to_date", F.lag("service_to_date").over(window_spec)).withColumn("prev_IP_level_III", F.lag("detailed_service_category_temp").over(window_spec)).withColumn("prev_IP_level_II", F.lag("IP_level_II").over(window_spec)).withColumn("prev_hash_key", F.lag("hash_key_temp").over(window_spec))
'''


# Sort by incurred_date
window_group = Window.partitionBy("member_id").orderBy("incurred_date","IP_DSC_Hierarchy" )

# Track running max of service_to_date
dedup_df = dedup_df.withColumn("running_max_service_to", F.max("service_to_date").over(window_group))

############### _________ handling overlapping IP _________________#####
dedup_df = dedup_df.withColumn(
    "hash_key_temp_step2",
    md5(concat_ws("||", "member_id", "running_max_service_to"))
)


window_spec = Window.partitionBy("member_id", "hash_key_temp_step2")

dedup_df = dedup_df.withColumn("min_incurred_date_over_hash_step2", F.min("incurred_date").over(window_spec))

dedup_df = dedup_df.withColumn("min_IP_Hierarch_over_hash_step2", F.min("IP_DSC_Hierarchy").over(window_spec))



#####___________________________

IP_svc_hash = dedup_df.select("hash_key_temp_step2", "member_id", "detailed_service_category_temp", "IP_level_II", "incurred_date", "IP_DSC_Hierarchy").orderBy("incurred_date", "IP_DSC_Hierarchy").distinct()

window_spec = Window.partitionBy("hash_key_temp_step2").orderBy("incurred_date", "IP_DSC_Hierarchy")

IP_svc_hash = IP_svc_hash.withColumn("row_num", F.row_number().over(window_spec))

IP_svc_hash = IP_svc_hash.filter(col("row_num") == 1).distinct()


dedup_df = dedup_df.join (
    IP_svc_hash.select("hash_key_temp_step2", "detailed_service_category_temp", "IP_level_II").withColumnRenamed("detailed_service_category_temp", "detailed_service_category_override").withColumnRenamed("IP_level_II", "IP_level_II_override"), 
    on="hash_key_temp_step2", 
    how="left"
)

dedup_df = dedup_df.withColumn(
    "svc_overide_hash",
    md5(concat_ws("||", "member_id", "detailed_service_category_override", "min_incurred_date_over_hash_step2")))


############### _________ handling adjacent IP _________________#####

dedup_df = dedup_df.withColumn(
    "hash_key_temp_step3",
    md5(concat_ws("||", "member_id", "min_incurred_date_over_hash_step2"))
)

IP_svc_hash = dedup_df.select("member_id", "running_max_service_to", "min_incurred_date_over_hash_step2","hash_key_temp_step3", "detailed_service_category_override", "IP_level_II_override", "svc_overide_hash" ).orderBy("incurred_date").distinct()


# Sort by Start_date
window_spec = Window.partitionBy("member_id").orderBy("min_incurred_date_over_hash_step2")

# Get previous end_date
IP_svc_hash = IP_svc_hash.withColumn("prev_end", F.lag("running_max_service_to").over(window_spec))

# Flag new episode when not overlapping or adjacent (i.e., Start_date > prev_end + 1)
IP_svc_hash = IP_svc_hash.withColumn(
    "new_episode_flag",
    F.when(
        (F.col("prev_end").isNull()) |
        (F.datediff("min_incurred_date_over_hash_step2", "prev_end") >= 1),
        1
    ).otherwise(0)
)

# Assign episode ID using cumulative sum
IP_svc_hash = IP_svc_hash.withColumn("episode_id", F.sum("new_episode_flag").over(window_spec))

IP_svc_hash = IP_svc_hash.withColumn("episode_id_hash",  md5(concat_ws("||", "member_id", "episode_id")))


####__________________
window_spec = Window.partitionBy("episode_id_hash").orderBy("min_incurred_date_over_hash_step2")

IP_svc_hash = IP_svc_hash.withColumn("row_num_adjacent_IP", F.row_number().over(window_spec))

IP_svc_hash_dedup = IP_svc_hash.filter(col("row_num_adjacent_IP") == 1).distinct()
IP_svc_hash_dedup = IP_svc_hash_dedup.select("member_id", "detailed_service_category_override", "IP_level_II_override","episode_id_hash", "svc_overide_hash")


IP_svc_hash = IP_svc_hash.join (
    IP_svc_hash_dedup.select( "episode_id_hash", "detailed_service_category_override", "IP_level_II_override").withColumnRenamed("detailed_service_category_override", "detailed_service_category_override_final").withColumnRenamed("IP_level_II_override", "IP_level_II_override_final"), 
    on="episode_id_hash", 
    how="left"
)

### Calc final admit / discharge
window_spec = Window.partitionBy("episode_id_hash")

IP_svc_hash = IP_svc_hash.withColumn("final_IP_admit", F.min("min_incurred_date_over_hash_step2").over(window_spec))

IP_svc_hash = IP_svc_hash.withColumn("final_IP_discharge", F.max("running_max_service_to").over(window_spec))

dedup_df = dedup_df.join(
    IP_svc_hash.select("svc_overide_hash", "detailed_service_category_override_final", "IP_level_II_override_final", "final_IP_admit", "final_IP_discharge"),
    on="svc_overide_hash",
    how="left"
)

### drop extra columns
dedup_df = dedup_df.drop("hash_key_temp_step3", "IP_level_II_override", "detailed_service_category_override", "min_IP_Hierarch_over_hash_step2", "min_incurred_date_over_hash_step2", "running_max_service_to", "hash_key_temp_step2","svc_overide_hash", "detailed_service_category_temp", "hash_key_temp", "detailed_service_category_temp	hash_key_temp	IP_DSC_Hierarchy")


## clean up steps
### there is a very small amount of rows that sometime get categorized as NULL -- not sure why; but it was ~ < 0.1% of IP acute medical spend after runnning a number of times. 
dedup_df = (
    dedup_df
    .withColumn(
        "detailed_service_category_override_final",
        coalesce(col("detailed_service_category_override_final"), lit("Medical"))
    )
    .withColumn(
        "IP_level_II_override_final",
        coalesce(col("IP_level_II_override_final"), lit("Ungrouped"))
    )
    .withColumn(
        "final_IP_admit",
        coalesce(col("final_IP_admit"), col("incurred_date"))
    )
    .withColumn(
        "final_IP_discharge",
        coalesce(col("final_IP_discharge"), col("service_to_date"))
    )
    .withColumn(
        "IP_detailed_svccat_errorflag",
        when(col("detailed_service_category_override_final").isNull(), lit(True)).otherwise(lit(False))
    )
)



target_path = "wasbs://ms-etl-preprod-servicecat@rgdevglobalreahl.blob.core.windows.net/medicaid/medical/ms_medicaid_medclaims_Hospice.parquet" 
dedup_df.write.parquet(target_path, mode="overwrite")



Union the IP Tables; Add Util count field

In [0]:
df_hospice = spark.read.parquet("wasbs://ms-etl-preprod-servicecat@rgdevglobalreahl.blob.core.windows.net/medicaid/medical/ms_medicaid_medclaims_Hospice.parquet")

df_snf = spark.read.parquet("wasbs://ms-etl-preprod-servicecat@rgdevglobalreahl.blob.core.windows.net/medicaid/medical/ms_medicaid_medclaims_SNF.parquet")

df_ipacute = spark.read.parquet("wasbs://ms-etl-preprod-servicecat@rgdevglobalreahl.blob.core.windows.net/medicaid/medical/ms_medicaid_medclaims_IPAcute.parquet")

union_df = df_hospice.unionByName(df_snf).unionByName(df_ipacute)


union_df = union_df.withColumn(
    "service_cat_id",
    md5(concat_ws("||", "member_id", "detailed_service_category_override_final", "final_IP_admit")))


union_df = union_df.withColumn("svc_cat_id_row_num", F.row_number().over(Window.partitionBy("service_cat_id").orderBy("claim_number", "seqnum")))

window_spec = Window.partitionBy("service_cat_id")

union_df = union_df.withColumn("util_count", 
                              F.when((F.col("svc_cat_id_row_num") == 1) & (F.sum("allowed").over(window_spec) > 0), 1)
                              .when((F.col("svc_cat_id_row_num") == 1) & (F.sum("allowed").over(window_spec) < 0), -1)
                              .when((F.col("svc_cat_id_row_num") == 1) & (F.sum("allowed").over(window_spec) == 0), 0)
                              .otherwise(None))

union_df = union_df.drop("Prof_level_II", "Prof_level_III", "OP_Level_III", "IP_DSC_Hierarchy", "svc_cat_id_row_num", "IP_level_II", "IP_level_III").withColumnRenamed("detailed_service_category_override_final", "detailed_service_category").withColumnRenamed("IP_level_II_override_final", "IP_admit_clinical_label").withColumnRenamed("final_IP_admit", "lockton_admit").withColumnRenamed("final_IP_discharge", "lockton_discharge")

union_df = union_df.withColumn("subservice_category", 
                              F.when((F.col("detailed_service_category") == "SNF"), "SNF")
                              .when((F.col("detailed_service_category") == "Hospice"), "Hospice")
                              .otherwise("Acute"))



########### Test

target_path = "wasbs://ms-etl-preprod-servicecat@rgdevglobalreahl.blob.core.windows.net/medicaid/medical/ms_medicaid_medclaims_IP_Split.parquet"
union_df_IP = union_df.write.partitionBy("YEAR", "MS_Source_File").parquet(target_path, mode="overwrite")


OP Hierarchy and Observation Mapping; Add Util Field


In [0]:
####### Set Claims Data, Filter to OP Facility
claims_source = "wasbs://ms-etl-preprod-servicecat@rgdevglobalreahl.blob.core.windows.net/medicaid/medical/ms_medicaid_medical.parquet" 
df = spark.read.parquet(claims_source)


############### Logic for everything but HH -- HH happens outside of a OP facility event (ex. ER admit leading surgery for example)

df = df.filter((col("major_service_category") == "Outpatient Facility") & 
               (~col("OP_level_III").isin(["Home Health"])))

df = df.withColumn("OP_Hierarchy", 
                   when(col("OP_level_III") == "Outpatient Surgery", 1)
                   .when(col("OP_level_III") == "Observation", 2)
                   .when(col("OP_level_III") == "Emergency Room", 3)
                   .otherwise(4))

window_spec = Window.partitionBy("incurred_date", "member_id")
df = df.withColumn("Min_Hierarchy", F.min("OP_Hierarchy").over(window_spec))

df = df.withColumn("Remap_OP",
                   when (col("Min_Hierarchy") == 1, "Outpatient Surgery")
                   .when (col("Min_Hierarchy") == 2, "Observation")
                   .when (col("Min_Hierarchy") == 3, "Emergency Room")
                   .otherwise(col("OP_level_III")))


df = df.withColumn(
    "incurred_date_hash",
    md5(concat_ws("||", "member_id", "incurred_date")))

df = df.withColumn(
    "service_to_date_hash",
    md5(concat_ws("||", "member_id", "service_to_date")))


df = df.withColumn(
    "incurred_date_hash_svc",
    md5(concat_ws("||", "member_id", "Remap_OP", "incurred_date")))

df = df.withColumn(
    "service_to_date_hash_svc",
    md5(concat_ws("||", "member_id", "Remap_OP", "service_to_date")))


##### multi day observation happens iwth rev vode 762 -- allow for service cat ID to be defined by grouping across days if multi day observation
multiday_obs = df.filter(col("revenue_code") == '0762').select("member_id", "revenue_code", "service_to_date_hash_svc").withColumn("multiday_obs_flag", lit(1)).distinct()

##### define svc cat ID here - based on incurred date and remapped svc category
df = df.join(
    multiday_obs.select("service_to_date_hash_svc", "multiday_obs_flag"),
    on="service_to_date_hash_svc",
    how="left"
).withColumn(
    "service_cat_id",
    when(
        col("multiday_obs_flag").isNotNull(),
        col("service_to_date_hash_svc")
    ).otherwise(col("incurred_date_hash_svc"))
)


##### svc cat ID defined, so now can calculate lockton admit and discharge date (controlling definition here) -- I.e. OP facility by def has 1 date of service, except multiday observation; in final step allowing for HH to be across dates too by letting it's service to date flow through 

### create lockton admit and discharge
window_spec = Window.partitionBy("service_cat_id")
df = df.withColumn("lockton_admit", F.min("incurred_date").over(window_spec))
df = df.withColumn("lockton_discharge", F.max("service_to_date").over(window_spec))
df = df.withColumnRenamed("Remap_OP", "detailed_service_category_final")
df_OP = df


################## _______________ OP HH _______________ #####
df_hh = spark.read.parquet(claims_source)

df_hh = df_hh.filter((col("major_service_category") == "Outpatient Facility") & 
               (col("OP_level_III").isin(["Home Health"])))

df_hh = df_hh.withColumn("lockton_admit", coalesce(col("admit_date"), col("incurred_date")))
df_hh = df_hh.withColumn("lockton_discharge", coalesce(col("discharge_date"), col("incurred_date")))
df_hh = df_hh.withColumnRenamed("OP_level_III", "detailed_service_category_final")


df_hh = df_hh.withColumn(
    "service_cat_id",
    md5(concat_ws("||", "member_id", "detailed_service_category_final", "lockton_admit")))

union_df_OP = df_OP.unionByName(df_hh, allowMissingColumns=True)


union_df_OP = union_df_OP.withColumn("svc_cat_id_row_num", F.row_number().over(Window.partitionBy("service_cat_id").orderBy("claim_number", "seqnum")))

window_spec = Window.partitionBy("service_cat_id")

union_df_OP = union_df_OP.withColumn("util_count", 
                              F.when((F.col("svc_cat_id_row_num") == 1) & (F.sum("allowed").over(window_spec) > 0), 1)
                              .when((F.col("svc_cat_id_row_num") == 1) & (F.sum("allowed").over(window_spec) < 0), -1)
                              .when((F.col("svc_cat_id_row_num") == 1) & (F.sum("allowed").over(window_spec) == 0), 0)
                              .otherwise(None))



union_df_OP = union_df_OP.drop("service_to_date_hash_svc", "IP_level_II", "IP_level_III", "Prof_level_II", "Prof_level_III", "OP_Level_III", "detailed_service_category_temp", "hash_key_temp", "IP_DSC_Hierarchy", "OP_Hierarchy", "Min_Hierarchy", "incurred_date_hash", "service_to_date_hash", "incurred_date_hash_svc", "svc_cat_id_row_num").withColumnRenamed("detailed_service_category_final", "detailed_service_category")



union_df_OP = union_df_OP.withColumn("subservice_category", 
                              F.when((col("detailed_service_category").isin(["Emergency Room", "Outpatient Surgery", "Observation"])), "OP Visit")
                              .otherwise("OP Service"))



##### Export
target_path = "wasbs://ms-etl-preprod-servicecat@rgdevglobalreahl.blob.core.windows.net/medicaid/medical/ms_medicaid_medclaims_OP_Split.parquet"

union_df_OP.write.partitionBy("YEAR", "MS_Source_File").parquet(target_path, mode="overwrite")



Professional Charges


In [0]:
####### Set Claims Data, Filter to OP Facility
claims_source = "wasbs://ms-etl-preprod-servicecat@rgdevglobalreahl.blob.core.windows.net/medicaid/medical/ms_medicaid_medical.parquet" 
df = spark.read.parquet(claims_source)


############### Logic for everything but HH -- HH happens outside of a OP facility event (ex. ER admit leading surgery for example)

df_prof = df.filter((col("major_service_category") == "Professional"))
                    

### add svc cat id after renaming DSC, add util count
df_prof = df_prof.withColumnRenamed("hash_key_temp", "service_cat_id").withColumnRenamed("detailed_service_category_temp", "detailed_service_category").withColumnRenamed("Prof_level_II", "subservice_category")

df_prof = df_prof.withColumn("svc_cat_id_row_num", F.row_number().over(Window.partitionBy("service_cat_id").orderBy("claim_number", "seqnum")))

window_spec = Window.partitionBy("service_cat_id")

df_prof = df_prof.withColumn("util_count", 
                              F.when((F.col("svc_cat_id_row_num") == 1) & (F.sum("allowed").over(window_spec) > 0), 1)
                              .when((F.col("svc_cat_id_row_num") == 1) & (F.sum("allowed").over(window_spec) < 0), -1)
                              .when((F.col("svc_cat_id_row_num") == 1) & (F.sum("allowed").over(window_spec) == 0), 0)
                              .otherwise(None))


df_prof = df_prof.drop("IP_level_II", "IP_level_III", "Prof_level_III", "OP_Level_III", "IP_DSC_Hierarchy", "svc_cat_id_row_num")
                    
##### Export
target_path = "wasbs://ms-etl-preprod-servicecat@rgdevglobalreahl.blob.core.windows.net/medicaid/medical/ms_medicaid_medclaims_Prof_Split.parquet"

df_prof.write.partitionBy("YEAR", "MS_Source_File").parquet(target_path, mode="overwrite")


Final Combination of the Tables and Sort

In [0]:

IP_path = "wasbs://ms-etl-preprod-servicecat@rgdevglobalreahl.blob.core.windows.net/medicaid/medical/ms_medicaid_medclaims_IP_Split.parquet"
OP_path = "wasbs://ms-etl-preprod-servicecat@rgdevglobalreahl.blob.core.windows.net/medicaid/medical/ms_medicaid_medclaims_OP_Split.parquet"
Prof_path = "wasbs://ms-etl-preprod-servicecat@rgdevglobalreahl.blob.core.windows.net/medicaid/medical/ms_medicaid_medclaims_Prof_Split.parquet"


IP = spark.read.parquet(IP_path)
OP = spark.read.parquet(OP_path)
Prof = spark.read.parquet(Prof_path)

Combined_table = IP.unionByName(OP, allowMissingColumns=True).unionByName(Prof, allowMissingColumns=True)


# Define the desired order of columns

IP_path = "wasbs://ms-etl-preprod-servicecat@rgdevglobalreahl.blob.core.windows.net/medicaid/medical/ms_medicaid_medclaims_IP_Split.parquet"
OP_path = "wasbs://ms-etl-preprod-servicecat@rgdevglobalreahl.blob.core.windows.net/medicaid/medical/ms_medicaid_medclaims_OP_Split.parquet"
Prof_path = "wasbs://ms-etl-preprod-servicecat@rgdevglobalreahl.blob.core.windows.net/medicaid/medical/ms_medicaid_medclaims_Prof_Split.parquet"


IP = spark.read.parquet(IP_path)
OP = spark.read.parquet(OP_path)
Prof = spark.read.parquet(Prof_path)

Combined_table = IP.unionByName(OP, allowMissingColumns=True).unionByName(Prof, allowMissingColumns=True)

'''
# Define the desired order of columns
column_order = ["procedure_code",
"DRG",
"fachdid",
"member_id",
"claim_number",
"seqnum",
"marketscan_version_number",
"family_id",
"principal_diagnosis",
"icd_diagnosis_code1",
"icd_diagnosis_code2",
"icd_diagnosis_code3",
"icd_diagnosis_code4",
"icd_diagnosis_code5",
"icd_diagnosis_code6",
"icd_diagnosis_code7",
"icd_diagnosis_code8",
"icd_diagnosis_code9",
"principal_procedure",
"procedure_code2",
"procedure_code3",
"procedure_code4",
"procedure_code5",
"procedure_code6",
"cpt_modifier",
"bill_type",
"admit_type",
"discharge_status",
"revenue_code",
"cob",
"coins",
"copay",
"deduct",
"paid",
"allowed",
"quantity",
"units",
"paid_inn",
"provider_id",
"npi",
"service_to_date",
"paid_date",
"incurred_date",
"discharge_date",
"admit_date",
"dob_year",
"member_age",
"cap_svc",
"proctyp",
"dxver",
"facprof",
"mhsacovg",
"ntwkprov",
"plan_type",
"proc_group",
"service_category",
"MDC",
"employee_region",
"employee_msa",
"pos",
"provider_type",
"data_type",
"age_group",
"employee_class",
"employee_status",
"employee_geo",
"eidflag",
"employee_relation",
"enrflag",
"physician_specialty",
"rx",
"member_gender",
"healthplan_vs_employer",
"industry",
"medicare_advantage_flag",
"msn_version",
"caseid",
"major_service_category",
"subservice_category",
"detailed_service_category",
"service_cat_id",
"util_count",
"IP_admit_clinical_label",
"lockton_admit",
"lockton_discharge",
"IP_detailed_svccat_errorflag",
"multiday_obs_flag",
"YEAR",
"MS_Source_File"]



# Reorder the columns in Combined_table
Combined_table = Combined_table.select([col(c) for c in column_order])
'''


target_path = "wasbs://ms-etl-preprod-servicecat@rgdevglobalreahl.blob.core.windows.net/medicaid/medical_final/ms_medicaid_medclaims_combined.parquet"

Combined_table.write.partitionBy("YEAR", "MS_Source_File").parquet(target_path, mode="overwrite")


Rx Mapping


In [0]:
Rx_path = "wasbs://marketscan@rgdevglobalreahl.blob.core.windows.net/marketscan/medicaid/marketscan-Rx-preprod-step2"
Rx = spark.read.parquet(Rx_path)

Rx = Rx.withColumn("major_service_category", lit("Rx"))

#### mail order ind not available on the medicaid file -- so use days supply as a proxy
Rx = Rx.withColumn(
    "subservice_category",
    F.when(F.abs(col("allowed")) > 2500, "Specialty")
    .when(F.abs(col("days_supply")) >= 84, "Mail Order")
    .otherwise("Retail"))


Rx = Rx.withColumn(
    "detailed_service_category",
    F.when(F.abs(col("allowed")) > 2500, "Specialty")
    .when(col("generic_equivalent").isin(["1"]), "Single-Source Brand")
    .when(col("generic_equivalent").isin(["2", "3"]), "Multi-Source Brand")
    .when(col("generic_equivalent").isin(["4", "5"]), "Generic")
    .otherwise("OTC/Other"))


target_path = "wasbs://ms-etl-preprod-servicecat@rgdevglobalreahl.blob.core.windows.net/medicaid/Rx/ms_medicaid_Rxclaims_combined.parquet"
Rx.write.partitionBy("year" ).parquet(target_path, mode="overwrite")

In [0]:
%sql								
/*
DROP VIEW IF EXISTS LDS_view;								
CREATE TEMPORARY VIEW LDS_view 								
USING PARQUET								
OPTIONS (								
  								
  path "wasbs://ms-etl-preprod-servicecat@rgdevglobalreahl.blob.core.windows.net/medicaid/medical/ms_medciare_medclaims_combined.parquet"
  								
  );								

-- "wasbs://marketscan@rgdevglobalreahl.blob.core.windows.net/marketscan/medicaid/marketscan-preprod-step2"
-- "wasbs://ms-etl-preprod-servicecat@rgdevglobalreahl.blob.core.windows.net/medicaid/medical/ms_medicaid_medclaims_combined.parquet"

select member_id,YEAR,
sum(allowed) as allowed
from LDS_view
group by 1,2
order by 3 asc

*/


[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-3451601888346498>, line 121[0m
[1;32m     28[0m column_order [38;5;241m=[39m [[38;5;124m"[39m[38;5;124mprocedure_code[39m[38;5;124m"[39m,
[1;32m     29[0m [38;5;124m"[39m[38;5;124mDRG[39m[38;5;124m"[39m,
[1;32m     30[0m [38;5;124m"[39m[38;5;124mfachdid[39m[38;5;124m"[39m,
[0;32m   (...)[0m
[1;32m    115[0m [38;5;124m"[39m[38;5;124mYEAR[39m[38;5;124m"[39m,
[1;32m    116[0m [38;5;124m"[39m[38;5;124mMS_Source_File[39m[38;5;124m"[39m]
[1;32m    120[0m [38;5;66;03m# Reorder the columns in Combined_table[39;00m
[0;32m--> 121[0m Combined_table [38;5;241m=[39m Combined_table[38;5;241m.[39mselect([col(c) [38;5;28;01mfor[39;00m c [38;5;129;01min[39;00m column_order])
[1;32m    125[0m target_path [38;5;241m=[39m [38;5;124m"[39m[38;5;124mwa