[resotocore][fix] Allow only one timeseries creation at a time (#1906)
aquamatthias committed Feb 9, 2024
1 parent 42b7665 commit 9036f6d
Showing 2 changed files with 75 additions and 25 deletions.
73 changes: 48 additions & 25 deletions resotocore/resotocore/db/timeseriesdb.py
@@ -1,17 +1,20 @@
+import asyncio
 import logging
 import time
 from collections import defaultdict
+from contextlib import asynccontextmanager
 from datetime import timedelta, datetime, timezone
 from functools import partial
 from numbers import Number
-from typing import Optional, List, Set, Union, cast, Callable, Dict
+from typing import Optional, List, Set, Union, cast, Callable, Dict, AsyncIterator
 
 from attr import evolve, define
 from resotocore.core_config import CoreConfig
 from resotocore.db import arango_query
 from resotocore.db.async_arangodb import AsyncArangoDB, AsyncCursor, AsyncArangoTransactionDB
 from resotocore.db.graphdb import GraphDB
 from resotocore.db.model import QueryModel
+from resotocore.error import ConflictingChangeInProgress
 from resotocore.query.model import Predicate
 from resotocore.types import Json, JsonElement
 from resotocore.util import utc_str, utc, if_set, parse_utc
@@ -86,17 +89,19 @@ async def add_entries(self, name: str, query_model: QueryModel, graph_db: GraphD
         )
         at = at if at is not None else int(time.time())  # only use seconds
         qs, bv = arango_query.create_time_series(QueryModel(query, model), graph_db, self.collection_name, name, at)
-        result, *_ = cast(List[int], await self.__execute_aql(qs, bv))
-        if result > 0:
-            # update meta information
-            await self.__execute_aql(
-                "UPSERT { _key: @key } "
-                "INSERT { _key: @key, created_at: DATE_NOW(), last_updated: DATE_NOW(), count: @count } "
-                f"UPDATE {{ last_updated: DATE_NOW(), count: @count }} IN `{self.names_db}`",
-                dict(key=name, count=result),
-            )
-
-        return result
+        async with self._lock() as locked:
+            if not locked:
+                raise ConflictingChangeInProgress("Another update is already in progress.")
+            result, *_ = cast(List[int], await self.__execute_aql(qs, bv))
+            if result > 0:
+                # update meta information
+                await self.__execute_aql(
+                    "UPSERT { _key: @key } "
+                    "INSERT { _key: @key, created_at: DATE_NOW(), last_updated: DATE_NOW(), count: @count } "
+                    f"UPDATE {{ last_updated: DATE_NOW(), count: @count }} IN `{self.names_db}`",
+                    dict(key=name, count=result),
+                )
+            return result
 
     async def load_time_series(
         self,
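With this hunk, concurrent add_entries calls fail fast: whoever loses the race on the lock gets a ConflictingChangeInProgress instead of writing the same series twice. A minimal caller-side sketch, assuming a TimeSeriesDB instance; the wrapper name and backoff policy here are illustrative, not part of this commit:

import asyncio

from resotocore.error import ConflictingChangeInProgress

async def add_entries_with_retry(ts_db, name, query_model, graph_db, retries: int = 3):
    # Hypothetical helper: back off and retry while another creation holds the lock.
    for attempt in range(retries):
        try:
            return await ts_db.add_entries(name, query_model, graph_db)
        except ConflictingChangeInProgress:
            if attempt == retries - 1:
                raise
            await asyncio.sleep(2**attempt)  # 1s, 2s, 4s between attempts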
@@ -164,17 +169,10 @@ def ts_format(ts: str, js: Json) -> Json:
         last_run = if_set(await self.db.get(self.meta_db, last_run_name), lambda d: parse_utc(d[last_run_name]), oldest)
         if (now - last_run) < self.smallest_resolution:
             return "No changes since last downsample run"
-        # acquire a lock to ensure exclusive access
-        try:
-            ttl = int((now + timedelta(minutes=15)).timestamp())
-            await self.db.insert(self.meta_db, dict(_key="lock", expires=ttl), sync=True)
-        except Exception:
-            return "Another downsample run is already in progress."
-        # If we come here, the lock is acquired: exclusive access.
-        # We only touch time series that are older than the minimal resolution (>1h).
-        # So we never interfere with snapshots that are eventually created concurrently.
-        result: Json = defaultdict(list)
-        try:
+        async with self._lock() as locked:
+            if not locked:
+                return "Another downsample run is already in progress."
+            result: Json = defaultdict(list)
             for ts in await self.list_time_series():
                 dst = ts.downsample_times.copy()
                 for bucket in self.buckets:
@@ -229,8 +227,6 @@ def ts_format(ts: str, js: Json) -> Json:
                )
            # update last run
            await self.db.insert(self.meta_db, {"_key": last_run_name, last_run_name: utc_str(now)}, overwrite=True)
-        finally:
-            await self.db.delete(self.meta_db, "lock", ignore_missing=True)
         return result
 
     async def create_update_schema(self) -> None:
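Note the asymmetry between the two call sites: add_entries raises ConflictingChangeInProgress to its caller, while downsample, whose inline lock handling (insert a "lock" document, delete it in finally) is removed above, now reports the conflict as a plain status string. Both paths delegate to the shared _lock helper introduced below.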
@@ -260,6 +256,33 @@ async def wipe(self) -> bool:
             and await self.db.truncate(self.meta_db)
         )
 
+    @asynccontextmanager
+    async def _lock(
+        self,
+        get_lock: timedelta = timedelta(seconds=30),  # how long to try to get the lock
+        lock_for: timedelta = timedelta(minutes=10),  # acquired: how long to hold the lock max
+        retry_interval: timedelta = timedelta(seconds=1),  # how long to wait between retries
+    ) -> AsyncIterator[bool]:
+        now = utc()
+        ttl = int((now + lock_for).timestamp())
+        deadline = now + get_lock
+        flag = True
+        while flag and utc() < deadline:
+            try:
+                # try to insert a document with key __lock__
+                await self.db.insert(self.meta_db, dict(_key="__lock__", expires=ttl), sync=True)
+            except Exception:
+                # could not insert the document. Wait and try again
+                await asyncio.sleep(retry_interval.total_seconds())
+                continue
+            flag = False
+            try:
+                yield True
+            finally:
+                await self.db.delete(self.meta_db, "__lock__", ignore_missing=True)
+        if flag:
+            yield False
+
     def _buckets(self) -> List[TimeSeriesBucket]:
         result: List[TimeSeriesBucket] = []
         if bs := self.config.timeseries.buckets:
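The _lock helper uses a classic ArangoDB idiom: _key is unique per collection, so inserting a document with a fixed key is an atomic test-and-set, and holding the "lock" simply means that document exists. Below is a standalone sketch of the same pattern with the synchronous python-arango driver; the endpoint, credentials, database, and collection names are assumptions for illustration, and the production code goes through resotocore's async ArangoDB wrapper instead:

from contextlib import contextmanager
from datetime import datetime, timedelta, timezone
from time import sleep
from typing import Iterator

from arango import ArangoClient
from arango.exceptions import DocumentInsertError

client = ArangoClient(hosts="http://localhost:8529")  # assumed endpoint
meta = client.db("resoto", username="root", password="").collection("ts_meta")  # assumed names

@contextmanager
def document_lock(
    get_lock: timedelta = timedelta(seconds=30),
    lock_for: timedelta = timedelta(minutes=10),
    retry_interval: timedelta = timedelta(seconds=1),
) -> Iterator[bool]:
    now = datetime.now(timezone.utc)
    expires = int((now + lock_for).timestamp())
    deadline = now + get_lock
    while datetime.now(timezone.utc) < deadline:
        try:
            # the unique _key makes this insert an atomic test-and-set
            meta.insert({"_key": "__lock__", "expires": expires}, sync=True)
        except DocumentInsertError:
            sleep(retry_interval.total_seconds())  # someone else holds the lock
            continue
        try:
            yield True  # lock acquired: run the exclusive section
        finally:
            meta.delete("__lock__", ignore_missing=True)
        return
    yield False  # deadline passed without acquiring the lock

with document_lock() as locked:
    if locked:
        ...  # exclusive work here

One caveat worth noting: nothing in this diff reads or enforces the expires field, so a holder that crashes without deleting __lock__ presumably blocks later runs until the document is removed by some other means (for example a TTL index or manual cleanup); lock_for only documents the intended maximum hold time.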
27 changes: 27 additions & 0 deletions resotocore/tests/resotocore/db/timeseriesdb_test.py
@@ -1,3 +1,4 @@
+import asyncio
 from datetime import datetime, timedelta, timezone
 from typing import Any, List
 
@@ -125,3 +126,29 @@ async def create_ts(before_now: timedelta, granularity: timedelta, number_of_ent
     assert timeseries_db.db.collection(timeseries_db.collection_name).count() == 140
     assert await timeseries_db.downsample(now=now + timedelta(days=400)) == {}
     assert timeseries_db.db.collection(timeseries_db.collection_name).count() == 140
+
+
+async def test_acquire_lock(timeseries_db: TimeSeriesDB) -> None:
+    flag = False
+
+    async def with_lock(num: int) -> None:
+        nonlocal flag
+        async with timeseries_db._lock(
+            get_lock=timedelta(seconds=1), retry_interval=timedelta(microseconds=1)
+        ) as locked:
+            assert locked, f"{num} not locked!"
+            assert not flag
+            flag = True
+            await asyncio.sleep(0.01)
+            flag = False
+
+    # run a couple of tasks in parallel
+    await asyncio.gather(*(with_lock(n) for n in range(10)))
+
+    # counter example: getting the lock should fail in case it was acquired
+    timeseries_db.db.collection(timeseries_db.meta_db).insert({"_key": "__lock__"})
+    async with timeseries_db._lock(
+        get_lock=timedelta(milliseconds=5), retry_interval=timedelta(microseconds=1)
+    ) as locked:
+        assert not locked, "Was able to get the lock!"
+    timeseries_db.db.collection(timeseries_db.meta_db).delete({"_key": "__lock__"})
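The test covers both sides of the contract: ten concurrent tasks must each acquire the lock within their one-second deadline and never observe the shared flag set by another task, and a pre-inserted __lock__ document, written through the raw driver and thus also a stand-in for a crashed holder, must make acquisition report False within the 5 ms deadline.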
