In [1]:
import os
from dotenv import load_dotenv
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import scipy.stats as stats
import statsmodels.api as sm
import eikon as ek
import random
from datetime import timedelta, datetime

### Generating random test data for the attribution model

In [2]:
def create_portfolio_data(start_date, tickers, sector_map, common_data=None, num_days=20):
    dates = [start_date + timedelta(days=i) for i in range(num_days)]
    data = []

    for date in dates:
        daily_weights = np.random.dirichlet(np.ones(len(tickers)), size=1).flatten()
        for ticker, weight in zip(tickers, daily_weights):
            # Use pre-generated returns for common tickers
            if common_data and ticker in common_data['returns']:
                return_ = common_data['returns'][ticker].loc[common_data['returns'][ticker]['Date'] == date]['Return'].values[0]
            else:
                return_ = np.random.uniform(-0.05, 0.05)
            # Use the pre-assigned GICS Sector
            gics_sector = sector_map[ticker]
            data.append([date, ticker, weight, return_, gics_sector])

    return pd.DataFrame(data, columns=['Date', 'Ticker', 'Weight', 'Return', 'GICS Sector'])

# Define all possible tickers and assign GICS Sectors. Note that this data is completely random and nonsensical used for tests only.
all_tickers = ['AAPL', 'TSLA', 'NVDA', 'GOOGL', 'FB', 'JNJ', 'PG', 'V']
gics_sectors = ['Information Technology', 'Consumer Discretionary', 'Health Care', 'Financials', 'Industrials']

# Create a persistent mapping of tickers to GICS Sectors
sector_map = {ticker: random.choice(gics_sectors) for ticker in all_tickers}

# Generate common returns for the common tickers
num_days = 20
dates = [datetime(2023, 9, 1) + timedelta(days=i) for i in range(num_days)]
common_returns_data = {
    ticker: pd.DataFrame({'Date': dates, 'Return': np.random.uniform(-0.05, 0.05, num_days)})
    for ticker in all_tickers if ticker in ['TSLA', 'NVDA']  # Common tickers
}
common_data = {'returns': common_returns_data}

# Define the contents of each portfolio
tickers_portfolio_1 = ['AAPL', 'TSLA', 'NVDA', 'GOOGL', 'FB']
tickers_portfolio_2 = ['TSLA', 'NVDA', 'JNJ', 'PG', 'V']

# Create two datasets for the same month with some common tickers
portfolio_1 = create_portfolio_data(datetime(2023, 9, 1), tickers_portfolio_1, sector_map, common_data=common_data, num_days=num_days)
portfolio_2 = create_portfolio_data(datetime(2023, 9, 1), tickers_portfolio_2, sector_map, common_data=common_data, num_days=num_days)


In [3]:


def merge_dataframes(df1, df2):
    merged_data = (pd.merge(df1, df2, on=['Date', 'Ticker', 'GICS Sector', 'Return'], how='outer', suffixes=('_portofolio', '_benchmark'))
                     .rename(columns={'Weight_portofolio': 'Portfolio Weight', 'Weight_benchmark': 'Benchmark Weight'})
                     .fillna(0)
                   )
    return merged_data

merged_data = merge_dataframes(portfolio_1, portfolio_2)
merged_data.sort_values(by=['Date', 'Ticker'], inplace=True, ignore_index=True)

### Manipulating the raw data

In [4]:
def calculate_weighted_returns(weights, returns):
    return weights * returns

def calculate_sector_weights(df, weight_column_name):
    return df.groupby(['Date', 'GICS Sector'])[weight_column_name].transform('sum')

# Function to calculate asset weight in sector
def calculate_asset_weight_in_sector(df, weight_column_name):
    sector_weights = calculate_sector_weights(df, weight_column_name)
    return np.where(sector_weights == 0, 0, df[weight_column_name] / sector_weights)

# Function to calculate an assets sector contribution
def calculate_sector_contribution_return(asset_weights_in_sector, returns):
    return asset_weights_in_sector * returns

# Function to calculate daily sector return
def calculate_total_sector_return(df, sector_contribution_column):
    return df.groupby(['Date', 'GICS Sector'])[sector_contribution_column].transform('sum')

# Function to calculate daily portfolio return
def calculate_daily_total_return(df, total_sector_return_column):
    return df.groupby('Date')[total_sector_return_column].transform('sum')


# Function to apply all calculations to a DataFrame for given portfolio or benchmark columns
def apply_calculations_to_df(df, weight_col, return_col, prefix):
    df[f'{prefix} Weighted Return'] = calculate_weighted_returns(df[weight_col], df[return_col])
    df[f'{prefix} Sector Weight'] = calculate_sector_weights(df, weight_col)
    df[f'{prefix} Asset Weight in Sector'] = calculate_asset_weight_in_sector(df, weight_col)
    sector_contribution_return = calculate_sector_contribution_return(df[f'{prefix} Asset Weight in Sector'], df[return_col])
    df[f'{prefix} Sector Contribution Return'] = sector_contribution_return
    total_sector_return = calculate_total_sector_return(df, f'{prefix} Sector Contribution Return')
    df[f'{prefix} Daily Sector Return'] = total_sector_return
    df[f'{prefix} Daily Total Return'] = calculate_daily_total_return(df, f'{prefix} Daily Sector Return')

    return df

# Apply the calculations to both portfolio and benchmark
merged_data = apply_calculations_to_df(merged_data, 'Portfolio Weight', 'Return', 'Portfolio')
merged_data = apply_calculations_to_df(merged_data, 'Benchmark Weight', 'Return', 'Benchmark')


### Attribution calculations

In [6]:
def calculate_allocation_effect(df):
    return (df['Portfolio Sector Weight'] - df['Benchmark Sector Weight']) * (df['Benchmark Daily Sector Return'] - df['Benchmark Daily Total Return'])

def calculate_selection_effect(df):
    return df['Benchmark Sector Weight'] * (df['Portfolio Daily Sector Return'] - df['Benchmark Daily Sector Return'])

def calculate_interaction_effect(df):
    return (df['Portfolio Sector Weight'] - df['Benchmark Sector Weight']) * (df['Portfolio Daily Sector Return'] - df['Benchmark Daily Sector Return'])

def sum_of_effects(allocation, selection, interaction):
    return allocation + selection + interaction

In [7]:
excess_test = merged_data[['Date', 'Portfolio Weighted Return', 'Benchmark Weighted Return']]
excess_test['Excess Return'] = excess_test['Portfolio Weighted Return'] - excess_test['Benchmark Weighted Return']
daily_excess_returns = excess_test[['Date', 'Excess Return']].groupby(['Date']).sum()

allocation = calculate_allocation_effect(merged_data)
selection = calculate_selection_effect(merged_data)
interaction = calculate_interaction_effect(merged_data)
sum_effects = sum_of_effects(allocation, selection, interaction)

sum_effects.index = merged_data['Date']
sum_effects.name = 'Sum Effects'  # Assign a name to the series
daily_sum_of_attributions = sum_effects.drop_duplicates().groupby(['Date']).sum()

# Merge the data and compare the results. They should be the same.
testing_calculation_validity = pd.merge(daily_excess_returns, daily_sum_of_attributions, on='Date', how='inner')
testing_calculation_validity

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  excess_test['Excess Return'] = excess_test['Portfolio Weighted Return'] - excess_test['Benchmark Weighted Return']


Unnamed: 0_level_0,Excess Return,Sum Effects
Date,Unnamed: 1_level_1,Unnamed: 2_level_1
2023-09-01,-0.008123,-0.008123
2023-09-02,0.027996,0.027996
2023-09-03,-0.013347,-0.013347
2023-09-04,-0.00727,-0.00727
2023-09-05,0.000475,0.000475
2023-09-06,-0.00541,-0.00541
2023-09-07,0.008788,0.008788
2023-09-08,0.006079,0.006079
2023-09-09,-0.0143,-0.0143
2023-09-10,-0.01083,-0.01083
