diff --git a/scapy/asn1/mib.py b/scapy/asn1/mib.py index 7bb30811e00..b164fcc9c44 100644 --- a/scapy/asn1/mib.py +++ b/scapy/asn1/mib.py @@ -807,6 +807,7 @@ def load_mib(filenames): # of some algorithms from pkcs1_oids and x962Signature_oids. hash_by_oid = { + "1.2.840.113549.1.1.1": "sha1", "1.2.840.113549.1.1.2": "md2", "1.2.840.113549.1.1.3": "md4", "1.2.840.113549.1.1.4": "md5", diff --git a/scapy/layers/gssapi.py b/scapy/layers/gssapi.py index 6f4d5e91343..5e1732ab078 100644 --- a/scapy/layers/gssapi.py +++ b/scapy/layers/gssapi.py @@ -342,7 +342,7 @@ def fromssl( log_runtime.warning("Failed to parse the SSL Certificate. CBT not used") return GSS_C_NO_CHANNEL_BINDINGS try: - h = cert.getSignatureHash() + h = cert.getCertSignatureHash() except Exception: # We failed to get the signature algorithm. log_runtime.warning( diff --git a/scapy/layers/kerberos.py b/scapy/layers/kerberos.py index 0540d17e3d4..f8e18ce64c2 100644 --- a/scapy/layers/kerberos.py +++ b/scapy/layers/kerberos.py @@ -269,15 +269,21 @@ def toString(self): return "/".join(x.val.decode() for x in self.nameString) @staticmethod - def fromUPN(upn: str): + def fromUPN(upn: str, canonicalize: bool = False): """ Create a PrincipalName from a UPN string. """ - user, _ = _parse_upn(upn) - return PrincipalName( - nameString=[ASN1_GENERAL_STRING(user)], - nameType=ASN1_INTEGER(1), # NT-PRINCIPAL - ) + if canonicalize: + return PrincipalName( + nameString=[ASN1_GENERAL_STRING(upn)], + nameType=ASN1_INTEGER(10), # NT-ENTERPRISE + ) + else: + user, _ = _parse_upn(upn) + return PrincipalName( + nameString=[ASN1_GENERAL_STRING(user)], + nameType=ASN1_INTEGER(1), # NT-PRINCIPAL + ) @staticmethod def fromSPN(spn: str): @@ -728,6 +734,7 @@ class AD_AND_OR(ASN1_Packet): 15: "PA-PK-AS-REP-OLD", 16: "PA-PK-AS-REQ", 17: "PA-PK-AS-REP", + 18: "PA-PK-OCSP-RESPONSE", 19: "PA-ETYPE-INFO2", 20: "PA-SVR-REFERRAL-INFO", 111: "TD-CMS-DIGEST-ALGORITHMS", @@ -1381,20 +1388,21 @@ class KRB_PKAuthenticator(ASN1_Packet): ), # [MS-PKCA] sect 2.2.3 ASN1F_optional( - ASN1F_PACKET("paChecksum2", PAChecksum2(), PAChecksum2, explicit_tag=0xA5), + ASN1F_PACKET("paChecksum2", None, PAChecksum2, explicit_tag=0xA5), ), ) - def make_checksum(self, text, h="sha256"): + def make_checksum(self, text, h: str = "sha256"): """ - Populate paChecksum and paChecksum2 + Populate paChecksum """ # paChecksum (always sha-1) self.paChecksum = ASN1_STRING(Hash_SHA().digest(text)) # paChecksum2 - self.paChecksum2 = PAChecksum2() - self.paChecksum2.make(text, h=h) + if h != "sha1": + self.paChecksum2 = PAChecksum2() + self.paChecksum2.make(text, h=h) def verify_checksum(self, text): """ @@ -1403,7 +1411,8 @@ def verify_checksum(self, text): if self.paChecksum.val != Hash_SHA().digest(text): raise ValueError("Bad paChecksum checksum !") - self.paChecksum2.verify(text) + if self.paChecksum2 is not None: + self.paChecksum2.verify(text) # RFC8636 sect 6 @@ -2118,11 +2127,12 @@ def m2i(self, pkt, s): val = super(_KRBERROR_data_Field, self).m2i(pkt, s) if not val[0].val: return val - if pkt.errorCode.val in [14, 24, 25, 36]: + if pkt.errorCode.val in [14, 24, 25, 36, 80]: # 14: KDC_ERR_ETYPE_NOSUPP # 24: KDC_ERR_PREAUTH_FAILED # 25: KDC_ERR_PREAUTH_REQUIRED # 36: KRB_AP_ERR_BADMATCH + # 80: KDC_ERR_DIGEST_IN_SIGNED_DATA_NOT_ACCEPTED return MethodData(val[0].val, _underlayer=pkt), val[1] elif pkt.errorCode.val in [6, 7, 12, 13, 18, 29, 32, 41, 60, 62]: # 6: KDC_ERR_C_PRINCIPAL_UNKNOWN @@ -2993,6 +3003,7 @@ class KerberosClient(Automaton): :param mode: the mode to use for the client (default: AS_REQ). :param ip: the IP of the DC (default: discovered by dclocator) :param upn: the UPN of the client. + :param canonicalize: request the UPN to be canonicalized. :param password: the password of the client. :param key: the Key of the client (instead of the password) :param realm: the realm of the domain. (default: from the UPN) @@ -3009,6 +3020,8 @@ class KerberosClient(Automaton): :param armor_ticket_upn: the UPN of the client of the armoring ticket :param armor_ticket_skey: the session Key object of the armoring ticket :param etypes: specify the list of encryption types to support + :param dhashes: specify the list of supported digest algorithms for PKINIT + (defaults to ["sha1", "sha256", "sha384", "sha512"]) AS-REQ only: @@ -3048,12 +3061,14 @@ def __init__( mode=MODE.AS_REQ, ip: Optional[str] = None, upn: Optional[str] = None, + canonicalize: bool = False, password: Optional[str] = None, key: Optional["Key"] = None, realm: Optional[str] = None, x509: Optional[Union[Cert, str]] = None, x509key: Optional[Union[PrivKey, str]] = None, ca: Optional[Union[CertTree, str]] = None, + no_verify_cert: bool = False, p12: Optional[str] = None, spn: Optional[str] = None, ticket: Optional[KRB_Ticket] = None, @@ -3072,6 +3087,7 @@ def __init__( armor_ticket_skey: Optional["Key"] = None, key_list_req: List["EncryptionType"] = [], etypes: Optional[List["EncryptionType"]] = None, + dhashes: Optional[List[str]] = None, pkinit_kex_method: PKINIT_KEX_METHOD = PKINIT_KEX_METHOD.DIFFIE_HELLMAN, port: int = 88, timeout: int = 5, @@ -3081,22 +3097,6 @@ def __init__( import scapy.libs.rfc3961 # Trigger error if any # noqa: F401 from scapy.layers.ldap import dclocator - if not upn: - raise ValueError("Invalid upn") - if not spn: - raise ValueError("Invalid spn") - if realm is None: - if mode in [self.MODE.AS_REQ, self.MODE.GET_SALT]: - _, realm = _parse_upn(upn) - elif mode == self.MODE.TGS_REQ: - _, realm = _parse_spn(spn) - if not realm and ticket: - # if no realm is specified, but there's a ticket, take the realm - # of the ticket. - realm = ticket.realm.val.decode() - else: - raise ValueError("Invalid realm") - # PKINIT checks if p12 is not None: # password should be None or bytes @@ -3137,12 +3137,58 @@ def __init__( x509key = PrivKey(x509key) if ca and not isinstance(ca, CertList): ca = CertList(ca) + if upn is None and x509: + # For PKINIT, get the UPN from the SAN, if possible and present + if realm is None: + raise ValueError( + "When using PKINIT, you must at least specify the realm= !" + ) + for ext in x509.extensions: + if ext.extnID.val == "2.5.29.17": # subjectAltName + generalName = ext.extnValue.subjectAltName[0].generalName + upn = generalName.value.val.decode("utf-8") + break + if upn is None: + raise ValueError( + "Could not find subjectAltName in certificate !" + " Please provide a UPN." + ) + canonicalize = True + + # UPN, SPN and realm calculation + if not upn: + raise ValueError("Invalid upn") + if realm is None: + if mode in [self.MODE.AS_REQ, self.MODE.GET_SALT]: + _, realm = _parse_upn(upn) + elif mode == self.MODE.TGS_REQ: + _, realm = _parse_spn(spn) + if not realm and ticket: + # if no realm is specified, but there's a ticket, take the realm + # of the ticket. + realm = ticket.realm.val.decode() + else: + raise ValueError("Invalid realm") + if not spn and mode == self.MODE.AS_REQ and realm: + spn = "krbtgt/" + realm + elif not spn: + raise ValueError("Invalid spn") + # Extra checks for specific requests if mode in [self.MODE.AS_REQ, self.MODE.GET_SALT]: if not host: raise ValueError("Invalid host") - if x509 is not None and (not x509key or not ca): - raise ValueError("Must provide both 'x509', 'x509key' and 'ca' !") + if x509 is not None: + if x509key and not ca: + if not no_verify_cert: + raise ValueError( + "Using PKINIT without specifying the remote CA is unsafe !" + " Set no_verify_cert=True to bypass this check." + ) + else: + ca = [] + elif not x509key or not ca: + raise ValueError("Must provide both 'x509', 'x509key' and 'ca' !") elif mode == self.MODE.TGS_REQ: if not ticket: raise ValueError("Invalid ticket") @@ -3174,6 +3220,8 @@ def __init__( "Cannot specify armor_ticket without armor_ticket_{upn,skey}" ) + # Provide default supported encryption types. For SALT mode, we discard + # the encryption types that don't have a salt. if mode == self.MODE.GET_SALT: if etypes is not None: raise ValueError("Cannot specify etypes in GET_SALT mode !") @@ -3187,7 +3235,6 @@ def __init__( EncryptionType.AES256_CTS_HMAC_SHA1_96, EncryptionType.AES128_CTS_HMAC_SHA1_96, EncryptionType.RC4_HMAC, - EncryptionType.RC4_HMAC_EXP, EncryptionType.DES_CBC_MD5, ] self.etypes = etypes @@ -3208,6 +3255,7 @@ def __init__( self.password = password and bytes_encode(password) self.spn = spn self.upn = upn + self.canonicalize = canonicalize # Whether we request canonicalization self.realm = realm.upper() self.x509 = x509 self.x509key = x509key @@ -3233,7 +3281,12 @@ def __init__( # This marks that we sent a FAST-req and are awaiting for an answer self.fast_req_sent = False # Session parameters - self.pre_auth = False + if self.x509: + # Windows only assumes it needs a pre-auth when PKINIT is used, + # otherwise it waits to have a PREAUTH_REQUIRED error first. + self.pre_auth = True + else: + self.pre_auth = False self.pa_type = None # preauth-type that's used self.fast_rep = None self.fast_error = None @@ -3241,11 +3294,17 @@ def __init__( self.fast_armorkey = None # The armor key self.fxcookie = None self.pkinit_dh_key = None + self.no_verify_cert = no_verify_cert if ca is not None: self.pkinit_cms = CMS_Engine(ca) else: self.pkinit_cms = None + if dhashes is None: + self.dhashes = ["sha1", "sha256", "sha384", "sha512"] + else: + self.dhashes = dhashes + # Launch the client sock = self._connect() super(KerberosClient, self).__init__( sock=sock, @@ -3455,7 +3514,8 @@ def as_req(self): address=ASN1_STRING(self.host.ljust(16, " ")), ) ] - kdc_req.cname = PrincipalName.fromUPN(self.upn) + kdc_req.addresses = None + kdc_req.cname = PrincipalName.fromUPN(self.upn, canonicalize=self.canonicalize) kdc_req.sname = PrincipalName.fromSPN(self.spn) # 2. Build the list of PADATA @@ -3507,7 +3567,7 @@ def as_req(self): nonce=ASN1_INTEGER(RandNum(0, 0x7FFFFFFF)._fix()), ), clientPublicValue=None, # Used only in DH mode - supportedCMSTypes=None, + supportedCMSTypes=[], clientDHNonce=None, supportedKDFs=None, ) @@ -3515,7 +3575,7 @@ def as_req(self): if self.pkinit_kex_method == PKINIT_KEX_METHOD.DIFFIE_HELLMAN: # RFC4556 - 3.2.3.1. Diffie-Hellman Key Exchange - # We use modp2048 + # We (and Windows) use modp2048 dh_parameters = _ffdh_groups["modp2048"][0] self.pkinit_dh_key = dh_parameters.generate_private_key() numbers = dh_parameters.parameter_numbers() @@ -3530,6 +3590,7 @@ def as_req(self): g=ASN1_INTEGER(numbers.g), # q: see ERRATA 1 of RFC4556 q=ASN1_INTEGER(numbers.q or (numbers.p - 1) // 2), + j=None, ), ), subjectPublicKey=DHPublicKey( @@ -3565,8 +3626,15 @@ def as_req(self): else: raise ValueError - # Populate paChecksum and PAChecksum2 - authpack.pkAuthenticator.make_checksum(bytes(kdc_req)) + # Find a supported digest hash. Windows 25H2 still defaults + # to SHA1 unless a client policy has been applied. + dhash = next(iter(self.dhashes)) + + # Populate paChecksum + authpack.pkAuthenticator.make_checksum( + bytes(kdc_req), + h=dhash, + ) # Sign the AuthPack signedAuthpack = self.pkinit_cms.sign( @@ -3574,6 +3642,7 @@ def as_req(self): ASN1_OID("id-pkinit-authData"), self.x509, self.x509key, + dhash=dhash, ) # Build PA-DATA @@ -3586,6 +3655,14 @@ def as_req(self): kdcPkId=None, ), ) + + # RFC 4557 extension - OCSP + padata.insert( + 0, + PADATA( + padataType=18, # PA-PK-OCSP-RESPONSE + ), + ) else: # Key-based factor @@ -3784,7 +3861,7 @@ def tgs_req(self): _, crealm = _parse_upn(self.upn) authenticator = KRB_Authenticator( crealm=ASN1_GENERAL_STRING(crealm), - cname=PrincipalName.fromUPN(self.upn), + cname=PrincipalName.fromUPN(self.upn, canonicalize=self.canonicalize), cksum=None, ctime=ASN1_GENERALIZED_TIME(now_time), cusec=ASN1_INTEGER(0), @@ -3899,6 +3976,7 @@ def _process_padatas_and_key(self, padatas, etype: "EncryptionType" = None): keyinfo = self.pkinit_cms.verify( padata.padataValue.rep.dhSignedData, eContentType=ASN1_OID("id-pkinit-DHKeyData"), + no_verify_cert=self.no_verify_cert, ) # If 'etype' is None, we're in an error. Since we verified @@ -3925,6 +4003,9 @@ def _process_padatas_and_key(self, padatas, etype: "EncryptionType" = None): else: raise ValueError + elif padata.padataType == 111: # TD-CMS-DIGEST-ALGORITHMS + self.dhashes = [x.algorithm.oidname for x in padata.padataValue.seq] + elif padata.padataType == 133: # PA-FX-COOKIE # Get cookie and store it self.fxcookie = padata.padataValue @@ -4021,7 +4102,7 @@ def receive_krb_error_as_req(self, pkt): return if pkt.root.errorCode == 25: # KDC_ERR_PREAUTH_REQUIRED - if not self.key and not self.x509: + if not self.key: log_runtime.error( "Got 'KDC_ERR_PREAUTH_REQUIRED', " "but no possible key could be computed." @@ -4030,6 +4111,9 @@ def receive_krb_error_as_req(self, pkt): self.should_followup = True self.pre_auth = True raise self.BEGIN() + elif pkt.root.errorCode == 80: # KDC_ERR_DIGEST_IN_SIGNED_DATA_NOT_ACCEPTED + self.should_followup = True + raise self.BEGIN() else: self._show_krb_error(pkt) raise self.FINAL() @@ -4068,6 +4152,11 @@ def decrypt_as_rep(self, pkt): self.fast_armorkey, bytes(pkt.root.ticket), ) + # Process pa of FAST response + self._process_padatas_and_key( + self.fast_rep.padata, + etype=pkt.root.encPart.etype.val, + ) self.fast_rep = None elif self.fast: raise ValueError("Answer was not FAST ! Is it supported?") @@ -4204,7 +4293,7 @@ def _spn_are_equal(spn1, spn2): def krb_as_req( - upn: str, + upn: Optional[str] = None, spn: Optional[str] = None, ip: Optional[str] = None, key: Optional["Key"] = None, @@ -4248,12 +4337,10 @@ def krb_as_req( ...: f4e99205e78f8da7681d4ec5520ae4815543720c2a647c1ae814c9")) >>> krb_as_req("user1@DOMAIN.LOCAL", ip="192.168.122.17", key=key) - Example using PKINIT with a p12:: + Example using PKINIT with a p12 ("password" is the password of the p12):: - >>> krb_as_req("user1@DOMAIN.LOCAL", p12="./store.p12", password="password") + >>> krb_as_req(p12="./store.p12", realm="DOMAIN.LOCAL", password="password") """ - if realm is None: - _, realm = _parse_upn(upn) if key is None and p12 is None and x509 is None: if password is None: try: @@ -4266,7 +4353,7 @@ def krb_as_req( mode=KerberosClient.MODE.AS_REQ, realm=realm, ip=ip, - spn=spn or "krbtgt/" + realm, + spn=spn, host=host, upn=upn, password=password, diff --git a/scapy/layers/tls/cert.py b/scapy/layers/tls/cert.py index 7f3e710d806..e41ca71a784 100644 --- a/scapy/layers/tls/cert.py +++ b/scapy/layers/tls/cert.py @@ -135,10 +135,10 @@ X509_CRL, X509_SubjectPublicKeyInfo, ) +from scapy.layers.tls.crypto.hash import _get_hash from scapy.layers.tls.crypto.pkcs1 import ( _DecryptAndSignRSA, _EncryptAndVerifyRSA, - _get_hash, pkcs_os2ip, ) from scapy.compat import bytes_encode @@ -155,7 +155,7 @@ from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives import serialization - from cryptography.hazmat.primitives.asymmetric import rsa, ec, x25519 + from cryptography.hazmat.primitives.asymmetric import rsa, ec, x25519, x448 # cryptography raised the minimum RSA key length to 1024 in 43.0+ # https://github.com/pyca/cryptography/pull/10278 @@ -567,15 +567,6 @@ def __call__(cls, key_path=None, cryptography_obj=None): _an RSAPrivateKey; _an ECDSAPrivateKey. """ - if key_path is None: - obj = type.__call__(cls) - if cls is PrivKey: - cls = PrivKeyECDSA - obj.__class__ = cls - obj.frmt = "original" - obj.fill_and_store() - return obj - # This allows to import cryptography objects directly if cryptography_obj is not None: # We (stupidly) need to go through the whole import process because RSA @@ -588,6 +579,14 @@ def __call__(cls, key_path=None, cryptography_obj=None): encryption_algorithm=serialization.NoEncryption(), ), ) + elif key_path is None: + obj = type.__call__(cls) + if cls is PrivKey: + cls = PrivKeyECDSA + obj.__class__ = cls + obj.frmt = "original" + obj.fill_and_store() + return obj else: # Load from file obj = _PKIObjMaker.__call__(cls, key_path, _MAX_KEY_SIZE) @@ -1029,7 +1028,7 @@ def encrypt(self, msg, t="pkcs", h="sha256", mgf=None, L=None): def verify(self, msg, sig, t="pkcs", h="sha256", mgf=None, L=None): return self.pubkey.verify(msg, sig, t=t, h=h, mgf=mgf, L=L) - def getSignatureHash(self): + def getCertSignatureHash(self): """ Return the hash cryptography object used by the 'signatureAlgorithm' """ @@ -1142,6 +1141,10 @@ def pubKey(self): ) return self.pubkey + @property + def extensions(self): + return self.tbsCertificate.extensions + def __eq__(self, other): return self.der == other.der @@ -1635,7 +1638,7 @@ def _rec_getchain(chain, curtree): for c, subtree in curtree: curchain = chain + [c] # If 'cert' is issued by c - if cert.isIssuer(c): + if cert.isIssuer(c) or c == cert: # Final node of the chain ! # (add the final cert if not self signed) if c != cert: @@ -1650,7 +1653,8 @@ def _rec_getchain(chain, curtree): chain = _rec_getchain([], self.tree) if chain is not None: - return CertTree(chain) + # We add the first certificate to the ROOT in all cases + return CertTree(chain, [chain[0]]) else: return None @@ -1718,13 +1722,75 @@ def __init__( self.store = store self.crls = crls + def _get_algorithms(self, key: PrivKey, h="sha256") -> ASN1_OID: + """ + Get the algorithms matching a private key + """ + if isinstance(key, PrivKeyRSA): + # RFC3370 sect 3.2 + return ( + ASN1_OID("rsaEncryption"), + _get_hash(h), + h, + ) + elif isinstance(key, PrivKeyECDSA): + # RFC5753 sect 2.1.1 + if h == "sha1": + return ( + ASN1_OID("ecdsa-with-SHA1"), + hashes.SHA1(), + "sha1", + ) + elif h == "sha224": + return ( + ASN1_OID("ecdsa-with-SHA224"), + hashes.SHA224(), + "sha224", + ) + elif h == "sha256": + return ( + ASN1_OID("ecdsa-with-SHA256"), + hashes.SHA256(), + "sha256", + ) + elif h == "sha384": + return ( + ASN1_OID("ecdsa-with-SHA384"), + hashes.SHA384(), + "sha384", + ) + elif h == "sha512": + return ( + ASN1_OID("ecdsa-with-SHA512"), + hashes.SHA512(), + "sha512", + ) + else: + raise ValueError("Unknown hash for private key !") + elif isinstance(key, PrivKeyEdDSA): + # RFC8419 sect 2.3 + if isinstance(key.key, x25519.X25519PrivateKey): + return ( + ASN1_OID("Ed25519"), + hashes.SHA512(), + "sha512", + ) + elif isinstance(key.key, x448.X448PrivateKey): + return ( + ASN1_OID("Ed448"), + hashes.SHAKE256(64), + "shake256", + ) + else: + raise ValueError("Unknown private key type !") + def sign( self, message: Union[bytes, Packet], eContentType: ASN1_OID, cert: Cert, key: PrivKey, - h: Optional[str] = None, + dhash: Optional[str] = "sha256", ): """ Sign a message using CMS. @@ -1733,17 +1799,18 @@ def sign( :param eContentType: the OID of the inner content. :param cert: the certificate whose key to use use for signing. :param key: the private key to use for signing. - :param h: the hash to use (default: same as the certificate's signature) + :param dhash: the hash to use for message digest (ECDSA only). We currently only support X.509 certificates ! """ - # RFC3852 - 5.4. Message Digest Calculation Process - h = h or _get_cert_sig_hashname(cert) - hash = hashes.Hash(_get_hash(h)) + sigalg, cdhash, dhash = self._get_algorithms(key, h=dhash) + + # RFC3852 5.4. Message Digest Calculation Process + hash = hashes.Hash(cdhash) hash.update(bytes(message)) hashed_message = hash.finalize() - # 5.5. Signature Generation Process + # RFC3852 5.5. Signature Generation Process signerInfo = CMS_SignerInfo( version=1, sid=CMS_IssuerAndSerialNumber( @@ -1751,7 +1818,7 @@ def sign( serialNumber=cert.tbsCertificate.serialNumber, ), digestAlgorithm=X509_AlgorithmIdentifier( - algorithm=ASN1_OID(h), + algorithm=ASN1_OID(dhash), parameters=ASN1_NULL(0), ), signedAttrs=[ @@ -1769,7 +1836,10 @@ def sign( ], ), ], - signatureAlgorithm=cert.tbsCertificate.signature, + signatureAlgorithm=X509_AlgorithmIdentifier( + algorithm=sigalg, + parameters=ASN1_NULL(0), + ), ) signerInfo.signature = ASN1_STRING( key.sign( @@ -1778,7 +1848,7 @@ def sign( signedAttrs=signerInfo.signedAttrs, ) ), - h=h, + h=dhash, ) ) @@ -1792,7 +1862,7 @@ def sign( content=CMS_SignedData( version=3 if certificates else 1, digestAlgorithms=X509_AlgorithmIdentifier( - algorithm=ASN1_OID(h), + algorithm=ASN1_OID(dhash), parameters=ASN1_NULL(0), ), encapContentInfo=CMS_EncapsulatedContentInfo( @@ -1820,6 +1890,7 @@ def verify( contentInfo: CMS_ContentInfo, eContentType: Optional[ASN1_OID] = None, eContent: Optional[bytes] = None, + no_verify_cert: bool = False, ): """ Verify a CMS message against the list of trusted certificates, @@ -1828,6 +1899,7 @@ def verify( :param contentInfo: the ContentInfo whose signature to verify :param eContentType: if provided, verifies that the content type is valid :param eContent: in PKCS 7.1, provide the content to verify + :param no_verify_cert: do not check the remote certificate (unsafe) """ if contentInfo.contentType.oidname != "id-signedData": raise ValueError("ContentInfo isn't signed !") @@ -1846,11 +1918,14 @@ def verify( # Check all signatures for signerInfo in signeddata.signerInfos: + sigh = hash_by_oid[signerInfo.signatureAlgorithm.algorithm.val] + # Find certificate in the chain that did this cert: Cert = certTree.findCertBySid(signerInfo.sid) # Verify certificate signature - certTree.verify(cert) + if not no_verify_cert: + certTree.verify(cert) # Verify the message hash if signerInfo.signedAttrs: @@ -1911,11 +1986,13 @@ def verify( ) ), sig=signerInfo.signature.val, + h=sigh, ) else: cert.verify( msg=bytes(signeddata.encapContentInfo), sig=signerInfo.signature.val, + h=sigh, ) # Return the content diff --git a/scapy/layers/tls/crypto/h_mac.py b/scapy/layers/tls/crypto/h_mac.py index 26c69ebfbe0..db984dc22a2 100644 --- a/scapy/layers/tls/crypto/h_mac.py +++ b/scapy/layers/tls/crypto/h_mac.py @@ -8,10 +8,12 @@ HMAC classes. """ -import hmac - +from scapy.config import conf from scapy.layers.tls.crypto.hash import _tls_hash_algs -from scapy.compat import bytes_encode + +if conf.crypto_valid: + from cryptography.hazmat.backends import default_backend + from cryptography.hazmat.primitives.hmac import HMAC _SSLv3_PAD1_MD5 = b"\x36" * 48 _SSLv3_PAD1_SHA1 = b"\x36" * 40 @@ -30,15 +32,18 @@ class _GenericHMACMetaclass(type): the associated hash function (see RFC 5246, appendix C). Also, we do not need to instantiate the associated hash function. """ + def __new__(cls, hmac_name, bases, dct): - hash_name = hmac_name[5:] # remove leading "Hmac_" + hash_name = hmac_name[5:] # remove leading "Hmac_" if hmac_name != "_GenericHMAC": + hash_alg = _tls_hash_algs[hash_name.lower()] dct["name"] = "HMAC-%s" % hash_name - dct["hash_alg"] = _tls_hash_algs[hash_name] - dct["hmac_len"] = _tls_hash_algs[hash_name].hash_len + dct["hash_alg"] = hash_alg + dct["hmac_len"] = hash_alg.hash_len dct["key_len"] = dct["hmac_len"] - the_class = super(_GenericHMACMetaclass, cls).__new__(cls, hmac_name, - bases, dct) + the_class = super(_GenericHMACMetaclass, cls).__new__( + cls, hmac_name, bases, dct + ) if hmac_name != "_GenericHMAC": _tls_hmac_algs[dct["name"]] = the_class return the_class @@ -48,38 +53,36 @@ class HMACError(Exception): """ Raised when HMAC verification fails. """ + pass class _GenericHMAC(metaclass=_GenericHMACMetaclass): def __init__(self, key=None): - if key is None: - self.key = b"" - else: - self.key = bytes_encode(key) + self.key = key or b"" def digest(self, tbd): if self.key is None: raise HMACError - tbd = bytes_encode(tbd) - return hmac.new(self.key, tbd, self.hash_alg.hash_cls).digest() + hm = HMAC(self.key, self.hash_alg.hash_cls(), backend=default_backend()) + hm.update(tbd) + return hm.finalize() def digest_sslv3(self, tbd): if self.key is None: raise HMACError h = self.hash_alg() - if h.name == "SHA": + if h.name == "sha": pad1 = _SSLv3_PAD1_SHA1 pad2 = _SSLv3_PAD2_SHA1 - elif h.name == "MD5": + elif h.name == "md5": pad1 = _SSLv3_PAD1_MD5 pad2 = _SSLv3_PAD2_MD5 else: raise HMACError("Provided hash does not work with SSLv3.") - return h.digest(self.key + pad2 + - h.digest(self.key + pad1 + tbd)) + return h.digest(self.key + pad2 + h.digest(self.key + pad1 + tbd)) class Hmac_NULL(_GenericHMAC): @@ -125,4 +128,4 @@ def Hmac(key, hashtype): """ Return Hmac object from Hash object and key """ - return _tls_hmac_algs[f"HMAC-{hashtype.name}"](key=key) + return _tls_hmac_algs[f"HMAC-{hashtype.name.upper()}"](key=key) diff --git a/scapy/layers/tls/crypto/hash.py b/scapy/layers/tls/crypto/hash.py index b1bcdbe2669..cb54b127cdc 100644 --- a/scapy/layers/tls/crypto/hash.py +++ b/scapy/layers/tls/crypto/hash.py @@ -8,9 +8,13 @@ Hash classes. """ -from hashlib import md5, sha1, sha224, sha256, sha384, sha512 +from scapy.config import conf, crypto_validator from scapy.layers.tls.crypto.md4 import MD4 as md4 +if conf.crypto_valid: + from cryptography.hazmat.backends import default_backend + from cryptography.hazmat.primitives import hashes + from cryptography.hazmat.primitives.hashes import HashAlgorithm _tls_hash_algs = {} @@ -20,19 +24,23 @@ class _GenericHashMetaclass(type): Hash classes are automatically registered through this metaclass. Furthermore, their name attribute is extracted from their class name. """ + def __new__(cls, hash_name, bases, dct): if hash_name != "_GenericHash": - dct["name"] = hash_name[5:] # remove leading "Hash_" - the_class = super(_GenericHashMetaclass, cls).__new__(cls, hash_name, - bases, dct) + dct["name"] = hash_name[5:].lower() # remove leading "Hash_" + the_class = super(_GenericHashMetaclass, cls).__new__( + cls, hash_name, bases, dct + ) if hash_name != "_GenericHash": - _tls_hash_algs[hash_name[5:]] = the_class + _tls_hash_algs[dct["name"]] = the_class return the_class class _GenericHash(metaclass=_GenericHashMetaclass): def digest(self, tbd): - return self.hash_cls(tbd).digest() + digest = hashes.Hash(self.hash_cls(), backend=default_backend()) + digest.update(tbd) + return digest.finalize() class Hash_NULL(_GenericHash): @@ -46,32 +54,76 @@ class Hash_MD4(_GenericHash): hash_cls = md4 hash_len = 16 + def digest(self, tbd): + return self.hash_cls(tbd).digest() + class Hash_MD5(_GenericHash): - hash_cls = md5 + hash_cls = hashes.MD5 hash_len = 16 class Hash_SHA(_GenericHash): - hash_cls = sha1 + hash_cls = hashes.SHA1 hash_len = 20 +_tls_hash_algs["sha1"] = Hash_SHA + + class Hash_SHA224(_GenericHash): - hash_cls = sha224 + hash_cls = hashes.SHA224 hash_len = 28 class Hash_SHA256(_GenericHash): - hash_cls = sha256 + hash_cls = hashes.SHA256 hash_len = 32 class Hash_SHA384(_GenericHash): - hash_cls = sha384 + hash_cls = hashes.SHA384 hash_len = 48 class Hash_SHA512(_GenericHash): - hash_cls = sha512 + hash_cls = hashes.SHA512 hash_len = 64 + + +# first, we add the "md5-sha1" hash from openssl to python-cryptography +class MD5_SHA1(HashAlgorithm): + name = "md5-sha1" + digest_size = 36 + block_size = 64 + + +class Hash_MD5SHA1(_GenericHash): + hash_cls = MD5_SHA1 + hash_len = 36 + + +_tls_hash_algs["md5-sha1"] = Hash_MD5SHA1 + + +class Hash_SHAKE256(_GenericHash): + hash_cls = hashes.SHAKE256 + + def __init__(self, digest_size: int): + self.hash_len = digest_size + + def digest(self, tbd): + digest = hashes.Hash(self.hash_cls(self.hash_len), backend=default_backend()) + digest.update(tbd) + return digest.finalize() + + +@crypto_validator +def _get_hash(hashStr): + """ + Return a cryptography-hash by its name + """ + try: + return _tls_hash_algs[hashStr].hash_cls() + except KeyError: + raise KeyError("Unknown hash function %s" % hashStr) diff --git a/scapy/layers/tls/crypto/hkdf.py b/scapy/layers/tls/crypto/hkdf.py index 649305666a5..2d2af9d272b 100644 --- a/scapy/layers/tls/crypto/hkdf.py +++ b/scapy/layers/tls/crypto/hkdf.py @@ -10,7 +10,7 @@ import struct from scapy.config import conf, crypto_validator -from scapy.layers.tls.crypto.pkcs1 import _get_hash +from scapy.layers.tls.crypto.hash import _get_hash if conf.crypto_valid: from cryptography.hazmat.backends import default_backend diff --git a/scapy/layers/tls/crypto/pkcs1.py b/scapy/layers/tls/crypto/pkcs1.py index 18008a5e7d8..82082edc3de 100644 --- a/scapy/layers/tls/crypto/pkcs1.py +++ b/scapy/layers/tls/crypto/pkcs1.py @@ -16,12 +16,12 @@ from scapy.config import conf, crypto_validator from scapy.error import warning +from scapy.layers.tls.crypto.hash import _get_hash if conf.crypto_valid: from cryptography.exceptions import InvalidSignature, UnsupportedAlgorithm from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.asymmetric import padding - from cryptography.hazmat.primitives.hashes import HashAlgorithm ##################################################################### @@ -89,31 +89,7 @@ def _legacy_pkcs1_v1_5_encode_md5_sha1(M, emLen): # Hash and padding helpers ##################################################################### -_get_hash = None if conf.crypto_valid: - - # first, we add the "md5-sha1" hash from openssl to python-cryptography - class MD5_SHA1(HashAlgorithm): - name = "md5-sha1" - digest_size = 36 - block_size = 64 - - _hashes = { - "md5": hashes.MD5, - "sha1": hashes.SHA1, - "sha224": hashes.SHA224, - "sha256": hashes.SHA256, - "sha384": hashes.SHA384, - "sha512": hashes.SHA512, - "md5-sha1": MD5_SHA1 - } - - def _get_hash(hashStr): - try: - return _hashes[hashStr]() - except KeyError: - raise KeyError("Unknown hash function %s" % hashStr) - def _get_padding(padStr, mgf=padding.MGF1, h=hashes.SHA256, label=None): if padStr == "pkcs": return padding.PKCS1v15() diff --git a/scapy/layers/tls/crypto/prf.py b/scapy/layers/tls/crypto/prf.py index 39f35509e54..d5e7e76f5b9 100644 --- a/scapy/layers/tls/crypto/prf.py +++ b/scapy/layers/tls/crypto/prf.py @@ -73,7 +73,7 @@ def _tls_P_SHA512(secret, seed, req_len): # PRF functions, according to the protocol version def _sslv2_PRF(secret, seed, req_len): - hash_md5 = _tls_hash_algs["MD5"]() + hash_md5 = _tls_hash_algs["md5"]() rounds = (req_len + hash_md5.hash_len - 1) // hash_md5.hash_len res = b"" @@ -108,8 +108,8 @@ def _ssl_PRF(secret, seed, req_len): b"M", b"N", b"O", b"P", b"Q", b"R", b"S", b"T", b"U", b"V", b"W", b"X", # noqa: E501 b"Y", b"Z"] res = b"" - hash_sha1 = _tls_hash_algs["SHA"]() - hash_md5 = _tls_hash_algs["MD5"]() + hash_sha1 = _tls_hash_algs["sha"]() + hash_md5 = _tls_hash_algs["md5"]() rounds = (req_len + hash_md5.hash_len - 1) // hash_md5.hash_len for i in range(rounds): @@ -185,7 +185,7 @@ class PRF(object): context of the connection state using the tls_version and the cipher suite. """ - def __init__(self, hash_name="SHA256", tls_version=0x0303): + def __init__(self, hash_name="sha256", tls_version=0x0303): self.tls_version = tls_version self.hash_name = hash_name @@ -197,13 +197,13 @@ def __init__(self, hash_name="SHA256", tls_version=0x0303): tls_version == 0x0302): # TLS 1.1 self.prf = _tls_PRF elif tls_version == 0x0303: # TLS 1.2 - if hash_name == "SHA384": + if hash_name == "sha384": self.prf = _tls12_SHA384PRF - elif hash_name == "SHA512": + elif hash_name == "sha512": self.prf = _tls12_SHA512PRF else: - if hash_name in ["MD5", "SHA"]: - self.hash_name = "SHA256" + if hash_name in ["md5", "sha"]: + self.hash_name = "sha256" self.prf = _tls12_SHA256PRF else: warning("Unknown TLS version") @@ -270,8 +270,8 @@ def compute_verify_data(self, con_end, read_or_write, sslv3_sha1_pad1 = b"\x36" * 40 sslv3_sha1_pad2 = b"\x5c" * 40 - md5 = _tls_hash_algs["MD5"]() - sha1 = _tls_hash_algs["SHA"]() + md5 = _tls_hash_algs["md5"]() + sha1 = _tls_hash_algs["sha"]() md5_hash = md5.digest(master_secret + sslv3_md5_pad2 + md5.digest(handshake_msg + label + @@ -290,8 +290,8 @@ def compute_verify_data(self, con_end, read_or_write, label = ("%s finished" % d[con_end]).encode() if self.tls_version <= 0x0302: - s1 = _tls_hash_algs["MD5"]().digest(handshake_msg) - s2 = _tls_hash_algs["SHA"]().digest(handshake_msg) + s1 = _tls_hash_algs["md5"]().digest(handshake_msg) + s2 = _tls_hash_algs["sha"]().digest(handshake_msg) verify_data = self.prf(master_secret, label, s1 + s2, 12) else: h = _tls_hash_algs[self.hash_name]() @@ -317,7 +317,7 @@ def postprocess_key_for_export(self, key, client_random, server_random, tbh = key + client_random + server_random else: tbh = key + server_random + client_random - export_key = _tls_hash_algs["MD5"]().digest(tbh)[:req_len] + export_key = _tls_hash_algs["md5"]().digest(tbh)[:req_len] else: if s: tag = b"client write key" @@ -346,7 +346,7 @@ def generate_iv_for_export(self, client_random, server_random, tbh = client_random + server_random else: tbh = server_random + client_random - iv = _tls_hash_algs["MD5"]().digest(tbh)[:req_len] + iv = _tls_hash_algs["md5"]().digest(tbh)[:req_len] else: iv_block = self.prf("", b"IV block", diff --git a/scapy/layers/tls/crypto/suites.py b/scapy/layers/tls/crypto/suites.py index f7079384d42..1626a442717 100644 --- a/scapy/layers/tls/crypto/suites.py +++ b/scapy/layers/tls/crypto/suites.py @@ -29,7 +29,7 @@ class and the HMAC class, through the parsing of the ciphersuite name. if s.endswith("CCM") or s.endswith("CCM_8"): kx_name, s = s.split("_WITH_") kx_alg = _tls_kx_algs.get(kx_name) - hash_alg = _tls_hash_algs.get("SHA256") + hash_alg = _tls_hash_algs.get("sha256") cipher_alg = _tls_cipher_algs.get(s) hmac_alg = None @@ -42,7 +42,7 @@ class and the HMAC class, through the parsing of the ciphersuite name. kx_alg = _tls_kx_algs.get("TLS13") hash_name = s.split('_')[-1] - hash_alg = _tls_hash_algs.get(hash_name) + hash_alg = _tls_hash_algs.get(hash_name.lower()) cipher_name = s[:-(len(hash_name) + 1)] if tls1_3: @@ -61,7 +61,7 @@ class and the HMAC class, through the parsing of the ciphersuite name. cipher_alg = _tls_cipher_algs.get(cipher_name.rstrip("_EXPORT40")) kx_alg.export = cipher_name.endswith("_EXPORT40") hmac_alg = _tls_hmac_algs.get("HMAC-NULL") - hash_alg = _tls_hash_algs.get(hash_name) + hash_alg = _tls_hash_algs.get(hash_name.lower()) return kx_alg, cipher_alg, hmac_alg, hash_alg, tls1_3 diff --git a/scapy/layers/windows/erref.py b/scapy/layers/windows/erref.py index b18cacf769e..5909b5d3727 100644 --- a/scapy/layers/windows/erref.py +++ b/scapy/layers/windows/erref.py @@ -17,7 +17,6 @@ 0x00000011: "ERROR_NOT_SAME_DEVICE", 0x00000013: "ERROR_WRITE_PROTECT", 0x00000057: "ERROR_INVALID_PARAMETER", - 0xC000006A: "STATUS_WRONG_PASSWORD", 0x0000007A: "ERROR_INSUFFICIENT_BUFFER", 0x0000007B: "ERROR_INVALID_NAME", 0x000000A1: "ERROR_BAD_PATHNAME", @@ -53,6 +52,7 @@ 0xC0000043: "STATUS_SHARING_VIOLATION", 0xC0000061: "STATUS_PRIVILEGE_NOT_HELD", 0xC0000064: "STATUS_NO_SUCH_USER", + 0xC000006A: "STATUS_WRONG_PASSWORD", 0xC000006D: "STATUS_LOGON_FAILURE", 0xC000006E: "STATUS_ACCOUNT_RESTRICTION", 0xC0000070: "STATUS_INVALID_WORKSTATION", @@ -73,5 +73,6 @@ 0xC000020C: "STATUS_CONNECTION_DISCONNECTED", 0xC0000225: "STATUS_NOT_FOUND", 0xC0000257: "STATUS_PATH_NOT_COVERED", + 0xC00002FB: "STATUS_KDC_INVALID_REQUEST", 0xC000035C: "STATUS_NETWORK_SESSION_EXPIRED", } diff --git a/scapy/layers/windows/registry.py b/scapy/layers/windows/registry.py index 198806b75c6..62c26216c0c 100644 --- a/scapy/layers/windows/registry.py +++ b/scapy/layers/windows/registry.py @@ -105,6 +105,7 @@ class RegType(IntEnum): # These constants are used to specify the type of a registry value. + REG_NONE = 0 # No defined value type REG_SZ = 1 # Unicode string REG_EXPAND_SZ = 2 # Unicode string with environment variable expansion REG_BINARY = 3 # Binary data @@ -194,7 +195,7 @@ def __init__( ]: if not isinstance(reg_data, str): raise ValueError("Data must be a 'str' for this type.") - elif reg_type == RegType.REG_BINARY: + elif reg_type in [RegType.REG_NONE, RegType.REG_BINARY]: if not isinstance(reg_data, bytes): raise ValueError("Data must be a 'bytes' for this type.") elif reg_type in [ @@ -227,7 +228,7 @@ def encode(self) -> bytes: RegType.REG_LINK, ]: return self.reg_data.encode("utf-16le") - elif self.reg_type == RegType.REG_BINARY: + elif self.reg_type in [RegType.REG_NONE, RegType.REG_BINARY]: return self.reg_data elif self.reg_type in [ RegType.REG_DWORD, @@ -257,7 +258,7 @@ def frombytes(reg_name: str, reg_type: RegType, data: bytes): RegType.REG_LINK, ]: reg_data = data.decode("utf-16le") - elif reg_type == RegType.REG_BINARY: + elif reg_type in [RegType.REG_NONE, RegType.REG_BINARY]: reg_data = data elif reg_type in [ RegType.REG_DWORD, @@ -292,7 +293,7 @@ def fromstr(reg_name: str, reg_type: RegType, data: str): RegType.REG_LINK, ]: reg_data = data - elif reg_type == RegType.REG_BINARY: + elif reg_type in [RegType.REG_NONE, RegType.REG_BINARY]: reg_data = bytes.fromhex(data) elif reg_type in [ RegType.REG_DWORD, diff --git a/scapy/modules/ticketer.py b/scapy/modules/ticketer.py index 16716ae0637..4bc469a78f9 100644 --- a/scapy/modules/ticketer.py +++ b/scapy/modules/ticketer.py @@ -2594,7 +2594,7 @@ def resign_ticket(self, i, hash=None, kdc_hash=None): def request_tgt( self, - upn, + upn=None, ip=None, key=None, password=None, diff --git a/test/scapy/layers/tls/tls.uts b/test/scapy/layers/tls/tls.uts index 0fff89f2f37..3c3d47b834d 100644 --- a/test/scapy/layers/tls/tls.uts +++ b/test/scapy/layers/tls/tls.uts @@ -17,35 +17,35 @@ = Crypto - Hmac_MD5 instantiation, parameter check from scapy.layers.tls.crypto.h_mac import Hmac_MD5 -a = Hmac_MD5("somekey") +a = Hmac_MD5(b"somekey") a.key_len == 16 and a.hmac_len == 16 = Crypto - Hmac_MD5 behavior on test vectors from RFC 2202 (+ errata) a = Hmac_MD5 -t1 = a(b'\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b').digest("Hi There") == b'\x92\x94\x72\x7a\x36\x38\xbb\x1c\x13\xf4\x8e\xf8\x15\x8b\xfc\x9d' -t2 = a('Jefe').digest('what do ya want for nothing?') == b'\x75\x0c\x78\x3e\x6a\xb0\xb5\x03\xea\xa8\x6e\x31\x0a\x5d\xb7\x38' +t1 = a(b'\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b').digest(b"Hi There") == b'\x92\x94\x72\x7a\x36\x38\xbb\x1c\x13\xf4\x8e\xf8\x15\x8b\xfc\x9d' +t2 = a(b'Jefe').digest(b'what do ya want for nothing?') == b'\x75\x0c\x78\x3e\x6a\xb0\xb5\x03\xea\xa8\x6e\x31\x0a\x5d\xb7\x38' t3 = a(b'\xaa'*16).digest(b'\xdd'*50) == b'\x56\xbe\x34\x52\x1d\x14\x4c\x88\xdb\xb8\xc7\x33\xf0\xe8\xb3\xf6' t4 = a(b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19').digest(b'\xcd'*50) == b'\x69\x7e\xaf\x0a\xca\x3a\x3a\xea\x3a\x75\x16\x47\x46\xff\xaa\x79' -t5 = a(b'\x0c'*16).digest("Test With Truncation") == b'\x56\x46\x1e\xf2\x34\x2e\xdc\x00\xf9\xba\xb9\x95\x69\x0e\xfd\x4c' -t6 = a(b'\xaa'*80).digest("Test Using Larger Than Block-Size Key - Hash Key First") == b'\x6b\x1a\xb7\xfe\x4b\xd7\xbf\x8f\x0b\x62\xe6\xce\x61\xb9\xd0\xcd' -t7 = a(b'\xaa'*80).digest("Test Using Larger Than Block-Size Key and Larger Than One Block-Size Data") == b'\x6f\x63\x0f\xad\x67\xcd\xa0\xee\x1f\xb1\xf5\x62\xdb\x3a\xa5\x3e' +t5 = a(b'\x0c'*16).digest(b"Test With Truncation") == b'\x56\x46\x1e\xf2\x34\x2e\xdc\x00\xf9\xba\xb9\x95\x69\x0e\xfd\x4c' +t6 = a(b'\xaa'*80).digest(b"Test Using Larger Than Block-Size Key - Hash Key First") == b'\x6b\x1a\xb7\xfe\x4b\xd7\xbf\x8f\x0b\x62\xe6\xce\x61\xb9\xd0\xcd' +t7 = a(b'\xaa'*80).digest(b"Test Using Larger Than Block-Size Key and Larger Than One Block-Size Data") == b'\x6f\x63\x0f\xad\x67\xcd\xa0\xee\x1f\xb1\xf5\x62\xdb\x3a\xa5\x3e' t1 and t2 and t3 and t4 and t5 and t6 and t7 = Crypto - Hmac_SHA instantiation, parameter check from scapy.layers.tls.crypto.h_mac import Hmac_SHA -a = Hmac_SHA("somekey") +a = Hmac_SHA(b"somekey") a.key_len == 20 and a.hmac_len == 20 = Crypto - Hmac_SHA behavior on test vectors from RFC 2202 (+ errata) a = Hmac_SHA -t1 = a(b'\x0b'*20).digest("Hi There") == b'\xb6\x17\x31\x86\x55\x05\x72\x64\xe2\x8b\xc0\xb6\xfb\x37\x8c\x8e\xf1\x46\xbe\x00' -t2 = a('Jefe').digest("what do ya want for nothing?") == b'\xef\xfc\xdf\x6a\xe5\xeb\x2f\xa2\xd2\x74\x16\xd5\xf1\x84\xdf\x9c\x25\x9a\x7c\x79' +t1 = a(b'\x0b'*20).digest(b"Hi There") == b'\xb6\x17\x31\x86\x55\x05\x72\x64\xe2\x8b\xc0\xb6\xfb\x37\x8c\x8e\xf1\x46\xbe\x00' +t2 = a(b'Jefe').digest(b"what do ya want for nothing?") == b'\xef\xfc\xdf\x6a\xe5\xeb\x2f\xa2\xd2\x74\x16\xd5\xf1\x84\xdf\x9c\x25\x9a\x7c\x79' t3 = a(b'\xaa'*20).digest(b'\xdd'*50) == b'\x12\x5d\x73\x42\xb9\xac\x11\xcd\x91\xa3\x9a\xf4\x8a\xa1\x7b\x4f\x63\xf1\x75\xd3' t4 = a(b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19').digest(b'\xcd'*50) == b'\x4c\x90\x07\xf4\x02\x62\x50\xc6\xbc\x84\x14\xf9\xbf\x50\xc8\x6c\x2d\x72\x35\xda' -t5 = a(b'\x0c'*20).digest("Test With Truncation") == b'\x4c\x1a\x03\x42\x4b\x55\xe0\x7f\xe7\xf2\x7b\xe1\xd5\x8b\xb9\x32\x4a\x9a\x5a\x04' -t6 = a(b'\xaa'*80).digest("Test Using Larger Than Block-Size Key - Hash Key First") == b'\xaa\x4a\xe5\xe1\x52\x72\xd0\x0e\x95\x70\x56\x37\xce\x8a\x3b\x55\xed\x40\x21\x12' -t7 = a(b'\xaa'*80).digest("Test Using Larger Than Block-Size Key and Larger Than One Block-Size Data") == b'\xe8\xe9\x9d\x0f\x45\x23\x7d\x78\x6d\x6b\xba\xa7\x96\x5c\x78\x08\xbb\xff\x1a\x91' +t5 = a(b'\x0c'*20).digest(b"Test With Truncation") == b'\x4c\x1a\x03\x42\x4b\x55\xe0\x7f\xe7\xf2\x7b\xe1\xd5\x8b\xb9\x32\x4a\x9a\x5a\x04' +t6 = a(b'\xaa'*80).digest(b"Test Using Larger Than Block-Size Key - Hash Key First") == b'\xaa\x4a\xe5\xe1\x52\x72\xd0\x0e\x95\x70\x56\x37\xce\x8a\x3b\x55\xed\x40\x21\x12' +t7 = a(b'\xaa'*80).digest(b"Test Using Larger Than Block-Size Key and Larger Than One Block-Size Data") == b'\xe8\xe9\x9d\x0f\x45\x23\x7d\x78\x6d\x6b\xba\xa7\x96\x5c\x78\x08\xbb\xff\x1a\x91' t1 and t2 and t3 and t4 and t5 and t6 and t7 @@ -304,21 +304,21 @@ t1 and t2 and t3 and t4 and t5 and t6 and t7 from scapy.layers.tls.crypto.prf import PRF class _prf_tls12_sha256_test: - h= "SHA256" + h= "sha256" k= b"\x9b\xbe\x43\x6b\xa9\x40\xf0\x17\xb1\x76\x52\x84\x9a\x71\xdb\x35" s= b"\xa0\xba\x9f\x93\x6c\xda\x31\x18\x27\xa6\xf7\x96\xff\xd5\x19\x8c" o=(b"\xe3\xf2\x29\xba\x72\x7b\xe1\x7b\x8d\x12\x26\x20\x55\x7c\xd4\x53" + b"\xc2\xaa\xb2\x1d\x07\xc3\xd4\x95\x32\x9b\x52\xd4\xe6\x1e\xdb\x5a") class _prf_tls12_sha384_test: - h= "SHA384" + h= "sha384" k= b"\xb8\x0b\x73\x3d\x6c\xee\xfc\xdc\x71\x56\x6e\xa4\x8e\x55\x67\xdf" s= b"\xcd\x66\x5c\xf6\xa8\x44\x7d\xd6\xff\x8b\x27\x55\x5e\xdb\x74\x65" o=(b"\x7b\x0c\x18\xe9\xce\xd4\x10\xed\x18\x04\xf2\xcf\xa3\x4a\x33\x6a" + b"\x1c\x14\xdf\xfb\x49\x00\xbb\x5f\xd7\x94\x21\x07\xe8\x1c\x83\xcd") class _prf_tls12_sha512_test: - h= "SHA512" + h= "sha512" k= b"\xb0\x32\x35\x23\xc1\x85\x35\x99\x58\x4d\x88\x56\x8b\xbb\x05\xeb" s= b"\xd4\x64\x0e\x12\xe4\xbc\xdb\xfb\x43\x7f\x03\xe6\xae\x41\x8e\xe5" o=(b"\x12\x61\xf5\x88\xc7\x98\xc5\xc2\x01\xff\x03\x6e\x7a\x9c\xb5\xed" + @@ -1028,6 +1028,7 @@ assert len(pkt.exchkeys.ecdh_Yc) == 133 # len(b'\x04') + ceil(521/8) * 2 # See https://github.com/secdev/scapy/issues/2784 +import base64 from scapy.layers.tls.cert import PrivKey from scapy.layers.tls.handshake import TLSFinished from scapy.layers.tls.record import TLS @@ -1067,7 +1068,7 @@ r2 = TLS(shello_extms, tls_session=r1.tls_session.mirror()) r3 = TLS(finished_extms, tls_session=r2.tls_session.mirror()) assert r3.tls_session.extms -assert r3.tls_session.pwcs.prf.hash_name == "SHA256" +assert r3.tls_session.pwcs.prf.hash_name == "sha256" assert r3.tls_session.session_hash == b'2\xdc\xf5\xcb\xbc\x99\xc6IV\xba\x0f.\x0bdq\x1f=\xef\xdaW\xfc*A\x9b\xe2?b\xccKW\xe9\xb7' l3 = r3.getlayer(TLS, 3)