In [None]:
import requests
import json
import csv
import re
from typing import Optional, Dict, Any, List
from pathlib import Path

import phonenumbers
from phonenumbers import geocoder

from dotenv import load_dotenv
import os

# Load environment variables from .env file
load_dotenv()

In [None]:
# Pipedrive credentials, read from the environment (load_dotenv() has already run in the imports cell).
# os.getenv returns None when a variable is unset; fetch_person_page raises RuntimeError in that case.
PIPEDRIVE_COMPANY_DOMAIN = os.getenv("PIPEDRIVE_COMPANY_DOMAIN")
PIPEDRIVE_API_TOKEN = os.getenv("PIPEDRIVE_API_TOKEN")

# Page size sent as the `limit` query parameter on every persons request.
LIMIT = 500

DEFAULT_REGION = "US"   # used only if a phone has no +country code, change if needed

# NOTE(review): OUTPUT_JSON is not referenced by any cell visible here — confirm it is still needed.
OUTPUT_JSON = Path("persons_page_1.json")
# Destination of the final CSV (columns: email, phone, phone_country).
OUTPUT_CSV = Path("pipedrive_contacts_all.csv")

In [36]:
# Deliberately loose shape check: "<something>@<something>.<something>",
# with no whitespace and no extra '@' in any of the three parts.
EMAIL_REGEX = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")


def is_valid_email(value: str) -> bool:
    """Return True when ``value`` looks like a plausible e-mail address.

    The value is stripped of surrounding whitespace first. It must then be
    between 5 and 254 characters long (inclusive) and match ``EMAIL_REGEX``.
    Falsy input (``None``, ``""``) is reported as invalid.
    """
    candidate = value.strip() if value else ""
    return 5 <= len(candidate) <= 254 and EMAIL_REGEX.match(candidate) is not None

def pick_email(person: Dict[str, Any]) -> Optional[str]:
    """Return the best e-mail address for a person record, or None.

    ``person['emails']`` is expected to be a list of dicts carrying a string
    ``value`` and an optional truthy ``primary`` flag. Entries that are not
    dicts, whose value is not a string, or whose stripped value fails
    ``is_valid_email`` are ignored.

    The first valid entry flagged as primary wins. If no such entry exists,
    the first valid non-primary entry is returned. The returned address is
    the stripped value.
    """
    entries = person.get("emails") or []
    if not isinstance(entries, list):
        return None

    fallback: Optional[str] = None
    for entry in entries:
        if not isinstance(entry, dict):
            continue
        address = entry.get("value")
        if not isinstance(address, str):
            continue
        address = address.strip()
        if not address or not is_valid_email(address):
            continue

        # A valid primary address ends the search immediately.
        if entry.get("primary"):
            return address
        # Otherwise remember only the earliest valid address.
        if fallback is None:
            fallback = address

    return fallback

In [37]:
def normalize_phone(raw: str) -> Optional[str]:
    """
    Clean obvious junk in phone numbers.

    The result is an optional single leading '+' followed by ASCII digits
    only. A '+' is kept only when it appears before the first digit, so
    repeated or embedded plus signs are dropped. Decimal digits from other
    scripts (e.g. full-width or Arabic-Indic) are converted to ASCII, and
    digit-like symbols that are not decimal digits (e.g. superscripts) are
    discarded together with all other punctuation and letters.

    Returns None when the input is empty or contains no digits at all
    (for example a lone '+').

    Examples:
      '++918521225200' -> '+918521225200'
      ' +91 8521-225-200 ' -> '+918521225200'
      '+' -> None
    """
    if not raw:
        return None

    has_plus = False
    digits = []
    for ch in str(raw).strip():
        if ch.isdecimal():
            # int() understands every Unicode decimal digit; str() re-emits it as ASCII.
            digits.append(str(int(ch)))
        elif ch == "+" and not digits:
            # Only a '+' that precedes the first digit marks international format.
            has_plus = True

    # A number without digits is not a number, even if a '+' was seen.
    if not digits:
        return None

    return ("+" if has_plus else "") + "".join(digits)


def detect_country_full_name(phone: str, default_region: str = DEFAULT_REGION) -> Optional[str]:
    """Return the English country name for a phone number, or None.

    Args:
        phone: Phone number string. A leading '+' means the number carries
            its own country code and is parsed without a region hint.
        default_region: Region code used to interpret numbers that do not
            start with '+'.

    Returns:
        The country name reported by the phonenumbers geocoder, or None when
        the input is empty, cannot be parsed, is not a valid number, or the
        geocoder has no name for it.
    """
    if not phone:
        return None

    try:
        # International-format numbers must not be forced into a region.
        region = None if phone.startswith("+") else default_region
        num = phonenumbers.parse(phone, region)
    except phonenumbers.NumberParseException:
        return None

    if not phonenumbers.is_valid_number(num):
        return None

    # country_name_for_number yields the territory name; description_for_number
    # may instead yield a more specific location (city/region) for some numbers,
    # which is not what a "country" column should contain.
    country_name = geocoder.country_name_for_number(num, "en")
    if not country_name:
        # Keep the previous result as a fallback so numbers that were accepted
        # before are not rejected just because no territory name is available.
        country_name = geocoder.description_for_number(num, "en")
    return country_name or None


def pick_phone(person: Dict[str, Any]) -> Optional[str]:
    """
    Return the best phone number for a person record, or None.

    ``person['phones']`` is expected to be a list of dicts carrying a string
    ``value`` and an optional truthy ``primary`` flag. Each value is cleaned
    with ``normalize_phone``; it is then kept only if it has at least five
    characters besides '+' and ``detect_country_full_name`` returns a name
    for it.

    The first acceptable entry flagged as primary wins. If no such entry
    exists, the first acceptable non-primary entry is returned. The returned
    number is the normalized form.
    """
    entries = person.get("phones") or []
    if not isinstance(entries, list):
        return None

    fallback: Optional[str] = None
    for entry in entries:
        if not isinstance(entry, dict):
            continue
        value = entry.get("value")
        if not isinstance(value, str):
            continue

        number = normalize_phone(value)
        if not number:
            continue

        # Too short to be a real subscriber number; skip before the costlier lookup.
        digit_count = len(number.replace("+", ""))
        if digit_count < 5:
            continue

        # Only numbers the geocoder can attribute to a country are accepted.
        if not detect_country_full_name(number):
            continue

        if entry.get("primary"):
            return number
        if fallback is None:
            fallback = number

    return fallback

In [38]:
def fetch_person_page(cursor: Optional[str] = None, limit: int = LIMIT) -> Dict[str, Any]:
    """Fetch one page from the Pipedrive v2 persons endpoint.

    Args:
        cursor: Pagination cursor from a previous response; omitted from the
            request when falsy.
        limit: Value sent as the ``limit`` query parameter.

    Returns:
        The decoded JSON body of the response.

    Raises:
        RuntimeError: If the company domain or API token is not configured.
        requests.HTTPError: If the API answers with an error status; the
            status code and body are printed before re-raising.
    """
    if not (PIPEDRIVE_COMPANY_DOMAIN and PIPEDRIVE_API_TOKEN):
        raise RuntimeError("Please set PIPEDRIVE_COMPANY_DOMAIN and PIPEDRIVE_API_TOKEN in the config section.")

    query: Dict[str, Any] = {"limit": limit}
    if cursor:
        query["cursor"] = cursor

    print(f"Requesting up to {limit} persons. cursor={cursor}")
    response = requests.get(
        f"https://{PIPEDRIVE_COMPANY_DOMAIN}.pipedrive.com/api/v2/persons",
        headers={"Accept": "application/json", "x-api-token": PIPEDRIVE_API_TOKEN},
        params=query,
        timeout=60,
    )

    try:
        response.raise_for_status()
    except requests.HTTPError as err:
        # Surface the API's own explanation before propagating the failure.
        print("Error from Pipedrive API")
        print("Status code:", response.status_code)
        print("Body:", response.text)
        raise err

    return response.json()


In [39]:
# Accumulates one dict per kept contact; written to CSV after the loop.
all_rows: List[Dict[str, str]] = []

# Pagination state: `cursor` is None for the first request, then whatever
# the previous response reported as next_cursor. `page` is only used in logs.
cursor = None
page = 1

# Running counters reported in the per-page log line and the final summary.
total_raw = 0
total_kept = 0
total_skipped_missing = 0

# Fetch pages until the API returns no persons or no next_cursor.
while True:
    payload = fetch_person_page(cursor=cursor, limit=LIMIT)
    persons = payload.get("data") or []
    additional = payload.get("additional_data") or {}
    # Read the cursor for the NEXT request now; it is also printed in the page log below.
    cursor = additional.get("next_cursor")

    if not persons:
        print("No more persons in response. stopping.")
        break

    total_raw += len(persons)

    for p in persons:
        email = pick_email(p)
        phone = pick_phone(p)

        # Drop people with missing email or phone.
        # pick_email / pick_phone also return None when no entry passes validation,
        # so "missing" here includes "present but invalid".
        if not email or not phone:
            total_skipped_missing += 1
            continue

        # pick_phone already ran this lookup to validate the number; it is
        # repeated here to obtain the name itself for the output row.
        country_name = detect_country_full_name(phone) or ""

        all_rows.append({
            "email": email,
            "phone": phone,
            "phone_country": country_name
        })
        total_kept += 1

    print(
        f"Processed page {page}. "
        f"persons in page: {len(persons)}. "
        f"kept so far: {total_kept}. "
        f"skipped (missing email/phone) so far: {total_skipped_missing}. "
        f"next_cursor: {cursor}"
    )

    page += 1

    # A falsy next_cursor means the page just processed was the last one.
    if not cursor:
        print("No next_cursor. reached final page.")
        break

# Write final CSV
# newline="" lets the csv module control line endings itself.
with OUTPUT_CSV.open("w", encoding="utf-8", newline="") as f:
    writer = csv.DictWriter(f, fieldnames=["email", "phone", "phone_country"])
    writer.writeheader()
    writer.writerows(all_rows)

# Final summary of the counters accumulated while paging.
print("\n=== DONE ===")
print(f"Total persons returned by API: {total_raw}")
print(f"Total kept (both email and phone present): {total_kept}")
print(f"Total dropped (missing email or phone): {total_skipped_missing}")
print(f"CSV written to: {OUTPUT_CSV.resolve()}")

# Quick preview of first 5 rows
# NOTE(review): this prints real e-mail addresses and phone numbers into the
# saved notebook output — clear the output (or drop the preview) before sharing.
for row in all_rows[:5]:
    print(row)

Requesting up to 500 persons. cursor=None
Processed page 1. persons in page: 500. kept so far: 443. skipped (missing email/phone) so far: 57. next_cursor: eyJmaWVsZCI6ImlkIiwiZmllbGRWYWx1ZSI6NTQ5LCJzb3J0RGlyZWN0aW9uIjoiYXNjIiwiaWQiOjU0OX0
Requesting up to 500 persons. cursor=eyJmaWVsZCI6ImlkIiwiZmllbGRWYWx1ZSI6NTQ5LCJzb3J0RGlyZWN0aW9uIjoiYXNjIiwiaWQiOjU0OX0
Processed page 2. persons in page: 500. kept so far: 883. skipped (missing email/phone) so far: 117. next_cursor: eyJmaWVsZCI6ImlkIiwiZmllbGRWYWx1ZSI6MTA1MCwic29ydERpcmVjdGlvbiI6ImFzYyIsImlkIjoxMDUwfQ
Requesting up to 500 persons. cursor=eyJmaWVsZCI6ImlkIiwiZmllbGRWYWx1ZSI6MTA1MCwic29ydERpcmVjdGlvbiI6ImFzYyIsImlkIjoxMDUwfQ
Processed page 3. persons in page: 500. kept so far: 1334. skipped (missing email/phone) so far: 166. next_cursor: eyJmaWVsZCI6ImlkIiwiZmllbGRWYWx1ZSI6MTU1MCwic29ydERpcmVjdGlvbiI6ImFzYyIsImlkIjoxNTUwfQ
Requesting up to 500 persons. cursor=eyJmaWVsZCI6ImlkIiwiZmllbGRWYWx1ZSI6MTU1MCwic29ydERpcmVjdGlvbiI6ImFzYyIsImlk