In [2]:
import rasterio as rio
import numpy as np
import pandas as pd
from glob import glob
import os
import math
from tqdm import tqdm
import ee 
import ee_utils
import tensorflow as tf

2023-08-08 14:39:46.835238: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  SSE4.1 SSE4.2 AVX AVX2 AVX512F AVX512_VNNI FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2023-08-08 14:39:46.895959: I tensorflow/core/util/port.cc:104] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.


#### Earth Engine ERA5 air temperature (monthly 2 m mean / min / max)

In [33]:
def _initialize_earth_engine():
    '''Initialise Earth Engine; on any failure run the auth flow once and retry.'''
    try:
        ee.Initialize()
    except Exception:
        # Presumably missing or expired credentials: authenticate, then retry.
        ee.Authenticate()
        ee.Initialize()


_initialize_earth_engine()


Successfully saved authorization token.


In [34]:
# ========== ADAPT THESE PARAMETERS ==========
# NOTE(review): EXPORT and BUCKET are not read by export_images below, which
# hard-codes export='drive' and bucket=None; they are kept for reference only.
EXPORT = ''
BUCKET = None
# export location parameters
# Folder / filename prefix handed to ee_utils.get_array_patches by export_images.
ERA5_EXPORT_FOLDER = ''
CSV_PATH = '../data/dataset_viirs_only.csv'
# ERA5 band names, indexed positionally by export_images:
# [0] mean (median-composited), [1] minimum (min-composited), [2] maximum (max-composited).
BANDS = ['mean_2m_air_temperature', 'minimum_2m_air_temperature', 'maximum_2m_air_temperature']
# image export parameters
# NOTE(review): PROJECTION is not passed to any export call in this notebook.
PROJECTION = 'EPSG:3857'  # see https://epsg.io/3857
SCALE = 30                # export resolution: 30m/px
EXPORT_TILE_RADIUS = 3  # passed as `ksize`; the split records are later reshaped to 7x7 and only the central value is read
CHUNK_SIZE = None    # None exports each (country, year) survey as one chunk; set to a small number (<= 50) if Google Earth Engine reports memory errors
# Loaded for inspection only; the export cell below re-reads the CSV itself.
csv = pd.read_csv(CSV_PATH)

In [35]:
def export_images(
        df: pd.DataFrame,
        collection: ee.ImageCollection,
        country: str,
        year: int,
        export_folder: str,
        chunk_size = 1,
 ):
    '''Export yearly ERA5 temperature composites around the clusters of one survey.

    For every chunk of clusters of the (country, year) survey and every
    prev_year in [year-4, year], three single-band composites are exported with
    ee_utils.get_array_patches (export='drive'):
      - "ave": median of BANDS[0] over the year
      - "min": minimum of BANDS[1] over the year
      - "max": maximum of BANDS[2] over the year
    Files are named f'{country}_{year}_{prev_year}_{chunk:02d}_{suffix}'.

    Args
    - df: pd.DataFrame, contains columns ['lat', 'lon', 'country', 'year']
    - collection: ee.ImageCollection providing the BANDS
    - country: str, together with `year` determines the survey to export
    - year: int, together with `country` determines the survey to export
    - export_folder: str, name of folder for export
    - chunk_size: int, optionally set a limit to the # of images exported per TFRecord file
        - None exports the whole survey as a single chunk
        - set to a small number (<= 50) if Google Earth Engine reports memory errors

    Returns: dict, maps (export_folder, country, year, prev_year, chunk) to a
        tuple (ave, min, max) of the values returned by
        ee_utils.get_array_patches (presumably ee.batch.Task objects).
        `year` is part of the key so that two surveys of the same country whose
        5-year windows overlap do not overwrite each other once merged.
    '''
    subset_df = df[(df['country'] == country) & (df['year'] == year)].reset_index(drop=True)
    if len(subset_df) == 0:
        # Nothing to export; also avoids dividing by a chunk_size of 0 below.
        return {}
    if chunk_size is None:
        chunk_size = len(subset_df)
    num_chunks = int(math.ceil(len(subset_df) / chunk_size))

    # (filename suffix, band, temporal reducer); the tuple order of the stored
    # tasks is ave, min, max.
    composites = (
        ('ave', BANDS[0], lambda images: images.median()),
        ('min', BANDS[1], lambda images: images.min()),
        ('max', BANDS[2], lambda images: images.max()),
    )

    tasks = {}
    for i in range(num_chunks):
        chunk_slice = slice(i * chunk_size, (i + 1) * chunk_size - 1)  # df.loc[] is inclusive
        fc = ee_utils.df_to_fc(subset_df.loc[chunk_slice, :])
        roi = fc.geometry()
        for prev_year in range(year - 4, year + 1):
            # filterDate's end bound is exclusive: use Jan 1 of the following year
            # so that the whole of `prev_year` (including Dec 31) is covered.
            start_date, end_date = f'{prev_year}-01-01', f'{prev_year + 1}-01-01'
            yearly = collection.filterDate(start_date, end_date).filterBounds(roi)
            fname = f'{country}_{year}_{prev_year}_{i:02d}'
            tasks[(export_folder, country, year, prev_year, i)] = tuple(
                ee_utils.get_array_patches(
                    img=ee_utils.add_latlon(reducer(yearly.select(band))),
                    scale=SCALE, ksize=EXPORT_TILE_RADIUS,
                    points=fc, export='drive',
                    prefix=export_folder, fname=f'{fname}_{suffix}',
                    bucket=None)
                for suffix, band, reducer in composites)
    return tasks

In [None]:
# Launch the ERA5 exports once per (country, survey year) present in the CSV.
collection = ee.ImageCollection("ECMWF/ERA5/MONTHLY")
dataset = pd.read_csv('../data/dataset_viirs_only.csv')
# The keys of groupby(...).groups are the distinct (country, year) tuples.
dataset_ = list(dataset.groupby(['country', 'year']).groups.keys())
tasks = {}
for survey in tqdm(dataset_):
    country, year = survey
    print(country, year)
    tasks.update(export_images(df=dataset, collection=collection,
                               country=country, year=year,
                               export_folder=ERA5_EXPORT_FOLDER,
                               chunk_size=CHUNK_SIZE))


In [37]:
# Temperature bands expected across the exported TFRecords. Each exported file
# holds a single band (export_images exports ave/min/max as separate files),
# which is presumably why the per-band check in validate_and_split_tfrecords
# is commented out.
REQUIRED_BANDS = ['minimum_2m_air_temperature', 'maximum_2m_air_temperature','mean_2m_air_temperature']

# NOTE(review): BANDS_ORDER is not referenced by any visible cell.
BANDS_ORDER = ['minimum_2m_air_temperature', 'maximum_2m_air_temperature','mean_2m_air_temperature']


# Raw exports are read from, and split records written to, the same folder;
# the split records go into per-survey subfolders <country>_<year>_<prev_year>.
EXPORT_FOLDER = '../data/additional_data/temperature'
PROCESSED_FOLDER = '../data/additional_data/temperature'
def validate_and_split_tfrecords(
        tfrecord_paths,
        out_dir: str,
        df: pd.DataFrame,
        country,
        year
        ) -> None:
    '''Split exported TFRecord files (for one country-year survey) into
    individual GZIP-compressed TFRecords, one per (cluster, band).

    The cluster id is read from each record's float "cluster" feature. The band
    suffix is the 3 characters before ".tfrecord.gz" in the source path
    (e.g. "..._ave.tfrecord.gz" -> "ave"). Each record is written to
    ``<out_dir>/<cluster>_<band>.tfrecord.gz``; an existing file is overwritten.

    NOTE: despite the name, no validation is performed. Requiring every band
    of REQUIRED_BANDS per record cannot work on these exports, because each
    file carries a single band. Comparing features against the CSV is not
    implemented.

    Args
    - tfrecord_paths: list of str, paths to exported TFRecords files, each
        expected to end in "_<band>.tfrecord.gz"
    - out_dir: str, existing dir to save processed individual TFRecords
    - df: pd.DataFrame, rows of the survey; kept for interface compatibility
        and not read (the number of records per file is not known in advance)
    - country, year: survey identifiers, only used to label the progress bar
    '''
    options = tf.io.TFRecordOptions(compression_type='GZIP')
    # No total: with several files (one per band, possibly several chunks)
    # the record count cannot be derived from len(df).
    progbar = tqdm(desc=f'{country}_{year}', unit='record')

    for tfrecord_path in tfrecord_paths:
        band = tfrecord_path[-15:-12]  # "..._ave.tfrecord.gz" -> "ave"
        # Iterate over the binary serialized tf.train.Example messages.
        iterator = tf.compat.v1.io.tf_record_iterator(tfrecord_path, options=options)
        for record_str in iterator:
            ex = tf.train.Example.FromString(record_str)
            index = str(int(ex.features.feature["cluster"].float_list.value[0]))
            out_path = os.path.join(out_dir, f'{index}_{band}.tfrecord.gz')
            with tf.io.TFRecordWriter(out_path, options=options) as writer:
                writer.write(ex.SerializeToString())
            progbar.update(1)
    progbar.close()
    

def process_dataset(csv_path: str, input_dir: str, processed_dir: str) -> None:
    '''Split every exported temperature TFRecord into per-cluster files.

    For each (country, year) survey in the CSV and each prev_year in the
    5-year window ending at the survey year, the files named
    "<country>_<year>_<prev_year>*.tfrecord.gz" are gathered from input_dir and
    from up to two subfolder levels below it. They are then split into
    processed_dir/<country>_<year>_<prev_year>/.

    Args
    - csv_path: str, path to CSV of DHS or LSMS clusters
    - input_dir: str, path to TFRecords exported from Google Earth Engine
    - processed_dir: str, folder where to save processed TFRecords
    '''
    df = pd.read_csv(csv_path, float_precision='high', index_col=False)
    surveys = list(df.groupby(['country', 'year']).groups.keys())  # (country, year) tuples

    for country, year in surveys:
        # Select the survey rows with the year exactly as it appears in the CSV.
        # Previously the 2012 -> 2013 remap below overwrote `year` first, so
        # this filter picked the wrong survey.
        subset_df = df[(df['country'] == country) & (df['year'] == year)].reset_index(drop=True)
        # Kept from the original notebook: 2012 surveys are looked up under
        # 2013 file/folder names. NOTE(review): reason not visible here - confirm.
        file_year = 2013 if year == 2012 else year

        for prev_year in range(file_year - 4, file_year + 1):
            country_year = f'{country}_{file_year}_{prev_year}'
            print('Processing:', country_year)
            # Search input_dir itself, then one and two subfolder levels below it.
            tfrecord_paths = []
            for subdirs in ((), ('*',), ('*', '*')):
                tfrecord_paths += glob(os.path.join(input_dir, *subdirs, country_year + '*.tfrecord.gz'))

            out_dir = os.path.join(processed_dir, country_year)
            os.makedirs(out_dir, exist_ok=True)
            validate_and_split_tfrecords(
                tfrecord_paths=tfrecord_paths, out_dir=out_dir, df=subset_df,
                country=country, year=file_year)

In [None]:
# Split the raw temperature exports into per-cluster, per-band TFRecords.
process_dataset(
    input_dir=EXPORT_FOLDER,
    processed_dir=PROCESSED_FOLDER,
    csv_path='../data/dataset_viirs_only.csv',
)

In [9]:
import gzip
import shutil
from tfrecord.torch.dataset import TFRecordDataset
import torch
import rasterio

In [7]:
CSV              = os.path.join( "..", "data", "dataset_viirs_only.csv" )
RECORDS_DIR      = os.path.join( "..", "data", "additional_data", "temperature", "")
TIF_DIR          = os.path.join( "..", "data", "additional_data", "temperature", "" )

# Index the split per-cluster TFRecords:
# records[(year, prev_year)][country] -> files in RECORDS_DIR/<country>_<year>_<prev_year>/.
# The folder name is matched exactly. The former leading "*" also matched any
# folder whose name merely ends with it (e.g. "south_sudan_..." for "sudan").
# NOTE(review): process_dataset stores 2012 surveys under "..._2013_..." folders,
# which this exact-year lookup will not find - confirm the intended naming.
csv = pd.read_csv(CSV)
records = dict()
for year in csv.year.unique():
    countries = csv.loc[csv.year == year, 'country'].unique()
    for prev_year in range(year - 4, year + 1):
        records[year, prev_year] = {
            country: glob(os.path.join(RECORDS_DIR, f'{country}_{year}_{prev_year}', '*.tfrecord*'))
            for country in countries
        }

def decompress_tfrecord(tfrecord_archive):
    """Gunzip `tfrecord_archive` next to itself and return the extracted path.

    The output path is the input path minus its last 3 characters (the ".gz"
    extension); an existing file at that path is overwritten.
    """
    target = tfrecord_archive[:-3]  # WITHOUT .GZ
    with gzip.open(tfrecord_archive, 'rb') as compressed, open(target, 'wb') as plain:
        shutil.copyfileobj(compressed, plain)
    return target

def tensor_to_string(data, variable):
    """Return the first element of data[variable] as text with every "." removed.

    For a float tensor holding 12.0 this gives "120"; the caller strips the
    trailing character with [:-1] to obtain the integer id.
    """
    first_value = data[variable].numpy()[0][0]
    return ''.join(ch for ch in str(first_value) if ch != '.')



In [16]:
BANDNAMES = {'ave':'mean_2m_air_temperature', 'min':'minimum_2m_air_temperature','max':'maximum_2m_air_temperature'}

# Feature descriptions for tfrecord's TFRecordDataset: the per-cluster metadata
# shared by every split record, followed by the one temperature band in the file.
_COMMON_FEATURES = {
    'cluster': "float",
    'lat': "float",
    "lon": "float",
    'wealthpooled': "float",
}
DESCRIPTORMIN = {**_COMMON_FEATURES, BANDNAMES['min']: 'float'}
DESCRIPTORMAX = {**_COMMON_FEATURES, BANDNAMES['max']: 'float'}
DESCRIPTORAVE = {**_COMMON_FEATURES, BANDNAMES['ave']: 'float'}

def tfrecord_to_tif(data, filename, mins, maxs,band):
    '''Write one band of a parsed record as a single-band 7x7 GeoTIFF.

    Args
    - data: batch dict from the tfrecord DataLoader; data[BANDNAMES[band]] must
        hold 49 values, which are reshaped to a 7x7 patch
    - filename: path of the .tif to create, relative to TIF_DIR
    - mins, maxs: running minimum / maximum to update with this patch
    - band: one of BANDNAMES' keys ('ave', 'min', 'max')

    Returns: (mins, maxs) updated with the patch's min and max.
    '''
    patch = data[BANDNAMES[band]].numpy().reshape((7, 7))
    mins = min(mins, patch.min())
    maxs = max(maxs, patch.max())

    # Write the patch as-is (rows -> height, cols -> width) into a 1-band file.
    # The previous swapaxes(…, 0, 2) on a (1, 7, 7) stack stored the transposed
    # patch, and count=8 declared 7 bands that were never written.
    tif_path = os.path.join(TIF_DIR, filename)
    with rasterio.open(tif_path, 'w', driver='GTiff',
                       height=patch.shape[0], width=patch.shape[1],
                       count=1, dtype=str(patch.dtype),
                       crs='epsg:3857',
                       transform=None) as tif:
        tif.write(patch, 1)

    return mins, maxs

def read_record(data,band):
    """Return the central value of the 7x7 patch stored under BANDNAMES[band]."""
    flat = data[BANDNAMES[band]].numpy()
    centre = 7 // 2
    return flat.reshape((7, 7))[centre, centre]

def map_row(row, band, val, year, country, cluster, prev_year=None):
    '''Return `row` with column "<prev_year>_<band>" set to `val` when it is the
    (year, country, cluster) row; otherwise return it unchanged.

    Meant for ``df = df.apply(lambda r: map_row(...), axis=1)``, which rebuilds
    the DataFrame from the returned rows. The previous version returned `val`,
    so that call replaced the whole DataFrame with a Series of `val`.

    Args
    - row: pd.Series with at least 'year', 'country' and 'cluster'
    - band: str, band suffix used in the column name
    - val: value to store
    - year, country, cluster: identify the target row; year and cluster are
        compared via int(), so numeric strings such as "12" are accepted
    - prev_year: year used in the column name. When omitted, falls back to
        the module-global `prev_year` (the loop variable the original read)

    Returns: pd.Series, a modified copy of `row` or `row` itself.
    '''
    if prev_year is None:
        prev_year = globals()['prev_year']
    if int(row.year) == int(year) and row.country == country and int(row.cluster) == int(cluster):
        row = row.copy()  # do not mutate the Series handed in by apply
        row[f'{int(prev_year)}_{band}'] = val
    return row

In [18]:
# Load the precipitation-augmented dataset; the temperature loop below adds
# "<prev_year>_<band>" columns to this frame.
# NOTE(review): this pickle is written by the precipitation section further down
# the notebook, so this cell only works after that section has run once.
df=pd.read_pickle('../data/dataset_precipitation.pkl')
df.head()

Unnamed: 0,country,year,cluster,lat,lon,households,wealthpooled,urban_rural,fold,precipitation
0,angola,2015,1,-12.1014,14.1407,26,-1.132639,0.0,D,"[[662.8333333333334, 0, 1648], [681.9166666666..."
1,angola,2015,2,-9.6635,20.377,26,0.669843,1.0,E,"[[1048.25, 0, 2346], [1021.0, 0, 2415], [1059...."
2,angola,2015,3,-8.9289,13.2995,10,1.515591,1.0,D,"[[555.0833333333334, 0, 2752], [242.9166666666..."
3,angola,2015,4,-14.2876,17.6217,26,-0.559135,0.0,B,"[[980.75, 0, 2385], [853.9166666666666, 0, 234..."
4,angola,2015,5,-14.211,13.5463,26,-1.186118,0.0,B,"[[656.25, 0, 2028], [491.75, 0, 2191], [634.75..."


In [23]:
# Fill the "<prev_year>_<band>" columns of `df` with the central value of every
# split per-cluster temperature record indexed in `records`.
BAND_DESCRIPTORS = {'ave': DESCRIPTORAVE, 'min': DESCRIPTORMIN, 'max': DESCRIPTORMAX}
# Hoisted out of the loops: these columns are not modified below.
survey_years = df['year'].astype(int)
cluster_ids = df['cluster'].astype(int)
for year, prev_year in records:
    print(year, prev_year)
    for country, paths in records[year, prev_year].items():
        in_survey = (survey_years == int(year)) & (df['country'] == country)
        # The "*.tfrecord*" glob matches both "x.tfrecord.gz" and an "x.tfrecord"
        # left by a previous run: read each record file once, re-extracting it
        # from its archive whenever the archive is present.
        archives = set(paths)
        for tfrecord in sorted({p[:-3] if p.endswith('.gz') else p for p in paths}):
            if tfrecord + '.gz' in archives:
                decompress_tfrecord(tfrecord_archive=tfrecord + '.gz')
            band = tfrecord[-12:-9]  # "<cluster>_<band>.tfrecord" -> "<band>"
            descriptor = BAND_DESCRIPTORS.get(band)
            if descriptor is None:
                # Unknown suffix: skip instead of reusing another file's descriptor.
                print('Skipping record with unknown band suffix:', tfrecord)
                continue
            column = f'{int(prev_year)}_{band}'
            if column not in df.columns:
                df[column] = ''
            loader = torch.utils.data.DataLoader(
                TFRecordDataset(tfrecord, index_path=None, description=descriptor),
                batch_size=1)
            for data in loader:
                val = read_record(data, band)
                cluster = tensor_to_string(data, "cluster")[:-1]
                # One vectorised .loc update per record instead of a df.apply
                # over the whole frame.
                df.loc[in_survey & (cluster_ids == int(cluster)), column] = val
            

2015 2011


KeyboardInterrupt: 

In [3]:
# Count the rows whose temperature cell is still the empty-string placeholder.
dataset = pd.read_pickle('../data/dataset_additional.pkl')
int((dataset['temperature'] == '').sum())

14386

In [4]:
def map_tmp(row):
    '''Collect the yearly temperature values of one survey cluster.

    For each prev_year in [row.year - 4, row.year], reads the "ave", "min"
    and "max" per-cluster GeoTIFFs at
    ../data/additional_data/temperature/<country>_<year>_<prev_year>/<cluster>_<band>.tif
    and samples their first band at coordinate (3, 3).

    Returns: list of 5 [ave, min, max] lists, oldest year first (the same
    layout as the "precipitation" column).
    '''
    temperature_dir = os.path.join('../data', 'additional_data', 'temperature')
    survey_year = int(row.year)
    vector = []
    for prev_year in range(survey_year - 4, survey_year + 1):
        val = [0., 0., 0.]
        for i, band in enumerate(('ave', 'min', 'max')):
            tif = os.path.join(temperature_dir,
                               str(row.country) + "_" + str(survey_year) + "_" + str(prev_year),
                               str(int(row.cluster)) + "_" + band + ".tif")
            with rio.open(tif) as src:
                # sample() takes coordinates in the raster's CRS; these tifs are
                # written with transform=None, so (3, 3) presumably addresses the
                # central pixel of the 7x7 patch.
                for value in src.sample([(3, 3)]):
                    val[i] = value[0]
        # One entry per year. Previously this append sat inside the band loop,
        # giving 15 entries that all aliased the same list.
        vector.append(val)
    return vector

In [None]:
# Fill the "temperature" column (one [ave, min, max] per year) and persist it.
dataset = pd.read_pickle('../data/dataset_additional.pkl')
# Item assignment: `dataset.temperature = ...` does not create the column when
# it is missing (pandas then sets a plain attribute instead).
dataset['temperature'] = dataset.apply(map_tmp, axis=1)
dataset.to_pickle('../data/dataset_additional.pkl')

#### FAO WaPOR precipitation (PCP, from the CHIRPS catalog)

In [3]:
# Start the precipitation section from the base cluster CSV.
dataset = pd.read_csv('../data/dataset_viirs_only.csv')
# Folder of precipitation GeoTIFFs; files are globbed per year as "<year>*.tif" below.
PATH = os.path.join('../data', 'additional_data','precipitation')

In [12]:
# Snapshot the base dataset, then preview it (last expression is displayed).
# Path now uses '../data/' like every other path in this notebook; the former
# 'data/...' pointed at a folder next to the notebook instead.
dataset.to_csv('../data/dataset_additional.csv', index=False)
dataset.head()

In [5]:
# Placeholder; replaced per cluster below by a list of [ave, min, max] per year.
dataset["precipitation"] = ''

In [6]:
# Running global min / max over every sampled precipitation value; updated by the loop below.
MIN = 1e6
MAX = -1e6

In [10]:
def correct_island_coordinates(x,y):
    '''Return the closest valid point when dealing with islands due to coarse tif resolution.

    Args
    - x: float, longitude
    - y: float, latitude

    Returns: (x, y), either unchanged or moved to a nearby hand-picked point.
    Cells are matched on int() of each coordinate, i.e. truncation toward zero
    (int(-5.7) == -5).
    '''
    lon, lat = int(x), int(y)
    # Tanzania. The latitude alternatives must be grouped: the former
    # `int(y)==-5 or int(y)==-6 and int(x)==39` parses as `a or (b and c)` and
    # moved every point with int(lat) == -5, at any longitude, to this spot.
    if lon == 39 and lat in (-5, -6):
        return 39.29, -5.98
    # Sierra Leone
    if lon in (-13, -12) and lat in (8, 7):
        return -12.7, 7.8
    # Senegal
    if lon in (-12, -13, -16, -17) and lat in (12, 13, 14, 15, 16):
        return x - 1, y
    # Mozambique
    if (lon, lat) in ((32, -25), (40, -12)):
        return lon, lat
    if (lon, lat) == (34, -19):
        return 34.80, -19.80
    # Madagascar
    if lon == 43 and lat == -23:
        return x + 1, y
    if lon in (48, 49) and lat in (-12, -13):
        return x, y - 2
    # Guinea
    if lon == -13 and lat == 9:
        return x + 0.5, y + 0.5
    # Cote d'Ivoire
    if lon == -6 and lat == 4:
        return x + 0.5, y + 0.5
    # Benin
    if lon in (1, 2) and lat == 6:
        return x, y + 0.5
    # Angola
    # NOTE(review): Angolan clusters have positive longitudes (e.g. 13.3 in the
    # sample output), so int(x) == -13 never matches one; +13 was probably
    # intended. Left unchanged pending confirmation.
    if lon == -13 and lat in (-8, -12):
        return x + 0.5, y
    return x, y

In [12]:
# For every cluster, sample each precipitation GeoTIFF of the 5 years up to the
# survey year and store [mean, min, max] per year in the "precipitation" column.
tifs_by_year = {}  # glob once per year rather than once per cluster and year
for year in dataset.year.unique():
    df = dataset[dataset.year == year]
    for country in df.country.unique():
        df_country = df[df.country == country]
        for cluster in df_country.cluster.unique():
            vector = []
            # Mask built from df_country itself, so it aligns with the frame it indexes.
            row = df_country.loc[df_country['cluster'] == cluster]
            x = float(row.at[row.index[0], 'lon'])
            y = float(row.at[row.index[0], 'lat'])
            # THE ORIGINAL RASTERS HAVE TOO COARSE A RESOLUTION TO CAPTURE SMALL ISLANDS
            # AS VALID COORDINATES; WE TAKE THE CLOSEST COASTAL POINT IN THIS CASE.
            x, y = correct_island_coordinates(x, y)
            for prev_year in range(year - 4, year + 1):
                if prev_year not in tifs_by_year:
                    # `glob` is the function (`from glob import glob`); `glob.glob`
                    # raised AttributeError.
                    tifs_by_year[prev_year] = glob(os.path.join(PATH, str(prev_year) + "*.tif"))
                monthly_tifs = tifs_by_year[prev_year]
                if not monthly_tifs:
                    # Fail with a clear message instead of a ZeroDivisionError below.
                    raise FileNotFoundError(f'No precipitation tifs for {prev_year} in {PATH}')
                min_row = 1e6
                max_row = -1e5
                ave_row = 0
                for tif in monthly_tifs:
                    with rio.open(tif) as src:
                        for val in src.sample([(x, y)]):
                            max_row = max(val[0], max_row)
                            min_row = min(val[0], min_row)
                            ave_row += val[0]
                            MIN = min(val[0], MIN)
                            MAX = max(val[0], MAX)
                ave_row /= len(monthly_tifs)
                vector.append([ave_row, min_row, max_row])
            dataset.at[row.index[0], 'precipitation'] = vector
dataset.head()

Unnamed: 0,country,year,cluster,lat,lon,households,wealthpooled,urban_rural,fold,precipitation
0,angola,2015,1,-12.1014,14.1407,26,-1.132639,0.0,D,"[[662.8333333333334, 0, 1648], [681.9166666666..."
1,angola,2015,2,-9.6635,20.377,26,0.669843,1.0,E,"[[1048.25, 0, 2346], [1021.0, 0, 2415], [1059...."
2,angola,2015,3,-8.9289,13.2995,10,1.515591,1.0,D,"[[555.0833333333334, 0, 2752], [242.9166666666..."
3,angola,2015,4,-14.2876,17.6217,26,-0.559135,0.0,B,"[[980.75, 0, 2385], [853.9166666666666, 0, 234..."
4,angola,2015,5,-14.211,13.5463,26,-1.186118,0.0,B,"[[656.25, 0, 2028], [491.75, 0, 2191], [634.75..."


In [14]:
# Persist the precipitation-augmented dataset. The CSV stores the list-valued
# "precipitation" column as text; the pickle keeps the Python lists.
dataset.to_csv('../data/dataset_precipitation.csv',index=False)
dataset.to_pickle('../data/dataset_precipitation.pkl')

In [None]:
# NOTE(review): repeats the pickle write of the previous cell; can be removed.
dataset.to_pickle('../data/dataset_precipitation.pkl')