# Preprocesses NCEP-NCAR-R1
Notebook preprocessing is based on the workflow in [read_reanalysis.ipynb](https://github.com/fdavenport/GRL2021/blob/main/notebooks/0a_read_reanalysis.ipynb) from Davenport and Diffenbaugh, 2021 
<br><br>
**Preprocessing steps**: 
1) Clip to Colorado
2) Compute average over region (where should this step go?)
3) [HGT only] Detrend the data
4) Compute daily standardized anomalies

In [1]:
import xarray as xr 
import numpy as np 
import pandas as pd
from glob import glob
import sys 
from datetime import datetime
import boto3
import s3fs

# Import helper functions 
sys.path.insert(0, '../utils')
from preprocessing_utils import (
    get_state_geom, 
    convert_lon_360_to_180, 
    clip_to_geom, 
    calc_anomalies
) 
from misc_utils import format_nbytes
import parameters as param

## Get Colorado state boundary geometry 
Will be used to clip the data

In [2]:
state = "Colorado"
geom = get_state_geom(state)

## Sea Level Pressure data 

In [3]:
# Open dataset 
var = "slp" # Variable name 
filepaths_wildcard = "../data/{0}_daily_means/{1}*.nc".format(var,var)
filepaths_all = glob(filepaths_wildcard)
ds = xr.open_mfdataset(filepaths_all).sel(time=param.time_period)
global_attrs = ds.attrs
ds = ds.drop_dims("nbnds")

# Convert lon range from 0:360 to -180:180 
ds = convert_lon_360_to_180(ds)

# Clip to Colorado geometry 
ds = clip_to_geom(ds, geom)

# Average over entire region
ds = ds.mean(dim=["lat","lon"])

# Calculate daily standardized anomalies
ds = calc_anomalies(ds, var) 

ERROR 1: PROJ: proj_identify: Cannot find proj.db
ERROR 1: PROJ: proj_identify: Cannot find proj.db
  return self.array[key]
  return self.array[key]


Format the output data

In [4]:
# Format the output data 
output_da = ds[var+"_anom"]
output_da.attrs = {
    "long_name": "mean daily sea level pressure anomalies",
    "units": "Pa",
}
output_ds = output_da.to_dataset()
output_ds.attrs = global_attrs
output_ds.attrs["region"] = "Data has been spatially averaged across the state of "+state
output_ds.attrs["title"] = global_attrs["title"] + " modified to produce daily anomalies"
output_ds.attrs["history"] = global_attrs["history"] + "\nDaily anomalies produced " + datetime.today().strftime('%Y/%m/%d')

# Print size of dataset 
nbytes = format_nbytes(output_ds.nbytes)
print("Size of output dataset: {0}".format(nbytes))

# Convert to pandas DataFrame 
output_df_slp = output_ds.to_dataframe()

Size of output dataset: 85.61 KB


# Geopotential Heights at 500 hPa

In [5]:
# Open dataset 
var = "hgt"
filepaths_wildcard = "../data/{0}_daily_means/{1}*.nc".format(var,var)
filepaths_all = glob(filepaths_wildcard)
ds = xr.open_mfdataset(filepaths_all)
global_attrs = ds.attrs

# Clean it up a bit 
level = 500
ds = ds.sel(time=param.time_period)
ds = ds.drop_dims("nbnds")
ds = ds.sel(level=level).drop("level") 

# Convert lon range from 0:360 to -180:180 
ds = convert_lon_360_to_180(ds)

# Clip to Colorado geometry 
ds = clip_to_geom(ds, geom)

# Average over entire region
ds = ds.mean(dim=["lat","lon"])

# Calculate annual domain average 500-hPa GPH to remove seasonal variability 
domain_mean_df = ds[var].groupby('time.year').mean(dim = "time").to_dataframe(name = var)

# Calculate linear trend in 500-hPa GPH
trend = np.polyfit(domain_mean_df.index.get_level_values('year'), domain_mean_df[var], 1)
print("Slope of trend:", trend[0], "m per year")

# Calculate detrended hgt
ds['change'] = (ds.time.dt.year - int(param.time_start))*trend[0]
ds[var+'_detrended'] = ds[var] - ds['change']
ds = ds.drop_vars('change')

# Calculate daily standardized anomalies
ds = calc_anomalies(ds, var+'_detrended') 

ERROR 1: PROJ: proj_identify: Cannot find proj.db
ERROR 1: PROJ: proj_identify: Cannot find proj.db


Slope of trend: 0.675666884206026 m per year


  return self.array[key]
  return self.array[key]


Format the data

In [6]:
# Format the output data 
output_da = ds[var+"_detrended_anom"]
output_da.attrs = {
    "long_name": "mean detrended daily geopotential height anomalies",
    "units": "m",
    "level":level
}
output_ds = output_da.to_dataset()
output_ds.attrs = global_attrs
output_ds.attrs["region"] = "Data has been spatially averaged across the state of "+state
output_ds.attrs["title"] = global_attrs["title"] + " modified to produce detrended daily anomalies"
output_ds.attrs["history"] = global_attrs["history"] + "\nDaily detrended anomalies produced " + datetime.today().strftime('%Y/%m/%d')

# Print size of dataset 
nbytes = format_nbytes(output_ds.nbytes)
print("Size of output dataset: {0}".format(nbytes))

# Convert to pandas DataFrame 
output_df_hgt = output_ds.to_dataframe()

Size of output dataset: 108.44 KB


## Combine datasets and write to csv
Combine DataFrames for both variables <br>
Save to local drive

In [7]:
output_df_final = output_df_slp.merge(output_df_hgt, on="time").reset_index()
output_df_final.head()

Unnamed: 0,time,slp_anom,hgt_detrended_anom
0,2001-01-01,0.364426,0.631857
1,2001-01-02,0.758293,0.48543
2,2001-01-03,1.545197,1.384811
3,2001-01-04,0.962102,1.977247
4,2001-01-05,0.15844,1.538378


In [8]:
filename = "slp_hgt_anoms.csv"
output_df_final.to_csv("../data/input_data_preprocessed/{}".format(filename), index=False)

## Output xarray object to zarr store in S3

In [9]:
# # Confirm that you're connected to the right S3 bucket
# s3 = boto3.resource(service_name='s3')
# for bucket in s3.buckets.all():
#     # What is printed here should match the variable "bucket" below
#     print("Bucket in S3: " + bucket.name)

# # S3 paths and such 
# bucket = "ml-extreme-precip" # Name of bucket 
# folder = "NCEP-NCAR-R1" # Name of folder in bucket
# s3_path = "s3://{0}/{1}/".format(bucket, folder) 

# # Name to give file 
# # DO NOT include file extension (this will be .zarr)
# filename = output_da.name

# # Path to zarr store in AWS bucket
# filepath_zarr = "{}{}.zarr/".format(s3_path, filename)
# print("zarr store will be written to path: {}".format(filepath_zarr))

In [10]:
# # Write zarr to bucket 

# # Initilize the S3 file system
# s3 = s3fs.S3FileSystem()
# store = s3fs.S3Map(root=filepath_zarr, s3=s3, check=False)

# # Save to zarr
# output_ds.to_zarr(
#     store=store, 
#     consolidated=True, 
#     mode="w" # Overwrite any existing files 
# )

In [11]:
# # Now try opening the file from AWS! :D 
# xr.open_zarr(filepath_zarr)