# MNE Groups Data Extraction Challenge - MUR team

Phase 2: ISIN and website search

#### Load libraries

In [None]:
import pandas as pd
import requests
import re
import random
import time
import os
from datetime import datetime
from googlesearch import search
from bs4 import BeautifulSoup
from urllib.parse import urlparse
import yaml

#### Configuration

In [None]:
def load_config(config_file='config.yaml'):
    """
    Loads configuration parameters from a YAML file.

    The file is opened explicitly as UTF-8 so that the result does not depend
    on the platform's default text encoding.

    Parameters:
        config_file (str): The path to the YAML configuration file.

    Returns:
        dict: A dictionary containing the loaded configuration parameters.

    Raises:
        OSError: If the file cannot be opened (e.g. it does not exist).
        yaml.YAMLError: If the file content is not valid YAML.
    """
    with open(config_file, 'r', encoding='utf-8') as f:
        # safe_load only builds plain Python objects (no arbitrary object construction).
        return yaml.safe_load(f)

In [None]:
# Read the configuration once; load_config() is called with its default path.
config = load_config()

# --- Global Parameters from Config ---
# Directory that holds the input CSV and the phase 1 files; the phase 2 output is written there too.
DATA_PATH = config['data_path']
# File name (inside DATA_PATH) of the base CSV.
INPUT_FILENAME = config['input_filename']
# Lower / upper bound of the random sleep performed before each Google search.
PAUSE_TIME_MIN = config['pause_time_min']
PAUSE_TIME_MAX = config['pause_time_max']
# Default `timeout` argument of search_isin / search_website (passed to requests.get in search_isin).
SEARCH_TIMEOUT = config['search_timeout']
# NOTE(review): not referenced anywhere in the code visible here — confirm whether it is still needed.
LONG_PAUSE_GROUPS_INTERVAL = config['long_pause_groups_interval']
# Sleep duration applied after a Google search raises an exception.
LONG_PAUSE_TIME = config['long_pause_time']
# File name (inside DATA_PATH) of the CSV written at the end of this phase.
OUTPUT_PHASE2 = config['output_phase2']
# File-name prefix used to select the phase 1 output files in DATA_PATH.
PHASE1_PATTERN = config['phase1_pattern']

#### Functions

In [None]:
def check_isin(isin_code: str) -> bool:
    """
    Tells whether a string is a well-formed ISIN with a correct check digit.

    The check is case-insensitive: the input is upper-cased before validation.

    Parameters:
        isin_code (str): Candidate ISIN code.

    Returns:
        bool: True when the code has 12 characters, starts with two letters,
              ends with a digit and satisfies the Luhn checksum; False otherwise
              (including for non-string input).
    """
    if not isinstance(isin_code, str) or len(isin_code) != 12:
        return False

    candidate = isin_code.upper()
    well_formed = (
        candidate[:2].isalpha()
        and candidate[2:].isalnum()
        and candidate[-1].isdigit()
    )
    if not well_formed:
        return False

    # Expand the code into decimal digits: digits stay as they are, letters
    # become their two-digit value (A=10, ..., Z=35).
    expanded = []
    for symbol in candidate:
        if '0' <= symbol <= '9':
            expanded.append(int(symbol))
        elif 'A' <= symbol <= 'Z':
            tens, units = divmod(ord(symbol) - ord('A') + 10, 10)
            expanded.append(tens)
            expanded.append(units)
        else:
            # Non-ASCII letters/digits pass the str checks above but are not valid here.
            return False

    # Luhn checksum: walking from the rightmost digit, every second digit is
    # doubled and replaced by the sum of the digits of the doubled value.
    checksum = 0
    for offset, value in enumerate(reversed(expanded)):
        if offset % 2 == 1:
            value = sum(divmod(value * 2, 10))
        checksum += value
    return checksum % 10 == 0

In [None]:
def search_isin(name: str, max_results: int = 10, timeout: int = SEARCH_TIMEOUT) -> tuple:
    """
    Searches for the ISIN of a company using Google Search.

    Sleeps for a random time between PAUSE_TIME_MIN and PAUSE_TIME_MAX, runs the
    search, then downloads the result pages in order. Every 12-character token of
    a page that looks like an ISIN is validated with check_isin, and the first
    valid one is returned.

    Args:
        name (str): The name of the company to search for.
        max_results (int): Number of search results requested from Google.
        timeout (int): Timeout passed to requests.get for each result page.

    Returns:
        tuple: A tuple containing the ISIN (str, upper case) if found, and the
               source URL (str) of the page it was found on.
               Returns (None, None) if no ISIN is found or the search fails.
    """
    query = f"{name} " + " OR ".join(["isin", "stock"])
    pause_time = random.uniform(PAUSE_TIME_MIN, PAUSE_TIME_MAX)
    time.sleep(pause_time)

    # Loop invariants: built once instead of once per result page.
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                      "AppleWebKit/537.36 (KHTML, like Gecko) "
                      "Chrome/124.0.0.0 Safari/537.36"
    }
    isin_pattern = re.compile(r'\b([a-z]{2}[a-z0-9]{10})\b')

    try:
        search_results = list(search(query, num_results=max_results))
    except Exception as e:
        print(f"Error during Google search for ISIN '{name}': {e}")
        print("Waiting before retrying Google search...")
        # Back off before the caller issues its next search; no retry happens here.
        time.sleep(LONG_PAUSE_TIME)
        return None, None

    for url in search_results:
        try:
            response = requests.get(url, headers=headers, timeout=timeout)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, "html.parser")
            text = soup.get_text(separator=' ')

            # Lower-case and turn punctuation into spaces so candidates are isolated words.
            processed_text = re.sub(r'[^a-z0-9\s]', ' ', text.lower())

            # The pattern also matches any ordinary 12-letter word, so every
            # candidate on the page must be tried, not only the first one.
            for match in isin_pattern.finditer(processed_text):
                candidate = match.group(1)
                if check_isin(candidate):
                    return candidate.upper(), url

        except requests.exceptions.RequestException as e:
            print(f"Error getting URL {url} during ISIN search: {e}")
            continue
        except Exception as e:
            print(f"Error processing URL {url} during ISIN search: {e}")
            continue

    return None, None

In [None]:
def get_domain(url: str) -> str:
    """
    Extracts the domain name from a URL.

    Parameters:
        url (str): The input URL.

    Returns:
        str: The domain name (netloc), or an empty string if the URL is not a
             string, cannot be parsed, or has no network location.
    """
    # Non-string input (None, numbers, bytes) has no usable domain; returning ""
    # keeps the return type consistent for callers that test truthiness.
    if not isinstance(url, str):
        return ""
    try:
        return urlparse(url).netloc
    except ValueError:
        # urlparse raises ValueError for malformed URLs (e.g. a broken IPv6 literal).
        return ""

In [None]:
def search_website(name: str, max_results: int = 10, timeout: int = SEARCH_TIMEOUT) -> tuple:
    """
    Searches for the official website of a company using Google Search.

    Sleeps for a random time between PAUSE_TIME_MIN and PAUSE_TIME_MAX, requests
    a single search result for "<name> official website" and returns its domain.

    Args:
        name (str): The name of the company to search for.
        max_results (int): Accepted for signature parity with search_isin;
            not used, only the first search result is requested.
        timeout (int): Accepted for signature parity with search_isin; not used.

    Returns:
        tuple: A tuple containing the website domain (str) of the first result
               (or the full URL when no domain can be extracted), and the
               source URL (str).
               Returns (None, None) if the search fails or yields no result.
    """
    query = f"{name} official website"
    pause_time = random.uniform(PAUSE_TIME_MIN, PAUSE_TIME_MAX)
    time.sleep(pause_time)

    try:
        search_results = list(search(query, num_results=1))
    except Exception as e:
        print(f"Error during Google search for website '{name}': {e}")
        print("Waiting before retrying Google search...")
        # Back off before the caller issues its next search; no retry happens here.
        time.sleep(LONG_PAUSE_TIME)
        return None, None

    # An empty result list is a plain "not found", handled like in search_isin:
    # no error message and no long pause.
    if not search_results:
        return None, None

    url = search_results[0]
    domain = get_domain(url)
    # Fall back to the raw URL when no domain could be extracted from it.
    return (domain if domain else url), url

### Main

In [None]:
# === Load base and phase 1 output ===
# keep_default_na=False stops pandas from turning strings such as "NA" into NaN;
# only truly empty cells are converted to pd.NA afterwards.
df_base = pd.read_csv(os.path.join(DATA_PATH, INPUT_FILENAME), sep=";", keep_default_na=False)
df_base = df_base.replace('', pd.NA)

# os.listdir returns names in arbitrary order; sorting makes the concatenation
# order (and therefore the merged output) reproducible between runs.
phase1_files = sorted(
    os.path.join(DATA_PATH, f)
    for f in os.listdir(DATA_PATH)
    if f.startswith(PHASE1_PATTERN)
)
if not phase1_files:
    # Fail with an explicit message instead of the generic error pd.concat
    # raises when it is given an empty list.
    raise FileNotFoundError(
        f"No phase 1 file starting with '{PHASE1_PATTERN}' found in '{DATA_PATH}'"
    )
df_phase1 = pd.concat([pd.read_csv(f, sep=";", keep_default_na=False) for f in phase1_files], ignore_index=True)
df_phase1 = df_phase1.replace('', pd.NA)

# === Combine phase 1 and base ===
# Left-join the phase 1 rows onto the base rows; phase 1 columns that clash with
# base columns receive the "_new" suffix.
merge_keys = ["ID", "NAME", "VARIABLE"]
value_columns = ["SRC", "VALUE", "CURRENCY", "REFYEAR"]

df_combined = pd.merge(
    df_base,
    df_phase1[merge_keys + value_columns],
    how="left",
    on=merge_keys,
    suffixes=("", "_new"),
)

# Base values win; phase 1 values only fill the cells that are missing in the base.
for col in value_columns:
    phase1_col = f"{col}_new"
    df_combined[col] = df_combined[col].combine_first(df_combined[phase1_col])

# The helper "_new" columns are no longer needed once their values are folded in.
df_combined = df_combined.drop(columns=[f"{c}_new" for c in value_columns])

# === Filter COUNTRY and WEBSITE entries without VALUE ===
# Rows of interest: VARIABLE is COUNTRY or WEBSITE ...
is_country = df_combined["VARIABLE"] == "COUNTRY"
is_website = df_combined["VARIABLE"] == "WEBSITE"
# ... and VALUE is either missing or an empty string.
has_no_value = df_combined["VALUE"].isna() | (df_combined["VALUE"] == "")

mask_missing = (is_country | is_website) & has_no_value

# Work on an independent copy that keeps the index labels of df_combined,
# with the result columns reset to empty strings.
df_missing = df_combined[mask_missing].copy()
for blank_col in ("VALUE", "SRC", "REFYEAR"):
    df_missing[blank_col] = ""

# === Process missing entries ===
# One web search per missing row: COUNTRY rows are resolved through the ISIN
# (its first two characters are stored as the value), WEBSITE rows through the
# website search. Rows without a result keep their empty strings.
for idx, record in df_missing.iterrows():
    name = record["NAME"]
    variable = record["VARIABLE"]
    print(f"Searching {variable} for: {name}")
    try:
        value, src = None, None
        if variable == "COUNTRY":
            isin, src = search_isin(name)
            if isin:
                value = isin[:2]
        elif variable == "WEBSITE":
            value, src = search_website(name)

        if value:
            df_missing.at[idx, "VALUE"] = value
            df_missing.at[idx, "SRC"] = src
            df_missing.at[idx, "REFYEAR"] = 2024
    except Exception as e:
        # A failure on one entry must not stop the remaining searches.
        print(f"Error processing {name} ({variable}): {e}")

# === Integrate phase 2 results into the base dataframe ===
# For every searched row, copy VALUE / SRC / REFYEAR into the rows of df_combined
# that have the same ID and VARIABLE and still have no VALUE.
# - The match does not use NAME, so all such rows of an ID/VARIABLE pair are updated.
# - The mask is recomputed on every iteration, so rows filled by an earlier
#   iteration are no longer matched by later ones.
# - Rows for which the search found nothing write empty strings (df_missing was
#   initialised with "" in these three columns).
# NOTE(review): df_missing is a copy of a row subset of df_combined and appears to
# keep its index labels, so an index-based assignment might replace this loop —
# confirm the intended handling of duplicated ID/VARIABLE pairs first.
for i, row in df_missing.iterrows():
    cond = (
        (df_combined["ID"] == row["ID"]) &
        (df_combined["VARIABLE"] == row["VARIABLE"]) &
        ((df_combined["VALUE"].isna()) | (df_combined["VALUE"] == ""))
    )
    # The three values of the row are broadcast to every matching row.
    df_combined.loc[cond, ["VALUE", "SRC", "REFYEAR"]] = row[["VALUE", "SRC", "REFYEAR"]].values

# === Save final combined output ===
# Write the merged table as a semicolon-separated CSV (without the index column)
# next to the input files.
phase2_output_path = os.path.join(DATA_PATH, OUTPUT_PHASE2)
df_combined.to_csv(phase2_output_path, sep=";", index=False)
print(f"Final results saved to {OUTPUT_PHASE2}")