|
1 | | -from _typeshed import Incomplete |
| 1 | +import json |
2 | 2 | from collections.abc import Generator |
| 3 | +from typing import Any, Callable, Iterator, TypeAlias |
3 | 4 |
|
4 | | -json_decoder: Incomplete |
| 5 | +json_decoder: json.JSONDecoder |
5 | 6 |
|
6 | | -def stream_as_text(stream) -> Generator[Incomplete, None, None]: ... |
7 | | -def json_splitter(buffer): ... |
8 | | -def json_stream(stream): ... |
9 | | -def line_splitter(buffer, separator: str = "\n"): ... |
10 | | -def split_buffer(stream, splitter: Incomplete | None = None, decoder=...) -> Generator[Incomplete, None, Incomplete]: ... |
| 7 | +# Type alias for JSON, explained at: |
| 8 | +# https://github.com/python/typing/issues/182#issuecomment-1320974824. |
| 9 | +_JSON: TypeAlias = dict[str, _JSON] | list[_JSON] | str | int | float | bool | None |
| 10 | + |
| 11 | +def stream_as_text(stream: Iterator[str | bytes]) -> Generator[str, None, None]: ... |
| 12 | +def json_splitter(buffer: str) -> tuple[_JSON, str] | None: ... |
| 13 | +def json_stream(stream: Iterator[str]) -> Generator[_JSON, None, None]: ... |
| 14 | +def line_splitter(buffer: str, separator: str = "\n") -> tuple[str, str] | None: ... |
| 15 | +def split_buffer( |
| 16 | + stream: Iterator[str | bytes], |
| 17 | + splitter: Callable[[str], tuple[str, str]] | None = None, |
| 18 | + decoder: Callable[[str], Any] = ..., |
| 19 | +) -> Generator[Any, None, None]: ... |
0 commit comments