/
test_decrypt.py
196 lines (179 loc) · 7.58 KB
/
test_decrypt.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
# -*- coding: utf-8 -*-
"""
The obspy.clients.arclink.client test suite.
"""
from __future__ import (absolute_import, division, print_function,
unicode_literals)
from future.builtins import * # NOQA
import os
import unittest
import numpy as np
from obspy.clients.arclink import Client, decrypt
from obspy.clients.arclink.client import DCID_KEY_FILE, ArcLinkException
from obspy.core.utcdatetime import UTCDateTime
from obspy.core.util import NamedTemporaryFile
@unittest.skipIf(not decrypt.HAS_CRYPTOLIB,
                 'M2Crypto, PyCrypto or cryptography is not installed')
class ClientTestCase(unittest.TestCase):
    """
    Test cases for L{obspy.clients.arclink.client.Client}.

    The tests address two ``webdc.eu`` ArcLink endpoints: port 36000 (the
    test server for encryption) and port 18001 (the public server). Most
    tests request the same data window from both and expect identical
    results.
    """
    def _assert_same_waveforms(self, client1, client2):
        """
        Request the same data window from both clients and compare them.

        :param client1: Client connected to the test server for encryption.
        :param client2: Client connected to the public server.
        """
        # request data
        start = UTCDateTime(2010, 1, 1, 10, 0, 0)
        end = start + 100
        stream1 = client1.get_waveforms('GE', 'APE', '', 'BHZ', start, end)
        stream2 = client2.get_waveforms('GE', 'APE', '', 'BHZ', start, end)
        # compare results
        np.testing.assert_array_equal(stream1[0].data, stream2[0].data)
        self.assertEqual(stream1[0].stats, stream2[0].stats)

    def _run_with_libs_disabled(self, *flags):
        """
        Run the DCID key test with the given ``decrypt`` flags set to False.

        :param flags: Names of module level attributes of ``decrypt``
            (e.g. ``'HAS_M2CRYPTO'``) to switch off for the test run.
        """
        # monkey patch
        backup = [getattr(decrypt, flag) for flag in flags]
        for flag in flags:
            setattr(decrypt, flag, False)
        try:
            # run test
            self.test_get_waveform_with_dcid_key()
        finally:
            # revert monkey patch - also when the test above fails, so that
            # the patched flags never leak into tests running afterwards
            for flag, value in zip(flags, backup):
                setattr(decrypt, flag, value)

    def test_get_waveform_with_dcid_key(self):
        """
        Waveforms requested with an explicitly passed DCID key must equal
        the waveforms delivered by the public server.
        """
        # test server for encryption
        client1 = Client(host="webdc.eu", port=36000, user="test@obspy.org",
                         dcid_keys={'BIA': 'OfH9ekhi'})
        # public server
        client2 = Client(host="webdc.eu", port=18001, user="test@obspy.org")
        self._assert_same_waveforms(client1, client2)

    def test_get_waveform_with_dcid_key_file(self):
        """
        Tests various DCID key file formats (with space or equal sign). Also
        checks if empty lines or comment lines are ignored.
        """
        key_files = [
            # 1 - using = sign between username and password
            '#Comment\n\n\nTEST=XYZ\r\nBIA=OfH9ekhi\r\n',
            # 2 - using space between username and password
            'TEST XYZ\r\nBIA OfH9ekhi\r\n',
        ]
        for content in key_files:
            with NamedTemporaryFile() as tf:
                dcidfile = tf.name
                with open(dcidfile, 'wt') as fh:
                    fh.write(content)
                # test server for encryption
                client1 = Client(host="webdc.eu", port=36000,
                                 user="test@obspy.org",
                                 dcid_key_file=dcidfile)
                # public server
                client2 = Client(host="webdc.eu", port=18001,
                                 user="test@obspy.org")
                # the key file is kept until the comparison has finished
                self._assert_same_waveforms(client1, client2)

    @unittest.skipIf(os.path.isfile(DCID_KEY_FILE),
                     '$HOME/dcidpasswords.txt already exists')
    def test_get_waveform_with_default_dcid_key_file(self):
        """
        Use $HOME/dcidpasswords.txt.
        """
        dcidfile = DCID_KEY_FILE
        with open(dcidfile, 'wt') as fh:
            fh.write('TEST=XYZ\r\nBIA=OfH9ekhi\r\n')
        try:
            # test server for encryption
            client1 = Client(host="webdc.eu", port=36000,
                             user="test@obspy.org")
            # public server
            client2 = Client(host="webdc.eu", port=18001,
                             user="test@obspy.org")
        finally:
            # clean up dcid file - always, otherwise a failing constructor
            # would leave the file behind and this test would be skipped on
            # every later run
            os.remove(dcidfile)
        self._assert_same_waveforms(client1, client2)

    def test_get_waveform_unknown_user(self):
        """
        Unknown user raises an ArcLinkException: DENIED.
        """
        client = Client(host="webdc.eu", port=36000, user="unknown@obspy.org")
        # request data
        start = UTCDateTime(2010, 1, 1, 10, 0, 0)
        end = start + 100
        self.assertRaises(ArcLinkException, client.get_waveforms, 'GE', 'APE',
                          '', 'BHZ', start, end)

    def test_get_waveform_wrong_password(self):
        """
        A wrong password raises exception.
        """
        client = Client(host="webdc.eu", port=36000, user="test@obspy.org",
                        dcid_keys={'BIA': 'WrongPassword'})
        # request data
        start = UTCDateTime(2010, 1, 1, 10, 0, 0)
        end = start + 100
        self.assertRaises(Exception, client.get_waveforms,
                          'GE', 'APE', '', 'BHZ', start, end)

    def test_get_waveform_no_password(self):
        """
        No password raises exception.
        """
        client = Client(host="webdc.eu", port=36000, user="test@obspy.org",
                        dcid_keys={'BIA': ''})
        # request data
        start = UTCDateTime(2010, 1, 1, 10, 0, 0)
        end = start + 100
        self.assertRaises(Exception, client.get_waveforms,
                          'GE', 'APE', '', 'BHZ', start, end)

    @unittest.skipIf(not decrypt.HAS_CRYPTOGRAPHY,
                     'cryptography is not installed')
    def test_cryptography(self):
        """
        Test cryptography by temporarily disabling all other crypto libs
        """
        self._run_with_libs_disabled('HAS_M2CRYPTO', 'HAS_PYCRYPTO')

    @unittest.skipIf(not decrypt.HAS_PYCRYPTO, 'PyCrypto is not installed')
    def test_pycrypto(self):
        """
        Test PyCrypto by temporarily disabling all other crypto libs
        """
        self._run_with_libs_disabled('HAS_M2CRYPTO', 'HAS_CRYPTOGRAPHY')

    @unittest.skipIf(not decrypt.HAS_M2CRYPTO, 'M2Crypto is not installed')
    def test_m2crypto(self):
        """
        Test M2Crypto by temporarily disabling all other crypto libs
        """
        self._run_with_libs_disabled('HAS_CRYPTOGRAPHY', 'HAS_PYCRYPTO')


def suite():
    """
    Build the test suite of this module.

    :return: Suite holding every ``test*`` method of ``ClientTestCase``.
    """
    # unittest.makeSuite() is deprecated since Python 3.11 and removed in
    # Python 3.13; the loader based form works on old and new versions alike.
    loader = unittest.TestLoader()
    loader.testMethodPrefix = 'test'
    return loader.loadTestsFromTestCase(ClientTestCase)


if __name__ == '__main__':
    # Executed as a script: let unittest resolve and run the module level
    # ``suite`` callable.
    unittest.main(defaultTest='suite')