Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,28 @@ The table below lists all environment variables used by the API. Variables marke
| `GOOGLE_TASK_QUEUE_NAME` | No | - | Name of the Cloud Tasks queue (e.g. `queue-1`). Required when `USE_GOOGLE_TASK_QUEUE` is `True`. |
| `APP_ENGINE_LOCATION` | No | - | App Engine location (e.g. `us-east1`). Required when `USE_GOOGLE_TASK_QUEUE` is `True`. |
| `TASK_QUEUE_DOMAIN` | No | - | Domain for task queue callbacks (e.g. `https://api.example.com/`). Required when `USE_GOOGLE_TASK_QUEUE` is `True`. |
| **Conference Cleanup** | | | |
| `CONFERENCE_TIMEOUT_HOURS` | No | `4` | Close ongoing conferences with no activity for this many hours. |
| `ENABLE_INLINE_CONFERENCE_CLEANUP` | No | `false` | Set to `true` to run cleanup in a background thread (dev/single-process only). |
| `CONFERENCE_CLEANUP_INTERVAL_SECONDS` | No | `3600` | How often the inline cleanup runs in seconds. Set to `0` to disable. Only applies when `ENABLE_INLINE_CONFERENCE_CLEANUP` is `true`. |

### Stale Conference Cleanup

Conferences can get stuck as "Ongoing" if the client fails to send the end signal (browser tab closed, network drop). The `cleanup_stale_conferences` management command closes conferences with no activity for a configurable period.

```bash
python manage.py cleanup_stale_conferences # default 4 hour threshold
python manage.py cleanup_stale_conferences --hours 2 # custom threshold
python manage.py cleanup_stale_conferences --dry-run # preview without changes
```

**Development:** Set `ENABLE_INLINE_CONFERENCE_CLEANUP=true` to run the cleanup automatically in a background thread inside the API process.

**Production (multi-worker):** Do not use the inline loop with multiple gunicorn workers as each worker runs its own cleanup loop. Instead, use an external scheduler:

- **Kubernetes:** CronJob running `python manage.py cleanup_stale_conferences`
- **Docker Compose:** Host cron or a sidecar container on a timer
- **ECS:** Scheduled task via EventBridge

**Example `.env` file for local development:**

Expand Down
1 change: 1 addition & 0 deletions app/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
default_app_config = 'app.apps.UsersConfig'
49 changes: 48 additions & 1 deletion app/apps.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,52 @@
import logging
import os
import threading
import time

from django.apps import AppConfig

logger = logging.getLogger(__name__)

_cleanup_started = False
_cleanup_lock = threading.Lock()


class UsersConfig(AppConfig):
name = 'api'
name = 'app'

def ready(self):
global _cleanup_started

# The inline cleanup loop is for dev/single-process only.
# In production with multiple gunicorn workers, each worker forks
# a separate process so the threading lock cannot prevent duplicate
# loops. Use an external scheduler instead:
# python manage.py cleanup_stale_conferences
# Set ENABLE_INLINE_CONFERENCE_CLEANUP=true to enable the loop.
if os.getenv('ENABLE_INLINE_CONFERENCE_CLEANUP', '').lower() != 'true':
return

interval = int(os.getenv('CONFERENCE_CLEANUP_INTERVAL_SECONDS', 3600))
if interval <= 0:
return

with _cleanup_lock:
if _cleanup_started:
return
_cleanup_started = True

def cleanup_loop():
time.sleep(60)

while True:
try:
from app.management.commands.cleanup_stale_conferences import Command
hours = int(os.getenv('CONFERENCE_TIMEOUT_HOURS', 4))
Command().handle(hours=hours, dry_run=False, verbosity=0)
except Exception as e:
logger.warning(f'[ConferenceCleanup] Error: {e}')

time.sleep(interval)

thread = threading.Thread(target=cleanup_loop, daemon=True, name='conference-cleanup')
thread.start()
93 changes: 93 additions & 0 deletions app/management/commands/cleanup_stale_conferences.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import datetime
import os

from django.core.management.base import BaseCommand
from django.db import transaction
from django.db.models import Max, Subquery, OuterRef

from app.models.conference import Conference
from app.models.generic_event import GenericEvent


DEFAULT_TIMEOUT_HOURS = 4


class Command(BaseCommand):
help = 'Close conferences with no activity for longer than the specified hours'

def add_arguments(self, parser):
parser.add_argument(
'--hours',
type=int,
default=int(os.getenv('CONFERENCE_TIMEOUT_HOURS', DEFAULT_TIMEOUT_HOURS)),
help=f'Close conferences with no activity for this many hours (default: {DEFAULT_TIMEOUT_HOURS}, env: CONFERENCE_TIMEOUT_HOURS)',
)
parser.add_argument(
'--dry-run',
action='store_true',
help='Show what would be closed without making changes',
)

def handle(self, *args, **options):
hours = options['hours']
dry_run = options['dry_run']
cutoff = datetime.datetime.utcnow() - datetime.timedelta(hours=hours)
now = datetime.datetime.utcnow()

# Single annotated query to get last activity per conference (avoids N+1)
ongoing = Conference.objects.filter(ongoing=True).annotate(
last_event_at=Subquery(
GenericEvent.objects.filter(conference=OuterRef('pk'))
.order_by('-created_at')
.values('created_at')[:1]
),
last_connection_at=Max('connections__created_at'),
)

if not ongoing.exists():
self.stdout.write('No ongoing conferences found.')
return

stale = []
for conference in ongoing:
last_activity = conference.last_event_at or conference.last_connection_at or conference.created_at

if last_activity < cutoff:
idle = now - last_activity
stale.append((conference, last_activity, idle))

if not stale:
self.stdout.write(f'{ongoing.count()} ongoing conferences found, all have recent activity.')
return

self.stdout.write(f'Found {len(stale)} conferences with no activity for more than {hours} hours.')

closed = 0
failed = 0

for conference, last_activity, idle in stale:
self.stdout.write(f' {conference.id} ({conference.conference_name or conference.conference_id}) - last activity {idle} ago')

if not dry_run:
try:
with transaction.atomic():
for connection in conference.connections.filter(end_time__isnull=True):
connection.end(now)
connection.save()

for session in conference.sessions.filter(end_time__isnull=True):
session.should_stop_call(now)
session.save()

conference.should_stop_call(now)
conference.save()

closed += 1
except Exception as e:
failed += 1
self.stderr.write(f' Error closing {conference.id}: {e}')

if dry_run:
self.stdout.write(f'\nDry run - no changes made. Run without --dry-run to close these conferences.')
else:
self.stdout.write(f'\nClosed {closed} stale conferences.' + (f' {failed} failed.' if failed else ''))