Skip to content

Commit

Permalink
Merge pull request #108 from xenova/interruptible_retry
Browse files: browse the repository at this point in the history
Improved testing for GitHub actions
  • Loading branch information
xenova committed Jul 23, 2021
2 parents 4283443 + b306a6c commit d319d79
Show file tree
Hide file tree
Showing 12 changed files with 155 additions and 142 deletions.
4 changes: 4 additions & 0 deletions chat_downloader/chat_downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ def get_chat(self, url=None,
end_time=None,
max_attempts=15,
retry_timeout=None,
interruptible_retry=True,
timeout=None,
inactivity_timeout=None,
max_messages=None,
Expand Down Expand Up @@ -128,6 +129,9 @@ def get_chat(self, url=None,
this to a negative number will wait for user input. Default is None
(use exponential backoff, i.e. immediate, 1s, 2s, 4s, 8s, ...)
:type retry_timeout: float, optional
:param interruptible_retry: Have the option to skip waiting and
immediately retry. Defaults to True
:type interruptible_retry: bool, optional
:param timeout: Stop retrieving chat after a certain duration
(in seconds), defaults to None
:type timeout: float, optional
Expand Down
1 change: 1 addition & 0 deletions chat_downloader/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ def add_init_param(group, *keys, **kwargs):
'Retry Arguments') # what to do when an error occurs
add_chat_param(retry_group, '--max_attempts', type=int)
add_chat_param(retry_group, '--retry_timeout', type=float)
add_chat_param(retry_group, '--interruptible_retry', is_boolean_flag=True)

termination_group = parser.add_argument_group('Termination Arguments')
add_chat_param(termination_group, '--max_messages', type=int)
Expand Down
20 changes: 15 additions & 5 deletions chat_downloader/sites/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@
safe_print
)

from ..utils.timed_utils import timed_input
from ..utils.timed_utils import (
timed_input,
interruptible_sleep
)
from ..debugging import log


Expand Down Expand Up @@ -558,13 +561,13 @@ def _move_to_dict(info, dict_name, replace_key=None, create_when_empty=False, *i
return new_dict

@staticmethod
def retry(attempt_number, max_attempts, error=None, retry_timeout=None, text=None):
def retry(attempt_number, max_attempts=1, error=None, retry_timeout=None, text=None, interruptible_retry=True, **kwargs):
"""Retry to occur after an error occurs
:param attempt_number: The current attempt number
:type attempt_number: int
:param max_attempts: The maximum number of attempts allowed
:type max_attempts: int
:type max_attempts: int, optional
:param error: The error which was raised, defaults to None
:type error: Exception, optional
:param retry_timeout: The number of seconds to sleep after failing,
Expand Down Expand Up @@ -596,7 +599,11 @@ def retry(attempt_number, max_attempts, error=None, retry_timeout=None, text=Non

must_sleep = time_to_sleep >= 0
if must_sleep:
sleep_text = '(sleep for {}s or press Enter)'.format(time_to_sleep)
if interruptible_retry:
sleep_text = '(sleep for {}s or press Enter)'.format(
time_to_sleep)
else:
sleep_text = '(sleep for {}s)'.format(time_to_sleep)
else:
sleep_text = ''

Expand All @@ -620,7 +627,10 @@ def retry(attempt_number, max_attempts, error=None, retry_timeout=None, text=Non
)

if must_sleep:
timed_input(time_to_sleep)
if interruptible_retry:
timed_input(time_to_sleep)
else:
interruptible_sleep(time_to_sleep)
else:
pause()

Expand Down
32 changes: 13 additions & 19 deletions chat_downloader/sites/facebook.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ def _parse_fb_json(info):
_GRAPH_API = _FB_HOMEPAGE + '/api/graphql/'
_VIDEO_URL_FORMAT = _FB_HOMEPAGE + '/video.php?v={}'

def _attempt_fb_retrieve(self, url, max_attempts, retry_timeout=None, fb_json=False, is_json=True, **post_kwargs):
def _attempt_fb_retrieve(self, url, program_params, fb_json=False, is_json=True, **post_kwargs):
max_attempts = program_params.get('max_attempts')
for attempt_number in attempts(max_attempts):
try:
response = self._session_post(url, **post_kwargs)
Expand All @@ -177,24 +178,22 @@ def _attempt_fb_retrieve(self, url, max_attempts, retry_timeout=None, fb_json=Fa
return response.text

except JSONDecodeError as e:
self.retry(attempt_number, max_attempts, e, retry_timeout,
self.retry(attempt_number, error=e, **program_params,
text='Unable to parse JSON: `{}`'.format(response.text))

except RequestException as e:
self.retry(attempt_number, max_attempts, e, retry_timeout)
self.retry(attempt_number, error=e, **program_params)

_VIDEO_TITLE_REGEX = r'<meta\s+name=["\'].*title["\']\s+content=["\']([^"\']+)["\']\s*/>'

def _get_initial_info(self, video_id, params):
info = {}
max_attempts = params.get('max_attempts')
retry_timeout = params.get('retry_timeout')

# TODO remove duplication - many similar methods
json_data = self._attempt_fb_retrieve(
self._VIDEO_PAGE_TAHOE_TEMPLATE.format(video_id),
max_attempts,
retry_timeout,
params,
fb_json=True,
headers=self._FB_HEADERS, data=self.data
)
Expand All @@ -219,7 +218,7 @@ def _get_initial_info(self, video_id, params):
break

except RequestException as e:
self.retry(attempt_number, max_attempts, e, retry_timeout)
self.retry(attempt_number, error=e, **params)

instances = multi_get(json_data, 'jsmods', 'instances')
if not instances:
Expand Down Expand Up @@ -582,9 +581,6 @@ def _parse_live_stream_node(node):
return info

def _get_live_chat_messages_by_video_id(self, video_id, params):
max_attempts = params.get('max_attempts')
retry_timeout = params.get('retry_timeout')

buffer_size = 25 # max num comments returned by api call
# cursor = ''
variables = {
Expand All @@ -605,8 +601,7 @@ def _get_live_chat_messages_by_video_id(self, video_id, params):

json_data = self._attempt_fb_retrieve(
self._GRAPH_API,
max_attempts,
retry_timeout,
params,
headers=self._FB_HEADERS, data=data
)

Expand Down Expand Up @@ -675,9 +670,6 @@ def _get_live_chat_messages_by_video_id(self, video_id, params):

def _get_chat_replay_messages_by_video_id(self, video_id, max_duration, params):

max_attempts = params.get('max_attempts')
retry_timeout = params.get('retry_timeout')

# useful tool (convert curl to python request)
# https://curl.trillworks.com/
# timeout_duration = 10 # TODO make this modifiable
Expand Down Expand Up @@ -707,8 +699,7 @@ def _get_chat_replay_messages_by_video_id(self, video_id, max_duration, params):

json_data = self._attempt_fb_retrieve(
self._VOD_COMMENTS_API,
max_attempts,
retry_timeout,
params,
fb_json=True,
headers=self._FB_HEADERS, params=request_params, data=self.data
)
Expand Down Expand Up @@ -783,6 +774,9 @@ def get_chat_by_video_id(self, video_id, params):

def generate_urls(self, **kwargs):
max_attempts = 10
program_params = {
'max_attempts': max_attempts
}

limit = 50 # amount_to_get
step = 8
Expand Down Expand Up @@ -811,7 +805,7 @@ def generate_urls(self, **kwargs):
data.update(self.data)
req = self._attempt_fb_retrieve(
'https://www.facebook.com/ajax/route-definition/',
max_attempts,
program_params,
is_json=False,
headers=self._FB_HEADERS, data=data
)
Expand All @@ -826,7 +820,7 @@ def generate_urls(self, **kwargs):
data.update(self.data)
json_data = self._attempt_fb_retrieve(
self._GRAPH_API,
max_attempts,
program_params,
headers=self._FB_HEADERS, data=data
)

Expand Down
45 changes: 19 additions & 26 deletions chat_downloader/sites/reddit.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class RedditChatDownloader(BaseChatDownloader):
def __init__(self, **kwargs):
super().__init__(**kwargs)

max_attempts = 10
max_attempts = 3

for attempt_number in attempts(max_attempts):
initial_data = self._session_get(self._REDDIT_HOMEPAGE).text
Expand All @@ -65,7 +65,8 @@ def __init__(self, **kwargs):
break
else:
title = get_title_of_webpage(initial_data)
self.retry(attempt_number, max_attempts, text=title)
self.retry(attempt_number, text=title,
max_attempts=max_attempts)
continue

bearer = multi_get(info, 'user', 'session', 'accessToken')
Expand Down Expand Up @@ -326,25 +327,22 @@ def _get_chat_by_post_id(self, match, params):
match_id = match.group('id')
return self.get_chat_by_post_id(match_id, params)

def _try_get_info(self, url, max_attempts, retry_timeout=None, **kwargs):

def _try_get_info(self, url, program_params, **kwargs):
max_attempts = program_params.get('max_attempts')
for attempt_number in attempts(max_attempts):
try:
return self._session_get_json(url, **kwargs)
except (JSONDecodeError, RequestException) as e:
self.retry(attempt_number, max_attempts, e, retry_timeout)
self.retry(attempt_number, error=e, **program_params)

_API_TEMPLATE = 'https://strapi.reddit.com/videos/t3_{}'
_FALLBACK_API_TEMPLATE = 'https://gateway.reddit.com/desktopapi/v1/postcomments/t3_{}?limit=1'

def get_chat_by_post_id(self, post_id, params, attempt_number=0, initial_info=None):

max_attempts = params.get('max_attempts')
retry_timeout = params.get('retry_timeout')

if initial_info is None: # Optimisation
initial_info = self._try_get_info(self._API_TEMPLATE.format(
post_id), max_attempts, retry_timeout, headers=self.authed_headers)
post_id), params, headers=self.authed_headers)

status = initial_info.get('status')
status_message = initial_info.get('status_message')
Expand Down Expand Up @@ -394,8 +392,7 @@ def get_chat_by_post_id(self, post_id, params, attempt_number=0, initial_info=No
elif status == 'failure':
if 'wait' in data.lower():
message = 'Response from Reddit: "{}"'.format(data)
self.retry(attempt_number, max_attempts,
retry_timeout=retry_timeout, text=message)
self.retry(attempt_number, text=message, **params)
return self.get_chat_by_post_id(post_id, params, attempt_number + 1)

raise RedditError(data)
Expand All @@ -410,7 +407,6 @@ def _get_chat_messages_by_socket(self, socket_url, params):

message_receive_timeout = params.get('message_receive_timeout')
max_attempts = params.get('max_attempts')
retry_timeout = params.get('retry_timeout')

def create_connection():
for attempt_number in attempts(max_attempts):
Expand All @@ -422,7 +418,7 @@ def create_connection():
ws.settimeout(message_receive_timeout)
return ws
except (ConnectionError, websocket.WebSocketException) as e:
self.retry(attempt_number, max_attempts, e, retry_timeout)
self.retry(attempt_number, error=e, **params)

ws = create_connection()

Expand Down Expand Up @@ -465,12 +461,9 @@ def create_connection():

def _get_chat_messages_by_post_id(self, post_id, params, stream_start_time=None):

max_attempts = params.get('max_attempts')
retry_timeout = params.get('retry_timeout')

# 1. Get all comment ids
url = self._COMMENTS_API_TEMPLATE.format(post_id)
initial_info = self._try_get_info(url, max_attempts, retry_timeout)
initial_info = self._try_get_info(url, params)

children = multi_get(initial_info, -1, 'data', 'children')

Expand Down Expand Up @@ -500,7 +493,7 @@ def _get_chat_messages_by_post_id(self, post_id, params, stream_start_time=None)
def _parse_chunk(index):
if not all_stored[index]: # get if not stored
url = info_api + ',t1_'.join(chunk_info[index])
info = self._try_get_info(url, max_attempts, retry_timeout)
info = self._try_get_info(url, params)
children = multi_get(info, 'data', 'children') or []
all_stored[index] = [self._parse_item(
child.get('data'), stream_start_time) for child in children]
Expand Down Expand Up @@ -562,11 +555,8 @@ def _binary_search(low, high):

def get_chat_by_subreddit_id(self, subreddit_id, params, attempt_number=0):
# Get chat of top broadcast
max_attempts = params.get('max_attempts')
retry_timeout = params.get('retry_timeout')

initial_info = self._try_get_info(self._SUBREDDIT_BROADCAST_API_URL.format(
subreddit_id), max_attempts, retry_timeout, headers=self.authed_headers)
subreddit_id), params, headers=self.authed_headers)

status = initial_info.get('status')
data = initial_info.get('data')
Expand All @@ -585,8 +575,7 @@ def get_chat_by_subreddit_id(self, subreddit_id, params, attempt_number=0):
elif status == 'failure':
if 'wait' in data.lower():
message = 'Response from Reddit: "{}"'.format(data)
self.retry(attempt_number, max_attempts,
retry_timeout=retry_timeout, text=message)
self.retry(attempt_number, text=message, **params)
return self.get_chat_by_subreddit_id(post_id, params, attempt_number + 1)

raise RedditError(data)
Expand Down Expand Up @@ -617,6 +606,9 @@ def generate_urls(self, **kwargs):
# TODO add options for live and past

max_attempts = 30 # TODO make param
program_params = {
'max_attempts': max_attempts
}

# Get live streams
for attempt_number in attempts(max_attempts):
Expand All @@ -637,7 +629,8 @@ def generate_urls(self, **kwargs):
except (JSONDecodeError, RequestException) as e:
error = e

self.retry(attempt_number, max_attempts, error, text=message)
self.retry(attempt_number, error=error,
text=message, **program_params)

for stream in data:
yield multi_get(stream, 'post', 'url')
Expand All @@ -656,7 +649,7 @@ def generate_urls(self, **kwargs):
count = 0
while True:
rpan_info = self._try_get_info(
api_url, max_attempts, params=past_params)
api_url, program_params, params=past_params)

rpan_data = rpan_info.get('data')
if not rpan_data:
Expand Down

0 comments on commit d319d79

Please sign in to comment.