# Earthquake Detection Workflow

## Outline

Here we show an example of the current modules in QuakeFlow

1. Download data using Obpsy:

    [FDSN web service client for ObsPy](https://docs.obspy.org/packages/obspy.clients.fdsn.html#module-obspy.clients.fdsn)
    
    [Mass Downloader for FDSN Compliant Web Services](https://docs.obspy.org/packages/autogen/obspy.clients.fdsn.mass_downloader.html#module-obspy.clients.fdsn.mass_downloader)

2. PhaseNet for picking P/S phases

    Find more details in [PhaseNet github page](https://wayneweiqiang.github.io/PhaseNet/)

3. GaMMA for associating picking and estimate approximate location and magnitude

    Find more details in [GaMMA github page](https://wayneweiqiang.github.io/GMMA/)

4. Earthquake location, magnitude estimation, etc. (to be continued)


## 1. Install [miniconda](https://docs.conda.io/en/latest/miniconda.html) and download packages

<!-- # %%capture -->
```bash
git clone https://github.com/wayneweiqiang/PhaseNet.git
git clone https://github.com/wayneweiqiang/GMMA.git
conda env update -f=env.yml -n base
```

**Second option: install to quakeflow environment, but need to select jupyter notebook kernel to quakflow**
```bash
conda env create -f=env.yml -n quakeflow
python -m ipykernel install --user --name=quakeflow
```

In [1]:
import kfp
import kfp.dsl as dsl
import kfp.components as comp
from kfp.components import InputPath, OutputPath
import warnings
warnings.filterwarnings("ignore")

## 2. Set configurations

In [2]:
import os
import matplotlib
# matplotlib.use("agg")
import matplotlib.pyplot as plt

# region_name = "Ridgecrest_demo"
# region_name = "Ridgecrest_oneweek"
# region_name = "SaltonSea"
# region_name = "Ridgecrest"
# region_name = "SanSimeon"
# region_name = "Italy"
# region_name = "PNSN"
region_name = "Hawaii"
# region_name = "PuertoRico"
# region_name = "SmithValley"
# region_name = "Antilles"
dir_name = region_name
if not os.path.exists(dir_name):
    os.mkdir(dir_name)
root_dir = lambda x: os.path.join(dir_name, x)

run_local = False

In [3]:
def set_config(
    index_json: OutputPath("json"),
    config_json: OutputPath("json"),
    datetime_json: OutputPath("json"),
    num_parallel: int = 1,
) -> list:

    import obspy
    import datetime
    import json

    pi = 3.1415926
    degree2km = pi * 6371 / 180

    # region_name = "Ridgecrest_demo"
    # center = (-117.504, 35.705)
    # horizontal_degree = 1.0
    # vertical_degree = 1.0
    # starttime = obspy.UTCDateTime("2019-07-04T17")
    # endtime = obspy.UTCDateTime("2019-07-04T19")
    # client = "SCEDC"
    # network_list = ["CI"]
    # channel_list = "HH*,BH*,EH*,HN*"

    # region_name = "Ridgecrest_oneweek"
    # center = (-117.504, 35.705)
    # horizontal_degree = 1.0
    # vertical_degree = 1.0
    # starttime = obspy.UTCDateTime("2019-07-04T00")
    # endtime = obspy.UTCDateTime("2019-07-10T00")
    # client = "SCEDC"
    # network_list = ["CI"]
    # channel_list = "HH*,BH*,EH*,HN*"

    region_name = "Hawaii"
    center = (-155.32, 19.39)
    horizontal_degree = 2.0
    vertical_degree = 2.0
    starttime = obspy.UTCDateTime("2021-07-01T00")
    endtime = obspy.UTCDateTime("2021-11-01T00")
    client = "IRIS"
    network_list = ["HV", "PT"]
    channel_list = "HH*,BH*,EH*,HN*"

    # region_name = "PuertoRico"
    # center = (-66.5, 18)
    # horizontal_degree = 3.0
    # vertical_degree = 2.0
    # # starttime = obspy.UTCDateTime("2020-01-01T00")
    # # endtime = obspy.UTCDateTime("2020-01-05T05")
    # starttime = obspy.UTCDateTime("2018-01-01T00")
    # endtime = obspy.UTCDateTime("2018-01-01T06")
    # # endtime = obspy.UTCDateTime("2018-01-06T00")
    # # endtime = obspy.UTCDateTime("2021-08-01T00")
    # client = "IRIS"
    # network_list = ["*"]
    # channel_list = "HH*,BH*,HN*"

    # region_name = "SaltonSea"
    # center = (-115.53, 32.98)
    # horizontal_degree = 1.0
    # vertical_degree = 1.0
    # starttime = obspy.UTCDateTime("2020-10-01T00")
    # endtime = obspy.UTCDateTime("2020-10-01T02")
    # client = "SCEDC"
    # network_list = ["CI"]
    # channel_list = "HH*,BH*,EH*,HN*"

    # region_name = "2003SanSimeon"
    # center = (-121.101, 35.701)
    # horizontal_degree = 1.0
    # vertical_degree = 1.0
    # starttime = obspy.UTCDateTime("2003-12-22T00")
    # endtime = obspy.UTCDateTime("2003-12-24T00")
    # client = "NCEDC"
    # network_list = ["*"]
    # channel_list = "HH*,BH*,EH*,HN*"

    # region_name = "Italy"
    # center = (13.188, 42.723)
    # horizontal_degree = 1.0
    # vertical_degree = 1.0
    # starttime = obspy.UTCDateTime("2016-08-24T00")
    # endtime = obspy.UTCDateTime("2016-08-26T00")
    # client = "INGV"
    # network_list = ["*"]
    # channel_list = "HH*,BH*,EH*,HN*"

    # region_name = "SmithValley"
    # center = (-119.5, 38.51)
    # horizontal_degree = 1.0
    # vertical_degree = 1.0
    # starttime = obspy.UTCDateTime("2021-07-08T00:00")
    # endtime = obspy.UTCDateTime("2021-07-16T00:00")
    # client = "NCEDC"
    # network_list = ["*"]
    # channel_list = "HH*,BH*,EH*,HN*"

    # region_name = "Antilles"
    # center = (-61.14867, 14.79683)
    # horizontal_degree = 0.2
    # vertical_degree = 0.2
    # starttime = obspy.UTCDateTime("2021-04-10T00")
    # endtime = obspy.UTCDateTime("2021-04-15T00")
    # client = "RESIF"
    # network_list = ["*"]
    # channel_list = "HH*,BH*,EH*,HN*"

    ####### save config ########
    config = {}
    config["region"] = region_name
    config["center"] = center
    config["xlim_degree"] = [center[0] - horizontal_degree / 2, center[0] + horizontal_degree / 2]
    config["ylim_degree"] = [center[1] - vertical_degree / 2, center[1] + vertical_degree / 2]
    config["degree2km"] = degree2km
    config["starttime"] = starttime.datetime.isoformat()
    config["endtime"] = endtime.datetime.isoformat()
    config["networks"] = network_list
    config["channels"] = channel_list
    config["client"] = client

    one_day = datetime.timedelta(days=1)
    one_hour = datetime.timedelta(hours=1)
    starttimes = []
    tmp_start = starttime
    while tmp_start < endtime:
        starttimes.append(tmp_start.datetime.isoformat())
        tmp_start += one_hour

    with open(datetime_json, "w") as fp:
        json.dump({"starttimes": starttimes, "interval": one_hour.total_seconds()}, fp)

    if num_parallel == 0:
        num_parallel = min(24, len(starttimes))
        # num_parallel = min(60, len(starttimes)//12)
        # num_parallel = (len(starttimes)-1)//(24) + 1
        # num_parallel = (len(starttimes)-1)//(24*7) + 1
        # num_parallel = (len(starttimes)-1)//(24*10) + 1
        # num_parallel = (len(starttimes)-1)//(24*14) + 1
        # num_parallel = min(60, (len(starttimes)-1)//(24) + 1)
        print(f"num_parallel = {num_parallel}")
    config["num_parallel"] = num_parallel

    idx = [[] for i in range(num_parallel)]
    for i in range(len(starttimes)):
        idx[i - i // num_parallel * num_parallel].append(i)

    with open(config_json, 'w') as fp:
        json.dump(config, fp)
    with open(index_json, 'w') as fp:
        json.dump(idx, fp)

    return list(range(num_parallel))


In [4]:
if run_local:
    idx = set_config(root_dir("index.json"), root_dir("config.json"), root_dir("datetimes.json"), num_parallel=1)


In [5]:
config_op = comp.func_to_container_op(
    set_config,
    # base_image='zhuwq0/quakeflow-env:latest',
    base_image='python:3.8',
    packages_to_install=[
        "obspy",
    ],
)


## 3. Download events in the routine catalog

This catalog is not used by QuakeFolow. It is only used for comparing detection results.

In [6]:
def download_events(config_json: InputPath("json"), event_csv: OutputPath(str)):

    from obspy.clients.fdsn import Client
    from collections import defaultdict
    import pandas as pd
    import json
    import matplotlib
    #     matplotlib.use("agg")
    import matplotlib.pyplot as plt

    with open(config_json, "r") as fp:
        config = json.load(fp)

    ####### IRIS catalog ########
    try:
        events = Client(config["client"]).get_events(
            starttime=config["starttime"],
            endtime=config["endtime"],
            minlongitude=config["xlim_degree"][0],
            maxlongitude=config["xlim_degree"][1],
            minlatitude=config["ylim_degree"][0],
            maxlatitude=config["ylim_degree"][1],
        )
    except:
        events = Client("iris").get_events(
            starttime=config["starttime"],
            endtime=config["endtime"],
            minlongitude=config["xlim_degree"][0],
            maxlongitude=config["xlim_degree"][1],
            minlatitude=config["ylim_degree"][0],
            maxlatitude=config["ylim_degree"][1],
        )
    print(f"Number of events: {len(events)}")

    ####### Save catalog ########
    catalog = defaultdict(list)
    for event in events:
        if len(event.magnitudes) > 0:
            catalog["time"].append(event.origins[0].time.datetime)
            catalog["magnitude"].append(event.magnitudes[0].mag)
            catalog["longitude"].append(event.origins[0].longitude)
            catalog["latitude"].append(event.origins[0].latitude)
            catalog["depth(m)"].append(event.origins[0].depth)
    catalog = pd.DataFrame.from_dict(catalog).sort_values(["time"])
    catalog.to_csv(
        event_csv,
        sep="\t",
        index=False,
        float_format="%.3f",
        date_format='%Y-%m-%dT%H:%M:%S.%f',
        columns=["time", "magnitude", "longitude", "latitude", "depth(m)"],
    )

    ####### Plot catalog ########
    plt.figure()
    plt.plot(catalog["longitude"], catalog["latitude"], '.', markersize=1)
    plt.xlabel("Longitude")
    plt.ylabel("Latitude")
    plt.axis("scaled")
    plt.xlim(config["xlim_degree"])
    plt.ylim(config["ylim_degree"])
    #  plt.savefig(os.path.join(data_path, "events_loc.png"))
    plt.show()

    plt.figure()
    plt.plot_date(catalog["time"], catalog["magnitude"], '.', markersize=1)
    plt.gcf().autofmt_xdate()
    plt.ylabel("Magnitude")
    plt.title(f"Number of events: {len(events)}")
    #  plt.savefig(os.path.join(data_path, "events_mag_time.png"))
    plt.show()


In [7]:
if run_local:
    download_events(root_dir("config.json"), root_dir("events.csv"))


In [8]:
download_events_op = comp.func_to_container_op(
    download_events,
    #base_image='zhuwq0/quakeflow-env:latest',
    base_image='python:3.8',
    packages_to_install=[
        "obspy",
        "pandas",
        "matplotlib",
    ],
)


## 4. Download stations

In [9]:
def download_stations(config_json: InputPath("json"), station_csv: OutputPath(str), station_pkl: OutputPath("pickle")):

    import pickle
    from obspy.clients.fdsn import Client
    from collections import defaultdict
    import pandas as pd
    import json
    import matplotlib
    #     matplotlib.use("agg")
    import matplotlib.pyplot as plt

    with open(config_json, "r") as fp:
        config = json.load(fp)

    print("Network:", ",".join(config["networks"]))
    ####### Download stations ########
    stations = Client(config["client"]).get_stations(
        network=",".join(config["networks"]),
        station="*",
        starttime=config["starttime"],
        endtime=config["endtime"],
        minlongitude=config["xlim_degree"][0],
        maxlongitude=config["xlim_degree"][1],
        minlatitude=config["ylim_degree"][0],
        maxlatitude=config["ylim_degree"][1],
        channel=config["channels"],
        level="response",
    ) 
    print("Number of stations: {}".format(sum([len(x) for x in stations])))

    ####### Save stations ########
    station_locs = defaultdict(dict)
    for network in stations:
        for station in network:
            for chn in station:
                sid = f"{network.code}.{station.code}.{chn.location_code}.{chn.code[:-1]}"
                if sid in station_locs:
                    station_locs[sid]["component"] += f",{chn.code[-1]}"
                    station_locs[sid]["response"] += f",{chn.response.instrument_sensitivity.value:.2f}"
                else:
                    component = f"{chn.code[-1]}"
                    response = f"{chn.response.instrument_sensitivity.value:.2f}"
                    dtype = chn.response.instrument_sensitivity.input_units.lower()
                    tmp_dict = {}
                    tmp_dict["longitude"], tmp_dict["latitude"], tmp_dict["elevation(m)"] = (
                        chn.longitude,
                        chn.latitude,
                        chn.elevation,
                    )
                    tmp_dict["component"], tmp_dict["response"], tmp_dict["unit"] = component, response, dtype
                    station_locs[sid] = tmp_dict

    station_locs = pd.DataFrame.from_dict(station_locs, orient='index')
    station_locs.to_csv(
        station_csv,
        sep="\t",
        float_format="%.3f",
        index_label="station",
        columns=["longitude", "latitude", "elevation(m)", "unit", "component", "response"],
    )

    with open(station_pkl, "wb") as fp:
        pickle.dump(stations, fp)

    ######## Plot stations ########
    plt.figure()
    plt.plot(station_locs["longitude"], station_locs["latitude"], "^", label="Stations")
    plt.xlabel("X (km)")
    plt.ylabel("Y (km)")
    plt.axis("scaled")
    plt.xlim(config["xlim_degree"])
    plt.ylim(config["ylim_degree"])
    plt.legend()
    plt.title(f"Number of stations: {len(station_locs)}")
    #     plt.savefig(os.path.join(data_path, "stations_loc.png"))
    plt.show()


In [10]:
if run_local:
    download_stations(root_dir("config.json"), root_dir("stations.csv"), root_dir("stations.pkl"))


In [11]:
download_stations_op = comp.func_to_container_op(
    download_stations,
    # base_image='zhuwq0/quakeflow-env:latest',
    base_image='python:3.8',
    packages_to_install=[
        "obspy",
        "pandas",
        "matplotlib",
    ],
)


## 5. Download waveform data

In [12]:
def download_waveform(
    i: int,
    index_json: InputPath("json"),
    config_json: InputPath("json"),
    datetime_json: InputPath("json"),
    station_pkl: InputPath("pickle"),
    fname_csv: OutputPath(str),
    data_path: str,
    bucket_name: str = "waveforms",
    s3_url: str = "minio-service:9000",
    secure: bool = True,
) -> str:

    import pickle, os
    import obspy
    from obspy.clients.fdsn import Client
    import time
    import json
    import random
    import threading

    lock = threading.Lock()

    upload_minio = False
    try:
        from minio import Minio

        minioClient = Minio(s3_url, access_key='minio', secret_key='minio123', secure=secure)
        if not minioClient.bucket_exists(bucket_name):
            minioClient.make_bucket(bucket_name)
        upload_minio = True
    except Exception as err:
        # print(f"ERROR: can not access minio service! \n{err}")
        pass

    with open(index_json, "r") as fp:
        index = json.load(fp)
    idx = index[i]
    with open(config_json, "r") as fp:
        config = json.load(fp)
    with open(datetime_json, "r") as fp:
        tmp = json.load(fp)
        starttimes = tmp["starttimes"]
        interval = tmp["interval"]
    with open(station_pkl, "rb") as fp:
        stations = pickle.load(fp)

    waveform_dir = os.path.join(data_path, config["region"], "waveforms")
    if not os.path.exists(waveform_dir):
        os.makedirs(waveform_dir)

    ####### Download data ########
    client = Client(config["client"])
    fname_list = ["fname"]

    def download(i):
        #     for i in idx:
        starttime = obspy.UTCDateTime(starttimes[i])
        endtime = starttime + interval
        fname = "{}.mseed".format(starttime.datetime.strftime("%Y-%m-%dT%H:%M:%S"))
        if not upload_minio:
            if os.path.exists(os.path.join(waveform_dir, fname)):
                print(f"{fname} exists")
                fname_list.append(fname)
                return
        else:
            try:
                minioClient.fget_object(bucket_name,  os.path.join(config['region'], fname), os.path.join(waveform_dir, fname))
                print(f"{bucket_name}/{os.path.join(config['region'], fname)} download to {os.path.join(waveform_dir, fname)}")
                fname_list.append(fname)
                return
            except Exception as err:
                print(err)

        max_retry = 10
        stream = obspy.Stream()
        print(f"{fname} download starts")
        num_sta = 0
        for network in stations:
            for station in network:
                print(f"********{network.code}.{station.code}********")
                retry = 0
                while retry < max_retry:
                    try:
                        tmp = client.get_waveforms(
                            network.code, station.code, "*", config["channels"], starttime, endtime
                        )
                        #  for trace in tmp:
                        #      if trace.stats.sampling_rate != 100:
                        #          print(trace)
                        #          trace = trace.interpolate(100, method="linear")
                        #      trace = trace.detrend("spline", order=2, dspline=5*trace.stats.sampling_rate)
                        #      stream.append(trace)
                        stream += tmp
                        num_sta += len(tmp)
                        break
                    except Exception as err:
                        print("Error {}.{}: {}".format(network.code, station.code, err))
                        message = "No data available for request."
                        if str(err)[: len(message)] == message:
                            break
                        retry += 1
                        time.sleep(5)
                        continue
                if retry == max_retry:
                    print(f"{fname}: MAX {max_retry} retries reached : {network.code}.{station.code}")

        # stream = stream.merge(fill_value=0)
        # stream = stream.trim(starttime, endtime, pad=True, fill_value=0)
        stream.write(os.path.join(waveform_dir, fname))
        print(f"{fname} download succeeds")
        if upload_minio:
            minioClient.fput_object(bucket_name, os.path.join(config['region'], fname), os.path.join(waveform_dir, fname))
            print(f"{fname} upload to minio {os.path.join(config['region'], fname)}")
        lock.acquire()
        fname_list.append(fname)
        lock.release()

    threads = []
    MAX_THREADS = 4
    for ii, i in enumerate(idx):
        t = threading.Thread(target=download, args=(i,))
        t.start()
        time.sleep(1)
        threads.append(t)
        if ii % MAX_THREADS == MAX_THREADS - 1:
            for t in threads:
                t.join()
            threads = []
    for t in threads:
        t.join()

    with open(fname_csv, "w") as fp:
        fp.write("\n".join(fname_list))

    return waveform_dir


In [13]:
if run_local:
    waveform_path = download_waveform(
        0,
        root_dir("index.json"),
        root_dir("config.json"),
        root_dir("datetimes.json"),
        root_dir("stations.pkl"),
        root_dir("fname.csv"),
        data_path=root_dir(""),
    )


In [14]:
download_waveform_op = comp.func_to_container_op(
    download_waveform,
    # base_image='zhuwq0/quakeflow-env:latest',
    base_image='python:3.8',
    packages_to_install=["obspy", "minio"],
)


In [15]:
def phasenet2gamma(
    i: int,
    index_json: InputPath("json"),
    config_json: InputPath("json"),
    station_csv: InputPath(str),
    data_path: str,
    data_list: InputPath(str),
    catalog_csv: OutputPath(str),
    picks_csv: OutputPath(str),
    bucket_name: str = "catalogs",
    s3_url: str = "localhost:9000",
    secure: bool = True,
) -> str:

    import json
    import os
    import obspy
    import pandas as pd
    import requests
    import numpy as np
    import multiprocessing
    from multiprocessing import dummy
    import time

    import threading

    lock = threading.Lock()

    # from requests.adapters import HTTPAdapter
    # from requests.packages.urllib3.util.retry import Retry
    # retry_strategy = Retry(
    #     total=10,
    #     status_forcelist=[104, 502],
    # )
    # adapter = HTTPAdapter(max_retries=retry_strategy)
    # http = requests.Session()
    # http.mount("http://", adapter)

    def convert_mseed(
        mseed, stations, sampling_rate=100, n_channel=3, dtype="float32", amplitude=True, remove_resp=True
    ):
        try:
            mseed = mseed.detrend("spline", order=2, dspline=5 * mseed[0].stats.sampling_rate)
        except:
            print(f"Error: spline detrend failed at file {mseed}")
            mseed = mseed.detrend("demean")
        mseed = mseed.merge(fill_value=0)
        starttime = min([st.stats.starttime for st in mseed])
        endtime = max([st.stats.endtime for st in mseed])
        # endtime = starttime + 60
        mseed = mseed.trim(starttime, endtime, pad=True, fill_value=0)

        for i in range(len(mseed)):
            if mseed[i].stats.sampling_rate != sampling_rate:
                # print(f"Resampling {mseed[i].id} from {mseed[i].stats.sampling_rate} to {sampling_rate} Hz")
                mseed[i] = mseed[i].interpolate(sampling_rate, method="linear")

        order = ['3', '2', '1', 'E', 'N', 'Z']
        order = {key: i for i, key in enumerate(order)}
        comp2idx = {"3": 0, "2": 1, "1": 2, "E": 0, "N": 1, "Z": 2}

        nsta = len(stations)
        nt = max(len(mseed[i].data) for i in range(len(mseed)))
        data = []
        station_id = []
        t0 = []
        for i in range(nsta):
            trace_data = np.zeros([nt, n_channel], dtype=dtype)
            empty_station = True
            # sta = stations.iloc[i]["station"]
            # sta = stations.index[i]
            sta = stations.iloc[i]["id"]
            comp = stations.iloc[i]["component"].split(",")
            if remove_resp:
                resp = stations.iloc[i]["response"].split(",")
                # resp = station_locs.iloc[i]["response"]

            for j, c in enumerate(sorted(comp, key=lambda x: order[x[-1]])):
                resp_j = float(resp[j])
                if len(comp) != 3:  ## less than 3 component
                    j = comp2idx[c]
                if len(mseed.select(id=sta + c)) == 0:
                    # print(f"Empty trace: {sta+c} {starttime}")
                    continue
                else:
                    empty_station = False

                tmp = mseed.select(id=sta + c)[0].data.astype(dtype)
                trace_data[: len(tmp), j] = tmp[:nt]

                if stations.iloc[i]["unit"] == "m/s**2":
                    tmp = mseed.select(id=sta + c)[0]
                    tmp = tmp.integrate()
                    tmp = tmp.filter("highpass", freq=1.0)
                    tmp = tmp.data.astype(dtype)
                    trace_data[: len(tmp), j] = tmp[:nt]
                elif stations.iloc[i]["unit"] == "m/s":
                    tmp = mseed.select(id=sta + c)[0].data.astype(dtype)
                    trace_data[: len(tmp), j] = tmp[:nt]
                else:
                    print(
                        f"Error in {stations.iloc[i]['station']}\n{stations.iloc[i]['unit']} should be m/s**2 or m/s!"
                    )

                if remove_resp:
                    trace_data[:, j] /= resp_j

            if not empty_station:
                data.append(trace_data)
                station_id.append(sta)
                t0.append(starttime.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3])

        data = np.stack(data)

        meta = {"data": data, "t0": t0, "station_id": station_id}

        return meta

    # PHASENET_API_URL = "http://phasenet.quakeflow.com"
    # GAMMA_API_URL = "http://gamma.quakeflow.com"
    PHASENET_API_URL = "http://phasenet-api.default.svc.cluster.local:8000"
    GAMMA_API_URL = "http://gamma-api.default.svc.cluster.local:8001"

    ## read config
    with open(index_json, "r") as fp:
        index = json.load(fp)
    idx = index[i]

    with open(config_json, "r") as fp:
        config = json.load(fp)

    ## read stations
    stations = pd.read_csv(station_csv, delimiter="\t")
    stations = stations.rename(columns={"station": "id"})

    ## read meed list
    data_list = pd.read_csv(data_list)

    manager = multiprocessing.Manager()
    # catalog_list = manager.list()
    # picks_list = manager.list()
    catalog_list = []
    picks_list = []

    for fname in data_list['fname']:
        # def process(fname):
        # print(f"Process {fname}\n")
        mseed = obspy.read(os.path.join(data_path, fname))
        meta = convert_mseed(mseed, stations)

        batch = 2
        # phasenet_picks = manager.list()
        phasenet_picks = []

        # for j in range(0, len(meta["station_id"]), batch):
        def run_phasenet(j):
            req = {
                "id": meta['station_id'][j : j + batch],
                "timestamp": meta["t0"][j : j + batch],
                "vec": meta["data"][j : j + batch].tolist(),
            }
            # resp = requests.post(f'{PHASENET_API_URL}/predict', json=req)
            # phasenet_picks.extend(resp.json())
            counts = 0
            while True:
                try:
                    resp = requests.post(f'{PHASENET_API_URL}/predict', json=req, timeout=30)
                    lock.acquire()
                    phasenet_picks.extend(resp.json())
                    lock.release()
                    break
                except Exception as e:
                    if counts > 30:
                        print(f"Error PhaseNet on {fname} at {j} trace: {e}")
                        break
                    print(f"Retry PhaseNet on {fname} at {j} trace")
                    # time.sleep(3)
                    counts += 1

        time_prev = time.time()
        print(f"PhaseNet start on {fname}...")
        threads = []
        MAX_THREADS = 4
        # for ii, i in enumerate(idx):
        for ii, j in enumerate(range(0, len(meta["station_id"]), batch)):
            t = threading.Thread(target=run_phasenet, args=(j,))
            t.start()
            time.sleep(3)
            threads.append(t)
            if ii % MAX_THREADS == MAX_THREADS - 1:
                for t in threads:
                    t.join()
                threads = []
        for t in threads:
            t.join()

        print(f"PhaseNet finish on {fname} using {time.time()-time_prev}s")

        # print(f"PhaseNet on {fname}...")
        # # with multiprocessing.dummy.Pool(processes=min(8, (len(meta["station_id"])-1)//batch+1)) as pool:
        # with multiprocessing.dummy.Pool(processes=min(8, (len(meta["station_id"])-1)//batch+1)) as pool:
        #     pool.map(run_phasenet, range(0, len(meta["station_id"]), batch))
        # phasenet_picks = list(phasenet_picks)

        time_prev = time.time()
        print(f"GaMMA start on {fname}...")
        counts = 0
        while True:
            try:
                resp = requests.post(
                    f'{GAMMA_API_URL}/predict',
                    json={"picks": phasenet_picks, "stations": stations.to_dict(orient="records"), "config": config},
                )
                result = resp.json()
                catalog_gamma = result["catalog"]
                for c in catalog_gamma:
                    c["file_idx"] = fname.split("/")[-1]
                picks_gamma = result["picks"]
                for c in picks_gamma:
                    c["file_idx"] = fname.split("/")[-1]
                break
            except Exception as e:
                if counts > 30:
                    print(f"Error in GaMMA on {fname}: {e}")
                    break
                print(f"Retry GaMMA on {fname}")
                time.sleep(3)
                counts += 1

        catalog_list.extend(catalog_gamma)
        picks_list.extend(picks_gamma)

        print(f"GaMMA finish on {fname} using {time.time()-time_prev}s")

    # with multiprocessing.dummy.Pool(processes=min(12, len(data_list["fname"]))) as pool:
    #     pool.map(process, data_list["fname"])

    catalog_df = pd.DataFrame(list(catalog_list))
    picks_df = pd.DataFrame(list(picks_list))
    manager.shutdown()
    if len(catalog_df) > 0:
        print("GaMMA catalog:")
        print(
            catalog_df[
                ["time", "latitude", "longitude", "depth(m)", "magnitude", "covariance", "event_idx", "file_idx"]
            ]
        )
    if len(picks_df) > 0:
        print("GaMMA association:")
        print(picks_df)

    with open(catalog_csv, 'w') as fp:
        catalog_df.to_csv(
            fp,
            sep="\t",
            index=False,
            float_format="%.3f",
            date_format='%Y-%m-%dT%H:%M:%S.%f',
            columns=["time", "magnitude", "longitude", "latitude", "depth(m)", "covariance", "event_idx", "file_idx"],
        )

    with open(picks_csv, 'w') as fp:
        picks_df.to_csv(
            fp,
            sep="\t",
            index=False,
            date_format='%Y-%m-%dT%H:%M:%S.%f',
            columns=["id", "timestamp", "type", "prob", "amp", "prob_gmma", "event_idx", "file_idx"],
        )

    ## upload to s3 bucket
    try:
        from minio import Minio

        catalog_dir = os.path.join("/tmp/", bucket_name)
        if not os.path.exists(catalog_dir):
            os.makedirs(catalog_dir)
        minioClient = Minio(s3_url, access_key='minio', secret_key='minio123', secure=secure)
        if not minioClient.bucket_exists(bucket_name):
            minioClient.make_bucket(bucket_name)

        with open(os.path.join(catalog_dir, f"catalog_{idx[0]:04d}.csv"), 'w') as fp:
            catalog_df.to_csv(
                fp,
                sep="\t",
                index=False,
                float_format="%.3f",
                date_format='%Y-%m-%dT%H:%M:%S.%f',
                columns=[
                    "time",
                    "magnitude",
                    "longitude",
                    "latitude",
                    "depth(m)",
                    "covariance",
                    "event_idx",
                    "file_idx",
                ],
            )
        minioClient.fput_object(
            bucket_name,
            f"{config['region']}/catalog_{idx[0]:04d}.csv",
            os.path.join(catalog_dir, f"catalog_{idx[0]:04d}.csv"),
        )

        with open(os.path.join(catalog_dir, f"picks_{idx[0]:04d}.csv"), 'w') as fp:
            picks_df.to_csv(
                fp,
                sep="\t",
                index=False,
                date_format='%Y-%m-%dT%H:%M:%S.%f',
                columns=["id", "timestamp", "type", "prob", "amp", "prob_gmma", "event_idx", "file_idx"],
            )
        minioClient.fput_object(
            bucket_name,
            f"{config['region']}/picks_{idx[0]:04d}.csv",
            os.path.join(catalog_dir, f"picks_{idx[0]:04d}.csv"),
        )

    except Exception as err:
        # print(f"ERROR: can not access minio service! \n{err}")
        pass

    return f"catalog_{idx[0]:04d}.csv"


In [16]:
if run_local:
    phasenet2gamma(
        0, 
        root_dir("index.json"),
        root_dir("config.json"),
        root_dir("stations.csv"),
        root_dir(root_dir('waveforms')),
        root_dir('fname.csv'),
        root_dir("catalog.csv"),
        root_dir("picks.csv"),
    )

In [17]:
phasenet2gamma_op = comp.func_to_container_op(
    phasenet2gamma,
    # base_image='zhuwq0/quakeflow-env:latest',
    base_image='python:3.8',
    packages_to_install=["pandas", "numpy", "tqdm", "minio", "obspy"],
)

## 8. Plot catalogs

In [18]:
if run_local:
    %run plot_catalog.ipynb

## 9. Parallel processing on cloud

Only run this section for parallel jobs on cloud. Setting cloud environment is needed.

In [19]:
def merge_catalog(
    config_json: InputPath("json"),
    catalog_csv: OutputPath(str),
    picks_csv: OutputPath(str),
    bucket_name: str = "catalogs",
    s3_url: str = "minio-service:9000",
    secure: bool = True,
):

    import pandas as pd
    from glob import glob
    import os
    import json

    from minio import Minio

    minioClient = Minio(s3_url, access_key='minio', secret_key='minio123', secure=secure)

    with open(config_json, "r") as fp:
        config = json.load(fp)

    objects = minioClient.list_objects(bucket_name, prefix=config["region"], recursive=True)

    tmp_path = lambda x: os.path.join("/tmp/", x)
    for obj in objects:
        print(obj._object_name)
        minioClient.fget_object(bucket_name, obj._object_name, tmp_path(obj._object_name.split("/")[-1]))

    files_catalog = sorted(glob(tmp_path("catalog_*.csv")))
    files_picks = sorted(glob(tmp_path("picks_*.csv")))

    if len(files_catalog) > 0:
        catalog_list = []
        for f in files_catalog:
            tmp = pd.read_csv(f, sep="\t", dtype=str)
            catalog_list.append(tmp)
        merged_catalog = pd.concat(catalog_list).sort_values(by="time")
        merged_catalog.to_csv(tmp_path("merged_catalog.csv"), sep="\t", index=False)
        minioClient.fput_object(bucket_name, f"{config['region']}/merged_catalog.csv", tmp_path("merged_catalog.csv"))

        pick_list = []
        for f in files_picks:
            tmp = pd.read_csv(f, sep="\t", dtype=str)
            pick_list.append(tmp)
        merged_picks = pd.concat(pick_list).sort_values(by="timestamp")
        merged_picks.to_csv(tmp_path("merged_picks.csv"), sep="\t", index=False)
        minioClient.fput_object(bucket_name, f"{config['region']}/merged_picks.csv", tmp_path("merged_picks.csv"))

        with open(catalog_csv, "w") as fout:
            with open(tmp_path("merged_catalog.csv"), "r") as fin:
                for line in fin:
                    fout.write(line)
        with open(picks_csv, "w") as fout:
            with open(tmp_path("merged_picks.csv"), "r") as fin:
                for line in fin:
                    fout.write(line)
    else:
        with open(catalog_csv, "w") as fout:
            pass
        print("No catalog.csv found!")
        with open(picks_csv, "w") as fout:
            pass
        print("No picks.csv found!")


In [20]:
# if run_local
# combine_catalog(root_dir("config.json"), root_dir("combined_catalog.csv"), root_dir("combined_picks.csv"), bucket_name="catalogs", s3_url="localhost:9000", secure=False)


In [21]:
merge_op = comp.func_to_container_op(
    merge_catalog,
    # base_image='zhuwq0/quakeflow-env:latest',
    base_image='python:3.8',
    packages_to_install=["pandas", "minio"],
)


In [22]:
## Define QuakeFlow pipeline
@dsl.pipeline(name='QuakeFlow', description='')
def quakeflow_pipeline(
    data_path: str = "/tmp/",
    num_parallel=0,
    bucket_catalog: str = "catalogs",
    s3_url: str = "minio-service:9000",
    secure: bool = False,
):

    config = config_op(num_parallel)

    events = download_events_op(config.outputs["config_json"])

    stations = download_stations_op(config.outputs["config_json"])

    with kfp.dsl.ParallelFor(config.outputs["output"]) as i:

        vop_ = dsl.VolumeOp(
            name=f"Create volume", resource_name=f"data-volume-{str(i)}", size="20Gi", modes=dsl.VOLUME_MODE_RWO
        ).set_retry(3)

        download_op_ = (
            download_waveform_op(
                i,
                config.outputs["index_json"],
                config.outputs["config_json"],
                config.outputs["datetime_json"],
                stations.outputs["station_pkl"],
                data_path=data_path,
                bucket_name=f"waveforms",
                # s3_url=s3_url,
                s3_url="quakeflow-minio.default.svc.cluster.local:9000",
                secure=secure,
            )
            .add_pvolumes({data_path: vop_.volume})
            .set_cpu_request("860m") #2CPU per node
            .set_retry(3)
            .set_display_name('Download Waveforms')
        )
        download_op_.execution_options.caching_strategy.max_cache_staleness = "P30D"

        phasenet2gamma_op_ = (
            phasenet2gamma_op(
                i,
                config.outputs["index_json"],
                config.outputs["config_json"],
                station_csv=stations.outputs["station_csv"],
                data_path=download_op_.outputs["Output"],
                data_list=download_op_.outputs["fname_csv"],
                bucket_name=f"catalogs",
                s3_url=s3_url,
                secure=secure,
            ).add_pvolumes({data_path: download_op_.pvolume})
            .set_cpu_request("860m")
            # .set_memory_request("2500M") #6GB per node
            .set_display_name('PhaseNet + GaMMA')
        )
        phasenet2gamma_op_.execution_options.caching_strategy.max_cache_staleness = "P30D"

    merge_op_ = merge_op(config.outputs["config_json"], bucket_name=f"catalogs", s3_url=s3_url, secure=secure).after(
        phasenet2gamma_op_
    )
    merge_op_.execution_options.caching_strategy.max_cache_staleness = "P0D"

    vop_.delete().after(merge_op_)


```
helm install quakeflow-minio --set accessKey.password=minio --set secretKey.password=minio123 --set persistence.size=1T  bitnami/minio
```

In [23]:
import os

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/Users/weiqiang/.dotbot/cloud/quakeflow_zhuwq.json"
experiment_name = 'QuakeFlow'
pipeline_func = quakeflow_pipeline
run_name = pipeline_func.__name__ + '_run'

arguments = {
    "data_path": "/tmp",
    "num_parallel": 0,
    "bucket_catalog": "catalogs",
    "s3_url": "minio-service:9000",
    #  "s3_url": "quakeflow-minio.default.svc.cluster.local:9000",
    "secure": False,
}

if not run_local:
    client = kfp.Client(host="66bece58bfa6ae5b-dot-us-west1.pipelines.googleusercontent.com")
    kfp.compiler.Compiler().compile(pipeline_func, '{}.zip'.format(experiment_name))
    results = client.create_run_from_pipeline_func(
        pipeline_func, experiment_name=experiment_name, run_name=run_name, arguments=arguments
    )




MaxRetryError: HTTPSConnectionPool(host='accounts.google.com', port=443): Max retries exceeded with url: https://accounts.google.com/InteractiveLogin?continue=https://us-west1.notebooks.cloud.google.com/_signin?continue%3Dhttps%253A%252F%252F66bece58bfa6ae5b-dot-us-west1.pipelines.googleusercontent.com%252Fapis%252Fv1beta1%252Fhealthz%26endpoint%3D66bece58bfa6ae5b&ifkv=ASSHykqaLcG5-hRdvW4jr1MawpLoWrtuKO68ICv0lj-ULtFBDMWYvC3uMhA-ChYQGK2JhK4Uxlmbxg (Caused by ResponseError('too many redirects'))