Skip to content
This repository has been archived by the owner on Jan 8, 2020. It is now read-only.

Commit

Permalink
Split function fetching data and sending tasks to worker
Browse files Browse the repository at this point in the history
  • Loading branch information
zteeed committed Sep 18, 2019
1 parent 6b48a56 commit 668a91a
Showing 1 changed file with 33 additions and 15 deletions.
48 changes: 33 additions & 15 deletions api/src/fetch.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,13 @@
from api.constants import REDIS_STREAM_CHALLENGES, REDIS_STREAM_USERS, REQUEST_TIMEOUT, UPDATE_TIMEOUT


def get_timeout(handler_type: str) -> int:
if handler_type == 'dynamic_user':
return UPDATE_TIMEOUT
else:
return 10 * UPDATE_TIMEOUT


def extract_timestamp_last_update(data: str) -> Optional[datetime]:
if data is None:
return
Expand All @@ -15,31 +22,42 @@ def extract_timestamp_last_update(data: str) -> Optional[datetime]:
return datetime.strptime(data['last_update'], '%Y-%m-%d %H:%M:%S.%f')


async def read_from_redis_key(redis_app: Redis, key: str, arg: Optional[str], handler_type: str = 'static'):
data = await redis_app.get(f'{key}')
timestamp = extract_timestamp_last_update(data)
now = datetime.now()
timeout = 10 * UPDATE_TIMEOUT

async def send_tasks_to_worker(redis_app: Redis, arg: Optional[str], now: datetime, timestamp: Optional[datetime],
timeout: int, handler_type: str) -> None:
# updates conditions
condition_user = timestamp is None or (now - timestamp).total_seconds() > UPDATE_TIMEOUT
condition_challenges = timestamp is None or (now - timestamp).total_seconds() > 10*UPDATE_TIMEOUT
condition = timestamp is None or (now - timestamp).total_seconds() > timeout

# make updates (send tasks to worker)
if handler_type == 'static' and condition_user:
if handler_type == 'static' and condition:
await redis_app.xadd(REDIS_STREAM_CHALLENGES, {b'update': b"ok"})
elif handler_type == 'dynamic_user' and arg is not None and condition_user:
timeout = UPDATE_TIMEOUT
elif handler_type == 'dynamic_user' and arg is not None and condition:
await redis_app.xadd(REDIS_STREAM_USERS, {b'username': arg.encode()})
elif handler_type == 'dynamic_categories' and arg is not None and condition_challenges:
elif handler_type == 'dynamic_categories' and arg is not None and condition:
await redis_app.xadd(REDIS_STREAM_CHALLENGES, {b'update': b"ok"})
else:
return data


def need_waiting_for_update(now: datetime, timestamp: Optional[datetime], timeout: int) -> bool:
return timestamp is None or (now - timestamp).total_seconds() > 2*timeout


async def force_update(redis_app: Redis, key: str, now: datetime, timestamp: Optional[datetime], timeout: int) -> None:
condition = timestamp is None or (now - timestamp).total_seconds() > timeout
while condition and abs(now-datetime.now()).total_seconds() < REQUEST_TIMEOUT:
data = await redis_app.get(f'{key}')
timestamp = extract_timestamp_last_update(data)
condition = timestamp is None or (now - timestamp).total_seconds() > timeout
return


async def read_from_redis_key(redis_app: Redis, key: str, arg: Optional[str], handler_type: str = 'static'):
data = await redis_app.get(f'{key}')
now = datetime.now()
timestamp = extract_timestamp_last_update(data)
timeout = get_timeout(handler_type)

await send_tasks_to_worker(redis_app, arg, now, timestamp, timeout, handler_type)
response = need_waiting_for_update(now, timestamp, timeout)
if response:
await force_update(redis_app, key, now, timestamp, timeout)

return data
return await redis_app.get(f'{key}')

0 comments on commit 668a91a

Please sign in to comment.