/
util.py
228 lines (185 loc) · 8.09 KB
/
util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
"""Provide helper classes used by other models."""
from __future__ import annotations
import random
import time
from collections import OrderedDict
from typing import Any, Callable, Generator
from ..util import _deprecate_args
@_deprecate_args("permissions", "known_permissions")
def permissions_string(
*, known_permissions: set[str], permissions: list[str] | None
) -> str:
"""Return a comma separated string of permission changes.
:param known_permissions: A set of strings representing the available permissions.
:param permissions: A list of strings, or ``None``. These strings can exclusively
contain ``+`` or ``-`` prefixes, or contain no prefixes at all. When prefixed,
the resulting string will simply be the joining of these inputs. When not
prefixed, all permissions are considered to be additions, and all permissions in
the ``known_permissions`` set that aren't provided are considered to be
removals. When ``None``, the result is ``"+all"``.
"""
if permissions is None:
to_set = ["+all"]
else:
to_set = ["-all"]
omitted = sorted(known_permissions - set(permissions))
to_set.extend(f"-{x}" for x in omitted)
to_set.extend(f"+{x}" for x in permissions)
return ",".join(to_set)
@_deprecate_args(
"function",
"pause_after",
"skip_existing",
"attribute_name",
"exclude_before",
"start_after",
)
def stream_generator(
function: Callable,
*,
attribute_name: str = "fullname",
exclude_before: bool = False,
pause_after: int | None = None,
skip_existing: bool = False,
start_after: str | None = None,
**function_kwargs: Any,
) -> Generator[Any, None, None]:
"""Yield new items from ``function`` as they become available.
:param function: A callable that returns a :class:`.ListingGenerator`, e.g.,
:meth:`.Subreddit.comments` or :meth:`.Subreddit.new`.
:param attribute_name: The field to use as an ID (default: ``"fullname"``).
:param exclude_before: When ``True`` does not pass ``params`` to ``function``
(default: ``False``).
:param pause_after: An integer representing the number of requests that result in no
new items before this function yields ``None``, effectively introducing a pause
into the stream. A negative value yields ``None`` after items from a single
response have been yielded, regardless of number of new items obtained in that
response. A value of ``0`` yields ``None`` after every response resulting in no
new items, and a value of ``None`` never introduces a pause (default: ``None``).
:param skip_existing: When ``True``, this does not yield any results from the first
request thereby skipping any items that existed in the stream prior to starting
the stream (default: ``False``).
:param start_after: The initial string value to use for ``before`` in ``params``.
The stream will start from the item following this one (default: ``None``).
Additional keyword arguments will be passed to ``function``.
.. note::
This function internally uses an exponential delay with jitter between
subsequent responses that contain no new results, up to a maximum delay of just
over 16 seconds. In practice, that means that the time before pause for
``pause_after=N+1`` is approximately twice the time before pause for
``pause_after=N``.
For example, to create a stream of comment replies, try:
.. code-block:: python
reply_function = reddit.inbox.comment_replies
for reply in praw.models.util.stream_generator(reply_function):
print(reply)
To pause a comment stream after six responses with no new comments, try:
.. code-block:: python
subreddit = reddit.subreddit("test")
for comment in subreddit.stream.comments(pause_after=6):
if comment is None:
break
print(comment)
To resume fetching comments after a pause, try:
.. code-block:: python
subreddit = reddit.subreddit("test")
comment_stream = subreddit.stream.comments(pause_after=5)
for comment in comment_stream:
if comment is None:
break
print(comment)
# Do any other processing, then try to fetch more data
for comment in comment_stream:
if comment is None:
break
print(comment)
To bypass the internal exponential backoff, try the following. This approach is
useful if you are monitoring a subreddit with infrequent activity, and you want to
consistently learn about new items from the stream as soon as possible, rather than
up to a delay of just over sixteen seconds.
.. code-block:: python
subreddit = reddit.subreddit("test")
for comment in subreddit.stream.comments(pause_after=0):
if comment is None:
continue
print(comment)
"""
before_attribute = start_after
exponential_counter = ExponentialCounter(max_counter=16)
seen_attributes = BoundedSet(301)
without_before_counter = 0
responses_without_new = 0
valid_pause_after = pause_after is not None
while True:
found = False
newest_attribute = None
limit = 100
if before_attribute is None:
limit -= without_before_counter
without_before_counter = (without_before_counter + 1) % 30
if not exclude_before:
function_kwargs["params"] = {"before": before_attribute}
for item in reversed(list(function(limit=limit, **function_kwargs))):
attribute = getattr(item, attribute_name)
if attribute in seen_attributes:
continue
found = True
seen_attributes.add(attribute)
newest_attribute = attribute
if not skip_existing:
yield item
before_attribute = newest_attribute
skip_existing = False
if valid_pause_after and pause_after < 0:
yield None
elif found:
exponential_counter.reset()
responses_without_new = 0
else:
responses_without_new += 1
if valid_pause_after and responses_without_new > pause_after:
exponential_counter.reset()
responses_without_new = 0
yield None
else:
time.sleep(exponential_counter.counter())
class BoundedSet:
"""A set with a maximum size that evicts the oldest items when necessary.
This class does not implement the complete set interface.
"""
def __contains__(self, item: Any) -> bool:
"""Test if the :class:`.BoundedSet` contains item."""
self._access(item)
return item in self._set
def __init__(self, max_items: int):
"""Initialize a :class:`.BoundedSet` instance."""
self.max_items = max_items
self._set = OrderedDict()
def _access(self, item: Any): # noqa: ANN001
if item in self._set:
self._set.move_to_end(item)
def add(self, item: Any):
"""Add an item to the set discarding the oldest item if necessary."""
self._access(item)
self._set[item] = None
if len(self._set) > self.max_items:
self._set.popitem(last=False)
class ExponentialCounter:
"""A class to provide an exponential counter with jitter."""
def __init__(self, max_counter: int):
"""Initialize an :class:`.ExponentialCounter` instance.
:param max_counter: The maximum base value.
.. note::
The computed value may be 3.125% higher due to jitter.
"""
self._base = 1
self._max = max_counter
def counter(self) -> int | float:
"""Increment the counter and return the current value with jitter."""
max_jitter = self._base / 16.0
value = self._base + random.random() * max_jitter - max_jitter / 2 # noqa: S311
self._base = min(self._base * 2, self._max)
return value
def reset(self):
"""Reset the counter to 1."""
self._base = 1