From 9ee7a02b1ab96a98ce4d735d25a31163b307d6f0 Mon Sep 17 00:00:00 2001 From: Jason Yang Date: Sat, 9 Aug 2025 11:57:57 -0700 Subject: [PATCH] Adding fixes --- .../lib/cli/handlers/deploy_handlers.py | 133 ++++++++++++------ src/agentex/resources/agents.py | 58 +++++--- 2 files changed, 127 insertions(+), 64 deletions(-) diff --git a/src/agentex/lib/cli/handlers/deploy_handlers.py b/src/agentex/lib/cli/handlers/deploy_handlers.py index 01f8856e..0d64aa26 100644 --- a/src/agentex/lib/cli/handlers/deploy_handlers.py +++ b/src/agentex/lib/cli/handlers/deploy_handlers.py @@ -133,7 +133,7 @@ def merge_deployment_configs( raise DeploymentError("Repository and image tag are required") # Start with global configuration - helm_values = { + helm_values: dict[str, Any] = { "global": { "image": { "repository": repository, @@ -157,54 +157,76 @@ def merge_deployment_configs( "memory": manifest.deployment.global_config.resources.limits.memory, }, }, + # Enable autoscaling by default for production deployments + "autoscaling": { + "enabled": True, + "minReplicas": 1, + "maxReplicas": 10, + "targetCPUUtilizationPercentage": 50, + }, } # Handle temporal configuration using new helper methods if agent_config.is_temporal_agent(): temporal_config = agent_config.get_temporal_workflow_config() if temporal_config: - helm_values[TEMPORAL_WORKER_KEY] = {} + helm_values[TEMPORAL_WORKER_KEY] = { + "enabled": True, + # Enable autoscaling for temporal workers as well + "autoscaling": { + "enabled": True, + "minReplicas": 1, + "maxReplicas": 10, + "targetCPUUtilizationPercentage": 50, + }, + } helm_values["global"]["workflow"] = { "name": temporal_config.name, "taskQueue": temporal_config.queue_name, } - helm_values[TEMPORAL_WORKER_KEY]["enabled"] = True - secret_env_vars = [] - if agent_config.credentials: - for credential in agent_config.credentials: - secret_env_vars.append( - { - "name": credential.env_var_name, - "secretName": credential.secret_name, - "secretKey": credential.secret_key, - } - ) - - helm_values["secretEnvVars"] = secret_env_vars - if TEMPORAL_WORKER_KEY in helm_values: - helm_values[TEMPORAL_WORKER_KEY]["secretEnvVars"] = secret_env_vars - - # Set the agent_config env vars first to the helm values and so then it can be overriden by the cluster config + # Collect all environment variables with conflict detection + all_env_vars: dict[str, str] = {} + secret_env_vars: list[dict[str, str]] = [] + + # Start with agent_config env vars if agent_config.env: - helm_values["env"] = agent_config.env - if TEMPORAL_WORKER_KEY in helm_values: - helm_values[TEMPORAL_WORKER_KEY]["env"] = agent_config.env + all_env_vars.update(agent_config.env) # Add auth principal env var if manifest principal is set encoded_principal = _encode_principal_context(manifest) if encoded_principal: - if "env" not in helm_values: - helm_values["env"] = {} - helm_values["env"][EnvVarKeys.AUTH_PRINCIPAL_B64.value] = encoded_principal + all_env_vars[EnvVarKeys.AUTH_PRINCIPAL_B64.value] = encoded_principal - if manifest.deployment and manifest.deployment.imagePullSecrets: - pull_secrets = [ - pull_secret.to_dict() - for pull_secret in manifest.deployment.imagePullSecrets - ] - helm_values["global"]["imagePullSecrets"] = pull_secrets - helm_values["imagePullSecrets"] = pull_secrets + # Handle credentials and check for conflicts + if agent_config.credentials: + for credential in agent_config.credentials: + # Handle both CredentialMapping objects and legacy dict format + if isinstance(credential, dict): + env_var_name = credential["env_var_name"] + secret_name = credential["secret_name"] + secret_key = credential["secret_key"] + else: + env_var_name = credential.env_var_name + secret_name = credential.secret_name + secret_key = credential.secret_key + + # Check if the environment variable name conflicts with existing env vars + if env_var_name in all_env_vars: + logger.warning( + f"Environment variable '{env_var_name}' is defined in both " + f"env and secretEnvVars. The secret value will take precedence." + ) + # Remove from regular env vars since secret takes precedence + del all_env_vars[env_var_name] + + secret_env_vars.append( + { + "name": env_var_name, + "secretName": secret_name, + "secretKey": secret_key, + } + ) # Apply cluster-specific overrides if cluster_config: @@ -235,23 +257,50 @@ def merge_deployment_configs( } ) + # Handle cluster env vars with conflict detection if cluster_config.env: - helm_values["env"] = cluster_config.env + # Convert cluster env list to dict for easier conflict detection + cluster_env_dict = {env_var["name"]: env_var["value"] for env_var in cluster_config.env} + + # Check for conflicts with secret env vars + for secret_env_var in secret_env_vars: + if secret_env_var["name"] in cluster_env_dict: + logger.warning( + f"Environment variable '{secret_env_var['name']}' is defined in both " + f"cluster config env and secretEnvVars. The secret value will take precedence." + ) + del cluster_env_dict[secret_env_var["name"]] + + # Update all_env_vars with cluster overrides + all_env_vars.update(cluster_env_dict) # Apply additional arbitrary overrides if cluster_config.additional_overrides: _deep_merge(helm_values, cluster_config.additional_overrides) - # Convert the env vars to a list of dictionaries - if "env" in helm_values: - helm_values["env"] = convert_env_vars_dict_to_list(helm_values["env"]) - - # Convert the temporal worker env vars to a list of dictionaries - if TEMPORAL_WORKER_KEY in helm_values and "env" in helm_values[TEMPORAL_WORKER_KEY]: - helm_values[TEMPORAL_WORKER_KEY]["env"] = convert_env_vars_dict_to_list( - helm_values[TEMPORAL_WORKER_KEY]["env"] - ) + # Set final environment variables + if all_env_vars: + helm_values["env"] = convert_env_vars_dict_to_list(all_env_vars) + if secret_env_vars: + helm_values["secretEnvVars"] = secret_env_vars + + # Set environment variables for temporal worker if enabled + if TEMPORAL_WORKER_KEY in helm_values: + if all_env_vars: + helm_values[TEMPORAL_WORKER_KEY]["env"] = convert_env_vars_dict_to_list(all_env_vars) + if secret_env_vars: + helm_values[TEMPORAL_WORKER_KEY]["secretEnvVars"] = secret_env_vars + + # Handle image pull secrets + if manifest.deployment and manifest.deployment.imagePullSecrets: + pull_secrets = [ + pull_secret.to_dict() + for pull_secret in manifest.deployment.imagePullSecrets + ] + helm_values["global"]["imagePullSecrets"] = pull_secrets + helm_values["imagePullSecrets"] = pull_secrets + # Add dynamic ACP command based on manifest configuration add_acp_command_to_helm_values(helm_values, manifest, manifest_path) diff --git a/src/agentex/resources/agents.py b/src/agentex/resources/agents.py index ebeefc26..8cfa5776 100644 --- a/src/agentex/resources/agents.py +++ b/src/agentex/resources/agents.py @@ -508,17 +508,24 @@ def send_message_stream( raise ValueError("Either agent_id or agent_name must be provided") with raw_agent_rpc_response as response: - for agent_rpc_response_str in response.iter_text(): - if agent_rpc_response_str.strip(): # Only process non-empty lines - try: - chunk_rpc_response = SendMessageStreamResponse.model_validate( - json.loads(agent_rpc_response_str), - from_attributes=True - ) - yield chunk_rpc_response - except json.JSONDecodeError: - # Skip invalid JSON lines - continue + for _line in response.iter_lines(): + if not _line: + continue + line = _line.strip() + # Handle optional SSE-style prefix + if line.startswith("data:"): + line = line[len("data:"):].strip() + if not line: + continue + try: + chunk_rpc_response = SendMessageStreamResponse.model_validate( + json.loads(line), + from_attributes=True + ) + yield chunk_rpc_response + except json.JSONDecodeError: + # Skip invalid JSON lines + continue def send_event( self, @@ -1048,17 +1055,24 @@ async def send_message_stream( raise ValueError("Either agent_id or agent_name must be provided") async with raw_agent_rpc_response as response: - async for agent_rpc_response_str in response.iter_text(): - if agent_rpc_response_str.strip(): # Only process non-empty lines - try: - chunk_rpc_response = SendMessageStreamResponse.model_validate( - json.loads(agent_rpc_response_str), - from_attributes=True - ) - yield chunk_rpc_response - except json.JSONDecodeError: - # Skip invalid JSON lines - continue + async for _line in response.iter_lines(): + if not _line: + continue + line = _line.strip() + # Handle optional SSE-style prefix + if line.startswith("data:"): + line = line[len("data:"):].strip() + if not line: + continue + try: + chunk_rpc_response = SendMessageStreamResponse.model_validate( + json.loads(line), + from_attributes=True + ) + yield chunk_rpc_response + except json.JSONDecodeError: + # Skip invalid JSON lines + continue async def send_event( self,