In [2]:
import os
os.chdir('/mnt/c/Users/ralvin/OneDrive - Reliant Health Partners/Documents/RHP_dev_RA/Automate_skyvia')
import pandas as pd
from simple_salesforce import Salesforce, SalesforceLogin
import pyodbc, sys, time 
import requests
import numpy as np
from datetime import datetime
from helper_functions_v3 import create_df, updated_data_pull, upsert, delete_record, insert_records, salesforce_connection, read_sftp_data, read_sftp_directory, data_pull
import configparser
import json
import paramiko
import io

# Show complete frames when a DataFrame is displayed (no row/column truncation).
pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)


##### retrieve source data
# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it actually parsed. Check that list so a wrong working directory
# fails with a clear message instead of an opaque KeyError on config['sftp'].
CONFIG_FILE = 'config_v2.ini'
config = configparser.ConfigParser()
if not config.read(CONFIG_FILE):
    raise FileNotFoundError(
        f"Could not read {CONFIG_FILE!r} (working directory: {os.getcwd()})"
    )
sftp_config = config['sftp']
mapping_config = config['mappings']

# SFTP connection settings; the port falls back to 22 when it is not configured.
host = sftp_config.get('host')
port = sftp_config.getint('port', fallback=22)
username = sftp_config.get('user')
password = sftp_config.get('password')
remote_dir = sftp_config.get('remote_path')

# Location of the JSON file holding the source-to-Salesforce field mappings.
json_path = mapping_config.get('json_file_path')



# Download every file in the remote directory. The result is indexed by file
# name below, and each entry exposes .shape (presumably one DataFrame per file).
Source_data = read_sftp_directory(remote_dir, host, port, username, password)
for source_name in Source_data:
    source_frame = Source_data[source_name]
    print(source_name)
    print(f'Source data shape: {source_frame.shape}')

# Salesforce sessions: `sf` is opened with sandbox=True, `sf_prod` with sandbox=False.
sf = salesforce_connection(sandbox=True)
sf_prod = salesforce_connection(sandbox=False)

Connection established successfully!
Files in directory: ['837I00111182024.csv', '837P00111182024.csv']
File 837I00111182024.csv read into DataFrame successfully
File 837P00111182024.csv read into DataFrame successfully
Connection closed.
837I00111182024.csv
Source data shape: (446, 42)
837P00111182024.csv
Source data shape: (3106, 42)
Connected to Salesforce sandbox
Connected to Salesforce Prod


In [8]:
def ingest_data(json_path, Source_data, sf, skip_files=('837I00111182024.csv',)):
    """Push every source file into Salesforce using the JSON field mappings.

    For each file in ``Source_data`` that is not listed in ``skip_files`` the
    function builds the account and provider tables, upserts the providers,
    inserts the claims, and finally inserts the line items (which are built
    with the ids returned by the claims insert).

    Parameters
    ----------
    json_path : str
        Path to the JSON mapping file. It must contain the keys
        ``Claim_Object_Map``, ``Account_Object_Map``, ``Provider_Object_Map``
        and ``Line_Item_Object_Map``.
    Source_data : mapping
        Source tables indexed by file name.
    sf : object
        Salesforce connection, passed straight through to the helper functions.
    skip_files : str or iterable of str, optional
        File names to leave out of the run. The default skips
        ``'837I00111182024.csv'``, which is the file the notebook previously
        excluded with a hard-coded test filter. Pass ``()`` or ``None`` to
        process every file.

    Returns
    -------
    dict
        ``{(file_name, label): errors}`` where ``label`` is one of
        ``'providers_obj'``, ``'claims_obj'`` or ``'lines_obj'`` and ``errors``
        is whatever the corresponding upsert/insert helper reported.

    Raises
    ------
    KeyError
        If one of the four mapping keys is missing from the JSON file.
    """
    # Normalise skip_files: a bare string would otherwise be treated as a
    # substring test by the `in` check below.
    if skip_files is None:
        skipped = set()
    elif isinstance(skip_files, str):
        skipped = {skip_files}
    else:
        skipped = set(skip_files)

    ## get data mappings
    with open(json_path, 'r') as f:
        mappings = json.load(f)
    Claims_map = mappings['Claim_Object_Map']
    Account_map = mappings['Account_Object_Map']
    Provider_map = mappings['Provider_Object_Map']
    lines_map = mappings['Line_Item_Object_Map']

    Errors = {}
    for Source_doc_name in Source_data:
        if Source_doc_name in skipped:
            continue
        Source_doc = Source_data[Source_doc_name]
        print(Source_doc_name)
        print(Source_doc.shape)

        ## upsert account and provider records
        # NOTE(review): the account table is built but never sent to Salesforce;
        # the call is kept as-is in case create_df has side effects. Confirm
        # whether an account upsert is missing here.
        map_account_table, _ = create_df(Account_map, Source_doc, sf)
        map_provider_table, keys = create_df(Provider_map, Source_doc, sf)
        print('upserting to providers ... ')
        _, errors = upsert(map_provider_table.drop_duplicates(), keys, sf)
        Errors[(Source_doc_name, 'providers_obj')] = errors

        ##### populate claims obj on sf
        map_claims_table, keys = create_df(Claims_map, Source_doc, sf)
        print('Inserting claims data ... ')
        new_ids_claims, errors = insert_records(map_claims_table.drop_duplicates(), keys, sf)
        Errors[(Source_doc_name, 'claims_obj')] = errors

        ##### populate lines obj on sf
        # The line-item table is built with the ids returned by the claims insert.
        map_lines_table, keys = create_df(lines_map, Source_doc, sf, new_ids_claims)
        print('Inserting lines data ... ')
        _, errors = insert_records(map_lines_table.drop_duplicates(), keys, sf)
        Errors[(Source_doc_name, 'lines_obj')] = errors

    return Errors

# Run the ingestion through the `sf` connection and keep the returned
# {(file name, object label): errors} dict for inspection in later cells.
Errors = ingest_data(json_path, Source_data, sf)

    

837P00111182024.csv
(3106, 42)
pulling updated Provider_TIN__c table
pulled Provider_TIN__c
Merged TIN
pulling updated Groups_Clients__c table
pulled Groups_Clients__c
Merged Patient Group/Policy Number
pulling updated Jurisdiction__c table
pulled Jurisdiction__c
Merged JurisdictionState
pulling updated DRG__c table
pulled DRG__c
Merged DRG
pulling updated Provider_Specialty__c table
pulled Provider_Specialty__c
Merged Billing Provider Taxonomy
Inserting claims data ... 
Processing record 0: {'Claim_ID__c': 'BFC71C030596B19tksft', 'Provider_TIN__c': 'a0F4W00000W1Kb7UAF', 'Group_Client__c': 'a0C8a00000r3Gf3EAE', 'Claim_Number__c': '243190499E', 'Jurisdiction__c': 'a024W00000HYAjjQAH', 'Diag_Code__c': 'I10', 'Diag_Code_2nd__c': 'Z125', 'Diag_Code_3rd__c': 'E785', 'Diag_Code_4th__c': 'E119', 'Provider__c': 'PATHGROUP LABS LLC ', 'Patient__c': 'MCHANEY ERIC C', 'Patient_ID__c': 'MB04526570', 'Date_of_Birth__c': '10/09/1972', 'Provider_Zip__c': 372175200, 'DRG__c': nan, 'QPA__c': 'N', 'NPI_

In [11]:
# List the (file name, object label) keys present in the error report.
for key in Errors:
    print(key)

# Size of the error collection recorded for the 837P file's line-item inserts.
len(Errors[('837P00111182024.csv', 'lines_obj')])

('837P00111182024.csv', 'claims_obj')
('837P00111182024.csv', 'lines_obj')


163

In [3]:
# Pull the current Claims__c and Line_Items__c tables to check their sizes.
claims_sf = updated_data_pull(['Claims__c'], sf)
Line_items_sf = updated_data_pull(['Line_Items__c'], sf)
print(claims_sf.shape) ## baseline (original) row count: 12,937
print(Line_items_sf.shape) ## baseline (original) row count: 22,841

# Row counts after the first file was ingested: 13,040 claims and 23,266 line items


pulled Claims__c
pulled Line_Items__c
(12937, 101)
(22841, 49)


In [30]:
### delete records
# Deletes the Claims__c records in Salesforce whose Claim_ID__c matches a
# 'Claim ID' found in the selected source file. Change DELETE_SOURCE_FILE to
# target another file (e.g. '837P00111182024.csv').
DELETE_SOURCE_FILE = '837I00111182024.csv'

Source_data_df = Source_data[DELETE_SOURCE_FILE]
print(Source_data_df.shape)

# One row per distinct claim id in the source file.
source_cases = Source_data_df.drop_duplicates(subset=['Claim ID'])[['Claim ID']]

# Inner-join against the current Claims__c table to pick up the Salesforce Ids
# of the records that came from this file.
claims_in_sf = updated_data_pull(['Claims__c'], sf)
matched_claims = source_cases.merge(claims_in_sf, left_on='Claim ID', right_on='Claim_ID__c')
ids = matched_claims['Id'].tolist()

keys = {'Claims__c':'Claim_ID__c'}
if ids:
    delete_record(ids, keys, sf)
else:
    # Nothing matched (e.g. the file was never ingested or was already removed),
    # so skip the delete call instead of sending an empty id list.
    print('No matching Claims__c records found; nothing to delete.')

(446, 42)
pulled Claims__c
Object: Claims__c, Records to process: 103
All records deleted successfully.


In [28]:
# Number of Salesforce Ids matched for deletion (0 means no Claims__c record matched the file).
len(ids)

0

In [None]:
import logging
from logging.handlers import RotatingFileHandler

# Set up logger
logger = logging.getLogger('data_ingestion')
logger.setLevel(logging.DEBUG)

# logging.getLogger() returns the same logger object for a given name, so
# re-running this cell would otherwise stack another pair of handlers on it
# and every message would be emitted several times. Detach and close whatever
# is already attached before wiring up fresh handlers.
for stale_handler in list(logger.handlers):
    logger.removeHandler(stale_handler)
    stale_handler.close()

# Console handler: INFO and above go to the notebook output.
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)

# File handler with rotation: DEBUG and above, 5 MiB per file, 3 backups kept.
file_handler = RotatingFileHandler('data_ingestion.log', maxBytes=5*1024*1024, backupCount=3)
file_handler.setLevel(logging.DEBUG)

# Log message format shared by both handlers.
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
console_handler.setFormatter(formatter)
file_handler.setFormatter(formatter)

# Add handlers to the logger
logger.addHandler(console_handler)
logger.addHandler(file_handler)

# Usage in data ingestion
def ingest_data(file_path):
    """Demonstrate logging around a simulated ingestion of ``file_path``.

    No file is opened: only the ``.csv`` extension is checked. Progress is
    logged at INFO level; any exception raised along the way (including the
    ``ValueError`` for a non-CSV path) is logged at ERROR level with its
    traceback and is not propagated. Always returns ``None``.

    Note: this reuses the name of the Salesforce ``ingest_data(json_path,
    Source_data, sf)`` defined earlier in the notebook and replaces it once
    this cell has run.
    """
    try:
        logger.info(f"Starting data ingestion for file: {file_path}")
        # Simulated ingestion: the extension check stands in for the real work.
        looks_like_csv = file_path.endswith('.csv')
        if looks_like_csv:
            logger.info(f"Successfully ingested file: {file_path}")
        else:
            raise ValueError("Unsupported file format")
    except Exception as exc:
        logger.error(f"Error during data ingestion: {exc}", exc_info=True)

# Example usage: the first path passes the extension check, the second does not.
ingest_data('example.csv')
ingest_data('example.txt')  # Not a .csv: the ValueError is caught and logged, not raised