Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -159,4 +159,6 @@ myenv/
myvenv/

#parlant
parlant-data
parlant-data

test.md
450 changes: 270 additions & 180 deletions runagent/cli/commands.py

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion runagent/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ def runagent():
runagent.add_command(commands.teardown)
runagent.add_command(commands.init)
runagent.add_command(commands.template)
runagent.add_command(commands.deploy_local)
runagent.add_command(commands.upload)
runagent.add_command(commands.start)
runagent.add_command(commands.deploy)
runagent.add_command(commands.serve)
runagent.add_command(commands.run)
runagent.add_command(commands.run_stream)
runagent.add_command(commands.delete)
runagent.add_command(commands.db)
runagent.add_command(commands.local_sync)
Expand Down
74 changes: 44 additions & 30 deletions runagent/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,48 +31,62 @@ def __init__(self, agent_id: str, entrypoint_tag: str, local: bool = True, host:

console.print(f"🔍 [cyan]Auto-resolved address for agent {agent_id}: {agent_host}:{agent_port}[/cyan]")

agent_base_url = f"http://{agent_host}:{agent_port}"
agent_socket_url = f"ws://{agent_host}:{agent_port}"

self.rest_client = RestClient(base_url=agent_base_url, api_prefix="/api/v1")
self.socket_client = SocketClient(
base_socket_url=agent_socket_url,
api_prefix="/api/v1"
)
agent_base_url_local = f"http://{agent_host}:{agent_port}"
agent_socket_url_local = f"ws://{agent_host}:{agent_port}"

self.rest_client = RestClient(base_url=agent_base_url_local)
self.socket_client = SocketClient(base_socket_url=agent_socket_url_local)
else:
self.rest_client = RestClient()
self.socket_client = SocketClient()
self.rest_client = RestClient(is_local=False)
self.socket_client = SocketClient(is_local=False)

self.agent_architecture = self.rest_client.get_agent_architecture(agent_id)
# self.agent_architecture = self.rest_client.get_agent_architecture(agent_id)

selected_entrypoint = next(
(
entrypoint for entrypoint in self.agent_architecture['entrypoints']
if entrypoint['tag'] == entrypoint_tag
), None)
# selected_entrypoint = next(
# (
# entrypoint for entrypoint in self.agent_architecture['entrypoints']
# if entrypoint['tag'] == entrypoint_tag
# ), None)

if not selected_entrypoint:
raise ValueError(f"Entrypoint `{entrypoint_tag}` not found in agent {agent_id}")
# if not selected_entrypoint:
# raise ValueError(f"Entrypoint `{entrypoint_tag}` not found in agent {agent_id}")

def _run(self, *input_args, **input_kwargs):
def run(self, *input_args, **input_kwargs):

response = self.rest_client.run_agent(
self.agent_id, self.entrypoint_tag, input_args=input_args, input_kwargs=input_kwargs
)
if response.get("success"):
response_data = response.get("output_data")
# Handle new response format with nested data
if "data" in response and "result_data" in response["data"]:
response_data = response["data"]["result_data"].get("data")
else:
# Fallback to old format for backward compatibility
response_data = response.get("output_data")
return self.serializer.deserialize_object(response_data)

else:
raise Exception(response.get("error"))
# Handle new error format with ErrorDetail object
error_info = response.get("error")
if isinstance(error_info, dict) and "message" in error_info:
# New format with ErrorDetail object
error_message = error_info.get("message", "Unknown error")
error_code = error_info.get("code", "UNKNOWN_ERROR")
raise Exception(f"[{error_code}] {error_message}")
else:
# Fallback to old format for backward compatibility
raise Exception(response.get("error", "Unknown error"))

def run_stream(self, *input_args, **input_kwargs):
"""Stream agent execution results in real-time via WebSocket"""
try:
return self.socket_client.run_stream(
self.agent_id, self.entrypoint_tag, input_args=input_args, input_kwargs=input_kwargs
)
except Exception as e:
# Handle streaming errors with proper formatting
raise Exception(f"Streaming failed: {str(e)}")

def _run_stream(self, *input_args, **input_kwargs):
return self.socket_client.run_stream(
self.agent_id, self.entrypoint_tag, input_args=input_args, input_kwargs=input_kwargs
)

def run(self, *input_args, **input_kwargs):
if self.entrypoint_tag.endswith("_stream"):
return self._run_stream(*input_args, **input_kwargs)
else:
return self._run(*input_args, **input_kwargs)
"""Legacy method - use run_stream instead"""
return self.run_stream(*input_args, **input_kwargs)
60 changes: 35 additions & 25 deletions runagent/sdk/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,34 +144,42 @@ def _test_authentication(self) -> t.Dict[str, t.Any]:
base_url=self._config.get("base_url"),
)

# Test connection using the profile endpoint
response = client.http.get("/users/profile", timeout=10)
# Test connection using the token validation endpoint
api_key = self._config.get("api_key")
response = client.http.post(f"/tokens/validate?token={api_key}", timeout=10)

if response.status_code == 200:
profile_data = response.json()
token_data = response.json()

# Extract user info from the middleware response structure
auth_data = profile_data.get("auth_data", {})
profile_data_inner = profile_data.get("profile_data", {})

user_info = {
"email": auth_data.get("email") or profile_data_inner.get("email"),
"user_id": auth_data.get("id") or profile_data_inner.get("id"),
"tier": profile_data_inner.get("tier", "free")
}

# Store user info for later display
self._config.update({
"user_email": user_info["email"],
"user_id": user_info["user_id"],
"user_tier": user_info["tier"],
"auth_validated": True
})

return {
"success": True,
"user_info": user_info
}
# Check if token validation was successful
if token_data.get("success") and token_data.get("data", {}).get("valid"):
data = token_data.get("data", {})

user_info = {
"email": data.get("user_email"),
"user_id": data.get("user_id"),
"tier": data.get("user_tier", "Free")
}

# Store user info for later display
self._config.update({
"user_email": user_info["email"],
"user_id": user_info["user_id"],
"user_tier": user_info["tier"],
"auth_validated": True
})

return {
"success": True,
"user_info": user_info
}
else:
# Token validation failed
reason = token_data.get("data", {}).get("reason", "Token validation failed")
return {
"success": False,
"error": reason
}

elif response.status_code == 401:
try:
Expand All @@ -192,6 +200,8 @@ def _test_authentication(self) -> t.Dict[str, t.Any]:
}

except Exception as e:
if os.getenv('DISABLE_TRY_CATCH'):
raise
# Handle connection errors gracefully
error_msg = str(e)
if "Connection" in error_msg or "timeout" in error_msg.lower():
Expand Down
155 changes: 155 additions & 0 deletions runagent/sdk/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ class Agent(Base):
port = Column(Integer, nullable=False, default=8000)
framework = Column(String)
status = Column(String, default="deployed")
is_local = Column(Boolean, default=True) # True for local agents, False for remote uploads
fingerprint = Column(String, nullable=True) # Agent folder fingerprint for duplicate detection
deployed_at = Column(DateTime, default=func.current_timestamp())
last_run = Column(DateTime)
run_count = Column(Integer, default=0)
Expand Down Expand Up @@ -1259,6 +1261,8 @@ def get_agent_by_path(self, agent_path: str) -> Optional[Dict]:
"port": agent.port,
"framework": agent.framework,
"status": agent.status,
"is_local": agent.is_local,
"fingerprint": agent.fingerprint,
"deployed_at": (
agent.deployed_at.isoformat() if agent.deployed_at else None
),
Expand All @@ -1277,6 +1281,157 @@ def get_agent_by_path(self, agent_path: str) -> Optional[Dict]:
console.print(f"Error getting agent by path from database: {e}")
return None


def get_agent_by_fingerprint(self, fingerprint: str) -> Optional[Dict]:
"""Get agent information by fingerprint"""
with self.db_manager.get_session() as session:
try:
agent = session.query(Agent).filter(
Agent.fingerprint == fingerprint
).first()

if not agent:
return None

return {
"agent_id": agent.agent_id,
"agent_path": agent.agent_path,
"host": agent.host,
"port": agent.port,
"framework": agent.framework,
"status": agent.status,
"is_local": agent.is_local,
"fingerprint": agent.fingerprint,
"deployed_at": (
agent.deployed_at.isoformat() if agent.deployed_at else None
),
"last_run": agent.last_run.isoformat() if agent.last_run else None,
"run_count": agent.run_count,
"success_count": agent.success_count,
"error_count": agent.error_count,
"created_at": (
agent.created_at.isoformat() if agent.created_at else None
),
"updated_at": (
agent.updated_at.isoformat() if agent.updated_at else None
),
}
except Exception as e:
console.print(f"Error getting agent by fingerprint from database: {e}")
return None

def add_remote_agent(
self,
agent_id: str,
agent_path: str,
framework: str = None,
fingerprint: str = None,
status: str = "uploaded"
) -> Dict:
"""
Add a remote uploaded agent to the database

Args:
agent_id: Unique agent identifier
agent_path: Path to agent directory (local path for reference)
framework: Framework type
fingerprint: Agent fingerprint for duplicate detection
status: Agent status (uploaded, uploading, deployed)

Returns:
Dictionary with success status and details
"""
if not agent_id or not agent_path:
return {
"success": False,
"error": "Missing required fields",
"code": "INVALID_INPUT",
}

with self.db_manager.get_session() as session:
try:
# Check if agent already exists by ID
existing_agent = (
session.query(Agent).filter(Agent.agent_id == agent_id).first()
)
if existing_agent:
return {
"success": False,
"error": f"Agent {agent_id} already exists",
"code": "AGENT_EXISTS",
"existing_agent": {
"agent_id": existing_agent.agent_id,
"status": existing_agent.status,
"is_local": existing_agent.is_local,
}
}

# Check if agent with same fingerprint exists (for duplicate detection)
if fingerprint:
duplicate_agent = (
session.query(Agent).filter(Agent.fingerprint == fingerprint).first()
)
if duplicate_agent:
return {
"success": False,
"error": f"Agent with same fingerprint already exists",
"code": "DUPLICATE_FINGERPRINT",
"existing_agent": {
"agent_id": duplicate_agent.agent_id,
"status": duplicate_agent.status,
"is_local": duplicate_agent.is_local,
"fingerprint": duplicate_agent.fingerprint,
}
}

# Create new remote agent record
new_agent = Agent(
agent_id=agent_id,
agent_path=str(agent_path),
host="remote", # Remote agents don't have local host/port
port=0, # Remote agents don't have local port
framework=framework,
status=status,
is_local=False, # Mark as remote
fingerprint=fingerprint,
)

session.add(new_agent)
session.commit()

return {
"success": True,
"message": f"Remote agent {agent_id} added successfully",
"agent_id": agent_id,
"status": status,
"is_local": False,
}

except Exception as e:
session.rollback()
return {
"success": False,
"error": f"Database error: {str(e)}",
"code": "DATABASE_ERROR",
}

def update_agent_fingerprint(self, agent_id: str, fingerprint: str) -> bool:
"""Update agent fingerprint"""
with self.db_manager.get_session() as session:
try:
agent = session.query(Agent).filter(Agent.agent_id == agent_id).first()
if not agent:
return False

agent.fingerprint = fingerprint
agent.updated_at = func.current_timestamp()
session.commit()
return True
except Exception as e:
session.rollback()
console.print(f"Error updating agent fingerprint: {e}")
return False

def list_agents(self, status: str = None) -> List[Dict]:
"""List all agents, optionally filtered by status"""
with self.db_manager.get_session() as session:
Expand Down
Loading