Skip to content
This repository has been archived by the owner on Jan 8, 2020. It is now read-only.

Add redis stream #16

Merged
merged 33 commits into from
Sep 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
2e2350a
Add timeloop to worker requirements
Aug 11, 2019
2b16d69
Add a timeloop joob to fetch challenges
Aug 11, 2019
621a557
Add timeloop start to asyncio loop
Aug 11, 2019
c11dd63
Use session requests in http_client script
Aug 11, 2019
ccc309f
Use aioredis instead of redis
Aug 11, 2019
dfcbfcf
Add hiredis requirement
Aug 11, 2019
ffb7ce3
Manage redis streams for data updates
Aug 11, 2019
cf3c955
Update constants vars
Aug 11, 2019
6a7cc38
Remove timeloop requirement
Aug 11, 2019
c8183a6
Manage NoneType data in RootMeStaticHandler
Sep 15, 2019
b9b9da6
Use an external function to fetch data
Sep 15, 2019
8db3939
Create consumer groups when api starts
Sep 15, 2019
ba75cd2
Add tasks to update data to streams for worker
Sep 16, 2019
5116b0b
Use redis consumer groups in worker
Sep 16, 2019
0b90931
Add timestamp data for every task
Sep 16, 2019
fba38a8
Manage case when user has a null score
Sep 16, 2019
22b499a
Split functions updating data and timestamp for every tasks
Sep 16, 2019
6047a19
Send tasks to worker after fetching data if needed
Sep 16, 2019
6e2dd20
Rename consumer group var (more explicit)
Sep 16, 2019
b2d4a50
Rename functions for redis consumer group and streams
Sep 16, 2019
4b68d47
Simplify if statement on exception for consumer group creation
Sep 16, 2019
689a581
Check redis timestamp data format
Sep 16, 2019
64eeca9
Use class from app object
Sep 16, 2019
005cd70
Use run method from asyncio
Sep 17, 2019
5a513d9
Integrate timestamp info to redis keys containing data
Sep 17, 2019
2af4f96
Unpack list before chaining with itertools
Sep 17, 2019
3689190
Use typing lib on functions
Sep 17, 2019
3de4c48
Rename function fetching data from redis
Sep 18, 2019
5bb71a6
Refactor function fetching data from redis using new data structure f…
Sep 18, 2019
3147511
Add argument types to functions
Sep 18, 2019
3fedeed
Split dynamic handlers for categories and users data
Sep 18, 2019
ba8ae70
Split function fetching data and sending tasks to worker
Sep 18, 2019
0dca6c5
Use datetime iso format instead of str method
Sep 18, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions api/api/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,8 @@
VERSION = 'v2'
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
# Names of the Redis streams on which refresh requests are published for the worker.
REDIS_STREAM_USERS = 'update_users'
REDIS_STREAM_CHALLENGES = 'update_challenges'
# Consumer group created on both streams when the API starts.
CONSUMER_GROUP_NAME = 'rootme'
# Age in seconds after which cached user data is considered stale (other handlers use 10x this value).
UPDATE_TIMEOUT = 60
# Upper bound in seconds on how long a request keeps polling Redis for refreshed data.
REQUEST_TIMEOUT = 3
57 changes: 39 additions & 18 deletions api/api/handlers.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import json

from tornado.web import RequestHandler

from api.constants import VERSION, AUTHORS, GITHUB_ACCOUNTS
from api.routes import routes
from src.fetch import read_from_redis_key


class RedirectHandler(RequestHandler):
bonnetn marked this conversation as resolved.
Show resolved Hide resolved

def initialize(self, url):
def initialize(self, url: str):
self.url = url

def get(self):
Expand All @@ -24,43 +26,62 @@ def get(self):

class RootMeStaticHandler(RequestHandler):

def initialize(self, key):
def initialize(self, key: str):
self.key = key

async def get(self):
"""Construct and send a JSON response with appropriate status code."""
data = await self.application.redis.get(self.key)
data = dict(body=json.loads(data))
self.write(data)
data = await read_from_redis_key(self.application.redis, self.key, None, handler_type='static')
if data is None:
self.write_error(status_code=404)
else:
self.write(json.loads(data))


class RootMeDynamicCategoryHandler(RequestHandler):
    """Serve the data stored in Redis for the category named in the URL."""

    def initialize(self, key: str):
        """Store the Redis key template configured on the route.

        :param key: format string with one ``{}`` placeholder that receives the URL argument.
        """
        self.key = key

    async def get(self, url_argument):
        """Construct and send a JSON response with appropriate status code.

        :param url_argument: value captured from the URL, substituted into the key template.
        """
        key = self.key.format(url_argument)
        data = await read_from_redis_key(self.application.redis, key, url_argument, handler_type='dynamic_category')

        if data is None:
            # send_error() sets the HTTP status before rendering the error page; calling
            # write_error() directly only writes the body and leaves the status at 200.
            self.send_error(404)
        else:
            self.write(json.loads(data))


class RootMeDynamicHandler(RequestHandler):
class RootMeDynamicUserHandler(RequestHandler):

def initialize(self, key):
def initialize(self, key: str):
self.key = key

async def get(self, url_argument):
"""Construct and send a JSON response with appropriate status code."""
data = await self.application.redis.get(self.key.format(url_argument))
key = self.key.format(url_argument)
data = await read_from_redis_key(self.application.redis, key, url_argument, handler_type='dynamic_user')

if data is None:
self.write_error(status_code=404)
else:
data = dict(body=json.loads(data))
self.write(data)
self.write(json.loads(data))


pattern = '([\\w-]+)'
handlers = [
('/', RedirectHandler, {'url': f'/{VERSION}'}),
(f'/{VERSION}', InfoHandler),
(f'/{VERSION}/categories', RootMeStaticHandler, {'key': 'categories'}),
(f'/{VERSION}/category/{pattern}', RootMeDynamicHandler, {'key': 'categories.{}'}),
(f'/{VERSION}/category/{pattern}', RootMeDynamicCategoryHandler, {'key': 'categories.{}'}),
(f'/{VERSION}/challenges', RootMeStaticHandler, {'key': 'challenges'}),
(f'/{VERSION}/{pattern}/profile', RootMeDynamicHandler, {'key': '{}.profile'}),
(f'/{VERSION}/{pattern}/contributions', RootMeDynamicHandler, {'key': '{}.contributions'}),
(f'/{VERSION}/{pattern}/contributions/challenges', RootMeDynamicHandler, {'key': '{}.contributions.challenges'}),
(f'/{VERSION}/{pattern}/contributions/solutions', RootMeDynamicHandler, {'key': '{}.contributions.solutions'}),
(f'/{VERSION}/{pattern}/details', RootMeDynamicHandler, {'key': '{}.details'}),
(f'/{VERSION}/{pattern}/ctf', RootMeDynamicHandler, {'key': '{}.ctfs'}),
(f'/{VERSION}/{pattern}/stats', RootMeDynamicHandler, {'key': '{}.stats'})
(f'/{VERSION}/{pattern}/profile', RootMeDynamicUserHandler, {'key': '{}.profile'}),
(f'/{VERSION}/{pattern}/contributions', RootMeDynamicUserHandler, {'key': '{}.contributions'}),
(f'/{VERSION}/{pattern}/contributions/challenges', RootMeDynamicUserHandler, {'key': '{}.contributions.challenges'}),
(f'/{VERSION}/{pattern}/contributions/solutions', RootMeDynamicUserHandler, {'key': '{}.contributions.solutions'}),
(f'/{VERSION}/{pattern}/details', RootMeDynamicUserHandler, {'key': '{}.details'}),
(f'/{VERSION}/{pattern}/ctf', RootMeDynamicUserHandler, {'key': '{}.ctfs'}),
(f'/{VERSION}/{pattern}/stats', RootMeDynamicUserHandler, {'key': '{}.stats'})
]
21 changes: 21 additions & 0 deletions api/init.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import aioredis

from api.constants import REDIS_STREAM_USERS, REDIS_STREAM_CHALLENGES, CONSUMER_GROUP_NAME


async def add_stream_to_consumer_group(redis_app: aioredis.Redis, stream: str, group_name: str, latest_id: str = '$',
                                       mkstream: bool = False):
    """Issue a raw ``XGROUP CREATE`` command for ``group_name`` on ``stream``.

    :param redis_app: aioredis client used to execute the command.
    :param stream: name of the stream the group is attached to.
    :param group_name: name of the consumer group to create.
    :param latest_id: id from which the group starts reading (``'$'`` by default).
    :param mkstream: when true, append ``MKSTREAM`` to the command.
    """
    # aioredis==1.2.0 installed via pip does not support the mkstream option on xgroup_create
    # (see github repository), hence the hand-built command.
    mkstream_flag = (b'MKSTREAM',) if mkstream else ()
    command = (b'CREATE', stream, group_name, latest_id) + mkstream_flag
    await redis_app.execute(b'XGROUP', *command)


async def create_consumer_group(redis_app: aioredis.Redis):
    """Create the consumer group on the challenges and users streams.

    The streams are created if missing (``mkstream=True``). A group that already
    exists is left untouched.

    :param redis_app: aioredis client used to execute the commands.
    :raises aioredis.errors.ReplyError: for any Redis error other than ``BUSYGROUP``.
    """
    for stream in (REDIS_STREAM_CHALLENGES, REDIS_STREAM_USERS):
        try:
            await add_stream_to_consumer_group(redis_app, stream, CONSUMER_GROUP_NAME, mkstream=True)
        except aioredis.errors.ReplyError as exception:
            # BUSYGROUP is the error code Redis replies with when the group already exists.
            # Match on the code rather than the full sentence so a reworded message does
            # not turn a harmless restart into a start-up failure.
            if not str(exception).startswith('BUSYGROUP'):
                raise
zteeed marked this conversation as resolved.
Show resolved Hide resolved
2 changes: 2 additions & 0 deletions api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from api.constants import REDIS_HOST, REDIS_PORT
from api.handlers import handlers
from init import create_consumer_group

if __name__ == '__main__':
define('port', default=3000, help='port to listen on')
Expand All @@ -21,5 +22,6 @@
application.redis = loop.run_until_complete(
aioredis.create_redis_pool((REDIS_HOST, REDIS_PORT), loop=loop)
)
asyncio.get_event_loop().run_until_complete(create_consumer_group(application.redis))
loop.run_forever()
IOLoop.current().start()
63 changes: 63 additions & 0 deletions api/src/fetch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import json
from datetime import datetime
from typing import Optional
from aioredis.commands import Redis

from api.constants import REDIS_STREAM_CHALLENGES, REDIS_STREAM_USERS, REQUEST_TIMEOUT, UPDATE_TIMEOUT


def get_timeout(handler_type: str) -> int:
    """Return the staleness timeout, in seconds, that applies to ``handler_type``.

    ``'dynamic_user'`` handlers use ``UPDATE_TIMEOUT``; every other handler type
    uses ten times that value.
    """
    multiplier = 1 if handler_type == 'dynamic_user' else 10
    return multiplier * UPDATE_TIMEOUT


def extract_timestamp_last_update(data: Optional[str]) -> Optional[datetime]:
    """Return the ``last_update`` timestamp stored in a JSON payload.

    :param data: JSON document read from Redis, or ``None`` when the key does not exist.
    :return: the parsed ``last_update`` value, or ``None`` when there is no payload or the
        payload is not a JSON object carrying a ``last_update`` field.
    :raises ValueError: if the payload is not valid JSON or ``last_update`` is not a valid
        ISO format string.
    """
    if data is None:
        return None
    payload = json.loads(data)
    # Only a JSON object can carry the field; a list or scalar counts as "no timestamp".
    if not isinstance(payload, dict) or 'last_update' not in payload:
        return None
    return datetime.fromisoformat(payload['last_update'])


async def send_tasks_to_worker(redis_app: Redis, arg: Optional[str], now: datetime, timestamp: Optional[datetime],
                               timeout: int, handler_type: str) -> None:
    """Publish a refresh task on the matching Redis stream when the cached data is stale.

    :param redis_app: aioredis client used to append to the streams.
    :param arg: URL argument of the request (``None`` for static handlers).
    :param now: reference time of the current request.
    :param timestamp: ``last_update`` of the cached data, or ``None`` if unknown.
    :param timeout: maximum accepted age of the cached data, in seconds.
    :param handler_type: ``'static'``, ``'dynamic_user'`` or ``'dynamic_category'``.
    """
    # Nothing to do while the cached data is still fresh.
    is_stale = timestamp is None or (now - timestamp).total_seconds() > timeout
    if not is_stale:
        return

    if handler_type == 'static':
        await redis_app.xadd(REDIS_STREAM_CHALLENGES, {b'update': b"ok"})
    elif arg is None:
        # Dynamic handlers need the URL argument to know what to refresh.
        return
    elif handler_type == 'dynamic_user':
        await redis_app.xadd(REDIS_STREAM_USERS, {b'username': arg.encode()})
    elif handler_type in ('dynamic_category', 'dynamic_categories'):
        # The category handler passes 'dynamic_category'; the plural spelling that was
        # tested here before is still accepted for backward compatibility.
        await redis_app.xadd(REDIS_STREAM_CHALLENGES, {b'update': b"ok"})
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update:ok est ignoré non? pourquoi mettre une valeur ici?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

peu importe oui



def need_waiting_for_update(now: datetime, timestamp: Optional[datetime], timeout: int) -> bool:
    """Tell whether the cached data is old enough that the request should wait for a refresh.

    :param now: reference time of the current request.
    :param timestamp: ``last_update`` of the cached data, or ``None`` if unknown.
    :param timeout: staleness timeout in seconds; waiting is required past twice this value.
    :return: ``True`` when there is no timestamp or the data is older than ``2 * timeout`` seconds.
    """
    if timestamp is None:
        return True
    age_in_seconds = (now - timestamp).total_seconds()
    return age_in_seconds > 2 * timeout


async def force_update(redis_app: Redis, key: str, now: datetime, timestamp: Optional[datetime], timeout: int) -> None:
    """Poll ``key`` until its data is fresh, for at most ``REQUEST_TIMEOUT`` seconds.

    Returns immediately when ``timestamp`` is already fresh enough.

    :param redis_app: aioredis client used to read the key.
    :param key: Redis key holding the JSON payload to watch.
    :param now: reference time of the current request.
    :param timestamp: ``last_update`` of the data already read, or ``None`` if unknown.
    :param timeout: maximum accepted age of the data, in seconds.
    """
    import asyncio  # local import: only needed for the pause between two polls

    poll_interval = 0.1  # seconds between two reads of the key

    def is_stale(last_update: Optional[datetime]) -> bool:
        return last_update is None or (now - last_update).total_seconds() > timeout

    stale = is_stale(timestamp)
    while stale and abs(now - datetime.now()).total_seconds() < REQUEST_TIMEOUT:
        data = await redis_app.get(key)
        stale = is_stale(extract_timestamp_last_update(data))
        if stale:
            # Pause before the next read instead of hammering Redis in a tight loop
            # while the worker is still producing the data.
            await asyncio.sleep(poll_interval)


async def read_from_redis_key(redis_app: Redis, key: str, arg: Optional[str], handler_type: str = 'static'):
data = await redis_app.get(f'{key}')
now = datetime.now()
timestamp = extract_timestamp_last_update(data)
timeout = get_timeout(handler_type)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pourquoi tu passes pas le timeout directement en argument de cette fonction? C'est la fonction parente qui a le plus de connaissance sur quel timeout mettre


await send_tasks_to_worker(redis_app, arg, now, timestamp, timeout, handler_type)
response = need_waiting_for_update(now, timestamp, timeout)
if response:
await force_update(redis_app, key, now, timestamp, timeout)

return await redis_app.get(f'{key}')
45 changes: 35 additions & 10 deletions worker/main.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
import asyncio

import aioredis
from typing import List, Tuple
from collections import OrderedDict

from worker import app
from worker.constants import CG_NAME, CONSUMER_NAME, REDIS_STREAM_USERS, REDIS_STREAM_CHALLENGES
from worker.redis_interface.challenges import set_all_challenges
from worker.redis_interface.contributions import set_user_contributions
from worker.redis_interface.ctf import set_user_ctf
Expand All @@ -6,16 +14,33 @@
from worker.redis_interface.stats import set_user_stats


def main():
users = ['zTeeed-115405']
set_all_challenges()
for username in users:
set_user_profile(username)
set_user_contributions(username)
set_user_details(username)
set_user_ctf(username)
set_user_stats(username)
async def use_stream_item(stream_item: List[Tuple[bytes, bytes, OrderedDict]]) -> None:
    """Run the update matching each stream entry, then acknowledge the entry.

    :param stream_item: entries as ``(stream name, message id, fields)`` tuples.
    """
    for raw_stream_name, message_id, fields in stream_item:
        stream_name = raw_stream_name.decode()

        if stream_name == REDIS_STREAM_USERS:
            # A user entry carries the username whose data must be refreshed.
            username = fields[b'username'].decode()
            user_updates = (
                set_user_profile,
                set_user_contributions,
                set_user_details,
                set_user_ctf,
                set_user_stats,
            )
            for update in user_updates:
                await update(username)

        if stream_name == REDIS_STREAM_CHALLENGES:
            await set_all_challenges()

        # Acknowledge the entry for the consumer group once it has been handled.
        await app.redis.xack(stream_name, CG_NAME, message_id)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ya pas un mode autoack sur aioredis?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

euh aucune idée jvais regarder



async def main() -> None:
    """Connect to Redis and process entries of the users and challenges streams forever."""
    streams = [REDIS_STREAM_USERS, REDIS_STREAM_CHALLENGES]
    app.redis = await aioredis.create_redis_pool(('localhost', 6379))
    while True:
        # '>' asks for entries not yet delivered to this consumer group.
        item = await app.redis.xread_group(
            CG_NAME,
            CONSUMER_NAME,
            streams,
            count=1,
            latest_ids=['>'] * len(streams),
        )
        if len(item) == 0:
            continue
        await use_stream_item(item)


if __name__ == '__main__':
main()
asyncio.run(main())
3 changes: 2 additions & 1 deletion worker/requirements.in
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
redis
aioredis
bonnetn marked this conversation as resolved.
Show resolved Hide resolved
hiredis
requests
lxml
structlog
Expand Down
4 changes: 3 additions & 1 deletion worker/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@
#
# pip-compile
#
aioredis==1.2.0
async-timeout==3.0.1 # via aioredis
atomicwrites==1.3.0 # via pytest
attrs==19.1.0 # via packaging, pytest
certifi==2019.6.16 # via requests
chardet==3.0.4 # via requests
hiredis==1.0.0
idna==2.8 # via requests
importlib-metadata==0.19 # via pluggy, pytest
lxml==4.4.0
Expand All @@ -17,7 +20,6 @@ pluggy==0.12.0 # via pytest
py==1.8.0 # via pytest
pyparsing==2.4.2 # via packaging
pytest==5.0.1
redis==3.3.4
requests==2.22.0
six==1.12.0 # via packaging, structlog
structlog==19.1.0
Expand Down
8 changes: 8 additions & 0 deletions worker/worker/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
from structlog import get_logger


class App:
zteeed marked this conversation as resolved.
Show resolved Hide resolved

def __init__(self):
self.redis = None


app = App()
log = get_logger()
6 changes: 4 additions & 2 deletions worker/worker/constants.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
URL = 'https://www.root-me.org/'
AUTHORS = ["Aurélien Duboc", "Nicolas Bonnet"]
GITHUB_ACCOUNTS = ["https://github.com/zteeed", "https://github.com/bonnetn"]
REDIS_STREAM_USERS = 'update_users'
REDIS_STREAM_CHALLENGES = 'update_challenges'
CG_NAME = 'rootme'
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tu peux pas merge api/api/constants.py et celui là? si tu mets les mêmes valeurs..

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bah ca veut dire que les subprojects worker et api sont dépendants, a terme peut etre gérer ça avec des variables d'environnements spécifiés dans les Dockerfile ?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oui tu peux, ou laisse comme ça.si tu as une bonne raison c'est pas grave

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

je note ca quelque part ca sera modifié sur la partie mise en prod

CONSUMER_NAME = 'worker1'
17 changes: 15 additions & 2 deletions worker/worker/http_client.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,28 @@
from typing import Optional

from requests import Session
from requests.adapters import HTTPAdapter
from urllib3 import Retry

from worker import log
from worker.redis_interface import session

# Module-level requests session (presumably used by http_get below; its body is not visible here).
session = Session()
# Retry policy: up to 10 retries with backoff, forced for HTTP 429 responses.
retry = Retry(
total=10,
backoff_factor=1,
status_forcelist=[429],
)
# A single adapter carrying the retry policy and connection-pool settings, mounted for both schemes.
adapter = HTTPAdapter(max_retries=retry, pool_maxsize=100, pool_block=True)
session.mount('http://', adapter)
session.mount('https://', adapter)


class HTTPBadStatusCodeError(RuntimeError):
    """Runtime error whose message reports a bad HTTP status code."""

    def __init__(self, code: int):
        """Build the error message from the offending status ``code``."""
        message = f'bad http status code {code}'
        super().__init__(message)


def http_get(url: str) -> Optional[str]:
def http_get(url: str) -> Optional[bytes]:
"""
Retrieves the HTML from a page via HTTP(s).
Expand Down
Loading