
Using Stream2segment in your Python code


This notebook assumes you have already run the download routine (s2s download -c <config_file.yaml> on the terminal) and want to run custom processing on the downloaded segments.

Introduction

Stream2segment has two main functions for working with downloaded data, process and imap:

from stream2segment.process import process, imap
# you can type help(process) or help(imap) for documentation

Both functions run custom code implemented in a so-called processing function iteratively on a selection of downloaded waveform segments:

  • imap yields the output of the processing function for each selected segment
  • process writes the output of the processing function for each selected segment in a row of a chosen tabular file (HDF or CSV)

In this notebook, we will show how to work on downloaded segments via imap. An example of process can be found in the Python module paramtable.py, available in the same directory as this notebook if the latter has been created with the command s2s init.

Note: The main goal of paramtable.py is to exemplify the case where a processing routine is run with a more "low level" approach from the terminal, without Jupyter: the module can easily be opened and modified following the documentation therein, and then run as a script from the terminal:

python paramtable.py

In general, and especially for big data processing, this approach is the recommended way to process segments, as it avoids the unnecessary overhead of a Jupyter Notebook, which is designed for other purposes. In addition, imap and process have several options that are not designed to be used within a notebook (e.g., running with parallel sub-processes for faster execution, or showing a progress bar with the % of the task done and the estimated remaining time).
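For illustration only, a call to process from such a module might look like the sketch below, where my_processing_function, dburl and segments_selection are objects analogous to those defined later in this notebook; the outfile argument name is an assumption, to be checked with help(process):

from stream2segment.process import process

# sketch: write one row per processed segment to a tabular file (HDF or CSV).
# Note: `outfile` is an assumed argument name, check help(process):
# process(my_processing_function, dburl, segments_selection, outfile='output.hdf')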

Writing a custom processing routine with imap

Regardless of your environment or design choice, a processing routine always involves the same operations:

  • Connect to a database
  • Select which segments to fetch
  • Run a processing function on the selected segments

Database URL

DISCLAIMER: Pay attention when typing database URLs with passwords: do not commit or distribute them, and avoid typing them on the terminal (or otherwise delete the command history afterwards).

In most cases, connecting to the database where the data has been downloaded requires a simple string URL. For simplicity, the database URL is usually extracted from the download configuration file:

import yaml
# uncomment the line below using an existing file on your OS:
# with open('replace_with_your_download_config_path.yaml', 'r') as fpt:
#    dburl = yaml.safe_load(fpt)['dburl']

In this notebook we will use the default example database (2 segments), available in the same directory as this file after executing the command s2s init. If you have problems accessing the database later, check the current working directory and make sure the database is in it. A database URL must be written according to the RFC-1738 syntax:

import os
dbfile = os.path.join(os.getcwd(), 'example.db.sqlite')
assert os.path.isfile(dbfile), "Db file not found, please execute this notebook from within the database directory"
dburl = 'sqlite:///' + dbfile
# print(dburl)
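For reference, the same RFC-1738 syntax applies to remote databases; a sketch with placeholder credentials (see the disclaimer above about passwords):

# hypothetical PostgreSQL URL (all values below are placeholders):
# dburl = 'postgresql://user:password@host:5432/database_name'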

Selection of segments

The selection of suitable segments is performed by creating a dict mapping one or more Segment attributes to a selection expression for that attribute (for details on the segment object and its attributes, see the segment object)

# create the selection dict. This dict selects a single segment (id=2) for illustrative purposes:
segments_selection = {
  'has_valid_data': 'true',
  'maxgap_numsamples': '[-0.5, 0.5]',
  'event_distance_deg': '[70, 80]'
  # other optional attributes (see cheatsheet below for details):
  # missing_data_sec: '<120'
  # missing_data_ratio: '<0.5'
  # id: '<300'
  # event.time: "(2014-01-01T00:00:00, 2014-12-31T23:59:59)"
  # event.latitude: "[24, 70]"
  # event.longitude: "[-11, 24]"
}

Processing function

A processing function has the signature (segment, config), where the first argument is a segment object and the second is a custom dictionary of arguments that can be passed to imap and process (in this example, we will not provide any config, thus the second argument will not be used).

There is no limitation on what can be implemented in the function; you should only avoid returning the whole segment object, as it might be detached (see next section for details), i.e. its attributes might not be accessible outside the processing function. However, you can safely return any of the segment attributes separately.

def my_processing_function(segment, config):
    """simple processing function. Take the segment stream and remove its instrumental response"""
    # Get ObsPy Trace object. If the waveform has no gaps/overlaps, the trace is the only element
    # of the segment stream object (otherwise the stream will have several traces):
    trace = segment.stream()[0]
    # remove the instrumental response of the Trace:
    # get ObsPy Inventory object:
    inventory = segment.inventory()
    # remove the response:
    trace_remresp = trace.remove_response(inventory)  # see caveat below
    # return the segment.id, the event magnitude, the original trace and the trace with response removed
    return segment.id, segment.event.magnitude, segment.stream()[0], trace_remresp

Handling exceptions

Any exception raised in the processing function (or any function called by it) will interrupt the whole processing routine, with one special case: raising stream2segment.process.SkipSegment will cause the execution to resume from the next segment.

SkipSegment is raised by default when the segment waveform data or inventory are malformed (i.e., when segment.stream() and segment.inventory() fail), but can be used also to skip a segment programmatically, e.g.:

from stream2segment.process import SkipSegment

def my_processing_function_raising(segment, config):
    if segment.sample_rate < 30:
        raise SkipSegment("segment sample rate too low")
    # ... implement your code here

If a log file is given to imap or process (not shown here), then all skipped segments will be logged to the file with their id and error message for later inspection.
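For illustration, a sketch of such a call (the logfile argument name is an assumption, check help(imap) or help(process)):

from stream2segment.process import imap

# sketch: log skipped segments to a file for later inspection
# (`logfile` is an assumed argument name, check help(imap)):
# for output in imap(my_processing_function, dburl, segments_selection, logfile='process.log'):
#     ...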

Complete processing routine with imap

We can now illustrate the usage of imap: in this case, the output of our processing function is simply printed:

from stream2segment.process import imap

for (segment_id, mag, trace, trace_remresp) in imap(my_processing_function, dburl, segments_selection):  
    print()
    print('Segment Id: %d (event magnitude: %.1f)' % (segment_id, mag))
    print('Segment trace (first three points):')
    print('  - Counts units (no response removed):    %s' % trace.data[:3])
    print('  - Physical units (response removed):     %s' % trace_remresp.data[:3])
Segment Id: 2 (event magnitude: 8.1)
Segment trace (first three points):
  - Counts units (no response removed):    [-1.42699395e-06 -1.43603990e-06 -1.42210201e-06]
  - Physical units (response removed):     [-1.42699395e-06 -1.43603990e-06 -1.42210201e-06]

Caveat: as you can see, the returned trace, expected in counts units, is actually the same as the trace with the response removed. This happens because many Stream and Trace methods (check the ObsPy documentation in case of doubt) modify the objects in-place, i.e. they permanently modify the return value of segment.stream() (which is cached in the segment object for performance reasons).

A solution is to call segment.stream(reload=True), which forces a complete reload of the stream from the database, or simply to work on copies of those objects (which is generally faster), e.g.:

def my_processing_function(segment, config):
    """simple processing function. Take the segment stream and remove its instrumental response"""
    # Get ObsPy Trace object and make a COPY of IT:
    trace = segment.stream()[0].copy()

    # now, segment.stream()[0] and trace are two different objects.
    # By just running the same code as in the previous processing function
    # we will remove the response to `trace` and preserve `segment.stream()`
    # (but you can do also the other way around, depending on your needs)
    inventory = segment.inventory()
    trace_remresp = trace.remove_response(inventory)  # remove_response modifies `trace` in-place and returns it
    return segment.id, segment.event.magnitude, segment.stream()[0], trace_remresp

# And now we get the expected result:
for (segment_id, mag, trace, trace_remresp) in imap(my_processing_function, dburl, segments_selection):
    print()
    print('Segment Id: %d (event magnitude: %.1f)' % (segment_id, mag))
    print('Segment trace (first three points):')
    print('  - Counts units (no response removed):    %s' % trace.data[:3])
    print('  - Physical units (response removed):     %s' % trace_remresp.data[:3])
Segment Id: 2 (event magnitude: 8.1)
Segment trace (first three points):
  - Counts units (no response removed):    [-314 -280 -251]
  - Physical units (response removed):     [-1.42699395e-06 -1.43603990e-06 -1.42210201e-06]
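For completeness, the reload-based alternative mentioned above might look like the following sketch (note that reload=True forces a new database read, which is why working on copies is generally faster):

def my_processing_function_reload(segment, config):
    """Variant using segment.stream(reload=True) instead of copying"""
    # remove the response in-place on the cached stream (remove_response
    # also returns the modified trace):
    trace_remresp = segment.stream()[0].remove_response(segment.inventory())
    # force a complete reload of the stream from the database, so that
    # `trace` holds the original data in counts units:
    trace = segment.stream(reload=True)[0]
    return segment.id, segment.event.magnitude, trace, trace_remresp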

Iterating over selected segments

Although the builtin functions imap and process should fit most user needs, if you need even more customization, a more low-level approach consists of simply working iteratively on the selected segments via get_segments. This is basically what the two builtin functions do under the hood with a given segments selection:

from stream2segment.process import get_segments

for seg in get_segments(dburl, segments_selection):
    # do your work here... In this case, with the first segment `seg` just loaded, simply exit the loop:
    break

get_segments (and consequently, imap and process using it) opens a database session, yields selected segments and closes the session afterwards. A database session is an object that establishes all conversations with the database and represents a "holding zone" for all the objects which you’ve loaded or associated with it during its lifespan.

Closing a session is recommended once you have finished your work, as it releases memory on the computer and (if the db is remote) on the server, avoiding potential problems. get_segments, imap and process do this automatically. Note that after a session is closed, all segment objects are detached from the database, which means we can no longer access all of their properties, but only those previously loaded. E.g., accessing a segment's related objects (e.g. the event object) outside the for loop raises an error:

try:
    seg.event
except Exception as exc:
    print('ERROR: ' + str(exc))
ERROR: Parent instance <Segment at 0x1303d7460> is not bound to a Session; lazy load operation of attribute 'event' cannot proceed (Background on this error at: http://sqlalche.me/e/13/bhk3)

Accessing the db (low-level approach)

In very specific cases where you want to keep the segments and all related objects accessible (i.e. attached to a session) also outside a get_segments for-loop, you can call get_segments with a session object instead of a db URL.

Using the session object directly is in general recommended only for users experienced with SQLAlchemy, the database library employed by Stream2segment.

For these cases, we will show hereafter some properties of the Segment object in more detail:

from stream2segment.process import get_session, Segment

session = get_session(dburl)

# query a random segment with a dummy filter just for illustrative purposes:
seg = session.query(Segment).filter(Segment.id < 10).first()

Backreferences (on related objects)

With the session still open, we can access some of the segment's related objects (this will issue a query to the database):

evt = seg.event
print(str(evt))

sta = seg.station
print(str(sta))
Event
 attributes (16 of 16 loaded):
  event_id: 20170908_0 (str, 16 characters, showing first 10 only)
  time: 2017-09-08 04:49:21.200000 (datetime)
  latitude: 15.02 (float)
  longitude: -93.81 (float)
  depth_km: 72.0 (float)
  author: EMSC (str)
  catalog: EMSC-RTS (str)
  contributor: EMSC (str)
  contributor_id: 616600 (str)
  mag_type: mw (str)
  magnitude: 8.1 (float)
  mag_author: EMSC (str)
  event_location_name: OFFSHORE C (str, 24 characters, showing first 10 only)
  event_type: None (NoneType)
  webservice_id: 1 (int)
  id: 1 (int)
 related_objects (0 of 2 loaded):
  webservice
  segments
Station
 attributes (11 of 11 loaded):
  network: GE (str)
  station: RUE (str)
  latitude: 52.4759 (float)
  longitude: 13.78 (float)
  elevation: 40.0 (float)
  site_name: None (NoneType)
  start_time: 2012-03-21 10:00:00 (datetime)
  end_time: None (NoneType)
  inventory_xml: b'\x1f\x8b\x08\x00\xa4\x99\x1b\\\x02\xff' (bytes, 44710 elements, showing first 10 only)
  datacenter_id: 1 (int)
  id: 2 (int)
 related_objects (0 of 3 loaded):
  datacenter
  channels
  segments

Note that both objects have, among their related objects, a so-called back-reference segments. This is a list-like object of Segments (including the original seg object), not a single entity, because of a so-called "many-to-one" relationship: one segment is always related to one event/station, whereas one event generates many segments at different stations, and one station records many segments for different events.
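For instance, with the session still open, the back-reference can be used to fetch all segments related to the event (a minimal illustration):

# iterate over all segments recorded for the same event (issues a db query):
for s in evt.segments:
    print(s.id)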

This extremely powerful feature, connecting several entities effortlessly, is a natural consequence of being backed by a relational database, and would be nearly impossible to achieve with classical file system storage. Nevertheless, it should be used with care with massive downloads, as one might load huge amounts of data into the session, causing memory overflows. A solution might be to call session.expunge_all() from time to time, which removes all object instances from the session (possibly freeing memory), or to load only each object's id, deferring the load of all other attributes from the database upon access, e.g.:

from sqlalchemy.orm import load_only

evt = seg.event
# load event related segments (*risk of memory overflow: low):
segments = evt.segments.options(load_only(Segment.id)).all()

cha = seg.channel
# load channel related segments (*risk of memory overflow: medium):
segments = cha.segments.options(load_only(Segment.id)).all()

sta = seg.station
# load station related segments (*risk of memory overflow: high):
segments = sta.segments.options(load_only(Segment.id)).all()

dct = seg.datacenter
# load data center (e.g. IRIS) related segments (*risk of memory overflow: very high):
segments = dct.segments.options(load_only(Segment.id)).all()

* The risk levels reported are just heuristic estimates and should be considered reliable only relative to each other (an event has almost certainly fewer related segments than a channel, which has almost certainly fewer related segments than a station, and so on)

In any case, for really memory-consuming or long tasks, as already recommended, consider moving the notebook code into a custom Python module and executing the module as a script, or use the command s2s process.

# finally close the session (and dispose the underlying database connection):
from stream2segment.process import close_session
close_session(session)  # returns True if session and connection are now closed
# To close the session only (freeing memory but keeping the connection reusable later):
# close_session(session, dispose_engine=False)
True