-
Notifications
You must be signed in to change notification settings - Fork 41
/
dsse.py
281 lines (220 loc) · 8.5 KB
/
dsse.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
# Copyright 2022 The Sigstore Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Functionality for building and manipulating in-toto Statements and DSSE envelopes.
"""
from __future__ import annotations
import logging
from typing import Any, Dict, List, Literal, Optional, Union
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import ec
from pydantic import BaseModel, ConfigDict, Field, RootModel, StrictStr, ValidationError
from sigstore_protobuf_specs.dev.sigstore.common.v1 import HashAlgorithm
from sigstore_protobuf_specs.io.intoto import Envelope as _Envelope
from sigstore_protobuf_specs.io.intoto import Signature
from sigstore.errors import Error, VerificationError
from sigstore.hashes import Hashed
# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)

# The closed set of digest algorithm names accepted as keys of a subject's
# digest set; any other key fails validation of `_DigestSet`.
_Digest = Union[
    Literal["sha256"],
    Literal["sha384"],
    Literal["sha512"],
    Literal["sha3_256"],
    Literal["sha3_384"],
    Literal["sha3_512"],
]
"""
NOTE: in-toto's DigestSet contains all kinds of hash algorithms that
we intentionally do not support. This model is limited to common members of the
SHA-2 and SHA-3 family that are at least as strong as SHA-256.
See: <https://github.com/in-toto/attestation/blob/main/spec/v1/digest_set.md>
"""

# Root model wrapping a mapping from a supported algorithm name (`_Digest`)
# to the digest value as a string.
_DigestSet = RootModel[Dict[_Digest, str]]
"""
An internal validation model for in-toto subject digest sets.
"""
class _Subject(BaseModel):
    """
    A single in-toto statement subject.

    A subject pairs an optional `name` with a mandatory `digest` set that
    identifies the artifact.
    """

    # The in-toto v1 Statement spec allows `name` to be left unset. Under
    # pydantic v2 an `Optional[...]` annotation without a default is still a
    # *required* field, so an explicit `None` default is needed for subjects
    # that omit `name` to pass validation.
    name: Optional[StrictStr] = None

    # Required: every subject must carry a digest set.
    digest: _DigestSet = Field(...)
class _Statement(BaseModel):
    """
    Internal pydantic model that validates the JSON object layout of an
    in-toto statement.

    Each field can be populated either by its Python attribute name or by
    its JSON alias.
    """

    model_config = ConfigDict(populate_by_name=True)

    # Statement type marker; only the v1 statement URI is accepted.
    type_: Literal["https://in-toto.io/Statement/v1"] = Field(
        default=..., alias="_type"
    )

    # The statement's subjects; at least one is required.
    subjects: List[_Subject] = Field(default=..., alias="subject", min_length=1)

    # The predicate type, as a strict string.
    predicate_type: StrictStr = Field(default=..., alias="predicateType")

    # The predicate body; may be absent.
    predicate: Optional[Dict[str, Any]] = Field(default=None, alias="predicate")
class Statement:
    """
    Represents an in-toto statement.

    The statement is kept as opaque bytes so that its encoding never
    changes, but those bytes are validated at construction time against
    the JSON object layout defined in the in-toto attestation spec.

    See: <https://github.com/in-toto/attestation/blob/main/spec/v1/statement.md>
    """

    def __init__(self, contents: bytes) -> None:
        """
        Construct a new Statement from its serialized form.

        `contents` is an opaque `bytes` holding the statement. It is stored
        unchanged and also parsed for validation; an `Error` is raised if it
        does not validate as an in-toto statement.

        Use `_StatementBuilder` to assemble a statement from its
        constituent pieces instead.
        """
        self._contents = contents

        try:
            parsed = _Statement.model_validate_json(contents)
        except ValidationError:
            raise Error("malformed in-toto statement")

        self._inner = parsed

    def _matches_digest(self, digest: Hashed) -> bool:
        """
        Report whether any subject of this statement carries a SHA-256 digest
        equal to the given `digest`. Subject names are **not** consulted.

        Only SHA-256 is supported; any other algorithm raises
        `VerificationError`.
        """
        if digest.algorithm != HashAlgorithm.SHA2_256:
            raise VerificationError(f"unexpected digest algorithm: {digest.algorithm}")

        # A subject with no "sha256" entry yields `None`, which never equals
        # the hex string and is therefore skipped.
        return any(
            subject.digest.root.get("sha256") == digest.digest.hex()
            for subject in self._inner.subjects
        )

    def _pae(self) -> bytes:
        """
        Return the PAE encoding of this statement's raw bytes, using the
        envelope's payload type.
        """
        payload_type = Envelope._TYPE
        return _pae(payload_type, self._contents)
class _StatementBuilder:
    """
    A fluent builder that assembles an in-toto `Statement` piece by piece.

    Each setter returns the builder itself so that calls can be chained;
    `build` validates the accumulated state and produces the `Statement`.
    """

    def __init__(
        self,
        subjects: Optional[List[_Subject]] = None,
        predicate_type: Optional[str] = None,
        predicate: Optional[Dict[str, Any]] = None,
    ):
        """
        Initialize a `_StatementBuilder`, optionally pre-seeded with state.
        """
        self._predicate = predicate
        self._predicate_type = predicate_type
        # A missing (or empty) subject list is normalized to a fresh list.
        self._subjects = subjects if subjects else []

    def subjects(self, subjects: list[_Subject]) -> _StatementBuilder:
        """
        Replace the builder's subjects and return the builder.
        """
        self._subjects = subjects
        return self

    def predicate_type(self, predicate_type: str) -> _StatementBuilder:
        """
        Replace the builder's predicate type and return the builder.
        """
        self._predicate_type = predicate_type
        return self

    def predicate(self, predicate: dict[str, Any]) -> _StatementBuilder:
        """
        Replace the builder's predicate and return the builder.
        """
        self._predicate = predicate
        return self

    def build(self) -> Statement:
        """
        Validate the builder's state and return the resulting `Statement`.

        Raises `Error` if the accumulated fields do not form a valid
        statement.
        """
        fields = {
            "type_": "https://in-toto.io/Statement/v1",
            "subjects": self._subjects,
            "predicate_type": self._predicate_type,
            "predicate": self._predicate,
        }

        try:
            validated = _Statement(**fields)
        except ValidationError as e:
            raise Error(f"invalid statement: {e}")

        # Serialize with the JSON aliases (`_type`, `subject`, ...) so the
        # output uses the in-toto field names.
        serialized = validated.model_dump_json(by_alias=True)
        return Statement(serialized.encode())
class Envelope:
    """
    Represents a DSSE envelope.

    Instances are not meant to be constructed directly; obtain one from
    `_sign` or from `Envelope._from_json`.

    See: <https://github.com/secure-systems-lab/dsse/blob/v1.0.0/envelope.md>
    """

    # Payload type recorded in envelopes produced by `_sign`.
    _TYPE = "application/vnd.in-toto+json"

    def __init__(self, inner: _Envelope) -> None:
        """
        @private
        """
        self._inner = inner

    @classmethod
    def _from_json(cls, contents: bytes | str) -> Envelope:
        """Parse the given JSON representation into a DSSE envelope."""
        return cls(_Envelope().from_json(contents))

    def to_json(self) -> str:
        """
        Serialize this DSSE envelope's contents to a JSON string.
        """
        serialized = self._inner.to_json()
        return serialized

    def __eq__(self, other: object) -> bool:
        """Two envelopes are equal when their wrapped messages are equal."""
        if isinstance(other, Envelope):
            return self._inner == other._inner
        # Defer to the other operand for unrelated types.
        return NotImplemented
def _pae(type_: str, body: bytes) -> bytes:
    """
    Compute the DSSE Pre-Authentication Encoding (PAE) for the given
    `type_` and `body`.

    The result has the layout:

        "DSSEv1" SP LEN(type) SP type SP LEN(body) SP body

    where `type_` is UTF-8 encoded and each `LEN` is the ASCII decimal
    length, in bytes, of the field that follows it.
    """
    # See:
    # https://github.com/secure-systems-lab/dsse/blob/v1.0.0/envelope.md
    # https://github.com/in-toto/attestation/blob/v1.0/spec/v1.0/envelope.md

    # Encode first so the length prefix counts bytes rather than characters;
    # the two differ whenever the payload type contains non-ASCII text.
    type_bytes = type_.encode("utf-8")

    return b" ".join(
        [
            b"DSSEv1",
            str(len(type_bytes)).encode("ascii"),
            type_bytes,
            str(len(body)).encode("ascii"),
            body,
        ]
    )
def _sign(key: ec.EllipticCurvePrivateKey, stmt: Statement) -> Envelope:
    """
    Sign the given in-toto `Statement` with `key` and wrap the statement
    and its signature in a DSSE `Envelope`.

    The signature is an ECDSA signature with SHA-256, computed over the
    statement's PAE encoding rather than over its raw bytes.
    """
    pae = stmt._pae()
    _logger.debug(f"DSSE PAE: {pae!r}")

    signature_entry = Signature(sig=key.sign(pae, ec.ECDSA(hashes.SHA256())))

    # The envelope carries the statement's original bytes as its payload.
    inner = _Envelope(
        payload=stmt._contents,
        payload_type=Envelope._TYPE,
        signatures=[signature_entry],
    )
    return Envelope(inner)
def _verify(key: ec.EllipticCurvePublicKey, evp: Envelope) -> bytes:
    """
    Verify the given DSSE `Envelope` against `key` and return its payload.

    Every signature in the envelope must verify; a `VerificationError` is
    raised if the envelope has no signatures or if any signature is invalid.

    This function does **not** check the envelope's payload type. The caller
    is responsible for performing this check.
    """
    inner = evp._inner
    pae = _pae(inner.payload_type, inner.payload)

    if not inner.signatures:
        raise VerificationError("DSSE: envelope contains no signatures")

    # All signatures are checked against the same key, so checking more
    # than one is redundant in practice, but it does no harm either.
    try:
        for entry in inner.signatures:
            key.verify(entry.sig, pae, ec.ECDSA(hashes.SHA256()))
    except InvalidSignature:
        raise VerificationError("DSSE: invalid signature")

    return inner.payload