In [1]:
# !pip3 install tqdm

In [5]:
import pymongo
import pandas as pd
from tqdm.notebook import tqdm
import sys

# --- 1. Configuration & Connection ---
MONGO_CLIENT_URL = "mongodb+srv://admin:xDLiWOZri8rdPAUK@test.m9gl5ri.mongodb.net/"

try:
    client = pymongo.MongoClient(MONGO_CLIENT_URL)
    
    # September Data (Archive)
    db_sep = client["leadcruit_archive"]
    col_sep = db_sep["people"]
    
    # October Data (Current)
    db_oct = client["leadcruit"]
    col_oct = db_oct["people"]
    
    print(f"Connected to MongoDB. Server info: {client.server_info()['version']}")
    
    sep_count = col_sep.count_documents({})
    oct_count = col_oct.count_documents({})
    print(f"September Collection: {sep_count} documents")
    print(f"October Collection: {oct_count} documents")
    
except pymongo.errors.ConnectionFailure as e:
    print(f"Could not connect to MongoDB: {e}")
    sys.exit() # Stop execution if we can't connect

# --- 2. Define Projection (Still important for speed/memory!) ---
PROJECTION = {
    "_id": 0  # Exclude the _id
}

# --- 3. Extract September Data ---
print("\nExtracting September data...")
try:
    # Find returns a cursor, we wrap it in tqdm
    cursor_sep = col_sep.find({}, PROJECTION)
    
    # Use tqdm to show progress as pandas loads the data
    df_sep = pd.DataFrame(
        tqdm(cursor_sep, total=sep_count, desc="Processing Sep data")
    )
    
    # Save to Parquet
    df_sep.to_parquet("data_september.parquet", index=False)
    print(f"Successfully saved {len(df_sep)} records to 'data_september.parquet'")

except Exception as e:
    print(f"Error extracting September data: {e}")

# --- 4. Extract October Data ---
print("\nExtracting October data...")
try:
    # Find returns a cursor, we wrap it in tqdm
    cursor_oct = col_oct.find({}, PROJECTION)
    
    # Use tqdm to show progress as pandas loads the data
    df_oct = pd.DataFrame(
        tqdm(cursor_oct, total=oct_count, desc="Processing Oct data")
    )
    
    # Save to Parquet
    df_oct.to_parquet("data_october.parquet", index=False)
    print(f"Successfully saved {len(df_oct)} records to 'data_october.parquet'")

except Exception as e:
    print(f"Error extracting October data: {e}")

print("\n--- Data extraction complete. ---")
print("You can now run the analysis script (Script 2).")

In [6]:
# # df_oct['member_public_profile_id'] = df_oct['member_public_profile_id'].astype("object")
# df_oct['member_public_profile_id'] = df_oct['member_public_profile_id'].astype(str)

# df_oct.to_parquet("data_october.parquet")

# ToDo:

1. Add a condition to check if they are working in multiple companies, where to date is Null.
2. Add a condition to check for date_from, it should be of current month and date_to should be null, only then leadership changed.
3. We have to add a condition about checking company id mentioned outside with company id inside member_experience. to check if he is still working in the same compny as previous month, 
    if yes and also started working somewhere else then, no leadership out in the previous company but the new compnay will get the new leadership joining.

In [9]:
import pandas as pd
import pymongo
from collections import defaultdict
import re
import datetime
from tqdm.notebook import tqdm
import json
import sys
import numpy as np

# Set pandas display option
pd.set_option("display.max_columns", 999)

# --- 1. Define Logic (Keywords & Functions) ---

LEADERSHIP_KEYWORDS = [
    # English r'\blead\b',
    r'\bmanager\b', r'\bdirector\b', r'\bhead of\b', r'\bvp\b', r'\bvice president\b',
    r'\bchief\b', r'\bceo\b', r'\bcfo\b', r'\bcto\b', r'\bcoo\b',  r'\bpartner\b', r'\bexecutive\b', r'\bfounder\b',
    r'\bhead\b', r'\bcto\b', r'\bchief technology officer\b', r'\bchief financial officer\b', 
    r'\bleader\b', r'\bpresident\b', r'\bchief executive officer\b', r'\bchief operating officer\b', r'\bowner\b',
    r'\bteam leader\b', r'\bchairman\b', r'\bcofounder\b',

    # --- French ---
    r'\bdirecteur\b', r'\bdirectrice\b', r'\bgérant\b', r'\bresponsable\b', r'\bchef de\b',
    r'\bpdg\b', r'\bdirecteur général\b', r'\bdirecteur financier\b', r'\bdaf\b', r'\bdirecteur technique\b',
    r'\bdirecteur des opérations\b', r'\bassocié\b', r'\bassociée\b', r'\bcadre\b', r'\bdirigeant\b',
    r'\bfondateur\b', r'\bfondatrice\b', r'\bchef\b', r'\bprésident\b', r'\bprésidente\b',
    r'\bpropriétaire\b', r"\bchef d'équipe\b", r'\bprésident du conseil\b', r'\bcofounder\b', r'\bcofondatrice\b',

    # --- Dutch ---
    r'\bbestuurder\b', r'\bhoofd\b', r'\bhoofd van\b', r'\bvicepresident\b',
    r'\balgemeen directeur\b', r'\bfinancieel directeur\b', r'\btechnisch directeur\b', r'\boperationeel directeur\b',
    r'\bleider\b', r'\bteamleider\b', r'\bpartner\b', r'\bvennoot\b', r'\bleidinggevende\b', r'\bbestuurslid\b',
    r'\boprichter\b', r'\beigenaar\b', r'\bploegleider\b', r'\bvoorzitter\b', r'\bmede-oprichter\b',

    # --- German ---
    r'\bLeiter\b', r'\bLeiterin\b', r'\bDirektor\b', r'\bDirektorin\b', r'\bGeschäftsführer\b', r'\bGeschäftsführerin\b',
    r'\bLeiter von\b', r'\bLeitung\b', r'\bVizepräsident\b', r'\bChef\b', r'\bVorstandsvorsitzender\b',
    r'\bFinanzvorstand\b', r'\bFinanzdirektor\b', r'\bTechnischer Leiter\b', r'\bTechnologievorstand\b',
    r'\bBetriebsleiter\b', r'\bTeamleiter\b', r'\bGesellschafter\b', r'\bFührungskraft\b',
    r'\bLeitender Angestellter\b', r'\bGründer\b', r'\bGründerin\b', r'\bPräsident\b', r'\bInhaber\b', r'\bInhaberin\b',
    r'\bGruppenleiter\b', r'\bVorsitzender\b', r'\bMitbegründer\b',
]
LEADERSHIP_PATTERN = re.compile('|'.join(LEADERSHIP_KEYWORDS), re.IGNORECASE)
LEADERSHIP_MGMT_LEVELS = {'manager', 'director', 'vp', 'executive', 'partner', 'manager', 'vice president', 'president/vice president', 'head', 'owner', 'founder', 'c-level'}

# --- !! LOGIC CONSTANTS !! ---
ANALYSIS_YEAR = 2025
ANALYSIS_MONTH = 10 # October

# --- !! RENAMED FUNCTION !! ---
def check_is_leader(title, mgmt_level):
    """
    Checks if a given title/management level is a leader.
    """
    if pd.isna(mgmt_level) and pd.isna(title):
        return False
    if isinstance(mgmt_level, str) and mgmt_level.lower() in LEADERSHIP_MGMT_LEVELS:
        return True
    if pd.isna(title):
        return False
    if isinstance(title, str) and LEADERSHIP_PATTERN.search(title):
        return True
    return False

def robust_parse_date(date_str):
    """
    Safely parses date strings, handling None, NaN, and bad formats.
    """
    if not date_str or pd.isna(date_str):
        return None
    try:
        return datetime.datetime.strptime(date_str, '%Y-%m-%d')
    except (ValueError, TypeError):
        try:
            return datetime.datetime.strptime(date_str, '%Y-%m')
        except (ValueError, TypeError):
            return None

def get_all_active_jobs(experience_list):
    """
    Gets a list of all currently active jobs (date_to is null) from an experience array.
    """
    active_jobs = []
    
    if not isinstance(experience_list, (list, np.ndarray)):
        return active_jobs

    for job in experience_list:
        if not job: 
            continue
            
        date_to = job.get('date_to')
        if date_to is None or pd.isna(date_to):
            
            title = job.get('title')
            mgmt_level = job.get('management_level')
            date_from_str = job.get('date_from')
            parsed_date_from = robust_parse_date(date_from_str)
            
            active_jobs.append({
                "company_id": job.get("company_id"),
                "company_name": job.get("company_name", "Unknown"),
                "title": title,
                "management_level": mgmt_level,
                # --- !! UPDATED FUNCTION CALL !! ---
                "is_leader": check_is_leader(title, mgmt_level),
                "date_from": parsed_date_from
            })
            
    return active_jobs

# --- 2. Load Data from Parquet Files ---
print("Loading data from Parquet files...")
try:
    df_sep = pd.read_parquet("data_september.parquet")
    df_oct = pd.read_parquet("data_october.parquet")
    print(f"Loaded {len(df_sep)} Sep records and {len(df_oct)} Oct records.")
except FileNotFoundError:
    print("Error: Parquet files not found.")
    print("Please run 'Script 1: Data Extraction' (mongo_to_parquet.py) first.")
    sys.exit()

# --- 3. Main Processing Pipeline ---
print("Starting analysis...")
change_events = []

print("Merging September and October data...")
df_merged = pd.merge(
    df_sep, 
    df_oct, 
    on='member_id', 
    how='outer', 
    suffixes=('_sep', '_oct')
)
print(f"Merged DataFrame has {len(df_merged)} total unique members.")

del df_sep
del df_oct

# --- !! TEST & PROGRESS BAR !! ---
USE_TEST_SET = False  # Set to False to run on all data
TEST_SIZE = 1000

if USE_TEST_SET:
    print(f"--- RUNNING IN TEST MODE ON {TEST_SIZE} MEMBERS ---")
    df_to_process = df_merged.head(TEST_SIZE)
else:
    print("--- RUNNING IN FULL MODE ON ALL MEMBERS ---")
    df_to_process = df_merged
# ---------------------------------

print("Processing member changes...")
for row in tqdm(df_to_process.itertuples(index=False), total=len(df_to_process), desc="Processing members"):
    
    # --- !! UPDATED DATA QUALITY CHECK (Fix for your example) !! ---
    exp_sep = row.member_experience_sep
    exp_oct = row.member_experience_oct

    is_sep_data_valid_and_not_empty = isinstance(exp_sep, (list, np.ndarray)) and len(exp_sep) > 0
    is_oct_data_invalid_or_empty = not isinstance(exp_oct, (list, np.ndarray)) or len(exp_oct) == 0

    if is_sep_data_valid_and_not_empty and is_oct_data_invalid_or_empty:
        continue # Skip this member

    # --- End of updated check ---

    active_jobs_sep = get_all_active_jobs(exp_sep) 
    active_jobs_oct = get_all_active_jobs(exp_oct) 

    # --- !! LOGIC FIX !! ---
    # Create maps that check if a person is a leader in *ANY*
    # active role at a company. This fixes the bug.
    
    sep_leader_map = {} # {company_id: (is_leader, company_name)}
    for job in active_jobs_sep:
        if pd.isna(job['company_id']): continue
        company_id = job['company_id']
        # Use OR logic: if they are already a leader, stay True.
        # If they aren't, check if this new job makes them one.
        current_status = sep_leader_map.get(company_id, (False, "Unknown"))[0]
        # Use the boolean 'is_leader' key we created in get_all_active_jobs
        new_status = current_status or job['is_leader'] 
        sep_leader_map[company_id] = (new_status, job['company_name'])

    oct_leader_map = {}
    for job in active_jobs_oct:
        if pd.isna(job['company_id']): continue
        company_id = job['company_id']
        current_status = oct_leader_map.get(company_id, (False, "Unknown"))[0]
        new_status = current_status or job['is_leader']
        oct_leader_map[company_id] = (new_status, job['company_name'])

    # Get sets of company IDs from our new maps
    sep_co_ids = set(sep_leader_map.keys())
    oct_co_ids = set(oct_leader_map.keys())
    # -------------------

    # Find the differences
    new_co_ids = oct_co_ids - sep_co_ids
    left_co_ids = sep_co_ids - oct_co_ids
    common_co_ids = sep_co_ids & oct_co_ids

    # Get member details
    member_name = row.member_full_name_oct if pd.notna(row.member_full_name_oct) else row.member_full_name_sep
    linkedin_url = row.member_websites_linkedin_oct if pd.notna(row.member_websites_linkedin_oct) else row.member_websites_linkedin_sep
    member_details = {
        "member_id": row.member_id,
        "member_name": member_name if pd.notna(member_name) else "Unknown",
        "linkedin_url": linkedin_url if pd.notna(linkedin_url) else "Unknown"
    }

    # --- Scenario 3: A member joined the company as a leader. ---
    for company_id in new_co_ids:
        # We need to check the original job list for date_from,
        # as the map doesn't store all job details
        is_leader, company_name = oct_leader_map[company_id]
        if is_leader:
            is_recent = False
            # Find the specific job that made them a leader to check its date
            for job in active_jobs_oct:
                if job['company_id'] == company_id and job['is_leader']:
                    if job['date_from']:
                        if job['date_from'].year == ANALYSIS_YEAR and job['date_from'].month == ANALYSIS_MONTH:
                            is_recent = True
                            break # Found a recent leader job
            
            if is_recent:
                change_events.append({
                    "company_id": company_id,
                    "company_name": company_name,
                    "change_type": "leader_joined_from_outside",
                    "member": member_details
                })

    # --- Scenario 2: A member who was a leader left company. ---
    for company_id in left_co_ids:
        is_leader, company_name = sep_leader_map[company_id]
        if is_leader:
            change_events.append({
                "company_id": company_id,
                "company_name": company_name,
                "change_type": "leader_moved_out",
                "member": member_details
            })

    # --- Scenario 1: A member got promoted to Leader position. ---
    for company_id in common_co_ids:
        is_leader_sep, name_sep = sep_leader_map[company_id]
        is_leader_oct, name_oct = oct_leader_map[company_id]
        
        # Use the October name if available, otherwise fall back
        company_name = name_oct if name_oct != "Unknown" else name_sep
        
        if not is_leader_sep and is_leader_oct:
            change_events.append({
                "company_id": company_id,
                "company_name": company_name,
                "change_type": "leader_promoted_internally",
                "member": member_details
            })
        elif is_leader_sep and not is_leader_oct:
            change_events.append({
                "company_id": company_id,
                "company_name": company_name,
                "change_type": "leader_moved_out",
                "member": member_details
            })

print(f"Finished processing. Found {len(change_events)} individual leadership change events.")

# --- 4. Aggregate Results by Company ---
print("Aggregating results by company...")
company_changes = defaultdict(lambda: {
    "leadership_change": True, "company_name": "Unknown",
    "leader_moved_out": [], "leader_joined_from_outside": [], "leader_promoted_internally": []
})
for event in change_events:
    company_id = event["company_id"]
    if not company_id or pd.isna(company_id):
        continue 
    company_changes[company_id]["company_name"] = event["company_name"]
    company_changes[company_id][event["change_type"]].append(event["member"])
print(f"Found {len(company_changes)} companies with leadership changes.")

# --- 5. Save to MongoDB ---
print("Connecting to MongoDB to save results...")
try:
    MONGO_CLIENT_URL = "mongodb+srv://admin:xDLiWOZri8rdPAUK@test.m9gl5ri.mongodb.net/"
    client = pymongo.MongoClient(MONGO_CLIENT_URL)
    db_analysis = client["leadcruit"]
    col_output = db_analysis["leadership_change"]
    print(f"Saving aggregated data to {db_analysis.name}.{col_output.name}...")
    
    col_output.delete_many({}) 

    if company_changes:
        output_documents = []
        for company_id, data in company_changes.items():
            doc = {
                "company_id": int(company_id), 
                "company_name": data["company_name"],
                "leadership_change": data["leadership_change"],
                "leaders_moved_out": data["leader_moved_out"],
                "leaders_joined_from_outside": data["leader_joined_from_outside"],
                "leaders_promoted_internally": data["leader_promoted_internally"],
                "last_updated": datetime.datetime.now()
            }
            output_documents.append(doc)
        
        col_output.insert_many(output_documents)
        print(f"Successfully inserted {len(output_documents)} company analysis documents.")
    else:
        print("No leadership changes found to save.")
    print("\nAnalysis complete.")

except Exception as e:
    print(f"Error connecting to MongoDB or saving data: {e}")


# --- 6. Show Example Output ---
print("\n--- Example Output Document ---")
try:
    example = col_output.find_one()
    if example:
        print(json.dumps(example, indent=2, default=str))
    else:
        print("No output documents were generated.")
except Exception as e:
    print(f"Could not read example from DB: {e}")

Loading data from Parquet files...
Loaded 345940 Sep records and 345940 Oct records.
Starting analysis...
Merging September and October data...
Merged DataFrame has 345940 total unique members.
--- RUNNING IN FULL MODE ON ALL MEMBERS ---
Processing member changes...


Processing members:   0%|          | 0/345940 [00:00<?, ?it/s]

Finished processing. Found 4863 individual leadership change events.
Aggregating results by company...
Found 1721 companies with leadership changes.
Connecting to MongoDB to save results...
Saving aggregated data to leadcruit.leadership_change...
Successfully inserted 1721 company analysis documents.

Analysis complete.

--- Example Output Document ---
{
  "_id": "690ddaf32a8b76ef75cb79a8",
  "company_id": 27934216,
  "company_name": "Datashark Consultancy",
  "leadership_change": true,
  "leaders_moved_out": [
    {
      "member_id": 14922,
      "member_name": "Jeffrey Hasenbos",
      "linkedin_url": "https://www.linkedin.com/in/jeffreyhasenbos"
    }
  ],
  "leaders_joined_from_outside": [],
  "leaders_promoted_internally": [],
  "last_updated": "2025-11-07 17:11:39.927000"
}
