# Compile all individual datasets


In [1]:
%reset -f

In [2]:
%reload_ext autoreload
%autoreload 2

In [3]:
import os
import sys
from pathlib import Path
from tqdm import tqdm
import pandas as pd
import polars as pl
import duckdb
import numpy as np
import dask.dataframe as dd
import geopandas as gpd
from functools import reduce
import warnings



import os
os.environ['USE_PYGEOS'] = '0'
import geopandas

In the next release, GeoPandas will switch to using Shapely by default, even if PyGEOS is installed. If you only have PyGEOS installed to get speed-ups, this switch should be smooth. However, if you are using PyGEOS directly (calling PyGEOS functions on geometries from GeoPandas), this will then stop working and you are encouraged to migrate from PyGEOS to Shapely 2.0 (https://shapely.readthedocs.io/en/latest/migration_pygeos.html).
  import geopandas as gpd


In [4]:
# Set filepaths
PROJ = Path(os.path.realpath("."))
if str(PROJ) == "/n/home10/shreyasgm":
    PROJ = Path(
        "/n/holystore01/LABS/hausmann_lab/lab/glocal_aggregations/shreyas/proj/2023-02-05 - Pipeline/"
    )
ROOT = PROJ.parents[1]
DATA = ROOT / "data/"


In [5]:
# Import custom modules
sys.path.append(str(PROJ))
sys.path.append(str(ROOT / "src/"))
from general_utils import *

# from download_fao import *


In [6]:
# get current hostname
import os

os.uname()


posix.uname_result(sysname='Linux', nodename='holy7c02412.rc.fas.harvard.edu', release='4.18.0-425.10.1.el8_7.x86_64', version='#1 SMP Thu Jan 12 16:32:13 UTC 2023', machine='x86_64')

## Supporting data


In [7]:
gadm_2 = pd.read_parquet(
    DATA / "intermediate/gadm_without_geometry/gadm36_2.parquet",
    columns=["GID_0", "GID_1", "GID_2"],
)
gadm_1 = pd.read_parquet(
    DATA / "intermediate/gadm_without_geometry/gadm36_1.parquet",
    columns=[
        "GID_0",
        "GID_1",
    ],
)
gadm_2.head()


Unnamed: 0,GID_0,GID_1,GID_2
0,AFG,AFG.1_1,AFG.1.1_1
1,AFG,AFG.1_1,AFG.1.2_1
2,AFG,AFG.1_1,AFG.1.3_1
3,AFG,AFG.1_1,AFG.1.4_1
4,AFG,AFG.1_1,AFG.1.5_1


In [8]:
ghs = pd.read_parquet(DATA / "intermediate/ghs/ghs.parquet")
ghs.head()


Unnamed: 0,ghs_id,iso,country,uc_name,region,subregion,pop_2015,pop_2000,ntl_2015,timetocap
0,1.0,USA,United States,Honolulu,Northern America,Northern America,512853.666675,458967.881664,24.768574,15412.057919
1,2.0,PYF,French Polynesia,Papeete,Oceania,Polynesia,91521.124603,83726.092071,9.028501,
2,3.0,USA,United States,Santa Maria,Northern America,Northern America,123181.284843,114315.451935,19.102939,2653.754871
3,4.0,USA,United States,Monterey,Northern America,Northern America,67772.288858,65621.290962,14.669142,2716.114413
4,5.0,USA,United States,Santa Barbara,Northern America,Northern America,114753.150167,106699.837791,19.633925,2604.691241


# Helper functions


In [9]:
def add_gadm_names(df, selected_admin_level):
    # Assign names
    gadm_names = pd.read_parquet(
        ROOT
        / f"data/intermediate/gadm_without_geometry/gadm36_{selected_admin_level}.parquet"
    )
    df = df.merge(
        gadm_names[[f"GID_{selected_admin_level}", f"NAME_{selected_admin_level}"]],
        on=f"GID_{selected_admin_level}",
    )

    return df


In [10]:
def add_gadm_ids(df, selected_admin_level):
    # Assign ids
    gadm = pd.read_parquet(
        ROOT
        / f"data/intermediate/gadm_without_geometry/gadm36_{selected_admin_level}.parquet"
    )
    if selected_admin_level == 1:
        id_vars = ["GID_0", "GID_1"]
        df = df.merge(
            gadm[id_vars],
            on="GID_1",
        )
        df = df[id_vars + [x for x in df.columns if x not in id_vars]]
    elif selected_admin_level == 2:
        id_vars = ["GID_0", "GID_1", "GID_2"]
        df = df.merge(
            gadm[id_vars],
            on=f"GID_2",
        )
        df = df[id_vars + [x for x in df.columns if x not in id_vars]]
    return df


In [11]:
def add_gadm_ids_polars(df, selected_admin_level):
    # Assign ids
    gadm = pl.read_parquet(
        ROOT
        / f"data/intermediate/gadm_without_geometry/gadm36_{selected_admin_level}.parquet"
    )
    if selected_admin_level == 1:
        id_vars = ["GID_0", "GID_1"]
        df = df.join(
            gadm[id_vars],
            on="GID_1",
        )
        df = df[id_vars + [x for x in df.columns if x not in id_vars]]
    elif selected_admin_level == 2:
        id_vars = ["GID_0", "GID_1", "GID_2"]
        df = df.join(
            gadm[id_vars],
            on=f"GID_2",
        )
        df = df[id_vars + [x for x in df.columns if x not in id_vars]]
    return df


In [12]:
def reorder_and_check_for_duplicates(
    df, id_cols, on_duplicates="error", drop_duplicates=False
):
    """
    Check for duplicates in id_cols.
    If present, drop and warn if on_duplicates="warn", or raise error if "error". ignore if "ignore".
    """
    # Reorder columns
    df = df[id_cols + [x for x in df.columns if x not in id_cols]]
    # Check for duplicates
    if df.duplicated(id_cols).sum() > 0:
        if on_duplicates == "warn":
            warnings.warn("Duplicates present in index, dropping")
        elif on_duplicates == "error":
            raise ValueError("Duplicates present in index")
        else:
            pass
        if drop_duplicates:
            return df.drop_duplicates(id_cols)
    return df


In [13]:
def reorder_and_check_for_duplicates_polars(
    df, id_cols, on_duplicates="error", drop_duplicates=False
):
    """
    Check for duplicates in id_cols.
    If present, drop and warn if on_duplicates="warn", or raise error if "error". ignore if "ignore".
    """
    # Reorder columns
    df = df.select(id_cols + [x for x in df.columns if x not in id_cols])
    # Check for duplicates
    if df.select(id_cols).is_duplicated().sum() > 0:
        if on_duplicates == "warn":
            warnings.warn("Duplicates present in index, dropping")
        elif on_duplicates == "error":
            raise ValueError("Duplicates present in index")
        else:
            pass
        if drop_duplicates:
            return df.unique(maintain_order=False, subset=id_cols)
    return df


In [14]:
def rectangularize(df, cols_list, fill_var=None):
    """
    Rectangularize a dataframe

    Args:
        df: Pandas DataFrame
        cols_list: list of names of columns to use for rectangularization
        fill_var: (Optional) value to use to fill missing records
    """
    import pandas as pd

    # Get list of unique values in each column
    unique_vals_list = [df[x].unique() for x in cols_list]
    # Create multiindex with cartesian product
    index = pd.MultiIndex.from_product(unique_vals_list)
    # Create pandas dataframe
    df_index = pd.DataFrame(index=index).reset_index()
    df_index.columns = cols_list
    # Merge into original df
    df_rectangular = df.merge(df_index, on=cols_list, how="right")
    # Fill NA
    if fill_var is not None:
        cols_to_fill = [x for x in df.columns if x not in cols_list]
        df_rectangular[cols_to_fill] = df_rectangular[cols_to_fill].fillna(fill_var)
    return df_rectangular


In [15]:
def show_available_aggregations():
    # List folders
    # Read list of outputs from raster aggregations
    raster_agg_prefix = [
        "dmsp",
        "elevation",
        "fao",
        "gdelt",
        "population",
        "precipitation",
        "ruggedness",
        "temperature",
        "viirs",
    ]
    raster_agg_rootdir = DATA / "intermediate/raster_aggregations/"
    # Also include any other folders in raster_agg_rootdir
    raster_agg_prefix = raster_agg_prefix + [
        x.name
        for x in raster_agg_rootdir.iterdir()
        if x.is_dir() and x.name not in raster_agg_prefix
    ]
    # Open each folder and read the list of intermediate aggregations
    raster_agg_dict = {}
    for prefix in raster_agg_prefix:
        raster_agg_folder = raster_agg_rootdir / prefix
        filelist = raster_agg_folder.glob("*level_*.parquet")
        raster_agg_dict[prefix] = [str(file) for file in filelist]
    # Print them out
    for k, v in raster_agg_dict.items():
        print("------------------ ")
        print(k)
        if len(v) > 0:
            print([Path(x).stem for x in v])


In [16]:
def get_gadm_id_vars(selected_admin_level):
    return [f"GID_{x}" for x in range(selected_admin_level + 1)]


# Local aggregations


## Functions for legacy aggregations from Luis


In [17]:
def rename_gadm(df):
    df = df.rename(
        columns={x: x.replace("adm", "GID_") for x in ["adm0", "adm1", "adm2"]}
    )
    return df


In [18]:
def split_by_level(df, additional_ids: list, metric_cols_agg: dict, export=False):
    """
    Split JR's data into multiple levels
    """
    for level in range(3):
        print(level)
        id_cols_level = [f"GID_{x}" for x in range(level + 1)] + additional_ids
        # Aggregate
        df_level = df.groupby(id_cols_level).agg(metric_cols_agg).reset_index()
        # Check for dups
        df_level = reorder_and_check_for_duplicates(df_level, id_cols_level)
        # Export
        if export:
            if additional_ids == ["year", "month"]:
                freq = "monthly"
            elif additional_ids == ["year"]:
                freq = "yearly"
            df_level.to_parquet(
                DATA / f"intermediate/{export}_level_{level}_{freq}.parquet"
            )
    print(df_level.head())


In [19]:
def fix_identifiers(df):
    """Split GADM identifiers into adm0, adm1 and adm2"""
    id_vars = ["GID_0", "GID_1", "GID_2"]
    # Merge with GADM ids
    df = df.merge(gadm_2[id_vars], left_on="id", right_on="GID_2", how="inner")
    df = df.drop(columns="id")
    # Reorder
    df = df[id_vars + [col for col in df.columns if col not in id_vars]]
    return df


## Functions for aggregations


In [20]:
def process_local_agg_file(
    admin_level,
    dataset_name,
    sub_dataset_selected,
    filename_pattern=None,
    monthly=False,
    default_year=None,
    selected_agg_func="mean",
    on_duplicates="error",
    drop_duplicates=True,
    export=None,
):
    """
    Process local aggregation for a given sub-dataset and admin level

    Selected Args:
        export: if True, export to parquet. if string, export to parquet with that prefix. if None, do not export.

    """
    # Read file
    raster_dataset_rootdir = raster_agg_rootdir / dataset_name
    df = pl.read_parquet(
        raster_dataset_rootdir / f"{sub_dataset_selected}_level_{admin_level}.parquet"
    )
    # Get column names that are not "gid"
    metric_cols = [col for col in df.columns if col != "gid"]
    # Split into sub_dataset and aggregation function
    metric_df = pd.DataFrame({"metric": metric_cols})
    metric_df["agg_func"] = metric_df["metric"].str.split(".").str[0]
    metric_df["sub_dataset"] = metric_df["metric"].str.split(".", n=1).str[1]
    # Only keep selected aggregation function
    metric_df = metric_df[metric_df["agg_func"] == selected_agg_func]
    # Only keep selected metrics
    df = df.select(["gid"] + metric_df["metric"].tolist())
    # Rename metric columns
    df = df.rename({row.metric: row.sub_dataset for _, row in metric_df.iterrows()})
    # Melt to long
    df_long = df.melt(
        id_vars="gid", variable_name="sub_dataset", value_name=sub_dataset_selected
    ).to_pandas()
    # Get year and month from sub_dataset name
    if filename_pattern is not None:
        if monthly:
            df_long[["year", "month"]] = df_long["sub_dataset"].str.extract(
                filename_pattern
            )
        else:
            df_long["year"] = df_long["sub_dataset"].str.extract(filename_pattern)
            df_long["month"] = None
    else:
        df_long["year"] = default_year
        df_long["month"] = None
    # df_long = df_long.drop(columns="sub_dataset")
    # Reorder and drop duplicates
    df_long = reorder_and_check_for_duplicates(
        df_long,
        ["year", "month", "gid"],
        on_duplicates=on_duplicates,
        drop_duplicates=drop_duplicates,
    )
    # If not monthly, drop month column
    if not monthly:
        df_long = df_long.drop(columns="month")
    # Rename GADM columns
    df_long = df_long.rename(columns={"gid": f"GID_{admin_level}"})
    # Set year and month as integers
    df_long["year"] = df_long["year"].astype(int)
    if monthly:
        df_long["month"] = df_long["month"].astype(int)
    # Export
    if export:
        if export == True:
            export = sub_dataset_selected
        if monthly:
            freq = "monthly"
        else:
            freq = "yearly"
        df_long.to_parquet(
            DATA
            / f"intermediate/individual_aggregations/{export}_level_{admin_level}_{freq}.parquet"
        )
    return df_long


In [21]:
def process_local_dataset(**kwargs):
    # Run for each sub-dataset, for each admin level
    pbar = tqdm(range(3))
    for admin_level in pbar:
        sub_dataset_selected = kwargs["sub_dataset_selected"]
        pbar.set_description(f"{sub_dataset_selected} at level {admin_level}")
        df = process_local_agg_file(
            admin_level=admin_level,
            **kwargs,
        )


In [22]:
def show_subdatasets(dataset_name):
    raster_dataset_rootdir = raster_agg_rootdir / dataset_name
    raster_dataset_rootdir_files = [x.name for x in raster_dataset_rootdir.iterdir()]
    raster_subdatasets = list(
        set([x.split("_level")[0] for x in raster_dataset_rootdir_files])
    )
    return raster_subdatasets


## Legacy aggregations


### GDELT


In [23]:
def process_gdelt():
    # Prepare GDELT files
    gdelt_files = list((DATA / "raw/gdelt_jr").glob("*.csv"))
    gdelt_files_df = pd.DataFrame(
        {"filepath": gdelt_files, "filename": [x.stem for x in gdelt_files]}
    )
    gdelt_files_df[["region", "freq"]] = gdelt_files_df.filename.str.replace(
        r"^GDELT_(.+)_new$", r"\1", regex=True
    ).str.split("_", expand=True)
    gdelt_files_df["level"] = gdelt_files_df["region"].replace(
        {"country": 0, "edo": 1, "mun": 2}
    )
    # Only keep monthly
    gdelt_files_df = gdelt_files_df[gdelt_files_df["freq"] == "monthly"]

    # Aggregate GDELT
    for i, row in tqdm(gdelt_files_df.iterrows(), total=len(gdelt_files_df)):
        gdelt_df = pd.read_csv(row.filepath)
        # Rename time columns
        if row.freq == "yearly":
            gdelt_df = gdelt_df.rename(columns={"time_value": "year"})
            additional_cols = ["year"]
        elif row.freq == "monthly":
            gdelt_df["year"] = gdelt_df.time_value.astype(str).str[:4].astype(int)
            gdelt_df["month"] = gdelt_df.time_value.astype(str).str[-2:].astype(int)
            gdelt_df = gdelt_df.drop(columns="time_value")
            additional_cols = ["year", "month"]
        # Rename other columns
        gdelt_df = gdelt_df.rename(columns={"gid": f"GID_{row.level}"})
        gdelt_df = gdelt_df.rename(
            columns={x: f"gdelt_{x}" for x in ["coercion", "protest"]}
        )
        # Add other GID values if any
        if row.level == 1:
            gdelt_df = gdelt_df.merge(gadm_1, on="GID_1")
        elif row.level == 2:
            gdelt_df = gdelt_df.merge(gadm_2, on="GID_2")
        # Check if duplicates exist and warn if yes
        id_cols_level = [f"GID_{x}" for x in range(row.level + 1)] + additional_cols
        gdelt_df = reorder_and_check_for_duplicates(gdelt_df, id_cols_level)

        # Export
        gdelt_df.to_parquet(
            DATA
            / f"intermediate/individual_aggregations/gdelt_level_{row.level}_{row.freq}.parquet",
            index=False,
        )


In [24]:
process_gdelt()


100%|██████████| 3/3 [00:04<00:00,  1.46s/it]


## Run aggregations


In [23]:
# Show the list of folders that have raster aggregations
raster_agg_rootdir = DATA / "intermediate/raster_aggregations/"
[Path(x).name for x in raster_agg_rootdir.iterdir() if x.is_dir()]


['precipitation',
 'ruggedness',
 'elevation',
 'viirs',
 'population',
 'roads',
 'ntl_dvnl',
 'solar_potential',
 'wind_potential',
 'dmsp',
 'fao',
 'temperature',
 'emissions',
 'telecom_mobile_coverage',
 'ntl_dmsp_ext']

### Elevation


In [None]:
show_subdatasets("elevation")


['elevation']

In [None]:
process_local_dataset(
    dataset_name="elevation",
    sub_dataset_selected="elevation",
    filename_pattern=None,
    monthly=False,
    default_year=1996,
    selected_agg_func="mean",
    export=True,
)


  df_long = df.melt(
  df_long = df.melt(
  df_long = df.melt(
elevation at level 2: 100%|██████████| 3/3 [00:19<00:00,  6.60s/it]


In [None]:
# Read in output to check
check = pl.read_parquet(
    DATA / "intermediate/individual_aggregations/elevation_level_1_yearly.parquet"
)
check.head()


year,GID_1,elevation
i64,str,f64
1996,"""AFG.1_1""",3586.531738
1996,"""AFG.2_1""",1558.607544
1996,"""AFG.3_1""",2283.279541
1996,"""AFG.4_1""",783.0755
1996,"""AFG.5_1""",3230.569092


### Precipitation


In [None]:
show_subdatasets("precipitation")


['precipitation_gpcp', 'precipitation_cru', 'precipitation_gpcc']

In [None]:
process_local_dataset(
    dataset_name="precipitation",
    sub_dataset_selected="precipitation_gpcp",
    filename_pattern="(\d{4})-(\d{2}).+",
    monthly=True,
    selected_agg_func="mean",
    drop_duplicates=False,
    on_duplicates="error",
    export=True,
)


precipitation_gpcp at level 2: 100%|██████████| 3/3 [01:18<00:00, 26.16s/it]


In [None]:
process_local_dataset(
    dataset_name="precipitation",
    sub_dataset_selected="precipitation_cru",
    filename_pattern="(\d{4})-(\d{2}).+",
    monthly=True,
    selected_agg_func="mean",
    drop_duplicates=True,
    on_duplicates="error",
    export=True,
)


precipitation_cru at level 2: 100%|██████████| 3/3 [03:21<00:00, 67.03s/it]


In [None]:
process_local_dataset(
    dataset_name="precipitation",
    sub_dataset_selected="precipitation_gpcc",
    filename_pattern="(\d{4})-(\d{2}).+",
    monthly=True,
    selected_agg_func="mean",
    drop_duplicates=False,
    on_duplicates="error",
    export=True,
)


precipitation_gpcc at level 2: 100%|██████████| 3/3 [03:50<00:00, 76.75s/it] 


### Testing zone


In [57]:
# check = process_local_agg_file(
#     admin_level=0,
#     dataset_name="dmsp",
#     sub_dataset_selected="dmsp_stable_lights",
#     # filename_pattern="VNL_v21_npp_(\d{4}).+global.+",
#     # monthly=True,
#     default_year=2023,
#     selected_agg_func="mean",
#     drop_duplicates=False,
#     on_duplicates="warn",
#     export=False,
# )
# check = check.sort_values(["year", "GID_0", "sub_dataset"])
# check.head()


In [58]:
# check.sub_dataset.iloc[0]


In [59]:
# # Show duplicates
# check[check.duplicated(["year", "GID_0"], keep=False)].sort_values(["year", "GID_0"])


### FAO


In [None]:
show_subdatasets("fao")


['fao_value', 'fao_yield']

In [None]:
process_local_dataset(
    dataset_name="fao",
    sub_dataset_selected="fao_yield",
    filename_pattern="all_(\d{4})_yld",
    monthly=False,
    default_year=None,
    selected_agg_func="mean",
    export=True,
)


fao_yield at level 2: 100%|██████████| 3/3 [00:00<00:00, 10.22it/s]


In [None]:
process_local_dataset(
    dataset_name="fao",
    sub_dataset_selected="fao_value",
    filename_pattern="all_(\d{4})_val",
    monthly=False,
    default_year=None,
    selected_agg_func="mean",
    export=True,
)


fao_value at level 2: 100%|██████████| 3/3 [00:00<00:00, 10.18it/s]


### Ruggedness


In [None]:
process_local_dataset(
    dataset_name="ruggedness",
    sub_dataset_selected="ruggedness",
    monthly=False,
    default_year=1996,
    selected_agg_func="weighted_mean",
    export=True,
)


ruggedness at level 2: 100%|██████████| 3/3 [00:00<00:00, 19.69it/s]


### Temperature


In [125]:
process_local_dataset(
    dataset_name="temperature",
    sub_dataset_selected="temperature",
    filename_pattern="(\d{4})-(\d{2}).+",
    monthly=True,
    selected_agg_func="mean",
    export=True,
)


temperature at level 2: 100%|██████████| 3/3 [03:26<00:00, 68.94s/it]


### DMSP


In [None]:
process_local_dataset(
    dataset_name="dmsp",
    sub_dataset_selected="dmsp_cloud_free_coverage",
    filename_pattern="F\d{2}(\d{4}).+",
    monthly=False,
    default_year=None,
    selected_agg_func="sum",
    export=True,
)


dmsp_cloud_free_coverage at level 2: 100%|██████████| 3/3 [00:02<00:00,  1.33it/s]


In [39]:
process_local_dataset(
    dataset_name="dmsp",
    sub_dataset_selected="dmsp_stable_lights",
    filename_pattern="F\d{2}(\d{4}).+",
    monthly=False,
    default_year=None,
    selected_agg_func="mean",
    export=True,
)


dmsp_stable_lights at level 2: 100%|██████████| 3/3 [00:02<00:00,  1.01it/s]


### DVNL and DMSP_EXT


In [127]:
process_local_dataset(
    dataset_name="ntl_dvnl",
    sub_dataset_selected="ntl_dvnl",
    filename_pattern="DVNL_(\d{4})",
    monthly=False,
    default_year=None,
    selected_agg_func="mean",
    export=True,
)


ntl_dvnl at level 2: 100%|██████████| 3/3 [00:00<00:00,  3.03it/s]


In [131]:
process_local_dataset(
    dataset_name="ntl_dmsp_ext",
    sub_dataset_selected="ntl_dmsp_ext",
    filename_pattern="F\d{2}_(\d{4}).+",
    monthly=False,
    default_year=None,
    selected_agg_func="mean",
    export=True,
)


ntl_dmsp_ext at level 2: 100%|██████████| 3/3 [00:01<00:00,  2.35it/s]


### Population


In [136]:
process_local_dataset(
    dataset_name="population",
    sub_dataset_selected="population_count",
    filename_pattern="gpw_v4_population_count_adjusted_to_2015_unwpp_country_totals_rev11_(\d{4})_30_sec",
    monthly=False,
    default_year=None,
    selected_agg_func="sum",
    export=True,
)


population_count at level 2: 100%|██████████| 3/3 [00:00<00:00,  5.21it/s]


In [138]:
process_local_dataset(
    dataset_name="population",
    sub_dataset_selected="population_density",
    filename_pattern="gpw_v4_population_density_adjusted_to_2015_unwpp_country_totals_rev11_(\d{4})_30_sec",
    monthly=False,
    default_year=None,
    selected_agg_func="mean",
    export=True,
)


population_density at level 2: 100%|██████████| 3/3 [00:00<00:00,  3.18it/s]


### VIIRS Annual


In [144]:
# Note that 2012 has two files, one for 201204-201212 and one for 201204-201303
# I'm only keeping the first one as "2012"
process_local_dataset(
    dataset_name="viirs",
    sub_dataset_selected="viirs",
    filename_pattern="VNL_v21_npp_(\d{4}).+global.+",
    monthly=False,
    default_year=None,
    selected_agg_func="mean",
    on_duplicates="warn",
    drop_duplicates=True,
    export=True,
)


viirs at level 2: 100%|██████████| 3/3 [00:01<00:00,  1.98it/s]


### Solar potential


In [45]:
process_local_dataset(
    dataset_name="solar_potential",
    sub_dataset_selected="solar_potential",
    monthly=False,
    default_year=2019,
    selected_agg_func="sum",
    on_duplicates="warn",
    drop_duplicates=True,
    export=True,
)


solar_potential at level 2: 100%|██████████| 3/3 [00:00<00:00,  9.53it/s]


### Wind potential


In [None]:
process_local_dataset(
    dataset_name="wind_potential",
    sub_dataset_selected="wind_potential",
    monthly=False,
    default_year=2023,
    selected_agg_func="sum",
    on_duplicates="warn",
    drop_duplicates=True,
    export=True,
)


wind_potential at level 2: 100%|██████████| 3/3 [00:00<00:00, 13.29it/s]


### Telecom mobile coverage


In [47]:
process_local_dataset(
    dataset_name="telecom_mobile_coverage",
    sub_dataset_selected="telecom_mobile_coverage_mce_2g",
    monthly=False,
    selected_agg_func="sum",
    on_duplicates="warn",
    default_year=2021,
    drop_duplicates=True,
    export=True,
)
process_local_dataset(
    dataset_name="telecom_mobile_coverage",
    sub_dataset_selected="telecom_mobile_coverage_mce_3g",
    monthly=False,
    selected_agg_func="sum",
    on_duplicates="warn",
    default_year=2021,
    drop_duplicates=True,
    export=True,
)
process_local_dataset(
    dataset_name="telecom_mobile_coverage",
    sub_dataset_selected="telecom_mobile_coverage_mce_4g",
    monthly=False,
    selected_agg_func="sum",
    on_duplicates="warn",
    default_year=2021,
    drop_duplicates=True,
    export=True,
)
process_local_dataset(
    dataset_name="telecom_mobile_coverage",
    sub_dataset_selected="telecom_mobile_coverage_mce_5g",
    monthly=False,
    selected_agg_func="sum",
    on_duplicates="warn",
    default_year=2021,
    drop_duplicates=True,
    export=True,
)


telecom_mobile_coverage_mce_2g at level 2: 100%|██████████| 3/3 [00:00<00:00, 15.16it/s]
telecom_mobile_coverage_mce_3g at level 2: 100%|██████████| 3/3 [00:00<00:00, 14.08it/s]
telecom_mobile_coverage_mce_4g at level 2: 100%|██████████| 3/3 [00:00<00:00, 13.89it/s]
telecom_mobile_coverage_mce_5g at level 2: 100%|██████████| 3/3 [00:00<00:00, 14.52it/s]


In [None]:
# Suppo

In [48]:
process_local_dataset(
    dataset_name="telecom_mobile_coverage",
    sub_dataset_selected="telecom_mobile_coverage_oci_2g",
    monthly=False,
    default_year=2021,
    selected_agg_func="mean",
    on_duplicates="warn",
    drop_duplicates=True,
    export=True,
)
process_local_dataset(
    dataset_name="telecom_mobile_coverage",
    sub_dataset_selected="telecom_mobile_coverage_oci_3g",
    monthly=False,
    default_year=2021,
    selected_agg_func="mean",
    on_duplicates="warn",
    drop_duplicates=True,
    export=True,
)
process_local_dataset(
    dataset_name="telecom_mobile_coverage",
    sub_dataset_selected="telecom_mobile_coverage_oci_4g",
    monthly=False,
    default_year=2021,
    selected_agg_func="mean",
    on_duplicates="warn",
    drop_duplicates=True,
    export=True,
)


telecom_mobile_coverage_oci_2g at level 1:   0%|          | 0/3 [00:00<?, ?it/s]

telecom_mobile_coverage_oci_2g at level 2: 100%|██████████| 3/3 [00:00<00:00, 14.85it/s]
telecom_mobile_coverage_oci_3g at level 2: 100%|██████████| 3/3 [00:00<00:00, 15.19it/s]
telecom_mobile_coverage_oci_4g at level 2: 100%|██████████| 3/3 [00:00<00:00, 14.12it/s]


# Supporting data

Some of this comes from JR's RA's


In [23]:
# Calculate land area
def calculate_land_area(level):
    gadm = gpd.read_parquet(DATA / f"raw/shapefiles/gadm_geoparquet/gadm_{level}.parquet")
    # Use Mollweide projection
    gadm["area_sq_km"] = gadm.to_crs("ESRI:54009").area / 10 ** 6
    # Get geometric centroid
    gadm["lon_geomcenter"] = gadm.geometry.centroid.x
    gadm["lat_geomcenter"] = gadm.geometry.centroid.y
    return gadm

gadm_0 = calculate_land_area(0)
gadm_1 = calculate_land_area(1)
gadm_2 = calculate_land_area(2)
gadm_2.head()


  gadm["lon_geomcenter"] = gadm.geometry.centroid.x

  gadm["lat_geomcenter"] = gadm.geometry.centroid.y

  gadm["lon_geomcenter"] = gadm.geometry.centroid.x

  gadm["lat_geomcenter"] = gadm.geometry.centroid.y

  gadm["lon_geomcenter"] = gadm.geometry.centroid.x

  gadm["lat_geomcenter"] = gadm.geometry.centroid.y


Unnamed: 0,GID_0,NAME_0,GID_1,NAME_1,NL_NAME_1,GID_2,NAME_2,VARNAME_2,NL_NAME_2,TYPE_2,ENGTYPE_2,CC_2,HASC_2,geometry,area_sq_km,lon_geomcenter,lat_geomcenter
0,AFG,Afghanistan,AFG.1_1,Badakhshan,,AFG.1.1_1,Baharak,,,Wuleswali,District,,AF.BD.BA,"POLYGON ((71.18169 36.49196, 71.18560 36.49435...",3009.465743,71.104537,37.021148
1,AFG,Afghanistan,AFG.1_1,Badakhshan,,AFG.1.2_1,Darwaz,,,Wuleswali,District,,AF.BD.DA,"POLYGON ((71.33762 38.11841, 71.33733 38.11514...",2925.250463,70.939218,38.210966
2,AFG,Afghanistan,AFG.1_1,Badakhshan,,AFG.1.3_1,Fayzabad,,,Wuleswali,District,,AF.BD.FA,"POLYGON ((70.09976 37.00258, 70.09885 37.01114...",2945.125708,70.47058,37.11494
3,AFG,Afghanistan,AFG.1_1,Badakhshan,,AFG.1.4_1,Ishkashim,,,Wuleswali,District,,AF.BD.IK,"POLYGON ((71.31934 37.24848, 71.33519 37.24978...",1572.397559,71.427598,36.807793
4,AFG,Afghanistan,AFG.1_1,Badakhshan,,AFG.1.5_1,Jurm,,,Wuleswali,District,,AF.BD.JU,"POLYGON ((71.18169 36.49196, 71.17219 36.48955...",3516.958969,70.827375,36.579871


In [30]:
def process_supporting_data_jr(df):
    if "pop_centr" in df.columns:
        df = df.drop(columns="pop_centr")
    has_vars = [
        "capital",
        "land_border",
        "rivers_lakes",
        "coasts",
        "flare",
        "ports",
        "int_airports",
        "large_airports",
        "medium_airports",
        "min_deposit",
        "tech_min",
        "energy_min",
        "precious_min",
        "other_min",
    ]
    rename_dict = {x: f"has_{x}" for x in has_vars if x in df.columns}
    df = df.rename(columns=rename_dict)
    df = df.rename(
        columns={
            "has_min_despoit": "has_mineral_deposit",
            "has_tech_min": "has_tech_minerals",
            "has_energy_min": "has_energy_minerals",
            "has_precious_min": "has_precious_minerals",
            "has_other_min": "has_other_minerals",
        }
    )
    df = df.rename(
        columns={
            "dist_km": "dist_geomcentr_to_capital_km",
            "dist_km_popcentr": "dist_popcentr_to_capital_km",
        }
    )
    return df


In [41]:
supporting_data_1 = pd.read_excel(
    DATA / "raw/metadata/15_ADMProjects/data/clean/Dataset_ADM1.xlsx"
)
supporting_data_1 = process_supporting_data_jr(supporting_data_1)
# Merge in land area
supporting_data_1 = supporting_data_1.merge(gadm_1[["GID_0", "GID_1", "area_sq_km"]], on="GID_1", how="right")

supporting_data_1.to_parquet(
    DATA / "processed/supporting_data/supporting_data_level_1.parquet", index=False
)
supporting_data_1.to_csv(
    DATA / "processed/supporting_data/supporting_data_level_1.csv", index=False
)
supporting_data_1.head()


Unnamed: 0,GID_1,has_capital,has_land_border,dist_geomcentr_to_capital_km,lon_geomcentr,lat_geomcentr,has_rivers_lakes,has_coasts,lon_popcentr,lat_popcentr,...,has_int_airports,has_large_airports,has_medium_airports,has_min_deposit,has_tech_minerals,has_energy_minerals,has_precious_minerals,has_other_minerals,GID_0,area_sq_km
0,AFG.1_1,0,1,463.014746,73.32697,36.967899,1,0,70.669235,37.175719,...,0,0,0,0,0,0,0,0,AFG,43773.306123
1,AFG.2_1,0,0,514.345027,63.628308,35.257927,0,0,63.542024,35.125733,...,0,0,0,0,0,0,0,0,AFG,20636.832989
2,AFG.3_1,0,0,135.18392,69.022308,35.727964,0,0,68.799123,35.898465,...,0,0,0,0,0,0,0,0,AFG,21165.754131
3,AFG.4_1,0,1,284.89767,67.16338,36.482851,1,0,67.015214,36.669818,...,0,0,1,0,0,0,0,0,AFG,17287.218057
4,AFG.5_1,0,0,165.738012,67.39181,34.712677,1,0,67.248353,34.609991,...,0,0,0,0,0,0,0,0,AFG,14207.143294


In [42]:
# Check to make sure all countries are represented
supporting_data_1["GID_0"] = supporting_data_1["GID_1"].str[:3]
num_present = supporting_data_1.GID_0.nunique()
total_countries = gadm_0.GID_0.nunique()
print(f"{num_present} out of {total_countries} countries present")

228 out of 256 countries present


In [43]:
supporting_data_2 = pd.read_excel(
    DATA / "raw/metadata/15_ADMProjects/data/clean/Dataset_ADM2.xlsx"
)
supporting_data_2 = process_supporting_data_jr(supporting_data_2)
# Merge in land area
supporting_data_2 = supporting_data_2.merge(gadm_2[["GID_0", "GID_1", "GID_2", "area_sq_km"]], on="GID_2", how="right")

supporting_data_2.to_parquet(
    DATA / "processed/supporting_data/supporting_data_level_2.parquet", index=False
)
supporting_data_2.to_csv(
    DATA / "processed/supporting_data/supporting_data_level_2.csv", index=False
)
supporting_data_2.head()


Unnamed: 0,GID_2,has_capital,has_land_border,dist_geomcentr_to_capital_km,lon_geomcentr,lat_geomcentr,has_rivers_lakes,has_coasts,lon_popcentr,lat_popcentr,...,has_large_airports,has_medium_airports,has_min_deposit,has_tech_minerals,has_energy_minerals,has_precious_minerals,has_other_minerals,GID_0,GID_1,area_sq_km
0,AFG.1.1_1,0,0,321.50817,71.128464,36.941952,0,0,71.047053,37.027428,...,0,0,0,0,0,0,0,AFG,AFG.1_1,3009.465743
1,AFG.1.2_1,0,0,440.667548,70.952635,38.221376,1,0,70.878676,38.211332,...,0,0,0,0,0,0,0,AFG,AFG.1_1,2925.250463
2,AFG.1.3_1,0,0,308.537781,70.442711,37.101351,0,0,70.47173,37.112097,...,0,0,0,0,0,0,0,AFG,AFG.1_1,2945.125708
3,AFG.1.4_1,0,0,330.495935,71.429871,36.86511,1,0,71.411107,36.82492,...,0,0,0,0,0,0,0,AFG,AFG.1_1,1572.397559
4,AFG.1.5_1,0,0,278.914453,70.865658,36.62159,0,0,70.804477,36.737237,...,0,0,0,0,0,0,0,AFG,AFG.1_1,3516.958969


In [44]:
# For level 0, take the has_vars and aggregate if any of the subregions have it
has_vars = [
        "capital",
        "land_border",
        "rivers_lakes",
        "coasts",
        "flare",
        "ports",
        "int_airports",
        "large_airports",
        "medium_airports",
        "min_deposit",
        "tech_min",
        "energy_min",
        "precious_min",
        "other_min",
    ]
supporting_data_0 = pd.read_excel(
    DATA / "raw/metadata/15_ADMProjects/data/clean/Dataset_ADM1.xlsx"
)
supporting_data_0["GID_0"] = supporting_data_0["GID_1"].str[:3]
supporting_data_0 = supporting_data_0.groupby("GID_0")[has_vars].max().reset_index()
# Process
supporting_data_0 = process_supporting_data_jr(supporting_data_0)
# Add geometric center and area
supporting_data_0 = supporting_data_0.merge(gadm_0[["GID_0", "lat_geomcenter", "lon_geomcenter", "area_sq_km"]].rename(columns={"lat_geomcenter": "lat_geomcentr", "lon_geomcenter": "lon_geomcentr"}), on="GID_0", how="right")
supporting_data_0.to_parquet(
    DATA / "processed/supporting_data/supporting_data_level_0.parquet", index=False
)
supporting_data_0.to_csv(
    DATA / "processed/supporting_data/supporting_data_level_0.csv", index=False
)
supporting_data_0.head(2)

Unnamed: 0,GID_0,has_capital,has_land_border,has_rivers_lakes,has_coasts,has_flare,has_ports,has_int_airports,has_large_airports,has_medium_airports,has_min_deposit,has_tech_minerals,has_energy_minerals,has_precious_minerals,has_other_minerals,lat_geomcentr,lon_geomcentr,area_sq_km
0,ABW,,,,,,,,,,,,,,,12.509315,-69.970276,183.04954
1,AFG,1.0,1.0,1.0,0.0,0.0,0.0,1.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,33.828415,66.029586,645514.646863


In [45]:
list(supporting_data_1.columns)

['GID_1',
 'has_capital',
 'has_land_border',
 'dist_geomcentr_to_capital_km',
 'lon_geomcentr',
 'lat_geomcentr',
 'has_rivers_lakes',
 'has_coasts',
 'lon_popcentr',
 'lat_popcentr',
 'dist_popcentr_to_capital_km',
 'has_flare',
 'has_ports',
 'has_int_airports',
 'has_large_airports',
 'has_medium_airports',
 'has_min_deposit',
 'has_tech_minerals',
 'has_energy_minerals',
 'has_precious_minerals',
 'has_other_minerals',
 'GID_0',
 'area_sq_km']

# Mining

In [52]:
mining_2 = pd.read_excel(DATA / "raw/mining/Minas_ADM2.xlsx")
mining_2 = mining_2.drop(columns=["NAME_0", "NAME_1"])
# Export
mining_2.to_csv(DATA / "intermediate/mining/mining_level_2.csv", index=False)
mining_2.to_parquet(DATA / "intermediate/mining/mining_level_2.parquet", index=False)
mining_2.head()

Unnamed: 0,GID_2,"Aggregate, Light Weight",Aluminum,"Aluminum, Contained or Metal","Aluminum, High Alumina Clay",Andalusite,Anthracite,Antimony,Arsenic,Asbestos,...,Vermiculite,Volcanic Materials,"Water, Free",Wollastonite,Yttrium,Zeolites,Zinc,"Zinc, Refiner","Zinc, Smelter",Zirconium
0,AFG.1.13_1,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
1,AFG.10.5_1,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
2,AFG.12.5_1,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
3,AFG.17.10_1,0,0,0,0,0,0,0,0,2,...,0,0,0,0,0,0,0,0,0,0
4,AFG.18.10_1,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0


In [62]:
mining_1_converted = mining_2.copy()
# Split the 'GID_2' column and take the first two parts
mining_1_converted['GID_1'] = mining_1_converted['GID_2'].str.split('.').str[0] + '.' + mining_1_converted['GID_2'].str.split('.').str[1]
# Drop the 'GID_2' column
mining_1_converted = mining_1_converted.drop(columns=['GID_2'])
# Aggregate as sum
mining_1_converted = mining_1_converted.groupby(['GID_1']).sum().reset_index()
mining_1_converted.head(2)

Unnamed: 0,GID_1,"Aggregate, Light Weight",Aluminum,"Aluminum, Contained or Metal","Aluminum, High Alumina Clay",Andalusite,Anthracite,Antimony,Arsenic,Asbestos,...,Vermiculite,Volcanic Materials,"Water, Free",Wollastonite,Yttrium,Zeolites,Zinc,"Zinc, Refiner","Zinc, Smelter",Zirconium
0,AFG.1,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
1,AFG.10,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0


In [63]:
# Read in mining data only available at GID_1 level
mining_1 = pd.read_excel(DATA / "raw/mining/Minas_ADM1.xlsx")
mining_1 = mining_1.drop(columns=["NAME_0", "NAME_1"])
# Concatenate with converted
mining_1 = pd.concat([mining_1, mining_1_converted])
# Check for duplicates by GID_1
assert mining_1.duplicated(['GID_1']).sum() == 0, "Duplicates present in GID_1"
# Sort by GID_1
mining_1 = mining_1.sort_values(['GID_1'])
# Export
mining_1.to_csv(DATA / "intermediate/mining/mining_level_1.csv", index=False)
mining_1.to_parquet(DATA / "intermediate/mining/mining_level_1.parquet", index=False)
mining_1.head()

Unnamed: 0,GID_1,"Aggregate, Light Weight",Aluminum,"Aluminum, Contained or Metal","Aluminum, High Alumina Clay",Andalusite,Anthracite,Antimony,Arsenic,Asbestos,...,Vermiculite,Volcanic Materials,"Water, Free",Wollastonite,Yttrium,Zeolites,Zinc,"Zinc, Refiner","Zinc, Smelter",Zirconium
0,AFG.1,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
1,AFG.10,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
2,AFG.12,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
3,AFG.17,0,0,0,0,0,0,0,0,2,...,0,0,0,0,0,0,0,0,0,0
4,AFG.18,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0


In [64]:
mining_0 = mining_1.copy()
mining_0["GID_0"] = mining_0["GID_1"].str[:3]
mining_0 = mining_0.drop(columns="GID_1")
# Aggregate such that if any of the GID_1 values is 1, then set to 1
mining_0 = mining_0.groupby("GID_0").sum(min_count=1).reset_index()
# Export
mining_0.to_csv(DATA / "intermediate/mining/mining_level_0.csv", index=False)
mining_0.to_parquet(DATA / "intermediate/mining/mining_level_0.parquet", index=False)
mining_0.head()

Unnamed: 0,GID_0,"Aggregate, Light Weight",Aluminum,"Aluminum, Contained or Metal","Aluminum, High Alumina Clay",Andalusite,Anthracite,Antimony,Arsenic,Asbestos,...,Vermiculite,Volcanic Materials,"Water, Free",Wollastonite,Yttrium,Zeolites,Zinc,"Zinc, Refiner","Zinc, Smelter",Zirconium
0,AFG,0,0,0,0,0,0,0,0,7,...,0,0,0,0,0,0,1,0,0,0
1,AGO,0,1,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
2,ALB,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,3,0,0,0
3,ARE,0,1,1,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
4,ARG,0,7,0,1,0,0,27,44,17,...,5,5,0,0,0,1,396,1,0,0


# Market access measures

In [27]:
market_access = pd.read_parquet(DATA / "intermediate/market_access/market_access_metrics_gadm.parquet")
market_access.head(2)

Unnamed: 0,source,gdp_dest_cumsum_geodesic_distance_100km,ntl_dest_cumsum_geodesic_distance_100km,gdp_dest_cumsum_geodesic_distance_250km,ntl_dest_cumsum_geodesic_distance_250km,gdp_dest_cumsum_geodesic_distance_500km,ntl_dest_cumsum_geodesic_distance_500km,gdp_dest_cumsum_geodesic_distance_750km,ntl_dest_cumsum_geodesic_distance_750km,gdp_dest_cumsum_geodesic_distance_1000km,...,log_gdp_dest_cumsum_geodesic_distance_5000km,log_ntl_dest_cumsum_geodesic_distance_100km,log_ntl_dest_cumsum_geodesic_distance_250km,log_ntl_dest_cumsum_geodesic_distance_500km,log_ntl_dest_cumsum_geodesic_distance_750km,log_ntl_dest_cumsum_geodesic_distance_1000km,log_ntl_dest_cumsum_geodesic_distance_2000km,log_ntl_dest_cumsum_geodesic_distance_3000km,log_ntl_dest_cumsum_geodesic_distance_4000km,log_ntl_dest_cumsum_geodesic_distance_5000km
0,AFG.10_1,2080388.0,0.000206,390342600.0,0.038603,19585080000.0,2.392007,89962350000.0,127.604115,506048100000.0,...,31.127443,-8.4889,-3.254429,0.872133,4.848933,5.316127,7.59548,7.808978,8.115981,8.607694
1,AFG.11_1,79155280.0,0.007828,334106500.0,0.033041,3117402000.0,0.392231,39802420000.0,3.730013,442708300000.0,...,31.040254,-4.850043,-3.409994,-0.935904,1.316412,4.959633,7.574072,7.80006,8.090364,8.448744


In [28]:
market_access.columns

Index(['source', 'gdp_dest_cumsum_geodesic_distance_100km',
       'ntl_dest_cumsum_geodesic_distance_100km',
       'gdp_dest_cumsum_geodesic_distance_250km',
       'ntl_dest_cumsum_geodesic_distance_250km',
       'gdp_dest_cumsum_geodesic_distance_500km',
       'ntl_dest_cumsum_geodesic_distance_500km',
       'gdp_dest_cumsum_geodesic_distance_750km',
       'ntl_dest_cumsum_geodesic_distance_750km',
       'gdp_dest_cumsum_geodesic_distance_1000km',
       'ntl_dest_cumsum_geodesic_distance_1000km',
       'gdp_dest_cumsum_geodesic_distance_2000km',
       'ntl_dest_cumsum_geodesic_distance_2000km',
       'gdp_dest_cumsum_geodesic_distance_3000km',
       'ntl_dest_cumsum_geodesic_distance_3000km',
       'gdp_dest_cumsum_geodesic_distance_4000km',
       'ntl_dest_cumsum_geodesic_distance_4000km',
       'gdp_dest_cumsum_geodesic_distance_5000km',
       'ntl_dest_cumsum_geodesic_distance_5000km', 'GID_1', 'NAME_1', 'GID_0',
       'region_name_iso', 'market_potential_geodesic

# Road lengths


In [44]:
for level in range(3):
    roads_level_df = pl.read_parquet(
        DATA / f"intermediate/roads/grip/road_lengths_{level}.parquet"
    )
    road_type_dict = {
        1: "highways",
        2: "primary_roads",
        3: "secondary_roads",
        4: "tertiary_roads",
        5: "local_roads",
        0: "unspecified_roads",
    }
    # Convert to a dataframe
    road_type_dict_df = pd.DataFrame(
        {
            "GP_RTP": list(road_type_dict.keys()),
            "road_type": list(road_type_dict.values()),
        }
    )
    # Prepend "road_length_" to GP_RTP
    roads_level_df = roads_level_df.join(
        pl.from_pandas(road_type_dict_df), on="GP_RTP", how="left"
    )
    roads_level_df = roads_level_df.with_columns(
        [("road_length_" + pl.col("road_type")).alias("road_type")]
    )
    # Pivot
    roads_level_df = roads_level_df.pivot(
        index=f"GID_{level}",
        columns="road_type",
        values="road_length",
    )
    # Fill nulls
    roads_level_df = roads_level_df.fill_null(0)
    roads_level_df = roads_level_df.with_columns(
        [pl.lit("2018").alias("year")]
    )
    roads_level_df.write_parquet(
        DATA
        / f"intermediate/individual_aggregations/road_lengths_level_{level}_yearly.parquet"
    )
    

# GEE


## VIIRS Monthly


In [24]:
# # Download all missing files
# downloaded_files = download_missing_gcp(
#     local_folderpath=DATA / "intermediate/gee_viirs_agg/",
#     gcs_bucketname="earth_engine_aggregations",
# )


In [147]:
def prepare_viirs():
    # Get list of files from GCP
    csv_dict = {}
    for x in range(3):
        # read filelist
        csvlist = list(
            (DATA / "intermediate/gee_viirs_agg/").glob(rf"VIIRS_vcmsl_*_level{x}*")
        )
        # Make sure file exists and has data in it
        csvlist_valid = [x for x in csvlist if x.stat().st_size > 10]
        print(f"{len(csvlist)} became {len(csvlist_valid)}")
        # Read files
        viirs_df = dd.read_csv(csvlist_valid)
        viirs_df["date"] = dd.to_datetime(viirs_df["date"])
        viirs_df = viirs_df.compute()
        # Export
        viirs_df.to_parquet(
            PROJ / f"gee_viirs_monthly/viirs_vcmsl_world_level{x}.parquet", index=False
        )


# prepare_viirs()


In [73]:
for level in range(0, 3):
    print(level)
    viirs = pd.read_parquet(
        DATA / f"intermediate/gee_viirs_monthly/viirs_vcmsl_world_level{level}.parquet"
    )
    viirs["year"] = viirs.date.dt.year
    viirs["month"] = viirs.date.dt.month
    viirs = viirs.drop(columns=["date", "min", "max", "stdDev"])
    viirs = viirs.rename(
        columns={x: f"viirs_{x}" for x in ["mean", "sum", "median", "count"]}
    )
    viirs = add_gadm_ids(viirs, level)
    # To make the groupby efficient, only aggregate those with dups
    id_cols_level = [f"GID_{x}" for x in range(level + 1)] + ["year", "month"]
    viirs_dup = viirs[viirs.duplicated(id_cols_level, keep=False)].copy()
    # Aggregate
    viirs_dup["viirs_mean"] = viirs_dup["viirs_mean"] * viirs_dup["viirs_count"]
    viirs_dup["viirs_median"] = viirs_dup["viirs_median"] * viirs_dup["viirs_count"]
    viirs_dup_agg = (
        viirs_dup.groupby(id_cols_level)[
            ["viirs_sum", "viirs_count", "viirs_mean", "viirs_median"]
        ]
        .sum(min_count=1)
        .reset_index()
    )
    viirs_dup_agg["viirs_mean"] = (
        viirs_dup_agg["viirs_mean"] / viirs_dup_agg["viirs_count"]
    )
    viirs_dup_agg["viirs_median"] = (
        viirs_dup_agg["viirs_median"] / viirs_dup_agg["viirs_count"]
    )
    viirs_dup_agg = viirs_dup_agg.drop(
        columns=[x for x in viirs_dup_agg if x.startswith("level_")]
    )
    # Append dups back to original df
    viirs = pd.concat(
        [viirs[~viirs.duplicated(id_cols_level, keep=False)], viirs_dup_agg]
    )
    # Check if duplicates exist and warn if yes
    viirs = reorder_and_check_for_duplicates(viirs, id_cols_level)
    # Export
    viirs.to_parquet(
        DATA
        / f"intermediate/individual_aggregations/viirs_monthly_level_{level}_monthly.parquet",
        index=False,
    )


0
1
2


## Landcover


In [57]:
def process_landcover(admin_level, landcover_type):
    # Read data
    landcover_df = pl.read_parquet(
        DATA
        / f"intermediate/gee_landcover_processed/{landcover_type}_landcover_{admin_level}.parquet"
    )
    # Aggregate to lc_effective
    landcover_df = landcover_df.groupby(
        ["date", f"GID_{admin_level}", "lc_effective"]
    ).agg([pl.col("area").sum()])
    # Pivot
    landcover_df = (
        landcover_df.pivot(
            index=["date", f"GID_{admin_level}"], columns="lc_effective", values="area"
        )
    ).sort(["date", f"GID_{admin_level}"])
    # Rename all columns except GID by prefixing landcover_type
    landcover_df = landcover_df.rename(
        {
            x: f"landcover_{landcover_type}_{x}"
            for x in landcover_df.columns
            if x not in [f"GID_{admin_level}", "date"]
        }
    )
    landcover_df = landcover_df.to_pandas()
    # Export
    if landcover_type == "MODIS":
        freq = "annual"
        landcover_df["date"] = pd.to_datetime(landcover_df["date"])
        landcover_df["year"] = landcover_df["date"].dt.year
        landcover_df = landcover_df.drop(columns=["date"])
    elif landcover_type == "DW":
        freq = "monthly"
        landcover_df["date"] = pd.to_datetime(landcover_df["date"])
        landcover_df["year"] = landcover_df["date"].dt.year
        landcover_df["month"] = landcover_df["date"].dt.month
        landcover_df = landcover_df.drop(columns=["date"])

    landcover_df.to_parquet(
        DATA
        / f"intermediate/individual_aggregations/landcover_{landcover_type}_level_{admin_level}_{freq}.parquet",
        index=False,
    )


In [58]:
# for admin_level in tqdm(range(3)):
#     for landcover_type in ["MODIS", "DW"]:
#         process_landcover(
#             admin_level=admin_level,
#             landcover_type=landcover_type,
#         )


100%|██████████| 3/3 [00:10<00:00,  3.36s/it]


## Forest change


In [52]:
# for admin_level in tqdm(range(3)):
#     forest_df = pl.read_parquet(
#         DATA / f"intermediate/gee_forest_change/forest_loss_gadm_{admin_level}.parquet"
#     )
#     forest_df.write_parquet(
#         DATA
#         / f"intermediate/individual_aggregations/forest_loss_level_{admin_level}_annual.parquet"
#     )


100%|██████████| 3/3 [00:00<00:00, 12.07it/s]


## Accessibility


In [36]:
# Read access data
access_dict = {}
for level in range(3):
    dflist = []
    pbar = tqdm(["cities", "large_cities", "medium_cities", "ports", "airports"])
    for x in pbar:
        pbar.set_description(f"Processing {x}")
        df = pd.read_parquet(
            ROOT
            / f"proj/2021-07-28 - GEE/tables/time_to_cities/{x}_travel_time_level_{level}.parquet"
        )
        # Drop duplicates by fname
        df = df.drop_duplicates(["date", f"GID_{level}", "fname"])
        df = df[["date", f"GID_{level}", "median"]]
        df = df.rename(columns={"median": f"time_to_{x}_mins"})
        dflist.append(df)
    df_level = reduce(
        lambda left, right: pd.merge(
            left, right, on=["date", f"GID_{level}"], how="outer"
        ),
        dflist,
    )
    # Add GADM ids
    df_level = add_gadm_ids(df_level, level)
    # Fix date
    df_level["date"] = pd.to_datetime("2019-01-01")
    df_level["year"] = df_level["date"].dt.year
    df_level = df_level.drop(columns=["date"])
    # Drop unnecessary GADM ID's
    if level > 0:
        df_level = df_level.drop(columns=[f"GID_{x}" for x in range(0, level)])
    # Check if duplicates exist and warn if yes
    id_cols_level = [f"GID_{level}", "year"]
    df_level = reorder_and_check_for_duplicates(df_level, id_cols_level)
    # Export
    access_dict[level] = df_level
    df_level.to_parquet(
        DATA
        / f"intermediate/individual_aggregations/access_level_{level}_annual.parquet",
        index=False,
    )
access_dict[1].head()


Processing airports: 100%|██████████| 5/5 [00:00<00:00, 85.97it/s]
Processing airports: 100%|██████████| 5/5 [00:00<00:00, 85.83it/s]
Processing airports: 100%|██████████| 5/5 [00:00<00:00, 22.58it/s]     


Unnamed: 0,GID_1,year,time_to_cities_mins,time_to_large_cities_mins,time_to_medium_cities_mins,time_to_ports_mins,time_to_airports_mins
0,PSE.1_1,2019,88.849348,88.849348,88.849348,15.16977,
1,PSE.2_1,2019,20.756536,86.467379,61.521715,132.47129,
2,CIV.2_1,2019,66.124239,358.130309,125.995983,138.053015,
3,CIV.3_1,2019,48.996337,198.219897,198.219897,197.965287,
4,CIV.4_1,2019,242.498073,435.11964,258.970207,644.616177,


## Accessibility - cities


In [37]:
# Time to destination for cities only
dflist = []
pbar = tqdm(["medium_cities", "large_cities", "ports", "airports"])
for x in pbar:
    pbar.set_description(f"Processing {x}")
    df = pd.read_parquet(
        ROOT
        / f"proj/2021-07-28 - GEE/tables/time_to_cities/{x}_travel_time_ghs.parquet"
    )
    # Drop duplicates by fname
    df = df.drop_duplicates(["date", "ghs_id", "fname"])
    df = df[["date", "ghs_id", "median"]]
    df = df.rename(columns={"median": f"time_to_{x}_mins"})
    dflist.append(df)
time_to_dest_ghs = reduce(
    lambda left, right: pd.merge(left, right, on=["date", "ghs_id"], how="outer"),
    dflist,
)
# Check if duplicates exist and warn if yes
id_cols_level = ["date", "ghs_id"]
time_to_dest_ghs = reorder_and_check_for_duplicates(time_to_dest_ghs, id_cols_level)
# Fix date
time_to_dest_ghs["date"] = pd.to_datetime("2019-01-01")
time_to_dest_ghs.head(2)


Processing airports: 100%|██████████| 4/4 [00:00<00:00, 29.23it/s]


Unnamed: 0,date,ghs_id,time_to_medium_cities_mins,time_to_large_cities_mins,time_to_ports_mins,time_to_airports_mins
0,2019-01-01,2486.0,68.661462,239.257003,,
1,2019-01-01,2518.0,3.7722,178.72151,,


In [81]:
# Assign GHS to GADM
def sjoin_ghs_gadm(ghs, gadm_1, gadm_2):
    ghs_gadm_1 = gpd.sjoin(ghs, gadm_1, how="right")
    ghs_gadm_2 = gpd.sjoin(ghs, gadm_2, how="right")
    # Keep id columns
    ghs_gadm_1 = ghs_gadm_1[["ghs_id", "GID_0", "GID_1"]]
    ghs_gadm_2 = ghs_gadm_2[["ghs_id", "GID_0", "GID_1", "GID_2"]]
    # Export
    ghs_gadm_1.to_parquet(
        DATA / "intermediate/ghs_gadm/ghs_gadm_1.parquet", index=False
    )
    ghs_gadm_2.to_parquet(
        DATA / "intermediate/ghs_gadm/ghs_gadm_2.parquet", index=False
    )


# sjoin_ghs_gadm(ghs, gadm_1, gadm_2)


In [40]:
# Aggregate
for level in tqdm(range(3)):
    if level == 0:
        ghs_gadm_level = pd.read_parquet(
            DATA / f"intermediate/ghs_gadm/ghs_gadm_1.parquet"
        )
        ghs_gadm_level = ghs_gadm_level[["ghs_id", "GID_0"]].drop_duplicates()
    else:
        ghs_gadm_level = pd.read_parquet(
            DATA / f"intermediate/ghs_gadm/ghs_gadm_{level}.parquet"
        )

    time_to_dest_ghs_gadm_level = time_to_dest_ghs.merge(
        ghs_gadm_level[[f"GID_{level}", "ghs_id"]], on="ghs_id", how="left"
    ).merge(ghs[["ghs_id", "pop_2015"]], on="ghs_id", how="left")
    # Convert to long format
    time_to_dest_ghs_gadm_level = time_to_dest_ghs_gadm_level.drop(columns=["ghs_id"])
    time_to_dest_ghs_gadm_level = time_to_dest_ghs_gadm_level.melt(
        id_vars=["date", f"GID_{level}", "pop_2015"],
        var_name="dest_type",
        value_name="time_to_dest",
    )
    # Weighted median of time to destination, weighted by population of cities overlapping GADM
    time_to_dest_ghs_gadm_level["time_to_dest"] = (
        time_to_dest_ghs_gadm_level["time_to_dest"]
        * time_to_dest_ghs_gadm_level["pop_2015"]
    )
    time_to_dest_ghs_gadm_level = (
        time_to_dest_ghs_gadm_level.groupby(["date", f"GID_{level}", "dest_type"])[
            ["time_to_dest", "pop_2015"]
        ]
        .sum(min_count=1)
        .reset_index()
    )
    time_to_dest_ghs_gadm_level["time_to_dest"] = (
        time_to_dest_ghs_gadm_level["time_to_dest"]
        / time_to_dest_ghs_gadm_level["pop_2015"]
    )
    time_to_dest_ghs_gadm_level = time_to_dest_ghs_gadm_level.drop(columns=["pop_2015"])
    # Prefix dest_type with urban_
    time_to_dest_ghs_gadm_level["dest_type"] = (
        "urban_" + time_to_dest_ghs_gadm_level["dest_type"]
    )
    # Pivot
    time_to_dest_ghs_gadm_level = (
        time_to_dest_ghs_gadm_level.pivot(
            index=["date", f"GID_{level}"], columns="dest_type", values="time_to_dest"
        )
        .reset_index()
        .rename_axis(None, axis=1)
    )
    # Get year
    time_to_dest_ghs_gadm_level["year"] = time_to_dest_ghs_gadm_level["date"].dt.year
    # Drop date
    time_to_dest_ghs_gadm_level = time_to_dest_ghs_gadm_level.drop(columns=["date"])
    # Check if duplicates exist and warn if yes
    id_cols_level = ["year", f"GID_{level}"]
    time_to_dest_ghs_gadm_level = reorder_and_check_for_duplicates(
        time_to_dest_ghs_gadm_level, id_cols_level
    )
    # Export
    time_to_dest_ghs_gadm_level.to_parquet(
        DATA
        / f"intermediate/individual_aggregations/urban_access_level_{level}_annual.parquet",
        index=False,
    )
time_to_dest_ghs_gadm_level.head(2)


100%|██████████| 3/3 [00:00<00:00,  8.83it/s]


Unnamed: 0,year,GID_2,urban_time_to_airports_mins,urban_time_to_large_cities_mins,urban_time_to_medium_cities_mins,urban_time_to_ports_mins
0,2019,AFG.1.11_1,,522.87744,522.87744,
1,2019,AFG.1.3_1,,384.187039,241.174895,


# Combine everything


In [49]:
# Get file lists
files_dest = DATA / "intermediate/individual_aggregations"
monthly = list(files_dest.glob("*monthly.parquet"))
yearly = list(files_dest.glob("*yearly.parquet")) + list(
    files_dest.glob("*annual.parquet")
)
cross_section = list(files_dest.glob("*cross_section.parquet"))


## Monthly


In [50]:
[x.stem for x in monthly]


['gdelt_level_2_monthly',
 'precipitation_gpcp_level_0_monthly',
 'precipitation_gpcc_level_2_monthly',
 'temperature_level_0_monthly',
 'landcover_DW_level_2_monthly',
 'viirs_monthly_level_2_monthly',
 'landcover_DW_level_1_monthly',
 'temperature_level_1_monthly',
 'landcover_DW_level_0_monthly',
 'precipitation_gpcc_level_1_monthly',
 'gdelt_level_0_monthly',
 'precipitation_cru_level_2_monthly',
 'gdelt_level_1_monthly',
 'precipitation_gpcp_level_2_monthly',
 'precipitation_gpcp_level_1_monthly',
 'temperature_level_2_monthly',
 'precipitation_cru_level_1_monthly',
 'precipitation_gpcc_level_0_monthly',
 'viirs_monthly_level_1_monthly',
 'viirs_monthly_level_0_monthly',
 'precipitation_cru_level_0_monthly']

In [51]:
# Bring together datasets
monthly_dict = {}
for level in tqdm(range(3)):
    # Aggregate without level data
    id_cols = [f"GID_{level}", "year", "month"]
    # Prepare data with level
    def read_monthly_data(filepath, level):
        df = pl.read_parquet(filepath)
        # If other levels are present, drop them
        df = df.drop(
            columns=[
                x
                for x in ["GID_0", "GID_1", "GID_2"]
                if x != f"GID_{level}" and x in df.columns
            ]
        )
        # If there's a column called "sub_dataset", drop it
        if "sub_dataset" in df.columns:
            df = df.drop(columns=["sub_dataset"])
        # Only keep years after 1950
        df = df.filter(pl.col("year") > 1950)
        return df

    monthly_dict[level] = reduce(
        lambda x, y: x.join(y, on=id_cols, how="outer"),
        [read_monthly_data(x, level) for x in monthly if f"level_{level}" in str(x)],
    )
    # Reorder and check for duplicates
    monthly_dict[level] = reorder_and_check_for_duplicates_polars(
        monthly_dict[level], id_cols=id_cols
    )
monthly_dict[1].head()


  0%|          | 0/3 [00:00<?, ?it/s]

100%|██████████| 3/3 [01:36<00:00, 32.21s/it]


GID_1,year,month,landcover_DW_crops,landcover_DW_built,landcover_DW_bare,landcover_DW_snow_and_ice,landcover_DW_trees,landcover_DW_water,landcover_DW_flooded_vegetation,landcover_DW_grass,landcover_DW_shrub_and_scrub,landcover_DW_null,temperature,precipitation_gpcc,gdelt_protest,gdelt_coercion,precipitation_gpcp,precipitation_cru,viirs_mean,viirs_sum,viirs_median,viirs_count
str,i64,i64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,i64,i64,f64,f64,f64,f64,f64,i64
"""AFG.1_1""",1951,1,,,,,,,,,,,-13.39137,38.055367,,,,84.859138,,,,
"""AFG.2_1""",1951,1,,,,,,,,,,,-2.204422,48.117958,,,,54.299019,,,,
"""AFG.3_1""",1951,1,,,,,,,,,,,-5.380813,53.49892,,,,70.83387,,,,
"""AFG.4_1""",1951,1,,,,,,,,,,,0.393292,22.967308,,,,38.254765,,,,
"""AFG.5_1""",1951,1,,,,,,,,,,,-10.304203,21.916719,,,,44.439884,,,,


In [52]:
# Export
for level in range(3):
    monthly_dict[level].write_parquet(
        DATA / f"processed/imagery_aggregations/monthly_level_{level}.parquet",
    )


### Annualize monthly dataset


In [53]:
monthly_dict = {}
for level in range(3):
    monthly_dict[level] = pl.read_parquet(
        DATA / f"processed/imagery_aggregations/monthly_level_{level}.parquet"
    )


In [54]:
# Aggregate monthly to yearly
sum_cols = [
    # "violence_Riots",
    # "violence_Battles",
    # "violence_Protests",
    # "violence_Strategic developments",
    # "violence_Violence against civilians",
    # "violence_Explosions/Remote violence",
]
mean_cols = [
    "landcover_DW_snow_and_ice",
    "landcover_DW_shrub_and_scrub",
    "landcover_DW_built",
    "landcover_DW_crops",
    "landcover_DW_flooded_vegetation",
    "landcover_DW_water",
    "landcover_DW_trees",
    "landcover_DW_null",
    "landcover_DW_bare",
    "landcover_DW_grass",
    "temperature",
    "precipitation_gpcc",
    "gdelt_protest",
    "gdelt_coercion",
    "precipitation_gpcp",
    "precipitation_cru",
    "viirs_mean",
    "viirs_sum",
    "viirs_median",
    "viirs_count",
]

monthly_annualized = {}
for level, df in monthly_dict.items():
    print(level)
    # Aggregate monthly to yearly
    # IN THIS STEP ADD A CHECK - IF EVERYTHING IS NA, THEN SUM SHOULD BE NA
    id_cols_level = [f"GID_{level}", "year"]
    monthly_annualized[level] = df.groupby(id_cols_level).agg(
        [pl.col(x).sum().alias(x) for x in sum_cols]
        + [pl.col(x).mean().alias(x) for x in mean_cols]
        + [pl.col(x).count().alias(f"{x}_count") for x in mean_cols + sum_cols]
    )
    # If x_count is 0, then x should be NA
    monthly_annualized[level] = monthly_annualized[level].with_columns(
        [
            pl.when(pl.col(f"{col}_count") > 0)
            .then(pl.col(col))
            .otherwise(None)
            .alias(col)
            for col in mean_cols + sum_cols
        ]
    )
    # Drop count columns
    monthly_annualized[level] = monthly_annualized[level].drop(
        columns=[f"{x}_count" for x in mean_cols + sum_cols]
    )
monthly_annualized[1].head()


0
1
2


GID_1,year,landcover_DW_snow_and_ice,landcover_DW_shrub_and_scrub,landcover_DW_built,landcover_DW_crops,landcover_DW_flooded_vegetation,landcover_DW_water,landcover_DW_trees,landcover_DW_null,landcover_DW_bare,landcover_DW_grass,temperature,precipitation_gpcc,gdelt_protest,gdelt_coercion,precipitation_gpcp,precipitation_cru,viirs_mean,viirs_sum,viirs_median,viirs_count
str,i64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64
"""CPV.2_1""",2019,,,,,,,,,,,23.708334,17.040833,0.222222,1.111111,,48.433333,0.501297,98.656977,0.317639,225.833333
"""ISL.4_1""",1966,,,,,,,,,,,-0.537881,37.851497,,,,56.091195,,,,
"""IND.13_1""",1967,,,,,,,,,,,7.934162,121.96607,,,,78.088784,,,,
"""UZB.5_1""",1993,,,,,,,,,,,10.696269,11.98322,,,0.651362,11.679645,,,,
"""YEM.3_1""",2006,,,,,,,,,,,20.42786,19.790196,0.0,2.25,0.694217,38.516806,,,,


In [55]:
# Export
for level in range(3):
    monthly_annualized[level].write_parquet(
        DATA
        / f"processed/imagery_aggregations/monthly_annualized_level_{level}.parquet",
        # index=False,
    )


## Yearly


In [64]:
sorted([x.stem for x in yearly])


['access_level_0_annual',
 'access_level_1_annual',
 'access_level_2_annual',
 'dmsp_cloud_free_coverage_level_0_yearly',
 'dmsp_cloud_free_coverage_level_1_yearly',
 'dmsp_cloud_free_coverage_level_2_yearly',
 'dmsp_stable_lights_level_0_yearly',
 'dmsp_stable_lights_level_1_yearly',
 'dmsp_stable_lights_level_2_yearly',
 'elevation_level_0_yearly',
 'elevation_level_1_yearly',
 'elevation_level_2_yearly',
 'fao_value_level_0_yearly',
 'fao_value_level_1_yearly',
 'fao_value_level_2_yearly',
 'fao_yield_level_0_yearly',
 'fao_yield_level_1_yearly',
 'fao_yield_level_2_yearly',
 'forest_loss_level_0_annual',
 'forest_loss_level_1_annual',
 'forest_loss_level_2_annual',
 'landcover_MODIS_level_0_annual',
 'landcover_MODIS_level_1_annual',
 'landcover_MODIS_level_2_annual',
 'ntl_dmsp_ext_level_0_yearly',
 'ntl_dmsp_ext_level_1_yearly',
 'ntl_dmsp_ext_level_2_yearly',
 'ntl_dvnl_level_0_yearly',
 'ntl_dvnl_level_1_yearly',
 'ntl_dvnl_level_2_yearly',
 'population_count_level_0_yearly',
 

In [75]:
yearly_dict = {}
for level in range(3):
    id_cols_level = [f"GID_{level}", "year"]
    # Prepare data with level
    def read_df(x):
        df = pl.read_parquet(x)
        df = df.with_column(pl.col("year").cast(pl.Int64))
        if "__index_level_0__" in df.columns:
            df = df.drop(["__index_level_0__"])
        if "sub_dataset" in df.columns:
            df = df.drop(["sub_dataset"])
        # Reorder and check for duplicates
        df = reorder_and_check_for_duplicates_polars(df, id_cols=id_cols_level)
        return df

    df = reduce(
        lambda x, y: x.join(y, on=id_cols_level, how="outer"),
        [read_df(x) for x in yearly if f"level_{level}" in str(x)],
    )
    # Merge
    # df["year"] = pd.to_numeric(df["year"])
    yearly_dict[level] = df
yearly_dict[1].head()


GID_1,year,telecom_mobile_coverage_oci_3g,telecom_mobile_coverage_mce,telecom_mobile_coverage_oci_2g,ntl_dvnl,population_density,wind_potential,solar_potential,ntl_dmsp_ext,ruggedness,dmsp_cloud_free_coverage,telecom_mobile_coverage_mce_5g,telecom_mobile_coverage_mce_3g,fao_value,telecom_mobile_coverage_mce_4g,dmsp_stable_lights,telecom_mobile_coverage_oci,road_length_tertiary_roads,road_length_primary_roads,road_length_secondary_roads,road_length_local_roads,road_length_highways,telecom_mobile_coverage_mce_2g,elevation,telecom_mobile_coverage_oci_4g,viirs,population_count,fao_yield,urban_time_to_airports_mins,urban_time_to_large_cities_mins,urban_time_to_medium_cities_mins,urban_time_to_ports_mins,forest_loss_count,forest_loss_sum,forest_loss_mean,time_to_cities_mins,time_to_large_cities_mins,time_to_medium_cities_mins,time_to_ports_mins,time_to_airports_mins,landcover_MODIS_water,landcover_MODIS_barren,landcover_MODIS_savanna,landcover_MODIS_urban_builtup,landcover_MODIS_wetland,landcover_MODIS_forest,landcover_MODIS_cropland,landcover_MODIS_grassland,landcover_MODIS_shrub,landcover_MODIS_null
str,i64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64
"""AFG.1_1""",1992,,,,,,,,,,1082923.5,,,,,2.1e-05,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
"""AFG.2_1""",1992,,,,,,,,,,512842.78125,,,,,0.002994,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
"""AFG.3_1""",1992,,,,,,,,,,553175.75,,,,,0.060968,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
"""AFG.4_1""",1992,,,,,,,,,,466047.40625,,,,,0.500511,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
"""AFG.5_1""",1992,,,,,,,,,,368320.15625,,,,,0.0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,


In [76]:
# Merge in metadata
for level in range(1, 3):
    metadata_level_df = pl.read_parquet(
        DATA / f"processed/supporting_data/supporting_data_level_{level}.parquet"
    )
    metadata_level_df = metadata_level_df.with_column(
        pl.lit(yearly_dict[level]["year"].max()).cast(pl.Int64).alias("year")
    )
    yearly_dict[level] = yearly_dict[level].join(
        metadata_level_df, on=[f"GID_{level}", "year"], how="left"
    )


In [77]:
# If, for a given variable, the number of unique years is just one, then separate
# those variables out and export them as cross-sectional data
# First, figure out which variables are cross-sectional
# For every variable that is not GID_0 or year, dropna and check if there's only one unique year in yearly_dict[0]
cross_section_vars = []
for col in yearly_dict[0].columns:
    if col not in ["GID_0", "year"]:
        non_null_years = (
            yearly_dict[0]
            .filter(pl.col(col).is_not_null())
            .select(pl.col(col))
            .drop_nulls()
            .shape[0]
        )
        if non_null_years == 1:
            cross_section_vars.append(col)
# Separate these variables out
cs_dict = {}
for level in range(3):
    id_cols_level = [f"GID_{level}", "year"]
    # Select cross section variables, filter out rows where everything is null
    cs_dict[level] = (
        yearly_dict[level]
        .select(id_cols_level + [x for x in cross_section_vars])
    )
    # cs_dict[level] = cs_dict[level].drop_nulls(subset=list(cs_dict[level].columns))
    # Drop those columns from yearly_dict
    yearly_dict[level] = (
        yearly_dict[level]
        .drop(columns=cross_section_vars)
    )
    # yearly_dict[level] = yearly_dict[level].drop_nulls(subset=list(yearly_dict[level].columns))
    # Reorder and check for duplicates - yearly
    yearly_dict[level] = reorder_and_check_for_duplicates_polars(
        yearly_dict[level], id_cols=id_cols_level
    )
    yearly_dict[level].write_parquet(
        DATA / f"processed/imagery_aggregations/yearly_level_{level}.parquet",
    )
    # Reorder and check for duplicates - cross section
    cs_dict[level] = reorder_and_check_for_duplicates_polars(
        cs_dict[level], id_cols=id_cols_level
    )
    cs_dict[level].write_parquet(
        DATA / f"processed/imagery_aggregations/cross_section_level_{level}.parquet",
    )


In [78]:
# # Export
# for level in range(3):
#     # Reorder and check for duplicates
#     id_cols_level = [f"GID_{level}", "year"]
#     yearly_dict[level] = reorder_and_check_for_duplicates_polars(
#         yearly_dict[level], id_cols=id_cols_level
#     )
#     yearly_dict[level].write_parquet(
#         DATA / f"processed/imagery_aggregations/yearly_level_{level}.parquet",
#     )


## Cross section


In [79]:
cross_section = [x for x in cross_section if "fao_cross_section" not in str(x)]
cross_section


[]

In [80]:
# cs_dict = {}
# for level in range(3):
#     id_cols_level = [f"GID_{x}" for x in range(level + 1)] + ["year"]

#     def read_df_with_level(x):
#         df = pd.read_parquet(x)
#         if "year" not in df.columns:
#             df["year"] = df["date"].dt.year
#             df = df.drop(columns="date")
#         df = df.drop(columns=[x for x in df.columns if x.startswith("NAME_")])
#         return df

#     df = reduce(
#         lambda x, y: x.merge(y, on=id_cols_level, how="outer"),
#         [read_df_with_level(x) for x in cross_section if f"level_{level}" in str(x)],
#     )
#     # Merge
#     cs_dict[level] = df
# cs_dict[1].head()


In [81]:
# # Export
# for level in range(3):
#     cs_dict[level].to_parquet(
#         DATA / f"
#         / f"data/processed/imagery_aggregations/cross_section_level_{level}.parquet",
#         index=False,
#     )


## Combine


In [82]:
for level in range(3):
    print(level)
    monthly_df = pl.read_parquet(
        DATA
        / f"processed/imagery_aggregations/monthly_annualized_level_{level}.parquet"
    )
    yearly_df = pl.read_parquet(
        DATA / f"processed/imagery_aggregations/yearly_level_{level}.parquet"
    )
    cs_df = pd.read_parquet(
        DATA / f"processed/imagery_aggregations/cross_section_level_{level}.parquet"
    )
    # Merge
    id_cols_level = [f"GID_{level}", "year"]
    annualized = monthly_df.join(yearly_df, on=id_cols_level, how="outer")
    # Remove stray GADM ID's
    if level > 0:
        annualized = annualized.drop(
            columns=[
                f"GID_{x}" for x in range(level) if f"GID_{x}" in annualized.columns
            ]
        )
    # # Add GADM ID's
    # annualized = add_gadm_ids_polars(annualized, level)
    # annualized = annualized.merge(cs_df, on=id_cols_level, how="outer")
    # Some additional cleaning
    # Remove stray column
    if "__index_level_0__" in annualized.columns:
        annualized = annualized.drop(["__index_level_0__"])
    # Disambiguate viirs
    annualized = annualized.rename(
        {
            "viirs_mean": "viirs_custom_mean",
            "viirs_sum": "viirs_custom_sum",
            "viirs_median": "viirs_custom_median",
            "viirs_count": "viirs_custom_count",
        }
    )
    # Reorder and check for duplicates
    annualized = reorder_and_check_for_duplicates_polars(
        annualized, id_cols=id_cols_level
    )
    annualized.write_parquet(
        DATA / f"processed/imagery_aggregations/annualized_level_{level}.parquet",
    )
    annualized.to_pandas().to_stata(
        DATA / f"processed/imagery_aggregations/annualized_level_{level}.dta",
        write_index=False,
        variable_labels={k: k for k in annualized.columns},
    )
    annualized.write_csv(
        DATA / f"processed/imagery_aggregations/annualized_level_{level}.csv",
    )


0
1
2


In [83]:
annualized.head()


GID_2,year,landcover_DW_snow_and_ice,landcover_DW_shrub_and_scrub,landcover_DW_built,landcover_DW_crops,landcover_DW_flooded_vegetation,landcover_DW_water,landcover_DW_trees,landcover_DW_null,landcover_DW_bare,landcover_DW_grass,temperature,precipitation_gpcc,gdelt_protest,gdelt_coercion,precipitation_gpcp,precipitation_cru,viirs_custom_mean,viirs_custom_sum,viirs_custom_median,viirs_custom_count,telecom_mobile_coverage_oci_2g,ruggedness,fao_value,telecom_mobile_coverage_oci_4g,population_count,ntl_dvnl,ntl_dmsp_ext,dmsp_cloud_free_coverage,road_length_tertiary_roads,road_length_secondary_roads,road_length_primary_roads,road_length_local_roads,road_length_highways,population_density,telecom_mobile_coverage_mce_5g,...,landcover_MODIS_water,landcover_MODIS_grassland,landcover_MODIS_cropland,landcover_MODIS_shrub,landcover_MODIS_barren,urban_time_to_airports_mins,urban_time_to_large_cities_mins,urban_time_to_medium_cities_mins,urban_time_to_ports_mins,time_to_cities_mins,time_to_large_cities_mins,time_to_medium_cities_mins,time_to_ports_mins,time_to_airports_mins,forest_loss_count,forest_loss_sum,forest_loss_mean,has_capital,has_land_border,dist_geomcentr_to_capital_km,lon_geomcentr,lat_geomcentr,has_rivers_lakes,has_coasts,lon_popcentr,lat_popcentr,dist_popcentr_to_capital_km,has_flare,has_ports,has_int_airports,has_large_airports,has_medium_airports,has_min_deposit,has_tech_minerals,has_energy_minerals,has_precious_minerals,has_other_minerals
str,i64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,...,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,i64,i64,f64,f64,f64,i64,i64,f64,f64,f64,i64,i64,i64,i64,i64,i64,i64,i64,i64,i64
"""JPN.25.23_1""",1978,,,,,,,,,,,16.766667,153.882503,,,,151.750001,,,,,,,,,,,,,,,,,,,,...,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
"""ROU.30.16_1""",1976,,,,,,,,,,,4.301166,57.008429,,,,61.471392,,,,,,,,,,,,,,,,,,,,...,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
"""VEN.21.19_1""",1983,,,,,,,,,,,25.460744,147.01002,,,,136.306831,,,,,,,,,,,,,,,,,,,,...,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
"""GTM.7.29_1""",1984,,,,,,,,,,,17.616667,94.1125,,,,140.258337,,,,,,,,,,,,,,,,,,,,...,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
"""TWN.6.1_1""",1991,,,,,,,,,,,21.600866,220.882676,2.272727,11.272727,4.202229,123.194164,,,,,,,,,,,,,,,,,,,,...,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
