# Data validation for bulk patient import

In [4]:
import pandas as pd

pd.to_datetime('4/8/1982', format='%m/%d/%Y')

Timestamp('1982-04-08 00:00:00')

In [72]:
import numpy as np
import pandas as pd
pd.set_option('display.max_colwidth', 0)
from datetime import datetime
from dateutil.relativedelta import relativedelta

from tqdm import tqdm

%load_ext line_profiler

The line_profiler extension is already loaded. To reload it, use:
  %reload_ext line_profiler


In [81]:
df = pd.read_csv("sample_import.csv", dtype = str)
df

Unnamed: 0,Patient First Name,Patient Last Name,Date of Birth,From Date of Service,To Date of Service,MRN / Patient Account #,Payer ID,Service Type,Member ID,Provider NPI,Client ID
0,John,Smith,8/1/1984,8/12/2021,8/12/2021,1234,AETNA,30,W123456,1234567890,test_client
1,Christine,Wales,10/15/1980,4/99/2021,4/12/2021,35693,G60021,PT,few291093dfj2,2345678901,test_client
2,Jose,Xaiver,1/1/2000,4/12/2021,10/15/1982,F3466,G60021,98,f323523,3456789912,test_client
3,Gary,Jackson,8/1/1984,8/12/2021,8/12/2021,A3921,Cigna,33,F1234,3456789912,test_client
4,Michael,Steele,1/1/2000,4/12/2021,4/14/2021,4567y7,AARP,PT,Bth24rju39,5679011934,test_client
5,Sam,Chan,1/2/1800,10/15/1982,4/12/2021,g2345,60023,30,W00123,6790122945,test_client


## Generate simulation file

Set `generate_large_test_file` args

In [74]:
def generate_large_test_file(rows, clean_data_rate):
    
    df = pd.read_csv("sample_import.csv", dtype = str)
    
    # duplicate rows
    import random
    df2 = pd.DataFrame(np.repeat(df.values, (rows//len(df)), axis=0), columns=df.columns)
    df2 = df2.sample(frac=1).reset_index(drop=True)
    
    assert 0 < clean_data_rate < 1, "clean_data_rate in (0,1)."
    
    df2_clean = df2[:int(len(df2)*clean_data_rate)]
    df2_dirty = df2[int(len(df2)*clean_data_rate):]
        
    for col in df2_dirty.columns:
        df2_dirty.loc[df2_dirty.sample(frac=1/len(df2_dirty.columns)).index, col] = np.nan
    
    df2_dirty = df2_dirty.fillna("")
    
    # simulate garbage entries
    df2_dirty.loc[df2_dirty.sample(frac=.05).index, "Patient First Name"] = " "
    df2_dirty.loc[df2_dirty.sample(frac=.001).index, "Patient First Name"] = "FNU"
    df2_dirty.loc[df2_dirty.sample(frac=.0005).index, "Patient Last Name"] = "Unknown"
    
    df2_dirty.loc[df2_dirty.sample(frac=.05).index, "Date of Birth"] = "01/01/1800"
    df2_dirty.loc[df2_dirty.sample(frac=.05).index, "Date of Birth"] = "12/31/2200"
    
    df2_dirty.loc[df2_dirty.sample(frac=.05).index, "From Date of Service"] = "12/31/2025"
    df2_dirty.loc[df2_dirty.sample(frac=.05).index, "To Date of Service"] = "01/01/2015"
    
    df2_dirty.loc[df2_dirty.sample(frac=.01).index, "Provider NPI"] = "999999999"

    # clean up
    df3 = df2_clean.append(df2_dirty, ignore_index=True).sample(frac=1).reset_index(drop=True)
    print(f"\n--> Generated test file with {rows=:,} and {clean_data_rate=}.")
    
    df3.to_csv("sample_import_500_000.csv", index=False)
    print(f"--> Saved test file.\n")
    
# %lprun -f generate_large_test_file 
generate_large_test_file(rows=int(5e5), clean_data_rate=.25) 

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  self._setitem_single_block(indexer, value, name)
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  iloc._setitem_with_indexer(indexer, value, self.name)



--> Generated test file with rows=500,000 and clean_data_rate=0.25.
--> Saved test file.



## Validation

### Load file

In [87]:
"""Validate bulk file for eligibilty status and claim status.

Inputs:
- file_path: <str>
- eligibility_or_claim: {"eligibility", "claim"}
"""
file_path = "sample_import_500_000.csv"
eligibility_or_claim = "claim"


df = pd.read_csv(file_path, dtype = str, keep_default_na=False)
df.head(50)

Unnamed: 0,Patient First Name,Patient Last Name,Date of Birth,From Date of Service,To Date of Service,MRN / Patient Account #,Payer ID,Service Type,Member ID,Provider NPI,Client ID
0,Sam,Chan,1/2/1800,10/15/1982,4/12/2021,g2345,60023,30,W00123,6790122945.0,test_client
1,Gary,,8/1/1984,8/12/2021,,A3921,,33,F1234,3456789912.0,
2,Jose,Xaiver,1/1/2000,,10/15/1982,F3466,G60021,98,f323523,3456789912.0,test_client
3,Jose,Xaiver,1/1/2000,4/12/2021,10/15/1982,F3466,G60021,98,f323523,3456789912.0,test_client
4,Jose,Xaiver,1/1/2000,4/12/2021,10/15/1982,F3466,G60021,98,f323523,3456789912.0,test_client
5,,Chan,1/2/1800,10/15/1982,4/12/2021,g2345,60023,30,W00123,6790122945.0,test_client
6,Michael,Steele,1/1/2000,4/12/2021,4/14/2021,4567y7,AARP,PT,Bth24rju39,5679011934.0,test_client
7,Gary,Jackson,8/1/1984,8/12/2021,8/12/2021,A3921,Cigna,33,,3456789912.0,test_client
8,Sam,Chan,1/2/1800,10/15/1982,4/12/2021,g2345,60023,30,W00123,6790122945.0,test_client
9,Jose,Xaiver,1/1/2000,4/12/2021,10/15/1982,F3466,G60021,98,f323523,3456789912.0,test_client


In [90]:
req_cols = ['Patient First Name',
            'Patient Last Name',
            'Date of Birth',
            'From Date of Service',
            'To Date of Service',
            'MRN / Patient Account #',
            'Payer ID',
            'Service Type',
            'Member ID',
            'Provider NPI',
            'Client ID']

assert set(req_cols).issubset(set(df.columns)), "Bad file, missing required columns. "

df = df.fillna("").apply(lambda x: x.str.strip())

# if tDOS not given, default to fDOS
df['To Date of Service'] = df['To Date of Service'].mask(df['To Date of Service'].eq(''),df['From Date of Service'])

if eligibility_or_claim == "claim":
    # if service type not given, default to 30
    df.loc[df["Service Type"] == "", 'Service Type'] = "30"

In [91]:
df.head(50)

Unnamed: 0,Patient First Name,Patient Last Name,Date of Birth,From Date of Service,To Date of Service,MRN / Patient Account #,Payer ID,Service Type,Member ID,Provider NPI,Client ID
0,Sam,Chan,1/2/1800,10/15/1982,4/12/2021,g2345,60023,30,W00123,6790122945.0,test_client
1,Gary,,8/1/1984,8/12/2021,8/12/2021,A3921,,33,F1234,3456789912.0,
2,Jose,Xaiver,1/1/2000,,10/15/1982,F3466,G60021,98,f323523,3456789912.0,test_client
3,Jose,Xaiver,1/1/2000,4/12/2021,10/15/1982,F3466,G60021,98,f323523,3456789912.0,test_client
4,Jose,Xaiver,1/1/2000,4/12/2021,10/15/1982,F3466,G60021,98,f323523,3456789912.0,test_client
5,,Chan,1/2/1800,10/15/1982,4/12/2021,g2345,60023,30,W00123,6790122945.0,test_client
6,Michael,Steele,1/1/2000,4/12/2021,4/14/2021,4567y7,AARP,PT,Bth24rju39,5679011934.0,test_client
7,Gary,Jackson,8/1/1984,8/12/2021,8/12/2021,A3921,Cigna,33,,3456789912.0,test_client
8,Sam,Chan,1/2/1800,10/15/1982,4/12/2021,g2345,60023,30,W00123,6790122945.0,test_client
9,Jose,Xaiver,1/1/2000,4/12/2021,10/15/1982,F3466,G60021,98,f323523,3456789912.0,test_client


### Main sauce

In [101]:
import boto3
import pymysql 

session = boto3.Session(profile_name='beatsbulkdev')
ssm = session.client("ssm")

rds_db = ssm.get_parameter(Name="RDS_DATABASE")["Parameter"]["Value"]
rds_host = ssm.get_parameter(Name="RDS_HOSTNAME")["Parameter"]["Value"]
name = ssm.get_parameter(Name="RDS_USERNAME")["Parameter"]["Value"]
password = ssm.get_parameter(Name="RDS_PASSWORD")["Parameter"]["Value"]
db_name = ""
port = int(ssm.get_parameter(Name="RDS_PORT")["Parameter"]["Value"])

conn = pymysql.connect(host=rds_host, user=name, port=port,
                                   passwd=password, db=db_name, connect_timeout=5)

def db_query(query):
    with conn.cursor() as cur:
        cur.execute(query)
        conn.commit()

        return cur.fetchall()

valid_service_types = db_query(query=f"SELECT service_type_name FROM {rds_db}.service_types;")
valid_service_types = [stype[0] for stype in valid_service_types]



In [116]:
aliases = db_query(query=f"""SELECT payer_alias.payer_alias_identifier,
                                payer_mapping.external_payer_name, 
                                payer_mapping.eligibility_enrollment_required,
                                payer_mapping.claims_enrollment_required,
                                payer_mapping.enrollment_required
                            FROM {rds_db}.payer_mapping 
                            right join {rds_db}.payer_alias 
                            on payer_alias.payer_mapping_id=payer_mapping.payer_mapping_id
                            where payer_alias.created_by='BH1234';""")

In [117]:
aliases = pd.DataFrame(aliases, columns=["alias", "payer", "eligibility_enrollment_required", "claims_enrollment_required", "enrollment_required"])
aliases = dict(zip(alias_df.alias, alias_df.set_index("alias").to_dict('records')))

In [93]:
def validate_bulk(df, eligibility_claims_era):
    
    validation_matrix = []

    # first name
    validation_matrix.append( ["" if name and 1<len(name)<46 else "Missing First Name. " for name in df["Patient First Name"]] )
    
    # last name
    validation_matrix.append( ["" if name and 1<len(name)<46 else "Missing Last Name. " for name in df["Patient Last Name"]] )
    
    # dob
    _valid_dob = ( pd.to_datetime(pd.Timestamp(datetime.now() - relativedelta(years=120))) <  pd.to_datetime(df['Date of Birth'], format='%m/%d/%Y', errors="coerce") ) & \
                                ( pd.to_datetime(df['Date of Birth'], format='%m/%d/%Y', errors="coerce") < pd.Timestamp(datetime.now()) )
    validation_matrix.append( ["" if i else "Invalid DOB: require MM/DD/YYYY < today. " for i in _valid_dob] )

    # dos
    _fdos = pd.to_datetime(df['From Date of Service'], format='%m/%d/%Y', errors="coerce")
    _valid_fdos = ~_fdos.isnull() & ( _fdos > pd.Timestamp(datetime.now() - relativedelta(years=120)) )
    _tdos = pd.to_datetime(df['To Date of Service'], format='%m/%d/%Y', errors="coerce")
    _valid_tdos = ~_tdos.isnull() & ( _tdos < pd.Timestamp(datetime.now() + relativedelta(years=10)) )
    validation_matrix.append( ["" if valid_fdos and valid_tdos and tdos >= fdos  \
                               else "Invalid To/From DoS: MM/DD/YYYY and From DoS <= To DoS. "  \
                               for valid_fdos, valid_tdos, fdos, tdos in zip(_valid_fdos, _valid_tdos, _fdos, _tdos)] )

    # payer id
    validation_matrix.append( ["" if payer_id and 1<len(payer_id)<46 else "Missing Payer ID. " for payer_id in df["Payer ID"]] )
    validation_matrix.append( ["" if payer_id in aliases else "Given Payer ID not mapped in mapping file. " for payer_id in df["Payer ID"]] )
    
    if eligibility_claims_era == "eligibility":
        
        def check_eligibility(payer_id):
            
            res = aliases.get(payer_id, {}).get("eligibility_enrollment_required", {})
            
            if res == "Y":
                return "Provider enrollment is required. "
            elif res == "N/A":
                return f"Eligibility check not available for {aliases.get(payer_id, {}).get("payer", "this Payer ID")}. "
            elif res == "N":
                return ""
            else:
                return ""

        validation_matrix.append( [check_eligibility(payer_id) for payer_id in df["Payer ID"]] )

    elif eligibility_claims_era == "claims":
        
        def check_claims(payer_id):
            
            res = aliases.get(payer_id, {}).get("claims_enrollment_required", {})
            
            if res == "Y":
                return "Provider enrollment is required. "
            elif res == "N/A":
                return f"Claims verification not available for {aliases.get(payer_id, {}).get("payer", "this Payer ID")}. "
            elif res == "N":
                return ""
            else:
                return ""
        
        validation_matrix.append( [check_claims(payer_id) for payer_id in df["Payer ID"]] )
        
    elif eligibility_claims_era == "era":
        
        def check_era(payer_id):
            
            res = aliases.get(payer_id, {}).get("enrollment_required", {})
            
            if res == "Y":
                return "Provider enrollment is required. "
            elif res == "N/A":
                return f"ERA not available for {aliases.get(payer_id, {}).get("payer", "this Payer ID")}. "
            elif res == "N":
                return ""
            else:
                return ""

        validation_matrix.append( [check_era(payer_id) for payer_id in df["Payer ID"]] )

    # service type
    validation_matrix.append( ["" if stype in valid_service_types else "Invalid Service Type. " for stype in df["Service Type"]] )

    # member id
    validation_matrix.append( ["" if member_id else "Missing Member ID. " for member_id in df["Member ID"]] )

    # provider npi
    validation_matrix.append( ["" if npi.isdigit() and len(npi)==10 else "Missing 10-digit NPI. " for npi in df["Provider NPI"]] )
    
    # client id
    validation_matrix.append( ["" if client_id else "Missing Client ID. " for client_id in df["Client ID"]] )

    # zip-a-dee-doo-dah 
    df["Errors"] = ["".join(x) for x in zip(*validation_matrix)]
    
    return df

In [95]:
df = validate_bulk(df)
df.head(50)

Unnamed: 0,Patient First Name,Patient Last Name,Date of Birth,From Date of Service,To Date of Service,MRN / Patient Account #,Payer ID,Service Type,Member ID,Provider NPI,Client ID,Errors
0,Sam,Chan,1/2/1800,10/15/1982,4/12/2021,g2345,60023,30,W00123,6790122945.0,test_client,Invalid DOB: require MM/DD/YYYY < today.
1,Gary,,8/1/1984,8/12/2021,8/12/2021,A3921,,33,F1234,3456789912.0,,Missing Last Name. Missing Payer ID. Missing Client ID.
2,Jose,Xaiver,1/1/2000,,10/15/1982,F3466,G60021,98,f323523,3456789912.0,test_client,Invalid To/From DoS: MM/DD/YYYY and To DoS not before From DoS.
3,Jose,Xaiver,1/1/2000,4/12/2021,10/15/1982,F3466,G60021,98,f323523,3456789912.0,test_client,Invalid To/From DoS: MM/DD/YYYY and To DoS not before From DoS.
4,Jose,Xaiver,1/1/2000,4/12/2021,10/15/1982,F3466,G60021,98,f323523,3456789912.0,test_client,Invalid To/From DoS: MM/DD/YYYY and To DoS not before From DoS.
5,,Chan,1/2/1800,10/15/1982,4/12/2021,g2345,60023,30,W00123,6790122945.0,test_client,Missing First Name. Invalid DOB: require MM/DD/YYYY < today.
6,Michael,Steele,1/1/2000,4/12/2021,4/14/2021,4567y7,AARP,PT,Bth24rju39,5679011934.0,test_client,
7,Gary,Jackson,8/1/1984,8/12/2021,8/12/2021,A3921,Cigna,33,,3456789912.0,test_client,Missing Member ID.
8,Sam,Chan,1/2/1800,10/15/1982,4/12/2021,g2345,60023,30,W00123,6790122945.0,test_client,Invalid DOB: require MM/DD/YYYY < today.
9,Jose,Xaiver,1/1/2000,4/12/2021,10/15/1982,F3466,G60021,98,f323523,3456789912.0,test_client,Invalid To/From DoS: MM/DD/YYYY and To DoS not before From DoS.


## *fin.*

## Test write speed to singular table in RDS of 200k rows of fake data

In [None]:
import boto3
import pymysql
import urllib.parse
from sqlalchemy import create_engine

ssm = boto3.client("ssm")

db_env = ssm.get_parameter(Name="RDS_DATABASE")["Parameter"]["Value"]
rds_host = ssm.get_parameter(Name="RDS_HOSTNAME")["Parameter"]["Value"]
user = ssm.get_parameter(Name="RDS_USERNAME")["Parameter"]["Value"]
password = ssm.get_parameter(Name="RDS_PASSWORD")["Parameter"]["Value"]
db_name = ""
port = int(ssm.get_parameter(Name="RDS_PORT")["Parameter"]["Value"])

# conn = pymysql.connect(host=rds_host, user=user, port=port, passwd=password, db=db_name, connect_timeout=5)
connect_str = f"mysql+pymysql://{user}:{urllib.parse.quote_plus(password)}@{rds_host}:3306/{db_env}"
print(f"{connect_str=}")
engine = create_engine(connect_str, echo=False, future=True)

In [None]:
df

In [None]:
df_to_sql = df.loc[df.Errors == "", df.columns != 'Errors']
df_to_sql

In [None]:
%time df_to_sql[:200000].to_sql(name="test_data_valid", con=engine, if_exists='replace', index=True, index_label="test_data_valid_id", chunksize=(min(10000, int(len(df_to_sql)//10))), method="multi")

## *fin.*

# Write data to payer_mapping table

In [49]:
import pandas as pd

df = pd.read_csv("payer_id_mapping_protected.csv", dtype={"Unique Identifier":'int',
                                                          "External Payer Name":'str',
                                                          "Display_ID":'str',
                                                         "Eligibility_Payer_ID": 'str',
                                                         "Eligibility_Enrollment_Required":"str",
                                                         "Claims_Payer_ID":"str",
                                                         "Claims_Enrollment_Required":"str",
                                                          "ERA_Payer_ID":"str",
                                                          "ERA_Enrollment_Required":"str"
                                                         },
                 index_col=None, header=0, na_filter=False, )
df.head()

Unnamed: 0,Unique Identifier,External Payer Name,Display_ID,Eligibility_Payer_ID,Eligibility_Enrollment_Required,Claims_Payer_ID,Claims_Enrollment_Required,ERA_Payer_ID,ERA_Enrollment_Required
0,1,CENTIVO,,45564,N,45564.0,N,45564.0,Y
1,2,WILSON MCSHANE,,41095,N,,,,
2,3,THE GUARDIAN LIFE INSURANCE COMPANY OF AMERICA,,64246,N,64246.0,N,64246.0,Y
3,4,MANAGED CARE OF NORTH AMERICA (MCNA) DENTAL,,65030,N,,,,
4,5,1199 NATIONAL BENEFIT FUND,,13162,N,,,13162.0,Y


In [50]:
df.replace(r'^\s*$', np.nan, regex=True, inplace = True)

In [53]:
df = df.where(pd.notnull(df), None)

In [60]:
df

Unnamed: 0,Unique Identifier,External Payer Name,Display_ID,Eligibility_Payer_ID,Eligibility_Enrollment_Required,Claims_Payer_ID,Claims_Enrollment_Required,ERA_Payer_ID,ERA_Enrollment_Required
0,1,CENTIVO,,45564,N,45564,N,45564,Y
1,2,WILSON MCSHANE,,41095,N,,,,
2,3,THE GUARDIAN LIFE INSURANCE COMPANY OF AMERICA,,64246,N,64246,N,64246,Y
3,4,MANAGED CARE OF NORTH AMERICA (MCNA) DENTAL,,65030,N,,,,
4,5,1199 NATIONAL BENEFIT FUND,,13162,N,,,13162,Y
...,...,...,...,...,...,...,...,...,...
5261,5262,ZAVALLA,,,,,,A0245,Y
5262,5263,ZENITH AMERICAN SOLUTIONS INC,,,,,,89677,Y
5263,5264,ZENITH LUMINX,,,,,,7386647821,Y
5264,5265,ZURICH FARMERS,,,,,,J1015,Y


In [None]:
%%time

import boto3
import pymysql 

session = boto3.Session(profile_name='beatsbulkdev')
ssm = session.client("ssm")

rds_db = ssm.get_parameter(Name="RDS_DATABASE")["Parameter"]["Value"]
rds_host = ssm.get_parameter(Name="RDS_HOSTNAME")["Parameter"]["Value"]
name = ssm.get_parameter(Name="RDS_USERNAME")["Parameter"]["Value"]
password = ssm.get_parameter(Name="RDS_PASSWORD")["Parameter"]["Value"]
db_name = ""
port = int(ssm.get_parameter(Name="RDS_PORT")["Parameter"]["Value"])

conn = pymysql.connect(host=rds_host, user=name, port=port,
                                   passwd=password, db=db_name, connect_timeout=5)

# insert new records
with conn.cursor() as cur:
    #cur.execute("SET foreign_key_checks = 0")
    cur.executemany(f"""INSERT INTO BeatsBulkDev.payer_mapping(payer_mapping_id, 
                            external_payer_name, display_id, eligibility_payer_id, eligibility_enrollment_required,
                            claims_payer_id, claims_enrollment_required, era_payer_id, enrollment_required, created_on, created_by) 
                        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), 'benedict.au@beatshealthconsulting.com')""", 
                    df.values.tolist())
    #cur.execute("SET foreign_key_checks = 1")
    conn.commit()
    

CPU times: user 2.8 s, sys: 458 ms, total: 3.26 s
Wall time: 2min 46s


In [None]:
session = boto3.Session(profile_name='beatsbulkdev')
s3 = session.client("s3")

s3.

## Data validation for Payer ID mapping sheet

In [51]:
import os
import json
import requests

def call_aws_api_lambda(api_url, client_id, file):
    payload = {
        "client_id": client_id, 
        "file": file
        }
    print(payload)
    r = requests.post(api_url, json=payload)
    
    if r.status_code==200:
        return r.json()
    else: 
        return r.status_code, r.text
    
api_url = "https://0ltcqd9tmg.execute-api.us-east-2.amazonaws.com/test/payer_map"
client_id = "A12345"
file = "tmp/payer_id_mapping.xlsx"
    
call_aws_api_lambda(api_url, client_id, payer_map_s3_bucket, payer_map_s3_dir)

{'client_id': 'A12345', 'payer_map_s3_bucket': 'benedictau-rnd-ohio', 'payer_map_s3_dir': 'tmp/payer_id_mapping.xlsx'}


{'client_id': 'A12345',
 'payer_map_s3_uri': 'tmp/payer_id_mapping.xlsx',
 'status': 'Fail',
 'success_count': 3,
 'fail_count': 3}

In [119]:
import pandas as pd
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)

CLIENT_ID = "BH12345"
to = 'payer_id_mapping.xlsx' # "s3://benedictau-rnd-ohio/tmp/payer_id_mapping.xlsx"
MASTER_FILE_DIR = 'payer_map_master_03182022.xlsx' # "s3://benedictau-rnd-ohio/tmp/payer_map_master_03182022.xlsx"

payer_map_master_df = pd.read_excel(MASTER_FILE_DIR, index_col=None, header=0, na_filter=False)
file_required_headers = payer_map_master_df.columns.tolist()

In [120]:
payer_map_df = pd.read_excel(MAP_FILE_DIR, index_col=None, header=0, na_filter=False)
payer_map_headers = payer_map_df.columns.tolist()

if payer_map_headers[:len(file_required_headers)] != file_required_headers:
    print(f"Invalid column headers in file. Require: {*payer_map_headers,}. ")
    # return client id, file location in outbox, error signal
else:
    payer_map_df = payer_map_df[file_required_headers] # drop validation check columns
    
if len(payer_map_df) != len(payer_map_master_df):
    print(f"Invalid file. Row mismatch. ")
    # return client id, file location in outbox, error signal


NameError: name 'MAP_FILE_DIR' is not defined

In [None]:
# validate and export

def validate_payer_map(df):
    
    payer_alias_df = df.iloc[:, -10:]
    payer_alias_val_count_df = payer_alias_df.apply(lambda x: x.value_counts(dropna=True))
    payer_alias_val_count_df.drop("", inplace=True) # drop empty string "" count

    # create list of duplicate values that are across the spreadsheet
    duplicate_values_list = payer_alias_val_count_df.index[payer_alias_val_count_df.sum(axis=1, skipna=True)>1].to_list()
    # print(f"{duplicate_values_list=}")

    # for each duplicate value, we get the source row indicies
    validation_results = [""]*len(payer_alias_df)
    
    for val in duplicate_values_list:
        rows = payer_alias_df.loc[payer_alias_df.eq(val).any(axis=1)].index.tolist()
        error_msg = f"Alias ID/Name `{val}` duplicated in rows {[int(row+2) for row in rows]}. "
        for row in rows:
            validation_results[row] += error_msg

    df["Errors"] = validation_results
    
    return df

def val_res(row):
    if all(row[-11: -1]==([""]*10)):
        return ""
    elif not row.Errors:
        return "Success"
    else:
        return "Fail"

payer_map_df = validate_payer_map(payer_map_df)
payer_map_df["Validation_Result"] = payer_map_df.apply(val_res, axis=1)

# check for missing alias id-name pairs:
out_df = payer_map_df.loc[payer_map_df.Validation_Result=="Success", file_required_headers]
out_df["id"] = out_df.index
out_df = pd.wide_to_long(out_df, ["Payer Alias ID", "Payer Alias Name"], i="id", j="alias") # .reset_index(drop=True)
out_df = out_df.loc[(out_df["Payer Alias ID"]!="") | (out_df["Payer Alias Name"]!="")]
# list of (alias id, alias name) to punch into DB
alias_list = out_df[["Payer Alias ID", "Payer Alias Name"]].values.tolist()
# check pairs that neither contain empty string
validation_results = [""]*len(payer_map_df)
for row, alias in enumerate(alias_list):
    if not alias[0]: # if alias id was left blank
        payer_map_df.at[[i for i,_ in out_df.index.to_list()][row], 'Errors'] += f"Payer Alias ID missing. "
        # validation_results[[i for i,_ in out_df.index.to_list()][row]] += f"Payer Alias ID missing. "
    if not alias[1]: # if alias id was left blank
        payer_map_df.at[[i for i,_ in out_df.index.to_list()][row], 'Errors'] += f"Payer Alias Name missing. "
        # validation_results[[i for i,_ in out_df.index.to_list()][row]] += f"Payer Alias Name missing. "

payer_map_df["Validation_Result"] = payer_map_df.apply(val_res, axis=1)
del out_df

payer_map_df.to_excel("payer_id_mapping_errors.xlsx", index=False)
payer_map_df.head(10)

In [None]:
from collections import Counter
validation_result_count = Counter(payer_map_df.Validation_Result)
num_success, num_fail = validation_result_count.get("Success", 0), validation_result_count.get("Fail", 0)


out_dict = {}
out_dict["client_id"] = CLIENT_ID
out_dict["status"] = "Success" if (num_success > 0 and num_fail == 0) else "Fail"
out_dict["success_count"] = num_success
out_dict["fail_count"] = num_fail


In [118]:
out_df

NameError: name 'out_df' is not defined

## Write to DB table `payer_alias`

In [75]:
import pandas as pd

pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)

CLIENT_ID = "BH12345"
MAP_FILE_DIR = 'payer_id_mapping_errors.xlsx' # "s3://benedictau-rnd-ohio/tmp/payer_id_mapping.xlsx"
MASTER_FILE_DIR = 'payer_map_master_03182022.xlsx' # "s3://benedictau-rnd-ohio/tmp/payer_map_master_03182022.xlsx"

payer_map_master_df = pd.read_excel(MASTER_FILE_DIR, index_col=None, header=0, na_filter=False)
file_required_headers = payer_map_master_df.columns.tolist()

In [76]:
payer_map_df = pd.read_excel(MAP_FILE_DIR, index_col=None, header=0, na_filter=False)
payer_map_headers = payer_map_df.columns.tolist()

if payer_map_headers[:len(file_required_headers)] != file_required_headers:
    print(f"Invalid column headers in file. Require: {*payer_map_headers,}. ")
    # return client id, file location in outbox, error signal
else:
    pass # payer_map_df = payer_map_df[file_required_headers] # drop validation check columns
    
if len(payer_map_df) != len(payer_map_master_df):
    print(f"Invalid file. Row mismatch. ")
    # return client id, file location in outbox, error signal


In [77]:
payer_map_df.head()

Unnamed: 0,Unique Identifier,External Payer Name,Display_ID,Eligibility_Payer_ID,Eligibility_Enrollment_Required,Claims_Payer_ID,Claims_Enrollment_Required,ERA_Payer_ID,ERA_Enrollment_Required,Payer Alias ID1,Payer Alias Name1,Payer Alias ID2,Payer Alias Name2,Payer Alias ID3,Payer Alias Name3,Payer Alias ID4,Payer Alias Name4,Payer Alias ID5,Payer Alias Name5,Errors,Validation_Result
0,1,CENTIVO,,45564,N,45564.0,N,45564.0,Y,test1,test1-a,test2,test2-a,,,,,,,,Success
1,2,WILSON MCSHANE,,41095,N,,,,,test3,test3-a,test4,test4-a,,,,,,,"Alias ID/Name `test4-a` duplicated in rows [3, 4].",Fail
2,3,THE GUARDIAN LIFE INSURANCE COMPANY OF AMERICA,,64246,N,64246.0,N,64246.0,Y,test6,test6-a,test7,test4-a,,,,,,,"Alias ID/Name `test4-a` duplicated in rows [3, 4].",Fail
3,4,MANAGED CARE OF NORTH AMERICA (MCNA) DENTAL,,65030,N,,,,,test8,test8-a,,,,,,,,,,Success
4,5,1199 NATIONAL BENEFIT FUND,,13162,N,,,13162.0,Y,test9,test9-a,,,,,,,,,,Success


In [78]:
out_df = payer_map_df.loc[payer_map_df.Validation_Result=="Success", file_required_headers]
out_df["id"] = out_df.index
out_df = pd.wide_to_long(out_df, ["Payer Alias ID", "Payer Alias Name"], i="id", j="alias").reset_index(drop=True)
out_df = out_df.loc[(out_df["Payer Alias ID"]!="") & (out_df["Payer Alias Name"]!="")]
out_df = out_df[["Payer Alias ID", "Payer Alias Name", "Unique Identifier"]]
out_df["created_by"] = CLIENT_ID
out_df.values.tolist()

[['test1', 'test1-a', 1, 'BH12345'],
 ['test8', 'test8-a', 4, 'BH12345'],
 ['test9', 'test9-a', 5, 'BH12345'],
 ['test2', 'test2-a', 1, 'BH12345']]

In [85]:
import boto3
import pymysql 

session = boto3.Session(profile_name='beatsbulkdev')
ssm = session.client("ssm")

rds_db = ssm.get_parameter(Name="RDS_DATABASE")["Parameter"]["Value"]
rds_host = ssm.get_parameter(Name="RDS_HOSTNAME")["Parameter"]["Value"]
name = ssm.get_parameter(Name="RDS_USERNAME")["Parameter"]["Value"]
password = ssm.get_parameter(Name="RDS_PASSWORD")["Parameter"]["Value"]
db_name = ""
port = int(ssm.get_parameter(Name="RDS_PORT")["Parameter"]["Value"])

conn = pymysql.connect(host=rds_host, user=name, port=port,
                                   passwd=password, db=db_name, connect_timeout=5)

def db_query(query):
    with conn.cursor() as cur:
        cur.execute(query)
        conn.commit()

        return cur.fetchall()
    

In [88]:
query = """SELECT * FROM BeatsBulkDev.payer_alias;"""
    
res = db_query(query=query)
res

()

In [89]:
# delete existing records
db_query(query="SET SQL_SAFE_UPDATES = 0;")
db_query(query=f"""DELETE FROM BeatsBulkDev.payer_alias WHERE created_by='{CLIENT_ID}';""")
db_query(query="SET SQL_SAFE_UPDATES = 1;")

()

In [82]:
# insert new records
with conn.cursor() as cur:
    cur.execute("SET foreign_key_checks = 0")
    cur.executemany(f"""INSERT INTO BeatsBulkDev.payer_alias(payer_alias_identifier, 
                            payer_alias_name, payer_mapping_id, created_on, created_by) 
                        VALUES (%s, %s, %s, NOW(), %s)""", 
                    out_df.values.tolist())
    cur.execute("SET foreign_key_checks = 1")
    conn.commit()
    

## Populating payer_mapping table

In [34]:
df = pd.read_csv("payer_map_master_load_db.csv", index_col=None, header=0, na_filter=False, dtype=str)
df.head()

Unnamed: 0,Unique Identifier,External Payer Name,Display_ID,Eligibility_Payer_ID,Eligibility_Enrollment_Required,Claims_Payer_ID,Claims_Enrollment_Required,ERA_Payer_ID,ERA_Enrollment_Required
0,1,CENTIVO,,45564,N,45564.0,N,45564.0,Y
1,2,WILSON MCSHANE,,41095,N,,,,
2,3,THE GUARDIAN LIFE INSURANCE COMPANY OF AMERICA,,64246,N,64246.0,N,64246.0,Y
3,4,MANAGED CARE OF NORTH AMERICA (MCNA) DENTAL,,65030,N,,,,
4,5,1199 NATIONAL BENEFIT FUND,,13162,N,,,13162.0,Y


In [35]:
df.Eligibility_Enrollment_Required.unique()

array(['N', 'Y', 'N/A', ''], dtype=object)

In [36]:
df.Claims_Enrollment_Required.unique()

array(['N', 'N/A', 'Y', ''], dtype=object)

In [37]:
df.ERA_Enrollment_Required.unique()

array(['Y', 'N/A', 'N'], dtype=object)

In [None]:
# insert new records
with conn.cursor() as cur:
    cur.execute("SET foreign_key_checks = 0")
    cur.executemany(f"""INSERT INTO BeatsBulkDev.payer_mapping(payer_mapping_id, external_payer_name, display_id, eligibility_payer_id, eligibility_enrollment_required, ) 
                        VALUES (%s, %s, %s, NOW(), %s)""", 
                    out_df.values.tolist())
    cur.execute("SET foreign_key_checks = 1")
    conn.commit()
    

### Write upload files to s3

In [150]:
session = boto3.Session(profile_name='beatsbulkdev')
s3 = session.client("s3")
ssm = session.client("ssm")

admin_id = "BH1234"
s3_bucket = ssm.get_parameter(Name="S3_BUCKET_NAME")["Parameter"]["Value"]
file = "group_name_mapping.xlsx"

s3.upload_file(file, s3_bucket, f"upload/{admin_id}/{file}")


## Data validation for group mapping sheet

In [13]:
import numpy as np
import pandas as pd
import boto3
import pymysql


MAP_FILE_DIR = "group_name_mapping_success_case.xlsx"

admin_id = "BH1234"

group_mapping_df = pd.read_excel(MAP_FILE_DIR, dtype=str, index_col=None, header=0, na_filter=False, engine='openpyxl')
group_mapping_df

Unnamed: 0,Client ID,Client Name,Group Name,Provider NPI,Billing NPI
0,OM112,ATI,ATI Physical,1234567890,1234567890
1,OM113,ATI2,ATI Physical Therapy LLC,5678999999,5678999999
2,OM113,ATI3,ATI Physical,2342342345,8493933395
3,OM114,ATI4,NORTHWESTERN LLC,8383939024,8383939024
4,OM114,ATI4,NORTHWESTERN MEDICINE,8383939024,1234567893
5,OM114,ATI4,NORTHWESTERN MEDICINE,2342342345,9890909003


In [14]:
assert all(group_mapping_df.columns[:5] == ['Client ID', 'Client Name', 'Group Name', 'Provider NPI', 'Billing NPI']), "Invalid file column headers."

In [15]:
def validate_group_mapping(df):

    validation_matrix = []
    validation_matrix.append( ["Missing Client ID" if (name and not cid) else "" for cid, name in zip(df["Client ID"], df["Client Name"]) ] )

    validation_matrix.append( ["" if name and len(name)>1 else "Missing Group Name. " for name in df["Group Name"]] )
    validation_matrix.append( ["" if npi.isdigit() and len(npi)==10 else "Missing 10-digit Rendering/Provider NPI. " for npi in df["Provider NPI"]] )
    validation_matrix.append( ["" if npi.isdigit() and len(npi)==10 else "Missing 10-digit Billing NPI. " for npi in df["Billing NPI"]] )

q
    df["Errors"] = ["".join(x) for x in zip(*validation_matrix)]
    df["Validation_Result"] = ["Success" if not error else "Fail" for error in df["Errors"]]

    return df

In [16]:
group_mapping_df = validate_group_mapping(group_mapping_df)
group_mapping_df

Unnamed: 0,Client ID,Client Name,Group Name,Provider NPI,Billing NPI,Errors,Validation_Result
0,OM112,ATI,ATI Physical,1234567890,1234567890,,Success
1,OM113,ATI2,ATI Physical Therapy LLC,5678999999,5678999999,,Success
2,OM113,ATI3,ATI Physical,2342342345,8493933395,,Success
3,OM114,ATI4,NORTHWESTERN LLC,8383939024,8383939024,Duplicate Rendering/Provider NPI.,Fail
4,OM114,ATI4,NORTHWESTERN MEDICINE,8383939024,1234567893,Duplicate Rendering/Provider NPI.,Fail
5,OM114,ATI4,NORTHWESTERN MEDICINE,2342342345,9890909003,,Success


In [106]:
group_mapping_df.to_excel("group_name_mapping_errors.xlsx", index=False)


In [132]:
session = boto3.Session(profile_name='beatsbulkdev')
ssm = session.client("ssm")

rds_db = ssm.get_parameter(Name="RDS_DATABASE")["Parameter"]["Value"]
rds_host = ssm.get_parameter(Name="RDS_HOSTNAME")["Parameter"]["Value"]
name = ssm.get_parameter(Name="RDS_USERNAME")["Parameter"]["Value"]
password = ssm.get_parameter(Name="RDS_PASSWORD")["Parameter"]["Value"]
db_name = ""
port = int(ssm.get_parameter(Name="RDS_PORT")["Parameter"]["Value"])

conn = pymysql.connect(host=rds_host, user=name, port=port,
                                   passwd=password, db=db_name, connect_timeout=5)

def db_query(query):
    with conn.cursor() as cur:
        cur.execute(query)
        conn.commit()

        return cur.fetchall()

In [None]:
# move group records into history

db_query(query=f"""INSERT INTO {rds_db}.client_master_history SELECT * FROM {rds_db}.client_master WHERE created_by='{admin_id}';""")
db_query(query=f"""INSERT INTO {rds_db}.group_master_history SELECT * FROM {rds_db}.group_master WHERE created_by='{admin_id}';""")
db_query(query=f"""INSERT INTO {rds_db}.group_mapping_history SELECT * FROM {rds_db}.group_mapping WHERE created_by='{admin_id}';""")

db_query(query="SET SQL_SAFE_UPDATES = 0;")
db_query(query=f"""DELETE FROM {rds_db}.group_mapping WHERE created_by='{admin_id}';""")
db_query(query=f"""DELETE FROM {rds_db}.group_master WHERE created_by='{admin_id}';""")
db_query(query=f"""DELETE FROM {rds_db}.client_master WHERE created_by='{admin_id}';""")
db_query(query="SET SQL_SAFE_UPDATES = 1;")

In [111]:
out_df = group_mapping_df.loc[group_mapping_df.Validation_Result=="Success", ["Client ID", "Client Name", "Group Name", "Provider NPI"]]
clients = [[i,j] for i,j in out_df[["Client ID", "Client Name"]].drop_duplicates(subset="Client ID").values.tolist() if i]
groups =  [[i,j] for i,j in out_df[["Client ID", "Group Name"]].drop_duplicates(subset="Group Name").values.tolist() if j]
npis = [[i,j] for i,j in out_df[["Group Name", "Provider NPI"]].drop_duplicates(subset="Group Name").values.tolist() if j]
out_df

Unnamed: 0,Client ID,Client Name,Group Name,Provider NPI
0,OM112,ATI,ATI Physical,1234567890
2,OM113,ATI,ATI Physical,8493933395
3,,,NORTHWESTERN LLC,8383939024
4,,,NORTHWESTERN MEDICINE,1234567893
5,,,NORTHWESTERN MEDICINE,9890909003


In [None]:
# write to db

for client_id, client_name in clients:
    db_query(query=f"""INSERT INTO {rds_db}.client_master(client_id, client_name, created_on, created_by)
                            VALUES ('{client_id}', '{client_name}', NOW(), '{admin_id}');""")

for client_id, group_name in groups:
    if client_id:
        db_query(query=f"""INSERT INTO {rds_db}.group_master(client_master_id, group_name, created_on, created_by)
                            VALUES ((SELECT client_master_id FROM {rds_db}.client_master WHERE client_id='{client_id}' AND created_by='{admin_id}'), '{group_name}', NOW(), '{admin_id}');""")
    else: 
        db_query(query=f"""INSERT INTO {rds_db}.group_master(group_name, created_on, created_by)
                            VALUES ('{group_name}', NOW(), '{admin_id}');""")

for group_name, npi in npis:
    db_query(query=f"""INSERT INTO {rds_db}.group_mapping(provider_npi, group_master_id, created_on, created_by)
                            VALUES ('{npi}',  (SELECT group_master_id FROM {rds_db}.group_master WHERE group_name='{group_name}' AND created_by='{admin_id}'), NOW(), '{admin_id}');""")
        

## API CALLS

In [179]:
import os
import json
import requests

def validate_payer_mapping_lambda(api_url, admin_id, file):
    payload = {
        "admin_id": admin_id, 
        "file": file
        }
    r = requests.post(api_url, json=payload)
    if r.status_code==200:
        return r.json()
    else: 
        return r.status_code, r.text
    
api_url = "https://3lqxw3w1td.execute-api.us-east-2.amazonaws.com/beats/payer_map_val"
admin_id = "BH1234"
file = "payer_id_mapping.csv"
    
%time res = validate_payer_mapping_lambda(api_url, admin_id, file)
res

CPU times: user 36.2 ms, sys: 2.85 ms, total: 39 ms
Wall time: 6.83 s


{'admin_id': 'BH1234',
 'file': 'payer_id_mapping.csv',
 'write_to': 'download/BH1234/payer_id_mapping.csv',
 'status': 'Fail',
 'success_count': 3,
 'fail_count': 3}

In [182]:
def payer_map_to_db_lambda(api_url, admin_id, file):
    payload = {
        "admin_id": admin_id, 
        "file": file
        }
    r = requests.post(api_url, json=payload)
    if r.status_code==200:
        return r.json()
    else: 
        return r.status_code, r.text
    
api_url = "https://3lqxw3w1td.execute-api.us-east-2.amazonaws.com/beats/payer_map_to_db"
admin_id = "BH1234"
file = "payer_id_mapping.csv"
    
%time res = payer_map_to_db_lambda(api_url, admin_id, file)
res

CPU times: user 31.6 ms, sys: 4.1 ms, total: 35.7 ms
Wall time: 767 ms


{'admin_id': 'BH1234', 'status': 'sucesss'}

In [197]:
import os
import json
import requests

def validate_group_mapping_lambda(api_url, admin_id, file):
    payload = {
        "admin_id": admin_id, 
        "file": file
        }
    r = requests.post(api_url, json=payload)
    if r.status_code==200:
        return r.json()
    else: 
        return r.status_code, r.text
    
api_url = "https://3lqxw3w1td.execute-api.us-east-2.amazonaws.com/beats/group_map_val"
admin_id = "BH1234"
file = "group_name_mapping.csv"
    
%time res = validate_group_mapping_lambda(api_url, admin_id, file)
res

CPU times: user 30.7 ms, sys: 5.34 ms, total: 36.1 ms
Wall time: 3.2 s


{'admin_id': 'BH1234',
 'file': 'group_name_mapping.csv',
 'write_to': 'download/BH1234/group_name_mapping.csv',
 'status': 'Fail',
 'success_count': 5,
 'fail_count': 1}

In [196]:
def group_map_to_db_lambda(api_url, admin_id, file):
    payload = {
        "admin_id": admin_id, 
        "file": file
        }
    r = requests.post(api_url, json=payload)
    if r.status_code==200:
        return r.json()
    else: 
        return r.status_code, r.text
    
api_url = "https://3lqxw3w1td.execute-api.us-east-2.amazonaws.com/beats/group_map_to_db"
admin_id = "BH1234"
file = "group_name_mapping.csv"
    
%time res = group_map_to_db_lambda(api_url, admin_id, file)
res

CPU times: user 31 ms, sys: 3.35 ms, total: 34.3 ms
Wall time: 546 ms


{'admin_id': 'BH1234', 'file': 'group_name_mapping.csv', 'status': 'Fail'}