Full run through test

In [1]:
import os
import sys
import inspect
import itertools

import numpy as np
from numpy import ma
import pandas as pd
import xarray as xr
from datetime import datetime, timedelta
from dateutil.parser import parse as parse_date
import cv2 as cv
from scipy import ndimage as ndi

import matplotlib.pyplot as plt
from matplotlib import animation
from mpl_toolkits.axes_grid1.axes_divider import make_axes_locatable
import cartopy.crs as ccrs


# from utils import io, abi, glm, nexrad
from tobac_flow import io, abi, glm, nexrad
from tobac_flow.plotting import goes_figure
from tobac_flow.flow import Flow
from tobac_flow import legacy_flow as lf
from tobac_flow.dataset import get_datetime_from_coord, get_time_diff_from_coord, create_new_goes_ds, add_dataarray_to_ds, create_dataarray
from tobac_flow.detection import detect_growth_markers, edge_watershed
from tobac_flow.analysis import filter_labels_by_length, filter_labels_by_length_and_mask, apply_func_to_labels, apply_weighted_func_to_labels, get_label_stats, get_stats_for_labels, slice_label_da
from tobac_flow.validation import get_min_dist_for_objects, get_marker_distance
from tobac_flow.abi import get_abi_proj

# Filter some warnings because pyart doesn't work nicely with notebooks

import warnings
warnings.filterwarnings(
    action='ignore',
    category=DeprecationWarning
)

import warnings
warnings.filterwarnings(
    action='ignore',
    category=UserWarning
)


## You are using the Python ARM Radar Toolkit (Py-ART), an open source
## library for working with weather radar data. Py-ART is partly
## supported by the U.S. Department of Energy as part of the Atmospheric
## Radiation Measurement (ARM) Climate Research Facility, an Office of
## Science user facility.
##
## If you use this software to prepare a publication, please cite:
##
##     JJ Helmus and SM Collis, JORS 2016, doi: 10.5334/jors.119



Data_loading

In [2]:
# Load files
goes_data_path = '../data/GOES16'
start_date = datetime(2018,6,19,17)
hours = timedelta(hours=8)
dates = pd.date_range(start_date, start_date+hours, freq='H', closed='left').to_pydatetime()

abi_files = io.find_abi_files(dates, satellite=16, product='MCMIP', view='C', mode=[3,6], 
                              save_dir=goes_data_path, 
                              replicate_path=True, check_download=True, 
                              n_attempts=1, download_missing=True)

print(len(abi_files))

abi_pre_file = io.find_abi_files(start_date-timedelta(hours=1), satellite=16, product='MCMIP', view='C', mode=[3,6], 
                                 save_dir=goes_data_path, 
                                 replicate_path=True, check_download=True, 
                                 n_attempts=1, download_missing=True)

if len(abi_pre_file):
    abi_pre_file = abi_pre_file[-1]

abi_post_file = io.find_abi_files(start_date+hours, satellite=16, product='MCMIP', view='C', mode=[3,6], 
                                  save_dir=goes_data_path, 
                                  replicate_path=True, check_download=True, 
                                  n_attempts=1, download_missing=True)

if len(abi_post_file):
    abi_post_file = abi_post_file[0]

print (abi_pre_file, abi_post_file)

all_abi_files = [abi_pre_file] + abi_files + [abi_post_file]
# Test with some multichannel data
ds_slice = {'x':slice(1300,1600), 'y':slice(600,900)}
# Load a stack of goes datasets using xarray. Select a region over Northern Florida. (full file size in 1500x2500 pixels)
goes_ds = xr.open_mfdataset(all_abi_files, concat_dim='t', combine='nested').isel(ds_slice)

96
../data/GOES16/ABI-L2-MCMIPC/2018/170/16/OR_ABI-L2-MCMIPC-M3_G16_s20181701657246_e20181701700019_c20181701700123.nc ../data/GOES16/ABI-L2-MCMIPC/2018/171/01/OR_ABI-L2-MCMIPC-M3_G16_s20181710102248_e20181710105021_c20181710105133.nc


In [3]:
goes_ds.t

In [4]:
goes_ds.CMI_C13

Unnamed: 0,Array,Chunk
Bytes,35.28 MB,360.00 kB
Shape,"(98, 300, 300)","(1, 300, 300)"
Count,490 Tasks,98 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 35.28 MB 360.00 kB Shape (98, 300, 300) (1, 300, 300) Count 490 Tasks 98 Chunks Type float32 numpy.ndarray",300  300  98,

Unnamed: 0,Array,Chunk
Bytes,35.28 MB,360.00 kB
Shape,"(98, 300, 300)","(1, 300, 300)"
Count,490 Tasks,98 Chunks
Type,float32,numpy.ndarray


In [5]:
goes_ds.CMI_C13.load()

In [4]:
# Extract fields and load into memory
wvd = goes_ds.CMI_C08 - goes_ds.CMI_C10
try:
    wvd = wvd.compute()
except AttributeError:
    pass

bt = goes_ds.CMI_C13
try:
    bt = bt.compute()
except AttributeError:
    pass

swd = goes_ds.CMI_C13 - goes_ds.CMI_C15
try:
    swd = swd.compute()
except AttributeError:
    pass

In [6]:
# Use .load() instead of compute?
wvd = (goes_ds.CMI_C08 - goes_ds.CMI_C10).load()

bt = goes_ds.CMI_C13.load()

swd = (bt - goes_ds.CMI_C15.load())


In [7]:
wvd, bt, swd

(<xarray.DataArray (t: 98, y: 300, x: 300)>
 array([[[-18.475739, -18.39888 , -18.348984, ..., -19.14534 ,
          -19.07254 , -19.195679],
         [-18.39888 , -18.348984, -18.229889, ..., -19.310303,
          -19.379501, -19.398834],
         [-18.39888 , -18.322006, -18.195267, ..., -19.494583,
          -19.494583, -19.655945],
         ...,
         [-16.955994, -16.936676, -16.944305, ..., -18.847244,
          -18.989288, -19.04683 ],
         [-17.125   , -17.00589 , -17.078705, ..., -18.985245,
          -18.931747, -18.786133],
         [-16.948349, -17.028809, -17.036438, ..., -19.379822,
          -19.326355, -19.157806]],
 
        [[-18.617752, -18.39888 , -18.322006, ..., -19.12648 ,
          -19.04602 , -19.038864],
         [-18.39888 , -18.448761, -18.322006, ..., -19.379501,
          -19.45639 , -19.44513 ],
         [-18.314392, -18.41417 , -18.202911, ..., -19.571442,
          -19.571442, -19.59842 ],
 ...
         [-16.643097, -16.712326, -16.612534, ..., -

Introduce missing data for test

In [5]:
wvd[6,:50,:50] = np.nan
wvd[12:18,:50,:50] = np.nan
wvd[-3] = np.nan

In [6]:
# Check for missing data and DQF flags in any channels, propagate to all data
all_isnan = np.any([~np.isfinite(bt), ~np.isfinite(wvd), ~np.isfinite(swd)], 0)
all_DQF = np.any([goes_ds.DQF_C08, goes_ds.DQF_C10, goes_ds.DQF_C13, goes_ds.DQF_C15], 0)

bt.data[all_isnan] = np.nan
bt.data[all_DQF] = np.nan

wvd.data[all_isnan] = np.nan
wvd.data[all_DQF] = np.nan

swd.data[all_isnan] = np.nan
swd.data[all_DQF] = np.nan

In [7]:
np.sum(all_isnan)

107500

In [137]:
def get_stripe_deviation(da):
    y_mean = da.mean('y')
    y_std = da.std('y')
    return np.abs(((da-y_mean)/(y_std+1e-8)).mean('x'))

def load_mcmip(files, x0=None, x1=None, y0=None, y1=None, dtype=np.float32):
    ds_slice = {'x':slice(x0,x1), 'y':slice(y0,y1)}
    # Load a stack of goes datasets using xarray
    if len(files)>1:
        goes_ds = xr.open_mfdataset(files, concat_dim='t', combine='nested').isel(ds_slice)
    else:
        goes_ds = xr.open_dataset(files[0]).isel(ds_slice)
    
    # Extract fields and load into memory
    wvd = (goes_ds.CMI_C08 - goes_ds.CMI_C10).astype(dtype)
    try:
        wvd = wvd.compute()
    except AttributeError:
        pass

    bt = (goes_ds.CMI_C13).astype(dtype)
    try:
        bt = bt.compute()
    except AttributeError:
        pass

    swd = (goes_ds.CMI_C13 - goes_ds.CMI_C15).astype(dtype)
    try:
        swd = swd.compute()
    except AttributeError:
        pass
    
    # Check for missing data and DQF flags in any channels, propagate to all data
    all_isnan = np.any([~np.isfinite(bt), ~np.isfinite(wvd), ~np.isfinite(swd)], 0)
    all_DQF = np.any([goes_ds.DQF_C08, goes_ds.DQF_C10, goes_ds.DQF_C13, goes_ds.DQF_C15], 0)
    all_stripe = np.any([get_stripe_deviation(goes_ds.DQF_C08)>2,
                         get_stripe_deviation(goes_ds.DQF_C10)>2,
                         get_stripe_deviation(goes_ds.DQF_C13)>2,
                         get_stripe_deviation(goes_ds.DQF_C15)>2], 0)
    
    bt.data[all_isnan] = np.nan
    bt.data[all_DQF] = np.nan
    bt.data[all_stripe] = np.nan

    wvd.data[all_isnan] = np.nan
    wvd.data[all_DQF] = np.nan
    wvd.data[all_stripe] = np.nan

    swd.data[all_isnan] = np.nan
    swd.data[all_DQF] = np.nan
    swd.data[all_stripe] = np.nan
    
    return bt, wvd, swd

In [138]:
load_mcmip(all_abi_files[0:12], 1300, 1600, 600, 900, dtype=np.float16)

(<xarray.DataArray 'CMI_C13' (t: 12, y: 300, x: 300)>
 array([[[292. , 296.5, 296.2, ..., 298.5, 298. , 298. ],
         [292.8, 296. , 295.2, ..., 299.8, 299. , 298.2],
         [292.5, 295.5, 295.8, ..., 299.8, 299.5, 299.2],
         ...,
         [295.8, 295.8, 295.8, ..., 291.8, 291.8, 291.8],
         [295.8, 295.8, 295.8, ..., 292. , 291.5, 291. ],
         [295.8, 295.8, 295.8, ..., 293.8, 293.2, 292.5]],
 
        [[291.5, 295.8, 296.5, ..., 298.2, 297.5, 297. ],
         [291.8, 295.8, 295.5, ..., 299.5, 298.8, 298. ],
         [290.8, 295.8, 295.5, ..., 299.8, 299.2, 299. ],
         ...,
         [295.8, 295.8, 295.8, ..., 292.8, 292.8, 293. ],
         [295.8, 295.8, 295.8, ..., 291.2, 291.2, 291.2],
         [295.8, 295.8, 295.8, ..., 291.2, 290.8, 290.8]],
 
        [[292.2, 295.5, 296.2, ..., 297.8, 296. , 295.8],
         [291.5, 295.2, 296.2, ..., 299.2, 298.2, 297. ],
         [291.5, 294.5, 295.5, ..., 299.5, 299. , 298.5],
         ...,
 ...
         ...,
         

Now test inserting a layer of NaNs where we have missing time steps

In [9]:
bt.t.size

98

In [53]:
test_bt = bt.isel(t=list(range(84))+list(range(87,98)))
test_wvd = wvd.isel(t=list(range(84))+list(range(87,98)))
test_swd = swd.isel(t=list(range(84))+list(range(87,98)))

In [35]:
test_bt.shape

(95, 300, 300)

In [11]:
where_time_gap = np.diff(get_datetime_from_coord(test_bt.t))>timedelta(minutes=15)

In [12]:
np.where(where_time_gap)

(array([83]),)

In [28]:
(test_bt.t[83]+(test_bt.t[84]-test_bt.t[83])/2)

In [26]:
nan_slice_da = xr.full_like(test_bt.isel(t=slice(0,1)), np.nan)
nan_slice_da.t.data[0] = (test_bt.t[83]+(test_bt.t[84]-test_bt.t[83])/2).item()

In [27]:
nan_slice_da.t

In [43]:
def create_nan_slice(da, t_ind):
    nan_slice_da = xr.full_like(da.isel(t=slice(0,1)), np.nan)
    nan_slice_da.t.data[0] = (da.t[t_ind]+(da.t[t_ind+1]-da.t[t_ind])/2).item()
    return nan_slice_da

def fill_time_gap_nan(da, time_gap=timedelta(minutes=15)):
    where_time_gap = np.where(np.diff(get_datetime_from_coord(da.t))>time_gap)[0]
    
    concat_list = []
    last_t_ind = 0
    
    if where_time_gap.size > 0:
        for t_ind in where_time_gap:
            concat_list.append(da.isel(t=slice(last_t_ind, t_ind+1)))
            concat_list.append(create_nan_slice(da, t_ind))
            last_t_ind = t_ind+1
    
        concat_list.append(da.isel(t=slice(last_t_ind, None)))
        
        return xr.concat(concat_list, 't')
    else:
        return da

In [49]:
fill_time_gap_nan(test_bt)

In [44]:
np.unique(np.concatenate([test_bt.t[np.where(where_time_gap)[0]].data.astype('datetime64[h]'), 
                          test_bt.t[np.where(where_time_gap)[0]+1].data.astype('datetime64[h]')]))

array(['2018-06-19T23', '2018-06-20T00'], dtype='datetime64[h]')

In [139]:
def get_full_disk_for_time_gap(start_date, end_date, **io_kwargs):
    start_date = parse_date(start_date.astype('datetime64[s]').astype('str'))
    end_date = parse_date(end_date.astype('datetime64[s]').astype('str'))
    dates = pd.date_range(start_date, start_date+hours, freq='H').to_pydatetime()
#     io_kwargs["view"] = "F"
    F_files = io.find_abi_files(dates, **io_kwargs)#, satellite=16, product='MCMIP', view='F', mode=[3,4,6], 
#                                 save_dir=goes_data_path, 
#                                 replicate_path=True, check_download=True, 
#                                 n_attempts=1, download_missing=True)
    
    F_dates = [io.get_goes_date(i) for i in F_files]
    
    return [file for file,date in zip(F_files, F_dates) if date>start_date and date<end_date]

def fill_time_gap_full_disk(bt, wvd, swd, time_gap=timedelta(minutes=15), 
                            x0=None, x1=None, y0=None, y1=None,
                            **io_kwargs):
    where_time_gap = np.where(np.diff(get_datetime_from_coord(bt.t))>time_gap)[0]
    
    bt_concat_list = []
    wvd_concat_list = []
    swd_concat_list = []
    last_t_ind = 0
    
    if x0:
        x0 += 902
    else:
        x0 = 902
    if x1:
        x1 += 902
    else:
        x1 = 902+2500
    if y0:
        y0 += 422
    else:
        y0 = 422
    if y1:
        y1 += 422
    else:
        y1 = 422+1500
    
    if where_time_gap.size > 0:
        for t_ind in where_time_gap:
            full_disk_files = get_full_disk_for_time_gap(bt.t.data[t_ind], bt.t.data[t_ind+1], **io_kwargs)
            bt_concat_list.append(bt.isel(t=slice(last_t_ind, t_ind+1)))
            wvd_concat_list.append(wvd.isel(t=slice(last_t_ind, t_ind+1)))
            swd_concat_list.append(swd.isel(t=slice(last_t_ind, t_ind+1)))
            
            if len(full_disk_files) > 0:
                full_bt, full_wvd, full_swd = load_mcmip(full_disk_files, x0, x1, y0, y1, dtype=bt.dtype)
                
                bt_concat_list.append(full_bt)
                wvd_concat_list.append(full_wvd)
                swd_concat_list.append(full_swd)

            last_t_ind = t_ind+1
        
#         raise ValueError
        
        bt_concat_list.append(bt.isel(t=slice(last_t_ind, None)))
        wvd_concat_list.append(wvd.isel(t=slice(last_t_ind, None)))
        swd_concat_list.append(swd.isel(t=slice(last_t_ind, None)))
        
        return (xr.concat(bt_concat_list, 't', join="left"), 
                xr.concat(wvd_concat_list, 't', join="left"), 
                xr.concat(swd_concat_list, 't', join="left"))
    else:
        return bt, wvd, swd

In [107]:
get_full_disk_for_time_gap(test_bt.t.data[83], test_bt.t.data[84],
                           satellite=16, product='MCMIP', view='C', mode=[3,4,6], 
                           save_dir=goes_data_path, replicate_path=True, check_download=True, 
                           n_attempts=1, download_missing=True)

['../data/GOES16/ABI-L2-MCMIPF/2018/171/00/OR_ABI-L2-MCMIPF-M3_G16_s20181710000438_e20181710011217_c20181710011298.nc']

In [106]:
fill_time_gap_full_disk(test_bt, test_wvd, test_swd, x0=1300, x1=1600, y0=600, y1=900,
                        satellite=16, product='MCMIP', view='C', mode=[3,4,6], 
                        save_dir=goes_data_path, replicate_path=True, check_download=True, 
                        n_attempts=1, download_missing=True)

(<xarray.DataArray 'CMI_C13' (t: 96, y: 300, x: 300)>
 array([[[291.98578, 296.4719 , 296.22607, ..., 298.62274, 297.94675,
          298.06967],
         [292.66177, 296.04172, 295.36572, ..., 299.66745, 298.99146,
          298.25403],
         [292.47742, 295.61154, 295.7959 , ..., 299.8518 , 299.4831 ,
          299.23727],
         ...,
         [295.7959 , 295.7959 , 295.7959 , ..., 291.73996, 291.73996,
          291.86288],
         [295.7959 , 295.7959 , 295.85733, ..., 292.04724, 291.5556 ,
          291.064  ],
         [295.73444, 295.7959 , 295.85733, ..., 293.82938, 293.1534 ,
          292.41595]],
 
        [[291.4327 , 295.73444, 296.53333, ..., 298.25403, 297.39368,
          296.9635 ],
         [291.67853, 295.85733, 295.61154, ..., 299.42163, 298.8071 ,
          297.8853 ],
         [290.7567 , 295.85733, 295.48862, ..., 299.79034, 299.23727,
          298.99146],
 ...
         [295.67297, 295.36572, 295.1199 , ..., 295.98026, 295.85733,
          295.7959 ],
    

In [146]:
from tobac_flow.dataset import get_datetime_from_coord, create_dataarray, add_dataarray_to_ds
from tobac_flow.abi import get_abi_lat_lon, get_abi_pixel_area

def goes_dataloader(start_date, end_date, n_pad_files=1, 
                    x0=None, x1=None, y0=None, y1=None,
                    time_gap=timedelta(minutes=15), 
                    dtype=np.float32, return_new_ds=False, 
                    **io_kwargs):
    # Find ABI files
    dates = pd.date_range(start_date, end_date, freq='H', closed='left').to_pydatetime()

    abi_files = io.find_abi_files(dates, **io_kwargs)
    
    if n_pad_files > 0:
        pad_hours = int(np.ceil(n_pad_files/12))

        pre_dates = pd.date_range(start_date-timedelta(hours=pad_hours), start_date, 
                                  freq='H', closed='left').to_pydatetime()
        abi_pre_file = io.find_abi_files(pre_dates, **io_kwargs)
        if len(abi_pre_file):
            abi_pre_file = abi_pre_file[-n_pad_files:]

        post_dates = pd.date_range(end_date, end_date+timedelta(hours=pad_hours), 
                                  freq='H', closed='left').to_pydatetime()
        abi_post_file = io.find_abi_files(post_dates, **io_kwargs)
        if len(abi_post_file):
            abi_post_file = abi_post_file[:n_pad_files]

        all_abi_files = abi_pre_file + abi_files + abi_post_file
    
    else:
        all_abi_files = abi_files
    
    # Load ABI files
    bt, wvd, swd = load_mcmip(all_abi_files, x0, x1, y0, y1, dtype=dtype)
    
    # Fill any gaps:
    if io_kwargs["view"] == "M":
        io_kwargs["view"] = "C"
        bt, wvd, swd = fill_time_gap_full_disk(bt, wvd, swd, time_gap, x0, x1, y0, y1, **io_kwargs)
    
    if io_kwargs["view"] == "C":
        io_kwargs["view"] = "F"
        bt, wvd, swd = fill_time_gap_full_disk(bt, wvd, swd, time_gap, x0, x1, y0, y1, **io_kwargs)
    
    bt = fill_time_gap_nan(bt, time_gap)
    wvd = fill_time_gap_nan(wvd, time_gap)
    swd = fill_time_gap_nan(swd, time_gap)
    
    wvd.name = "WVD"
    wvd.attrs["standard_name"] = wvd.name
    wvd.attrs["long_name"] = "water vapour difference"
    wvd.attrs["units"] = "K"

    bt.name = "BT"
    bt.attrs["standard_name"] = bt.name
    bt.attrs["long_name"] = "brightness temperature"
    bt.attrs["units"] = "K"

    swd.name = "SWD"
    swd.attrs["standard_name"] = swd.name
    swd.attrs["long_name"] = "split window difference"
    swd.attrs["units"] = "K"

    if return_new_ds:
        goes_ds = xr.open_dataset(abi_files[0])

        goes_coords = {'t':bt.t, 'y':bt.y, 'x':bt.x,
                       'y_image':goes_ds.y_image, 'x_image':goes_ds.x_image}

        new_ds = xr.Dataset(coords=goes_coords)
        new_ds["goes_imager_projection"] = goes_ds.goes_imager_projection
        lat, lon = get_abi_lat_lon(new_ds)
        add_dataarray_to_ds(create_dataarray(lat, ('y', 'x'), 'lat', long_name="latitude", dtype=np.float32), new_ds)
        add_dataarray_to_ds(create_dataarray(lon, ('y', 'x'), 'lon', long_name="longitude", dtype=np.float32), new_ds)
        add_dataarray_to_ds(create_dataarray(get_abi_pixel_area(new_ds), ('y', 'x'), 'area',
                                             long_name="pixel area", units='km^2', dtype=np.float32), new_ds)

        return bt, wvd, swd, new_ds

    else:
        return bt, wvd, swd

In [147]:
goes_dataloader(start_date, start_date+hours, x0=1300, x1=1600, y0=600, y1=900,
                satellite=16, product='MCMIP', view='C', mode=[3,4,6], 
                save_dir=goes_data_path, replicate_path=True, check_download=True, 
                n_attempts=1, download_missing=True, dtype=np.float16, return_new_ds=True)

(<xarray.DataArray 'BT' (t: 98, y: 300, x: 300)>
 array([[[292. , 296.5, 296.2, ..., 298.5, 298. , 298. ],
         [292.8, 296. , 295.2, ..., 299.8, 299. , 298.2],
         [292.5, 295.5, 295.8, ..., 299.8, 299.5, 299.2],
         ...,
         [295.8, 295.8, 295.8, ..., 291.8, 291.8, 291.8],
         [295.8, 295.8, 295.8, ..., 292. , 291.5, 291. ],
         [295.8, 295.8, 295.8, ..., 293.8, 293.2, 292.5]],
 
        [[291.5, 295.8, 296.5, ..., 298.2, 297.5, 297. ],
         [291.8, 295.8, 295.5, ..., 299.5, 298.8, 298. ],
         [290.8, 295.8, 295.5, ..., 299.8, 299.2, 299. ],
         ...,
         [295.8, 295.8, 295.8, ..., 292.8, 292.8, 293. ],
         [295.8, 295.8, 295.8, ..., 291.2, 291.2, 291.2],
         [295.8, 295.8, 295.8, ..., 291.2, 290.8, 290.8]],
 
        [[292.2, 295.5, 296.2, ..., 297.8, 296. , 295.8],
         [291.5, 295.2, 296.2, ..., 299.2, 298.2, 297. ],
         [291.5, 294.5, 295.5, ..., 299.5, 299. , 298.5],
         ...,
 ...
         ...,
         [296.

In [11]:
pd.date_range(start_date, end_date, freq='H').to_pydatetime()

NameError: name 'end_date' is not defined