## Preparation

Module imports

In [1]:
import os

import dropbox
import geopandas as gpd
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import plotly.express as px
import requests
import seaborn as sns
from scipy import stats
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score, calinski_harabasz_score, davies_bouldin_score
from sklearn.preprocessing import StandardScaler
from tqdm import tqdm

import config



Variables

In [2]:
#global variables


# Methods

## Data 

#### Let's gather the data

First, we import the data from our local folder or from Dropbox.

In [3]:
def download_excel_file(url, output_file, timeout=60):
    """
    Download an Excel file from a URL and save it locally.

    Parameters:
        url (str): Link to the file. A "?dl=0" query in the link is rewritten to
            "?dl=1" so that Dropbox serves the file itself.
        output_file (str): Path the downloaded bytes are written to. Missing parent
            folders are created.
        timeout (float): Seconds passed to ``requests.get`` so that a stalled
            connection cannot block the notebook forever. Defaults to 60.

    Returns:
        None. If the server does not answer with HTTP 200 an error message is
        printed and nothing is written.
    """
    # Modify the Dropbox URL to force download
    download_url = url.replace("?dl=0", "?dl=1")

    # Send an HTTP GET request to download the file
    response = requests.get(download_url, timeout=timeout)

    # Anything but HTTP 200 is reported and leaves the file system untouched
    if response.status_code != 200:
        print("Error: Unable to download the file")
        return

    # Create the output folder if it does not exist (exist_ok avoids a race
    # between checking for the folder and creating it)
    output_folder = os.path.dirname(output_file)
    if output_folder:
        os.makedirs(output_folder, exist_ok=True)

    # Save the file in the output folder
    with open(output_file, "wb") as file:
        file.write(response.content)


>> First, let's import some data from Yahoo


>> Let's import some data from Refinitiv

>>> via the API

#### Let's clean the data

Make the data readable

In [4]:

def refinite_to_python(file_path):
    """
    Make a file from the Refinitiv screener readable for python and usable for the analysis.

    The sheet is read with a two-row header, both header levels are joined with "_",
    and every group of columns that shares a prefix and ends in a one- or two-digit
    period number is melted into one value column plus a "Period" column.

    input: file_path - path of the Excel file (passed to pandas.read_excel)
    output: cleaned dataframe in long format (rows keyed by "Company Name" and "Period")
            with the metric columns and the "RIC", "Country of Headquarters" and
            "NAICS Subsector Name" columns

    Raises:
        KeyError: if an expected column (e.g. "YTD Total Return 11" or "RIC") is missing.
    """

    def _period_from_column(column_name):
        """Return the one- or two-digit number a column name ends with, or 1 if it has none."""
        suffix = column_name[-2:] if column_name[-2:].isdigit() else column_name[-1:]
        return int(suffix) if suffix.isdigit() else 1

    # Read data from excel file
    df_all_comp_all = pd.read_excel(file_path, header=[0, 1])

    # Combine multilevel columns into a single level
    df_all_comp_all.columns = ['_'.join(col).strip() for col in df_all_comp_all.columns.values]

    # Rename columns for better understanding
    df_all_comp_all.rename(
        columns={
            "Company Name_Unnamed: 1_level_1": "Company Name",
            "Identifier (RIC)_Unnamed: 0_level_1": "RIC",
            "Country of Headquarters_Unnamed: 2_level_1": "Country of Headquarters",
            "NAICS Subsector Name_Unnamed: 5_level_1": "NAICS Subsector Name",
        },
        inplace=True,
    )

    # Remove unwanted characters from column names
    df_all_comp_all.columns = df_all_comp_all.columns.str.replace('\nIn the last 10 FY_FY', ' ')
    df_all_comp_all.columns = df_all_comp_all.columns.str.replace("\nIn the last 15 Y_Y"," ")
    df_all_comp_all.columns = df_all_comp_all.columns.str.replace('-', '')

    # Work on a copy without the return periods 11-14
    df_all_comp = df_all_comp_all.drop(
        columns=[
            "YTD Total Return 11",
            "YTD Total Return 12",
            "YTD Total Return 13",
            "YTD Total Return 14",
        ]
    )

    # Drop rows with all NaN values and fill remaining NaNs with 0
    df_all_comp.dropna(inplace=True, how="all")

    # Keep the identifying columns of exactly the rows that survived, renumbered
    # 0..n-1. melt() numbers its output rows the same way, so the assignment
    # further down stays aligned even if an all-NaN row was dropped.
    df_meta = df_all_comp_all.loc[
        df_all_comp.index, ["RIC", "Country of Headquarters", "NAICS Subsector Name"]
    ].reset_index(drop=True)

    df_all_comp.fillna(0, inplace=True)

    # Extract unique column prefixes (name without its one- or two-digit period suffix).
    # dict.fromkeys de-duplicates like set() but keeps the column order, so the result
    # is the same on every run.
    numbered_columns = [col for col in df_all_comp.columns if col[-1].isdigit()]
    prefixes = [col[:-1] for col in numbered_columns]
    prefixes = [col[:-1] if col[-1].isdigit() else col for col in prefixes]
    list_columns = [col.replace("\n", "") for col in dict.fromkeys(prefixes)]

    # Initialize an empty list to store DataFrames
    dfs = []

    # Iterate over unique column prefixes
    for colum in tqdm(list_columns):
        esg_cols = [col for col in df_all_comp.columns if col.startswith(colum)]
        df = df_all_comp[["Company Name"] + esg_cols].copy()

        # Melt the DataFrame to transform it into the desired format
        melted_df = df.melt(
            id_vars=["Company Name"],
            var_name="Period",
            value_name=colum
        )
        # The previous check `x[-2:-1] == "10"` compared one character with "10"
        # and could never match, so period 10 was stored as 0.
        melted_df["Period"] = melted_df["Period"].apply(_period_from_column)

        dfs.append(melted_df)

    # Concatenate the melted DataFrames side by side (rows are matched on the row number)
    melted_df_all = pd.concat(dfs, axis=1)

    # Remove duplicate columns; "Company Name" and "Period" of the first prefix are kept
    melted_df_all = melted_df_all.loc[:, ~melted_df_all.columns.duplicated()]

    # Sort the DataFrame by Company Name and Period
    melted_df_all = melted_df_all.sort_values(by=["Company Name", "Period"])

    # Create a copy of the final DataFrame
    df_data_ = melted_df_all.copy()

    # Replace all 0 with nan, except for the period column.
    # It is important to differentiate between 0 and nan, for example for the correlation.
    df_data_ = df_data_.replace(0, np.nan)
    df_data_ = df_data_.replace("0", np.nan)
    df_data_["Period"] = df_data_["Period"].fillna(0)

    # Add RIC, Country of Headquarters and NAICS Subsector Name to the dataframe.
    # The assignment aligns on the row label, so only the first row of every company is filled.
    df_data_["RIC"] = df_meta["RIC"]
    df_data_["Country of Headquarters"] = df_meta["Country of Headquarters"]
    df_data_["NAICS Subsector Name"] = df_meta["NAICS Subsector Name"]
    # Fill nan of RIC, Country of Headquarters and NAICS Subsector Name with the last value that is not nan
    df_data_["RIC"] = df_data_.groupby("Company Name")["RIC"].ffill()
    df_data_["Country of Headquarters"] = df_data_.groupby("Company Name")["Country of Headquarters"].ffill()
    df_data_["NAICS Subsector Name"] = df_data_.groupby("Company Name")["NAICS Subsector Name"].ffill()

    # Remove all the whitespaces in the end of a column name
    df_data_.columns = df_data_.columns.str.rstrip()

    if "ESG Score" not in df_data_.columns:
        # Column names were right-stripped just above, so "ESG Score " cannot exist
        # any more: if this branch is reached the lookup raises KeyError.
        df_data_["ESG Score"] = df_data_["ESG Score "]

    return df_data_
    

Clean the data and add some clusters

In [18]:
def clean_data(df, esg_col = "ESG Score", return_col = "Return", n_clusters = 6):
    """
    Clean and preprocess a dataframe and cluster it on the "ESG Score" and "Return" columns.

    Args:
        df (pd.DataFrame): The input dataframe to clean. It is not modified.
        esg_col (str): The column name for the ESG Score.
        return_col (str): The column name for the Return.
        n_clusters (int): Number of KMeans clusters. Defaults to 6.

    Returns:
        pd.DataFrame: The cleaned dataframe. Rows with a missing score/return or with a
        return more than 2 standard deviations from the mean are removed, both columns
        are divided by their maximum, and a "cluster" column holds the KMeans label
        of every row.
    """

    # Drop rows with NaN values in "Return" or "ESG Score" columns (returns a new frame)
    df_data = df.dropna(subset=[return_col, esg_col])

    # Remove outliers in the "Return" column (more than 2 std away from the mean)
    return_mean = df_data[return_col].mean()
    return_std = df_data[return_col].std()
    lower_bound = return_mean - 2 * return_std
    upper_bound = return_mean + 2 * return_std
    in_range = (df_data[return_col] > lower_bound) & (df_data[return_col] < upper_bound)
    # .copy() so the assignments below write to an independent frame, not a slice
    df_data = df_data[in_range].copy()

    # Rescale both columns by their maximum (the largest value becomes 1)
    df_data[esg_col] = df_data[esg_col] / df_data[esg_col].max()
    df_data[return_col] = df_data[return_col] / df_data[return_col].max()

    # Cluster the companies on both columns. The labels used to be computed and then
    # thrown away; they are now stored. random_state makes re-runs reproducible.
    model = KMeans(n_clusters=n_clusters, random_state=0)
    df_data["cluster"] = model.fit_predict(df_data[[esg_col, return_col]])

    # Return the cleaned dataframe
    return df_data

Add Transformation

In [6]:
def transform_columns(df,column):
    """
    Add seven transformed copies of one column to the dataframe.

    For a column named ``c`` the new columns are, in this order:
    ``c_log``, ``c_log_2`` (log of the square), ``c_sqrt``, ``c_sqrt_2``
    (square root of the square), ``c_2``, ``c_3`` and ``c_4`` (powers).

    The dataframe is modified in place and also returned.
    """
    source = df[column]

    # (suffix, transformation) pairs, applied in the order the columns are created
    transformations = (
        ("_log", lambda s: np.log(s)),
        ("_log_2", lambda s: np.log(s**2)),
        ("_sqrt", lambda s: np.sqrt(s)),
        ("_sqrt_2", lambda s: np.sqrt(s**2)),
        ("_2", lambda s: s**2),
        ("_3", lambda s: s**3),
        ("_4", lambda s: s**4),
    )
    for suffix, transformation in transformations:
        df[column + suffix] = transformation(source)

    return df

Scale the Data

In [22]:
def scale_data(df: pd.DataFrame) -> pd.DataFrame:
    """Standardize the float64/int64 columns of a DataFrame.

    Args:
        df (pd.DataFrame): The input DataFrame.

    Returns:
        pd.DataFrame: A new DataFrame that contains only the float64 and int64
        columns of ``df``, passed through ``StandardScaler`` and carrying the
        original index.
    """
    numeric_cols = df.select_dtypes(include=['float64', 'int64']).columns
    scaled_values = StandardScaler().fit_transform(df[numeric_cols])
    return pd.DataFrame(scaled_values, index=df.index, columns=numeric_cols)

### Let's dive deep into the data

## MATH Formulas

#### Let's define our basic math functions


Gradient Descent

In [7]:
def initialize_coefficients(n_features, n_targets):
    """
    Draw a random starting coefficient matrix for VAR.

    Parameters:
        n_features (int): Number of input features (rows of the matrix).
        n_targets (int): Number of output targets (columns of the matrix).

    Returns:
        numpy.ndarray: An (n_features x n_targets) matrix produced by ``np.random.rand``.
    """
    shape = (n_features, n_targets)
    return np.random.rand(*shape)

def calculate_loss(X, Y, coefficients):
    """
    Compute half the mean squared error of the linear prediction ``X @ coefficients``.

    Parameters:
        X (numpy.ndarray): An (n_samples x n_features) array of input data.
        Y (numpy.ndarray): An (n_samples x n_targets) array of output data.
        coefficients (numpy.ndarray): An (n_features x n_targets) matrix of current coefficients.

    Returns:
        float: Sum of squared errors divided by ``2 * n_samples``.
    """
    n_samples = X.shape[0]
    residuals = Y - np.dot(X, coefficients)
    squared_error_total = np.sum(residuals**2)
    return squared_error_total / (2 * n_samples)

def calculate_lasso_loss(_X, _Y, coefficients, lambda_):
    """
    Compute the LASSO loss: half mean squared error plus an L1 penalty.

    Parameters:
        _X (numpy.ndarray): An (n_samples x n_features) array of input data.
        _Y (numpy.ndarray): An (n_samples x n_targets) array of output data.
        coefficients (numpy.ndarray): An (n_features x n_targets) matrix of current coefficients.
        lambda_ (float): The regularization parameter that scales the L1 norm.

    Returns:
        float: ``sum(errors**2) / (2 * n_samples) + lambda_ * sum(|coefficients|)``.
    """
    residuals = _Y - np.dot(_X, coefficients)
    data_term = np.sum(residuals**2) / (2 * _X.shape[0])
    penalty = lambda_ * np.sum(np.abs(coefficients))
    return data_term + penalty


def outer(a, b):
    """
    Computes the outer product of two 1-dimensional arrays.

    Parameters:
        a (array-like): 1-dimensional array of numbers.
        b (array-like): 1-dimensional array of numbers.

    Returns:
        numpy.ndarray: float64 array of shape (len(a), len(b)) where the element at
        position (i, j) is the product of the i-th element of `a` and the j-th
        element of `b`.
    """
    # np.outer replaces the element-by-element Python loop; converting to float
    # first keeps the float64 result the loop produced by filling np.zeros.
    return np.outer(np.asarray(a, dtype=float), np.asarray(b, dtype=float))



def calculate_gradient(X, Y, coefficients, lambda_=0.2):
    """
    Calculates the gradient of the L1-regularised squared-error loss with respect
    to the coefficients.

    NOTE: a second function named `calculate_gradient` is defined further down in
    the same cell. Python keeps the later definition, so this version is only
    used if that one is removed or renamed.

    Parameters:
        X (numpy.ndarray): An (n_samples x n_features) array of input data.
        Y (numpy.ndarray): An (n_samples x n_targets) array of output data.
        coefficients (numpy.ndarray): An (n_features x n_targets) matrix of current coefficients.
        lambda_ (float): Weight of the L1 term. Defaults to 0.2, the value that was
            previously hard-coded; 0 gives the gradient without Lasso.

    Returns:
        numpy.ndarray: An (n_features x n_targets) matrix of gradient values.
    """
    n_samples = X.shape[0]
    # Gradient of sum((X @ c - Y)**2) / (2 * n_samples)
    data_gradient = np.dot(X.T, np.dot(X, coefficients) - Y) / n_samples
    # sign(c) is used as the (sub)gradient of |c|; it is 0 where c == 0
    return data_gradient + lambda_ * np.sign(coefficients)

def update_coefficients(coefficients, gradients, lr):
    """
    Take one gradient step: move the coefficients against the gradient.

    Parameters:
        coefficients (numpy.ndarray): An (n_features x n_targets) matrix of current coefficients.
        gradients (numpy.ndarray): An (n_features x n_targets) matrix of gradient values.
        lr (float): Learning rate that scales the step.

    Returns:
        numpy.ndarray: ``coefficients - lr * gradients``.
    """
    step = lr * gradients
    return coefficients - step

def gradient_descent_step(X__, Y__, coefficients__, lr):
    """
    Run one gradient descent step for VAR and report the loss before the step.

    Parameters:
        X__ (numpy.ndarray): An (n_samples x n_features) array of input data.
        Y__ (numpy.ndarray): An (n_samples x n_targets) array of output data.
        coefficients__ (numpy.ndarray): An (n_features x n_targets) matrix of current coefficients.
        lr (float): Learning rate for the optimization algorithm.

    Returns:
        tuple: ``(updated_coefficients, loss)`` where ``loss`` is the LASSO loss of the
        coefficients that were passed in (not of the updated ones).
    """
    lambda_ = 1  # to be varied with the cosine similarity

    # The loss is evaluated on the incoming coefficients
    loss = calculate_lasso_loss(X__, Y__, coefficients__, lambda_)

    gradients = calculate_gradient(X__, Y__, coefficients__)
    return update_coefficients(coefficients__, gradients, lr), loss

def calculate_gradient(X_, Y_, coefficients_):
    """
    Calculates the gradient of the loss ``sum((Y - X @ c)**2) / (2 * n_samples)``
    with respect to the coefficients.

    This definition replaces any earlier function of the same name in the notebook.

    Parameters:
        X_ (numpy.ndarray): An (n_samples x n_features) array of input data.
        Y_ (numpy.ndarray): An (n_samples x n_targets) array of output data.
        coefficients_ (numpy.ndarray): An (n_features x n_targets) matrix of current coefficients.

    Returns:
        numpy.ndarray: An (n_features x n_targets) matrix of gradient values,
        ``X.T @ (X @ c - Y) / n_samples``.
    """
    n_samples = X_.shape[0]
    # The residual must be (prediction - target). The previous per-sample loop used
    # (target - prediction), i.e. the negative gradient, so subtracting it in
    # `coefficients - lr * gradients` increased the loss instead of decreasing it.
    residuals = np.dot(X_, coefficients_) - Y_
    return np.dot(X_.T, residuals) / n_samples
import numpy as np

F-Test

In [8]:
def calculate_f_statistic(sample1, sample2):
    """
    Calculates the F-statistic (ratio of sample variances) for two samples.

    Parameters:
        sample1 (numpy.ndarray): An array of values for the first sample.
        sample2 (numpy.ndarray): An array of values for the second sample.

    Returns:
        float: Variance of `sample1` divided by variance of `sample2`.
    """
    # ddof=1 gives the unbiased sample variance, which matches the
    # len(sample) - 1 degrees of freedom used for the critical value.
    # With np.var's default (ddof=0) the ratio is off whenever the two
    # samples have different lengths.
    var1 = np.var(sample1, ddof=1)
    var2 = np.var(sample2, ddof=1)
    return var1 / var2

def calculate_critical_value(sample1, sample2, alpha):
    """
    Calculates the critical value of the F-distribution for two samples.

    Parameters:
        sample1 (numpy.ndarray): An array of values for the first sample.
        sample2 (numpy.ndarray): An array of values for the second sample.
        alpha (float): The significance level.

    Returns:
        float: The (1 - alpha) quantile of the F-distribution with
        len(sample1) - 1 and len(sample2) - 1 degrees of freedom.
    """
    df1 = len(sample1) - 1
    df2 = len(sample2) - 1
    # `stats` is scipy.stats (imported at the top of the notebook). The bare name
    # `f` used here before was never imported and raised NameError.
    return stats.f.ppf(q=1-alpha, dfn=df1, dfd=df2)

def compare_f_statistic_to_critical_value(f_statistic, critical_value):
    """
    Report whether the F-statistic exceeds the critical value.

    Parameters:
        f_statistic (float): The F-statistic for the two samples.
        critical_value (float): The critical value of the F-distribution for the two samples.

    Returns:
        bool: False if the F-statistic is larger than the critical value (the null
        hypothesis of equal variances is rejected), True otherwise. The decision is
        also printed.
    """
    variances_equal = not (f_statistic > critical_value)
    if variances_equal:
        print('Accept the null hypothesis that the variances are equal')
    else:
        print('Reject the null hypothesis that the variances are equal')
    return variances_equal

Cosine similarity

In [9]:
def cosine_similarity(v1, v2):
    """
    Cosine of the angle between two vectors.

    input: v1, v2: numpy arrays
    output: cosine similarity (float), i.e. the dot product divided by the
            product of the two Euclidean norms
    """
    dot_product = np.dot(v1, v2)
    norm_product = np.linalg.norm(v1) * np.linalg.norm(v2)
    return dot_product / norm_product

def cosine_similarity_matrix(A,B):
    """
    Row-wise cosine similarity: entry i compares row i of A with row i of B.

    Returns a 1-dimensional numpy array with one value per row of A.
    """
    similarities = []
    for row in range(A.shape[0]):
        similarities.append(cosine_similarity(A[row, :], B[row, :]))
    return np.array(similarities)

#calculate the cosine similarity for every company to every other company
#select only the numerical columns


Lag Matrix

In [10]:
def lag_matrix(X,p):
    """
    Build the lagged design matrix of X for p lags.

    Row t of the result holds X[t+p-1], X[t+p-2], ..., X[t] side by side
    (most recent lag first), so the result has X.shape[0] - p rows and
    X.shape[1] * p columns.
    """
    n_rows, n_cols = X.shape[0], X.shape[1]
    lagged = np.zeros((n_rows - p, n_cols * p))
    for lag in range(1, p + 1):
        first_col = (lag - 1) * n_cols
        lagged[:, first_col:first_col + n_cols] = X[p - lag:-lag, :]
    return lagged

Calculate the optimal number of clusters

In [None]:
def find_optimal_clusters(df_scaled: pd.DataFrame) -> int:
    """Find the optimal number of clusters using silhouette, calinski_harabasz,
    and davies_bouldin scores.

    KMeans is fitted for 3, 6, ..., 27 clusters. Each of the three scores is
    min-max normalised across the candidates and the candidate with the highest
    combined score is returned.

    Args:
        df_scaled (pd.DataFrame): The scaled DataFrame.

    Returns:
        int: The optimal number of clusters.

    Raises:
        ValueError: If the data has too few rows for even the smallest candidate.
    """
    n_samples = len(df_scaled)
    # The cluster scores need fewer clusters than samples
    candidates = [n_clusters for n_clusters in range(3, 30, 3) if n_clusters < n_samples]
    if not candidates:
        raise ValueError("Need at least 4 samples to evaluate 3 or more clusters")

    metrics = []
    for n_clusters in candidates:
        kmeans = KMeans(n_clusters=n_clusters, random_state=0).fit(df_scaled)
        metrics.append((
            silhouette_score(df_scaled, kmeans.labels_),
            calinski_harabasz_score(df_scaled, kmeans.labels_),
            # Davies-Bouldin is better when LOWER, so it is negated to make
            # "higher is better" true for every column.
            -davies_bouldin_score(df_scaled, kmeans.labels_)
        ))
    metrics = np.asarray(metrics, dtype=float)

    # Put the three scores on a common 0..1 scale; otherwise the unbounded
    # Calinski-Harabasz value decides the result on its own.
    lowest = metrics.min(axis=0)
    spread = metrics.max(axis=0) - lowest
    spread[spread == 0] = 1.0  # a score that is constant across candidates contributes 0
    combined = ((metrics - lowest) / spread).sum(axis=1)

    return candidates[int(np.argmax(combined))]

Cluster the Companies (KMeans)

In [23]:
def cluster_companies(df_data: pd.DataFrame, min_size: int = 15) -> pd.DataFrame:
    """Cluster the companies based on all the data into the optimal number of
    clusters and keep only clusters with at least `min_size` rows.

    Args:
        df_data (pd.DataFrame): The input DataFrame. It is not modified.
        min_size (int, optional): The minimum number of companies per cluster.
            Defaults to 15.

    Returns:
        pd.DataFrame: A copy of the input rows that belong to sufficiently large
        clusters, with the additional columns 'cluster' (cluster number) and
        'cluster_size' (number of rows in that cluster).
    """
    # Scale the data
    df_scaled = scale_data(df_data)

    # Find the optimal number of clusters
    optimal_clusters = find_optimal_clusters(df_scaled)

    # Perform the clustering
    kmeans = KMeans(n_clusters=optimal_clusters, random_state=0).fit(df_scaled)

    # Work on a copy: writing 'cluster' into the caller's frame would make a
    # second run of this function scale and cluster on the cluster labels too.
    clustered = df_data.copy()
    clustered['cluster'] = kmeans.labels_

    # Count the rows per cluster. 'size' does not depend on an 'id' column,
    # which the previous count required and raised KeyError without.
    clustered['cluster_size'] = clustered.groupby('cluster')['cluster'].transform('size')

    # Filter out clusters with less than `min_size` companies
    return clustered[clustered['cluster_size'] >= min_size]


#### Let's define our main models


VAR (Vector Autoregression)

In [11]:
def VAR_model(X, p):
    """
    Fit a VAR(p) model on the first part of X by least squares and evaluate it
    on the remaining rows.

    Parameters:
        X (numpy.ndarray): An (n_samples x n_features) array, one row per time step.
        p (int): Number of lags.

    Returns:
        tuple: (mse, r2, X_lagged, coeffs) - the mean squared error and R2 score on
        the test rows, the full lagged matrix of shape
        (n_samples - p, p * n_features) and the fitted coefficients of shape
        (p * n_features, n_features).
    """
    n_samples, n_features = X.shape

    # Row t of X_lagged holds X[t+p-1], ..., X[t] side by side; its target is X[t+p]
    X_lagged = np.zeros((n_samples - p, p * n_features))
    for i in range(p):
        X_lagged[:, i*n_features:(i+1)*n_features] = X[p-i-1:-i-1, :]

    # Split the data into training and testing sets
    train_size = int(0.8 * n_samples)
    X_train = X_lagged[:train_size, :]
    Y_train = X[p:train_size+p, :]
    # The test predictors start right after the training rows so that row t is
    # still paired with target X[t+p]. The previous slice [train_size-p:-p] was
    # shifted by p rows against Y_test and reused training rows.
    X_test = X_lagged[train_size:, :]
    Y_test = X[train_size+p:, :]
    print(X_train.shape, Y_train.shape, X_test.shape, Y_test.shape)

    # Compute the coefficients using the training set. lstsq solves the same
    # least-squares problem as inv(X'X) X'Y without forming an explicit inverse.
    coeffs = np.linalg.lstsq(X_train, Y_train, rcond=None)[0]

    # make predictions for the test set
    Y_pred = X_test @ coeffs

    # calculate the mean squared error
    mse = np.mean((Y_test - Y_pred)**2)
    # calculate the R2 score
    r2 = 1 - np.sum((Y_test - Y_pred)**2) / np.sum((Y_test - np.mean(Y_test))**2)

    # return the mean squared error and the R2 score, the matrix and the coefficients
    return mse, r2, X_lagged, coeffs

#### Granger Test

First, a simple Granger test for a quick analysis

In [12]:
def check_granger_simple(df_data: pd.DataFrame, p: int, column_A: str, column_B: str) -> tuple:
    """
    Quick Granger-style check whether lags of `column_B` help to predict `column_A`.

    Three VAR(p) models are fitted with `VAR_model` on the stacked data of all
    companies that have more than 5 rows:
        Model 1: column_A
        Model 2: column_A and a column of random noise
        Model 3: column_A and column_B
    R-squared values, the variance ratios of the residuals (F-statistics) and the
    critical values are printed.

    Parameters:
    df_data (pd.DataFrame): A DataFrame with a 'Company Name' column and the two
        numeric columns named by `column_A` and `column_B`. It is not modified.
    p (int): The number of lags to include in the VAR models.
    column_A (str): The name of the column that is predicted.
    column_B (str): The name of the column that is tested as additional predictor.

    Returns:
    A tuple (residuals_1, residuals_2, residuals_3, y, coeffs_1, coeffs_2, coeffs_3):
    the residuals of the three models for column_A, the values of column_A they are
    measured against, and the fitted coefficient matrices.

    Raises:
    ValueError: If no company has more than 5 rows.
    """

    # Collect the two columns per company, each block preceded by 'p' rows of
    # zeros so that the lags of one company do not reach into the previous one.
    # The blocks are gathered in a list and stacked once; DataFrame.append, which
    # was used here before, no longer exists in pandas 2.x.
    padded_blocks = []
    for _, company_rows in df_data.groupby("Company Name", sort=False):
        # Only process companies with more than 5 data points
        if company_rows.shape[0] > 5:
            values = company_rows[[column_A, column_B]].to_numpy(dtype=float)
            padded_blocks.append(np.vstack([np.zeros((p, values.shape[1])), values]))

    if not padded_blocks:
        raise ValueError("No company has more than 5 rows; nothing to fit")

    data = np.vstack(padded_blocks)

    # Initialize the VAR models using different predictors
    # Model 1: column_A
    # Model 2: column_A, random noise
    # Model 3: column_A, column_B
    X1 = data[:, [0]]
    X2 = np.column_stack([data[:, 0], np.random.rand(len(data))])
    X3 = data

    # Fit the VAR models with 'p' lags
    mse1, r21, X_lagged1, coeffs_1 = VAR_model(X1, p)
    mse2, r22, X_lagged2, coeffs_2 = VAR_model(X2, p)
    mse3, r23, X_lagged3, coeffs_3 = VAR_model(X3, p)

    # Calculate the residuals for column_A (first output column) over all rows
    y = data[p:, 0]
    residuals_1 = y - (X_lagged1 @ coeffs_1)[:, 0]
    residuals_2 = y - (X_lagged2 @ coeffs_2)[:, 0]
    residuals_3 = y - (X_lagged3 @ coeffs_3)[:, 0]

    # Define a function to calculate R-squared from residuals
    def r2_from_residuals(y, residuals):
        """
        Calculate the R-squared value from the residuals of a model.

        Parameters:
        y (numpy.ndarray): The dependent variable.
        residuals (numpy.ndarray): The residuals of the model.

        Returns:
        The R-squared value, 1 - sum(residuals**2) / sum((y - mean(y))**2).
        """
        SST = np.sum((y - np.mean(y)) ** 2)
        SSR = np.sum(residuals ** 2)
        return 1 - (SSR / SST)

    # Print R-squared values for each model
    print(f"R-squared (Model 1): {r2_from_residuals(y, residuals_1)}")
    print(f"R-squared (Model 2): {r2_from_residuals(y, residuals_2)}")
    print(f"R-squared (Model 3): {r2_from_residuals(y, residuals_3)}")

    # Perform F-tests to compare residuals between models
    F12 = calculate_f_statistic(residuals_1, residuals_2)
    F13 = calculate_f_statistic(residuals_1, residuals_3)
    print(f"F-statistic (Model 1 vs Model 2): {F12}")
    print(f"F-statistic (Model 1 vs Model 3): {F13}")

    critical_value12 = calculate_critical_value(residuals_1, residuals_2, alpha=0.05)
    critical_value13 = calculate_critical_value(residuals_1, residuals_3, alpha=0.05)
    print(f"Critical value (Model 1 vs Model 2): {critical_value12}")
    print(f"Critical value (Model 1 vs Model 3): {critical_value13}")

    # Compare F-statistics to critical values (prints the decision)
    compare_f_statistic_to_critical_value(F12, critical_value12)
    compare_f_statistic_to_critical_value(F13, critical_value13)

    # Return the residuals, y, and coefficients for each model
    return (residuals_1, residuals_2, residuals_3, y,
            coeffs_1, coeffs_2, coeffs_3)



## Analyze

## Visualize

ESG by Country

In [13]:
def create_esg_map(dataframe, country_col='Country of Headquarters', esg_score_col='ESG Score',download=True
                   ):
    """
    Create a map of the world with the mean ESG score of each country. The country means are standardized to have a mean of 0 and a standard deviation of 1.
    input:
        dataframe: A dataframe with a column containing the country names and a column containing the ESG scores.
        country_col: The name of the column containing the country names.
        esg_score_col: The name of the column containing the ESG scores.
        download: A boolean indicating whether to download the map as an HTML file ("esg_map.html") or to display it in the notebook.
    output:
        A map of the world with ESG scores for each country.(HTML file or displayed in the notebook)
    """

    # One value per country: the mean score of all its rows. Merging the raw rows
    # would attach a separate copy of the country polygon to every company/period
    # row, so each country would be drawn many times with different colours.
    country_scores = (
        dataframe[[country_col, esg_score_col]]
        .dropna(subset=[esg_score_col])
        .groupby(country_col, as_index=False)[esg_score_col]
        .mean()
    )

    # NOTE(review): geopandas.datasets appears to be deprecated in geopandas 0.14 and
    # removed in 1.0 - confirm the installed version or load the Natural Earth file
    # from a local path.
    world = gpd.read_file(gpd.datasets.get_path('naturalearth_lowres'))

    # Countries are matched on their exact name; names that differ between the two
    # sources are silently left out.
    merged_data = world.merge(country_scores, left_on='name', right_on=country_col, how='left')
    merged_data = merged_data[merged_data[esg_score_col].notna()].copy()

    # Standardize the ESG scores
    scaler = StandardScaler()
    merged_data['Standardized ESG Score'] = scaler.fit_transform(merged_data[[esg_score_col]]).ravel()

    fig = px.choropleth(merged_data, geojson=merged_data.geometry, locations=merged_data.index,
                        color='Standardized ESG Score',
                        range_color=(merged_data['Standardized ESG Score'].min(), merged_data['Standardized ESG Score'].max()),
                        projection='natural earth', hover_name='name', hover_data=[esg_score_col],
                        labels={'Standardized ESG Score': 'Standardized ESG Score'})

    fig.update_geos(showcountries=True, countrywidth=0.5)
    fig.update_layout(title_text='Standardized ESG Score of Companies by Country', title_x=0.5)
    if download:
        fig.write_html("esg_map.html")
    else:
        fig.show()

Plot the Clusters

In [14]:
def plot_clusters(df, labels, esg_col='ESG Score', return_col='Return'):
    """
    Show a scatter plot of the clusters and a histogram of the cluster labels.

    Args:
        df (pd.DataFrame): The dataframe to plot.
        labels (np.ndarray): The cluster labels of each row in the dataframe.
        esg_col (str): Column drawn on the x-axis.
        return_col (str): Column drawn on the y-axis.
    """

    # Scatter plot of esg_col against return_col, coloured by cluster label;
    # the legend is placed to the right of the axes
    sns.scatterplot(data=df, x=esg_col, y=return_col, hue=labels)
    plt.legend(loc=2, bbox_to_anchor=(1.05, 1), borderaxespad=0.)
    plt.show()

    # Histogram of the cluster labels
    sns.histplot(labels)
    plt.show()

Plot the ESG distribution per Cluster

# TEST CODE

# RUN CODE

##### First, we get the Data and clean it

Get the Data

In [15]:
# Fetch every configured file that is not in the local folder yet
for key, value in config.DICT_URL.items():
    # Files that were downloaded earlier are skipped
    if os.path.exists(config.LOCAL_FOLDER + key):
        continue
    download_excel_file(value, config.LOCAL_FOLDER + key)
    print("Downloaded file: " + key)

# Load the 10-year individual ESG scores into long format
df_individual_scores_10Y = refinite_to_python(config.LOCAL_FOLDER + "ESG-individual_scores_10Y.xlsx")

100%|██████████| 228/228 [00:02<00:00, 89.74it/s]


Calculate the optimal number of clusters

In [None]:
#Calculate the optimal number of clusters
# NOTE(review): `calculate_optimal_n_clusters` is not defined in the visible part of this
# notebook (the helper defined above is `find_optimal_clusters(df_scaled)`), and this cell
# has no execution count - confirm where this function, config.ESG_SCORES and
# config.RETURN come from before relying on this cell.
optimal_n_clusters = calculate_optimal_n_clusters(df_individual_scores_10Y, config.ESG_SCORES, config.RETURN)


Clean it and add clusters

In [19]:
# clean_data expects the return series in a column called "Return"
df_individual_scores_10Y.rename(columns={'YTD Total Return':'Return'}, inplace=True)
df_individual_scores_10Y_clean = clean_data(
    df_individual_scores_10Y,
    esg_col="ESG Score",
    return_col="Return",
)



##### Now we start with some basic data exploration and visualization 

> ESG by country

In [20]:
# Write the world map of ESG scores to an HTML file
create_esg_map(dataframe=df_individual_scores_10Y_clean, download=True)

ESG by Cluster