This repository has been archived by the owner on Apr 26, 2024. It is now read-only.
-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
mypy_synapse_plugin.py
299 lines (249 loc) · 10.1 KB
/
mypy_synapse_plugin.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""This is a mypy plugin for Synapse to deal with some of the funky typing that
can crop up, e.g. the cache descriptors.
"""
from typing import Callable, Optional, Tuple, Type
import mypy.types
from mypy.erasetype import remove_instance_last_known_values
from mypy.errorcodes import ErrorCode
from mypy.nodes import ARG_NAMED_OPT, Var
from mypy.plugin import MethodSigContext, Plugin
from mypy.typeops import bind_self
from mypy.types import (
AnyType,
CallableType,
Instance,
NoneType,
TupleType,
TypeAliasType,
UninhabitedType,
UnionType,
)
class SynapsePlugin(Plugin):
    """Mypy plugin that rewrites the `__call__` signature of cached functions."""

    def get_method_signature_hook(
        self, fullname: str
    ) -> Optional[Callable[[MethodSigContext], CallableType]]:
        """Return the signature hook for `fullname`, or None if it is not hooked.

        Only methods whose full name starts with one of the cached-function
        `__call__` prefixes below are given a hook.
        """
        hooked_prefixes = (
            "synapse.util.caches.descriptors.CachedFunction.__call__",
            "synapse.util.caches.descriptors._LruCachedFunction.__call__",
        )
        if not fullname.startswith(hooked_prefixes):
            return None
        return cached_function_method_signature
def cached_function_method_signature(ctx: MethodSigContext) -> CallableType:
    """Fixes the `CachedFunction.__call__` signature to be correct.

    It already has *almost* the correct signature, except:

    1. the `self` argument needs to be marked as "bound";
    2. any `cache_context` argument should be removed;
    3. an optional keyword argument `on_invalidate` should be added;
    4. the return type should be a `Deferred` (it is wrapped in one if it is
       not one already).

    Finally, the (unwrapped) return type is checked for mutability and an
    error or note is reported via `check_is_cacheable`.

    Args:
        ctx: The mypy method-signature context for the call being checked.

    Returns:
        The adjusted signature.
    """

    # 1. Mark this as a bound function signature.
    signature: CallableType = bind_self(ctx.default_signature)

    # 2. Remove any "cache_context" args.
    #
    # Note: We should be only doing this if `cache_context=True` is set, but if
    # it isn't then the code will raise an exception when it's called anyway, so
    # it's not the end of the world.
    context_arg_index = None
    for idx, name in enumerate(signature.arg_names):
        if name == "cache_context":
            context_arg_index = idx
            break

    arg_types = list(signature.arg_types)
    arg_names = list(signature.arg_names)
    arg_kinds = list(signature.arg_kinds)

    # Compare against None explicitly: index 0 is a valid position (it is falsy,
    # so a plain truthiness test would skip removing a leading `cache_context`).
    if context_arg_index is not None:
        arg_types.pop(context_arg_index)
        arg_names.pop(context_arg_index)
        arg_kinds.pop(context_arg_index)

    # 3. Add an optional "on_invalidate" argument.
    #
    # This is either
    # - a callable which accepts no input and returns nothing, or
    # - None.
    calltyp = UnionType(
        [
            NoneType(),
            CallableType(
                arg_types=[],
                arg_kinds=[],
                arg_names=[],
                ret_type=NoneType(),
                fallback=ctx.api.named_generic_type("builtins.function", []),
            ),
        ]
    )

    arg_types.append(calltyp)
    arg_names.append("on_invalidate")
    arg_kinds.append(ARG_NAMED_OPT)  # Arg is an optional kwarg.

    # 4. Ensure the return type is a Deferred.
    if (
        isinstance(signature.ret_type, Instance)
        and signature.ret_type.type.fullname == "twisted.internet.defer.Deferred"
    ):
        # If it is already a Deferred, nothing to do.
        ret_type = signature.ret_type
    else:
        ret_arg = None
        if isinstance(signature.ret_type, Instance):
            # If a coroutine, wrap the coroutine's return type in a Deferred.
            if signature.ret_type.type.fullname == "typing.Coroutine":
                ret_arg = signature.ret_type.args[2]

            # If an awaitable, wrap the awaitable's final value in a Deferred.
            elif signature.ret_type.type.fullname == "typing.Awaitable":
                ret_arg = signature.ret_type.args[0]

        # Otherwise, wrap the return value in a Deferred.
        if ret_arg is None:
            ret_arg = signature.ret_type

        # This should be able to use ctx.api.named_generic_type, but that doesn't seem
        # to find the correct symbol for anything more than 1 module deep.
        #
        # modules is not part of CheckerPluginInterface. The following is a combination
        # of TypeChecker.named_generic_type and TypeChecker.lookup_typeinfo.
        sym = ctx.api.modules["twisted.internet.defer"].names.get("Deferred")  # type: ignore[attr-defined]
        ret_type = Instance(sym.node, [remove_instance_last_known_values(ret_arg)])

    signature = signature.copy_modified(
        arg_types=arg_types,
        arg_names=arg_names,
        arg_kinds=arg_kinds,
        ret_type=ret_type,
    )

    # 5. Complain loudly if we are returning something mutable
    check_is_cacheable(signature, ctx, ret_type)

    return signature
def check_is_cacheable(
    signature: CallableType,
    ctx: MethodSigContext,
    deferred_return_type: Instance,
) -> None:
    """Report to mypy whether a cached function's return type looks cacheable.

    Args:
        signature: The (already adjusted) signature of the cached function.
        ctx: The mypy context used for reporting and for reading options.
        deferred_return_type: The function's return type, which must be a
            `twisted.internet.defer.Deferred` instance; its first type
            argument is the type that gets checked.

    A failure is reported when the type is not cacheable; a note is reported
    when it is cacheable but `is_cacheable` produced a message.
    """
    # The caller wraps the return type into a Deferred before calling us.
    assert deferred_return_type.type.fullname == "twisted.internet.defer.Deferred"
    return_type = deferred_return_type.args[0]

    verbose = ctx.api.options.verbosity >= 1
    # TODO Technically a cachedList only needs immutable values, but forcing them
    # to return Mapping instead of Dict is fine.
    ok, note = is_cacheable(return_type, signature, verbose)

    message = (
        f"function {signature.name} is @cached, returning {return_type}"
        if ok
        else f"function {signature.name} is @cached, but has mutable return value {return_type}"
    )
    if note:
        message += f" ({note})"
    # Drop the common module prefixes to keep the message short.
    message = message.replace("builtins.", "").replace("typing.", "")

    # TODO The context is the context of the caller, not the method itself.
    if not ok:
        ctx.api.fail(message, ctx.context, code=AT_CACHED_MUTABLE_RETURN)
    elif note:
        ctx.api.note(message, ctx.context)  # type: ignore[attr-defined]
# Fully-qualified names of simple scalar types; `is_cacheable` accepts these
# outright.
IMMUTABLE_VALUE_TYPES = {
    "builtins.bool",
    "builtins.int",
    "builtins.float",
    "builtins.str",
    "builtins.bytes",
}

# Non-builtin types which `is_cacheable` also accepts outright.
IMMUTABLE_CUSTOM_TYPES = {
    "synapse.synapse_rust.push.FilteredPushRules",
    # Strictly speaking this one is mutable, but it is close enough.
    "signedjson.types.VerifyKey",
}

# Containers that are themselves immutable; they are cacheable only if their
# element type (the first type argument) is cacheable too.
IMMUTABLE_CONTAINER_TYPES_REQUIRING_IMMUTABLE_ELEMENTS = {
    "builtins.frozenset",
    "builtins.tuple",
    "typing.AbstractSet",
    "typing.Sequence",
    "immutabledict.immutabledict",
}

# Builtin containers which `is_cacheable` always rejects as mutable.
MUTABLE_CONTAINER_TYPES = {
    "builtins.set",
    "builtins.list",
    "builtins.dict",
}
# Error code attached to the failure that `check_is_cacheable` reports (via
# `ctx.api.fail(..., code=...)`) when a cached function's return type is not
# cacheable.
AT_CACHED_MUTABLE_RETURN = ErrorCode(
    "synapse-@cached-mutable",
    "@cached() should have an immutable return type",
    "General",
)
def is_cacheable(
    rt: mypy.types.Type, signature: CallableType, verbose: bool
) -> Tuple[bool, Optional[str]]:
    """Decide whether a value of type `rt` may be returned from a cached function.

    Args:
        rt: The type to inspect.
        signature: The cached function's signature; only passed along to
            recursive calls.
        verbose: Whether to attach a note to `Any` types.

    Returns:
        A 2-tuple `(cacheable, message)`:
          - `cacheable` is False when the type is definitely not cacheable,
            and True in every other case.
          - `message` is an optional explanation of the verdict.
    """
    # This should probably be done via a TypeVisitor. Apologies to the reader!
    if isinstance(rt, AnyType):
        return True, ("may be mutable" if verbose else None)

    if isinstance(rt, Instance):
        fullname = rt.type.fullname

        if fullname in IMMUTABLE_VALUE_TYPES or fullname in IMMUTABLE_CUSTOM_TYPES:
            return True, None

        if fullname == "typing.Mapping":
            # Only the value type (second type argument) is inspected.
            return is_cacheable(rt.args[1], signature, verbose)

        if fullname in IMMUTABLE_CONTAINER_TYPES_REQUIRING_IMMUTABLE_ELEMENTS:
            # E.g. Sequence[T] is cacheable iff T is cacheable.
            return is_cacheable(rt.args[0], signature, verbose)

        if fullname in MUTABLE_CONTAINER_TYPES:
            return False, None

        if "attrs" not in rt.type.metadata:
            return False, f"Don't know how to handle {fullname}"

        attrs_metadata = rt.type.metadata["attrs"]
        if not attrs_metadata["frozen"]:
            return False, "non-frozen attrs class"

        # A frozen attrs class is cacheable only if every attribute is.
        for attribute in attrs_metadata["attributes"]:
            attribute_name = attribute["name"]
            symbol_node = rt.type.names[attribute_name].node
            assert isinstance(symbol_node, Var)
            assert symbol_node.type is not None
            attribute_ok, _ = is_cacheable(symbol_node.type, signature, verbose)
            if not attribute_ok:
                return False, f"non-frozen attrs property: {attribute_name}"
        # Every attribute passed the check.
        return True, None

    if isinstance(rt, NoneType):
        return True, None

    if isinstance(rt, (TupleType, UnionType)):
        for item in rt.items:
            item_ok, item_note = is_cacheable(item, signature, verbose)
            if not item_ok:
                return False, item_note
        # Notes from cacheable items are dropped here, which is probably fine.
        return True, None

    if isinstance(rt, TypeAliasType):
        # Expand the alias and inspect what it points at.
        return is_cacheable(mypy.types.get_proper_type(rt), signature, verbose)

    # The tests check what happens if you raise an Exception, so they don't return.
    if isinstance(rt, UninhabitedType) and rt.is_noreturn:
        # There is no return value at all, so treat it as cacheable.
        return True, None

    return False, f"Don't know how to handle {type(rt).__qualname__} return type"
def plugin(version: str) -> Type[SynapsePlugin]:
    """Plugin entry point: return the plugin class to use.

    The `version` string is there so that a plugin can cope with the mypy
    plugin interface *not* being stable. It is ignored here: the version of
    mypy that Synapse uses in CI is pinned, so we don't really care.
    """
    return SynapsePlugin