-
Notifications
You must be signed in to change notification settings - Fork 27
Description
Along with all of the other choices made by the previous maintainers around typing, it seems the mappings passed to xadd
, read
, and xreadgroup
are invariant and do not allow passing in values like dict[str, str]
. This makes sense when you're concerned about someone setting an incompatible value into the dict or mapping. A bit less for cases like this where we just want to read the values. (I think? I'm not an expert on Python typing!)
This can be seen with xreadgroup
, when passing in the streams and stream IDs:
valkey-py/valkey/commands/core.py
Lines 3972 to 3980 in c490780
def xreadgroup( | |
self, | |
groupname: str, | |
consumername: str, | |
streams: Dict[KeyT, StreamIdT], | |
count: Union[int, None] = None, | |
block: Union[int, None] = None, | |
noack: bool = False, | |
) -> ResponseT: |
If I were to try to interact with this like such:
stream = { "incoming-text": "$" }
payload = cast(Any, client.xread(stream, None, 0))
'course, things like pyright don't like this and emit an error about it. Invariant types and all that.
Pyright: Argument of type "dict[str, str]" cannot be assigned to parameter "streams" of type "Dict[KeyT, Unknown]" in function "xread"
"dict[str, str]" is not assignable to "Dict[KeyT, Unknown]"
Type parameter "_KT@dict" is invariant, but "str" is not the same as "KeyT" (reportArgumentType)
There's a helpful comment about this in the typing.py
file.
Lines 42 to 46 in c490780
# Mapping is not covariant in the key type, which prevents | |
# Mapping[_StringLikeT, X] from accepting arguments of type Dict[str, X]. Using | |
# a TypeVar instead of a Union allows mappings with any of the permitted types | |
# to be passed. Care is needed if there is more than one such mapping in a | |
# type signature because they will all be required to be the same key type. |
Using this, we can apply it to StreamIdT
to create a version that's covariant.
AnyStreamIdT = TypeVar("AnyStreamIdT", int, bytes, str, memoryview)
def xreadgroup(
self,
groupname: str,
consumername: str,
- streams: Dict[KeyT, StreamIdT],
+ streams: Mapping[AnyKeyT, AnyStreamIDT],
count: Union[int, None] = None,
block: Union[int, None] = None,
noack: bool = False,
) -> ResponseT:
It seems to work well! I tested it with the above example with xreadgroup
, at least. I can't seem to find any issues to this approach -- and it seems to be used elsewhere even if not here.