Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions src/cryptography/hazmat/backends/openssl/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,10 +294,7 @@ def poly1305_supported(self) -> bool:
return not self._fips_enabled

def pkcs7_supported(self) -> bool:
return (
not rust_openssl.CRYPTOGRAPHY_IS_BORINGSSL
and not rust_openssl.CRYPTOGRAPHY_IS_AWSLC
)
return True


backend = Backend()
117 changes: 82 additions & 35 deletions src/rust/src/pkcs7.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@

use std::borrow::Cow;
use std::collections::HashMap;
use std::mem;
use std::ops::Deref;
use std::sync::LazyLock;

use cryptography_x509::certificate::Certificate as RawCertificate;
use cryptography_x509::common::{AlgorithmIdentifier, AlgorithmParameters};
use cryptography_x509::csr::Attribute;
use cryptography_x509::pkcs7::PKCS7_DATA_OID;
Expand All @@ -23,6 +25,7 @@ use crate::buf::CffiBuf;
use crate::error::{CryptographyError, CryptographyResult};
use crate::padding::PKCS7UnpaddingContext;
use crate::pkcs12::symmetric_encrypt;
use crate::x509::certificate;
#[cfg(not(any(CRYPTOGRAPHY_IS_BORINGSSL, CRYPTOGRAPHY_IS_AWSLC)))]
use crate::x509::certificate::load_der_x509_certificate;
use crate::{exceptions, types, x509};
Expand Down Expand Up @@ -739,65 +742,109 @@ fn load_pkcs7_certificates(
}
}

#[pyo3::pyfunction]
fn load_pem_pkcs7_certificates<'p>(
pub fn try_list_of_certificates<'p, F>(
py: pyo3::Python<'p>,
data: &[u8],
) -> CryptographyResult<pyo3::Bound<'p, pyo3::types::PyList>> {
cfg_if::cfg_if! {
if #[cfg(not(any(CRYPTOGRAPHY_IS_BORINGSSL, CRYPTOGRAPHY_IS_AWSLC)))] {
let pem_block = pem::parse(data)?;
if pem_block.tag() != "PKCS7" {
return Err(CryptographyError::from(
pyo3::exceptions::PyValueError::new_err(
"The provided PEM data does not have the PKCS7 tag.",
),
));
}
data: pyo3::Py<pyo3::types::PyBytes>,
f: F,
) -> CryptographyResult<pyo3::Bound<'p, pyo3::types::PyList>>
where
F: for<'a> FnOnce(
&'a pyo3::Py<pyo3::types::PyBytes>,
&mut dyn FnMut(RawCertificate<'a>) -> CryptographyResult<()>,
) -> CryptographyResult<()>,
{
let result = pyo3::types::PyList::empty(py);
let mut cb = |val| {
// SAFETY: based on the type of `F`, we know `val` must be derived from
// data, and we know that `data.clone_ref(py)` makes any pointers into
// the original one also valid.
let raw_cert = certificate::OwnedCertificate::new(data.clone_ref(py), |_| unsafe {
mem::transmute(val)
});
result.append(pyo3::Bound::new(
py,
x509::certificate::Certificate {
raw: raw_cert,
cached_extensions: pyo3::sync::PyOnceLock::new(),
},
)?)?;

load_der_pkcs7_certificates(py, pem_block.contents())
} else {
let _ = py;
let _ = data;
Err(CryptographyError::from(
Ok(())
};
f(&data, &mut cb)?;

Ok(result)
}

fn load_pkcs7_certificates_rust(
py: pyo3::Python<'_>,
data: pyo3::Py<pyo3::types::PyBytes>,
) -> CryptographyResult<pyo3::Bound<'_, pyo3::types::PyList>> {
try_list_of_certificates(py, data, |data, cb| {
let p7 = asn1::parse_single::<pkcs7::ContentInfo<'_>>(data.as_bytes(py))?;
let pkcs7::Content::SignedData(signed_data) = p7.content else {
return Err(CryptographyError::from(
exceptions::UnsupportedAlgorithm::new_err((
"PKCS#7 is not supported by this backend.",
"Only basic signed structures are currently supported.",
exceptions::Reasons::UNSUPPORTED_SERIALIZATION,
)),
))
));
};
let Some(certs) = signed_data.into_inner().certificates else {
return Err(CryptographyError::from(
pyo3::exceptions::PyValueError::new_err(
"The provided PKCS7 has no certificate data, but a cert loading method was called.",
),
));
};
for c in certs.unwrap_read().clone() {
cb(c)?;
}

Ok(())
})
}

#[pyo3::pyfunction]
fn load_pem_pkcs7_certificates(
py: pyo3::Python<'_>,
data: pyo3::Py<pyo3::types::PyBytes>,
) -> CryptographyResult<pyo3::Bound<'_, pyo3::types::PyList>> {
let pem_block = pem::parse(data.as_bytes(py))?;
if pem_block.tag() != "PKCS7" {
return Err(CryptographyError::from(
pyo3::exceptions::PyValueError::new_err(
"The provided PEM data does not have the PKCS7 tag.",
),
));
}
let data = pyo3::types::PyBytes::new(py, pem_block.contents()).unbind();

load_der_pkcs7_certificates(py, data)
}

#[pyo3::pyfunction]
fn load_der_pkcs7_certificates<'p>(
py: pyo3::Python<'p>,
data: &[u8],
) -> CryptographyResult<pyo3::Bound<'p, pyo3::types::PyList>> {
fn load_der_pkcs7_certificates(
py: pyo3::Python<'_>,
data: pyo3::Py<pyo3::types::PyBytes>,
) -> CryptographyResult<pyo3::Bound<'_, pyo3::types::PyList>> {
cfg_if::cfg_if! {
if #[cfg(not(any(CRYPTOGRAPHY_IS_BORINGSSL, CRYPTOGRAPHY_IS_AWSLC)))] {
let pkcs7_decoded = openssl::pkcs7::Pkcs7::from_der(data).map_err(|_| {
let pkcs7_decoded = openssl::pkcs7::Pkcs7::from_der(data.as_bytes(py)).map_err(|_| {
CryptographyError::from(pyo3::exceptions::PyValueError::new_err(
"Unable to parse PKCS7 data",
))
})?;
let result = load_pkcs7_certificates(py, pkcs7_decoded)?;
if asn1::parse_single::<pkcs7::ContentInfo<'_>>(data).is_err() {
if load_pkcs7_certificates_rust(py, data).is_err() {
let warning_cls = pyo3::exceptions::PyUserWarning::type_object(py);
let message = c"PKCS#7 certificates could not be parsed as DER, falling back to parsing as BER. Please file an issue at https://github.com/pyca/cryptography/issues explaining how your PKCS#7 certificates were created. In the future, this may become an exception.";
pyo3::PyErr::warn(py, &warning_cls, message, 1)?;
}

Ok(result)
} else {
let _ = py;
let _ = data;
Err(CryptographyError::from(
exceptions::UnsupportedAlgorithm::new_err((
"PKCS#7 is not supported by this backend.",
exceptions::Reasons::UNSUPPORTED_SERIALIZATION,
)),
))
load_pkcs7_certificates_rust(py, data)
}
}
}
Expand Down
47 changes: 30 additions & 17 deletions tests/hazmat/primitives/test_pkcs7.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@

from cryptography import exceptions, x509
from cryptography.exceptions import _Reasons
from cryptography.hazmat.bindings._rust import test_support
from cryptography.hazmat.bindings._rust import (
openssl as rust_openssl,
)
from cryptography.hazmat.bindings._rust import (
test_support,
)
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ed25519, padding, rsa
from cryptography.hazmat.primitives.ciphers import algorithms
Expand Down Expand Up @@ -77,8 +82,16 @@ def test_load_pkcs7_pem(self, backend):
],
)
def test_load_pkcs7_der(self, filepath, backend):
loading_fails = False
if filepath.endswith("p7b"):
ctx: typing.Any = pytest.warns(UserWarning)
if (
rust_openssl.CRYPTOGRAPHY_IS_AWSLC
or rust_openssl.CRYPTOGRAPHY_IS_BORINGSSL
):
ctx: typing.Any = pytest.raises(ValueError)
loading_fails = True
else:
ctx = pytest.warns(UserWarning)
else:
ctx = contextlib.nullcontext()

Expand All @@ -90,6 +103,10 @@ def test_load_pkcs7_der(self, filepath, backend):
),
mode="rb",
)

if loading_fails:
return

assert len(certs) == 2
assert certs[0].subject.get_attributes_for_oid(
x509.oid.NameOID.COMMON_NAME
Expand Down Expand Up @@ -118,7 +135,13 @@ def test_load_pkcs7_unsupported_type(self, backend):

def test_load_pkcs7_empty_certificates(self):
der = b"\x30\x0b\x06\x09\x2a\x86\x48\x86\xf7\x0d\x01\x07\x02"
with pytest.raises(ValueError):
pkcs7.load_der_pkcs7_certificates(der)

der = (
b"0#\x06\t*\x86H\x86\xf7\r\x01\x07\x02\xa0\x160\x14\x02\x01\x011"
b"\x000\x0b\x06\t*\x86H\x86\xf7\r\x01\x07\x011\x00"
)
with pytest.raises(ValueError):
pkcs7.load_der_pkcs7_certificates(der)

Expand All @@ -140,8 +163,11 @@ def _load_cert_key():


@pytest.mark.supported(
only_if=lambda backend: backend.pkcs7_supported(),
skip_message="Requires OpenSSL with PKCS7 support",
only_if=lambda backend: not (
rust_openssl.CRYPTOGRAPHY_IS_AWSLC
or rust_openssl.CRYPTOGRAPHY_IS_BORINGSSL
),
skip_message="Requires OpenSSL with PKCS7 verification test support",
)
class TestPKCS7SignatureBuilder:
def test_invalid_data(self, backend):
Expand Down Expand Up @@ -1483,19 +1509,6 @@ def test_invalid_types(self):
)


@pytest.mark.supported(
only_if=lambda backend: not backend.pkcs7_supported(),
skip_message="Requires OpenSSL without PKCS7 support (BoringSSL)",
)
class TestPKCS7Unsupported:
def test_pkcs7_functions_unsupported(self):
with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_SERIALIZATION):
pkcs7.load_der_pkcs7_certificates(b"nonsense")

with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_SERIALIZATION):
pkcs7.load_pem_pkcs7_certificates(b"nonsense")


@pytest.mark.supported(
only_if=lambda backend: backend.pkcs7_supported()
and not backend.rsa_encryption_supported(padding.PKCS1v15()),
Expand Down