## Pre-requsites

Go through the preprocessing/downloading_satellite_imagery.ipynb notebook.

```
data/
    dhs_tfrecords_raw/
        angola_2011_00.tfrecord.gz
        ...
        zimbabwe_2015_XX.tfrecord.gz
```

## Instructions

This notebook processes the exported TFRecords as follows:
    1. Verifies that the fields in the TFRecords match the original CSV files.
    2. Splits each monolithic TFRecord file exported from Google Earth Engine into one file per record.

After running this notebook, you should a new folder (`dhs_tfrecords`) under `data/`:

```
data/
    dhs_tfrecords/
        angola_2011/
            00000.tfrecord.gz
            ...
            00229.tfrecord.gz
         ...
         zimbabwe_2015/
            00000.tfrecord.gz
            ...
            00399.tfrecord.gz
```

This notebook also calculates the mean and standard deviation of each band

## Imports relevant modules and define Constants

In [0]:
from typing import Iterable
from glob import glob
from pprint import pprint
import os
import time
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tensorflow.compat.v1 as tf
tf.disable_v2_behavior()
from tqdm.auto import tqdm
#os.chdir('/dbfs/mnt/raw/DataScientistSDSData/')
#stat_bands = ['BLUE', 'GREEN', 'RED', 'SWIR1', 'SWIR2', 'TEMP1', 'NIR', 'DMSP', 'VIIRS']
# REQUIRED_BANDS = [
#     'BLUE', 'GREEN', 'NIGHTLIGHTS', 'NIR', 'RED',    'SWIR1', 'SWIR2', 'TEMP1']
REQUIRED_BANDS = ['BLUE', 'GREEN', 'NIGHTLIGHTS', 'NIR', 'RED',    'SWIR1', 'SWIR2']
BANDS_ORDER = ['BLUE', 'GREEN', 'RED', 'SWIR1', 'SWIR2', 'TEMP1', 'NIR',    'DMSP', 'VIIRS']
csv_path='/dbfs/mnt/raw/DataScientistSDSData/TLSED/dhs_clusters.csv'
DHS_NEW_PROCESSED_FOLDER = 'ssl_dhs_tfrecords_raw_processed/'
input_dir='/dbfs/mnt/raw/DataScientistSDSData/TLSED_test/raw/'
processed_dir='/dbfs/mnt/raw/DataScientistSDSData/TLSED_test/processed/'



## Validate and Split Exported TFRecords

In [0]:
def process_dataset(csv_path: str, input_dir: str, processed_dir: str) -> None:
    '''
    Args
    - csv_path: str, path to CSV of DHS or LSMS clusters
    - input_dir: str, path to TFRecords exported from Google Earth Engine
    - processed_dir: str, folder where to save processed TFRecords
    '''
    df = pd.read_csv(csv_path, float_precision='high', index_col=0)
    df = df.reset_index()
    surveys = list(df.groupby(['country', 'year']).groups.keys())  # (country, year) tuples

    for country, year in surveys:
        country_year = f'{country}_{year}'
        #print('Processing:', country_year)

        #tfrecord_paths = glob(os.path.join(input_dir, country_year + '*'))
        tfrecord_paths = get_tfrecord_paths(input_dir, country_year)
        out_dir = f'{os.path.join(processed_dir, country_year)}/'
        os.makedirs(out_dir, exist_ok=True)
        subset_df = df[(df['country'] == country) & (df['year'] == year)].reset_index(drop=True)
        bands_missing_paths = validate_and_split_tfrecords(
            tfrecord_paths=tfrecord_paths, out_dir=out_dir, df=subset_df)
        

def get_tfrecord_paths(input_dir: str, country_year: tuple) -> Iterable[str]:
    tfrecord_paths = glob(os.path.join(input_dir, country_year + '*'))
    tfrecord_paths = [p for p in tfrecord_paths if p.endswith('.gz')]
    #tfrecord_paths.sort(key=lambda tfr: int(tfr[tfr.rfind('_') + 1 : tfr.rfind('_') + 1 + 4])) # Sort by file index
    return tfrecord_paths


# def validate_and_split_tfrecords(
#         tfrecord_paths: Iterable[str],
#         out_dir: str,
#         df: pd.DataFrame
#         ) -> None:
#     '''Validates and splits a list of exported TFRecord files (for a
#     given country-year survey) into individual TFrecords, one per cluster.

#     "Validating" a TFRecord comprises of 2 parts
#     1) verifying that it contains the required bands
#     2) verifying that its other features match the values from the dataset CSV

#     Args
#     - tfrecord_paths: str, path to exported TFRecords files
#     - out_dir: str, path to dir to save processed individual TFRecords
#     - df: pd.DataFrame, index is sequential and starts at 0
#     '''
#     # Create an iterator over the TFRecords file. The iterator yields
#     # the binary representations of Example messages as strings.
#     options = tf.io.TFRecordOptions(tf.io.TFRecordCompressionType.GZIP)

  
#     i = 0

#     #progbar = tqdm(total=len(df))

#     for tfrecord_path in tfrecord_paths:
#         iterator = tf.io.tf_record_iterator(tfrecord_path, options=options)
#         for record_str in iterator:
#             # parse into an actual Example message
#             ex = tf.train.Example.FromString(record_str)
#             feature_map = ex.features.feature

#             # verify required bands exist
#             for band in REQUIRED_BANDS:
#                 #print(type(band))
#                 if band not in feature_map: print(f'Band "{band}" not in record {i} of {tfrecord_path}')

#             # compare feature map values against CSV values
#             '''
#             csv_feats = df.loc[i, :].to_dict()
#             for col, val in csv_feats.items():
#                 ft_type = feature_map[col].WhichOneof('kind')
#                 ex_val = feature_map[col].__getattribute__(ft_type).value[0]
#                 #assert val == ex_val, f'Expected {col}={val}, but found {ex_val} instead'
#             '''
#             # serialize to string and write to file
#             #out_path = os.path.join(out_dir, f'{i:05d}.tfrecord.gz')  # all surveys have < 1e6 clusters
#             out_path = os.path.join(out_dir, f'{i:05d}.tfrecord.gz')
#             with tf.io.TFRecordWriter(out_path, options=options) as writer:
#                 writer.write(ex.SerializeToString())

#             i += 1
           

def validate_and_split_tfrecords(
        tfrecord_paths: Iterable[str],
        out_dir: str,
        df: pd.DataFrame
        ) -> None:
    '''
    Validates and splits a list of exported TFRecord files (for a
    given country-year survey) into individual TFrecords, one per cluster.

    Only files with ALL REQUIRED_BANDS present will be saved to out_dir.
    "Validating" a TFRecord comprises of 2 parts
    1) verifying that it contains the required bands
    2) verifying that its other features match the values from the dataset CSV

    Args
    - tfrecord_paths: str, path to exported TFRecords files
    - out_dir: str, path to dir to save processed individual TFRecords
    - df: pd.DataFrame, index is sequential and starts at 0
    '''
    options = tf.io.TFRecordOptions(tf.io.TFRecordCompressionType.GZIP)
    i = 0
    valid_i = 0  # Counter for valid files

    bands_missing_paths=[]

    for tfrecord_path in tqdm(tfrecord_paths, desc="Checking TFRecords"):
        iterator = tf.io.tf_record_iterator(tfrecord_path, options=options)
        for record_str in iterator:
            ex = tf.train.Example.FromString(record_str)
            feature_map = ex.features.feature

            # Check all REQUIRED_BANDS exist in this record
            bands_missing = [band for band in REQUIRED_BANDS if band not in feature_map]
            if bands_missing:
                print(f"Skipping record {i} in {tfrecord_path}: missing bands {bands_missing}")
            else:
                # Save only if valid
                out_path = os.path.join(out_dir, f'{valid_i:05d}.tfrecord.gz')
                with tf.io.TFRecordWriter(out_path, options=options) as writer:
                    writer.write(ex.SerializeToString())
                valid_i += 1
            i += 1
            bands_missing_paths.append(bands_missing_paths)
    return bands_missing_paths

In [0]:
start_time = time.time()

#os.makedirs(processed_dir, exist_ok=True)

bands_missing_paths = process_dataset(
    csv_path=csv_path,
    input_dir=input_dir,
    processed_dir=processed_dir
)

print(bands_missing_paths)


end_time = time.time()
elapsed_time = end_time - start_time

print(f"Total time taken: {elapsed_time:.2f} seconds ({elapsed_time/60:.2f} minutes)")



Checking TFRecords:   0%|          | 0/62 [00:00<?, ?it/s]

Instructions for updating:
Use eager execution and: 
`tf.data.TFRecordDataset(path)`


Checking TFRecords:   0%|          | 0/53 [00:00<?, ?it/s]

Checking TFRecords:   0%|          | 0/2 [00:00<?, ?it/s]

Checking TFRecords:   0%|          | 0/228 [00:00<?, ?it/s]

Checking TFRecords:   0%|          | 0/316 [00:00<?, ?it/s]

Checking TFRecords:   0%|          | 0/308 [00:00<?, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords:   0%|          | 0/80 [00:00<?, ?it/s]

Checking TFRecords:   0%|          | 0/666 [00:00<?, ?it/s]

Checking TFRecords:   0%|          | 0/288 [00:00<?, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords:   0%|          | 0/392 [00:00<?, ?it/s]

Checking TFRecords:   0%|          | 0/541 [00:00<?, ?it/s]

Checking TFRecords:   0%|          | 0/248 [00:00<?, ?it/s]

Checking TFRecords:   0%|          | 0/31 [00:00<?, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords:   0%|          | 0/338 [00:00<?, ?it/s]

Checking TFRecords:   0%|          | 0/38 [00:00<?, ?it/s]

Checking TFRecords:   0%|          | 0/200 [00:00<?, ?it/s]

Checking TFRecords:   0%|          | 0/322 [00:00<?, ?it/s]

Checking TFRecords:   0%|          | 0/230 [00:00<?, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords:   0%|          | 0/464 [00:00<?, ?it/s]

Skipping record 68 in /dbfs/mnt/raw/DataScientistSDSData/TLSED_test/raw/cameroon_2004_152.tfrecord.gz: missing bands ['BLUE', 'GREEN', 'NIR', 'RED', 'SWIR1', 'SWIR2']
Skipping record 103 in /dbfs/mnt/raw/DataScientistSDSData/TLSED_test/raw/cameroon_2004_184.tfrecord.gz: missing bands ['BLUE', 'GREEN', 'NIR', 'RED', 'SWIR1', 'SWIR2']


Checking TFRecords:   0%|          | 0/577 [00:00<?, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords:   0%|          | 0/235 [00:00<?, ?it/s]

Checking TFRecords:   0%|          | 0/389 [00:00<?, ?it/s]

Checking TFRecords:   0%|          | 0/242 [00:00<?, ?it/s]

Checking TFRecords:   0%|          | 0/291 [00:00<?, ?it/s]

Checking TFRecords:   0%|          | 0/395 [00:00<?, ?it/s]

Checking TFRecords:   0%|          | 0/97 [00:00<?, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords: 0it [00:00, ?it/s]

Checking TFRecords:   0%|          | 0/106 [00:00<?, ?it/s]

Checking TFRecords:   0%|          | 0/101 [00:00<?, ?it/s]

Checking TFRecords:   0%|          | 0/219 [00:00<?, ?it/s]

Checking TFRecords:   0%|          | 0/174 [00:00<?, ?it/s]

Checking TFRecords:   0%|          | 0/400 [00:00<?, ?it/s]

None
Total time taken: 3723.77 seconds (62.06 minutes)
