forked from chromium/web-page-replay
-
Notifications
You must be signed in to change notification settings - Fork 0
/
certutils.py
237 lines (191 loc) · 6.9 KB
/
certutils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
"""Routines to generate root and server certificates.
Certificate Naming Conventions:
ca_cert: crypto.X509 for the certificate authority (w/ both the pub &
priv keys)
cert: a crypto.X509 certificate (w/ just the pub key)
cert_str: a certificate string (w/ just the pub cert)
key: a private crypto.PKey (from ca or pem)
ca_cert_str: a certificate authority string (w/ both the pub & priv certs)
"""
import logging
import os
import socket
import time
# Holds the ImportError raised when pyOpenSSL is unavailable (None otherwise).
# The import failure is deferred so that this module can still be imported;
# generate_dummy_ca_cert() and generate_cert() re-raise it when called.
openssl_import_error = None

# Aliases for pyOpenSSL names. They stay None if pyOpenSSL cannot be imported.
SSL_METHOD = None
VERIFY_PEER = None
SysCallError = None
Error = None
ZeroReturnError = None

try:
    from OpenSSL import crypto, SSL

    SSL_METHOD = SSL.SSLv23_METHOD
    VERIFY_PEER = SSL.VERIFY_PEER
    SysCallError = SSL.SysCallError
    Error = SSL.Error
    ZeroReturnError = SSL.ZeroReturnError
except ImportError as e:  # 'as' form is valid on Python 2.6+ and Python 3.
    openssl_import_error = e
def get_ssl_context(method=SSL_METHOD):
    """Creates a new SSL context.

    Args:
      method: one of SSLv2_METHOD, SSLv3_METHOD, SSLv23_METHOD, or
          TLSv1_METHOD; defaults to the module-level SSL_METHOD
    Returns:
      an SSL.Context created for the given method
    Raises:
      ImportError: if pyOpenSSL could not be imported
    """
    # Surface the original import failure instead of a NameError on SSL.
    if openssl_import_error:
        raise openssl_import_error
    return SSL.Context(method)
class WrappedConnection(object):
    """Proxy around a connection object whose recv() reports EOF as ''.

    Attributes that are not defined on the wrapper itself are looked up on
    the wrapped object.
    """

    def __init__(self, obj):
        """Stores the object that all other attribute access is forwarded to."""
        self._wrapped_obj = obj

    def __getattr__(self, attr):
        """Forwards attribute lookups that failed on the wrapper itself."""
        # __getattr__ only runs after normal lookup has already failed, so the
        # attribute is never in self.__dict__ here. If the missing attribute is
        # _wrapped_obj itself (instance created without __init__, e.g. by
        # copy or pickle), reading self._wrapped_obj would recurse forever.
        if attr == '_wrapped_obj':
            raise AttributeError(attr)
        return getattr(self._wrapped_obj, attr)

    def recv(self, buflen=1024, flags=0):
        """Receives data from the wrapped object, mapping EOF to ''.

        Args:
          buflen: passed through as the first argument of the wrapped recv()
          flags: passed through as the second argument of the wrapped recv()
        Returns:
          whatever the wrapped recv() returns, or '' when it raises
          SSL.ZeroReturnError or an SSL.SysCallError whose second argument is
          'Unexpected EOF'
        Raises:
          SSL.SysCallError: for any other system call error
        """
        try:
            return self._wrapped_obj.recv(buflen, flags)
        except SSL.SysCallError as e:
            # Guard the index so a short args tuple re-raises the real error
            # rather than an IndexError.
            if len(e.args) > 1 and e.args[1] == 'Unexpected EOF':
                return ''
            raise
        except SSL.ZeroReturnError:
            return ''
def get_ssl_connection(context, connection):
    """Builds an SSL connection from `context` and `connection`.

    Returns:
      a WrappedConnection around the new SSL.Connection
    """
    ssl_connection = SSL.Connection(context, connection)
    return WrappedConnection(ssl_connection)
def load_privatekey(key, filetype=None):
    """Loads a private key object from a string.

    Args:
      key: string holding the serialized private key
      filetype: a crypto.FILETYPE_* constant; None (the default) selects
          crypto.FILETYPE_PEM
    Returns:
      the object returned by crypto.load_privatekey
    Raises:
      ImportError: if pyOpenSSL could not be imported
    """
    if openssl_import_error:
        raise openssl_import_error
    # Resolved at call time: a crypto.FILETYPE_PEM default in the signature is
    # evaluated when the module is imported and fails if pyOpenSSL is missing.
    if filetype is None:
        filetype = crypto.FILETYPE_PEM
    return crypto.load_privatekey(filetype, key)
def load_cert(cert_str, filetype=None):
    """Loads a certificate object from a string.

    Args:
      cert_str: string holding the serialized certificate
      filetype: a crypto.FILETYPE_* constant; None (the default) selects
          crypto.FILETYPE_PEM
    Returns:
      the object returned by crypto.load_certificate
    Raises:
      ImportError: if pyOpenSSL could not be imported
    """
    if openssl_import_error:
        raise openssl_import_error
    # Resolved at call time: a crypto.FILETYPE_PEM default in the signature is
    # evaluated when the module is imported and fails if pyOpenSSL is missing.
    if filetype is None:
        filetype = crypto.FILETYPE_PEM
    return crypto.load_certificate(filetype, cert_str)
def _dump_privatekey(key, filetype=None):
    """Dumps a private key object to a string.

    Args:
      key: the private key object to serialize
      filetype: a crypto.FILETYPE_* constant; None (the default) selects
          crypto.FILETYPE_PEM
    Returns:
      the value returned by crypto.dump_privatekey
    Raises:
      ImportError: if pyOpenSSL could not be imported
    """
    if openssl_import_error:
        raise openssl_import_error
    # Resolved at call time: a crypto.FILETYPE_PEM default in the signature is
    # evaluated when the module is imported and fails if pyOpenSSL is missing.
    if filetype is None:
        filetype = crypto.FILETYPE_PEM
    return crypto.dump_privatekey(filetype, key)
def _dump_cert(cert, filetype=None):
    """Dumps a certificate object to a string.

    Args:
      cert: the certificate object to serialize
      filetype: a crypto.FILETYPE_* constant; None (the default) selects
          crypto.FILETYPE_PEM
    Returns:
      the value returned by crypto.dump_certificate
    Raises:
      ImportError: if pyOpenSSL could not be imported
    """
    if openssl_import_error:
        raise openssl_import_error
    # Resolved at call time: a crypto.FILETYPE_PEM default in the signature is
    # evaluated when the module is imported and fails if pyOpenSSL is missing.
    if filetype is None:
        filetype = crypto.FILETYPE_PEM
    return crypto.dump_certificate(filetype, cert)
def generate_dummy_ca_cert(subject='_WebPageReplayCert'):
    """Generates a self-signed dummy certificate authority.

    Args:
      subject: a string used as both the CN and the O of the root cert's
          subject; the cert is self-signed, so it is the issuer as well
    Returns:
      A (ca_cert_str, key_str) tuple: the dumped root certificate string
      followed by the dumped private key string
    Raises:
      ImportError: if pyOpenSSL could not be imported
    """
    if openssl_import_error:
        raise openssl_import_error

    two_years_in_seconds = 60 * 60 * 24 * 365 * 2

    # 2048-bit RSA and a SHA-256 signature (below): 1024-bit keys and SHA-1
    # signatures are rejected by current TLS stacks.
    key = crypto.PKey()
    key.generate_key(crypto.TYPE_RSA, 2048)

    ca_cert = crypto.X509()
    ca_cert.set_serial_number(int(time.time()*10000))
    ca_cert.set_version(2)  # Value 2 encodes X.509 version 3.
    ca_cert.get_subject().CN = subject
    ca_cert.get_subject().O = subject
    # Valid from two years in the past until two years in the future.
    ca_cert.gmtime_adj_notBefore(-two_years_in_seconds)
    ca_cert.gmtime_adj_notAfter(two_years_in_seconds)
    ca_cert.set_issuer(ca_cert.get_subject())
    ca_cert.set_pubkey(key)
    ca_cert.add_extensions([
        crypto.X509Extension('basicConstraints', True, 'CA:TRUE'),
        crypto.X509Extension('nsCertType', True, 'sslCA'),
        crypto.X509Extension('extendedKeyUsage', True,
                             ('serverAuth,clientAuth,emailProtection,'
                              'timeStamping,msCodeInd,msCodeCom,msCTLSign,'
                              'msSGC,msEFS,nsSGC')),
        crypto.X509Extension('keyUsage', False, 'keyCertSign, cRLSign'),
        crypto.X509Extension('subjectKeyIdentifier', False, 'hash',
                             subject=ca_cert),
    ])
    ca_cert.sign(key, 'sha256')

    key_str = _dump_privatekey(key)
    ca_cert_str = _dump_cert(ca_cert)
    return ca_cert_str, key_str
def get_host_cert(host, port=443):
    """Contacts the host and returns its certificate.

    Args:
      host: host name to connect to
      port: TCP port to connect to
    Returns:
      the dumped string of the last certificate handed to the verify
      callback, or '' if no certificate was received
    Raises:
      ImportError: if pyOpenSSL could not be imported
    """
    if openssl_import_error:
        raise openssl_import_error

    host_certs = []

    def verify_cb(conn, cert, errnum, depth, ok):
        host_certs.append(cert)
        # The return code of 1 indicates that the certificate was ok.
        return 1

    context = SSL.Context(SSL.SSLv23_METHOD)
    context.set_verify(SSL.VERIFY_PEER, verify_cb)  # Demand a certificate
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    connection = SSL.Connection(context, s)
    try:
        connection.connect((host, port))
        connection.send('')
    except SSL.SysCallError:
        pass
    except socket.gaierror:
        logging.debug('Host name is not valid')
    finally:
        try:
            connection.shutdown()
        except SSL.Error:
            # Shutdown is best effort: it can fail when the connection was
            # never established (e.g. after the gaierror handled above), and
            # that must not replace the outcome or skip close().
            logging.debug('SSL shutdown failed for %s:%s', host, port)
        finally:
            connection.close()
    if not host_certs:
        logging.warning('Did not get SNI from %s:%s', host, port)
        return ''
    return _dump_cert(host_certs[-1])
def write_dummy_ca_cert(ca_cert_str, key_str, cert_path):
    """Writes four certificate files.

    For example, if cert_path is "mycert.pem":
        mycert.pem - CA plus private key
        mycert-cert.pem - CA in PEM format
        mycert-cert.cer - CA for Android
        mycert-cert.p12 - CA in PKCS12 format for Windows devices

    The directory of cert_path is created if it does not exist.

    Args:
      ca_cert_str: certificate string
      key_str: private key string
      cert_path: path string such as "mycert.pem"
    """
    dirname = os.path.dirname(cert_path)
    if dirname and not os.path.exists(dirname):
        os.makedirs(dirname)

    root_path = os.path.splitext(cert_path)[0]
    ca_cert_path = root_path + '-cert.pem'
    android_cer_path = root_path + '-cert.cer'
    windows_p12_path = root_path + '-cert.p12'

    # Dump the CA plus private key
    with open(cert_path, 'w') as f:
        f.write(key_str)
        f.write(ca_cert_str)

    # Dump the certificate in PEM format
    with open(ca_cert_path, 'w') as f:
        f.write(ca_cert_str)

    # Create a .cer file with the same contents for Android
    with open(android_cer_path, 'w') as f:
        f.write(ca_cert_str)

    # Build the PKCS12 bundle before opening the output file.
    p12 = crypto.PKCS12()
    p12.set_certificate(load_cert(ca_cert_str))
    p12.set_privatekey(load_privatekey(key_str))

    # Dump the certificate in PKCS12 format for Windows devices. PKCS12 is a
    # binary format, so the file must be opened in binary mode; text mode
    # would translate newline bytes on Windows and corrupt the file.
    with open(windows_p12_path, 'wb') as f:
        f.write(p12.export())
def generate_cert(root_ca_cert_str, server_cert_str, server_host):
    """Generates a certificate string signed by the root CA.

    The common name is copied from server_cert_str when that is given and
    has one; otherwise server_host is used.

    Args:
      root_ca_cert_str: PEM formatted string representing the root cert; the
          signing private key is loaded from this same string
      server_cert_str: PEM formatted string representing cert
      server_host: host name to use if there is no server_cert_str, or if
          that certificate has no common name
    Returns:
      a PEM formatted certificate string
    Raises:
      ImportError: if pyOpenSSL could not be imported
    """
    if openssl_import_error:
        raise openssl_import_error

    common_name = server_host
    if server_cert_str:
        server_cert = load_cert(server_cert_str)
        # A certificate without a CN yields None, which cannot be assigned to
        # the request subject below; fall back to the requested host name.
        common_name = server_cert.get_subject().commonName or server_host

    ca_cert = load_cert(root_ca_cert_str)
    key = load_privatekey(root_ca_cert_str)

    req = crypto.X509Req()
    req.get_subject().CN = common_name
    # The generated certificate reuses the CA's public key.
    req.set_pubkey(ca_cert.get_pubkey())
    req.sign(key, 'sha256')

    cert = crypto.X509()
    # Valid from one hour in the past until 30 days in the future.
    cert.gmtime_adj_notBefore(-60 * 60)
    cert.gmtime_adj_notAfter(60 * 60 * 24 * 30)
    cert.set_issuer(ca_cert.get_subject())
    cert.set_subject(req.get_subject())
    cert.set_serial_number(int(time.time()*10000))
    cert.set_pubkey(req.get_pubkey())
    # SHA-256 instead of SHA-1: SHA-1 signatures are rejected by current
    # TLS clients.
    cert.sign(key, 'sha256')
    return _dump_cert(cert)