SDS client: routine to add new data to archive #1413

Open · wants to merge 21 commits into master from sds_new_data

Conversation (3 participants)
megies (Member) commented May 25, 2016

Hi,
What do you think of an "import files to SDS" routine like the one in
https://github.com/eost/clb-noise-analysis/blob/master/extract2sds.py ?

megies (Member) commented May 18, 2016

Actually, I worked on something like this today and a bit last week (extending the SDS client).. I'll push what I got tomorrow..

megies (Member) commented May 25, 2016

@bonaime, today is "tomorrow".. ;-)
See https://github.com/obspy/obspy/tree/sds_new_data, it needs testing and as of now should only be used on sandbox data, of course..

Also, we'll probably at some point add an SDS Client subclass for MSEED-only data, handling additions on a per-record basis. Right now everything goes through a Stream object at some point, so additional MSEED record information is lost, of course.
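
For illustration, that stream-based round trip boils down to something like the following (the file names are just placeholders); since the output is re-packed from a Stream object, the original record boundaries and per-record blockettes of the input are not carried over:

from obspy import read

# Read new data into a Stream and write it back out as MiniSEED.
# The output records are re-encoded from the Stream, so per-record
# information (blockettes, flags, record boundaries) of the input file
# is not preserved.
st = read("/path/to/new_data.mseed")  # placeholder file name
st.write("/tmp/repacked.mseed", format="MSEED")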

sbonaime (Contributor, Author) commented May 31, 2016

Could you add a "details" option to add_data_to_archive to keep detailed information in the SDS archive?

megies (Member) commented May 31, 2016

Not sure what you mean by "details".. if it's MSEED record details, that would need a different approach..

megies added this to the 1.1.0 milestone Jun 2, 2016

megies force-pushed the sds_new_data branch 2 times, most recently from fc2dd39 to b6c3b2f Jun 7, 2016

megies changed the title from "Import in in obspy.clients.filesystem.sds" to "SDS client: routine to add new data to archive" Jun 7, 2016

megies force-pushed the sds_new_data branch 3 times, most recently from 1b0b219 to 7dc213b Jun 7, 2016

megies (Member) commented Jun 14, 2016

> Could you add a "details" option to add_data_to_archive to keep detailed information in the SDS archive?

> Not sure what you mean by "details".. if it's MSEED record details, that would need a different approach..

@bonaime Ah, now I see what you meant.. details=True in read(): https://github.com/eost/clb-noise-analysis/blob/master/extract2sds.py#L89
Hmm.. I don't think it makes a big difference when reading from and then writing to MSEED whether it's with or without the details=True option.. or does it? If you really wanted to keep MSEED details you would have to do it record by record, I think?

megies added a commit that referenced this pull request Jun 14, 2016

small bugfix for SDS client, see test failure in feature branch
https://travis-ci.org/obspy/obspy/jobs/137294917#L717

That it didn't fail before is likely due to the (deterministic) order of test
execution by unittest, which might have changed due to the new tests in #1413.

megies force-pushed the sds_new_data branch from 7dc213b to 7030b0e Jun 14, 2016

sbonaime (Contributor, Author) commented Jun 14, 2016

@megies the idea is to keep at least the clock quality parameters in the SDS archive. I plan to use the SDS client to create an SDS archive from files from different field tests. We could have the same details=False default in add_data_to_archive as in read().

megies (Member) commented Jun 14, 2016

> clock quality parameters

Those are record-based, right? I can't imagine they really will be preserved correctly just by doing a read(.., details=True).write(.., format="MSEED"), or are they?

sbonaime (Contributor, Author) commented Jun 14, 2016

@megies I think they are

from obspy import read

a = read('G.IVI.00.LHZ.D.2016.022', details=True)

print a
3 Trace(s) in Stream:
G.IVI.00.LHZ | 2016-01-22T15:45:24.000000Z - 2016-01-22T15:56:46.000000Z | 1.0 Hz, 683 samples
G.IVI.00.LHZ | 2016-01-22T15:56:47.000001Z - 2016-01-22T16:04:53.000001Z | 1.0 Hz, 487 samples
G.IVI.00.LHZ | 2016-01-22T16:04:54.000004Z - 2016-01-23T00:01:18.000004Z | 1.0 Hz, 28585 samples

print a[0].stats
         network: G
         station: IVI
        location: 00
         channel: LHZ
       starttime: 2016-01-22T15:45:24.000000Z
         endtime: 2016-01-22T15:56:46.000000Z
   sampling_rate: 1.0
           delta: 1.0
            npts: 683
           calib: 1.0
         _format: MSEED
           mseed: AttribDict({u'calibration_type': False, 'encoding': u'STEIM2', u'blkt1001': AttribDict({u'timing_quality': 0}), 'record_length': 512, 'filesize': 111104, u'dataquality': u'D', 'number_of_records': 217, 'byteorder': u'>'})


print a[1].stats
         network: G
         station: IVI
        location: 00
         channel: LHZ
       starttime: 2016-01-22T15:56:47.000001Z
         endtime: 2016-01-22T16:04:53.000001Z
   sampling_rate: 1.0
           delta: 1.0
            npts: 487
           calib: 1.0
         _format: MSEED
           mseed: AttribDict({u'calibration_type': False, 'encoding': u'STEIM2', u'blkt1001': AttribDict({u'timing_quality': 90}), 'record_length': 512, 'filesize': 111104, u'dataquality': u'D', 'number_of_records': 217, 'byteorder': u'>'})

print a[2].stats
         network: G
         station: IVI
        location: 00
         channel: LHZ
       starttime: 2016-01-22T16:04:54.000004Z
         endtime: 2016-01-23T00:01:18.000004Z
   sampling_rate: 1.0
           delta: 1.0
            npts: 28585
           calib: 1.0
         _format: MSEED
           mseed: AttribDict({u'calibration_type': False, 'encoding': u'STEIM2', u'blkt1001': AttribDict({u'timing_quality': 100}), 'record_length': 512, 'filesize': 111104, u'dataquality': u'D', 'number_of_records': 217, 'byteorder': u'>'})

a.write('/tmp/toto.mseed', format="MSEED")

c = read('/tmp/toto.mseed', details=True)

print c
3 Trace(s) in Stream:
G.IVI.00.LHZ | 2016-01-22T15:45:24.000000Z - 2016-01-22T15:56:46.000000Z | 1.0 Hz, 683 samples
G.IVI.00.LHZ | 2016-01-22T15:56:47.000001Z - 2016-01-22T16:04:53.000001Z | 1.0 Hz, 487 samples
G.IVI.00.LHZ | 2016-01-22T16:04:54.000004Z - 2016-01-23T00:01:18.000004Z | 1.0 Hz, 28585 samples

print c[0].stats
         network: G
         station: IVI
        location: 00
         channel: LHZ
       starttime: 2016-01-22T15:45:24.000000Z
         endtime: 2016-01-22T15:56:46.000000Z
   sampling_rate: 1.0
           delta: 1.0
            npts: 683
           calib: 1.0
         _format: MSEED
           mseed: AttribDict({u'calibration_type': False, 'encoding': u'STEIM2', u'blkt1001': AttribDict({u'timing_quality': 0}), 'record_length': 512, 'filesize': 111104, u'dataquality': u'D', 'number_of_records': 217, 'byteorder': u'>'})

print c[1].stats
         network: G
         station: IVI
        location: 00
         channel: LHZ
       starttime: 2016-01-22T15:56:47.000001Z
         endtime: 2016-01-22T16:04:53.000001Z
   sampling_rate: 1.0
           delta: 1.0
            npts: 487
           calib: 1.0
         _format: MSEED
           mseed: AttribDict({u'calibration_type': False, 'encoding': u'STEIM2', u'blkt1001': AttribDict({u'timing_quality': 90}), 'record_length': 512, 'filesize': 111104, u'dataquality': u'D', 'number_of_records': 217, 'byteorder': u'>'})

print c[2].stats
         network: G
         station: IVI
        location: 00
         channel: LHZ
       starttime: 2016-01-22T16:04:54.000004Z
         endtime: 2016-01-23T00:01:18.000004Z
   sampling_rate: 1.0
           delta: 1.0
            npts: 28585
           calib: 1.0
         _format: MSEED
           mseed: AttribDict({u'calibration_type': False, 'encoding': u'STEIM2', u'blkt1001': AttribDict({u'timing_quality': 100}), 'record_length': 512, 'filesize': 111104, u'dataquality': u'D', 'number_of_records': 217, 'byteorder': u'>'})

With details=False, timing_quality equals 0.

megies (Member) commented Jun 14, 2016

Well.. reading from MiniSEED, a trace has a timing_quality in trace.stats.mseed.blkt1001, which is a single value. Just guessing, but that's probably the timing quality value of the first record of the whole trace..

megies force-pushed the sds_new_data branch from 7030b0e to 62617bf Jun 14, 2016

megies (Member) commented Jun 14, 2016

Ok, I've downloaded the data you used in the example and had a look myself. So with details=True a new trace is generated whenever the timing quality changes, otherwise the (gapless) file gets read into a single trace. So it really makes a difference (although it might lead to a huge number of trace objects if the timing quality changes on a short time scale..).
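
For illustration, building on the example output above (same file as in the earlier comment), the one-value-per-trace behaviour can be checked like this:

from obspy import read

# With details=True each change in timing quality starts a new Trace,
# and each Trace carries a single value in stats.mseed.blkt1001.
st = read("G.IVI.00.LHZ.D.2016.022", details=True)
for tr in st:
    print(tr.id, tr.stats.starttime, tr.stats.mseed.blkt1001.timing_quality)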

megies force-pushed the sds_new_data branch from 62617bf to 0108b22 Jun 14, 2016

krischer (Member) commented Jun 18, 2016

Hmm... I think the details=True variant is okay but far from optimal, for two reasons:

  • There is no reason to split it into several files just because the timing quality changes - they can well be in the same file.
  • There are lots of other flags and blockettes that are not preserved.

I think this should really happen at the record level, but this would be a bit more work to implement :-( There is some record-level splitting code here if it helps:

def download_and_split_mseed_bulk(client, client_name, chunks, logger):
    """
    Downloads the channels of a list of stations in bulk, saves it to a
    temporary folder and splits it at the record level to obtain the final
    MiniSEED files.

    The big advantage of this approach is that it does not mess with the
    MiniSEED files at all. Each record, including all blockettes, will end
    up in the final files as they are served from the data centers.

    :param client: An active client instance.
    :param client_name: The name of the client instance used for logging
        purposes.
    :param chunks: A list of tuples, each denoting a single MiniSEED chunk.
        Each chunk is a tuple of network, station, location, channel,
        starttime, endtime, and desired filename.
    :param logger: An active logger instance.
    """
    # Create a dictionary of channel ids, each containing a list of
    # intervals, each of which will end up in a separate file.
    filenames = collections.defaultdict(list)
    for chunk in chunks:
        filenames[tuple(chunk[:4])].append({
            "starttime": chunk[4],
            "endtime": chunk[5],
            "filename": chunk[6],
            "current_latest_endtime": None,
            "sequence_number": None})

    sequence_number = [0]

    def get_filename(starttime, endtime, c):
        """
        Helper function finding the corresponding filename in all filenames.

        :param starttime: The start time of the record.
        :param endtime: The end time of the record.
        :param c: A list of candidates.
        """
        # Make two passes. First find all candidates. This assumes that a
        # record cannot be larger than a single desired time interval. This
        # is probably always given except if somebody wants to download
        # files split into 1 second intervals...
        candidates = [
            _i for _i in c if
            (_i["starttime"] <= starttime <= _i["endtime"]) or
            (_i["starttime"] <= endtime <= _i["endtime"])]
        if not candidates:
            return None

        # If more then one candidate, apply some heuristics to find the
        # correct time interval. The main complication arises when the same
        # record is downloaded twice as it overlaps into two different
        # requested time intervals.
        if len(candidates) == 2:
            candidates.sort(key=lambda x: x["starttime"])
            first, second = candidates

            # Make sure the assumptions about the type of overlap are correct.
            if starttime > first["endtime"] or endtime < second["starttime"]:
                raise NotImplementedError

            # It must either be the last record of the first, or the first
            # record of the second candidate.
            if first["sequence_number"] is None and \
                    second["sequence_number"] is None:
                candidates = [second]
            # Unlikely to happen. Only if nothing but the very last record
            # of the first interval was available and the second interval
            # was first in the file.
            elif first["sequence_number"] is None:
                candidates = [first]
            # This is fairly likely and requires an additional check with
            # the latest time in the first interval.
            elif second["sequence_number"] is None:
                if starttime <= first["current_latest_endtime"]:
                    candidates = [second]
                else:
                    candidates = [first]
            # Neither are None. Just use the one with the higher sequence
            # number. This probably does not happen. If it happens something
            # else is a bit strange.
            else:
                if first["sequence_number"] > second["sequence_number"]:
                    candidates = [first]
                else:
                    candidates = [second]
        elif len(candidates) >= 2:
            raise NotImplementedError

        # Finally found the correct chunk
        ret_val = candidates[0]

        # Increment sequence number and make sure the current chunk is aware
        # of it.
        sequence_number[0] += 1
        ret_val["sequence_number"] = sequence_number[0]

        # Also write the time of the last chunk to it if necessary.
        ce = ret_val["current_latest_endtime"]
        if not ce or endtime > ce:
            ret_val["current_latest_endtime"] = endtime

        return ret_val["filename"]

    # Only the filename is not needed for the actual data request.
    bulk = [list(_i[:-1]) for _i in chunks]
    original_bulk_length = len(bulk)

    # Merge adjacent bulk-request for continuous downloads. This is a bit
    # redundant after splitting it up before, but eases the logic in the
    # other parts and puts less strain on the data centers' FDSN
    # implementation. It furthermore avoid the repeated download of records
    # that are part of two neighbouring time intervals.
    bulk_channels = collections.defaultdict(list)
    for b in bulk:
        bulk_channels[(b[0], b[1], b[2], b[3])].append(b)

    # Merge them.
    for key, value in bulk_channels.items():
        # Sort based on starttime.
        value = sorted(value, key=lambda x: x[4])
        # Merge adjacent.
        cur_bulk = value[0:1]
        for b in value[1:]:
            # Random threshold of 2 seconds. Reasonable for most real world
            # cases.
            if b[4] <= cur_bulk[-1][5] + 2:
                cur_bulk[-1][5] = b[5]
                continue
            cur_bulk.append(b)
        bulk_channels[key] = cur_bulk
    bulk = list(itertools.chain.from_iterable(bulk_channels.values()))

    # Save first to a temporary file, then cut the file into separate files.
    with NamedTemporaryFile() as tf:
        temp_filename = tf.name
        open_files = {}
        client.get_waveforms_bulk(bulk, filename=temp_filename)
        # If that succeeds, split the old file into multiple new ones.
        file_size = os.path.getsize(temp_filename)
        with open(temp_filename, "rb") as fh:
            try:
                while True:
                    if fh.tell() >= (file_size - 256):
                        break
                    info = get_record_information(fh)
                    channel_id = (info["network"], info["station"],
                                  info["location"], info["channel"])
                    # Sometimes the services return something nobody wants...
                    if channel_id not in filenames:
                        fh.read(info["record_length"])
                        continue
                    # Get the best matching filename.
                    filename = get_filename(
                        starttime=info["starttime"], endtime=info["endtime"],
                        c=filenames[channel_id])
                    # Again sometimes there are time ranges nobody asked for...
                    if filename is None:
                        fh.read(info["record_length"])
                        continue
                    if filename not in open_files:
                        open_files[filename] = open(filename, "wb")
                    open_files[filename].write(fh.read(info["record_length"]))
            finally:
                for f in open_files.values():
                    try:
                        f.close()
                    except:
                        pass

    logger.info("Client '%s' - Successfully downloaded %i channels (of %i)" % (
        client_name, len(open_files), original_bulk_length))
    return sorted(open_files.keys())
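
For illustration, a minimal sketch of what the record-level approach could look like for an SDS archive. This is a hypothetical helper, not part of this PR: it reuses get_record_information in the same way as the snippet above, assumes the usual SDS path layout, and ignores records spanning midnight, sorting and duplicate handling:

import os

from obspy.io.mseed.util import get_record_information


def split_mseed_into_sds(filename, sds_root, sds_type="D"):
    # Append each record of a MiniSEED file, unchanged, to the matching
    # SDS day file. Minimal sketch only.
    file_size = os.path.getsize(filename)
    open_files = {}
    try:
        with open(filename, "rb") as fh:
            while fh.tell() < file_size:
                info = get_record_information(fh)
                t = info["starttime"]
                # SDS layout: <root>/<year>/<net>/<sta>/<cha>.<type>/
                #             <net>.<sta>.<loc>.<cha>.<type>.<year>.<doy>
                path = os.path.join(
                    sds_root, str(t.year), info["network"], info["station"],
                    "%s.%s" % (info["channel"], sds_type))
                basename = "%s.%s.%s.%s.%s.%d.%03d" % (
                    info["network"], info["station"], info["location"],
                    info["channel"], sds_type, t.year, t.julday)
                full_path = os.path.join(path, basename)
                if full_path not in open_files:
                    if not os.path.isdir(path):
                        os.makedirs(path)
                    open_files[full_path] = open(full_path, "ab")
                # Write the raw record to the day file.
                open_files[full_path].write(fh.read(info["record_length"]))
    finally:
        for f in open_files.values():
            f.close()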

megies (Member) commented Jun 20, 2016

> There is no reason to split it into several files just because the timing quality changes - they can well be in the same file.

The separate traces produced by details=True do end up in the same file alright.. otherwise it wouldn't be an SDS structure anymore.. ;-)

> I think this should really happen at the record level, but this would be a bit more work to implement :-(

Sure thing.. but for now, better this than nothing..

krischer (Member) commented Jun 20, 2016

> The separate traces produced by details=True do end up in the same file alright.. otherwise it wouldn't be an SDS structure anymore.. ;-)

Ah ok... then I guess I don't really understand what details=True is doing for the SDS importer ;-)

> I think this should really happen at the record level, but this would be a bit more work to implement :-(
>
> Sure thing.. but for now, better this than nothing..

Definitely.

krischer (Member) commented Jun 20, 2016

> Ah ok... then I guess I don't really understand what details=True is doing for the SDS importer ;-)

Nvm. I got it.

megies removed this from the 1.1.0 milestone Nov 7, 2016

megies modified the milestones: 1.2.0, 1.1.0 Nov 7, 2016

megies modified the milestones: 1.2.0, 1.3.0 Apr 19, 2018

megies force-pushed the sds_new_data branch from 515fd8c to ba9798c Dec 21, 2018

megies added some commits May 13, 2016

  • sds: enhance Client._get_filenames()
    Now it can also be used to return filenames of non-existing files (as
    opposed to globbing for existing files).
  • add method to extract data missing in archive from new data files
    - works in principle but needs more cleanup and docs
    - also needs tests
  • sds client, minor change in helper routine
    When parsing a filename in the SDS archive back to its components as a
    dictionary: convert year and doy to int.
  • sds client: helper method to get expected start/end time
    ..from a filename (full path) in the SDS archive.
  • SDS: plot option for Client.add_data_to_archive()
    to save a before/after comparison for a quick check of data import results.
  • SDS client: return/print info on data added to archive
    and some docstring improvements.
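
Based on these commit messages, usage could look roughly like the following sketch (the exact signature in the branch may differ; the archive path, file names and plot target are placeholders):

from obspy.clients.filesystem.sds import Client

client = Client("/path/to/sds_archive")
# Hypothetical call, sketched from the commit messages above: feed new data
# files into the archive, save a before/after comparison plot and print
# info on what was added. Parameter names may differ in the actual branch.
info = client.add_data_to_archive(
    ["/data/fieldtest/file_1.mseed", "/data/fieldtest/file_2.mseed"],
    plot="/tmp/before_after.png")
print(info)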

megies force-pushed the sds_new_data branch from ba9798c to 11fc75f Dec 21, 2018

megies (Member) commented Dec 21, 2018

Rebased and force-pushed since some stuff was already extracted and merged into master (the refactoring of obspy-scan internals)

megies modified the milestones: 1.3.0, 2.0.0 Feb 15, 2019
