From dbdef8d33306f46eabc27c433a6379a010b298f7 Mon Sep 17 00:00:00 2001 From: Samson Nkrumah Date: Tue, 31 Mar 2026 17:24:20 +0100 Subject: [PATCH 1/5] feat: add management command to close stale ongoing conferences Adds cleanup_stale_conferences command that finds conferences with no event activity for longer than a threshold (default 4 hours) and closes them. Uses last GenericEvent timestamp, not conference creation time, so long-running active sessions are not affected. Usage: python manage.py cleanup_stale_conferences python manage.py cleanup_stale_conferences --hours 2 python manage.py cleanup_stale_conferences --dry-run --- .../commands/cleanup_stale_conferences.py | 74 +++++++++++++++++++ 1 file changed, 74 insertions(+) create mode 100644 app/management/commands/cleanup_stale_conferences.py diff --git a/app/management/commands/cleanup_stale_conferences.py b/app/management/commands/cleanup_stale_conferences.py new file mode 100644 index 0000000..8e3cd36 --- /dev/null +++ b/app/management/commands/cleanup_stale_conferences.py @@ -0,0 +1,74 @@ +import datetime + +from django.core.management.base import BaseCommand +from django.db.models import Max + +from app.models.conference import Conference +from app.models.generic_event import GenericEvent + + +class Command(BaseCommand): + help = 'Close conferences with no activity for longer than the specified hours' + + def add_arguments(self, parser): + parser.add_argument( + '--hours', + type=int, + default=4, + help='Close conferences with no activity for this many hours (default: 4)', + ) + parser.add_argument( + '--dry-run', + action='store_true', + help='Show what would be closed without making changes', + ) + + def handle(self, *args, **options): + hours = options['hours'] + dry_run = options['dry_run'] + cutoff = datetime.datetime.utcnow() - datetime.timedelta(hours=hours) + now = datetime.datetime.utcnow() + + ongoing = Conference.objects.filter(ongoing=True) + + if not ongoing.exists(): + self.stdout.write('No ongoing conferences found.') + return + + stale = [] + for conference in ongoing: + last_event = GenericEvent.objects.filter( + conference=conference + ).aggregate(last=Max('created_at'))['last'] + + last_activity = last_event or conference.created_at + + if last_activity < cutoff: + idle = now - last_activity + stale.append((conference, last_activity, idle)) + + if not stale: + self.stdout.write(f'{ongoing.count()} ongoing conferences found, all have recent activity.') + return + + self.stdout.write(f'Found {len(stale)} conferences with no activity for more than {hours} hours.') + + for conference, last_activity, idle in stale: + self.stdout.write(f' {conference.id} ({conference.conference_name or conference.conference_id}) - last activity {idle} ago') + + if not dry_run: + for connection in conference.connections.filter(end_time__isnull=True): + connection.end(now) + connection.save() + + for session in conference.sessions.filter(end_time__isnull=True): + session.should_stop_call(now) + session.save() + + conference.should_stop_call(now) + conference.save() + + if dry_run: + self.stdout.write(f'\nDry run - no changes made. Run without --dry-run to close these conferences.') + else: + self.stdout.write(f'\nClosed {len(stale)} stale conferences.') From 80bcb494b78eed743448a0a8468ab1d6f2e36093 Mon Sep 17 00:00:00 2001 From: Samson Nkrumah Date: Tue, 31 Mar 2026 18:10:57 +0100 Subject: [PATCH 2/5] fix: address review feedback on cleanup command - Wrap each conference close in transaction.atomic() to match StopConferenceView and avoid partial updates - Isolate errors per conference with try/except so one bad row doesn't abort the rest - Use annotated query with Subquery to avoid N+1 queries - Add last_connection_at as fallback activity signal for conferences with no events - Read default hours from CONFERENCE_TIMEOUT_HOURS env var - Use consistent datetime.utcnow() throughout --- .../commands/cleanup_stale_conferences.py | 55 +++++++++++++------ 1 file changed, 37 insertions(+), 18 deletions(-) diff --git a/app/management/commands/cleanup_stale_conferences.py b/app/management/commands/cleanup_stale_conferences.py index 8e3cd36..cea0b64 100644 --- a/app/management/commands/cleanup_stale_conferences.py +++ b/app/management/commands/cleanup_stale_conferences.py @@ -1,12 +1,17 @@ import datetime +import os from django.core.management.base import BaseCommand -from django.db.models import Max +from django.db import transaction +from django.db.models import Max, Subquery, OuterRef from app.models.conference import Conference from app.models.generic_event import GenericEvent +DEFAULT_TIMEOUT_HOURS = 4 + + class Command(BaseCommand): help = 'Close conferences with no activity for longer than the specified hours' @@ -14,8 +19,8 @@ def add_arguments(self, parser): parser.add_argument( '--hours', type=int, - default=4, - help='Close conferences with no activity for this many hours (default: 4)', + default=int(os.getenv('CONFERENCE_TIMEOUT_HOURS', DEFAULT_TIMEOUT_HOURS)), + help=f'Close conferences with no activity for this many hours (default: {DEFAULT_TIMEOUT_HOURS}, env: CONFERENCE_TIMEOUT_HOURS)', ) parser.add_argument( '--dry-run', @@ -29,7 +34,15 @@ def handle(self, *args, **options): cutoff = datetime.datetime.utcnow() - datetime.timedelta(hours=hours) now = datetime.datetime.utcnow() - ongoing = Conference.objects.filter(ongoing=True) + # Single annotated query to get last activity per conference (avoids N+1) + ongoing = Conference.objects.filter(ongoing=True).annotate( + last_event_at=Subquery( + GenericEvent.objects.filter(conference=OuterRef('pk')) + .order_by('-created_at') + .values('created_at')[:1] + ), + last_connection_at=Max('connections__created_at'), + ) if not ongoing.exists(): self.stdout.write('No ongoing conferences found.') @@ -37,11 +50,7 @@ def handle(self, *args, **options): stale = [] for conference in ongoing: - last_event = GenericEvent.objects.filter( - conference=conference - ).aggregate(last=Max('created_at'))['last'] - - last_activity = last_event or conference.created_at + last_activity = conference.last_event_at or conference.last_connection_at or conference.created_at if last_activity < cutoff: idle = now - last_activity @@ -53,22 +62,32 @@ def handle(self, *args, **options): self.stdout.write(f'Found {len(stale)} conferences with no activity for more than {hours} hours.') + closed = 0 + failed = 0 + for conference, last_activity, idle in stale: self.stdout.write(f' {conference.id} ({conference.conference_name or conference.conference_id}) - last activity {idle} ago') if not dry_run: - for connection in conference.connections.filter(end_time__isnull=True): - connection.end(now) - connection.save() + try: + with transaction.atomic(): + for connection in conference.connections.filter(end_time__isnull=True): + connection.end(now) + connection.save() + + for session in conference.sessions.filter(end_time__isnull=True): + session.should_stop_call(now) + session.save() - for session in conference.sessions.filter(end_time__isnull=True): - session.should_stop_call(now) - session.save() + conference.should_stop_call(now) + conference.save() - conference.should_stop_call(now) - conference.save() + closed += 1 + except Exception as e: + failed += 1 + self.stderr.write(f' Error closing {conference.id}: {e}') if dry_run: self.stdout.write(f'\nDry run - no changes made. Run without --dry-run to close these conferences.') else: - self.stdout.write(f'\nClosed {len(stale)} stale conferences.') + self.stdout.write(f'\nClosed {closed} stale conferences.' + (f' {failed} failed.' if failed else '')) From 4757e7c4dfc0201ddbfddf8e8aed89cd109d73b2 Mon Sep 17 00:00:00 2001 From: Samson Nkrumah Date: Tue, 31 Mar 2026 18:15:20 +0100 Subject: [PATCH 3/5] feat: run stale conference cleanup as background thread in API Start a daemon thread in AppConfig.ready() that runs the cleanup periodically. No cron or external scheduler needed. Configurable via env vars: CONFERENCE_CLEANUP_INTERVAL_SECONDS (default: 3600) CONFERENCE_TIMEOUT_HOURS (default: 4) Set CONFERENCE_CLEANUP_INTERVAL_SECONDS=0 to disable. --- app/apps.py | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/app/apps.py b/app/apps.py index 3ad06dc..443cf39 100644 --- a/app/apps.py +++ b/app/apps.py @@ -1,5 +1,34 @@ +import logging +import os +import threading +import time + from django.apps import AppConfig +logger = logging.getLogger(__name__) + class UsersConfig(AppConfig): name = 'api' + + def ready(self): + interval = int(os.getenv('CONFERENCE_CLEANUP_INTERVAL_SECONDS', 3600)) + + if interval <= 0: + return + + def cleanup_loop(): + # Wait before first run to let the app fully start + time.sleep(60) + + while True: + try: + from app.management.commands.cleanup_stale_conferences import Command + Command().handle(hours=int(os.getenv('CONFERENCE_TIMEOUT_HOURS', 4)), dry_run=False, verbosity=1) + except Exception as e: + logger.warning(f'[ConferenceCleanup] Error: {e}') + + time.sleep(interval) + + thread = threading.Thread(target=cleanup_loop, daemon=True, name='conference-cleanup') + thread.start() From 776c9333b834557dfaa06ab8a2fbd6a3d9447ece Mon Sep 17 00:00:00 2001 From: Samson Nkrumah Date: Tue, 31 Mar 2026 18:28:39 +0100 Subject: [PATCH 4/5] fix: guard against multiple cleanup threads and register AppConfig - Add default_app_config in __init__.py so Django picks up the AppConfig - Use a module-level lock to ensure only one cleanup thread starts (gunicorn calls ready() multiple times for master + workers) - Remove debug print statements --- app/__init__.py | 1 + app/apps.py | 17 +++++++++++++---- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/app/__init__.py b/app/__init__.py index e69de29..d992c74 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -0,0 +1 @@ +default_app_config = 'app.apps.UsersConfig' diff --git a/app/apps.py b/app/apps.py index 443cf39..b85cb0b 100644 --- a/app/apps.py +++ b/app/apps.py @@ -7,24 +7,33 @@ logger = logging.getLogger(__name__) +_cleanup_started = False +_cleanup_lock = threading.Lock() + class UsersConfig(AppConfig): - name = 'api' + name = 'app' def ready(self): - interval = int(os.getenv('CONFERENCE_CLEANUP_INTERVAL_SECONDS', 3600)) + global _cleanup_started + interval = int(os.getenv('CONFERENCE_CLEANUP_INTERVAL_SECONDS', 3600)) if interval <= 0: return + with _cleanup_lock: + if _cleanup_started: + return + _cleanup_started = True + def cleanup_loop(): - # Wait before first run to let the app fully start time.sleep(60) while True: try: from app.management.commands.cleanup_stale_conferences import Command - Command().handle(hours=int(os.getenv('CONFERENCE_TIMEOUT_HOURS', 4)), dry_run=False, verbosity=1) + hours = int(os.getenv('CONFERENCE_TIMEOUT_HOURS', 4)) + Command().handle(hours=hours, dry_run=False, verbosity=0) except Exception as e: logger.warning(f'[ConferenceCleanup] Error: {e}') From de092bd8125713c4743f45e0d7b5a5d889ba29f1 Mon Sep 17 00:00:00 2001 From: Samson Nkrumah Date: Wed, 1 Apr 2026 12:53:41 +0100 Subject: [PATCH 5/5] fix: gate inline cleanup behind ENABLE_INLINE_CONFERENCE_CLEANUP flag The background thread in AppConfig.ready() only dedupes within one process (threading lock). With multiple gunicorn workers, each worker forks a separate process and runs its own cleanup loop. Gate the inline loop behind ENABLE_INLINE_CONFERENCE_CLEANUP=true so it's opt-in for dev/single-process. Production should use an external scheduler to run the management command. Also added cleanup documentation to README with dev vs production guidance. --- README.md | 22 ++++++++++++++++++++++ app/apps.py | 9 +++++++++ 2 files changed, 31 insertions(+) diff --git a/README.md b/README.md index d1f1146..e803c6c 100644 --- a/README.md +++ b/README.md @@ -109,6 +109,28 @@ The table below lists all environment variables used by the API. Variables marke | `GOOGLE_TASK_QUEUE_NAME` | No | - | Name of the Cloud Tasks queue (e.g. `queue-1`). Required when `USE_GOOGLE_TASK_QUEUE` is `True`. | | `APP_ENGINE_LOCATION` | No | - | App Engine location (e.g. `us-east1`). Required when `USE_GOOGLE_TASK_QUEUE` is `True`. | | `TASK_QUEUE_DOMAIN` | No | - | Domain for task queue callbacks (e.g. `https://api.example.com/`). Required when `USE_GOOGLE_TASK_QUEUE` is `True`. | +| **Conference Cleanup** | | | | +| `CONFERENCE_TIMEOUT_HOURS` | No | `4` | Close ongoing conferences with no activity for this many hours. | +| `ENABLE_INLINE_CONFERENCE_CLEANUP` | No | `false` | Set to `true` to run cleanup in a background thread (dev/single-process only). | +| `CONFERENCE_CLEANUP_INTERVAL_SECONDS` | No | `3600` | How often the inline cleanup runs in seconds. Set to `0` to disable. Only applies when `ENABLE_INLINE_CONFERENCE_CLEANUP` is `true`. | + +### Stale Conference Cleanup + +Conferences can get stuck as "Ongoing" if the client fails to send the end signal (browser tab closed, network drop). The `cleanup_stale_conferences` management command closes conferences with no activity for a configurable period. + +```bash +python manage.py cleanup_stale_conferences # default 4 hour threshold +python manage.py cleanup_stale_conferences --hours 2 # custom threshold +python manage.py cleanup_stale_conferences --dry-run # preview without changes +``` + +**Development:** Set `ENABLE_INLINE_CONFERENCE_CLEANUP=true` to run the cleanup automatically in a background thread inside the API process. + +**Production (multi-worker):** Do not use the inline loop with multiple gunicorn workers as each worker runs its own cleanup loop. Instead, use an external scheduler: + +- **Kubernetes:** CronJob running `python manage.py cleanup_stale_conferences` +- **Docker Compose:** Host cron or a sidecar container on a timer +- **ECS:** Scheduled task via EventBridge **Example `.env` file for local development:** diff --git a/app/apps.py b/app/apps.py index b85cb0b..2e46371 100644 --- a/app/apps.py +++ b/app/apps.py @@ -17,6 +17,15 @@ class UsersConfig(AppConfig): def ready(self): global _cleanup_started + # The inline cleanup loop is for dev/single-process only. + # In production with multiple gunicorn workers, each worker forks + # a separate process so the threading lock cannot prevent duplicate + # loops. Use an external scheduler instead: + # python manage.py cleanup_stale_conferences + # Set ENABLE_INLINE_CONFERENCE_CLEANUP=true to enable the loop. + if os.getenv('ENABLE_INLINE_CONFERENCE_CLEANUP', '').lower() != 'true': + return + interval = int(os.getenv('CONFERENCE_CLEANUP_INTERVAL_SECONDS', 3600)) if interval <= 0: return