# Salesforce CRM Data Pipeline for Banking Churn POC

This notebook handles the Salesforce CRM data pipeline for the Apex National Bank churn prediction POC.

## Overview

Salesforce serves as our **CRM System**, containing:
- **Contacts** - Customer records linked to ERPNext via email
- **Cases** - Support tickets and complaints (key churn signal!)
- **Tasks** - Customer interactions (calls, emails, meetings)

## Churn Signal Logic

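In this POC's synthetic data, the number of support cases per customer is the main churn signal. The ranges below overlap, so case count is a noisy indicator rather than a perfect separator: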

| Segment | Case Count | Rationale |
|---------|------------|----------|
| Active | 0-2 | Happy customers, few issues |
| At-Risk | 2-5 | Growing frustration |
| Churned | 3-8 | High complaints before leaving |
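
To use case volume as a model feature, cases can be counted per contact. The snippet below is a minimal sketch on made-up records. The field name `sf_contact_id` mirrors the export schema used later in this notebook; `case_counts_by_contact` and the sample IDs are hypothetical and only for illustration.

```python
from collections import Counter
from typing import Dict, Iterable


def case_counts_by_contact(cases: Iterable[Dict], contact_ids: Iterable[str]) -> Dict[str, int]:
    """Count cases per contact; contacts without any case get 0."""
    counts = Counter(case["sf_contact_id"] for case in cases)
    return {cid: counts.get(cid, 0) for cid in contact_ids}


# Illustrative records only (not real Salesforce IDs)
sample_cases = [
    {"sf_case_id": "C1", "sf_contact_id": "A"},
    {"sf_case_id": "C2", "sf_contact_id": "A"},
    {"sf_case_id": "C3", "sf_contact_id": "B"},
]
features = case_counts_by_contact(sample_cases, ["A", "B", "Z"])
print(features)  # {'A': 2, 'B': 1, 'Z': 0}
```

Passing the full contact list explicitly keeps customers with zero cases in the feature table, which matters because "no complaints" is itself part of the signal.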

## Entity Resolution

Customers are linked between ERPNext and Salesforce using **email** as the join key:

```
ERPNext Customer (email_id) <---> Salesforce Contact (Email)
```
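
The join can be sketched in plain Python as follows. This is an illustration under assumptions, not the notebook's own code: the helpers `normalize_email` and `link_by_email` are hypothetical, and lowercasing/trimming emails before matching is an assumed normalization step that the pipeline itself does not perform.

```python
from typing import Dict, List, Optional, Tuple


def normalize_email(email: Optional[str]) -> Optional[str]:
    """Lowercase and trim an email; return None for missing or blank values."""
    if not email or not email.strip():
        return None
    return email.strip().lower()


def link_by_email(erp_rows: List[Dict], sf_rows: List[Dict]) -> Tuple[List[Dict], List[Dict]]:
    """Return (matched pairs, ERP rows without a Salesforce contact)."""
    sf_by_email: Dict[str, Dict] = {}
    for row in sf_rows:
        key = normalize_email(row.get("Email"))
        if key is not None:
            sf_by_email.setdefault(key, row)  # keep the first contact per email

    matched, unmatched = [], []
    for row in erp_rows:
        key = normalize_email(row.get("email_id"))
        contact = sf_by_email.get(key) if key is not None else None
        if contact is None:
            unmatched.append(row)
        else:
            matched.append({"erp": row, "sf": contact})
    return matched, unmatched


# Illustrative rows only
erp = [{"email_id": "Adnan.Ahmed31@gmail.com "}, {"email_id": "nobody@example.com"}]
sfc = [{"Id": "003X", "Email": "adnan.ahmed31@gmail.com"}]
matched, unmatched = link_by_email(erp, sfc)
print(len(matched), len(unmatched))  # 1 1
```

Returning the unmatched rows separately makes it easy to check how many ERPNext customers failed to link before building features on the joined data.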

## Notebook Sections

1. **Configuration** - Environment setup and credentials
2. **Salesforce API Client** - REST API wrapper
3. **Exploration** - Understand existing Salesforce data
4. **Data Ingestion** - Create CRM data linked to ERPNext
5. **Data Extraction** - Export to CSV/JSON for Databricks

---
## 1. Configuration

### Prerequisites

1. Salesforce Developer Edition account
2. Connected App (External Client App) created
3. Security token (Salesforce emails it when you reset it)
4. Credentials in `docs/.env` file (auto-loaded)

### Credentials File Format

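The notebook automatically loads credentials from `docs/.env`. Note that the file uses PowerShell `$env:` assignment syntax, not the usual `KEY=value` dotenv format: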

```powershell
$env:SALESFORCE_INSTANCE_URL = "https://your-org.develop.my.salesforce.com"
$env:SALESFORCE_USERNAME = "your-email@example.com"
$env:SALESFORCE_PASSWORD = "your-password"
$env:SALESFORCE_SECURITY_TOKEN = "your-security-token"
$env:SALESFORCE_CONSUMER_KEY = "your-consumer-key"
$env:SALESFORCE_CONSUMER_SECRET = "your-consumer-secret"
```

### Install Dependencies

```bash
pip install simple-salesforce
```

In [1]:
# Install simple-salesforce if not present
try:
    from simple_salesforce import Salesforce
    print("simple-salesforce already installed")
except ImportError:
    print("Installing simple-salesforce...")
    !pip install simple-salesforce
    from simple_salesforce import Salesforce
    print("Installation complete!")

simple-salesforce already installed


In [2]:
# Required packages
import os
import json
import csv
import random
from datetime import datetime, timedelta
from typing import List, Dict, Any
from pathlib import Path
import pandas as pd

try:
    from simple_salesforce import Salesforce, SalesforceLogin
    from simple_salesforce.exceptions import SalesforceAuthenticationFailed
except ImportError:
    print("Please run: pip install simple-salesforce")
else:
    # Only report success when the imports actually succeeded
    print("Packages imported successfully!")

Packages imported successfully!


In [3]:
# =============================================================================
# CONFIGURATION - Auto-load from .env file
# =============================================================================

import re

def load_env_file(env_path: str) -> dict:
    """
    Load credentials from .env file (PowerShell format).
    
    Parses lines like: $env:VAR_NAME = "value"
    """
    credentials = {}
    try:
        with open(env_path, 'r') as f:
            for line in f:
                # Match PowerShell format: $env:VAR_NAME = "value"
                match = re.match(r'\$env:(\w+)\s*=\s*["\']([^"\']*)["\']', line.strip())
                if match:
                    var_name, value = match.groups()
                    credentials[var_name] = value
        print(f"Loaded {len(credentials)} credentials from {env_path}")
    except FileNotFoundError:
        print(f"Warning: {env_path} not found, using environment variables")
    return credentials

# Load from .env file
ENV_FILE = Path("../../docs/.env")
env_creds = load_env_file(str(ENV_FILE))

# Salesforce Credentials (from .env file or environment variables)
SF_INSTANCE_URL = env_creds.get("SALESFORCE_INSTANCE_URL", os.getenv("SALESFORCE_INSTANCE_URL", ""))
SF_USERNAME = env_creds.get("SALESFORCE_USERNAME", os.getenv("SALESFORCE_USERNAME", ""))
SF_PASSWORD = env_creds.get("SALESFORCE_PASSWORD", os.getenv("SALESFORCE_PASSWORD", ""))
SF_SECURITY_TOKEN = env_creds.get("SALESFORCE_SECURITY_TOKEN", os.getenv("SALESFORCE_SECURITY_TOKEN", ""))
SF_CONSUMER_KEY = env_creds.get("SALESFORCE_CONSUMER_KEY", os.getenv("SALESFORCE_CONSUMER_KEY", ""))
SF_CONSUMER_SECRET = env_creds.get("SALESFORCE_CONSUMER_SECRET", os.getenv("SALESFORCE_CONSUMER_SECRET", ""))

# Data paths
ERP_CUSTOMERS_PATH = Path("../../data/raw/erp_customers.csv")
OUTPUT_DIR = Path("../../data/raw")

# Date range (same as ERPNext)
START_DATE = datetime(2023, 1, 1)
END_DATE = datetime(2026, 1, 11)

# Check credentials
creds_set = all([SF_USERNAME, SF_PASSWORD, SF_SECURITY_TOKEN])

if creds_set:
    print(f"\nConfiguration loaded!")
    print(f"  Instance: {SF_INSTANCE_URL or 'Will auto-detect'}")
    print(f"  Username: {SF_USERNAME}")
    # Do not echo any part of the secret; only confirm that it is set
    print("  Token: set (value hidden)")
else:
    print("WARNING: Salesforce credentials not set!")
    print("\nEither:")
    print("  1. Add credentials to docs/.env file")
    print("  2. Or set environment variables manually")

Loaded 8 credentials from ..\..\docs\.env

Configuration loaded!
  Instance: https://orgfarm-6f9d8da0ee-dev-ed.develop.my.salesforce.com
  Username: sulaimansta013.2ae597b9710e@agentforce.com
  Token: set (value hidden)


In [4]:
# =============================================================================
# REFERENCE DATA
# =============================================================================

# Case types for banking CRM
CASE_TYPES = [
    {"subject": "ATM Card Issue", "type": "Problem", "priority": "High"},
    {"subject": "Transaction Dispute", "type": "Problem", "priority": "High"},
    {"subject": "Fee Inquiry", "type": "Question", "priority": "Medium"},
    {"subject": "Statement Request", "type": "Request", "priority": "Low"},
    {"subject": "Account Access Issue", "type": "Problem", "priority": "High"},
    {"subject": "Mobile App Problem", "type": "Problem", "priority": "Medium"},
    {"subject": "Interest Rate Question", "type": "Question", "priority": "Low"},
    {"subject": "Loan Inquiry", "type": "Question", "priority": "Medium"},
    {"subject": "Card Renewal", "type": "Request", "priority": "Medium"},
    {"subject": "Fraud Alert", "type": "Problem", "priority": "High"},
]

# Task types for interactions
TASK_TYPES = [
    {"subject": "Welcome Call", "type": "Call", "priority": "Normal"},
    {"subject": "Follow-up Call", "type": "Call", "priority": "Normal"},
    {"subject": "Product Inquiry Response", "type": "Email", "priority": "Normal"},
    {"subject": "Account Review", "type": "Meeting", "priority": "High"},
    {"subject": "Cross-sell Opportunity", "type": "Call", "priority": "Normal"},
    {"subject": "Retention Call", "type": "Call", "priority": "High"},
    {"subject": "Survey Follow-up", "type": "Email", "priority": "Low"},
    {"subject": "Birthday Greeting", "type": "Email", "priority": "Low"},
]

# Case status options
CASE_STATUSES = ["New", "Working", "Escalated", "Closed"]

print(f"Reference data loaded:")
print(f"  Case types: {len(CASE_TYPES)}")
print(f"  Task types: {len(TASK_TYPES)}")

Reference data loaded:
  Case types: 10
  Task types: 8


---
## 2. Salesforce API Client

Using `simple-salesforce` library for REST API access.

In [7]:
def connect_salesforce():
    """
    Connect to Salesforce using simple-salesforce.
    
    Authentication uses Username + Password + Security Token.
    """
    try:
        # Connect using username/password/token
        sf = Salesforce(
            username=SF_USERNAME,
            password=SF_PASSWORD,
            security_token=SF_SECURITY_TOKEN,
            consumer_key=SF_CONSUMER_KEY if SF_CONSUMER_KEY else None,
            consumer_secret=SF_CONSUMER_SECRET if SF_CONSUMER_SECRET else None,
        )
        return sf
    except SalesforceAuthenticationFailed as e:
        print(f"Authentication failed: {e}")
        print("\nCheck:")
        print("1. Username is correct")
        print("2. Password is correct")
        print("3. Security Token is valid (reset if needed)")
        return None
    except Exception as e:
        print(f"Connection error: {e}")
        return None


# Test connection
if creds_set:
    sf = connect_salesforce()
    if sf:
        # Get user info
        user_info = sf.query("SELECT Id, Name, Email FROM User WHERE Username = '{}'".format(SF_USERNAME))
        if user_info['records']:
            user = user_info['records'][0]
            print(f"Connected to Salesforce!")
            print(f"  User: {user['Name']}")
            print(f"  Email: {user['Email']}")
            print(f"  Instance: {sf.sf_instance}")
        else:
            print("Connected but could not fetch user info")
else:
    print("Skipping connection test - credentials not set")
    sf = None

Connected to Salesforce!
  User: Sulaiman Ahmed
  Email: sulaimansta013@gmail.com
  Instance: orgfarm-6f9d8da0ee-dev-ed.develop.my.salesforce.com


---
## 3. Exploration

Survey existing data in Salesforce.

In [8]:
def explore_salesforce(sf):
    """
    Survey existing data in Salesforce.
    """
    print("=" * 60)
    print("Salesforce Data Survey")
    print(f"Instance: {sf.sf_instance}")
    print("=" * 60)
    
    objects_to_check = [
        ("Contact", "Customer contacts"),
        ("Account", "Company accounts"),
        ("Case", "Support cases"),
        ("Task", "Activities/tasks"),
        ("Lead", "Potential customers"),
        ("Opportunity", "Sales opportunities"),
    ]
    
    results = {}
    for obj, desc in objects_to_check:
        try:
            count = sf.query(f"SELECT COUNT() FROM {obj}")['totalSize']
            results[obj] = count
            print(f"  {obj:15} : {count:6} records  ({desc})")
        except Exception as e:
            results[obj] = 0
            print(f"  {obj:15} : Error - {str(e)[:50]}")
    
    return results

# Run exploration
if sf:
    sf_data = explore_salesforce(sf)
else:
    print("Not connected to Salesforce")

Salesforce Data Survey
Instance: orgfarm-6f9d8da0ee-dev-ed.develop.my.salesforce.com
  Contact         :     20 records  (Customer contacts)
  Account         :     13 records  (Company accounts)
  Case            :     26 records  (Support cases)
  Task            :      0 records  (Activities/tasks)
  Lead            :     22 records  (Potential customers)
  Opportunity     :     31 records  (Sales opportunities)


In [9]:
# Load ERPNext customers to understand what we need to link
print("\n" + "=" * 60)
print("ERPNext Customers (to link)")
print("=" * 60)

if ERP_CUSTOMERS_PATH.exists():
    erp_customers = pd.read_csv(ERP_CUSTOMERS_PATH)
    
    print(f"\nLoaded {len(erp_customers)} customers from ERPNext")
    print(f"\nSegment distribution:")
    print(erp_customers['website'].value_counts())  # 'website' stores segment
    
    print(f"\nSample customers:")
    print(erp_customers[['customer_name', 'email_id', 'territory', 'website']].head())
else:
    print(f"ERPNext customer file not found: {ERP_CUSTOMERS_PATH}")
    print("Please run the ERPNext pipeline first.")
    erp_customers = None


ERPNext Customers (to link)

Loaded 502 customers from ERPNext

Segment distribution:
website
active     311
at_risk    126
churned     63
Name: count, dtype: int64

Sample customers:
     customer_name                 email_id         territory  website
0  Adnan Ahmed 831  adnan.ahmed31@gmail.com   Suburban Branch   active
1  Adnan Begum 248  adnan.begum45@gmail.com   Downtown Branch   active
2  Adnan Begum 595  adnan.begum26@gmail.com   Suburban Branch   active
3  Adnan Malik 605  adnan.malik70@gmail.com  Southgate Branch  at_risk
4  Adnan Malik 886  adnan.malik79@gmail.com   Eastside Branch   active


---
## 4. Data Ingestion

Create CRM data in Salesforce linked to ERPNext customers.

### Churn Signal Implementation

The key insight: **customers who complain more are more likely to churn**.

We implement this by creating more Cases for at-risk and churned customers:
- Active customers: 0-2 cases
- At-risk customers: 2-5 cases  
- Churned customers: 3-8 cases
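
A quick arithmetic check shows how much signal these ranges carry. The sketch below assumes counts are drawn uniformly from each inclusive range (which is what `random.randint` does); the names `CASE_RANGES` and `segments_for` are hypothetical and used only for this illustration.

```python
# Inclusive ranges copied from the list above
CASE_RANGES = {"active": (0, 2), "at_risk": (2, 5), "churned": (3, 8)}

# Mean of a uniform integer draw on [low, high] is (low + high) / 2
expected = {seg: (low + high) / 2 for seg, (low, high) in CASE_RANGES.items()}
print(expected)  # {'active': 1.0, 'at_risk': 3.5, 'churned': 5.5}


def segments_for(count: int) -> list:
    """Segments whose range contains the given case count."""
    return [seg for seg, (low, high) in CASE_RANGES.items() if low <= count <= high]


# Case counts that fall inside more than one segment's range
ambiguous = [n for n in range(0, 9) if len(segments_for(n)) > 1]
print(ambiguous)  # [2, 3, 4, 5]
```

The expected counts rise from 1.0 to 3.5 to 5.5, but counts of 2 through 5 are compatible with two segments each, so a model cannot recover the segment from case count alone.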

In [10]:
def determine_case_count(segment: str) -> int:
    """
    Determine number of support cases based on customer segment.
    
    This creates the churn signal in our data:
    - More cases = higher churn risk
    """
    if segment == "churned":
        return random.randint(3, 8)  # High complaints
    elif segment == "at_risk":
        return random.randint(2, 5)  # Medium complaints
    else:  # active
        return random.randint(0, 2)  # Low complaints


def determine_task_count(segment: str) -> int:
    """
    Determine number of interactions based on segment.
    
    At-risk and churned may have more retention attempts.
    """
    if segment == "churned":
        return random.randint(2, 5)  # Retention attempts
    elif segment == "at_risk":
        return random.randint(1, 4)  # Some outreach
    else:  # active
        return random.randint(1, 3)  # Normal interactions


def random_date_in_range(start: datetime, end: datetime) -> datetime:
    """Generate random date between start and end."""
    delta = end - start
    random_days = random.randint(0, delta.days)
    return start + timedelta(days=random_days)


print("Helper functions defined.")
# Print the configured ranges; calling the random helpers twice would show two
# unrelated draws (for example "2-0"), not a range.
print("\nCase count range by segment (inclusive):")
print("  Active: 0-2 cases")
print("  At-Risk: 2-5 cases")
print("  Churned: 3-8 cases")

Helper functions defined.

Case count range by segment (inclusive):
  Active: 0-2 cases
  At-Risk: 2-5 cases
  Churned: 3-8 cases


In [11]:
def create_contacts(sf, erp_customers: pd.DataFrame) -> List[Dict]:
    """
    Create Salesforce Contacts linked to ERPNext customers.
    
    Uses email as the join key.
    """
    print("\n" + "=" * 60)
    print("Creating Contacts")
    print("=" * 60)
    
    contacts = []
    errors = 0
    
    for idx, row in erp_customers.iterrows():
        # Parse name into first/last
        name_parts = row['customer_name'].split()
        first_name = name_parts[0] if name_parts else "Unknown"
        last_name = " ".join(name_parts[1:]) if len(name_parts) > 1 else "Customer"
        
        try:
            result = sf.Contact.create({
                'FirstName': first_name,
                'LastName': last_name,
                'Email': row['email_id'],
                'Phone': row['mobile_no'],
                'Description': f"Segment: {row['website']}, Branch: {row['territory']}",
                'LeadSource': 'Existing Customer',
            })
            
            contacts.append({
                'sf_contact_id': result['id'],
                'email': row['email_id'],
                'erp_customer_name': row['name'],
                'segment': row['website'],
                'territory': row['territory'],
            })
            
            if (idx + 1) % 50 == 0:
                print(f"  Created {idx + 1}/{len(erp_customers)} contacts...")
        
        except Exception as e:
            errors += 1
            if errors <= 3:
                print(f"  Error: {str(e)[:80]}")
    
    print(f"\nTotal: {len(contacts)} contacts created, {errors} errors")
    return contacts
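
The ingestion run logs a few `Malformed request` errors during contact creation. The messages are truncated, so the cause is not confirmed. One plausible cause is a missing value in the CSV: pandas reads it as `NaN`, which is not valid JSON. The sketch below shows a hypothetical `clean_payload` helper that drops such fields before a record is sent; it is an assumption about a possible fix, not part of the original pipeline.

```python
import json
import math
from typing import Any, Dict


def clean_payload(fields: Dict[str, Any]) -> Dict[str, Any]:
    """Drop keys whose value is None or a float NaN."""
    cleaned = {}
    for key, value in fields.items():
        if value is None:
            continue
        # NaN values read by pandas are floats, so this check covers them
        if isinstance(value, float) and math.isnan(value):
            continue
        cleaned[key] = value
    return cleaned


# Illustrative record with a missing phone number
raw = {"FirstName": "Adnan", "LastName": "Ahmed 831", "Phone": float("nan")}

print(json.dumps(raw))      # contains the bare token NaN, which is invalid JSON
payload = clean_payload(raw)
print(json.dumps(payload))  # {"FirstName": "Adnan", "LastName": "Ahmed 831"}
```

A helper like this would wrap the dictionary passed to `sf.Contact.create(...)`. Logging the full exception text for failed rows would confirm whether missing values are actually the cause.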

In [12]:
def create_cases(sf, contacts: List[Dict]) -> List[Dict]:
    """
    Create support Cases for contacts.
    
    More cases for at-risk and churned customers (churn signal).
    """
    print("\n" + "=" * 60)
    print("Creating Cases (Support Tickets)")
    print("=" * 60)
    print("  Churn signal: More cases = higher churn risk")
    
    cases = []
    errors = 0
    segment_case_counts = {"active": 0, "at_risk": 0, "churned": 0}
    
    for idx, contact in enumerate(contacts):
        segment = contact['segment']
        num_cases = determine_case_count(segment)
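        # .get() tolerates segments outside the three expected keys (e.g. a missing value)
        segment_case_counts[segment] = segment_case_counts.get(segment, 0) + num_cases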
        
        for _ in range(num_cases):
            case_type = random.choice(CASE_TYPES)
            case_date = random_date_in_range(START_DATE, END_DATE)
            
            # Churned customers more likely to have unresolved cases
            if segment == "churned":
                status = random.choice(["Escalated", "Closed", "Working"])
            elif segment == "at_risk":
                status = random.choice(["Working", "Closed", "New"])
            else:
                status = random.choice(["Closed", "Closed", "New"])  # Mostly resolved
            
            try:
                result = sf.Case.create({
                    'ContactId': contact['sf_contact_id'],
                    'Subject': case_type['subject'],
                    'Type': case_type['type'],
                    'Priority': case_type['priority'],
                    'Status': status,
                    'Origin': random.choice(['Phone', 'Email', 'Web', 'Mobile App']),
                    'Description': f"Customer segment: {segment}. Created: {case_date.strftime('%Y-%m-%d')}",
                })
                
                cases.append({
                    'sf_case_id': result['id'],
                    'sf_contact_id': contact['sf_contact_id'],
                    'email': contact['email'],
                    'subject': case_type['subject'],
                    'type': case_type['type'],
                    'priority': case_type['priority'],
                    'status': status,
                    'case_date': case_date.strftime('%Y-%m-%d'),
                    'segment': segment,
                })
            
            except Exception as e:
                errors += 1
                if errors <= 3:
                    print(f"  Error: {str(e)[:80]}")
        
        if (idx + 1) % 100 == 0:
            print(f"  Processed {idx + 1}/{len(contacts)} contacts ({len(cases)} cases)...")
    
    print(f"\nTotal: {len(cases)} cases created, {errors} errors")
    print(f"\nCases by segment:")
    for seg, count in segment_case_counts.items():
        print(f"  {seg}: {count} cases")
    
    return cases

In [13]:
def create_tasks(sf, contacts: List[Dict]) -> List[Dict]:
    """
    Create Tasks (interactions) for contacts.
    """
    print("\n" + "=" * 60)
    print("Creating Tasks (Interactions)")
    print("=" * 60)
    
    tasks = []
    errors = 0
    
    for idx, contact in enumerate(contacts):
        segment = contact['segment']
        num_tasks = determine_task_count(segment)
        
        for _ in range(num_tasks):
            task_type = random.choice(TASK_TYPES)
            task_date = random_date_in_range(START_DATE, END_DATE)
            
            try:
                result = sf.Task.create({
                    'WhoId': contact['sf_contact_id'],
                    'Subject': task_type['subject'],
                    'Priority': task_type['priority'],
                    'Status': random.choice(['Completed', 'Not Started', 'In Progress']),
                    'ActivityDate': task_date.strftime('%Y-%m-%d'),
                    'Description': f"Customer segment: {segment}",
                })
                
                tasks.append({
                    'sf_task_id': result['id'],
                    'sf_contact_id': contact['sf_contact_id'],
                    'email': contact['email'],
                    'subject': task_type['subject'],
                    'type': task_type['type'],
                    'priority': task_type['priority'],
                    'task_date': task_date.strftime('%Y-%m-%d'),
                    'segment': segment,
                })
            
            except Exception as e:
                errors += 1
                if errors <= 3:
                    print(f"  Error: {str(e)[:80]}")
        
        if (idx + 1) % 100 == 0:
            print(f"  Processed {idx + 1}/{len(contacts)} contacts ({len(tasks)} tasks)...")
    
    print(f"\nTotal: {len(tasks)} tasks created, {errors} errors")
    return tasks

In [14]:
# ============================================================================
# RUN DATA INGESTION
# ============================================================================
#
# This cell creates CRM data in Salesforce:
# - Contacts linked to ERPNext customers
# - Cases (support tickets) - more for at-risk/churned
# - Tasks (interactions)
#
# Runtime: Approximately 5-10 minutes for 500 customers

if sf and erp_customers is not None:
    print("=" * 60)
    print("Salesforce Data Ingestion")
    print(f"Target: {sf.sf_instance}")
    print(f"Customers to process: {len(erp_customers)}")
    print("=" * 60)
    
    # Create data
    contacts = create_contacts(sf, erp_customers)
    
    if contacts:
        cases = create_cases(sf, contacts)
        tasks = create_tasks(sf, contacts)
    else:
        cases = []
        tasks = []
    
    # Summary
    print("\n" + "=" * 60)
    print("INGESTION COMPLETE")
    print("=" * 60)
    print(f"  Contacts: {len(contacts)}")
    print(f"  Cases: {len(cases)}")
    print(f"  Tasks: {len(tasks)}")
else:
    print("Cannot run ingestion:")
    if not sf:
        print("  - Not connected to Salesforce")
    if erp_customers is None:
        print("  - ERPNext customers not loaded")
    contacts, cases, tasks = [], [], []

Salesforce Data Ingestion
Target: orgfarm-6f9d8da0ee-dev-ed.develop.my.salesforce.com
Customers to process: 502

Creating Contacts
  Created 50/502 contacts...
  Created 100/502 contacts...
  Created 150/502 contacts...
  Created 200/502 contacts...
  Created 250/502 contacts...
  Created 300/502 contacts...
  Error: Malformed request https://orgfarm-6f9d8da0ee-dev-ed.develop.my.salesforce.com/se
  Created 350/502 contacts...
  Created 400/502 contacts...
  Error: Malformed request https://orgfarm-6f9d8da0ee-dev-ed.develop.my.salesforce.com/se
  Created 450/502 contacts...
  Error: Malformed request https://orgfarm-6f9d8da0ee-dev-ed.develop.my.salesforce.com/se
  Created 500/502 contacts...

Total: 499 contacts created, 3 errors

Creating Cases (Support Tickets)
  Churn signal: More cases = higher churn risk
  Processed 100/499 contacts (215 cases)...
  Processed 200/499 contacts (450 cases)...
  Processed 300/499 contacts (650 cases)...
  Processed 400/499 contacts (848 cases)...

Tot

---
## 5. Data Extraction

Export Salesforce data to CSV/JSON for Databricks ingestion.

In [15]:
def extract_contacts_from_sf(sf) -> List[Dict]:
    """Extract all contacts from Salesforce."""
    print("\nExtracting Contacts...")
    
    query = """
        SELECT Id, FirstName, LastName, Email, Phone, Description, 
               LeadSource, CreatedDate, LastModifiedDate
        FROM Contact
    """
    
    result = sf.query_all(query)
    contacts = result['records']
    
    # Clean up Salesforce metadata
    clean_contacts = []
    for c in contacts:
        clean_contacts.append({
            'sf_contact_id': c['Id'],
            'first_name': c['FirstName'],
            'last_name': c['LastName'],
            'email': c['Email'],
            'phone': c['Phone'],
            'description': c['Description'],
            'lead_source': c['LeadSource'],
            'created_date': c['CreatedDate'],
            'modified_date': c['LastModifiedDate'],
        })
    
    print(f"  Found {len(clean_contacts)} contacts")
    return clean_contacts


def extract_cases_from_sf(sf) -> List[Dict]:
    """Extract all cases from Salesforce."""
    print("\nExtracting Cases...")
    
    query = """
        SELECT Id, ContactId, Subject, Type, Priority, Status, Origin,
               Description, CreatedDate, ClosedDate
        FROM Case
    """
    
    result = sf.query_all(query)
    cases = result['records']
    
    clean_cases = []
    for c in cases:
        clean_cases.append({
            'sf_case_id': c['Id'],
            'sf_contact_id': c['ContactId'],
            'subject': c['Subject'],
            'type': c['Type'],
            'priority': c['Priority'],
            'status': c['Status'],
            'origin': c['Origin'],
            'description': c['Description'],
            'created_date': c['CreatedDate'],
            'closed_date': c['ClosedDate'],
        })
    
    print(f"  Found {len(clean_cases)} cases")
    return clean_cases


def extract_tasks_from_sf(sf) -> List[Dict]:
    """Extract all tasks from Salesforce."""
    print("\nExtracting Tasks...")
    
    query = """
        SELECT Id, WhoId, Subject, Priority, Status, ActivityDate, Description
        FROM Task
    """
    
    result = sf.query_all(query)
    tasks = result['records']
    
    clean_tasks = []
    for t in tasks:
        clean_tasks.append({
            'sf_task_id': t['Id'],
            'sf_contact_id': t['WhoId'],
            'subject': t['Subject'],
            'priority': t['Priority'],
            'status': t['Status'],
            'activity_date': t['ActivityDate'],
            'description': t['Description'],
        })
    
    print(f"  Found {len(clean_tasks)} tasks")
    return clean_tasks
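
`create_cases` does not set any date field on the Case, so `CreatedDate` in the export reflects when the record was inserted. The simulated case date and the segment survive only inside `Description`, in the form `Customer segment: <segment>. Created: <YYYY-MM-DD>`. The sketch below recovers both; `parse_case_description` is a hypothetical helper, and cases that existed in the org before ingestion will simply yield `None` values.

```python
import re
from datetime import date, datetime
from typing import Any, Dict, Optional

# Matches the Description format written by create_cases
CASE_DESC_RE = re.compile(r"Customer segment: (\w+)\. Created: (\d{4}-\d{2}-\d{2})")


def parse_case_description(description: Optional[str]) -> Dict[str, Any]:
    """Extract segment and simulated case date; None where not present."""
    result: Dict[str, Any] = {"segment": None, "case_date": None}
    if not description:
        return result
    match = CASE_DESC_RE.search(description)
    if match:
        result["segment"] = match.group(1)
        result["case_date"] = datetime.strptime(match.group(2), "%Y-%m-%d").date()
    return result


parsed = parse_case_description("Customer segment: at_risk. Created: 2024-03-05")
print(parsed)  # {'segment': 'at_risk', 'case_date': datetime.date(2024, 3, 5)}
```

Applying this to the `description` column of `sf_cases.csv` gives a date that falls within the simulated range, which is more useful for time-based features than the insertion timestamp.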

In [16]:
def save_to_csv(data: List[Dict], filename: str, output_dir: Path):
    """Save data to CSV file."""
    if not data:
        print(f"  No data for {filename}")
        return
    
    output_dir.mkdir(parents=True, exist_ok=True)
    filepath = output_dir / filename
    
    # Get all unique keys
    all_keys = set()
    for record in data:
        all_keys.update(record.keys())
    fieldnames = sorted(all_keys)
    
    with open(filepath, 'w', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(data)
    
    print(f"  Saved: {filename} ({len(data)} records)")


def save_to_json(data: List[Dict], filename: str, output_dir: Path):
    """Save data to JSON file."""
    if not data:
        print(f"  No data for {filename}")
        return
    
    output_dir.mkdir(parents=True, exist_ok=True)
    filepath = output_dir / filename
    
    with open(filepath, 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=2, default=str)
    
    print(f"  Saved: {filename} ({len(data)} records)")

In [17]:
# ============================================================================
# RUN DATA EXTRACTION
# ============================================================================

if sf:
    print("=" * 60)
    print("Salesforce Data Extraction")
    print(f"Source: {sf.sf_instance}")
    print(f"Output: {OUTPUT_DIR.absolute()}")
    print("=" * 60)
    
    # Extract data from Salesforce
    sf_contacts = extract_contacts_from_sf(sf)
    sf_cases = extract_cases_from_sf(sf)
    sf_tasks = extract_tasks_from_sf(sf)
    
    # Save to CSV
    print("\n" + "=" * 60)
    print("Saving to CSV")
    print("=" * 60)
    
    save_to_csv(sf_contacts, "sf_contacts.csv", OUTPUT_DIR)
    save_to_csv(sf_cases, "sf_cases.csv", OUTPUT_DIR)
    save_to_csv(sf_tasks, "sf_tasks.csv", OUTPUT_DIR)
    
    # Save to JSON
    print("\n" + "=" * 60)
    print("Saving to JSON")
    print("=" * 60)
    
    save_to_json(sf_contacts, "sf_contacts.json", OUTPUT_DIR)
    save_to_json(sf_cases, "sf_cases.json", OUTPUT_DIR)
    save_to_json(sf_tasks, "sf_tasks.json", OUTPUT_DIR)
    
    # Summary
    print("\n" + "=" * 60)
    print("EXTRACTION COMPLETE")
    print("=" * 60)
    print(f"\nFiles saved to: {OUTPUT_DIR.absolute()}")
    print(f"\n  sf_contacts.csv : {len(sf_contacts)} records")
    print(f"  sf_cases.csv    : {len(sf_cases)} records")
    print(f"  sf_tasks.csv    : {len(sf_tasks)} records")
else:
    print("Not connected to Salesforce - cannot extract data")

Salesforce Data Extraction
Source: orgfarm-6f9d8da0ee-dev-ed.develop.my.salesforce.com
Output: c:\Users\SulaimanAhmed\Desktop\portfolio\Banking project\banking-churn-databricks\notebooks\exploration\..\..\data\raw

Extracting Contacts...
  Found 519 contacts

Extracting Cases...
  Found 1107 cases

Extracting Tasks...
  Found 1170 tasks

Saving to CSV
  Saved: sf_contacts.csv (519 records)
  Saved: sf_cases.csv (1107 records)
  Saved: sf_tasks.csv (1170 records)

Saving to JSON
  Saved: sf_contacts.json (519 records)
  Saved: sf_cases.json (1107 records)
  Saved: sf_tasks.json (1170 records)

EXTRACTION COMPLETE

Files saved to: c:\Users\SulaimanAhmed\Desktop\portfolio\Banking project\banking-churn-databricks\notebooks\exploration\..\..\data\raw

  sf_contacts.csv : 519 records
  sf_cases.csv    : 1107 records
  sf_tasks.csv    : 1170 records


---
## Summary

This notebook provides the complete Salesforce CRM data pipeline:

1. **Configuration** - Set up credentials and parameters
2. **API Client** - Connect using simple-salesforce
3. **Exploration** - Survey existing data
4. **Ingestion** - Create Contacts, Cases, Tasks
5. **Extraction** - Export to CSV/JSON

### Churn Signal Implementation

| Segment | Cases | Rationale |
|---------|-------|----------|
| Active | 0-2 | Happy customers have few issues |
| At-Risk | 2-5 | Growing frustration signals churn risk |
| Churned | 3-8 | High complaints before leaving |

### Entity Resolution

Customers are linked via **email**:
```
ERPNext.email_id = Salesforce.Contact.Email
```

### Files Generated

```
data/raw/
├── sf_contacts.csv    # Customer contacts
├── sf_contacts.json
├── sf_cases.csv       # Support tickets (churn signal!)
├── sf_cases.json
├── sf_tasks.csv       # Customer interactions
└── sf_tasks.json
```

### Next Steps

1. Run Supabase digital channel data generator
2. Run Google Sheets legacy data generator
3. Build Bronze layer in dbt

---
*Created for Banking Customer Churn Prediction POC*