# example_batch_processing.ipynb
:auth: Nathan T. Stevens  
:email: ntsteven (at) uw.edu  
:org: Pacific Northwest Seismic Network  
:license: MIT (2023)  


:purpose:   
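This notebook documents a brief example of batch machine learning processing on 10 minutes of data from the entire PNSN network surrounding the M4.3 earthquake of October 2023 near Port Townsend, WA.  
**Note (review):** the cells below load `test_dataset_1`, whose waveform records are dated 2017-05-11; confirm whether this description or the dataset path is out of date.  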

Note: this takes a while to run as a Jupyter Notebook and is likely faster to run as a `*.py` script...  
          

In [1]:
# Import "standard" modules
import os
import sys
import pandas as pd
from obspy import read, Stream
from tqdm import tqdm
# Import repo-specific modules
sys.path.append('..')
import core.preprocessing as prep
import core.prediction as pred
import core.postprocessing as post

In [2]:
# Locate the waveform-description table (wfdisc.csv) shipped with test_dataset_1
wfdisc_file = os.path.join(
    '..',
    'data',
    'test_dataset_1',
    'wfdisc.csv',
)
# Read it with the first CSV column as the index and the two time columns
# ('time', 'endtime') parsed as datetimes
df_wf = pd.read_csv(
    wfdisc_file,
    index_col=[0],
    parse_dates=['time', 'endtime'],
)
display(df_wf)

Unnamed: 0_level_0,sta,bandinst,time,endtime,samprate,dir,dfile,lddate
wfid,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
0,ALCT,EN,2017-05-11T00:00:00,2017-05-11T01:00:00,100.0,../data/test_dataset_1/UW/ALCT,UW.ALCT..EN?.2017.131.00.mseed,2023-11-07 12:56:23.768562
1,ALCT,EN,2017-05-11T01:00:00,2017-05-11T02:00:00,100.0,../data/test_dataset_1/UW/ALCT,UW.ALCT..EN?.2017.131.01.mseed,2023-11-07 12:56:25.271797
2,ALCT,EN,2017-05-11T02:00:00,2017-05-11T03:00:00,100.0,../data/test_dataset_1/UW/ALCT,UW.ALCT..EN?.2017.131.02.mseed,2023-11-07 12:56:26.710484
3,ALCT,EN,2017-05-11T03:00:00,2017-05-11T04:00:00,100.0,../data/test_dataset_1/UW/ALCT,UW.ALCT..EN?.2017.131.03.mseed,2023-11-07 12:56:28.745366
4,ALCT,EN,2017-05-11T04:00:00,2017-05-11T05:00:00,100.0,../data/test_dataset_1/UW/ALCT,UW.ALCT..EN?.2017.131.04.mseed,2023-11-07 12:56:30.114213
...,...,...,...,...,...,...,...,...
1366,WISC,EN,2017-05-11T19:00:00,2017-05-11T20:00:00,100.0,../data/test_dataset_1/UW/WISC,UW.WISC..EN?.2017.131.19.mseed,2023-11-07 13:41:48.598317
1367,WISC,EN,2017-05-11T20:00:00,2017-05-11T21:00:00,100.0,../data/test_dataset_1/UW/WISC,UW.WISC..EN?.2017.131.20.mseed,2023-11-07 13:41:49.745060
1368,WISC,EN,2017-05-11T21:00:00,2017-05-11T22:00:00,100.0,../data/test_dataset_1/UW/WISC,UW.WISC..EN?.2017.131.21.mseed,2023-11-07 13:41:50.835724
1369,WISC,EN,2017-05-11T22:00:00,2017-05-11T23:00:00,100.0,../data/test_dataset_1/UW/WISC,UW.WISC..EN?.2017.131.22.mseed,2023-11-07 13:41:52.787448


In [3]:
# Load all data: read every file listed in the wfdisc table into one Stream.
# Each row supplies a directory (`dir`) and a file name (`dfile`); the file
# names contain a `?` wildcard (e.g. 'UW.ALCT..EN?.2017.131.00.mseed').
stream = Stream()
for _dir, _dfile in tqdm(zip(df_wf['dir'], df_wf['dfile']), total=len(df_wf)):
    # obspy.read's documented keyword is `format`; `fmt` is not one of its
    # arguments, so passing it does not actually force the MiniSEED reader.
    stream += read(os.path.join(_dir, _dfile), format='MSEED')


  0%|          | 0/1371 [00:00<?, ?it/s]

100%|██████████| 1371/1371 [00:07<00:00, 174.34it/s]


In [4]:
# Load ML model on the best available torch device.
# Metal Performance Shaders (mps) is preferred for Apple M1/M2 silicon; fall
# back to CUDA and then CPU so the notebook also runs on other machines.
# NOTE(review): assumes prep.initialize_EQT_model accepts any torch.device,
# not only 'mps' - confirm against core.preprocessing.
_torch = pred.torch  # torch as exposed through core.prediction
if _torch.backends.mps.is_available():
    _device_name = 'mps'
elif _torch.cuda.is_available():
    _device_name = 'cuda'
else:
    _device_name = 'cpu'
model, device = prep.initialize_EQT_model(device=_torch.device(_device_name))

In [5]:
# Run preprocessing:
# split the loaded stream by unique Net.Sta.Loc.BandInst (NSLBI) codes.
# NOTE(review): merge_kwargs is presumably forwarded to ObsPy's Stream.merge -
# confirm in core.preprocessing.
NSLBI_dict = prep.stream_to_NSLBI_dict(
    stream,
    merge_kwargs={'method': 1},
    tqdm_disable=False,
)
display(NSLBI_dict)


100%|██████████| 82/82 [02:37<00:00,  1.93s/it]


{'UW.ALCT..EN?': 3 Trace(s) in Stream:
UW.ALCT..ENE | 2017-05-11T00:00:00.000000Z - 2017-05-12T00:00:00.000000Z | 100.0 Hz, 8640001 samples (masked)
UW.ALCT..ENN | 2017-05-11T00:00:00.000000Z - 2017-05-12T00:00:00.000000Z | 100.0 Hz, 8640001 samples (masked)
UW.ALCT..ENZ | 2017-05-11T00:00:00.000000Z - 2017-05-12T00:00:00.000000Z | 100.0 Hz, 8640001 samples (masked),
 'UW.ALKI..HN?': 3 Trace(s) in Stream:
UW.ALKI..HNE | 2017-05-11T00:00:00.000000Z - 2017-05-12T00:00:00.000000Z | 200.0 Hz, 17280001 samples (masked)
UW.ALKI..HNN | 2017-05-11T00:00:00.000000Z - 2017-05-12T00:00:00.000000Z | 200.0 Hz, 17280001 samples (masked)
UW.ALKI..HNZ | 2017-05-11T00:00:00.000000Z - 2017-05-12T00:00:00.000000Z | 200.0 Hz, 17280001 samples (masked),
 'UW.BABE..EN?': 3 Trace(s) in Stream:
UW.BABE..ENE | 2017-05-11T00:00:00.000000Z - 2017-05-12T00:00:00.000000Z | 100.0 Hz, 8640001 samples (masked)
UW.BABE..ENZ | 2017-05-11T00:00:00.000000Z - 2017-05-12T00:00:00.000000Z | 100.0 Hz, 8640001 samples (masked

In [6]:
# Resample & pad the per-NSLBI streams so they are homogeneous.
# (In the recorded output the 200 Hz UW.ALKI traces come back at 100 Hz.)
NSLBI_dict_h = prep.homogenize_NSLBI_dict(
    NSLBI_dict,
    trim_bound='max',
    tqdm_disable=False,
)
display(NSLBI_dict_h)

100%|██████████| 82/82 [01:52<00:00,  1.37s/it]


{'UW.ALCT..EN?': 3 Trace(s) in Stream:
UW.ALCT..ENE | 2017-05-11T00:00:00.000000Z - 2017-05-12T00:00:00.000000Z | 100.0 Hz, 8640001 samples (masked)
UW.ALCT..ENN | 2017-05-11T00:00:00.000000Z - 2017-05-12T00:00:00.000000Z | 100.0 Hz, 8640001 samples (masked)
UW.ALCT..ENZ | 2017-05-11T00:00:00.000000Z - 2017-05-12T00:00:00.000000Z | 100.0 Hz, 8640001 samples (masked),
 'UW.ALKI..HN?': 3 Trace(s) in Stream:
UW.ALKI..HNE | 2017-05-11T00:00:00.000000Z - 2017-05-11T23:59:59.990000Z | 100.0 Hz, 8640000 samples
UW.ALKI..HNN | 2017-05-11T00:00:00.000000Z - 2017-05-11T23:59:59.990000Z | 100.0 Hz, 8640000 samples
UW.ALKI..HNZ | 2017-05-11T00:00:00.000000Z - 2017-05-11T23:59:59.990000Z | 100.0 Hz, 8640000 samples,
 'UW.BABE..EN?': 3 Trace(s) in Stream:
UW.BABE..ENE | 2017-05-11T00:00:00.000000Z - 2017-05-12T00:00:00.000000Z | 100.0 Hz, 8640001 samples (masked)
UW.BABE..ENZ | 2017-05-11T00:00:00.000000Z - 2017-05-12T00:00:00.000000Z | 100.0 Hz, 8640001 samples (masked)
UW.BABE..ENN | 2017-05-11T00

In [8]:
# Build the windowed data array and the station-window index from the
# homogenized NSLBI streams; the model is passed to the windowing routine.
windows, swindex = prep.NSLBI_dict_to_windows(
    NSLBI_dict_h,
    model,
    tqdm_disable=False,
)

100%|██████████| 82/82 [06:02<00:00,  4.43s/it]


In [9]:
# Report the dimensions of the `windows` array built in the windowing cell
print(f'The shape of windows is {windows.shape}')

The shape of windows is (293910, 3, 6000)


In [10]:
# Batch size handed to the batched prediction routine (fixed value).
# Alternative heuristic based on (#cpu - 1)*2, currently disabled:
# batch_size = (pred.torch.get_num_threads() - 1)*2
batch_size = 1000
# Run prediction on the windowed data with the loaded model and device
preds = pred.run_batched_prediction(
    windows,
    model,
    device,
    batch_size=batch_size,
)

100%|██████████| 292/292 [01:08<00:00,  4.28it/s]


In [11]:
# Reassemble the windowed predictions into per-station streams.
# NOTE(review): `windows`/`swindex` were built from NSLBI_dict_h, whereas the
# un-homogenized NSLBI_dict is passed here - confirm this is intended.
pred_stream = post.reassemble_multistation_preds(
    preds,
    swindex,
    model,
    NSLBI_dict,
    tqdm_disable=False,
)

  stack[:, _w:_w+mdata] = merge_method([stack[:, _w:_w+mdata], _data],
100%|██████████| 82/82 [00:39<00:00,  2.05it/s]


In [12]:
# Show the full (non-abbreviated) trace listing of the prediction stream
pred_stream_summary = pred_stream.__str__(extended=True)
print(pred_stream_summary)

239 Trace(s) in Stream:
UW.ALCT.EW.END | 2017-05-11T00:00:05.000000Z - 2017-05-11T23:59:47.990000Z | 100.0 Hz, 8638300 samples
UW.ALCT.EW.ENP | 2017-05-11T00:00:05.000000Z - 2017-05-11T23:59:47.990000Z | 100.0 Hz, 8638300 samples
UW.ALCT.EW.ENS | 2017-05-11T00:00:05.000000Z - 2017-05-11T23:59:47.990000Z | 100.0 Hz, 8638300 samples
UW.ALKI.EW.HND | 2017-05-11T00:00:05.000000Z - 2017-05-11T23:59:47.990000Z | 100.0 Hz, 8638300 samples
UW.ALKI.EW.HNP | 2017-05-11T00:00:05.000000Z - 2017-05-11T23:59:47.990000Z | 100.0 Hz, 8638300 samples
UW.ALKI.EW.HNS | 2017-05-11T00:00:05.000000Z - 2017-05-11T23:59:47.990000Z | 100.0 Hz, 8638300 samples
UW.BABE.EW.END | 2017-05-11T00:00:05.000000Z - 2017-05-11T23:59:47.990000Z | 100.0 Hz, 8638300 samples
UW.BABE.EW.ENP | 2017-05-11T00:00:05.000000Z - 2017-05-11T23:59:47.990000Z | 100.0 Hz, 8638300 samples
UW.BABE.EW.ENS | 2017-05-11T00:00:05.000000Z - 2017-05-11T23:59:47.990000Z | 100.0 Hz, 8638300 samples
UW.BEVT.EW.END | 2017-05-11T00:00:05.000000Z - 20

In [13]:
# Write prediction traces to disk: one MiniSEED file per Net.Sta.Loc.BandInst
# key, saved under <write_root>/<network>/<station>/.
write_root = os.path.join('..','data','test_dataset_1')
for _k in tqdm(NSLBI_dict.keys()):
    # Keys look like 'UW.ALCT..EN?': network, station, location, band/instrument
    _n, _s, _l, _bi = _k.split('.')
    # Select first, then copy only the matching traces; copying the whole
    # prediction stream on every iteration duplicates every trace needlessly.
    # The key's location code (_l) is not used as a filter; the file name takes
    # the location from the selected traces instead.
    _st = pred_stream.select(network=_n, station=_s, channel=_bi).copy()
    if len(_st) == 0:
        # Nothing to write for this key; indexing _st[0] below would raise.
        print(f'No prediction traces found for {_k}; skipping')
        continue
    write_fpath = os.path.join(write_root,_n, _s)
    # Make sure the destination directory exists before writing
    os.makedirs(write_fpath, exist_ok=True)
    save_name = f'{_n}.{_s}.{_st[0].stats.location}.{_bi}.{_st[0].stats.starttime.year:d}.{_st[0].stats.starttime.julday:03d}.mseed'
    _save_fp = os.path.join(write_fpath,save_name)
    # Stream.write's documented keyword is `format`; `fmt` is not one of its
    # arguments, so it does not select the output format.
    _st.write(_save_fp, format='MSEED')


A suitable encoding will be chosen.
100%|██████████| 82/82 [01:13<00:00,  1.12it/s]
