Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 91 additions & 42 deletions src/agentex/lib/cli/handlers/deploy_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ def merge_deployment_configs(
raise DeploymentError("Repository and image tag are required")

# Start with global configuration
helm_values = {
helm_values: dict[str, Any] = {
"global": {
"image": {
"repository": repository,
Expand All @@ -157,54 +157,76 @@ def merge_deployment_configs(
"memory": manifest.deployment.global_config.resources.limits.memory,
},
},
# Enable autoscaling by default for production deployments
"autoscaling": {
"enabled": True,
"minReplicas": 1,
"maxReplicas": 10,
"targetCPUUtilizationPercentage": 50,
},
}

# Handle temporal configuration using new helper methods
if agent_config.is_temporal_agent():
temporal_config = agent_config.get_temporal_workflow_config()
if temporal_config:
helm_values[TEMPORAL_WORKER_KEY] = {}
helm_values[TEMPORAL_WORKER_KEY] = {
"enabled": True,
# Enable autoscaling for temporal workers as well
"autoscaling": {
"enabled": True,
"minReplicas": 1,
"maxReplicas": 10,
"targetCPUUtilizationPercentage": 50,
},
}
helm_values["global"]["workflow"] = {
"name": temporal_config.name,
"taskQueue": temporal_config.queue_name,
}
helm_values[TEMPORAL_WORKER_KEY]["enabled"] = True

secret_env_vars = []
if agent_config.credentials:
for credential in agent_config.credentials:
secret_env_vars.append(
{
"name": credential.env_var_name,
"secretName": credential.secret_name,
"secretKey": credential.secret_key,
}
)

helm_values["secretEnvVars"] = secret_env_vars
if TEMPORAL_WORKER_KEY in helm_values:
helm_values[TEMPORAL_WORKER_KEY]["secretEnvVars"] = secret_env_vars

# Set the agent_config env vars first to the helm values and so then it can be overriden by the cluster config
# Collect all environment variables with conflict detection
all_env_vars: dict[str, str] = {}
secret_env_vars: list[dict[str, str]] = []

# Start with agent_config env vars
if agent_config.env:
helm_values["env"] = agent_config.env
if TEMPORAL_WORKER_KEY in helm_values:
helm_values[TEMPORAL_WORKER_KEY]["env"] = agent_config.env
all_env_vars.update(agent_config.env)

# Add auth principal env var if manifest principal is set
encoded_principal = _encode_principal_context(manifest)
if encoded_principal:
if "env" not in helm_values:
helm_values["env"] = {}
helm_values["env"][EnvVarKeys.AUTH_PRINCIPAL_B64.value] = encoded_principal
all_env_vars[EnvVarKeys.AUTH_PRINCIPAL_B64.value] = encoded_principal

if manifest.deployment and manifest.deployment.imagePullSecrets:
pull_secrets = [
pull_secret.to_dict()
for pull_secret in manifest.deployment.imagePullSecrets
]
helm_values["global"]["imagePullSecrets"] = pull_secrets
helm_values["imagePullSecrets"] = pull_secrets
# Handle credentials and check for conflicts
if agent_config.credentials:
for credential in agent_config.credentials:
# Handle both CredentialMapping objects and legacy dict format
if isinstance(credential, dict):
env_var_name = credential["env_var_name"]
secret_name = credential["secret_name"]
secret_key = credential["secret_key"]
else:
env_var_name = credential.env_var_name
secret_name = credential.secret_name
secret_key = credential.secret_key

# Check if the environment variable name conflicts with existing env vars
if env_var_name in all_env_vars:
logger.warning(
f"Environment variable '{env_var_name}' is defined in both "
f"env and secretEnvVars. The secret value will take precedence."
)
# Remove from regular env vars since secret takes precedence
del all_env_vars[env_var_name]

secret_env_vars.append(
{
"name": env_var_name,
"secretName": secret_name,
"secretKey": secret_key,
}
)

# Apply cluster-specific overrides
if cluster_config:
Expand Down Expand Up @@ -235,23 +257,50 @@ def merge_deployment_configs(
}
)

# Handle cluster env vars with conflict detection
if cluster_config.env:
helm_values["env"] = cluster_config.env
# Convert cluster env list to dict for easier conflict detection
cluster_env_dict = {env_var["name"]: env_var["value"] for env_var in cluster_config.env}

# Check for conflicts with secret env vars
for secret_env_var in secret_env_vars:
if secret_env_var["name"] in cluster_env_dict:
logger.warning(
f"Environment variable '{secret_env_var['name']}' is defined in both "
f"cluster config env and secretEnvVars. The secret value will take precedence."
)
del cluster_env_dict[secret_env_var["name"]]

# Update all_env_vars with cluster overrides
all_env_vars.update(cluster_env_dict)

# Apply additional arbitrary overrides
if cluster_config.additional_overrides:
_deep_merge(helm_values, cluster_config.additional_overrides)

# Convert the env vars to a list of dictionaries
if "env" in helm_values:
helm_values["env"] = convert_env_vars_dict_to_list(helm_values["env"])

# Convert the temporal worker env vars to a list of dictionaries
if TEMPORAL_WORKER_KEY in helm_values and "env" in helm_values[TEMPORAL_WORKER_KEY]:
helm_values[TEMPORAL_WORKER_KEY]["env"] = convert_env_vars_dict_to_list(
helm_values[TEMPORAL_WORKER_KEY]["env"]
)
# Set final environment variables
if all_env_vars:
helm_values["env"] = convert_env_vars_dict_to_list(all_env_vars)

if secret_env_vars:
helm_values["secretEnvVars"] = secret_env_vars

# Set environment variables for temporal worker if enabled
if TEMPORAL_WORKER_KEY in helm_values:
if all_env_vars:
helm_values[TEMPORAL_WORKER_KEY]["env"] = convert_env_vars_dict_to_list(all_env_vars)
if secret_env_vars:
helm_values[TEMPORAL_WORKER_KEY]["secretEnvVars"] = secret_env_vars

# Handle image pull secrets
if manifest.deployment and manifest.deployment.imagePullSecrets:
pull_secrets = [
pull_secret.to_dict()
for pull_secret in manifest.deployment.imagePullSecrets
]
helm_values["global"]["imagePullSecrets"] = pull_secrets
helm_values["imagePullSecrets"] = pull_secrets

# Add dynamic ACP command based on manifest configuration
add_acp_command_to_helm_values(helm_values, manifest, manifest_path)

Expand Down
58 changes: 36 additions & 22 deletions src/agentex/resources/agents.py
Original file line number Diff line number Diff line change
Expand Up @@ -508,17 +508,24 @@ def send_message_stream(
raise ValueError("Either agent_id or agent_name must be provided")

with raw_agent_rpc_response as response:
for agent_rpc_response_str in response.iter_text():
if agent_rpc_response_str.strip(): # Only process non-empty lines
try:
chunk_rpc_response = SendMessageStreamResponse.model_validate(
json.loads(agent_rpc_response_str),
from_attributes=True
)
yield chunk_rpc_response
except json.JSONDecodeError:
# Skip invalid JSON lines
continue
for _line in response.iter_lines():
if not _line:
continue
line = _line.strip()
# Handle optional SSE-style prefix
if line.startswith("data:"):
line = line[len("data:"):].strip()
if not line:
continue
try:
chunk_rpc_response = SendMessageStreamResponse.model_validate(
json.loads(line),
from_attributes=True
)
yield chunk_rpc_response
except json.JSONDecodeError:
# Skip invalid JSON lines
continue

def send_event(
self,
Expand Down Expand Up @@ -1048,17 +1055,24 @@ async def send_message_stream(
raise ValueError("Either agent_id or agent_name must be provided")

async with raw_agent_rpc_response as response:
async for agent_rpc_response_str in response.iter_text():
if agent_rpc_response_str.strip(): # Only process non-empty lines
try:
chunk_rpc_response = SendMessageStreamResponse.model_validate(
json.loads(agent_rpc_response_str),
from_attributes=True
)
yield chunk_rpc_response
except json.JSONDecodeError:
# Skip invalid JSON lines
continue
async for _line in response.iter_lines():
if not _line:
continue
line = _line.strip()
# Handle optional SSE-style prefix
if line.startswith("data:"):
line = line[len("data:"):].strip()
if not line:
continue
try:
chunk_rpc_response = SendMessageStreamResponse.model_validate(
json.loads(line),
from_attributes=True
)
yield chunk_rpc_response
except json.JSONDecodeError:
# Skip invalid JSON lines
continue

async def send_event(
self,
Expand Down
Loading