### Get HIBP Data 

1. For everypol
2. For all breaches
3. For all bihar pols

In [1]:
import logging
import openpyxl
import os
import time
import json
from typing import Dict, Set
import requests
import pandas as pd
from pathlib import Path

In [2]:
with open("hibp_key", "r") as key_file:
    hibp_api_key = key_file.read().strip()

payload = {}
headers = {
    'hibp-api-key': hibp_api_key,
    'format': 'application/json',
    'timeout': '2.5',
    'HIBP': hibp_api_key,
    'user-agent': 'PythonScript'
}

In [3]:
# Rate limit
rate_limit_interval = 6
last_request_time = time.time()

In [4]:
class HIBPProcessor:
    def __init__(self, output_folder: str, headers: Dict, payload: Dict, rate_limit_interval: float):
        self.output_folder = Path(output_folder)
        self.headers = headers
        self.payload = payload
        self.rate_limit_interval = rate_limit_interval
        self.processed_emails: Set[str] = set()
        self.last_request_time = 0
        
        # Ensure output directory exists
        self.output_folder.mkdir(parents=True, exist_ok=True)
        
        # Set up logging
        self._setup_logging()
        
    def _setup_logging(self):
        """Configure logging with both file and console handlers."""
        log_format = '%(asctime)s - %(levelname)s - %(message)s'
        logging.basicConfig(
            level=logging.INFO,
            format=log_format,
            handlers=[
                logging.FileHandler(self.output_folder / 'processing.log'),
                logging.StreamHandler()
            ]
        )
    
    def _load_processed_emails(self) -> Set[str]:
        """Load previously processed emails from existing files and logs."""
        processed = set()
        
        # Ensure output folder exists
        if not self.output_folder.exists() or not self.output_folder.is_dir():
            return processed
            
        # Process only JSON files and extract email from filename
        try:
            for file_path in self.output_folder.glob('*.json'):
                if file_path.is_file():  # Extra check to ensure it's a file
                    email = file_path.stem  # Get filename without extension
                    if '@' in email:  # Basic validation that it's an email
                        processed.add(email)
        except Exception as e:
            print(f"Error processing JSON files: {e}")
            
        return processed
    
    def _wait_for_rate_limit(self):
        """Enforce rate limiting between requests."""
        now = time.time()
        elapsed_time = now - self.last_request_time
        if elapsed_time < self.rate_limit_interval:
            time.sleep(self.rate_limit_interval - elapsed_time)
        self.last_request_time = time.time()
    
    def _query_hibp_api(self, email: str) -> tuple[bool, dict]:
        """
        Query the HIBP API for a single email.
        Returns (success, response_data) tuple.
        """
        url = f"https://haveibeenpwned.com/api/v3/breachedaccount/{email}"
        response = None
        
        try:
            self._wait_for_rate_limit()
            response = requests.get(url, headers=self.headers, data=self.payload)
            
            if response.status_code == 404:
                return True, {}
                
            if response.status_code == 429:
                retry_after = int(response.headers.get("Retry-After", 1))
                logging.warning(f"Rate limit hit. Waiting {retry_after} seconds")
                time.sleep(retry_after + 2)
                return False, {}
                
            response.raise_for_status()
            return True, response.json()
            
        except requests.exceptions.RequestException as e:
            status_code = getattr(response, 'status_code', 'N/A')
            logging.error(f"Request failed: {e} (Status Code: {status_code})")
            return False, {}
        except json.JSONDecodeError as e:
            logging.error(f"Invalid JSON response: {e}")
            return False, {}
        except Exception as e:
            logging.error(f"Unexpected error: {e}")
            return False, {}
    
    def process_dataframe(self, df: pd.DataFrame) -> Dict:
        """
        Process all emails in the DataFrame.
        Returns statistics about the processing run.
        """
        if 'email' not in df.columns:
            raise ValueError("DataFrame must contain an 'email' column")
            
        # Load previously processed emails
        self.processed_emails = self._load_processed_emails()
        
        total_emails = len(df)
        stats = {
            'total': total_emails,
            'processed': 0,
            'skipped': 0,
            'errors': 0
        }
        
        logging.info(f"Starting to process {total_emails} emails")
        
        for index, row in df.iterrows():
            email = row['email']
            log_prefix = f"[{index + 1}/{total_emails}] {email}"
            
            if email in self.processed_emails:
                logging.info(f"{log_prefix} - Skipping (previously processed)")
                stats['skipped'] += 1
                continue
                
            success, response_data = self._query_hibp_api(email)
            
            if not success:
                stats['errors'] += 1
                continue
                
            # Save response data (even if empty for no breaches)
            output_path = self.output_folder / f"{email}.json"
            try:
                with output_path.open('w') as f:
                    json.dump(response_data, f, indent=2)
                logging.info(f"{log_prefix} - Successfully processed and saved")
                self.processed_emails.add(email)
                stats['processed'] += 1
            except IOError as e:
                logging.error(f"{log_prefix} - Failed to save results: {e}")
                stats['errors'] += 1
        
        logging.info(f"""
        Processing completed:
        - Total emails: {stats['total']}
        - Processed: {stats['processed']}
        - Skipped: {stats['skipped']}
        - Errors: {stats['errors']}
        """)
        
        return stats

In [5]:
def clean_email_column(df, column_name="email"):
    """
    Cleans the specified email column in a DataFrame by:
    1. Stripping whitespace, converting to lowercase, and removing commas.
    2. Dropping rows where the email contains only a single letter or symbol.
    3. Dropping rows where the email is NaN.
    4. Drop dupes
    5. Strip 1. and 2. etc. at the start of email (problem with India emails)
    6. Valid email

    Args:
        df (pd.DataFrame): The DataFrame to clean.
        column_name (str): The column to process (default: "email").

    Returns:
        pd.DataFrame: Cleaned DataFrame (modification done safely).
    """
    if column_name in df.columns:
        df = df.copy()
        df[column_name] = df[column_name].str.strip().str.lower().str.replace(",", "", regex=True).str.replace(" ", "")
        df = df[~df[column_name].str.match(r"^[A-Za-z,_-]$", na=False)]
        df = df.dropna(subset=[column_name])
        df[column_name] = df[column_name].str.replace(r'^\d+\.', '', regex=True)
        df = df.drop_duplicates(subset=[column_name], keep="first")
        
        email_regex = r'^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$'

        df = df[df[column_name].str.match(email_regex, na=False)]

    return df

### Singapore

In [6]:
sg_df = pd.read_csv("../data/sg/sg_mp.csv")
output_folder = "../data/sg_hibp/"
processor = HIBPProcessor(output_folder, headers, payload, 10)
sg_df = clean_email_column(sg_df)
stats = processor.process_dataframe(sg_df)
print(f"Processing complete. Processed {stats['processed']} emails with {stats['errors']} errors.")

2025-02-02 11:05:49,956 - INFO - Starting to process 86 emails
2025-02-02 11:05:49,959 - INFO - [1/86] seah_kian_peng@parl.gov.sg - Skipping (previously processed)
2025-02-02 11:05:49,960 - INFO - [2/86] chloe_tan@parl.gov.sg - Skipping (previously processed)
2025-02-02 11:05:49,961 - INFO - [3/86] lawrence_wong@pmo.gov.sg - Skipping (previously processed)
2025-02-02 11:05:49,964 - INFO - [4/86] lee_hsien_loong@pmo.gov.sg - Skipping (previously processed)
2025-02-02 11:05:49,966 - INFO - [5/86] gan_kim_yong@mti.gov.sg - Skipping (previously processed)
2025-02-02 11:05:49,967 - INFO - [6/86] heng_swee_keat@pmo.gov.sg - Skipping (previously processed)
2025-02-02 11:05:49,968 - INFO - [7/86] teo_chee_hean@pmo.gov.sg - Skipping (previously processed)
2025-02-02 11:05:49,969 - INFO - [8/86] ng_eng_hen@mindef.gov.sg - Skipping (previously processed)
2025-02-02 11:05:49,970 - INFO - [9/86] v.bala@mfa.gov.sg - Skipping (previously processed)
2025-02-02 11:05:49,971 - INFO - [11/86] grace_fu@ms

Processing complete. Processed 0 emails with 0 errors.


### Everypol

In [None]:
output_folder = "../data/everypol/everypol_hibp/"
processor = HIBPProcessor(output_folder, headers, payload, 10)
everypol_df = pd.read_csv("../data/everypol/everypol_unique_emails.csv")
everypol_df = clean_email_column(everypol_df)
stats = processor.process_dataframe(everypol_df)
print(f"Processing complete. Processed {stats['processed']} emails with {stats['errors']} errors.")

2025-02-02 11:05:50,180 - INFO - Starting to process 8512 emails
2025-02-02 11:05:50,181 - INFO - [1/8512] shivajirao@sansad.nic.in - Skipping (previously processed)
2025-02-02 11:05:50,182 - INFO - [2/8512] adhikari.deepak@sansad.nic.in - Skipping (previously processed)
2025-02-02 11:05:50,182 - INFO - [3/8512] sisiradhikari76@yahoo.com - Skipping (previously processed)
2025-02-02 11:05:50,182 - INFO - [4/8512] adhikari.suvendu@sansad.nic.in - Skipping (previously processed)
2025-02-02 11:05:50,183 - INFO - [5/8512] yogi.adityanath@sansad.nic.in - Skipping (previously processed)
2025-02-02 11:05:50,183 - INFO - [6/8512] av.adsul@sansad.nic.in - Skipping (previously processed)
2025-02-02 11:05:50,183 - INFO - [7/8512] officelka@gmail.com - Skipping (previously processed)
2025-02-02 11:05:50,184 - INFO - [8/8512] rajendra.agrawal51@gmail.com - Skipping (previously processed)
2025-02-02 11:05:50,184 - INFO - [9/8512] eahmed@hotmail.com - Skipping (previously processed)
2025-02-02 11:05:5

### Get all the breaches

In [None]:
breach_url = "https://haveibeenpwned.com/api/v3/breaches"

response = requests.get(breach_url, headers=headers)
breaches_data = response.json()

In [None]:
breaches_df = pd.DataFrame(breaches_data)
breaches_df.head()

In [None]:
breaches_df.to_csv("../data/hipb_01_2025_breaches_data.csv", index=False)

### Get data for Bihar

From: https://vidhansabha.bih.nic.in/KnowyourMLA%20in%20Hindi.html

In [None]:
with open('../data/india/bihar/bihar.txt', 'r', encoding='utf-8') as file:
    lines = file.read().splitlines()
    lines = [line.split('\t') for line in lines]

bihar_df = pd.DataFrame(lines, columns=['sr_no', 'photo', 'constituency', 'name', 'gender', 'party', 'contact', 'email'])

In [None]:
output_folder = "../data/india/bihar_hibp/"
processor = HIBPProcessor(output_folder, headers, payload, 10)
bihar_df = clean_email_column(bihar_df)
stats = processor.process_dataframe(bihar_df)
print(f"Processing complete. Processed {stats['processed']} emails with {stats['errors']} errors.")

### HP

In [None]:
hp_df = pd.read_csv("../data/india/hp_14.csv")
hp_df.columns = hp_df.columns.str.lower()

In [None]:
output_folder = "../data/india/hp_hibp/"
hp_df = clean_email_column(hp_df)
processor = HIBPProcessor(output_folder, headers, payload, 10)
stats = processor.process_dataframe(hp_df)
print(f"Processing complete. Processed {stats['processed']} emails with {stats['errors']} errors.")

### TN

From https://assembly.tn.gov.in/16thassembly/members.php

In [None]:
tn_df = pd.read_csv("../data/india/tn/tn.csv")
tn_df.rename(columns={"Email Address": "email"}, inplace=True)

In [None]:
output_folder = "../data/india/tn_hibp/"
processor = HIBPProcessor(output_folder, headers, payload, 10)
tn_df = clean_email_column(tn_df)
stats = processor.process_dataframe(tn_df)
print(f"Processing complete. Processed {stats['processed']} emails with {stats['errors']} errors.")

### UP

In [None]:
up_df = pd.read_csv("../data/india/up/up_18_mlas.csv", usecols=['email'])

In [None]:
output_folder = "../data/india/up_hibp/"
up_df = clean_email_column(up_df)
processor = HIBPProcessor(output_folder, headers, payload, 10)
stats = processor.process_dataframe(up_df)
print(f"Processing complete. Processed {stats['processed']} emails with {stats['errors']} errors.")

### Delhi

In [None]:
del_df = pd.read_csv("../data/india/delhi/delhi_7th_assembly.csv", usecols=['Email'])
del_df.rename(columns={"Email": "email"}, inplace=True)
del_df = clean_email_column(del_df)

In [None]:
output_folder = "../data/india/delhi_hibp/"
processor = HIBPProcessor(output_folder, headers, payload, 10)
stats = processor.process_dataframe(del_df)
print(f"Processing complete. Processed {stats['processed']} emails with {stats['errors']} errors.")

### Norway

In [None]:
no_parl = pd.read_csv("../data/no/no_parliament.csv")
output_folder = "../data/no_hibp/"
no_parl = clean_email_column(no_parl)
processor = HIBPProcessor(output_folder, headers, payload, 10)
stats = processor.process_dataframe(no_parl)
print(f"Processing complete. Processed {stats['processed']} emails with {stats['errors']} errors.")

### Denmark

In [None]:
wb = openpyxl.load_workbook("../data/danish_parliament_1_2025.xlsx")
ws = wb.active  
data = list(ws.values)
dk_parl = pd.DataFrame(data)
dk_parl.columns = dk_parl.iloc[0]
dk_parl = dk_parl[1:].reset_index(drop=True)

In [None]:
output_folder = "../data/dk_hibp/"
dk_parl.rename(columns={"Email": "email"}, inplace=True)
dk_parl = clean_email_column(dk_parl)
processor = HIBPProcessor(output_folder, headers, payload, 10)
stats = processor.process_dataframe(dk_parl)
print(f"Processing complete. Processed {stats['processed']} emails with {stats['errors']} errors.")

### India LS

In [None]:
in_parl = pd.read_csv("../data/india/ls_long.csv")
in_parl.rename(columns={"email": "old_email"}, inplace=True)
in_parl.rename(columns={"email_fix": "email"}, inplace=True)

output_folder = "../data/india/ls_hibp/"

in_parl = clean_email_column(in_parl)
processor = HIBPProcessor(output_folder, headers, payload, 10)
stats = processor.process_dataframe(in_parl)
print(f"Processing complete. Processed {stats['processed']} emails with {stats['errors']} errors.")