Skip to content

Commit 897e920

Browse files
authored
[core][feat] Add Async Streams abstraction (#2273)
1 parent e0c1ae7 commit 897e920

25 files changed

Lines changed: 882 additions & 454 deletions

fixcore/.pylintrc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ ignored-modules=
246246

247247
# List of classes names for which member attributes should not be checked
248248
# (useful for classes with attributes dynamically set).
249-
ignored-classes=SQLObject, optparse.Values, thread._local, _thread._local, aiostream.pipe
249+
ignored-classes=SQLObject, optparse.Values, thread._local, _thread._local
250250

251251
# List of members which are set dynamically and missed by pylint inference
252252
# system, and so shouldn't trigger E1101 when accessed. Python regular

fixcore/fixcore/cli/__init__.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,12 @@
1515
AsyncIterable,
1616
)
1717

18-
from aiostream import stream
19-
from aiostream.core import Stream
2018
from parsy import Parser, regex, string
2119

2220
from fixcore.model.graph_access import Section
2321
from fixcore.types import JsonElement, Json
2422
from fixcore.util import utc, parse_utc, AnyT
23+
from fixlib.asynchronous.stream import Stream
2524
from fixlib.durations import parse_duration, DurationRe
2625
from fixlib.parse_util import (
2726
make_parser,
@@ -47,7 +46,7 @@
4746
# A sink function takes a stream and creates a result
4847
Sink = Callable[[JsStream], Awaitable[T]]
4948

50-
list_sink: Callable[[JsGen], Awaitable[Any]] = stream.list # type: ignore
49+
list_sink: Callable[[JsGen], Awaitable[List[Any]]] = Stream.as_list
5150

5251

5352
@make_parser

fixcore/fixcore/cli/cli.py

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,13 @@
1010
from typing import Dict, List, Tuple, Union, Sequence
1111
from typing import Optional, Any, TYPE_CHECKING
1212

13-
from aiostream import stream
1413
from attrs import evolve
1514
from parsy import Parser
1615
from rich.padding import Padding
1716

1817
from fixcore import version
1918
from fixcore.analytics import CoreEvent
20-
from fixcore.cli import cmd_with_args_parser, key_values_parser, T, Sink, args_values_parser, JsGen
19+
from fixcore.cli import cmd_with_args_parser, key_values_parser, T, Sink, args_values_parser, JsStream
2120
from fixcore.cli.command import (
2221
SearchPart,
2322
PredecessorsPart,
@@ -78,6 +77,7 @@
7877
from fixcore.types import JsonElement
7978
from fixcore.user.model import Permission
8079
from fixcore.util import group_by
80+
from fixlib.asynchronous.stream import Stream
8181
from fixlib.parse_util import make_parser, pipe_p, semicolon_p
8282

8383
if TYPE_CHECKING:
@@ -104,7 +104,7 @@ def command_line_parser() -> Parser:
104104
return ParsedCommands(commands, maybe_env if maybe_env else {})
105105

106106

107-
# multiple piped commands are separated by semicolon
107+
# semicolon separates multiple piped commands
108108
multi_command_parser = command_line_parser.sep_by(semicolon_p)
109109

110110

@@ -187,7 +187,7 @@ def overview() -> str:
187187
logo = ctx.render_console(Padding(WelcomeCommand.ck, pad=(0, 0, 0, middle))) if ctx.supports_color() else ""
188188
return headline + logo + ctx.render_console(result)
189189

190-
def help_command() -> JsGen:
190+
def help_command() -> JsStream:
191191
if not arg:
192192
result = overview()
193193
elif arg == "placeholders":
@@ -209,7 +209,7 @@ def help_command() -> JsGen:
209209
else:
210210
result = f"No command found with this name: {arg}"
211211

212-
return stream.just(result)
212+
return Stream.just(result)
213213

214214
return CLISource.single(help_command, required_permissions={Permission.read})
215215

@@ -352,11 +352,11 @@ def command(
352352
self, name: str, arg: Optional[str] = None, ctx: CLIContext = EmptyContext, **kwargs: Any
353353
) -> ExecutableCommand:
354354
"""
355-
Create an executable command for given command name, args and context.
356-
:param name: the name of the command to execute (must be a known command)
357-
:param arg: the arg of the command (must be parsable by the command)
358-
:param ctx: the context of this command.
359-
:return: the ready to run executable command.
355+
Create an executable command for given command name, args, and context.
356+
:param name: The name of the command to execute (must be a known command).
357+
:param arg: The arg of the command (must be parsable by the command).
358+
:param ctx: The context of this command.
359+
:return: The ready to run executable command.
360360
:raises:
361361
CLIParseError: if the name of the command is not known, or the argument fails to parse.
362362
"""
@@ -377,9 +377,9 @@ async def create_query(
377377
Takes a list of query part commands and combine them to a single executable query command.
378378
This process can also introduce new commands that should run after the query is finished.
379379
Therefore, a list of executable commands is returned.
380-
:param commands: the incoming executable commands, which actions are all instances of SearchCLIPart.
381-
:param ctx: the context to execute within.
382-
:return: the resulting list of commands to execute.
380+
:param commands: The incoming executable commands, which actions are all instances of SearchCLIPart.
381+
:param ctx: The context to execute within.
382+
:return: The resulting list of commands to execute.
383383
"""
384384

385385
# Pass parsed options to execute query
@@ -484,8 +484,8 @@ async def parse_query(query_arg: str) -> Query:
484484
first_head_tail_in_a_row = None
485485
head_tail_keep_order = True
486486

487-
# Define default sort order, if not already defined
488-
# A sort order is required to always return the result in a deterministic way to the user.
487+
# Define default sort order, if not already defined.
488+
# A sort order is required to always return the result deterministically to the user.
489489
# Deterministic order is required for head/tail to work
490490
if query.is_simple_fulltext_search():
491491
# Do not define any additional sort order for fulltext searches
@@ -494,7 +494,7 @@ async def parse_query(query_arg: str) -> Query:
494494
parts = [pt if pt.sort else evolve(pt, sort=default_sort) for pt in query.parts]
495495
query = evolve(query, parts=parts)
496496

497-
# If the last part is a navigation, we need to add sort which will ingest a new part.
497+
# If the last part is a navigation, we need to add a sort which will ingest a new part.
498498
with_sort = query.set_sort(*default_sort) if query.current_part.navigation else query
499499
section = ctx.env.get("section", PathRoot)
500500
# If this is an aggregate query, the default sort needs to be changed
@@ -534,7 +534,7 @@ def rewrite_command_line(cmds: List[ExecutableCommand], ctx: CLIContext) -> List
534534
Rules:
535535
- add the list command if no output format is defined
536536
- add a format to write commands if no output format is defined
537-
- report benchmark run will be formatted as benchmark result automatically
537+
- report benchmark run will be formatted as a benchmark result automatically
538538
"""
539539
if ctx.env.get("no_rewrite") or len(cmds) == 0:
540540
return cmds

0 commit comments

Comments
 (0)