This is the repo for Replicate's official v2 Python SDK, which provides access to Replicate's HTTP API from any Python 3.8+ application.
🤔 Looking for the legacy v1 Python client? Find it here.
- v2 beta release notes: https://github.com/replicate/replicate-python-beta/releases/tag/v2.0.0-beta.1
- v2 beta migration guide: https://github.com/replicate/replicate-python-beta/blob/main/UPGRADING.md
- v2 beta SDK reference: https://sdks.replicate.com/python
- v2 beta GitHub discussion: #89
- HTTP API reference: https://replicate.com/docs/reference/http
The replicate package is available on PyPI. Install it with pip (using the --pre flag to get the latest beta version):
pip install --pre replicateStart by getting a Replicate API token, then set it as REPLICATE_API_TOKEN in your environment:
export REPLICATE_API_TOKEN="r8_..."Then in your Python code, import the library and use it:
import replicate
claude = replicate.use("anthropic/claude-4.5-sonnet")
seedream = replicate.use("bytedance/seedream-4")
veo = replicate.use("google/veo-3-fast")
# Enhance a simple prompt
image_prompt = claude(prompt="bananas wearing cowboy hats", system_prompt="turn prompts into image prompts")
# Generate an image from the enhanced prompt
images = seedream(prompt=image_prompt)
# Generate a video from the image
video = veo(prompt="dancing bananas", image_input=images[0])
open(video)The library uses the REPLICATE_API_TOKEN environment variable by default to implicitly initialize a client, but you can also initialize a client explicitly and set the bearer_token parameter:
import os
from replicate import Replicate
client = Replicate(
bearer_token=os.environ.get("REPLICATE_API_TOKEN")
)The use() method provides a more concise way to call Replicate models as functions, offering a more pythonic approach to running models:
import replicate
# Create a model function
flux_dev = replicate.use("black-forest-labs/flux-dev")
# Call it like a regular Python function
outputs = flux_dev(
prompt="a cat wearing a wizard hat, digital art",
num_outputs=1,
aspect_ratio="1:1",
output_format="webp",
)
# outputs is a list of URLPath objects that auto-download when accessed
for output in outputs:
print(output) # e.g., Path(/tmp/a1b2c3/output.webp)Many models, particularly language models, support streaming output. Use the streaming=True parameter to get results as they're generated:
import replicate
# Create a streaming language model function
claude = replicate.use("anthropic/claude-4.5-sonnet", streaming=True)
# Stream the output
output = claude(prompt="Write a haiku about Python programming")
for chunk in output:
print(chunk, end="", flush=True)You can easily chain models together by passing the output of one model as input to another:
import replicate
# Create two model functions
flux_dev = replicate.use("black-forest-labs/flux-dev")
claude = replicate.use("anthropic/claude-4.5-sonnet")
# Generate an image
images = flux_dev(prompt="a mysterious ancient artifact")
# Describe the image
description = claude(
prompt="Describe this image in detail",
image=images[0], # Pass the first image directly
)
print(description)For async/await patterns, use the use_async=True parameter:
import asyncio
import replicate
async def main():
# Create an async model function
flux_dev = replicate.use("black-forest-labs/flux-dev", use_async=True)
# Await the result
outputs = await flux_dev(prompt="futuristic city at sunset")
for output in outputs:
print(output)
asyncio.run(main())If you need the URL without downloading the file, use the get_path_url() helper:
import replicate
from replicate.lib._predictions_use import get_path_url
flux_dev = replicate.use("black-forest-labs/flux-dev")
outputs = flux_dev(prompt="a serene landscape")
for output in outputs:
url = get_path_url(output)
print(f"URL: {url}") # https://replicate.delivery/...To create a prediction without waiting for it to complete, use the create() method:
import replicate
claude = replicate.use("anthropic/claude-4.5-sonnet")
# Start the prediction
run = claude.create(prompt="Explain quantum computing")
# Check logs while it's running
print(run.logs())
# Get the output when ready
result = run.output()
print(result)- The
use()method must be called at the module level (not inside functions or classes) - Type hints are limited compared to the standard client interface
You can run a model synchronously using replicate.run():
import replicate
output = replicate.run(
"black-forest-labs/flux-schnell", input={"prompt": "astronaut riding a rocket like a horse"}
)
print(output)The run() method is a convenience function that creates a prediction, waits for it to complete, and returns the output. If you want more control over the prediction process, you can use the lower-level API methods.
replicate.run() raises ModelError if the prediction fails. You can catch this exception to handle errors gracefully:
import replicate
from replicate.exceptions import ModelError
try:
output = replicate.run(
"stability-ai/stable-diffusion-3", input={"prompt": "An astronaut riding a rainbow unicorn"}
)
except ModelError as e:
print(f"Prediction failed: {e}")
# The prediction object is available as e.prediction
print(f"Prediction ID: {e.prediction.id}")
print(f"Status: {e.prediction.status}")To run a model that takes file inputs, you can pass either a URL to a publicly accessible file or a file handle:
# Using a URL
output = replicate.run(
"andreasjansson/blip-2:f677695e5e89f8b236e52ecd1d3f01beb44c34606419bcc19345e046d8f786f9",
input={"image": "https://example.com/image.jpg"},
)
# Using a local file
with open("path/to/image.jpg", "rb") as f:
output = replicate.run(
"andreasjansson/blip-2:f677695e5e89f8b236e52ecd1d3f01beb44c34606419bcc19345e046d8f786f9",
input={"image": f},
)By default, replicate.run() will wait up to 60 seconds for the prediction to complete. You can configure this timeout:
# Wait up to 30 seconds
output = replicate.run("...", input={...}, wait=30)
# Don't wait at all - returns immediately
output = replicate.run("...", input={...}, wait=False)When wait=False, the method returns immediately after creating the prediction, and you'll need to poll for the result manually.
For models that support streaming (particularly language models), use replicate.use() with streaming=True to stream the output response as it's generated:
import replicate
claude = replicate.use("anthropic/claude-4.5-sonnet", streaming=True)
for event in claude(input={"prompt": "Please write a haiku about streaming Python."}):
print(str(event), end="")Note: The legacy
replicate.stream()method is also available for backwards compatibility with the v1 SDK, but is deprecated and will be removed in a future version.
To use the Replicate client asynchronously, import AsyncReplicate instead of Replicate and use await with each API call:
import os
import asyncio
from replicate import AsyncReplicate
replicate = AsyncReplicate(
bearer_token=os.environ.get("REPLICATE_API_TOKEN"), # This is the default and can be omitted
)
async def main() -> None:
prediction = await replicate.predictions.get(
prediction_id="gm3qorzdhgbfurvjtvhg6dckhu",
)
print(prediction.id)
asyncio.run(main())Functionality between the synchronous and asynchronous clients is otherwise identical.
By default, the async client uses httpx for HTTP requests. However, for improved concurrency performance you may also use aiohttp as the HTTP backend.
You can enable this by installing aiohttp:
# install from PyPI
pip install --pre replicate[aiohttp]Then you can enable it by instantiating the client with http_client=DefaultAioHttpClient():
import asyncio
from replicate import DefaultAioHttpClient
from replicate import AsyncReplicate
async def main() -> None:
async with AsyncReplicate(
bearer_token="My Bearer Token",
http_client=DefaultAioHttpClient(),
) as replicate:
prediction = await replicate.predictions.get(
prediction_id="gm3qorzdhgbfurvjtvhg6dckhu",
)
print(prediction.id)
asyncio.run(main())Nested request parameters are TypedDicts. Responses are Pydantic models which also provide helper methods for things like:
- Serializing back into JSON,
model.to_json() - Converting to a dictionary,
model.to_dict()
Typed requests and responses provide autocomplete and documentation within your editor. If you would like to see type errors in VS Code to help catch bugs earlier, set python.analysis.typeCheckingMode to basic.
List methods in the Replicate API are paginated.
This library provides auto-paginating iterators with each list response, so you do not have to request successive pages manually:
from replicate import Replicate
replicate = Replicate()
all_models = []
# Automatically fetches more pages as needed.
for model in replicate.models.list():
# Do something with model here
all_models.append(model)
print(all_models)Or, asynchronously:
import asyncio
from replicate import AsyncReplicate
replicate = AsyncReplicate()
async def main() -> None:
all_models = []
# Iterate through items across all pages, issuing requests as needed.
async for model in replicate.models.list():
all_models.append(model)
print(all_models)
asyncio.run(main())Alternatively, you can use the .has_next_page(), .next_page_info(), or .get_next_page() methods for more granular control working with pages:
first_page = await replicate.models.list()
if first_page.has_next_page():
print(f"will fetch next page using these details: {first_page.next_page_info()}")
next_page = await first_page.get_next_page()
print(f"number of items we just fetched: {len(next_page.results)}")
# Remove `await` for non-async usage.Or just work directly with the returned data:
first_page = await replicate.models.list()
print(f"next URL: {first_page.next}") # => "next URL: ..."
for model in first_page.results:
print(model.cover_image_url)
# Remove `await` for non-async usage.Request parameters that correspond to file uploads can be passed as bytes, or a PathLike instance or a tuple of (filename, contents, media type).
from pathlib import Path
from replicate import Replicate
replicate = Replicate()
replicate.files.create(
content=Path("/path/to/file"),
)The async client uses the exact same interface. If you pass a PathLike instance, the file contents will be read asynchronously automatically.
When the library is unable to connect to the API (for example, due to network connection problems or a timeout), a subclass of replicate.APIConnectionError is raised.
When the API returns a non-success status code (that is, 4xx or 5xx
response), a subclass of replicate.APIStatusError is raised, containing status_code and response properties.
All errors inherit from replicate.APIError.
import replicate
from replicate import Replicate
replicate = Replicate()
try:
replicate.predictions.create(
input={"text": "Alice"},
version="replicate/hello-world:5c7d5dc6dd8bf75c1acaa8565735e7986bc5b66206b55cca93cb72c9bf15ccaa",
)
except replicate.APIConnectionError as e:
print("The server could not be reached")
print(e.__cause__) # an underlying Exception, likely raised within httpx.
except replicate.RateLimitError as e:
print("A 429 status code was received; we should back off a bit.")
except replicate.APIStatusError as e:
print("Another non-200-range status code was received")
print(e.status_code)
print(e.response)Error codes are as follows:
| Status Code | Error Type |
|---|---|
| 400 | BadRequestError |
| 401 | AuthenticationError |
| 403 | PermissionDeniedError |
| 404 | NotFoundError |
| 422 | UnprocessableEntityError |
| 429 | RateLimitError |
| >=500 | InternalServerError |
| N/A | APIConnectionError |
Certain errors are automatically retried 2 times by default, with a short exponential backoff. Connection errors (for example, due to a network connectivity problem), 408 Request Timeout, 409 Conflict, 429 Rate Limit, and >=500 Internal errors are all retried by default.
You can use the max_retries option to configure or disable retry settings:
from replicate import Replicate
# Configure the default for all requests:
replicate = Replicate(
# default is 2
max_retries=0,
)
# Or, configure per-request:
replicate.with_options(max_retries=5).predictions.create(
input={"text": "Alice"},
version="replicate/hello-world:5c7d5dc6dd8bf75c1acaa8565735e7986bc5b66206b55cca93cb72c9bf15ccaa",
)By default requests time out after 1 minute. You can configure this with a timeout option,
which accepts a float or an httpx.Timeout object:
from replicate import Replicate
# Configure the default for all requests:
replicate = Replicate(
# 20 seconds (default is 1 minute)
timeout=20.0,
)
# More granular control:
replicate = Replicate(
timeout=httpx.Timeout(60.0, read=5.0, write=10.0, connect=2.0),
)
# Override per-request:
replicate.with_options(timeout=5.0).predictions.create(
input={"text": "Alice"},
version="replicate/hello-world:5c7d5dc6dd8bf75c1acaa8565735e7986bc5b66206b55cca93cb72c9bf15ccaa",
)On timeout, an APITimeoutError is thrown.
Note that requests that time out are retried twice by default.
We use the standard library logging module.
You can enable logging by setting the environment variable REPLICATE_LOG to info.
$ export REPLICATE_LOG=infoOr to debug for more verbose logging.
In an API response, a field may be explicitly null, or missing entirely; in either case, its value is None in this library. You can differentiate the two cases with .model_fields_set:
if response.my_field is None:
if 'my_field' not in response.model_fields_set:
print('Got json like {}, without a "my_field" key present at all.')
else:
print('Got json like {"my_field": null}.')The "raw" Response object can be accessed by prefixing .with_raw_response. to any HTTP method call, e.g.,
from replicate import Replicate
replicate = Replicate()
response = replicate.predictions.with_raw_response.create(
input={
"text": "Alice"
},
version="replicate/hello-world:5c7d5dc6dd8bf75c1acaa8565735e7986bc5b66206b55cca93cb72c9bf15ccaa",
)
print(response.headers.get('X-My-Header'))
prediction = response.parse() # get the object that `predictions.create()` would have returned
print(prediction.id)These methods return an APIResponse object.
The async client returns an AsyncAPIResponse with the same structure, the only difference being awaitable methods for reading the response content.
The above interface eagerly reads the full response body when you make the request, which may not always be what you want.
To stream the response body, use .with_streaming_response instead, which requires a context manager and only reads the response body once you call .read(), .text(), .json(), .iter_bytes(), .iter_text(), .iter_lines() or .parse(). In the async client, these are async methods.
with replicate.predictions.with_streaming_response.create(
input={"text": "Alice"},
version="replicate/hello-world:5c7d5dc6dd8bf75c1acaa8565735e7986bc5b66206b55cca93cb72c9bf15ccaa",
) as response:
print(response.headers.get("X-My-Header"))
for line in response.iter_lines():
print(line)The context manager is required so that the response will reliably be closed.
This library is typed for convenient access to the documented API.
If you need to access undocumented endpoints, params, or response properties, the library can still be used.
To make requests to undocumented endpoints, you can make requests using replicate.get, replicate.post, and other
http verbs. Options on the client will be respected (such as retries) when making this request.
import httpx
response = replicate.post(
"/foo",
cast_to=httpx.Response,
body={"my_param": True},
)
print(response.headers.get("x-foo"))If you want to explicitly send an extra param, you can do so with the extra_query, extra_body, and extra_headers request
options.
To access undocumented response properties, you can access the extra fields like response.unknown_prop. You
can also get all the extra fields on the Pydantic model as a dict with
response.model_extra.
You can directly override the httpx client to customize it for your use case, including:
- Support for proxies
- Custom transports
- Additional advanced functionality
import httpx
from replicate import Replicate, DefaultHttpxClient
replicate = Replicate(
# Or use the `REPLICATE_BASE_URL` env var
base_url="http://my.test.server.example.com:8083",
http_client=DefaultHttpxClient(
proxy="http://my.test.proxy.example.com",
transport=httpx.HTTPTransport(local_address="0.0.0.0"),
),
)You can also customize the client on a per-request basis by using with_options():
replicate.with_options(http_client=DefaultHttpxClient(...))By default the library closes underlying HTTP connections whenever the client is garbage collected. You can manually close the client using the .close() method if desired, or with a context manager that closes when exiting.
from replicate import Replicate
with Replicate() as replicate:
# make requests here
...
# HTTP client is now closedThis package generally follows SemVer conventions, though certain backwards-incompatible changes may be released as minor versions:
- Changes that only affect static types, without breaking runtime behavior.
- Changes to library internals which are technically public but not intended or documented for external use. (Please open a GitHub issue to let us know if you are relying on such internals.)
- Changes that we do not expect to impact the vast majority of users in practice.
We take backwards-compatibility seriously and work hard to ensure you can rely on a smooth upgrade experience.
We are keen for your feedback; please open an issue with questions, bugs, or suggestions.
If you've upgraded to the latest version but aren't seeing any new features you were expecting then your python environment is likely still using an older version.
You can determine the version that is being used at runtime with:
import replicate
print(replicate.__version__)Python 3.8 or higher.