# WRF model wind speed calculations and visualization

- [High Resolution WRF Simulations of the Current and Future Climate of North America](https://rda.ucar.edu/datasets/ds612.0/index.html#sfol-wl-/data/ds612.0?g=33200406) (DOI: 10.5065/D6V40SXP)
- For example, https://rda.ucar.edu/data/ds612.0/PGW3D/2004/wrf3d_d01_PGW_U_20040602.nc (careful to click, big download!)
- Xarray / Dask / Kubernetes allows for fast parallelization 
- [Must be very careful to coordinate worker / client / scheduler conda environments](https://github.com/julienchastang/jupyter-classroom/tree/master/rut-spring-2022)
- Setup for Dask cluster:
  - 4 workers
  - 4 cores per worker
  - 8 GBs per worker
  - 16 “task streams”

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import xarray as xr
import dask_gateway, dask, distributed, time
from dask.diagnostics import ProgressBar

Baseline time

In [None]:
time0 = time.time()

Lengthy timeouts are apperently important.

In [None]:
dask.config.set({"distributed.comm.timeouts.tcp": "300s"})
dask.config.set({"distributed.comm.timeouts.connect": "300s"})

Cluster gateway

In [None]:
from dask_gateway import Gateway
gateway = Gateway(
    address="http://traefik-dask-gateway/services/dask-gateway/",
    public_address="https://js-156-66.jetstream-cloud.org/services/dask-gateway/",
    auth="jupyterhub")
gateway

Cluster default options

In [None]:
options = gateway.cluster_options()
options

Launching cluster. This might take a few minutes. Make a note of the Dask Dashboard URL. Open it in a separate tab. The URL should look something like:

```
https://js-156-66.jetstream-cloud.org/services/dask-gateway/clusters/jhub.231b3db17f834c2e92130c5dde31c346/status
```

Make sure the first part of the URL matches what you see for the URL of this page. **IMPORTANT**: Only call this cell once or else you will have multiple clusters on your hands that you will have to sort through.

In [None]:
cluster = gateway.new_cluster(options)
cluster.scale(4)
cluster

You only have one cluster, right?

In [None]:
clusters = gateway.list_clusters()
clusters

In [None]:
cluster = gateway.connect(clusters[0].name)
cluster

Shutting down cluster if necessary:

Don't forget to call or else cluster will die. 

In [None]:
client = cluster.get_client()
client

Apperently important if you want to see Dask Dashboard update.

In [None]:
client = gateway.connect(cluster.name).get_client()

This is your "control experiment" to ensure the cluster is working. Make sure the dashboard is also working.

In [None]:
import dask.array as da
a = da.random.normal(size=(40000, 40000), chunks=(500, 500))
a.mean().compute()

-------------

Now you are ready to run your notebook in earnest:

In [None]:
def unstagger(ds, var, coord, new_coord):
    var1 = ds[var].isel({coord: slice(None, -1)})
    var2 = ds[var].isel({coord: slice(1, None)})
    return ((var1 + var2) / 2).rename({coord: new_coord})

Open U dataset

In [None]:
ds = xr.open_dataset('https://rda.ucar.edu/thredds/dodsC/files/g/ds612.0/PGW3D/2004/wrf3d_d01_PGW_U_20040601.nc',
                      chunks={'bottom_top': 10})

Plot

In [None]:
with ProgressBar():
    ds.U.sel(Time='2004-06-01T00:00').isel(bottom_top=0).plot()

In [None]:
with ProgressBar():
    ds['U_unstaggered'] = unstagger(ds, 'U', 'west_east_stag', 'west_east')

Open V dataset

In [None]:
ds2 = xr.open_dataset('https://rda.ucar.edu/thredds/dodsC/files/g/ds612.0/PGW3D/2004/wrf3d_d01_PGW_V_20040601.nc',
                      chunks={'bottom_top': 10})
ds2

In [None]:
with ProgressBar():
    ds2['V_unstaggered'] = unstagger(ds2, 'V', 'south_north_stag', 'south_north')

In [None]:
ds = xr.merge((ds, ds2))
ds

Calculate wind speed.

In [None]:
ds['speed'] = np.sqrt(ds.U_unstaggered**2 + ds.V_unstaggered**2)
ds

In [None]:
ds.speed.sel(Time='2004-06-01T18:00').isel(bottom_top=10).plot()

In [None]:
prefix = 'https://rda.ucar.edu/thredds/dodsC/files/g/ds612.0/PGW3D/2004/wrf3d_d01_PGW_'
list_of_files = []
for var in ('U', 'V'):
#    for day in range(1,15):
    for day in range(1,4):
        filename = prefix + var + f'_200406{day:02g}.nc'
        list_of_files.append(filename)

In [None]:
with ProgressBar():
    ds = xr.open_mfdataset(list_of_files, parallel=True, chunks={'bottom_top': 10})

In [None]:
with ProgressBar():
    ds['U_unstaggered'] = unstagger(ds, 'U', 'west_east_stag', 'west_east')
    ds['V_unstaggered'] = unstagger(ds, 'V', 'south_north_stag', 'south_north')
    ds['speed'] = np.sqrt(ds.U_unstaggered**2 + ds.V_unstaggered**2)

In [None]:
with ProgressBar():
    ds.speed.sel(Time='2004-06-03T18:00').isel(bottom_top=10).plot()

In [None]:
mean_speed_lev5 = ds.speed.isel(bottom_top=5).mean(dim='Time')

In [None]:
with ProgressBar():
    mean_speed_lev5.plot()

In [None]:
zonal_avg_mean_wind = ds.speed.mean(dim='west_east').mean(dim='Time')
zonal_avg_mean_wind

In [None]:
fig, ax = plt.subplots(figsize=(15,12))
zonal_avg_mean_wind.plot.contourf(ax=ax, levels=10)
ax.set_title('Mean Wind Speed (Zonally Averaged)')
plt.show()

How long did it take for the notebook to run

In [None]:
time = time.time() - time0 
time

Don't forget to shut down your cluster

In [None]:
c = gateway.connect(cluster.name)
c.shutdown()

Stop cell execution

In [None]:
class StopExecution(Exception):
    def _render_traceback_(self):
        pass

raise StopExecution