In [None]:
# IMPORTANT: The parameters below are set only for running this notebook independently.
# When executing the full Ploomber pipeline, these values will be overridden by the settings in `pipeline.yaml`.
# Any modifications made here will not persist when running the pipeline.
# NOTE(review): `product` is a path relative to the working directory, while `data_source`
# is joined onto PROJ_ROOT further down - confirm both resolve as intended when run standalone.
upstream = None
COUNTRY =  'ARM' # Country code; rows are later filtered with df_indicators['country'].eq(COUNTRY)
product = {'data': f'../data/processed/{COUNTRY}/total_indicator_achievements.xlsx'}  # Output Excel file written by the final cell (stored under the 'data' key)
data_source = 'data/raw/insight-ram-data-sets'  # Directory (joined onto PROJ_ROOT) that is scanned for *.xlsx source files

This notebook summarizes indicator performance (standard and additional indicators) from RAM3 insight data, focusing mainly on the end-of-year assessments of the indicators.

In [None]:
import re
import pandas as pd
from pathlib import Path
from unicef_cpe.config import PROJ_ROOT
import unicef_cpe

In [None]:
# Restrict the ECARO country mapping to the configured COUNTRY and build its inverse.
# Note: COUNTRY is a string, so `key in COUNTRY` is a substring test, not an equality test.
country_map = {
    key: value
    for key, value in unicef_cpe.utils.get_ecaro_countries_mapping(priority=False).items()
    if key in COUNTRY
}
country_code_map = dict(zip(country_map.values(), country_map.keys()))

### Indicator Data

Comments:

To add this to Mykola's structure:

- clean_indicators - > processing.py

- add function: extract_year (for getting finalization years from finalization_date)

- change main.ipynb to include differences in deduplicating data (df_indicators and df_indicators_cleaned should be merged based on what we think is the best de-duplicating strategy so it is not messy)

- include the plotting - > plotting.py

**RAM Indicators**

In [None]:
# Collect every Excel file in the source directory, sorted so the load order is stable.
source_dir = Path(PROJ_ROOT / data_source)
file_paths = sorted(source_dir.glob('*.xlsx'))
file_paths

In [4]:
def extract_year(date_str):
    '''
    Return the year of a date as an integer.

    The value is parsed as dd.mm.yyyy first and, if that fails, as mm.dd.yyyy.

    Parameters
    ----------
    date_str : str, datetime-like or missing value
        Date to parse. Already-parsed timestamps are accepted as well.

    Returns
    -------
    int or None
        The year, or None when the value is missing (None / NaN / NaT) or
        matches neither of the two formats.
    '''
    # Missing input is handled explicitly instead of relying on which
    # exception the parser or int() happens to raise for it.
    if date_str is None:
        return None

    for date_format in ('%d.%m.%Y', '%m.%d.%Y'):
        try:
            parsed = pd.to_datetime(date_str, format=date_format)
        except (ValueError, TypeError):
            # Not in this format: try the next one.
            continue
        if pd.isna(parsed):
            # NaN / NaT in, nothing to extract.
            return None
        return int(parsed.year)

    return None

The clean_indicators function can be found in processing.py. I have changed it here to collect all rows with an indicator entry, even where there is no indicator_actual data.

I also keep extra columns ["ram3_year", "result_area", "indicator_disaggregation", "indicator_rating_type", "rating", "indicator_rating_finalization_status"].

"ram3_year" -  year of xlsx data file


In [5]:
def clean_indicators(df: pd.DataFrame) -> pd.DataFrame:
    """
    Clean one raw RAM3 indicator export and return the columns used downstream.

    Steps:
    1. Normalise column names (strip, lowercase, whitespace -> "_").
    2. Keep rows whose "result_area" starts with a digit and that have an
       "indicator_code" (rows without "indicator_actual" are kept).
    3. Convert "baseline_year"/"target_year" to integers where possible.
    4. Convert "baseline_value"/"target_value"/"indicator_actual" to floats,
       except on rows whose "indicator_unit" is "TEXT", which stay untouched.
    5. Derive "country" from the last " - "-separated part of "business_area",
       translated through the ECARO code -> iso mapping.

    Parameters
    ----------
    df : pd.DataFrame
        Raw export. It is not modified; a new frame is returned.

    Returns
    -------
    pd.DataFrame
        Frame with exactly the columns listed in ``to_keep`` (columns absent
        from the input are added as all-NaN by ``reindex``).

    Notes
    -----
    Columns that are entirely empty are no longer dropped before processing:
    dropping them made the later column look-ups raise ``KeyError`` whenever
    an export had, for example, no target years at all. The final ``reindex``
    already decides which columns are returned.
    """

    def to_float(x):
        # Unparseable or missing values become None instead of raising.
        try:
            return float(x)
        except (TypeError, ValueError):
            return None

    def to_int(x):
        # int(nan) raises ValueError, int(inf) raises OverflowError.
        try:
            return int(x)
        except (TypeError, ValueError, OverflowError):
            return None

    # Remove leading or trailing whitespace, convert column names to lowercase
    # with spaces replaced with _ (on a new frame, so the caller's is untouched).
    df = df.rename(columns=lambda x: re.sub(r"\s+", "_", x.strip().lower()))

    # Keep rows whose result_area starts with one or more digits (NaN treated
    # as an empty string) and that carry an indicator code.
    starts_with_digits = df["result_area"].fillna("").str.match(r"\d+")
    has_indicator = df["indicator_code"].notna()
    df = df.loc[starts_with_digits & has_indicator].copy()

    # Convert the years to integers; blanks and unparseable entries become missing.
    for column in ("baseline_year", "target_year"):
        df[column] = df[column].apply(to_int)

    # Convert non-text entries to floats. A plain list is assigned (rather than
    # a row-wise DataFrame.apply) so that an empty frame is handled as well.
    is_text = df["indicator_unit"].eq("TEXT").tolist()
    for column in ("baseline_value", "target_value", "indicator_actual"):
        df[column] = [
            value if text else to_float(value)
            for value, text in zip(df[column].tolist(), is_text)
        ]

    mapping = unicef_cpe.utils.get_ecaro_countries_mapping(keys="code", values="iso")
    df["country"] = df["business_area"].str.split(" - ").str.get(-1).replace(mapping)

    to_keep = [
        "country",
        "indicator_code",
        "ram3_year",
        "result_area",
        "indicator",
        "indicator_category",
        "indicator_unit",
        "baseline_year",
        "baseline_value",
        "indicator_disaggregation",
        "target_year",
        "target_value",
        "finalization_date",
        "indicator_actual",
        "indicator_rating_type",
        "rating",
        "indicator_rating_finalization_status",
#        "start_date",
#        "end_date",
    ]
    return df.reindex(to_keep, axis=1)


In [None]:
# Load every RAM3 export, tag its rows with the year of the export file,
# clean it, then stack everything and keep only the configured COUNTRY.
dfs = []
for file_path in file_paths:
    # check skiprows based on raw data (Mykola = 8)
    df_raw = pd.read_excel(file_path, skiprows=7)
    # Extract the year from the file NAME only: searching the full path would
    # pick up any earlier 4-digit directory name instead of the export year.
    match = re.search(r'\b\d{4}\b', Path(file_path).name)
    ram3_year = match.group(0) if match else None
    # Add the spreadsheet year as a new column ['ram3_year'] to the DataFrame
    # file: RAM3 Indicator Performance by Result Area 2018 appears as 2018 in ['ram3_year']
    df_raw['ram3_year'] = ram3_year
    dfs.append(clean_indicators(df_raw))

df_indicators = pd.concat(dfs, axis=0, ignore_index=True)
print('Shape before:', df_indicators.shape)
# sort by country, indicator code and year of exported spreadsheet
df_indicators = df_indicators.sort_values(
    by=['country', 'indicator_code', 'ram3_year'],
    ignore_index=True,
)
df_indicators = df_indicators.loc[df_indicators['country'].eq(COUNTRY)].copy()
print('Shape after countries selected:', df_indicators.shape)

This is a version of Mykola's drop_duplicates, but it ignores the xlsx file year (ram3_year) and the result_area. result_area has to be ignored because the codes change from 2021 to 2022, so duplicated data carries different result codes (I checked this).

In [None]:
print('Shape before:', df_indicators.shape)
# Rows count as duplicates when they agree on every column except the export
# year ('ram3_year') and 'result_area' (its codes change from 2021 -> 2022).
subset_columns = df_indicators.columns.difference(['ram3_year','result_area'])
df_indicators = df_indicators.drop_duplicates(subset=subset_columns, ignore_index=True)
print('Shape after duplicates dropped:', df_indicators.shape)

This is where I don't agree with the way Mykola deduplicates data. 

His filter keeps the row with the latest finalization date, but this only works if the dates differ (and often they do not). Without indicator_disaggregation, indicators that distinguish between boys/girls/total, or between age groups, all look the same. They are usually finalized on the same date, so you cannot tell whether you have kept the total or the data for girls/boys. It mostly works for him because UNICEF seems to follow a similar pattern when entering disaggregated data, but we should be careful here if we make changes; we may also want to include all the different categories, not just the total.

In [None]:
# Mykola: deduplicate the data by keeping, per key, the row finalized last
print('Shape before:', df_indicators.shape)

df_indicators_cleaned = df_indicators.copy()
# Mykola
to_keep = ['country', 'indicator_code', 'target_year','finalization_date']
# I think this would be better: 
#to_keep = ['country', 'indicator_code', 'target_year','indicator_rating_type','indicator_disaggregation','finalization_date' ]
# The last entry is the date column, everything before it is the grouping key.
group_keys, date_column = to_keep[:-1], to_keep[-1]
df_indicators_cleaned[date_column] = pd.to_datetime(df_indicators_cleaned[date_column], format='%d.%m.%Y')
# take the row with the most recent finalisation date
# NOTE(review): groupby drops NaN keys by default, so rows without a target_year
# may be missing from the result - confirm this is intended.
df_indicators_cleaned = (
    df_indicators_cleaned
    .sort_values(to_keep)
    .groupby(group_keys)
    .tail(1)
)
assert not df_indicators_cleaned.duplicated(subset=group_keys).any(), 'Duplicates'

print('Shape after:', df_indicators_cleaned.shape)

**Output Indicators**

In [11]:
# Add the finalization year as a column
df_indicators['finalization_year'] = df_indicators['finalization_date'].apply(extract_year)
df_indicators_cleaned['finalization_year'] = df_indicators_cleaned['finalization_date'].apply(extract_year)

In [12]:
# Keep only rows finalized after 2017; the comparison is False for a missing
# year, so rows without a parseable finalization date are removed as well.
finalized_after_2017 = df_indicators['finalization_year'].gt(2017)
df_indicators = df_indicators.loc[finalized_after_2017]

In [None]:


# Write the indicator table to the product path, creating its folder if needed.
output_path = Path(product['data'])
output_path.parent.mkdir(parents=True, exist_ok=True)
df_indicators.to_excel(output_path, index=False)

In [14]:
################################################################################################################################################################################################