Skip to content

Commit

Permalink
Y: parse ASCIIZ fields better
Browse files Browse the repository at this point in the history
 - strip everything after ASCII NULL character before decoding
 - ignore all bytes that cannot be decoded as ASCII, but show warnings
  • Loading branch information
megies committed Sep 16, 2015
1 parent b1300e2 commit 01eb11a
Showing 1 changed file with 64 additions and 18 deletions.
82 changes: 64 additions & 18 deletions obspy/io/y/core.py
Expand Up @@ -12,6 +12,7 @@
unicode_literals)
from future.builtins import * # NOQA

import warnings
from struct import unpack

import numpy as np
Expand All @@ -23,6 +24,11 @@
from obspy.core.util import AttribDict


INVALID_CHAR_MSG = (
"Invalid non-ASCII characters in Y file header detected (%s). "
"These were ignored.")


def __parse_tag(fh):
"""
Reads and parses a single tag.
Expand Down Expand Up @@ -56,6 +62,33 @@ def __parse_tag(fh):
return endian, tag_type, next_tag, next_same


def _parse_tag_blankpadded_and_asciiz_parts(parts, blankpadded, asciiz):
    """
    Decodes the text fields of an unpacked Y file tag as ASCII.

    Blank padded fields are decoded and then stripped of surrounding
    whitespace. ASCIIZ fields are cut off at the first ASCII NULL byte (if
    there is one) before they are decoded. Whenever a field cannot be
    decoded strictly as ASCII, a ``UserWarning`` is issued and the field is
    decoded again with the offending bytes left out.

    :param parts: Sequence of byte strings, indexed by the keys of
        ``blankpadded`` and ``asciiz``.
    :param blankpadded: Mapping of an index into ``parts`` to the key under
        which the decoded and stripped field is stored.
    :param asciiz: Mapping of an index into ``parts`` to the key under
        which the decoded, NULL-truncated field is stored.
    :returns: Tuple of two ``AttribDict`` objects: the blank padded fields
        and the ASCIIZ fields, in that order.
    """
    padded_fields = AttribDict()
    for index, name in blankpadded.items():
        raw = parts[index]
        try:
            text = raw.decode("ascii", errors="strict")
        except UnicodeError as err:
            warnings.warn(INVALID_CHAR_MSG % str(err), UserWarning)
            text = raw.decode("ascii", errors="ignore")
        padded_fields[name] = text.strip()

    terminated_fields = AttribDict()
    for index, name in asciiz.items():
        # keep only what precedes the first ASCII NULL; a field without any
        # NULL is kept in full
        raw = parts[index].partition(b"\x00")[0]
        try:
            text = raw.decode("ascii", errors="strict")
        except UnicodeError as err:
            warnings.warn(INVALID_CHAR_MSG % str(err), UserWarning)
            text = raw.decode("ascii", errors="ignore")
        terminated_fields[name] = text

    return padded_fields, terminated_fields


def _is_y(filename):
"""
Checks whether a file is a Nanometrics Y file or not.
Expand Down Expand Up @@ -153,20 +186,23 @@ def _read_y(filename, headonly=False, **kwargs): # @UnusedVariable
# UCHAR DataFormat[7] (ASCIIZ)
# DataFormat is some text describing the data format recorded
# at the station.
data = fh.read(next_tag)
parts = [p.decode('ascii', errors='ignore') for p in
unpack(b'5s2s3s51s61s31s51s7s', data[8:])]
trace.stats.station = parts[0].strip()
trace.stats.location = parts[1].strip()
trace.stats.channel = parts[2].strip()
parts = unpack(b'5s2s3s51s61s31s51s7s', fh.read(next_tag)[8:])
blankpadded = {
0: "station",
1: "location",
2: "channel"}
# extra
params = AttribDict()
params.network_id = parts[3].rstrip('\x00')
params.side_name = parts[4].rstrip('\x00')
params.comment = parts[5].rstrip('\x00')
params.sensor_type = parts[6].rstrip('\x00')
params.data_format = parts[7].rstrip('\x00')
trace.stats.y.tag_station_info = params
asciiz = {
3: "network_id",
4: "side_name",
5: "comment",
6: "sensor_type",
7: "data_format"}
blankpadded_dict, asciiz_dict = \
_parse_tag_blankpadded_and_asciiz_parts(
parts, blankpadded, asciiz)
trace.stats.update(blankpadded_dict)
trace.stats.y.tag_station_info = asciiz_dict
elif tag_type == 2:
# TAG_STATION_LOCATION
# UCHAR Update[8]
Expand Down Expand Up @@ -235,15 +271,20 @@ def _read_y(filename, headonly=False, **kwargs): # @UnusedVariable
trace.stats.sampling_rate = parts[4]
# extra
params = AttribDict()
blankpadded = {8: "chan_flags"}
asciiz = {6: "sens_units",
7: "calib_units"}
blankpadded_dict, asciiz_dict = \
_parse_tag_blankpadded_and_asciiz_parts(
parts, blankpadded, asciiz)
params.update(blankpadded_dict)
params.update(asciiz_dict)
params.start_valid_time = parts[0]
params.end_valid_time = parts[1]
params.sensitivity = parts[2]
params.sens_freq = parts[3]
params.sample_rate = parts[4]
params.max_clk_drift = parts[5]
params.sens_units = parts[6].rstrip(b'\x00').decode()
params.calib_units = parts[7].rstrip(b'\x00').decode()
params.chan_flags = parts[8].strip()
params.update_flag = parts[9]
trace.stats.y.tag_station_parameters = params
elif tag_type == 4:
Expand Down Expand Up @@ -289,14 +330,19 @@ def _read_y(filename, headonly=False, **kwargs): # @UnusedVariable
trace.stats.starttime = UTCDateTime(parts[0])
count = parts[2]
# extra
asciiz_dict = {
6: "format",
7: "format_version"}
_, asciiz_dict = \
_parse_tag_blankpadded_and_asciiz_parts(
parts, {}, asciiz_dict)
params = AttribDict()
params.update(asciiz_dict)
params.endtime = UTCDateTime(parts[1])
params.num_samples = parts[2]
params.dc_offset = parts[3]
params.max_amplitude = parts[4]
params.min_amplitude = parts[5]
params.format = parts[6].rstrip(b'\x00').decode()
params.format_version = parts[7].rstrip(b'\x00').decode()
trace.stats.y.tag_series_info = params
elif tag_type == 6:
# TAG_SERIES_DATABASE
Expand Down

0 comments on commit 01eb11a

Please sign in to comment.