In [0]:
!pip install numpy==1.23.0
!pip install xarray
!pip install rioxarray

In [0]:
# Restart the Python process so the packages installed above are the ones
# picked up by the imports below (needed for xarray to import correctly).
dbutils.library.restartPython()

In [0]:
import os
import pandas as pd
import numpy as np
import datetime
import pickle
import re

import xarray as xr
import rioxarray

import rasterio
from rasterio.windows import Window, from_bounds
from rasterio.warp import Resampling
from rasterio.vrt import WarpedVRT
from rasterio import shutil as rio_shutil

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType, FloatType

In [0]:
# Root folder holding all geospatial inputs.
dirpath = '/dbfs/FileStore/Myanmar_Survey_ML/data/geo'
# Reference grid: the `lccs_class` variable of the 2017 landcover NetCDF,
# addressed through GDAL's "netcdf:<file>:<variable>" syntax.
refpath = 'netcdf:{}/landcover/{}:lccs_class'.format(
    dirpath,
    'C3S-LC-L4-LCCS-Map-300m-P1Y-2017-v2.1.1-002.nc',
)

In [0]:
# Myanmar bounding box in degrees: (lon, lat) of the lower-left corner,
# then (lon, lat) of the upper-right corner.
min_lon, min_lat = 92.3032344909, 9.93295990645
max_lon, max_lat = 101.180005324, 28.335945136

### Reference grid vrt options

In [0]:
# Derive the target grid (transform + size) from the reference raster,
# restricted to the Myanmar bounding box.
with rasterio.open(refpath) as src:
    bbox_window = from_bounds(min_lon, min_lat, max_lon, max_lat, src.transform)
    # clamp every window component (offsets and sizes) at zero
    col_off, row_off, n_cols, n_rows = (max(0, value) for value in bbox_window.flatten())
    win = Window(col_off, row_off, n_cols, n_rows)
    vrt_options = dict(
        transform=src.window_transform(win),
        width=int(win.width),
        height=int(win.height),
    )

# resampling method shared by every dataset warped onto this grid
vrt_options['resampling'] = Resampling.bilinear
vrt_options

### Landcover (annual) - reference grid

In [0]:
# path
landcover_path = f'netcdf:{dirpath}/landcover/C3S-LC-L4-LCCS-Map-300m-P1Y-2017-v2.1.1-002.nc:lccs_class'
# Landcover holds categorical class codes. The shared options request bilinear
# resampling, which would blend neighbouring codes into values that are not
# real classes, so override it with nearest-neighbour for this dataset only.
landcover_vrt_options = {**vrt_options, 'resampling': Resampling.nearest}
# open transformed and clipped to myanmar
with rasterio.open(landcover_path) as src:
    with WarpedVRT(src, **landcover_vrt_options) as vrt:
        lcvr = rioxarray.open_rasterio(vrt)

In [0]:
# Build the legend {class code -> class name} from the raster attributes.
flags = lcvr.attrs['flag_meanings'].split(' ')
dct = {
    int(code): flags[position]
    for position, code in enumerate(lcvr.attrs['flag_values'])
}
# discard 'no data' (0) and 'water' (210, the ocean)
del dct[0]
del dct[210]
dct

In [0]:
# Tidy up (author's note: no no-data values inside Myanmar): drop the helper
# coordinates, turn the array into a dataset variable called 'landcover',
# use lon/lat names and remove the length-1 band dimension.
lcvr = (
    lcvr.drop('band')
    .drop('spatial_ref')
    .to_dataset(name='landcover')
    .rename({'x': 'lon', 'y': 'lat'})
    .squeeze('band')
)

In [0]:
lcvr = lcvr.to_dataframe()
# number of grid cells before filtering
print(lcvr.shape)
# keep only the cells whose class is in the legend (no data / water were removed from it)
in_legend = lcvr['landcover'].isin(list(dct))
lcvr = lcvr.loc[in_legend].reset_index()
# number of grid cells after filtering
print(lcvr.shape)

In [0]:
# display the filtered landcover frame (one row per retained grid cell)
lcvr

### Landscan (annual)

In [0]:
# LandScan 2017 GeoTIFF
landscan_path = dirpath + '/landscan/landscan_global_2017.tif'
# warp onto the reference grid (which is already clipped to Myanmar)
with rasterio.open(landscan_path) as src, WarpedVRT(src, **vrt_options) as vrt:
    lscn = rioxarray.open_rasterio(vrt)

In [0]:
# replace the raster's fill value with NaN
fill_value = lscn.attrs['_FillValue']
lscn = lscn.where(lscn != fill_value, np.nan)
# tidy up: drop helper coordinates, name the variable 'landscan',
# use lon/lat names and remove the length-1 band dimension
lscn = (
    lscn.drop('band')
    .drop('spatial_ref')
    .to_dataset(name='landscan')
    .rename({'x': 'lon', 'y': 'lat'})
    .squeeze('band')
)

In [0]:
# flatten to a table with the coordinates as ordinary columns
lscn = lscn.to_dataframe().reset_index()

In [0]:
# inner join on the columns the two frames share, keeping only the
# grid cells that survived the landcover filter
m = lcvr.merge(lscn, how='inner')

In [0]:
# number of missing values per column
m.isna().sum()

In [0]:
# One-hot encode the landcover class. The dtype is set explicitly because the
# default differs between pandas versions (bool since pandas 2.0), while these
# columns are later declared as Spark IntegerType.
dummies = pd.get_dummies(m['landcover'], dtype='int32')
# column labels become the class code as a string, e.g. '10'
dummies.columns = [str(int(col)) for col in dummies.columns]

# replace the original class column with its indicator columns
m = m.drop(columns='landcover')
m = pd.concat([m, dummies], axis=1)

In [0]:
# declare schema and convert to a pyspark dataframe
schema = [
    StructField("lon", FloatType(), True),
    StructField("lat", FloatType(), True),
    StructField("landscan", FloatType(), True),
]
# every remaining column is a landcover indicator
schema2 = [StructField(col, IntegerType(), True) for col in m.columns if col not in ['lat', 'lon', 'landscan']]
schema.extend(schema2)
schema = StructType(schema)

# A StructType is applied to a pandas frame by column position, not by name.
# Reorder the pandas columns to the schema's field order so that lon / lat
# cannot end up under each other's label.
m = spark.createDataFrame(m[schema.fieldNames()], schema)
display(m)

In [0]:
# NOTE: 'append' mode adds rows to the table; running this cell again
# writes the same rows a second time.
(
    m.write
    .mode('append')
    .format('delta')
    .saveAsTable('myanmar_ml.lcvr_ref_lscn_lcvr_2017')
)

### VIIRS (monthly)

In [0]:
# Collect the monthly VIIRS GeoTIFFs. Test the file extension instead of the
# unanchored regex ".tif" (whose "." matches any character), which also picks
# up side-car files such as "*.tif.aux.xml". Sorted for a reproducible order.
viirs_lst = sorted(
    file for file in os.listdir(f'{dirpath}/viirs/2017/')
    if file.endswith(('.tif', '.tiff'))
)
viirs_xarrays = []

for file in viirs_lst:
    # path
    viirs_path = f'{dirpath}/viirs/2017/{file}'
    # open transformed and clipped to myanmar
    with rasterio.open(viirs_path) as src:
        with WarpedVRT(src, **vrt_options) as vrt:
            viirs = rioxarray.open_rasterio(vrt)

    # the last "_"-separated token of the file name starts with YYYYMM
    yearmonth_tif = file.split('_')[-1]
    year = int(yearmonth_tif[:4])
    month = int(yearmonth_tif[4:6])

    # replace the raster's fill value with NaN
    viirs = viirs.where(viirs != viirs.attrs['_FillValue'], np.nan)
    # some cleaning
    viirs = viirs.drop('band').drop('spatial_ref')
    viirs.name = 'viirs'
    viirs = viirs.to_dataset()
    viirs = (
        # tag the month with a length-1 time dimension (first day of the month)
        viirs.expand_dims({'time':[datetime.date(year, month, 1)]})
        .rename({'x':'lon', 'y':'lat'})
        .squeeze('band')
    )
    viirs_xarrays.append(viirs)

In [0]:
# sanity check: one dataset was appended per monthly file
print(len(viirs_xarrays))
# stitch the monthly datasets together by their coordinates, dropping attributes
viirs_all = xr.combine_by_coords(
    viirs_xarrays,
    combine_attrs='drop',
)
viirs_all

In [0]:
# flatten to a table with the coordinates as ordinary columns
viirs_all = viirs_all.to_dataframe().reset_index()
viirs_all

In [0]:
# inner join on the shared columns: keep only cells present in the landcover reference
m = viirs_all.merge(lcvr, how='inner')

In [0]:
# size of the merged frame, then missing values per column
print(m.shape)
m.isna().sum()

In [0]:
# landcover was only needed to select the reference cells
m = m.drop(columns='landcover')

In [0]:
schema = StructType([
    StructField("time", DateType(), True),
    StructField("lon", FloatType(), True),
    StructField("lat", FloatType(), True),
    StructField('viirs', FloatType(), True),
])
# A StructType is applied to a pandas frame by column position, not by name.
# Reorder the pandas columns to the schema's field order so that lon / lat
# cannot end up under each other's label.
m = spark.createDataFrame(m[schema.fieldNames()], schema)

In [0]:
# NOTE: 'append' mode adds rows to the table; running this cell again
# writes the same rows a second time.
(
    m.write
    .mode('append')
    .format('delta')
    .saveAsTable('myanmar_ml.lcvr_ref_viirs_2017')
)

### FLDAS (monthly)

In [0]:
# all_vars = ['Evap_tavg','LWdown_f_tavg','Lwnet_tavg','Psurf_f_tavg','Qair_f_tavg','Qg_tavg','Qh_tavg','Qle_tavg','Qs_tavg','Qsb_tavg','RadT_tavg','Rainf_f_tavg','SWE_inst','SWdown_f_tavg','SnowCover_inst','SnowDepth_inst','Snowf_tavg','Swnet_tavg','Tair_f_tavg','Wind_f_tavg','SoilMoi00_10cm_tavg','SoilMoi10_40cm_tavg','SoilMoi40_100cm_tavg','SoilMoi100_200cm_tavg','SoilTemp00_10cm_tavg','SoilTemp10_40cm_tavg','SoilTemp40_100cm_tavg','SoilTemp100_200cm_tavg']
# subset of the variables listed above that is actually extracted
fldas_vars = [
    'Evap_tavg',
    'Rainf_f_tavg',
    'Tair_f_tavg',
    'SoilMoi00_10cm_tavg',
    'SoilMoi10_40cm_tavg',
    'SoilMoi40_100cm_tavg',
    'SoilMoi100_200cm_tavg',
    'Qair_f_tavg',
    'Qg_tavg',
    'Qh_tavg',
    'Qs_tavg',
]

In [0]:
# Collect the monthly FLDAS NetCDF files. Test the file extension instead of
# the unanchored regex ".nc" (whose "." matches any character), which also
# matches names such as "reference.txt" or "*.nc.aux.xml".
# Sorted for a reproducible order.
fldas_lst = sorted(
    file for file in os.listdir(f'{dirpath}/fldas/2017/')
    if file.endswith(('.nc', '.nc4'))
)
fldas_xarrays = []

for file in fldas_lst:
    # The month depends only on the file, so parse it once per file:
    # the second-to-last "_"-separated token carries YYYYMM after one leading character.
    yearmonth = file.split('_')[-2]
    year = int(yearmonth[1:5])
    month = int(yearmonth[5:7])

    for var in fldas_vars:
        # path (GDAL "netcdf:<file>:<variable>" syntax)
        fldas_path = f'netcdf:{dirpath}/fldas/2017/{file}:{var}'
        # open transformed and clipped to myanmar
        with rasterio.open(fldas_path) as src:
            with WarpedVRT(src, **vrt_options) as vrt:
                fldas = rioxarray.open_rasterio(vrt)

        # replace the raster's fill value with NaN
        fldas = fldas.where(fldas != fldas.attrs['_FillValue'], np.nan)
        # some cleaning
        fldas = fldas.drop('band').drop('spatial_ref')
        fldas.name = var
        fldas = fldas.to_dataset()
        fldas = (
            # tag the month (first day) and the data source with length-1 dimensions
            fldas.expand_dims({'time':[datetime.date(year, month, 1)], 'source':['fldas']})
            .rename({'x':'lon', 'y':'lat'})
            .squeeze('band')
        )
        fldas_xarrays.append(fldas)

In [0]:
# sanity check: one dataset was appended per (file, variable) pair
print(len(fldas_xarrays))
# stitch all pieces together by their coordinates, dropping attributes
fldas_all = xr.combine_by_coords(
    fldas_xarrays,
    combine_attrs='drop',
)
fldas_all

In [0]:
# flatten to a table with the coordinates as ordinary columns
fldas_all = fldas_all.to_dataframe().reset_index()

In [0]:
# inner join on the shared columns: keep only cells present in the landcover reference
m = fldas_all.merge(lcvr, how='inner')

In [0]:
# size of the merged frame, then missing values per column
print(m.shape)
m.isna().sum()

In [0]:
# the constant 'source' tag and the landcover class are not stored in this table
m = m.drop(columns=['source', 'landcover'])

In [0]:
# key columns first ...
key_fields = [
    StructField("time", DateType(), True),
    StructField("lon", FloatType(), True),
    StructField("lat", FloatType(), True),
]
# ... followed by one float field per remaining column of m
value_fields = [
    StructField(col, FloatType(), True)
    for col in m.columns
    if col not in ['time', 'lat', 'lon']
]
schema = StructType(key_fields + value_fields)

In [0]:
# A StructType is applied to a pandas frame by column position, not by name.
# Reorder the pandas columns to the schema's field order so that lon / lat
# cannot end up under each other's label.
m = spark.createDataFrame(m[schema.fieldNames()], schema)

In [0]:
# NOTE: 'append' mode adds rows to the table; running this cell again
# writes the same rows a second time.
(
    m.write
    .mode('append')
    .format('delta')
    .saveAsTable('myanmar_ml.lcvr_ref_fldas_2017')
)