In [0]:
## Pyspark script to load a trained ML model to assist
##  in the creation of the provider education 90 day flag.
## Last Updated: 04/13/2022

import time
import sys
from datetime import *
import numpy as np
import argparse
from pyspark import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark import SparkContext, SparkConf
from pyspark import HiveContext, SQLContext
from pyspark.sql import *
#from pyspark.ml.pipeline import PipelineModel
from pyspark.ml import PipelineModel
from pyspark.ml.feature import VectorAssembler
from scipy.stats import ttest_ind_from_stats

In [0]:
# Build (or reuse) this notebook's Spark session and cut driver log output down to WARN.
spark = (
    SparkSession.builder
    .appName('pe90_model')
    .getOrCreate()
)
spark.sparkContext.setLogLevel('WARN')

In [0]:
# Snowflake credentials are pulled from the Databricks "snowflake" secret scope.
user = dbutils.secrets.get("snowflake", "snowflake-user")
password = dbutils.secrets.get("snowflake", "snowflake-pwd")

# Connector options shared by every Snowflake read/write (`.options(**sf_connection)`).
sf_connection = {
    "sfUrl": "cms_fps.us-east-1-gov.privatelink.snowflakecomputing.com:443",
    "sfUser": user,
    "sfPassword": password,
    "sfDatabase": "FPS_MTE",
    "sfSchema": "FPS_MLASR",
    "sfWarehouse": "WH_LRG",
}
SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

In [0]:
pe90_ucm_educated_cases = '''
SELECT * 
FROM "FPS_MTE"."FPS_MLASR"."PE90_UCM_EDUCATED_CASES"
'''

tpe_educated_cases = '''
SELECT *
FROM "FPS_MTE"."FPS_DW"."TPE_EDUCATED_CASES"
'''

pe90_ucm_most_current_education = '''
SELECT * 
FROM "FPS_MTE"."FPS_MLASR"."PE90_UCM_MOST_CURRENT_EDUCATION"
'''

pe90_tpe_most_current_education = '''
SELECT *
FROM "FPS_MTE"."FPS_MLASR"."PE90_TPE_MOST_CURRENT_EDUCATION"
'''

pe90_most_current_education = '''
SELECT * 
FROM "FPS_MTE"."FPS_MLASR"."PE90_MOST_CURRENT_EDUCATION"
'''

pe90_alleducated_with_maxalertdate = '''
SELECT *
FROM "FPS_MTE"."FPS_MLASR"."PE90_ALLEDUCATED_WITH_MAXALERTDATE"
'''

pe90_educatedlist = '''
SELECT *
FROM "FPS_MTE"."FPS_MLASR"."PE90_EDUCATEDLIST"
'''

pe90_testing_data_before = '''
SELECT *
FROM "FPS_MTE"."FPS_MLASR"."PE90_TESTING_DATA_BEFORE"
'''
pe90_testing_data_after = '''
SELECT *
FROM "FPS_MTE"."FPS_MLASR"."PE90_TESTING_DATA_AFTER"
'''

pe90_testing_data_before_parta = '''
SELECT *
FROM "FPS_MTE"."FPS_MLASR"."PE90_TESTING_DATA_BEFORE_PARTA"
'''

pe90_testing_data_after_parta = '''
SELECT *
FROM "FPS_MTE"."FPS_MLASR"."PE90_TESTING_DATA_AFTER_PARTA";
'''
pe90_asrlist = ''' 
SELECT * FROM "FPS_MTE"."FPS_MLASR"."PE90_ASRLIST";
'''

fps_asrpt = '''
SELECT * FROM "FPS_MTE"."FPS_DW"."FPS_ASRPT";
'''

fps_asrpt_alert_asctn = '''
SELECT * FROM "FPS_MTE"."FPS_DW"."FPS_ASRPT_ALERT_ASCTN";
'''
fps_alert = '''
SELECT * FROM "FPS_MTE"."FPS_DW"."FPS_ALERT";
'''

fps_model = '''
SELECT * FROM "FPS_MTE"."FPS_DW"."FPS_MODEL";
'''

In [0]:
pe90_ucm_edu = (sqlContext.read
                  .format(SNOWFLAKE_SOURCE_NAME)
                  .options(**sf_connection)
                  .option("query",pe90_ucm_educated_cases)
                  .load())

tpe_educated_cases = (sqlContext.read
                  .format(SNOWFLAKE_SOURCE_NAME)
                  .options(**sf_connection)
                  .option("query",tpe_educated_cases)
                  .load())

pe90_ucm_most_curr_edu = (sqlContext.read
                  .format(SNOWFLAKE_SOURCE_NAME)
                  .options(**sf_connection)
                  .option("query",pe90_ucm_most_current_education)
                  .load())
pe90_tpe_most_curr_edu = (sqlContext.read
                  .format(SNOWFLAKE_SOURCE_NAME)
                  .options(**sf_connection)
                  .option("query",pe90_tpe_most_current_education)
                  .load())
pe90_most_curr_edu = (sqlContext.read
                  .format(SNOWFLAKE_SOURCE_NAME)
                  .options(**sf_connection)
                  .option("query",pe90_most_current_education)
                  .load())
pe90_alledu_maxale = (sqlContext.read
                  .format(SNOWFLAKE_SOURCE_NAME)
                  .options(**sf_connection)
                  .option("query",pe90_alleducated_with_maxalertdate)
                  .load())
pe90_educatedlist = (sqlContext.read
                  .format(SNOWFLAKE_SOURCE_NAME)
                  .options(**sf_connection)
                  .option("query",pe90_educatedlist)
                  .load())
pe90_testing_data_bef = (sqlContext.read
                  .format(SNOWFLAKE_SOURCE_NAME)
                  .options(**sf_connection)
                  .option("query",pe90_testing_data_before)
                  .load())
pe90_testing_data_aft = (sqlContext.read
                  .format(SNOWFLAKE_SOURCE_NAME)
                  .options(**sf_connection)
                  .option("query",pe90_testing_data_after)
                  .load())
pe90_testing_bef_parta = (sqlContext.read
                  .format(SNOWFLAKE_SOURCE_NAME)
                  .options(**sf_connection)
                  .option("query",pe90_testing_data_before_parta)
                  .load())

pe90_testing_aft_parta = (sqlContext.read
                  .format(SNOWFLAKE_SOURCE_NAME)
                  .options(**sf_connection)
                  .option("query",pe90_testing_data_after_parta)
                  .load())

pe90_asrlist = (sqlContext.read
                  .format(SNOWFLAKE_SOURCE_NAME)
                  .options(**sf_connection)
                  .option("query",pe90_asrlist)
                  .load())
  
                
fps_asrpt =  (sqlContext.read
                  .format(SNOWFLAKE_SOURCE_NAME)
                  .options(**sf_connection)
                  .option("query",fps_asrpt)
                  .load())

fps_asrpt_alert_asctn = (sqlContext.read
                  .format(SNOWFLAKE_SOURCE_NAME)
                  .options(**sf_connection)
                  .option("query",fps_asrpt_alert_asctn)
                  .load())

fps_alert = (sqlContext.read
                  .format(SNOWFLAKE_SOURCE_NAME)
                  .options(**sf_connection)
                  .option("query",fps_alert)
                  .load())

fps_model = (sqlContext.read
                  .format(SNOWFLAKE_SOURCE_NAME)
                  .options(**sf_connection)
                  .option("query",fps_model)
                  .load())

In [0]:


# Feature columns consumed by the Part B and DME pipelines (identical layout for both).
PTB_DME_MODEL_COLUMNS = [
    'clm_blg_prvdr_npi_num', 'clm_line_from_dt', 'STATE_CD', 'clm_line_hcpcs_cd',
    'hcpcs_1_mdfr_cd', 'hcpcs_2_mdfr_cd', 'clm_prncpl_dgns_cd', 'clm_dgns_1_cd',
    'clm_dgns_2_cd', 'clm_dgns_3_cd', 'clm_dgns_4_cd', 'clm_dgns_5_cd',
    'clm_prcdr_cd', 'clm_sbmt_chrg_amt',
]

# Feature columns consumed by the Part A pipeline.
PTA_MODEL_COLUMNS = [
    'clm_blg_prvdr_npi_num', 'current_education_date', 'STATE_CD', 'clm_line_from_dt',
    'clm_sbmt_chrg_amt', 'clm_line_hcpcs_cd', 'hcpcs_1_mdfr_cd', 'hcpcs_2_mdfr_cd',
    'clm_prncpl_dgns_cd', 'clm_dgns_1_cd', 'clm_dgns_2_cd', 'clm_dgns_3_cd',
    'clm_dgns_4_cd', 'clm_dgns_5_cd', 'clm_prcdr_cd', 'clm_prcdr_1_cd',
    'clm_prcdr_2_cd', 'clm_prcdr_3_cd', 'clm_prcdr_4_cd', 'clm_prcdr_5_cd',
]


def _model_input(claims, columns, src_type=None):
  """Shape raw claim rows into pipeline input.

  Replaces empty-string values with 'EMPTY' (DataFrame.na.replace), keeps only
  rows whose clm_src_type equals `src_type` when one is given, and then selects
  `columns` in the given order.
  """
  prepared = claims.na.replace('', 'EMPTY')
  if src_type is not None:
    prepared = prepared.filter(col('clm_src_type') == src_type)
  return prepared.select(*columns)


# Claims from before the provider's education.
before_educ_ptb = _model_input(pe90_testing_data_bef, PTB_DME_MODEL_COLUMNS, 'B')
before_educ_dme = _model_input(pe90_testing_data_bef, PTB_DME_MODEL_COLUMNS, 'DME')
before_educ_pta = _model_input(pe90_testing_bef_parta, PTA_MODEL_COLUMNS)

# Claims from after the provider's education.
after_educ_ptb = _model_input(pe90_testing_data_aft, PTB_DME_MODEL_COLUMNS, 'B')
after_educ_dme = _model_input(pe90_testing_data_aft, PTB_DME_MODEL_COLUMNS, 'DME')
after_educ_pta = _model_input(pe90_testing_aft_parta, PTA_MODEL_COLUMNS)
  
  

In [0]:
def _with_upper_column_names(frame):
  """Return `frame` with every column renamed to its upper-case form (str.upper)."""
  return frame.toDF(*(name.upper() for name in frame.columns))


before_educ_ptb = _with_upper_column_names(before_educ_ptb)
before_educ_dme = _with_upper_column_names(before_educ_dme)
before_educ_pta = _with_upper_column_names(before_educ_pta)
after_educ_ptb = _with_upper_column_names(after_educ_ptb)
after_educ_dme = _with_upper_column_names(after_educ_dme)
after_educ_pta = _with_upper_column_names(after_educ_pta)

In [0]:
# Trained Spark ML pipelines stored on DBFS, one per claim source.
_TRAINED_PIPELINE_PATHS = {
    'ptb': 'dbfs:/ml/pe90_model/partb/trainedpipeline',
    'dme': 'dbfs:/ml/pe90_model/partdme/trainedpipeline',
    'pta': 'dbfs:/ml/pe90_model/parta/trainedpipeline',
}
regression_model_ptb = PipelineModel.load(_TRAINED_PIPELINE_PATHS['ptb'])
regression_model_dme = PipelineModel.load(_TRAINED_PIPELINE_PATHS['dme'])
regression_model_pta = PipelineModel.load(_TRAINED_PIPELINE_PATHS['pta'])

In [0]:
def _score_and_register(pipeline, features, view_name):
  """Score `features` with a fitted PipelineModel and publish the output as temp view `view_name`.

  The scored DataFrame (which carries the `prediction` column read by later
  cells, i.e. the estimated "pristine" submitted charge) is returned as well.
  """
  scored = pipeline.transform(features)
  scored.createOrReplaceTempView(view_name)
  return scored


# Before education: Part B, DME, Part A.
before_educ_estimate_ptb = _score_and_register(regression_model_ptb, before_educ_ptb, "before_educ_estimate_ptb")
before_educ_estimate_dme = _score_and_register(regression_model_dme, before_educ_dme, "before_educ_estimate_dme")
before_educ_estimate_pta = _score_and_register(regression_model_pta, before_educ_pta, "before_educ_estimate_pta")

# After education: Part B, DME, Part A.
after_educ_estimate_ptb = _score_and_register(regression_model_ptb, after_educ_ptb, "after_educ_estimate_ptb")
after_educ_estimate_dme = _score_and_register(regression_model_dme, after_educ_dme, "after_educ_estimate_dme")
after_educ_estimate_pta = _score_and_register(regression_model_pta, after_educ_pta, "after_educ_estimate_pta")

In [0]:
def _billing_residuals(scored_view):
  """Per-claim residuals for one scored temp view.

  Columns: clm_blg_prvdr_npi_num, clm_sbmt_chrg_amt, prediction and
  incorrect_billing_amt = clm_sbmt_chrg_amt - prediction (actual minus estimate).
  """
  scored = spark.table(scored_view)
  return scored.select(
      'clm_blg_prvdr_npi_num',
      'clm_sbmt_chrg_amt',
      'prediction',
      (col('clm_sbmt_chrg_amt') - col('prediction')).alias('incorrect_billing_amt'),
  )


# Residuals per claim source, before and after education.
before_educ_diff_ptb = _billing_residuals("before_educ_estimate_ptb")
after_educ_diff_ptb = _billing_residuals("after_educ_estimate_ptb")
before_educ_diff_dme = _billing_residuals("before_educ_estimate_dme")
after_educ_diff_dme = _billing_residuals("after_educ_estimate_dme")
before_educ_diff_pta = _billing_residuals("before_educ_estimate_pta")
after_educ_diff_pta = _billing_residuals("after_educ_estimate_pta")

# Stack Part B, DME and Part A (positional union: all share the same column order).
before_educ_diff_dmeptb = before_educ_diff_ptb.union(before_educ_diff_dme)
before_educ_diff = before_educ_diff_dmeptb.union(before_educ_diff_pta)
after_educ_diff_dmeptb = after_educ_diff_ptb.union(after_educ_diff_dme)
after_educ_diff = after_educ_diff_dmeptb.union(after_educ_diff_pta)

# Per provider: mean and sample standard deviation of the residual, plus claim count.
before_agg = (before_educ_diff
              .groupby("clm_blg_prvdr_npi_num")
              .agg(avg("incorrect_billing_amt").alias("avg_incorrect_before"),
                   stddev("incorrect_billing_amt").alias("stdev_incorrect_before"),
                   count("clm_sbmt_chrg_amt").alias("sample_size_before")))

after_agg = (after_educ_diff
             .groupby("clm_blg_prvdr_npi_num")
             .agg(avg("incorrect_billing_amt").alias("avg_incorrect_after"),
                  stddev("incorrect_billing_amt").alias("stdev_incorrect_after"),
                  count("clm_sbmt_chrg_amt").alias("sample_size_after")))

# Testing data: only providers that have claims both before and after education.
testing_data = before_agg.join(after_agg, on=['clm_blg_prvdr_npi_num'], how='inner')


In [0]:
testing_data.createOrReplaceTempView("testing_data")

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW filter_data AS (
SELECT     CAST(clm_blg_prvdr_npi_num as integer)
           ,avg_incorrect_before
           ,stdev_incorrect_before
           ,sample_size_before
           ,avg_incorrect_after
           ,stdev_incorrect_after
           ,sample_size_after
    FROM testing_data
    WHERE sample_size_before >= 50 AND
          sample_size_after >= 50    AND
          stdev_incorrect_before > 0 AND
    stdev_incorrect_after > 0);

In [0]:
%python
filtered_data = spark.table('filter_data')
filtered_data.persist()

In [0]:
%python

filtered_data = filtered_data.select('*').collect()

In [0]:
def conduct_Ttest(testingdata_collect):
  
  final_data = []

  for i in range(0,len(testingdata_collect)):
    
    before_mean = testingdata_collect[i]['avg_incorrect_before']
    before_stdev = testingdata_collect[i]['stdev_incorrect_before']
    before_samp = testingdata_collect[i]['sample_size_before']
    after_mean = testingdata_collect[i]['avg_incorrect_after']
    after_stdev = testingdata_collect[i]['stdev_incorrect_after']
    after_samp = testingdata_collect[i]['sample_size_after']
    t2, p2 = ttest_ind_from_stats(mean1 = before_mean, std1 = before_stdev, nobs1 = before_samp, mean2= after_mean, std2 = after_stdev, nobs2= after_samp, equal_var=False)
    
    #If incorrect greater after education or p-value is too big, flag; If p-value less than threshold, don't flag
    if (p2 < 0.05):
      flag = 'N'
    else:
      flag = 'Y'

    final_data.extend(((testingdata_collect[i]['clm_blg_prvdr_npi_num'], (after_mean - before_mean), float(t2), float(p2), flag),))

  schema = StructType([StructField("Educated_NPI", IntegerType())\
                          ,StructField("Averge_Difference", FloatType())\
                          ,StructField("Test_Value", FloatType())\
                          ,StructField("P_value", FloatType())\
                          ,StructField("Flag", StringType())])
    
  result_df =  spark.createDataFrame(final_data,schema=schema)
  result_df.write.format('delta').mode('overwrite').save('dbfs:/ml/pe90_model/result')


In [0]:
conduct_Ttest(filtered_data)
# Reload the freshly written t-test results from Delta for the SQL cells below.
result_df = spark.read.load('dbfs:/ml/pe90_model/result', format='delta')

In [0]:
%sql
-- Remove any previous metastore entry for the result table (no-op on a first run).
DROP TABLE IF EXISTS fps_mlasr.result

In [0]:
%sql
-- Register fps_mlasr.RESULT over the Delta files that conduct_Ttest writes.
CREATE TABLE fps_mlasr.RESULT
USING DELTA
LOCATION 'dbfs:/ml/pe90_model/result';

In [0]:
# Expose the Snowflake-sourced DataFrames to the %sql cells below under these view names.
for _view_name, _frame in [
    ('pe90_testing_data_before', pe90_testing_data_bef),
    ('pe90_testing_data_after', pe90_testing_data_aft),
    ('pe90_testing_before_parta', pe90_testing_bef_parta),
    ('pe90_testing_after_parta', pe90_testing_aft_parta),
    ('pe90_most_current_education', pe90_most_curr_edu),
    ('fps_asrpt', fps_asrpt),
    ('fps_asrpt_alert_asctn', fps_asrpt_alert_asctn),
    ('fps_alert', fps_alert),
    ('fps_model', fps_model),
]:
  _frame.createOrReplaceTempView(_view_name)

In [0]:
%sql
-- Audit of claims that did not survive scoring: for each claim source (B, DME, A)
-- and education period, providers whose claim count in the raw testing table
-- ("before") differs from their row count in the scored model view ("after").
-- The LEFT JOIN keeps providers with no scored rows at all; their missing count is
-- treated as 0. (Comparing against the raw NULL with `!=` evaluates to NULL and
-- silently dropped exactly those fully-skipped providers.)

CREATE OR REPLACE TEMPORARY VIEW skippedClaims AS (

    SELECT beforemodel.clm_blg_prvdr_npi_num
          ,'B' as clm_src_type
          ,current_date as pe90_date
          ,'before' as before_after_educ
          ,beforemodel.data_count as before
          ,COALESCE(aftermodel.data_count, 0) as after
      FROM
          (SELECT clm_blg_prvdr_npi_num
                  ,count(clm_blg_prvdr_npi_num) as data_count
           FROM pe90_testing_data_before
           WHERE clm_src_type = 'B'
           GROUP BY clm_blg_prvdr_npi_num) as beforemodel
      LEFT JOIN
          (SELECT clm_blg_prvdr_npi_num
                  ,count(clm_blg_prvdr_npi_num) as data_count
           FROM before_educ_estimate_ptb
           GROUP BY clm_blg_prvdr_npi_num) as aftermodel
      ON beforemodel.clm_blg_prvdr_npi_num = aftermodel.clm_blg_prvdr_npi_num
      WHERE beforemodel.data_count != COALESCE(aftermodel.data_count, 0)

    UNION ALL

    SELECT beforemodel.clm_blg_prvdr_npi_num
          ,'B' as clm_src_type
          ,current_date as pe90_date
          ,'after' as before_after_educ
          ,beforemodel.data_count as before
          ,COALESCE(aftermodel.data_count, 0) as after
      FROM
          (SELECT clm_blg_prvdr_npi_num
                  ,count(clm_blg_prvdr_npi_num) as data_count
           FROM pe90_testing_data_after
           WHERE clm_src_type = 'B'
           GROUP BY clm_blg_prvdr_npi_num) as beforemodel
      LEFT JOIN
          (SELECT clm_blg_prvdr_npi_num
                  ,count(clm_blg_prvdr_npi_num) as data_count
           FROM after_educ_estimate_ptb
           GROUP BY clm_blg_prvdr_npi_num) as aftermodel
      ON beforemodel.clm_blg_prvdr_npi_num = aftermodel.clm_blg_prvdr_npi_num
      WHERE beforemodel.data_count != COALESCE(aftermodel.data_count, 0)

    UNION ALL

    SELECT beforemodel.clm_blg_prvdr_npi_num
          ,'DME' as clm_src_type
          ,current_date as pe90_date
          ,'before' as before_after_educ
          ,beforemodel.data_count as before
          ,COALESCE(aftermodel.data_count, 0) as after
      FROM
          (SELECT clm_blg_prvdr_npi_num
                  ,count(clm_blg_prvdr_npi_num) as data_count
           FROM pe90_testing_data_before
           WHERE clm_src_type = 'DME'
           GROUP BY clm_blg_prvdr_npi_num) as beforemodel
      LEFT JOIN
          (SELECT clm_blg_prvdr_npi_num
                  ,count(clm_blg_prvdr_npi_num) as data_count
           FROM before_educ_estimate_dme
           GROUP BY clm_blg_prvdr_npi_num) as aftermodel
      ON beforemodel.clm_blg_prvdr_npi_num = aftermodel.clm_blg_prvdr_npi_num
      WHERE beforemodel.data_count != COALESCE(aftermodel.data_count, 0)

    UNION ALL

    SELECT beforemodel.clm_blg_prvdr_npi_num
          ,'DME' as clm_src_type
          ,current_date as pe90_date
          ,'after' as before_after_educ
          ,beforemodel.data_count as before
          ,COALESCE(aftermodel.data_count, 0) as after
      FROM
          (SELECT clm_blg_prvdr_npi_num
                  ,count(clm_blg_prvdr_npi_num) as data_count
           FROM pe90_testing_data_after
           WHERE clm_src_type = 'DME'
           GROUP BY clm_blg_prvdr_npi_num) as beforemodel
      LEFT JOIN
          (SELECT clm_blg_prvdr_npi_num
                  ,count(clm_blg_prvdr_npi_num) as data_count
           FROM after_educ_estimate_dme
           GROUP BY clm_blg_prvdr_npi_num) as aftermodel
      ON beforemodel.clm_blg_prvdr_npi_num = aftermodel.clm_blg_prvdr_npi_num
      WHERE beforemodel.data_count != COALESCE(aftermodel.data_count, 0)

    UNION ALL

    SELECT beforemodel.clm_blg_prvdr_npi_num
          ,'A' as clm_src_type
          ,current_date as pe90_date
          ,'before' as before_after_educ
          ,beforemodel.data_count as before
          ,COALESCE(aftermodel.data_count, 0) as after
      FROM
          (SELECT clm_blg_prvdr_npi_num
                  ,count(clm_blg_prvdr_npi_num) as data_count
           FROM pe90_testing_before_parta
           GROUP BY clm_blg_prvdr_npi_num) as beforemodel
      LEFT JOIN
          (SELECT clm_blg_prvdr_npi_num
                  ,count(clm_blg_prvdr_npi_num) as data_count
           FROM before_educ_estimate_pta
           GROUP BY clm_blg_prvdr_npi_num) as aftermodel
      ON beforemodel.clm_blg_prvdr_npi_num = aftermodel.clm_blg_prvdr_npi_num
      WHERE beforemodel.data_count != COALESCE(aftermodel.data_count, 0)

    UNION ALL

    SELECT beforemodel.clm_blg_prvdr_npi_num
          ,'A' as clm_src_type
          ,current_date as pe90_date
          ,'after' as before_after_educ
          ,beforemodel.data_count as before
          ,COALESCE(aftermodel.data_count, 0) as after
      FROM
          (SELECT clm_blg_prvdr_npi_num
                  ,count(clm_blg_prvdr_npi_num) as data_count
           FROM pe90_testing_after_parta
           GROUP BY clm_blg_prvdr_npi_num) as beforemodel
      LEFT JOIN
          (SELECT clm_blg_prvdr_npi_num
                  ,count(clm_blg_prvdr_npi_num) as data_count
           FROM after_educ_estimate_pta
           GROUP BY clm_blg_prvdr_npi_num) as aftermodel
      ON beforemodel.clm_blg_prvdr_npi_num = aftermodel.clm_blg_prvdr_npi_num
      WHERE beforemodel.data_count != COALESCE(aftermodel.data_count, 0));

In [0]:
# Materialize the skipped-claims audit view; cached, presumably because it is
# written twice below (Delta, then Snowflake).
skipped = spark.table('skippedClaims')
skipped.cache()

In [0]:
# Replace the Delta copy; overwriteSchema lets the column set/types change between runs.
skipped.write.format('delta').mode('overwrite').option("overwriteSchema", "true").save('dbfs:/ml/pe90_model/skipped')

In [0]:
%sql
-- Drop the old metastore entry so it can be re-created over the freshly written files.
DROP TABLE IF EXISTS fps_mlasr.skippedClaims

In [0]:
%sql
-- Register fps_mlasr.skippedClaims over the Delta files at dbfs:/ml/pe90_model/skipped.
CREATE TABLE fps_mlasr.skippedClaims
USING DELTA
LOCATION 'dbfs:/ml/pe90_model/skipped';

In [0]:
# Append the skipped-claims rows (each stamped with pe90_date = current_date) to Snowflake SKIPPED.
# NOTE(review): mode('append') means re-running this cell inserts the same rows again.
# NOTE(review): uses the "snowflake" short format name instead of SNOWFLAKE_SOURCE_NAME used
# for reads -- presumably equivalent on Databricks; confirm.
(skipped.write.format("snowflake")
 .options(**sf_connection)
 .option("dbtable", "SKIPPED")
 .mode('append')
 .save())

In [0]:
# Register the DataFrames read by the education-flag SQL cells below.
for _view_name, _frame in (
    ("pe90_testing_results", result_df),
    ("pe90_educatedlist", pe90_educatedlist),
    ("pe90_alledu_maxale", pe90_alledu_maxale),
    ("pe90_asrlist", pe90_asrlist),
):
  _frame.createOrReplaceTempView(_view_name)

In [0]:
%sql
-- Assigns a testing_flag to every row of pe90_alledu_maxale (both joins are LEFT JOINs):
--   * the t-test Flag ('Y'/'N') when the subject NPI has a test result;
--   * 'Untestable' when the ASR is in pe90_educatedlist but has no test result;
--   * '<90days' when neither applies and the latest alert is at most 365 days old;
--   * 'N/A' otherwise.
-- NOTE(review): the '<90days' label does not match the 365-day threshold below -- confirm which is intended.
-- NOTE(review): duplicate asrpt_id rows in pe90_educatedlist (or duplicate NPIs in the results)
-- would duplicate output rows -- verify uniqueness upstream.
CREATE OR REPLACE TEMPORARY VIEW education_data AS (
SELECT alleduc.asrpt_id
      ,CASE
          WHEN result.Flag IS NOT null THEN result.Flag
          WHEN result.Flag IS null AND usededuc.asrpt_id IS NOT null THEN 'Untestable'
          WHEN result.Flag IS null AND usededuc.asrpt_id IS null AND datediff(current_date, alleduc.max_alert_date) <= 365 THEN '<90days'
          ELSE 'N/A'
          END AS testing_flag
FROM pe90_alledu_maxale as alleduc
LEFT JOIN
pe90_educatedlist AS usededuc
ON usededuc.asrpt_id = alleduc.asrpt_id
LEFT JOIN
pe90_testing_results AS result
ON alleduc.subj_id = result.Educated_NPI); 

In [0]:
# Despite the "_N" suffix, this keeps every education record whose testing_flag is
# anything *other than* 'N'; the 'N' providers are re-derived from alerts further below.
education_data_N = spark.table("education_data").filter(col("testing_flag") != 'N')
education_data_N.createOrReplaceTempView("education_data_N")

In [0]:
# Persist the education_data view as Delta files on DBFS.
edu_data = spark.table('education_data')
edu_data.write.format('delta').mode('overwrite').save('dbfs:/ml/pe90_model/education_data')

In [0]:
%sql
-- Drop any previous metastore entry before re-registering it.
DROP TABLE IF EXISTS fps_mlasr.education_data;

In [0]:
%sql
-- Register fps_mlasr.education_data over the Delta files written above.
CREATE TABLE fps_mlasr.education_data
USING DELTA
LOCATION 'dbfs:/ml/pe90_model/education_data';

In [0]:
%sql
-- ASRs whose subject NPI received t-test Flag = 'N', with the current_education_date
-- taken from pe90_most_current_education. INNER JOINs drop ASRs without a match.

CREATE OR REPLACE TEMPORARY VIEW asr_education_date AS (
SELECT educ.asrpt_id
      ,educ.current_education_date
      ,result.Flag
      ,asr.subj_id
FROM pe90_most_current_education educ
INNER JOIN 
fps_asrpt AS asr 
ON asr.asrpt_id = educ.asrpt_id
INNER JOIN 
--pe90_testing_results_M AS result  (older source, kept for reference)
(SELECT Educated_NPI
        ,Flag
 FROM pe90_testing_results
 WHERE  Flag = 'N') AS result
ON asr.subj_id = result.Educated_NPI);


In [0]:
# Persist the view as Delta files on DBFS.
asr_date = spark.table('asr_education_date')
asr_date.write.format('delta').mode('overwrite').save('dbfs:/ml/pe90_model/asr_education_date')

In [0]:
%sql
-- Drop any previous metastore entry before re-registering it.
DROP TABLE IF EXISTS fps_mlasr.asr_education_date;

In [0]:
%sql
-- Register fps_mlasr.asr_education_date over the Delta files written above.
CREATE TABLE fps_mlasr.asr_education_date
USING DELTA
LOCATION 'dbfs:/ml/pe90_model/asr_education_date';

In [0]:
%sql
-- Expands each asr_education_date row into its alerts (fps_asrpt_alert_asctn -> fps_alert),
-- keeping only alerts from models whose MODEL_ACTVTY_STUS is 'Active' or 'Inactive'.
-- The GROUP BY over every selected column acts as a DISTINCT.
CREATE OR REPLACE TEMPORARY VIEW asr_education_date1 AS ( 
SELECT educ.asrpt_id
      ,educ.current_education_date
      ,educ.Flag
      ,educ.subj_id
      ,c.alert_creat_dt
      ,c.model_id
      ,c.alert_id
FROM asr_education_date AS educ
INNER JOIN
fps_asrpt_alert_asctn AS b 
ON educ.asrpt_id = b.asrpt_id
INNER JOIN
fps_alert AS c 
ON b.alert_id=c.alert_id 
INNER JOIN 
fps_model AS m 
ON c.model_id = m.model_id
WHERE m.MODEL_ACTVTY_STUS in ( 'Active' ,'Inactive')
GROUP BY educ.asrpt_id
        ,educ.current_education_date
        ,educ.Flag
        ,educ.subj_id
        ,c.alert_creat_dt
        ,c.model_id
        ,c.alert_id);


In [0]:
# Persist the view as Delta files on DBFS.
asr_date1 = spark.table('asr_education_date1')
asr_date1.write.format('delta').mode('overwrite').save('dbfs:/ml/pe90_model/asr_education_date1')

In [0]:
%sql
-- Drop any previous metastore entry before re-registering it.
DROP TABLE IF EXISTS fps_mlasr.asr_education_date1;

In [0]:
%sql
-- Register fps_mlasr.asr_education_date1 over the Delta files written above.
CREATE TABLE fps_mlasr.asr_education_date1
USING DELTA
LOCATION 'dbfs:/ml/pe90_model/asr_education_date1';

In [0]:
%sql
-- Alerts created strictly before the ASR's current_education_date ("before" alerts).
-- NOTE(review): together with alert_a (strict '>') an alert whose alert_creat_dt equals
-- current_education_date lands in neither view -- confirm that gap is intended.
CREATE OR REPLACE TEMPORARY VIEW alert_b AS (

SELECT alleduc.asrpt_id 
      ,alleduc.current_education_date 
      ,alleduc.Flag
      ,alleduc.subj_id
      ,alleduc.model_id
      ,alleduc.alert_creat_dt 
      ,alleduc.alert_id 
FROM asr_education_date1 alleduc
WHERE alleduc.alert_creat_dt < alleduc.current_education_date);

In [0]:
# Persist the view as Delta files on DBFS.
alert_b = spark.table('alert_b')
alert_b.write.format('delta').mode('overwrite').save('dbfs:/ml/pe90_model/alert_b')

In [0]:
%sql
-- Drop any previous metastore entry before re-registering it.
DROP TABLE IF EXISTS fps_mlasr.alert_b;

In [0]:
%sql
-- Register fps_mlasr.alert_b over the Delta files written above.
CREATE TABLE fps_mlasr.alert_b
USING DELTA
LOCATION 'dbfs:/ml/pe90_model/alert_b';

In [0]:
%sql
-- Alerts created strictly after the ASR's current_education_date ("after" alerts).
-- NOTE(review): alerts with alert_creat_dt equal to current_education_date are in
-- neither alert_a nor alert_b -- confirm that gap is intended.
CREATE OR REPLACE TEMPORARY VIEW alert_a AS (

SELECT alleduc.asrpt_id 
      ,alleduc.current_education_date 
      ,alleduc.Flag
      ,alleduc.subj_id
      ,alleduc.model_id
      ,alleduc.alert_creat_dt
      ,alleduc.alert_id   
FROM asr_education_date1 alleduc
WHERE alleduc.alert_creat_dt > alleduc.current_education_date);


In [0]:
# Persist the view as Delta files on DBFS.
alert_a = spark.table('alert_a')
alert_a.write.format('delta').mode('overwrite').save('dbfs:/ml/pe90_model/alert_a')

In [0]:
%sql
-- Drop any previous metastore entry before re-registering it.
DROP TABLE IF EXISTS fps_mlasr.alert_a;

In [0]:
%sql
-- Register fps_mlasr.alert_a over the Delta files written above.
CREATE TABLE fps_mlasr.alert_a
USING DELTA
LOCATION 'dbfs:/ml/pe90_model/alert_a';

In [0]:

%sql
-- 'New Model': ASRs with at least one post-education alert (alert_a) whose model_id
-- produced no pre-education alert (alert_b) for the same ASR.
-- NOTE(review): the outer LEFT JOIN to alert_b is not referenced outside the subquery
-- (which declares its own alias b); it only multiplies rows that DISTINCT/GROUP BY collapse.
-- NOTE(review): NOT IN matches nothing if the subquery returns a NULL model_id.
-- The view name "inal_data4_DS" (likely meant "final") is referenced by later cells/tables, so it is kept.
CREATE OR REPLACE TEMPORARY VIEW inal_data4_DS AS (
SELECT DISTINCT a.asrpt_id 
                ,'New Model' AS testing_flag 
FROM alert_a AS a 
LEFT JOIN 
alert_b AS b 
ON a.asrpt_id = b.asrpt_id 
WHERE a.model_id NOT IN (select b.model_id 
                        FROM alert_b b 
                        WHERE  b.asrpt_id = a.asrpt_id) 
GROUP BY a.asrpt_id, a.subj_id, a.current_education_date 
ORDER BY a.asrpt_id);

In [0]:
# Persist the view as Delta files on DBFS.
inal_data4_DS = spark.table('inal_data4_DS')
inal_data4_DS.write.format('delta').mode('overwrite').save('dbfs:/ml/pe90_model/inal_data4_DS')

In [0]:
%sql
-- Drop any previous metastore entry before re-registering it.
DROP TABLE IF EXISTS fps_mlasr.inal_data4_DS;

In [0]:
%sql
-- Register fps_mlasr.inal_data4_DS over the Delta files written above.
CREATE TABLE fps_mlasr.inal_data4_DS
USING DELTA
LOCATION 'dbfs:/ml/pe90_model/inal_data4_DS';

In [0]:
%sql
-- 'N': every ASR that appears in alert_a or alert_b but was not classified
-- 'New Model' in inal_data4_DS. DISTINCT and GROUP BY are redundant with each other.
-- NOTE(review): NOT IN would match nothing if inal_data4_DS ever contained a NULL asrpt_id.
CREATE OR REPLACE TEMPORARY VIEW final_data_5_DS AS (


SELECT DISTINCT a.asrpt_id 
      ,'N' AS testing_flag 
FROM 
(SELECT asrpt_id 
 FROM alert_a 
 UNION 
 SELECT asrpt_id 
 FROM alert_b) a 
WHERE a.asrpt_id NOT IN (SELECT b.asrpt_id 
                         FROM inal_data4_DS  b)
GROUP BY a.asrpt_id
ORDER BY a.asrpt_id);

In [0]:
# Persist the view as Delta files on DBFS.
final_data_5_DS = spark.table('final_data_5_DS')
final_data_5_DS.write.format('delta').mode('overwrite').save('dbfs:/ml/pe90_model/final_data_5_DS')

In [0]:
%sql
-- Drop any previous metastore entry before re-registering it.
DROP TABLE IF EXISTS fps_mlasr.final_data_5_DS;

In [0]:
%sql
-- Register fps_mlasr.final_data_5_DS over the Delta files written above.
CREATE TABLE fps_mlasr.final_data_5_DS
USING DELTA
LOCATION 'dbfs:/ml/pe90_model/final_data_5_DS';

In [0]:
%sql
-- All after-education flags in one view: 'New Model', 'N', and the non-'N' education flags.
-- SELECT * + UNION ALL is positional; each source yields (asrpt_id, testing_flag).
-- NOTE(review): UNION ALL does not de-duplicate; an asrpt_id present in more than one
-- source would produce multiple rows here and in pe90_incr.

CREATE OR REPLACE TEMPORARY VIEW After_education_flag AS (
SELECT * 
FROM inal_data4_DS
UNION ALL 
SELECT * 
FROM final_data_5_DS
UNION ALL
SELECT * 
FROM education_data_N);


In [0]:
# Persist the view as Delta files on DBFS.
After_education_flag = spark.table('After_education_flag')
After_education_flag.write.format('delta').mode('overwrite').save('dbfs:/ml/pe90_model/After_education_flag')

In [0]:
%sql
-- Drop any previous metastore entry before re-registering it.
DROP TABLE IF EXISTS fps_mlasr.After_education_flag;

In [0]:
%sql
-- Register fps_mlasr.After_education_flag over the Delta files written above.
CREATE TABLE fps_mlasr.After_education_flag
USING DELTA
LOCATION 'dbfs:/ml/pe90_model/After_education_flag';

In [0]:
%sql
-- Final PE90 flag for every ASR in pe90_asrlist, stamped with today's date;
-- ASRs with no after-education flag receive 'N/A'.
CREATE OR REPLACE TEMPORARY VIEW pe90_incr AS (
SELECT asr.asrpt_id
      ,current_date AS pe90_date
      ,COALESCE(educ.testing_flag, 'N/A') AS pe90_flag
FROM pe90_asrlist AS asr
LEFT JOIN After_education_flag AS educ
  ON asr.asrpt_id = educ.asrpt_id);


In [0]:
# Materialize the final flags; persisted, presumably because the same rows are
# written twice below (Delta, then Snowflake).
pe90_incr = spark.table('pe90_incr')
pe90_incr.persist()
pe90_incr.write.format('delta').mode('overwrite').save('dbfs:/ml/pe90_model/pe90_incr')

In [0]:
%sql
-- Drop any previous metastore entry before re-registering it.
DROP TABLE IF EXISTS fps_mlasr.pe90_incr

In [0]:
%sql
-- Register fps_mlasr.pe90_incr over the Delta files written above.
CREATE TABLE fps_mlasr.pe90_incr
USING DELTA
LOCATION 'dbfs:/ml/pe90_model/pe90_incr';


In [0]:
# Append this run's flags to the Snowflake PE90_INCR table.
# NOTE(review): mode('append') means re-running this cell on the same day inserts
# duplicate (asrpt_id, pe90_date) rows -- confirm downstream de-duplication.
(pe90_incr.write.format("snowflake")
 .options(**sf_connection)
 .option("dbtable", "PE90_INCR")
 .mode('append')
 .save())