Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add limit and offset parameters to routes that fetch lists from db #277

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Dockerfile-dev
Original file line number Original file line Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM pyroapi:python3.8-alpine3.10 FROM pyronear/pyro-api:python3.8-alpine3.10


# copy requirements file # copy requirements file
COPY requirements-dev.txt requirements-dev.txt COPY requirements-dev.txt requirements-dev.txt
Expand Down
12 changes: 10 additions & 2 deletions src/app/api/crud/base.py
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from fastapi import HTTPException, Path, status from fastapi import HTTPException, Path, status
from pydantic import BaseModel from pydantic import BaseModel
from sqlalchemy import Table from sqlalchemy import Table
from sqlalchemy.orm import Query
from sqlalchemy.sql import Select


from app.db import database from app.db import database


Expand Down Expand Up @@ -41,16 +43,22 @@ async def fetch_all(
query_filters: Optional[Dict[str, Any]] = None, query_filters: Optional[Dict[str, Any]] = None,
exclusions: Optional[Dict[str, Any]] = None, exclusions: Optional[Dict[str, Any]] = None,
limit: int = 50, limit: int = 50,
offset: Optional[int] = None,
query: Optional[Select] = None,
) -> List[Mapping[str, Any]]: ) -> List[Mapping[str, Any]]:
query = table.select().order_by(table.c.id.desc()) if query is None:
query = table.select()
if isinstance(query_filters, dict): if isinstance(query_filters, dict):
for key, value in query_filters.items(): for key, value in query_filters.items():
query = query.where(getattr(table.c, key) == value) query = query.where(getattr(table.c, key) == value)


if isinstance(exclusions, dict): if isinstance(exclusions, dict):
for key, value in exclusions.items(): for key, value in exclusions.items():
query = query.where(getattr(table.c, key) != value) query = query.where(getattr(table.c, key) != value)
return (await database.fetch_all(query=query.limit(limit)))[::-1] query = query.order_by(table.c.id.desc()).limit(limit).offset(offset)
if isinstance(query, Query):
return [item.__dict__ for item in query[::-1]]
return (await database.fetch_all(query=query))[::-1]
Comment on lines +46 to +61
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea. Two questions though:

  • I think I understand this is an equivalent of the offset syntax in SQL. But here we seem to be making an arbitrary choice on order direction. Should we make that a boolean param?
  • regarding the query arg, we expect it to already have a .select()?

If we only want to paginate the queries, there are a few plugin options for this I think

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(paginate will prevent the case where we have a huge table and someone send limit=100000 which might blow up the RAM)





async def fetch_one(table: Table, query_filters: Dict[str, Any]) -> Optional[Mapping[str, Any]]: async def fetch_one(table: Table, query_filters: Dict[str, Any]) -> Optional[Mapping[str, Any]]:
Expand Down
13 changes: 9 additions & 4 deletions src/app/api/endpoints/accesses.py
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
# This program is licensed under the Apache License 2.0. # This program is licensed under the Apache License 2.0.
# See LICENSE or go to <https://opensource.org/licenses/Apache-2.0> for full license details. # See LICENSE or go to <https://opensource.org/licenses/Apache-2.0> for full license details.


from typing import List from typing import List, Optional


from fastapi import APIRouter, Path, Security from fastapi import APIRouter, Path, Query, Security
from typing_extensions import Annotated


from app.api import crud from app.api import crud
from app.api.deps import get_current_access from app.api.deps import get_current_access
Expand All @@ -25,8 +26,12 @@ async def get_access(access_id: int = Path(..., gt=0), _=Security(get_current_ac




@router.get("/", response_model=List[AccessRead], summary="Get the list of all accesses") @router.get("/", response_model=List[AccessRead], summary="Get the list of all accesses")
async def fetch_accesses(_=Security(get_current_access, scopes=[AccessType.admin])): async def fetch_accesses(
limit: Annotated[int, Query(description="maximum number of items", ge=1, le=1000)] = 50,
offset: Annotated[Optional[int], Query(description="number of items to skip", ge=0)] = None,
_=Security(get_current_access, scopes=[AccessType.admin]),
):
Comment on lines -28 to +33
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a reminder for me: how do we change those values when sending a request?

""" """
Retrieves the list of all accesses and their information Retrieves the list of all accesses and their information
""" """
return await crud.fetch_all(accesses) return await crud.fetch_all(accesses, limit=limit, offset=offset)
55 changes: 24 additions & 31 deletions src/app/api/endpoints/alerts.py
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@


from functools import partial from functools import partial
from string import Template from string import Template
from typing import List, cast from typing import List, Optional, cast


from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Path, Security, status from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Path, Query, Security, status
from sqlalchemy import select from typing_extensions import Annotated


from app.api import crud from app.api import crud
from app.api.crud.authorizations import check_group_read, is_admin_access from app.api.crud.authorizations import check_group_read, is_admin_access
Expand All @@ -18,7 +18,7 @@
from app.api.endpoints.notifications import send_notification from app.api.endpoints.notifications import send_notification
from app.api.endpoints.recipients import fetch_recipients_for_group from app.api.endpoints.recipients import fetch_recipients_for_group
from app.api.external import post_request from app.api.external import post_request
from app.db import alerts, events, media from app.db import alerts, media
from app.models import Access, AccessType, Alert, Device, Event from app.models import Access, AccessType, Alert, Device, Event
from app.schemas import AlertBase, AlertIn, AlertOut, DeviceOut, NotificationIn, RecipientOut from app.schemas import AlertBase, AlertIn, AlertOut, DeviceOut, NotificationIn, RecipientOut


Expand Down Expand Up @@ -123,19 +123,22 @@ async def get_alert(


@router.get("/", response_model=List[AlertOut], summary="Get the list of all alerts") @router.get("/", response_model=List[AlertOut], summary="Get the list of all alerts")
async def fetch_alerts( async def fetch_alerts(
requester=Security(get_current_access, scopes=[AccessType.admin, AccessType.user]), session=Depends(get_db) limit: Annotated[int, Query(description="maximum number of items", ge=1, le=1000)] = 50,
offset: Annotated[Optional[int], Query(description="number of items to skip", ge=0)] = None,
requester=Security(get_current_access, scopes=[AccessType.admin, AccessType.user]),
session=Depends(get_db),
): ):
""" """
Retrieves the list of all alerts and their information Retrieves the list of all alerts and their information
""" """
if await is_admin_access(requester.id): return await crud.fetch_all(
return await crud.fetch_all(alerts) alerts,
else: query=None
retrieved_alerts = ( if await is_admin_access(requester.id)
session.query(Alert).join(Device).join(Access).filter(Access.group_id == requester.group_id).all() else session.query(Alert).join(Device).join(Access).filter(Access.group_id == requester.group_id),
) limit=limit,
retrieved_alerts = [x.__dict__ for x in retrieved_alerts] offset=offset,
return retrieved_alerts )




@router.delete("/{alert_id}/", response_model=AlertOut, summary="Delete a specific alert") @router.delete("/{alert_id}/", response_model=AlertOut, summary="Delete a specific alert")
Expand All @@ -148,25 +151,15 @@ async def delete_alert(alert_id: int = Path(..., gt=0), _=Security(get_current_a


@router.get("/ongoing", response_model=List[AlertOut], summary="Get the list of ongoing alerts") @router.get("/ongoing", response_model=List[AlertOut], summary="Get the list of ongoing alerts")
async def fetch_ongoing_alerts( async def fetch_ongoing_alerts(
requester=Security(get_current_access, scopes=[AccessType.admin, AccessType.user]), session=Depends(get_db) limit: Annotated[int, Query(description="maximum number of items", ge=1, le=1000)] = 50,
offset: Annotated[Optional[int], Query(description="number of items to skip", ge=0)] = None,
requester=Security(get_current_access, scopes=[AccessType.admin, AccessType.user]),
session=Depends(get_db),
): ):
""" """
Retrieves the list of ongoing alerts and their information Retrieves the list of ongoing alerts and their information
""" """
if await is_admin_access(requester.id): query = session.query(Alert).join(Event).filter(Event.end_ts.is_(None))
query = ( if not await is_admin_access(requester.id):
alerts.select().where(alerts.c.event_id.in_(select([events.c.id]).where(events.c.end_ts.is_(None)))) query = query.join(Device).join(Access).filter(Access.group_id == requester.group_id)
).order_by(alerts.c.id.desc()) return await crud.fetch_all(alerts, query=query, limit=limit, offset=offset)

return (await crud.base.database.fetch_all(query=query.limit(50)))[::-1]
else:
retrieved_alerts = (
session.query(Alert)
.join(Event)
.filter(Event.end_ts.is_(None))
.join(Device)
.join(Access)
.filter(Access.group_id == requester.group_id)
)
retrieved_alerts = [x.__dict__ for x in retrieved_alerts.all()]
return retrieved_alerts
33 changes: 21 additions & 12 deletions src/app/api/endpoints/devices.py
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
# See LICENSE or go to <https://opensource.org/licenses/Apache-2.0> for full license details. # See LICENSE or go to <https://opensource.org/licenses/Apache-2.0> for full license details.


from datetime import datetime from datetime import datetime
from typing import List, cast from typing import List, Optional, cast


from fastapi import APIRouter, Depends, HTTPException, Path, Security, status from fastapi import APIRouter, Depends, HTTPException, Path, Query, Security, status
from typing_extensions import Annotated


from app.api import crud from app.api import crud
from app.api.crud.authorizations import is_admin_access from app.api.crud.authorizations import is_admin_access
Expand Down Expand Up @@ -80,18 +81,22 @@ async def get_my_device(me: DeviceOut = Security(get_current_device, scopes=["de


@router.get("/", response_model=List[DeviceOut], summary="Get the list of all devices") @router.get("/", response_model=List[DeviceOut], summary="Get the list of all devices")
async def fetch_devices( async def fetch_devices(
requester=Security(get_current_access, scopes=[AccessType.admin, AccessType.user]), session=Depends(get_db) limit: Annotated[int, Query(description="maximum number of items", ge=1, le=1000)] = 50,
offset: Annotated[Optional[int], Query(description="number of items to skip", ge=0)] = None,
requester=Security(get_current_access, scopes=[AccessType.admin, AccessType.user]),
session=Depends(get_db),
): ):
""" """
Retrieves the list of all devices and their information Retrieves the list of all devices and their information
""" """
if await is_admin_access(requester.id): return await crud.fetch_all(
return await crud.fetch_all(devices) devices,
else: query=None
retrieved_devices = session.query(Device).join(Access).filter(Access.group_id == requester.group_id).all() if await is_admin_access(requester.id)
retrieved_devices = [x.__dict__ for x in retrieved_devices] else session.query(Device).join(Access).filter(Access.group_id == requester.group_id),

limit=limit,
return retrieved_devices offset=offset,
)




@router.put("/{device_id}/", response_model=DeviceOut, summary="Update information about a specific device") @router.put("/{device_id}/", response_model=DeviceOut, summary="Update information about a specific device")
Expand All @@ -115,11 +120,15 @@ async def delete_device(device_id: int = Path(..., gt=0), _=Security(get_current
@router.get( @router.get(
"/my-devices", response_model=List[DeviceOut], summary="Get the list of all devices belonging to the current user" "/my-devices", response_model=List[DeviceOut], summary="Get the list of all devices belonging to the current user"
) )
async def fetch_my_devices(me: UserRead = Security(get_current_user, scopes=[AccessType.admin, AccessType.user])): async def fetch_my_devices(
limit: Annotated[int, Query(description="maximum number of items", ge=1, le=1000)] = 50,
offset: Annotated[Optional[int], Query(description="number of items to skip", ge=0)] = None,
me: UserRead = Security(get_current_user, scopes=[AccessType.admin, AccessType.user]),
):
""" """
Retrieves the list of all devices and the information which are owned by the current user Retrieves the list of all devices and the information which are owned by the current user
""" """
return await crud.fetch_all(devices, {"owner_id": me.id}) return await crud.fetch_all(devices, {"owner_id": me.id}, limit=limit, offset=offset)




@router.put("/heartbeat", response_model=DeviceOut, summary="Update the last ping of the current device") @router.put("/heartbeat", response_model=DeviceOut, summary="Update the last ping of the current device")
Expand Down
87 changes: 46 additions & 41 deletions src/app/api/endpoints/events.py
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
# This program is licensed under the Apache License 2.0. # This program is licensed under the Apache License 2.0.
# See LICENSE or go to <https://opensource.org/licenses/Apache-2.0> for full license details. # See LICENSE or go to <https://opensource.org/licenses/Apache-2.0> for full license details.


from typing import List, cast from typing import List, Optional, cast


from fastapi import APIRouter, Depends, Path, Security, status from fastapi import APIRouter, Depends, Path, Query, Security, status
from pydantic import PositiveInt from pydantic import PositiveInt
from sqlalchemy import and_ from typing_extensions import Annotated


from app.api import crud from app.api import crud
from app.api.crud.authorizations import check_group_read, check_group_update, is_admin_access from app.api.crud.authorizations import check_group_read, check_group_update, is_admin_access
Expand Down Expand Up @@ -44,40 +44,43 @@ async def get_event(


@router.get("/", response_model=List[EventOut], summary="Get the list of all events") @router.get("/", response_model=List[EventOut], summary="Get the list of all events")
async def fetch_events( async def fetch_events(
requester=Security(get_current_access, scopes=[AccessType.admin, AccessType.user]), session=Depends(get_db) limit: Annotated[int, Query(description="maximum number of items", ge=1, le=1000)] = 50,
offset: Annotated[Optional[int], Query(description="number of items to skip", ge=0)] = None,
requester=Security(get_current_access, scopes=[AccessType.admin, AccessType.user]),
session=Depends(get_db),
): ):
""" """
Retrieves the list of all events and their information Retrieves the list of all events and their information
""" """
if await is_admin_access(requester.id): return await crud.fetch_all(
return await crud.fetch_all(events) events,
else: query=None
retrieved_events = ( if await is_admin_access(requester.id)
session.query(Event).join(Alert).join(Device).join(Access).filter(Access.group_id == requester.group_id) else session.query(Event).join(Alert).join(Device).join(Access).filter(Access.group_id == requester.group_id),
) limit=limit,
retrieved_events = [x.__dict__ for x in retrieved_events.all()] offset=offset,
return retrieved_events )




@router.get("/past", response_model=List[EventOut], summary="Get the list of all past events") @router.get("/past", response_model=List[EventOut], summary="Get the list of all past events")
async def fetch_past_events( async def fetch_past_events(
requester=Security(get_current_access, scopes=[AccessType.admin, AccessType.user]), session=Depends(get_db) limit: Annotated[int, Query(description="maximum number of items", ge=1, le=1000)] = 50,
offset: Annotated[Optional[int], Query(description="number of items to skip", ge=0)] = None,
requester=Security(get_current_access, scopes=[AccessType.admin, AccessType.user]),
session=Depends(get_db),
): ):
""" """
Retrieves the list of all events and their information Retrieves the list of all events without end timestamp and their information
""" """
if await is_admin_access(requester.id): return await crud.fetch_all(
return await crud.fetch_all(events, exclusions={"end_ts": None}) events,
else: exclusions={"end_ts": None},
retrieved_events = ( query=None
session.query(Event) if await is_admin_access(requester.id)
.join(Alert) else session.query(Event).join(Alert).join(Device).join(Access).filter(Access.group_id == requester.group_id),
.join(Device) limit=limit,
.join(Access) offset=offset,
.filter(and_(Access.group_id == requester.group_id, Event.end_ts.isnot(None))) )
)
retrieved_events = [x.__dict__ for x in retrieved_events.all()]
return retrieved_events




@router.put("/{event_id}/", response_model=EventOut, summary="Update information about a specific event") @router.put("/{event_id}/", response_model=EventOut, summary="Update information about a specific event")
Expand Down Expand Up @@ -122,28 +125,30 @@ async def delete_event(
"/unacknowledged", response_model=List[EventOut], summary="Get the list of events that haven't been acknowledged" "/unacknowledged", response_model=List[EventOut], summary="Get the list of events that haven't been acknowledged"
) )
async def fetch_unacknowledged_events( async def fetch_unacknowledged_events(
requester=Security(get_current_access, scopes=[AccessType.admin, AccessType.user]), session=Depends(get_db) limit: Annotated[int, Query(description="maximum number of items", ge=1, le=1000)] = 50,
offset: Annotated[Optional[int], Query(description="number of items to skip", ge=0)] = None,
requester=Security(get_current_access, scopes=[AccessType.admin, AccessType.user]),
session=Depends(get_db),
): ):
""" """
Retrieves the list of non confirmed alerts and their information Retrieves the list of unacknowledged alerts and their information
""" """
if await is_admin_access(requester.id): return await crud.fetch_all(
return await crud.fetch_all(events, {"is_acknowledged": False}) events,
else: {"is_acknowledged": False},
retrieved_events = ( query=None
session.query(Event) if await is_admin_access(requester.id)
.join(Alert) else session.query(Event).join(Alert).join(Device).join(Access).filter(Access.group_id == requester.group_id),
.join(Device) limit=limit,
.join(Access) offset=offset,
.filter(and_(Access.group_id == requester.group_id, Event.is_acknowledged.is_(False))) )
)
retrieved_events = [x.__dict__ for x in retrieved_events.all()]
return retrieved_events




@router.get("/{event_id}/alerts", response_model=List[AlertOut], summary="Get the list of alerts for event") @router.get("/{event_id}/alerts", response_model=List[AlertOut], summary="Get the list of alerts for event")
async def fetch_alerts_for_event( async def fetch_alerts_for_event(
event_id: PositiveInt, event_id: PositiveInt,
limit: Annotated[int, Query(description="maximum number of items", ge=1, le=1000)] = 50,
offset: Annotated[Optional[int], Query(description="number of items to skip", ge=0)] = None,
requester=Security(get_current_access, scopes=[AccessType.admin, AccessType.user]), requester=Security(get_current_access, scopes=[AccessType.admin, AccessType.user]),
session=Depends(get_db), session=Depends(get_db),
): ):
Expand All @@ -152,4 +157,4 @@ async def fetch_alerts_for_event(
""" """
requested_group_id = await get_entity_group_id(events, event_id) requested_group_id = await get_entity_group_id(events, event_id)
await check_group_read(requester.id, cast(int, requested_group_id)) await check_group_read(requester.id, cast(int, requested_group_id))
return await crud.base.database.fetch_all(query=alerts.select().where(alerts.c.event_id == event_id)) return await crud.fetch_all(alerts, {"event_id": event_id}, limit=limit, offset=offset)
12 changes: 8 additions & 4 deletions src/app/api/endpoints/groups.py
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
# This program is licensed under the Apache License 2.0. # This program is licensed under the Apache License 2.0.
# See LICENSE or go to <https://opensource.org/licenses/Apache-2.0> for full license details. # See LICENSE or go to <https://opensource.org/licenses/Apache-2.0> for full license details.


from typing import List from typing import List, Optional


from fastapi import APIRouter, Path, Security, status from fastapi import APIRouter, Path, Query, Security, status
from typing_extensions import Annotated


from app.api import crud from app.api import crud
from app.api.deps import get_current_access from app.api.deps import get_current_access
Expand Down Expand Up @@ -35,11 +36,14 @@ async def get_group(group_id: int = Path(..., gt=0)):




@router.get("/", response_model=List[GroupOut], summary="Get the list of all groups") @router.get("/", response_model=List[GroupOut], summary="Get the list of all groups")
async def fetch_groups(): async def fetch_groups(
limit: Annotated[int, Query(description="maximum number of items", ge=1, le=1000)] = 50,
offset: Annotated[Optional[int], Query(description="number of items to skip", ge=0)] = None,
):
""" """
Retrieves the list of all groups and their information Retrieves the list of all groups and their information
""" """
return await crud.fetch_all(groups) return await crud.fetch_all(groups, limit=limit, offset=offset)




@router.put("/{group_id}/", response_model=GroupOut, summary="Update information about a specific group") @router.put("/{group_id}/", response_model=GroupOut, summary="Update information about a specific group")
Expand Down
Loading
Loading