# **SDG 15.4.2 Sub-indicator A: Calculate Global Default Values**

* This script allows batch processing for this indicator for all countries.

In [1]:
# Enable IPython's autoreload extension in mode 2, so edits to the imported project
# modules (component.scripts.*) are picked up without restarting the kernel.
%load_ext autoreload
%autoreload 2

In [2]:
from pathlib import Path
import ee
# Start the Earth Engine client session; the ee.* calls in the cells below rely on it.
# NOTE(review): assumes Earth Engine credentials are already configured here - confirm.
ee.Initialize()

In [3]:
import os
from datetime import datetime
import pandas as pd
from openpyxl.utils import get_column_letter
from openpyxl.styles import Alignment
from pathlib import Path

from component.scripts.gee import reduce_regions
from component.scripts.scripts import (
    get_a_years,
    get_reporting_years,
    map_matrix_to_dict,
    read_from_csv,
    map_matrix_to_dict,
)
from component.scripts.colab_combining_files import (
    sanitize_description,
    append_excel_files,
)

<IPython.core.display.Javascript object>

In [4]:
# Earth Engine asset id of the default digital elevation model (DEM)
DEM_DEFAULT = "CGIAR/SRTM90_V4"

# Translation matrix between the input land cover classes and the MGCI LC classes.
# NOTE(review): the file name suggests the input classes are CORINE - confirm
# against the CSV contents.
LC_MAP_MATRIX = Path("content/corine_lc_map_matrix2.csv")

# Fail early, and say which path was looked up, if the matrix file is missing
assert LC_MAP_MATRIX.exists(), (
    f"Land cover map matrix not found: {LC_MAP_MATRIX.resolve()}"
)

In [5]:
# Earth Engine asset with the administrative boundaries, and the feature property
# that carries each boundary's name
admin_asset_id = "projects/ee-xavidelamo/assets/M49Countries"
admin_asset_property_name = "M49Name"

# Land cover inputs for Sub-indicator A (sub_a): a 1-based position mapped to the
# land cover asset and the year it represents.
a_years = dict(
    enumerate(
        (
            {"asset": "COPERNICUS/CORINE/V20/100m/2000", "year": 2000},
            {"year": 2006, "asset": "COPERNICUS/CORINE/V20/100m/2006"},
            {"year": 2012, "asset": "COPERNICUS/CORINE/V20/100m/2012"},
            {"year": 2018, "asset": "COPERNICUS/CORINE/V20/100m/2018"},
        ),
        start=1,
    )
)


Output parameters

---



In [6]:
# Set the base directory
base_dir = Path("content/sdg1542/sub_a")
base_dir.mkdir(parents=True, exist_ok=True)

In [7]:
# One sub-folder of base_dir per processing stage:
#   csv_path     - raw statistics CSVs, read back by the report cell
#   raw_reports  - one Excel workbook per processed CSV
#   final_report - the single combined workbook
csv_path, raw_reports, final_report = (
    base_dir / stage for stage in ("raw_stats", "raw_reports", "final_report")
)

# Individual output files
final_report_file_path = final_report / "final_report.xlsx"
error_log_file_path = base_dir / "error_log.csv"

# Make sure every stage folder exists before anything is written to it
final_report.mkdir(parents=True, exist_ok=True)
raw_reports.mkdir(parents=True, exist_ok=True)
csv_path.mkdir(parents=True, exist_ok=True)

In [8]:
# Safety switch for the task loop below: the Earth Engine export tasks are built for
# every boundary, but task.start() is only called when this is True.
export = False

Create list of boundaries to process

In [None]:
# admin boundary feature collection
admin_boundaries = ee.FeatureCollection(admin_asset_id)

# Uncomment to process all countries
# codes_to_process = [840]
# admin_boundaries = admin_boundaries.filter(ee.Filter.inList("M49Code", codes_to_process))


# list to process
list_of_countries = admin_boundaries.aggregate_array(admin_asset_property_name).getInfo()

print ("Length of admin boundaries to process", len(list_of_countries))

list_of_countries = list(set(list_of_countries)) # remove dupicates

print ("Length of distinct admin boundaries to process", (len(set(list_of_countries))))


Length of admin boundaries to process 1
Length of distinct admin boundaries to process 1


In [None]:
# Convert the land cover translation matrix at LC_MAP_MATRIX with map_matrix_to_dict;
# the result is handed to reduce_regions as `remap_matrix` in the task loop below.
default_map_matrix = map_matrix_to_dict(LC_MAP_MATRIX)

# Calculate tasks in GEE

In [None]:
# Position of the current boundary within the run; only feeds the progress line
counter = 0

for counter, aoi_name in enumerate(list_of_countries, start=1):

    # Narrow the boundary collection down to the current area of interest
    aoi = admin_boundaries.filter(
        ee.Filter.eq(admin_asset_property_name, aoi_name)
    )

    # One feature per entry produced by get_a_years(a_years). Each feature carries the
    # output of reduce_regions (imported from component.scripts.gee) and is tagged
    # with the year of the entry's first element as "process_id".
    process = ee.FeatureCollection(
        [
            ee.Feature(
                None,
                reduce_regions(
                    aoi,
                    remap_matrix=default_map_matrix,
                    rsa=False,
                    dem=DEM_DEFAULT,  # default DEM; relevant for the real surface area (RSA) implementation
                    lc_years=lc_year,
                    transition_matrix=False,
                    scale=None,  # None means native resolution
                ),
            ).set("process_id", lc_year[0]["year"])
            for lc_year in get_a_years(a_years)
        ]
    )

    # Run the boundary name through sanitize_description before using it as the
    # export task description
    task_name = str(sanitize_description(aoi_name))

    # Table export to Google Drive, limited to the two listed columns.
    # NOTE(review): Earth Engine documents `folder` as a Drive folder name rather than
    # a nested path - confirm the CSVs end up where the report cell looks for them.
    task = ee.batch.Export.table.toDrive(
        collection=process,
        description=task_name,
        fileFormat="CSV",
        folder="sdg1542/sub_a/raw_stats",
        selectors=["process_id", "sub_a"],
    )

    # Progress line rewritten in place (drop \r and end="" for one line per boundary)
    print (f"\r process {counter}/{len(list_of_countries)} {aoi_name} ", end="")

    # Tasks are only submitted when the `export` switch is on
    if export:
        task.start()

# Read in and translate results into report tables

In [None]:
from component.scripts.scripts import get_sub_a_data_reports


def _fit_columns_and_align(worksheet):
    """Widen each column of ``worksheet`` to fit its content and right-align "OBS" columns.

    Args:
        worksheet: a sheet taken from ``writer.sheets``. It is assumed to be an
            openpyxl worksheet (the openpyxl helpers below are applied to it),
            with the column headers in its first row.
    """
    for col in worksheet.columns:
        header = col[0]
        # The header cell is part of `col`, so this maximum already accounts for it
        max_length = max(len(str(cell.value)) for cell in col)
        worksheet.column_dimensions[get_column_letter(header.column)].width = (
            max_length + 4
        )

        # Right-align the observation value columns. str() keeps an empty or
        # non-text header from raising a TypeError on the membership test.
        if "OBS" in str(header.value):
            for cell in col:
                cell.alignment = Alignment(horizontal="right")


def _log_error(file_stem, error):
    """Append one row (file name, error text, timestamp) to ``error_log_file_path``.

    Args:
        file_stem: name of the statistics file that failed, without extension.
        error: the exception that was raised while processing it.
    """
    current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    error_info = pd.DataFrame(
        [[file_stem, str(error), current_time]],
        columns=["File Name", "Error Message", "Time"],
    )

    # Append mode creates the file when needed; the header is written only then
    log_exists = error_log_file_path.exists()
    error_info.to_csv(
        error_log_file_path, mode="a", header=not log_exists, index=False
    )


# Statistics files to turn into reports, skipping hidden files. Sorted because glob
# gives no ordering guarantee and the run should be repeatable.
stats_csv_files = sorted(csv_path.glob("[!.]*.csv"))

# Loop over each raw statistics CSV found in csv_path
for counter, stats_csv_file_path in enumerate(stats_csv_files, start=1):
    # The AOI name is the file stem without the "_sepal" marker
    aoi_name = stats_csv_file_path.stem.replace("_sepal", "")

    # Sanitized AOI name, used for the report file name
    aoi_name_clean = str(sanitize_description(aoi_name))

    message = f"Process {counter}, {stats_csv_file_path}"

    try:
        # Read the results from the CSV file
        results = read_from_csv(stats_csv_file_path)
        reporting_years_sub_a = get_reporting_years(a_years, "sub_a")
        details = {
            "geo_area_name": aoi_name,
            "ref_area": " ",
            "source_detail": " ",
        }

        sub_a_reports, mtn_reports = get_sub_a_data_reports(
            results, reporting_years_sub_a, **details
        )

        # Concatenate the mtn reports
        mtn_reports_df = pd.concat(mtn_reports)

        # Each sub a report is indexed as a pair: item 0 feeds the GRNCVI table,
        # item 1 the GRNCOV table
        er_mtn_grnvi_df = pd.concat([report[0] for report in sub_a_reports])
        er_mtn_grncov_df = pd.concat([report[1] for report in sub_a_reports])

        # Define the output report file path
        report_file_path = raw_reports / f"{aoi_name_clean}.xlsx"

        # Create the Excel file with the reports
        with pd.ExcelWriter(report_file_path) as writer:
            mtn_reports_df.to_excel(
                writer, sheet_name="Table1_ER_MTN_TOTL", index=False
            )
            er_mtn_grncov_df.to_excel(
                writer, sheet_name="Table2_ER_MTN_GRNCOV", index=False
            )
            er_mtn_grnvi_df.to_excel(
                writer, sheet_name="Table3_ER_MTN_GRNCVI", index=False
            )

            # Adjust column widths and alignment for each sheet
            for worksheet in writer.sheets.values():
                _fit_columns_and_align(worksheet)

    except Exception as e:
        # Deliberately broad: one bad file is recorded in the error log and the
        # batch carries on with the next one.
        message = f"process {counter}, {stats_csv_file_path.stem}, Error: {e}"
        _log_error(stats_csv_file_path.stem, e)

    print(message)

### 10) Combine Excel report files into one

Make a list of files to combine

In [None]:
# Per-AOI workbooks to merge, skipping hidden files. Sorted because glob gives no
# ordering guarantee, so the files are handed over in the same order on every run.
raw_reports_files = sorted(raw_reports.glob("[!.]*.xlsx"))

# Print the number of Excel files found in the folder
print(f"Number of Excel files in folder: {len(raw_reports_files)}")

# num_sheets=3 matches the three tables written to each per-AOI workbook
append_excel_files(
    file_paths=raw_reports_files,
    num_sheets=3,
    output_file_path=str(final_report_file_path),
)

##### Run function to combine into a single report