Skip to content

Commit

Permalink
Merge pull request #134 from lsst-sqre/tickets/DM-37595
Browse files Browse the repository at this point in the history
DM-37595: Use websockets for heartbeats
  • Loading branch information
mfisherlevine committed Apr 12, 2023
2 parents 45a4079 + 26fcf08 commit c075235
Show file tree
Hide file tree
Showing 67 changed files with 2,443 additions and 1,041 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,9 @@ dmypy.json
# Mac filesystem stuff
.DS_Store

# VScode store
# VScode
.vscode/
jsconfig.json

# jupyter notebooks
*.ipynb
Expand Down
1 change: 0 additions & 1 deletion requirements/main.in
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,4 @@ google-cloud-storage
jinja2
safir<2
python-dateutil
dacite
PyYAML
4 changes: 0 additions & 4 deletions requirements/main.txt
Original file line number Diff line number Diff line change
Expand Up @@ -304,10 +304,6 @@ click==8.1.3 \
--hash=sha256:7682dc8afb30297001674575ea00d1814d808d6a36af415a82bd481d37ba7b8e \
--hash=sha256:bb4d8133cb15a609f44e8213d9b391b0809795062913b383c62be0ee95b1db48
# via -r requirements/main.in
dacite==1.8.0 \
--hash=sha256:6257a5e505b61a8cafee7ef3ad08cf32ee9b885718f42395d017e0a9b4c6af65 \
--hash=sha256:f7b1205cc5d9b62835aac8cbc1e6e37c1da862359a401f1edbe2ae08fbdc6193
# via -r requirements/main.in
frozenlist==1.3.3 \
--hash=sha256:008a054b75d77c995ea26629ab3a0c0d7281341f2fa7e1e85fa6153ae29ae99c \
--hash=sha256:02c9ac843e3390826a265e331105efeab489ffaf4dd86384595ee8ce6d35ae7f \
Expand Down
114 changes: 105 additions & 9 deletions src/rubintv/app.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,40 @@
"""The main application factory for the rubintv service."""

__all__ = ["create_app"]
__all__ = ["create_app", "create_app_light"]

import asyncio
import weakref
from pathlib import Path
from typing import AsyncGenerator

import aiohttp_jinja2
import jinja2
from aiohttp import web
from aiohttp import WSCloseCode, web, web_middlewares
from google.cloud import storage
from safir.http import init_http_session
from safir.logging import configure_logging
from safir.metadata import setup_metadata
from safir.middleware import bind_logger

from rubintv.background.heartbeats import poll_for_heartbeats
from rubintv.config import Configuration
from rubintv.handlers import init_external_routes, init_internal_routes
from rubintv.models.historicaldata import HistoricalData
from rubintv.models.models_assignment import locations
from rubintv.models.models_init import ModelsInitator

DATE_FOR_MINIMAL_LOAD = "2022-12-08"


def create_app(load_minimal_data: bool = False) -> web.Application:
"""Create and configure the aiohttp.web application."""
"""Create and configure the aiohttp.web application.
Run via `tox -e run-app`
Returns
-------
`web.Application`
The web app.
"""
config = Configuration()
configure_logging(
profile=config.profile,
Expand All @@ -30,27 +44,42 @@ def create_app(load_minimal_data: bool = False) -> web.Application:
root_app = web.Application()
root_app["safir/config"] = config

models = ModelsInitator()
root_app["rubintv/models"] = models

client = storage.Client()
bucket_names = {loc.slug: loc.bucket for loc in locations.values()}
bucket_names = {loc.slug: loc.bucket for loc in models.locations.values()}

for location, bucket_name in bucket_names.items():
for location_name, bucket_name in bucket_names.items():
bucket = client.bucket(bucket_name)
root_app[f"rubintv/buckets/{location}"] = bucket
root_app[f"rubintv/cached_data/{location}"] = HistoricalData(
location, bucket, load_minimal_data
root_app[f"rubintv/buckets/{location_name}"] = bucket
location = models.locations[location_name]
root_app[f"rubintv/cached_data/{location_name}"] = HistoricalData(
location,
bucket,
models.cameras,
load_minimal_data,
DATE_FOR_MINIMAL_LOAD,
)

root_app["rubintv/date_to_load"] = DATE_FOR_MINIMAL_LOAD
root_app["rubintv/site_title"] = "RubinTV Display"
setup_metadata(package_name="rubintv", app=root_app)
setup_middleware(root_app)
root_app.add_routes(init_internal_routes())
root_app.cleanup_ctx.append(init_http_session)
root_app["rubintv/heartbeats"] = {}
root_app.cleanup_ctx.append(heartbeat_polling_init)
root_app.cleanup_ctx.append(websockets_handler)

sub_app = web.Application()
aiohttp_jinja2.setup(
sub_app,
loader=jinja2.FileSystemLoader(Path(__file__).parent / "templates"),
)
# Prevent tojson() filter from re-ordering keys
env = aiohttp_jinja2.get_env(sub_app)
env.policies["json.dumps_kwargs"] = {"sort_keys": False}

setup_middleware(sub_app)
sub_app.add_routes(init_external_routes())
Expand All @@ -72,7 +101,74 @@ def create_app(load_minimal_data: bool = False) -> web.Application:
def setup_middleware(app: web.Application) -> None:
"""Add middleware to the application."""
app.middlewares.append(bind_logger)
app.middlewares.append(
web_middlewares.normalize_path_middleware(
append_slash=False, remove_slash=True
)
)


def create_app_light() -> web.Application:
"""Create an app that loads historical data for only a single date.
The date is set in DATE_FOR_MINIMAL_LOAD as ``"YYYY-MM-DD"``.
Used as the app entry via ``tox -e run-light``.
Returns
-------
`web.Application`
The web app with only the single date's data to cache.
"""
return create_app(load_minimal_data=True)


async def heartbeat_polling_init(app: web.Application) -> AsyncGenerator:
"""Initialise a loop for polling the heartbeats in the bucket.
Uses the aiohttp cleanup contexts pattern.
Parameters
----------
app : `web.Application`
The web app.
Returns
-------
`AsyncGenerator`
Set-up and tear down of polling for heartbeats
See Also
--------
aiohttp reference on `cleanup contexts`_.
.. _cleanup contexts: https://docs.aiohttp.org/en/stable/web_advanced.html#cleanup-context
"""
app["heartbeats_poller"] = asyncio.create_task(poll_for_heartbeats(app))
yield
app["heartbeats_poller"].cancel()
await app["heartbeats_poller"]


async def websockets_handler(app: web.Application) -> AsyncGenerator:
"""Initialise and gracefully close websockets.
Uses the aiohttp cleanup contexts pattern.
Parameters
----------
app : `web.Application`
The web app.
Returns
-------
`AsyncGenerator`
Set-up and tear down of websockets.
See Also
--------
aiohttp reference on `cleanup contexts`_.
.. _cleanup contexts: https://docs.aiohttp.org/en/stable/web_advanced.html#cleanup-context
"""
app["websockets"] = weakref.WeakSet()
yield
for ws in set(app["websockets"]):
await ws.close(code=WSCloseCode.GOING_AWAY, message="Server shutdown")
Empty file.
130 changes: 130 additions & 0 deletions src/rubintv/background/heartbeats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import asyncio
import json
import time
from typing import Any, Dict, List

from aiohttp import web
from google.api_core.exceptions import NotFound
from google.cloud.storage.client import Bucket

__all__ = [
"poll_for_heartbeats",
"process_heartbeats",
"download_heartbeat_objects",
]

HEARTBEATS_PREFIX = "heartbeats"


async def poll_for_heartbeats(app: web.Application) -> None:
"""Asynchronous loop that polls the buckets for heartbeat files.
Called from within a set-up/tear-down function during the app's
initialisation.
Parameters
----------
app : `web.Application`
The web app.
"""
try:
while True:
for location in app["rubintv/models"].locations:
# just use summit bucket to start
bucket = app[f"rubintv/buckets/{location}"]
heartbeats_json_arr = download_heartbeat_objects(
bucket, HEARTBEATS_PREFIX
)
heartbeats = process_heartbeats(heartbeats_json_arr)
app["rubintv/heartbeats"][location] = heartbeats
await asyncio.sleep(30)
except asyncio.exceptions.CancelledError:
print("Polling for heartbeats cancelled")


def download_heartbeat_objects(bucket: Bucket, prefix: str) -> List[Dict]:
"""Attempts to download heartbeat objects.
Parameters
----------
bucket : `Bucket`
The given bucket
prefix : `str`
The prefix for a particular heartbeat file.
Returns
-------
raw_heartbeats : `List` [`Dict`]
A list of heartbeat dicts.
See Also
--------
`process_heartbeats()` for details
"""
hb_blobs = list(bucket.list_blobs(prefix=prefix))
heartbeat_objs = []
for hb_blob in hb_blobs:
try:
the_blob = bucket.blob(hb_blob.name)
blob_content = the_blob.download_as_bytes()
except NotFound:
blob_content = None
print(f"Error: {hb_blob.name} not found.")
if not blob_content:
continue
else:
hb = json.loads(blob_content)
hb["url"] = hb_blob.name
heartbeat_objs.append(hb)
return heartbeat_objs


def process_heartbeats(
heartbeat_objs: List[Dict[str, Any]]
) -> Dict[str, Dict[str, Any]]:
"""Takes a list of heartbeat dicts and returns a dict of simplified
dicts keyed by channel/service name that include a boolean for the
current status of the heartbeat.
Parameters
----------
heartbeats_json_list : `List` [`Dict` [`str`, `Any`]]
A list of heartbeat dicts as they are as json files in the bucket
Returns
-------
heartbeats : `Dict` [`str`, `Dict` [`str`, `Any`]]
A dictionary of simplified heartbeat dicts keyed by channel/service name
Examples
--------
input:
``
{
'channel': 'allsky',
'currTime': 1675779822,
'nextExpected': 1675780422,
'errors': {},
'url': 'heartbeats/allsky.json'
}
``
outputs:
``
{
'allsky': {
'active': True,
'next': 1675780422,
'curr': 1675779822
}
}
``
"""
heartbeats = {}
t = time.time()
for hb in heartbeat_objs:
channel = hb["channel"]
next = hb["nextExpected"]
curr = hb["currTime"]
active = next > t
heartbeats[channel] = {"active": active, "next": next, "curr": curr}
return heartbeats
Loading

0 comments on commit c075235

Please sign in to comment.