# ML4L Workflow

The `ML4L` project is trying to use ML techniques to learn the land surface temperature (LST). 

This notebook attempts to track each stage of the project, from the raw data to the nice plots, to enable reproducible research.





---
# Table of contents
1. [Raw Data](#rawdata) <br>

    1.1 [ERA5 data](#rawdataERA5) <br>
...1.1.1 [Processing ERA5 data](#rawdataERA5processing) <br>
    1.2 [MODIS data](#rawdataMODIS) <br>
    1.3 [Data Overview](#rawdataOverview) <br>
    
2. [Joining data](#joiningdata) <br>
---

# 1. Raw Data <a name="rawdata"></a>
Any ML project needs data.

We can split this into two main categories: inputs and outputs.

Inputs are the features that go into our ML model. Outputs are the things we are trying to learn.

For this project:

* Inputs: ERA5 data

* Outputs: MODIS data

## 1.1 ERA5 Data <a name="rawdataERA5"></a>

We have a few different raw sources of ERA5 data provided by ECMWF

**a)** ERA_sfc. 3 years of data (2018-2020), hourly grain. Features: `sp, msl, 10u, 10v, 2t.` 

In [5]:
! grib_ls /network/group/aopp/predict/TIP016_PAXTON_RPSPEEDY/ML4L/ECMWF_files/raw/ERA_sfc/sfc_unstructured_2018_01.grib | head -10

/network/group/aopp/predict/TIP016_PAXTON_RPSPEEDY/ML4L/ECMWF_files/raw/ERA_sfc/sfc_unstructured_2018_01.grib
edition      centre       typeOfLevel  level        dataDate     stepRange    dataType     shortName    packingType  gridType     
1            ecmf         surface      0            20180101     0            an           sp           grid_simple  reduced_gg  
1            ecmf         surface      0            20180101     0            an           msl          grid_simple  reduced_gg  
1            ecmf         surface      0            20180101     0            an           10u          grid_simple  reduced_gg  
1            ecmf         surface      0            20180101     0            an           10v          grid_simple  reduced_gg  
1            ecmf         surface      0            20180101     0            an           2t           grid_simple  reduced_gg  
1            ecmf         surface      0            20180101     0            an           sp           grid_

**b)** ERA_skin. 10 years of data (2010-2020), hourly grain. Features: `aluvp,aluvd,alnip ,alnid,cl,cvl,cvh,istl1,istl2,slt,sdfor,z,sd,sdor,isor,anor,slor
2d,lsm,fal`



In [4]:
! grib_ls /network/group/aopp/predict/TIP016_PAXTON_RPSPEEDY/ML4L/ECMWF_files/raw/ERA_skin/sfc_skin_unstructured_2018_01.grib | head -25

/network/group/aopp/predict/TIP016_PAXTON_RPSPEEDY/ML4L/ECMWF_files/raw/ERA_skin/sfc_skin_unstructured_2018_01.grib
edition      centre       typeOfLevel  level        dataDate     stepRange    dataType     shortName    packingType  gridType     
1            ecmf         surface      0            20180101     0            an           aluvp        grid_simple  reduced_gg  
1            ecmf         surface      0            20180101     0            an           aluvd        grid_simple  reduced_gg  
1            ecmf         surface      0            20180101     0            an           alnip        grid_simple  reduced_gg  
1            ecmf         surface      0            20180101     0            an           alnid        grid_simple  reduced_gg  
1            ecmf         surface      0            20180101     0            an           cl           grid_simple  reduced_gg  
1            ecmf         surface      0            20180101     0            an           cvl         

**c)** ERA_skt. 3 years of data (2018-2020), hourly grain. Features: `skt`

In [14]:
! grib_ls /network/group/aopp/predict/TIP016_PAXTON_RPSPEEDY/ML4L/ECMWF_files/raw/ERA_skt/skt_unstructured_2018_01.grib | head -5

/network/group/aopp/predict/TIP016_PAXTON_RPSPEEDY/ML4L/ECMWF_files/raw/ERA_skt/skt_unstructured_2018_01.grib
edition      centre       typeOfLevel  level        dataDate     stepRange    dataType     shortName    packingType  gridType     
1            ecmf         surface      0            20180101     0            an           skt          grid_simple  reduced_gg  
1            ecmf         surface      0            20180101     0            an           skt          grid_simple  reduced_gg  
1            ecmf         surface      0            20180101     0            an           skt          grid_simple  reduced_gg  


**d)** V15 surface. Some additional constant in time features. Overlaps with some data we already have. Note there also exist `clake` and `lsmoro`, which describe features we have above.

In [11]:
! grib_ls /network/group/aopp/predict/TIP016_PAXTON_RPSPEEDY/ML4L/ECMWF_files/raw/climate.v015/climate.v015/639l_2/sfc 

/network/group/aopp/predict/TIP016_PAXTON_RPSPEEDY/ML4L/ECMWF_files/raw/climate.v015/climate.v015/639l_2/sfc
edition      centre       date         dataType     gridType     stepRange    typeOfLevel  level        shortName    packingType  
2            ecmf         20131129     an           reduced_gg   0            surface      0            anor         grid_simple 
2            ecmf         20131129     an           reduced_gg   0            surface      0            isor         grid_simple 
2            ecmf         20131129     an           reduced_gg   0            surface      0            slor         grid_simple 
2            ecmf         20131129     an           reduced_gg   0            surface      0            sdor         grid_simple 
1            ecmf         19960101     not_found    reduced_gg   0            surface      0            sr           grid_simple 
1            ecmf         19960101     not_found    reduced_gg   0            surface      0            lsrh  

**e)** V20 surface. More up to date (version 20) fields. Overlaps with some data we already have:

In [13]:
! grib_ls /network/group/aopp/predict/TIP016_PAXTON_RPSPEEDY/ML4L/ECMWF_files/raw/climate.v020/climate.v020/639l_2/clake
! grib_ls /network/group/aopp/predict/TIP016_PAXTON_RPSPEEDY/ML4L/ECMWF_files/raw/climate.v020/climate.v020/639l_2/lsmoro
! grib_ls /network/group/aopp/predict/TIP016_PAXTON_RPSPEEDY/ML4L/ECMWF_files/raw/climate.v020/climate.v020/639l_2/sfc

/network/group/aopp/predict/TIP016_PAXTON_RPSPEEDY/ML4L/ECMWF_files/raw/climate.v020/climate.v020/639l_2/clake
edition      centre       date         dataType     gridType     stepRange    typeOfLevel  level        shortName    packingType  
2            ecmf         20110131     af           reduced_gg   0            surface      0            cl           grid_simple 
1 of 1 messages in /network/group/aopp/predict/TIP016_PAXTON_RPSPEEDY/ML4L/ECMWF_files/raw/climate.v020/climate.v020/639l_2/clake

1 of 1 total messages in 1 files
/network/group/aopp/predict/TIP016_PAXTON_RPSPEEDY/ML4L/ECMWF_files/raw/climate.v020/climate.v020/639l_2/lsmoro
edition      centre       date         dataType     gridType     stepRange    typeOfLevel  level        shortName    packingType  
2            ecmf         20210222     an           reduced_gg   0            surface      0            lsm          grid_simple 
2            ecmf         20210222     an           reduced_gg   0            surface      

## 1.1.1 Processing ERA5 Data <a name="rawdataERA5processing"></a>


Given all this data in disparate files, it will be useful to bring it all together in a usable format.

There is some overlap between files, with some files holding the same features.

There is also some degneracy in that we are needlessly holding values of features which are constant in time

**Which data is constant in time?**

In [27]:
import xarray as xr
import numpy as np
import pandas as pd

example_ERA_sfc_file = '/network/group/aopp/predict/TIP016_PAXTON_RPSPEEDY/ML4L/ECMWF_files/raw/ERA_sfc/sfc_unstructured_2018_01.grib'
example_ERA_skin_file = '/network/group/aopp/predict/TIP016_PAXTON_RPSPEEDY/ML4L/ECMWF_files/raw/ERA_skin/sfc_skin_unstructured_2018_01.grib'

def check_if_variables_are_constant(ds): 
    """For all the variables in this ds, are they constant in time for each grid point?"""
    
    #Get all variables/features
    all_variables = list(ds.keys())

    names = []
    variations = []
    for v in all_variables:
    
        a = ds[v].values
        #a == a[0,:] compares each value to the corresponding value in the first row
        #A column shares a common value if all the values in that column are True
        is_constant = all(np.all(a == a[0,:], axis = 0))
        
        
        print (v,is_constant)
        names.extend([v])
        variations.extend([is_constant])
        

In [28]:
ds_sfc = xr.open_dataset(example_ERA_sfc_file,engine='cfgrib',backend_kwargs={'indexpath': ''} )
check_if_variables_are_constant(ds_sfc)

#Surface skin values. Filter out istl1,2 else errors.
ds_skin = xr.open_dataset(example_ERA_skin_file,engine='cfgrib',filter_by_keys={'typeOfLevel': 'surface'}, backend_kwargs={'indexpath': ''} ) 
check_if_variables_are_constant(ds_skin)

sp False
msl False
u10 False
v10 False
t2m False
aluvp False
aluvd False
alnip False
alnid False
cl True
cvl True
cvh True
slt True
sdfor True
z True
sd False
sdor True
isor True
anor True
slor True
d2m False
lsm True
fal False


In [34]:
#Now deal with istl1,2 individually
ds_istl1 = xr.open_dataset(example_ERA_skin_file,engine='cfgrib',filter_by_keys={'shortName': 'istl1'}, backend_kwargs={'indexpath': ''} ) 
check_if_variables_are_constant(ds_istl1)

ds_istl2 = xr.open_dataset(example_ERA_skin_file,engine='cfgrib',filter_by_keys={'shortName': 'istl2'}, backend_kwargs={'indexpath': ''} ) 
check_if_variables_are_constant(ds_istl2)


istl1 False
istl2 False


We have just demonstrated a month of data here, but it is straightforward to extend to multiple months and the conclusions are unchanged

---


With the features which are constant in time identified, we need to take the following steps:

For each month: 
* Extract the time variable features from ERA skin: `aluvp,aluvd,alnip,alnid,istl1,istl2,sd,2d,fal`
* Merge the time variable ERA skin, ERA surface and ERA skt into a single file

This is done in `scripts/process_time_variable_data.py`.
    
For one month:
* Extract the constant time features from ERA skin, that are not covered in V15, V20.: `slt,sdfor`
* Bring together with the V15 and V20 data

This is done in `scripts/process_time_constant_data.py`.

The final outputs of ERA data are then

* `ERA_time_variable` * nmonths. GRIB format. `processed_data/ERA_timevariable`
* `ERA_constant_V15` * 1. NetCDF format. `processed_data/ERA_timeconsant`
* `ERA_constant_V20` * 1. NetCDF format. `processed_data/ERA_timeconsant`

---


## 1.2 MODIS Data <a name="rawdataMODIS"></a>

The MODIS data requires less (or rather no!) processing than the ERA5 data.

Daily `.tiff` files are found at `/network/group/aopp/predict/TIP016_PAXTON_RPSPEEDY/ML4L/ECMWF_files/raw/MODIS/` and are easy to load and process


## 1.3 Data Overview <a name="rawdataOverview"></a>

It is useful for the intution to have a quick look at each of our data sources: 

* ERA5 time variable:

In [46]:
f = '/network/group/aopp/predict/TIP016_PAXTON_RPSPEEDY/ML4L/ECMWF_files/raw/processed_data/ERA_timevariable/ERA_0.grib'
ERA5_timevariable = xr.open_dataset(f,engine='cfgrib',backend_kwargs={'indexpath': ''} )
display(ERA5_timevariable)

* ERA5 time constant:

In [47]:
f = '/network/group/aopp/predict/TIP016_PAXTON_RPSPEEDY/ML4L/ECMWF_files/raw/processed_data/ERA_timeconstant/ERA_constants_v15.nc'
ERA5_timevariable = xr.open_dataset(f)
display(ERA5_timevariable)

* MODIS

In [48]:
f = '/network/group/aopp/predict/TIP016_PAXTON_RPSPEEDY/ML4L/ECMWF_files/raw/MODIS/aquaDay_errorGTE03K_04km_2018-01-01.tif'
MODIS_data = xr.open_dataarray(f,engine="rasterio")
display(MODIS_data)

---
# 2. Joining Data <a name="joiningdata"></a>


We have our inputs `ERA` and our outputs `MODIS` is a nice clean form.

We now need to join these two dat sources in both space and time.

What this means is, for a bunch of ERA features at time `t` and grid point `x` what is the corresponding real-world observation provided by MODIS?

There are some different options for how to implement this:



In [3]:
#Declarations

import re
import glob
import xarray as xr
#Deal with filename sorting. Stolen from: https://stackoverflow.com/questions/4836710/is-there-a-built-in-function-for-string-natural-sort
def natural_sort(l): 
    convert = lambda text: int(text) if text.isdigit() else text.lower()
    alphanum_key = lambda key: [convert(c) for c in re.split('([0-9]+)', key)]
    return sorted(l, key=alphanum_key)




ERA_folder = '/network/group/aopp/predict/TIP016_PAXTON_RPSPEEDY/ML4L/ECMWF_files/raw/processed_data/ERA_timevariable/'
ERA_files = natural_sort(glob.glob(ERA_folder+'*'))


    
fvar = '/network/group/aopp/predict/TIP016_PAXTON_RPSPEEDY/ML4L/ECMWF_files/raw/processed_data/ERA_timevariable/ERA_0.grib'
fconst = '/network/group/aopp/predict/TIP016_PAXTON_RPSPEEDY/ML4L/ECMWF_files/raw/processed_data/ERA_timeconstant/ERA_constants_v15.nc'
   
ds_ERA_timevariable = xr.open_dataset(fvar,engine='cfgrib',backend_kwargs={'indexpath': ''}) #grib file
ds_ERA_timeconstant = xr.open_dataset(fconst) #nc file
    


In [4]:
# ds_ERA_timevariable.load()
# ds_ERA_timeconstant.load()




# ds_ERA_timevariable.close()
# ds_ERA_timeconstant.close()



In [5]:
%%time

import pandas as pd
timestamps = pd.to_datetime(ds_ERA_timevariable.time) # hourly grain

t = timestamps[536]

print ('Timefilter')
#Filter month of ERA data to an hour
time_filter = (ds_ERA_timevariable.time == t)
ERA_hour = ds_ERA_timevariable.where(time_filter,drop=True)

print ('Join constant data')
#Join on the constant data, first setting the time coordinate
ERA_constant = ds_ERA_timeconstant.assign_coords({"time": (((ERA_hour.time)))})
ERA_hour = xr.merge([ERA_hour,ERA_constant])

# print ('Land filter')
# #Now filter to get land values only 
# land_filter = (ERA_hour.lsm > 0.5)
# ERA_land = ERA_hour.where(land_filter,drop=True)

# print ('coordinate conversion')
# #And covert longitude to long1
# ERA_land = ERA_land.assign_coords({"longitude": (((ERA_land.longitude + 180) % 360) - 180)})




Timefilter
Join constant data
CPU times: user 261 ms, sys: 72.2 ms, total: 333 ms
Wall time: 362 ms


In [6]:
ERA_hour.load()

In [7]:
ERA_hour.lsm

In [10]:
%%time
sp_filter = (ERA_hour.lsm > 1.03e5)
ERA_hour.where(sp_filter,drop=True)
ERA_hour.close()

CPU times: user 79.8 ms, sys: 35.1 ms, total: 115 ms
Wall time: 111 ms


In [116]:
ERA_hour.lsm

In [113]:
sp_filter

In [101]:
ERA_constant

In [88]:
#ERA_timeconstant = ds_ERA_timeconstant.assign_coords({"time": t})
ERA_timeconstant = ds_ERA_timeconstant
ERA_timeconstant
#ds_ERA_timeconstant.time

In [68]:
ERA_hour

In [None]:
#  """
#     Select an hourly snapshot of ERA data, spatially filtered
#     """

#     verboseprint('Getting an hourly snapshot of ERA data')

    
#     #Filter month of ERA land data to an hour 
#     time_filter = (ERA_land.time == t)
#     ERA_land_snapshot = ERA_land.where(time_filter,drop=True)
    
    
    
    
#     #Filter spatially - MODIS is only a strip of data, we dont need the whole Earth surface
#     delta = 1.0 #some leeway
#     bounds = {"latitude_min"  :MODIS_data_snapshot.latitude.data.min()-delta,
#               "latitude_max"  :MODIS_data_snapshot.latitude.data.max()+delta,
#                "longitude_min":MODIS_data_snapshot.longitude.data.min()-delta,
#                "longitude_max":MODIS_data_snapshot.longitude.data.max()+delta
#               }
    

#     longitude_filter = (ERA_land_snapshot.longitude > bounds['longitude_min']) & (ERA_land_snapshot.longitude < bounds['longitude_max'])
#     latitude_filter =  (ERA_land_snapshot.latitude > bounds['latitude_min']) & (ERA_land_snapshot.latitude < bounds['latitude_max'])
#     ERA_land_snapshot = ERA_land_snapshot.where(longitude_filter & latitude_filter,drop=True)
    

#     return ERA_land_snapshot

In [None]:
import xarray as xr
import rioxarray
import pandas as pd
import numpy as np
import faiss
from scipy.interpolate import griddata
from sklearn.neighbors import NearestNeighbors
from datetime import timedelta, date



"""
Script join monthly ERA .grib data with daily MODIS .tiff data
We join in both time (hourly grain) and space.
See X
"""



###---GLOBAL VARIABLES---###


#These dictionaries describe the local hour of the satellite
local_times = {"aquaDay":"13:30",
               "terraDay":"10:30",
               "terraNight":"22:30",
               "aquaNight":"01:30"
              }
# and are used to load the correct file for dealing with the date-line.
min_hours = {"aquaDay":2,
            "terraDay":-1,
            "aquaNight":-1,
            "terraNight":11}
max_hours = {"aquaDay":24,
            "terraDay":22,
            "aquaNight":13,
            "terraNight":24}


#Path to MODIS data
satellite_folder = '/network/group/aopp/predict/TIP016_PAXTON_RPSPEEDY/ML4L/ECMWF_files/raw/MODIS'

#Path to ERA data
era_folder = '/network/group/aopp/predict/TIP016_PAXTON_RPSPEEDY/ML4L/ECMWF_files/raw/'


#Where do you want the output files to go?
IO_path = '/network/group/aopp/predict/TIP016_PAXTON_RPSPEEDY/ML4L/ECMWF_files/raw/monthly_joined_ERA_MODIS/'

#Some parameters - can be changed by user.
satellite='aquaDay'
latitude_bound=70
verbose = False

###------END------###



###---FUNCTIONS---###



def fetch_ERA_month(year_month):
    
    """Load a month of ERA data from multiple sources and filter to just get land values"""
    
    print ('Loading a month of ERA data. These are large files so could take some time')
    
    #All different data sources
    fskin = era_folder+'ERA_skin/sfc_skin_unstructured_'+year_month+'.grib'
    fsfc =  era_folder+'ERA_sfc/sfc_unstructured_'+year_month+'.grib'
    fskt =  era_folder+'ERA_skt/skt_unstructured_'+year_month+'.grib'

    #Load month of data from different sources
    ds_skin = xr.open_dataset(fskin,engine='cfgrib',filter_by_keys={'typeOfLevel': 'surface'},backend_kwargs={'indexpath': ''})
    ds_sfc = xr.open_dataset(fsfc,engine='cfgrib',filter_by_keys={'typeOfLevel': 'surface'},backend_kwargs={'indexpath': ''})
    ds_skt = xr.open_dataset(fskt,engine='cfgrib',filter_by_keys={'typeOfLevel': 'surface'},backend_kwargs={'indexpath': ''})

    #...and merge it into one
    ERA = xr.merge([ds_skin, ds_sfc,ds_skt])
    
    #Filter to just get land values
    land_filter = (ERA.lsm > 0.5)
    ERA_land = ERA.where(land_filter,drop=True)

    
    #Relabel longitude coordinate to be consistent with MODIS
    ERA_land = ERA_land.assign_coords({"longitude": (((ERA_land.longitude + 180) % 360) - 180)})

    return ERA_land







def select_correct_MODIS_file(t):
    
    """We have to be careful with the dateline. This function
       figures out which MODIS file to load."""

    
    #Get the hour
    utc_hour = t.hour
    
    
    #Due to crossing of the datetime, some times will be saved different date
    if utc_hour < min_hours[satellite]:
        file_date = t  - np.timedelta64(1,'D')
    elif utc_hour > max_hours[satellite]:
        file_date = t  + np.timedelta64(1,'D')
    else:
        file_date = t
        
    #Create a string which will be used to open file
    y = pd.to_datetime(file_date).year
    m = pd.to_datetime(file_date).month
    d = pd.to_datetime(file_date).day
    date_string = f'{y}-{m:02}-{d:02}'
    
    return date_string



def process_MODIS_file(sat_xr,date_string,latitude_bound): 
    
    """
    Rename some columns, apply latitude bounds and calculate absolute time from local solar time
    """
    
    # Rename spatial dimensions
    sat_xr = sat_xr.rename({'x':'longitude','y':'latitude'})
    
    
    #Filter by latitude
    space_filter = np.expand_dims(np.abs(sat_xr.latitude) < latitude_bound,axis=(0,-1))
    mask = np.logical_and(np.isfinite(sat_xr),space_filter) #make it a 2d mask

    sat_xr = sat_xr.where(mask,drop=True)
    
    #Create time delta to change local to UTC
    time_delta = pd.to_timedelta(sat_xr.longitude.data/15,unit='H') 
    
    #Convert local satellite time to UTC and round to nearest hour
    time = (pd.to_datetime([date_string + " " + local_times[satellite]]*time_delta.shape[0]) - time_delta).round('H')
    
    
    return sat_xr.where(mask,drop=True), time




def snapshot_MODIS(t,MODIS_data,MODIS_time,previous_datestring):
    
    """
    Select an hourly snapshot of MODIS data
    """
    
    verboseprint('Getting an hourly snapshot of MODIS data')
    
    date_string = select_correct_MODIS_file(t)
    
    if date_string != previous_datestring:
        #We need to open a new file. 
        #First close the old one explicitly
        try:
            MODIS_data.close()
        except:
            pass
        
        #Now open a new file, and update the datestring
        print('Opening new file')
        MODIS_data = xr.open_dataarray(f'{satellite_folder}/{satellite}_errorGTE03K_04km_{date_string}.tif',engine="rasterio")
        previous_datestring=date_string
            
        #Make some corrections and calculations
        MODIS_data,MODIS_time = process_MODIS_file(MODIS_data,date_string,latitude_bound)      
   

    #Select the correct hour of MODIS data
    time_filter = np.expand_dims(MODIS_time == t,axis=(0,1))
    
    # Make this 1d time filter a 2d mask
    mask = np.logical_and(np.isfinite(MODIS_data),time_filter)
    
    #Apply mask to data array
    MODIS_data_snapshot= MODIS_data.where(mask,drop=True) 
    
    return MODIS_data_snapshot,MODIS_data,MODIS_time,date_string #hour of data, day of data, times in UTC, date_string
    
    
    
def snapshot_ERA(t,ERA_land,MODIS_data_snapshot):

    
    """
    Select an hourly snapshot of ERA data, spatially filtered
    """

    verboseprint('Getting an hourly snapshot of ERA data')

    
    #Filter month of ERA land data to an hour 
    time_filter = (ERA_land.time == t)
    ERA_land_snapshot = ERA_land.where(time_filter,drop=True)
    
    
    
    
    #Filter spatially - MODIS is only a strip of data, we dont need the whole Earth surface
    delta = 1.0 #some leeway
    bounds = {"latitude_min"  :MODIS_data_snapshot.latitude.data.min()-delta,
              "latitude_max"  :MODIS_data_snapshot.latitude.data.max()+delta,
               "longitude_min":MODIS_data_snapshot.longitude.data.min()-delta,
               "longitude_max":MODIS_data_snapshot.longitude.data.max()+delta
              }
    

    longitude_filter = (ERA_land_snapshot.longitude > bounds['longitude_min']) & (ERA_land_snapshot.longitude < bounds['longitude_max'])
    latitude_filter =  (ERA_land_snapshot.latitude > bounds['latitude_min']) & (ERA_land_snapshot.latitude < bounds['latitude_max'])
    ERA_land_snapshot = ERA_land_snapshot.where(longitude_filter & latitude_filter,drop=True)
    

    return ERA_land_snapshot

def haver(lat1_deg,lon1_deg,lat2_deg,lon2_deg):
    
    """
    Given coordinates of two points IN DEGREES calculate the haversine distance
    """
    
    #Convert degrees to radians
    lat1 = np.deg2rad(lat1_deg)
    lon1 = np.deg2rad(lon1_deg)
    lat2 = np.deg2rad(lat2_deg)
    lon2 = np.deg2rad(lon2_deg)


    #...and the calculation
    delta_lat = lat1 -lat2
    delta_lon = lon1 -lon2
    Re = 6371 #km
    Z = np.sin(delta_lat/2)**2 + np.cos(lat1)*np.cos(lat2)*np.sin(delta_lon/2)**2
    H = 2*Re*np.arcsin(np.sqrt(Z)) #Haversine distance in km
    return H

    
    
def faiss_knn(database,query):
    
    """
    Use faiss library (https://github.com/facebookresearch/faiss) for fass k-nearest neighbours on GPU
    
    Note that the nearness is an L2 (squared) norm on the lat/long coordinates, rather than a haversine metric
    """
    
    #Database
    xb = database[["latitude", "longitude"]].to_numpy().astype('float32')
    xb = xb.copy(order='C') #C-contigious
    
    #Query
    xq = query[["latitude", "longitude"]].to_numpy().astype('float32') 
    xq = xq.copy(order='C')
    
    #Create index
    d = 2                            # dimension
    res = faiss.StandardGpuResources()
    index_flat = faiss.IndexFlatL2(d) #index
    gpu_index_flat = faiss.index_cpu_to_gpu(res, 0, index_flat) # make it into a gpu index
    gpu_index_flat.add(xb)  
    
    #Search
    k = 1                          # we want to see 1 nearest neighbors
    distances, indices = gpu_index_flat.search(xq, k)
    

    #Combine into a single df with all data
    df = query.reset_index().join(database.iloc[indices.flatten()].reset_index(), lsuffix='_MODIS',rsuffix='_ERA')
    df['L2_distance'] = distances
    df['MODIS_idx'] = indices
    df['H_distance'] = haver(df['latitude_MODIS'],df['longitude_MODIS'],df['latitude_ERA'],df['longitude_ERA']) #Haversine distance
    
    #Filter out any large distances
    tolerance = 50 #km
    df_filtered = df.query('H_distance < %.9f' % tolerance)


    #Group it. Each ERA point has a bunch of MODIS points. Group and average
    df_grouped = df_filtered.groupby(['latitude_ERA','longitude_ERA'],as_index=False).mean()

    
    return df_grouped







###------END------###







###------MAIN------###



#Parameters
dates = pd.date_range(start="2018-01-01",end="2020-01-01",freq='MS') #Iterate over this date range



#Declarations
MODIS_data_snapshot,MODIS_data,MODIS_time,previous_datestring = (None,)*4 #Declare some empty variables

for dt in dates:
    print (dt)
    year_month = dt.strftime("%Y_%m")
    ERA_land = fetch_ERA_month(year_month) #This is a month of ERA data
    
    timestamps = pd.to_datetime(ERA_land.time) # hourly grain
    
    if dt == dates[0]:
        timestamps = timestamps[3:] #We dont have data for Dec 2017. Can't do 0,1,2. Note this correction is specific to AquaDay
        
        
    dfs = []
    for t in timestamps:
        print(t)
        
        #Populate previously empty MODIS variables. 
            #MODIS_data_snapshot = hour of MODIS data
            #MODIS_data = day of MODIS data
            #MODIS_time = UTC times of MODIS_data
            #previous_datestring = used to determine whether we need to open a new file, or use the one we already have
        MODIS_data_snapshot,MODIS_data,MODIS_time,previous_datestring = snapshot_MODIS(t,MODIS_data,MODIS_time,previous_datestring)


        #And get hour of ERA data
        ERA_data_snapshot = snapshot_ERA(t,ERA_land,MODIS_data_snapshot)
        
        
        #Make everything a pandas df to pass into faiss_knn. Unnecessary step?
        ERA_df = ERA_data_snapshot.to_dataframe().reset_index().dropna()
        MODIS_df = MODIS_data_snapshot.to_dataframe().reset_index().dropna()
        
        
        verboseprint(ERA_df)
        verboseprint(MODIS_df)
        
        df_matched = faiss_knn(ERA_df,MODIS_df)
        df_matched['time'] = t
        dfs.append(df_matched)
        
        
    #At the end of every month, do some IO
    df = pd.concat(dfs)
    fname = f'matched_{dt}.pkl'
    print ("Writing to disk:", IO_path+fname)
    df.to_pickle(IO_path+fname)

###------END------###






    













 