Skip to content

Commit

Permalink
Fix MultipleTypeField on ASN.1 packets and OSCP (#4222) (#4223)
Browse files Browse the repository at this point in the history
  • Loading branch information
gpotter2 committed Jan 30, 2024
1 parent 11787b7 commit e2e4d2d
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 191 deletions.
18 changes: 12 additions & 6 deletions scapy/asn1fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ def __init__(self,
self.explicit_tag = explicit_tag
# network_tag gets useful for ASN1F_CHOICE
self.network_tag = int(implicit_tag or explicit_tag or self.ASN1_tag)
self.owners = [] # type: List[Type[ASN1_Packet]]

def register_owner(self, cls):
# type: (Type[ASN1_Packet]) -> None
self.owners.append(cls)

def i2repr(self, pkt, x):
# type: (ASN1_Packet, _I) -> str
Expand Down Expand Up @@ -884,12 +889,13 @@ def m2i(self, pkt, s): # type: ignore
return p, remain

def i2m(self, pkt, x): # type: ignore
# type: (ASN1_Packet, Optional[ASN1_Packet]) -> bytes
s = b"" if x is None else raw(x)
return super(ASN1F_BIT_STRING_ENCAPS, self).i2m(
pkt,
ASN1_BIT_STRING(s, readable=True)
)
# type: (ASN1_Packet, Optional[ASN1_BIT_STRING]) -> bytes
if not isinstance(x, ASN1_BIT_STRING):
x = ASN1_BIT_STRING(
b"" if x is None else bytes(x), # type: ignore
readable=True,
)
return super(ASN1F_BIT_STRING_ENCAPS, self).i2m(pkt, x)


class ASN1F_FLAGS(ASN1F_BIT_STRING):
Expand Down
4 changes: 4 additions & 0 deletions scapy/fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,10 @@ def register_owner(self, cls):
fld.owners.append(cls)
self.dflt.owners.append(cls)

def get_fields_list(self):
# type: () -> List[Any]
return [self]

@property
def fld(self):
# type: () -> Field[Any, Any]
Expand Down
227 changes: 42 additions & 185 deletions scapy/layers/x509.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
ASN1F_UTF8_STRING, ASN1F_badsequence, ASN1F_enum_INTEGER, ASN1F_field, \
ASN1F_optional
from scapy.packet import Packet
from scapy.fields import PacketField
from scapy.fields import PacketField, MultipleTypeField
from scapy.volatile import ZuluTime, GeneralizedTime
from scapy.compat import plain_str

Expand Down Expand Up @@ -764,63 +764,25 @@ class X509_AlgorithmIdentifier(ASN1_Packet):
ASN1F_NULL, ECParameters)))


class ASN1F_X509_SubjectPublicKeyInfoRSA(ASN1F_SEQUENCE):
def __init__(self, **kargs):
seq = [ASN1F_PACKET("signatureAlgorithm",
X509_AlgorithmIdentifier(),
X509_AlgorithmIdentifier),
ASN1F_BIT_STRING_ENCAPS("subjectPublicKey",
RSAPublicKey(),
RSAPublicKey)]
ASN1F_SEQUENCE.__init__(self, *seq, **kargs)


class ASN1F_X509_SubjectPublicKeyInfoECDSA(ASN1F_SEQUENCE):
def __init__(self, **kargs):
seq = [ASN1F_PACKET("signatureAlgorithm",
X509_AlgorithmIdentifier(),
X509_AlgorithmIdentifier),
ASN1F_PACKET("subjectPublicKey", ECDSAPublicKey(),
ECDSAPublicKey)]
ASN1F_SEQUENCE.__init__(self, *seq, **kargs)


class ASN1F_X509_SubjectPublicKeyInfo(ASN1F_SEQUENCE):
def __init__(self, **kargs):
seq = [ASN1F_PACKET("signatureAlgorithm",
X509_AlgorithmIdentifier(),
X509_AlgorithmIdentifier),
ASN1F_BIT_STRING("subjectPublicKey", None)]
MultipleTypeField(
[
(ASN1F_BIT_STRING_ENCAPS("subjectPublicKey",
RSAPublicKey(),
RSAPublicKey),
lambda pkt: "rsa" in pkt.signatureAlgorithm.algorithm.oidname.lower()), # noqa: E501
(ASN1F_PACKET("subjectPublicKey",
ECDSAPublicKey(),
ECDSAPublicKey),
lambda pkt: "ecPublicKey" == pkt.signatureAlgorithm.algorithm.oidname), # noqa: E501
],
ASN1F_BIT_STRING("subjectPublicKey", ""))]
ASN1F_SEQUENCE.__init__(self, *seq, **kargs)

def m2i(self, pkt, x):
c, s = ASN1F_SEQUENCE.m2i(self, pkt, x)
keytype = pkt.fields["signatureAlgorithm"].algorithm.oidname
if "rsa" in keytype.lower():
return ASN1F_X509_SubjectPublicKeyInfoRSA().m2i(pkt, x)
elif keytype == "ecPublicKey":
return ASN1F_X509_SubjectPublicKeyInfoECDSA().m2i(pkt, x)
else:
raise Exception("could not parse subjectPublicKeyInfo")

def dissect(self, pkt, s):
c, x = self.m2i(pkt, s)
return x

def build(self, pkt):
if "signatureAlgorithm" in pkt.fields:
ktype = pkt.fields['signatureAlgorithm'].algorithm.oidname
else:
ktype = pkt.default_fields["signatureAlgorithm"].algorithm.oidname
if "rsa" in ktype.lower():
pkt.default_fields["subjectPublicKey"] = RSAPublicKey()
return ASN1F_X509_SubjectPublicKeyInfoRSA().build(pkt)
elif ktype == "ecPublicKey":
pkt.default_fields["subjectPublicKey"] = ECDSAPublicKey()
return ASN1F_X509_SubjectPublicKeyInfoECDSA().build(pkt)
else:
raise Exception("could not build subjectPublicKeyInfo")


class X509_SubjectPublicKeyInfo(ASN1_Packet):
ASN1_codec = ASN1_Codecs.BER
Expand Down Expand Up @@ -1004,20 +966,6 @@ def get_subject_str(self):
return name_str


class ASN1F_X509_CertECDSA(ASN1F_SEQUENCE):
def __init__(self, **kargs):
seq = [ASN1F_PACKET("tbsCertificate",
X509_TBSCertificate(),
X509_TBSCertificate),
ASN1F_PACKET("signatureAlgorithm",
X509_AlgorithmIdentifier(),
X509_AlgorithmIdentifier),
ASN1F_BIT_STRING_ENCAPS("signatureValue",
ECDSASignature(),
ECDSASignature)]
ASN1F_SEQUENCE.__init__(self, *seq, **kargs)


class ASN1F_X509_Cert(ASN1F_SEQUENCE):
def __init__(self, **kargs):
seq = [ASN1F_PACKET("tbsCertificate",
Expand All @@ -1026,37 +974,17 @@ def __init__(self, **kargs):
ASN1F_PACKET("signatureAlgorithm",
X509_AlgorithmIdentifier(),
X509_AlgorithmIdentifier),
ASN1F_BIT_STRING("signatureValue",
"defaultsignature" * 2)]
MultipleTypeField(
[
(ASN1F_BIT_STRING_ENCAPS("signatureValue",
ECDSASignature(),
ECDSASignature),
lambda pkt: "ecdsa" in pkt.signatureAlgorithm.algorithm.oidname.lower()), # noqa: E501
],
ASN1F_BIT_STRING("signatureValue",
"defaultsignature" * 2))]
ASN1F_SEQUENCE.__init__(self, *seq, **kargs)

def m2i(self, pkt, x):
c, s = ASN1F_SEQUENCE.m2i(self, pkt, x)
sigtype = pkt.fields["signatureAlgorithm"].algorithm.oidname
if "rsa" in sigtype.lower():
return c, s
elif "ecdsa" in sigtype.lower():
return ASN1F_X509_CertECDSA().m2i(pkt, x)
else:
raise Exception("could not parse certificate")

def dissect(self, pkt, s):
c, x = self.m2i(pkt, s)
return x

def build(self, pkt):
if "signatureAlgorithm" in pkt.fields:
sigtype = pkt.fields['signatureAlgorithm'].algorithm.oidname
else:
sigtype = pkt.default_fields["signatureAlgorithm"].algorithm.oidname # noqa: E501
if "rsa" in sigtype.lower():
return ASN1F_SEQUENCE.build(self, pkt)
elif "ecdsa" in sigtype.lower():
pkt.default_fields["signatureValue"] = ECDSASignature()
return ASN1F_X509_CertECDSA().build(pkt)
else:
raise Exception("could not build certificate")


class X509_Cert(ASN1_Packet):
ASN1_codec = ASN1_Codecs.BER
Expand Down Expand Up @@ -1121,20 +1049,6 @@ def get_issuer_str(self):
return name_str


class ASN1F_X509_CRLECDSA(ASN1F_SEQUENCE):
def __init__(self, **kargs):
seq = [ASN1F_PACKET("tbsCertList",
X509_TBSCertList(),
X509_TBSCertList),
ASN1F_PACKET("signatureAlgorithm",
X509_AlgorithmIdentifier(),
X509_AlgorithmIdentifier),
ASN1F_BIT_STRING_ENCAPS("signatureValue",
ECDSASignature(),
ECDSASignature)]
ASN1F_SEQUENCE.__init__(self, *seq, **kargs)


class ASN1F_X509_CRL(ASN1F_SEQUENCE):
def __init__(self, **kargs):
seq = [ASN1F_PACKET("tbsCertList",
Expand All @@ -1143,37 +1057,17 @@ def __init__(self, **kargs):
ASN1F_PACKET("signatureAlgorithm",
X509_AlgorithmIdentifier(),
X509_AlgorithmIdentifier),
ASN1F_BIT_STRING("signatureValue",
"defaultsignature" * 2)]
MultipleTypeField(
[
(ASN1F_BIT_STRING_ENCAPS("signatureValue",
ECDSASignature(),
ECDSASignature),
lambda pkt: "ecdsa" in pkt.signatureAlgorithm.algorithm.oidname.lower()), # noqa: E501
],
ASN1F_BIT_STRING("signatureValue",
"defaultsignature" * 2))]
ASN1F_SEQUENCE.__init__(self, *seq, **kargs)

def m2i(self, pkt, x):
c, s = ASN1F_SEQUENCE.m2i(self, pkt, x)
sigtype = pkt.fields["signatureAlgorithm"].algorithm.oidname
if "rsa" in sigtype.lower():
return c, s
elif "ecdsa" in sigtype.lower():
return ASN1F_X509_CRLECDSA().m2i(pkt, x)
else:
raise Exception("could not parse certificate")

def dissect(self, pkt, s):
c, x = self.m2i(pkt, s)
return x

def build(self, pkt):
if "signatureAlgorithm" in pkt.fields:
sigtype = pkt.fields['signatureAlgorithm'].algorithm.oidname
else:
sigtype = pkt.default_fields["signatureAlgorithm"].algorithm.oidname # noqa: E501
if "rsa" in sigtype.lower():
return ASN1F_SEQUENCE.build(self, pkt)
elif "ecdsa" in sigtype.lower():
pkt.default_fields["signatureValue"] = ECDSASignature()
return ASN1F_X509_CRLECDSA().build(pkt)
else:
raise Exception("could not build certificate")


class X509_CRL(ASN1_Packet):
ASN1_codec = ASN1_Codecs.BER
Expand Down Expand Up @@ -1231,7 +1125,7 @@ class OCSP_SingleResponse(ASN1_Packet):
ASN1_codec = ASN1_Codecs.BER
ASN1_root = ASN1F_SEQUENCE(
ASN1F_PACKET("certID", OCSP_CertID(), OCSP_CertID),
ASN1F_PACKET("certStatus", OCSP_CertStatus(),
ASN1F_PACKET("certStatus", OCSP_CertStatus(certStatus=OCSP_GoodInfo()),
OCSP_CertStatus),
ASN1F_GENERALIZED_TIME("thisUpdate", ""),
ASN1F_optional(
Expand Down Expand Up @@ -1268,7 +1162,7 @@ class OCSP_ResponseData(ASN1_Packet):
ASN1F_optional(
ASN1F_enum_INTEGER("version", 0, {0: "v1"},
explicit_tag=0x80)),
ASN1F_PACKET("responderID", OCSP_ResponderID(),
ASN1F_PACKET("responderID", OCSP_ResponderID(responderID=OCSP_ByName()),
OCSP_ResponderID),
ASN1F_GENERALIZED_TIME("producedAt",
str(GeneralizedTime())),
Expand All @@ -1279,23 +1173,6 @@ class OCSP_ResponseData(ASN1_Packet):
explicit_tag=0xa1)))


class ASN1F_OCSP_BasicResponseECDSA(ASN1F_SEQUENCE):
def __init__(self, **kargs):
seq = [ASN1F_PACKET("tbsResponseData",
OCSP_ResponseData(),
OCSP_ResponseData),
ASN1F_PACKET("signatureAlgorithm",
X509_AlgorithmIdentifier(),
X509_AlgorithmIdentifier),
ASN1F_BIT_STRING_ENCAPS("signature",
ECDSASignature(),
ECDSASignature),
ASN1F_optional(
ASN1F_SEQUENCE_OF("certs", None, X509_Cert,
explicit_tag=0xa0))]
ASN1F_SEQUENCE.__init__(self, *seq, **kargs)


class ASN1F_OCSP_BasicResponse(ASN1F_SEQUENCE):
def __init__(self, **kargs):
seq = [ASN1F_PACKET("tbsResponseData",
Expand All @@ -1304,40 +1181,20 @@ def __init__(self, **kargs):
ASN1F_PACKET("signatureAlgorithm",
X509_AlgorithmIdentifier(),
X509_AlgorithmIdentifier),
ASN1F_BIT_STRING("signature",
"defaultsignature" * 2),
MultipleTypeField(
[
(ASN1F_BIT_STRING_ENCAPS("signature",
ECDSASignature(),
ECDSASignature),
lambda pkt: "ecdsa" in pkt.signatureAlgorithm.algorithm.oidname.lower()), # noqa: E501
],
ASN1F_BIT_STRING("signature",
"defaultsignature" * 2)),
ASN1F_optional(
ASN1F_SEQUENCE_OF("certs", None, X509_Cert,
explicit_tag=0xa0))]
ASN1F_SEQUENCE.__init__(self, *seq, **kargs)

def m2i(self, pkt, x):
c, s = ASN1F_SEQUENCE.m2i(self, pkt, x)
sigtype = pkt.fields["signatureAlgorithm"].algorithm.oidname
if "rsa" in sigtype.lower():
return c, s
elif "ecdsa" in sigtype.lower():
return ASN1F_OCSP_BasicResponseECDSA().m2i(pkt, x)
else:
raise Exception("could not parse OCSP basic response")

def dissect(self, pkt, s):
c, x = self.m2i(pkt, s)
return x

def build(self, pkt):
if "signatureAlgorithm" in pkt.fields:
sigtype = pkt.fields['signatureAlgorithm'].algorithm.oidname
else:
sigtype = pkt.default_fields["signatureAlgorithm"].algorithm.oidname # noqa: E501
if "rsa" in sigtype.lower():
return ASN1F_SEQUENCE.build(self, pkt)
elif "ecdsa" in sigtype.lower():
pkt.default_fields["signatureValue"] = ECDSASignature()
return ASN1F_OCSP_BasicResponseECDSA().build(pkt)
else:
raise Exception("could not build OCSP basic response")


class OCSP_ResponseBytes(ASN1_Packet):
ASN1_codec = ASN1_Codecs.BER
Expand Down
7 changes: 7 additions & 0 deletions test/scapy/layers/x509.uts
Original file line number Diff line number Diff line change
Expand Up @@ -273,3 +273,10 @@ singleResponse.singleExtensions is None
= OCSP class : OCSP Response reconstruction
raw(response) == s

= OSCP class : OSCP Response with ECDSA
response = OCSP_ResponseBytes()
assert bytes(response.signature) == b'\x03!\x00defaultsignaturedefaultsignature'
response.signatureAlgorithm.algorithm = ASN1_OID('1.2.840.10045.4.3.2')
assert bytes(response.signature) == b'\x03\t\x000\x06\x02\x01\x00\x02\x01\x00'
response = OCSP_ResponseBytes(bytes(response))
assert isinstance(response.signature, ECDSASignature)

0 comments on commit e2e4d2d

Please sign in to comment.