/
test_newswrapper.py
136 lines (121 loc) · 5.04 KB
/
test_newswrapper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
#!/usr/bin/python3 -OO
# Copyright 2007-2022 The SABnzbd-Team <team@sabnzbd.org>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
"""
tests.test_newswrapper - Tests of various functions in newswrapper
"""
import logging
import os.path
import socket
import tempfile
import threading
import ssl
import time
from typing import Optional
import portend
from flaky import flaky
from tests.testhelper import *
from sabnzbd import misc
from sabnzbd import newswrapper
# Loopback address used both by the test server (bind) and the mocked NNTP client (connect)
TEST_HOST = "127.0.0.1"
# Port is picked once, at import time, and shared by every parametrized test case
TEST_PORT = portend.find_available_local_port()
# Payload the test server writes after the TLS handshake; the client asserts it receives exactly this
TEST_DATA = b"connection_test"
def socket_test_server(ssl_context: ssl.SSLContext):
    """Run a one-shot TLS server on (TEST_HOST, TEST_PORT).

    Accepts a single connection, wraps it server-side with ``ssl_context``
    and writes TEST_DATA to it. Hand-rolled because socket.create_server
    is not supported on Python 3.7.

    Any exception (failed handshake, accept timeout, ...) is logged and
    swallowed; the listening socket is always closed before returning.
    """
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # Allow reuse of the address, because our CI is too fast for the socket closing
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    try:
        listener.bind((TEST_HOST, TEST_PORT))
        listener.listen(1)
        # Do not wait forever for a client that never shows up
        listener.settimeout(1.0)
        client_conn, _peer = listener.accept()

        # The TLS socket is closed again when the with-block exits
        tls_conn = ssl_context.wrap_socket(sock=client_conn, server_side=True)
        with tls_conn:
            tls_conn.write(TEST_DATA)
    except Exception as server_error:
        # Skip SSL errors: failed handshakes are expected for some test cases
        logging.info("Error in server: %s", server_error)
    finally:
        # Make sure to close the socket
        listener.close()
@flaky
class TestNewsWrapper:
    """TLS negotiation tests for newswrapper.NNTP against a local one-shot server.

    Marked ``@flaky`` so that a failing case is re-run instead of failing
    the suite right away.
    """

    # Paths of the self-signed certificate and key; the files themselves are
    # created by the first test case that finds them missing and reused afterwards
    cert_file = os.path.join(tempfile.mkdtemp(), "test.cert")
    key_file = os.path.join(tempfile.mkdtemp(), "test.key")

    @pytest.mark.parametrize(
        "server_tls, expected_client_tls, client_cipher, can_connect",
        [
            (None, "TLSv1.3", None, True),  # Default, highest
            (ssl.TLSVersion.TLSv1_2, "TLSv1.2", None, True),  # Server with just TLSv1.2
            (ssl.TLSVersion.SSLv3, None, None, False),  # No connection for old TLS/SSL
            (ssl.TLSVersion.TLSv1, None, None, False),
            (ssl.TLSVersion.TLSv1_1, None, None, False),
            (None, None, "RC4-MD5", False),  # No connection for old cipher
            (None, "TLSv1.2", "AES256-SHA", True),  # Forced to TLSv1.2 if ciphers set
            (None, None, "TLS_AES_128_CCM_SHA256", False),  # Cannot force use of TLSv1.3 cipher
        ],
    )
    def test_newswrapper(
        self,
        server_tls: Optional[ssl.TLSVersion],
        expected_client_tls: Optional[str],
        client_cipher: Optional[str],
        can_connect: bool,
    ):
        """Connect an NNTP client to the test server and check the TLS outcome.

        Args:
            server_tls: Maximum TLS version the server accepts, or None to
                leave the server context's maximum untouched.
            expected_client_tls: Value expected from ``sock.version()`` on the
                client after a successful connection.
            client_cipher: Cipher string handed to the client as
                ``server.ssl_ciphers``, or None for no restriction.
            can_connect: Whether creating the NNTP object should succeed;
                when False an OSError is expected instead.

        Raises:
            RuntimeError: If the server thread is still alive after the test.
        """
        # We need at least some certificates for the server to work
        if not os.path.exists(self.cert_file) or not os.path.exists(self.key_file):
            misc.create_https_certificates(self.cert_file, self.key_file)

        # Create the server context
        server_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
        server_context.load_cert_chain(self.cert_file, self.key_file)
        server_context.set_ciphers("HIGH")

        # Set the options
        if server_tls is not None:
            server_context.maximum_version = server_tls

        server_thread = threading.Thread(target=socket_test_server, args=(server_context,), daemon=True)
        server_thread.start()

        # Create the NNTP, mocking the required values
        # We disable certificate validation, as we use self-signed certificates
        nw = mock.Mock()
        nw.blocking = True
        nw.thrdnum = 1
        nw.server = mock.Mock()
        nw.server.host = TEST_HOST
        nw.server.port = TEST_PORT
        nw.server.info = socket.getaddrinfo(TEST_HOST, TEST_PORT, 0, socket.SOCK_STREAM)
        nw.server.timeout = 10
        nw.server.ssl = True
        nw.server.ssl_context = None
        nw.server.ssl_verify = 0
        nw.server.ssl_ciphers = client_cipher

        # Do we expect failure to connect?
        if not can_connect:
            with pytest.raises(OSError):
                newswrapper.NNTP(nw, TEST_HOST)
        else:
            nntp = newswrapper.NNTP(nw, TEST_HOST)
            try:
                assert nntp.sock.recv(len(TEST_DATA)) == TEST_DATA

                # Assert SSL data
                assert nntp.sock.version() == expected_client_tls
                if client_cipher:
                    assert nntp.sock.cipher()[0] == client_cipher
            finally:
                # Close the client socket explicitly, also when an assertion
                # fails, instead of leaving it to garbage collection while the
                # next case reuses the same host and port
                nntp.sock.close()

        # Wait for server to close
        server_thread.join(timeout=1.5)
        if server_thread.is_alive():
            raise RuntimeError("Test server was not stopped")
        time.sleep(1.0)