5959from tuf .api .serialization .json import JSONSerializer
6060from tuf .exceptions import FetcherHTTPError
6161from tuf .api .metadata import (
62+ DelegatedRole ,
63+ Delegations ,
6264 Key ,
6365 Metadata ,
6466 MetaFile ,
@@ -106,6 +108,9 @@ def __init__(self):
106108 self .dump_dir = None
107109 self .dump_version = 0
108110
111+ now = datetime .utcnow ()
112+ self .safe_expiry = now .replace (microsecond = 0 ) + timedelta (days = 30 )
113+
109114 self ._initialize ()
110115
111116 @property
@@ -135,20 +140,19 @@ def create_key(self) -> Tuple[Key, SSlibSigner]:
135140
136141 def _initialize (self ):
137142 """Setup a minimal valid repository"""
138- expiry = datetime .utcnow ().replace (microsecond = 0 ) + timedelta (days = 30 )
139143
140- targets = Targets (1 , SPEC_VER , expiry , {}, None )
144+ targets = Targets (1 , SPEC_VER , self . safe_expiry , {}, None )
141145 self .md_targets = Metadata (targets , OrderedDict ())
142146
143147 meta = {"targets.json" : MetaFile (targets .version )}
144- snapshot = Snapshot (1 , SPEC_VER , expiry , meta )
148+ snapshot = Snapshot (1 , SPEC_VER , self . safe_expiry , meta )
145149 self .md_snapshot = Metadata (snapshot , OrderedDict ())
146150
147151 snapshot_meta = MetaFile (snapshot .version )
148- timestamp = Timestamp (1 , SPEC_VER , expiry , snapshot_meta )
152+ timestamp = Timestamp (1 , SPEC_VER , self . safe_expiry , snapshot_meta )
149153 self .md_timestamp = Metadata (timestamp , OrderedDict ())
150154
151- root = Root (1 , SPEC_VER , expiry , {}, {}, True )
155+ root = Root (1 , SPEC_VER , self . safe_expiry , {}, {}, True )
152156 for role in ["root" , "timestamp" , "snapshot" , "targets" ]:
153157 key , signer = self .create_key ()
154158 root .roles [role ] = Role ([], 1 )
@@ -172,27 +176,27 @@ def publish_root(self):
172176 def fetch (self , url : str ) -> Iterator [bytes ]:
173177 if not self .root .consistent_snapshot :
174178 raise NotImplementedError ("non-consistent snapshot not supported" )
175-
176- spliturl = parse .urlparse (url )
177- if spliturl .path .startswith ("/metadata/" ):
178- parts = spliturl .path [len ("/metadata/" ) :].split ("." )
179- if len (parts ) == 3 :
180- version : Optional [int ] = int (parts [0 ])
181- role = parts [1 ]
182- else :
179+ path = parse .urlparse (url ).path
180+ if path .startswith ("/metadata/" ) and path .endswith (".json" ):
181+ ver_and_name = path [len ("/metadata/" ) :][: - len (".json" )]
182+ # only consistent_snapshot supported ATM: timestamp is special case
183+ if ver_and_name == "timestamp" :
183184 version = None
184- role = parts [0 ]
185+ role = "timestamp"
186+ else :
187+ version , _ , role = ver_and_name .partition ("." )
188+ version = int (version )
185189 yield self ._fetch_metadata (role , version )
186- elif spliturl . path .startswith ("/targets/" ):
190+ elif path .startswith ("/targets/" ):
187191 # figure out target path and hash prefix
188- path = spliturl . path [len ("/targets/" ) :]
189- dir_parts , sep , prefixed_filename = path .rpartition ("/" )
192+ target_path = path [len ("/targets/" ) :]
193+ dir_parts , sep , prefixed_filename = target_path .rpartition ("/" )
190194 prefix , _ , filename = prefixed_filename .partition ("." )
191195 target_path = f"{ dir_parts } { sep } { filename } "
192196
193197 yield self ._fetch_target (target_path , prefix )
194198 else :
195- raise FetcherHTTPError (f"Unknown path '{ spliturl . path } '" , 404 )
199+ raise FetcherHTTPError (f"Unknown path '{ path } '" , 404 )
196200
197201 def _fetch_target (self , target_path : str , hash : Optional [str ]) -> bytes :
198202 """Return data for 'target_path', checking 'hash' if it is given.
@@ -268,12 +272,14 @@ def update_timestamp(self):
268272
269273 def update_snapshot (self ):
270274 for role , delegate in self .all_targets ():
271- self . snapshot . meta [ f" { role } .json" ]. version = delegate . version
272-
275+ hashes = None
276+ length = None
273277 if self .compute_metafile_hashes_length :
274278 hashes , length = self ._compute_hashes_and_length (role )
275- self .snapshot .meta [f"{ role } .json" ].hashes = hashes
276- self .snapshot .meta [f"{ role } .json" ].length = length
279+
280+ self .snapshot .meta [f"{ role } .json" ] = MetaFile (
281+ delegate .version , length , hashes
282+ )
277283
278284 self .snapshot .version += 1
279285 self .update_timestamp ()
@@ -288,6 +294,37 @@ def add_target(self, role: str, data: bytes, path: str):
288294 targets .targets [path ] = target
289295 self .target_files [path ] = RepositoryTarget (data , target )
290296
297+ def add_delegation (
298+ self ,
299+ delegator_name : str ,
300+ name : str ,
301+ targets : Targets ,
302+ terminating : bool ,
303+ paths : Optional [List [str ]],
304+ hash_prefixes : Optional [List [str ]],
305+ ):
306+ if delegator_name == "targets" :
307+ delegator = self .targets
308+ else :
309+ delegator = self .md_delegates [delegator_name ].signed
310+
311+ # Create delegation
312+ role = DelegatedRole (name , [], 1 , terminating , paths , hash_prefixes )
313+ if delegator .delegations is None :
314+ delegator .delegations = Delegations ({}, {})
315+ # put delegation last by default
316+ delegator .delegations .roles [role .name ] = role
317+
318+ # By default add one new key for the role
319+ key , signer = self .create_key ()
320+ delegator .add_key (role .name , key )
321+ if role .name not in self .signers :
322+ self .signers [role .name ] = []
323+ self .signers [role .name ].append (signer )
324+
325+ # Add metadata for the role
326+ self .md_delegates [role .name ] = Metadata (targets , OrderedDict ())
327+
291328 def write (self ):
292329 """Dump current repository metadata to self.dump_dir
293330
0 commit comments