# PST to Deduplicated Contacts Excel (Windows, Python 3.10+)

## How to use
1. In the **Configuration** cell, set `PST_PATH` to your local `.pst` file (required).
2. Optionally set `OUTPUT_XLSX_PATH` (default: `./pst_contacts_output.xlsx`).
3. Choose parser mode:
   - Preferred: `PARSER_MODE = "libpff"` (does not require Outlook installed, depends on `pypff`).
   - Fallback: `PARSER_MODE = "com"` (Windows Outlook COM; requires Outlook + `pywin32`).
4. Run all cells top-to-bottom.
5. The output Excel file is written to the configured path, with a fixed five-column schema and deduplicated rows.

## Notes
- Processing is streaming/incremental and uses a temporary SQLite DB in the working folder for scale (~5GB PST).
- Email folders processed (minimum): Inbox, Sent Items, Deleted Items.
- Contacts are also scanned; each contact's date is taken from the latest email date recorded for the same email address, when one is available.


In [None]:
%pip install --quiet pandas openpyxl tqdm pywin32

In [None]:
# Optional dependency for Outlook-free PST parsing (preferred mode).
# We attempt common package names used for Python bindings to libpff.
# If none install, set PARSER_MODE='com' to use Outlook COM fallback.
import subprocess
import sys

candidates = ['pypff-python', 'libpff-python']
installed = False
for pkg in candidates:
    print(f'Trying optional install: {pkg}')
    try:
        subprocess.check_call([sys.executable, '-m', 'pip', 'install', '--quiet', pkg])
    except (subprocess.CalledProcessError, OSError) as exc:
        # Installation is best-effort, but say why it failed instead of hiding it.
        print(f'Optional install failed for {pkg}: {exc}')
        continue
    installed = True
    print(f'Installed optional package: {pkg}')
    break

if not installed:
    print("Optional libpff binding not installed. Use PARSER_MODE='com' if import fails.")


In [None]:
from __future__ import annotations

import os
import re
import sqlite3
from dataclasses import dataclass
from datetime import datetime
from email.utils import getaddresses
from pathlib import Path
from typing import Dict, Generator, Iterable, List, Optional

import pandas as pd
from tqdm.auto import tqdm

# =========================
# Configuration
# =========================
# NOTE: in a raw string the backslashes are written singly; doubling them
# inside r"..." would put literal double backslashes into the path.
PST_PATH = r"C:\path\to\your\file.pst"
OUTPUT_XLSX_PATH = r"./pst_contacts_output.xlsx"
TEMP_SQLITE_PATH = r"./pst_contacts_work.db"

# Preferred: 'libpff' (no Outlook required). Fallback: 'com' (Outlook COM).
PARSER_MODE = "libpff"  # change to "com" if needed

# Email folders to process at minimum (compared against lower-cased folder names)
TARGET_EMAIL_FOLDERS = {"inbox", "sent items", "deleted items"}

# Commit cadence for sqlite
SQLITE_COMMIT_EVERY = 1000

# Explicit check instead of `assert`, which is skipped when Python runs with -O.
if not (PST_PATH and PST_PATH.lower().endswith(".pst")):
    raise ValueError("Set PST_PATH to a valid .pst path")
print(f"Configured PST: {PST_PATH}")
print(f"Output Excel: {OUTPUT_XLSX_PATH}")
print(f"SQLite working DB: {TEMP_SQLITE_PATH}")


In [None]:
@dataclass
class IdentityRecord:
    """One identity observation (person/address) fed into the deduper.

    Every field is a plain string that defaults to the empty string.
    """

    name: str = ""
    email: str = ""
    mobile: str = ""
    org: str = ""
    date_iso: str = ""  # YYYY-MM-DD
    date_ddmmyy: str = ""  # DDMMYY
    source: str = ""  # 'email' or 'contact'


def norm_email(v: Optional[str]) -> str:
    """Return *v* trimmed and lower-cased, without one leading ``mailto:``.

    ``None`` and the empty string both produce ``""``.
    """
    if v:
        cleaned = v.strip().lower()
        return re.sub(r"^mailto:", "", cleaned)
    return ""


def norm_name(v: Optional[str]) -> str:
    """Return *v* lower-cased with whitespace runs collapsed to single spaces.

    Leading/trailing whitespace is removed; falsy input produces ``""``.
    """
    if v:
        collapsed = re.sub(r"\s+", " ", v)
        return collapsed.strip().lower()
    return ""


def norm_mobile(v: Optional[str]) -> str:
    """Return *v* reduced to ASCII digits and ``+`` signs.

    Every other character is dropped; falsy input produces ``""``.
    """
    if v:
        non_dialable = re.compile(r"[^0-9+]")
        return non_dialable.sub("", v)
    return ""


def parse_dt(obj) -> Optional[datetime]:
    """Coerce *obj* to a ``datetime`` when possible, otherwise return ``None``.

    ``datetime`` instances are returned unchanged. Strings are stripped and
    tried against a fixed list of ``strptime`` formats, in order, and finally
    against ``datetime.fromisoformat``. Any other type yields ``None``.
    """
    if isinstance(obj, datetime):
        return obj
    if not isinstance(obj, str):
        return None

    text = obj.strip()
    known_formats = ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d", "%d/%m/%Y %H:%M:%S", "%m/%d/%Y %H:%M:%S")
    # Explicit formats are attempted first (day-first before month-first),
    # with the ISO parser as the last resort.
    attempts = [lambda s, f=fmt: datetime.strptime(s, f) for fmt in known_formats]
    attempts.append(datetime.fromisoformat)
    for attempt in attempts:
        try:
            return attempt(text)
        except ValueError:
            pass
    return None


def dt_to_iso_and_ddmmyy(dt: Optional[datetime]) -> tuple[str, str]:
    """Format *dt* as a ``(YYYY-MM-DD, DDMMYY)`` pair.

    A falsy *dt* yields a pair of empty strings.
    """
    if dt:
        iso_text = dt.strftime("%Y-%m-%d")
        short_text = dt.strftime("%d%m%y")
        return iso_text, short_text
    return "", ""


def better(a: str, b: str) -> str:
    """Pick the value to keep when merging an existing field with a new one.

    The existing value *a* wins (returned unmodified) whenever it contains
    non-whitespace text; otherwise the stripped new value *b* is returned.
    """
    existing_has_text = bool((a or "").strip())
    if existing_has_text:
        return a
    return (b or "").strip()


def best_display_name(*parts: str) -> str:
    """Return the first candidate that has non-whitespace text, stripped.

    Candidates are checked in argument order; ``""`` is returned when none
    qualifies.
    """
    usable = (str(candidate).strip() for candidate in parts if candidate and str(candidate).strip())
    return next(usable, "")


In [None]:
class SQLiteDeduper:
    """SQLite-backed store that merges identity records by a dedupe key.

    Two tables are maintained:

    * ``identities`` - one row per dedupe key, holding the merged fields.
    * ``email_latest`` - the latest date seen for each normalized email.

    The instance can be used as a context manager; leaving the ``with`` block
    commits pending work and closes the connection. ``close()`` may be called
    more than once.
    """

    def __init__(self, db_path: str):
        """Open (or create) the database at *db_path* and prepare the schema."""
        self.db_path = db_path
        self.conn = sqlite3.connect(db_path)
        self._closed = False
        self.conn.execute("PRAGMA journal_mode=WAL;")
        self.conn.execute("PRAGMA synchronous=NORMAL;")
        self.conn.execute("PRAGMA temp_store=MEMORY;")
        self._init_schema()
        self.insert_count = 0
        self.merge_count = 0
        self.identity_input_count = 0
        # Continue after any 'unique::N' keys already stored so that reopening
        # an existing database never reuses a key and merges unrelated rows.
        self._unique_counter = self._load_unique_counter()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()

    def _init_schema(self):
        """Create the working tables if they do not exist yet."""
        self.conn.executescript("""
        CREATE TABLE IF NOT EXISTS identities (
            dedupe_key TEXT PRIMARY KEY,
            name TEXT,
            email TEXT,
            mobile TEXT,
            org TEXT,
            date_iso TEXT,
            date_ddmmyy TEXT,
            source TEXT
        );

        CREATE TABLE IF NOT EXISTS email_latest (
            email_norm TEXT PRIMARY KEY,
            date_iso TEXT,
            date_ddmmyy TEXT
        );
        """)
        self.conn.commit()

    def _load_unique_counter(self) -> int:
        """Return the highest ``N`` among stored ``unique::N`` keys (0 if none)."""
        # 'unique::' is 8 characters long, so the number starts at position 9.
        row = self.conn.execute(
            "SELECT COALESCE(MAX(CAST(substr(dedupe_key, 9) AS INTEGER)), 0) "
            "FROM identities WHERE dedupe_key LIKE 'unique::%'"
        ).fetchone()
        return int(row[0]) if row else 0

    def _dedupe_key(self, rec: "IdentityRecord") -> str:
        """Build the merge key for *rec*.

        Priority: normalized email, then normalized name + mobile. A record
        with neither gets a fresh ``unique::N`` key and is never merged.
        """
        e = norm_email(rec.email)
        if e:
            return f"email::{e}"
        n = norm_name(rec.name)
        m = norm_mobile(rec.mobile)
        if n and m:
            return f"nm::{n}::{m}"
        self._unique_counter += 1
        return f"unique::{self._unique_counter}"

    def _max_date(self, old_iso: str, old_dd: str, new_iso: str, new_dd: str) -> tuple[str, str]:
        """Return the ``(iso, ddmmyy)`` pair with the later ISO date.

        An empty ISO date loses to a non-empty one; on a tie the old pair is
        kept. ISO ``YYYY-MM-DD`` strings order correctly as plain text.
        """
        if not old_iso:
            return new_iso, new_dd
        if not new_iso:
            return old_iso, old_dd
        return (new_iso, new_dd) if new_iso > old_iso else (old_iso, old_dd)

    def upsert_identity(self, rec: "IdentityRecord"):
        """Insert *rec*, or merge it into the row that shares its dedupe key.

        When merging, existing non-empty fields are kept and the later date
        wins. The per-email latest date is also updated. A commit is issued
        every ``SQLITE_COMMIT_EVERY`` calls.
        """
        self.identity_input_count += 1
        key = self._dedupe_key(rec)
        cur = self.conn.execute("SELECT name,email,mobile,org,date_iso,date_ddmmyy,source FROM identities WHERE dedupe_key=?", (key,))
        row = cur.fetchone()
        if row is None:
            self.conn.execute("""
                INSERT INTO identities (dedupe_key,name,email,mobile,org,date_iso,date_ddmmyy,source)
                VALUES (?,?,?,?,?,?,?,?)
            """, (key, rec.name, norm_email(rec.email), rec.mobile, rec.org, rec.date_iso, rec.date_ddmmyy, rec.source))
            self.insert_count += 1
        else:
            old_name, old_email, old_mobile, old_org, old_iso, old_dd, old_source = row
            date_iso, date_dd = self._max_date(old_iso or "", old_dd or "", rec.date_iso, rec.date_ddmmyy)
            self.conn.execute("""
                UPDATE identities
                SET name=?, email=?, mobile=?, org=?, date_iso=?, date_ddmmyy=?, source=?
                WHERE dedupe_key=?
            """, (
                better(old_name or "", rec.name),
                better(old_email or "", norm_email(rec.email)),
                better(old_mobile or "", rec.mobile),
                better(old_org or "", rec.org),
                date_iso,
                date_dd,
                better(old_source or "", rec.source),
                key
            ))
            self.merge_count += 1

        # Track the latest dated sighting per email; latest_email_date() reads it.
        e = norm_email(rec.email)
        if e and rec.date_iso:
            cur = self.conn.execute("SELECT date_iso,date_ddmmyy FROM email_latest WHERE email_norm=?", (e,))
            old = cur.fetchone()
            if not old:
                self.conn.execute("INSERT INTO email_latest (email_norm,date_iso,date_ddmmyy) VALUES (?,?,?)", (e, rec.date_iso, rec.date_ddmmyy))
            else:
                old_iso, old_dd = old
                m_iso, m_dd = self._max_date(old_iso or "", old_dd or "", rec.date_iso, rec.date_ddmmyy)
                self.conn.execute("UPDATE email_latest SET date_iso=?,date_ddmmyy=? WHERE email_norm=?", (m_iso, m_dd, e))

        if self.identity_input_count % SQLITE_COMMIT_EVERY == 0:
            self.conn.commit()

    def latest_email_date(self, email: str) -> tuple[str, str]:
        """Return the latest ``(iso, ddmmyy)`` recorded for *email*, or ``("", "")``."""
        e = norm_email(email)
        if not e:
            return "", ""
        cur = self.conn.execute("SELECT date_iso,date_ddmmyy FROM email_latest WHERE email_norm=?", (e,))
        row = cur.fetchone()
        return (row[0] or "", row[1] or "") if row else ("", "")

    def finalize(self):
        """Commit any pending writes."""
        self.conn.commit()

    def to_dataframe(self) -> pd.DataFrame:
        """Return all identities as a DataFrame with the output column names.

        NULL values are replaced by empty strings.
        """
        df = pd.read_sql_query("SELECT name,email,mobile,org,date_ddmmyy FROM identities", self.conn)
        df = df.rename(columns={
            "name": "Name of the person",
            "email": "Email",
            "mobile": "Mobile number",
            "org": "Name of the organization",
            "date_ddmmyy": "Date of the mail from this data is extracted (DDMMYY format)",
        })
        cols = [
            "Name of the person",
            "Email",
            "Mobile number",
            "Name of the organization",
            "Date of the mail from this data is extracted (DDMMYY format)",
        ]
        # The SELECT names all five columns, so every entry of `cols` exists.
        return df[cols].fillna("")

    def close(self):
        """Commit and close the connection; later calls are no-ops."""
        if getattr(self, "_closed", False):
            return
        self._closed = True
        try:
            self.conn.commit()
        finally:
            self.conn.close()


In [None]:
def extract_identities_from_header_list(header_text: str) -> List[tuple[str, str]]:
    """Parse a recipient list into ``(display_name, normalized_email)`` pairs.

    The text is split on ``;`` first (a recipient separator that
    ``getaddresses`` does not treat as one) and each non-empty segment is
    parsed on its own, so one malformed segment cannot discard the rest.
    A parsed address without an ``@`` is not kept as an email; an entry with
    neither a name nor a valid email is dropped.

    NOTE(review): a ``;`` inside a quoted display name is also split here.

    Returns an empty list for empty input.
    """
    results: List[tuple[str, str]] = []
    if not header_text:
        return results

    for segment in str(header_text).split(";"):
        segment = segment.strip()
        if not segment:
            continue
        try:
            parsed = getaddresses([segment])
        except (TypeError, ValueError, IndexError):
            # Best effort: skip a segment the parser cannot handle.
            continue
        for name, email in parsed:
            n = (name or "").strip()
            e = norm_email(email)
            if "@" not in e:
                # A bare word (e.g. part of a display name) is not an address.
                e = ""
            if n or e:
                results.append((n, e))
    return results


def choose_contact_mobile(mobile: str, business: str, home: str) -> str:
    """Return the first non-blank phone number, stripped.

    Preference order is mobile, then business, then home; ``""`` is returned
    when all three are blank.
    """
    ranked = (mobile, business, home)
    usable = (str(number).strip() for number in ranked if number and str(number).strip())
    return next(usable, "")


def parse_transport_headers_for_recipients(headers: str) -> Dict[str, str]:
    """Extract the raw ``To``, ``Cc`` and ``Bcc`` values from a header block.

    Header names are matched case-insensitively. Folded headers are
    supported: a line that starts with a space or tab continues the
    preceding header. Values from repeated or folded headers are joined
    with single spaces.

    Returns a dict with exactly the keys ``"to"``, ``"cc"`` and ``"bcc"``;
    a header that is absent maps to ``""``.
    """
    collected: Dict[str, List[str]] = {"to": [], "cc": [], "bcc": []}
    if not headers:
        return {key: "" for key in collected}

    current = None  # recipient header being continued, if any
    for line in headers.splitlines():
        if line[:1] in (" ", "\t"):
            # Continuation line: belongs to the header that precedes it.
            if current is not None:
                collected[current].append(line.strip())
            continue
        name, sep, value = line.partition(":")
        key = name.lower()
        if sep and key in collected:
            current = key
            collected[key].append(value.strip())
        else:
            current = None
    return {key: " ".join(parts).strip() for key, parts in collected.items()}


In [None]:
class PSTReaderBase:
    """Common interface of the PST readers; subclasses supply the iterators."""

    def iter_emails(self) -> Generator[dict, None, None]:
        """Yield one dict per email item (must be overridden)."""
        raise NotImplementedError

    def iter_contacts(self) -> Generator[dict, None, None]:
        """Yield one dict per contact item (must be overridden)."""
        raise NotImplementedError

    def estimate_totals(self) -> Dict[str, int]:
        """Return estimated totals for progress bars; 0 means unknown/unavailable."""
        counter_names = ("email_items", "contact_items", "email_folders", "contact_folders")
        return dict.fromkeys(counter_names, 0)


class LibPffReader(PSTReaderBase):
    """PST reader built on the ``pypff`` bindings (no Outlook required).

    Folders are selected by lower-cased name: ``TARGET_EMAIL_FOLDERS`` for
    emails and ``CONTACT_FOLDER_NAMES`` for contacts. Item attributes are
    read with ``getattr`` defaults, so a missing attribute yields an empty
    value rather than an error. Call ``close()`` to release the PST file.
    """

    # Lower-cased folder names that are scanned for contact items.
    CONTACT_FOLDER_NAMES = frozenset({"contacts", "address book", "people"})

    def __init__(self, pst_path: str):
        """Open *pst_path* with pypff and locate the root folder."""
        import pypff  # type: ignore

        self.pypff = pypff
        self.file = pypff.file()
        self.file.open(pst_path)
        self.root = self.file.get_root_folder()

    def close(self) -> None:
        """Release the PST file handle (best effort)."""
        try:
            self.file.close()
        except Exception:
            # Nothing useful can be done if the handle is already gone.
            pass

    @staticmethod
    def _folder_name(folder) -> str:
        """Return the folder's name, stripped and lower-cased ('' if missing)."""
        return (getattr(folder, "name", "") or "").strip().lower()

    @staticmethod
    def _message_count(folder) -> int:
        """Return the folder's message count, treating missing/None as 0."""
        return int(getattr(folder, "number_of_sub_messages", 0) or 0)

    def _walk_folders(self, folder):
        """Yield *folder* and all of its descendants, depth first.

        A sub-folder that cannot be read is skipped.
        """
        yield folder
        for i in range(folder.number_of_sub_folders):
            try:
                sub = folder.get_sub_folder(i)
                if sub:
                    yield from self._walk_folders(sub)
            except Exception:
                continue

    def estimate_totals(self) -> Dict[str, int]:
        """Count matching folders and their messages for progress reporting."""
        totals = {"email_items": 0, "contact_items": 0, "email_folders": 0, "contact_folders": 0}
        for folder in self._walk_folders(self.root):
            fname = self._folder_name(folder)
            msg_count = self._message_count(folder)
            if fname in TARGET_EMAIL_FOLDERS:
                totals["email_folders"] += 1
                totals["email_items"] += msg_count
            if fname in self.CONTACT_FOLDER_NAMES:
                totals["contact_folders"] += 1
                totals["contact_items"] += msg_count
        return totals

    def iter_emails(self) -> Generator[dict, None, None]:
        """Yield one dict per message in the target email folders.

        Keys: ``from_name``, ``from_email``, ``to``, ``cc``, ``bcc``,
        ``headers`` and ``date`` (submit time, else delivery time). A message
        that raises while being read is skipped.
        """
        for folder in self._walk_folders(self.root):
            if self._folder_name(folder) not in TARGET_EMAIL_FOLDERS:
                continue
            for i in range(self._message_count(folder)):
                try:
                    m = folder.get_sub_message(i)
                    if not m:
                        continue
                    sent_dt = parse_dt(getattr(m, "client_submit_time", None))
                    recv_dt = parse_dt(getattr(m, "delivery_time", None))
                    dt = sent_dt or recv_dt
                    yield {
                        "from_name": getattr(m, "sender_name", "") or "",
                        "from_email": getattr(m, "sender_email_address", "") or "",
                        "to": getattr(m, "display_to", "") or "",
                        "cc": getattr(m, "display_cc", "") or "",
                        "bcc": getattr(m, "display_bcc", "") or "",
                        "headers": getattr(m, "transport_headers", "") or "",
                        "date": dt,
                    }
                except Exception:
                    continue

    def iter_contacts(self) -> Generator[dict, None, None]:
        """Yield one dict per item in the contact folders.

        Keys: ``name``, ``emails`` (non-blank addresses), ``mobile``,
        ``business``, ``home`` and ``org``. An item that raises while being
        read is skipped.
        """
        for folder in self._walk_folders(self.root):
            if self._folder_name(folder) not in self.CONTACT_FOLDER_NAMES:
                continue
            for i in range(self._message_count(folder)):
                try:
                    c = folder.get_sub_message(i)
                    if not c:
                        continue
                    emails = [
                        getattr(c, "email_address_1", "") or "",
                        getattr(c, "email_address_2", "") or "",
                        getattr(c, "email_address_3", "") or "",
                    ]
                    yield {
                        "name": best_display_name(
                            getattr(c, "display_name", ""),
                            getattr(c, "subject", ""),
                            getattr(c, "given_name", ""),
                        ),
                        "emails": [e for e in emails if str(e).strip()],
                        "mobile": getattr(c, "mobile_telephone_number", "") or "",
                        "business": getattr(c, "business_telephone_number", "") or "",
                        "home": getattr(c, "home_telephone_number", "") or "",
                        "org": getattr(c, "company_name", "") or "",
                    }
                except Exception:
                    continue


class OutlookCOMReader(PSTReaderBase):
    """PST reader that drives Outlook through COM (requires Outlook + pywin32).

    The PST is attached to the MAPI session with ``AddStore`` and then looked
    up by its file path. ``close()`` detaches it again, but only when this
    reader was the one that attached it.
    """

    # Lower-cased folder names that are scanned for contact items.
    CONTACT_FOLDER_NAMES = frozenset({"contacts", "address book", "people"})

    def __init__(self, pst_path: str):
        """Attach *pst_path* to the Outlook session and locate its root folder."""
        import win32com.client  # type: ignore

        self.outlook = win32com.client.Dispatch("Outlook.Application").GetNamespace("MAPI")
        pst_file = Path(pst_path).resolve()
        stores_before = self.outlook.Stores.Count
        self.outlook.AddStore(str(pst_file))
        # If the count did not grow, the PST was already attached by the user
        # and must not be detached by close().
        self._store_added = self.outlook.Stores.Count > stores_before
        self.store = self._find_store(pst_file)
        self.root = self.store.GetRootFolder()

    def _find_store(self, pst_file: Path):
        """Return the store whose ``FilePath`` is *pst_file*.

        Falls back to the last store in the collection when no path matches.
        """
        stores = self.outlook.Stores
        count = stores.Count
        for i in range(1, count + 1):
            try:
                store = stores.Item(i)
                file_path = str(getattr(store, "FilePath", "") or "")
            except Exception:
                continue
            if file_path and Path(file_path) == pst_file:
                return store
        return stores.Item(count)

    def close(self) -> None:
        """Detach the PST from the Outlook session if this reader attached it."""
        if not getattr(self, "_store_added", False):
            return
        self._store_added = False
        try:
            self.outlook.RemoveStore(self.root)
        except Exception:
            # Best effort: the store may already have been removed.
            pass

    def _walk_folders(self, folder):
        """Yield *folder* and all of its descendants, depth first.

        COM collections are 1-based; an unreadable sub-folder is skipped.
        """
        yield folder
        for i in range(1, folder.Folders.Count + 1):
            try:
                yield from self._walk_folders(folder.Folders.Item(i))
            except Exception:
                continue

    def estimate_totals(self) -> Dict[str, int]:
        """Count matching folders and their items for progress reporting."""
        totals = {"email_items": 0, "contact_items": 0, "email_folders": 0, "contact_folders": 0}
        for folder in self._walk_folders(self.root):
            fname = (getattr(folder, "Name", "") or "").strip().lower()
            try:
                msg_count = int(getattr(folder.Items, "Count", 0) or 0)
            except Exception:
                msg_count = 0
            if fname in TARGET_EMAIL_FOLDERS:
                totals["email_folders"] += 1
                totals["email_items"] += msg_count
            if fname in self.CONTACT_FOLDER_NAMES:
                totals["contact_folders"] += 1
                totals["contact_items"] += msg_count
        return totals

    def iter_emails(self) -> Generator[dict, None, None]:
        """Yield one dict per mail item (``Class == 43``) in the target folders.

        Keys: ``from_name``, ``from_email``, ``to``, ``cc``, ``bcc``,
        ``headers`` (always ``""`` in this reader) and ``date`` (``SentOn``,
        else ``ReceivedTime``). Unreadable items and folders are skipped.
        """
        for folder in self._walk_folders(self.root):
            fname = (getattr(folder, "Name", "") or "").strip().lower()
            if fname not in TARGET_EMAIL_FOLDERS:
                continue
            try:
                items = folder.Items
                for item in items:
                    try:
                        if int(getattr(item, "Class", 0)) != 43:  # olMail
                            continue
                        sent_dt = parse_dt(getattr(item, "SentOn", None))
                        recv_dt = parse_dt(getattr(item, "ReceivedTime", None))
                        dt = sent_dt or recv_dt
                        yield {
                            "from_name": getattr(item, "SenderName", "") or "",
                            "from_email": getattr(item, "SenderEmailAddress", "") or "",
                            "to": getattr(item, "To", "") or "",
                            "cc": getattr(item, "CC", "") or "",
                            "bcc": getattr(item, "BCC", "") or "",
                            "headers": "",
                            "date": dt,
                        }
                    except Exception:
                        continue
            except Exception:
                continue

    def iter_contacts(self) -> Generator[dict, None, None]:
        """Yield one dict per contact item (``Class == 40``) in contact folders.

        Keys: ``name``, ``emails`` (non-blank addresses), ``mobile``,
        ``business``, ``home`` and ``org``. Unreadable items and folders are
        skipped.
        """
        for folder in self._walk_folders(self.root):
            fname = (getattr(folder, "Name", "") or "").strip().lower()
            if fname not in self.CONTACT_FOLDER_NAMES:
                continue
            try:
                items = folder.Items
                for item in items:
                    try:
                        if int(getattr(item, "Class", 0)) != 40:  # olContact
                            continue
                        emails = [
                            getattr(item, "Email1Address", "") or "",
                            getattr(item, "Email2Address", "") or "",
                            getattr(item, "Email3Address", "") or "",
                        ]
                        yield {
                            "name": best_display_name(
                                getattr(item, "FullName", ""),
                                getattr(item, "FileAs", ""),
                                getattr(item, "CompanyName", ""),
                            ),
                            "emails": [e for e in emails if str(e).strip()],
                            "mobile": getattr(item, "MobileTelephoneNumber", "") or "",
                            "business": getattr(item, "BusinessTelephoneNumber", "") or "",
                            "home": getattr(item, "HomeTelephoneNumber", "") or "",
                            "org": getattr(item, "CompanyName", "") or "",
                        }
                    except Exception:
                        continue
            except Exception:
                continue



In [None]:
def get_reader(mode: str, pst_path: str) -> PSTReaderBase:
    """Create the PST reader selected by *mode* (``'libpff'`` or ``'com'``).

    *mode* is matched after trimming and lower-casing.

    Raises:
        RuntimeError: the libpff reader could not be initialized.
        ValueError: *mode* is neither ``'libpff'`` nor ``'com'``.
    """
    selected = (mode or "").strip().lower()
    if selected == "com":
        return OutlookCOMReader(pst_path)
    if selected != "libpff":
        raise ValueError("PARSER_MODE must be either 'libpff' or 'com'")
    try:
        return LibPffReader(pst_path)
    except Exception as init_error:
        raise RuntimeError(
            "Failed to initialize libpff parser. Install pypff-python or switch PARSER_MODE='com'."
        ) from init_error


def _remove_stale_sqlite_files(sqlite_path: str) -> None:
    """Delete an old working DB together with its ``-wal``/``-shm`` sidecars."""
    for suffix in ("", "-wal", "-shm"):
        stale = sqlite_path + suffix
        if os.path.exists(stale):
            os.remove(stale)


def _record_email_identities(deduper, em: dict, stats: dict) -> None:
    """Upsert the sender and all recipients of one email item."""
    msg_dt = parse_dt(em.get("date"))
    date_iso, date_dd = dt_to_iso_and_ddmmyy(msg_dt)

    def add(name: str, email: str) -> None:
        deduper.upsert_identity(IdentityRecord(
            name=name,
            email=email,
            mobile="",
            org="",
            date_iso=date_iso,
            date_ddmmyy=date_dd,
            source="email",
        ))
        stats["identities_extracted"] += 1

    # From
    f_name = (em.get("from_name") or "").strip()
    f_email = norm_email(em.get("from_email") or "")
    if f_name or f_email:
        add(f_name, f_email)

    # Recipients: the item's own fields and the transport headers are parsed
    # separately, so no extra separator is injected into the address lists.
    header_fields = parse_transport_headers_for_recipients(em.get("headers", ""))
    for key in ("to", "cc", "bcc"):
        for raw in (em.get(key, ""), header_fields.get(key, "")):
            for n, e in extract_identities_from_header_list(raw):
                add(n, e)


def _record_contact_identities(deduper, c: dict, stats: dict) -> None:
    """Upsert one contact item: one record per email, or one without email."""
    name = (c.get("name") or "").strip()
    org = (c.get("org") or "").strip()
    mobile = choose_contact_mobile(c.get("mobile", ""), c.get("business", ""), c.get("home", ""))
    emails = [e for e in (norm_email(x) for x in c.get("emails", [])) if e]

    if not emails:
        deduper.upsert_identity(IdentityRecord(
            name=name,
            email="",
            mobile=mobile,
            org=org,
            date_iso="",
            date_ddmmyy="",
            source="contact",
        ))
        stats["identities_extracted"] += 1
        return

    for e in emails:
        # A contact has no date of its own; use the latest email date seen
        # for the same address during the email pass.
        date_iso, date_dd = deduper.latest_email_date(e)
        deduper.upsert_identity(IdentityRecord(
            name=name,
            email=e,
            mobile=mobile,
            org=org,
            date_iso=date_iso,
            date_ddmmyy=date_dd,
            source="contact",
        ))
        stats["identities_extracted"] += 1


def process_pst_to_excel(pst_path: str, output_xlsx: str, sqlite_path: str, parser_mode: str = "libpff"):
    """Extract identities from a PST, deduplicate them and write an Excel file.

    Emails are scanned first, then contacts, so contacts can pick up the
    latest email date recorded for their address. An item that fails to
    process is skipped and counted in ``items_failed``.

    Args:
        pst_path: Path of the PST file to read.
        output_xlsx: Path of the Excel file to write.
        sqlite_path: Path of the working SQLite DB; any previous file at this
            path is deleted first.
        parser_mode: ``"libpff"`` or ``"com"``.

    Returns:
        A summary dict with estimated/scanned counts, merge count, final row
        count and the paths used.

    Raises:
        FileNotFoundError: *pst_path* does not exist.
    """
    if not os.path.exists(pst_path):
        raise FileNotFoundError(f"PST not found: {pst_path}")

    _remove_stale_sqlite_files(sqlite_path)

    deduper = SQLiteDeduper(sqlite_path)
    reader = None
    try:
        reader = get_reader(parser_mode, pst_path)

        estimated = reader.estimate_totals()
        print(
            "Estimated scope -> "
            f"email folders: {estimated.get('email_folders', 0)}, "
            f"email items: {estimated.get('email_items', 0)}, "
            f"contact folders: {estimated.get('contact_folders', 0)}, "
            f"contact items: {estimated.get('contact_items', 0)}"
        )

        stats = {
            "emails_scanned": 0,
            "contacts_scanned": 0,
            "identities_extracted": 0,
            "items_failed": 0,
        }

        # Pass 1: emails
        email_total = estimated.get("email_items", 0) or None
        with tqdm(total=email_total, desc="Scanning email items", unit="msg") as pbar_e:
            for em in reader.iter_emails():
                stats["emails_scanned"] += 1
                pbar_e.update(1)
                if stats["emails_scanned"] % 5000 == 0:
                    pbar_e.set_postfix({"processed": stats["emails_scanned"], "identities": stats["identities_extracted"]})
                try:
                    _record_email_identities(deduper, em, stats)
                except Exception:
                    # Best effort: one bad item must not stop the scan.
                    stats["items_failed"] += 1
        deduper.finalize()

        # Pass 2: contacts
        contact_total = estimated.get("contact_items", 0) or None
        with tqdm(total=contact_total, desc="Scanning contact items", unit="contact") as pbar_c:
            for c in reader.iter_contacts():
                stats["contacts_scanned"] += 1
                pbar_c.update(1)
                if stats["contacts_scanned"] % 1000 == 0:
                    pbar_c.set_postfix({"processed": stats["contacts_scanned"], "identities": stats["identities_extracted"]})
                try:
                    _record_contact_identities(deduper, c, stats)
                except Exception:
                    stats["items_failed"] += 1
        deduper.finalize()

        df = deduper.to_dataframe()
        df.to_excel(output_xlsx, index=False, engine="openpyxl")

        return {
            "estimated_email_items": estimated.get("email_items", 0),
            "estimated_contact_items": estimated.get("contact_items", 0),
            "emails_scanned": stats["emails_scanned"],
            "contacts_scanned": stats["contacts_scanned"],
            "identities_extracted": stats["identities_extracted"],
            "duplicates_merged": deduper.merge_count,
            "final_rows": len(df),
            "sqlite_db": sqlite_path,
            "output_xlsx": output_xlsx,
            "items_failed": stats["items_failed"],
        }
    finally:
        # Always release the DB connection and, when the reader offers a
        # close() method, the PST as well - including on error paths.
        try:
            deduper.close()
        finally:
            closer = getattr(reader, "close", None)
            if callable(closer):
                closer()



In [None]:
# Run the full pipeline with the values set in the Configuration section.
# `summary` is left as the final bare expression of this section.
summary = process_pst_to_excel(
    pst_path=PST_PATH,
    output_xlsx=OUTPUT_XLSX_PATH,
    sqlite_path=TEMP_SQLITE_PATH,
    parser_mode=PARSER_MODE,
)
summary

## Mobile number fallback behavior (implemented)
For contact items, mobile number selection follows:
1. `Mobile`
2. if missing, `Business`
3. if missing, `Home`

This preserves the required preference while maximizing non-empty values in deduplicated output.
