Official Python SDK for the Splox API — run workflows, manage chats, and monitor execution programmatically.
pip install sploxfrom splox import SploxClient
client = SploxClient(api_key="your-api-key")
# Create a chat session
chat = client.chats.create(
name="My Session",
resource_id="your-workflow-id",
)
# Run a workflow
result = client.workflows.run(
workflow_version_id="your-version-id",
chat_id=chat.id,
start_node_id="your-start-node-id",
query="Summarize the latest sales report",
)
print(result.workflow_request_id)
# Get execution tree
tree = client.workflows.get_execution_tree(result.workflow_request_id)
for node in tree.execution_tree.nodes:
print(f"{node.node_label}: {node.status}")import asyncio
from splox import AsyncSploxClient
async def main():
client = AsyncSploxClient(api_key="your-api-key")
chat = await client.chats.create(
name="Async Session",
resource_id="your-workflow-id",
)
result = await client.workflows.run(
workflow_version_id="your-version-id",
chat_id=chat.id,
start_node_id="your-start-node-id",
query="Hello from async!",
)
# Stream execution events via SSE
async for event in client.workflows.listen(result.workflow_request_id):
if event.node_execution:
print(f"Node {event.node_execution.status}: {event.node_execution.output_data}")
if event.workflow_request and event.workflow_request.status in ("completed", "failed"):
break
await client.close()
asyncio.run(main())# Sync
for event in client.workflows.listen(workflow_request_id):
print(event)
# Async
async for event in async_client.workflows.listen(workflow_request_id):
print(event)Stream real-time chat events including text deltas, tool calls, and more:
# Async example — collect streamed response
async for event in client.chats.listen(chat_id):
if event.event_type == "text_delta":
print(event.text_delta, end="", flush=True)
elif event.event_type == "tool_call_start":
print(f"\\nCalling tool: {event.tool_name}")
elif event.event_type == "done":
print("\\nIteration complete")
# Stop when workflow completes
if event.workflow_request and event.workflow_request.status == "completed":
breakEvent types:
| Type | Fields | Description |
|---|---|---|
text_delta |
text_delta |
Streamed text chunk |
reasoning_delta |
reasoning_delta, reasoning_type |
Thinking content |
tool_call_start |
tool_call_id, tool_name |
Tool call initiated |
tool_call_delta |
tool_call_id, tool_args_delta |
Tool arguments delta |
tool_start |
tool_name, tool_call_id |
Tool execution started |
tool_complete |
tool_name, tool_call_id, tool_result |
Tool finished |
tool_error |
tool_name, tool_call_id, error |
Tool failed |
done |
iteration, run_id |
Iteration complete |
error |
error |
Error occurred |
Convenience method that runs a workflow and waits for completion:
execution = client.workflows.run_and_wait(
workflow_version_id="your-version-id",
chat_id=chat.id,
start_node_id="your-start-node-id",
query="Process this request",
timeout=300, # 5 minutes
)
print(execution.status) # "completed"
for node in execution.nodes:
print(f"{node.node_label}: {node.output_data}")Inspect and manage agent context memory — list instances, read messages, summarize, trim, clear, or export.
# List memory instances (paginated)
result = client.memory.list("workflow-version-id", limit=20)
for inst in result.chats:
print(f"{inst.memory_node_label}: {inst.message_count} messages")
# Paginate
if result.has_more:
more = client.memory.list("workflow-version-id", cursor=result.next_cursor)
# Get messages for an agent node
messages = client.memory.get("agent-node-id", chat_id="session-id", limit=20)
for msg in messages.messages:
print(f"[{msg.role}] {msg.content}")
# Summarize — compress older messages into an LLM-generated summary
result = client.memory.summarize(
"agent-node-id",
context_memory_id="session-id",
workflow_version_id="version-id",
keep_last_n=3,
)
print(f"Summary: {result.summary}")
# Trim — drop oldest messages to stay under a limit
client.memory.trim(
"agent-node-id",
context_memory_id="session-id",
workflow_version_id="version-id",
max_messages=20,
)
# Export all messages without modifying them
exported = client.memory.export(
"agent-node-id",
context_memory_id="session-id",
workflow_version_id="version-id",
)
# Clear all messages
client.memory.clear(
"agent-node-id",
context_memory_id="session-id",
workflow_version_id="version-id",
)
# Delete a specific memory instance
client.memory.delete(
"session-id",
memory_node_id="agent-node-id",
workflow_version_id="version-id",
)# Trigger a workflow via webhook (no auth required)
from splox import SploxClient
client = SploxClient() # No API key needed for webhooks
result = client.events.send(
webhook_id="your-webhook-id",
payload={"order_id": "12345", "status": "paid"},
)
print(result.event_id)from splox import SploxClient
from splox.exceptions import (
SploxAPIError,
SploxAuthError,
SploxRateLimitError,
SploxNotFoundError,
)
client = SploxClient(api_key="your-api-key")
try:
result = client.workflows.run(...)
except SploxAuthError:
print("Invalid or expired API token")
except SploxRateLimitError as e:
print(f"Rate limited. Retry after: {e.retry_after}")
except SploxNotFoundError:
print("Resource not found")
except SploxAPIError as e:
print(f"API error {e.status_code}: {e.message}")client = SploxClient(
api_key="your-api-key",
base_url="https://your-self-hosted-instance.com/api/v1",
)| Parameter | Type | Default | Description |
|---|---|---|---|
api_key |
str | None |
SPLOX_API_KEY env |
API authentication token |
base_url |
str |
https://app.splox.io/api/v1 |
API base URL |
timeout |
float |
30.0 |
Request timeout in seconds |
| Method | Description |
|---|---|
run(...) |
Trigger a workflow execution |
listen(id) |
Stream execution events (SSE) |
get_execution_tree(id) |
Get complete execution hierarchy |
get_history(id, ...) |
Get paginated execution history |
stop(id) |
Stop a running workflow |
run_and_wait(...) |
Run and wait for completion |
| Method | Description |
|---|---|
create(...) |
Create a new chat session |
get(id) |
Get a chat by ID |
listen(id) |
Stream chat events (SSE) |
| Method | Description |
|---|---|
send(webhook_id, ...) |
Send event via webhook |
| Method | Description |
|---|---|
list(version_id, ...) |
List memory instances (paginated) |
get(node_id, ...) |
Get paginated messages |
summarize(node_id, ...) |
Summarize older messages with LLM |
trim(node_id, ...) |
Drop oldest messages |
clear(node_id, ...) |
Remove all messages |
export(node_id, ...) |
Export all messages |
delete(memory_id, ...) |
Delete a memory instance |
MIT