# Firesmoke Data Conversion to IDX using OpenVisus

## Import the necessary libraries (install any that are missing). This notebook was developed with Python 3.9.

In [None]:
# Used to read/manipulate netCDF data
import xarray as xr

# Used to convert to .idx
from OpenVisus import *

# Used for numerical work
import numpy as np

# Used for processing netCDF time data
import datetime

# Used for interacting with OS file system (to get directory file names)
import os

# To load/save final sequence array to file
import pickle

# Used for resampling arrays to fit the same lat/lon grid
from scipy.interpolate import griddata

# for plotting
import matplotlib.pyplot as plt
import cartopy.crs as ccrs

# for checking and using timestamps
import pandas as pd

# Accessory, used to generate progress bar for running for loops
# from tqdm.notebook import tqdm
# import ipywidgets
# import jupyterlab_widgets
from tqdm import tqdm

# For logging
import logging

In [None]:
# Debug switch: any truthy value makes the cells that check `verbose`
# print extra diagnostic output; 0 keeps them quiet
verbose = 0

## Get relevant directory paths

In [None]:
# ******* THIS IS WHEN RUNNING FROM LOCAL MACHINE (canada1) **************
# Directory holding all original netCDF files from UBC (the conversion input)
firesmoke_dir = "/opt/wired-data/firesmoke/final_union_set"

# Directory where the IDX file and its data are saved (the conversion output)
idx_dir = "/opt/wired-data/firesmoke/idx_parallel"

## Find which NetCDF files can be opened with xarray, and gather their dimension sizes (which may vary from file to file).

In [None]:
# Names of all files from UBC that xarray was able to open.
# A plain list is used because appending to it is O(1); growing a numpy
# array with np.append copies the whole array on every call.
successful_files = []

# Track all unique sizes of netCDF variables across all files.
# Maps an integer key (0, 1, 2, ...) to [sizes, attrs] of the first file
# seen with each distinct set of dimension sizes.
all_unique_dims = {}

# get list of NetCDF file names, in alphabetical (chronological) order
file_names = sorted(os.listdir(firesmoke_dir))

# try opening each file, process only if it successfully opens
for file in tqdm(file_names):
    # get file's path
    path = f'{firesmoke_dir}/{file}'

    try:
        # Copy the metadata out and close the file immediately so open
        # file handles do not pile up while scanning the directory
        with xr.open_dataset(path) as ds:
            sizes = dict(ds.sizes)
            attrs = dict(ds.attrs)
    except Exception as exc:
        # file is missing, corrupt or otherwise unreadable by xarray: skip it.
        # `except Exception` (not a bare except) lets Ctrl-C still interrupt.
        if verbose:
            print(f'Skipping {file}: {exc}')
        continue

    # keep track of which files successfully open
    successful_files.append(file)

    # record this file's sizes only if no earlier file had the same ones
    already_recorded = any(
        sizes == known_sizes for known_sizes, _ in all_unique_dims.values()
    )
    if not already_recorded:
        all_unique_dims[len(all_unique_dims)] = [sizes, attrs]

# file_names was sorted, so successful_files is already in order of date;
# sorting again is cheap and keeps that guarantee explicit
successful_files = sorted(successful_files)

### Create resampling grids so that values on the smaller grid can be resampled onto the larger grid during conversion to IDX.
The IDX file format requires the data at every timestep to have the same shape.

By visual inspection of `all_unique_dims` we see there are only 2 unique grid sizes in use; we will refer to these as `max_grid` and `min_grid`.

In [None]:
# Flatten every recorded [ds.sizes, ds.attrs] pair into a single dict per grid
grids = [{**dict(sizes), **dict(attrs)} for sizes, attrs in all_unique_dims.values()]
if not grids:
    raise RuntimeError('all_unique_dims is empty: no netCDF file could be opened')

# max_grid / min_grid are the grids with the most / fewest cells (ROW * COL).
# Choosing by cell count over ALL entries avoids depending on which integer
# key a grid happened to be stored under (key 0 used to be ignored).
max_grid = max(grids, key=lambda grid: grid['ROW'] * grid['COL'])
min_grid = min(grids, key=lambda grid: grid['ROW'] * grid['COL'])


def _grid_axes(grid):
    """Return the (lon, lat) coordinate axes described by a grid dict.

    Each axis starts at the grid origin (XORIG / YORIG) and advances by the
    cell size (XCELL / YCELL) for COL / ROW points respectively.
    """
    lon = np.linspace(grid['XORIG'], grid['XORIG'] + grid['XCELL'] * (grid['COL'] - 1), grid['COL'])
    lat = np.linspace(grid['YORIG'], grid['YORIG'] + grid['YCELL'] * (grid['ROW'] - 1), grid['ROW'])
    return lon, lat


# get arrays of bigger lat/lon grid
big_lon, big_lat = _grid_axes(max_grid)

# get coordinates made of new lat/lon arrays, as an (N, 2) array of (lon, lat)
big_lon_pts, big_lat_pts = np.meshgrid(big_lon, big_lat)
big_tups = np.column_stack((big_lon_pts.ravel(), big_lat_pts.ravel()))

# get arrays of smaller lat/lon grid
sml_lon, sml_lat = _grid_axes(min_grid)

# get coordinates made of small lat/lon arrays, as an (N, 2) array of (lon, lat)
sml_lon_pts, sml_lat_pts = np.meshgrid(sml_lon, sml_lat)
sml_tups = np.column_stack((sml_lon_pts.ravel(), sml_lat_pts.ravel()))

## Determine sequence of files to load later for IDX conversion

### First determine which hours are available across all datasets; from there we construct the final sequence.

In [None]:
def parse_tflag(tflag):
    """Convert a netCDF time flag (TFLAG) into a datetime object.

    :param [int, int] tflag: the TFLAG to parse; tflag[0] is the date written
        as YYYYDDD (year and day of year), tflag[1] is the time as HHMMSS
    :return: the datetime.datetime described by the flag
    """
    # split YYYYDDD into year and day of year
    year, day_of_year = divmod(int(tflag[0]), 1000)

    # peel HHMMSS apart from the right: seconds first, then minutes and hours
    hours_minutes, seconds = divmod(int(tflag[1]), 100)
    hours, minutes = divmod(hours_minutes, 100)

    # January 1st plus (day_of_year - 1) days gives the calendar month and day
    calendar_day = datetime.datetime(year, 1, 1) + datetime.timedelta(days=day_of_year - 1)

    return datetime.datetime(year, calendar_day.month, calendar_day.day, hours, minutes, seconds)

In [None]:
# get set of all available hours from successful_files
# we use a dictionary so we can index by CDATE
available_dates = {np.int32(file.split('_')[1]): {} for file in successful_files}

# one record per file: its creation datetime and the TFLAGs it contains
rows = []

for file in tqdm(successful_files):
    # get file's path
    path = f'{firesmoke_dir}/{file}'

    # open the file with xarray; the context manager closes it again as soon
    # as the needed values are copied out, so file handles do not accumulate
    with xr.open_dataset(path) as ds:
        # for each CDATE_CTIME, store their respective TFLAGs
        cdatetime = pd.to_datetime(f"{ds.CDATE}_{ds.CTIME:06d}", format='%Y%j_%H%M%S')
        # index 0 on the second axis keeps one TFLAG pair per entry of the
        # first axis (presumably one per time step — TODO confirm axis order);
        # .values loads the data into memory before the file is closed
        tflags = ds['TFLAG'][:, 0, :].values

    # append new row of CDATETIMEs with their respective TFLAGs
    rows.append({
            'CDATETIME': cdatetime,
            'TFLAG': tflags
        })
available_dates_df = pd.DataFrame(rows)


## Select best NetCDF file and TSTEP[i] to represent each hour from `start_date` to `end_date` as indicated below.

In [None]:
def update_idx_calls(arr, cdatetime, tstep_idx):
    '''
    Append to the given array the arguments specifying which dispersion.nc
        file to open and which TFLAG of that file to use.
    The appended entry is [file name, TFLAG parsed to a datetime, tstep_idx].
    :param list arr: array that holds final idx write sequence (modified in place)
    :param datetime cdatetime: datetime object of CDATE + CTIME of the dispersion file we will open
    :param int tstep_idx: The index of the TFLAG in TFLAG array we will select from dispersion file
    :return: the same list that was passed in as arr
    '''

    # get path of file we will use; %H%M%S is always 6 characters wide,
    # so no additional zero padding is needed
    file_str = f"dispersion_{cdatetime.strftime('%Y%j_%H%M%S')}.nc"
    path = f'{firesmoke_dir}/{file_str}'

    # open the file with xarray just long enough to read the selected TFLAG,
    # then close it so repeated calls do not leak file handles
    with xr.open_dataset(path) as ds:
        tflag = ds['TFLAG'].values[tstep_idx][0]

    arr.append([file_str, parse_tflag(tflag), tstep_idx])
    return arr

In [None]:
# Logger named after this module, used to record which files get selected
logger = logging.getLogger(__name__)

# Configure the root logger so every INFO-or-higher record is both written
# to a log file and echoed to the console
# ref: https://realpython.com/python-logging/
log_handlers = [
    logging.FileHandler("/home/arleth/NSDF-WIRED/conversion/idx_calls_building_parallel.log"),
    logging.StreamHandler(),
]
logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s',
    level=logging.INFO,
    handlers=log_handlers,
)

In [None]:
# verbose = 1
# Final order in which files will be written to IDX: one
# [file name, timestamp, tstep index] entry per hour that has data
idx_calls = []

# Define the start and end dates we will step through
start_date = datetime.datetime.strptime("2021059", "%Y%j")
end_date = datetime.datetime.strptime("2025317", "%Y%j")

# iterate over each day
current_date = start_date
# iterate over each hour of the current day
current_hour = datetime.datetime(current_date.year, current_date.month, current_date.day)

# file to open
file_str = ''

one_day = datetime.timedelta(days=1)
one_hour = datetime.timedelta(hours=1)
search_window = datetime.timedelta(days=4)

# Calendar date of every file's CDATETIME, computed once instead of once per hour
creation_dates = available_dates_df['CDATETIME'].dt.date

while current_date <= end_date:
    next_date = current_date + one_day

    # Candidate files for this day: those whose CDATETIME date lies within
    # 4 days before or after it. Only the date is compared (the hour is
    # ignored), so the selection is identical for all 24 hours of the day
    # and is done once here rather than inside the hourly loop.
    earliest = (current_date - search_window).date()
    latest = (current_date + search_window).date()
    candidates = available_dates_df[(creation_dates >= earliest) & (creation_dates <= latest)]

    while current_hour < next_date:
        most_recent_row = None
        tflag_0 = None

        # Walk the candidates from the last row backwards and take the first
        # one whose TFLAG range actually contains current_hour
        for row_pos in range(len(candidates) - 1, -1, -1):
            row = candidates.iloc[row_pos]
            first_tflag = parse_tflag(row['TFLAG'][0])
            if first_tflag <= current_hour <= parse_tflag(row['TFLAG'][-1]):
                most_recent_row = row
                tflag_0 = first_tflag
                break

        # if we found a row, update idx_calls array to specify we will use:
        # TFLAG[tstep_idx] from the dispersion file named with CDATETIME to represent current_hour in our final IDX file
        if most_recent_row is not None:
            # the number of hours difference between 0th TFLAG and current_hour is tstep_idx
            tstep_idx = int((current_hour - tflag_0).total_seconds() / 3600)
            if verbose:
                print(f'Found data for current_hour {current_hour}: using CDATETIME={most_recent_row["CDATETIME"]}')
                print(f"tflag_0 = {tflag_0}, current_hour = {current_hour}, TFLAG[tstep_idx] = {parse_tflag(most_recent_row['TFLAG'][tstep_idx])}")

            logger.info(f"current_hour: {current_hour} -> CDATETIME={most_recent_row['CDATETIME']}, TFLAG[{tstep_idx}] = {most_recent_row['TFLAG'][tstep_idx]}")

            # update idx call array
            update_idx_calls(idx_calls, most_recent_row['CDATETIME'], tstep_idx)
        else:
            logger.info(f"current_hour: {current_hour} -> No available file found.")
            if verbose:
                print(f'WARNING: No available file found for current_hour {current_hour} (date: {current_date}). Skipping.')

        # move to next hour
        current_hour += one_hour

        if verbose:
            print('~~~~~')

    # move to the next day
    current_date = next_date

In [None]:
# # just to view in txt
# %%capture captured_output
# for c in idx_calls:
#     print(c)

# with open('idx_calls_v5.txt', 'w') as f:
#     f.write(captured_output.stdout)

In [None]:
# Serialize the idx_calls write sequence to a pickle file
# (relative path, so it lands in the current working directory)
with open('idx_calls_v5_parallel.pkl', 'wb') as pickle_file:
    pickle.dump(idx_calls, pickle_file)

## Do conversion from netCDF files to IDX

In [None]:
# Create the IDX dataset that the converted data is written into
# useful for dealing with fields that are not all the same size:
# https://github.com/sci-visus/OpenVisus/blob/master/Samples/jupyter/nasa_conversion_example.ipynb

import concurrent.futures

# OpenVisus field describing the pm25 variable, stored as float32
f = Field('PM25', 'float32')

# dims is the maximum array size (COL x ROW of max_grid);
# data on a smaller grid is resampled accordingly to fit this
idx_dims = [int(max_grid['COL']), int(max_grid['ROW'])]

# time steps are numbered 0 .. len(idx_calls) - 1, one per entry of idx_calls
idx_time = [0, len(idx_calls) - 1, '%00000000d/']

# create the idx file for this dataset using field f
db = CreateIdx(url=idx_dir + '/firesmoke.idx', fields=[f], dims=idx_dims, time=idx_time)

# threshold to use to change small-enough resampled values to 0
thresh = 1e-15

# Function to process a single call; returns result for later writing
def process_call(args):
    """Load one time step of PM25 and, when needed, resample it onto the large grid.

    :param tuple args: (call, firesmoke_dir, max_grid, sml_tups, big_tups,
        big_lat, big_lon, thresh), where call is
        [file name, timestamp, time step index]
    :return: (timestamp, data, resamp). data is the float32 array for the
        requested time step, and resamp tells whether it was resampled.
        If anything goes wrong the error is printed and
        (timestamp, None, None) is returned instead of raising.
    """
    call, firesmoke_dir, max_grid, sml_tups, big_tups, big_lat, big_lon, thresh = args
    file_name = call[0]
    timestamp = call[1]
    tstep_index = call[2]
    try:
        # the context manager closes the file once the values are in memory
        with xr.open_dataset(f'{firesmoke_dir}/{file_name}') as ds:
            # Select the time step BEFORE squeezing: only that step is read
            # from disk, and a file with a single time step no longer has its
            # time axis squeezed away (which made the index hit a data axis)
            step_vals = np.squeeze(ds['PM25'][tstep_index].values)
            # a file whose grid origin differs from max_grid's is treated as
            # being on the small grid and must be resampled
            resamp = bool(ds.XORIG != max_grid['XORIG'])
        if resamp:
            file_vals_resamp = griddata(sml_tups, step_vals.flatten(), big_tups, method='cubic', fill_value=0)
            # zero out values below the threshold (this includes any negatives)
            file_vals_resamp[file_vals_resamp < thresh] = 0
            file_vals_resamp = file_vals_resamp.reshape((len(big_lat), len(big_lon)))
            result_data = file_vals_resamp.astype(np.float32)
        else:
            # same dtype as the resampled branch; no copy if already float32
            result_data = step_vals.astype(np.float32, copy=False)
        return (timestamp, result_data, resamp)
    except Exception as e:
        print(f"Error processing {file_name}: {e}")
        return (timestamp, None, None)

# Using ThreadPoolExecutor or ProcessPoolExecutor as appropriate
# Writing to IDX must be sequential to keep timestep order.
# The calls are processed in fixed-size batches and each batch is written
# before the next one is submitted, so only about one batch of arrays is held
# in memory at a time instead of the arrays for every time step.
batch_size = 128

with concurrent.futures.ThreadPoolExecutor() as executor, tqdm(total=len(idx_calls)) as progress:
    for batch_start in range(0, len(idx_calls), batch_size):
        batch = idx_calls[batch_start:batch_start + batch_size]

        # futures are kept in submission order, which is the time step order
        futures = [
            executor.submit(process_call, (call, firesmoke_dir, max_grid, sml_tups, big_tups, big_lat, big_lon, thresh))
            for call in batch
        ]

        # sequentially write this batch's outputs to db in order
        for tstep, future in enumerate(futures, start=batch_start):
            try:
                _timestamp, data, _resamp = future.result()
            except Exception as exc:
                # treat an exception escaping the worker like any other failed step
                print(f'Call at idx {tstep} generated an exception: {exc}')
                data = None
            progress.update(1)

            if data is None:
                print(f"Skipping tstep {tstep} due to error.")
                continue
            db.write(data=data, field=f, time=tstep)


In [None]:
# Make the idx data directory (idx_dir) the current working directory
os.chdir(idx_dir)

In [None]:
# Compress the dataset, passing 'zip' as the compression to use
# NOTE(review): presumably this rewrites the IDX data in place — confirm against the OpenVisus docs
db.compressDataset(['zip'])