Skip to content
This repository has been archived by the owner on Jan 8, 2020. It is now read-only.

Add redis stream #16

Merged
merged 33 commits into from
Sep 18, 2019
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
2e2350a
Add timeloop to worker requirements
Aug 11, 2019
2b16d69
Add a timeloop joob to fetch challenges
Aug 11, 2019
621a557
Add timeloop start to asyncio loop
Aug 11, 2019
c11dd63
Use session requests in http_client script
Aug 11, 2019
ccc309f
Use aioredis instead of redis
Aug 11, 2019
dfcbfcf
Add hiredis requirement
Aug 11, 2019
ffb7ce3
Manage redis streams for data updates
Aug 11, 2019
cf3c955
Update constants vars
Aug 11, 2019
6a7cc38
Remove timeloop requirement
Aug 11, 2019
c8183a6
Manage NoneType data in RootMeStaticHandler
Sep 15, 2019
b9b9da6
Use an external function to fetch data
Sep 15, 2019
8db3939
Create consumer groups when api starts
Sep 15, 2019
ba75cd2
Add tasks to update data to streams for worker
Sep 16, 2019
5116b0b
Use redis consumer groups in worker
Sep 16, 2019
0b90931
Add timestamp data for every task
Sep 16, 2019
fba38a8
Manage case when user has a null score
Sep 16, 2019
22b499a
Split functions updating data and timestamp for every tasks
Sep 16, 2019
6047a19
Send tasks to worker after fetching data if needed
Sep 16, 2019
6e2dd20
Rename consumer group var (more explicit)
Sep 16, 2019
b2d4a50
Rename functions for redis consumer group and streams
Sep 16, 2019
4b68d47
Simplify if statement on exception for consumer group creation
Sep 16, 2019
689a581
Check redis timestamp data format
Sep 16, 2019
64eeca9
Use class from app object
Sep 16, 2019
005cd70
Use run method from asyncio
Sep 17, 2019
5a513d9
Integrate timestamp info to redis keys containing data
Sep 17, 2019
2af4f96
Unpack list before chaining with itertools
Sep 17, 2019
3689190
Use typing lib on functions
Sep 17, 2019
3de4c48
Rename function fetching data from redis
Sep 18, 2019
5bb71a6
Refactor function fetching data from redis using new data structure f…
Sep 18, 2019
3147511
Add argument types to functions
Sep 18, 2019
3fedeed
Split dynamic handlers for categories and users data
Sep 18, 2019
ba8ae70
Split function fetching data and sending tasks to worker
Sep 18, 2019
0dca6c5
Use datetime iso format instead of str method
Sep 18, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 45 additions & 9 deletions worker/main.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
import asyncio

import aioredis

from worker import app
from worker.constants import REDIS_STREAM_USERS, REDIS_STREAM_CHALLENGES
from worker.redis_interface.challenges import set_all_challenges
from worker.redis_interface.contributions import set_user_contributions
from worker.redis_interface.ctf import set_user_ctf
Expand All @@ -6,16 +12,46 @@
from worker.redis_interface.stats import set_user_stats


def main():
users = ['zTeeed-115405']
async def use_users_stream_item(stream_item):
(stream_name, message_id, ordered_dict) = stream_item[0]
username = ordered_dict[b'username'].decode()

# AttributeError: 'Redis' object has no attribute 'xdel'
response = await app.redis.execute('XDEL', REDIS_STREAM_USERS, message_id)
zteeed marked this conversation as resolved.
Show resolved Hide resolved
if response != 1: # an other worker already took this username for update
return

await set_user_profile(username)
await set_user_contributions(username)
await set_user_details(username)
await set_user_ctf(username)
await set_user_stats(username)


async def use_challenges_stream_item(challenges_stream_item):
(stream_name, message_id, ordered_dict) = challenges_stream_item[0]

# AttributeError: 'Redis' object has no attribute 'xdel'
response = await app.redis.execute('XDEL', REDIS_STREAM_CHALLENGES, message_id)
if response != 1: # an other worker already took this username for update
return

set_all_challenges()
for username in users:
set_user_profile(username)
set_user_contributions(username)
set_user_details(username)
set_user_ctf(username)
set_user_stats(username)


async def main():
while True:
users_stream_item = await app.redis.xread([REDIS_STREAM_USERS], count=1, timeout=1, latest_ids=[0])
zteeed marked this conversation as resolved.
Show resolved Hide resolved
if not users_stream_item:
challenges_stream_item = await app.redis.xread([REDIS_STREAM_CHALLENGES], count=1, timeout=1, latest_ids=[0])
if challenges_stream_item:
await use_challenges_stream_item(challenges_stream_item)
else:
await use_users_stream_item(users_stream_item)


if __name__ == '__main__':
main()
loop = asyncio.get_event_loop()
app.redis = loop.run_until_complete(aioredis.create_redis_pool(('localhost', 6379), loop=loop))
zteeed marked this conversation as resolved.
Show resolved Hide resolved
loop.run_until_complete(main())
bonnetn marked this conversation as resolved.
Show resolved Hide resolved
loop.run_forever()
4 changes: 3 additions & 1 deletion worker/requirements.in
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
redis
aioredis
bonnetn marked this conversation as resolved.
Show resolved Hide resolved
hiredis
requests
lxml
structlog
pytest
timeloop
bonnetn marked this conversation as resolved.
Show resolved Hide resolved
5 changes: 4 additions & 1 deletion worker/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@
#
# pip-compile
#
aioredis==1.2.0
async-timeout==3.0.1 # via aioredis
atomicwrites==1.3.0 # via pytest
attrs==19.1.0 # via packaging, pytest
certifi==2019.6.16 # via requests
chardet==3.0.4 # via requests
hiredis==1.0.0
idna==2.8 # via requests
importlib-metadata==0.19 # via pluggy, pytest
lxml==4.4.0
Expand All @@ -17,10 +20,10 @@ pluggy==0.12.0 # via pytest
py==1.8.0 # via pytest
pyparsing==2.4.2 # via packaging
pytest==5.0.1
redis==3.3.4
requests==2.22.0
six==1.12.0 # via packaging, structlog
structlog==19.1.0
timeloop==1.0.2
urllib3==1.25.3 # via requests
wcwidth==0.1.7 # via pytest
zipp==0.5.2 # via importlib-metadata
1 change: 1 addition & 0 deletions worker/worker/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from structlog import get_logger

app = lambda: None
zteeed marked this conversation as resolved.
Show resolved Hide resolved
log = get_logger()
4 changes: 2 additions & 2 deletions worker/worker/constants.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
URL = 'https://www.root-me.org/'
AUTHORS = ["Aurélien Duboc", "Nicolas Bonnet"]
GITHUB_ACCOUNTS = ["https://github.com/zteeed", "https://github.com/bonnetn"]
REDIS_STREAM_USERS = 'update_users'
REDIS_STREAM_CHALLENGES = 'update_challenges'
15 changes: 14 additions & 1 deletion worker/worker/http_client.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,20 @@
from typing import Optional

from requests import Session
from requests.adapters import HTTPAdapter
from urllib3 import Retry

from worker import log
from worker.redis_interface import session

session = Session()
retry = Retry(
total=10,
backoff_factor=1,
status_forcelist=[429],
)
adapter = HTTPAdapter(max_retries=retry, pool_maxsize=100, pool_block=True)
session.mount('http://', adapter)
session.mount('https://', adapter)


class HTTPBadStatusCodeError(RuntimeError):
Expand Down
16 changes: 0 additions & 16 deletions worker/worker/redis_interface/__init__.py

This file was deleted.

11 changes: 5 additions & 6 deletions worker/worker/redis_interface/challenges.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import json
from multiprocessing.pool import ThreadPool

from worker import log
from worker import app, log
from worker.constants import URL
from worker.http_client import http_get
from worker.parser.category import extract_categories, extract_category_info
from worker.redis_interface import redis_app


def retrieve_category_info(category):
Expand All @@ -18,7 +17,7 @@ def retrieve_category_info(category):
return extract_category_info(html, category)


def set_all_challenges():
async def set_all_challenges():
html = http_get(URL + 'fr/Challenges/')
if html is None:
log.error('challenges_page_not_found')
Expand All @@ -30,9 +29,9 @@ def set_all_challenges():
with ThreadPool(len(categories)) as tp:
response = tp.map(retrieve_category_info, categories)

redis_app.set('challenges', json.dumps(response))
redis_app.set('categories', json.dumps(categories))
await app.redis.set('challenges', json.dumps(response))
await app.redis.set('categories', json.dumps(categories))
bonnetn marked this conversation as resolved.
Show resolved Hide resolved
for category_data in response:
redis_app.set(f'categories.{category_data[0]["name"]}', json.dumps(category_data))
await app.redis.set(f'categories.{category_data[0]["name"]}', json.dumps(category_data))

log.debug('set_all_challenges_success')
11 changes: 5 additions & 6 deletions worker/worker/redis_interface/contributions.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@
from functools import partial
from multiprocessing.pool import ThreadPool

from worker import log
from worker import app, log
from worker.constants import URL
from worker.http_client import http_get
from worker.parser.contributions import extract_challenges_contributions, extract_solutions_contributions, \
extract_contributions_page_numbers
from worker.redis_interface import redis_app


def get_challenge_contributions(username, page_index):
Expand Down Expand Up @@ -55,7 +54,7 @@ def format_contributions_solutions(username, nb_solutions_pages):
return solutions_contributions


def set_user_contributions(username):
async def set_user_contributions(username):
html = http_get(URL + username + '?inc=contributions')
if html is None:
log.warning('could_not_get_user_contributions', username=username)
Expand All @@ -75,7 +74,7 @@ def set_user_contributions(username):
}
}]
if challenges_contributions is not None:
redis_app.set(f'{username}.contributions.challenges', json.dumps(challenges_contributions))
await app.redis.set(f'{username}.contributions.challenges', json.dumps(challenges_contributions))
if solutions_contributions is not None:
redis_app.set(f'{username}.contributions.solutions', json.dumps(solutions_contributions))
redis_app.set(f'{username}.contributions', json.dumps(response))
await app.redis.set(f'{username}.contributions.solutions', json.dumps(solutions_contributions))
await app.redis.set(f'{username}.contributions', json.dumps(response))
7 changes: 3 additions & 4 deletions worker/worker/redis_interface/ctf.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@
from functools import partial
from multiprocessing.pool import ThreadPool

from worker import log
from worker import app, log
from worker.constants import URL
from worker.http_client import http_get
from worker.parser.ctf import is_not_participating, extract_summary, extract_ctf
from worker.parser.profile import extract_pseudo
from worker.redis_interface import redis_app


def get_ctf_page(username, page_index):
Expand All @@ -21,7 +20,7 @@ def get_ctf_page(username, page_index):
return extract_ctf(html)


def set_user_ctf(username):
async def set_user_ctf(username):
html = http_get(URL + username + '?inc=ctf')
if html is None:
log.warning(f'ctf_page_not_found', username=username)
Expand All @@ -48,5 +47,5 @@ def set_user_ctf(username):
'description': description,
'ctfs': ctfs,
}]
redis_app.set(f'{username}.ctfs', json.dumps(response))
await app.redis.set(f'{username}.ctfs', json.dumps(response))
log.debug('set_user_ctf_success', username=username)
7 changes: 3 additions & 4 deletions worker/worker/redis_interface/details.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
import json

from worker import log
from worker import app, log
from worker.constants import URL
from worker.http_client import http_get
from worker.parser.details import extract_score, extract_nb_challenges_solved, extract_ranking, \
extract_ranking_category, extract_challenges
from worker.parser.profile import extract_pseudo
from worker.redis_interface import redis_app


def set_user_details(username):
async def set_user_details(username):
html = http_get(URL + username + '?inc=score')
if html is None:
log.warning(f'could_not_get_user_details', username=username)
Expand All @@ -33,5 +32,5 @@ def set_user_details(username):
'categories': categories,
}]

redis_app.set(f'{username}.details', json.dumps(response))
await app.redis.set(f'{username}.details', json.dumps(response))
log.debug('set_user_details_success', username=username)
9 changes: 4 additions & 5 deletions worker/worker/redis_interface/profile.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
import json

from worker import log
from worker import app, log
from worker.constants import URL
from worker.http_client import http_get
from worker.parser.profile import extract_pseudo, extract_score
from worker.redis_interface import redis_app


def set_user_profile(username):
async def set_user_profile(username):
html = http_get(URL + username)
if html is None:
log.warning(f'user_profile_not_found', username=username)
Expand All @@ -19,7 +18,7 @@ def set_user_profile(username):
'pseudo': pseudo,
'score': score,
}]
redis_app.set(f'{username}', json.dumps(response))
redis_app.set(f'{username}.profile', json.dumps(response))
await app.redis.set(f'{username}', json.dumps(response))
await app.redis.set(f'{username}.profile', json.dumps(response))

log.debug('set_user_profile_success', username=username)
7 changes: 3 additions & 4 deletions worker/worker/redis_interface/stats.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
import json

from worker import log
from worker import app, log
from worker.constants import URL
from worker.http_client import http_get
from worker.parser.profile import extract_pseudo
from worker.parser.stats import extract_stats
from worker.redis_interface import redis_app


def set_user_stats(username):
async def set_user_stats(username):
html = http_get(URL + username + '?inc=statistiques')
if html is None:
log.warning(f'could_not_get_user_stats', username=username)
Expand All @@ -22,5 +21,5 @@ def set_user_stats(username):
'solved_challenges': solved_challenges,
}

redis_app.set(f'{username}.stats', json.dumps(response))
await app.redis.set(f'{username}.stats', json.dumps(response))
log.debug('set_user_stats_success', username=username)