In [6]:
import pandas as pd 
import numpy as np 
import os
from typing import List, Dict, Tuple
import logging
from dotenv import load_dotenv
from simple_salesforce import Salesforce

In [7]:
def setup_logger(name: str, log_level: str = None):
    """Setup logger with consistent formatting"""
    
    if log_level is None:
        log_level = os.getenv('LOG_LEVEL', 'INFO')
    
    logger = logging.getLogger(name)
    logger.setLevel(getattr(logging, log_level))
    
    # Avoid duplicate handlers
    if logger.handlers:
        return logger
    
    # Console handler
    ch = logging.StreamHandler()
    ch.setLevel(getattr(logging, log_level))
    
    # Formatter
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )
    ch.setFormatter(formatter)
    
    logger.addHandler(ch)
    
    return logger

In [2]:
username=os.getenv('SALESFORCE_USERNAME')
password=os.getenv('SALESFORCE_PASSWORD')
security_token=os.getenv('SALESFORCE_SECURITY_TOKEN')
domain=os.getenv('SALESFORCE_DOMAIN')

In [8]:
load_dotenv(override=True)
logger = setup_logger(__name__)

In [9]:
class SalesforceLoader:
    """Load data into Salesforce via REST API"""
    
    def __init__(self):
        """Initialize Salesforce connection"""
        try:
            self.sf = Salesforce(
                username=os.getenv('SALESFORCE_USERNAME'),
                password=os.getenv('SALESFORCE_PASSWORD'),
                security_token=os.getenv('SALESFORCE_SECURITY_TOKEN'),
                domain=os.getenv('SALESFORCE_DOMAIN')
            )
            logger.info(f"Connected to Salesforce: {self.sf.sf_instance}")
        except Exception as e:
            logger.error(f"Failed to connect to Salesforce: {e}")
            raise
    
    def upsert_patient(self, patient_data: Dict) -> Tuple[bool, str, str]:
        """
        Upsert a single patient record
        Returns: (success, salesforce_id, message)
        """
        try:
            # Upsert using Patient_ID__c as external ID
            result = self.sf.Patient_Medical_Record__c.upsert(
                f"Patient_ID__c/{patient_data['Patient_ID__c']}", 
                patient_data
            )
            
            # Extract Salesforce ID from result
            sf_id = result.get('id') if isinstance(result, dict) else result
            
            logger.debug(f"Upserted patient {patient_data['Patient_ID__c']}: {sf_id}")
            return True, sf_id, "Success"
            
        except Exception as e:
            error_msg = str(e)
            logger.error(f"Error upserting patient {patient_data.get('Patient_ID__c')}: {error_msg}")
            return False, None, error_msg
    
    def upsert_patients_batch(self, patients: List[Dict]) -> Dict:
        """
        Upsert multiple patients
        Returns: Summary with success/failure counts
        """
        results = {
            'total': len(patients),
            'success': 0,
            'failed': 0,
            'patient_id_map': {},  # Map patient_id to Salesforce ID
            'errors': []
        }
        
        logger.info(f"Starting batch upsert of {len(patients)} patients")
        
        for patient in patients:
            success, sf_id, message = self.upsert_patient(patient)
            
            if success:
                results['success'] += 1
                results['patient_id_map'][patient['Patient_ID__c']] = sf_id
            else:
                results['failed'] += 1
                results['errors'].append({
                    'patient_id': patient.get('Patient_ID__c'),
                    'error': message
                })
        
        logger.info(f"Batch upsert complete: {results['success']} success, {results['failed']} failed")
        return results
    
    def insert_lab_result(self, lab_data: Dict, patient_sf_id: str) -> Tuple[bool, str]:
        """
        Insert a single lab result
        Returns: (success, message)
        """
        try:
            # Add patient lookup
            lab_data['Patient__c'] = patient_sf_id
            
            result = self.sf.Lab_Result__c.create(lab_data)
            
            logger.debug(f"Inserted lab result for patient {patient_sf_id}")
            return True, "Success"
            
        except Exception as e:
            error_msg = str(e)
            logger.error(f"Error inserting lab result: {error_msg}")
            return False, error_msg
    
    def insert_lab_results_batch(self, lab_results: List[Dict], 
                                  patient_id_map: Dict) -> Dict:
        """
        Insert multiple lab results
        patient_id_map: Maps patient_id to Salesforce ID
        """
        results = {
            'total': len(lab_results),
            'success': 0,
            'failed': 0,
            'errors': []
        }
        
        logger.info(f"Starting batch insert of {len(lab_results)} lab results")
        
        for lab in lab_results:
            # Get patient's Salesforce ID
            patient_id = lab.get('patient_id')
            patient_sf_id = patient_id_map.get(patient_id)
            
            if not patient_sf_id:
                results['failed'] += 1
                results['errors'].append({
                    'patient_id': patient_id,
                    'error': 'Patient Salesforce ID not found'
                })
                continue
            
            # Remove patient_id from lab data (not a Salesforce field)
            lab_clean = {k: v for k, v in lab.items() 
                        if k in ['Test_Type__c', 'Test_Value__c', 'Reference_Range__c', 
                                'Test_Date__c', 'Status__c']}
            
            success, message = self.insert_lab_result(lab_clean, patient_sf_id)
            
            if success:
                results['success'] += 1
            else:
                results['failed'] += 1
                results['errors'].append({
                    'patient_id': patient_id,
                    'error': message
                })
        
        logger.info(f"Batch insert complete: {results['success']} success, {results['failed']} failed")
        return results
    
    def insert_risk_assessment(self, risk_data: Dict, patient_sf_id: str) -> Tuple[bool, str]:
        """Insert a single risk assessment"""
        try:
            # Add patient lookup and remove patient_id
            risk_clean = {
                'Patient__c': patient_sf_id,
                'Risk_Level__c': risk_data['Risk_Level__c'],
                'Risk_Score__c': risk_data['Risk_Score__c'],
                'Assessment_Date__c': risk_data['Assessment_Date__c'],
                'Risk_Factors__c': risk_data['Risk_Factors__c']
            }
            
            result = self.sf.Risk_Assessment__c.create(risk_clean)
            
            logger.debug(f"Inserted risk assessment for patient {patient_sf_id}")
            return True, "Success"
            
        except Exception as e:
            error_msg = str(e)
            logger.error(f"Error inserting risk assessment: {error_msg}")
            return False, error_msg
    
    def insert_risk_assessments_batch(self, risk_assessments: List[Dict],
                                      patient_id_map: Dict) -> Dict:
        """Insert multiple risk assessments"""
        results = {
            'total': len(risk_assessments),
            'success': 0,
            'failed': 0,
            'errors': []
        }
        
        logger.info(f"Starting batch insert of {len(risk_assessments)} risk assessments")
        
        for risk in risk_assessments:
            patient_id = risk.get('patient_id')
            patient_sf_id = patient_id_map.get(patient_id)
            
            if not patient_sf_id:
                results['failed'] += 1
                results['errors'].append({
                    'patient_id': patient_id,
                    'error': 'Patient Salesforce ID not found'
                })
                continue
            
            success, message = self.insert_risk_assessment(risk, patient_sf_id)
            
            if success:
                results['success'] += 1
            else:
                results['failed'] += 1
                results['errors'].append({
                    'patient_id': patient_id,
                    'error': message
                })
        
        logger.info(f"Batch insert complete: {results['success']} success, {results['failed']} failed")
        return results
    
    def query_patients(self, limit: int = 100) -> List[Dict]:
        """Query patients from Salesforce"""
        try:
            query = f"""
                SELECT Id, Patient_ID__c, First_Name__c, Last_Name__c, 
                       Date_of_Birth__c, Gender__c, Email__c, Phone__c, Address__c
                FROM Patient_Medical_Record__c
                LIMIT {limit}
            """
            
            results = self.sf.query(query)
            records = results['records']
            
            logger.info(f"Queried {len(records)} patients from Salesforce")
            return records
            
        except Exception as e:
            logger.error(f"Error querying patients: {e}")
            return []


In [17]:
sf = Salesforce(
    username,
    password,
    security_token,
    domain
)

In [19]:
os.getenv('SALESFORCE_USERNAME')

'yifu.hou700@agentforce.com'

In [25]:
load_dotenv(override=True)

print('Testing with:')
print('  Username:', os.getenv('SALESFORCE_USERNAME'))
print('  Domain:', os.getenv('SALESFORCE_DOMAIN'))

sf = Salesforce(
    username,
    password,
    security_token,
    domain
)
print('✅ Connected to:', sf.sf_instance)

# Test if custom objects exist
try:
    result = sf.query('SELECT Id FROM Patient_Medical_Record__c LIMIT 1')
    print('✅ Custom objects found!')
except Exception as e:
    print('❌ Custom objects not found in this org:', e)

Testing with:
  Username: yifu.hou700@agentforce.com
  Domain: test
✅ Connected to: orgfarm-76addc4243-dev-ed.develop.my.salesforce.com
✅ Custom objects found!


In [4]:
appointments = pd.read_csv('./data/raw/appointments.csv')
lab_results = pd.read_csv('./data/raw/lab_results.csv')

In [5]:
appointments

Unnamed: 0,patient_id,appointment_date,appointment_type,provider,status
0,P0001,2025-11-30,Consultation,Dr. Johnson,Pending
1,P0001,2025-12-12,Follow-up,Dr. Williams,Confirmed
2,P0002,2026-01-30,Follow-up,Dr. Smith,Scheduled
3,P0002,2025-12-16,Annual Check-up,Dr. Johnson,Pending
4,P0002,2025-12-14,Consultation,Dr. Williams,Confirmed
5,P0003,2026-01-10,Follow-up,Dr. Smith,Scheduled
6,P0003,2025-12-30,Follow-up,Dr. Johnson,Pending
7,P0003,2025-12-01,Annual Check-up,Dr. Smith,Scheduled
8,P0004,2025-12-01,Follow-up,Dr. Williams,Scheduled
9,P0005,2025-11-20,Annual Check-up,Dr. Johnson,Pending


In [6]:
lab_results

Unnamed: 0,patient_id,test_type,value,reference_range,test_date,status
0,P0001,Blood Pressure,85.98,70-100,2025-01-15,Critical
1,P0001,Glucose,126.76,70-100,2025-06-05,Abnormal
2,P0001,Blood Pressure,149.4,70-100,2025-01-30,Critical
3,P0001,A1C,120.76,70-100,2024-12-11,Normal
4,P0001,Blood Pressure,113.52,70-100,2025-07-02,Normal
5,P0002,Cholesterol,179.22,70-100,2025-01-14,Abnormal
6,P0002,Cholesterol,171.94,70-100,2025-10-17,Normal
7,P0003,Cholesterol,99.07,70-100,2024-11-12,Abnormal
8,P0003,Cholesterol,147.54,70-100,2024-12-25,Critical
9,P0004,Glucose,170.3,70-100,2025-08-18,Critical
