-
-
Notifications
You must be signed in to change notification settings - Fork 9
/
events.py
272 lines (247 loc) · 10.1 KB
/
events.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
# Copyright (C) 2020-2024, Pyronear.
# This program is licensed under the Apache License 2.0.
# See LICENSE or go to <https://opensource.org/licenses/Apache-2.0> for full license details.
from datetime import datetime, timedelta
from typing import Annotated, List, cast
from fastapi import APIRouter, Depends, Path, Security, status
from pydantic import PositiveInt
from sqlalchemy import and_, func, select
from sqlalchemy.orm import aliased
from app.api import crud
from app.api.crud.authorizations import check_group_read, check_group_update, is_admin_access
from app.api.crud.groups import get_entity_group_id
from app.api.deps import get_current_access, get_db
from app.db import alerts, events
from app.models import Access, AccessType, Alert, Device, Event, Media
from app.schemas import (
AccessRead,
Acknowledgement,
AcknowledgementOut,
AlertOut,
EventIn,
EventOut,
EventPayload,
EventUpdate,
)
from app.services import s3_bucket
from app.services.telemetry import telemetry_client
# Router on which every event endpoint of this module is registered
router = APIRouter(redirect_slashes=True)
@router.post("/", status_code=status.HTTP_201_CREATED, summary="Create a new event")
async def create_event(
    payload: EventIn,
    access: Annotated[AccessRead, Security(get_current_access, scopes=[AccessType.admin, AccessType.device])],
) -> EventOut:
    """Creates a new event from the submitted information

    Below, click on "Schema" for more detailed information about the arguments
    or on "Example Value" to get a concrete idea of the arguments
    """
    # Telemetry is recorded first, before the entry is written
    telemetry_client.capture(access.id, event="events-create")

    # The created entry is unpacked as keyword arguments into the response schema
    created_entry = await crud.create_entry(events, payload)
    return EventOut(**created_entry)
@router.get("/{event_id}/", response_model=EventOut, summary="Get information about a specific event")
async def get_event(
    event_id: int = Path(..., gt=0),
    requester=Security(get_current_access, scopes=[AccessType.admin, AccessType.user]),
):
    """
    Based on an event_id, retrieves the information about the specified event
    """
    telemetry_client.capture(requester.id, event="events-get", properties={"event_id": event_id})

    # The group read check is awaited before the entry itself is fetched
    group_id = cast(int, await get_entity_group_id(events, event_id))
    await check_group_read(requester.id, group_id)

    return await crud.get_entry(events, event_id)
@router.get("/", response_model=List[EventOut], summary="Get the list of all events")
async def fetch_events(
    requester=Security(get_current_access, scopes=[AccessType.admin, AccessType.user]),
    session=Depends(get_db),
):
    """
    Retrieves the list of events and their information.
    Non-admin requesters only receive the events linked to their own group
    """
    telemetry_client.capture(requester.id, event="events-fetch")

    # Admin access: no group restriction is applied
    if await is_admin_access(requester.id):
        return await crud.fetch_all(events)

    # Otherwise keep the events that join, through their alerts and devices,
    # to an access belonging to the requester's group
    group_events = (
        session.query(Event)
        .join(Alert)
        .join(Device)
        .join(Access)
        .filter(Access.group_id == requester.group_id)
        .all()
    )
    return [event.__dict__ for event in group_events]
@router.get("/past", response_model=List[EventOut], summary="Get the list of all past events")
async def fetch_past_events(
    requester=Security(get_current_access, scopes=[AccessType.admin, AccessType.user]),
    session=Depends(get_db),
):
    """
    Retrieves the list of past events (events whose end timestamp is set) and their information.
    Non-admin requesters only receive the events linked to their own group
    """
    telemetry_client.capture(requester.id, event="events-fetch-past")

    if await is_admin_access(requester.id):
        # presumably drops entries whose end_ts is NULL, mirroring the filter
        # used for non-admin requesters below - confirm against crud.fetch_all
        return await crud.fetch_all(events, exclusions={"end_ts": None})

    same_group = Access.group_id == requester.group_id
    has_ended = Event.end_ts.isnot(None)
    past_events = (
        session.query(Event)
        .join(Alert)
        .join(Device)
        .join(Access)
        .filter(and_(same_group, has_ended))
        .all()
    )
    return [event.__dict__ for event in past_events]
@router.put("/{event_id}/", response_model=EventOut, summary="Update information about a specific event")
async def update_event(
    payload: EventUpdate,
    event_id: int = Path(..., gt=0),
    requester=Security(get_current_access, scopes=[AccessType.admin, AccessType.device]),
):
    """
    Based on an event_id, updates the information about the specified event
    """
    telemetry_client.capture(requester.id, event="events-update", properties={"event_id": event_id})

    # The group update check is awaited before anything is written
    group_id = cast(int, await get_entity_group_id(events, event_id))
    await check_group_update(requester.id, group_id)

    updated_entry = await crud.update_entry(events, payload, event_id)
    return updated_entry
@router.put("/{event_id}/acknowledge", response_model=AcknowledgementOut, summary="Acknowledge an existing event")
async def acknowledge_event(
    event_id: int = Path(..., gt=0),
    requester=Security(get_current_access, scopes=[AccessType.admin, AccessType.user]),
):
    """
    Based on an event_id, acknowledges the specified event
    """
    telemetry_client.capture(requester.id, event="events-acknowledge", properties={"event_id": event_id})

    # Acknowledging goes through the same group update check as a regular update
    group_id = cast(int, await get_entity_group_id(events, event_id))
    await check_group_update(requester.id, group_id)

    # The update payload only carries the acknowledgement flag
    acknowledgement = Acknowledgement(is_acknowledged=True)
    return await crud.update_entry(events, acknowledgement, event_id)
@router.delete("/{event_id}/", response_model=EventOut, summary="Delete a specific event")
async def delete_event(
    event_id: int = Path(..., gt=0),
    access=Security(get_current_access, scopes=[AccessType.admin]),
    session=Depends(get_db),
):
    """
    Based on an event_id, deletes the specified event
    """
    # `session` is declared as a dependency but is not used in this body
    telemetry_client.capture(access.id, event="events-delete", properties={"event_id": event_id})

    deleted_entry = await crud.delete_entry(events, event_id)
    return deleted_entry
@router.get(
    "/unacknowledged",
    response_model=List[EventPayload],
    summary="Get the list of events that haven't been acknowledged",
)
async def fetch_unacknowledged_events(
    requester=Security(get_current_access, scopes=[AccessType.admin, AccessType.user]), session=Depends(get_db)
):
    """
    Retrieves the last 15 un-acknowledged events created within the past 24 hours,
    together with the first 10 alerts of each of them
    """
    # NOTE(review): the event name contains a typo ("unacnkowledged"). It is kept as is
    # because telemetry consumers may already rely on this exact name.
    telemetry_client.capture(requester.id, event="events-fetch-unacnkowledged")

    # IDs of the 15 most recent (highest id) unacknowledged events created in the last 24 hours.
    # NOTE(review): this selection is not restricted to the requester's group, so a non-admin
    # requester can end up with fewer than 15 events once the device filter below is applied.
    subquery_events = (
        session.query(Event.id)
        .filter(
            and_(
                Event.is_acknowledged.is_(False),
                Event.created_at > datetime.utcnow() - timedelta(hours=24),
            )
        )
        .order_by(Event.id.desc())
        .limit(15)
        .subquery()
    )

    # Every requester is limited to the alerts of the selected events
    alert_filters = [Alert.event_id.in_(select([subquery_events.c.id]))]
    if not await is_admin_access(requester.id):
        # Non-admin requesters are further limited to devices whose access is in their group
        subquery_devices = (
            session.query(Device.id)
            .join(Access, Device.access_id == Access.id)
            .filter(Access.group_id == requester.group_id)
            .subquery()
        )
        alert_filters.append(Alert.device_id.in_(select([subquery_devices.c.id])))

    # Rank the alerts inside each event by ascending alert id.
    # Passing several criteria to filter() combines them with AND.
    ranked_alerts = (
        session.query(
            Alert.id.label("alert_id"),  # Alias the columns to avoid ambiguity
            Alert.event_id,
            Alert.localization,
            Alert.device_id,
            Alert.media_id,
            func.dense_rank().over(partition_by=Alert.event_id, order_by=Alert.id.asc()).label("rank"),
        )
        .filter(*alert_filters)
        .subquery()
    )

    # Keep the first 10 alerts of each event
    filtered_alerts = (
        session.query(
            ranked_alerts.c.alert_id,
            ranked_alerts.c.event_id,
            ranked_alerts.c.localization,
            ranked_alerts.c.device_id,
            ranked_alerts.c.media_id,
        )
        .filter(ranked_alerts.c.rank <= 10)
        .subquery()
    )

    # Aliased table to allow correct joining without conflicts
    alert_alias = aliased(filtered_alerts)

    # Final query: one row per retained alert, with its event, media key and device details
    retrieved_alerts = (
        session.query(
            Event,
            alert_alias.c.alert_id,
            Media.bucket_key,
            alert_alias.c.localization,
            alert_alias.c.device_id,
            Device.login,
            Device.azimuth,
        )
        .join(Event, Event.id == alert_alias.c.event_id)
        .join(Media, Media.id == alert_alias.c.media_id)
        .join(Device, Device.id == alert_alias.c.device_id)
        .distinct()
        # Fetch 100 rows at a time to reduce memory usage.
        # NOTE(review): .all() still builds the complete result list afterwards - confirm
        # that yield_per is still wanted here.
        .yield_per(100)
        .all()
    )

    # The public media URL is resolved for each row, one after the other
    return [
        EventPayload(
            **event.__dict__,
            media_url=await s3_bucket.get_public_url(bucket_key),
            localization=loc,
            device_id=device_id,
            alert_id=alert_id,
            device_login=login,
            device_azimuth=azimuth,
        )
        for event, alert_id, bucket_key, loc, device_id, login, azimuth in retrieved_alerts
    ]
@router.get("/{event_id}/alerts", response_model=List[AlertOut], summary="Get the list of alerts for event")
async def fetch_alerts_for_event(
    event_id: PositiveInt,
    requester=Security(get_current_access, scopes=[AccessType.admin, AccessType.user]),
    session=Depends(get_db),
):
    """
    Retrieves the list of alerts associated with the given event and their information
    """
    # `session` is declared as a dependency but is not used in this body
    telemetry_client.capture(requester.id, event="events-fetch-alerts", properties={"event_id": event_id})

    # The group read check is awaited before the alerts are queried
    group_id = cast(int, await get_entity_group_id(events, event_id))
    await check_group_read(requester.id, group_id)

    # Select every alert row whose event_id matches the requested event
    alerts_query = alerts.select().where(alerts.c.event_id == event_id)
    return await crud.base.database.fetch_all(query=alerts_query)