diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 91610fb..a80d1e7 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -16,7 +16,6 @@ repos:
         name: black
         entry: black
         language: python
-        language_version: python3.8
         args:
           - .
           - --check
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1cf8ef1..ac3dfb4 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+### Added:
+
+- Support HTTP API v0.7, [PR-36](https://github.com/reduct-storage/reduct-py/pull/36)
+
 ## [0.3.0] - 2022-07-02
 
 ### Added:
diff --git a/pkg/reduct/bucket.py b/pkg/reduct/bucket.py
index 29ef17e..33ea1ec 100644
--- a/pkg/reduct/bucket.py
+++ b/pkg/reduct/bucket.py
@@ -1,9 +1,19 @@
 """Bucket API"""
 import json
-from enum import Enum
-from typing import Optional, List, Tuple, AsyncIterator, Union
 import time
-
+from dataclasses import dataclass
+from enum import Enum
+from typing import (
+    Optional,
+    List,
+    Tuple,
+    AsyncIterator,
+    Union,
+    Callable,
+    Awaitable,
+)
+
+from deprecation import deprecated
 from pydantic import BaseModel
 
 from reduct.http import HttpClient
@@ -86,6 +96,22 @@ class BucketFullInfo(BaseModel):
     """information about entries of bucket"""
 
 
+@dataclass
+class Record:
+    """Record in a query"""
+
+    timestamp: int
+    """UNIX timestamp in microseconds"""
+    size: int
+    """size of data"""
+    last: bool
+    """last record in the query"""
+    read_all: Callable[[None], Awaitable[bytes]]
+    """read all data"""
+    read: Callable[[int], AsyncIterator[bytes]]
+    """read data in chunks"""
+
+
 class Bucket:
     """A bucket of data in Reduct Storage"""
 
@@ -111,7 +137,7 @@ async def set_settings(self, settings: BucketSettings):
         Raises:
             ReductError: if there is an HTTP error
         """
-        await self._http.request("PUT", f"/b/{self.name}", data=settings.json())
+        await self._http.request_all("PUT", f"/b/{self.name}", data=settings.json())
 
     async def info(self) -> BucketInfo:
         """
@@ -139,7 +165,7 @@ async def remove(self):
         Raises:
             ReductError: if there is an HTTP error
         """
-        await self._http.request("DELETE", f"/b/{self.name}")
+        await self._http.request_all("DELETE", f"/b/{self.name}")
 
     async def read(self, entry_name: str, timestamp: Optional[int] = None) -> bytes:
         """
@@ -168,7 +194,8 @@ async def read_by(
         >>>     print(chunk)
         Args:
             entry_name: name of entry in the bucket
-            timestamp: UNIX timestamp in microseconds if None get the latest record
+            timestamp: UNIX timestamp in microseconds
+                if None get the latest record
             chunk_size:
         Returns:
             bytes:
@@ -212,7 +239,7 @@ async def write(
         """
         params = {"ts": timestamp if timestamp else time.time_ns() / 1000}
 
-        await self._http.request(
+        await self._http.request_all(
             "POST",
             f"/b/{self.name}/{entry_name}",
             params=params,
@@ -220,6 +247,9 @@
             content_length=content_length if content_length else len(data),
         )
 
+    @deprecated(
+        deprecated_in="0.4.0", removed_in="1.0.0", details="Use Bucket.query instead"
+    )
     async def list(
         self, entry_name: str, start: int, stop: int
     ) -> List[Tuple[int, int]]:
@@ -236,7 +266,7 @@
                 has time stamp (first element) of a record and its size in bytes
         """
         params = {"start": start, "stop": stop}
-        data = await self._http.request(
+        data = await self._http.request_all(
             "GET",
             f"/b/{self.name}/{entry_name}/list",
             params=params,
@@ -245,7 +275,64 @@
         items = [(int(record["ts"]), int(record["size"])) for record in records]
         return items
 
+    async def query(
+        self,
+        entry_name: str,
+        start: Optional[int] = None,
+        stop: Optional[int] = None,
+        ttl: Optional[int] = None,
+    ) -> AsyncIterator[Record]:
+        """
+        Query data for a time interval
+
+        >>> async for record in bucket.query("entry-1", stop=time.time_ns() / 1000):
+        >>>     data: bytes = await record.read_all()
+        >>>     # or
+        >>>     async for chunk in record.read(n=1024):
+        >>>         print(chunk)
+        Args:
+            entry_name: name of entry in the bucket
+            start: the beginning of the time interval
+            stop: the end of the time interval
+            ttl: Time To Live of the request in seconds
+
+        Returns:
+            AsyncIterator[Record]: iterator over the records
+        """
+        params = {}
+        if start:
+            params["start"] = start
+        if stop:
+            params["stop"] = stop
+        if ttl:
+            params["ttl"] = ttl
+
+        url = f"/b/{self.name}/{entry_name}"
+        data = await self._http.request_all(
+            "GET",
+            f"{url}/q",
+            params=params,
+        )
+        query_id = json.loads(data)["id"]
+        last = False
+        while not last:
+            async with self._http.request("GET", f"{url}?q={query_id}") as resp:
+                if resp.status == 202:
+                    return
+
+                timestamp = int(resp.headers["x-reduct-time"])
+                size = int(resp.headers["content-length"])
+                last = int(resp.headers["x-reduct-last"]) != 0
+
+                yield Record(
+                    timestamp=timestamp,
+                    size=size,
+                    last=last,
+                    read_all=resp.read,
+                    read=resp.content.iter_chunked,
+                )
+
     async def __get_full_info(self) -> BucketFullInfo:
         return BucketFullInfo.parse_raw(
-            await self._http.request("GET", f"/b/{self.name}")
+            await self._http.request_all("GET", f"/b/{self.name}")
         )
diff --git a/pkg/reduct/client.py b/pkg/reduct/client.py
index 799d228..85b195c 100644
--- a/pkg/reduct/client.py
+++ b/pkg/reduct/client.py
@@ -76,7 +76,7 @@ async def info(self) -> ServerInfo:
         Raises:
             ReductError: if there is an HTTP error
         """
-        return ServerInfo.parse_raw(await self._http.request("GET", "/info"))
+        return ServerInfo.parse_raw(await self._http.request_all("GET", "/info"))
 
     async def list(self) -> List[BucketInfo]:
         """
@@ -87,7 +87,9 @@ async def list(self) -> List[BucketInfo]:
         Raises:
             ReductError: if there is an HTTP error
         """
-        return BucketList.parse_raw(await self._http.request("GET", "/list")).buckets
+        return BucketList.parse_raw(
+            await self._http.request_all("GET", "/list")
+        ).buckets
 
     async def get_bucket(self, name: str) -> Bucket:
         """
@@ -99,7 +101,7 @@ async def get_bucket(self, name: str) -> Bucket:
         Raises:
             ReductError: if there is an HTTP error
         """
-        await self._http.request("HEAD", f"/b/{name}")
+        await self._http.request_all("HEAD", f"/b/{name}")
         return Bucket(name, self._http)
 
     async def create_bucket(
@@ -120,7 +122,7 @@ async def create_bucket(
         """
         data = settings.json() if settings else None
         try:
-            await self._http.request("POST", f"/b/{name}", data=data)
+            await self._http.request_all("POST", f"/b/{name}", data=data)
         except ReductError as err:
             if err.status_code != 409 or not exist_ok:
                 raise err
diff --git a/pkg/reduct/http.py b/pkg/reduct/http.py
index ad5bc33..bf6a6d1 100644
--- a/pkg/reduct/http.py
+++ b/pkg/reduct/http.py
@@ -1,10 +1,10 @@
 """Internal HTTP helper"""
-import hashlib
 import json
+from contextlib import asynccontextmanager
 from typing import Optional, AsyncIterator
 
 import aiohttp
-from aiohttp import ClientTimeout
+from aiohttp import ClientTimeout, ClientResponse
 
 from reduct.error import ReductError
 
@@ -20,10 +20,9 @@ def __init__(
         self.headers = {}
         self.timeout = ClientTimeout(timeout)
 
-    async def request_by(
-        self, method: str, path: str = "", chunk_size=1024, **kwargs
-    ) -> AsyncIterator[bytes]:
-        """HTTP request with ReductError exception by chunks"""
+    @asynccontextmanager
+    async def request(self, method: str, path: str = "", **kwargs) -> ClientResponse:
+        """HTTP request with ReductError exception"""
         extra_headers = {}
 
         if "content_length" in kwargs:
@@ -40,16 +39,14 @@
                 ) as response:
 
                     if response.ok:
-                        async for chunk in response.content.iter_chunked(chunk_size):
-                            yield chunk
-                        return  # Success
+                        yield response
+                        break
 
                     if response.status == 401:
                         # Authentication issue, try to refresh token and repeat request
-                        hasher = hashlib.sha256(bytes(self.api_token, "utf-8"))
                         async with session.post(
                             f"{self.url}/auth/refresh",
-                            headers={"Authorization": f"Bearer {hasher.hexdigest()}"},
+                            headers={"Authorization": f"Bearer {self.api_token}"},
                         ) as auth_resp:
                             if auth_resp.status == 200:
                                 data = json.loads(await auth_resp.read())
@@ -60,9 +57,16 @@
 
                     raise ReductError(response.status, await response.text())
 
-    async def request(self, method: str, path: str = "", **kwargs) -> bytes:
+    async def request_all(self, method: str, path: str = "", **kwargs) -> bytes:
+        """Http request"""
+        async with self.request(method, path, **kwargs) as response:
+            return await response.read()
+
+    async def request_by(
+        self, method: str, path: str = "", chunk_size=1024, **kwargs
+    ) -> AsyncIterator[bytes]:
         """Http request"""
-        blob = b""
-        async for chunk in self.request_by(method, path, chunk_size=1024, **kwargs):
-            blob += chunk
-        return blob
+        async with self.request(method, path, **kwargs) as response:
+            async for chunk in response.content.iter_chunked(chunk_size):
+                yield chunk
+            return
diff --git a/setup.py b/setup.py
index 93c7bb7..c74c184 100644
--- a/setup.py
+++ b/setup.py
@@ -9,7 +9,7 @@
 PACKAGE_NAME = "reduct-py"
 
 MAJOR_VERSION = 0
-MINOR_VERSION = 3
+MINOR_VERSION = 4
 PATCH_VERSION = 0
 
 HERE = Path(__file__).parent.resolve()
@@ -48,7 +48,7 @@ def get_long_description(base_path: Path):
     package_data={"": ["VERSION"]},
     packages=find_packages(where="pkg"),
     python_requires=">=3.8",
-    install_requires=["aiohttp~=3.8", "pydantic~=1.9"],
+    install_requires=["aiohttp~=3.8", "pydantic~=1.9", "deprecation~=2.1"],
     extras_require={
         "test": ["pytest~=7.1", "pytest-asyncio~=0.18"],
         "lint": [
diff --git a/tests/bucket_test.py b/tests/bucket_test.py
index 166427b..98303ee 100644
--- a/tests/bucket_test.py
+++ b/tests/bucket_test.py
@@ -1,9 +1,11 @@
 """Tests for Bucket"""
 import time
+from typing import List
 
 import pytest
 
 from reduct import ReductError, BucketSettings, QuotaType
+from reduct.bucket import Record
 
 
 @pytest.mark.asyncio
@@ -135,3 +137,64 @@ async def test__list(bucket_1):
     """Should get list of records for time interval"""
     records = await bucket_1.list("entry-2", start=0, stop=5_000_000)
     assert records == [(3000000, 11), (4000000, 11)]
+
+
+@pytest.mark.asyncio
+async def test_query_records(bucket_1):
+    """Should query records for a time interval"""
+    records: List[Record] = [
+        record
+        async for record in bucket_1.query("entry-2", start=0, stop=5_000_000, ttl=5)
+    ]
+    assert len(records) == 2
+
+    assert records[0].timestamp == 3000000
+    assert records[0].size == 11
+    assert not records[0].last
+
+    assert records[1].timestamp == 4000000
+    assert records[1].size == 11
+    assert records[1].last
+
+
+@pytest.mark.asyncio
+async def test_query_records_first(bucket_1):
+    """Should query records from the first record"""
+
+    records: List[Record] = [
+        record async for record in bucket_1.query("entry-2", stop=4_000_000)
+    ]
+    assert len(records) == 1
+    assert records[0].timestamp == 3_000_000
+
+
+@pytest.mark.asyncio
+async def test_query_records_last(bucket_1):
+    """Should query records until the last record"""
+    records: List[Record] = [
+        record async for record in bucket_1.query("entry-2", start=4_000_000)
+    ]
+    assert len(records) == 1
+    assert records[0].timestamp == 4_000_000
+
+
+@pytest.mark.asyncio
+async def test_query_records_all(bucket_1):
+    """Should query all records"""
+    records = [record async for record in bucket_1.query("entry-2")]
+    assert len(records) == 2
+
+
+@pytest.mark.asyncio
+async def test_read_record(bucket_1):
+    """Should provide records with read method"""
+    data = [await record.read_all() async for record in bucket_1.query("entry-2")]
+    assert data == [b"some-data-3", b"some-data-4"]
+
+    data = []
+
+    async for record in bucket_1.query("entry-2"):
+        async for chunk in record.read(n=4):
+            data.append(chunk)
+
+    assert data == [b"some", b"-dat", b"a-3", b"some", b"-dat", b"a-4"]
diff --git a/tests/client_test.py b/tests/client_test.py
index 5d0cb6e..bc8e693 100644
--- a/tests/client_test.py
+++ b/tests/client_test.py
@@ -32,7 +32,7 @@ async def test__info(client):
     await sleep(1)
 
     info: ServerInfo = await client.info()
-    assert info.version >= "0.6.0"
+    assert info.version >= "0.7.0"
     assert info.uptime >= 1
     assert info.bucket_count == 2
     assert info.usage == 66
@@ -40,7 +40,7 @@
     assert info.latest_record == 6_000_000
 
     assert info.defaults.bucket.dict() == {
-        "max_block_size": 67108864,
+        "max_block_size": 64000000,
         "max_block_records": 1024,
         "quota_size": 0,
         "quota_type": QuotaType.NONE,
@@ -62,7 +62,7 @@ async def test__create_bucket_default_settings(bucket_1):
     """Should create a bucket with default settings"""
    settings = await bucket_1.get_settings()
     assert settings.dict() == {
-        "max_block_size": 67108864,
+        "max_block_size": 64000000,
         "max_block_records": 1024,
         "quota_size": 0,
         "quota_type": QuotaType.NONE,
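
For orientation, here is a minimal sketch of how client code could consume the new Bucket.query API added above: the method opens a query with a GET to the entry's /q endpoint, then fetches records one by one until a 202 response or the x-reduct-last header ends the iteration, yielding Record objects whose read_all/read callbacks wrap the underlying aiohttp response. The sketch assumes a running Reduct Storage server and that Client is constructed from a URL as in the project README; the server URL, bucket name, and entry name ("http://127.0.0.1:8383", "demo", "entry-1") are hypothetical placeholders.

    import asyncio
    import time

    from reduct import Client  # assumed public export of the client class


    async def main():
        client = Client("http://127.0.0.1:8383")   # hypothetical server URL
        bucket = await client.get_bucket("demo")   # hypothetical bucket name

        # Query all records up to "now"; timestamps are UNIX microseconds.
        async for record in bucket.query("entry-1", stop=int(time.time_ns() / 1000), ttl=60):
            data = await record.read_all()         # read the whole record body at once
            print(record.timestamp, record.size, record.last, len(data))

            # Alternatively, stream a record in fixed-size chunks:
            # async for chunk in record.read(n=1024):
            #     ...


    asyncio.run(main())

Because each Record is yielded while its HTTP response is still open, data should be read inside the async for body, as the deprecated Bucket.list plus Bucket.read pattern is replaced by this single streaming pass.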