-
-
Notifications
You must be signed in to change notification settings - Fork 182
/
meta.py
166 lines (130 loc) · 5.47 KB
/
meta.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
from pathlib import Path
from urllib.parse import unquote
import aiohttp
from aiocache import cached
from sanic.log import logger
from .. import settings
def version() -> str:
changelog_lines = Path("CHANGELOG.md").read_text().splitlines()
version_heading = changelog_lines[2]
return version_heading.split(" ")[-1]
@cached(ttl=60 * 15 if settings.DEPLOYED else 0)
async def authenticate(request) -> dict:
info: dict = {}
if settings.REMOTE_TRACKING_URL:
api = settings.REMOTE_TRACKING_URL + "auth"
else:
return info
api_key = _get_api_key(request)
if api_key:
api_mask = api_key[:2] + "***" + api_key[-2:]
logger.info(f"Authenticating with API key: {api_mask}")
async with aiohttp.ClientSession() as session:
response = await session.get(api, headers={"X-API-KEY": api_key})
info = await response.json()
return info
@cached(ttl=60 * 15 if settings.DEPLOYED else 0)
async def tokenize(request, url: str) -> tuple[str, bool]:
api_key = _get_api_key(request) or ""
token = request.args.get("token")
default_url = url.replace(f"api_key={api_key}", "").replace("?&", "?").strip("?&")
if api_key == "myapikey42" and "example.png" not in url:
logger.warning(f"Example API key used to tokenize: {url}")
return default_url, True
if settings.REMOTE_TRACKING_URL:
api = settings.REMOTE_TRACKING_URL + "tokenize"
else:
return url, False
if api_key or token:
async with aiohttp.ClientSession() as session:
response = await session.post(
api, data={"url": default_url}, headers={"X-API-KEY": api_key}
)
data = await response.json()
return data["url"], data["url"] != url
return url, False
async def custom_watermarks_allowed(request) -> bool:
info = await authenticate(request)
if info.get("image_access", False):
return True
token = request.args.get("token")
if token:
logger.info(f"Authenticating with token: {token}")
_url, updated = await tokenize(request, request.url)
return not updated
return False
async def get_watermark(request) -> tuple[str, bool]:
watermark = request.args.get("watermark", "")
if await custom_watermarks_allowed(request):
if watermark == settings.DISABLED_WATERMARK:
return "", False
return watermark, False
if watermark:
if watermark == settings.DEFAULT_WATERMARK:
logger.warning(f"Redundant watermark: {watermark}")
return settings.DEFAULT_WATERMARK, True
if watermark in settings.ALLOWED_WATERMARKS:
return watermark, False
logger.warning(f"Invalid watermark: {watermark}")
return settings.DEFAULT_WATERMARK, True
return settings.DEFAULT_WATERMARK, False
async def track(request, lines: list[str]):
if settings.TRACK_REQUESTS and settings.REMOTE_TRACKING_URL:
api = settings.REMOTE_TRACKING_URL
else:
return
text = " ".join(lines).strip()
if not text:
return
if any(name in request.args for name in ["height", "width", "watermark"]):
return
if "localhost" in getattr(request, "host", "localhost"):
return
async with aiohttp.ClientSession() as session:
params = dict(
text=text,
client=_get_referer(request) or settings.BASE_URL,
result=unquote(request.url),
)
logger.info(f"Tracking request: {params}")
headers = {"X-API-KEY": _get_api_key(request) or ""}
response = await session.get(api, params=params, headers=headers)
if response.status != 200:
try:
message = await response.json()
except aiohttp.client_exceptions.ContentTypeError:
message = await response.text()
logger.error(f"Tracker response {response.status}: {message}")
if response.status >= 404 and response.status not in {414, 421, 520}:
settings.REMOTE_TRACKING_ERRORS += 1
logger.info(f"Tracker error count: {settings.REMOTE_TRACKING_ERRORS}")
if settings.REMOTE_TRACKING_ERRORS >= settings.REMOTE_TRACKING_ERRORS_LIMIT:
settings.TRACK_REQUESTS = False
logger.warning(f"Disabled tracking after {response.status} response")
async def search(request, text: str, safe: bool, *, mode="") -> list[dict]:
if settings.REMOTE_TRACKING_URL:
api = settings.REMOTE_TRACKING_URL + mode
else:
return []
async with aiohttp.ClientSession() as session:
params = dict(
text=text,
nsfw=0 if safe else 1,
client=_get_referer(request) or settings.BASE_URL,
)
logger.info(f"Searching for results: {text!r} (safe={safe})")
headers = {"X-API-KEY": _get_api_key(request) or ""}
response = await session.get(api, params=params, headers=headers)
assert response.status < 500, f"Invalid response: {response}"
data = await response.json()
if response.status == 200:
return data
logger.error(f"Search response: {data}")
return []
def _get_referer(request):
referer = request.headers.get("referer") or request.args.get("referer")
if referer and referer.startswith(settings.BASE_URL) and "/docs/" not in referer:
referer = None
return referer
def _get_api_key(request):
return request.headers.get("x-api-key") or request.args.get("api_key")