In [1]:
# Importing Necessary dependencies
import pandas as pd
import snowflake.connector as sf
import numpy as np
import time
import boto3
import os
import zipfile
import smtplib, ssl
import shutil
import csv
from snowflake.connector.pandas_tools import write_pandas


In [2]:
#Create Log for review
start_time = time.time()
claimslog = []

In [3]:
try:
    #Create Local Directory to store files in temporarily
    os.makedirs('C:\\Users\\Jad Driggers\\Documents\\Vesta\\CCAFILES')
    os.chdir('C:\\Users\\Jad Driggers\\Documents\\Vesta\\CCAFILES')
    root_directory = os.getcwd()
    claimslog.append('Successfully created CCA File Temporary Folder at ' + time.strftime('%Y-%m-%d %H:%M:%S',
                                                                                          time.localtime(time.time())))
except Exception as e:
    claimslog.append('There was an error with creating the temporary CCA File - ' + str(e))
    print('Successfully created CCA File Temporary Folder at ' + time.strftime('%Y-%m-%d %H:%M:%S',
                                                                               time.localtime(time.time())))

In [4]:
try:
    #Create Connection Object for Connecting to AWS
    s3 = boto3.resource(
        service_name='s3',
        region_name='us-east-1',
        aws_access_key_id=os.getenv('aws_access_key_id'),
        aws_secret_access_key=os.getenv('aws_secret_access_key'))
    print('AWS connection object created at ' + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
    claimslog.append(
        'AWS connection object created at ' + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
except Exception as e:
    print('There was an error with creating the AWS connection object - ' + str(e))
    claimslog.append('There was an error with creating the AWS connection object - ' + str(e))


AWS connection object created at 2022-12-16 15:35:50


In [5]:
#Create the file name format for locating the proper CCA files to parse

filename_format_list = ['Element claims 202212.zip', 'Element MDS 202212.zip']
claimslog.append('Looking for CCA Files that start like ' + " and ".join(filename_format_list) + ' at ' + time.strftime(
    '%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
print('Looking for CCA Files that start like ' + " and ".join(filename_format_list) + ' at ' + time.strftime(
    '%Y-%m-%d %H:%M:%S', time.localtime(time.time())))

Looking for CCA Files that start like Element claims 202212.zip and Element MDS 202212.zip at 2022-12-16 15:35:54


In [9]:
#Creating a list to store all the keys (file names) to download

key_list = []

try:
    #Searching the S3 bucket for the most current Ping Files
    for obj in s3.Bucket('hometeam-clinical-data').objects.all():
        for filename_format in filename_format_list:
            if filename_format in str(obj):
                #print(obj.key)
                key_list.append(obj.key)

except Exception as e:
    claimslog.append('There was an error while looking for most CCA Files - ' + str(e))

In [10]:
print(key_list)


['raw/cca/Element MDS 202212.zip', 'raw/cca/Element claims 202212.zip']


In [11]:
try:
    #Downloading each of the files found in the key list
    for file in key_list:
        s3.Bucket('hometeam-clinical-data').download_file(file, file.split('/')[2])
        print(
            'Files were successfully downloaded at ' + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
    claimslog.append(
        'Files were successfully downloaded at ' + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
except Exception as e:
    claimslog.append('There was an error while trying to download the CCA Files - ' + str(e))

Files were successfully downloaded at 2022-12-16 15:37:21
Files were successfully downloaded at 2022-12-16 15:37:33


In [10]:
# Move files into Monthly_Claims folder
bucket = 'hometeam-clinical-data'
copy_source = {
    'Bucket': bucket,
    'Key': 'raw/cca/Element MDS 202212.zip',
    }

copy_source2 = {
    'Bucket': bucket,
    'Key': 'raw/cca/Element claims 202212.zip',
}
s3.meta.client.copy(copy_source,bucket, 'raw/cca/Monthly_Claims/Element MDS 202212.zip')
s3.meta.client.copy(copy_source2,bucket, 'raw/cca/Monthly_Claims/Element claims 202212.zip')
#s3.meta.client.delete_object('hometeam-clinical-data','Element MDS 202210.zip')
# s3.client.delete_object(Bucket=bucket, Key='Element MDS 202210.zip')
s3_object = s3.Object('hometeam-clinical-data', 'raw/cca/Monthly_Claims/Element MDS 202212.zip')
s3_object.delete()




{'ResponseMetadata': {'RequestId': 'P928AS9WDP0VP36C',
  'HostId': 'BxI3py9dDMktm/GD9ejTdwSukLCCYrq/2y2bJ9pHosL3c/Vvbs3+TDMq0kAO+Etjsr4xFxJ+Tyg=',
  'HTTPStatusCode': 204,
  'HTTPHeaders': {'x-amz-id-2': 'BxI3py9dDMktm/GD9ejTdwSukLCCYrq/2y2bJ9pHosL3c/Vvbs3+TDMq0kAO+Etjsr4xFxJ+Tyg=',
   'x-amz-request-id': 'P928AS9WDP0VP36C',
   'date': 'Fri, 16 Dec 2022 20:32:01 GMT',
   'x-amz-version-id': '.fki0l0uct8rwI41tSIbHWPFyZ27j3a6',
   'x-amz-delete-marker': 'true',
   'server': 'AmazonS3'},
  'RetryAttempts': 0},
 'DeleteMarker': True,
 'VersionId': '.fki0l0uct8rwI41tSIbHWPFyZ27j3a6'}

In [12]:
#On local computer, change directory and set directory for unzipping of files.
os.chdir('C:\\Users\\Jad Driggers\\Documents\\Vesta\\CCAFILES')
root_directory = os.getcwd()

In [13]:
#Locate only Zipped Files
files_to_unzip = []
for filename in os.listdir(root_directory):
    if 'zip' in filename:
        files_to_unzip.append(filename)

In [14]:
try:
    #Unzip each file in the Zipped files list
    for zipped_file in files_to_unzip:
        with zipfile.ZipFile(root_directory + "\\" + zipped_file, 'r') as zip_ref:
            #print(zipped_file)
            zip_ref.extractall(root_directory)
    shutil.unpack_archive(root_directory + "\\" + zipped_file, root_directory + "\\" + zipped_file.split('.')[0])
    claimslog.append(
        'Successfully Unzipped each file at ' + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
except Exception as e:
    claimslog.append('There was an error while trying to unzip each file - ' + str(e))

In [15]:
#% % timeit
#Create dictionary to store dataframes as they are created 
df_dict = {}
error_dict = {}

#Set current directory
cwd = os.chdir('C:\\Users\\Jad Driggers\\Documents\\Vesta\\CCAFILES')

#Loop through all txt files in the directory
for i, file in enumerate(os.listdir(cwd)):
    if '.txt' in file:

        #empty lists to store the data while cleaning
        df_list = []
        df_error_list = []

        #open the txt file
        with open(file, 'r') as csvfile:
            reader = csv.reader(csvfile, delimiter='\t')

            #read through each line and find any rows with errors
            for i, row in enumerate(reader):
                if i == 0:
                    #capture the correct number of columns for the dataframe
                    correct_columns = len(row)

                df_list.append(row)

                #create list of rows with errors
                if len(row) < correct_columns:
                    df_error_list.append(i)

            #Check to see if the list row in the data frame is an empty row, if so, drop it
            if len(df_list[df_error_list[-1]]) == 0:
                df_error_list.pop()

            #The error exists between two rows, so looking at the second occurance of an error 
            #and deleting the first item should fix the error
            for i, error in enumerate(df_error_list):
                if i % 2 != 0:
                    df_list[error].pop(0)

            #Loop back through the error list and join first errors to second errors to make a complete row
            for i, error in enumerate(df_error_list):
                if i % 2 == 0:
                    df_list[error] = df_list[error] + df_list[error + 1]

            #Loop back through the entire data frame list to delete those rows that are smaller than the correct
            #number of columns
            for i, item in enumerate(df_list):
                if len(item) < correct_columns:
                    del df_list[i]

            df = pd.DataFrame(df_list[1:])
            df.columns = df_list[0]
            df = df.rename(columns={df.columns[0]: df.columns[0][3:]})
            df = df.astype(str)
            df.columns = map(lambda x: str(x).upper(), df.columns)
            df_dict[csvfile.name.replace(' ', '_').split(".")[0]] = df
            error_dict[csvfile.name.replace(' ', '_').split(".")[0]] = df_error_list

        csvfile.close()

for key, value in df_dict.items():
    print(key)



Element_claims_member_contact
Element_claims_member_demographics
Element_claims_member_dx
Element_claims_member_enrollment
Element_claims_member_HCC
Element_claims_PCP_contact
Element_claims
Element_MDS


In [16]:
#Delete all contents in the temporary CCA Folder
os.chdir('C:\\Users\\Jad Driggers\\Documents\\Vesta')
shutil.rmtree('C:\\Users\\Jad Driggers\\Documents\\Vesta\\CCAFILES')
claimslog.append('Successfully Deleted all contents in temporary CCA Folder at ' + time.strftime('%Y-%m-%d %H:%M:%S',
                                                                                                 time.localtime(
                                                                                                     time.time())))

In [17]:
df_dict['Element_claims_member_demographics'].rename(columns={"CCAID": "MEMBER_ID",
                                                              "NAME": "MNAME",
                                                              "PCL_SITENAME": "PCL",
                                                              "PCL_SUMMARYNAME": "PCL_SUMMARY",
                                                              "PCL_CAPSITE": "PCL_CAP",
                                                              "DUAL": "DUAL_",
                                                              "GC_ENGAGEMENTSTATUS": "GC_ENGAGEMENT_STATUS",
                                                              "MDS_UNREACHABLEFLAG": "MDS_UNREACHABLE"},
                                                     errors="raise",
                                                     inplace=True)
df_dict['Element_claims_member_contact'].rename(columns={"CCAID": "MEMBER_ID",
                                                         "ENR_SPAN_START": "ENROLL_ST",
                                                         "ENR_SPAN_END": "ENROLL_ED",
                                                         "ENROLL_STATUS": "ENROLL_STATUS2",
                                                         "NAME": "FULL_NAME",
                                                         "AGE_NOW": "AGE",
                                                         "GENDER": "SEX",
                                                         "LANGUAGE": "LANGUAGE_SPOKEN",
                                                         "ADDRESS1": "ADDRESS_1",
                                                         "ADDRESS2": "ADDRESS_2",
                                                         "LATEST_PHONE": "PHONE_1"},
                                                errors="raise",
                                                inplace=True)
df_dict['Element_claims_member_enrollment'].rename(columns={"CCAID": "MEMBER_ID",
                                                            "PCP_PROVK": "PCP_ID",
                                                            "PCL_SITENAME": "PCL",
                                                            "DUAL": "DUAL_"},
                                                   errors="raise",
                                                   inplace=True)
df_dict['Element_claims'].rename(columns={"CCAID": "MEMBER_ID",
                                          "HICN": "MEDICARE_ID",
                                          "CLAIM_NUM": "CLAIM_ID",
                                          "TABLEROWID": "CLAIM_LINE",
                                          "HOSPITAL_CLAIM_TYPE": "HOS_CLAIM_TYPE",
                                          "SERVICE_CODE": "CODE",
                                          "SERVICE_DESC": "CODE_DESC",
                                          "DATE_TO": "DATE_THRU",
                                          "BILLTYPE": "BILL_TYPE",
                                          "BILLTYPE_DESCR": "BILL_TYPE_DESCR",
                                          "DATE_PAID": "PAID_DTE",
                                          "CLAIMCATEGORY_GL1": "CLAIM_GROUP"},
                                 errors="raise",
                                 inplace=True)
df_dict['Element_claims_member_dx'].rename(columns={"CCAID": "MEMBER_ID",
                                                    "HICN": "MEDICARE_ID",
                                                    "CLAIM_NUM": "CLAIM_ID",
                                                    "DIAGREFNO": "DIAG_NUM"},
                                           errors="raise",
                                           inplace=True)
df_dict['Element_MDS'].rename(columns={"CCAID": "MEMBER_ID",
                                       "ASSESSMENT_DATE": "ENC_DATE"},
                              errors="raise",
                              inplace=True)

In [18]:
#Create two dictionaries to store the columns and the max len of values in those columns
max_col_len = {}
col_dict = {}

#Vectorizing the length function
measurer = np.vectorize(len)

#Looping through df_dictionary to capture column names and max len of values in those columns
max_col_len = {}
for key, value in df_dict.items():
    col_len = measurer(df_dict[key].astype(str)).max(axis=0)
    max_col_len[key] = col_len
    col_dict[key] = df_dict[key].columns.tolist()


#Function for joining the two dictionaries with similar keys (claim files)
def common_entries(*dcts):
    if not dcts:
        return
    for i in set(dcts[0]).intersection(*dcts[1:]):
        yield (i,) + tuple(d[i] for d in dcts)


mylist = list(common_entries(col_dict, max_col_len))

#Creating new dictionary and zipping the column names with respective max len of values in those columns
sql_dict = {}
for x in mylist:
    sql_dict[x[0]] = list(zip(x[1], x[2]))

#Iterating through the list values to prep for SQL to Snowflake
sql_script_dict_table = {}
for key, value in sql_dict.items():
    script_string_table = ''
    for (col, max_len) in sql_dict[key]:
        script_string_table += str(col) + ' VARCHAR(' + str(max_len + 10) + '),'
    sql_script_dict_table[key] = "(" + script_string_table[:-1] + ")"

print("--- %s seconds ---" % (time.time() - start_time))

--- 370.15873074531555 seconds ---


In [19]:
df_dict['Element_claims_member_contact']

Unnamed: 0,MEMBER_ID,MBI,HICN,MMIS_ID,PRODUCT,ENROLL_ST,ENROLL_ED,ENROLL_STATUS2,FULL_NAME,DOB,...,ADDRESS_1,ADDRESS_2,CITY,STATE,ZIP,COUNTY,ADDR_START,ADDR_END,PHONE_1,CURRENT_AS_OF
0,5364521168,1PQ7T86FD63,281042735M,100027225505,SCO,2004-07-01,9999-12-30,Enrolled,Semen Yudkovich,1935-09-19,...,67 Silsbee Street,# 502,Lynn,MA,01901,ESSEX,2004-05-01,,(781) 581-3902 [Home],2022-12-15 12:58:59.000
1,5364521169,2HH6A83NK63,281043939M,100027225513,SCO,2004-07-01,2022-02-28,Disenrolled,Margarita Yudkovich,1935-10-27,...,67 Silsbee Street Apt 502,,Lynn,MA,01901,ESSEX,2004-07-01,2022-02-28,(781) 581-3902 [Home],2022-12-15 12:58:59.000
2,5364521215,7PE4CW7HF86,036205064D,100005437941,SCO,2004-09-01,2020-01-31,Disenrolled,Barbara Davis,1931-01-30,...,28 Essex St Room 4B,,Lynn,MA,01902,ESSEX,2012-08-10,2020-01-31,(781) 599-0190 [Home],2022-12-15 12:58:59.000
3,5364521236,4KR5DX1YF65,011767159M,100016631432,SCO,2004-09-01,2021-03-31,Disenrolled,Semen Tyutyunik,1926-05-05,...,19 Willow Street,Apt 208,Lynn,MA,01901,ESSEX,2004-05-01,2021-03-31,(781) 598-1391 [Home],2022-12-15 12:58:59.000
4,5364521251,5YU3H27TQ86,019604163A,100218304242,SCO,2004-10-01,9999-12-30,Enrolled,Eduardo Santos,1935-11-03,...,160 Neptune Blvd. Apt 507,,Lynn,MA,01905,ESSEX,2004-05-01,,(339) 440-5236 [Home],2022-12-15 12:58:59.000
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
3142,5366070702,9KW1C85DD28,,100221657412,SCO,2022-11-01,9999-12-30,Enrolled,Yvelisse Aponte Derich,1947-04-06,...,18 Walnut st Apt 210,,PEABODY,MA,01960,ESSEX,2022-10-28,,(978) 397-6169 [Home],2022-12-15 12:58:59.000
3143,5366070866,,,100035368751,SCO,2022-11-01,9999-12-30,Enrolled,Victor Espinosa Rodriguez,1953-08-11,...,26 Myrtle Street Apt 2,,Lynn,MA,01905,ESSEX,2022-11-01,,(781) 632-7483 [Home],2022-12-15 12:58:59.000
3144,5366070873,,,100005738123,SCO,2022-11-01,9999-12-30,Enrolled,RUN NOUM,1955-06-05,...,21 Morris Street #2,,Lynn,MA,01905,ESSEX,2022-11-01,,(781) 853-3330 [Home],2022-12-15 12:58:59.000
3145,5366070928,1KT8GY1FH59,,100033762004,SCO,2022-11-01,9999-12-30,Enrolled,BARBARA F JOHNSON,1951-01-10,...,170 S COMMON ST #405,,LYNN,MA,01902,ESSEX,2022-11-01,,(781) 854-1088 [Home],2022-12-15 12:58:59.000


In [20]:
start_time = time.time()
#Creating of parameters for securing connection to Snowflake-credentials stored in local environment variables
username = os.getenv('Snowflake_User')
password = os.getenv('Snowflake_password')
account = os.getenv('Snowflake_account')

#Define parameters if neccessary
warehouse = 'DEVELOPER_STANDARD'
database = 'VESTA_DEVELOPMENT'
schema = 'ANALYST_SANDBOX'

#Create connection object for Snowflake connection
conn = sf.connect(user=username, password=password, account=account, warehouse=warehouse)


#Execution function
def execute_query(connection, query):
    cursor = connection.cursor()
    cursor.execute(query)
    cursor.close


#Define Database to use in Snowflake
sql = 'USE DATABASE {}'.format(database)
execute_query(conn, sql)

#Define Schema to use in Snowflake
sql = 'USE SCHEMA {}.{}'.format(database, schema)
execute_query(conn, sql)

#Define Warehouse to use in Snowflake
sql = 'USE WAREHOUSE {}'.format(warehouse)
execute_query(conn, sql)

### SQL to drop tables prior to creating and uploading data

In [21]:

################################## Contact Table
try:
    sql = 'DROP TABLE IF EXISTS CCA_CONTACT_RAW_TEST'
    execute_query(conn, sql)

except Exception as e:
    print(e)
################################## Claims Table
try:
    sql = 'DROP TABLE IF EXISTS CCA_CLAIMS_RAW_TEST'
    execute_query(conn, sql)

except Exception as e:
    print(e)

################################## Demographic Table
try:
    sql = 'DROP TABLE IF EXISTS CCA_DEMO_RAW_TEST'
    execute_query(conn, sql)

except Exception as e:
    print(e)

################################## DX Table
try:
    sql = 'DROP TABLE IF EXISTS CCA_DX_RAW_TEST'
    execute_query(conn, sql)

except Exception as e:
    print(e)

################################## Enroll Table
try:
    sql = 'DROP TABLE IF EXISTS CCA_ENROLL_RAW_TEST'
    execute_query(conn, sql)

except Exception as e:
    print(e)

################################## PCP Table
try:
    sql = 'DROP TABLE IF EXISTS CCA_PCP_RAW_TEST'
    execute_query(conn, sql)

except Exception as e:
    print(e)

    ################################## MDS Table
try:
    sql = 'DROP TABLE IF EXISTS CCA_MDS_RAW_TEST'
    execute_query(conn, sql)

except Exception as e:
    print(e)

In [22]:

################################## MEMBER CONTACT SQL

try:
    sql = 'CREATE TABLE IF NOT EXISTS CCA_CONTACT_RAW_TEST ' + sql_script_dict_table['Element_claims_member_contact']
    execute_query(conn, sql)

    success, nchucks, nrows, _ = write_pandas(conn, df_dict['Element_claims_member_contact'], 'CCA_CONTACT_RAW_TEST')
except Exception as e:
    print(e)
###################################

################################## CLAIMS SQL

try:
    sql = 'CREATE TABLE IF NOT EXISTS CCA_CLAIMS_RAW_TEST ' + sql_script_dict_table['Element_claims']
    execute_query(conn, sql)

    success, nchucks, nrows, _ = write_pandas(conn, df_dict['Element_claims'], 'CCA_CLAIMS_RAW_TEST')
except Exception as e:
    print(e)
# ###################################
#
# ################################## DEMO SQL
#
try:
    sql = 'CREATE TABLE IF NOT EXISTS CCA_DEMO_RAW_TEST ' + sql_script_dict_table['Element_claims_member_demographics']
    execute_query(conn, sql)

    success, nchucks, nrows, _ = write_pandas(conn, df_dict['Element_claims_member_demographics'], 'CCA_DEMO_RAW_TEST')
except Exception as e:
    print(e)
# ###################################
#
# ################################## DX SQL

try:
    sql = 'CREATE TABLE IF NOT EXISTS CCA_DX_RAW_TEST ' + sql_script_dict_table['Element_claims_member_dx']
    execute_query(conn, sql)

    success, nchucks, nrows, _ = write_pandas(conn, df_dict['Element_claims_member_dx'], 'CCA_DX_RAW_TEST')
except Exception as e:
    print(e)
# ###################################
#
# ################################## ENROLLMENT SQL

try:
    sql = 'CREATE TABLE IF NOT EXISTS CCA_ENROLL_RAW_TEST' + sql_script_dict_table['Element_claims_member_enrollment']
    execute_query(conn, sql)

    success, nchucks, nrows, _ = write_pandas(conn, df_dict['Element_claims_member_enrollment'], 'CCA_ENROLL_RAW_TEST')
except Exception as e:
    print(e)
# ###################################
#
# ################################## PCP SQL

try:
    sql = 'CREATE TABLE IF NOT EXISTS CCA_PCP_RAW_TEST' + sql_script_dict_table['Element_claims_PCP_contact']
    execute_query(conn, sql)

    success, nchucks, nrows, _ = write_pandas(conn, df_dict['Element_claims_PCP_contact'], 'CCA_PCP_RAW_TEST')
except Exception as e:
    print(e)
# # ###################################
#
# ################################## MDS SQL

try:
    sql = 'CREATE TABLE IF NOT EXISTS CCA_MDS_RAW_TEST' + sql_script_dict_table['Element_MDS']
    execute_query(conn, sql)

    success, nchucks, nrows, _ = write_pandas(conn, df_dict['Element_MDS'], 'CCA_MDS_RAW_TEST')
except Exception as e:
    print(e)
###################################
print("--- %s seconds ---" % (time.time() - start_time))

--- 483.2472846508026 seconds ---


In [24]:
my_string = '\n'.join(claimslog)

port = 465  # For SSL
smtp_server = "smtp.gmail.com"
sender_email = "VestaPingLog@gmail.com"  # Enter your address
receiver_email_list = ["jdriggers@vestahealthcare.com", "john@vestahealthcare.com",
                       'joe@vestahealthcare.com']  # Enter receiver address
password = os.getenv('Vesta_Ping_Log_Email') #Need password for VestaPingLog@gmail.com
message = "Subject: Ping Logs \n" + '''
             
''' + my_string

context = ssl.create_default_context()
with smtplib.SMTP_SSL(smtp_server, port, context=context) as server:
    server.login(sender_email, password)
    for receiver_email in receiver_email_list:
        server.sendmail(sender_email, receiver_email, message)

SMTPAuthenticationError: (535, b'5.7.8 Username and Password not accepted. Learn more at\n5.7.8  https://support.google.com/mail/?p=BadCredentials v10-20020a05620a440a00b006fab416015csm2548837qkp.25 - gsmtp')