In [None]:
# Import packages
# Dataframe Packages
import numpy as np
import xarray as xr
import pandas as pd

# Vector Packages
import geopandas as gpd
import shapely
from shapely import wkt
from shapely.geometry import Point, Polygon
from pyproj import CRS, Transformer

# Raster Packages
import rioxarray as rxr
import rasterio
from rasterio.mask import mask
from rioxarray.merge import merge_arrays
import rasterstats as rs
import gdal
from gdal import gdalconst

# Data Access Packages
import earthaccess as ea
import h5py
import pickle
from tensorflow.keras.models import load_model
from pystac_client import Client
import richdem as rd
import planetary_computer
from planetary_computer import sign

# General Packages
import os
import re
import shutil
import math
from datetime import datetime
import glob
from pprint import pprint
from typing import Union
from pathlib import Path
from tqdm import tqdm
import time
import requests
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
import dask
import dask.dataframe as dd
from dask.distributed import progress
from dask.distributed import Client
from dask.diagnostics import ProgressBar
from retrying import retry
import fiona
import re
import s3fs

In [None]:
import NSIDC_Data

class ASODataTool:
    def __init__(self, short_name, version, polygon='', filename_filter=''):
        self.short_name = short_name
        self.version = version
        self.polygon = polygon
        self.filename_filter = filename_filter
        self.url_list = []
        self.CMR_URL = 'https://cmr.earthdata.nasa.gov'
        self.CMR_PAGE_SIZE = 2000
        self.CMR_FILE_URL = ('{0}/search/granules.json?provider=NSIDC_ECS'
                             '&sort_key[]=start_date&sort_key[]=producer_granule_id'
                             '&scroll=true&page_size={1}'.format(self.CMR_URL, self.CMR_PAGE_SIZE))

    def cmr_search(self, time_start, time_end, bounding_box):
        try:
            if not self.url_list:
                self.url_list = NSIDC_Data.cmr_search(
                    self.short_name, self.version, time_start, time_end,
                    bounding_box=self.bounding_box, polygon=self.polygon,
                    filename_filter=self.filename_filter, quiet=False)
            return self.url_list
        except KeyboardInterrupt:
            quit()

    def cmr_download(self, directory):
        if not os.path.exists(directory):
            os.makedirs(directory, exist_ok=True)

        NSIDC_Data.cmr_download(self.url_list, directory, False)

    @staticmethod
    def get_bounding_box(region):
        regions = pd.read_pickle("C:\\Users\\VISH NU\\National_snow_model\\National-Snow-Model\\Data\\Processed\\RegionVal.pkl")
        superset = []

        superset.append(regions[region])
        superset = pd.concat(superset)
        superset = gpd.GeoDataFrame(superset, geometry=gpd.points_from_xy(superset.Long, superset.Lat, crs="EPSG:4326"))
        bounding_box = list(superset.total_bounds)

        return f"{bounding_box[0]},{bounding_box[1]},{bounding_box[2]},{bounding_box[3]}"

class ASODownload(ASODataTool):
    def __init__(self, short_name, version, polygon='', filename_filter=''):
        super().__init__(short_name, version, polygon, filename_filter)
        self.region_list =    [ 'N_Sierras',
                                'S_Sierras',
                                'Greater_Yellowstone',
                                'N_Co_Rockies',
                                'SW_Mont',
                                'SW_Co_Rockies',
                                'GBasin',
                                'N_Wasatch',
                                'N_Cascade',
                                'S_Wasatch',
                                'SW_Mtns',
                                'E_WA_N_Id_W_Mont',
                                'S_Wyoming',
                                'SE_Co_Rockies',
                                'Sawtooth',
                                'Ca_Coast',
                                'E_Or',
                                'N_Yellowstone',
                                'S_Cascade',
                                'Wa_Coast',
                                'Greater_Glacier',
                                'Or_Coast'  ]

    def select_region(self):
        print("Select a region by entering its index:")
        for i, region in enumerate(self.region_list, start=1):
            print(f"{i}. {region}")

        try:
            user_input = int(input("Enter the index of the region: "))
            if 1 <= user_input <= len(self.region_list):
                selected_region = self.region_list[user_input - 1]
                self.bounding_box = self.get_bounding_box(selected_region)
                print(f"You selected: {selected_region}")
                print(f"Bounding Box: {self.bounding_box}")
            else:
                print("Invalid index. Please select a valid index.")
        except ValueError:
            print("Invalid input. Please enter a valid index.")
            
if __name__ == "__main__":
    short_name = 'ASO_50M_SWE'
    version = '1'

    data_tool = ASODownload(short_name, version)
    time_start = '2013-04-02T00:00:00Z'
    time_end = '2019-07-19T23:59:59Z'
    
    selected_region = data_tool.select_region()  # Call select_region on the instance
    directory = "SWE_Data"

    print(f"Fetching file URLs in progress for {selected_region} from {time_start} to {time_end}")
    url_list = data_tool.cmr_search(time_start, time_end, data_tool.bounding_box)
    data_tool.cmr_download(directory)

In [None]:
class ASODataProcessing:
    @staticmethod
    def processing_tiff(input_file, output_res):
        try:
            date = os.path.splitext(input_file)[0].split("_")[-1]
            
            output_folder = os.path.join(os.getcwd(), "Processed_Data")
            os.makedirs(output_folder, exist_ok=True)
            output_file = os.path.join(output_folder, f"ASO_100M_{date}.tif")
    
            ds = gdal.Open(input_file)
            if ds is None:
                print(f"Failed to open '{input_file}'. Make sure the file is a valid GeoTIFF file.")
                return None
            
            # Reproject and resample
            gdal.Warp(output_file, ds, dstSRS="EPSG:4326", xRes=output_res, yRes=-output_res, resampleAlg="bilinear")
    
            # Read the processed TIFF file using rasterio
            rds = rxr.open_rasterio(output_file)
            rds = rds.squeeze().drop("spatial_ref").drop("band")
            rds.name = "data"
            df = rds.to_dataframe().reset_index()
            return df
    
        except Exception as e:
            print(f"An error occurred: {str(e)}")
            return None
        
    @staticmethod
    def convert_tiff_to_csv(output_res, config):

        curr_dir = config["local_dir"]
        folder_path = os.path.join(curr_dir, "SWE_Data")
        
        if not os.path.exists(folder_path) or not os.path.isdir(folder_path):
            print(f"The folder '{input_folder}' does not exist.")
            return
        
        if not os.listdir(folder_path):
            print(f"The folder '{input_folder}' is empty.")
            return
    
        tiff_files = [filename for filename in os.listdir(folder_path) if filename.endswith(".tif")]
    
        for tiff_filename in tiff_files:
            
            tiff_filepath = os.path.join(folder_path, tiff_filename)
            df = ASODataProcessing.processing_tiff(tiff_filepath, output_res)
    
            if df is not None:
  
                date = os.path.splitext(tiff_filename)[0].split("_")[-1]
    
                # Define the CSV filename and folder
                csv_filename = f"ASO_SWE_{date}.csv"
                csv_folder = os.path.join(curr_dir, "Processed_Data", "SWE_csv")
                os.makedirs(csv_folder, exist_ok=True)
                csv_filepath = os.path.join(csv_folder, csv_filename)
    
                df.to_csv(csv_filepath, index=False)
    
                print(f"Converted '{tiff_filename}' to '{csv_filename}'")
                

    def process_folder(self, config):

        pred_obs_metadata_df = check_and_fetch_meta(config)

    	def create_polygon(self, row):
        	return Polygon([(row['BL_Coord_Long'], row['BL_Coord_Lat']),
                        	(row['BR_Coord_Long'], row['BR_Coord_Lat']),
                        	(row['UR_Coord_Long'], row['UR_Coord_Lat']),
                        	(row['UL_Coord_Long'], row['UL_Coord_Lat'])])
    
        pred_obs_metadata_df = pred_obs_metadata_df.drop(columns=['Unnamed: 0'], axis=1)
        pred_obs_metadata_df['geometry'] = pred_obs_metadata_df.apply(create_polygon, axis=1)

        metadata = gpd.GeoDataFrame(pred_obs_metadata_df, geometry='geometry')
    
        # Drop coordinates columns to reduce complexity for further processing
        metadata_df = metadata.drop(columns=['BL_Coord_Long', 'BL_Coord_Lat', 
                                             'BR_Coord_Long', 'BR_Coord_Lat', 
                                             'UR_Coord_Long', 'UR_Coord_Lat', 
                                             'UL_Coord_Long', 'UL_Coord_Lat'], axis=1)
    	
    	input_folder = os.path.join(config["local_dir"], "Processed_Data", "swe_csv")     #created in the previous function
    	output_folder = os.path.join(config["local_dir"], "Processed_SWE")                #create a new folder to store files that has fetched cell ids for lat/longs
        csv_files = [f for f in os.listdir(input_folder) if f.endswith('.csv')]
    
        for csv_file in csv_files:
            input_aso_path = os.path.join(input_folder, csv_file)
            output_aso_path = os.path.join(output_folder, csv_file)
    
            if os.path.exists(output_aso_path):
                print(f"CSV file {csv_file} already exists in the output folder.")
                continue
    
            aso_swe_df = pd.read_csv(input_aso_path)
            geometry = [Point(xy) for xy in zip(aso_swe_df['x'], aso_swe_df['y'])]

            aso_swe_geo = gpd.GeoDataFrame(aso_swe_df, geometry=geometry)

            result = gpd.sjoin(aso_swe_geo, metadata_df, how='left', predicate='within', op = 'intersects')
    
            Final_df = result[['y', 'x', 'data', 'cell_id']]
            Final_df.rename(columns={'data': 'swe'}, inplace=True)
    
            # Drop rows where 'cell_id' is NaN
            if Final_df['cell_id'].isnull().values.any():
                Final_df = Final_df.dropna(subset=['cell_id'])
    
            Final_df.to_csv(output_aso_path, index=False)
            print(f"Processed {csv_file}")
        
def check_and_fetch_meta(config):

    file_path = config["grid_cells_meta"],
    s3_path = config["s3_url"],
    access_key = config["s3_access_key"],
    secret_key = config["s3_secret_key"]
	
    if os.path.exists(file_path):
        print("File found locally")
	df = pd.read_csv(file_path)
        return df
    else:
        print("File not found locally, fetching from S3")

        session = boto3.Session(
            aws_access_key_id = access_key,
            aws_secret_access_key = secret_key,
        )

        s3 = session.client('s3')
        local_file_name = os.path.basename(s3_path)
        local_file_path = os.path.join(config["local_dir"], local_file_name)
        
        try:
            s3.get_object(Bucket = s3_path.split('/')[2], Key='/'.join(s3_path.split('/')[3:]))
            csv_string = response['Body'].read().decode('utf-8')
            df = pd.read_csv(StringIO(csv_string))
            return df
        except Exception as e:
            print(f"Error reading from S3: {e}")
            return None


def load_config(config_path):
    try:
        with open(config_path, 'r') as f:
            return json.load(f)
    except FileNotFoundError:
        logging.error(f"Configuration file not found: {file_path}")
        return None

def main():
    """
    Main function to process data by processing TIFF and converting to CSV.
    """
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
    
    config_path = os.cwd() + '/config.json'  #test
    config = load_config(config_path)
    if config is None:
        logging.error("No configuration to proceed. Exiting.")
        return
    
    data_processor = ASODataProcessing()
    output_res = 0.001

    data_processor.convert_tiff_to_csv(output_res, config)
    data_processor.process_folder(config)

if __name__ == "__main__":
    main()

In [None]:
def load_aso_snotel_geometry(aso_swe_file, folder_path):
    
    aso_file = pd.read_csv(os.path.join(folder_path, aso_swe_file))
    aso_file.set_index('cell_id', inplace=True)
    aso_geometry = [Point(xy) for xy in zip(aso_file['x'], aso_file['y'])]
    aso_gdf = gpd.GeoDataFrame(aso_file, geometry=aso_geometry)
    
    return aso_gdf

def haversine_vectorized(lat1, lon1, lat2, lon2):
    
    lon1 = np.radians(lon1)
    lon2 = np.radians(lon2)
    lat1 = np.radians(lat1)
    lat2 = np.radians(lat2)

    # Haversine formula
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = np.sin(dlat/2)**2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon/2)**2
    c = 2 * np.arcsin(np.sqrt(a))
    
    r = 6371.0
    # Distance calculation
    distances = r * c

    return distances

def calculate_nearest_snotel(aso_gdf, snotel_gdf, n=6, distance_cache=None):

    if distance_cache is None:
        distance_cache = {}

    nearest_snotel = {}
    for idx, aso_row in aso_gdf.iterrows():
        cell_id = idx

        if cell_id in distance_cache:
            nearest_snotel[idx] = distance_cache[cell_id]
        else:
            distances = haversine_vectorized(
                aso_row.geometry.y, aso_row.geometry.x,
                snotel_gdf.geometry.y.values, snotel_gdf.geometry.x.values)

            nearest_snotel[idx] = list(snotel_gdf['station_id'].iloc[distances.argsort()[:n]])
            distance_cache[cell_id] = nearest_snotel[idx]

    return nearest_snotel, distance_cache

def calculate_distances_for_cell(aso_row, snotel_gdf, n=6):
   
    distances = haversine_vectorized(
        aso_row.geometry.y, aso_row.geometry.x,
        snotel_gdf.geometry.y.values, snotel_gdf.geometry.x.values)
    
    nearest_sites = list(snotel_gdf['station_id'].iloc[distances.argsort()[:n]])
    
    return nearest_sites

def calculate_nearest_snotel_parallel(aso_gdf, snotel_gdf, n = 6, distance_cache = None):
    
    if distance_cache is None:
        distance_cache = {}

    nearest_snotel = {}
    with ProcessPoolExecutor(max_workers = 16) as executor:
        futures = []
        
        for idx, aso_row in aso_gdf.iterrows():
            if idx not in distance_cache:
                # Submit the task for parallel execution
                futures.append(executor.submit(calculate_distances_for_cell, aso_row, snotel_gdf, n))
            else:
                nearest_snotel[idx] = distance_cache[idx]

        # Retrieve results as they are completed
        for future in tqdm(futures):
            result = future.result()
  
            cell_id = result[0]  
            nearest_snotel[cell_id] = result[1]
            distance_cache[cell_id] = result[1]

    return nearest_snotel, distance_cache

def fetch_snotel_sites_for_cellids(config):
    """
	Might take longer to run depends on the size of the dataset. 
    """
    
    metadata_df = check_and_fetch_meta(config)
    #metadata_df['geometry'] = metadata_df['geometry'].apply(wkt.loads)
    
    def create_polygon(row):
        return Polygon([(row['BL_Coord_Long'], row['BL_Coord_Lat']),
                        (row['BR_Coord_Long'], row['BR_Coord_Lat']),
                        (row['UR_Coord_Long'], row['UR_Coord_Lat']),
                        (row['UL_Coord_Long'], row['UL_Coord_Lat'])])
        
    metadata_df = metadata_df.drop(columns=['Unnamed: 0'], axis=1)
    metadata_df['geometry'] = metadata_df.apply(create_polygon, axis=1)
    metadata = gpd.GeoDataFrame(metadata_df, geometry='geometry')

    snotel_data = pd.read_csv(config['snotel_obs_path'])

    date_columns = snotel_data.columns[1:]
    new_column_names = {col: pd.to_datetime(col, format='%Y-%m-%d').strftime('%Y%m%d') for col in date_columns}
    snotel_data_f = snotel_data.rename(columns=new_column_names)

    snotel_file = pd.read_csv(config['snotel_ground_obs'])
    snotel_geometry = [Point(xy) for xy in zip(snotel_file['longitude'], snotel_file['latitude'])]
    snotel_gdf = gpd.GeoDataFrame(snotel_file, geometry=snotel_geometry)

    final_df = pd.DataFrame()

    folder_path = os.path.join(config['local_dir'], 'Processed_SWE')
    for aso_swe_file in os.listdir(folder_path):

        if os.path.isdir(os.path.join(folder_path, aso_swe_file)):
            continue

        timestamp = aso_swe_file.split('_')[-1].split('.')[0]
        print(f"Processing file with timestamp: {timestamp}")

        aso_gdf = load_aso_snotel_geometry(aso_swe_file, aso_swe_files_folder_path)
        aso_swe_data = pd.read_csv(os.path.join(aso_swe_files_folder_path, aso_swe_file))

        # Calculating nearest SNOTEL sites
        nearest_snotel, distance_cache = calculate_nearest_snotel(aso_gdf, snotel_gdf, n=6)
        print(f"calculated nearest snotel for file with timestamp {timestamp}")

        transposed_data = {}

        if timestamp in new_column_names.values():
            for idx, aso_row in aso_gdf.iterrows():    
                cell_id = idx
                station_ids = nearest_snotel[cell_id]
                selected_snotel_data = snotel_data_f[['station_id', timestamp]].loc[snotel_data_f['station_id'].isin(station_ids)]
                station_mapping = {old_id: f"nearest site {i+1}" for i, old_id in enumerate(station_ids)}
                
                # Rename the station IDs in the selected SNOTEL data
                selected_snotel_data['station_id'] = selected_snotel_data['station_id'].map(station_mapping)

                # Transpose and set the index correctly
                transposed_data[cell_id] = selected_snotel_data.set_index('station_id').T

            transposed_df = pd.concat(transposed_data, axis=0)
            
            # Reset index and rename columns
            transposed_df = transposed_df.reset_index()
            transposed_df.rename(columns={'level_0': 'cell_id', 'level_1': 'Date'}, inplace = True)
            transposed_df['Date'] = pd.to_datetime(transposed_df['Date'])
        
            aso_swe_data['Date'] = pd.to_datetime(timestamp)
            aso_swe_data = aso_swe_data[['cell_id', 'Date', 'swe']]
            merged_df = pd.merge(aso_swe_data, transposed_df, how='left', on=['cell_id', 'Date'])
        
            final_df = pd.concat([final_df, merged_df], ignore_index=True)
        
        else:
            aso_swe_data['Date'] = pd.to_datetime(timestamp)
            aso_swe_data = aso_swe_data[['cell_id', 'Date', 'swe']]
    
            # No need to merge in this case, directly concatenate
            final_df = pd.concat([final_df, aso_swe_data], ignore_index=True)

    req_cols = ['cell_id', 'lat', 'lon', 'BR_Coord_Long', 'BR_Coord_Lat', 'UR_Coord_Long', 'UR_Coord_Lat',
                'UL_Coord_Long', 'UL_Coord_Lat', 'BL_Coord_Long', 'BL_Coord_Lat', 'geometry']
    Result = final_df.merge(metadata[req_cols], how='left', on='cell_id')

    Result.rename(columns={'swe': 'ASO_SWE_in'}, inplace=True)
    Result = Result[['cell_id', 'Date', 'ASO_SWE_in', 'lat', 'lon', 'nearest site 1', 'nearest site 2',
                     'nearest site 3', 'nearest site 4', 'nearest site 5', 'nearest site 6',
                     'BR_Coord_Long', 'BR_Coord_Lat', 'UR_Coord_Long', 'UR_Coord_Lat',
                     'UL_Coord_Long', 'UL_Coord_Lat', 'BL_Coord_Long', 'BL_Coord_Lat']]

    output_filename = os.path.join(config['local_dir'], "Provided_Data", "grid_cells_meta.csv")
    Result.to_csv(output_filename, index=False)
    print("Processed and saved data")
    
def main():
    """
    Main function to process data by processing TIFF and converting to CSV.
    """
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
    
    config_path = os.path.join(os.getcwd(), 'config.json')
    config = load_config(config_path)
    if config is None:
        logging.error("No configuration to proceed. Exiting.")
        return

    fetch_snotel_sites_for_cellids(config)
    
if __name__ == "__main__":
    main()

In [None]:
def upload_file_to_s3(input_file, s3_url, access_key=None, secret_key=None):

    path = s3_url.split('/')
    bucket_name = path[2]
    object_name = '/'.join(path[3:]) 

    session = boto3.Session(
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
    )
    
    s3_client = session.client('s3')

    try:
        s3_client.upload_file(input_file, bucket_name, object_name)
        print(f"File {input_file} uploaded to {bucket_name}/{object_name}")
    except FileNotFoundError:
        print(f"Error: The file {input_file} was not found")
    except NoCredentialsError:
        print("Error: Credentials not available")
    except Exception as e:
        print(f"Unexpected error: {e}")

access_key = config['AWS_access_key']
secret_key = config['AWS_secret_key']

input_file = r"/home/vgindi/Provided_Data/grid_cells_meta.csv"
s3_url = "s3://national-snow-model/NSMv2.0/data/TrainingDFs/grid_cells_meta.csv"
upload_file_to_s3(input_file, s3_url, access_key, secret_key)

In [None]:
"""
A Simple implementation of parallel processing using concurrency it takes so long to execute,
Explore terrain_daskconcurrency and terrain-processing_cluster python for more optimized implementations.
"""

def process_single_location(args):
    lat, lon, regions, tiles = args

    if (lat, lon) in elevation_cache:
        elev, slop, asp = elevation_cache[(lat, lon)]
        return elev, slop, asp

    tile_id = 'Copernicus_DSM_COG_30_N' + str(math.floor(lon)) + '_00_W' + str(math.ceil(abs(lat))) + '_00_DEM'
    index_id = regions.loc[tile_id]['sliceID']

    signed_asset = planetary_computer.sign(tiles[index_id].assets["data"])

    elevation = rxr.open_rasterio(signed_asset.href)
    
    slope = elevation.copy()
    aspect = elevation.copy()

    transformer = Transformer.from_crs("EPSG:4326", elevation.rio.crs, always_xy=True)
    xx, yy = transformer.transform(lon, lat)

    tilearray = np.around(elevation.values[0]).astype(int)

    geo = (math.floor(float(lon)), 90, 0.0, math.ceil(float(lat)), 0.0, -90)

    no_data_value = -9999
    driver = gdal.GetDriverByName('MEM')
    temp_ds = driver.Create('', tilearray.shape[1], tilearray.shape[0], 1, gdalconst.GDT_Float32)

    temp_ds.GetRasterBand(1).WriteArray(tilearray)
    temp_ds.GetRasterBand(1).SetNoDataValue(no_data_value)
    temp_ds.SetProjection('EPSG:4326')
    temp_ds.SetGeoTransform(geo)

    tilearray_np = temp_ds.GetRasterBand(1).ReadAsArray()
    slope_arr, aspect_arr = np.gradient(tilearray_np)
    aspect_arr = np.rad2deg(np.arctan2(aspect_arr[0], aspect_arr[1]))
    
    slope.values[0] = slope_arr
    aspect.values[0] = aspect_arr

    elev = round(elevation.sel(x=xx, y=yy, method="nearest").values[0])
    slop = round(slope.sel(x=xx, y=yy, method="nearest").values[0])
    asp = round(aspect.sel(x=xx, y=yy, method="nearest").values[0])

    elevation_cache[(lat, lon)] = (elev, slop, asp)  
    return elev, slop, asp

def extract_terrain_data_threaded(metadata_df, bounding_box, max_workers=10):
    global elevation_cache 

    elevation_cache = {} 
    min_x, min_y, max_x, max_y = *bounding_box[0], *bounding_box[1]
    
    client = Client.open(
            "https://planetarycomputer.microsoft.com/api/stac/v1",
            ignore_conformance=True,
        )

    search = client.search(
                    collections=["cop-dem-glo-90"],
                    intersects = {
                            "type": "Polygon",
                            "coordinates": [[
                            [min_x, min_y],
                            [max_x, min_y],
                            [max_x, max_y],
                            [min_x, max_y],
                            [min_x, min_y]  
                        ]]})

    tiles = list(search.items())

    regions = []

    print("Retrieving Copernicus 90m DEM tiles")
    for i in tqdm(range(0, len(tiles))):
        row = [i, tiles[i].id]
        regions.append(row)
    regions = pd.DataFrame(columns = ['sliceID', 'tileID'], data = regions)
    regions = regions.set_index(regions['tileID'])
    del regions['tileID']

    print("Interpolating Grid Cell Spatial Features")

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(process_single_location, (metadata_df.iloc[i]['cen_lat'], metadata_df.iloc[i]['cen_lon'], regions, tiles))
                   for i in tqdm(range(len(metadata_df)))]
        
        results = []
        for future in tqdm(as_completed(futures), total=len(futures)):
            results.append(future.result())
    
    metadata_df['Elevation_m'], metadata_df['Slope_Deg'], metadata_df['Aspect_L'] = zip(*results)

metadata_df = pd.read_csv(r"/home/vgindi/Provided_Data/grid_cells_meta.csv")
metadata_df= metadata_df.head(20)
bounding_box = ((-120.3763448720203, 36.29256774541929), (-118.292253412863, 38.994985247736324))    
    
extract_terrain_data_threaded(metadata_df, bounding_box)

# Display the results
metadata_df.head(10)

In [None]:

#This code block crops the global coverage VIIRS data to south sierras subregion. 

from rasterio.errors import WindowError 

def crop_sierras(input_file_path, output_file_path, shapes):
    try:
        with rasterio.open(input_file_path) as src:
            out_image, out_transform = rasterio.mask.mask(src, shapes, crop=True)
            out_meta = src.meta
            out_meta.update({
                "driver": "GTiff",
                "height": out_image.shape[1],
                "width": out_image.shape[2],
                "transform": out_transform
            })
            with rasterio.open(output_file_path, "w", **out_meta) as dest:
                dest.write(out_image)
    except (WindowError, ValueError) as e:
        print(f"Error processing {input_file_path}: {e}")

def download_viirs_sca(config):
    
    # Load shapes from the shapefile
    with fiona.open(config['low_sierras_shapefile'], 'r') as shapefile:
        shapes = [feature["geometry"] for feature in shapefile]

    input_dir = os.path.join(config['local_dir'], "VIIRS_Data")

    for year_folder in os.listdir(input_dir):
        year_folder_path = os.path.join(input_dir, year_folder)
        
        if os.path.isdir(year_folder_path):   
            year = re.search(r'\d{4}', year_folder).group()
            output_year_folder = os.path.join(config['local_dir'], "VIIRS_Sierras", year)
            os.makedirs(output_year_folder, exist_ok=True)
        
            for file_name in os.listdir(year_folder_path):
                if file_name.endswith('.tif'):
                    if 'h08v05' in file_name:   
                        parts = file_name.split('_')
                        output_file_name = '_'.join(parts[:3]) + '.tif'
                        output_file_path = os.path.join(output_year_folder, output_file_name)
                        input_file_path = os.path.join(year_folder_path, file_name)
                        if not os.path.exists(output_file_path):
                            crop_sierras(input_file_path, output_file_path, shapes)
                            print(f"Processed and saved {output_file_path}")
                        else:
                            print(f"Skipping {output_file_name} as it already exists.")

In [None]:

#This code cell transforms the raw VIIRS tiff files to 100m resolution and saves each file in .csv format

def processing_VIIRS(input_file, output_res, config):
    try:
        output_folder_tiff = os.path.join(config['local_dir'], "Processed_VIIRS", os.path.basename(os.path.dirname(input_file)))
        os.makedirs(output_folder_tiff, exist_ok=True)
        output_file = os.path.join(output_folder_tiff, os.path.basename(input_file))

        # Reproject and resample
        ds = gdal.Open(input_file)
        if ds is None:
            print(f"Failed to open '{input_file}'. Make sure the file is a valid GeoTIFF file.")
            return None
        
        gdal.Warp(output_file, ds, dstSRS="EPSG:4326", xRes=output_res, yRes=-output_res, resampleAlg="bilinear")

        # Read the processed TIFF file using rasterio
        rds = rxr.open_rasterio(output_file)
        rds = rds.squeeze().drop("spatial_ref").drop("band")
        rds.name = "data"
        df = rds.to_dataframe().reset_index()
        return df
        
    except Exception as e:
        print(f"An error occurred: {str(e)}")
        return None

def process_and_convert_viirs(output_res, config):

    input_dir = os.path.join(config['loacl_dir'], "VIIRS_Sierras")
    for year in os.listdir(input_dir):
        year_dir = os.path.join(input_dir, year)
        
        if os.path.isdir(year_dir):
            for file_name in os.listdir(year_dir):
                if file_name.endswith('.tif'):
                    input_file_path = os.path.join(year_dir, file_name)
                    df = processing_VIIRS(input_file_path, output_res, config)
                    
                    if df is not None:
                        csv_folder = os.path.join(config['local_dir'], "Processed_VIIRS", "VIIRS_csv")
                        os.makedirs(csv_folder, exist_ok=True)
                        csv_file_path = os.path.join(csv_folder, file_name.replace('.tif', '.csv'))
 
                        df.to_csv(csv_file_path, index=False)
                        print(f"Processed and saved {csv_file_path}")


def main():
    """
    Main function to process data by processing TIFF and converting to CSV.
    """
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
    
    config_path = os.path.join(os.getcwd(), 'config.json') 
    config = load_config(config_path)
    if config is None:
        logging.error("No configuration to proceed. Exiting.")
        return
        
	download_viirs_sca(config)
    output_res = 0.001 # approximate value in meters, actual value is 0.0009 
    process_and_convert_viirs(output_res, config)

if __name__ == "__main__":
    main()

In [None]:
"""
This code cell fetches the cell id using grid_cells_meta_idx metadata for each lat/lon pair for VIIRS csv file
"""
def create_polygon(row):
    return Polygon([(row['BL_Coord_Long'], row['BL_Coord_Lat']),
                    (row['BR_Coord_Long'], row['BR_Coord_Lat']),
                    (row['UR_Coord_Long'], row['UR_Coord_Lat']),
                    (row['UL_Coord_Long'], row['UL_Coord_Lat'])])
    
def process_folder(config):

    pred_obs_metadata_df = check_and_fetch_meta(config)

    pred_obs_metadata_df = pred_obs_metadata_df.drop(columns=['Unnamed: 0'], axis=1)
    pred_obs_metadata_df['geometry'] = pred_obs_metadata_df.apply(create_polygon, axis=1)

    metadata = gpd.GeoDataFrame(pred_obs_metadata_df, geometry='geometry')

    metadata = metadata.drop(columns=['BL_Coord_Long', 'BL_Coord_Lat', 
                                    'BR_Coord_Long', 'BR_Coord_Lat', 
                                    'UR_Coord_Long', 'UR_Coord_Lat', 
                                    'UL_Coord_Long', 'UL_Coord_Lat', 'index_id', 'lat', 'lon'], axis=1)
	
    input_folder = os.path.join(config['local_dir'], "Processed_VIIRS", "VIIRS_csv")
    csv_files = [f for f in os.listdir(input_folder) if f.endswith('.csv')]
	
    output_folder = os.path.join(config['local_dir'], "VIIRS_cellids")

    if not os.path.exists(output_folder):
        os.makedirs(output_folder)

    for csv_file in csv_files:
        input_path = os.path.join(input_folder, csv_file)          
        output_path = os.path.join(output_folder, csv_file)

        if os.path.exists(output_path):
            print(f"CSV file {csv_file} already exists in the output folder.")
            continue

        viirs_sca_df = pd.read_csv(input_path)

        geometry = [Point(xy) for xy in zip(viirs_sca_df['x'], viirs_sca_df['y'])]
        viirs_sca_geo = gpd.GeoDataFrame(viirs_sca_df, geometry=geometry)
        result = gpd.sjoin(viirs_sca_geo, metadata, how='left', predicate='within', op = 'intersects')

        # Select specific columns for the final DataFrame
        Final_df = result[['y', 'x', 'data', 'cell_id']]
        Final_df.rename(columns={'data': 'VIIRS_SCA'}, inplace=True)

        if Final_df['cell_id'].isnull().values.any():
            Final_df = Final_df.dropna(subset=['cell_id'])

        Final_df.to_csv(output_path, index=False)
        print(f"Processed {csv_file}")
        
def main():
    """
    Main function to process data by processing TIFF and converting to CSV.
    """
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
    
    config_path = os.path.join(os.getcwd(), 'config.json') 
    config = load_config(config_path)
    if config is None:
        logging.error("No configuration to proceed. Exiting.")
        return
    process_folder(config)

if __name__ == "__main__":
    main()