# Generate parquet for ingestion

In order to ingest data into the S2S database, it needs to follow a specific schema. This notebook will explore that schema and combine the various CSV files into a final output parquet file.

In [12]:
import sys
import os
import urllib3
import boto3
import ssl
import warnings

import geopandas as gpd
import pandas as pd
import numpy as np

from tqdm.notebook import tqdm
from shapely.geometry import Point
from operator import itemgetter

sys.path.append(r"C:\WBG\Work\Code\GOSTrocks\src")

from GOSTrocks.misc import tPrint

ssl._create_default_https_context = ssl._create_unverified_context

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
warnings.simplefilter(action='ignore', category=FutureWarning)

s3 = boto3.client('s3', verify=False)


In [2]:
good_schema_example = "s3://wbg-geography01/Space2Stats/parquet/GLOBAL/GHS_SMOD_counts_and_pop.parquet"
good_schema = pd.read_parquet(good_schema_example)
good_schema.head()

Unnamed: 0,hex_id,ghs_11_count,ghs_12_count,ghs_13_count,ghs_21_count,ghs_22_count,ghs_23_count,ghs_30_count,ghs_total_count,ghs_11_pop,ghs_12_pop,ghs_13_pop,ghs_21_pop,ghs_22_pop,ghs_23_pop,ghs_30_pop,ghs_total_pop
0,860000007ffffff,0,0,0,0,0,0,0,0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
1,86000000fffffff,0,0,0,0,0,0,0,0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
2,860000017ffffff,0,0,0,0,0,0,0,0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
3,86000001fffffff,0,0,0,0,0,0,0,0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
4,860000027ffffff,0,0,0,0,0,0,0,0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


The table schema for tables to be imported into Space2Stats is simple with only two columns required:
- hex_id: the h3 grid id for the polygon used in the zonal calculation; default is h3 level 6
- sum_pop_f_0_2020: attribute of interest to be ingested, should be written in the following style __f'{aggregation_method}_{attribute_name}'__

# Combine output CSV files into single parquet

In [3]:
#Search for all relevant CSV files
bucket = 'wbg-geography01' 
prefix = 'Space2Stats/h3_stats_data/GLOBAL/VIIRS_Monthly_LEN/'
region = 'us-east-1'
s3client = boto3.client('s3', region_name=region, verify=False)

# Loop through the S3 bucket and get all the keys for files that are .csv 
more_results = True
loops = 0
verbose=True
good_res = []
while more_results:
    if verbose:
        print(f"Completed loop: {loops}")
    if loops > 0:
        objects = s3client.list_objects_v2(Bucket=bucket, Prefix=prefix, ContinuationToken=token)
    else:
        objects = s3client.list_objects_v2(Bucket=bucket, Prefix=prefix)
    more_results = objects['IsTruncated']
    if more_results:
        token = objects['NextContinuationToken']
    loops += 1
    for res in objects['Contents']:
        if res['Key'].endswith('.csv'):            
            cur_variable = res['Key'].split("/")[-3]
            good_res.append(res['Key'])

Completed loop: 0
Completed loop: 1
Completed loop: 2
Completed loop: 3
Completed loop: 4
Completed loop: 5
Completed loop: 6
Completed loop: 7
Completed loop: 8
Completed loop: 9
Completed loop: 10
Completed loop: 11
Completed loop: 12
Completed loop: 13
Completed loop: 14
Completed loop: 15
Completed loop: 16
Completed loop: 17
Completed loop: 18


In [10]:
year_set = {}
for res in good_res:
    year = os.path.basename(res).split("_")[2][:4]    
    if year not in year_set.keys():
        year_set[year] = []
    year_set[year].append(res)

In [None]:
# Create empty data frame based on the schema of the good schema
out_res = pd.DataFrame(index=good_schema['hex_id'])
for c_year in range(2012, 2025):
    for mnth in range(1, 13):
        out_res[f"sum_viirs_ntl_{c_year}{mnth:02d}"] = None

out_res

In [19]:
out_folder = r"C:\WBG\Work\S2S\ingest"

In [53]:
for year, year_res in year_set.items():
    out_file = os.path.join(out_folder, f"VIIRS_NTL_{year}.parquet")
    if not os.path.exists(out_file):
        out_res = pd.DataFrame(index=good_schema['hex_id'])
        for mnth in range(1, 13):
            out_res[f"sum_viirs_ntl_{year}{mnth:02d}"] = None   
        for in_file in tqdm(year_res, desc=f"Processing {year}"):
            month = os.path.basename(in_file)[8:14]
            full_path = f's3://{bucket}/{in_file}'

            obj = s3.get_object(Bucket=bucket, Key=in_file)
            curD = pd.read_csv(obj['Body'])
            curD['SUM'].replace(-1.0, None, inplace=True)
            curD.drop(['MIN','MAX','MEAN'], axis=1, inplace=True)
            curD.columns = [f"SUM_VIIRS_NTL_{month}", "hex_id"] 
            curD.columns = [x.lower() for x in curD.columns]   
            curD.set_index('hex_id', inplace=True)
            curD.fillna(value=np.nan)

            out_res.loc[curD.index, curD.columns[0]] = curD[curD.columns[0]] 
        out_res.reset_index(inplace=True)
        out_res.to_parquet(out_file)
    else:
        tPrint(f'{year} completed')
        #tempD = pd.read_parquet(out_file)        
        #print(tempD.isna().sum())

11:46:37	2023 completed
11:46:37	2024 completed
11:46:37	2012 completed
11:46:37	2013 completed
11:46:37	2014 completed
11:46:37	2015 completed
11:46:37	2016 completed
11:46:37	2017 completed
11:46:37	2018 completed
11:46:37	2019 completed
11:46:37	2020 completed
11:46:37	2021 completed


Processing 2022:   0%|          | 0/1220 [00:00<?, ?it/s]

# Create annual data from the monthly data

In [54]:
parquet_folder = r"C:\WBG\Work\S2S\ingest"
parquet_files = [os.path.join(parquet_folder, x) for x in os.listdir(parquet_folder) if x.endswith('.parquet')]

In [84]:
tPrint('start')
#out_data = pd.DataFrame(index=good_schema['hex_id'])
annual_res = {}
for parquet_file in parquet_files:
    year = os.path.basename(parquet_file).split("_")[2][:4]
    tempD = pd.read_parquet(parquet_file)
    annual_sum = tempD.apply(lambda x: x[1:].max(skipna=True), axis=1)
    annual_res[f"sum_viirs_ntl_{year}"] = annual_sum
    #out_data[f"sum_viirs_ntl_{year}"] = annual_sum
    tPrint(f"Completed {year}")

08:31:22	start
08:36:47	Completed 2012
08:42:07	Completed 2013
08:47:57	Completed 2014
08:53:16	Completed 2015
08:58:44	Completed 2016
09:04:02	Completed 2017
09:09:22	Completed 2018
09:14:47	Completed 2019
09:20:05	Completed 2020
09:29:33	Completed 2021
09:36:55	Completed 2022
09:43:15	Completed 2023
09:49:35	Completed 2024


In [87]:
out_data = pd.DataFrame(annual_res)
out_data['hex_id'] = tempD['hex_id']
out_data.to_parquet(os.path.join(parquet_folder, "VIIRS_NTL_annual.parquet"))


# DEBURRGGING


In [66]:
annual_sum.min(skipna=True)

0.0

In [50]:
# Convert None to NaN in broken dataset
in_file = r"C:\WBG\Work\S2S\ingest\VIIRS_NTL_2022.parquet"
tempD = pd.read_parquet(in_file)
tempD.head()

Unnamed: 0,hex_id,sum_viirs_ntl_202201,sum_viirs_ntl_202202,sum_viirs_ntl_202203,sum_viirs_ntl_202204,sum_viirs_ntl_202205,sum_viirs_ntl_202206,sum_viirs_ntl_202207,sum_viirs_ntl_202208,sum_viirs_ntl_202209,sum_viirs_ntl_202210,sum_viirs_ntl_202211,sum_viirs_ntl_202212
0,860000007ffffff,,,,,,,,,,,,
1,86000000fffffff,,,,,,,,,,,,
2,860000017ffffff,,,,,,,,,,,,
3,86000001fffffff,,,,,,,,,,,,
4,860000027ffffff,,,,,,,,,,,,


In [52]:
tempD.fillna(value=np.nan).to_parquet(in_file)