From 621162ac65582acaf5cea99addc556429bf38dd3 Mon Sep 17 00:00:00 2001 From: Zeke Sikelianos Date: Mon, 6 Oct 2025 13:48:00 -0700 Subject: [PATCH 1/6] feat: add deprecated replicate.stream() for v1 compatibility Add deprecated replicate.stream() method as a wrapper around replicate.use() with streaming=True to provide backwards compatibility with the v1 SDK. - Add stream() methods to Replicate and AsyncReplicate classes - Add module-level stream() function - Use @deprecated decorator with clear migration message - Add detailed deprecation warning with example code - Add comprehensive tests for sync, async, and deprecation warnings - Update README to recommend replicate.use() with streaming=True - Use anthropic/claude-4.5-sonnet in all examples Resolves: DP-671 --- README.md | 45 ++---- src/replicate/__init__.py | 3 +- src/replicate/_client.py | 66 ++++++++- src/replicate/_module_client.py | 14 ++ src/replicate/lib/_stream.py | 98 +++++++++++++ tests/lib/test_stream.py | 243 ++++++++++++++++++++++++++++++++ 6 files changed, 430 insertions(+), 39 deletions(-) create mode 100644 src/replicate/lib/_stream.py create mode 100644 tests/lib/test_stream.py diff --git a/README.md b/README.md index d4812d4..314defa 100644 --- a/README.md +++ b/README.md @@ -110,25 +110,24 @@ output = replicate.run("...", input={...}, wait=False) When `wait=False`, the method returns immediately after creating the prediction, and you'll need to poll for the result manually. -## Run a model and stream its output +## Streaming output -For models that support streaming (particularly language models), you can use `replicate.stream()`: +For models that support streaming (particularly language models), use `replicate.use()` with `streaming=True` to stream the output response as it's generated: ```python import replicate -for event in replicate.stream( - "meta/meta-llama-3-70b-instruct", - input={ - "prompt": "Please write a haiku about llamas.", - }, -): +claude = replicate.use("anthropic/claude-4.5-sonnet", streaming=True) + +for event in claude(input={"prompt": "Please write a haiku about streaming Python."}): print(str(event), end="") ``` +> **Note:** The [legacy `replicate.stream()` method](https://github.com/replicate/replicate-python/blob/d2956ff9c3e26ef434bc839cc5c87a50c49dfe20/README.md#run-a-model-and-stream-its-output) is also available for backwards compatibility with the v1 SDK, but is deprecated and will be removed in a future version. + ## Async usage -Simply import `AsyncReplicate` instead of `Replicate` and use `await` with each API call: +To use the Replicate client asynchronously, import `AsyncReplicate` instead of `Replicate` and use `await` with each API call: ```python import os @@ -152,34 +151,6 @@ asyncio.run(main()) Functionality between the synchronous and asynchronous clients is otherwise identical. -### Async run() and stream() - -The async client also supports `run()` and `stream()` methods: - -```python -import asyncio -from replicate import AsyncReplicate - -replicate = AsyncReplicate() - - -async def main(): - # Run a model - output = await replicate.run( - "black-forest-labs/flux-schnell", input={"prompt": "astronaut riding a rocket like a horse"} - ) - print(output) - - # Stream a model's output - async for event in replicate.stream( - "meta/meta-llama-3-70b-instruct", input={"prompt": "Write a haiku about coding"} - ): - print(str(event), end="") - - -asyncio.run(main()) -``` - ### With aiohttp By default, the async client uses `httpx` for HTTP requests. However, for improved concurrency performance you may also use `aiohttp` as the HTTP backend. diff --git a/src/replicate/__init__.py b/src/replicate/__init__.py index 1cfff56..2e2f286 100644 --- a/src/replicate/__init__.py +++ b/src/replicate/__init__.py @@ -109,7 +109,7 @@ if not __name.startswith("__"): try: # Skip symbols that are imported later from _module_client - if __name in ("run", "use"): + if __name in ("run", "use", "stream"): continue __locals[__name].__module__ = "replicate" except (TypeError, AttributeError): @@ -253,6 +253,7 @@ def _reset_client() -> None: # type: ignore[reportUnusedFunction] use as use, files as files, models as models, + stream as stream, account as account, hardware as hardware, webhooks as webhooks, diff --git a/src/replicate/_client.py b/src/replicate/_client.py index 390a552..4fc9a84 100644 --- a/src/replicate/_client.py +++ b/src/replicate/_client.py @@ -16,7 +16,7 @@ AsyncIterator, overload, ) -from typing_extensions import Self, Unpack, ParamSpec, override +from typing_extensions import Self, Unpack, ParamSpec, override, deprecated import httpx @@ -320,6 +320,38 @@ def use( # TODO: Fix mypy overload matching for streaming parameter return _use(self, ref, hint=hint, streaming=streaming) # type: ignore[call-overload, no-any-return] + @deprecated("replicate.stream() is deprecated. Use replicate.use() with streaming=True instead") + def stream( + self, + ref: str, + *, + input: dict[str, Any], + ) -> Iterator[str]: + """ + Run a model and stream its output (deprecated). + + .. deprecated:: + Use :meth:`use` with ``streaming=True`` instead: + + .. code-block:: python + + model = replicate.use("anthropic/claude-4.5-sonnet", streaming=True) + for event in model(input={"prompt": "Hello"}): + print(str(event), end="") + + Args: + ref: Reference to the model to run. Can be a string with owner/name format + (e.g., "anthropic/claude-4.5-sonnet") or owner/name:version format. + input: Dictionary of input parameters for the model. The required keys depend + on the specific model being run. + + Returns: + An iterator that yields output strings as they are generated by the model + """ + from .lib._stream import stream as _stream + + return _stream(type(self), ref, input=input) # type: ignore[return-value, arg-type] + def copy( self, *, @@ -695,6 +727,38 @@ def use( # TODO: Fix mypy overload matching for streaming parameter return _use(self, ref, hint=hint, streaming=streaming) # type: ignore[call-overload, no-any-return] + @deprecated("replicate.stream() is deprecated. Use replicate.use() with streaming=True instead") + async def stream( + self, + ref: str, + *, + input: dict[str, Any], + ) -> AsyncIterator[str]: + """ + Run a model and stream its output asynchronously (deprecated). + + .. deprecated:: + Use :meth:`use` with ``streaming=True`` instead: + + .. code-block:: python + + model = replicate.use("anthropic/claude-4.5-sonnet", streaming=True) + async for event in model(input={"prompt": "Hello"}): + print(str(event), end="") + + Args: + ref: Reference to the model to run. Can be a string with owner/name format + (e.g., "anthropic/claude-4.5-sonnet") or owner/name:version format. + input: Dictionary of input parameters for the model. The required keys depend + on the specific model being run. + + Returns: + An async iterator that yields output strings as they are generated by the model + """ + from .lib._stream import stream as _stream + + return _stream(type(self), ref, input=input) # type: ignore[return-value, arg-type] + def copy( self, *, diff --git a/src/replicate/_module_client.py b/src/replicate/_module_client.py index a3e8ab4..9fc0bed 100644 --- a/src/replicate/_module_client.py +++ b/src/replicate/_module_client.py @@ -82,6 +82,7 @@ def __load__(self) -> PredictionsResource: __client: Replicate = cast(Replicate, {}) run = __client.run use = __client.use + stream = __client.stream else: def _run(*args, **kwargs): @@ -100,8 +101,21 @@ def _use(ref, *, hint=None, streaming=False, use_async=False, **kwargs): return use(Replicate, ref, hint=hint, streaming=streaming, **kwargs) + def _stream(ref, *, input, use_async=False): + from .lib._stream import stream + + if use_async: + from ._client import AsyncReplicate + + return stream(AsyncReplicate, ref, input=input) + + from ._client import Replicate + + return stream(Replicate, ref, input=input) + run = _run use = _use + stream = _stream files: FilesResource = FilesResourceProxy().__as_proxied__() models: ModelsResource = ModelsResourceProxy().__as_proxied__() diff --git a/src/replicate/lib/_stream.py b/src/replicate/lib/_stream.py new file mode 100644 index 0000000..851eede --- /dev/null +++ b/src/replicate/lib/_stream.py @@ -0,0 +1,98 @@ +""" +Deprecated streaming functionality for backwards compatibility with v1 SDK. + +This module provides the stream() function which wraps replicate.use() with streaming=True. +""" + +from __future__ import annotations + +import warnings +from typing import Any, Dict, Type, Union, Iterator, AsyncIterator, overload +from typing_extensions import deprecated + +from .._client import Client, AsyncClient +from ._predictions_use import use + +__all__ = ["stream"] + + +def _format_deprecation_message(ref: str, input: Dict[str, Any]) -> str: + """Format the deprecation message with a working example.""" + # Format the input dict for display + input_str = "{\n" + for key, value in input.items(): + if isinstance(value, str): + input_str += f' "{key}": "{value}",\n' + else: + input_str += f' "{key}": {value},\n' + input_str += " }" + + return ( + f"replicate.stream() is deprecated and will be removed in a future version. " + f"Use replicate.use() with streaming=True instead:\n\n" + f' model = replicate.use("{ref}", streaming=True)\n' + f" for event in model(input={input_str}):\n" + f' print(str(event), end="")\n' + ) + + +@overload +def stream( + client: Type[Client], + ref: str, + *, + input: Dict[str, Any], +) -> Iterator[str]: ... + + +@overload +def stream( + client: Type[AsyncClient], + ref: str, + *, + input: Dict[str, Any], +) -> AsyncIterator[str]: ... + + +@deprecated("replicate.stream() is deprecated. Use replicate.use() with streaming=True instead") +def stream( + client: Union[Type[Client], Type[AsyncClient]], + ref: str, + *, + input: Dict[str, Any], +) -> Union[Iterator[str], AsyncIterator[str]]: + """ + Run a model and stream its output (deprecated). + + This function is deprecated. Use replicate.use() with streaming=True instead: + + model = replicate.use("anthropic/claude-4.5-sonnet", streaming=True) + for event in model(input={"prompt": "Hello"}): + print(str(event), end="") + + Args: + client: The Replicate client class (Client or AsyncClient) + ref: Reference to the model to run. Can be a string with owner/name format + (e.g., "anthropic/claude-4.5-sonnet") or owner/name:version format. + input: Dictionary of input parameters for the model. The required keys depend + on the specific model being run. + + Returns: + An iterator (or async iterator) that yields output strings as they are + generated by the model + + Raises: + DeprecationWarning: Always raised when this function is called + """ + # Log deprecation warning with helpful migration example + warnings.warn( + _format_deprecation_message(ref, input), + DeprecationWarning, + stacklevel=2, + ) + + # Use the existing use() function with streaming=True + model = use(client, ref, streaming=True) + + # Call the model with the input + return model(**input) # type: ignore[return-value] diff --git a/tests/lib/test_stream.py b/tests/lib/test_stream.py new file mode 100644 index 0000000..678b608 --- /dev/null +++ b/tests/lib/test_stream.py @@ -0,0 +1,243 @@ +"""Tests for the deprecated stream() function.""" + +from __future__ import annotations + +import warnings +from unittest.mock import Mock, patch + +import pytest + +import replicate +from replicate import Replicate, AsyncReplicate + + +def test_stream_shows_deprecation_warning(): + """Test that stream() shows a deprecation warning.""" + with patch("replicate.lib._predictions_use.use") as mock_use: + # Create a mock function that returns an iterator + mock_function = Mock() + mock_function.return_value = iter(["Hello", " ", "world"]) + mock_use.return_value = mock_function + + # Call stream and capture warnings + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter("always") + + client = Replicate(bearer_token="test-token") + result = list( + client.stream( + "anthropic/claude-4.5-sonnet", + input={"prompt": "Hello"}, + ) + ) + + # Check that deprecation warnings were raised + assert len(w) > 0 + deprecation_warnings = [warning for warning in w if issubclass(warning.category, DeprecationWarning)] + assert len(deprecation_warnings) > 0 + + # Verify the @deprecated decorator message appears + # (there may be multiple warnings from different decorator levels) + messages = [str(warning.message) for warning in deprecation_warnings] + expected_message = ( + "replicate.stream() is deprecated. " + "Use replicate.use() with streaming=True instead" + ) + assert expected_message in messages + + +def test_stream_calls_use_with_streaming_true(): + """Test that stream() internally calls use() with streaming=True.""" + with patch("replicate.lib._predictions_use.use") as mock_use: + # Create a mock function that returns an iterator + mock_function = Mock() + mock_function.return_value = iter(["Hello", " ", "world"]) + mock_use.return_value = mock_function + + client = Replicate(bearer_token="test-token") + + # Suppress deprecation warnings for this test + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + result = list( + client.stream( + "anthropic/claude-4.5-sonnet", + input={"prompt": "Hello"}, + ) + ) + + # Verify use() was called with streaming=True + mock_use.assert_called_once() + call_args = mock_use.call_args + assert call_args.kwargs["streaming"] is True + assert call_args.args[1] == "anthropic/claude-4.5-sonnet" + + # Verify the mock function was called with the input + mock_function.assert_called_once_with(prompt="Hello") + + +def test_stream_returns_iterator(): + """Test that stream() returns an iterator of strings.""" + with patch("replicate.lib._predictions_use.use") as mock_use: + # Create a mock function that returns an iterator + mock_function = Mock() + mock_function.return_value = iter(["Hello", " ", "world", "!"]) + mock_use.return_value = mock_function + + client = Replicate(bearer_token="test-token") + + # Suppress deprecation warnings for this test + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + result = client.stream( + "anthropic/claude-4.5-sonnet", + input={"prompt": "Say hello"}, + ) + + # Verify we get an iterator + assert hasattr(result, "__iter__") + + # Verify the content + output = list(result) + assert output == ["Hello", " ", "world", "!"] + + +def test_stream_works_same_as_use_with_streaming(): + """Test that stream() produces the same output as use() with streaming=True.""" + with patch("replicate.lib._predictions_use.use") as mock_use: + # Create a mock function that returns an iterator + mock_function = Mock() + expected_output = ["Test", " ", "output"] + mock_function.return_value = iter(expected_output.copy()) + mock_use.return_value = mock_function + + client = Replicate(bearer_token="test-token") + + # Get output from stream() + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + stream_output = list( + client.stream( + "test-model", + input={"prompt": "test"}, + ) + ) + + # Reset the mock + mock_function.return_value = iter(expected_output.copy()) + + # Get output from use() with streaming=True + model = client.use("test-model", streaming=True) + use_output = list(model(prompt="test")) + + # Verify they produce the same output + assert stream_output == use_output + + +def test_module_level_stream_function(): + """Test that the module-level replicate.stream() function works.""" + with patch("replicate.lib._predictions_use.use") as mock_use: + # Create a mock function that returns an iterator + mock_function = Mock() + mock_function.return_value = iter(["a", "b", "c"]) + mock_use.return_value = mock_function + + # Suppress deprecation warnings for this test + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + result = list( + replicate.stream( + "test-model", + input={"prompt": "test"}, + ) + ) + + # Verify we got the expected output + assert result == ["a", "b", "c"] + + # Verify use() was called with streaming=True + mock_use.assert_called_once() + assert mock_use.call_args.kwargs["streaming"] is True + + +@pytest.mark.asyncio +async def test_async_stream_shows_deprecation_warning(): + """Test that async stream() shows a deprecation warning.""" + with patch("replicate.lib._predictions_use.use") as mock_use: + # Create a mock async function that returns an async iterator + async def async_gen(): + yield "Hello" + yield " " + yield "world" + + mock_function = Mock() + mock_function.return_value = async_gen() + mock_use.return_value = mock_function + + # Call stream and capture warnings + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter("always") + + client = AsyncReplicate(bearer_token="test-token") + result = [] + async for item in await client.stream( + "anthropic/claude-4.5-sonnet", + input={"prompt": "Hello"}, + ): + result.append(item) + + # Check that deprecation warnings were raised + assert len(w) > 0 + deprecation_warnings = [warning for warning in w if issubclass(warning.category, DeprecationWarning)] + assert len(deprecation_warnings) > 0 + + # Verify the @deprecated decorator message appears + messages = [str(warning.message) for warning in deprecation_warnings] + expected_message = ( + "replicate.stream() is deprecated. " + "Use replicate.use() with streaming=True instead" + ) + assert expected_message in messages + + +def test_deprecation_message_includes_example(): + """Test that the detailed deprecation message includes a helpful example.""" + with patch("replicate.lib._predictions_use.use") as mock_use: + mock_function = Mock() + mock_function.return_value = iter([]) + mock_use.return_value = mock_function + + client = Replicate(bearer_token="test-token") + + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter("always") + list( + client.stream( + "anthropic/claude-4.5-sonnet", + input={"prompt": "Hello", "max_tokens": 100}, + ) + ) + + # Find the detailed warning from _format_deprecation_message + # (should be one of the warnings, as there are multiple deprecation warnings) + detailed_message = None + for warning in w: + msg = str(warning.message) + if "will be removed in a future version" in msg: + detailed_message = msg + break + + assert detailed_message is not None, "Expected detailed deprecation message not found" + + # Verify the complete detailed message format + expected_message = ( + "replicate.stream() is deprecated and will be removed in a future version. " + "Use replicate.use() with streaming=True instead:\n\n" + ' model = replicate.use("anthropic/claude-4.5-sonnet", streaming=True)\n' + " for event in model(input={\n" + ' "prompt": "Hello",\n' + ' "max_tokens": 100,\n' + " }):\n" + ' print(str(event), end="")\n' + ) + assert detailed_message == expected_message From fee09ac69367cdea7673f709c0e2511a84c437f0 Mon Sep 17 00:00:00 2001 From: Zeke Sikelianos Date: Mon, 6 Oct 2025 13:51:25 -0700 Subject: [PATCH 2/6] fix: add pyright ignores for deprecated function usage in tests and internal calls --- src/replicate/__init__.py | 2 +- src/replicate/_client.py | 4 +-- src/replicate/_module_client.py | 2 +- src/replicate/lib/_stream.py | 2 +- tests/lib/test_stream.py | 44 ++++++++++++++------------------- 5 files changed, 24 insertions(+), 30 deletions(-) diff --git a/src/replicate/__init__.py b/src/replicate/__init__.py index 2e2f286..e91608a 100644 --- a/src/replicate/__init__.py +++ b/src/replicate/__init__.py @@ -253,7 +253,7 @@ def _reset_client() -> None: # type: ignore[reportUnusedFunction] use as use, files as files, models as models, - stream as stream, + stream as stream, # pyright: ignore[reportDeprecated] account as account, hardware as hardware, webhooks as webhooks, diff --git a/src/replicate/_client.py b/src/replicate/_client.py index 4fc9a84..ba1bf8f 100644 --- a/src/replicate/_client.py +++ b/src/replicate/_client.py @@ -350,7 +350,7 @@ def stream( """ from .lib._stream import stream as _stream - return _stream(type(self), ref, input=input) # type: ignore[return-value, arg-type] + return _stream(type(self), ref, input=input) # type: ignore[return-value, arg-type] # pyright: ignore[reportDeprecated] def copy( self, @@ -757,7 +757,7 @@ async def stream( """ from .lib._stream import stream as _stream - return _stream(type(self), ref, input=input) # type: ignore[return-value, arg-type] + return _stream(type(self), ref, input=input) # type: ignore[return-value, arg-type] # pyright: ignore[reportDeprecated] def copy( self, diff --git a/src/replicate/_module_client.py b/src/replicate/_module_client.py index 9fc0bed..0f19a61 100644 --- a/src/replicate/_module_client.py +++ b/src/replicate/_module_client.py @@ -82,7 +82,7 @@ def __load__(self) -> PredictionsResource: __client: Replicate = cast(Replicate, {}) run = __client.run use = __client.use - stream = __client.stream + stream = __client.stream # pyright: ignore[reportDeprecated] else: def _run(*args, **kwargs): diff --git a/src/replicate/lib/_stream.py b/src/replicate/lib/_stream.py index 851eede..8c75c8e 100644 --- a/src/replicate/lib/_stream.py +++ b/src/replicate/lib/_stream.py @@ -92,7 +92,7 @@ def stream( ) # Use the existing use() function with streaming=True - model = use(client, ref, streaming=True) + model = use(client, ref, streaming=True) # pyright: ignore[reportUnknownVariableType] # Call the model with the input return model(**input) # type: ignore[return-value] diff --git a/tests/lib/test_stream.py b/tests/lib/test_stream.py index 678b608..acc1fc5 100644 --- a/tests/lib/test_stream.py +++ b/tests/lib/test_stream.py @@ -13,7 +13,7 @@ def test_stream_shows_deprecation_warning(): """Test that stream() shows a deprecation warning.""" - with patch("replicate.lib._predictions_use.use") as mock_use: + with patch("replicate.lib._stream.use") as mock_use: # Create a mock function that returns an iterator mock_function = Mock() mock_function.return_value = iter(["Hello", " ", "world"]) @@ -24,8 +24,8 @@ def test_stream_shows_deprecation_warning(): warnings.simplefilter("always") client = Replicate(bearer_token="test-token") - result = list( - client.stream( + _ = list( # pyright: ignore[reportDeprecated] + client.stream( # pyright: ignore[reportDeprecated] "anthropic/claude-4.5-sonnet", input={"prompt": "Hello"}, ) @@ -39,16 +39,13 @@ def test_stream_shows_deprecation_warning(): # Verify the @deprecated decorator message appears # (there may be multiple warnings from different decorator levels) messages = [str(warning.message) for warning in deprecation_warnings] - expected_message = ( - "replicate.stream() is deprecated. " - "Use replicate.use() with streaming=True instead" - ) + expected_message = "replicate.stream() is deprecated. Use replicate.use() with streaming=True instead" assert expected_message in messages def test_stream_calls_use_with_streaming_true(): """Test that stream() internally calls use() with streaming=True.""" - with patch("replicate.lib._predictions_use.use") as mock_use: + with patch("replicate.lib._stream.use") as mock_use: # Create a mock function that returns an iterator mock_function = Mock() mock_function.return_value = iter(["Hello", " ", "world"]) @@ -59,8 +56,8 @@ def test_stream_calls_use_with_streaming_true(): # Suppress deprecation warnings for this test with warnings.catch_warnings(): warnings.simplefilter("ignore", DeprecationWarning) - result = list( - client.stream( + _ = list( # pyright: ignore[reportDeprecated] + client.stream( # pyright: ignore[reportDeprecated] "anthropic/claude-4.5-sonnet", input={"prompt": "Hello"}, ) @@ -70,7 +67,7 @@ def test_stream_calls_use_with_streaming_true(): mock_use.assert_called_once() call_args = mock_use.call_args assert call_args.kwargs["streaming"] is True - assert call_args.args[1] == "anthropic/claude-4.5-sonnet" + assert call_args.args[0] == "anthropic/claude-4.5-sonnet" # Verify the mock function was called with the input mock_function.assert_called_once_with(prompt="Hello") @@ -78,7 +75,7 @@ def test_stream_calls_use_with_streaming_true(): def test_stream_returns_iterator(): """Test that stream() returns an iterator of strings.""" - with patch("replicate.lib._predictions_use.use") as mock_use: + with patch("replicate.lib._stream.use") as mock_use: # Create a mock function that returns an iterator mock_function = Mock() mock_function.return_value = iter(["Hello", " ", "world", "!"]) @@ -89,7 +86,7 @@ def test_stream_returns_iterator(): # Suppress deprecation warnings for this test with warnings.catch_warnings(): warnings.simplefilter("ignore", DeprecationWarning) - result = client.stream( + result = client.stream( # pyright: ignore[reportDeprecated] "anthropic/claude-4.5-sonnet", input={"prompt": "Say hello"}, ) @@ -104,7 +101,7 @@ def test_stream_returns_iterator(): def test_stream_works_same_as_use_with_streaming(): """Test that stream() produces the same output as use() with streaming=True.""" - with patch("replicate.lib._predictions_use.use") as mock_use: + with patch("replicate.lib._stream.use") as mock_use: # Create a mock function that returns an iterator mock_function = Mock() expected_output = ["Test", " ", "output"] @@ -117,7 +114,7 @@ def test_stream_works_same_as_use_with_streaming(): with warnings.catch_warnings(): warnings.simplefilter("ignore", DeprecationWarning) stream_output = list( - client.stream( + client.stream( # pyright: ignore[reportDeprecated] "test-model", input={"prompt": "test"}, ) @@ -136,7 +133,7 @@ def test_stream_works_same_as_use_with_streaming(): def test_module_level_stream_function(): """Test that the module-level replicate.stream() function works.""" - with patch("replicate.lib._predictions_use.use") as mock_use: + with patch("replicate.lib._stream.use") as mock_use: # Create a mock function that returns an iterator mock_function = Mock() mock_function.return_value = iter(["a", "b", "c"]) @@ -146,7 +143,7 @@ def test_module_level_stream_function(): with warnings.catch_warnings(): warnings.simplefilter("ignore", DeprecationWarning) result = list( - replicate.stream( + replicate.stream( # pyright: ignore[reportDeprecated] "test-model", input={"prompt": "test"}, ) @@ -163,7 +160,7 @@ def test_module_level_stream_function(): @pytest.mark.asyncio async def test_async_stream_shows_deprecation_warning(): """Test that async stream() shows a deprecation warning.""" - with patch("replicate.lib._predictions_use.use") as mock_use: + with patch("replicate.lib._stream.use") as mock_use: # Create a mock async function that returns an async iterator async def async_gen(): yield "Hello" @@ -180,7 +177,7 @@ async def async_gen(): client = AsyncReplicate(bearer_token="test-token") result = [] - async for item in await client.stream( + async for item in await client.stream( # pyright: ignore[reportDeprecated] "anthropic/claude-4.5-sonnet", input={"prompt": "Hello"}, ): @@ -193,16 +190,13 @@ async def async_gen(): # Verify the @deprecated decorator message appears messages = [str(warning.message) for warning in deprecation_warnings] - expected_message = ( - "replicate.stream() is deprecated. " - "Use replicate.use() with streaming=True instead" - ) + expected_message = "replicate.stream() is deprecated. Use replicate.use() with streaming=True instead" assert expected_message in messages def test_deprecation_message_includes_example(): """Test that the detailed deprecation message includes a helpful example.""" - with patch("replicate.lib._predictions_use.use") as mock_use: + with patch("replicate.lib._stream.use") as mock_use: mock_function = Mock() mock_function.return_value = iter([]) mock_use.return_value = mock_function @@ -212,7 +206,7 @@ def test_deprecation_message_includes_example(): with warnings.catch_warnings(record=True) as w: warnings.simplefilter("always") list( - client.stream( + client.stream( # pyright: ignore[reportDeprecated] "anthropic/claude-4.5-sonnet", input={"prompt": "Hello", "max_tokens": 100}, ) From 8fadecbd71dd16b67d142f071aac3805b426750d Mon Sep 17 00:00:00 2001 From: Zeke Sikelianos Date: Mon, 6 Oct 2025 13:55:32 -0700 Subject: [PATCH 3/6] fix: correct test assertions and add missing pyright ignore --- src/replicate/_client.py | 4 ++-- tests/lib/test_stream.py | 8 +++++--- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/src/replicate/_client.py b/src/replicate/_client.py index ba1bf8f..14b6d9b 100644 --- a/src/replicate/_client.py +++ b/src/replicate/_client.py @@ -350,7 +350,7 @@ def stream( """ from .lib._stream import stream as _stream - return _stream(type(self), ref, input=input) # type: ignore[return-value, arg-type] # pyright: ignore[reportDeprecated] + return _stream(type(self), ref, input=input) # type: ignore[return-value, arg-type] # pyright: ignore[reportDeprecated, reportGeneralTypeIssues] def copy( self, @@ -757,7 +757,7 @@ async def stream( """ from .lib._stream import stream as _stream - return _stream(type(self), ref, input=input) # type: ignore[return-value, arg-type] # pyright: ignore[reportDeprecated] + return _stream(type(self), ref, input=input) # type: ignore[return-value, arg-type] # pyright: ignore[reportDeprecated, reportGeneralTypeIssues] def copy( self, diff --git a/tests/lib/test_stream.py b/tests/lib/test_stream.py index acc1fc5..d19ab17 100644 --- a/tests/lib/test_stream.py +++ b/tests/lib/test_stream.py @@ -67,7 +67,7 @@ def test_stream_calls_use_with_streaming_true(): mock_use.assert_called_once() call_args = mock_use.call_args assert call_args.kwargs["streaming"] is True - assert call_args.args[0] == "anthropic/claude-4.5-sonnet" + assert call_args.args[1] == "anthropic/claude-4.5-sonnet" # Verify the mock function was called with the input mock_function.assert_called_once_with(prompt="Hello") @@ -101,12 +101,14 @@ def test_stream_returns_iterator(): def test_stream_works_same_as_use_with_streaming(): """Test that stream() produces the same output as use() with streaming=True.""" - with patch("replicate.lib._stream.use") as mock_use: + with patch("replicate.lib._stream.use") as mock_stream_use, \ + patch("replicate.lib._predictions_use.use") as mock_predictions_use: # Create a mock function that returns an iterator mock_function = Mock() expected_output = ["Test", " ", "output"] mock_function.return_value = iter(expected_output.copy()) - mock_use.return_value = mock_function + mock_stream_use.return_value = mock_function + mock_predictions_use.return_value = mock_function client = Replicate(bearer_token="test-token") From 019c4d699b5c162fcc5f65564dce0c324a546b22 Mon Sep 17 00:00:00 2001 From: Zeke Sikelianos Date: Mon, 6 Oct 2025 13:58:16 -0700 Subject: [PATCH 4/6] fix: add pyright ignores for deprecated imports and unknown types --- src/replicate/_client.py | 8 ++++---- tests/lib/test_stream.py | 6 +++--- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/replicate/_client.py b/src/replicate/_client.py index 14b6d9b..b7bdb01 100644 --- a/src/replicate/_client.py +++ b/src/replicate/_client.py @@ -348,9 +348,9 @@ def stream( Returns: An iterator that yields output strings as they are generated by the model """ - from .lib._stream import stream as _stream + from .lib._stream import stream as _stream # pyright: ignore[reportDeprecated] - return _stream(type(self), ref, input=input) # type: ignore[return-value, arg-type] # pyright: ignore[reportDeprecated, reportGeneralTypeIssues] + return _stream(type(self), ref, input=input) # type: ignore[return-value, arg-type] def copy( self, @@ -755,9 +755,9 @@ async def stream( Returns: An async iterator that yields output strings as they are generated by the model """ - from .lib._stream import stream as _stream + from .lib._stream import stream as _stream # pyright: ignore[reportDeprecated] - return _stream(type(self), ref, input=input) # type: ignore[return-value, arg-type] # pyright: ignore[reportDeprecated, reportGeneralTypeIssues] + return _stream(type(self), ref, input=input) # type: ignore[return-value, arg-type] def copy( self, diff --git a/tests/lib/test_stream.py b/tests/lib/test_stream.py index d19ab17..5317b1a 100644 --- a/tests/lib/test_stream.py +++ b/tests/lib/test_stream.py @@ -126,8 +126,8 @@ def test_stream_works_same_as_use_with_streaming(): mock_function.return_value = iter(expected_output.copy()) # Get output from use() with streaming=True - model = client.use("test-model", streaming=True) - use_output = list(model(prompt="test")) + model = client.use("test-model", streaming=True) # pyright: ignore[reportUnknownVariableType] + use_output = list(model(prompt="test")) # pyright: ignore[reportUnknownVariableType, reportUnknownArgumentType] # Verify they produce the same output assert stream_output == use_output @@ -183,7 +183,7 @@ async def async_gen(): "anthropic/claude-4.5-sonnet", input={"prompt": "Hello"}, ): - result.append(item) + result.append(item) # pyright: ignore[reportUnknownMemberType] # Check that deprecation warnings were raised assert len(w) > 0 From 9b9d6b9c770eb5fd0e5cbc27c61d73261beef56d Mon Sep 17 00:00:00 2001 From: Zeke Sikelianos Date: Mon, 6 Oct 2025 14:00:51 -0700 Subject: [PATCH 5/6] fix: add mypy type ignore for var-annotated --- src/replicate/lib/_stream.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/replicate/lib/_stream.py b/src/replicate/lib/_stream.py index 8c75c8e..4044294 100644 --- a/src/replicate/lib/_stream.py +++ b/src/replicate/lib/_stream.py @@ -92,7 +92,7 @@ def stream( ) # Use the existing use() function with streaming=True - model = use(client, ref, streaming=True) # pyright: ignore[reportUnknownVariableType] + model = use(client, ref, streaming=True) # type: ignore[var-annotated] # pyright: ignore[reportUnknownVariableType] # Call the model with the input return model(**input) # type: ignore[return-value] From 1f036c821922d1dcc7eeae4c657d93d89bc383a9 Mon Sep 17 00:00:00 2001 From: Zeke Sikelianos Date: Fri, 10 Oct 2025 09:27:48 -0700 Subject: [PATCH 6/6] docs: remove incorrect Raises section from stream() docstring The stream() function issues a deprecation warning via warnings.warn() but doesn't raise an exception, so the Raises section was incorrect. --- src/replicate/lib/_stream.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/replicate/lib/_stream.py b/src/replicate/lib/_stream.py index 4044294..852c2fd 100644 --- a/src/replicate/lib/_stream.py +++ b/src/replicate/lib/_stream.py @@ -80,9 +80,6 @@ def stream( Returns: An iterator (or async iterator) that yields output strings as they are generated by the model - - Raises: - DeprecationWarning: Always raised when this function is called """ # Log deprecation warning with helpful migration example warnings.warn(