Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 16 additions & 29 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,24 @@ output = replicate.run("...", input={...}, wait=False)

When `wait=False`, the method returns immediately after creating the prediction, and you'll need to poll for the result manually.

## Streaming output

For models that support streaming (particularly language models), use `replicate.use()` with `streaming=True` to receive the output incrementally as it's generated:

```python
import replicate

claude = replicate.use("anthropic/claude-4.5-sonnet", streaming=True)

for event in claude(input={"prompt": "Please write a haiku about streaming Python."}):
print(str(event), end="")
```

> **Note:** The [legacy `replicate.stream()` method](https://github.com/replicate/replicate-python/blob/d2956ff9c3e26ef434bc839cc5c87a50c49dfe20/README.md#run-a-model-and-stream-its-output) is also available for backwards compatibility with the v1 SDK, but is deprecated and will be removed in a future version.

## Async usage

Simply import `AsyncReplicate` instead of `Replicate` and use `await` with each API call:
To use the Replicate client asynchronously, import `AsyncReplicate` instead of `Replicate` and use `await` with each API call:

```python
import os
Expand All @@ -136,34 +151,6 @@ asyncio.run(main())

Functionality between the synchronous and asynchronous clients is otherwise identical.

### Async run() and stream()

The async client also supports `run()` and `stream()` methods:

```python
import asyncio
from replicate import AsyncReplicate

replicate = AsyncReplicate()


async def main():
# Run a model
output = await replicate.run(
"black-forest-labs/flux-schnell", input={"prompt": "astronaut riding a rocket like a horse"}
)
print(output)

# Stream a model's output
async for event in replicate.stream(
"meta/meta-llama-3-70b-instruct", input={"prompt": "Write a haiku about coding"}
):
print(str(event), end="")


asyncio.run(main())
```

### With aiohttp

By default, the async client uses `httpx` for HTTP requests. However, for improved concurrency performance you may also use `aiohttp` as the HTTP backend.
Expand Down
3 changes: 2 additions & 1 deletion src/replicate/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@
if not __name.startswith("__"):
try:
# Skip symbols that are imported later from _module_client
if __name in ("run", "use"):
if __name in ("run", "use", "stream"):
continue
__locals[__name].__module__ = "replicate"
except (TypeError, AttributeError):
Expand Down Expand Up @@ -253,6 +253,7 @@ def _reset_client() -> None: # type: ignore[reportUnusedFunction]
use as use,
files as files,
models as models,
stream as stream, # pyright: ignore[reportDeprecated]
account as account,
hardware as hardware,
webhooks as webhooks,
Expand Down
66 changes: 65 additions & 1 deletion src/replicate/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
AsyncIterator,
overload,
)
from typing_extensions import Self, Unpack, ParamSpec, override
from typing_extensions import Self, Unpack, ParamSpec, override, deprecated

import httpx

Expand Down Expand Up @@ -320,6 +320,38 @@ def use(
# TODO: Fix mypy overload matching for streaming parameter
return _use(self, ref, hint=hint, streaming=streaming) # type: ignore[call-overload, no-any-return]

@deprecated("replicate.stream() is deprecated. Use replicate.use() with streaming=True instead")
def stream(
    self,
    ref: str,
    *,
    input: dict[str, Any],
) -> Iterator[str]:
    """
    Stream a model's output through the legacy entry point (deprecated).

    This method only delegates to ``stream`` in ``lib/_stream.py``; it exists
    so that code written against ``replicate.stream()`` keeps working.

    .. deprecated::
        Prefer :meth:`use` with ``streaming=True``:

        .. code-block:: python

            model = replicate.use("anthropic/claude-4.5-sonnet", streaming=True)
            for event in model(input={"prompt": "Hello"}):
                print(str(event), end="")

    Args:
        ref: Model reference, either ``owner/name``
            (e.g. ``"anthropic/claude-4.5-sonnet"``) or ``owner/name:version``.
        input: Input parameters for the model; which keys are required
            depends on the model being run.

    Returns:
        The value produced by the legacy helper, annotated as an iterator of
        output strings.
    """
    # Imported at call time rather than at module import time; presumably this
    # avoids a circular import, since lib/_stream.py imports from `_client`.
    from .lib._stream import stream as _legacy_stream  # pyright: ignore[reportDeprecated]

    # NOTE(review): the client *class* (`type(self)`) is forwarded, not this
    # instance -- confirm that per-instance configuration is not expected to
    # carry over to the streamed call.
    return _legacy_stream(type(self), ref, input=input)  # type: ignore[return-value, arg-type]

def copy(
self,
*,
Expand Down Expand Up @@ -695,6 +727,38 @@ def use(
# TODO: Fix mypy overload matching for streaming parameter
return _use(self, ref, hint=hint, streaming=streaming) # type: ignore[call-overload, no-any-return]

@deprecated("replicate.stream() is deprecated. Use replicate.use() with streaming=True instead")
async def stream(
    self,
    ref: str,
    *,
    input: dict[str, Any],
) -> AsyncIterator[str]:
    """
    Stream a model's output through the legacy entry point, async flavour (deprecated).

    This coroutine only delegates to ``stream`` in ``lib/_stream.py``. Because
    it is declared with ``async def``, calling it yields a coroutine that has
    to be awaited to obtain the helper's return value.

    .. deprecated::
        Prefer :meth:`use` with ``streaming=True``:

        .. code-block:: python

            model = replicate.use("anthropic/claude-4.5-sonnet", streaming=True)
            async for event in model(input={"prompt": "Hello"}):
                print(str(event), end="")

    Args:
        ref: Model reference, either ``owner/name``
            (e.g. ``"anthropic/claude-4.5-sonnet"``) or ``owner/name:version``.
        input: Input parameters for the model; which keys are required
            depends on the model being run.

    Returns:
        The value produced by the legacy helper, annotated as an async
        iterator of output strings.
    """
    # Imported at call time rather than at module import time; presumably this
    # avoids a circular import, since lib/_stream.py imports from `_client`.
    from .lib._stream import stream as _legacy_stream  # pyright: ignore[reportDeprecated]

    # NOTE(review): the client *class* (`type(self)`) is forwarded, not this
    # instance -- confirm that per-instance configuration is not expected to
    # carry over to the streamed call.
    return _legacy_stream(type(self), ref, input=input)  # type: ignore[return-value, arg-type]

def copy(
self,
*,
Expand Down
14 changes: 14 additions & 0 deletions src/replicate/_module_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ def __load__(self) -> PredictionsResource:
__client: Replicate = cast(Replicate, {})
run = __client.run
use = __client.use
stream = __client.stream # pyright: ignore[reportDeprecated]
else:

def _run(*args, **kwargs):
Expand All @@ -100,8 +101,21 @@ def _use(ref, *, hint=None, streaming=False, use_async=False, **kwargs):

return use(Replicate, ref, hint=hint, streaming=streaming, **kwargs)

def _stream(ref, *, input, use_async=False):
    """Module-level shim for the deprecated ``replicate.stream()``.

    Chooses the client class from ``use_async`` and hands it, together with
    ``ref`` and ``input``, to ``stream`` in ``lib/_stream.py``.

    Args:
        ref: Model reference forwarded unchanged.
        input: Model input dictionary forwarded unchanged.
        use_async: When truthy, ``AsyncReplicate`` is passed as the client
            class; otherwise ``Replicate`` is.

    Returns:
        Whatever the underlying ``stream`` helper returns.
    """
    # Imports stay inside the function, as in the other module-level shims.
    from .lib._stream import stream
    from ._client import Replicate, AsyncReplicate

    client_cls = AsyncReplicate if use_async else Replicate
    return stream(client_cls, ref, input=input)

run = _run
use = _use
stream = _stream

files: FilesResource = FilesResourceProxy().__as_proxied__()
models: ModelsResource = ModelsResourceProxy().__as_proxied__()
Expand Down
95 changes: 95 additions & 0 deletions src/replicate/lib/_stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
"""
Deprecated streaming functionality for backwards compatibility with v1 SDK.

This module provides the stream() function which wraps replicate.use() with streaming=True.
"""

from __future__ import annotations

import json
import warnings
from typing import Any, Dict, Type, Union, Iterator, AsyncIterator, overload
from typing_extensions import deprecated

from .._client import Client, AsyncClient
from ._predictions_use import use

__all__ = ["stream"]


def _format_deprecation_message(ref: str, input: Dict[str, Any]) -> str:
"""Format the deprecation message with a working example."""
# Format the input dict for display
input_str = "{\n"
for key, value in input.items():
if isinstance(value, str):
input_str += f' "{key}": "{value}",\n'
else:
input_str += f' "{key}": {value},\n'
input_str += " }"

return (
f"replicate.stream() is deprecated and will be removed in a future version. "
f"Use replicate.use() with streaming=True instead:\n\n"
f' model = replicate.use("{ref}", streaming=True)\n'
f" for event in model(input={input_str}):\n"
f' print(str(event), end="")\n'
)


@overload
def stream(
    client: Type[Client],
    ref: str,
    *,
    input: Dict[str, Any],
) -> Iterator[str]: ...


@overload
def stream(
    client: Type[AsyncClient],
    ref: str,
    *,
    input: Dict[str, Any],
) -> AsyncIterator[str]: ...


@deprecated("replicate.stream() is deprecated. Use replicate.use() with streaming=True instead")
def stream(
    client: Union[Type[Client], Type[AsyncClient]],
    ref: str,
    *,
    input: Dict[str, Any],
) -> Union[Iterator[str], AsyncIterator[str]]:
    """
    Legacy streaming entry point kept for v1 compatibility (deprecated).

    Emits a ``DeprecationWarning`` whose text contains a migration example
    built from ``ref`` and ``input``, then obtains a streaming callable from
    ``use(client, ref, streaming=True)`` and invokes it with the entries of
    ``input`` as keyword arguments.

    Preferred replacement:

        model = replicate.use("anthropic/claude-4.5-sonnet", streaming=True)
        for event in model(input={"prompt": "Hello"}):
            print(str(event), end="")

    Args:
        client: The Replicate client class (``Client`` or ``AsyncClient``).
        ref: Model reference, either ``owner/name``
            (e.g. ``"anthropic/claude-4.5-sonnet"``) or ``owner/name:version``.
        input: Input parameters for the model; which keys are required
            depends on the model being run.

    Returns:
        The result of calling the streaming model, annotated as an iterator
        (or async iterator) of output strings.
    """
    migration_hint = _format_deprecation_message(ref, input)
    # NOTE(review): the `@deprecated` decorator may wrap this function and warn
    # on its own at call time; if so, stacklevel=2 resolves to that wrapper
    # rather than to user code -- confirm this is the intended attribution.
    warnings.warn(migration_hint, DeprecationWarning, stacklevel=2)

    # Delegate to the supported API: a streaming callable bound to `ref`.
    streaming_model = use(client, ref, streaming=True)  # type: ignore[var-annotated]  # pyright: ignore[reportUnknownVariableType]

    # NOTE(review): inputs are unpacked as keyword arguments here, while the
    # migration examples call `model(input={...})` -- confirm which calling
    # convention the object returned by `use()` expects.
    return streaming_model(**input)  # type: ignore[return-value]
Loading