Merge pull request #16 from zteeed/add_redis_stream
Add redis stream
Aurélien Duboc committed Sep 18, 2019
2 parents 9ced75d + 0dca6c5 commit 77fa3cc
Showing 25 changed files with 330 additions and 129 deletions.
5 changes: 5 additions & 0 deletions api/api/constants.py
@@ -3,3 +3,8 @@
VERSION = 'v2'
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
REDIS_STREAM_USERS = 'update_users'
REDIS_STREAM_CHALLENGES = 'update_challenges'
CONSUMER_GROUP_NAME = 'rootme'
UPDATE_TIMEOUT = 60
REQUEST_TIMEOUT = 3
57 changes: 39 additions & 18 deletions api/api/handlers.py
@@ -1,13 +1,15 @@
import json

from tornado.web import RequestHandler

from api.constants import VERSION, AUTHORS, GITHUB_ACCOUNTS
from api.routes import routes
from src.fetch import read_from_redis_key


class RedirectHandler(RequestHandler):

def initialize(self, url):
def initialize(self, url: str):
self.url = url

def get(self):
@@ -24,43 +26,62 @@ def get(self):

class RootMeStaticHandler(RequestHandler):

def initialize(self, key):
def initialize(self, key: str):
self.key = key

async def get(self):
"""Construct and send a JSON response with appropriate status code."""
data = await self.application.redis.get(self.key)
data = dict(body=json.loads(data))
self.write(data)
data = await read_from_redis_key(self.application.redis, self.key, None, handler_type='static')
if data is None:
self.write_error(status_code=404)
else:
self.write(json.loads(data))


class RootMeDynamicCategoryHandler(RequestHandler):

def initialize(self, key: str):
self.key = key

async def get(self, url_argument):
"""Construct and send a JSON response with appropriate status code."""
key = self.key.format(url_argument)
data = await read_from_redis_key(self.application.redis, key, url_argument, handler_type='dynamic_category')

if data is None:
self.write_error(status_code=404)
else:
self.write(json.loads(data))


class RootMeDynamicHandler(RequestHandler):
class RootMeDynamicUserHandler(RequestHandler):

def initialize(self, key):
def initialize(self, key: str):
self.key = key

async def get(self, url_argument):
"""Construct and send a JSON response with appropriate status code."""
data = await self.application.redis.get(self.key.format(url_argument))
key = self.key.format(url_argument)
data = await read_from_redis_key(self.application.redis, key, url_argument, handler_type='dynamic_user')

if data is None:
self.write_error(status_code=404)
else:
data = dict(body=json.loads(data))
self.write(data)
self.write(json.loads(data))


pattern = '([\\w-]+)'
handlers = [
('/', RedirectHandler, {'url': f'/{VERSION}'}),
(f'/{VERSION}', InfoHandler),
(f'/{VERSION}/categories', RootMeStaticHandler, {'key': 'categories'}),
(f'/{VERSION}/category/{pattern}', RootMeDynamicHandler, {'key': 'categories.{}'}),
(f'/{VERSION}/category/{pattern}', RootMeDynamicCategoryHandler, {'key': 'categories.{}'}),
(f'/{VERSION}/challenges', RootMeStaticHandler, {'key': 'challenges'}),
(f'/{VERSION}/{pattern}/profile', RootMeDynamicHandler, {'key': '{}.profile'}),
(f'/{VERSION}/{pattern}/contributions', RootMeDynamicHandler, {'key': '{}.contributions'}),
(f'/{VERSION}/{pattern}/contributions/challenges', RootMeDynamicHandler, {'key': '{}.contributions.challenges'}),
(f'/{VERSION}/{pattern}/contributions/solutions', RootMeDynamicHandler, {'key': '{}.contributions.solutions'}),
(f'/{VERSION}/{pattern}/details', RootMeDynamicHandler, {'key': '{}.details'}),
(f'/{VERSION}/{pattern}/ctf', RootMeDynamicHandler, {'key': '{}.ctfs'}),
(f'/{VERSION}/{pattern}/stats', RootMeDynamicHandler, {'key': '{}.stats'})
(f'/{VERSION}/{pattern}/profile', RootMeDynamicUserHandler, {'key': '{}.profile'}),
(f'/{VERSION}/{pattern}/contributions', RootMeDynamicUserHandler, {'key': '{}.contributions'}),
(f'/{VERSION}/{pattern}/contributions/challenges', RootMeDynamicUserHandler, {'key': '{}.contributions.challenges'}),
(f'/{VERSION}/{pattern}/contributions/solutions', RootMeDynamicUserHandler, {'key': '{}.contributions.solutions'}),
(f'/{VERSION}/{pattern}/details', RootMeDynamicUserHandler, {'key': '{}.details'}),
(f'/{VERSION}/{pattern}/ctf', RootMeDynamicUserHandler, {'key': '{}.ctfs'}),
(f'/{VERSION}/{pattern}/stats', RootMeDynamicUserHandler, {'key': '{}.stats'})
]
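
The dynamic handlers now build the Redis key from the captured URL segment and delegate to read_from_redis_key, answering 404 when the key is missing. A minimal sketch of that key mapping (illustrative only; the category name below is hypothetical):

KEY_PROFILE = '{}.profile'       # registered for /v2/<user>/profile
KEY_CATEGORY = 'categories.{}'   # registered for /v2/category/<name>

def redis_key(template: str, url_argument: str) -> str:
    # Fill the key template with the URL segment captured by the route pattern.
    return template.format(url_argument)

assert redis_key(KEY_PROFILE, 'zTeeed-115405') == 'zTeeed-115405.profile'
assert redis_key(KEY_CATEGORY, 'Web-Serveur') == 'categories.Web-Serveur'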
21 changes: 21 additions & 0 deletions api/init.py
@@ -0,0 +1,21 @@
import aioredis

from api.constants import REDIS_STREAM_USERS, REDIS_STREAM_CHALLENGES, CONSUMER_GROUP_NAME


async def add_stream_to_consumer_group(redis_app: aioredis.Redis, stream: str, group_name: str, latest_id: str = '$',
mkstream: bool = False):
# aioredis==1.2.0 installed via pip does not support the mkstream option of xgroup_create (see the GitHub repository)
args = [b'CREATE', stream, group_name, latest_id]
if mkstream:
args.append(b'MKSTREAM')
await redis_app.execute(b'XGROUP', *args)


async def create_consumer_group(redis_app: aioredis.Redis):
for stream in [REDIS_STREAM_CHALLENGES, REDIS_STREAM_USERS]:
try:
await add_stream_to_consumer_group(redis_app, stream, CONSUMER_GROUP_NAME, mkstream=True)
except aioredis.errors.ReplyError as exception:
if 'BUSYGROUP Consumer Group name already exists' != str(exception):
raise exception
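
A minimal usage sketch for this bootstrap step (assuming a local Redis and aioredis 1.2.0, matching the import in api/main.py; the function name bootstrap is illustrative). create_consumer_group issues XGROUP CREATE ... MKSTREAM for both streams and swallows the BUSYGROUP reply, so it is safe to run on every startup:

import asyncio

import aioredis

from api.constants import REDIS_HOST, REDIS_PORT
from init import create_consumer_group


async def bootstrap() -> None:
    redis = await aioredis.create_redis_pool((REDIS_HOST, REDIS_PORT))
    await create_consumer_group(redis)  # XGROUP CREATE update_users/update_challenges rootme $ MKSTREAM
    redis.close()
    await redis.wait_closed()


asyncio.get_event_loop().run_until_complete(bootstrap())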
2 changes: 2 additions & 0 deletions api/main.py
@@ -8,6 +8,7 @@

from api.constants import REDIS_HOST, REDIS_PORT
from api.handlers import handlers
from init import create_consumer_group

if __name__ == '__main__':
define('port', default=3000, help='port to listen on')
@@ -21,5 +22,6 @@
application.redis = loop.run_until_complete(
aioredis.create_redis_pool((REDIS_HOST, REDIS_PORT), loop=loop)
)
asyncio.get_event_loop().run_until_complete(create_consumer_group(application.redis))
loop.run_forever()
IOLoop.current().start()
63 changes: 63 additions & 0 deletions api/src/fetch.py
@@ -0,0 +1,63 @@
import json
from datetime import datetime
from typing import Optional
from aioredis.commands import Redis

from api.constants import REDIS_STREAM_CHALLENGES, REDIS_STREAM_USERS, REQUEST_TIMEOUT, UPDATE_TIMEOUT


def get_timeout(handler_type: str) -> int:
if handler_type == 'dynamic_user':
return UPDATE_TIMEOUT
else:
return 10 * UPDATE_TIMEOUT


def extract_timestamp_last_update(data: str) -> Optional[datetime]:
if data is None:
return
data = json.loads(data)
if 'last_update' not in data.keys():
return
return datetime.fromisoformat(data['last_update'])


async def send_tasks_to_worker(redis_app: Redis, arg: Optional[str], now: datetime, timestamp: Optional[datetime],
timeout: int, handler_type: str) -> None:
# updates conditions
condition = timestamp is None or (now - timestamp).total_seconds() > timeout

# make updates (send tasks to worker)
if handler_type == 'static' and condition:
await redis_app.xadd(REDIS_STREAM_CHALLENGES, {b'update': b"ok"})
elif handler_type == 'dynamic_user' and arg is not None and condition:
await redis_app.xadd(REDIS_STREAM_USERS, {b'username': arg.encode()})
elif handler_type == 'dynamic_category' and arg is not None and condition:  # matches RootMeDynamicCategoryHandler
await redis_app.xadd(REDIS_STREAM_CHALLENGES, {b'update': b"ok"})


def need_waiting_for_update(now: datetime, timestamp: Optional[datetime], timeout: int) -> bool:
return timestamp is None or (now - timestamp).total_seconds() > 2*timeout


async def force_update(redis_app: Redis, key: str, now: datetime, timestamp: Optional[datetime], timeout: int) -> None:
condition = timestamp is None or (now - timestamp).total_seconds() > timeout
while condition and abs(now-datetime.now()).total_seconds() < REQUEST_TIMEOUT:
data = await redis_app.get(f'{key}')
timestamp = extract_timestamp_last_update(data)
condition = timestamp is None or (now - timestamp).total_seconds() > timeout
return


async def read_from_redis_key(redis_app: Redis, key: str, arg: Optional[str], handler_type: str = 'static'):
data = await redis_app.get(f'{key}')
now = datetime.now()
timestamp = extract_timestamp_last_update(data)
timeout = get_timeout(handler_type)

await send_tasks_to_worker(redis_app, arg, now, timestamp, timeout, handler_type)
response = need_waiting_for_update(now, timestamp, timeout)
if response:
await force_update(redis_app, key, now, timestamp, timeout)

return await redis_app.get(f'{key}')
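
read_from_redis_key therefore always returns whatever is cached: it enqueues a refresh on the matching stream once the stored 'last_update' timestamp is older than the timeout, and only polls Redis (for at most REQUEST_TIMEOUT seconds) when the data is more than twice the timeout old. A worked example of the thresholds for a 'dynamic_user' key, as a sketch using the constants above:

from datetime import datetime, timedelta

UPDATE_TIMEOUT = 60   # seconds, as defined in api/constants.py
REQUEST_TIMEOUT = 3

now = datetime.now()
timestamp = now - timedelta(seconds=90)   # cached profile last updated 90 s ago

needs_refresh = (now - timestamp).total_seconds() > UPDATE_TIMEOUT   # True: XADD a task for the worker
must_wait = (now - timestamp).total_seconds() > 2 * UPDATE_TIMEOUT   # False: stale data is served immediately
print(needs_refresh, must_wait)                                      # True False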
45 changes: 35 additions & 10 deletions worker/main.py
@@ -1,3 +1,11 @@
import asyncio

import aioredis
from typing import List, Tuple
from collections import OrderedDict

from worker import app
from worker.constants import CG_NAME, CONSUMER_NAME, REDIS_STREAM_USERS, REDIS_STREAM_CHALLENGES
from worker.redis_interface.challenges import set_all_challenges
from worker.redis_interface.contributions import set_user_contributions
from worker.redis_interface.ctf import set_user_ctf
@@ -6,16 +14,33 @@
from worker.redis_interface.stats import set_user_stats


def main():
users = ['zTeeed-115405']
set_all_challenges()
for username in users:
set_user_profile(username)
set_user_contributions(username)
set_user_details(username)
set_user_ctf(username)
set_user_stats(username)
async def use_stream_item(stream_item: List[Tuple[bytes, bytes, OrderedDict]]) -> None:
for item in stream_item:
(stream_name, message_id, ordered_dict) = item
stream_name = stream_name.decode()

if stream_name == REDIS_STREAM_USERS:
username = ordered_dict[b'username'].decode()
await set_user_profile(username)
await set_user_contributions(username)
await set_user_details(username)
await set_user_ctf(username)
await set_user_stats(username)

if stream_name == REDIS_STREAM_CHALLENGES:
await set_all_challenges()

await app.redis.xack(stream_name, CG_NAME, message_id)


async def main() -> None:
app.redis = await aioredis.create_redis_pool(('localhost', 6379))
streams = [REDIS_STREAM_USERS, REDIS_STREAM_CHALLENGES]
while True:
item = await app.redis.xread_group(CG_NAME, CONSUMER_NAME, streams, count=1, latest_ids=['>'] * len(streams))
if len(item) > 0:
await use_stream_item(item)


if __name__ == '__main__':
main()
asyncio.run(main())
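
For reference, a sketch of the shape returned by xread_group with count=1, as consumed by use_stream_item above (the message id and username below are illustrative):

from collections import OrderedDict

example_item = [(
    b'update_users',                                  # stream name
    b'1568800000000-0',                               # message id (hypothetical)
    OrderedDict([(b'username', b'zTeeed-115405')]),   # fields set by the API with XADD
)]
# use_stream_item decodes the stream name, dispatches to the matching set_* functions,
# then acknowledges the message id with XACK so it is not redelivered to the group.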
3 changes: 2 additions & 1 deletion worker/requirements.in
@@ -1,4 +1,5 @@
redis
aioredis
hiredis
requests
lxml
structlog
4 changes: 3 additions & 1 deletion worker/requirements.txt
@@ -4,10 +4,13 @@
#
# pip-compile
#
aioredis==1.2.0
async-timeout==3.0.1 # via aioredis
atomicwrites==1.3.0 # via pytest
attrs==19.1.0 # via packaging, pytest
certifi==2019.6.16 # via requests
chardet==3.0.4 # via requests
hiredis==1.0.0
idna==2.8 # via requests
importlib-metadata==0.19 # via pluggy, pytest
lxml==4.4.0
@@ -17,7 +20,6 @@ pluggy==0.12.0 # via pytest
py==1.8.0 # via pytest
pyparsing==2.4.2 # via packaging
pytest==5.0.1
redis==3.3.4
requests==2.22.0
six==1.12.0 # via packaging, structlog
structlog==19.1.0
8 changes: 8 additions & 0 deletions worker/worker/__init__.py
@@ -1,3 +1,11 @@
from structlog import get_logger


class App:

def __init__(self):
self.redis = None


app = App()
log = get_logger()
6 changes: 4 additions & 2 deletions worker/worker/constants.py
@@ -1,3 +1,5 @@
URL = 'https://www.root-me.org/'
AUTHORS = ["Aurélien Duboc", "Nicolas Bonnet"]
GITHUB_ACCOUNTS = ["https://github.com/zteeed", "https://github.com/bonnetn"]
REDIS_STREAM_USERS = 'update_users'
REDIS_STREAM_CHALLENGES = 'update_challenges'
CG_NAME = 'rootme'
CONSUMER_NAME = 'worker1'
17 changes: 15 additions & 2 deletions worker/worker/http_client.py
@@ -1,15 +1,28 @@
from typing import Optional

from requests import Session
from requests.adapters import HTTPAdapter
from urllib3 import Retry

from worker import log
from worker.redis_interface import session

session = Session()
retry = Retry(
total=10,
backoff_factor=1,
status_forcelist=[429],
)
adapter = HTTPAdapter(max_retries=retry, pool_maxsize=100, pool_block=True)
session.mount('http://', adapter)
session.mount('https://', adapter)


class HTTPBadStatusCodeError(RuntimeError):
def __init__(self, code: int):
super().__init__(f'bad http status code {code}')


def http_get(url: str) -> Optional[str]:
def http_get(url: str) -> Optional[bytes]:
"""
Retrieves the HTML from a page via HTTP(s).
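
The body of http_get is truncated in this diff; below is a minimal sketch of how it might use the retry-mounted session and HTTPBadStatusCodeError defined above (an assumption, not the committed implementation):

def http_get_sketch(url: str) -> Optional[bytes]:
    response = session.get(url)
    if response.status_code == 404:
        return None                                   # missing page: caller treats it as absent data
    if response.status_code != 200:
        log.error('http_get_failed', url=url, status_code=response.status_code)
        raise HTTPBadStatusCodeError(response.status_code)
    return response.content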