Skip to content

Commit

Permalink
Adds support for changes endpoint (#41)
Browse files Browse the repository at this point in the history
This endpoint allows receiving events when the database documents get created, changed, or deleted.
  • Loading branch information
bmario committed Jan 19, 2022
1 parent 017296c commit c207b13
Show file tree
Hide file tree
Showing 14 changed files with 522 additions and 26 deletions.
38 changes: 34 additions & 4 deletions aiocouch/database.py
Expand Up @@ -34,7 +34,6 @@
AsyncContextManager,
AsyncGenerator,
Callable,
Dict,
List,
Optional,
TypeVar,
Expand All @@ -44,13 +43,14 @@
from .bulk import BulkCreateOperation, BulkUpdateOperation
from .design_document import DesignDocument
from .document import Document, SecurityDocument
from .event import BaseChangeEvent, ChangedEvent, DeletedEvent
from .exception import ConflictError, NotFoundError
from .remote import RemoteDatabase
from .request import FindRequest
from .typing import JsonDict
from .view import AllDocsView, View

FuncT = TypeVar("FuncT", bound=Callable[..., Any])
JsonDict = Dict[str, Any]


def _returns_async_context_manager(f: FuncT) -> FuncT:
Expand Down Expand Up @@ -278,11 +278,15 @@ async def __getitem__(self, id: str) -> Document:
"""
return await self.get(id)

async def get(self, id: str, default: Optional[JsonDict] = None) -> Document:
async def get(
self, id: str, default: Optional[JsonDict] = None, *, rev: Optional[str] = None
) -> Document:
"""Returns the document with the given id
:raises `~aiocouch.NotFoundError`: if the given document does not exist and
`default` is `None`
:raises `~aiocouch.BadRequestError`: if the given rev of the document is
invalid or missing
:param id: the name of the document
Expand All @@ -291,13 +295,15 @@ async def get(self, id: str, default: Optional[JsonDict] = None) -> Document:
`default` as its contents, is returned. To create the document on the
server, :meth:`~aiocouch.document.Document.save` has to be called on the
returned instance.
:param rev: The requested rev of the document. The requested rev might not
or not anymore exist on the connected server.
:return: a local representation of the requested document
"""
doc = Document(self, id, data=default)

try:
await doc.fetch(discard_changes=True)
await doc.fetch(discard_changes=True, rev=rev)
except NotFoundError as e:
if default is None:
raise e
Expand All @@ -319,3 +325,27 @@ async def info(self) -> JsonDict:
"""
return await self._get()

async def changes(
self, last_event_id: Optional[str] = None, **params: Any
) -> AsyncGenerator[BaseChangeEvent, None]:
"""Listens for events made to documents of this database
This will return :class:`~aiocouch.event.DeletedEvent` and
:class:`~aiocouch.event.ChangedEvent` for deleted and modified
documents, respectively.
See also :ref:`/db/_changes<couchdb:api/db/changes>`.
For convinience, the ``last-event-id`` parameter can also be passed
as ``last_event_id``.
"""
if last_event_id and "last-event-id" not in params:
params["last-event-id"] = last_event_id

async for json in self._changes(**params):
if "deleted" in json and json["deleted"] is True:
yield DeletedEvent(json=json)
else:
yield ChangedEvent(database=self, json=json)
28 changes: 13 additions & 15 deletions aiocouch/document.py
Expand Up @@ -31,26 +31,15 @@
import json
from contextlib import suppress
from types import TracebackType
from typing import (
Any,
Dict,
ItemsView,
KeysView,
List,
Optional,
Type,
ValuesView,
cast,
)
from typing import Any, ItemsView, KeysView, List, Optional, Type, ValuesView, cast

from deprecated import deprecated

from . import database
from .attachment import Attachment
from .exception import ConflictError, ForbiddenError, NotFoundError, raises
from .remote import RemoteDocument

JsonDict = Dict[str, Any]
from .typing import JsonDict


class Document(RemoteDocument):
Expand Down Expand Up @@ -109,25 +98,34 @@ def _dirty_cache(self) -> bool:
json.dumps(self._data, sort_keys=True)
)

async def fetch(self, discard_changes: bool = False) -> None:
async def fetch(
self, discard_changes: bool = False, *, rev: Optional[str] = None
) -> None:
"""Retrieves the document data from the server
Fetching the document will retrieve the data from the server using a network
request and update the local data.
:raises ~aiocouch.ConflictError: if the local data has changed without saving
:raises ~aiocouch.BadRequestError: if the given rev is invalid or missing
:param discard_changes: If set to `True`, the local data object will the
overridden with the retrieved content. If the local data was changed, no
exception will be raised.
:param rev: The requested rev of the document. The requested rev might not
or not anymore exist on the connected server.
"""
if self._dirty_cache and not (discard_changes or self._fresh):
raise ConflictError(
f"Cannot fetch document '{self.id}' from server, "
"as the local cache has unsaved changes."
)
self._update_cache(await self._get())

if rev:
self._update_cache(await self._get(rev=rev))
else:
self._update_cache(await self._get())

async def save(self) -> None:
"""Saves the current state to the CouchDB server
Expand Down
91 changes: 91 additions & 0 deletions aiocouch/event.py
@@ -0,0 +1,91 @@
# Copyright (c) 2021, ZIH,
# Technische Universitaet Dresden,
# Federal Republic of Germany
#
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of metricq nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


from typing import cast

from attr import dataclass

from . import database as db
from . import document
from .typing import JsonDict


@dataclass
class BaseChangeEvent:
"""The base event for shared properties"""

json: JsonDict
"""The raw data of the event as JSON"""

@property
def id(self) -> str:
"""Returns the id of the document"""
return cast(str, self.json["id"])

@property
def rev(self) -> str:
"""Returns the new rev of the document"""
return cast(str, self.json["changes"][0]["rev"])

@property
def sequence(self) -> str:
"""Returns the sequence identifier of the event"""
return cast(str, self.json["seq"])


class DeletedEvent(BaseChangeEvent):
"""This event denotes that the document got deleted"""

pass


@dataclass
class ChangedEvent(BaseChangeEvent):
"""This event denotes that the document got modified"""

database: "db.Database"
"""The database for reference"""

async def doc(self) -> "document.Document":
"""Returns the document after the change
If the ``include_docs`` was set, this will use the data provided in the received event.
Otherwise, the document is fetched from the server.
"""
try:
# if in the request include_docs was given, we can create the
# document on the spot...
return document.Document(
self.database, self.json["doc"]["_id"], self.json["doc"]
)
except KeyError:
# ...otherwise, we fetch the document contents from the server
return await self.database.get(self.id, rev=self.rev)
33 changes: 32 additions & 1 deletion aiocouch/exception.py
Expand Up @@ -30,7 +30,17 @@

import functools
from contextlib import suppress
from typing import Any, Callable, Dict, NoReturn, Optional, Type, TypeVar, cast
from typing import (
Any,
AsyncGenerator,
Callable,
Dict,
NoReturn,
Optional,
Type,
TypeVar,
cast,
)

import aiohttp
from typing_extensions import Protocol
Expand Down Expand Up @@ -163,3 +173,24 @@ async def wrapper(endpoint: Endpoint, *args: Any, **kwargs: Any) -> Any:
return cast(FuncT, wrapper)

return decorator_raises


def generator_raises(
status: int, message: str, exception_type: Optional[Type[Exception]] = None
) -> Callable[[FuncT], FuncT]:
def decorator_raises(func: FuncT) -> FuncT:
@functools.wraps(func)
async def wrapper(
endpoint: Endpoint, *args: Any, **kwargs: Any
) -> AsyncGenerator[Any, None]:
try:
async for data in func(endpoint, *args, **kwargs):
yield data
except aiohttp.ClientResponseError as exception:
if status == exception.status:
raise_for_endpoint(endpoint, message, exception, exception_type)
raise exception

return cast(FuncT, wrapper)

return decorator_raises
44 changes: 41 additions & 3 deletions aiocouch/remote.py
Expand Up @@ -29,15 +29,17 @@
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import asyncio
from typing import Any, Dict, List, Optional, Tuple, Union, cast
import json
from contextlib import suppress
from typing import Any, AsyncGenerator, List, Optional, Tuple, Union, cast
from urllib.parse import quote

import aiohttp

from . import database, document
from .exception import NotFoundError, raises
from .exception import NotFoundError, generator_raises, raises
from .typing import JsonDict

JsonDict = Dict[str, Any]
RequestResult = Tuple[JsonDict, Union[bytes, JsonDict]]


Expand Down Expand Up @@ -145,6 +147,26 @@ async def _request(
await resp.json() if return_json else await resp.read(),
)

async def _streamed_request(
self,
method: str,
path: str,
params: Optional[JsonDict] = None,
**kwargs: Any,
) -> AsyncGenerator[JsonDict, None]:
kwargs["params"] = _stringify_params(params) if params else {}
kwargs.setdefault("timeout", aiohttp.ClientTimeout())

async with self._http_session.request(
method, url=f"{self._server}{path}", **kwargs
) as resp:
resp.raise_for_status()

async for line in resp.content:
# this should only happen for empty lines
with suppress(json.JSONDecodeError):
yield json.loads(line)

@raises(401, "Invalid credentials")
async def _all_dbs(self, **params: Any) -> List[str]:
_, json = await self._get("/_all_dbs", params)
Expand Down Expand Up @@ -271,6 +293,22 @@ async def _put_security(self, doc: JsonDict) -> JsonDict:
assert not isinstance(json, bytes)
return json

@generator_raises(400, "Invalid request")
async def _changes(self, **params: Any) -> AsyncGenerator[JsonDict, None]:
if "feed" in params and params["feed"] == "continuous":
params.setdefault("heartbeat", True)
async for data in self._remote._streamed_request(
"GET", f"{self.endpoint}/_changes", params=params
):
yield data
else:
_, json = await self._remote._get(
f"{self.endpoint}/_changes", params=params
)
assert not isinstance(json, bytes)
for result in json["results"]:
yield result


class RemoteDocument:
def __init__(self, database: "database.Database", id: str):
Expand Down
35 changes: 35 additions & 0 deletions aiocouch/typing.py
@@ -0,0 +1,35 @@
# Copyright (c) 2021, ZIH,
# Technische Universitaet Dresden,
# Federal Republic of Germany
#
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of metricq nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

from typing import Any, Dict, List, Union

JsonDict = Dict[str, Any]
JsonList = List[Any]
Json = Union[JsonDict, JsonList]

0 comments on commit c207b13

Please sign in to comment.