In [226]:
import pandas as pd
import psycopg2

In [None]:
# Connect to PostgreSQL
conn = psycopg2.connect(
    dbname="your_database_name",
    user="your_username",
    password="your_password",
    host="your_host",
    port="your_port"
)

In [228]:
# SQL commands to create the schema
create_schema_sql = """
-- User Table
DROP TABLE IF EXISTS salesforce.user CASCADE;
CREATE TABLE salesforce.user (
    id VARCHAR(18) PRIMARY KEY,
    first_name VARCHAR(255),
    last_name VARCHAR(255),
    email VARCHAR(255),
    username VARCHAR(255) UNIQUE
);

-- Account Table
DROP TABLE IF EXISTS salesforce.account CASCADE;
CREATE TABLE salesforce.account (
    id VARCHAR(18) PRIMARY KEY,
    owner_id VARCHAR(18) REFERENCES salesforce.user(id),
    name VARCHAR(255),
    industry VARCHAR(255),
    billing_street VARCHAR(255),
    billing_city VARCHAR(255),
    billing_state VARCHAR(255),
    billing_postal_code VARCHAR(20),
    billing_country VARCHAR(255),
    created_date TIMESTAMP,
    last_modified_date TIMESTAMP
);

-- Contact Table
DROP TABLE IF EXISTS salesforce.contact CASCADE;
CREATE TABLE salesforce.contact (
    id VARCHAR(18) PRIMARY KEY,
    account_id VARCHAR(18) REFERENCES salesforce.account(id),
    owner_id VARCHAR(18) REFERENCES salesforce.user(id),
    first_name VARCHAR(255),
    last_name VARCHAR(255),
    email VARCHAR(255),
    phone VARCHAR(20),
    
    Birthdate DATE,
    AgeRange VARCHAR(255),
    MailingCity VARCHAR(255),
    MailingStateCode VARCHAR(255),
    
    
    created_date TIMESTAMP,
    last_modified_date TIMESTAMP,
    Number_of_ILMP_Action_Plans INT
);

-- ILMP_Action_Plan Table
DROP TABLE IF EXISTS salesforce.ILMP_Action_Plan CASCADE;
CREATE TABLE salesforce.ILMP_Action_Plan (
    id VARCHAR(18) PRIMARY KEY,
    name VARCHAR(255),
    Barriers_to_Employment VARCHAR(255),
    Client_Status_at_Intake VARCHAR(255),
    Education_Level VARCHAR(255),
    Employment_Insurance_Claimant VARCHAR(255),
    Result_Code VARCHAR(255),
    Result_Date DATE,
    Result_Education_Level VARCHAR(255),
    Return_to_School_Result VARCHAR(255),
    Social_Assistance_Recipient VARCHAR(255),
    Start_Date DATE,
    Status VARCHAR(255),
    Number_of_Interventions_Completed INT,
    Number_of_Interventions_Open INT,     
    Number_of_Interventions INT,          
    created_date TIMESTAMP,
    last_modified_date TIMESTAMP
);

-- ILMP_Intervention Table
DROP TABLE IF EXISTS salesforce.ILMP_Intervention CASCADE;
CREATE TABLE salesforce.ILMP_Intervention (
    Id VARCHAR(18) PRIMARY KEY,
    ILMP_Action_Plan VARCHAR(18) REFERENCES salesforce.ILMP_Action_Plan(id),
    name VARCHAR(255),
    Code VARCHAR(255),
    Outcome VARCHAR(255),
    Cost DECIMAL(18, 2),
    Start_Date DATE,
    End_Date DATE,
    created_date TIMESTAMP,
    last_modified_date TIMESTAMP
);

-- Program Table
DROP TABLE IF EXISTS salesforce.program CASCADE;
CREATE TABLE salesforce.program (
    Id VARCHAR(18) PRIMARY KEY,
    Name VARCHAR(255),
    Description TEXT,
    ProgramIssueArea VARCHAR(255),
    ShortSummary TEXT,
    Status VARCHAR(255),
    TargetPopulation VARCHAR(255)
);

-- Program Cohort Table
DROP TABLE IF EXISTS salesforce.programCohort CASCADE;
CREATE TABLE salesforce.programCohort (
    Id VARCHAR(18) PRIMARY KEY,
    Name VARCHAR(255),
    Program VARCHAR(18) REFERENCES salesforce.program(id),
    Status VARCHAR(255)
);

-- Program Engagement Table
DROP TABLE IF EXISTS salesforce.programEngagement CASCADE;
CREATE TABLE salesforce.programEngagement (
    Id VARCHAR(18) PRIMARY KEY,
    Name VARCHAR(255),
    AccountId VARCHAR(18) REFERENCES salesforce.account(id),
    
    ContactId VARCHAR(18) REFERENCES salesforce.contact(id),
    
    ProgramId VARCHAR(18) REFERENCES salesforce.program(id),
    
    ProgramCohort VARCHAR(255) REFERENCES salesforce.programCohort(id),

    
    Stage VARCHAR(255),
    StartDate TIMESTAMP,
    Enrolment_Date TIMESTAMP,
    Withdrawal_Date TIMESTAMP,
    Withdrawal_Reason VARCHAR(255)
);
"""

# Execute the schema creation SQL
try:
    cur.execute(create_schema_sql)
    conn.commit()
    print("Schema created successfully!")
except Exception as e:
    conn.rollback()
    print(f"Error creating schema: {e}")

Schema created successfully!


In [229]:
mappings = {
    "salesforce.user": {
        "file": r"\User.csv",
        "columns": {
            "Id": "id",
            "FirstName": "first_name",
            "LastName": "last_name",
            "Email": "email",
            "Username": "username"
        },
        "encoding": "ISO-8859-1"  # Detected encoding
    },
    "salesforce.account": {
        "file": r"\Account.csv",
        "columns": {
            "Id": "id",
            "OwnerId": "owner_id",
            "Name": "name",
            "Industry": "industry",
            "BillingStreet": "billing_street",
            "BillingCity": "billing_city",
            "BillingState": "billing_state",
            "BillingPostalCode": "billing_postal_code",
            "BillingCountry": "billing_country",
            "CreatedDate": "created_date",
            "LastModifiedDate": "last_modified_date"
        },
        "encoding": "ISO-8859-1"  # Detected encoding
    },
    "salesforce.contact": {
        "file": r"\Contact.csv",
        "columns": {
            "Id": "id",
            "AccountId": "account_id",
            "OwnerId": "owner_id",
            "FirstName": "first_name",
            "LastName": "last_name",
            "Email": "email",
            "Phone": "phone",
            
            "Birthdate": "Birthdate", 
            "Age_Range__c": "AgeRange",
            "MailingCity": "MailingCity",
            "MailingStateCode": "MailingStateCode",

            "CreatedDate": "created_date",
            "LastModifiedDate": "last_modified_date",
            "Number_of_ILMP_Action_Plans__c": "Number_of_ILMP_Action_Plans"
        },
        "encoding": "ISO-8859-1"  # Detected encoding
    },
    "salesforce.ILMP_Action_Plan": {
        "file": r"\ILMP_Action_Plan__c.csv",
        "columns": {
            "Id": "id",
           
            
            "Name": "name",
            "Barriers_to_Employment__c": "Barriers_to_Employment",
            "Client_Status_at_Intake__c": "Client_Status_at_Intake",
            "Education_Level__c": "Education_Level",
            "Employment_Insurance_Claimant__c": "Employment_Insurance_Claimant",
            "Result_Code__c": "Result_Code",
            "Result_Date__c": "Result_Date",
            "Result_Education_Level__c": "Result_Education_Level",
            "Return_to_School_Result__c": "Return_to_School_Result",
            "Social_Assistance_Recipient__c": "Social_Assistance_Recipient",
            "Start_Date__c": "Start_Date",
            "Status__c": "Status",
            "Number_of_Interventions_Completed__c": "Number_of_Interventions_Completed",
            "Number_of_Interventions_Open__c": "Number_of_Interventions_Open",
            "Number_of_Interventions__c": "Number_of_Interventions",
            "CreatedDate": "created_date",
            "LastModifiedDate": "last_modified_date"
        },
        "encoding": "ISO-8859-1"  # Compatible with ASCII
    },
    "salesforce.ILMP_Intervention": {
        "file": r"\ILMP_Intervention__c.csv",
        "columns": {
            "Id": "id",
            "ILMP_Action_Plan__c": "ILMP_Action_Plan",
           
            
            "Name": "name",
            "Code__c": "Code",
            "Outcome__c": "Outcome",
            "Cost__c": "Cost",
            "Start_Date__c": "Start_Date",
            "End_Date__c": "End_Date",
            "CreatedDate": "created_date",
            "LastModifiedDate": "last_modified_date"
        },
        "encoding": "ISO-8859-1"  # Compatible with ASCII
    },
    "salesforce.program": {
        "file": r"\pmdm__Program__c.csv",
        "columns": {
            "Id": "id",
            
            "Name": "name",
            "pmdm__Description__c": "Description",
            "pmdm__ProgramIssueArea__c": "ProgramIssueArea",
            "pmdm__ShortSummary__c": "ShortSummary",
            "pmdm__Status__c": "Status",
            "pmdm__TargetPopulation__c": "TargetPopulation"
        },
        "encoding": "ISO-8859-1"  # Compatible with ASCII
    },
    "salesforce.programCohort": {
        "file": r"\pmdm__ProgramCohort__c.csv",
        "columns": {
            "Id": "id",
            
            "Name": "name",
            "pmdm__Program__c": "Program",
            "pmdm__Status__c": "Status"
        },
        "encoding": "ISO-8859-1"  # Compatible with ASCII
    },
    "salesforce.programEngagement": {
        "file": r"\pmdm__ProgramEngagement__c.csv",
        "columns": {
            "Id": "id",
            
            "Name": "name",
            "pmdm__Account__c": "AccountId",
            "pmdm__Contact__c": "ContactId",

            "pmdm__ProgramCohort__c": "ProgramCohort",
            
            "pmdm__Program__c": "ProgramId",
            
            "pmdm__Stage__c": "Stage",
            "pmdm__StartDate__c": "StartDate",
            "Enrolment_Date__c": "Enrolment_Date",
            "Withdrawal_Date__c": "Withdrawal_Date",
            "Withdrawal_Reason__c": "Withdrawal_Reason"
        },
        "encoding": "ISO-8859-1"  # Compatible with ASCII
    }
}

In [230]:

def preprocess_csv(file_path, column_mapping, encoding="utf-8"):
    """
    Preprocess a CSV file by renaming columns, converting dates, handling missing values,
    and ensuring numeric fields are properly formatted.
    
    Parameters:
        file_path (str): Path to the CSV file.
        column_mapping (dict): Mapping of Salesforce column names to database column names.
        encoding (str): Encoding of the CSV file (default is "utf-8").
        
    Returns:
        pd.DataFrame: A cleaned and preprocessed DataFrame.
    """
    try:
        # Step 1: Load the CSV file with the specified encoding
        df = pd.read_csv(file_path, encoding=encoding)
        
        # Step 2: Rename columns based on the mapping
        df.rename(columns=column_mapping, inplace=True)
        
        # Step 3: Retain only the columns specified in the mapping
        df = df[list(column_mapping.values())]
        
        # Step 4: Convert date columns to ISO 8601 format
        for col in df.columns:
            if "date" in col.lower():  # Check if the column is a date
                df[col] = pd.to_datetime(df[col], errors='coerce').dt.strftime('%Y-%m-%d %H:%M:%S')
        
        # Step 5: Convert numeric columns to the correct type
        for col in df.columns:
            if "number" in col.lower() or "cost" in col.lower():
                df[col] = pd.to_numeric(df[col], errors="coerce")
        
        # Step 6: Handle missing values (replace NaN with None for PostgreSQL compatibility)
        df = df.where(pd.notnull(df), None)
        
        return df
    
    except Exception as e:
        print(f"Error preprocessing {file_path}: {e}")
        return None

In [231]:
# Preprocess all CSV files using the mappings
preprocessed_data = {}

for table_name, mapping in mappings.items():
    file_path = mapping["file"]
    column_mapping = mapping["columns"]
    encoding = mapping.get("encoding", "utf-8")  # Default to utf-8 if not specified
    
    print(f"Preprocessing {table_name} from {file_path}...")
    preprocessed_df = preprocess_csv(file_path, column_mapping, encoding)
    
    if preprocessed_df is not None:
        preprocessed_data[table_name] = preprocessed_df
        print(f"Successfully preprocessed {table_name}.")
    else:
        print(f"Failed to preprocess {table_name}.")

Preprocessing salesforce.user from E:\IFA Files\IFA Salesforce data\2-3-2025\User.csv...
Successfully preprocessed salesforce.user.
Preprocessing salesforce.account from E:\IFA Files\IFA Salesforce data\2-3-2025\Account.csv...
Successfully preprocessed salesforce.account.
Preprocessing salesforce.contact from E:\IFA Files\IFA Salesforce data\2-3-2025\Contact.csv...
Successfully preprocessed salesforce.contact.
Preprocessing salesforce.ILMP_Action_Plan from E:\IFA Files\IFA Salesforce data\2-3-2025\ILMP_Action_Plan__c.csv...
Successfully preprocessed salesforce.ILMP_Action_Plan.
Preprocessing salesforce.ILMP_Intervention from E:\IFA Files\IFA Salesforce data\2-3-2025\ILMP_Intervention__c.csv...
Successfully preprocessed salesforce.ILMP_Intervention.
Preprocessing salesforce.program from E:\IFA Files\IFA Salesforce data\2-3-2025\pmdm__Program__c.csv...
Successfully preprocessed salesforce.program.
Preprocessing salesforce.programCohort from E:\IFA Files\IFA Salesforce data\2-3-2025\pmdm_

In [232]:
def load_dataframe_to_table(df, table_name):
    """
    Load a preprocessed DataFrame into a PostgreSQL table.
    
    Parameters:
        df (pd.DataFrame): The preprocessed DataFrame.
        table_name (str): The name of the table in PostgreSQL.
    """
    try:
        # Generate the INSERT query
        columns = ", ".join(df.columns)
        placeholders = ", ".join(["%s"] * len(df.columns))
        insert_query = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"
        
        # Insert data into the table
        records = [tuple(row) for row in df.to_numpy()]
        cur.executemany(insert_query, records)
        conn.commit()
        print(f"Data loaded successfully into {table_name}!")
    except Exception as e:
        conn.rollback()
        print(f"Error loading data into {table_name}: {e}")

In [233]:
# Load all preprocessed data into PostgreSQL
for table_name, df in preprocessed_data.items():
    print(f"Loading data into {table_name}...")
    load_dataframe_to_table(df, table_name)

Loading data into salesforce.user...
Data loaded successfully into salesforce.user!
Loading data into salesforce.account...
Data loaded successfully into salesforce.account!
Loading data into salesforce.contact...
Data loaded successfully into salesforce.contact!
Loading data into salesforce.ILMP_Action_Plan...
Data loaded successfully into salesforce.ILMP_Action_Plan!
Loading data into salesforce.ILMP_Intervention...
Data loaded successfully into salesforce.ILMP_Intervention!
Loading data into salesforce.program...
Data loaded successfully into salesforce.program!
Loading data into salesforce.programCohort...
Data loaded successfully into salesforce.programCohort!
Loading data into salesforce.programEngagement...
Data loaded successfully into salesforce.programEngagement!


In [234]:
# Close the connection
conn.close()