In [0]:
import pandas as pd

# Location of the World Bank income-classification workbook.
# Source: https://datahelpdesk.worldbank.org/knowledgebase/articles/906519-world-bank-country-and-lending-groups
file = '/Workspace/Users/wbendinelli@gmail.com/PostHarvestLoss/data/external/income_group_historical.xlsx'

# Parser options for the workbook:
# - sheet_name: only the "Country Analytical History" sheet is loaded
# - skiprows:   sheet rows 0 and 4 are discarded before parsing
# - header:     row index 3 supplies the column labels
#               NOTE(review): pandas counts `header` among the rows left after
#               `skiprows` is applied -- confirm this lands on the intended
#               row of the workbook.
# - engine:     openpyxl is used to parse the .xlsx file
# - dtype:      every cell is read as a string
excel_options = {
    'sheet_name': 'Country Analytical History',
    'skiprows': [0, 4],
    'header': 3,
    'engine': 'openpyxl',
    'dtype': str,
}
df = pd.read_excel(file, **excel_options)

# The first two columns are identified by position and given stable names:
# column 0 -> "code_c", column 1 -> "country".
renamed_columns = {df.columns[0]: "code_c", df.columns[1]: "country"}

# In one pass: discard the first 5 rows, renumber the index from zero,
# apply the new column names, and drop every row that has no "code_c".
df = (
    df.iloc[5:]
    .reset_index(drop=True)
    .rename(columns=renamed_columns)
    .dropna(subset=["code_c"])
)

# Identifier columns that are carried through the reshape untouched.
id_vars = ["code_c", "country"]

# Every column from the third one onward is unpivoted.
value_vars = df.columns[2:]

# Reshape from wide to long and clean the result in a single chain:
# 1. melt   -> one row per (code_c, country, original column); the original
#              column label goes to "year" and the cell value to "income_group"
# 2. dropna -> discard rows whose "income_group" is null
# 3. loc    -> discard rows whose "income_group" is the placeholder ".."
df_long = (
    df.melt(
        id_vars=id_vars,
        value_vars=value_vars,
        var_name="year",
        value_name="income_group",
    )
    .dropna(subset=["income_group"])
    .loc[lambda frame: frame["income_group"] != ".."]
)

# Short income-group codes found in the sheet -> full descriptions.
# "LM*" is folded into the same description as "LM".
mapping = {
    'L': 'Low income',
    'UM': 'Upper middle income',
    'H': 'High income',
    'LM': 'Lower middle income',
    'LM*': 'Lower middle income',
}

# Full descriptions -> group codes stored in the "code_g" column.
mapping_code_g = {
    'Low income': 'LIC',
    'Upper middle income': 'UMC',
    'Lower middle income': 'LMC',
    'High income': 'HIC',
}

# Recode once, then reuse the recoded series for both columns.
# Values that are not keys of a mapping are left as they are by `replace`.
income_labels = df_long['income_group'].replace(mapping)
df_long['income_group'] = income_labels
df_long['code_g'] = income_labels.replace(mapping_code_g)

# Convert the pandas DataFrame (df_long) to a Spark DataFrame.
# `spark` is the session object provided by the Databricks notebook runtime.
spark_df = spark.createDataFrame(df_long)

# Write the Spark DataFrame as a Delta table, replacing any existing contents.
# Adjust the table name as needed.
# NOTE(review): with mode("overwrite"), Delta's "overwriteSchema" option is the
# one that replaces the table schema; "mergeSchema" only evolves it. Confirm
# which behavior is intended here.
(
    spark_df.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable("workspace.postharvestloss.aux_table_income_group_historical")
)

# Display the final transformed DataFrame.
# df_long is a pandas DataFrame, which has no `.display()` method (that method
# exists only on Spark DataFrames in Databricks), so the notebook-level
# `display()` function is used instead.
display(df_long)