# Filter ETFs, Index Funds and Active Funds

## filter_function
This function loads merged_data.csv and assigns funds to one of the following categories: `ETF`, `index_fund`, `active_fund`. Furthermore, the function can optionally filter the funds by location (country).

- Data Ingestion: Reads input CSV files in parallel with Dask.
- Data Cleaning: Drops rows with missing values in essential columns (stock_RIC, fund_type, fund_name, percent_of_traded_shares).
- Filtering: Filters data based on specified fund types (etf, mutual fund, index fund, active fund). Optionally restricts the data to European countries using a predefined country list.
- Data Transformation: Adjusts the date column to the last day of the previous month for ETFs and mutual funds. Removes any unnamed columns and duplicates based on key identifiers.


In [2]:
import os
import logging
from typing import List, Optional
import dask.dataframe as dd
import pandas as pd

# Configure the root logger: emit INFO and above, with each record formatted
# as "<timestamp> - <level name> - <message>".
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def country_filter(ddf: dd.DataFrame, selected_countries: Optional[List[str]] = None) -> dd.DataFrame:
    """
    Keep only the rows whose 'country' value appears in an allow-list.

    Parameters:
    - ddf: Dask DataFrame to filter; it must have a 'country' column.
    - selected_countries: Country names to keep. When None, the default list
      of European country names defined in this function is used. An
      explicitly passed empty list is honoured as-is and keeps no rows.

    Returns:
    - Dask DataFrame restricted to the rows with a matching country.
    """
    allowed = selected_countries
    if allowed is None:
        # Default allow-list, in alphabetical order.
        allowed = [
            "Albania", "Andorra", "Armenia", "Austria", "Azerbaijan",
            "Belarus", "Belgium", "Bosnia and Herzegovina", "Bulgaria",
            "Croatia", "Cyprus", "Czech Republic",
            "Denmark",
            "Estonia",
            "Finland", "France",
            "Georgia", "Germany", "Greece",
            "Hungary",
            "Iceland", "Ireland", "Italy",
            "Kazakhstan", "Kosovo",
            "Latvia", "Liechtenstein", "Lithuania", "Luxembourg",
            "Malta", "Moldova", "Monaco", "Montenegro",
            "Netherlands", "North Macedonia", "Norway",
            "Poland", "Portugal",
            "Romania",
            "San Marino", "Serbia", "Slovakia", "Slovenia", "Spain",
            "Sweden", "Switzerland",
            "Turkey",
            "Ukraine", "United Kingdom",
            "Vatican City",
        ]

    logging.info("Applying country filter...")
    in_allowed = ddf['country'].isin(allowed)
    return ddf[in_allowed]

# Fund types accepted by filter_function.
_SUPPORTED_FUND_TYPES = ("etf", "mutual fund", "index fund", "active fund")


def _select_fund_type(ddf: "dd.DataFrame", fund_type: str) -> "dd.DataFrame":
    """Return the rows of ``ddf`` that belong to ``fund_type`` (one of _SUPPORTED_FUND_TYPES)."""
    if fund_type == "etf":
        return ddf[ddf['fund_type'] == 'Exchange-Traded Fund']
    if fund_type == "mutual fund":
        not_etf = ddf['fund_type'] != 'Exchange-Traded Fund'
        not_vanguard = ~ddf['fund_name'].str.contains('Vanguard', case=False, na=False)
        not_index = ddf['fund_investment_type'] != 'Index'
        # Non-ETF rows that are either not Vanguard-named or not index-type,
        # i.e. only rows that are both Vanguard-named and 'Index' are removed.
        return ddf[(not_etf & not_vanguard) | (not_index & not_etf)]
    if fund_type == "index fund":
        return ddf[ddf['fund_investment_type'] == 'Index']
    if fund_type == "active fund":
        return ddf[ddf['fund_investment_type'] != 'Index']
    raise ValueError(f"Unsupported fund type: {fund_type}")


def filter_function(
    input_file: str,
    output_file: str,
    fund_type: str = "etf",
    subset: Optional[str] = None
) -> None:
    """
    Filters the input CSV based on fund type and subset, then writes the result to a CSV.

    Steps: read the CSV with Dask, drop rows missing any essential column,
    keep the rows of the requested fund type, optionally apply the country
    filter, shift the 'date' column back by one month-end offset (only for
    'etf' and 'mutual fund'), drop columns whose name contains 'Unnamed',
    drop duplicate rows, and write a single CSV file.

    Errors are handled best-effort: an unsupported fund type is logged and
    the function returns without touching any file; any exception raised
    while processing is logged with its traceback and NOT re-raised.

    Parameters:
    - input_file: Path to the input CSV file.
    - output_file: Path to the output CSV file.
    - fund_type: Type of fund to filter ('etf', 'mutual fund', 'index fund', 'active fund').
    - subset: Subset criteria. Only 'europe' is implemented (applies
      country_filter with its default country list); None applies no country
      filter; any other value is ignored with a warning.
    """
    # Reject unknown fund types up front, before any file is read.
    if fund_type not in _SUPPORTED_FUND_TYPES:
        logging.error(f"Unsupported fund type: {fund_type}")
        return

    try:
        logging.info(f"Reading input file: {input_file}")
        ddf = dd.read_csv(input_file)

        # Drop rows with missing essential fields
        logging.info("Dropping rows with missing values in essential columns...")
        essential_columns = ["stock_RIC", "fund_type", "fund_name", "percent_of_traded_shares"]
        ddf = ddf.dropna(subset=essential_columns)

        logging.info(f"Filtering for fund type: {fund_type}")
        filtered_ddf = _select_fund_type(ddf, fund_type)

        # Apply country filter if subset is specified
        if subset == "europe":
            filtered_ddf = country_filter(filtered_ddf)
        elif subset is not None:
            # Previously an unknown subset was silently ignored; make it visible.
            logging.warning(f"Unrecognized subset '{subset}'. No country filter applied.")

        # Persist the filtered Dask DataFrame in memory
        logging.info("Persisting filtered data in memory...")
        filtered_ddf = filtered_ddf.persist()

        # Only ETF and mutual-fund data get their dates shifted; unparseable
        # dates become NaT because of errors='coerce'.
        if fund_type in ("etf", "mutual fund"):
            logging.info("Processing date column...")
            filtered_ddf['date'] = dd.to_datetime(filtered_ddf['date'], errors='coerce')
            filtered_ddf['date'] = filtered_ddf['date'] - pd.offsets.MonthEnd(1)

        logging.info("Cleaning DataFrame by dropping unnamed columns and duplicates...")
        # Select the columns to keep by name rather than via a boolean mask.
        named_columns = [col for col in filtered_ddf.columns if 'Unnamed' not in col]
        df_clean = filtered_ddf[named_columns]
        df_clean = df_clean.drop_duplicates(
            subset=['stock_RIC', 'fund_name', 'date', 'percent_of_traded_shares'],
            keep='first',
        )

        # write to CSV
        logging.info(f"Writing filtered data to output file: {output_file}")
        df_clean.to_csv(output_file, single_file=True, index=False)
        logging.info("Filter function completed successfully.")
    except Exception as e:
        # Best-effort contract: log (with traceback) and return instead of raising.
        logging.exception(f"An error occurred in filter_function: {e}")

def aggregation_function(
    input_file: str,
    output_file: str,
    fund_type: str = "etf"
) -> None:
    """
    Aggregates the input CSV by 'stock_RIC' and 'date', summing specified columns, and writes to CSV.

    The summed columns are 'stock_value_held' and 'percent_of_traded_shares'.
    Depending on the fund type they are renamed with a prefix in the output:
    'mutual fund' -> 'FUND_', 'index fund' -> 'INDEX_FUND_',
    'active fund' -> 'ACTIVE_FUND_'. For 'etf' (the default) the original
    column names are kept. Any other fund type also keeps the original names
    and additionally logs a warning.

    Errors are handled best-effort: any exception is logged with its
    traceback and NOT re-raised.

    Parameters:
    - input_file: Path to the input CSV file.
    - output_file: Path to the output CSV file.
    - fund_type: Type of fund for renaming columns ('etf', 'mutual fund', 'index fund', 'active fund').
    """
    # Output prefix per known fund type; None means "known type, keep names".
    prefix_by_fund_type = {
        "etf": None,
        "mutual fund": "FUND",
        "index fund": "INDEX_FUND",
        "active fund": "ACTIVE_FUND",
    }
    value_columns = ['stock_value_held', 'percent_of_traded_shares']

    try:
        logging.info(f"Reading input file for aggregation: {input_file}")
        df = pd.read_csv(input_file)

        logging.info("Grouping and aggregating data...")
        grouped_df = df.groupby(['stock_RIC', 'date'])[value_columns].sum().reset_index()

        # Warn only for genuinely unknown types; the default 'etf' is a known
        # type that simply has no prefix.
        if fund_type not in prefix_by_fund_type:
            logging.warning(f"Unknown fund type '{fund_type}'. Columns will not be renamed.")

        prefix = prefix_by_fund_type.get(fund_type)
        if prefix is not None:
            grouped_df = grouped_df.rename(
                columns={column: f"{prefix}_{column}" for column in value_columns}
            )

        logging.info(f"Writing aggregated data to output file: {output_file}")
        grouped_df.to_csv(output_file, index=False)
        logging.info("Aggregation function completed successfully.")
    except Exception as e:
        # Best-effort contract: log (with traceback) and return instead of raising.
        logging.exception(f"An error occurred in aggregation_function: {e}")

def main():
    """
    Run the four fund-type filters for the 'europe' subset, one after another.

    The ETF and mutual-fund filters read the merged holdings file; the
    index-fund and active-fund filters read the CSV written by the
    mutual-fund filter, so the order of the steps matters.
    """
    merged_csv = '/Users/jonathanzeh/Library/CloudStorage/OneDrive-Personal/BA_Thesis/BA_coding/datasets/eikon_data/fund_holdings_data/merged_data.csv'
    filtered_dir = '/Users/jonathanzeh/Library/CloudStorage/OneDrive-Personal/BA_Thesis/BA_coding/datasets/eikon_data/fund_holdings_data/fund_type_filtered'
    subset = "europe"

    os.makedirs(filtered_dir, exist_ok=True)

    def in_output_dir(file_name):
        # All results live side by side in the output directory.
        return os.path.join(filtered_dir, file_name)

    mutual_fund_csv = in_output_dir("fund_data_europe_van.csv")

    # (fund type, input path, output path), executed top to bottom.
    steps = [
        ("etf", merged_csv, in_output_dir("etf_data_europe_van.csv")),
        ("mutual fund", merged_csv, mutual_fund_csv),
        ("index fund", mutual_fund_csv, in_output_dir("index_fund_data_europe_van.csv")),
        ("active fund", mutual_fund_csv, in_output_dir("active_fund_data_europe_van.csv")),
    ]

    for kind, source_csv, target_csv in steps:
        filter_function(
            input_file=source_csv,
            output_file=target_csv,
            fund_type=kind,
            subset=subset,
        )

# Run the filtering pipeline only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
