Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions clients/python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,12 +167,13 @@ impl LocalHttpGateway {
}

#[pyfunction]
#[pyo3(signature = (*, config_file, clickhouse_url, postgres_url, async_setup))]
#[pyo3(signature = (*, config_file, clickhouse_url, postgres_url, valkey_url, async_setup))]
fn _start_http_gateway(
py: Python<'_>,
config_file: Option<String>,
clickhouse_url: Option<String>,
postgres_url: Option<String>,
valkey_url: Option<String>,
async_setup: bool,
) -> PyResult<Bound<'_, PyAny>> {
warn_no_config(py, config_file.as_deref())?;
Expand All @@ -181,6 +182,7 @@ fn _start_http_gateway(
config_file,
clickhouse_url,
postgres_url,
valkey_url,
)
.await?;
Ok(LocalHttpGateway {
Expand Down Expand Up @@ -659,19 +661,22 @@ impl TensorZeroGateway {
}

#[classmethod]
#[pyo3(signature = (*, config_file=None, clickhouse_url=None, postgres_url=None, timeout=None))]
#[pyo3(signature = (*, config_file=None, clickhouse_url=None, postgres_url=None, valkey_url=None, timeout=None))]
/// Initialize the TensorZero client, using an embedded gateway.
/// This connects to ClickHouse (if provided) and runs DB migrations.
///
/// :param config_file: The path to the TensorZero configuration file. Example: "tensorzero.toml"
/// :param clickhouse_url: The URL of the ClickHouse instance to use for the gateway. If observability is disabled in the config, this can be `None`
/// :param postgres_url: The URL of the PostgreSQL instance to use for rate limiting.
/// :param valkey_url: The URL of the Valkey instance to use for rate limiting.
/// :param timeout: The timeout for embedded gateway request processing, in seconds. If this timeout is hit, any in-progress LLM requests may be aborted. If not provided, no timeout will be set.
/// :return: A `TensorZeroGateway` instance configured to use an embedded gateway.
fn build_embedded(
cls: &Bound<'_, PyType>,
config_file: Option<&str>,
clickhouse_url: Option<String>,
postgres_url: Option<String>,
valkey_url: Option<String>,
timeout: Option<f64>,
) -> PyResult<Py<TensorZeroGateway>> {
warn_no_config(cls.py(), config_file)?;
Expand All @@ -683,6 +688,7 @@ impl TensorZeroGateway {
config_file: config_file.map(PathBuf::from),
clickhouse_url,
postgres_config: postgres_url.map(PostgresConfig::Url),
valkey_url,
timeout,
verify_credentials: true,
allow_batch_writes: false,
Expand Down Expand Up @@ -1866,12 +1872,14 @@ impl AsyncTensorZeroGateway {
// as `AsyncTensorZeroGateway` would be completely async *except* for this one method
// (which potentially takes a very long time due to running DB migrations).
#[classmethod]
#[pyo3(signature = (*, config_file=None, clickhouse_url=None, postgres_url=None, timeout=None, async_setup=true))]
#[pyo3(signature = (*, config_file=None, clickhouse_url=None, postgres_url=None, valkey_url=None, timeout=None, async_setup=true))]
/// Initialize the TensorZero client, using an embedded gateway.
/// This connects to ClickHouse (if provided) and runs DB migrations.
///
/// :param config_file: The path to the TensorZero configuration file. Example: "tensorzero.toml"
/// :param clickhouse_url: The URL of the ClickHouse instance to use for the gateway. If observability is disabled in the config, this can be `None`
/// :param postgres_url: The URL of the PostgreSQL instance to use for rate limiting.
/// :param valkey_url: The URL of the Valkey instance to use for rate limiting.
/// :param timeout: The timeout for embedded gateway request processing, in seconds. If this timeout is hit, any in-progress LLM requests may be aborted. If not provided, no timeout will be set.
/// :param async_setup: If true, this method will return a `Future` that resolves to an `AsyncTensorZeroGateway` instance. Otherwise, it will block and construct the `AsyncTensorZeroGateway`
/// :return: A `Future` that resolves to an `AsyncTensorZeroGateway` instance configured to use an embedded gateway (or an `AsyncTensorZeroGateway` if `async_setup=False`).
Expand All @@ -1881,6 +1889,7 @@ impl AsyncTensorZeroGateway {
config_file: Option<&str>,
clickhouse_url: Option<String>,
postgres_url: Option<String>,
valkey_url: Option<String>,
timeout: Option<f64>,
async_setup: bool,
) -> PyResult<Py<PyAny>> {
Expand All @@ -1893,6 +1902,7 @@ impl AsyncTensorZeroGateway {
config_file: config_file.map(PathBuf::from),
clickhouse_url,
postgres_config: postgres_url.map(PostgresConfig::Url),
valkey_url,
timeout,
verify_credentials: true,
allow_batch_writes: false,
Expand Down
1 change: 1 addition & 0 deletions clients/python/tensorzero/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,7 @@ def patch_openai_client(
config_file=config_file,
clickhouse_url=clickhouse_url,
postgres_url=None,
valkey_url=None,
async_setup=async_setup,
)
if async_setup:
Expand Down
4 changes: 4 additions & 0 deletions clients/python/tensorzero/tensorzero.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,7 @@ class TensorZeroGateway(BaseTensorZeroGateway):
config_file: Optional[str] = None,
clickhouse_url: Optional[str] = None,
postgres_url: Optional[str] = None,
valkey_url: Optional[str] = None,
timeout: Optional[float] = None,
) -> "TensorZeroGateway":
"""
Expand All @@ -524,6 +525,7 @@ class TensorZeroGateway(BaseTensorZeroGateway):
:param config_file: (Optional) The path to the TensorZero configuration file.
:param clickhouse_url: (Optional) The URL of the ClickHouse database.
:param postgres_url: (Optional) The URL of the Postgres database.
:param valkey_url: (Optional) The URL of the Valkey instance.
:param timeout: (Optional) The timeout for embedded gateway request processing, in seconds. If this timeout is hit, any in-progress LLM requests may be aborted. If not provided, no timeout will be set.
"""

Expand Down Expand Up @@ -1069,6 +1071,7 @@ class AsyncTensorZeroGateway(BaseTensorZeroGateway):
config_file: Optional[str] = None,
clickhouse_url: Optional[str] = None,
postgres_url: Optional[str] = None,
valkey_url: Optional[str] = None,
timeout: Optional[float] = None,
async_setup: bool = True,
) -> Union[Awaitable["AsyncTensorZeroGateway"], "AsyncTensorZeroGateway"]:
Expand Down Expand Up @@ -1603,6 +1606,7 @@ def _start_http_gateway(
config_file: Optional[str],
clickhouse_url: Optional[str],
postgres_url: Optional[str],
valkey_url: Optional[str],
async_setup: bool,
) -> Union[Any, Awaitable[Any]]: ...
@final
Expand Down
1 change: 1 addition & 0 deletions clients/rust/examples/inference_demo/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ async fn main() {
postgres_config: std::env::var("TENSORZERO_POSTGRES_URL")
.ok()
.map(PostgresConfig::Url),
valkey_url: std::env::var("TENSORZERO_VALKEY_URL").ok(),
timeout: None,
verify_credentials: true,
allow_batch_writes: false,
Expand Down
37 changes: 35 additions & 2 deletions clients/rust/src/test_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub async fn make_embedded_gateway() -> Client {
config_file: Some(config_path),
clickhouse_url: Some(CLICKHOUSE_URL.clone()),
postgres_config: None,
valkey_url: None,
timeout: None,
verify_credentials: true,
allow_batch_writes: true,
Expand All @@ -40,6 +41,7 @@ pub async fn make_embedded_gateway_no_config() -> Client {
config_file: None,
clickhouse_url: Some(CLICKHOUSE_URL.clone()),
postgres_config: None,
valkey_url: None,
timeout: None,
verify_credentials: true,
allow_batch_writes: true,
Expand All @@ -56,6 +58,7 @@ pub async fn make_embedded_gateway_with_config(config: &str) -> Client {
config_file: Some(tmp_config.path().to_owned()),
clickhouse_url: Some(CLICKHOUSE_URL.clone()),
postgres_config: None,
valkey_url: None,
timeout: None,
verify_credentials: true,
allow_batch_writes: true,
Expand All @@ -67,14 +70,41 @@ pub async fn make_embedded_gateway_with_config(config: &str) -> Client {

pub async fn make_embedded_gateway_with_config_and_postgres(config: &str) -> Client {
let postgres_url = std::env::var("TENSORZERO_POSTGRES_URL")
.expect("TENSORZERO_POSTGRES_URL must be set for rate limiting tests");
.expect("TENSORZERO_POSTGRES_URL must be set for tests that require Postgres");

let tmp_config = NamedTempFile::new().unwrap();
std::fs::write(tmp_config.path(), config).unwrap();
ClientBuilder::new(ClientBuilderMode::EmbeddedGateway {
config_file: Some(tmp_config.path().to_owned()),
clickhouse_url: Some(CLICKHOUSE_URL.clone()),
postgres_config: Some(PostgresConfig::Url(postgres_url)),
valkey_url: None,
timeout: None,
verify_credentials: true,
allow_batch_writes: true,
})
.build()
.await
.unwrap()
}

/// Creates an embedded gateway with rate limiting backend support.
/// Reads both TENSORZERO_POSTGRES_URL and TENSORZERO_VALKEY_URL from env vars.
/// The rate limiting backend selection is determined by the config's `[rate_limiting].backend` field:
/// - `auto` (default): Valkey if available, otherwise Postgres
/// - `postgres`: Force Postgres backend
/// - `valkey`: Force Valkey backend
pub async fn make_embedded_gateway_with_rate_limiting(config: &str) -> Client {
let postgres_url = std::env::var("TENSORZERO_POSTGRES_URL").ok();
let valkey_url = std::env::var("TENSORZERO_VALKEY_URL").ok();

let tmp_config = NamedTempFile::new().unwrap();
std::fs::write(tmp_config.path(), config).unwrap();
ClientBuilder::new(ClientBuilderMode::EmbeddedGateway {
config_file: Some(tmp_config.path().to_owned()),
clickhouse_url: Some(CLICKHOUSE_URL.clone()),
postgres_config: postgres_url.map(PostgresConfig::Url),
valkey_url,
timeout: None,
verify_credentials: true,
allow_batch_writes: true,
Expand Down Expand Up @@ -152,6 +182,7 @@ pub async fn make_embedded_gateway_with_unique_db(config: &str, db_prefix: &str)
config_file: Some(tmp_config.path().to_owned()),
clickhouse_url: Some(clickhouse_url),
postgres_config: None,
valkey_url: None,
timeout: None,
verify_credentials: true,
allow_batch_writes: true,
Expand All @@ -171,6 +202,7 @@ pub async fn make_embedded_gateway_e2e_with_unique_db(db_prefix: &str) -> Client
config_file: Some(config_path),
clickhouse_url: Some(clickhouse_url),
postgres_config: None,
valkey_url: None,
timeout: None,
verify_credentials: true,
allow_batch_writes: true,
Expand All @@ -192,7 +224,8 @@ pub async fn start_http_gateway_with_unique_db(
let (addr, shutdown_handle) = tensorzero_core::utils::gateway::start_openai_compatible_gateway(
Some(config_path.to_string_lossy().to_string()),
Some(clickhouse_url),
None,
None, // postgres_url
None, // valkey_url
)
.await
.unwrap();
Expand Down
8 changes: 8 additions & 0 deletions evaluations/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ pub async fn run_evaluation(
);
}

// TODO(#5754): Extract environment variable reading to a centralized location
info!("Initializing evaluation environment");
let clickhouse_url = std::env::var("TENSORZERO_CLICKHOUSE_URL")
.map_err(|_| anyhow!("Missing ClickHouse URL at TENSORZERO_CLICKHOUSE_URL"))?;
Expand All @@ -132,6 +133,12 @@ pub async fn run_evaluation(
} else {
debug!("PostgreSQL URL not provided");
}
let valkey_url = std::env::var("TENSORZERO_VALKEY_URL").ok();
if let Some(valkey_url) = valkey_url.as_ref() {
debug!(valkey_url = %valkey_url, "Valkey URL resolved");
} else {
debug!("Valkey URL not provided");
}

// We do not validate credentials here since we just want the evaluator config
// If we are using an embedded gateway, credentials are validated when that is initialized
Expand Down Expand Up @@ -173,6 +180,7 @@ pub async fn run_evaluation(
config_file: Some(args.config_file),
postgres_config: postgres_url.map(PostgresConfig::Url),
clickhouse_url: Some(clickhouse_url.clone()),
valkey_url,
timeout: None,
verify_credentials: true,
allow_batch_writes: true,
Expand Down
1 change: 1 addition & 0 deletions evaluations/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub async fn get_tensorzero_client() -> Client {
config_file: Some(get_e2e_config_path()),
clickhouse_url: Some(CLICKHOUSE_URL.clone()),
postgres_config: None,
valkey_url: None,
timeout: None,
verify_credentials: true,
allow_batch_writes: true,
Expand Down
2 changes: 2 additions & 0 deletions evaluations/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1691,6 +1691,7 @@ async fn test_run_llm_judge_evaluator_chat() {
))),
clickhouse_url: None,
postgres_config: None,
valkey_url: None,
timeout: None,
verify_credentials: true,
allow_batch_writes: true,
Expand Down Expand Up @@ -2694,6 +2695,7 @@ async fn test_evaluation_with_dynamic_variant() {
config_file: Some(config_path.clone()),
clickhouse_url: None,
postgres_config: None,
valkey_url: None,
timeout: None,
verify_credentials: true,
allow_batch_writes: true,
Expand Down
13 changes: 13 additions & 0 deletions gateway/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use tensorzero_auth::constants::{DEFAULT_ORGANIZATION, DEFAULT_WORKSPACE};
use tensorzero_core::config::{Config, ConfigFileGlob};
use tensorzero_core::db::clickhouse::migration_manager::manual_run_clickhouse_migrations;
use tensorzero_core::db::postgres::{PostgresConnectionInfo, manual_run_postgres_migrations};
use tensorzero_core::db::valkey::ValkeyConnectionInfo;
use tensorzero_core::endpoints::status::TENSORZERO_VERSION;
use tensorzero_core::error;
use tensorzero_core::feature_flags;
Expand Down Expand Up @@ -341,6 +342,11 @@ async fn run() -> Result<(), ExitCode> {
// Print whether postgres is enabled
tracing::info!("├ Postgres: {postgres_enabled_pretty}");

// Print whether valkey is enabled
let valkey_enabled_pretty =
get_valkey_status_string(&gateway_handle.app_state.valkey_connection_info);
tracing::info!("├ Valkey: {valkey_enabled_pretty}");

if let Some(gateway_url) = config
.gateway
.relay
Expand Down Expand Up @@ -460,6 +466,13 @@ fn get_postgres_status_string(postgres: &PostgresConnectionInfo) -> String {
}
}

/// Returns a short human-readable label for the Valkey connection state.
///
/// Yields `"disabled"` for `ValkeyConnectionInfo::Disabled` and `"enabled"`
/// for `ValkeyConnectionInfo::Enabled`, ignoring any fields the enabled
/// variant carries.
fn get_valkey_status_string(valkey: &ValkeyConnectionInfo) -> String {
    // Pick the static label first, then allocate the owned string once.
    let label = match valkey {
        ValkeyConnectionInfo::Disabled => "disabled",
        ValkeyConnectionInfo::Enabled { .. } => "enabled",
    };
    String::from(label)
}

pub async fn shutdown_signal() {
// If any errors occur in these futures, we log them and return from the future
// This will cause the `tokio::select!` block to resolve - i.e. we treat it as
Expand Down
7 changes: 0 additions & 7 deletions gateway/tests/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,15 +205,8 @@ async fn test_tensorzero_unauthenticated_routes() {
.unwrap();

let status = health_response.status();
let text = health_response.text().await.unwrap();
assert_eq!(status, StatusCode::OK);

// TODO(shuyangli): Add a HealthResponse type and validate the parsed form.
assert_eq!(
text,
"{\"gateway\":\"ok\",\"clickhouse\":\"ok\",\"postgres\":\"ok\"}"
);

let status_response = reqwest::Client::new()
.request(Method::GET, format!("http://{}/status", child_data.addr))
.send()
Expand Down
3 changes: 2 additions & 1 deletion internal/durable-tools/src/action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ pub async fn action(
app_state.http_client.clone(),
app_state.clickhouse_connection_info.clone(),
app_state.postgres_connection_info.clone(),
app_state.valkey_connection_info.clone(),
app_state.deferred_tasks.clone(),
);
)?;

let response = feedback(snapshot_app_state, *feedback_params, None).await?;
Ok(ActionResponse::Feedback(response))
Expand Down
3 changes: 3 additions & 0 deletions internal/durable-tools/src/tensorzero_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,17 +417,20 @@ pub fn http_gateway_client(url: Url) -> Result<Arc<dyn TensorZeroClient>, Client
/// Some("tensorzero.toml".into()),
/// Some("http://localhost:8123".into()),
/// None,
/// None,
/// ).await?;
/// ```
pub async fn embedded_gateway_client(
config_file: Option<PathBuf>,
clickhouse_url: Option<String>,
postgres_config: Option<String>,
valkey_url: Option<String>,
) -> Result<Arc<dyn TensorZeroClient>, ClientBuilderError> {
let client = ClientBuilder::new(ClientBuilderMode::EmbeddedGateway {
config_file,
clickhouse_url,
postgres_config: postgres_config.map(PostgresConfig::Url),
valkey_url,
timeout: None,
verify_credentials: true,
allow_batch_writes: false,
Expand Down
Loading
Loading