In [1]:
%%configure -f
{ "conf": {
            "spark.pyspark.python": "python3",
            "spark.pyspark.virtualenv.enabled": "true",
            "spark.sql.parquet.writeLegacyFormat": "true",
            "spark.sql.parquet.enableVectorizedReader":"false",
            "spark.pyspark.virtualenv.type":"native",
            "spark.pyspark.virtualenv.bin.path":"/usr/bin/virtualenv",
            "spark.jars":"s3://aws-glue-assets-137931813934-us-east-2/extraJars/hudi-spark3-bundle_2.12-0.11.1.jar",
            "spark.serializer":"org.apache.spark.serializer.KryoSerializer",
            "spark.sql.hive.convertMetastoreParquet":"false",
            "spark.sql.extensions": "org.apache.spark.sql.hudi.HoodieSparkSessionExtension",
            "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog",
            "hive.metastore.client.factory.class":"com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
            "spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive":"true"
          }}

In [2]:
# Invalidate Spark's cached metadata for the tables this notebook reads incrementally, so the
# queries below see their latest commits. `spark` is the session exposed by the notebook kernel
# (see the "SparkSession available as 'spark'" output below).
# NOTE(review): the lookup tables read later (placeofservice, provider, facility, patient,
# healthplan, clinicalservice, servicerequestrelation) are not refreshed here — confirm intended.
spark.sql("REFRESH TABLE analytics_dw.hudi_raw_servicerequest")
spark.sql("REFRESH TABLE analytics_dw.hudi_raw_authorization")
spark.sql("REFRESH TABLE analytics_dw.ghp_auth_file_integration_qc")

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1695049560773_0001,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

DataFrame[]

In [3]:
%%HTML
<style>
    pre {white-space:pre !important;}
</style>

In [4]:
import boto3
import json
import sys
from datetime import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pytz import timezone

appName = "GHP Auth File Integration"
# Session used by every helper below. getOrCreate() returns the already-running session when one
# exists (the notebook kernel starts one) and applies these options to it; static settings such as
# the serializer can presumably only take effect if this call actually creates the session.
spark = (
    SparkSession.builder
    .appName(appName)
    .enableHiveSupport()
    .config("spark.sql.broadcastTimeout", "60000")
    .config("spark.sql.hive.convertMetastoreParquet", "false")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
    .config("hive.metastore.client.factory.class",
            "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory")
    .config("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true")
    # NOTE(review): LEGACY parser policy presumably backs the to_timestamp/date_format patterns
    # used later in this notebook — confirm before changing.
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
    .getOrCreate()
)


def _put_ghp_metric(cw_client, dimension_value, value):
    """
    Publishes one ``Count`` datapoint for the GHP auth file integration job.

    Every ``log_*`` helper below emits the same metric (namespace ``cohere``, metric name
    ``Airflow_ghp_auth_file_integration``) and only varies the value of the ``airflow``
    dimension, so the payload is built in one place.

    :param cw_client: client exposing ``put_metric_data`` (presumably a boto3 CloudWatch client)
    :param dimension_value: value of the ``airflow`` dimension, e.g. ``Num_of_Auths-<env>``
    :param value: numeric datapoint value
    :return: None
    """
    cw_client.put_metric_data(
        Namespace='cohere',
        MetricData=[{
            'MetricName': 'Airflow_ghp_auth_file_integration',
            'Dimensions': [
                {
                    'Name': 'airflow',
                    'Value': dimension_value
                },
            ],
            'Value': value,
            'Unit': 'Count'
        }]
    )
    return None


def log_num_of_auths(cw_client, sr, env):
    """
    Logs the number of incremental auths in the current run.

    :param cw_client: client exposing ``put_metric_data``
    :param sr: DataFrame whose ``count()`` is reported
    :param env: environment suffix appended to the dimension value
    :return: None
    """
    return _put_ghp_metric(cw_client, f'Num_of_Auths-{env}', sr.count())


def log_authorizations_with_null_cohereId(cw_client, auth, env):
    """
    Logs the number of authorizations with a null cohereId.

    :param cw_client: client exposing ``put_metric_data``
    :param auth: DataFrame of offending authorizations; its ``count()`` is reported
    :param env: environment suffix appended to the dimension value
    :return: None
    """
    return _put_ghp_metric(cw_client, f'Num_of_Auths_with_null_cohereId-{env}', auth.count())


def log_auths_without_pxcode_and_servicegroup(cw_client, auth, env):
    """
    Logs the number of auths that have neither a service group id nor a procedure code.

    :param cw_client: client exposing ``put_metric_data``
    :param auth: DataFrame of offending auths; its ``count()`` is reported
    :param env: environment suffix appended to the dimension value
    :return: None
    """
    return _put_ghp_metric(cw_client, f'Num_of_auths_without_pxcode_and_servicegroup-{env}', auth.count())


def log_auths_with_both_pxcode_and_servicegroup(cw_client, auth, env):
    """
    Logs the number of auths that have both a service group id and a procedure code.

    :param cw_client: client exposing ``put_metric_data``
    :param auth: DataFrame of offending auths; its ``count()`` is reported
    :param env: environment suffix appended to the dimension value
    :return: None
    """
    return _put_ghp_metric(cw_client, f'Num_of_auths_with_both_pxcode_and_servicegroup-{env}', auth.count())


def log_denied_auths_with_no_service_status_code(cw_client, auth, env):
    """
    Logs the number of denied auths that have no service status code.

    :param cw_client: client exposing ``put_metric_data``
    :param auth: DataFrame of offending auths; its ``count()`` is reported
    :param env: environment suffix appended to the dimension value
    :return: None
    """
    return _put_ghp_metric(cw_client, f'Num_of_denied_auths_with_no_service_status_code-{env}', auth.count())


def log_incorrect_column_order(cw_client, env):
    """
    Logs a single occurrence (value 1) of the output columns being in an incorrect order.

    :param cw_client: client exposing ``put_metric_data``
    :param env: environment suffix appended to the dimension value
    :return: None
    """
    return _put_ghp_metric(cw_client, f'Incorrect_column_order-{env}', 1)


def log_auth_file_update_size_exceeded_hard_limit(cw_client, auth, env):
    """
    Logs that the updates in the auth file exceeded the hard limit, reporting how many there were.

    :param cw_client: client exposing ``put_metric_data``
    :param auth: DataFrame of updates; its ``count()`` is reported
    :param env: environment suffix appended to the dimension value
    :return: None
    """
    return _put_ghp_metric(cw_client, f'Updates_in_Auth_file_exceeded_hard_limit-{env}', auth.count())

def add_prefix_to_column_name(df, prefix):
    """
    Returns ``df`` with every column renamed to ``<prefix>_<column>``.

    Renaming is positional (via ``toDF``), in ``df.columns`` order.

    :param df: DataFrame whose columns should be renamed
    :param prefix: string placed, followed by "_", in front of each column name
    :return: the DataFrame produced by ``toDF`` with the prefixed names
    """
    print(f"Adding prefix to {prefix}")
    return df.toDF(*("_".join((prefix, name)) for name in df.columns))


def get_last_run_time(db):
    '''
    Fetches the latest commit times already processed by this job, used as lower bounds for
    incremental data.

    Reads ``{db}.ghp_auth_file_integration_qc`` and takes max(ghp_auth_hoodie_commit_time) per
    ghp_is_authorization value:
      * "N"            -> service-request watermark
      * "Y"            -> authorization watermark
      * any other value (e.g. NULL) -> replaces the default service-request floor

    :param db: database holding ghp_auth_file_integration_qc
    :return: (max_commit_time_sr, max_commit_time_auth).
             max_commit_time_sr is the later of the "N" watermark and the floor
             (default "20230101000000000"); when no "N" rows exist it is the floor.
             max_commit_time_auth is None when the QC table has no "Y" rows.
    '''
    table_name = f'{db}.ghp_auth_file_integration_qc'

    max_commit_time = spark.sql(f"select max(ghp_auth_hoodie_commit_time) as hoodie_commit_time ,ghp_is_authorization\
                                from {table_name} group by ghp_is_authorization order by ghp_is_authorization")
    print("max_commit_time data returned")
    rows = add_prefix_to_column_name(max_commit_time, "max").collect()
    max_commit_time_sr = None
    max_commit_time_auth = None
    max_commit_time_null = "20230101000000000" #default so that jobs runs successfully in all envs
    for row in rows:
        flag = row['max_ghp_is_authorization']
        if flag == "N":
            max_commit_time_sr = row['max_hoodie_commit_time']
        elif flag == "Y":
            max_commit_time_auth = row['max_hoodie_commit_time']
        else:
            max_commit_time_null = row['max_hoodie_commit_time']
    # Take the later of the two candidates, ignoring a missing one: ordering None against a str
    # raises TypeError in Python 3 (e.g. first incremental run with no "N" rows yet).
    # Commit times are presumably fixed-width digit strings (yyyyMMddHHmmssSSS), so string
    # comparison orders them chronologically — confirm. Builtin max() is shadowed by the
    # pyspark.sql.functions star import, hence the explicit comparison.
    if max_commit_time_sr is None or (
            max_commit_time_null is not None and max_commit_time_null > max_commit_time_sr):
        max_commit_time_sr = max_commit_time_null
    return max_commit_time_sr, max_commit_time_auth


def fetch_ghp_service_requests(db, refresh_type, max_commit_time=None, cohereids=()):
    '''
    Fetches Geisinger service requests that are not attached to an authorization.

    Max commit time would be None for historical runs, whereas for incremental runs it would have
    a value from all previous outputed data. With a max commit time, only rows committed at or
    after it are returned, plus (when ``cohereids`` is non-empty) any rows whose cohereid is listed.

    :param db: database holding hudi_raw_servicerequest
    :param refresh_type: unused here; kept for interface compatibility
    :param max_commit_time: lower bound on _hoodie_commit_time, or None for no bound
    :param cohereids: ids to include regardless of commit time (iterable of ids, a single id
                      string, or None)
    :return: DataFrame with "sr_"-prefixed columns plus sr_is_authorization = "N"
    '''

    def _sql_in_list(values):
        # Render ids as a Spark SQL IN list such as ('A', 'B'). Python's tuple/list repr is not
        # valid SQL for one element ("('A',)") or for a list ("['A']"). Backslashes and single
        # quotes are backslash-escaped (Spark SQL's default string-literal escaping).
        quoted = ("'" + str(v).replace("\\", "\\\\").replace("'", "\\'") + "'" for v in values)
        return "(" + ", ".join(quoted) + ")"

    if isinstance(cohereids, str):
        cohereids = (cohereids,)
    cohereids = tuple(cohereids or ())

    table_name = f'{db}.hudi_raw_servicerequest'

    auth_status = ('APPROVED', 'DENIED', 'PARTIALLY_APPROVED', 'WITHDRAWN', 'VOIDED', 'DISMISSED')

    query = f"select fulldocument._id,fulldocument.cohereid, fulldocument.startdate, fulldocument.enddate, fulldocument.encounterType, fulldocument.units, fulldocument.authStatus, fulldocument.placeofservice, fulldocument.selectedFacility as facility , \
    fulldocument.selectedOrderingProvider as orderingprovider , fulldocument.selectedPerformingProvider as performingprovider, fulldocument.patient, fulldocument.healthplanname, fulldocument.clinicalservice, fulldocument.clinicalservices, \
    fulldocument.requestor.channel as requestorChannel, fulldocument.approvedunits as auth_approved_units,fulldocument.semanticprocedurecodes,fulldocument.selectedPerformingProvider.selectedLocation.tin as performing_tin, fulldocument.claimspaymentnote, fulldocument.servicestatuscode,\
    _hoodie_commit_time as hoodie_commit_time , fulldocument.dateCreated, fulldocument.lastUpdated, fulldocument.selectedPerformingProvider.selectedLocation.externalReferenceId as performing_provider_GHP_Id,\
    fulldocument.selectedFacility.selectedLocation.externalReferenceId as facility_GHP_Id, fulldocument.selectedOrderingProvider.selectedLocation.externalReferenceId as ordering_provider_GHP_Id, fulldocument.selectedOrderingProvider.selectedLocation.tin as ordering_tin,\
    fulldocument.selectedFacility.name as facility_GHP_name, fulldocument.selectedFacility.selectedLocation.address.city as facility_GHP_address_city,fulldocument.selectedFacility.selectedLocation.tin as facility_GHP_tin,fulldocument._id as id,fulldocument.selectedFacility.selectedLocation.address.zipcode as facility_GHP_address_zipcode,\
    fulldocument.selectedFacility.selectedLocation.address.line1 as facility_GHP_address_line_1, fulldocument.selectedFacility.selectedLocation.address.line2 as facility_GHP_address_line_2,fulldocument.selectedFacility.selectedLocation.address.state as facility_GHP_address_state, fulldocument.selectedFacility.providerType as facility_GHP_providerType, fulldocument.decisiontimestamp,fulldocument.updatedby,\
    fulldocument.selectedOrderingProvider.firstName as ordering_provider_firstName, fulldocument.selectedOrderingProvider.lastName as ordering_provider_lastName, fulldocument.selectedOrderingProvider.providerType as ordering_provider_providerType,\
    fulldocument.selectedPerformingProvider.firstName as performing_provider_firstName, fulldocument.selectedPerformingProvider.lastName as performing_provider_lastName, fulldocument.selectedPerformingProvider.providerType as performing_provider_providerType, fulldocument.selectedPerformingProvider.name as performing_provider_name, fulldocument.integrationstatus\
    from {table_name}\
    where fulldocument.healthplanname = 'Geisinger'\
    and fulldocument.authStatus in {auth_status}\
    and fullDocument.deleted!=true \
    and fulldocument.authorization is null"

    if max_commit_time is not None:
        if cohereids:
            query += f" and (_hoodie_commit_time >= {max_commit_time} or fulldocument.cohereid in {_sql_in_list(cohereids)} ) "
        else:
            query += f" and _hoodie_commit_time >= {max_commit_time}"

    sr = spark.sql(query)

    sr = add_prefix_to_column_name(sr, "sr")

    sr = sr.withColumn("sr_is_authorization", lit("N"))
    print("service_request data returned")
    return sr


def fetch_date_created(db):
    '''
    Returns (dc_cohereid, dc_dateCreated) for Geisinger service requests that already reference an
    authorization and are in one of the final auth statuses.

    Used to backfill the dateCreated of authorizations that were created upon the creation of a
    continuation request (the initial SR was created before the continuations release).

    :param db: database holding hudi_raw_servicerequest
    :return: DataFrame with columns prefixed "dc_"
    '''
    auth_status = ('APPROVED', 'DENIED', 'PARTIALLY_APPROVED', 'WITHDRAWN', 'VOIDED', 'DISMISSED')
    table_name = f'{db}.hudi_raw_servicerequest'

    query = f"select fulldocument.cohereid, fulldocument.dateCreated\
            from {table_name}\
    where fulldocument.healthplanname = 'Geisinger'\
    and fulldocument.authStatus in {auth_status}\
    and fullDocument.deleted!=true \
    and fulldocument.authorization is not null"

    date_created = add_prefix_to_column_name(spark.sql(query), "dc")
    print("dateCreated data returned")
    return date_created


def fetch_ghp_authorization(db, refresh_type, max_commit_time=None, cohereids=()):
    '''
    Fetches Geisinger authorizations, shaped like the service-request frame ("sr_" prefix).

    Max commit time would be None for historical runs, whereas for incremental runs it would have
    a value from all previous outputed data. With a max commit time, only rows committed at or
    after it are returned, plus (when ``cohereids`` is non-empty) any rows whose authnumber is
    listed. Authorizations with a null authnumber (sr_cohereid) are printed and dropped.

    :param db: database holding hudi_raw_authorization
    :param refresh_type: unused here; kept for interface compatibility
    :param max_commit_time: lower bound on _hoodie_commit_time, or None for no bound
    :param cohereids: ids to include regardless of commit time (iterable of ids, a single id
                      string, or None)
    :return: DataFrame with "sr_"-prefixed columns (sr_servicerequestids dropped) plus
             sr_is_authorization = "Y"
    '''

    def _sql_in_list(values):
        # Render ids as a Spark SQL IN list such as ('A', 'B'). Python's tuple/list repr is not
        # valid SQL for one element ("('A',)") or for a list ("['A']"). Backslashes and single
        # quotes are backslash-escaped (Spark SQL's default string-literal escaping).
        quoted = ("'" + str(v).replace("\\", "\\\\").replace("'", "\\'") + "'" for v in values)
        return "(" + ", ".join(quoted) + ")"

    if isinstance(cohereids, str):
        cohereids = (cohereids,)
    cohereids = tuple(cohereids or ())

    table_name = f'{db}.hudi_raw_authorization'

    auth_status = ('APPROVED', 'DENIED', 'PARTIALLY_APPROVED', 'WITHDRAWN', 'VOIDED', 'DISMISSED')

    query = f"select fulldocument._id,fulldocument.servicerequestids, fulldocument.authnumber as cohereid,fulldocument.startdate, fulldocument.enddate, fulldocument.requestedunits as units, fulldocument.selectedFacility as facility, \
    fulldocument.selectedOrderingProvider as orderingprovider, fulldocument.selectedPerformingProvider as performingprovider , fulldocument.healthplanname,  \
    fulldocument.approvedunits as auth_approved_units,fulldocument.claimspaymentnote, fulldocument.servicestatuscode,\
    _hoodie_commit_time as hoodie_commit_time ,fulldocument.dateCreated,fulldocument.requestorChannel, fulldocument.patient,\
    fulldocument.lastupdated, fulldocument.authstatus, fulldocument.encounterType, fulldocument.placeofservice, fulldocument.clinicalservice, fulldocument.clinicalservices, fulldocument.semanticprocedurecodes,\
    fulldocument.selectedPerformingProvider.selectedLocation.tin as performing_tin, fulldocument.selectedPerformingProvider.selectedLocation.externalReferenceId as performing_provider_GHP_Id,\
    fulldocument.selectedFacility.selectedLocation.externalReferenceId as facility_GHP_Id, fulldocument.selectedOrderingProvider.selectedLocation.externalReferenceId as ordering_provider_GHP_Id, fulldocument.selectedOrderingProvider.selectedLocation.tin as ordering_tin,\
    fulldocument.selectedFacility.name as facility_GHP_name, fulldocument.selectedFacility.selectedLocation.address.city as facility_GHP_address_city,fulldocument.selectedFacility.selectedLocation.tin as facility_GHP_tin,fulldocument._id as id,fulldocument.selectedFacility.selectedLocation.address.zipcode as facility_GHP_address_zipcode,\
    fulldocument.selectedFacility.selectedLocation.address.line1 as facility_GHP_address_line_1, fulldocument.selectedFacility.selectedLocation.address.line2 as facility_GHP_address_line_2,fulldocument.selectedFacility.selectedLocation.address.state as facility_GHP_address_state, fulldocument.selectedFacility.providerType as facility_GHP_providerType, fulldocument.decisiontimestamp,fulldocument.updatedby,\
    fulldocument.selectedOrderingProvider.firstName as ordering_provider_firstName, fulldocument.selectedOrderingProvider.lastName as ordering_provider_lastName, fulldocument.selectedOrderingProvider.providerType as ordering_provider_providerType,\
    fulldocument.selectedPerformingProvider.firstName as performing_provider_firstName, fulldocument.selectedPerformingProvider.lastName as performing_provider_lastName, fulldocument.selectedPerformingProvider.providerType as performing_provider_providerType, fulldocument.selectedPerformingProvider.name as performing_provider_name, fulldocument.integrationstatus\
    from {table_name}\
    where fulldocument.healthplanname = 'Geisinger'\
    and fullDocument.deleted!=true \
    and fulldocument.authstatus in {auth_status} "

    if max_commit_time is not None:
        if cohereids:
            query += f" and (_hoodie_commit_time >= {max_commit_time} or fulldocument.authnumber in {_sql_in_list(cohereids)} ) "
        else:
            query += f" and _hoodie_commit_time >= {max_commit_time}"

    auth = spark.sql(query)

    auth = add_prefix_to_column_name(auth, "sr")

    null_cohere_id_auths = auth.filter(col("sr_cohereid").isNull()).count()
    if null_cohere_id_auths > 0:
        auth_without_cohereId = auth.filter(col("sr_cohereid").isNull())
        auth = auth.filter(col("sr_cohereid").isNotNull())
        # NOTE(review): metric emission is disabled; cw_client/env are not parameters of this
        # function, so they would have to be passed in before re-enabling this call.
        # log_authorizations_with_null_cohereId(cw_client, auth_without_cohereId,env)
        print("Logging authorizations with null cohereId")
        auth_without_cohereId = auth_without_cohereId.collect()
        for row in auth_without_cohereId:
            print("Autorizations with null cohere Id: ",row['sr__id']," with service request Ids: ",row['sr_servicerequestids'])

    auth = auth.drop("sr_servicerequestids")
    auth = auth.withColumn("sr_is_authorization", lit("Y"))
    print("authorization data returned")
    return auth


def handle_semantic_procedure_codes(sr, clinical_service, service_group_id, refresh_type):
    """
    Splits service requests / authorizations into output lines by service group or procedure code.

    Rows are divided into two populations, processed separately and unioned by name at the end:

    * "old logic": rows whose sr_clinicalservices list is empty/null OR whose sr_dateCreated is
      before 2023-05-14 18:00:00. The row's single sr_clinicalservice is looked up; if it maps
      to a GHP service group (temp_service_group_id) one line is kept with sr_code = "",
      otherwise one line per entry of sr_semanticprocedurecodes is produced.
    * "new logic": the remaining rows. One line per semantic procedure code, with
      sr_clinicalservice replaced by that code's groupId. Codes whose clinical service maps to a
      service group are kept per code (clinicalservice_isUnitsOnPx true and group "HOME", sr_code
      blanked) or grouped into one line (max px units / approved units, sr_code ""). Codes
      without a service group are kept per code.

    :param sr: DataFrame of "sr_"-prefixed columns; the groupBy below also expects
               sr_max_commit_time (presumably added by check_refresh_type — confirm)
    :param clinical_service: DataFrame with clinicalservice_* and health_plan_name_* columns
                             (as built by fetch_clinical_service)
    :param service_group_id: lookup DataFrame with Cohere_serviceID_name_lookup (matched against
                             lower(trim(clinicalservice_name)), so presumably already lower-cased)
                             and GHP_service_name_lookup (the GHP service group)
    :param refresh_type: written as the string column "refresh_type" on every output row
    :return: DataFrame combining both populations
    """
    # "Old logic" population: empty/null clinicalServices list OR created before the cutoff.
    # With Spark's default settings size() of a null array is -1, so null lists land here.
    # NOTE(review): a row with a null/unparseable sr_dateCreated AND a non-empty list matches
    # neither this filter nor the one below (null comparisons), so it is silently dropped.
    # NOTE(review): the cutoff literal is interpreted in the Spark session time zone.
    sr_empty_or_null_clinicalservices = sr.filter((size(col("sr_clinicalservices")) < 1) | (col("sr_dateCreated") < unix_timestamp(lit("2023-05-14 18:00:00")).cast('timestamp')))
    sr_empty_or_null_clinicalservices = sr_empty_or_null_clinicalservices.join(clinical_service, sr_empty_or_null_clinicalservices.sr_clinicalservice == clinical_service.clinicalservice_id, "left") \
    .join(service_group_id, lower(trim(col("clinicalservice_name"))) == col("Cohere_serviceID_name_lookup"), "left") \
        .withColumn("temp_service_group_id", col("GHP_service_name_lookup")) \
        .drop("Cohere_serviceID_name_lookup","GHP_service_name_lookup", "sr__id")

    # "New logic" population: non-empty clinicalServices list AND created at/after the cutoff.
    sr_with_clinicalservices = sr.filter((size(col("sr_clinicalservices")) >= 1) & (col("sr_dateCreated") >= unix_timestamp(lit("2023-05-14 18:00:00")).cast('timestamp')))

    # apply old logic to SRs with empty/null clinicalservices
    # Rows with a service group keep one line; sr_code is blanked and units come from the SR.
    sr_with_groupId = sr_empty_or_null_clinicalservices.filter(col("temp_service_group_id").isNotNull()) \
        .withColumn("sr_px_units", col("sr_units")) \
        .withColumn("sr_code", lit("")) \
        .withColumn("sr_approvedunits", col("sr_auth_approved_units")) \
        .drop("sr_semanticprocedurecodes", "sr_auth_approved_units")
    # Rows without a service group get one line per procedure code. explode() drops rows whose
    # sr_semanticprocedurecodes is null or empty.
    sr_withOut_groupId = sr_empty_or_null_clinicalservices.filter(col("temp_service_group_id").isNull()) \
        .withColumn("semanticprocedurecodes_explode", explode("sr_semanticprocedurecodes")) \
        .withColumn("sr_px_units", col("semanticprocedurecodes_explode.units").cast("string")) \
        .withColumn("sr_code", col("semanticprocedurecodes_explode.code").cast("string")) \
        .withColumn("sr_approvedunits", col("semanticprocedurecodes_explode.approvedUnits")) \
        .drop("semanticprocedurecodes_explode", "sr_semanticprocedurecodes", "sr_auth_approved_units")
    # NOTE(review): .union() is positional; it relies on both branches above appending
    # sr_px_units, sr_code, sr_approvedunits in the same order. unionByName would be safer.
    # Timestamps are parsed with and without milliseconds; sr_units is cast to string here only.
    sr_empty_or_null_clinicalservices = sr_with_groupId.union(sr_withOut_groupId) \
        .withColumn("sr_dateCreated", coalesce(to_timestamp(col("sr_datecreated"), "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"),
                                               to_timestamp(col("sr_datecreated"), "yyyy-MM-dd'T'HH:mm:ss'Z'"))) \
        .withColumn("sr_lastUpdated", coalesce(to_timestamp(col("sr_lastUpdated"), "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"),
                                               to_timestamp(col("sr_lastUpdated"), "yyyy-MM-dd'T'HH:mm:ss'Z'"))) \
        .withColumn("sr_dateCreated_temp", date_format(col("sr_dateCreated"), "yyyyMMddHHmmssSSS")) \
        .withColumn("sr_units", col("sr_units").cast("string")) \
        .withColumn("sr_approvedunits", col("sr_approvedunits").cast("string")) \
        .withColumn("refresh_type", lit(refresh_type).cast("string"))

    # apply new logic to auths with ids in clinicalservices
    # break out the auth by semantic px code (explode drops rows with no codes)
    # this function naively assumes that all groupIds will be clinicalServiceIds, need to change if other groupBy objs are incorporated in the future
    sr_with_clinicalservices_exp = sr_with_clinicalservices.withColumn("semanticprocedurecodes_explode",explode("sr_semanticprocedurecodes")) \
        .withColumn("sr_px_units", col("semanticprocedurecodes_explode.units").cast("string")) \
        .withColumn("sr_code",col("semanticprocedurecodes_explode.code").cast("string")) \
        .withColumn("sr_approvedunits",col("semanticprocedurecodes_explode.approvedUnits")) \
        .withColumn("sr_clinicalservice", col("semanticprocedurecodes_explode.groupId")) \
        .drop("semanticprocedurecodes_explode","sr_semanticprocedurecodes","sr_auth_approved_units")
    # Look up each code's clinical service (by groupId) and its GHP service group.
    sr_with_clinicalservices_px = sr_with_clinicalservices_exp.join(clinical_service, sr_with_clinicalservices_exp.sr_clinicalservice == clinical_service.clinicalservice_id, "left") \
    .join(service_group_id, lower(trim(col("clinicalservice_name"))) == col("Cohere_serviceID_name_lookup"), "left") \
        .withColumn("temp_service_group_id", col("GHP_service_name_lookup")) \
        .drop("Cohere_serviceID_name_lookup","GHP_service_name_lookup", "sr__id")
    # NOTE(review): the withColumn below re-assigns sr_approvedunits to itself (a no-op).
    sr_with_service_group_id_for_units = sr_with_clinicalservices_px.filter(col("temp_service_group_id").isNotNull()) \
        .withColumn("sr_approvedunits", col("sr_approvedunits"))
    # if units on px turned on, break out 1 line per px, but with reference to servicegroupid
    sr_with_units_on_px = sr_with_service_group_id_for_units.filter( (sr_with_service_group_id_for_units["clinicalservice_isUnitsOnPx"] == True) & (sr_with_service_group_id_for_units["temp_service_group_id"] == "HOME") ) \
        .withColumn("sr_code", lit(""))
    # if units on px turned off, group pxs on 1 line
    # `max` below is pyspark.sql.functions.max (the star import shadows the builtin).
    # Rows with a null clinicalservice_isUnitsOnPx and group "HOME" match neither filter.
    sr_no_units_on_px = sr_with_service_group_id_for_units.filter( (sr_with_service_group_id_for_units["clinicalservice_isUnitsOnPx"] == False) | (sr_with_service_group_id_for_units["temp_service_group_id"] != "HOME")) \
        .groupBy("sr_id", "sr_cohereid", "sr_startdate", "sr_enddate", "sr_encounterType", "sr_authStatus", "sr_placeofservice", "sr_units", \
                 "sr_facility", "sr_orderingprovider", "sr_performingprovider", "sr_patient", "sr_healthplanname", "sr_clinicalservice", "sr_clinicalservices", "sr_requestorChannel", \
                 "sr_performing_tin", "sr_claimspaymentnote", "sr_servicestatuscode", "sr_hoodie_commit_time", "sr_dateCreated", "sr_lastUpdated", "sr_performing_provider_GHP_Id", \
                 "sr_facility_GHP_Id", "sr_ordering_provider_GHP_Id", "sr_ordering_tin", "sr_facility_GHP_address_city", "sr_facility_GHP_tin", "sr_facility_GHP_address_zipcode", \
                 "sr_facility_GHP_address_line_1", "sr_facility_GHP_address_line_2", "sr_facility_GHP_address_state", "sr_decisiontimestamp", "sr_is_authorization", "sr_facility_GHP_providerType", \
                 "sr_facility_GHP_name", "sr_ordering_provider_firstName", "sr_ordering_provider_lastName", "sr_ordering_provider_providerType", \
                 "sr_performing_provider_firstName", "sr_performing_provider_lastName", "sr_performing_provider_providerType", "sr_performing_provider_name", \
                 "clinicalservice_id", "clinicalservice_name", "clinicalservice_allowedhealthplanids", "clinicalservice_isUnitsOnPx", "clinicalservice_procedurecodeids", "clinicalservice_procedureCodes", \
                 "health_plan_name_id", "health_plan_name_name", "temp_service_group_id", "sr_max_commit_time", "sr_integrationstatus").agg(max("sr_px_units").alias("sr_px_units"), max("sr_approvedunits").alias("sr_approvedunits")).withColumn("sr_code", lit(""))
    sr_w_service_group_id = sr_with_units_on_px.unionByName(sr_no_units_on_px)
    # Codes without a service group are kept one line per code.
    sr_withOut_service_group_id = sr_with_clinicalservices_px.filter(col("temp_service_group_id").isNull())
    sr_with_service_group_id_for_units = sr_w_service_group_id.unionByName(sr_withOut_service_group_id) \
        .withColumn("sr_dateCreated",coalesce(to_timestamp(col("sr_datecreated"),"yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"),to_timestamp(col("sr_datecreated"),"yyyy-MM-dd'T'HH:mm:ss'Z'"))) \
        .withColumn("sr_lastUpdated",coalesce(to_timestamp(col("sr_lastUpdated"),"yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"),to_timestamp(col("sr_lastUpdated"),"yyyy-MM-dd'T'HH:mm:ss'Z'"))) \
        .withColumn("sr_dateCreated_temp",date_format(col("sr_dateCreated"),"yyyyMMddHHmmssSSS")) \
        .withColumn("sr_approvedunits",col("sr_approvedunits").cast("string")) \
        .withColumn("refresh_type", lit(refresh_type).cast("string"))
    # NOTE(review): sr_units is cast to string only in the old-logic branch; unionByName must
    # reconcile its type with this branch — confirm the resulting column type is as expected.
    sr_final = sr_with_service_group_id_for_units.unionByName(sr_empty_or_null_clinicalservices)
    return sr_final

def fetch_place_of_service(db):
    '''
    Loads the place-of-service lookup.

    :param db: database holding hudi_raw_placeofservice
    :return: DataFrame with columns pos_id and pos_code
    '''
    source_table = f'{db}.hudi_raw_placeofservice'
    pos_query = f"""
    select fulldocument._id as id, fulldocument.code
    from {source_table}

    """
    pos = spark.sql(pos_query)
    print(f"Returned {pos.count()} place of service records")
    return add_prefix_to_column_name(pos, "pos")


def fetch_provider(db):
    '''
    Loads Geisinger providers of type 'P' or 'G' for lookup.

    Columns are prefixed "provider_". The tinlist array is flattened into provider_tin_list: it is
    cast to string, stripped of brackets and spaces, and split on commas.

    :param db: database holding hudi_raw_provider
    :return: provider DataFrame (provider_tinlist replaced by provider_tin_list)
    '''

    table_name = f'{db}.hudi_raw_provider'

    provider = spark.sql(
        f"""
    select fulldocument._id as id, fulldocument.healthplanname , fulldocument.firstname , fulldocument.lastname ,
    fulldocument.tinlist, fulldocument.name, fulldocument.providertype
    from {table_name}
    where fulldocument.healthplanname = 'Geisinger' and fulldocument.providertype in ('P', 'G')
    """)
    print(f"Returned {provider.count()} provider records")
    provider = add_prefix_to_column_name(provider, "provider")
    # Raw strings: "\[" and "\s" are invalid escape sequences in ordinary literals (warning since
    # Python 3.6, SyntaxWarning since 3.12); the regex text passed to Spark is unchanged.
    provider = provider.withColumn("provider_tin_list",
                                   split(regexp_replace(col("provider_tinlist").cast("string"), r'\[|\]| ', ''), r",\s*")) \
        .drop("provider_tinlist")
    return provider


def fetch_facility(db):
    '''
    Loads Geisinger facilities of type 'F' or 'G' for lookup.

    :param db: database holding hudi_raw_facility
    :return: DataFrame with columns facility_id, facility_healthplanname, facility_name,
             facility_providertype
    '''
    source_table = f'{db}.hudi_raw_facility'
    facility_query = f"""
    select fulldocument._id as id, fulldocument.healthplanname , fulldocument.name, fulldocument.providertype
    from {source_table}
    where fulldocument.healthplanname = 'Geisinger' and fulldocument.providertype in ('F', 'G')
    """
    facilities = spark.sql(facility_query)
    print(f"Returned {facilities.count()} facility records")
    return add_prefix_to_column_name(facilities, "facility")


def fetch_patient(db):
    '''
    Loads patients with one row per Geisinger coverage.

    Each patient's coverages array is exploded. Patients with a null/empty coverages array
    produce no rows. Subscriber id and health plan name are extracted as strings, and only
    coverages whose health plan name is 'Geisinger' are kept.
    NOTE(review): a patient with several Geisinger coverages yields several rows, which can
    multiply rows when joined on patient_id — confirm intended.

    :param db: database holding hudi_raw_patient
    :return: DataFrame with columns patient_id, patient_memberid, patient_subscriberID,
             patient_healthplanname
    '''
    source_table = f'{db}.hudi_raw_patient'
    patient_query = f"""
    select fulldocument._id as id, fulldocument.coverages , fulldocument.memberid
    from {source_table}
    """
    prefixed = add_prefix_to_column_name(spark.sql(patient_query), "patient")
    per_coverage = (
        prefixed
        .withColumn("coverage_explode", explode(col("patient_coverages")))
        .withColumn("patient_subscriberID", col("coverage_explode.subscriberID").cast("string"))
        .withColumn("patient_healthplanname", col("coverage_explode.healthplanname").cast("string"))
        .drop("coverage_explode", "patient_coverages")
    )
    ghp_patients = per_coverage.filter(col("patient_healthplanname") == lit("Geisinger"))
    print(f"Returned {ghp_patients.count()} patient records")
    return ghp_patients


def fetch_health_plan_name(db):
    '''
    Loads the Geisinger health plan record(s), used to filter clinical service data for GHP.

    :param db: database holding hudi_raw_healthplan
    :return: DataFrame with columns health_plan_name_id and health_plan_name_name
    '''
    plan_table = f'{db}.hudi_raw_healthplan'
    plans = spark.sql(
        f"""
    select fulldocument._id as id , fulldocument.name as name
    from {plan_table} t
    where fulldocument.name = 'Geisinger'
    """)
    print(f"Returned {plans.count()} health_plan_name records")
    return add_prefix_to_column_name(plans, "health_plan_name")


def fetch_clinical_service(db, health_plan_name):
    '''
    Loads clinical services allowed for the given health plan(s).

    Columns are prefixed "clinicalservice_". clinicalservice_allowedhealthplanids is converted
    from an array to a string by stripping brackets and spaces. It is then inner-joined, by
    equality, to health_plan_name_id, so the result also carries the health_plan_name_* columns.

    :param db: database holding hudi_raw_clinicalservice
    :param health_plan_name: DataFrame from fetch_health_plan_name (health_plan_name_id, ...)
    :return: clinical services whose allowed-plan string equals a health_plan_name_id
    '''

    table_name = f'{db}.hudi_raw_clinicalservice'

    clinicalservice = spark.sql(
        f"""
    select fulldocument._id as id, fulldocument.name, fulldocument.allowedhealthplanids, fulldocument.isUnitsOnPx, fulldocument.procedurecodeids, fulldocument.semanticprocedureCodes.code AS procedureCodes
    from {table_name} t

    """)
    print(f"Returned {clinicalservice.count()} clinicalservice records")
    clinicalservice = add_prefix_to_column_name(clinicalservice,"clinicalservice")
    # Raw string: "\[" is an invalid escape sequence in an ordinary literal (warning, SyntaxWarning
    # since Python 3.12); the regex text passed to Spark is unchanged.
    # NOTE(review): after stripping, a service allowed for several plans becomes "id1,id2", which
    # never equals a single health_plan_name_id, so shared services are excluded — confirm intended.
    clinicalservice = clinicalservice.withColumn("clinicalservice_allowedhealthplanids",regexp_replace(col("clinicalservice_allowedhealthplanids").cast("string"), r'\[|\]| ', '')) \
        .join(health_plan_name, col("clinicalservice_allowedhealthplanids") == col("health_plan_name_id"), "inner")
    return clinicalservice


def fetch_service_request_relation(db):
    '''
    Loads Geisinger service-request relations of type 'REPLACEMENT'.

    :param db: database holding hudi_raw_servicerequestrelation
    :return: DataFrame with columns servicerequestrelation_from and servicerequestrelation_to
    '''
    relation_table = f'{db}.hudi_raw_servicerequestrelation'
    relation_query = f"""
    select fulldocument.from, fulldocument.to
    from {relation_table} t
    where fulldocument.healthplanname = 'Geisinger' and fulldocument.relation = 'REPLACEMENT'

    """
    relations = spark.sql(relation_query)
    print(f"Returned {relations.count()} servicerequestrelation records")
    return add_prefix_to_column_name(relations, "servicerequestrelation")


def check_refresh_type(db, refresh_type, cohereids):
    '''
    Fetches the Geisinger service requests and authorizations to process and unions them.

    If the refresh type is "full_refresh", all auths for GHP are fetched.
    Otherwise (incremental), the max hoodie commit times from past runs are read
    (get_last_run_time), and only records committed at or after them are fetched, plus any
    records whose id is in ``cohereids``.

    Authorizations take their sr_dateCreated from the service request sharing their cohereid
    (fetch_date_created) when one exists.

    :param db: database name
    :param refresh_type: "full_refresh" or an incremental marker
    :param cohereids: ids to re-process regardless of commit time (incremental runs only)
    :return: unioned "sr_"-prefixed DataFrame plus a string column sr_max_commit_time holding the
             authorization watermark used for this run (null when none exists)
    '''

    max_commit_time_auth = "20230101000000000" #default for historical runs
    if refresh_type == "full_refresh":
        sr = fetch_ghp_service_requests(db, refresh_type)
        auth = fetch_ghp_authorization(db, refresh_type)
    else:
        max_commit_time_sr, max_commit_time_auth = get_last_run_time(db)
        print(
            f"Max commit time of previous Job: {max_commit_time_sr} for service requests and {max_commit_time_auth} for authorizations")
        sr = fetch_ghp_service_requests(db, refresh_type, max_commit_time_sr, cohereids)
        auth = fetch_ghp_authorization(db, refresh_type, max_commit_time_auth, cohereids)

    # Collapse the lookup to one dateCreated per cohereid so the left join below cannot duplicate
    # authorization rows when several service requests share a cohereid. The earliest value is
    # kept (string min; presumably ISO-8601 so lexical order is chronological — confirm).
    # `min` is pyspark.sql.functions.min via the star import, as `max` is elsewhere in this file.
    date_created = fetch_date_created(db) \
        .groupBy("dc_cohereid") \
        .agg(min("dc_dateCreated").alias("dc_dateCreated"))
    auth = auth.join(date_created, auth.sr_cohereid == date_created.dc_cohereid, "left") \
        .withColumn("sr_dateCreated", coalesce(col("dc_dateCreated"), col("sr_dateCreated"))) \
        .drop("dc_cohereid", "dc_dateCreated")
    sr = sr.unionByName(auth)

    # get_last_run_time returns None for the auth watermark when the QC table has no "Y" rows;
    # lit(None) alone would produce an untyped (NullType) column, so pin it to string.
    sr = sr.withColumn("sr_max_commit_time", lit(max_commit_time_auth).cast("string"))
    return sr


def compute_final_df(sr, patient, pos, facility, provider, clinical_service, provider_raw_final):
    '''
    Map combined service-request/authorization rows onto the GHP outbound file layout.

    - GHP_authorization_type is left blank here. Later steps set it to N (never sent),
      U (sent before) or R (replacement).
    - A provider of type G can only appear as the servicing (performing) provider id, name
      and TIN. The ordering provider can only be of type P, so a type-G ordering provider is
      replaced with ZZPRACNOTFND / blanks.
    - If the facility is of type G (and the performing provider is not), the facility id,
      name, TIN and address are sent in the additional notes lines instead.

    Params:
        sr: service request / authorization rows (sr_* columns), e.g. from check_refresh_type.
        patient: joined on sr_patient == patient_id (left).
        pos: place of service, joined on sr_placeofservice == pos_id (left).
        facility, provider, clinical_service: not used by this function; kept so existing
            callers keep working.
        provider_raw_final: GHP provider file (GHP_Provider_Number, Provider_Entity), used to
            override the ordering/performing/facility provider types.

    Returns:
        DataFrame restricted to the columns matched by the regex `(GHP)+?.+`, in creation
        order. Column order matters because it is the outbound file's column order.
    '''
    final_df = (
        sr
        .join(patient, sr.sr_patient == patient.patient_id, "left")
        .join(pos, sr.sr_placeofservice == pos.pos_id, "left")
        .withColumnRenamed("sr_ordering_provider_firstName", "ordering_provider_first_name")
        .withColumnRenamed("sr_ordering_provider_lastName", "ordering_provider_last_name")
        .withColumnRenamed("sr_ordering_provider_providerType", "ordering_providerType")
        .withColumnRenamed("sr_performing_provider_firstName", "performing_provider_first_name")
        .withColumnRenamed("sr_performing_provider_lastName", "performing_provider_last_name")
        .withColumnRenamed("sr_performing_provider_providerType", "performing_providerType")
        .withColumnRenamed("sr_performing_provider_name", "performing_provider_name")
        .withColumnRenamed("sr_facility_GHP_providerType", "facility_providerType")
        .withColumn("GHP_Authorization_IntegrationStatus", col("sr_integrationstatus"))
        # Override each provider type with the GHP provider file's entity when available,
        # otherwise keep the Cohere value, otherwise "NA".
        .join(provider_raw_final, sr.sr_ordering_provider_GHP_Id == provider_raw_final.GHP_Provider_Number, "left")
        .withColumn("ordering_providerType", coalesce(col("Provider_Entity"), col("ordering_providerType"), lit("NA")))
        .drop("GHP_Provider_Number", "Provider_Entity")
        .join(provider_raw_final, sr.sr_performing_provider_GHP_Id == provider_raw_final.GHP_Provider_Number, "left")
        .withColumn("performing_providerType", coalesce(col("Provider_Entity"), col("performing_providerType"), lit("NA")))
        .drop("GHP_Provider_Number", "Provider_Entity")
        .join(provider_raw_final, sr.sr_facility_GHP_Id == provider_raw_final.GHP_Provider_Number, "left")
        .withColumn("facility_providerType", coalesce(col("Provider_Entity"), col("facility_providerType"), lit("NA")))
        .drop("GHP_Provider_Number", "Provider_Entity")
        # --- GHP file fields (order of creation == output column order) ---
        .withColumn("GHP_assigned_vendor_code", lit("CH"))
        .withColumn("GHP_vendor_authorization_number", coalesce(col("sr_cohereid"), col("sr_id")))
        .withColumn("GHP_record_type", lit("A"))
        .withColumn("GHP_request_type", lower(trim(col("sr_encountertype"))))
        .withColumn("GHP_subscriber_number", col("patient_subscriberID"))
        .withColumn("GHP_member_number", col("patient_memberid"))
        .withColumn("GHP_type_of_care_indicator", lit("L"))
        .withColumn("GHP_service_category", lit("M"))
        .withColumn("GHP_service_recieved_date", lit(""))
        .withColumn("GHP_from_or_low_service_date", date_format(col("sr_startdate"), "yyyyMMdd"))
        .withColumn("GHP_to_or_high_service_date", date_format(col("sr_enddate"), "yyyyMMdd"))
        .withColumn("GHP_service_group_id", col("temp_service_group_id"))
        .withColumn("GHP_procedure_code", col("sr_code"))
        .withColumn("GHP_primary_diagnosis_code", lit("R6889"))
        .withColumn("GHP_units_requested", col("sr_px_units"))
        .withColumn("GHP_units_authorized", col("sr_approvedunits"))
        .withColumn("GHP_units_approved", col("sr_approvedunits"))
        .withColumn("GHP_service_status", trim(lower(col("sr_authstatus"))))
        .withColumn("GHP_service_deny_date", date_format(col("sr_decisiontimestamp"), "yyyyMMdd"))
        .withColumn("GHP_requesting_provider_id", when(col("ordering_providerType") == lit("G"), lit("ZZPRACNOTFND")).otherwise(coalesce(col("sr_ordering_provider_GHP_Id"), lit("ZZPRACNOTFND"))))
        .withColumn("GHP_servicing_facility_id", when(col("facility_providerType") == lit("G"), lit("ZZFACINOTFND")).otherwise(coalesce(col("sr_facility_GHP_Id"), lit("ZZFACINOTFND"))))
        .withColumn("GHP_servicing_facility_name", when(col("facility_providerType") == lit("G"), lit("")).otherwise(col("sr_facility_GHP_name")))
        .withColumn("GHP_authorized_place_of_service", col("pos_code"))
        .withColumn("GHP_requested_place_of_service", col("pos_code"))
        .withColumn("GHP_servicing_provider_id", coalesce(col("sr_performing_provider_GHP_Id"), lit("ZZPRACNOTFND")))
        # concat() is null if either name part is null; then fall back to the single name field.
        .withColumn("GHP_servicing_provider_full_name", coalesce(concat(col("performing_provider_first_name"), lit(" "), col("performing_provider_last_name")), col("performing_provider_name")))
        .withColumn("GHP_servicing_provider_tax_id", col("sr_performing_tin"))
        .withColumn("GHP_referring_provider_id", when(col("ordering_providerType") == lit("G"), lit("ZZPRACNOTFND")).otherwise(coalesce(col("sr_ordering_provider_GHP_Id"), lit("ZZPRACNOTFND"))))
        .withColumn("GHP_referring_provider_full_name", when(col("ordering_providerType") == lit("G"), lit("")).otherwise(concat_ws(" ", col("ordering_provider_first_name"), col("ordering_provider_last_name"))))
        .withColumn("GHP_referring_provider_tax_id", when(col("ordering_providerType") == lit("G"), lit("")).otherwise(col("sr_ordering_tin")))
        .withColumn("GHP_attending_provider_id", when(col("performing_providerType") == lit("G"), lit("ZZPRACNOTFND")).otherwise(coalesce(col("sr_performing_provider_GHP_Id"), lit("ZZPRACNOTFND"))))
        .withColumn("GHP_authorization_type", lit(""))
        .withColumn("GHP_entry_method", lower(trim(col("sr_requestorChannel"))))
        .withColumn("GHP_procedure_code_modifier", lit(""))
        .withColumn("GHP_total_length_of_stay_requested", lit(""))
        .withColumn("GHP_total_length_of_stay_authorized", lit(""))
        .withColumn("GHP_requested_admission_date", lit(""))
        .withColumn("GHP_authorized_admission_date", lit(""))
        .withColumn("GHP_actual_admission_date", lit(""))
        .withColumn("GHP_discharge_expected_date_and_time", lit(""))
        .withColumn("GHP_actual_expected_date_and_time", lit(""))
        .withColumn("GHP_review_date", lit(""))
        .withColumn("GHP_concurrent_reviewed_treatment_type", lit(""))
        .withColumn("GHP_concurrent_reviewed_length_of_stay_requested", lit(""))
        .withColumn("GHP_concurrent_reviewed_length_of_stay_authorized", lit(""))
        .withColumn("GHP_requested_length_of_stay", lit(""))
        .withColumn("GHP_authorized_length_of_stay", lit(""))
        .withColumn("GHP_allowed_days_length_of_stay", lit(""))
        .withColumn("GHP_referral_type", lower(trim(col("clinicalservice_name"))))
        .withColumn("GHP_service_status_reason", col("sr_servicestatuscode"))
        .withColumn("GHP_auth_hoodie_commit_time", col("sr_hoodie_commit_time"))
        # --- facility address, used in the notes when the facility is a group (G) ---
        .withColumn("facility_GHP_address_line_1", coalesce(col("sr_facility_GHP_address_line_1"), lit("")))
        .withColumn("facility_GHP_address_line_2", coalesce(col("sr_facility_GHP_address_line_2"), lit("")))
        .withColumn("facility_GHP_address_city", coalesce(col("sr_facility_GHP_address_city"), lit("")))
        .withColumn("facility_GHP_address_state", coalesce(col("sr_facility_GHP_address_state"), lit("")))
        .withColumn("facility_GHP_address_zipcode", coalesce(col("sr_facility_GHP_address_zipcode"), lit("")))
        .withColumn("sr_facility_GHP_address", concat(col("facility_GHP_address_line_1"), lit(","), col("facility_GHP_address_line_2"), lit(","), col("facility_GHP_address_city"), lit(","), col("facility_GHP_address_state"), lit(","), col("facility_GHP_address_zipcode")))
        .withColumn("sr_facility_GHP_address", regexp_replace(col("sr_facility_GHP_address").cast("string"), "[^a-zA-Z0-9, ]", ""))
        .withColumn("temp_facility_id", when((col("facility_providerType") == lit("G")) & (col("performing_providerType") != lit("G")), col("sr_facility_GHP_Id")).otherwise(lit(None)))
        .withColumn("temp_facility_notes",
                    when(col("temp_facility_id").isNotNull(),
                         concat(col("sr_facility_GHP_name"), lit(";"), lit("TIN: "), col("sr_facility_GHP_tin"), lit(";"), col("sr_facility_GHP_address").cast("string")))
                    .otherwise(lit("")))
        # Normalize the claims payment note to one line with single spaces.
        # (Previously "  " -> " " only halved longer runs of spaces, and CR was kept.)
        # NOTE: notes lines 1-4 are part of the duplicate check against data already sent,
        # so records whose notes text changes here will be treated as updated once.
        .withColumn("sr_claimspaymentnote", regexp_replace(col("sr_claimspaymentnote"), r"[\r\n]", " "))
        .withColumn("sr_claimspaymentnote", regexp_replace(col("sr_claimspaymentnote"), " {2,}", " "))
        # Spark substring(str, pos, len): pos is 1-based (0 behaves like 1) and the third
        # argument is a LENGTH, not an end position. Line 1 = chars 1-69, line 2 = 70-139,
        # line 3 = 140-209 (previously lines 2 and 3 overlapped).
        .withColumn("GHP_additional_notes_line_1", substring("sr_claimspaymentnote", 0, 69))
        .withColumn("GHP_additional_notes_line_2", substring("sr_claimspaymentnote", 70, 70))
        .withColumn("GHP_additional_notes_line_3", substring("sr_claimspaymentnote", 140, 70))
        .withColumn("GHP_additional_notes_line_4", when(col("temp_facility_id").isNotNull(), concat_ws(": ", lit("Srv_Prv_ID"), col("sr_facility_GHP_Id"))))
        .withColumn("GHP_additional_notes_line_5", substring("temp_facility_notes", 0, 69))
        .withColumn("GHP_additional_notes_line_6", substring("temp_facility_notes", 70, 70))
        # Line 7 temporarily carries sr_id for non-authorization rows; later steps use it to
        # find void-and-replace relations and then overwrite it.
        .withColumn("GHP_additional_notes_line_7", when(col("sr_is_authorization") == "N", col("sr_id")).otherwise(lit(None)))
        .withColumn("GHP_is_authorization", col("sr_is_authorization"))
        .withColumn("GHP_max_hoodie_commit_time", to_timestamp(col("sr_max_commit_time"), "yyyyMMddHHmmssSSS"))
        .withColumn("GHP_auth_last_updated", to_timestamp(col("sr_lastupdated")))
    )

    # Keep only the fields relevant for the final file output (names matching the GHP regex).
    final_df = final_df.select(final_df.colRegex("`(GHP)+?.+`"))

    return final_df


def update_auth_status(final_df):
    '''
    Re-derive GHP_service_status from the unit counts.

    Rows whose status is "withdrawn", "voided" or "dismissed" keep their status. For every
    other row (units cast to Integer first):
      - approved units null or 0          -> "denied"
      - requested units > approved units  -> "partially_approved"
      - otherwise                         -> "approved"
    '''
    requested_units = col("GHP_units_requested").cast("Integer")
    approved_units = col("GHP_units_approved").cast("Integer")
    keeps_status = col("GHP_service_status").isin("withdrawn", "voided", "dismissed")

    status_from_units = (
        when(coalesce(approved_units, lit("0")) == lit("0"), lit("denied"))
        .when(requested_units > approved_units, lit("partially_approved"))
        .otherwise(lit("approved"))
    )

    return final_df.withColumn(
        "GHP_service_status",
        when(~keeps_status, status_from_units).otherwise(col("GHP_service_status")),
    )


def update_end_date(final_df):
    '''
    Fill a missing GHP_to_or_high_service_date.

    A null end date becomes the service start date + 90 days. If that crosses into the next
    year, it is capped at 12/31 of the START date's year. An existing end date is kept.
    Both columns are yyyyMMdd strings. A null or unparseable start date leaves a missing
    end date null.
    '''
    start_date = to_date(col("GHP_from_or_low_service_date"), "yyyyMMdd")
    start_plus_90 = date_add(start_date, 90)
    # Cap at the end of the start date's year. The previous version used the current
    # (run) year, which gave an end date BEFORE the start date for auths starting in a
    # future year, and did not cap auths that started in a past year.
    start_year_end = to_date(concat_ws("-", year(start_date).cast("string"), lit("12"), lit("31")))
    default_end_date = date_format(
        when(start_plus_90 > start_year_end, start_year_end).otherwise(start_plus_90),
        "yyyyMMdd",
    )

    return final_df.withColumn(
        "GHP_to_or_high_service_date",
        coalesce(col("GHP_to_or_high_service_date"), default_end_date),
    )


def update_lookup_data(final_df, request_type, entry_method, service_group_id, referral_type, service_status):
    '''
    Translate Cohere values into GHP codes with the lookup tables (left joins, in order).

      table          | column replaced     | Cohere key column                   | GHP value column
      request_type   | GHP_request_type    | Cohere_request_type_lookup          | GHP_request_type_lookup
      service_status | GHP_service_status  | Cohere_service_status_code_lookup   | GHP_service_status_code_lookup
      entry_method   | GHP_entry_method    | Cohere_entry_method_code_lookup     | GHP_entry_method_code_lookup
      referral_type  | GHP_referral_type   | Cohere_referral_service_name_lookup | GHP_referral_type_lookup

    An unmatched value becomes null, except referral type, which falls back to "DC".
    The key/value columns are dropped afterwards. service_group_id is accepted but not used.
    NOTE(review): a lookup table with duplicate Cohere keys would multiply rows.
    '''
    lookup_specs = (
        (request_type, "GHP_request_type", "Cohere_request_type_lookup", "GHP_request_type_lookup", None),
        (service_status, "GHP_service_status", "Cohere_service_status_code_lookup", "GHP_service_status_code_lookup", None),
        (entry_method, "GHP_entry_method", "Cohere_entry_method_code_lookup", "GHP_entry_method_code_lookup", None),
        (referral_type, "GHP_referral_type", "Cohere_referral_service_name_lookup", "GHP_referral_type_lookup", "DC"),
    )

    for lookup_df, target_col, cohere_key, ghp_value, fallback in lookup_specs:
        mapped_value = col(ghp_value)
        if fallback is not None:
            mapped_value = when(mapped_value.isNotNull(), mapped_value).otherwise(lit(fallback))
        final_df = (
            final_df
            .join(lookup_df, col(target_col) == col(cohere_key), "left")
            .withColumn(target_col, mapped_value)
            .drop(cohere_key, ghp_value)
        )

    return final_df


def filter_duplicate_auths_and_update_authorization_type(final_df, data_sent_to_ghp, cohereids=()):
    '''
    Drop records that are unchanged since they were last sent to GHP, and set
    GHP_authorization_type to N (never sent) or U (sent before).

    Backfills of the hudi job / migrations that update or add service request fields can
    re-emit records whose integration fields did not change, which would send false updates
    to GHP. To suppress them, the current rows are unioned with the rows already sent and
    counted over `groupbyfields`. A current row identical (on those fields) to a sent row has
    a count > 1. A changed row has a count of 1, and is kept only if it is the latest row
    (by GHP_auth_hoodie_commit_time) for its GHP_vendor_authorization_number.

    Only current (not previously sent) rows are returned. A current row is also kept,
    regardless of the duplicate check, when:
      - GHP_Authorization_IntegrationStatus is READY, or
      - GHP_vendor_authorization_number is in cohereids, or
      - GHP_auth_hoodie_commit_time equals GHP_max_hoodie_commit_time (see comment below).

    Params:
        final_df: rows computed for this run (GHP_* columns).
        data_sent_to_ghp: rows previously sent to GHP.
        cohereids: auth numbers to force through. A single string is treated as one id
            (note that ('X') in Python is a str, not a tuple). None means no ids.

    Returns:
        The current rows to send, with GHP_authorization_type set to N or U.
    '''
    groupbyfields = ['GHP_vendor_authorization_number', 'GHP_subscriber_number', 'GHP_member_number',
                     'GHP_from_or_low_service_date', 'GHP_to_or_high_service_date', 'GHP_service_group_id',
                     'GHP_procedure_code', 'GHP_units_requested', 'GHP_units_approved',
                     'GHP_service_status', 'GHP_requesting_provider_id', 'GHP_servicing_facility_id',
                     'GHP_servicing_facility_name', 'GHP_authorized_place_of_service', 'GHP_requested_place_of_service',
                     'GHP_servicing_provider_id', 'GHP_servicing_provider_full_name', 'GHP_servicing_provider_tax_id',
                     'GHP_referring_provider_id', 'GHP_referring_provider_full_name', 'GHP_referring_provider_tax_id',
                     'GHP_attending_provider_id', 'GHP_entry_method', 'GHP_referral_type',
                     'GHP_service_status_reason', 'GHP_additional_notes_line_1', 'GHP_additional_notes_line_2',
                     'GHP_additional_notes_line_3', 'GHP_additional_notes_line_4']

    # list('GFXU3603') would split a bare string into characters; wrap it instead.
    if isinstance(cohereids, str):
        cohereids = [cohereids]
    forced_ids = list(cohereids or ())

    def _null_units_when_closed(df):
        # For closed (CL) auths, null both unit columns on both sides of the comparison,
        # so that unit values never make an otherwise-identical closed auth look changed.
        for units_col in ("GHP_units_authorized", "GHP_units_approved"):
            df = df.withColumn(units_col, when(col("GHP_service_status") == 'CL', None).otherwise(col(units_col)))
        return df

    data_sent_to_ghp = _null_units_when_closed(data_sent_to_ghp)
    final_df = _null_units_when_closed(final_df)

    # col("GHP_auth_hoodie_commit_time") == col("GHP_max_hoodie_commit_time") check is necessary because
    # the hudi pipeline and this job can be running at the same time (exactly at 5 PM). When that happens, this job would
    # not pick up all the Authorizations with the same hoodie_commit_time for the day. In the next day,
    # col("GHP_auth_last_updated") >= col("GHP_max_hoodie_commit_time") would return false for the unprocessed Authorizations.
    # Hence we need this additional check.
    # NOTE(review): GHP_max_hoodie_commit_time is a timestamp (to_timestamp in compute_final_df). If
    # GHP_auth_hoodie_commit_time is a raw yyyyMMddHHmmssSSS string, Spark's implicit string->timestamp
    # cast likely yields null and this clause never matches — confirm the column type.
    is_current_row = col("already_sent_data_indicator") == lit("N")
    keep_row = is_current_row & (
        (col("GHP_Authorization_IntegrationStatus") == lit("READY"))
        # Restricted to current rows: previously a forced id also re-emitted the row already sent.
        | col("GHP_vendor_authorization_number").isin(forced_ids)
        | (col("GHP_auth_hoodie_commit_time") == col("GHP_max_hoodie_commit_time"))
        | ((col("count_auths") == lit(1))
           & (col("dense_ranking") == lit(1))
           & (col("GHP_auth_last_updated") >= col("GHP_max_hoodie_commit_time")))
    )

    final_df_1 = (
        final_df.withColumn("already_sent_data_indicator", lit("N"))
        .unionByName(data_sent_to_ghp.withColumn("already_sent_data_indicator", lit("Y")), allowMissingColumns=True)
        # Unordered window: count over the whole group of identical-field rows.
        .withColumn("count_auths", count("GHP_vendor_authorization_number").over(Window.partitionBy(*groupbyfields)))
        .withColumn("dense_ranking", dense_rank().over(
            Window.partitionBy("GHP_vendor_authorization_number").orderBy(desc("GHP_auth_hoodie_commit_time"))))
        .filter(keep_row)
    )

    sent_data = (
        data_sent_to_ghp.select("GHP_vendor_authorization_number")
        .withColumnRenamed("GHP_vendor_authorization_number", "sent_GHP_vendor_authorization_number")
        .distinct()
    )

    final_df_2 = (
        final_df_1.join(sent_data,
                        final_df_1.GHP_vendor_authorization_number == sent_data.sent_GHP_vendor_authorization_number,
                        "left")
        .withColumn("GHP_authorization_type",
                    when(col("sent_GHP_vendor_authorization_number").isNull(), lit("N")).otherwise(lit("U")))
        .drop("count_auths", "sent_GHP_vendor_authorization_number", "dense_ranking", "already_sent_data_indicator")
    )

    return final_df_2


def filter_new_voided_auths(final_df):
    '''
    Drop auths that are already closed (GHP_service_status "CL": voided, dismissed or
    withdrawn) at creation (GHP_authorization_type "N"). They do not need to be sent to GHP.

    NOTE(review): Spark null semantics apply. A row whose status or type is null is dropped
    unless the other column alone rules it out (e.g. status null with type "U" is kept).
    '''
    not_closed = col("GHP_service_status") != lit("CL")
    not_new = col("GHP_authorization_type") != lit("N")
    return final_df.filter(not_closed | not_new)


def update_units_on_voided_auths(final_df):
    '''
    Blank out (empty string, not 0) GHP_units_authorized and GHP_units_approved for
    closed auths (GHP_service_status == "CL": voided, withdrawn or dismissed).
    Other rows keep their units.
    '''
    is_closed = col("GHP_service_status") == lit("CL")
    result = final_df
    for units_col in ("GHP_units_authorized", "GHP_units_approved"):
        result = result.withColumn(units_col, when(is_closed, lit("")).otherwise(col(units_col)))
    return result


def fetch_old_sr_id_from_serviceRequestRelation(final_df, relation_df=None):
    '''
    Attach old_sr_id: for void-and-replace auths, the id of the service request that the
    current one replaced.

    GHP_additional_notes_line_7 (which carries sr_id for non-authorization rows) is matched
    against servicerequestrelation_to. servicerequestrelation_from becomes old_sr_id, and is
    null when there is no relation.

    Params:
        final_df: GHP rows.
        relation_df: relation rows with servicerequestrelation_from / _to columns. Defaults
            to the notebook-level `service_request_relation`, so existing callers keep
            working without relying on it explicitly.

    NOTE(review): a "to" id with several relations would multiply rows (left join).
    '''
    if relation_df is None:
        relation_df = service_request_relation

    return (
        final_df.join(relation_df,
                      final_df.GHP_additional_notes_line_7 == relation_df.servicerequestrelation_to,
                      "left")
        .withColumn("old_sr_id", col("servicerequestrelation_from"))
        .drop("servicerequestrelation_from", "servicerequestrelation_to")
    )


def fetch_old_auth_number_from_service_request(final_df, sr_df=None):
    '''
    Translate old_sr_id into old_auth_id, the cohere id of the replaced service request.

    Params:
        final_df: GHP rows with an old_sr_id column.
        sr_df: service request rows with sr_id / sr_cohereid. Defaults to the notebook-level
            `sr`, so existing callers keep working.

    Returns:
        final_df with old_auth_id added (null when old_sr_id has no match).
    '''
    if sr_df is None:
        sr_df = sr

    id_map = sr_df.select("sr_id", "sr_cohereid").distinct()
    return (
        final_df.join(id_map, final_df.old_sr_id == id_map.sr_id, "left")
        .withColumn("old_auth_id", col("sr_cohereid"))
        .drop("sr_id", "sr_cohereid")
    )


def update_authorization_type(final_df, data_sent_to_ghp):
    '''
    Mark replacement auths: old auths that were duplicates, or were voided and replaced.

    When old_auth_id was already sent to GHP, GHP_authorization_type becomes 'R' and
    GHP_additional_notes_line_7 becomes "old_auth_id: <old_auth_id>". Otherwise line 7 is
    cleared to null and the authorization type is unchanged. The helper columns
    old_auth_id and old_sr_id are dropped.
    '''
    sent_ids = (
        data_sent_to_ghp
        .select("GHP_vendor_authorization_number")
        .withColumnRenamed("GHP_vendor_authorization_number", "sent_GHP_vendor_authorization_number")
        .distinct()
    )
    old_auth_was_sent = col("sent_GHP_vendor_authorization_number").isNotNull()

    joined = final_df.join(sent_ids, final_df.old_auth_id == sent_ids.sent_GHP_vendor_authorization_number, "left")
    with_note = joined.withColumn(
        "GHP_additional_notes_line_7",
        when(old_auth_was_sent, concat(lit("old_auth_id: "), col("old_auth_id"))).otherwise(lit(None)),
    )
    with_type = with_note.withColumn(
        "GHP_authorization_type",
        when(old_auth_was_sent, lit("R")).otherwise(col("GHP_authorization_type")),
    )
    return with_type.drop("old_auth_id", "old_sr_id", "sent_GHP_vendor_authorization_number")


def fetch_data_sent_to_ghp(db):
    '''
    Load the rows previously sent to GHP from {db}.ghp_auth_file_integration_qc, keeping
    only the latest row per GHP_vendor_authorization_number.

    "Latest" means the highest GHP_auth_hoodie_commit_time, then year/month/day/hour.
    The partition columns are dropped from the result. The distinct count is printed
    (a Spark action).
    NOTE(review): dense_rank keeps every row tied at rank 1, so an auth with two distinct
    rows on identical ordering keys is returned twice.
    '''
    qc_table = f'{db}.ghp_auth_file_integration_qc'
    query = f"""
    select distinct ghp_assigned_vendor_code,ghp_vendor_authorization_number,ghp_record_type,ghp_request_type,ghp_subscriber_number\
    ,ghp_member_number,ghp_type_of_care_indicator,ghp_service_category,ghp_service_recieved_date,ghp_from_or_low_service_date\
    ,ghp_to_or_high_service_date,ghp_service_group_id,ghp_procedure_code,ghp_primary_diagnosis_code,ghp_units_requested,ghp_units_authorized\
    ,ghp_units_approved,ghp_service_status,ghp_service_deny_date,ghp_requesting_provider_id,ghp_servicing_facility_id,ghp_servicing_facility_name\
    ,ghp_authorized_place_of_service,ghp_requested_place_of_service,ghp_servicing_provider_id,ghp_servicing_provider_full_name,\
    ghp_servicing_provider_tax_id,ghp_referring_provider_id,ghp_referring_provider_full_name,ghp_referring_provider_tax_id,ghp_attending_provider_id\
    ,ghp_authorization_type,ghp_entry_method,ghp_procedure_code_modifier,ghp_total_length_of_stay_requested,ghp_total_length_of_stay_authorized\
    ,ghp_requested_admission_date,ghp_authorized_admission_date,ghp_actual_admission_date,ghp_discharge_expected_date_and_time,ghp_actual_expected_date_and_time\
    ,ghp_review_date,ghp_concurrent_reviewed_treatment_type,ghp_concurrent_reviewed_length_of_stay_requested,ghp_concurrent_reviewed_length_of_stay_authorized\
    ,ghp_requested_length_of_stay,ghp_authorized_length_of_stay,ghp_allowed_days_length_of_stay,ghp_referral_type,ghp_service_status_reason\
    ,ghp_auth_hoodie_commit_time,ghp_additional_notes_line_1,ghp_additional_notes_line_2,ghp_additional_notes_line_3,ghp_additional_notes_line_4\
    ,ghp_additional_notes_line_5,ghp_additional_notes_line_6,ghp_additional_notes_line_7,ghp_is_authorization,year,month,day,hour\
    from {qc_table}
    """
    newest_first = Window.partitionBy("GHP_vendor_authorization_number").orderBy(
        desc("GHP_auth_hoodie_commit_time"), desc("year"), desc("month"), desc("day"), desc("hour"))

    sent_rows = spark.sql(query)
    latest_sent = (
        sent_rows
        .withColumn("dense_ranking", dense_rank().over(newest_first))
        .where(col("dense_ranking") == lit("1"))
        .drop("dense_ranking", "year", "month", "day", "hour")
    )

    print(f"Returned {latest_sent.count()} distinct data_sent_to_ghp records")
    return latest_sent

def update_service_deny_date(df):
    '''
    Apply the outbound rules for the deny date and the status reason.

    - GHP_service_deny_date is kept only when GHP_service_status == "DS"; otherwise "".
    - GHP_service_status_reason is blanked ("") when the status is "CO" and the reason is
      "8M00" or "DN00".
    '''
    is_ds_status = col("GHP_service_status") == "DS"
    reason_not_applicable = (
        (col("GHP_service_status") == "CO")
        & col("GHP_service_status_reason").isin("8M00", "DN00")
    )

    with_deny_date = df.withColumn(
        "GHP_service_deny_date",
        when(is_ds_status, col("GHP_service_deny_date")).otherwise(lit("")),
    )
    return with_deny_date.withColumn(
        "GHP_service_status_reason",
        when(reason_not_applicable, lit("")).otherwise(col("GHP_service_status_reason")),
    )

# def create_athena_table(glue_table,final_output_df,qc_file_athena_table_key):

#     print(f"In athena {final_output_df.count()}")
#     '''Get All column names and it's types'''
#     column_list=''
#     for field in final_output_df.schema.fields:
#         columns= "`"+field.name +"`  " +str(field.dataType) + " , "
#         column_list+= columns
#     column_list=column_list[0:-2]
#     column_list=column_list.replace("StringType", "string").replace("TimestampType", "Bigint")

#     '''drop the table if already created'''
#     spark.sql(f"drop table if exists {glue_table}")

#     '''create the table using the dataframe schema'''
#     spark.sql(f"CREATE EXTERNAL TABLE {glue_table} ({column_list}) PARTITIONED BY (year, month, day, hour) \
#         ROW FORMAT SERDE\
#       'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' \
#         STORED AS INPUTFORMAT \
#       'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' \
#         OUTPUTFORMAT \
#       'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'\
#       LOCATION \
#       '{qc_file_athena_table_key}'");



# def upload_file_to_s3(final_output_df,target_bucket_name,base_target_key_path,daily_path,file_date,file_time,glue_table):
#     '''
#     QC file in parquet format and Athena QC table
#     '''
#     qc_daily_path = f'year={file_date[0:4]}/month={file_date[4:6]}/day={file_date[6:8]}/hour={file_time[0:2]}'
#     qc_file_target_key = f"s3://{target_bucket_name}/{base_target_key_path}/qc/daily/{qc_daily_path}"
#     qc_file_athena_table_key = f"s3://{target_bucket_name}/{base_target_key_path}/qc/daily/"
#     print(f"Uploading Geisinger QC auth file to {qc_file_target_key}")
#     final_output_df = final_output_df.withColumn("year",lit(f"{file_date[0:4]}")) \
#         .withColumn("month",lit(f"{file_date[4:6]}")) \
#         .withColumn("day",lit(f"{file_date[6:8]}")) \
#         .withColumn("hour",lit(f"{file_time[0:2]}")) \
#         .withColumn("GHP_service_recieved_date",lit(file_date))

#     final_output_df.write.parquet(f"{qc_file_target_key}", mode="append")

#     final_output_df = final_output_df.localCheckpoint()

#     print(f"Wrote {final_output_df.count()} QC records")

#     '''
#     Writing Daily File
#     '''
#     daily_file_target_key = f"s3://{target_bucket_name}/{base_target_key_path}/raw/daily/{daily_path}"
#     print(f"Uploading Geisinger auth file to {daily_file_target_key}")
#     final_output_df_1 = final_output_df.drop("GHP_auth_hoodie_commit_time","year","month","day","hour","GHP_is_authorization","GHP_max_hoodie_commit_time","GHP_auth_last_updated")
#     final_output_df_1 = final_output_df_1.localCheckpoint()
#     final_output_df_1.coalesce(1).write.option("header", True) \
#         .option("delimiter", "|") \
#         .option("header", "false") \
#         .option("emptyValue", None) \
#         .option("nullValue", None) \
#         .mode("append") \
#         .csv(f"{daily_file_target_key}")

#     print(f"Wrote {final_output_df_1.count()} outbound records")

#     '''
#     Dropping and recreating the athena table
#     '''

#     create_athena_table(glue_table,final_output_df,qc_file_athena_table_key)
#     print("Athena table created")

# def rename_daily_s3_file(target_bucket_name,base_target_key_path,daily_path, final_file_name):
#     '''
#     Renaming Daily File
#     '''
#     daily_file_target_key = f"s3://{target_bucket_name}/{base_target_key_path}/raw/daily/{daily_path}"
#     hadoopPath = spark._jvm.org.apache.hadoop.fs.Path(f"{daily_file_target_key}/*")
#     hadoopFs = hadoopPath.getFileSystem(spark._jvm.org.apache.hadoop.conf.Configuration())
#     statuses = hadoopFs.globStatus(hadoopPath)
#     file_name = [file.getPath().getName() for file in statuses if file.getPath().getName().startswith('part-')][0]
#     print(file_name)
#     hadoopFs.rename(spark._jvm.org.apache.hadoop.fs.Path(f"{daily_file_target_key}/{file_name}"), spark._jvm.org.apache.hadoop.fs.Path(f"{daily_file_target_key}/{final_file_name}"))
#     print("File Renamed")
#     hadoopFs.delete(spark._jvm.org.apache.hadoop.fs.Path(f"{daily_file_target_key}/_SUCCESS"))
#     print("Success File Deleted")

# def copy_daily_s3_file_for_encryption(base_target_key_path,daily_path, final_file_name, env ):
#     '''
#     Copying the file from Cohere-health-external-data to cohere-sftp/cryptography to encrpyt and send the file to GHP
#     '''
#     s3 = boto3.resource('s3')
#     source_target_key_path = f"{base_target_key_path}/raw/daily/{daily_path}"
#     encrypt_target_key_path = f"cryptography/{env}/encrypt/geisinger/outbound"
#     copy_source = {
#         'Bucket': 'cohere-health-external-data',
#         'Key': f'{source_target_key_path}/{final_file_name}'
#     }
#     s3.meta.client.copy(copy_source, 'cohere-sftp', f'{encrypt_target_key_path}/{final_file_name}')
#     print(f'{final_file_name} copied to cohere-sftp/{encrypt_target_key_path}/{final_file_name}')

def mapping_environment(env):
    """Resolve the Glue database and Geisinger S3 prefix for a deployment environment.

    'prod', 'uat' and 'preprod' each map to their own database/prefix pair.
    Any other value (including 'dev') falls back to the dev pair.

    Returns:
        (db, ghp_prefix): a tuple of two strings.
    """
    known_environments = (
        ('prod', "analytics_dw", "geisinger"),
        ('uat', "analytics_dw_uat", "geisinger-uat"),
        ('preprod', "analytics_dw_preprod", "geisinger-preprod"),
    )
    for name, database, prefix in known_environments:
        if env == name:
            return database, prefix
    # Unknown / dev environments default to the dev resources.
    return "analytics_dw_dev", "geisinger-dev"

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
# Main job cell: builds `final_output_df` (the outbound Geisinger auth rows) and
# runs the pre-write validations. Every intermediate DataFrame is left as a
# notebook global so the debugging cells below can inspect each stage.
AWS_REGION = "us-east-2"
cw_client = boto3.client('cloudwatch', AWS_REGION)

# Run parameters are pinned here for interactive runs; the commented block shows
# how they are read from a JSON object in sys.argv[1] when run as a job.
env = 'prod'
refresh_type = 'Incremental'
authFileUpdateHardLimit = 600
# if sys.argv[1] != "{}":
#     paraDict = json.loads(sys.argv[1])
#     for i in paraDict:
#         if i == "env":
#             env = paraDict[i]  ## have to passed as "env":"dev" ##
#
#         elif i == "refresh_type":
#             refresh_type = paraDict[i]  ## have to passed as "refresh_type":"full_refresh" ##
#
#         elif i == "authFileUpdateHardLimit":
#             authFileUpdateHardLimit = paraDict[i]  ## have to pass as "authFileUpdateHardLimit":300 ##
#
#         elif i == "cohereids":
#             cohereids = tuple(paraDict[i])  ## have to pass as "cohereids" : ('id1','id2')

# cohereids must be a tuple of auth numbers (the job path above builds one with
# tuple()). Without the trailing comma ('GFXU3603') is just a string, and the
# `list(cohereids)` calls in later cells split it into single characters.
# NOTE(review): if check_refresh_type interpolates this tuple into SQL text, a
# one-element tuple renders as "('GFXU3603',)" -- confirm that query accepts it.
cohereids = ('GFXU3603',)
print("Env: " + str(env))
print("Run type: " + str(refresh_type))
print("cohereids: " + str(cohereids))
db, ghp_prefix = mapping_environment(env)

# Pick the most recently modified provider file under this environment's prefix.
s3_resource = boto3.resource('s3')
provider_prefix = f"payer/{ghp_prefix}/providers/"
objects = list(s3_resource.Bucket('cohere-health-external-data').objects.filter(Prefix=provider_prefix))
if not objects:
    # Fail with an explicit message instead of an IndexError on objects[-1].
    raise FileNotFoundError(f"No provider files found under s3://cohere-health-external-data/{provider_prefix}")
objects.sort(key=lambda o: o.last_modified)
provider_file_name = objects[-1].key

print(f"latest provider file name : {provider_file_name}")

LOOKUP_DATA_PREFIX = "s3://cohere-analytics-airflow-prod/lookup-data"


def read_lookup_csv(file_name):
    """Read a comma-separated lookup file (with a header row) from LOOKUP_DATA_PREFIX."""
    return spark.read.csv(f"{LOOKUP_DATA_PREFIX}/{file_name}", sep=',', header=True)


def drop_and_report(df, condition, header, label):
    """Remove the rows of `df` matching `condition`, printing each removed auth number.

    `condition` should never evaluate to NULL. Rows where it is NULL are excluded
    by both filter(condition) and filter(~condition). Such rows would therefore
    be dropped without being reported.

    Returns:
        `df` unchanged when no row matches, otherwise `df` without the matching rows.
    """
    offending = df.filter(condition).select('GHP_vendor_authorization_number').collect()
    if not offending:
        return df
    print(header)
    for row in offending:
        print(label, row['GHP_vendor_authorization_number'])
    return df.filter(~condition)


## Reading lookup files ##
request_type = read_lookup_csv("GHP_request_type_lookup.csv")
entry_method = read_lookup_csv("GHP_entry_method_lookup.csv")
service_group_id = read_lookup_csv("GHP_service_id_lookup.csv")
referral_type = read_lookup_csv("GHP_referral_type_lookup.csv")
service_status = read_lookup_csv("GHP_service_status_lookup.csv")
prov_raw = spark.read.csv(f"s3://cohere-health-external-data/{provider_file_name}", sep='|', header=True)
provider_raw_final = prov_raw.select("GHP_Provider_Number", "Provider_Entity").distinct()

sr = check_refresh_type(db, refresh_type, cohereids)
# Exclude rows last updated by the two denormalization job ids below.
# eqNullSafe keeps rows whose sr_updatedby is NULL. With a plain `!=` the
# predicate is NULL for those rows, and filter() silently dropped them.
sr = sr.filter(
    ~col('sr_updatedby').eqNullSafe('20230629-denormalize-provider-facility-authorization')
    & ~col('sr_updatedby').eqNullSafe('20230628-denormalize-provider-facility-srs')
)
sr = sr.drop("sr_updatedby")

if sr.count() == 0:
    print("Received 0 authorizations/service requests")
    log_num_of_auths(cw_client, sr, env)
else:
    pos = fetch_place_of_service(db)
    provider = fetch_provider(db)
    facility = fetch_facility(db)
    patient = fetch_patient(db)
    health_plan_name = fetch_health_plan_name(db)
    clinical_service = fetch_clinical_service(db, health_plan_name)
    service_request_relation = fetch_service_request_relation(db)
    data_sent_to_ghp = fetch_data_sent_to_ghp(db)
    sr = handle_semantic_procedure_codes(sr, clinical_service, service_group_id, refresh_type)

    final_df = compute_final_df(sr, patient, pos, facility, provider, clinical_service, provider_raw_final)
    final_df = final_df.distinct()

    final_auth_status_df = update_auth_status(final_df)
    final_end_date_df = update_end_date(final_auth_status_df)
    final_lookup_update_df = update_lookup_data(final_end_date_df, request_type, entry_method, service_group_id,
                                                referral_type, service_status)
    final_filtered_data_to_send_to_GHP = filter_duplicate_auths_and_update_authorization_type(
        final_lookup_update_df, data_sent_to_ghp, cohereids)
    final_without_new_voided_auths_df = filter_new_voided_auths(final_filtered_data_to_send_to_GHP)
    final_update_units_on_voided_auths_df = update_units_on_voided_auths(final_without_new_voided_auths_df)
    old_sr_id = fetch_old_sr_id_from_serviceRequestRelation(final_update_units_on_voided_auths_df)
    old_auth_number = fetch_old_auth_number_from_service_request(old_sr_id)
    update_auth_type = update_authorization_type(old_auth_number, data_sent_to_ghp)

    ## Taking the distinct to avoid duplicate records
    distinct_update_auth_type = update_auth_type.distinct()
    final_output_df = update_service_deny_date(distinct_update_auth_type).drop("GHP_Authorization_IntegrationStatus")

    ## Adding checkpoint to make sure that data is not altered while writing files at different locations
    final_output_df = final_output_df.localCheckpoint()

    # NULL-safe predicates, so each check drops exactly the rows it reports.
    # A NULL procedure code or status reason counts as blank (""), and a NULL
    # service status counts as "not DS".
    procedure_code_blank = coalesce(col("GHP_procedure_code"), lit("")) == ""
    status_reason_blank = coalesce(col("GHP_service_status_reason"), lit("")) == ""

    # The CloudWatch hooks (log_auths_without_pxcode_and_servicegroup,
    # log_auths_with_both_pxcode_and_servicegroup,
    # log_denied_auths_with_no_service_status_code) are disabled in this notebook.
    final_output_df = drop_and_report(
        final_output_df,
        col("GHP_service_group_id").isNull() & procedure_code_blank,
        "Logging rows with both Service Group Id and Procedure Codes as Null",
        "Authorization without service group and procedure code: ",
    )
    final_output_df = drop_and_report(
        final_output_df,
        col("GHP_service_group_id").isNotNull() & ~procedure_code_blank,
        "Logging rows with both Service Group Id and Procedure Codes populated",
        "Authorization with both service group and procedure code: ",
    )
    final_output_df = drop_and_report(
        final_output_df,
        col("GHP_service_status").eqNullSafe("DS") & status_reason_blank,
        "Logging rows with denied auths with no service status reason",
        "Authorization with Denied status but no service status reason: ",
    )

    column_order = final_output_df.columns
    expected_column_order = ['GHP_assigned_vendor_code', 'GHP_vendor_authorization_number', 'GHP_record_type', 'GHP_request_type', 'GHP_subscriber_number', 'GHP_member_number', 'GHP_type_of_care_indicator', 'GHP_service_category', 'GHP_service_recieved_date', 'GHP_from_or_low_service_date', 'GHP_to_or_high_service_date', 'GHP_service_group_id', 'GHP_procedure_code', 'GHP_primary_diagnosis_code', 'GHP_units_requested', 'GHP_units_authorized', 'GHP_units_approved', 'GHP_service_status', 'GHP_service_deny_date', 'GHP_requesting_provider_id', 'GHP_servicing_facility_id', 'GHP_servicing_facility_name', 'GHP_authorized_place_of_service', 'GHP_requested_place_of_service', 'GHP_servicing_provider_id', 'GHP_servicing_provider_full_name', 'GHP_servicing_provider_tax_id', 'GHP_referring_provider_id', 'GHP_referring_provider_full_name', 'GHP_referring_provider_tax_id', 'GHP_attending_provider_id', 'GHP_authorization_type', 'GHP_entry_method', 'GHP_procedure_code_modifier', 'GHP_total_length_of_stay_requested', 'GHP_total_length_of_stay_authorized', 'GHP_requested_admission_date', 'GHP_authorized_admission_date', 'GHP_actual_admission_date', 'GHP_discharge_expected_date_and_time', 'GHP_actual_expected_date_and_time', 'GHP_review_date', 'GHP_concurrent_reviewed_treatment_type', 'GHP_concurrent_reviewed_length_of_stay_requested', 'GHP_concurrent_reviewed_length_of_stay_authorized', 'GHP_requested_length_of_stay', 'GHP_authorized_length_of_stay', 'GHP_allowed_days_length_of_stay', 'GHP_referral_type', 'GHP_service_status_reason', 'GHP_auth_hoodie_commit_time', 'GHP_additional_notes_line_1', 'GHP_additional_notes_line_2', 'GHP_additional_notes_line_3', 'GHP_additional_notes_line_4', 'GHP_additional_notes_line_5', 'GHP_additional_notes_line_6', 'GHP_additional_notes_line_7', 'GHP_is_authorization', 'GHP_max_hoodie_commit_time', 'GHP_auth_last_updated']
    if column_order != expected_column_order:
        print("Incorrect column order, stopping the job")
        # log_incorrect_column_order(cw_client, env)
        # spark.stop()
    elif final_output_df.count() == 0:
        print("There are no records to write")
        # log_num_of_auths(cw_client, final_output_df, env)
    elif final_output_df.filter(col('GHP_authorization_type') == 'U').count() > authFileUpdateHardLimit:
        print("The number of updates in the auth file has exceeded the hard limit threshold , stopping the job ")
        # log_auth_file_update_size_exceeded_hard_limit(cw_client, final_output_df, env)
        # spark.stop()
    else:
        ## Calculating Date Time for the files ##
        print("Job successful")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

An error was encountered:

mismatched input 'and' expecting {<EOF>, ';'}(line 1, pos 2887)

== SQL ==
select fulldocument._id,fulldocument.cohereid, fulldocument.startdate, fulldocument.enddate, fulldocument.encounterType, fulldocument.units, fulldocument.authStatus, fulldocument.placeofservice, fulldocument.selectedFacility as facility ,     fulldocument.selectedOrderingProvider as orderingprovider , fulldocument.selectedPerformingProvider as performingprovider, fulldocument.patient, fulldocument.healthplanname, fulldocument.clinicalservice, fulldocument.clinicalservices,     fulldocument.requestor.channel as requestorChannel, fulldocument.approvedunits as auth_approved_units,fulldocument.semanticprocedurecodes,fulldocument.selectedPerformingProvider.selectedLocation.tin as performing_tin, fulldocument.claimspaymentnote, fulldocument.servicestatuscode,    _hoodie_commit_time as hoodie_commit_time , fulldocument.dateCreated, fulldocument.lastUpdated, fulldocument.selectedPerformingProv

In [None]:
# spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")

In [6]:
# Inspect the outbound rows for the auth numbers under investigation.
(
    final_output_df
    .where(col('GHP_vendor_authorization_number').isin(list(cohereids)))
    .show(truncate=False)
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

An error was encountered:
name 'final_output_df' is not defined
Traceback (most recent call last):
NameError: name 'final_output_df' is not defined



In [None]:
# Number of outbound rows flagged as updates ('U'); compared against authFileUpdateHardLimit in the main cell.
(
    final_output_df
    .where(col('GHP_authorization_type') == 'U')
    .count()
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

388

In [None]:
# Total rows in final_output_df (the outbound rows built by the main job cell).
final_output_df.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

1536

In [None]:
# Re-run the source extraction and keep only the auth numbers under investigation.
check_refresh_type_df = (
    check_refresh_type(db, refresh_type, cohereids)
    .where(col('sr_cohereid').isin(list(cohereids)))
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

max_commit_time data returned
Adding prefix to max
Max commit time of previous Job: 20230911193230673 for service requests and 20230911200211012 for authorizations
Adding prefix to sr
service_request data returned
Adding prefix to sr
authorization data returned
Adding prefix to dc
dateCreated data returned

In [None]:
# Rows that carry at least one clinical service and were created on/after the
# 2023-05-14 18:00:00 cutoff (parsed by unix_timestamp in the session time zone).
sr_with_clinical = (
    check_refresh_type_df
    .where(size(col("sr_clinicalservices")) >= 1)
    .where(col("sr_dateCreated") >= unix_timestamp(lit("2023-05-14 18:00:00")).cast('timestamp'))
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# First 20 rows, untruncated.
sr_with_clinical.show(n=20, truncate=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------------------+-----------+--------------------+--------------------+----------------+--------+-------------+------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
# How many extracted service-request rows belong to the investigated auth numbers.
(
    check_refresh_type(db, refresh_type, cohereids)
    .where(col('sr_cohereid').isin(list(cohereids)))
    .count()
)

In [None]:
# Up to 30 extracted rows for the investigated auth numbers, untruncated.
(
    check_refresh_type(db, refresh_type, cohereids)
    .where(col('sr_cohereid').isin(list(cohereids)))
    .show(n=30, truncate=False)
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

max_commit_time data returned
Adding prefix to max
Max commit time of previous Job: 20230911193230673 for service requests and 20230911200211012 for authorizations
Adding prefix to sr
service_request data returned
Adding prefix to sr
authorization data returned
Adding prefix to dc
dateCreated data returned
+------------------------+-----------+--------------------+--------------------+----------------+--------+-------------+------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------

In [None]:
# Rows of the post-processing `sr` frame for the investigated auth numbers.
(
    sr
    .where(col('sr_cohereid').isin(list(cohereids)))
    .count()
)

In [None]:
# Up to 15 rows of `sr` for the investigated auth numbers, untruncated.
(
    sr
    .where(col('sr_cohereid').isin(list(cohereids)))
    .show(n=15, truncate=False)
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+------------+----------+----------------+--------+-------------+-----------------+-----------+-------------------+---------------------+----------+-----------------+------------------+-------------------+-------------------+-----------------+--------------------+--------------------+---------------------+--------------+--------------+-----------------------------+------------------+---------------------------+---------------+--------------------+----------------------------+-------------------+-----+-------------------------------+------------------------------+------------------------------+-----------------------------+----------------------------+--------------------+------------------------------+-----------------------------+---------------------------------+--------------------------------+-------------------------------+-----------------------------------+---------------------------+--------------------+-------------------+------------------+-----------+-------+----

In [None]:
# Investigated auth numbers after the lookup-table update stage.
(
    final_lookup_update_df
    .where(col('GHP_vendor_authorization_number').isin(list(cohereids)))
    .show(truncate=False)
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------------------------------+------------------------+-------------------------------+---------------+----------------+---------------------+-----------------+--------------------------+--------------------+-------------------------+----------------------------+---------------------------+--------------------+------------------+--------------------------+-------------------+--------------------+------------------+------------------+---------------------+--------------------------+-------------------------+---------------------------+-------------------------------+------------------------------+-------------------------+--------------------------------+-----------------------------+-------------------------+--------------------------------+-----------------------------+-------------------------+----------------------+----------------+---------------------------+----------------------------------+-----------------------------------+----------------------------+------------------

In [None]:
# Investigated auth numbers after duplicate filtering / authorization-type update.
(
    final_filtered_data_to_send_to_GHP
    .where(col('GHP_vendor_authorization_number').isin(list(cohereids)))
    .show(truncate=False)
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------------------------------+------------------------+-------------------------------+---------------+----------------+---------------------+-----------------+--------------------------+--------------------+-------------------------+----------------------------+---------------------------+--------------------+------------------+--------------------------+-------------------+--------------------+------------------+------------------+---------------------+--------------------------+-------------------------+---------------------------+-------------------------------+------------------------------+-------------------------+--------------------------------+-----------------------------+-------------------------+--------------------------------+-----------------------------+-------------------------+----------------------+----------------+---------------------------+----------------------------------+-----------------------------------+----------------------------+------------------

In [None]:
# Number of auth ids under investigation. If cohereids is a plain string
# (e.g. ('GFXU3603') without a trailing comma) this counts characters instead.
len(cohereids)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

24

In [None]:
# Outbound rows for the investigated auth numbers, and how many there are.
dataframe = final_output_df.where(
    col('GHP_vendor_authorization_number').isin(list(cohereids))
)
dataframe.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

0

In [None]:
# One row per auth number; comparing with dataframe.count() reveals duplicates.
non_duplicates = dataframe.dropDuplicates(subset=['GHP_vendor_authorization_number'])
non_duplicates.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

0

In [None]:
# Annotate each row with how many rows share its auth number, then keep the
# auth numbers that appear more than once.
window_spec = Window.partitionBy(col("GHP_vendor_authorization_number"))
df_with_count = dataframe.withColumn(
    "count",
    count(col("GHP_vendor_authorization_number")).over(window_spec),
)
duplicates = df_with_count.where(col("count") > 1)
duplicates.show(truncate=False)
duplicates.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------------------+-------------------------------+---------------+----------------+---------------------+-----------------+--------------------------+--------------------+-------------------------+----------------------------+---------------------------+--------------------+------------------+--------------------------+-------------------+--------------------+------------------+------------------+---------------------+--------------------------+-------------------------+---------------------------+-------------------------------+------------------------------+-------------------------+--------------------------------+-----------------------------+-------------------------+--------------------------------+-----------------------------+-------------------------+----------------------+----------------+---------------------------+----------------------------------+-----------------------------------+----------------------------+-----------------------------+------------------------

In [None]:
# Single-column DataFrame of the investigated auth numbers, for anti-joins below.
cohereids_dataframe = spark.createDataFrame(
    [(auth_number,) for auth_number in cohereids],
    ["GHP_vendor_authorization_number"],
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# First 20 investigated auth numbers, untruncated.
cohereids_dataframe.show(n=20, truncate=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------------------------+
|GHP_vendor_authorization_number|
+-------------------------------+
|BZWA9879                       |
|DJXK4047                       |
|IPFK9564                       |
|KGPN2264                       |
|MWGL7517                       |
|FKLW1334                       |
|KHOZ8644                       |
|GXBI7365                       |
|VCUJ6612                       |
|INWK5973                       |
|AMLC7798                       |
|UWCE5132                       |
|TGPI0049                       |
|QUON8248                       |
|CYMY9687                       |
|KYHW7236                       |
|EDAG7731                       |
|XVQU9416                       |
|YJJI8714                       |
|MPFN1053                       |
+-------------------------------+
only showing top 20 rows

In [None]:
# Investigated auth numbers that have no row in `dataframe` (anti-join).
filtered_out_cohereids = cohereids_dataframe.join(
    dataframe,
    "GHP_vendor_authorization_number",
    "left_anti",
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# Number of investigated auth numbers missing from `dataframe` (left_anti join in the previous cell).
filtered_out_cohereids.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

24

In [None]:
# dataframe.filter(col("GHP_authorization_type") == "U").show(truncate=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# Outbound rows for two specific auth numbers under investigation.
(
    final_output_df
    .where(col('GHP_vendor_authorization_number').isin("DVIO1503", "FCZN5546"))
    .show(truncate=False)
)
# inpatient_groupids.show(truncate=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------------------+-------------------------------+---------------+----------------+---------------------+-----------------+--------------------------+--------------------+-------------------------+----------------------------+---------------------------+--------------------+------------------+--------------------------+-------------------+--------------------+------------------+------------------+---------------------+--------------------------+-------------------------+---------------------------+-------------------------------+------------------------------+-------------------------+--------------------------------+-----------------------------+-------------------------+--------------------------------+-----------------------------+-------------------------+----------------------+----------------+---------------------------+----------------------------------+-----------------------------------+----------------------------+-----------------------------+------------------------

In [None]:
# The same two auth numbers in the post-processing `sr` frame.
(
    sr
    .where(col('sr_cohereid').isin("DVIO1503", "FCZN5546"))
    .show(truncate=False)
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+--------------------+--------------------+----------------+--------+-------------+------------------------+------------------------+-----------------+------------------------+--------------------------+-------------------+-----------------+--------------------+--------------------+---------------------+-----------------------+-----------------------+-----------------------------+------------------+---------------------------+---------------+------------------------+----------------------------+-------------------+------------------------+-------------------------------+------------------------------+------------------------------+-----------------------------+----------------------------+-------------+------------------------------+-----------------------------+---------------------------------+--------------------------------+-------------------------------+-----------------------------------+---------------------------+---------------------------+------------------------

In [None]:
# The same two auth numbers right after compute_final_df.
(
    final_df
    .where(col('GHP_vendor_authorization_number').isin("DVIO1503", "FCZN5546"))
    .show(truncate=False)
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------------------------+-----------------------------------+------------------------+-------------------------------+---------------+----------------+---------------------+-----------------+--------------------------+--------------------+-------------------------+----------------------------+---------------------------+--------------------+--------------------------+------------------+--------------------------+-------------------+--------------------+------------------+------------------+---------------------+--------------------------+-------------------------+---------------------------+-------------------------------+------------------------------+-------------------------+--------------------------------+-----------------------------+-------------------------+--------------------------------+-----------------------------+-------------------------+----------------------+----------------+---------------------------+----------------------------------+--------------------------

In [None]:
# The same two auth numbers after the lookup-table update stage.
(
    final_lookup_update_df
    .where(col('GHP_vendor_authorization_number').isin("DVIO1503", "FCZN5546"))
    .show(truncate=False)
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------------------------+-----------------------------+-----------------------------------+------------------------+---------------+----------------+---------------------+-----------------+--------------------------+--------------------+-------------------------+----------------------------+---------------------------+--------------------+--------------------------+------------------+--------------------------+-------------------+--------------------+------------------+------------------+---------------------+--------------------------+-------------------------+---------------------------+-------------------------------+------------------------------+-------------------------+--------------------------------+-----------------------------+-------------------------+--------------------------------+-----------------------------+-------------------------+----------------------+----------------+---------------------------+----------------------------------+--------------------------

In [None]:
# The same two auth numbers after duplicate filtering / authorization-type update.
(
    final_filtered_data_to_send_to_GHP
    .where(col('GHP_vendor_authorization_number').isin("DVIO1503", "FCZN5546"))
    .show(truncate=False)
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------------------------+-----------------------------+-----------------------------------+------------------------+---------------+----------------+---------------------+-----------------+--------------------------+--------------------+-------------------------+----------------------------+---------------------------+--------------------+--------------------------+------------------+--------------------------+-------------------+--------------------+------------------+------------------+---------------------+--------------------------+-------------------------+---------------------------+-------------------------------+------------------------------+-------------------------+--------------------------------+-----------------------------+-------------------------+--------------------------------+-----------------------------+-------------------------+----------------------+----------------+---------------------------+----------------------------------+--------------------------

In [None]:
# The same two auth numbers after the de-duplicated authorization-type update.
(
    distinct_update_auth_type
    .where(col('GHP_vendor_authorization_number').isin("DVIO1503", "FCZN5546"))
    .show(truncate=False)
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------------------------+-----------------------------+-----------------------------------+------------------------+---------------+----------------+---------------------+-----------------+--------------------------+--------------------+-------------------------+----------------------------+---------------------------+--------------------+--------------------------+------------------+--------------------------+-------------------+--------------------+------------------+------------------+---------------------+--------------------------+-------------------------+---------------------------+-------------------------------+------------------------------+-------------------------+--------------------------------+-----------------------------+-------------------------+--------------------------------+-----------------------------+-------------------------+----------------------+----------------+---------------------------+----------------------------------+--------------------------

In [None]:
# Single switch for the target environment. It drives both the database name /
# payer S3 prefix returned by mapping_environment and the `_Q4` file-name suffix
# that the path/file-name cell derives from `env`. Previously 'prod' was passed
# here as a literal while the file name read a separately-defined `env`, so the
# two could silently disagree.
env = 'prod'
db, ghp_prefix = mapping_environment(env)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# Capture one US/Eastern instant for this run; every date/time-derived value
# below is formatted from it so they all agree.
tz = timezone('US/Eastern')
current_timestamp_est = datetime.now(tz)

# S3 destination and the Glue QC table for the selected environment.
target_bucket_name = 'cohere-health-external-data'
base_target_key_path = f"payer/{ghp_prefix}/auth/outbound"
glue_table = f"{db}.ghp_auth_file_integration_qc"

# Zero-padded stamps: YYYYMMDD, HHMMSS and YYYYMMDDHHMMSS.
file_date, file_time, now_str = (
    current_timestamp_est.strftime(pattern)
    for pattern in ("%Y%m%d", "%H%M%S", "%Y%m%d%H%M%S")
)
# YYYY/MM/DD sub-path for the daily raw output.
daily_path = current_timestamp_est.strftime("%Y/%m/%d")

# Outbound file name; dev runs get a `_Q4` marker.
# NOTE(review): `env` must already be defined by an earlier cell and should match
# the environment passed to mapping_environment.
final_file_name = (
    "COHERE"
    + ("_Q4" if env == 'dev' else "")
    + f"_GHP_MedicalAuth_File_{file_date}_{file_time}.txt"
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# Upload step — disabled in this version of the notebook.
# NOTE(review): the captured output below (geisinger-dev paths, "Wrote 695 ...")
# appears to be left over from an earlier run; it was not produced by this
# commented-out line. Clear it or re-run before relying on it.
# upload_file_to_s3(final_output_df,target_bucket_name,base_target_key_path,daily_path,file_date,file_time,glue_table)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Uploading Geisinger QC auth file to s3://cohere-health-external-data/payer/geisinger-dev/auth/outbound/qc/daily/year=2023/month=08/day=18/hour=21
Wrote 695 QC records
Uploading Geisinger auth file to s3://cohere-health-external-data/payer/geisinger-dev/auth/outbound/raw/daily/2023/08/18
Wrote 695 outbound records
In athena 695
Athena table created

In [None]:
# Rename step — disabled in this version of the notebook.
# NOTE(review): the captured output below ("File Renamed", analytics_dw_dev...)
# appears to be stale output from an earlier run, not from this commented-out line.
# rename_daily_s3_file(target_bucket_name,base_target_key_path,daily_path,final_file_name)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

part-00000-613ba626-22c3-4fda-b056-a42867806e36-c000.csv
File Renamed
Success File Deleted

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

analytics_dw_dev.ghp_auth_file_integration_qc

In [None]:
# Stamp every output row with this run's partition values (taken from the
# file_date / file_time strings) and the received date, in the same column order.
run_stamp_columns = {
    "year": file_date[0:4],
    "month": file_date[4:6],
    "day": file_date[6:8],
    "hour": file_time[0:2],
    "GHP_service_recieved_date": file_date,
}
for stamp_column, stamp_value in run_stamp_columns.items():
    final_output_df = final_output_df.withColumn(stamp_column, lit(stamp_value))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# Build the Hive column DDL ("`name`  type , `name`  type ...") from the output
# DataFrame's schema, for the CREATE EXTERNAL TABLE statement.
# simpleString() yields Spark SQL type names ("string", "bigint", "timestamp",
# "struct<...>") directly. The previous str(DataType) + text-replace approach only
# handled StringType/TimestampType: other types were emitted as e.g. "LongType"
# (not a valid Hive type), and on PySpark versions whose repr is "StringType()"
# strings became "string()".
def to_hive_column_type(data_type):
    """Return the Hive DDL type for a Spark DataType (timestamps declared as Bigint)."""
    type_name = data_type.simpleString()
    # Preserve the original mapping: timestamp columns are declared as Bigint.
    return "Bigint" if type_name == "timestamp" else type_name


column_list = " , ".join(
    f"`{field.name}`  {to_hive_column_type(field.dataType)}"
    for field in final_output_df.schema.fields
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# Root S3 prefix holding the QC partitions (year=/month=/day=/hour=); used as the
# Glue table LOCATION.
qc_file_athena_table_key = "s3://" + "/".join([target_bucket_name, base_target_key_path, "qc", "daily"]) + "/"

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# Sanity-check which database.table the QC DDL will target.
print(f"{glue_table}")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

analytics_dw.ghp_auth_file_integration_qc

In [None]:
# Sanity-check the S3 LOCATION the QC table will point at.
print(f"{qc_file_athena_table_key}")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

s3://cohere-health-external-data/payer/geisinger/auth/outbound/qc/daily/

In [None]:
# Hand-maintained Hive column DDL for the QC table. It overrides any list derived
# from the DataFrame schema earlier. Kept as a list of column names (all `string`
# unless overridden) rather than one ~2 KB literal that was split across physical
# lines. The rendered text is the same "`name`  type , `name`  type ..." string.
# NOTE(review): must stay in sync with final_output_df's columns and with the
# PARTITIONED BY (year, month, day, hour) clause — TODO confirm whether the
# schema-derived list can be used instead.
qc_table_column_names = [
    "GHP_assigned_vendor_code",
    "GHP_vendor_authorization_number",
    "GHP_record_type",
    "GHP_request_type",
    "GHP_subscriber_number",
    "GHP_member_number",
    "GHP_type_of_care_indicator",
    "GHP_service_category",
    "GHP_service_recieved_date",
    "GHP_from_or_low_service_date",
    "GHP_to_or_high_service_date",
    "GHP_service_group_id",
    "GHP_procedure_code",
    "GHP_primary_diagnosis_code",
    "GHP_units_requested",
    "GHP_units_authorized",
    "GHP_units_approved",
    "GHP_service_status",
    "GHP_service_deny_date",
    "GHP_requesting_provider_id",
    "GHP_servicing_facility_id",
    "GHP_servicing_facility_name",
    "GHP_authorized_place_of_service",
    "GHP_requested_place_of_service",
    "GHP_servicing_provider_id",
    "GHP_servicing_provider_full_name",
    "GHP_servicing_provider_tax_id",
    "GHP_referring_provider_id",
    "GHP_referring_provider_full_name",
    "GHP_referring_provider_tax_id",
    "GHP_attending_provider_id",
    "GHP_authorization_type",
    "GHP_entry_method",
    "GHP_procedure_code_modifier",
    "GHP_total_length_of_stay_requested",
    "GHP_total_length_of_stay_authorized",
    "GHP_requested_admission_date",
    "GHP_authorized_admission_date",
    "GHP_actual_admission_date",
    "GHP_discharge_expected_date_and_time",
    "GHP_actual_expected_date_and_time",
    "GHP_review_date",
    "GHP_concurrent_reviewed_treatment_type",
    "GHP_concurrent_reviewed_length_of_stay_requested",
    "GHP_concurrent_reviewed_length_of_stay_authorized",
    "GHP_requested_length_of_stay",
    "GHP_authorized_length_of_stay",
    "GHP_allowed_days_length_of_stay",
    "GHP_referral_type",
    "GHP_service_status_reason",
    "GHP_auth_hoodie_commit_time",
    *(f"GHP_additional_notes_line_{line_no}" for line_no in range(1, 8)),
    "GHP_is_authorization",
    "GHP_max_hoodie_commit_time",
    "GHP_auth_last_updated",
    # Partition columns (also listed in PARTITIONED BY).
    "year",
    "month",
    "day",
    "hour",
]

# Columns whose Hive type is not `string`.
qc_table_non_string_types = {
    "GHP_max_hoodie_commit_time": "Bigint",
    "GHP_auth_last_updated": "Bigint",
}

column_list = " , ".join(
    f"`{name}`  {qc_table_non_string_types.get(name, 'string')}"
    for name in qc_table_column_names
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# Show the column DDL that the CREATE TABLE statement will use.
print(f"{column_list}")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

`GHP_assigned_vendor_code`  string , `GHP_vendor_authorization_number`  string , `GHP_record_type`  string , `GHP_request_type`  string , `GHP_subscriber_number`  string , `GHP_member_number`  string , `GHP_type_of_care_indicator`  string , `GHP_service_category`  string , `GHP_service_recieved_date`  string , `GHP_from_or_low_service_date`  string , `GHP_to_or_high_service_date`  string , `GHP_service_group_id`  string , `GHP_procedure_code`  string , `GHP_primary_diagnosis_code`  string , `GHP_units_requested`  string , `GHP_units_authorized`  string , `GHP_units_approved`  string , `GHP_service_status`  string , `GHP_service_deny_date`  string , `GHP_requesting_provider_id`  string , `GHP_servicing_facility_id`  string , `GHP_servicing_facility_name`  string , `GHP_authorized_place_of_service`  string , `GHP_requested_place_of_service`  string , `GHP_servicing_provider_id`  string , `GHP_servicing_provider_full_name`  string , `GHP_servicing_provider_tax_id`  string , `GHP_referring

In [None]:
# Drop the QC table definition so it can be re-created with the current columns.
# NOTE(review): glue_table may resolve to the production database (the printed
# value in this run is analytics_dw.*); confirm the target before executing.
drop_qc_table_sql = f"drop table if exists {glue_table}"
spark.sql(drop_qc_table_sql)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

DataFrame[]

In [None]:
# Re-create the QC table as an EXTERNAL, partitioned Parquet table over the QC
# S3 prefix.
# The clauses are joined explicitly instead of using backslash line-continuations
# inside the string literal. With continuations, the SQL's spacing depended on
# source indentation (e.g. `...OutputFormat'` had no trailing space and only
# parsed because the following line happened to be indented).
create_qc_table_sql = " ".join([
    f"CREATE EXTERNAL TABLE {glue_table} ({column_list})",
    "PARTITIONED BY (year, month, day, hour)",
    "ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'",
    "STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'",
    "OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'",
    f"LOCATION '{qc_file_athena_table_key}'",
])
spark.sql(create_qc_table_sql)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

DataFrame[]

In [None]:
# Register the partitions already present under the table LOCATION with the
# metastore. Uses glue_table instead of the hard-coded
# "analytics_dw.ghp_auth_file_integration_qc" so the repair targets the same
# table that was just dropped and re-created for the selected environment.
spark.sql(f"MSCK REPAIR TABLE {glue_table}")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

DataFrame[]

In [None]:
# Fetch the last processed Hudi commit times (service request, authorization)
# from the preprod database and print them.
# NOTE(review): this re-binds the notebook-wide `db` (set earlier from
# mapping_environment); any cell re-run after this one will see preprod.
db = "analytics_dw_preprod"
max_commit_time_sr, max_commit_time_auth = get_last_run_time(db)
print("\n".join(
    f"{label} = {value}"
    for label, value in (
        ("max_commit_time_sr", max_commit_time_sr),
        ("max_commit_time_auth", max_commit_time_auth),
    )
))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

max_commit_time data returned
Adding prefix to max
max_commit_time_sr = 20230512143350363
max_commit_time_auth = 20230808183419148

In [None]:
# Ad-hoc lookup of a single service request, by Cohere ID, in the preprod raw table.
cohere_id = 'PMFI7890'

# The production extraction restricts this table to:
#   fulldocument.healthplanname = 'Geisinger'
#   fulldocument.authStatus in ('APPROVED', 'DENIED', 'PARTIALLY_APPROVED',
#                               'WITHDRAWN', 'VOIDED', 'DISMISSED')
#   fullDocument.deleted != true
#   fullDocument.encounterType = 'INPATIENT'
#   _hoodie_commit_time > max_commit_time_sr
# and projects fulldocument.patientStayDaysRequested, patientStayDaysApproved,
# requestType, authorization, and decisiontimestamp as reviewDate.
# All of those filters are intentionally dropped here, so the record is returned
# whatever its state; only the cohereid filter remains.
# NOTE(review): `select *` returns the raw Hudi columns with request fields still
# nested under fullDocument, so `authorization` is presumably not a top-level
# column here, unlike in the projected variant.
servicerequest_table = "analytics_dw_preprod.hudi_raw_servicerequest"
servicerequest_query = f"select * from {servicerequest_table} where fullDocument.cohereid in ('{cohere_id}')"

servicerequest_dataframe = spark.sql(servicerequest_query)
servicerequest_dataframe.show(truncate=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------------+-----------------------+------------------------+----------------------+-------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------+-------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
# Ad-hoc lookup of one authorization document in the preprod raw table.
authorization_id = '64cd3dd6f4b03e0001540481'

# The production query adds the incremental-load predicate
# `_hoodie_commit_time > max_commit_time_auth`. It is switched off here so every
# Geisinger INPATIENT authorization is visible, not only those committed after
# the last run. This toggle replaces the duplicated commented-out copy of the
# query and the `max_commit_time_auth = 0` workaround.
apply_commit_time_filter = False

authorization_select_columns = [
    # `_id` is used below for filtering/joining. The same value is also exposed
    # as `id`, presumably for downstream cells.
    "fulldocument._id",
    "fulldocument.servicerequestids",
    "fulldocument.authnumber as cohereid",
    "fulldocument.startdate",
    "fulldocument.enddate",
    "fulldocument.requestedunits as units",
    "fulldocument.healthplanname",
    "fulldocument.approvedunits as auth_approved_units",
    "fulldocument.claimspaymentnote",
    "fulldocument.servicestatuscode",
    "_hoodie_commit_time as hoodie_commit_time",
    "fulldocument.dateCreated",
    "fulldocument.requestorChannel",
    "fulldocument.patient",
    "fulldocument.lastupdated",
    "fulldocument.authstatus",
    "fulldocument.encounterType",
    "fulldocument.placeofservice",
    "fulldocument.clinicalservice",
    "fulldocument.clinicalservices",
    "fulldocument.semanticprocedurecodes",
    "fulldocument.selectedPerformingProvider.selectedLocation.tin as performing_tin",
    "fulldocument.selectedPerformingProvider.selectedLocation.externalReferenceId as performing_provider_GHP_Id",
    "fulldocument.selectedFacility.selectedLocation.externalReferenceId as facility_GHP_Id",
    "fulldocument.selectedOrderingProvider.selectedLocation.externalReferenceId as ordering_provider_GHP_Id",
    "fulldocument.selectedOrderingProvider.selectedLocation.tin as ordering_tin",
    "fulldocument.selectedFacility.name as facility_GHP_name",
    "fulldocument.selectedFacility.selectedLocation.address.city as facility_GHP_address_city",
    "fulldocument.selectedFacility.selectedLocation.tin as facility_GHP_tin",
    "fulldocument._id as id",
    "fulldocument.selectedFacility.selectedLocation.address.zipcode as facility_GHP_address_zipcode",
    "fulldocument.selectedFacility.selectedLocation.address.line1 as facility_GHP_address_line_1",
    "fulldocument.selectedFacility.selectedLocation.address.line2 as facility_GHP_address_line_2",
    "fulldocument.selectedFacility.selectedLocation.address.state as facility_GHP_address_state",
    "fulldocument.selectedFacility.providerType as facility_GHP_providerType",
    "fulldocument.decisiontimestamp",
    "fulldocument.selectedOrderingProvider.firstName as ordering_provider_firstName",
    "fulldocument.selectedOrderingProvider.lastName as ordering_provider_lastName",
    "fulldocument.selectedOrderingProvider.providerType as ordering_provider_providerType",
    "fulldocument.selectedPerformingProvider.firstName as performing_provider_firstName",
    "fulldocument.selectedPerformingProvider.lastName as performing_provider_lastName",
    "fulldocument.selectedPerformingProvider.providerType as performing_provider_providerType",
    "fulldocument.selectedPerformingProvider.name as performing_provider_name",
    "fulldocument.expectedAdmissionDate",
    "fulldocument.admissionDateTime as actualAdmissionDateTime",
    "fulldocument.actualDischargeDateTime",
    "fulldocument.authCategory",
]

authorization_filters = [
    "fulldocument.healthplanname = 'Geisinger'",
    "fullDocument.deleted!=true",
]
if apply_commit_time_filter:
    authorization_filters.append(f"_hoodie_commit_time > {max_commit_time_auth}")
authorization_filters.append("fullDocument.encounterType = 'INPATIENT'")

authorization_query = (
    f"select {', '.join(authorization_select_columns)} "
    "from analytics_dw_preprod.hudi_raw_authorization "
    f"where {' and '.join(authorization_filters)}"
)
authorization_dataframe = spark.sql(authorization_query)

authorization_dataframe.filter(authorization_dataframe._id == authorization_id).show(truncate=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------------------+--------------------------+--------+--------------------+-------+-----+--------------+-------------------+-----------------+-----------------+------------------+------------------------+----------------+------------------------+------------------------+----------+-------------+------------------------+---------------+----------------+----------------------+--------------+--------------------------+---------------+------------------------+------------+-----------------+-------------------------+----------------+------------------------+----------------------------+---------------------------+---------------------------+--------------------------+-------------------------+------------------------+---------------------------+--------------------------+------------------------------+-----------------------------+----------------------------+--------------------------------+------------------------+---------------------+-----------------------+---------------------

In [None]:
# Inner-join each service request to its authorization document on the
# authorization _id.
# The join key depends on which service-request query built
# servicerequest_dataframe. The projected query exposes `authorization` as a
# top-level column. The `select *` query keeps it nested under fullDocument (the
# projected query reads it as `fulldocument.authorization`). Resolve whichever
# exists so this cell does not depend on which variant ran last.
if "authorization" in servicerequest_dataframe.columns:
    servicerequest_authorization = servicerequest_dataframe["authorization"]
else:
    servicerequest_authorization = servicerequest_dataframe["fullDocument.authorization"]

final_dataframe = servicerequest_dataframe.join(
    authorization_dataframe,
    servicerequest_authorization == authorization_dataframe._id,
    "inner",
)
final_dataframe.show(truncate=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------------------+-----------------------+------------+------------------------+------------------------+------------------------+--------------------------+--------+--------------------+--------------------+-----+--------------+-------------------+-----------------+-----------------+------------------+------------------------+----------------+------------------------+------------------------+----------+-------------+------------------------+---------------+----------------+----------------------+--------------+--------------------------+---------------+------------------------+------------+-----------------------+-------------------------+----------------+------------------------+----------------------------+---------------------------+---------------------------+--------------------------+-------------------------+------------------------+---------------------------+--------------------------+------------------------------+-----------------------------+-----------------------

In [None]:
# Inspect a single authorization document by its _id.
authorization_dataframe.where(col("_id") == "64d235ec897c620001995301").show(truncate=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------------------+--------------------------+--------+--------------------+--------------------+-----+--------------+-------------------+-----------------+-----------------+------------------+------------------------+----------------+------------------------+------------------------+----------+-------------+------------------------+---------------+----------------+----------------------+--------------+--------------------------+---------------+------------------------+------------+-----------------------+-------------------------+----------------+------------------------+----------------------------+---------------------------+---------------------------+--------------------------+-------------------------+------------------------+---------------------------+--------------------------+------------------------------+-----------------------------+----------------------------+--------------------------------+------------------------+---------------------+-----------------------+--