In [None]:
!pip install dnspython pandas
!pip install openpyxl
!pip install pymongo

Collecting dnspython
  Downloading dnspython-2.7.0-py3-none-any.whl.metadata (5.8 kB)
Downloading dnspython-2.7.0-py3-none-any.whl (313 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m313.6/313.6 kB[0m [31m6.7 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: dnspython
Successfully installed dnspython-2.7.0
Collecting pymongo
  Downloading pymongo-4.12.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (22 kB)
Downloading pymongo-4.12.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.4 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.4/1.4 MB[0m [31m21.2 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: pymongo
Successfully installed pymongo-4.12.0


In [None]:
# === Google Colab Setup ===
from google.colab import files, drive
import pandas as pd
import json
import os
import re
import math
import time
import string
import random
import smtplib
import dns.resolver
from difflib import SequenceMatcher
from pymongo import MongoClient
from tqdm import tqdm
from vertexai.generative_models import GenerativeModel

# === Environment Setup ===
# Ask the user to upload the Vertex AI credentials JSON through the Colab
# file picker. The result is iterated below to obtain the uploaded filename.
print("Please upload your Vertex AI credentials JSON file:")
uploaded = files.upload()

# Take the first uploaded filename (a single file is expected).
# NOTE(review): next() raises StopIteration if nothing was uploaded.
credentials_file = next(iter(uploaded))

# Point the Google client libraries at the uploaded credentials file.
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = credentials_file

# Create the Gemini model handle that is later passed to
# generate_possible_emails().
model = GenerativeModel("gemini-1.5-pro")

# Mount Google Drive (forcing a remount) and make sure the results folder
# exists; the pipeline output is written to RESULTS_FILE inside that folder.
drive.mount('/content/drive', force_remount=True)
RESULTS_DIR = '/content/drive/MyDrive/email_generation_and_validation'
RESULTS_FILE = os.path.join(RESULTS_DIR, "participant_data_with_valid_email.json")
os.makedirs(RESULTS_DIR, exist_ok=True)

# === Utility Functions ===
def load_json(input_file):
    """Parse the UTF-8 encoded JSON file at ``input_file`` and return the result."""
    with open(input_file, mode='r', encoding="utf-8") as handle:
        parsed = json.load(handle)
    return parsed

def save_json(data, output_file):
    """Write ``data`` to ``output_file`` as UTF-8 JSON.

    The output is indented by four spaces and non-ASCII characters are
    written as-is rather than escaped.
    """
    with open(output_file, mode="w", encoding="utf-8") as sink:
        json.dump(data, sink, ensure_ascii=False, indent=4)

def preprocess_name(name):
    if not isinstance(name, str):
        return ''
    clean_name = re.sub(r'[^\w\u4e00-\u9fff\u3040-\u30ff\u31f0-\u31ff]', '', name.lower())
    clean_name = re.sub(r'(股份有限公司|有限公司|公司|company|inc|pvt)', '', clean_name)
    return clean_name.strip()

def clean_domain(domain):
    domain = re.sub(r'https?://(www\.)?', '', domain)
    domain = re.sub(r'(org|com|hk|team|group|company|inc|pvt|limited|ltd)', '', domain)
    domain = re.sub(r'[^a-zA-Z0-9]', '', domain)
    return domain

def calculate_similarity(a, b):
    """Return the ``difflib.SequenceMatcher`` ratio (0.0 to 1.0) of ``a`` and ``b``."""
    matcher = SequenceMatcher(None, a, b)
    return matcher.ratio()

def add_domain(participant, df):
    """Attach the customer domains that plausibly belong to a participant.

    Args:
        participant: dict with a ``company_clean`` key (the normalized
            company name). It is modified in place.
        df: DataFrame with ``Company``, ``Company_English_Name`` and
            ``Domain`` columns.

    Returns:
        The same ``participant`` dict, with ``possible_domain`` set to the
        list of matched domains (duplicates removed, first-seen order kept).

    Matching rules:
        * Names of 4 characters or fewer must be a substring of one of the
          customer names AND of the domain itself; fuzzy matching is skipped
          because short strings are too ambiguous.
        * Longer names match on substring containment or on a similarity
          ratio above 0.8 against either customer name.
    """
    company = participant['company_clean']
    matched = []

    # An empty name is a substring of every string, so without this guard it
    # would "match" every customer row.
    if company:
        is_short = len(company) <= 4
        rows = zip(df["Company"], df["Company_English_Name"], df["Domain"])
        for company_clean, english_clean, domain in rows:
            contained = company in company_clean or company in english_clean
            if is_short:
                # A missing Domain cell (NaN) is not a string and cannot be
                # searched, so it is skipped instead of raising TypeError.
                if contained and isinstance(domain, str) and company in domain:
                    matched.append(domain)
            elif contained or any(
                calculate_similarity(company, candidate) > 0.8
                for candidate in (company_clean, english_clean)
            ):
                matched.append(domain)

    # dict.fromkeys de-duplicates while keeping a deterministic order.
    participant['possible_domain'] = list(dict.fromkeys(matched))
    return participant

def generate_prompt(participant, customer_dict_list):
    """Build the LLM prompt asking for email guesses for one participant.

    The prompt embeds the customer records whose ``Domain`` is among the
    participant's ``possible_domain`` values, followed by the participant
    dict itself and the expected JSON output shape.
    """
    past_employees = [
        record
        for record in customer_dict_list
        if record.get('Domain') in participant['possible_domain']
    ]
    return f"""
    You are excellent at guessing people's email addresses.
    Given the participant info and domain list, generate at least 10 email guesses.
    Past employee data:
    {past_employees}

    Input: {participant}
    Output (valid JSON): {{"possible_email": ["email1@domain.com", "email2@domain.com", ...]}}
    """

def generate_possible_emails(participant, model, customer_dict_list):
    """Ask the model for email guesses and store them on ``participant``.

    Up to three attempts are made. On each attempt the model response text
    is stripped of Markdown code fences and newlines, parsed as JSON, and
    its ``possible_email`` value is stored under the same key. Any failure
    is printed and followed by a 2-second pause. If every attempt fails,
    ``possible_email`` is set to an empty list.

    Returns:
        The same ``participant`` dict (modified in place).
    """
    max_attempts = 3
    for attempt_number in range(1, max_attempts + 1):
        try:
            prompt = generate_prompt(participant, customer_dict_list)
            payload = model.generate_content(prompt).text
            # Remove code fences, literal "\n" sequences and real newlines so
            # that only the JSON object remains.
            for noise in ('```json', '```', '\\n', '\n'):
                payload = payload.replace(noise, '')
            parsed = json.loads(payload.strip())
            participant['possible_email'] = parsed['possible_email']
            return participant
        except Exception as exc:
            print(f"Error for {participant['name']} (Try {attempt_number}): {exc}")
            time.sleep(2)
    participant['possible_email'] = []
    return participant

def is_valid_syntax(email):
    """Check whether ``email`` looks like ``local@domain.tld``.

    Args:
        email: candidate address; anything that is not a string is rejected.

    Returns:
        A truthy ``re.Match`` object when the whole string matches the
        pattern, otherwise ``None``.
    """
    if not isinstance(email, str):
        return None
    # fullmatch() instead of match() with "$": "$" also matches just before a
    # trailing newline, which would let "user@host.com\n" through.
    return re.fullmatch(r"[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+", email)

def check_mx_records(domain):
    """Return True when ``domain`` publishes at least one MX record.

    Transient failures (timeouts, unreachable name servers, ...) are retried
    up to three times with a 2-second pause between attempts. Definitive
    answers are not retried: a non-existent domain or a domain without MX
    records will not change by asking again.

    Args:
        domain: domain name to query; an empty value returns False without
            any lookup.

    Returns:
        bool: True if an MX answer was obtained, False otherwise.
    """
    if not domain:
        return False
    max_attempts = 3
    for attempt in range(max_attempts):
        try:
            answers = dns.resolver.resolve(domain, 'MX')
            return any(answers)
        except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer):
            # Authoritative "no such domain" / "no MX record": final answer.
            return False
        except Exception:
            # Best-effort check: treat anything else as transient, but do not
            # sleep after the last attempt.
            if attempt < max_attempts - 1:
                time.sleep(2)
    return False

def verify_smtp(email, error_log):
    """Probe the recipient's mail server to see whether it accepts ``email``.

    Connects to the domain's most-preferred MX host on port 25, issues
    EHLO / MAIL FROM / RCPT TO, and reports whether RCPT TO was answered
    with code 250. No message is sent.

    Args:
        email: address to verify.
        error_log: dict mapping an error message to the set of domains (or,
            for malformed input, the offending value) that triggered it.
            Updated in place on failure.

    Returns:
        bool: True if the server accepted the recipient, False on rejection
        or on any error.
    """
    # Extract the domain before entering the try block so the except handler
    # can always reference it (previously an address without "@" raised
    # UnboundLocalError from inside the handler).
    if isinstance(email, str) and '@' in email:
        domain = email.rpartition('@')[2]
    else:
        domain = ''
    if not domain:
        error_log.setdefault("Malformed email address", set()).add(str(email))
        return False

    try:
        answers = dns.resolver.resolve(domain, 'MX')
        # The lowest preference value is the primary mail exchanger; answer
        # order is not guaranteed to reflect it.
        primary = min(answers, key=lambda record: record.preference)
        mx_host = primary.exchange.to_text().strip()
        with smtplib.SMTP(mx_host, 25, timeout=10) as server:
            server.ehlo()
            server.mail('verify@example.com')
            code, _ = server.rcpt(email)
            return code == 250
    except Exception as e:
        error_log.setdefault(str(e), set()).add(domain)
        return False

def get_email_domain(email):
    """Return the text after the last ``@`` in ``email`` (the whole string if there is none)."""
    _, _, domain = email.rpartition('@')
    return domain

def generate_random_email(domain):
    """Return an address at ``domain`` with a random 10-character alphanumeric local part."""
    alphabet = string.ascii_letters + string.digits
    local_part = ''.join(random.choices(alphabet, k=10))
    return local_part + '@' + domain

def get_mx_host(domain):
    """Return the host name of the most-preferred MX record for ``domain``.

    Args:
        domain: domain name to look up; an empty value returns None without
            any lookup.

    Returns:
        The MX exchange as text, or None when the lookup fails for any
        reason (best-effort helper).
    """
    if not domain:
        return None
    try:
        answers = dns.resolver.resolve(domain, 'MX')
        # The lowest preference value is the primary mail exchanger; answer
        # order is not guaranteed to reflect it.
        primary = min(answers, key=lambda record: record.preference)
        return primary.exchange.to_text().strip()
    except Exception:
        # "except Exception" rather than a bare "except" so Ctrl-C
        # (KeyboardInterrupt) still stops a long-running batch.
        return None

def is_catch_all_domain(domain):
    """Detect whether ``domain`` accepts mail for arbitrary recipients.

    A random, almost certainly non-existent address at the domain is offered
    to the domain's MX host via RCPT TO. If the server answers 250 the
    domain is treated as catch-all, which means per-address SMTP
    verification results for it are not meaningful.

    Args:
        domain: domain name to probe.

    Returns:
        bool: True if the random recipient was accepted; False if it was
        rejected, no MX host could be found, or all three connection
        attempts failed.
    """
    host = get_mx_host(domain)
    if not host:
        return False
    test_email = generate_random_email(domain)
    max_attempts = 3
    for attempt in range(max_attempts):
        try:
            with smtplib.SMTP(host, 25, timeout=10) as server:
                server.helo()
                server.mail("test@example.com")
                code, _ = server.rcpt(test_email)
                return code == 250
        except Exception:
            # "except Exception" rather than a bare "except" so Ctrl-C
            # (KeyboardInterrupt) still stops the run. Pause only when
            # another attempt will follow.
            if attempt < max_attempts - 1:
                time.sleep(2)
    return False

def connect_to_mongodb(uri=None):
    """Return the ``participant_info`` collection of the ``dev_db_mila`` database.

    Args:
        uri: optional MongoDB connection string. When omitted, the
            ``MONGODB_URI`` environment variable is used if set; otherwise
            the legacy built-in connection string is used.

    Returns:
        The ``participant_info`` collection object.
    """
    # SECURITY NOTE(review): this fallback embeds a username and password in
    # source code. It is kept only so existing callers keep working; the
    # credential should be rotated and supplied via MONGODB_URI (or the
    # ``uri`` argument), after which this literal can be removed.
    legacy_uri = "mongodb://mila:Admazetest123@mongodb-prd.admazes.marketing:27017/dev_db_mila"
    client = MongoClient(uri or os.environ.get("MONGODB_URI") or legacy_uri)
    return client.dev_db_mila.participant_info

def delete_existing_collection(collection):
    """Drop ``collection`` and print a confirmation message."""
    collection.drop()
    notice = "Old MongoDB collection dropped."
    print(notice)

def upload_to_mongodb(collection, json_file):
    """Insert the contents of a JSON file into a MongoDB collection.

    Args:
        collection: target collection; must provide ``insert_many`` and
            ``insert_one``.
        json_file: path to a UTF-8 JSON file holding either a list of
            documents or a single document.

    A list is inserted with ``insert_many``, anything else with
    ``insert_one``. An empty list is skipped, because ``insert_many``
    rejects empty input.
    """
    with open(json_file, "r", encoding="utf-8") as file:
        data = json.load(file)
    if isinstance(data, list):
        if not data:
            print("No documents to upload to MongoDB.")
            return
        collection.insert_many(data)
    else:
        collection.insert_one(data)
    print("Data uploaded to MongoDB.")

# === Main Execution ===
# Placeholder values that mean "no data" in the participant export.
_INVALID_NAMES = ('N/A', '-', '/', 'NA')
_INVALID_COMPANIES = ('N/A', '-', '/', 'NA', 'HK', 'Hong Kong', 'China')


def _is_usable_participant(participant):
    """Return True when the record has a real name and a real company."""
    name = participant.get('name')
    company = participant.get('company')
    # Requiring strings rejects missing keys, JSON null and NaN floats in one
    # check (len() on a null company used to raise TypeError).
    if not isinstance(name, str) or not isinstance(company, str):
        return False
    if name in _INVALID_NAMES or company in _INVALID_COMPANIES:
        return False
    # Companies of one or two characters are too short to match reliably.
    return len(company) > 2


def main():
    """Run the whole pipeline: load inputs, guess emails, verify, save, upload.

    Steps:
        1. Upload and load the participant JSON and the customer CSV.
        2. Drop unusable participants and normalize company names.
        3. Match each participant to candidate customer domains and ask the
           model for email guesses.
        4. Keep guesses that pass the syntax check, MX lookup and SMTP probe.
        5. Flag participants whose valid emails sit on catch-all domains.
        6. Save the result to RESULTS_FILE and replace the MongoDB collection
           with it.
    """
    # Upload input files
    print("Upload participant_data.json:")
    participant_file = files.upload()
    participant_data = load_json(next(iter(participant_file)))

    print("Upload customer list CSV:")
    customer_file = files.upload()
    customer_list = pd.read_csv(next(iter(customer_file)))

    # Preprocess participant and customer data
    filtered_participants = [
        {
            'name': p['name'],
            'company': p['company'],
            'company_clean': preprocess_name(p['company'])
        }
        for p in participant_data
        if _is_usable_participant(p)
    ]
    # .copy() gives an independent frame, so the column assignments below do
    # not write into a slice of customer_list (SettingWithCopyWarning).
    df_customer = customer_list[['Company', 'Domain']].copy()
    df_customer['Company_English_Name'] = df_customer['Domain'].apply(clean_domain)
    df_customer['Company'] = df_customer['Company'].apply(preprocess_name)
    customer_dict_list = customer_list[['First Name', 'Last Name', 'Email', 'Domain']].to_dict(orient='records')

    # Match domains and generate possible emails
    enriched = [add_domain(p, df_customer) for p in filtered_participants]
    enriched = [p for p in enriched if p.get('possible_domain')]
    for p in enriched:
        generate_possible_emails(p, model, customer_dict_list)

    # Email validation
    error_log = {}  # SMTP/DNS failure message -> domains that triggered it
    mx_cache = {}   # domain -> MX check result, so each domain is queried once
    pbar = tqdm(enriched, desc="Verifying Emails")
    for entry in pbar:
        candidates = entry.get('possible_email')
        # The guesses come from model output; ignore anything that is not a
        # list of strings instead of crashing the whole run.
        if not isinstance(candidates, list):
            candidates = []
        valid_emails = []
        for email in candidates:
            if not isinstance(email, str) or not is_valid_syntax(email):
                continue
            domain = get_email_domain(email)
            if domain not in mx_cache:
                mx_cache[domain] = check_mx_records(domain)
            if mx_cache[domain] and verify_smtp(email, error_log):
                valid_emails.append(email)
        # De-duplicate while keeping a deterministic order.
        entry['valid_email'] = list(dict.fromkeys(valid_emails))

    # Catch-all detection
    all_domains = {get_email_domain(e) for p in enriched for e in p.get('valid_email', [])}
    catch_all_domains = {d for d in all_domains if is_catch_all_domain(d)}

    # Combine and mark catch-all
    for entry in enriched:
        entry['catch-all'] = any(get_email_domain(e) in catch_all_domains for e in entry.get('valid_email', []))

    save_json(enriched, RESULTS_FILE)
    print(f"Saved to {RESULTS_FILE}")

    # Upload to MongoDB
    col = connect_to_mongodb()
    delete_existing_collection(col)
    upload_to_mongodb(col, RESULTS_FILE)

# Run the whole pipeline as soon as this cell/script is executed.
main()
