# Pre-processing the data - batch
This notebook follows on from `transform_and_preprocess_spice.ipynb`, so look at that first for an explanation of processing. This notebook has the same processing in fewer commands and in a loop across forecast ref times. Along the way intermediate data is also output, including:
* MOGREPS-G gridded data for UK, all variables in one file.
* MOGREPS-G data for UK in tabular form
* Radar data regridded onto MOGREPS-G grid and summed to 3hr accumulations, one file
* Radar data on MOGREPS-G grid and summed to 3hr accumulations in tabular form
* Final output: Merged MOGREPS-G and radar data in tabular form

Changing the initial parameters (such as time and location) should allow duplicates of this notebook to work for other time periods.

In [1]:
import pathlib
import datetime
import functools
import os

In [2]:
import numpy

In [3]:
import pandas

In [4]:
import xarray
import iris
import iris.quickplot
import iris.coord_categorisation

In [5]:
import matplotlib.pyplot

# Set parameters for notebook
Set the paths and lists of things to process

In [6]:
# Project-level names and base locations from which every input/output path is built.
project_name = 'precip_rediagnosis'
mogreps_g_name = 'mogreps-g'
ilab_project_dir = pathlib.Path('/project/informatics_lab/')
# Per-user scratch area; every file this notebook writes goes here.
output_dir = pathlib.Path('/scratch') / os.environ['USER'] / project_name
# Create the output directory up front so the later iris.save / to_netcdf / to_csv
# calls do not fail when the scratch area has not been set up yet.
output_dir.mkdir(parents=True, exist_ok=True)

In [7]:
# Input data locations, all rooted in the shared project directory.
root_data_dir = ilab_project_dir.joinpath(project_name)
mogreps_g_data_dir = root_data_dir.joinpath(mogreps_g_name)
radar_data_dir = root_data_dir.joinpath('radar')

In [8]:
# Shared '<start>_<end>' date stamp; `start` and `end` must expose
# year/month/day/hour/minute attributes.
date_fname_template = '{start.year:04d}{start.month:02d}{start.day:02d}T{start.hour:02d}{start.minute:02d}Z_{end.year:04d}{end.month:02d}{end.day:02d}T{end.hour:02d}{end.minute:02d}Z'
# File extensions for gridded and tabular outputs.
fname_extension_grid = '.nc'
fname_extension_tabular = '.csv'
# Lead time in hours, zero padded to three digits (e.g. 015H).
leadtime_template = '{lt:03d}H'
# Output file name templates: MOGREPS-G (tabular / gridded), radar (tabular / gridded)
# and the final merged table. The MOGREPS-G and merged names also carry the lead time.
mogreps_g_tab_fname_template = 'prd_mogreps_g_' + leadtime_template + '_' + date_fname_template + fname_extension_tabular
mogreps_g_grid_fname_template = 'prd_mogreps_g_' + leadtime_template + '_' + date_fname_template + fname_extension_grid
radar_tab_fname_template = 'prd_radar_' + date_fname_template + fname_extension_tabular
radar_grid_fname_template = 'prd_radar_' + date_fname_template + fname_extension_grid
output_fname_template = 'prd_merged_' + leadtime_template + '_' + date_fname_template + fname_extension_tabular

In [9]:
# Variable names as they appear in the MOGREPS-G file names.

# Fields stored without a height dimension.
variables_single_level = [
    "cloud_amount_of_total_cloud",
    "rainfall_accumulation-PT03H",
    "snowfall_accumulation-PT03H",
    "rainfall_rate",
    "snowfall_rate",
    "height_of_orography",
    "pressure_at_mean_sea_level",
]

# Fields stored on height levels.
variables_height_levels = [
    "cloud_amount_on_height_levels",
    "pressure_on_height_levels",
    "temperature_on_height_levels",
    "relative_humidity_on_height_levels",
    "wind_direction_on_height_levels",
    "wind_speed_on_height_levels",
]

In [10]:
# Forecast reference times to process: `num_periods` runs spaced 6 hours apart.
num_periods = 10
start_ref_time = datetime.datetime(2020, 2, 14, 12)
forecast_ref_time_range = [
    start_ref_time + datetime.timedelta(hours=6 * i1)
    for i1 in range(num_periods)
]
# Single forecast lead time (in hours) extracted from every run.
leadtime_hours = 15
# Realization indices 0..34.
realizations_list = [*range(35)]

In [11]:
# Identifiers of the source dataset.
# NOTE(review): `dataset`, `subset` and `forecast_ref_template` are not referenced
# anywhere else in the visible part of this notebook - confirm whether still needed.
dataset = 'mogreps-g'
subset = 'lev1'
forecast_ref_template = '{frt.year:04d}{frt.month:02d}{frt.day:02d}T{frt.hour:02d}00Z.nc.file'
# Input file name: validity time (vt), lead time in hours (4 digits) and variable name.
fname_template = '{vt.year:04d}{vt.month:02d}{vt.day:02d}T{vt.hour:02d}00Z-PT{lead_time:04d}H00M-{var_name}.nc'

In [12]:
# Every variable to process: height-level fields first, then single-level fields.
variables_to_extract = variables_height_levels + variables_single_level

In [13]:
# For each variable, collect every entry of the MOGREPS-G directory whose path
# contains the variable name (plain substring match on the full path).
path_lists_vars = dict(
    (
        var_name,
        [f1 for f1 in mogreps_g_data_dir.iterdir() if str(f1).find(var_name) >= 0],
    )
    for var_name in variables_to_extract
)


In [14]:
# Bounding box as (lower, upper) per dimension ...
uk_bounds = {
    'latitude': (50, 58),
    'longitude': (-6, 2),
}
# ... and the same box expressed as slices, ready to pass to xarray's `.sel`.
xarray_select_uk = {
    dim_name: slice(lower, upper) for dim_name, (lower, upper) in uk_bounds.items()
}

## Load radar data

In [None]:
# Five consecutive days of radar files, starting 2020-02-14.
radar_first_day = datetime.datetime(2020, 2, 14)
radar_days = [radar_first_day + datetime.timedelta(days=d1) for d1 in range(5)]
radar_days

In [None]:
# One radar composite file per day; load each day and join them into a single cube.
radar_fname_template = 'composite_rainfall_{dt.year:04d}{dt.month:02d}{dt.day:02d}.nc'
radar_daily_cubes = iris.cube.CubeList(
    [
        iris.load_cube(str(radar_data_dir / radar_fname_template.format(dt=dt1)))
        for dt1 in radar_days
    ]
)
radar_cube = radar_daily_cubes.concatenate_cube()

In [None]:
# Add 'hour' and 'day_of_year' coordinates derived from 'time'; they are used
# further down to group the radar data into 3-hour windows.
# Note: both calls modify radar_cube in place.
iris.coord_categorisation.add_hour(radar_cube, coord='time')
iris.coord_categorisation.add_day_of_year(radar_cube, coord='time')

Load a sample variable from MOGREPS-G to use for regridding radar data.

In [None]:
# Sample MOGREPS-G field: first single-level variable of the first forecast run,
# realization 0, restricted to the UK box (strict inequalities on both edges).
example_validity_time = forecast_ref_time_range[0] + datetime.timedelta(hours=leadtime_hours)
example_path = mogreps_g_data_dir / fname_template.format(
    vt=example_validity_time,
    lead_time=leadtime_hours,
    var_name=variables_single_level[0],
)
uk_member0_constraint = iris.Constraint(
    latitude=lambda cell1: uk_bounds['latitude'][0] < cell1 < uk_bounds['latitude'][1],
    longitude=lambda cell1: uk_bounds['longitude'][0] < cell1 < uk_bounds['longitude'][1],
    realization=0,
)
mogreps_g_example = iris.load_cube(str(example_path), uk_member0_constraint)


Aggregate the instantaneous rates to get an accumulation (this makes the assumption that the rates represent the accumulation for the 5 minute period).

In [None]:
# Index of the 3-hour window within the day (hour // 3), attached along the
# first (time) dimension of the radar cube.
coord_3hr = iris.coords.AuxCoord(radar_cube.coord('hour').points // 3,
                                long_name='3hr',
                                 units='hour',
                                )
radar_cube.add_aux_coord(coord_3hr, data_dims=0)
# Sum all radar time steps that share the same 3-hour window and day of year.
radar_agg = radar_cube.aggregated_by(['3hr', 'day_of_year'],iris.analysis.SUM)
# Extra coordinate built from each aggregated time cell: lower bound + 3 hours.
# NOTE(review): units='mm/h' looks wrong for a coordinate holding times - confirm.
# This coordinate is removed again after regridding.
radar_agg.add_aux_coord(iris.coords.AuxCoord([c1.bound[0] + datetime.timedelta(hours=3) for c1 in radar_agg.coord('time').cells()], long_name='model_accum_time', units='mm/h'), data_dims=0)

In [None]:
# Regrid the aggregated radar data onto the grid of the sample MOGREPS-G cube
# using linear interpolation.
radar_mggrid = radar_agg.regrid(mogreps_g_example, iris.analysis.Linear())

In [None]:
radar_mggrid

In [None]:
# Drop the helper coordinate before saving / converting to a table (in-place).
radar_mggrid.remove_coord('model_accum_time')

In [None]:
# Drop the 3-hour window index used for the aggregation (in-place).
radar_mggrid.remove_coord('3hr')

In [None]:
def cftime_to_datetime(input_cft):
    """Build a standard-library ``datetime.datetime`` from a datetime-like object.

    Only the ``year``, ``month``, ``day``, ``hour``, ``minute`` and ``second``
    attributes of ``input_cft`` are read; anything else the input carries
    (e.g. microseconds) is not copied to the result.
    """
    field_names = ('year', 'month', 'day', 'hour', 'minute', 'second')
    return datetime.datetime(*(getattr(input_cft, name1) for name1 in field_names))

In [None]:
# Write the regridded radar cube to NetCDF; the file name carries the earliest
# and latest time points of the cube.
radar_grid_times = [
    cftime_to_datetime(cell1.point) for cell1 in radar_mggrid.coord('time').cells()
]
radar_grid_path = output_dir / radar_grid_fname_template.format(
    start=min(radar_grid_times),
    end=max(radar_grid_times),
)
iris.save(radar_mggrid, str(radar_grid_path))

In [None]:
# Convert the regridded radar cube to a flat table, with the coordinates as columns.
radar_mggrid_da = xarray.DataArray.from_iris(radar_mggrid)
radar_acc_regrid_df = radar_mggrid_da.to_dataframe().reset_index()
radar_acc_regrid_df

In [None]:
# Keep the aggregated radar time as 'period_midpoint' and derive a new 'time'
# column from it by adding a fixed offset of 1h 32m 30s.
# NOTE(review): presumably this offset moves the time to the end of the
# 3-hour window so it lines up with the model validity times - confirm.
radar_acc_regrid_df = radar_acc_regrid_df.rename(columns={'time': 'period_midpoint'})
midpoint_offset = datetime.timedelta(hours=1, minutes=32, seconds=30)
radar_acc_regrid_df['time'] = radar_acc_regrid_df['period_midpoint'].apply(
    lambda dt1: datetime.datetime(
        dt1.year, dt1.month, dt1.day, dt1.hour, dt1.minute, dt1.second
    ) + midpoint_offset
)
radar_acc_regrid_df

In [None]:
# Write the tabular radar data; the file name carries the first and last 'time'
# value in the table (the end stamp uses max(), not min()).
radar_acc_regrid_df.to_csv(output_dir / radar_tab_fname_template.format(start=radar_acc_regrid_df['time'].min(),
                                                                        end=radar_acc_regrid_df['time'].max(),
                                                                       ))

## Create a dataset from MOGREPS-G data
Information on Met Office Ensemble forecasts - https://www.metoffice.gov.uk/research/weather/ensemble-forecasting#
Paper - https://www.metoffice.gov.uk/research/weather/ensemble-forecasting 

### Get the mapping of variable names 
Load some files and get the actual variable names.

In [15]:
# Use the first forecast run as the sample from which variable names are read.
fcst_ref_time = forecast_ref_time_range[0]
# NOTE(review): real1 is not referenced anywhere else in the visible notebook.
real1 = realizations_list[10]
# Validity time = forecast reference time + lead time.
validity_time = fcst_ref_time + datetime.timedelta(hours=leadtime_hours)

In [16]:
%%time
# load a cube for each variable in iris to get the actual variable name, and populate dictionary mapping from the var name in the file name to the variable as loaded into iris/xarray
file_to_var_mapping = {
    var_file_name: iris.load_cube(str(mogreps_g_data_dir / fname_template.format(vt=validity_time,
                                                                                 lead_time=leadtime_hours,
                                                                                 var_name=var_file_name))).name()
    for var_file_name in variables_single_level + variables_height_levels}
file_to_var_mapping

CPU times: user 523 ms, sys: 74.4 ms, total: 597 ms
Wall time: 1.7 s


{'cloud_amount_of_total_cloud': 'cloud_area_fraction',
 'rainfall_accumulation-PT03H': 'thickness_of_rainfall_amount',
 'snowfall_accumulation-PT03H': 'lwe_thickness_of_snowfall_amount',
 'rainfall_rate': 'rainfall_rate',
 'snowfall_rate': 'lwe_snowfall_rate',
 'height_of_orography': 'surface_altitude',
 'pressure_at_mean_sea_level': 'air_pressure_at_sea_level',
 'cloud_amount_on_height_levels': 'cloud_volume_fraction_in_atmosphere_layer',
 'pressure_on_height_levels': 'air_pressure',
 'temperature_on_height_levels': 'air_temperature',
 'relative_humidity_on_height_levels': 'relative_humidity',
 'wind_direction_on_height_levels': 'wind_from_direction',
 'wind_speed_on_height_levels': 'wind_speed'}

In [17]:
# Read the height coordinate values from the first height-level variable of the sample run.
heights_sample_path = mogreps_g_data_dir / fname_template.format(
    vt=validity_time,
    lead_time=leadtime_hours,
    var_name=variables_height_levels[0],
)
heights = iris.load_cube(str(heights_sample_path)).coord('height').points

In [18]:
# Key columns used when merging the per-variable / per-height model tables.
merge_coords = ['latitude', 'longitude', 'time', 'realization']

In [19]:
# Split the file-name -> loaded-name mapping into single-level and height-level parts.
single_level_var_mappings = dict(
    (file_name, file_to_var_mapping[file_name]) for file_name in variables_single_level
)
height_level_var_mappings = dict(
    (file_name, file_to_var_mapping[file_name]) for file_name in variables_height_levels
)

In [20]:
def load_ds(ds_path, selected_bounds):
    """Load one dataset file and subset it with ``.sel``.

    Parameters
    ----------
    ds_path
        Path of the file handed to ``xarray.load_dataset``.
    selected_bounds
        Mapping of selectors passed to ``.sel``. It is copied first, so the
        caller's mapping is left untouched.

    Returns
    -------
    The dataset restricted to ``selected_bounds`` plus ``bnds=0``, or ``None``
    when a ``KeyError`` is raised while building the selection, loading or
    selecting.
    """
    try:
        selection = dict(selected_bounds)
        selection.update(bnds=0)
        return xarray.load_dataset(ds_path).sel(**selection)
    except KeyError:
        # Best effort: callers receive None instead of an exception.
        return None

In [None]:
%%time
ts_data_list = []
# gridded_data_list = []
for fcst_ref_time in forecast_ref_time_range:
    print(fcst_ref_time)
    validity_time = fcst_ref_time + datetime.timedelta(hours=leadtime_hours)
    single_level_ds = xarray.merge([load_ds(ds_path= mogreps_g_data_dir / fname_template.format(vt=validity_time,
                                                                                                lead_time=leadtime_hours,
                                                                                                var_name=var1),
                                            selected_bounds=xarray_select_uk,
                                           )
                                    for var1 in variables_single_level]
                                  )
    single_level_df = single_level_ds.to_dataframe().reset_index()

    height_levels_ds = xarray.merge([load_ds(ds_path=mogreps_g_data_dir / fname_template.format(vt=validity_time,
                                                                                                lead_time=leadtime_hours,
                                                                                                var_name=var1),
                                             selected_bounds=xarray_select_uk,
                                            )
                                     for var1 in variables_height_levels])
    hl_df_multirow = height_levels_ds.to_dataframe().reset_index()
    
    var_df_merged = []
    # heights_vars_marged = height_levels_df[height_levels_df.height==heights[0]][ merge_coords]
    for var1 in height_level_var_mappings.values():
        print(var1)
        # for h1 in heights:
        #     heights_vars_marged[f'{var1}_{h1:.1f}'] = list(height_levels_df[height_levels_df.height==h1][var1])
        var_at_heights = [hl_df_multirow[hl_df_multirow.height==h1][merge_coords + [var1]].rename({var1: f'{var1}_{h1:.1f}'}, axis='columns') for h1 in heights]
        var_df_merged += [functools.reduce(lambda x,y: x.merge(y, on=merge_coords), var_at_heights)]
    height_levels_df = functools.reduce(lambda x,y: x.merge(y, on=merge_coords), var_df_merged)    
    
    mogreps_g_single_ts_uk_df = single_level_df.merge(height_levels_df, on=merge_coords)
    mogreps_g_single_ts_uk_df
    
    mogreps_g_single_ts_uk_df = single_level_df.merge(height_levels_df, on=merge_coords)
    ts_data_list += [mogreps_g_single_ts_uk_df]
    ts_mogg_ds1 = xarray.merge([height_levels_ds, single_level_ds])
    ts_mogg_ds1.to_netcdf(output_dir / (
        'prd_mg_ts_'+ f'{validity_time.year:04d}{validity_time.month:02d}{validity_time.day:02d}{validity_time.hour:02d}{validity_time.minute:02d}' 
        + fname_extension_grid)
    )
    # gridded_data_list += [xarray.merge([height_levels_ds, single_level_ds])]

2020-02-14 12:00:00
cloud_volume_fraction_in_atmosphere_layer
air_pressure
air_temperature
relative_humidity
wind_from_direction
wind_speed
2020-02-14 18:00:00
cloud_volume_fraction_in_atmosphere_layer
air_pressure
air_temperature
relative_humidity
wind_from_direction
wind_speed
2020-02-15 00:00:00
cloud_volume_fraction_in_atmosphere_layer
air_pressure
air_temperature
relative_humidity
wind_from_direction
wind_speed
2020-02-15 06:00:00
cloud_volume_fraction_in_atmosphere_layer
air_pressure
air_temperature
relative_humidity
wind_from_direction
wind_speed
2020-02-15 12:00:00


In [None]:
# prd_mogreps_grid_ds = xarray.concat(gridded_data_list)

In [None]:
# prd_mogreps_grid_ds.to_netcdf(output_dir / mogreps_g_grid_fname_template.format(lt=leadtime_hours,
#                                                                                 start=prd_column_dataset['time'].min(),
#                                                                                 end=prd_column_dataset['time'].max(),
#                                                                                ))

In [None]:
# Stack the per-run tables into one table covering all forecast runs.
prd_column_dataset = pandas.concat(ts_data_list)


In [None]:
prd_column_dataset

In [None]:
# Write the MOGREPS-G table; the file name carries the lead time and the first
# and last 'time' value in the table.
mogreps_g_tab_fname = mogreps_g_tab_fname_template.format(
    lt=leadtime_hours,
    start=prd_column_dataset['time'].min(),
    end=prd_column_dataset['time'].max(),
)
prd_column_dataset.to_csv(output_dir / mogreps_g_tab_fname)

## Merging radar and model data
We have now created a table with radar data and a table with MOGREPS-G model data. We now want to merge them into a single table. The steps are as follows:
* trim the time periods so the data covers matching times
* do a merge on latitude, longitude and time. 
  * We don't merge on realization as done for merging the different model fields, as radar is not ensemble data. Instead, the type of merge chosen will insert the radar composite rainfall field into each row with the correct time and place, so the same radar value will appear once for each realization present at that timestamp.
* Output resulting table to disk.

In [None]:
prd_column_dataset.time.min(), prd_column_dataset.time.max()

In [None]:
# Keep only radar rows whose time lies within the period covered by the model
# data (both ends inclusive).
model_time_first = prd_column_dataset.time.min()
model_time_last = prd_column_dataset.time.max()
radar_in_model_period = (
    (radar_acc_regrid_df['time'] >= model_time_first)
    & (radar_acc_regrid_df['time'] <= model_time_last)
)
radar_acc_regrid_df = radar_acc_regrid_df[radar_in_model_period]
radar_acc_regrid_df

In [None]:
# Inner join of model and radar tables on place and time; realization is not a
# key, so each radar row is matched to every model row at that place and time.
prd_merged_mogreps_radar = pandas.merge(
    prd_column_dataset,
    radar_acc_regrid_df,
    how='inner',
    on=['latitude', 'longitude', 'time'],
)
prd_merged_mogreps_radar

Looking at the results of the merge, we see that we have different values for model output for different realisations at the same time and location, but all of those datapoints will have the same value for radar rainfall accumulation.

In [None]:
# Show the rows for one time and one grid point: the model columns differ per
# realization while the radar column is the same value on every row.
# (Timestamp literal corrected from the malformed '2020-02-16T015:00'.)
prd_merged_mogreps_radar[(prd_merged_mogreps_radar['time'] == '2020-02-16T15:00') &
                         (prd_merged_mogreps_radar['latitude'] == 50.15625) &
                         (prd_merged_mogreps_radar['longitude'] == -5.765625)
                        ][['latitude','longitude','realization', 'time','air_temperature_5.0','rainfall_rate_composite']]

### Output to Tabular data format

In [None]:
# First and last time stamp present in the merged table (used in the file name).
merged_times = prd_merged_mogreps_radar['time']
start_dt, end_dt = merged_times.min(), merged_times.max()

In [None]:
# Build the final file name from the lead time and covered period, then write the CSV.
output_fname = output_fname_template.format(lt=leadtime_hours, start=start_dt, end=end_dt)
output_path = output_dir.joinpath(output_fname)
print(output_path)
prd_merged_mogreps_radar.to_csv(output_path)

In [None]:
#TODO:add parquet to conda environment and save as parquet format
#prd_merged_mogreps_radar.to_parquet(output_path)