From 05997dcce8bf8cba81d59361ee1dd3915ebd3be3 Mon Sep 17 00:00:00 2001 From: Denys Fedoryshchenko Date: Sun, 30 Nov 2025 15:22:52 +0200 Subject: [PATCH] models: Extend EventHistory model for durable pub/sub delivery This is necessary changes to update EventHistory model, so it can be used for new durable pub/sub. Signed-off-by: Denys Fedoryshchenko --- kernelci/api/models.py | 26 +++++++++++++++++++++++--- kernelci/api/models_base.py | 12 +++++++++--- 2 files changed, 32 insertions(+), 6 deletions(-) diff --git a/kernelci/api/models.py b/kernelci/api/models.py index afaaaa6b79..e3e300fea1 100644 --- a/kernelci/api/models.py +++ b/kernelci/api/models.py @@ -16,6 +16,7 @@ from datetime import datetime, timedelta from typing import Any, Optional, Dict, List, ClassVar, Literal import enum +import os from operator import attrgetter import json from typing_extensions import Annotated @@ -855,12 +856,29 @@ def parse_node_obj(node: Node): # eventhistory model +# Environment variable to configure TTL (default 7 days = 604800 seconds) +# Set EVENT_HISTORY_TTL_SECONDS to override +EVENT_HISTORY_TTL_SECONDS = int(os.getenv('EVENT_HISTORY_TTL_SECONDS', '604800')) + + class EventHistory(DatabaseModel): """Event history object model""" timestamp: datetime = Field( description='Timestamp of event creation', default_factory=datetime.now ) + sequence_id: Optional[int] = Field( + default=None, + description='Sequential ID for pub/sub ordering (auto-generated)' + ) + channel: Optional[str] = Field( + default='node', + description='Pub/Sub channel name' + ) + owner: Optional[str] = Field( + default=None, + description='Username of event publisher' + ) data: Dict[str, Any] = Field( description='Event data', default={} @@ -871,9 +889,11 @@ def get_indexes(cls): """ Create the index with the expiresAfterSeconds option for the timestamp field to automatically remove documents after a certain - time. - Default is 86400 seconds (24 hours). + time. Default is 604800 seconds (7 days), configurable via + EVENT_HISTORY_TTL_SECONDS environment variable. + Also creates compound index for efficient pub/sub catch-up queries. """ return [ - cls.Index('timestamp', {'expireAfterSeconds': 86400}), + cls.Index('timestamp', {'expireAfterSeconds': EVENT_HISTORY_TTL_SECONDS}), + cls.Index([('channel', 1), ('sequence_id', 1)], {}), ] diff --git a/kernelci/api/models_base.py b/kernelci/api/models_base.py index 0724593e25..946e71a194 100644 --- a/kernelci/api/models_base.py +++ b/kernelci/api/models_base.py @@ -12,7 +12,7 @@ """Common KernelCI API model definitions""" -from typing import Optional, Any, Dict +from typing import Optional, Any, Dict, List, Tuple, Union from bson import ObjectId from pydantic import ( BaseModel, @@ -74,8 +74,14 @@ class DatabaseModel(ModelId): """Database model""" @dataclass class Index(): - """Index class""" - field: str + """Index class + + field: Either a single field name (str) for simple indexes, + or a list of (field, direction) tuples for compound indexes. + Direction is typically 1 (ascending) or -1 (descending). + attributes: MongoDB index options (e.g., {'expireAfterSeconds': 3600}) + """ + field: Union[str, List[Tuple[str, int]]] attributes: dict[str, Any] def update(self):