Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 39 additions & 39 deletions moz_kinto_publisher/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,14 +125,14 @@ def is_run_valid(self, run_id, channel):
self.filter_bucket, f"{run_id}/ct-logs.json"
)
)
log.debug(f"{run_id} {'Is Valid' if is_valid else 'Is Not Valid'}")
log.debug("%s %s", run_id, 'Is Valid' if is_valid else 'Is Not Valid')
return is_valid

def is_run_ready(self, run_id):
is_ready = workflow.google_cloud_file_exists(
self.filter_bucket, f"{run_id}/completed"
)
log.debug(f"{run_id}/completed {'is ready' if is_ready else 'is not ready'}")
log.debug("%s/completed %s", run_id, 'is ready' if is_ready else 'is not ready')
return is_ready

def await_most_recent_run(self, *, timeout=timedelta(minutes=5)):
Expand Down Expand Up @@ -226,7 +226,7 @@ def request_review_of_collection(self, *, collection=None):
raise KintoException("Malformed response from Kinto")

if status != "work-in-progress":
log.info(f"Collection {collection} is unchanged. Does not need review.")
log.info("Collection %s is unchanged. Does not need review.", collection)
return

try:
Expand Down Expand Up @@ -542,7 +542,7 @@ def load_remote_intermediates(*, kinto_client):
kinto_client
) # intObj.pemAttachment was set by constructor
if intObj.unique_id() in remote_intermediates:
log.warning(f"Will remove duplicate intermediate: {intObj}")
log.warning("Will remove duplicate intermediate: %s", intObj)
remote_error_records.append(record)
else:
remote_intermediates[intObj.unique_id()] = intObj
Expand Down Expand Up @@ -601,15 +601,15 @@ def publish_intermediates(*, args, rw_client):
if remote_intermediates[i].is_expired(kinto_client=rw_client):
remote_expired.add(i)
except Exception as e:
log.warning(f"Failed to track expiration for {i}: {e}")
log.warning("Failed to track expiration for %s: %s", i, e)

log.info(f"Local intermediates: {len(local_intermediates)}")
log.info(f"Remote intermediates: {len(remote_intermediates)}")
log.info(f"- Expired: {len(remote_expired)}")
log.info(f"- In error: {len(remote_error_records)}")
log.info(f"To add: {len(to_upload)}")
log.info(f"To update: {len(to_update)}")
log.info(f"To delete: {len(remote_only)}")
log.info("Local intermediates: %d", len(local_intermediates))
log.info("Remote intermediates: %d", len(remote_intermediates))
log.info("- Expired: %d", len(remote_expired))
log.info("- In error: %d", len(remote_error_records))
log.info("To add: %d", len(to_upload))
log.info("To update: %d", len(to_update))
log.info("To delete: %d", len(remote_only))

if args.noop:
log.info("Noop flag set, exiting before any intermediate updates")
Expand All @@ -618,40 +618,40 @@ def publish_intermediates(*, args, rw_client):
# All intermediates must be in the local list
for unique_id in remote_only:
record = remote_intermediates[unique_id]
log.info(f"Removing deleted intermediate {record}")
log.info("Removing deleted intermediate %s", record)
try:
record.delete_from_kinto(rw_client=rw_client)
except KintoException as ke:
log.error(f"Couldn't delete record {record}: {ke}")
log.error("Couldn't delete record %s: %s", record, ke)

# Delete any remote records that had errors
# (note these "records" are just dictionaries)
for raw_record in remote_error_records:
log.info(f"Deleting remote record with error: {raw_record}")
log.info("Deleting remote record with error: %s", raw_record)
try:
rw_client.delete_record(
collection=settings.KINTO_INTERMEDIATES_COLLECTION,
id=raw_record["id"],
)
except KintoException as ke:
log.error(f"Couldn't delete record id {raw_record['id']}: {ke}")
log.error("Couldn't delete record id %s: %s", raw_record['id'], ke)
except KeyError: # raw_record doesn't have "id"
log.error(f"Couldn't delete record: {raw_record}")
log.error("Couldn't delete record: %s", raw_record)

# New records
for unique_id in to_upload:
record = local_intermediates[unique_id]
log.info(f"Adding new record: {record}")
log.info("Adding new record: %s", record)
try:
record.add_to_kinto(rw_client=rw_client)
except KintoException as ke:
log.error(f"Couldn't add record {record}: {ke}")
log.error("Couldn't add record %s: %s", record, ke)

# Updates
for unique_id in to_update:
local_int = local_intermediates[unique_id]
remote_int = remote_intermediates[unique_id]
log.info(f"Updating record: {remote_int} to {local_int}")
log.info("Updating record: %s to %s", remote_int, local_int)
try:
local_int.update_kinto(
rw_client=rw_client,
Expand Down Expand Up @@ -702,7 +702,7 @@ def clear_crlite_filters(*, rw_client, noop, channel):
]
existing_filters = filter(lambda x: x["incremental"] is False, existing_records)
for filter_record in existing_filters:
log.info(f"Cleaning up stale filter record {filter_record['id']}.")
log.info("Cleaning up stale filter record %s.", filter_record['id'])
rw_client.delete_record(
collection=settings.KINTO_CRLITE_COLLECTION,
id=filter_record["id"],
Expand All @@ -721,7 +721,7 @@ def clear_crlite_deltas(*, rw_client, noop, channel):
]
existing_deltas = filter(lambda x: x["incremental"] is True, existing_records)
for delta in existing_deltas:
log.info(f"Cleaning up stale delta record {delta['id']}.")
log.info("Cleaning up stale delta record %s.", delta['id'])
rw_client.delete_record(
collection=settings.KINTO_CRLITE_COLLECTION,
id=delta["id"],
Expand Down Expand Up @@ -798,7 +798,7 @@ def publish_crlite_main_filter(
"enrolledIssuers": [], # legacy attribute
}

log.info(f"Publishing full filter {filter_path} {timestamp}")
log.info("Publishing full filter %s %s", filter_path, timestamp)
return publish_crlite_record(
rw_client=rw_client,
attributes=attributes,
Expand Down Expand Up @@ -927,7 +927,7 @@ def crlite_determine_publish(*, existing_records, run_db, channel):
existing_records=existing_records, channel=channel
)
except ConsistencyException as se:
log.error(f"Failed to verify existing record consistency: {se}")
log.error("Failed to verify existing record consistency: %s", se)
return default

# A run ID is a "YYYYMMDD" date and an index, e.g. "20210101-3".
Expand Down Expand Up @@ -963,7 +963,7 @@ def crlite_determine_publish(*, existing_records, run_db, channel):
run_db=run_db, identifiers_to_check=new_run_ids, channel=channel
)
except ConsistencyException as se:
log.error(f"Failed to verify run ID consistency: {se}")
log.error("Failed to verify run ID consistency: %s", se)
return default

# If the full filter is too old, publish a full filter.
Expand All @@ -972,7 +972,7 @@ def crlite_determine_publish(*, existing_records, run_db, channel):
if new_timestamp - earliest_timestamp >= timedelta(
days=channel.max_filter_age_days
):
log.info(f"Published full filter is >= {channel.max_filter_age_days} days old")
log.info("Published full filter is >= %d days old", channel.max_filter_age_days)
return default

return {"clear_all": False, "upload": new_run_ids}
Expand All @@ -998,14 +998,14 @@ def publish_crlite(*, args, rw_client, channel, timeout=timedelta(minutes=5)):
try:
published_run_db.await_most_recent_run(timeout=timeout)
except TimeoutException as te:
log.warning(f"The most recent run is not ready to be published (waited {te}).")
log.warning("The most recent run is not ready to be published (waited %s).", te)
return rv

tasks = crlite_determine_publish(
existing_records=existing_records, run_db=published_run_db, channel=channel
)

log.debug(f"crlite_determine_publish tasks={tasks}")
log.debug("crlite_determine_publish tasks=%s", tasks)

if not tasks["upload"]:
log.info("Nothing to do.")
Expand All @@ -1027,7 +1027,7 @@ def publish_crlite(*, args, rw_client, channel, timeout=timedelta(minutes=5)):
)

if tasks["clear_all"]:
log.info(f"Uploading a full filter based on {final_run_id}.")
log.info("Uploading a full filter based on %s.", final_run_id)

clear_crlite_filters(rw_client=rw_client, noop=args.noop, channel=channel)
clear_crlite_deltas(rw_client=rw_client, noop=args.noop, channel=channel)
Expand Down Expand Up @@ -1171,19 +1171,19 @@ def publish_ctlogs(*, args, rw_client):

if args.noop:
log.info(
f"Noop enabled, skipping upload of \"{upstream_log['description']}\"."
"Noop enabled, skipping upload of \"%s\".", upstream_log['description']
)
continue

log.info(f"Uploading new log {upstream_log}")
log.info("Uploading new log %s", upstream_log)
try:
rw_client.create_record(
collection=settings.KINTO_CTLOGS_COLLECTION,
data=upstream_log,
permissions={"read": ["system.Everyone"]},
)
except KintoException as ke:
log.error(f"Upload failed, {ke}")
log.error("Upload failed, %s", ke)

# Delete logs that have been removed from Google's list
# (this probably doesn't happen)
Expand All @@ -1192,19 +1192,19 @@ def publish_ctlogs(*, args, rw_client):
continue

if args.noop:
log.info(f"Noop enabled, skipping deletion of {known_log}.")
log.info("Noop enabled, skipping deletion of %s.", known_log)
continue

log.info(
f"Removing log {known_log}, which has been deleted from Google's list."
"Removing log %s, which has been deleted from Google's list.", known_log
)
try:
rw_client.delete_record(
collection=settings.KINTO_CTLOGS_COLLECTION,
id=known_log["id"],
)
except KintoException as ke:
log.error(f"Deletion failed, {ke}")
log.error("Deletion failed, %s", ke)

# Update logs if upstream metadata has changed.
for known_id, known_log in known_lut.items():
Expand Down Expand Up @@ -1234,18 +1234,18 @@ def publish_ctlogs(*, args, rw_client):
continue

if args.noop:
log.info(f"Noop enabled, skipping update log with id {known_id}.")
log.info("Noop enabled, skipping update log with id %s.", known_id)
continue

log.info(f"Changing {known_log} to {upstream_log}")
log.info("Changing %s to %s", known_log, upstream_log)
try:
rw_client.update_record(
collection=settings.KINTO_CTLOGS_COLLECTION,
data=upstream_log,
id=known_log["id"],
)
except KintoException as ke:
log.error(f"Update failed, {ke}")
log.error("Update failed, %s", ke)

if not args.noop:
rw_client.request_review_of_collection(
Expand Down Expand Up @@ -1298,7 +1298,7 @@ def main():
)
)

log.info(f"Connecting... {settings.KINTO_RW_SERVER_URL}")
log.info("Connecting... %s", settings.KINTO_RW_SERVER_URL)

rw_client = PublisherClient(
server_url=settings.KINTO_RW_SERVER_URL,
Expand Down