
Consolidate __init__, _build_stream_metadata, _parse_output into base Stream class #169

@Kamilbenkirane

Problem

After #164 consolidated _aggregate_usage, _aggregate_finish_reason, _aggregate_event_data, and _parse_chunk into the base Stream class (~370 lines removed), three methods remain duplicated, identically or nearly so, across all modality streams (TextStream, ImagesStream, AudioStream):

  1. __init__ — stores _transform_output and _client (identical in all 3)
  2. _build_stream_metadata — adds model/provider/modality from _client (identical in all 3)
  3. _parse_output — aggregates content → transforms → builds Output object (near-identical in all 3, only Output class differs)

Estimated savings: ~80 lines across 6 files with zero behavioral changes.

Proposed Changes

1. Move _transform_output and _stream_metadata to base Stream.__init__()

File: src/celeste/streaming.py

Add two optional parameters to Stream.__init__(): transform_output: Callable[..., Any] | None = None and stream_metadata: dict[str, Any] | None = None. Both default to None so that the test helpers (ConcreteStream, TypedStream) continue to work unchanged.

Instead of passing the entire client object, pass only the metadata the stream needs:

# ModalityClient._stream() call site
return stream_class(
    sse_iterator,
    transform_output=self._transform_output,
    stream_metadata={"model": self.model.id, "provider": self.provider, "modality": self.modality},
    **parameters,
)

This eliminates the _client reference entirely — the stream only receives the data it actually uses. No circular import, no Any, no TYPE_CHECKING.
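
A minimal sketch of what the widened constructor might look like. The class below is a toy stand-in, not the real celeste.streaming.Stream: the iterator type, the _parameters attribute, and the placeholder metadata values are assumptions made for illustration. Normalising None to an empty dict is also an assumption; it keeps a later {**self._stream_metadata, ...} unpack from failing when no metadata was passed.

```python
from __future__ import annotations

from collections.abc import Callable, Iterator
from typing import Any


class Stream:
    """Toy stand-in for the base Stream; only the constructor is sketched."""

    def __init__(
        self,
        sse_iterator: Iterator[dict[str, Any]],
        transform_output: Callable[..., Any] | None = None,
        stream_metadata: dict[str, Any] | None = None,
        **parameters: Any,
    ) -> None:
        self._sse_iterator = sse_iterator
        self._transform_output = transform_output
        # Assumption: store a copy and treat None as "no metadata".
        self._stream_metadata = dict(stream_metadata or {})
        self._parameters = parameters


# Test-helper style construction: no transform, no metadata.
bare = Stream(iter([]))

# Call-site style construction with placeholder values.
full = Stream(
    iter([]),
    transform_output=str.upper,
    stream_metadata={"model": "example-model", "provider": "example", "modality": "text"},
    temperature=0.2,
)
```

With this shape, a helper that passes only the iterator still constructs cleanly, while the client call site supplies everything the stream needs up front.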

Remove the identical __init__ overrides from:

  • src/celeste/modalities/text/streaming.py
  • src/celeste/modalities/images/streaming.py
  • src/celeste/modalities/audio/streaming.py

2. Consolidate _build_stream_metadata() into base Stream

File: src/celeste/streaming.py

Replace the minimal base implementation with:

def _build_stream_metadata(self, raw_events: list[dict[str, Any]]) -> dict[str, Any]:
    return {**self._stream_metadata, "raw_events": raw_events}

Remove the identical overrides from all 3 modality streams.

MRO safety: 8 provider stream mixins override _build_stream_metadata to filter events, then call super()._build_stream_metadata(filtered). That call currently resolves ProviderMixin → ModalityStream → base Stream. After the modality override is removed, it resolves ProviderMixin → base Stream directly. Behavior is identical because the modality override and the new base implementation produce the same output.
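
The resolution order described above can be checked with a small toy hierarchy. ExampleProviderMixin, ExampleTextStream, and the "ping" event filter are hypothetical names invented for this sketch; only the super() chaining pattern is taken from the description.

```python
from __future__ import annotations

from typing import Any


class Stream:
    def __init__(self, stream_metadata: dict[str, Any] | None = None) -> None:
        self._stream_metadata = dict(stream_metadata or {})

    def _build_stream_metadata(self, raw_events: list[dict[str, Any]]) -> dict[str, Any]:
        return {**self._stream_metadata, "raw_events": raw_events}


class TextStream(Stream):
    """Modality stream after the change: no _build_stream_metadata override."""


class ExampleProviderMixin:
    """Hypothetical provider mixin that filters events before delegating."""

    def _build_stream_metadata(self, raw_events: list[dict[str, Any]]) -> dict[str, Any]:
        filtered = [event for event in raw_events if event.get("type") != "ping"]
        # super() skips TextStream (no override) and lands on Stream.
        return super()._build_stream_metadata(filtered)


class ExampleTextStream(ExampleProviderMixin, TextStream):
    pass


stream = ExampleTextStream({"model": "m", "provider": "p", "modality": "text"})
metadata = stream._build_stream_metadata([{"type": "ping"}, {"type": "delta"}])
mro_names = [cls.__name__ for cls in ExampleTextStream.__mro__]
```

TextStream stays in the MRO, but because it no longer defines the method, the mixin's super() call falls through to the base implementation.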

3. Pull _parse_output() into base Stream

File: src/celeste/streaming.py

Add _output_class: ClassVar[type[Output]] and abstract _aggregate_content() to base Stream. Change _parse_output from @abstractmethod to a concrete default:

def _parse_output(self, chunks, **parameters):
    raw_content = self._aggregate_content(chunks)
    content = self._transform_output(raw_content, **parameters) if self._transform_output else raw_content
    raw_events = self._aggregate_event_data(chunks)
    return self._output_class(
        content=content,
        usage=self._aggregate_usage(chunks),
        finish_reason=self._aggregate_finish_reason(chunks),
        metadata=self._build_stream_metadata(raw_events),
    )

Remove _parse_output from all 3 modality streams. Add _output_class ClassVar to each:

  • TextStream._output_class = TextOutput
  • ImagesStream._output_class = ImageOutput
  • AudioStream._output_class = AudioOutput
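
As a rough illustration of how the ClassVar and the abstract hook fit together, here is a stripped-down, runnable version. The Output and TextOutput dataclasses are simplified stand-ins, chunks are plain strings rather than chunk objects, and usage and finish_reason aggregation are left out; none of this is the actual celeste code.

```python
from __future__ import annotations

from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, ClassVar


@dataclass
class Output:
    content: Any
    metadata: dict[str, Any] = field(default_factory=dict)


@dataclass
class TextOutput(Output):
    pass


class Stream(ABC):
    # Each modality stream names the Output subclass it builds.
    _output_class: ClassVar[type[Output]]

    def __init__(self, transform_output=None, stream_metadata=None) -> None:
        self._transform_output = transform_output
        self._stream_metadata = dict(stream_metadata or {})

    @abstractmethod
    def _aggregate_content(self, chunks: list[str]) -> Any: ...

    def _parse_output(self, chunks: list[str], **parameters: Any) -> Output:
        raw_content = self._aggregate_content(chunks)
        content = (
            self._transform_output(raw_content, **parameters)
            if self._transform_output
            else raw_content
        )
        return self._output_class(content=content, metadata=dict(self._stream_metadata))


class TextStream(Stream):
    _output_class = TextOutput

    def _aggregate_content(self, chunks: list[str]) -> str:
        return "".join(chunks)


result = TextStream(transform_output=str.strip)._parse_output(["  hel", "lo  "])
```

The base class owns the pipeline (aggregate, transform, build), and a modality stream only supplies the two pieces that differ: the output type and the aggregation rule.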

4. Update ModalityClient._stream() call site

File: src/celeste/client.py

Change from passing client=self to passing a stream_metadata dict:

return stream_class(
    sse_iterator,
    transform_output=self._transform_output,
    stream_metadata={"model": self.model.id, "provider": self.provider, "modality": self.modality},
    **parameters,
)

5. Update template and tests

  • templates/modalities/{modality_slug}/streaming.py.template — slim to ClassVars + _aggregate_content only
  • tests/unit_tests/test_streaming.py — update abstract method assertions (_aggregate_content replaces _parse_output as the required abstract method)
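
The updated abstract-method assertions might look roughly like the following. This is a sketch against a toy Stream, not the contents of tests/unit_tests/test_streaming.py; MissingAggregate and the test function names are hypothetical, and ConcreteStream here is a simplified stand-in for the helper of the same name.

```python
from abc import ABC, abstractmethod


class Stream(ABC):
    @abstractmethod
    def _aggregate_content(self, chunks): ...

    # Concrete default: no longer abstract after the change.
    def _parse_output(self, chunks):
        return self._aggregate_content(chunks)


class MissingAggregate(Stream):
    """Subclass that forgets to implement _aggregate_content."""


class ConcreteStream(Stream):
    def _aggregate_content(self, chunks):
        return "".join(chunks)


def test_aggregate_content_is_required() -> None:
    assert Stream.__abstractmethods__ == frozenset({"_aggregate_content"})
    try:
        MissingAggregate()
    except TypeError:
        pass  # ABCs refuse to instantiate with unimplemented abstract methods.
    else:
        raise AssertionError("expected TypeError for missing _aggregate_content")


def test_parse_output_is_inherited() -> None:
    assert ConcreteStream()._parse_output(["a", "b"]) == "ab"
```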

Result

Each modality stream reduces to ~10-15 lines (ClassVars + _aggregate_content):

class TextStream(Stream[TextOutput, TextParameters, TextChunk]):
    _usage_class = TextUsage
    _finish_reason_class = TextFinishReason
    _chunk_class = TextChunk
    _output_class = TextOutput
    _empty_content = ""

    def _aggregate_content(self, chunks: list[TextChunk]) -> str:
        return "".join(chunk.content for chunk in chunks)

Design Decisions

  • stream_metadata dict instead of client reference — the stream only uses _client for 3 metadata values (.model.id, .provider, .modality). Passing a dict eliminates the circular import between celeste.streaming and celeste.client without resorting to Any or TYPE_CHECKING
  • | None = None for transform_output and stream_metadata — keeps base Stream usable in tests without requiring a full client setup
  • _output_class as ClassVar — consistent with existing pattern (_usage_class, _finish_reason_class, _chunk_class, _empty_content)
  • Defensive empty-chunks guard in base _parse_output: ImagesStream had this; base __anext__ already prevents it, but included for safety
