<a href="https://colab.research.google.com/github/walteralzurutt/classificacion-empresas/blob/main/TFG_Data_Cleanup.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Load environment and libraries

In [None]:
#@title Install basic packages and load environments. Initialize loaders for Snowflake and Google Sheets

!pip install "sphinx==7.2.6" > /dev/null

try:  # are we running on Colab?
  from google.colab import drive
  from google.colab import userdata
  colab = True
except Exception as e:
  colab = False

if colab:
  # install AWS CLI and dotenv
  !pip install "awscli==1.40.9" "python-dotenv==1.1.0" "gspread==6.2.0" > /dev/null

  import dotenv
  from getpass import getpass
  import os
  import gspread
  import base64
  import json
  import pandas as pd
  import numpy as np
  import base64
  from datetime import timedelta
  import sys

  import logging
  logging.basicConfig()
  logging.getLogger('snowflake').setLevel(logging.ERROR)

  import warnings
  warnings.filterwarnings('ignore')

  # mount Google Drive
  drive.mount("/content/gdrive")
  root_dir = "/content/gdrive/"
  project_drive = f"Shareddrives/Clarity AI/05 - Product Research & Innovation/21 - Team Lifecycle/11 - SME Repository/Colab setup"
  home_dir = f"{root_dir}MyDrive/Colab Notebooks/"
  sys.path.append(home_dir)

  # Configure AWS CLI credentials
  aws_dir = f"{os.getenv('HOME')}/.aws"
  os.makedirs(aws_dir, exist_ok=True)

  with open(f"{aws_dir}/credentials", "wt") as file:
    file.write(
  f"""[root]
  aws_access_key_id={userdata.get("AWS_ACCESS_KEY_ID")}
  aws_secret_access_key={userdata.get("AWS_SECRET_ACCESS_KEY")}
  """
    )
  with open(f"{aws_dir}/config", "wt") as file:
    file.write(
  f"""[profile mgmt]
  region=eu-central-1
  source_profile = root
  role_arn = arn:aws:iam::913932804865:role/federateclarity
  output=json
  [profile federate_root]
  region = eu-central-1
  source_profile = root
  role_arn = arn:aws:iam::064436394451:role/federateclarity
  """
    )
  os.environ["AWS_PROFILE"] = "mgmt"
  dotenv.load_dotenv(dotenv_path=f"{home_dir}.env")
  !pip install "boto3==1.38.10" "s3transfer>=0.12.0" "snowflake-connector-python==3.15.0" "snowflake-sqlalchemy==1.7.3" "numpy==2.0.2" "s3fs==0.4.2" "docutils==0.19" "pandas==2.2.2" "s3transfer>=0.12.0" > /dev/null


## SNOWFLAKE ##
# Import dependencies
import snowflake.connector as sc
from cryptography.hazmat.backends import default_backend
from sqlalchemy import create_engine, engine, text
from cryptography.hazmat.primitives import serialization
from sqlalchemy.dialects import registry

# Read the Snowflake key-pair private key from Drive. It may live under either
# "Colab Notebooks" or "Colab_Notebooks", so both paths are tried in order.
# Only a missing file moves on to the next path; any other failure (e.g. a
# wrong passphrase) is raised instead of being masked by the fallback.
_PRIVATE_KEY_PATHS = (
    "/content/gdrive/MyDrive/Colab Notebooks/private.pem",
    "/content/gdrive/MyDrive/Colab_Notebooks/private.pem",
)


def _load_snowflake_private_key(paths=_PRIVATE_KEY_PATHS):
    """Load and decrypt the first PEM private key found among `paths`.

    The passphrase is read from the Colab secret 'certificatepass'.

    Raises:
        FileNotFoundError: if none of `paths` exists.
    """
    for path in paths:
        try:
            with open(path, "rb") as key_file:
                pem_data = key_file.read()
        except FileNotFoundError:
            continue
        return serialization.load_pem_private_key(
            pem_data,
            password=userdata.get('certificatepass').encode(),
            backend=default_backend(),
        )
    raise FileNotFoundError(f"No Snowflake private key found at any of: {list(paths)}")


private_key = _load_snowflake_private_key()
# Unencrypted DER/PKCS#8 bytes, the form handed to sc.connect(private_key=...).
private_key_bytes = private_key.private_bytes(
    encoding=serialization.Encoding.DER,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption()
)

# Connect, create functions
class Snowflake:
    """Run SQL against Snowflake through a SQLAlchemy engine.

    Connections are opened by `snow_connect` using key-pair authentication
    (module-level `private_key_bytes`) and the SNOWFLAKE_* environment variables.
    """

    def __init__(self):
        # With `creator`, SQLAlchemy obtains DBAPI connections from the callable
        # and bypasses the URL's connection arguments; the URL only selects the
        # snowflake dialect.
        self.engine = engine.create_engine("snowflake://not@used/db", creator=self.snow_connect)

    def snow_connect(self):
        """Open a new Snowflake DBAPI connection from the SNOWFLAKE_* env vars."""
        return sc.connect(
            user=os.getenv("SNOWFLAKE_USERNAME"),
            account=os.getenv("SNOWFLAKE_ACCOUNT"),
            private_key=private_key_bytes,
            role=os.getenv("SNOWFLAKE_ROLE"),
            database=os.getenv("SNOWFLAKE_DATABASE"),
            warehouse=os.getenv("SNOWFLAKE_WAREHOUSE"),
        )

    def read_sql_query(self, query):
        """
        Execute a query in Snowflake

        Args:
            query (str): SQL query to execute

        Returns:
            df (pd.DataFrame): DataFrame with the results of the query,
            with all column names lower-cased.
        """
        # engine.begin() opens a connection inside a transaction that is
        # committed when the block exits without error.
        with self.engine.begin() as connection:
            # Read through pandas; calling this method recursively would build
            # a fresh engine and fail on the unexpected `con` keyword.
            df = pd.read_sql_query(text(query), con=connection)

        # Make sure all column names are lowercase
        df.columns = df.columns.str.lower()
        return df


## Initialize
# Module-level connector instance built from the class defined above.
# NOTE(review): the later cells shown here call Snowflake() directly rather
# than reusing this object.
snowflake_connector = Snowflake()

Mounted at /content/gdrive


In [None]:
class Snowflake:
    """SQLAlchemy-backed helper for running queries against Snowflake.

    Every connection is opened by `snow_connect`, which authenticates with the
    module-level `private_key_bytes` and reads the remaining settings from the
    SNOWFLAKE_* environment variables.
    """

    def __init__(self):
        # Because a `creator` callable is supplied, SQLAlchemy builds connections
        # with it and ignores the URL's connection arguments; the URL only picks
        # the snowflake dialect.
        self.engine = engine.create_engine("snowflake://not@used/db", creator=self.snow_connect)

    def snow_connect(self):
        """Return a new Snowflake DBAPI connection."""
        env_vars = {
            "user": "SNOWFLAKE_USERNAME",
            "account": "SNOWFLAKE_ACCOUNT",
            "role": "SNOWFLAKE_ROLE",
            "database": "SNOWFLAKE_DATABASE",
            "warehouse": "SNOWFLAKE_WAREHOUSE",
        }
        options = {param: os.getenv(var) for param, var in env_vars.items()}
        return sc.connect(private_key=private_key_bytes, **options)

    def read_sql_query(self, query: str):
        """Run `query` and return the result as a DataFrame with lower-cased column names."""
        with self.engine.begin() as conn:
            frame = pd.read_sql_query(text(query), con=conn)
        # Normalise every column label to lower case.
        return frame.set_axis(frame.columns.str.lower(), axis=1)

In [None]:
#@title Import libraries and define functions needed to read the registry service

import requests
import asyncio # only for async requests
import nest_asyncio
from google.colab import userdata
import json
import pandas as pd

def extract_registry_service_data(release_tag, use_release_candidate_tag=False, release_candidate=None, timeout=60):
  """Fetch the dataset catalogue registered under a registry-service tag.

  Two requests are made. The first asks for the requested tag (tenant 'CLA')
  and reads the 'tag' field of the response. The second lists the datasets
  registered under that returned tag.

  Args:
    release_tag (str): tag to query when `use_release_candidate_tag` is False.
    use_release_candidate_tag (bool): query `release_candidate` instead.
    release_candidate (str, optional): release-candidate tag. If omitted, a
      module-level `release_candidate` variable is used when one exists, for
      backward compatibility with code that sets that global.
    timeout (float): per-request timeout in seconds passed to `requests.get`.

  Returns:
    pd.DataFrame with columns table_name, dataset_name, location, dag_owner,
    execution_date, module and database.

  Raises:
    ValueError: a release candidate was requested but no tag is available.
    requests.HTTPError: either request returned an HTTP error status.
  """
  uri_base = 'https://registry-service.mgmt.clarity.ai/v1'

  headers = {"Authorization": "Bearer {}".format(userdata.get('registry_service_bearer_token'))}

  if use_release_candidate_tag:
    if release_candidate is None:
      release_candidate = globals().get('release_candidate')
    if release_candidate is None:
      raise ValueError(
          "use_release_candidate_tag=True requires a release_candidate tag "
          "(argument or module-level variable)")
    requested_tag = release_candidate
  else:
    requested_tag = release_tag

  tag_response = requests.get(uri_base + '/tags/' + requested_tag,
                              params={'tenant': 'CLA'}, headers=headers, timeout=timeout)
  tag_response.raise_for_status()
  current_tag = tag_response.json()['tag']

  datasets_response = requests.get(uri_base + '/tags/{}'.format(current_tag),
                                   params={}, headers=headers, timeout=timeout)
  datasets_response.raise_for_status()
  registry_service = pd.DataFrame(datasets_response.json()['datasets'])

  return registry_service[['table_name', 'dataset_name', 'location', 'dag_owner',
                           'execution_date', 'module', 'database']].copy()


def fetch_table_name(registry_service_data, dataset_name):
  """Return the table_name of the most recent registry entry for `dataset_name`.

  Rows with a missing table_name are ignored. 'Most recent' means the maximum
  `execution_date` (parsed with pd.to_datetime); if several rows share it, the
  first one in frame order is returned.

  Raises:
    IndexError: if no row with a table_name and execution_date exists for
      `dataset_name`.
  """
  matches = registry_service_data[
      registry_service_data.table_name.notna()
      & (registry_service_data.dataset_name == dataset_name)
  ].copy()

  matches['execution_date'] = pd.to_datetime(matches['execution_date'])

  latest = matches.loc[matches['execution_date'] == matches['execution_date'].max(), 'table_name']
  if latest.empty:
    raise IndexError(f"No registry entry with a table_name found for dataset {dataset_name!r}")
  return latest.iloc[0]

def fetch_table_location(registry_service_data, dataset_name):
  """Return the location of the most recent registry entry for `dataset_name`.

  'Most recent' means the maximum `execution_date` (parsed with
  pd.to_datetime); if several rows share it, the first one in frame order is
  returned. Unlike fetch_table_name, rows with a missing location are not
  filtered out, so a missing value can be returned.

  Raises:
    IndexError: if no row with an execution_date exists for `dataset_name`.
  """
  matches = registry_service_data[registry_service_data.dataset_name == dataset_name].copy()

  matches['execution_date'] = pd.to_datetime(matches['execution_date'])

  latest = matches.loc[matches['execution_date'] == matches['execution_date'].max(), 'location']
  if latest.empty:
    raise IndexError(f"No registry entry found for dataset {dataset_name!r}")
  return latest.iloc[0]

# Data input paths

In [None]:
# Resolve the Snowflake table names of the datasets used below, pinned to a
# single registry release tag.
release_tag = 'RELEASE-CLA-2025-08.1'
registry_service_data = extract_registry_service_data(release_tag)

rd_organizations_path, cas_path = (
    fetch_table_name(registry_service_data, dataset)
    for dataset in ('rd_organizations', 'cas_rc')
)

In [None]:
# Load the universe: organisations with a country and an industry code other
# than '00000000/0' (presumably the unclassified placeholder — confirm) that have
# 2023 REPORTED CLA scope 1 and scope 2 emissions, plus 2023 RBICS, revenue,
# employees and georev datapoints in the CAS table.
universe_query = f"""
select clarity_org_id, name, country_code, clarity_industry_code from domain_archive.{rd_organizations_path} where clarity_industry_code != '00000000/0' and country_code is not null

-- Must have scope 1 for 2023
and clarity_org_id in ( select distinct clarity_id from domain_archive.{cas_path}  where metric = 'CO2DIRECTSCOPE1' and metric_year = 2023 and disclosure = 'REPORTED' and provider_code ilike '%CLA%')

-- Must have scope 2 for 2023
and clarity_org_id in ( select distinct clarity_id from domain_archive.{cas_path} where metric = 'CO2INDIRECTSCOPE2' and metric_year = 2023 and disclosure = 'REPORTED' and provider_code ilike '%CLA%')

-- Must have rbics for 2023
and clarity_org_id in ( select distinct clarity_id from domain_archive.{cas_path} where metric ilike '%rbics%' and metric_year = 2023)

-- Must have revenues for 2023
and clarity_org_id in ( select distinct clarity_id from domain_archive.{cas_path} where metric ilike 'revenue' and metric_year = 2023)

-- Must have employees for 2023
and clarity_org_id in ( select distinct clarity_id from domain_archive.{cas_path} where metric ilike 'employees' and metric_year = 2023)

-- Must have georev for 2023
and clarity_org_id in (select distinct clarity_id from domain_archive.{cas_path} where metric ilike '%georev%' and metric_year = 2023)"""

universe = Snowflake().read_sql_query(universe_query)

In [None]:
def _load_reported_cla_metric(metric):
    """Load every REPORTED, CLA-provided datapoint of `metric` from the CAS table.

    Returns a DataFrame with clarity_id, value (cast to float) and metric_year.
    `metric` is interpolated into the SQL text, so pass only trusted constants.
    """
    frame = Snowflake().read_sql_query(
        f""" select clarity_id, value, metric_year from domain_archive.{cas_path} where metric = '{metric}' and disclosure = 'REPORTED' and provider_code ilike '%CLA%'"""
    )
    # Item assignment rather than `frame.value = ...`: attribute-style column
    # assignment silently sets an attribute if the column is ever absent.
    frame['value'] = frame['value'].astype(float)
    return frame


scope_1 = _load_reported_cla_metric('CO2DIRECTSCOPE1')

scope_2 = _load_reported_cla_metric('CO2INDIRECTSCOPE2')

In [None]:
# Keep only emission rows of companies that belong to the universe.
scope_1, scope_2 = (
    frame.loc[frame['clarity_id'].isin(universe['clarity_org_id'])]
    for frame in (scope_1, scope_2)
)

In [None]:
def rate_company_volatility(
    df,
    value_col='value',
    id_col='clarity_id',
    k=3
):
    """
    Computes a robust, scale-free volatility measure for each company,
    adjusted for differing numbers of data points.

    Method:
    - volatility = MAD / median of the company's values (scale-free).
    - adj_volatility = n/(n+k) * volatility + k/(n+k) * median(volatility),
      i.e. shrinkage toward the cross-company median volatility, stronger
      for companies with few datapoints.

    Missing values in `value_col` are ignored consistently by the median,
    the MAD and the count.

    Parameters
    ----------
    df : pd.DataFrame
        Must include [id_col, value_col].
    value_col : str
        Column with numeric values.
    id_col : str
        Company identifier.
    k : int, default=3
        Shrinkage parameter controlling how strongly to down-weight
        short time series. Larger k = stronger shrinkage.

    Returns
    -------
    summary_df : pd.DataFrame
        Per-company table with raw and adjusted volatility scores, sorted by
        adj_volatility descending (NaN last).
        Columns: [id_col, median_value, mad, n, volatility, adj_volatility]
        Companies with a zero median get NaN volatility and adj_volatility.
        NOTE(review): a negative median yields a negative volatility; values
        are presumably non-negative emissions — confirm.
    """

    def _mad(values):
        # Median absolute deviation using pandas' NaN-skipping median, so a
        # single missing value does not turn the whole MAD into NaN.
        return (values - values.median()).abs().median()

    summary = (
        df.groupby(id_col)[value_col]
        .agg(median_value='median', mad=_mad, n='count')
        .reset_index()
    )

    # Scale-free volatility; a zero median is treated as undefined (NaN).
    raw_volatility = summary['mad'] / summary['median_value'].replace(0, np.nan)
    # Reassign instead of chained `inplace=True`, which does not update
    # `summary` under pandas Copy-on-Write.
    summary['volatility'] = raw_volatility.replace([np.inf, -np.inf], np.nan)

    # Compute global median volatility
    median_vol = summary['volatility'].median(skipna=True)

    # Apply shrinkage adjustment for sample size
    summary['adj_volatility'] = (
        (summary['n'] / (summary['n'] + k)) * summary['volatility']
        + (k / (summary['n'] + k)) * median_vol
    )

    return summary.sort_values('adj_volatility', ascending=False)

In [None]:
def remove_volatility_outliers(df, volatility_summary, id_col='clarity_id', threshold_quantile=0.95, min_points=3):
    """
    Removes companies with adjusted volatility above a percentile threshold,
    and companies with fewer than `min_points` rows in `df`.

    Parameters
    ----------
    df : pd.DataFrame
        Datapoints to filter; must include `id_col`.
    volatility_summary : pd.DataFrame
        Per-company table with `id_col` and 'adj_volatility' columns
        (e.g. the output of rate_company_volatility). Not modified.
    id_col : str
        Company identifier column.
    threshold_quantile : float, default=0.95
        Companies whose adj_volatility is strictly above this quantile are
        removed. Companies with NaN adj_volatility are never flagged.
    min_points : int, default=3
        Companies with fewer rows than this in `df` are removed.

    Returns
    -------
    cleaned_df : pd.DataFrame
        Copy of the surviving rows of `df`.
    summary : pd.DataFrame
        Copy of `volatility_summary` with a boolean 'remove' column that marks
        the volatility outliers only (not the short series).
    """
    summary = volatility_summary.copy()

    # Mark companies to remove based on percentile threshold
    cutoff = summary['adj_volatility'].quantile(threshold_quantile)
    summary['remove'] = summary['adj_volatility'] > cutoff
    volatile_ids = summary.loc[summary['remove'], id_col]

    # Rows per company in the input frame (not non-null values).
    rows_per_company = df[id_col].value_counts()
    short_ids = rows_per_company.index[rows_per_company < min_points]

    keep = ~df[id_col].isin(volatile_ids) & ~df[id_col].isin(short_ids)
    cleaned_df = df[keep].copy()

    return cleaned_df, summary

In [None]:
def _clean_and_keep_2023(frame):
    """Drop volatile / short-series companies from `frame`, then keep only 2023 rows.

    Returns (cleaned 2023 rows, volatility summary with the 'remove' flag).
    """
    cleaned, volatility = remove_volatility_outliers(
        frame, rate_company_volatility(frame), threshold_quantile=0.95
    )
    return cleaned[cleaned.metric_year == 2023], volatility


scope_1_cleaned, scope_1_volatility = _clean_and_keep_2023(scope_1)

scope_2_cleaned, scope_2_volatility = _clean_and_keep_2023(scope_2)

In [None]:
# Companies that survive cleaning in BOTH scopes (inner join on clarity_id).
# Several 2023 rows for a company in either frame would multiply in the join;
# only clarity_id membership is used afterwards.
clean_companies = pd.merge(scope_1_cleaned, scope_2_cleaned, how='inner', on='clarity_id')

In [None]:
# Keep the universe rows whose company passed both scope filters.
clean_universe = universe.loc[universe['clarity_org_id'].isin(clean_companies['clarity_id'])]

In [None]:
# Show the cleaned universe as the cell output.
# NOTE(review): this renders the whole frame; `.head()` would keep the saved
# notebook smaller.
clean_universe

Unnamed: 0,clarity_org_id,name,country_code,clarity_industry_code
0,01FF543W84VSKRB4PM1FPW1KRB,Daifuku Co Ltd,JP,20106020/0
1,01FF543WA2Y02M3Q3PCEVSZFQ8,Renesas Electronics Corp,JP,45301020/0
3,01FF543VGM5PWQRKT32FEHGWTX,Eimskipafelag Islands hf,IS,20303010/0
4,01FF543T013H3AR59G4DJNTR0B,Quanta Computer Inc,TW,45202030/0
6,01FF543SVNDJXQFDQ92X3CDEV4,De' Longhi SpA,IT,25201040/0
...,...,...,...,...
6804,01FF543TV1G198DG2GKRKCFE31,DB HiTek Co Ltd,KR,45301020/0
6805,01FF543TH43QKMFB1Q5E47PF4N,Gamma Communications PLC,GB,50101010/0
6806,01FF543VH2DR97YCN9NC97RA0E,Toyobo Co Ltd,JP,15101010/0
6807,01FF543VTXVG8QD8EMBE59VKV1,Fortive Corp,US,20106020/0
