# Data Processing script for the NSM/SWEML v2.0
This notebook uses Python modules to retrieve NASA ASO observations, locate the nearest SNOTEL sites, connect SNOTEL observations with ASO observations, and add geospatial features to the ML training/testing/hindcast dataframes.

In [None]:
import math
import os
import re
from concurrent.futures import ThreadPoolExecutor

import numpy as np
import pandas as pd

from ASOget import ASODownload, ASODataProcessing

# Inputs for fetching ASO data for a region
short_name = 'ASO_50M_SWE'
version = '1'
time_start = '2013-04-02T00:00:00Z'
time_end = '2019-07-19T23:59:59Z'
region = 'S_Sierras'
output_res = 100  # desired spatial resolution in meters (m)
directory = "SWE_Data"
# Folder holding this region's downloaded .tif files. Derived from `directory` so the two paths
# cannot drift apart (cmr_download(directory, region) presumably writes here — TODO confirm).
folder_name = f"{directory}/{region}"

# Get ASO data
data_tool = ASODownload(short_name, version)
# S Sierras is region #2; covering CONUS may require additional region folders.
selected_region = data_tool.select_region(region)

print(f"Fetching file URLs in progress for {selected_region} from {time_start} to {time_end}")
url_list = data_tool.cmr_search(time_start, time_end, data_tool.bounding_box)
data_tool.cmr_download(directory, region)

print('Converting .tif to csv')
data_processor = ASODataProcessing()
data_processor.convert_tiff_to_csv(folder_name, output_res, region)  # ~3 min; candidate for multithreading

# Apply polygon geometries (conversion to a GeoDataFrame); this step can take a while.
input_folder = f"ASO/{output_res}M_SWE_csv/{region}"
metadata_file = "grid_cells_meta.csv"
output_folder = f"Processed_SWE/{region}"
data_processor.process_folder(input_folder, metadata_file, output_folder)  # ~20 min; candidate for multithreading

# Generate the ML dataframe using the nearest in situ monitoring sites

In [None]:
import GeoDF

# GeoDF builds the dataframe used for ML model development: it connects in situ observations
# to gridded locations.
region = 'S_Sierras'  # TODO: duplicated from the ASO download cell; define once in a config cell
output_res = 100  # meters

# Number of sites to process while developing; set to None to process every site.
N_SAMPLE_SITES = 100

# Load SNOTEL site location metadata (the nearest-site lookup uses a haversine distance).
GeoDF.fetch_snotel_sites_for_cellids(region)  # uses known up-to-date sites; could this be threaded?

# Geophysical attributes for each site. TODO: pass output_res once GeoDF supports it.
gdf = GeoDF.GeoSpatial(region)
if N_SAMPLE_SITES is not None:
    gdf = gdf.head(N_SAMPLE_SITES)
# Use the lat/long metadata of the sites to determine slope, aspect, and elevation.
metadf = GeoDF.extract_terrain_data_threaded(gdf, region)




In [1]:
import Obs_to_DF
# Region and grid resolution (meters) of the processed ASO data to connect.
region = "S_Sierras"
output_res = 100

# Connect the nearest SNOTEL observations with the ASO data; per the log it prints, this also
# attaches the grid cells' geospatial features.
finaldf = Obs_to_DF.Nearest_Snotel_2_obs_MultiProcess(region, output_res) 

Connecting site observations with nearest monitoring network obs
Loading observations from 2013-2019
Loading goeospatial meta data for grids in S_Sierras
Loading 100M resolution grids for S_Sierras region
Processing datetime component of SNOTEL observation dataframe
Loading all available processed ASO observations for the S_Sierras at 100M resolution
Connecting 2 timesteps of observations for S_Sierras


100%|██████████| 2/2 [00:00<00:00,  3.14it/s]


Job complete for connecting SNOTEL obs to sites/dates, processing into dataframe


100%|██████████| 120433/120433 [03:58<00:00, 504.74it/s]
100%|██████████| 120433/120433 [03:58<00:00, 504.46it/s]
2it [04:30, 135.44s/it]


Connecting dataframe with geospatial features...


In [2]:
# Preview the connected observation dataframe produced above.
finaldf

Unnamed: 0,cell_id,Date,cen_lat,cen_lon,geometry,Elevation_m,Slope_Deg,Aspect_Deg,swe,nearest_site_1,nearest_site_2,nearest_site_3,nearest_site_4,nearest_site_5,nearest_site_6
0,11N_cell_-119.59073383567106_38.18624284828164,2013-04-03,38.185854,-119.590255,POINT (-119.5902546710551 38.18585423359679),3167.0,0.0,180.0,0.972643,44.52,49.00,0.0,0.1,18.90,18.80000
1,11N_cell_-119.58959364631137_38.186209698720205,2013-04-03,38.185854,-119.589255,POINT (-119.58925467105512 38.18585423359679),3168.0,2.0,18.0,0.536205,44.52,49.00,0.0,0.1,18.90,18.80000
2,11N_cell_-119.59309813700962_38.184509449472536,2013-04-03,38.184854,-119.593255,POINT (-119.5932546710551 38.1848542335968),3156.0,5.0,342.0,0.719424,44.52,49.00,0.0,0.1,18.90,18.80000
3,11N_cell_-119.59195797185825_38.18447632413384,2013-04-03,38.184854,-119.592255,POINT (-119.59225467105512 38.1848542335968),3158.0,5.0,34.0,0.618731,44.52,49.00,0.0,0.1,18.90,18.80000
4,11N_cell_-119.59081780857791_38.184443187748315,2013-04-03,38.184854,-119.591255,POINT (-119.59125467105513 38.1848542335968),3165.0,8.0,32.0,0.471944,44.52,49.00,0.0,0.1,18.90,18.80000
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
240861,11N_cell_-119.27205242391986_37.7395618848736,2013-04-29,,,,,,,0.000000,13.98,3.47,0.0,8.5,5.72,5.82974
240862,11N_cell_-119.27091951465975_37.73952599896746,2013-04-29,,,,,,,0.101353,13.98,3.47,0.0,8.5,5.72,5.82974
240863,11N_cell_-119.26978660741051_37.739490102201515,2013-04-29,,,,,,,1.565556,13.98,3.47,0.0,8.5,5.72,5.82974
240864,11N_cell_-119.26978660741051_37.739490102201515,2013-04-29,,,,,,,1.307413,13.98,3.47,0.0,8.5,5.72,5.82974


In [None]:
from tqdm import tqdm
import os
HOME = os.path.expanduser('~')

# ASO observations: processed SWE files for the selected region.
aso_swe_files_folder_path = f"{HOME}/SWEMLv2.0/data/Processed_SWE/{region}"

# os.listdir returns entries in arbitrary order; sort so the listing is reproducible.
aso_swe_files = sorted(os.listdir(aso_swe_files_folder_path))

In [None]:
# Preview the processed ASO SWE file names.
aso_swe_files

In [None]:
# Get geospatial metadata for the grid cells.
print(f"Loading goeospatial meta data for grids in {region}")
geodf_path = f"{HOME}/SWEMLv2.0/data/TrainingDFs/{region}"
aso_metadata_path = f"{geodf_path}/{region}_metadata.parquet"

# The original loaded this .parquet path with pd.read_csv. Check the parquet magic bytes so the
# cell works whether the file is real parquet or CSV text saved under a .parquet name.
with open(aso_metadata_path, 'rb') as fh:
    is_parquet = fh.read(4) == b'PAR1'
aso_gdf = pd.read_parquet(aso_metadata_path) if is_parquet else pd.read_csv(aso_metadata_path)
display(aso_gdf.head())

# NOTE(review): finaldf already has cen_lat/cen_lon/geometry/terrain columns; if aso_gdf shares
# any of those names, merge() suffixes them with _x/_y and the selection below raises KeyError.
df = pd.merge(finaldf, aso_gdf, on='cell_id', how='left')
# finaldf names its nearest-site columns nearest_site_1 ... nearest_site_6 (see its output);
# the previous list mixed 'nearest_site 1' / 'nearest site N' spellings that do not exist.
cols = [
    'cell_id', 'Date', 'cen_lat', 'cen_lon', 'geometry', 'Elevation_m', 'Slope_Deg',
    'Aspect_Deg', 'swe',
    *[f'nearest_site_{i}' for i in range(1, 7)],
]
df = df[cols]
df.head()


In [None]:
# Inspect the merged dataframe's columns.
df.columns


In [None]:

# Repeat the left join on cell_id without any column selection, to inspect every joined column.
df = finaldf.merge(aso_gdf, how='left', on='cell_id')
df.head()

In [None]:
# Merge the connected observations with grid-cell metadata (lat/lon and corner coordinates).
req_cols = ['cell_id', 'lat', 'lon', 'BR_Coord_Long', 'BR_Coord_Lat', 'UR_Coord_Long', 'UR_Coord_Lat',
            'UL_Coord_Long', 'UL_Coord_Lat', 'BL_Coord_Long', 'BL_Coord_Lat', 'geometry']
# NOTE(review): `metadata` is not defined in any visible cell; it must be a (Geo)DataFrame that
# provides req_cols — TODO load it explicitly here. The connected dataframe is `finaldf`
# (the previous code referenced an undefined `final_df`).
Result = finaldf.merge(metadata[req_cols], how='left', on='cell_id')

# Column renaming and ordering. finaldf names its nearest-site columns nearest_site_1..6.
nearest_site_cols = [f'nearest_site_{i}' for i in range(1, 7)]
corner_cols = ['BR_Coord_Long', 'BR_Coord_Lat', 'UR_Coord_Long', 'UR_Coord_Lat',
               'UL_Coord_Long', 'UL_Coord_Lat', 'BL_Coord_Long', 'BL_Coord_Lat']
Result = Result.rename(columns={'swe': 'ASO_SWE_in'})
Result = Result[['cell_id', 'Date', 'ASO_SWE_in', 'lat', 'lon', *nearest_site_cols, *corner_cols]]

# Save the merged data. The file name says parquet, so write parquet (the previous code wrote
# CSV text under a .parquet name). NOTE(review): other cells use {HOME}/SWEMLv2.0/data/TrainingDFs;
# confirm this SWEML/data/NSMv2.0 location is intended.
output_filename = f"{HOME}/SWEML/data/NSMv2.0/data/TrainingDFs/Merged_aso_snotel_data.parquet"
os.makedirs(os.path.dirname(output_filename), exist_ok=True)
Result.to_parquet(output_filename, index=False)
display(Result.head(10))
print("Processed and saved data")

In [None]:
region = 'S_Sierras'
aso_meta_path = f"{HOME}/SWEMLv2.0/data/TrainingDFs/{region}/ASO_meta.parquet"
# The original read this .parquet path with pd.read_csv. Check the parquet magic bytes so the
# cell works whether the file is real parquet or CSV text saved under a .parquet name.
with open(aso_meta_path, 'rb') as fh:
    is_parquet = fh.read(4) == b'PAR1'
ASO_meta_loc_DF = pd.read_parquet(aso_meta_path) if is_parquet else pd.read_csv(aso_meta_path)

In [None]:
#Connect nearest snotel with ASO data, this should be last for now, need to add geophysical characteristics to the site first, then this...
# NOTE(review): this overwrites the `finaldf` produced by Obs_to_DF.Nearest_Snotel_2_obs_MultiProcess
# earlier in the notebook, and relies on `GeoDF` having been imported by an earlier cell.
finaldf = GeoDF.Nearest_Snotel_2_obs(region, output_res, dropna = True) 

In [None]:
"""
A simple thread-pool implementation of terrain extraction. It is slow because every uncached grid
cell loads its whole DEM tile. Explore terrain_daskconcurrency and terrain-processing_cluster python
for more optimized implementations.
"""

# NOTE(review): besides math/numpy/pandas/ThreadPoolExecutor/tqdm, this cell needs rioxarray (rxr),
# planetary_computer, pyproj.Transformer and pystac_client.Client, which no visible cell imports.

# Approximate meters per degree of latitude (and of longitude at the equator); used to convert a
# geographic DEM's pixel size from degrees to meters for the slope computation.
_METERS_PER_DEGREE = 111_320.0


def _copernicus_tile_id(lat, lon):
    """Return the Copernicus GLO-90 STAC item id of the 1x1 degree tile containing (lat, lon).

    Tiles are named after their south-west corner with a 2-digit latitude and a 3-digit
    longitude, e.g. (38.2, -119.6) -> N38 / W120. Like the previous code, only the northern
    and western hemispheres are handled.
    """
    return f"Copernicus_DSM_COG_30_N{math.floor(lat):02d}_00_W{math.ceil(abs(lon)):03d}_00_DEM"


def _slope_aspect_degrees(dem, dx_m, dy_m):
    """Return (slope, aspect) arrays in degrees for a 2-D elevation grid.

    Args:
        dem: 2-D elevation array (rows x columns), in meters.
        dx_m: signed change in easting per column, in meters.
        dy_m: signed change in northing per row, in meters (negative for a north-up raster).

    Slope is the angle from horizontal; aspect is the compass bearing (clockwise from north,
    in [0, 360)) of the steepest downhill direction.
    """
    dz_drow, dz_dcol = np.gradient(dem, abs(dy_m), abs(dx_m))
    dz_deast = dz_dcol if dx_m > 0 else -dz_dcol
    dz_dnorth = dz_drow if dy_m > 0 else -dz_drow
    slope = np.degrees(np.arctan(np.hypot(dz_deast, dz_dnorth)))
    # Downhill direction is minus the gradient; bearing = atan2(east, north).
    aspect = np.degrees(np.arctan2(-dz_deast, -dz_dnorth)) % 360.0
    return slope, aspect


def process_single_location(args):
    """Return rounded (elevation_m, slope_deg, aspect_deg) at one grid-cell centre.

    Args:
        args: ``(lat, lon, regions, tiles)`` tuple (a single argument so it can be handed to
            ``executor.submit``). ``regions`` is a DataFrame indexed by tile id whose
            ``sliceID`` column indexes into ``tiles``, the STAC items from the DEM search.

    Results are memoised in the module-level ``elevation_cache`` dict, which
    ``extract_terrain_data_threaded`` (re)creates before dispatching work.

    Raises:
        KeyError: if the tile containing (lat, lon) was not returned by the STAC search.
    """
    lat, lon, regions, tiles = args

    cached = elevation_cache.get((lat, lon))
    if cached is not None:
        return cached

    tile_id = _copernicus_tile_id(lat, lon)
    index_id = regions.loc[tile_id]['sliceID']

    signed_asset = planetary_computer.sign(tiles[index_id].assets["data"])
    elevation = rxr.open_rasterio(signed_asset.href)

    # Convert the query point into the raster's CRS.
    transformer = Transformer.from_crs("EPSG:4326", elevation.rio.crs, always_xy=True)
    xx, yy = transformer.transform(lon, lat)

    dem = elevation.values[0].astype(np.float64)
    res_x, res_y = elevation.rio.resolution()
    if elevation.rio.crs.is_geographic:
        # Degree-sized pixels -> meters (longitude spacing shrinks with cos(latitude)).
        res_x *= _METERS_PER_DEGREE * math.cos(math.radians(lat))
        res_y *= _METERS_PER_DEGREE
    slope_arr, aspect_arr = _slope_aspect_degrees(dem, res_x, res_y)

    # Nearest-pixel lookup on the x/y coordinates (same idea as .sel(..., method="nearest")).
    col = int(np.abs(elevation.x.values - xx).argmin())
    row = int(np.abs(elevation.y.values - yy).argmin())

    result = (round(dem[row, col]), round(slope_arr[row, col]), round(aspect_arr[row, col]))
    elevation_cache[(lat, lon)] = result
    return result


def extract_terrain_data_threaded(metadata_df, bounding_box, max_workers=10):
    """Add Elevation_m, Slope_Deg and Aspect_Deg columns to ``metadata_df`` in place.

    Args:
        metadata_df: DataFrame with ``cen_lat`` / ``cen_lon`` columns, one row per grid cell.
        bounding_box: ``((min_lon, min_lat), (max_lon, max_lat))`` used to query the
            ``cop-dem-glo-90`` collection on the Planetary Computer STAC API.
        max_workers: number of worker threads.

    Returns:
        ``metadata_df`` itself (also modified in place), for convenience.
    """
    global elevation_cache
    elevation_cache = {}

    (min_x, min_y), (max_x, max_y) = bounding_box

    client = Client.open(
        "https://planetarycomputer.microsoft.com/api/stac/v1",
        ignore_conformance=True,
    )
    search = client.search(
        collections=["cop-dem-glo-90"],
        intersects={
            "type": "Polygon",
            "coordinates": [[
                [min_x, min_y],
                [max_x, min_y],
                [max_x, max_y],
                [min_x, max_y],
                [min_x, min_y],
            ]],
        },
    )
    tiles = list(search.items())

    print("Retrieving Copernicus 90m DEM tiles")
    # Tile id -> position in `tiles`.
    regions = pd.DataFrame(
        {'sliceID': range(len(tiles))},
        index=pd.Index([tile.id for tile in tiles], name='tileID'),
    )

    print("Interpolating Grid Cell Spatial Features")
    locations = list(zip(metadata_df['cen_lat'], metadata_df['cen_lon']))
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(process_single_location, (lat, lon, regions, tiles))
                   for lat, lon in locations]
        # Collect in submission order (NOT as_completed order) so each result lines up with its row.
        results = [future.result() for future in tqdm(futures, total=len(futures))]

    if results:
        elevations, slopes, aspects = zip(*results)
    else:
        elevations = slopes = aspects = ()
    metadata_df['Elevation_m'] = list(elevations)
    metadata_df['Slope_Deg'] = list(slopes)
    metadata_df['Aspect_Deg'] = list(aspects)
    return metadata_df

In [None]:
# Run the terrain extraction on the first 20 rows of a local merged ASO / nearest-site file.
# NOTE(review): absolute, user-specific input path — presumably should be made configurable.
metadata_df = pd.read_csv(r"/home/vgindi/Provided_Data/Merged_aso_nearest_sites1.csv")
metadata_df= metadata_df.head(20)
# ((min_lon, min_lat), (max_lon, max_lat)) used for the DEM tile search; presumably the
# S_Sierras extent — TODO confirm.
bounding_box = ((-120.3763448720203, 36.29256774541929), (-118.292253412863, 38.994985247736324))    
    
# Adds the terrain columns to metadata_df in place.
extract_terrain_data_threaded(metadata_df, bounding_box)

# Display the results
metadata_df.head(10)

In [None]:
"""
This code block crops the global-coverage VIIRS data to the Southern Sierras subregion.
"""

def crop_sierras(input_file_path, output_file_path, shapes):
    """Crop one raster to ``shapes`` and write the result as a GeoTIFF.

    Args:
        input_file_path: path of the source raster.
        output_file_path: path of the cropped GeoTIFF to write (overwritten if present).
        shapes: GeoJSON-like geometries passed to ``rasterio.mask.mask``.
    """
    # NOTE(review): `rasterio.mask` is a submodule; `import rasterio` alone does not load it, so the
    # kernel must have executed `import rasterio.mask` (no visible cell does).
    with rasterio.open(input_file_path) as src:
        out_image, out_transform = rasterio.mask.mask(src, shapes, crop=True)
        # A rasterio dataset exposes its profile as `.meta` (there is no `.out_meta`).
        out_meta = src.meta.copy()
    out_meta.update({"driver": "GTiff",
                     "height": out_image.shape[1],
                     "width": out_image.shape[2],
                     "transform": out_transform})

    with rasterio.open(output_file_path, "w", **out_meta) as dest:
        dest.write(out_image)


def download_viirs_sca(input_dir, output_dir, shapefile_path):
    """Crop every VIIRS .tif in ``input_dir``'s year folders to the shapefile's geometries.

    Nothing is downloaded; the name is kept for existing callers. Year folders are
    subdirectories whose name contains a 4-digit year (e.g. 'WY2013'); other entries are
    skipped. Each output is written to ``output_dir/<year>/`` using the first three
    '_'-separated parts of the input file name.
    """
    # NOTE(review): the default shapefile is named *_points.shp; masking with point geometries
    # would keep almost nothing — confirm it contains polygons.
    with fiona.open(shapefile_path, 'r') as shapefile:
        shapes = [feature["geometry"] for feature in shapefile]

    for year_folder in sorted(os.listdir(input_dir)):
        year_folder_path = os.path.join(input_dir, year_folder)
        if not os.path.isdir(year_folder_path):
            continue
        year_match = re.search(r'\d{4}', year_folder)
        if year_match is None:
            print(f"Skipping {year_folder_path}: no 4-digit year in the folder name")
            continue

        output_year_folder = os.path.join(output_dir, year_match.group())
        os.makedirs(output_year_folder, exist_ok=True)

        for file_name in sorted(os.listdir(year_folder_path)):
            if not file_name.endswith('.tif'):
                continue
            output_file_name = '_'.join(file_name.split('_')[:3]) + '.tif'
            output_file_path = os.path.join(output_year_folder, output_file_name)
            input_file_path = os.path.join(year_folder_path, file_name)
            crop_sierras(input_file_path, output_file_path, shapes)
            print(f"Processed and saved {output_file_path}")

if __name__ == "__main__":
    
    input_directory = r"/home/vgindi/VIIRS_Data"
    output_directory = r"/home/vgindi/VIIRS_Sierras"
    shapefile_path = r"/home/vgindi/Provided_Data/low_sierras_points.shp"
    download_viirs_sca(input_directory, output_directory, shapefile_path)

In [None]:
"""
This code cell reprojects the cropped VIIRS tiff files to an ~100 m lat/lon grid and saves each file in .csv format
"""

# Approximate meters per degree; EPSG:4326 resolutions are expressed in degrees.
_VIIRS_METERS_PER_DEGREE = 111_320.0


def processing_VIIRS(input_file, output_res, output_root="/home/vgindi/Processed_VIIRS"):
    """Reproject one VIIRS GeoTIFF to EPSG:4326 and return its pixels as a DataFrame.

    Args:
        input_file: path of the source GeoTIFF; its parent folder name (e.g. the year) is reused
            for the output subfolder.
        output_res: target pixel size in meters, converted to degrees for EPSG:4326
            (square pixels: ~output_res north-south, narrower east-west at mid latitudes).
        output_root: folder that receives ``<parent folder>/<file name>`` reprojected TIFFs.

    Returns:
        DataFrame with ``y``, ``x`` and ``data`` columns, or None if any step failed
        (failures are printed, not raised, so a batch run keeps going).
    """
    try:
        output_folder_tiff = os.path.join(output_root, os.path.basename(os.path.dirname(input_file)))
        os.makedirs(output_folder_tiff, exist_ok=True)
        output_file = os.path.join(output_folder_tiff, os.path.basename(input_file))

        ds = gdal.Open(input_file)
        if ds is None:
            print(f"Failed to open '{input_file}'. Make sure the file is a valid GeoTIFF file.")
            return None

        # dstSRS is geographic, so xRes/yRes are in degrees and must be positive.
        # NOTE(review): bilinear resampling blends categorical/flag pixel values, if the product has any.
        res_deg = output_res / _VIIRS_METERS_PER_DEGREE
        warped = gdal.Warp(output_file, ds, dstSRS="EPSG:4326", xRes=res_deg, yRes=res_deg,
                           resampleAlg="bilinear")
        ds = None
        if warped is None:
            print(f"Failed to reproject '{input_file}'.")
            return None
        warped = None  # dereference so GDAL flushes and closes the output before it is re-read

        with rxr.open_rasterio(output_file) as raster:
            pixels = raster.squeeze().drop_vars(["spatial_ref", "band"])
            pixels.name = "data"
            df = pixels.to_dataframe().reset_index()
        return df
    except Exception as e:
        print(f"An error occurred: {str(e)}")
        return None


def process_and_convert_viirs(input_dir, output_res, output_root="/home/vgindi/Processed_VIIRS"):
    """Reproject every .tif in ``input_dir``'s subfolders and save each as CSV.

    CSVs are written to ``output_root/VIIRS_csv/<file name>.csv``; files that fail to process
    are skipped.
    """
    csv_folder = os.path.join(output_root, "VIIRS_csv")
    for year in sorted(os.listdir(input_dir)):
        year_dir = os.path.join(input_dir, year)
        if not os.path.isdir(year_dir):
            continue
        for file_name in sorted(os.listdir(year_dir)):
            if not file_name.endswith('.tif'):
                continue
            input_file_path = os.path.join(year_dir, file_name)
            df = processing_VIIRS(input_file_path, output_res, output_root=output_root)
            if df is None:
                continue
            os.makedirs(csv_folder, exist_ok=True)
            csv_file_path = os.path.join(csv_folder, os.path.splitext(file_name)[0] + '.csv')
            df.to_csv(csv_file_path, index=False)
            print(f"Processed and saved {csv_file_path}")

if __name__ == "__main__":
    input_directory = "/home/vgindi/VIIRS_Sierras"
    output_res = 100  # Desired resolution in meters
    process_and_convert_viirs(input_directory, output_res)

In [None]:
"""
This code cell fetches the cell id from the grid_cells_meta_idx metadata for each lat/lon pair of a VIIRS csv file
"""
def create_polygon(self, row):
    """Build a grid-cell polygon from a metadata row's four corner coordinates (BL, BR, UR, UL).

    ``self`` is unused; it is kept because this function was lifted from a class method and
    existing callers pass it positionally.
    """
    return Polygon([(row['BL_Coord_Long'], row['BL_Coord_Lat']),
                    (row['BR_Coord_Long'], row['BR_Coord_Lat']),
                    (row['UR_Coord_Long'], row['UR_Coord_Lat']),
                    (row['UL_Coord_Long'], row['UL_Coord_Lat'])])


def process_folder(self, input_folder, metadata_path, output_folder):
    """Attach a grid ``cell_id`` to every VIIRS pixel CSV in ``input_folder``.

    Args:
        self: unused (kept for call compatibility with the class method this came from);
            pass None when calling it as a plain function.
        input_folder: folder of CSVs with ``x``, ``y``, ``data`` columns.
        metadata_path: grid-cell metadata CSV with ``cell_id`` and BL/BR/UR/UL corner columns.
        output_folder: destination for CSVs with ``y``, ``x``, ``VIIRS_SCA``, ``cell_id``;
            created if missing. Existing output files are left untouched.

    Pixels that fall inside no grid cell are dropped.
    """
    corner_cols = ['BL_Coord_Long', 'BL_Coord_Lat', 'BR_Coord_Long', 'BR_Coord_Lat',
                   'UR_Coord_Long', 'UR_Coord_Lat', 'UL_Coord_Long', 'UL_Coord_Lat']

    pred_obs_metadata_df = pd.read_csv(metadata_path).drop(columns=['Unnamed: 0'], errors='ignore')
    # Use the module-level create_polygon so this works without a class instance.
    pred_obs_metadata_df['geometry'] = pred_obs_metadata_df.apply(
        lambda row: create_polygon(self, row), axis=1)
    metadata = gpd.GeoDataFrame(pred_obs_metadata_df, geometry='geometry')
    metadata = metadata.drop(columns=corner_cols)

    os.makedirs(output_folder, exist_ok=True)
    csv_files = sorted(f for f in os.listdir(input_folder) if f.endswith('.csv'))

    for csv_file in csv_files:
        input_path = os.path.join(input_folder, csv_file)
        output_path = os.path.join(output_folder, csv_file)

        if os.path.exists(output_path):
            print(f"CSV file {csv_file} already exists in the output folder.")
            continue

        viirs_sca_df = pd.read_csv(input_path)

        # Point geometries for each VIIRS pixel, then find the grid cell containing each point.
        # (Only `predicate` is passed: sjoin's `op` keyword is deprecated/removed in geopandas.)
        geometry = [Point(xy) for xy in zip(viirs_sca_df['x'], viirs_sca_df['y'])]
        viirs_sca_geo = gpd.GeoDataFrame(viirs_sca_df, geometry=geometry)
        result = gpd.sjoin(viirs_sca_geo, metadata, how='left', predicate='within')

        final_df = (
            result[['y', 'x', 'data', 'cell_id']]
            .rename(columns={'data': 'VIIRS_SCA'})
            .dropna(subset=['cell_id'])
        )

        final_df.to_csv(output_path, index=False)
        print(f"Processed {csv_file}")

if __name__ == "__main__":
    input_folder = r""
    metadata_path = r""
    output_folder = r""
    if input_folder and metadata_path and output_folder:
        process_folder(None, input_folder, metadata_path, output_folder)
    else:
        print("Set input_folder, metadata_path and output_folder before running process_folder.")