For all datasets:
1. Extract dataset: 
    - HTTP download into csv file format
        - Resilience: if the download fails, retry after waiting n seconds

2. Transform dataset
    - Drop all columns that are unnecessary
    - (Translate column names) - this would have to be hard-coded and, like any other transformation of the data, may have licensing implications
    - Check for missing values:
        - if over a certain threshold: handle with:
            - mean/mode/deletion/regression/knn
    - Change time datatypes into datetime format
    - (Potentially) Aggregate datasets into monthly formats
        - Open question: whether this belongs in this step or should be deferred to a later one

3. Load dataset into /data/ folder

- In order to make it modular (and because we are working with a large number of datasets):
    - define functions for standard tasks
        - download
        - datetime transformation
        - missing value handling
        - (dropping columns)
        - saving dataset
- Use logging throughout (to the console and possibly also to a log file; consider using a library)
- Focus on Error Handling
- Don't forget to update the GitHub issue with this content
- Add a .sh script
- Perform operations out of place (return new DataFrames instead of modifying the input)

In [108]:
# Installs
#%pip install retry

# TODO: Necessary? Does this fw the .sh -> create a proper venv for this project?

In [1]:
# Imports

# Standard library
import copy
import io
import logging
import os
from enum import Enum, auto
from typing import Optional, Tuple

# Third-party
import pandas as pd
import requests
from retry import retry

In [2]:
# Set up root logging for the whole notebook: every record at DEBUG level or above
# is emitted with a "timestamp - LEVEL - message" layout. DEBUG also surfaces
# records from libraries, e.g. the urllib3 connection messages seen in the outputs.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.DEBUG,
)

In [29]:
@retry(tries=3, delay=30, logger=logging.getLogger())
def extract_dataset(dataset_url: str, timeout: Tuple[Optional[float], Optional[float]] = (None, None)):
    """
    Download a dataset via an HTTP GET request.

    The ``retry`` decorator makes up to three attempts in total (i.e. at most two
    retries), waiting 30 seconds between attempts. Any exception raised here -
    connection errors, timeouts or an HTTP error status - triggers another
    attempt; if the final attempt also fails, its exception propagates.

    Parameters:
    dataset_url (str): URL of a dataset in the CSV file format.
    timeout (tuple): ``(connect_timeout, read_timeout)`` in seconds, passed
        unchanged to ``requests.get``. ``None`` disables the respective timeout,
        so the default ``(None, None)`` may wait indefinitely.

    Returns:
    requests.Response: The response of the successful HTTP GET request.

    Raises:
    requests.exceptions.RequestException: If the last attempt fails
        (e.g. ``HTTPError`` for a 4xx/5xx status code).
    """

    logging.info(f"Attempting to fetch data from {dataset_url}")
    response = requests.get(dataset_url, timeout=timeout)
    response.raise_for_status()  # turn 4xx/5xx status codes into an HTTPError (and thus a retry)
    logging.info(f"Successfully fetched data from {dataset_url}")
    return response

In [22]:
def extract_into_df(csv_data, separator=","):
    '''
    Load a CSV dataset into a pandas DataFrame for further transformation.

    Parameters:
    csv_data: The CSV payload. One of:
        - an HTTP-response-like object exposing the raw body as ``.content``
          (e.g. the ``requests.Response`` returned by ``extract_dataset``),
        - raw ``bytes``/``bytearray``,
        - an already-decoded ``str``.
        Binary payloads are decoded as UTF-8.
    separator (str): Field delimiter, passed to ``pd.read_csv`` as ``sep``.

    Returns:
    pd.DataFrame: The resulting DataFrame.

    Raises:
    UnicodeDecodeError: If a binary payload is not valid UTF-8.
    pd.errors.ParserError, pd.errors.EmptyDataError: If the payload cannot be
        parsed as CSV.
    Every exception is logged before being re-raised unchanged.
    '''

    try:
        logging.info("Attempting to load data into DataFrame")
        if isinstance(csv_data, str):
            text = csv_data
        else:
            raw = csv_data if isinstance(csv_data, (bytes, bytearray)) else csv_data.content
            text = raw.decode('utf-8')
        df = pd.read_csv(io.StringIO(text), sep=separator)
    except (UnicodeDecodeError, pd.errors.ParserError, pd.errors.EmptyDataError) as e:
        # Expected failure modes of a malformed payload.
        logging.error(f"Failed to load data: {e}")
        raise
    except Exception as e:
        logging.error(f"Unexpected error: {e}")
        raise

    logging.info(f"Successfully loaded data into DataFrame with {len(df)} rows")
    return df

In [23]:
def filter_drop_columns(df, white_list):
    """
    Reduce ``df`` to the columns named in ``white_list``.

    The result is a new DataFrame whose columns keep the order they have in
    ``df`` (not the order of ``white_list``); ``df`` itself is not modified.
    This helper never raises: if a whitelisted column is absent from ``df``, or
    anything else goes wrong, the problem is logged at ERROR level and the
    original ``df`` object is returned as-is.

    Parameters:
    df (pd.DataFrame): The DataFrame to reduce.
    white_list (list): Names of the columns to keep.

    Returns:
    pd.DataFrame: The reduced DataFrame, or ``df`` itself on failure.
    """
    try:
        logging.info("Attempting to drop non-whitelisted columns")

        # Every whitelisted column must exist; otherwise bail out through the ValueError handler.
        absent = [name for name in white_list if name not in df.columns]
        if absent:
            raise ValueError(f"The following columns in the whitelist are missing from the DataFrame: {absent}")

        surplus = [name for name in df.columns if name not in white_list]
        reduced = df.drop(columns=surplus)
    except ValueError as err:
        logging.error(f"Please make sure all columns in the white list are contained in the DataFrame: {err}")
        return df
    except Exception as err:
        logging.error(f"Unexpected error: {err}")
        return df
    else:
        logging.info(f"Successfully dropped columns. Remaining columns: {list(reduced.columns)}")
        return reduced


In [38]:
class Strategy(Enum):
    '''
    Strategies for handling missing values in a column of a pd.DataFrame.

    Passed as the ``strategy`` argument of ``filter_handle_missing_values``.
    '''

    BFILL = auto()                 # fill a gap with the next non-missing value
    FFILL = auto()                 # fill a gap with the previous non-missing value
    DROP_ROW = auto()              # drop every row whose value in the column is missing
    LINEAR_INTERPOLATION = auto()  # linearly interpolate between neighbouring values
    MODE = auto()                  # fill gaps with the column's most frequent value
    MEDIAN = auto()                # fill gaps with the column's median


def filter_handle_missing_values(df, column, threshold=0, strategy=Strategy.DROP_ROW):
    """
    Handle missing values in one column of a DataFrame using the given strategy.

    The strategy is applied only when the fraction of missing values in
    ``column`` is strictly greater than ``threshold``. ``df`` is never modified:
    when the strategy is applied a modified copy is returned, otherwise the
    original object itself is returned.

    Parameters:
    df (pd.DataFrame): The DataFrame to operate on.
    column (str): The column to handle missing values for.
    threshold (float): Fraction of missing values (0-1) that must be exceeded
        before the strategy is applied. With the default of 0, a single missing
        value triggers the strategy.
    strategy (Strategy): Strategy for handling missing values.

    Returns:
    pd.DataFrame: DataFrame with missing values handled in the specified column.
        On any error (unknown column, unsupported strategy, a strategy that does
        not fit the column's dtype, ...) the error is logged and the original
        ``df`` is returned unchanged. For MODE on a column without any
        non-missing value a warning is logged and ``df`` is returned unchanged.
    """
    try:
        missing_ratio = df[column].isnull().mean()

        # Written as ``not >`` rather than ``<=`` so a NaN ratio (empty column)
        # is treated as "not above the threshold".
        if not missing_ratio > threshold:
            logging.info(f"Column '{column}' has {missing_ratio * 100:.2f}% missing values, which does not exceed the threshold of {threshold * 100:.2f}%. Not applying a strategy.")
            return df

        logging.info(f"Column '{column}' has {missing_ratio * 100:.2f}% missing values, applying {strategy.name} strategy")
        temp_df = df.copy()  # avoid making in-place changes to the caller's DataFrame

        if strategy == Strategy.BFILL:
            # Series.bfill()/ffill() replace the deprecated fillna(method=...).
            temp_df[column] = temp_df[column].bfill()
            logging.info(f"Applied back fill strategy to column '{column}'")

        elif strategy == Strategy.FFILL:
            temp_df[column] = temp_df[column].ffill()
            logging.info(f"Applied forward fill strategy to column '{column}'")

        elif strategy == Strategy.DROP_ROW:
            temp_df = temp_df.dropna(subset=[column])
            logging.info(f"Dropped rows with missing values in column '{column}'")

        elif strategy == Strategy.LINEAR_INTERPOLATION:
            temp_df[column] = temp_df[column].interpolate(method='linear')
            logging.info(f"Applied linear interpolation to column '{column}'")

        elif strategy == Strategy.MODE:
            modes = temp_df[column].mode()
            if modes.empty:
                # An all-missing column has no mode; there is nothing to fill with.
                logging.warning(f"Column '{column}' has no non-missing values, so no mode can be computed. Leaving it unchanged.")
                return df
            mode_value = modes.iloc[0]
            temp_df[column] = temp_df[column].fillna(mode_value)
            logging.info(f"Applied mode imputation to column '{column}' with mode value {mode_value}")

        elif strategy == Strategy.MEDIAN:
            median_value = temp_df[column].median()
            temp_df[column] = temp_df[column].fillna(median_value)
            logging.info(f"Applied median imputation to column '{column}' with median value {median_value}")

        else:
            raise ValueError(f"Unsupported strategy: {strategy!r}")

        return temp_df

    except Exception as e:
        # ``strategy`` may not be a Strategy member at all, so don't assume ``.name`` exists.
        strategy_name = getattr(strategy, 'name', strategy)
        logging.error(f"Unexpected error while handling missing values in column '{column}' with strategy '{strategy_name}': {e}")
        return df

In [44]:
def filter_rows_by_value(df, column_name, column_value):
    """
    Keep only the rows of a DataFrame whose ``column_name`` equals ``column_value``.

    Parameters:
    df (pd.DataFrame): The DataFrame to filter.
    column_name (str): The column name to check for the value.
    column_value: The value to keep. A missing-value marker (``None``,
        ``float('nan')``, ``pd.NA``, ``pd.NaT``) selects the rows whose value is
        missing, because ``==`` never matches NaN.

    Returns:
    pd.DataFrame: A new, independent DataFrame containing only the matching rows.
        If the column does not exist, or an error occurs, the problem is logged
        and the original ``df`` is returned unchanged.
    """
    try:
        if column_name not in df.columns:
            logging.error(f"Column '{column_name}' does not exist in the DataFrame.")
            return df

        values = df[column_name]
        # NaN == NaN is False, so missing values must be matched explicitly.
        if pd.api.types.is_scalar(column_value) and pd.isna(column_value):
            mask = values.isna()
        else:
            mask = values == column_value
        affected_rows = len(df) - mask.sum()  # number of rows that do not match

        # .copy() so later column assignments on the result don't operate on a
        # slice of ``df`` (pandas chained-assignment / SettingWithCopy issues).
        filtered_df = df[mask].copy()

        logging.info(f"Column '{column_name}': Filtering rows where value is '{column_value}'.")
        logging.info(f"Number of rows dropped: {affected_rows}")
        return filtered_df
    except Exception as e:
        logging.error(f"Unexpected error while filtering rows by '{column_name}' with value '{column_value}': {e}")
        return df

In [25]:
# TODO: Check how well this works across the different datasets: count the NaT (Not a Time) values produced by unparseable dates and handle them (possibly with the missing-value handling function above).

def filter_transform_to_datetime(df, column, date_format=None):
    """
    Convert a column holding dates (possibly in various string formats) to datetime.

    Values that cannot be parsed are coerced to ``NaT`` (``errors='coerce'``).
    The number of values newly turned into ``NaT`` is logged as a warning, so
    parsing problems don't go unnoticed. ``df`` itself is not modified.

    Parameters:
    df (pd.DataFrame): The DataFrame containing the date column.
    column (str): The column name containing date values in various formats.
    date_format (str, optional): Explicit format passed to ``pd.to_datetime`` as
        ``format`` (e.g. ``"%d/%m/%Y"``). ``None`` (default) lets pandas infer it.

    Returns:
    pd.DataFrame: A copy of ``df`` with ``column`` converted to datetime. On
        error the problem is logged and the original ``df`` is returned
        unchanged, like the other ``filter_*`` helpers, so ``df = f(df)``
        chains never end up holding ``None``.
    """
    try:
        logging.info(f"Transforming column '{column}' to datetime")
        temp_df = df.copy()  # avoid making in-place changes to the caller's DataFrame
        missing_before = temp_df[column].isna().sum()
        temp_df[column] = pd.to_datetime(temp_df[column], errors='coerce', format=date_format)
        coerced = temp_df[column].isna().sum() - missing_before
    except Exception as e:
        logging.error(f"Unexpected error while transforming column '{column}' to datetime: {e}")
        return df

    if coerced > 0:
        logging.warning(f"{coerced} value(s) in column '{column}' could not be parsed and were set to NaT")
    logging.info(f"Successfully transformed column '{column}' to datetime")
    return temp_df

In [26]:
def load_df_to_csv(df, file_name, file_path='../data/', overwrite=False):
    """
    Save a DataFrame to a CSV file (without the index).

    Parameters:
    df (pd.DataFrame): The DataFrame to save.
    file_name (str): Name of the file to be stored. '.csv' is appended unless the
        name already ends with it.
    file_path (str): Directory where the CSV file will be saved. The default is
        the local ../data/ folder (relative to the working directory), as required
        by the project specifications. An empty value means the current working
        directory. The directory is created if it does not exist yet.
    overwrite (bool): Flag to allow overwriting of existing files.

    Returns:
    bool: True if the file was written, False otherwise. Failures are logged,
        never raised.
    """

    if not file_path:  # empty string (or None) -> current working directory
        file_path = './'

    if not file_name.endswith('.csv'):
        file_name += '.csv'
    full_path = os.path.join(file_path, file_name)

    if os.path.exists(full_path):
        if not overwrite:
            logging.error(f"File '{full_path}' already exists. Set overwrite-flag to True in order to perform this action")
            return False
        logging.warning(f"File '{full_path}' is being overwritten as the overwrite-flag is set to True")

    try:
        # Create the target folder (e.g. a fresh checkout without data/) instead of failing.
        os.makedirs(file_path, exist_ok=True)
        df.to_csv(full_path, index=False)
    except PermissionError as e:
        logging.error(f"Permission error while trying to save the DataFrame to {full_path}: {e}")
        return False
    except FileNotFoundError as e:
        logging.error(f"File not found error while trying to save the DataFrame to {full_path}: {e}")
        return False
    except Exception as e:
        logging.error(f"Unexpected error while saving the DataFrame to {full_path}: {e}")
        return False

    logging.info(f"DataFrame successfully saved to {full_path}")
    return True

# Applying the ETL Pipeline to the datasets 

### Chile Covid Mortality Dataset

In [33]:
# Source CSV for the Chile COVID-19 mortality dataset (file name: defunciones_covid19_2020_2024.csv).
chile_url = "https://datos.gob.cl/dataset/8982a05a-91f7-422d-97bc-3eee08fde784/resource/8e5539b7-10b2-409b-ae5a-36dae4faf817/download/defunciones_covid19_2020_2024.csv"

# Extract the dataset into a data-frame.
# Download with 200 s connect and read timeouts, then parse; this file is ';'-separated.
chile_data = extract_dataset(chile_url, timeout=(200,200))
chile_df = extract_into_df(chile_data, separator=";")

2024-11-12 10:16:22,112 - INFO - Attempting to fetch data from https://datos.gob.cl/dataset/8982a05a-91f7-422d-97bc-3eee08fde784/resource/8e5539b7-10b2-409b-ae5a-36dae4faf817/download/defunciones_covid19_2020_2024.csv
2024-11-12 10:16:22,114 - DEBUG - Starting new HTTPS connection (1): datos.gob.cl:443
2024-11-12 10:16:23,016 - DEBUG - https://datos.gob.cl:443 "GET /dataset/8982a05a-91f7-422d-97bc-3eee08fde784/resource/8e5539b7-10b2-409b-ae5a-36dae4faf817/download/defunciones_covid19_2020_2024.csv HTTP/11" 200 16071695
2024-11-12 10:16:25,468 - INFO - Successfully fetched data from https://datos.gob.cl/dataset/8982a05a-91f7-422d-97bc-3eee08fde784/resource/8e5539b7-10b2-409b-ae5a-36dae4faf817/download/defunciones_covid19_2020_2024.csv
2024-11-12 10:16:25,470 - INFO - Attempting to load data into DataFrame
2024-11-12 10:16:25,653 - INFO - Successfully loaded data into DataFrame with 58289 rows


In [41]:
# Perform transformations

# Required fields for analysis are the death-date and the diagnosis (COVID-19)
chile_df = filter_drop_columns(chile_df, ["FECHA_DEF", "DIAG1"])

# Transform the date-fields into datetime objects (unparseable dates become NaT)
chile_df = filter_transform_to_datetime(chile_df, "FECHA_DEF")

# Missing values are not imputed: with the default threshold of 0, any row with a
# missing value (including dates coerced to NaT above) in a remaining column is dropped.
# In the recorded run neither column had missing values, so nothing was dropped.
for column in chile_df.columns:
    chile_df = filter_handle_missing_values(chile_df, column=column, strategy=Strategy.DROP_ROW)

2024-11-12 10:18:39,514 - INFO - Attempting to drop non-whitelisted columns
2024-11-12 10:18:39,520 - INFO - Successfully dropped columns. Remaining columns: ['FECHA_DEF', 'DIAG1']
2024-11-12 10:18:39,522 - INFO - Transforming column 'FECHA_DEF' to datetime
2024-11-12 10:18:39,535 - INFO - Successfully transformed column 'FECHA_DEF' to datetime
2024-11-12 10:18:39,537 - INFO - Column 'FECHA_DEF' has 0.00% missing values, which is lower than the threshold of 0.00%. Not applying a strategy.
2024-11-12 10:18:39,540 - INFO - Column 'DIAG1' has 0.00% missing values, which is lower than the threshold of 0.00%. Not applying a strategy.


FECHA_DEF
DIAG1


In [42]:
# Load the transformed dataframe back into a CSV-database file (../data/chile_covid_mortality.csv).
# With overwrite=False an existing file is kept and only an error is logged, as in the recorded run.
load_df_to_csv(chile_df, file_name='chile_covid_mortality', overwrite=False)

2024-11-12 10:19:08,242 - ERROR - File '../data/chile_covid_mortality.csv' already exists. Set overwrite-flag to True in order to perform this action


### USA Covid Mortality Dataset

In [78]:
# CDC CSV export (dataset id exs3-hbne) for the USA COVID-19 mortality dataset.
usa_url = "https://data.cdc.gov/api/views/exs3-hbne/rows.csv?fourfour=exs3-hbne&cacheBust=1729520760&date=20241106&accessType=DOWNLOAD"

# Extract the dataset into a data-frame (200 s connect and read timeouts, default ',' separator).
# NOTE(review): `data` is a generic global name; a dataset-specific name (like chile_data) would be clearer.
data = extract_dataset(usa_url, timeout=(200, 200))
usa_df = extract_into_df(data)

2024-11-12 10:44:27,907 - INFO - Attempting to fetch data from https://data.cdc.gov/api/views/exs3-hbne/rows.csv?fourfour=exs3-hbne&cacheBust=1729520760&date=20241106&accessType=DOWNLOAD
2024-11-12 10:44:27,910 - DEBUG - Starting new HTTPS connection (1): data.cdc.gov:443
2024-11-12 10:44:29,209 - DEBUG - https://data.cdc.gov:443 "GET /api/views/exs3-hbne/rows.csv?fourfour=exs3-hbne&cacheBust=1729520760&date=20241106&accessType=DOWNLOAD HTTP/11" 200 None
2024-11-12 10:44:32,628 - INFO - Successfully fetched data from https://data.cdc.gov/api/views/exs3-hbne/rows.csv?fourfour=exs3-hbne&cacheBust=1729520760&date=20241106&accessType=DOWNLOAD
2024-11-12 10:44:32,629 - INFO - Attempting to load data into DataFrame
2024-11-12 10:44:32,742 - INFO - Successfully loaded data into DataFrame with 79002 rows


In [81]:
# Perform transformations

# This dataset has duplicate values, therefore drop all rows for the different regions in the US and keep only the total US rows.
# NOTE(review): in the recorded run this call logged "Column 'jurisdiction_residence' does not exist in the
# DataFrame." and returned usa_df unchanged, so the per-region rows were presumably NOT removed before saving.
# Verify the actual column name in the downloaded CSV.
usa_df = filter_rows_by_value(usa_df, "jurisdiction_residence", "United States")

# Keep only required fields for analysis
usa_df = filter_drop_columns(usa_df, ["data_period_start", "data_period_end", "group", "subgroup1", "covid_deaths", "crude_rate"]) 

# Transform the date-fields into datetime objects. This also works for the american M/D/Y date format.
usa_df = filter_transform_to_datetime(usa_df, "data_period_start") 
usa_df = filter_transform_to_datetime(usa_df, "data_period_end")

# Drop every row that has a missing value in any kept column (default threshold 0, so a single
# missing value triggers DROP_ROW). Print per-column missing counts before and after for inspection.
print(f"Before: \n{usa_df.isnull().sum()} \n")
for column in usa_df.columns:
    usa_df = filter_handle_missing_values(usa_df, column=column, strategy=Strategy.DROP_ROW)
print(f"After: \n{usa_df.isnull().sum()} \n")

2024-11-12 10:44:56,053 - ERROR - Column 'jurisdiction_residence' does not exist in the DataFrame.
2024-11-12 10:44:56,055 - INFO - Attempting to drop non-whitelisted columns
2024-11-12 10:44:56,057 - INFO - Successfully dropped columns. Remaining columns: ['data_period_start', 'data_period_end', 'group', 'subgroup1', 'covid_deaths', 'crude_rate']
2024-11-12 10:44:56,059 - INFO - Transforming column 'data_period_start' to datetime
2024-11-12 10:44:56,066 - INFO - Successfully transformed column 'data_period_start' to datetime
2024-11-12 10:44:56,067 - INFO - Transforming column 'data_period_end' to datetime
2024-11-12 10:44:56,072 - INFO - Successfully transformed column 'data_period_end' to datetime
2024-11-12 10:44:56,075 - INFO - Column 'data_period_start' has 0.00% missing values, which is lower than the threshold of 0.00%. Not applying a strategy.
2024-11-12 10:44:56,076 - INFO - Column 'data_period_end' has 0.00% missing values, which is lower than the threshold of 0.00%. Not app

Before: 
data_period_start    0
data_period_end      0
group                0
subgroup1            0
covid_deaths         0
crude_rate           0
dtype: int64 

After: 
data_period_start    0
data_period_end      0
group                0
subgroup1            0
covid_deaths         0
crude_rate           0
dtype: int64 



In [82]:
# Load the transformed dataframe back into a CSV-database file (../data/usa_covid_mortality.csv).
# With overwrite=False an existing file is left untouched and only an error is logged.
load_df_to_csv(usa_df, file_name='usa_covid_mortality', overwrite=False)

2024-11-12 10:46:25,907 - INFO - DataFrame successfully saved to ../data/usa_covid_mortality.csv


In [None]:
# NOTE(review): this unexecuted cell is an exact copy of the USA extraction cell earlier in the
# notebook; remove it, or adapt it for the next dataset.
usa_url = "https://data.cdc.gov/api/views/exs3-hbne/rows.csv?fourfour=exs3-hbne&cacheBust=1729520760&date=20241106&accessType=DOWNLOAD"

# Extract the dataset into a data-frame
data = extract_dataset(usa_url, timeout=(200, 200))
usa_df = extract_into_df(data)

In [None]:
# Perform transformations
# NOTE(review): this unexecuted cell is an exact copy of the USA transformation cell earlier in the
# notebook; remove it, or adapt it for the next dataset.

# This dataset has duplicate values, therefore drop all rows for the different regions in the US and keep only the total US rows.
# NOTE(review): the earlier run logged that 'jurisdiction_residence' does not exist, making this filter a no-op;
# verify the column name.
usa_df = filter_rows_by_value(usa_df, "jurisdiction_residence", "United States")

# Keep only required fields for analysis
usa_df = filter_drop_columns(usa_df, ["data_period_start", "data_period_end", "group", "subgroup1", "covid_deaths", "crude_rate"]) 

# Transform the date-fields into datetime objects. This also works for the american M/D/Y date format.
usa_df = filter_transform_to_datetime(usa_df, "data_period_start") 
usa_df = filter_transform_to_datetime(usa_df, "data_period_end")

# Drop every row with a missing value in any kept column (default threshold 0)
print(f"Before: \n{usa_df.isnull().sum()} \n")
for column in usa_df.columns:
    usa_df = filter_handle_missing_values(usa_df, column=column, strategy=Strategy.DROP_ROW)
print(f"After: \n{usa_df.isnull().sum()} \n")

In [None]:
# Load the transformed dataframe back into a CSV-database file.
# NOTE(review): duplicate of the earlier USA load cell; with overwrite=False it would only log an error
# if ../data/usa_covid_mortality.csv already exists.
load_df_to_csv(usa_df, file_name='usa_covid_mortality', overwrite=False)