[#35] Support HTTP API v0.7 (#36)
* remove hash for api token

* update tests for api 0.7

* bump version

* make Bucket.list deprecated

* refactor http wrapper

* fix pylint

* query records

* implement Bucket.query

* fix pylint

* update CHANGELOG

* add docs for Bucket.query
atimin committed Aug 15, 2022
1 parent 6ce2bd9 commit 9c318f7
Showing 8 changed files with 194 additions and 35 deletions.
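In practice, the headline change is a streaming Bucket.query API that replaces the now-deprecated Bucket.list. The following is a minimal usage sketch, assuming a reachable Reduct Storage instance at a placeholder URL, a Client class exported at the top level of the reduct package, and hypothetical bucket and entry names:

```python
import asyncio
import time

from reduct import Client  # assumption: Client is re-exported at package level


async def main():
    # Placeholder URL and names, for illustration only.
    client = Client("http://127.0.0.1:8383")
    bucket = await client.get_bucket("demo-bucket")

    # Stream records for a time interval (timestamps are UNIX microseconds).
    async for record in bucket.query("entry-1", start=0, stop=time.time_ns() // 1000, ttl=5):
        print(record.timestamp, record.size, record.last)
        data = await record.read_all()  # buffer the whole record before advancing
        print(len(data))


asyncio.run(main())
```

Chunked consumption via record.read(n) works the same way, as the Bucket.query docstring in pkg/reduct/bucket.py below shows.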
1 change: 0 additions & 1 deletion .pre-commit-config.yaml
@@ -16,7 +16,6 @@ repos:
name: black
entry: black
language: python
language_version: python3.8
args:
- .
- --check
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added:

- Support HTTP API v0.7, [PR-36](https://github.com/reduct-storage/reduct-py/pull/36)

## [0.3.0] - 2022-07-02

### Added:
105 changes: 96 additions & 9 deletions pkg/reduct/bucket.py
@@ -1,9 +1,19 @@
"""Bucket API"""
import json
- from enum import Enum
- from typing import Optional, List, Tuple, AsyncIterator, Union
import time

from dataclasses import dataclass
from enum import Enum
from typing import (
Optional,
List,
Tuple,
AsyncIterator,
Union,
Callable,
Awaitable,
)

from deprecation import deprecated
from pydantic import BaseModel

from reduct.http import HttpClient
@@ -86,6 +96,22 @@ class BucketFullInfo(BaseModel):
"""information about entries of bucket"""


@dataclass
class Record:
"""Record in a query"""

timestamp: int
"""UNIX timestamp in microsecods"""
size: int
"""size of data"""
last: bool
"""last record in the query"""
read_all: Callable[[], Awaitable[bytes]]
"""read all data"""
read: Callable[[int], AsyncIterator[bytes]]
"""read data in chunks"""


class Bucket:
"""A bucket of data in Reduct Storage"""

@@ -111,7 +137,7 @@ async def set_settings(self, settings: BucketSettings):
Raises:
ReductError: if there is an HTTP error
"""
await self._http.request("PUT", f"/b/{self.name}", data=settings.json())
await self._http.request_all("PUT", f"/b/{self.name}", data=settings.json())

async def info(self) -> BucketInfo:
"""
@@ -139,7 +165,7 @@ async def remove(self):
Raises:
ReductError: if there is an HTTP error
"""
await self._http.request("DELETE", f"/b/{self.name}")
await self._http.request_all("DELETE", f"/b/{self.name}")

async def read(self, entry_name: str, timestamp: Optional[int] = None) -> bytes:
"""
@@ -168,7 +194,8 @@ async def read_by(
>>> print(chunk)
Args:
entry_name: name of entry in the bucket
- timestamp: UNIX timestamp in microseconds if None get the latest record
+ timestamp: UNIX timestamp in microseconds
+ if None get the latest record
chunk_size:
Returns:
bytes:
@@ -212,14 +239,17 @@ async def write(
"""
params = {"ts": timestamp if timestamp else time.time_ns() / 1000}

- await self._http.request(
+ await self._http.request_all(
"POST",
f"/b/{self.name}/{entry_name}",
params=params,
data=data,
content_length=content_length if content_length else len(data),
)

@deprecated(
deprecated_in="0.4.0", removed_in="1.0.0", details="Use Bucket.query instead"
)
async def list(
self, entry_name: str, start: int, stop: int
) -> List[Tuple[int, int]]:
@@ -236,7 +266,7 @@ async def list(
has time stamp (first element) of a record and its size in bytes
"""
params = {"start": start, "stop": stop}
- data = await self._http.request(
+ data = await self._http.request_all(
"GET",
f"/b/{self.name}/{entry_name}/list",
params=params,
@@ -245,7 +275,64 @@
items = [(int(record["ts"]), int(record["size"])) for record in records]
return items

async def query(
self,
entry_name: str,
start: Optional[int] = None,
stop: Optional[int] = None,
ttl: Optional[int] = None,
) -> AsyncIterator[Record]:
"""
Query data for a time interval
>>> async for record in bucket.query("entry-1", stop=time.time_ns() / 1000):
>>> data: bytes = await record.read_all()
>>> # or
>>> async for chunk in record.read(n=1024):
>>> print(chunk)
Args:
entry_name: name of entry in the bucket
start: the beginning of the time interval
stop: the end of the time interval
ttl: Time To Live of the request in seconds
Returns:
AsyncIterator[Record]: iterator to the records
"""
params = {}
if start:
params["start"] = start
if stop:
params["stop"] = stop
if ttl:
params["ttl"] = ttl

url = f"/b/{self.name}/{entry_name}"
data = await self._http.request_all(
"GET",
f"{url}/q",
params=params,
)
query_id = json.loads(data)["id"]
last = False
while not last:
async with self._http.request("GET", f"{url}?q={query_id}") as resp:
if resp.status == 202:
return

timestamp = int(resp.headers["x-reduct-time"])
size = int(resp.headers["content-length"])
last = int(resp.headers["x-reduct-last"]) != 0

yield Record(
timestamp=timestamp,
size=size,
last=last,
read_all=resp.read,
read=resp.content.iter_chunked,
)

async def __get_full_info(self) -> BucketFullInfo:
return BucketFullInfo.parse_raw(
await self._http.request("GET", f"/b/{self.name}")
await self._http.request_all("GET", f"/b/{self.name}")
)
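Since Bucket.list is deprecated in favour of Bucket.query, existing callers can be migrated with a thin shim. The helper below is a hypothetical example, not library API; it rebuilds the old (timestamp, size) tuple list from the new record stream:

```python
from typing import List, Tuple

from reduct.bucket import Bucket


async def list_via_query(
    bucket: Bucket, entry_name: str, start: int, stop: int
) -> List[Tuple[int, int]]:
    """Reproduce the deprecated Bucket.list output on top of Bucket.query."""
    return [
        (record.timestamp, record.size)
        async for record in bucket.query(entry_name, start=start, stop=stop)
    ]
```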
10 changes: 6 additions & 4 deletions pkg/reduct/client.py
@@ -76,7 +76,7 @@ async def info(self) -> ServerInfo:
Raises:
ReductError: if there is an HTTP error
"""
- return ServerInfo.parse_raw(await self._http.request("GET", "/info"))
+ return ServerInfo.parse_raw(await self._http.request_all("GET", "/info"))

async def list(self) -> List[BucketInfo]:
"""
@@ -87,7 +87,9 @@ async def list(self) -> List[BucketInfo]:
Raises:
ReductError: if there is an HTTP error
"""
- return BucketList.parse_raw(await self._http.request("GET", "/list")).buckets
+ return BucketList.parse_raw(
+ await self._http.request_all("GET", "/list")
+ ).buckets

async def get_bucket(self, name: str) -> Bucket:
"""
@@ -99,7 +101,7 @@ async def get_bucket(self, name: str) -> Bucket:
Raises:
ReductError: if there is an HTTP error
"""
await self._http.request("HEAD", f"/b/{name}")
await self._http.request_all("HEAD", f"/b/{name}")
return Bucket(name, self._http)

async def create_bucket(
@@ -120,7 +122,7 @@ async def create_bucket(
"""
data = settings.json() if settings else None
try:
await self._http.request("POST", f"/b/{name}", data=data)
await self._http.request_all("POST", f"/b/{name}", data=data)
except ReductError as err:
if err.status_code != 409 or not exist_ok:
raise err
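The client now funnels every call through HttpClient.request_all. Around create_bucket, the 409 handling makes the call idempotent when exist_ok is set; a rough get-or-create sketch, assuming create_bucket exposes the exist_ok flag referenced in the except clause above:

```python
from reduct import Client, ReductError


async def ensure_bucket(client: Client, name: str):
    """Get-or-create sketch: HTTP 409 (already exists) is tolerated with exist_ok=True."""
    try:
        await client.create_bucket(name, exist_ok=True)
    except ReductError as err:
        # Any status other than a tolerated 409 still surfaces here.
        print(f"cannot create bucket {name}: {err}")
        raise
    return await client.get_bucket(name)
```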
36 changes: 20 additions & 16 deletions pkg/reduct/http.py
@@ -1,10 +1,10 @@
"""Internal HTTP helper"""
- import hashlib
import json
+ from contextlib import asynccontextmanager
from typing import Optional, AsyncIterator

import aiohttp
- from aiohttp import ClientTimeout
+ from aiohttp import ClientTimeout, ClientResponse

from reduct.error import ReductError

@@ -20,10 +20,9 @@ def __init__(
self.headers = {}
self.timeout = ClientTimeout(timeout)

- async def request_by(
- self, method: str, path: str = "", chunk_size=1024, **kwargs
- ) -> AsyncIterator[bytes]:
- """HTTP request with ReductError exception by chunks"""
+ @asynccontextmanager
+ async def request(self, method: str, path: str = "", **kwargs) -> ClientResponse:
+ """HTTP request with ReductError exception"""

extra_headers = {}
if "content_length" in kwargs:
@@ -40,16 +39,14 @@ async def request_by(
) as response:

if response.ok:
- async for chunk in response.content.iter_chunked(chunk_size):
- yield chunk
- return # Success
+ yield response
+ break

if response.status == 401:
# Authentication issue, try to refresh token and repeat request
- hasher = hashlib.sha256(bytes(self.api_token, "utf-8"))
async with session.post(
f"{self.url}/auth/refresh",
headers={"Authorization": f"Bearer {hasher.hexdigest()}"},
headers={"Authorization": f"Bearer {self.api_token}"},
) as auth_resp:
if auth_resp.status == 200:
data = json.loads(await auth_resp.read())
@@ -60,9 +57,16 @@

raise ReductError(response.status, await response.text())

- async def request(self, method: str, path: str = "", **kwargs) -> bytes:
+ async def request_all(self, method: str, path: str = "", **kwargs) -> bytes:
"""Http request"""
- blob = b""
- async for chunk in self.request_by(method, path, chunk_size=1024, **kwargs):
- blob += chunk
- return blob
+ async with self.request(method, path, **kwargs) as response:
+ return await response.read()
+
+ async def request_by(
+ self, method: str, path: str = "", chunk_size=1024, **kwargs
+ ) -> AsyncIterator[bytes]:
+ """Http request"""
+ async with self.request(method, path, **kwargs) as response:
+ async for chunk in response.content.iter_chunked(chunk_size):
+ yield chunk
+ return
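The refactor splits the old all-in-one request coroutine into a context-managed request plus request_all (buffer everything) and request_by (stream chunks). A stripped-down standalone sketch of that shape follows; the function names mirror the wrapper, but the session handling and the raise_for_status stand-in for ReductError are illustrative only:

```python
from contextlib import asynccontextmanager
from typing import AsyncIterator

import aiohttp


@asynccontextmanager
async def request(method: str, url: str, **kwargs) -> AsyncIterator[aiohttp.ClientResponse]:
    """Low-level request: the caller works with the live response inside a context."""
    async with aiohttp.ClientSession() as session:
        async with session.request(method, url, **kwargs) as response:
            response.raise_for_status()  # stand-in for raising ReductError
            yield response


async def request_all(method: str, url: str, **kwargs) -> bytes:
    """Buffer the whole body, like HttpClient.request_all."""
    async with request(method, url, **kwargs) as response:
        return await response.read()


async def request_by(
    method: str, url: str, chunk_size: int = 1024, **kwargs
) -> AsyncIterator[bytes]:
    """Stream the body chunk by chunk, like HttpClient.request_by."""
    async with request(method, url, **kwargs) as response:
        async for chunk in response.content.iter_chunked(chunk_size):
            yield chunk
```

Keeping the response open behind an async context manager is what lets Bucket.query hand out read/read_all callables that stream straight from the open response.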
4 changes: 2 additions & 2 deletions setup.py
Expand Up @@ -9,7 +9,7 @@

PACKAGE_NAME = "reduct-py"
MAJOR_VERSION = 0
- MINOR_VERSION = 3
+ MINOR_VERSION = 4
PATCH_VERSION = 0

HERE = Path(__file__).parent.resolve()
@@ -48,7 +48,7 @@ def get_long_description(base_path: Path):
package_data={"": ["VERSION"]},
packages=find_packages(where="pkg"),
python_requires=">=3.8",
- install_requires=["aiohttp~=3.8", "pydantic~=1.9"],
+ install_requires=["aiohttp~=3.8", "pydantic~=1.9", "deprecation~=2.1"],
extras_require={
"test": ["pytest~=7.1", "pytest-asyncio~=0.18"],
"lint": [
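setup.py now pulls in the deprecation package, which backs the @deprecated marker on Bucket.list. A small sketch of how such a marker surfaces to callers, assuming the package's documented behaviour of issuing a DeprecationWarning subclass when the wrapped function is called (the decorated old_api here is illustrative, not part of reduct-py):

```python
import warnings

from deprecation import deprecated


@deprecated(deprecated_in="0.4.0", removed_in="1.0.0", details="Use new_api instead")
def old_api() -> str:
    return "still works, but warns"


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")  # DeprecationWarning is hidden by default
    old_api()

for warning in caught:
    print(warning.category.__name__, warning.message)
```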
63 changes: 63 additions & 0 deletions tests/bucket_test.py
@@ -1,9 +1,11 @@
"""Tests for Bucket"""
import time
from typing import List

import pytest

from reduct import ReductError, BucketSettings, QuotaType
from reduct.bucket import Record


@pytest.mark.asyncio
@@ -135,3 +137,64 @@ async def test__list(bucket_1):
"""Should get list of records for time interval"""
records = await bucket_1.list("entry-2", start=0, stop=5_000_000)
assert records == [(3000000, 11), (4000000, 11)]


@pytest.mark.asyncio
async def test_query_records(bucket_1):
"""Should query records for a time interval"""
records: List[Record] = [
record
async for record in bucket_1.query("entry-2", start=0, stop=5_000_000, ttl=5)
]
assert len(records) == 2

assert records[0].timestamp == 3000000
assert records[0].size == 11
assert not records[0].last

assert records[1].timestamp == 4000000
assert records[1].size == 11
assert records[1].last


@pytest.mark.asyncio
async def test_query_records_first(bucket_1):
"""Should query records for from first record"""

records: List[Record] = [
record async for record in bucket_1.query("entry-2", stop=4_000_000)
]
assert len(records) == 1
assert records[0].timestamp == 3_000_000


@pytest.mark.asyncio
async def test_query_records_last(bucket_1):
"""Should query records for until last record"""
records: List[Record] = [
record async for record in bucket_1.query("entry-2", start=4_000_000)
]
assert len(records) == 1
assert records[0].timestamp == 4_000_000


@pytest.mark.asyncio
async def test_query_records_all(bucket_1):
"""Should query records all data"""
records = [record async for record in bucket_1.query("entry-2")]
assert len(records) == 2


@pytest.mark.asyncio
async def test_read_record(bucket_1):
"""Should provide records with read method"""
data = [await record.read_all() async for record in bucket_1.query("entry-2")]
assert data == [b"some-data-3", b"some-data-4"]

data = []

async for record in bucket_1.query("entry-2"):
async for chunk in record.read(n=4):
data.append(chunk)

assert data == [b"some", b"-dat", b"a-3", b"some", b"-dat", b"a-4"]