Skip to content

Commit

Permalink
Merge pull request #240 from akhait/ocsp_sign
Browse files Browse the repository at this point in the history
Add OCSP signature verification
  • Loading branch information
tomato42 committed Jun 8, 2018
2 parents 2b9c15f + 7b9d1f9 commit 26bd53e
Show file tree
Hide file tree
Showing 4 changed files with 245 additions and 13 deletions.
12 changes: 8 additions & 4 deletions tlslite/ocsp.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
"""Class for handling primary OCSP responses"""

from .utils.asn1parser import ASN1Parser
from .utils.cryptomath import bytesToNumber, numBytes
from .x509 import X509
from .signed import SignedObject


class OCSPRespStatus(object):
Expand Down Expand Up @@ -48,18 +51,17 @@ def parse(self, value):
self.next_update = None


class OCSPResponse(object):
class OCSPResponse(SignedObject):
""" This class represents an OCSP response. """
def __init__(self, value):
super(OCSPResponse, self).__init__()
self.bytes = None
self.resp_status = None
self.resp_type = None
self.version = None
self.resp_id = None
self.produced_at = None
self.responses = []
self.signature_alg = None
self.signature = None
self.certs = []
self.parse(value)

Expand All @@ -86,14 +88,16 @@ def parse(self, value):
basic_resp = response.getChild(0)
# parsing tbsResponseData fields
self._tbsdataparse(basic_resp.getChild(0))
self.tbs_data = basic_resp.getChildBytes(0)
self.signature_alg = basic_resp.getChild(1).getChild(0).value
self.signature = basic_resp.getChild(2).value
# test if certs field is present
if basic_resp.getChildCount() > 3:
certs = basic_resp.getChild(3)
cnt = certs.getChildCount()
for i in range(cnt):
certificate = certs.getChild(i).value
certificate = X509()
certificate.parseBinary(certs.getChild(i).value)
self.certs.append(certificate)
return self

Expand Down
87 changes: 87 additions & 0 deletions tlslite/signed.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
"""Base class that represents any signed object"""

from .utils.cryptomath import numBytes

RSA_SIGNATURE_HASHES = ["sha512", "sha384", "sha256", "sha224", "sha1"]
ALL_RSA_SIGNATURE_HASHES = RSA_SIGNATURE_HASHES + ["md5"]
RSA_SCHEMES = ["pss", "pkcs1"]


class SignatureSettings(object):
def __init__(self, min_key_size=None, max_key_size=None,
rsa_sig_hashes=None, rsa_schemes=None):
"""Create default variables for key-related settings."""
self.min_key_size = min_key_size or 1023
self.max_key_size = max_key_size or 8193
self.rsa_sig_hashes = rsa_sig_hashes or list(RSA_SIGNATURE_HASHES)
self.rsa_schemes = rsa_schemes or list(RSA_SCHEMES)

def _copy_settings(self, other):
other.min_key_size = self.min_key_size
other.max_key_size = self.max_key_size
other.rsa_sig_hashes = self.rsa_sig_hashes
other.rsa_schemes = self.rsa_schemes

@staticmethod
def _sanityCheckKeySizes(other):
if other.min_key_size < 512:
raise ValueError("min_key_size too small")
if other.min_key_size > 16384:
raise ValueError("min_key_size too large")
if other.max_key_size < 512:
raise ValueError("max_key_size too small")
if other.max_key_size > 16384:
raise ValueError("max_key_size too large")
if other.max_key_size < other.min_key_size:
raise ValueError("max_key_size smaller than min_key_size")

@staticmethod
def _sanityCheckSignatureAlgs(other):
not_allowed = [alg for alg in other.rsa_sig_hashes
if alg not in ALL_RSA_SIGNATURE_HASHES]
if len(not_allowed) > 0:
raise ValueError("Following signature algorithms are not allowed: "
"{0}".format(", ".join(not_allowed)))

def validate(self):
other = SignatureSettings()
self._copy_settings(other)
self._sanityCheckKeySizes(other)
self._sanityCheckSignatureAlgs(other)
return other


class SignedObject(object):
def __init__(self):
self.tbs_data = None
self.signature = None
self.signature_alg = None

_hash_algs_OIDs = {
tuple([0x2a, 0x86, 0x48, 0x86, 0xf7, 0xd, 0x1, 0x1, 0x4]): 'md5',
tuple([0x2a, 0x86, 0x48, 0x86, 0xf7, 0xd, 0x1, 0x1, 0x5]): 'sha1',
tuple([0x2a, 0x86, 0x48, 0x86, 0xf7, 0xd, 0x1, 0x1, 0xe]): 'sha224',
tuple([0x2a, 0x86, 0x48, 0x86, 0xf7, 0xd, 0x1, 0x1, 0xc]): 'sha384',
tuple([0x2a, 0x86, 0x48, 0x86, 0xf7, 0xd, 0x1, 0x1, 0xb]): 'sha256',
tuple([0x2a, 0x86, 0x48, 0x86, 0xf7, 0xd, 0x1, 0x1, 0xd]): 'sha512'
}

def verify_signature(self, publicKey, settings=None):
""" Verify signature in a reponse"""
offset = 0
settings = settings or SignatureSettings()

# workaround as some signature encodings could be zero left-padded
if (self.signature[0] == 0 and
numBytes(publicKey.n) + 1 == len(self.signature)):
offset = 1

alg = self._hash_algs_OIDs[tuple(self.signature_alg)]
if alg not in settings.rsa_sig_hashes:
raise ValueError("Invalid signature algorithm: {0}".format(alg))
verified = publicKey.hashAndVerify(self.signature[offset:],
self.tbs_data, hAlg=alg)
if not verified:
raise ValueError("Signature could not be verified for {0}"
.format(alg))
return True
100 changes: 91 additions & 9 deletions unit_tests/test_tlslite_ocsp.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
from tlslite.utils.asn1parser import ASN1Parser
from tlslite.x509 import X509
from tlslite.ocsp import OCSPResponse, OCSPRespStatus, SingleResponse
from tlslite.utils.cryptomath import numberToByteArray, bytesToNumber


resp_OK = a2b_base64(str(
"MIIGQwoBAKCCBjwwggY4BgkrBgEFBQcwAQEEggYpMIIGJTCBv6IWBBScTQCZAA6LsAGBdaG68NAl"
Expand Down Expand Up @@ -42,7 +44,8 @@
"kGw8MYyX5Eewu1JpKy/RqkbBBUDJS6pSj1LYwQxTb0JrfQbEddRBIexRYEZ+JXtqdOqpsV1R/4E/"
"k82y7fOo9N96hDrWZLF7h+p2Bg8og+eUD9kNNGPx2/iLfNYYUDynU4Ay3wOxRzb8MHTw7OpevlEN"
"GNUvcGYyOSWwMQHk8P0BCJZA8RpXuEkW/ep0ZSnNVMpbjtSpSE1i0OxMdKbfJTzm+l87FMXz/8oX"
"p+YNJ+UJfB6ALag="))
"p+YNJ+UJfB6ALag="
))
resp_malformed = a2b_base64("MAMKAQE=")
resp_internal = a2b_base64("MAMKAQI=")
resp_trylater = a2b_base64("MAMKAQM=")
Expand All @@ -56,13 +59,72 @@
"9BKfYqnO9AL1LYHOJ6ZPlzJx/nEiFue8HXb+ChiwG+nEhXprug+wP/APuKSOaKH2kcQf4Jtuv9cz"
"n/2PCaVmC+ErEThuGTZouT4eEIFMUGGDH+nZFHbl+DNm6R3+D7atOE1gDBO2LDJqIoZvaZTSpY+4"
"djZaiTdWAdOcUnnBhlhBvjg8nv6zxJ9ERBqm5P9cffnYVAJeGqylq/WFT/Ni6MprXgYoZq9bc7tk"
"PGGh5CnrOhcIQCUIX+ceM6ruxdraiPJALpY1gZz7SY53GQsXfg=="))
"PGGh5CnrOhcIQCUIX+ceM6ruxdraiPJALpY1gZz7SY53GQsXfg=="
))
resp_sig_sha1 = a2b_base64(str(
"MIIBxAoBAKCCAb0wggG5BgkrBgEFBQcwAQEEggGqMIIBpjCBj6IWBBTdieQsa8v7QTD1lfiA4LPa"
"pj5zpRgPMjAxODAzMjkxMDAxMDdaMGQwYjA6MAkGBSsOAwIaBQAEFJAVJBFLEEwrmfK6f5q3ZVC2"
"38iuBBTdieQsa8v7QTD1lfiA4LPapj5zpQIBAoAAGA8yMDE4MDMyOTEwMDEwN1qgERgPMjAxODAz"
"MzAxMDAxMDdaMA0GCSqGSIb3DQEBBQUAA4IBAQDB1RYVVpLGIWBZLftbEBwFRunGoGq5HEfFtmfd"
"0F3qwjIfqagtnnI8OrJuqkAt4G9/MvCWw3Hc6RHaqYGjzzJL/b2Qpwe6TuHk+pqeaJIiZctvjOhy"
"31cEj5CEz+Zh093diRw6YDjwD+UgirkkGl4VIqRUEwLdEHWQ+l7Se9cw9DEj2uM+MGaR3oUvrVt1"
"1a/vxtV1/Nr56kvN9lhMrNKB8rVfIwvpXJ2lQTPMi21fyNxiaY97rIeYd20TrbLQC6IblV51giTg"
"fL24fVZt8NAnGrho/lBhkbQ2JGcW9NmXWLZaHTUgDHS+3+Q0js8/CW9Ajouu0rClFnbeu4yn+Ffi"
))
cert_sig_sha1 = a2b_base64(str(
"MIIDKzCCAhOgAwIBAgIBATANBgkqhkiG9w0BAQsFADAVMRMwEQYDVQQKDApFeGFt"
"cGxlIENBMCIYDzIwMTMwMzI5MTAwMTA1WhgPMjAyODAzMjkxMDAxMDVaMBUxEzAR"
"BgNVBAoMCkV4YW1wbGUgQ0EwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIB"
"AQDHTixwDQuDillTw6P5Q7ZtHcFXzmPL4Rnvybar2+iA73R20s37ufWWz9NMpjdv"
"zl7OnudvFwzgzcKyUmuarUV4nN1rcQ5TsEUrZy99f+GE2tpmObCKY707ZnQwxyB6"
"0CoKV8erXGiXe3ox5/HYVT6F1FeabrNNZK7YzhOOaJD64fDsL8LgeOZzEan30yvA"
"flKVu4yUSNzqlG1qh37F+yL8C2G3QdTS/7AnAmTiQaLIiW7564WWhZtbVIytqTVb"
"IpP3QfPMQr4enlpxKVq9TBMDEgDIBTrBN9crQrV7acrrp3CBcI3iYa0YAP1U0RK3"
"1W/uK0oL4jGTuaZv+F/uNfvnAgMBAAGjgYEwfzAPBgNVHRMBAf8EBTADAQH/MA4G"
"A1UdDwEB/wQEAwIBBjAdBgNVHQ4EFgQU3YnkLGvL+0Ew9ZX4gOCz2qY+c6UwPQYD"
"VR0jBDYwNIAU3YnkLGvL+0Ew9ZX4gOCz2qY+c6WhGaQXMBUxEzARBgNVBAoMCkV4"
"YW1wbGUgQ0GCAQEwDQYJKoZIhvcNAQELBQADggEBAAvGcqavDPo8s4Nm0ZRA0jvc"
"0GbPLIpYV+XUI11D6ma6lPJOPtyJdCWY3d+sdTX8A6zbSOKbpCFRzKyLw1GyPzrg"
"pczkoWVummukBtWCCB8ULpd28/Fn9g2I5WZoO3bXvza8Xl+LASWDuxi0EJTF64Z4"
"xZLKGRThfzWIWf11PVI9TIuUM9dJkteJAb8tUPohApVPNUU7LL74/pquZKaqubpS"
"8AgUY00SfS6lAQZK36yDxYDxgNZL/Wgmpxqjh6V+loh/75OvrYsuHPvdU2xHgVu4"
"pQ/6c/pH6rAIyunpQCwE35ekwwwDdRHLCw3dSfevQd/ucpa5zdWLSwuxRINL3Sc="
))
resp_sig_sha256 = a2b_base64(str(
"MIIBxAoBAKCCAb0wggG5BgkrBgEFBQcwAQEEggGqMIIBpjCBj6IWBBTLcv+aDVgOwqkCq4JTiC6f"
"klAqrRgPMjAxODAzMjcxMjAyNTBaMGQwYjA6MAkGBSsOAwIaBQAEFJAVJBFLEEwrmfK6f5q3ZVC2"
"38iuBBTLcv+aDVgOwqkCq4JTiC6fklAqrQIBAoAAGA8yMDE4MDMyNzEyMDI1MFqgERgPMjAxODAz"
"MjgxMjAyNTBaMA0GCSqGSIb3DQEBCwUAA4IBAQAtGmPvwG1hFJz1sL7EHGcm8qsnrYv4no3ylQVU"
"IMJuMgAPpRm1pKJ2hsdENc2KO/4QThLY0cxYIyr0l0aHOEtYgVKCkD9oPsn1aAYO1jhEFcAhmN5S"
"+dj5TdFWA1OI7Z0SH3UZTlb7hEHxhJ6PhTWb7RW2KtKTOhLy4kdgdt95oGM5IPncXXCP/4QEsQv1"
"NxTd05OT70GD15gjbx0LHYQKdqtHkrd8vLnxv3Ku4AgMWjlmu+VsbwgpDupCPZT7mVFCg7o01grV"
"/pRaqwSastU/RoWOO/aBbbZ/SLu2benEV8A/+WvXaHx/wWCJDMaort+EROi1pI5P/Mg4UHGdwcgj"
))
cert_sig_sha256 = a2b_base64(str(
"MIIDKzCCAhOgAwIBAgIBATANBgkqhkiG9w0BAQsFADAVMRMwEQYDVQQKDApFeGFt"
"cGxlIENBMCIYDzIwMTMwMzI3MTIwMjQ3WhgPMjAyODAzMjcxMjAyNDdaMBUxEzAR"
"BgNVBAoMCkV4YW1wbGUgQ0EwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIB"
"AQDPnpQmVjVNk+AQ2DZvikBhOJFdqv7uDYTfvnKfWO+lDqAoA9wXC4/6jh4SDCgd"
"qPSgkjFTT3AH2JadPsHWPqG+PltE0pSk6iqCW57kzh/PBQFrVSHEjggvyjGI8Qgc"
"+0LW3zNP0rUL/OvAZhlptKIwW2wKpQPG3Pms+2qQ3Kg+uoRF5piw9KW65t1VKuOe"
"EYSN8mEgiTOE1PGbpAAyC144PN6cScb+21a066Ftk2b40prg9xugema4VM47+9IR"
"l+gvRmd27iq6dO+k6ZIO0YcMAF+r45UY8lMO92mL6rl3uLbDYMdMsMKKxZ9M+MA0"
"sytqFlLaURrhHqKQ76MxJ3xXAgMBAAGjgYEwfzAPBgNVHRMBAf8EBTADAQH/MA4G"
"A1UdDwEB/wQEAwIBBjAdBgNVHQ4EFgQUy3L/mg1YDsKpAquCU4gun5JQKq0wPQYD"
"VR0jBDYwNIAUy3L/mg1YDsKpAquCU4gun5JQKq2hGaQXMBUxEzARBgNVBAoMCkV4"
"YW1wbGUgQ0GCAQEwDQYJKoZIhvcNAQELBQADggEBABQp8yDRKe5qoSySCwFRdelO"
"EytE2wdgmeOXa+Krx9CZqNcsnSQSYa1KPl72p2YPWkVhp5mnHJRg8NGIRQby/yb4"
"y2DTHkXLFl1r0eXfEvoMyGLuWkjinuqLQJrEyOjEapxGIbm9e1ZbzYeMd2SQg9Ll"
"BAGr/JMHzjTB3gvUziGXyUG0ZYVFBqKGYyUqcTl+0LoFNPQzQOWEaLzjbHTGA27N"
"0wjZf+jxGJN7HJNWpKE02AHGWNh+XXyJXMTCyE/osV1k1rHH9v5vEypBK6Rwj7rF"
"97NBvMJ9xxnTXCZIFr466Ec+FBSWTOnVSb3JOTHycrzTwY7o3QBqQh/KbxwnHHw="
))

class TestOCSP(unittest.TestCase):
def test___init__(self):
def test_respOK(self):
resp = OCSPResponse(resp_OK)
self.assertEqual(OCSPRespStatus.successful, resp.resp_status)

def test_malformedrequest(self):
resp = OCSPResponse(resp_malformed)
self.assertEqual(OCSPRespStatus.malformedRequest, resp.resp_status)
Expand Down Expand Up @@ -121,18 +183,38 @@ def test_signature(self):
251, 188, 105, 188, 63, 223, 88, 127, 185, 246, 71, 221, 35, 100, 229, 116,
97, 237, 208, 212, 126, 199, 12, 217, 196, 167]),
resp.signature)

def test_verify_signature_sha1(self):
resp = OCSPResponse(resp_sig_sha1)
cert = X509()
cert.parseBinary(cert_sig_sha1)
self.assertTrue(resp.verify_signature(cert.publicKey))

def test_verify_signature_sha256(self):
resp = OCSPResponse(resp_sig_sha256)
cert = X509()
cert.parseBinary(cert_sig_sha256)
self.assertTrue(resp.verify_signature(cert.publicKey))

def test_invalid_signature(self):
resp = OCSPResponse(resp_sig_sha1)
cert = X509()
cert.parseBinary(cert_sig_sha1)
old_sig = resp.signature
resp.signature = bytearray([0])
self.assertNotEqual(resp.signature, old_sig)
with self.assertRaises(ValueError) as ctx:
resp.verify_signature(cert.publicKey)
self.assertTrue("Signature could not be verified for sha1" in str(ctx.exception))

def test_certs(self):
resp = OCSPResponse(resp_OK)
self.assertGreater(len(resp.certs), 0)
cert = resp.certs[0] # checking only first certificate
self.assertIsInstance(cert, bytearray)
x509 = X509()
x509.parseBinary(cert)
self.assertIsInstance(x509, X509)
self.assertIsInstance(cert, X509)

class TestSingleResponse(unittest.TestCase):
def test___init__(self):
def test_single_responses(self):
resp = OCSPResponse(resp_OK)
singleRespList = resp.responses
singleRespCnt = len(singleRespList)
Expand Down
59 changes: 59 additions & 0 deletions unit_tests/test_tlslite_signed.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# Author: Anna Khaitovich (c) 2018
# see LICENCE file for legal information regarding use of this file

# compatibility with Python 2.6, for that we need unittest2 package,
# which is not available on 3.3 or 3.4
try:
import unittest2 as unittest
except ImportError:
import unittest

from tlslite.signed import SignatureSettings, SignedObject, RSA_SIGNATURE_HASHES, RSA_SCHEMES


class TestSignatureSettings(unittest.TestCase):
def test_signature_settings_validate(self):
settings = SignatureSettings()
validated = settings.validate()
self.assertEqual(validated.min_key_size, 1023)
self.assertEqual(validated.max_key_size, 8193)
self.assertEqual(validated.rsa_sig_hashes, RSA_SIGNATURE_HASHES)
self.assertEqual(validated.rsa_schemes, RSA_SCHEMES)

def test_signature_settings_min_key_size_small(self):
settings = SignatureSettings(min_key_size=256)
with self.assertRaises(ValueError) as ctx:
settings.validate()
self.assertIn("min_key_size too small", str(ctx.exception))

def test_signature_settings_min_key_size_large(self):
settings = SignatureSettings(min_key_size=17000)
with self.assertRaises(ValueError) as ctx:
settings.validate()
self.assertIn("min_key_size too large", str(ctx.exception))

def test_signature_settings_max_key_size_small(self):
settings = SignatureSettings(max_key_size=256)
with self.assertRaises(ValueError) as ctx:
settings.validate()
self.assertIn("max_key_size too small", str(ctx.exception))

def test_signature_settings_max_key_size_large(self):
settings = SignatureSettings(max_key_size=17000)
with self.assertRaises(ValueError) as ctx:
settings.validate()
self.assertIn("max_key_size too large", str(ctx.exception))

def test_signature_settings_min_key_bigger_max_key(self):
settings = SignatureSettings(min_key_size=2048, max_key_size=1024)
with self.assertRaises(ValueError) as ctx:
settings.validate()
self.assertIn("max_key_size smaller than min_key_size", str(ctx.exception))

def test_signature_settings_invalid_sig_alg(self):
settings = SignatureSettings(rsa_sig_hashes=list(['sha1', 'sha128', 'sha129']))
with self.assertRaises(ValueError) as ctx:
settings.validate()
self.assertIn("Following signature algorithms are not allowed: sha128, sha129",
str(ctx.exception))
#TODO: verify_signature method testing

0 comments on commit 26bd53e

Please sign in to comment.