diff --git a/obspy/io/y/core.py b/obspy/io/y/core.py
index 1c597c473c4..af3607c856e 100644
--- a/obspy/io/y/core.py
+++ b/obspy/io/y/core.py
@@ -12,6 +12,7 @@
                         unicode_literals)
 from future.builtins import *  # NOQA
 
+import warnings
 from struct import unpack
 
 import numpy as np
@@ -23,6 +24,11 @@
 from obspy.core.util import AttribDict
 
 
+INVALID_CHAR_MSG = (
+    "Invalid non-ASCII characters in Y file header detected (%s). "
+    "These were ignored.")
+
+
 def __parse_tag(fh):
     """
     Reads and parses a single tag.
@@ -56,6 +62,33 @@ def __parse_tag(fh):
     return endian, tag_type, next_tag, next_same
 
 
+def _parse_tag_blankpadded_and_asciiz_parts(parts, blankpadded, asciiz):
+    """
+    Decodes blank-padded and ASCIIZ fields of an unpacked tag.
+
+    Non-ASCII bytes are dropped from the result after a UserWarning.
+
+    :param parts: Unpacked tag fields, addressed by the dict keys below.
+    :param blankpadded: ``{index: key}``; values get decoded and stripped.
+    :param asciiz: ``{index: key}``; values get cut at first NUL, decoded.
+    :returns: Tuple ``(blankpadded_dict, asciiz_dict)`` of AttribDicts.
+    """
+    def _decode(raw):
+        try:
+            return raw.decode('ascii', errors="strict")
+        except UnicodeError as e:
+            warnings.warn(INVALID_CHAR_MSG % str(e), UserWarning)
+            return raw.decode('ascii', errors="ignore")
+
+    blankpadded_dict = AttribDict()
+    for i, key in blankpadded.items():
+        blankpadded_dict[key] = _decode(parts[i]).strip()
+    asciiz_dict = AttribDict()
+    for i, key in asciiz.items():
+        asciiz_dict[key] = _decode(parts[i].partition(b"\x00")[0])
+    return blankpadded_dict, asciiz_dict
+
+
 def _is_y(filename):
     """
     Checks whether a file is a Nanometrics Y file or not.
@@ -153,20 +186,23 @@ def _read_y(filename, headonly=False, **kwargs):  # @UnusedVariable
                 # UCHAR DataFormat[7] (ASCIIZ)
                 # DataFormat is some text describing the data format recorded
                 # at the station.
- data = fh.read(next_tag) - parts = [p.decode('ascii', errors='ignore') for p in - unpack(b'5s2s3s51s61s31s51s7s', data[8:])] - trace.stats.station = parts[0].strip() - trace.stats.location = parts[1].strip() - trace.stats.channel = parts[2].strip() + parts = unpack(b'5s2s3s51s61s31s51s7s', fh.read(next_tag)[8:]) + blankpadded = { + 0: "station", + 1: "location", + 2: "channel"} # extra - params = AttribDict() - params.network_id = parts[3].rstrip('\x00') - params.side_name = parts[4].rstrip('\x00') - params.comment = parts[5].rstrip('\x00') - params.sensor_type = parts[6].rstrip('\x00') - params.data_format = parts[7].rstrip('\x00') - trace.stats.y.tag_station_info = params + asciiz = { + 3: "network_id", + 4: "side_name", + 5: "comment", + 6: "sensor_type", + 7: "data_format"} + blankpadded_dict, asciiz_dict = \ + _parse_tag_blankpadded_and_asciiz_parts( + parts, blankpadded, asciiz) + trace.stats.update(blankpadded_dict) + trace.stats.y.tag_station_info = asciiz_dict elif tag_type == 2: # TAG_STATION_LOCATION # UCHAR Update[8] @@ -235,15 +271,20 @@ def _read_y(filename, headonly=False, **kwargs): # @UnusedVariable trace.stats.sampling_rate = parts[4] # extra params = AttribDict() + blankpadded = {8: "chan_flags"} + asciiz = {6: "sens_units", + 7: "calib_units"} + blankpadded_dict, asciiz_dict = \ + _parse_tag_blankpadded_and_asciiz_parts( + parts, blankpadded, asciiz) + params.update(blankpadded_dict) + params.update(asciiz_dict) params.start_valid_time = parts[0] params.end_valid_time = parts[1] params.sensitivity = parts[2] params.sens_freq = parts[3] params.sample_rate = parts[4] params.max_clk_drift = parts[5] - params.sens_units = parts[6].rstrip(b'\x00').decode() - params.calib_units = parts[7].rstrip(b'\x00').decode() - params.chan_flags = parts[8].strip() params.update_flag = parts[9] trace.stats.y.tag_station_parameters = params elif tag_type == 4: @@ -289,14 +330,19 @@ def _read_y(filename, headonly=False, **kwargs): # @UnusedVariable 
trace.stats.starttime = UTCDateTime(parts[0]) count = parts[2] # extra + asciiz_dict = { + 6: "format", + 7: "format_version"} + _, asciiz_dict = \ + _parse_tag_blankpadded_and_asciiz_parts( + parts, {}, asciiz_dict) params = AttribDict() + params.update(asciiz_dict) params.endtime = UTCDateTime(parts[1]) params.num_samples = parts[2] params.dc_offset = parts[3] params.max_amplitude = parts[4] params.min_amplitude = parts[5] - params.format = parts[6].rstrip(b'\x00').decode() - params.format_version = parts[7].rstrip(b'\x00').decode() trace.stats.y.tag_series_info = params elif tag_type == 6: # TAG_SERIES_DATABASE