Skip to content

Commit

Permalink
improving coverage of obspy.core.stream
Browse files Browse the repository at this point in the history
  • Loading branch information
barsch committed Aug 4, 2013
1 parent ee8480d commit a5cf36b
Show file tree
Hide file tree
Showing 2 changed files with 193 additions and 24 deletions.
18 changes: 16 additions & 2 deletions obspy/core/stream.py
Expand Up @@ -1162,6 +1162,22 @@ def printGaps(self, min_gap=None, max_gap=None):
Source Last Sample ...
BW.RJOB..EHZ 2009-08-24T00:20:13.000000Z ...
Total: 1 gap(s) and 0 overlap(s)
And finally let us create some overlapping traces:
>>> st = read()
>>> tr = st[0].copy()
>>> t = UTCDateTime("2009-08-24T00:20:13.0")
>>> st[0].trim(endtime=t)
>>> tr.trim(starttime=t-1)
>>> st.append(tr)
>>> st.getGaps() # doctest: +ELLIPSIS
[['BW', 'RJOB', '', 'EHZ', UTCDateTime(2009, 8, 24, 0, 20, 13), ...
>>> st.printGaps() # doctest: +ELLIPSIS
Source Last Sample ...
BW.RJOB..EHZ 2009-08-24T00:20:13.000000Z ...
Total: 0 gap(s) and 1 overlap(s)
"""
result = self.getGaps(min_gap, max_gap)
print("%-17s %-27s %-27s %-15s %-8s" % ('Source', 'Last Sample',
Expand Down Expand Up @@ -1477,8 +1493,6 @@ def cutout(self, starttime, endtime):
BW.RJOB..EHN | 2009-08-24T00:20:11.000000Z ... | 100.0 Hz, 2200 samples
BW.RJOB..EHE | 2009-08-24T00:20:11.000000Z ... | 100.0 Hz, 2200 samples
"""
if not self:
return
tmp = self.slice(endtime=starttime, keep_empty_traces=False)
tmp += self.slice(starttime=endtime, keep_empty_traces=False)
self.traces = tmp.traces
Expand Down
199 changes: 177 additions & 22 deletions obspy/core/tests/test_stream.py
Expand Up @@ -9,6 +9,7 @@
import pickle
import unittest
import warnings
import os


class StreamTestCase(unittest.TestCase):
Expand Down Expand Up @@ -47,6 +48,20 @@ def setUp(self):
header=header)
self.gse2_stream = Stream(traces=[trace])

def test_init(self):
"""
Tests the __init__ method of the Stream object.
"""
# empty
st = Stream()
self.assertEqual(len(st), 0)
# single trace
st = Stream(Trace())
self.assertEqual(len(st), 1)
# array of traces
st = Stream([Trace(), Trace()])
self.assertEqual(len(st), 2)

def test_setitem(self):
"""
Tests the __setitem__ method of the Stream object.
Expand Down Expand Up @@ -103,6 +118,9 @@ def test_add(self):
self.assertEqual(new_stream[8], other_stream[0])
self.assertEqual(new_stream[8].stats, other_stream[0].stats)
np.testing.assert_array_equal(new_stream[8].data, other_stream[0].data)
# adding something else than stream or trace results into TypeError
self.assertRaises(TypeError, stream.__add__, 1)
self.assertRaises(TypeError, stream.__add__, 'test')

def test_iadd(self):
"""
Expand All @@ -120,6 +138,21 @@ def test_iadd(self):
self.assertEqual(other_stream[0], stream[-1])
self.assertEqual(other_stream[0].stats, stream[-1].stats)
np.testing.assert_array_equal(other_stream[0].data, stream[-1].data)
# adding something else than stream or trace results into TypeError
self.assertRaises(TypeError, stream.__iadd__, 1)
self.assertRaises(TypeError, stream.__iadd__, 'test')

def test_mul(self):
"""
Tests the __mul__ method of the Stream objects.
"""
st = Stream(Trace())
self.assertEqual(len(st), 1)
st = st * 4
self.assertEqual(len(st), 4)
# multiplying by something else than an integer results into TypeError
self.assertRaises(TypeError, st.__mul__, 1.2345)
self.assertRaises(TypeError, st.__mul__, 'test')

def test_addTraceToStream(self):
"""
Expand Down Expand Up @@ -1181,32 +1214,52 @@ def test_cpickle(self):
np.testing.assert_array_equal(st[0].data, st2[0].data)
self.assertEqual(st[0].stats, st2[0].stats)

def test_isPickle(self):
"""
Testing isPickle function.
"""
# existing file
st = read()
with NamedTemporaryFile() as tf:
st.write(tf.name, format='PICKLE')
# check using file name
self.assertTrue(isPickle(tf.name))
# check using file handler
self.assertTrue(isPickle(tf))
# not existing files
self.assertFalse(isPickle('/path/to/pickle.file'))
self.assertFalse(isPickle(12345))

def test_readWritePickle(self):
"""
Testing readPickle and writePickle functions.
"""
st = read()
# write
with NamedTemporaryFile() as tf:
tmpfile = tf.name
with NamedTemporaryFile() as tf2:
tmpfile2 = tf2.name
writePickle(st, tmpfile)
st.write(tmpfile2, format='PICKLE')
# check and read directly
self.assertTrue(isPickle(tmpfile), True)
st2 = readPickle(tmpfile)
self.assertEqual(len(st2), 3)
np.testing.assert_array_equal(st2[0].data, st[0].data)
# use read() with given format
st2 = read(tmpfile2, format='PICKLE')
self.assertEqual(len(st2), 3)
np.testing.assert_array_equal(st2[0].data, st[0].data)
# use read() and autodetect format
st2 = read(tmpfile2)
self.assertEqual(len(st2), 3)
np.testing.assert_array_equal(st2[0].data, st[0].data)

def test_getGaps2(self):
# write using file name
writePickle(st, tf.name)
self.assertTrue(isPickle(tf.name))
# write using file handler
writePickle(st, tf)
tf.seek(0)
self.assertTrue(isPickle(tf))
# write using stream write method
st.write(tf.name, format='PICKLE')
# check and read directly
st2 = readPickle(tf.name)
self.assertEqual(len(st2), 3)
np.testing.assert_array_equal(st2[0].data, st[0].data)
# use read() with given format
st2 = read(tf.name, format='PICKLE')
self.assertEqual(len(st2), 3)
np.testing.assert_array_equal(st2[0].data, st[0].data)
# use read() and automatically detect format
st2 = read(tf.name)
self.assertEqual(len(st2), 3)
np.testing.assert_array_equal(st2[0].data, st[0].data)

def test_getGaps(self):
"""
Test case for issue #73.
"""
Expand Down Expand Up @@ -1465,6 +1518,11 @@ def test_str(self):
".12345.. | 1970-01-01T00:00:00.000000Z - 1970-01-01" + \
"T00:00:00.000000Z | 1.0 Hz, 0 samples"
self.assertEqual(result, expected)
# streams containing more than 20 lines will be compressed
st2 = Stream([tr1]) * 40
result = st2.__str__()
self.assertTrue('40 Trace(s) in Stream:' in result)
self.assertTrue('other traces' in result)

def test_cleanup(self):
"""
Expand Down Expand Up @@ -1590,9 +1648,9 @@ def test_cleanupNonDefaultPrecisionUTCDateTime(self):
self.assertEqual(len(st), 1)
UTCDateTime.DEFAULT_PRECISION = 6

def test_readArguments(self):
def test_read(self):
"""
Testing arguments on read function.
Testing read function.
"""
# 1 - default example
# dtype
Expand Down Expand Up @@ -1634,6 +1692,24 @@ def test_readArguments(self):
# headonly
tr = read('/path/to/slist_float.ascii', headonly=True)[0]
self.assertFalse(tr.data)
# not existing
self.assertRaises(IOError, read, '/path/to/UNKNOWN')

# 4 - file patterns
path = os.path.dirname(__file__)
filename = path + os.sep + 'data' + os.sep + 'slist.*'
st = read(filename)
self.assertEquals(len(st), 2)
# exception if no file matches file pattern
filename = path + os.sep + 'data' + os.sep + 'NOTEXISTING.*'
self.assertRaises(Exception, read, filename)

# argument headonly should not be used with starttime, endtime or dtype
with warnings.catch_warnings(record=True):
# will usually warn only but here we force to raise an exception
warnings.simplefilter('error', UserWarning)
self.assertRaises(UserWarning, read, '/path/to/slist_float.ascii',
headonly=True, starttime=0, endtime=1)

def test_copy(self):
"""
Expand Down Expand Up @@ -1712,6 +1788,28 @@ def test_rotate(self):
self.assertTrue(np.allclose(st[4].data, st2[4].data))
self.assertTrue(np.allclose(st[5].data, st2[5].data))

# unknown rotate method will raise ValueError
self.assertRaises(ValueError, st.rotate, method='UNKNOWN')
# rotating without back_azimuth raises TypeError
st = Stream()
self.assertRaises(TypeError, st.rotate, method='RT->NE')
# rotating without inclination raises TypeError for LQT-> or ZNE->
self.assertRaises(TypeError, st.rotate, method='LQT->ZNE',
back_azimuth=30)
# having traces with different timespans or sampling rates will fail
st = read()
st[1].stats.sampling_rate = 2.0
self.assertRaises(ValueError, st.rotate, method='NE->RT')
st = read()
st[1].stats.starttime += 1
self.assertRaises(ValueError, st.rotate, method='NE->RT')
st = read()
st[1].stats.sampling_rate = 2.0
self.assertRaises(ValueError, st.rotate, method='ZNE->LQT')
st = read()
st[1].stats.starttime += 1
self.assertRaises(ValueError, st.rotate, method='ZNE->LQT')

def test_plot(self):
"""
Tests plot method if matplotlib is installed
Expand Down Expand Up @@ -1754,6 +1852,63 @@ def test_deepcopy(self):
self.assertEquals(st[0].stats.mseed.dataquality, 'X')
self.assertEquals(ct[0].stats.mseed.dataquality, 'A')

def test_write(self):
# writing in unknown format raises TypeError
st = read()
self.assertRaises(TypeError, st.write, 'file.ext', format="UNKNOWN")

def test_detrend(self):
"""
Test detrend method of stream
"""
t = np.arange(10)
data = 0.1 * t + 1.

tr = Trace(data=data.copy())
st = Stream([tr, tr])
st.detrend(type='simple')
np.testing.assert_array_almost_equal(st[0].data, np.zeros(10))
np.testing.assert_array_almost_equal(st[1].data, np.zeros(10))

tr = Trace(data=data.copy())
st = Stream([tr, tr])
st.detrend(type='linear')
np.testing.assert_array_almost_equal(st[0].data, np.zeros(10))
np.testing.assert_array_almost_equal(st[1].data, np.zeros(10))

data = np.zeros(10)
data[3:7] = 1.

tr = Trace(data=data.copy())
st = Stream([tr, tr])
st.detrend(type='simple')
np.testing.assert_almost_equal(st[0].data[0], 0.)
np.testing.assert_almost_equal(st[0].data[-1], 0.)
np.testing.assert_almost_equal(st[1].data[0], 0.)
np.testing.assert_almost_equal(st[1].data[-1], 0.)

tr = Trace(data=data.copy())
st = Stream([tr, tr])
st.detrend(type='linear')
np.testing.assert_almost_equal(st[0].data[0], -0.4)
np.testing.assert_almost_equal(st[0].data[-1], -0.4)
np.testing.assert_almost_equal(st[1].data[0], -0.4)
np.testing.assert_almost_equal(st[1].data[-1], -0.4)

def test_taper(self):
"""
Test taper method of stream
"""
data = np.ones(10)
tr = Trace(data=data.copy())
st = Stream([tr, tr])
st.taper()
for i in range(len(data)):
self.assertTrue(st[0].data[i] <= 1.)
self.assertTrue(st[0].data[i] >= 0.)
self.assertTrue(st[1].data[i] <= 1.)
self.assertTrue(st[1].data[i] >= 0.)


def suite():
return unittest.makeSuite(StreamTestCase, 'test')
Expand Down

0 comments on commit a5cf36b

Please sign in to comment.