/
knet.py
304 lines (250 loc) · 9.38 KB
/
knet.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
# -*- coding: utf-8 -*-
"""
obspy.io.nied.knet - K-NET/KiK-net read support for ObsPy
=========================================================
Reading of the K-NET and KiK-net ASCII format as defined on
http://www.kyoshin.bosai.go.jp.
"""
from __future__ import (absolute_import, division, print_function,
unicode_literals)
from future.builtins import * # NOQA @UnusedWildImport
import re
import numpy as np
from obspy import UTCDateTime, Stream, Trace
from obspy.core.trace import Stats
class KNETException(Exception):
pass
def _buffer_proxy(filename_or_buf, function, reset_fp=True,
file_mode="rb", *args, **kwargs):
"""
Calls a function with an open file or file-like object as the first
argument. If the file originally was a filename, the file will be
opened, otherwise it will just be passed to the underlying function.
:param filename_or_buf: File to pass.
:type filename_or_buf: str, open file, or file-like object.
:param function: The function to call.
:param reset_fp: If True, the file pointer will be set to the initial
position after the function has been called.
:type reset_fp: bool
:param file_mode: Mode to open file in if necessary.
"""
try:
position = filename_or_buf.tell()
is_buffer = True
except AttributeError:
is_buffer = False
if is_buffer is True:
ret_val = function(filename_or_buf, *args, **kwargs)
if reset_fp:
filename_or_buf.seek(position, 0)
return ret_val
else:
with open(filename_or_buf, file_mode) as fh:
return function(fh, *args, **kwargs)
def _is_knet_ascii(filename_or_buf):
"""
Checks if the file is a valid K-NET/KiK-net ASCII file.
:param filename_or_buf: File to test.
:type filename_or_buf: str or file-like object.
"""
try:
return _buffer_proxy(filename_or_buf, _internal_is_knet_ascii,
reset_fp=True)
# Happens for example when passing the data as a string which would be
# interpreted as a filename.
except (OSError, UnicodeDecodeError):
return False
def _internal_is_knet_ascii(buf):
"""
Checks if the file is a valid K-NET/KiK-net ASCII file.
:param buf: File to read.
:type buf: Open file or open file like object.
"""
first_string = buf.read(11).decode()
# File has less than 11 characters
if len(first_string) != 11:
return False
if first_string == 'Origin Time':
return True
return False
def _prep_hdr_line(name, line):
"""
Helper function to check the contents of a header line and split it.
:param name: String that the line should start with.
:type name: str
:param line: Line to check and split.
:type line: str
"""
if not line.startswith(name):
raise KNETException("Expected line to start with %s but got %s "
% (name, line))
else:
return line.split()
def _read_knet_hdr(hdrlines, convert_stnm=False, **kwargs):
"""
Read the header values into a dictionary.
:param hdrlines: List of the header lines of a a K-NET/KiK-net ASCII file
:type hdrlines: list
:param convert_stnm: For station names with 6 letters write the last two
letters of the station code to the 'location' field
:type convert_stnm: bool
"""
hdrdict = {'knet': {}}
hdrnames = ['Origin Time', 'Lat.', 'Long.', 'Depth. (km)', 'Mag.',
'Station Code', 'Station Lat.', 'Station Long.',
'Station Height(m)', 'Record Time', 'Sampling Freq(Hz)',
'Duration Time(s)', 'Dir.', 'Scale Factor', 'Max. Acc. (gal)',
'Last Correction', 'Memo.']
_i = 0
# Event information
flds = _prep_hdr_line(hdrnames[_i], hdrlines[_i])
dt = flds[2] + ' ' + flds[3]
dt = UTCDateTime.strptime(dt, '%Y/%m/%d %H:%M:%S')
# All times are in Japanese standard time which is 9 hours ahead of UTC
dt -= 9 * 3600.
hdrdict['knet']['evot'] = dt
_i += 1
flds = _prep_hdr_line(hdrnames[_i], hdrlines[_i])
lat = float(flds[1])
hdrdict['knet']['evla'] = lat
_i += 1
flds = _prep_hdr_line(hdrnames[_i], hdrlines[_i])
lon = float(flds[1])
hdrdict['knet']['evlo'] = lon
_i += 1
flds = _prep_hdr_line(hdrnames[_i], hdrlines[_i])
dp = float(flds[2])
hdrdict['knet']['evdp'] = dp
_i += 1
flds = _prep_hdr_line(hdrnames[_i], hdrlines[_i])
mag = float(flds[1])
hdrdict['knet']['mag'] = mag
# Station information
_i += 1
flds = _prep_hdr_line(hdrnames[_i], hdrlines[_i])
# K-NET and KiK-Net station names can be more than 5 characters long
# which will cause the station name to be truncated when writing the
# the trace as miniSEED; if convert_stnm is enabled, the last two
# letters of the station code are written to the 'location' field
stnm = flds[2]
location = ''
if convert_stnm and len(stnm) > 5:
location = stnm[-2:]
stnm = stnm[:-2]
if len(stnm) > 7:
raise KNETException(
"Station name can't be more than 7 characters long!")
hdrdict['station'] = stnm
hdrdict['location'] = location
_i += 1
flds = _prep_hdr_line(hdrnames[_i], hdrlines[_i])
hdrdict['knet']['stla'] = float(flds[2])
_i += 1
flds = _prep_hdr_line(hdrnames[_i], hdrlines[_i])
hdrdict['knet']['stlo'] = float(flds[2])
_i += 1
flds = _prep_hdr_line(hdrnames[_i], hdrlines[_i])
hdrdict['knet']['stel'] = float(flds[2])
# Data information
_i += 1
flds = _prep_hdr_line(hdrnames[_i], hdrlines[_i])
dt = flds[2] + ' ' + flds[3]
# A 15 s delay is added to the record time by the
# the K-NET and KiK-Net data logger
dt = UTCDateTime.strptime(dt, '%Y/%m/%d %H:%M:%S') - 15.0
# All times are in Japanese standard time which is 9 hours ahead of UTC
dt -= 9 * 3600.
hdrdict['starttime'] = dt
_i += 1
flds = _prep_hdr_line(hdrnames[_i], hdrlines[_i])
freqstr = flds[2]
m = re.search('[0-9]*', freqstr)
freq = int(m.group())
hdrdict['sampling_rate'] = freq
_i += 1
flds = _prep_hdr_line(hdrnames[_i], hdrlines[_i])
hdrdict['knet']['duration'] = float(flds[2])
_i += 1
flds = _prep_hdr_line(hdrnames[_i], hdrlines[_i])
channel = flds[1].replace('-', '')
kiknetcomps = {'1': 'NS1', '2': 'EW1', '3': 'UD1',
'4': 'NS2', '5': 'EW2', '6': 'UD2'}
if channel.strip() in kiknetcomps.keys(): # kiknet directions are 1-6
channel = kiknetcomps[channel.strip()]
hdrdict['channel'] = channel
_i += 1
flds = _prep_hdr_line(hdrnames[_i], hdrlines[_i])
eqn = flds[2]
num, denom = eqn.split('/')
num = float(re.search('[0-9]*', num).group())
denom = float(denom)
# convert the calibration from gal to m/s^2
hdrdict['calib'] = 0.01 * num / denom
_i += 1
flds = _prep_hdr_line(hdrnames[_i], hdrlines[_i])
acc = float(flds[3])
hdrdict['knet']['accmax'] = acc
_i += 1
flds = _prep_hdr_line(hdrnames[_i], hdrlines[_i])
dt = flds[2] + ' ' + flds[3]
dt = UTCDateTime.strptime(dt, '%Y/%m/%d %H:%M:%S')
# All times are in Japanese standard time which is 9 hours ahead of UTC
dt -= 9 * 3600.
hdrdict['knet']['last correction'] = dt
# The comment ('Memo') field is optional
_i += 1
flds = _prep_hdr_line(hdrnames[_i], hdrlines[_i])
if len(flds) > 1:
hdrdict['knet']['comment'] = ' '.join(flds[1:])
if len(hdrlines) != _i + 1:
raise KNETException("Expected %d header lines but got %d"
% (_i + 1, len(hdrlines)))
return hdrdict
def _read_knet_ascii(filename_or_buf, **kwargs):
"""
Reads a K-NET/KiK-net ASCII file and returns an ObsPy Stream object.
.. warning::
This function should NOT be called directly, it registers via the
ObsPy :func:`~obspy.core.stream.read` function, call this instead.
:param filename: K-NET/KiK-net ASCII file to be read.
:type filename: str or file-like object.
"""
return _buffer_proxy(filename_or_buf, _internal_read_knet_ascii, **kwargs)
def _internal_read_knet_ascii(buf, **kwargs):
"""
Reads a K-NET/KiK-net ASCII file and returns an ObsPy Stream object.
.. warning::
This function should NOT be called directly, it registers via the
ObsPy :func:`~obspy.core.stream.read` function, call this instead.
:param buf: File to read.
:type buf: Open file or open file like object.
"""
data = []
hdrdict = {}
cur_pos = buf.tell()
buf.seek(0, 2)
size = buf.tell()
buf.seek(cur_pos, 0)
# First read the headerlines
headerlines = []
while buf.tell() < size:
line = buf.readline().decode()
headerlines.append(line)
if line.startswith('Memo'):
hdrdict = _read_knet_hdr(headerlines, **kwargs)
break
while buf.tell() < size:
line = buf.readline()
parts = line.strip().split()
data += [float(p) for p in parts]
hdrdict['npts'] = len(data)
# The FDSN network code for the National Research Institute for Earth
# Science and Disaster Prevention (NEID JAPAN) is BO (Bosai-Ken Network)
hdrdict['network'] = 'BO'
data = np.array(data)
stats = Stats(hdrdict)
trace = Trace(data, header=stats)
return Stream([trace])
if __name__ == '__main__':
import doctest
doctest.testmod(exclude_empty=True)