diff --git a/src/database-mcp-server/oracle/database_mcp_server/__init__.py b/src/database-mcp-server/oracle/database_mcp_server/__init__.py index 18a81447..7164143d 100644 --- a/src/database-mcp-server/oracle/database_mcp_server/__init__.py +++ b/src/database-mcp-server/oracle/database_mcp_server/__init__.py @@ -5,4 +5,4 @@ """ __project__ = "oracle.database-mcp-server" -__version__ = "1.0.0" +__version__ = "1.0.1" diff --git a/src/oci-identity-mcp-server/oracle/oci_identity_mcp_server/__init__.py b/src/oci-identity-mcp-server/oracle/oci_identity_mcp_server/__init__.py index 8d20cb35..08a61e6a 100644 --- a/src/oci-identity-mcp-server/oracle/oci_identity_mcp_server/__init__.py +++ b/src/oci-identity-mcp-server/oracle/oci_identity_mcp_server/__init__.py @@ -5,4 +5,4 @@ """ __project__ = "oracle.oci-identity-mcp-server" -__version__ = "1.0.2" +__version__ = "2.0.0" diff --git a/src/oci-identity-mcp-server/oracle/oci_identity_mcp_server/models.py b/src/oci-identity-mcp-server/oracle/oci_identity_mcp_server/models.py new file mode 100644 index 00000000..5862de03 --- /dev/null +++ b/src/oci-identity-mcp-server/oracle/oci_identity_mcp_server/models.py @@ -0,0 +1,355 @@ +""" +Copyright (c) 2025, Oracle and/or its affiliates. +Licensed under the Universal Permissive License v1.0 as shown at +https://oss.oracle.com/licenses/upl. +""" + +from datetime import datetime +from typing import Any, Dict, Literal, Optional + +import oci +from pydantic import BaseModel, Field + + +def _oci_to_dict(obj): + """Best-effort conversion of OCI SDK model objects to plain dicts.""" + if obj is None: + return None + try: + from oci.util import to_dict as oci_to_dict + + return oci_to_dict(obj) + except Exception: + pass + if isinstance(obj, dict): + return obj + if hasattr(obj, "__dict__"): + return {k: v for k, v in obj.__dict__.items() if not k.startswith("_")} + return None + + +# region Compartment + + +class Compartment(BaseModel): + """ + Pydantic model mirroring the fields of oci.identity.models.Compartment. + There are no nested OCI model types in this class. + """ + + id: Optional[str] = Field(None, description="The OCID of the compartment.") + compartment_id: Optional[str] = Field( + None, + description="The OCID of the parent compartment " + "(remember that the tenancy is simply the root compartment).", + ) + name: Optional[str] = Field( + None, + description="The name you assign to the compartment during creation. " + "The name must be unique across all compartments in the parent. " + "Avoid entering confidential information.", + ) + description: Optional[str] = Field( + None, + description="The description you assign to the compartment. " + "Does not have to be unique, and it's changeable.", + ) + time_created: Optional[datetime] = Field( + None, + description="Date and time the compartment was created, in the format " + "defined by RFC3339. Example: `2016-08-25T21:10:29.600Z`", + ) + lifecycle_state: Optional[ + Literal["CREATING", "ACTIVE", "INACTIVE", "DELETING", "DELETED", "FAILED"] + ] = Field(None, description="The compartment's current state.") + inactive_status: Optional[int] = Field( + None, description="The detailed status of INACTIVE lifecycleState." + ) + freeform_tags: Optional[Dict[str, str]] = Field( + None, + description='Free-form tags for this resource. Each tag is a simple key-value pair with "' + '"no predefined name, type, or namespace. For more information, see "' + '"[Resource Tags](https://docs.cloud.oracle.com/Content/General/Concepts/resourcetags.htm). "' + '"Example: `{"Department": "Finance"}`', + ) + defined_tags: Optional[Dict[str, Dict[str, Any]]] = Field( + None, + description='Defined tags for this resource. Each key is predefined and scoped to a namespace. "' + '"For more information, see "' + '"[Resource Tags](https://docs.cloud.oracle.com/Content/General/Concepts/resourcetags.htm). "' + '"Example: `{"Operations": {"CostCenter": "42"}}`', + ) + + +def map_compartment(compartment_data: oci.identity.models.Compartment) -> Compartment: + """ + Convert an oci.identity.models.Compartment to oracle.oci_identity_mcp_server.models.Compartment. + """ + return Compartment( + id=getattr(compartment_data, "id", None), + compartment_id=getattr(compartment_data, "compartment_id", None), + name=getattr(compartment_data, "name", None), + description=getattr(compartment_data, "description", None), + time_created=getattr(compartment_data, "time_created", None), + lifecycle_state=getattr(compartment_data, "lifecycle_state", None), + inactive_status=getattr(compartment_data, "inactive_status", None), + freeform_tags=getattr(compartment_data, "freeform_tags", None), + defined_tags=getattr(compartment_data, "defined_tags", None), + ) + + +# end region + +# region Tenancy + + +class Tenancy(BaseModel): + """ + Pydantic model mirroring the fields of oci.identity.models.Tenancy. + There are no nested OCI model types in this class. + """ + + id: Optional[str] = Field(None, description="The OCID of the tenancy.") + name: Optional[str] = Field(None, description="The name of the tenancy.") + description: Optional[str] = Field( + None, description="The description of the tenancy." + ) + home_region_key: Optional[str] = Field( + None, description="The region key for the tenancy's home region." + ) + freeform_tags: Optional[Dict[str, str]] = Field( + None, + description='Free-form tags for this resource. Each tag is a simple key-value pair with "' + '"no predefined name, type, or namespace. For more information, see "' + '"[Resource Tags](https://docs.cloud.oracle.com/Content/General/Concepts/resourcetags.htm). "' + '"Example: `{"Department": "Finance"}`', + ) + defined_tags: Optional[Dict[str, Dict[str, Any]]] = Field( + None, + description='Defined tags for this resource. Each key is predefined and scoped to a namespace. "' + '"For more information, see "' + '"[Resource Tags](https://docs.cloud.oracle.com/Content/General/Concepts/resourcetags.htm). "' + '"Example: `{"Operations": {"CostCenter": "42"}}`', + ) + + +def map_tenancy(tenancy_data: oci.identity.models.Tenancy) -> Tenancy: + """ + Convert an oci.identity.models.Tenancy to oracle.oci_identity_mcp_server.models.Tenancy. + """ + return Tenancy( + id=getattr(tenancy_data, "id", None), + name=getattr(tenancy_data, "name", None), + description=getattr(tenancy_data, "description", None), + home_region_key=getattr(tenancy_data, "home_region_key", None), + freeform_tags=getattr(tenancy_data, "freeform_tags", None), + defined_tags=getattr(tenancy_data, "defined_tags", None), + ) + + +# endregion + +# region AvailabilityDomain + + +class AvailabilityDomain(BaseModel): + """ + Pydantic model mirroring the fields of oci.identity.models.AvailabilityDomain. + There are no nested OCI model types in this class. + """ + + id: Optional[str] = Field(None, description="The OCID of the Availability Domain.") + name: Optional[str] = Field( + None, description="The name of the Availability Domain." + ) + compartment_id: Optional[str] = Field( + None, description="The OCID of the tenancy containing the Availability Domain." + ) + + +def map_availability_domain( + availability_domain_data: oci.identity.models.AvailabilityDomain, +) -> AvailabilityDomain: + """ + Convert an oci.identity.models.AvailabilityDomain to + oracle.oci_identity_mcp_server.models.AvailabilityDomain. + """ + return AvailabilityDomain( + id=getattr(availability_domain_data, "id", None), + name=getattr(availability_domain_data, "name", None), + compartment_id=getattr(availability_domain_data, "compartment_id", None), + ) + + +# end region + +# region AuthToken + + +class AuthToken(BaseModel): + """ + Pydantic model mirroring the fields of oci.identity.models.AuthToken. + There are no nested OCI model types in this class. + """ + + id: Optional[str] = Field(None, description="The OCID of the auth token.") + token: Optional[str] = Field(None, description="The raw auth token string.") + user_id: Optional[str] = Field( + None, description="The OCID of the user the password belongs to." + ) + description: Optional[str] = Field( + None, + description="The description you assign to the auth token. " + "Does not have to be unique, and it's changeable.", + ) + time_created: Optional[datetime] = Field( + None, + description="Date and time the auth token was created, in the format " + "defined by RFC3339. Example: `2016-08-25T21:10:29.600Z`", + ) + time_expires: Optional[datetime] = Field( + None, + description="Date and time when this auth token will expire, in the format " + "defined by RFC3339. Example: `2016-08-25T21:10:29.600Z`", + ) + lifecycle_state: Optional[ + Literal["CREATING", "ACTIVE", "INACTIVE", "DELETING", "DELETED"] + ] = Field(None, description="The auth token's current state.") + inactive_status: Optional[int] = Field( + None, description="The detailed status of INACTIVE lifecycleState." + ) + freeform_tags: Optional[Dict[str, str]] = Field( + None, + description='Free-form tags for this resource. Each tag is a simple key-value pair with "' + '"no predefined name, type, or namespace. For more information, see "' + '"[Resource Tags](https://docs.cloud.oracle.com/Content/General/Concepts/resourcetags.htm). "' + '"Example: `{"Department": "Finance"}`', + ) + defined_tags: Optional[Dict[str, Dict[str, Any]]] = Field( + None, + description='Defined tags for this resource. Each key is predefined and scoped to a namespace. "' + '"For more information, see "' + '"[Resource Tags](https://docs.cloud.oracle.com/Content/General/Concepts/resourcetags.htm). "' + '"Example: `{"Operations": {"CostCenter": "42"}}`', + ) + + +def map_auth_token(auth_token_data: oci.identity.models.AuthToken) -> AuthToken: + """ + Convert an oci.identity.models.AuthToken to oracle.oci_identity_mcp_server.models.AuthToken. + """ + return AuthToken( + id=getattr(auth_token_data, "id", None), + token=getattr(auth_token_data, "token", None), + user_id=getattr(auth_token_data, "user_id", None), + description=getattr(auth_token_data, "description", None), + time_created=getattr(auth_token_data, "time_created", None), + time_expires=getattr(auth_token_data, "time_expires", None), + lifecycle_state=getattr(auth_token_data, "lifecycle_state", None), + inactive_status=getattr(auth_token_data, "inactive_status", None), + freeform_tags=getattr(auth_token_data, "freeform_tags", None), + defined_tags=getattr(auth_token_data, "defined_tags", None), + ) + + +# endregion + +# region User + + +class User(BaseModel): + """ + Pydantic model mirroring the fields of oci.identity.models.User. + There are no nested OCI model types in this class. + """ + + id: Optional[str] = Field(None, description="The OCID of the user.") + compartment_id: Optional[str] = Field( + None, description="The OCID of the tenancy containing the user." + ) + name: Optional[str] = Field( + None, + description="The name you assign to the user during creation. " + "This is the user's login value. The name must be unique " + "across all users in the tenancy and cannot be changed.", + ) + description: Optional[str] = Field( + None, + description="The description you assign to the user. " + "Does not have to be unique, and it's changeable.", + ) + email: Optional[str] = Field(None, description="The email address of the user.") + email_verified: Optional[bool] = Field( + None, description="Whether the email address has been validated." + ) + db_user_name: Optional[str] = Field( + None, + description="DB username of the DB credential. Has to be unique across the tenancy.", + ) + identity_provider_id: Optional[str] = Field( + None, description="The OCID of the IdentityProvider this user belongs to." + ) + external_identifier: Optional[str] = Field( + None, description="Identifier of the user in the identity provider" + ) + time_created: Optional[datetime] = Field( + None, + description="Date and time the user was created, in the format defined by RFC3339. " + "Example: `2016-08-25T21:10:29.600Z`", + ) + lifecycle_state: Optional[ + Literal["CREATING", "ACTIVE", "INACTIVE", "DELETING", "DELETED"] + ] = Field(None, description="The user's current state.") + inactive_status: Optional[int] = Field( + None, + description="Returned only if the user's `lifecycleState` is INACTIVE. " + "A 16-bit value showing the reason why the user is inactive: " + "- bit 0: SUSPENDED (reserved for future use) " + "- bit 1: DISABLED (reserved for future use) " + "- bit 2: BLOCKED (the user has exceeded the maximum number of " + "failed login attempts for the Console)", + ) + freeform_tags: Optional[Dict[str, str]] = Field( + None, + description='Free-form tags for this resource. Each tag is a simple key-value pair with "' + '"no predefined name, type, or namespace. For more information, see "' + '"[Resource Tags](https://docs.cloud.oracle.com/Content/General/Concepts/resourcetags.htm). "' + '"Example: `{"Department": "Finance"}`', + ) + defined_tags: Optional[Dict[str, Dict[str, Any]]] = Field( + None, + description='Defined tags for this resource. Each key is predefined and scoped to a namespace. "' + '"For more information, see "' + '"[Resource Tags](https://docs.cloud.oracle.com/Content/General/Concepts/resourcetags.htm). "' + '"Example: `{"Operations": {"CostCenter": "42"}}`', + ) + is_mfa_activated: Optional[bool] = Field( + None, + description="Indicates whether the user has activated multi-factor authentication.", + ) + + +def map_user(user_data: oci.identity.models.User) -> User: + """ + Convert an oci.identity.models.User to oracle.oci_identity_mcp_server.models.User. + """ + return User( + id=getattr(user_data, "id", None), + compartment_id=getattr(user_data, "compartment_id", None), + name=getattr(user_data, "name", None), + description=getattr(user_data, "description", None), + email=getattr(user_data, "email", None), + email_verified=getattr(user_data, "email_verified", None), + db_user_name=getattr(user_data, "db_user_name", None), + identity_provider_id=getattr(user_data, "identity_provider_id", None), + external_identifier=getattr(user_data, "external_identifier", None), + time_created=getattr(user_data, "time_created", None), + lifecycle_state=getattr(user_data, "lifecycle_state", None), + inactive_status=getattr(user_data, "inactive_status", None), + freeform_tags=getattr(user_data, "freeform_tags", None), + defined_tags=getattr(user_data, "defined_tags", None), + is_mfa_activated=getattr(user_data, "is_mfa_activated", None), + ) + + +# endregion diff --git a/src/oci-identity-mcp-server/oracle/oci_identity_mcp_server/server.py b/src/oci-identity-mcp-server/oracle/oci_identity_mcp_server/server.py index ab3e2f55..1188a90c 100644 --- a/src/oci-identity-mcp-server/oracle/oci_identity_mcp_server/server.py +++ b/src/oci-identity-mcp-server/oracle/oci_identity_mcp_server/server.py @@ -7,12 +7,29 @@ import base64 import json import os +from logging import Logger +from typing import Optional import oci from fastmcp import FastMCP +from oracle.oci_identity_mcp_server.models import ( + AuthToken, + AvailabilityDomain, + Compartment, + Tenancy, + User, + map_auth_token, + map_availability_domain, + map_compartment, + map_tenancy, + map_user, +) +from pydantic import Field from . import __project__, __version__ +logger = Logger(__name__, level="INFO") + mcp = FastMCP(name=__project__) @@ -30,119 +47,186 @@ def get_identity_client(): return oci.identity.IdentityClient(config, signer=signer) -@mcp.tool -def list_compartments(tenancy_id: str) -> list[dict]: - identity = get_identity_client() - compartments = identity.list_compartments(tenancy_id).data - return [ - { - "id": compartment.id, - "name": compartment.name, - "description": compartment.description, - "lifecycle_state": compartment.lifecycle_state, - } - for compartment in compartments - ] +@mcp.tool(description="List compartments in a given compartment") +def list_compartments( + compartment_id: str = Field( + ..., + description="The OCID of the compartment (remember that the tenancy is simply the root compartment)", + ), + limit: Optional[int] = Field( + None, + description="The maximum amount of compartments to return. If None, there is no limit.", + ge=1, + ), +) -> list[Compartment]: + compartments: list[Compartment] = [] + try: + client = get_identity_client() -@mcp.tool -def get_tenancy_info(tenancy_id: str) -> dict: - identity = get_identity_client() - tenancy = identity.get_tenancy(tenancy_id).data - return { - "id": tenancy.id, - "name": tenancy.name, - "description": tenancy.description, - "home_region_key": tenancy.home_region_key, - } + response: oci.response.Response = None + has_next_page = True + next_page: str = None + + while has_next_page and (limit is None or len(compartments) < limit): + kwargs = { + "compartment_id": compartment_id, + "page": next_page, + "limit": limit, + } + + response = client.list_compartments(**kwargs) + has_next_page = response.has_next_page + next_page = response.next_page if hasattr(response, "next_page") else None + + data: list[oci.identity.models.Compartment] = response.data + for d in data: + compartments.append(map_compartment(d)) + + logger.info(f"Found {len(compartments)} Compartments") + return compartments + + except Exception as e: + logger.error(f"Error in list_compartments tool: {str(e)}") + raise e + + +@mcp.tool(description="Get tenancy with a given OCID") +def get_tenancy( + tenancy_id: str = Field(..., description="The OCID of the tenancy") +) -> Tenancy: + try: + client = get_identity_client() + + response: oci.response.Response = client.get_tenancy(tenancy_id) + data: oci.identity.models.Tenancy = response.data + logger.info("Found Tenancy") + return map_tenancy(data) + + except Exception as e: + logger.error(f"Error in get_tenancy tool: {str(e)}") + raise e @mcp.tool(description="Lists all of the availability domains in a given tenancy") -def list_availability_domains(tenancy_id: str) -> list[dict]: - identity = get_identity_client() - ads: list[oci.identity.models.AvailabilityDomain] = ( - identity.list_availability_domains(tenancy_id).data - ) - return [ - { - "id": ad.id, - "name": ad.name, - "compartment_id": ad.compartment_id, - } - for ad in ads - ] +def list_availability_domains( + compartment_id: str = Field( + ..., + description="The OCID of the compartment (remember that the tenancy is simply the root compartment)", + ), +) -> list[AvailabilityDomain]: + ads: list[AvailabilityDomain] = [] + + try: + client = get_identity_client() + + response = client.list_availability_domains(compartment_id) + + data: list[oci.identity.models.AvailabilityDomain] = response.data + for d in data: + ads.append(map_availability_domain(d)) + + logger.info(f"Found {len(ads)} Availability Domains") + return ads + + except Exception as e: + logger.error(f"Error in list_availability_domains tool: {str(e)}") + raise e @mcp.tool -def get_current_tenancy() -> dict: - config = oci.config.from_file( - profile_name=os.getenv("OCI_CONFIG_PROFILE", oci.config.DEFAULT_PROFILE) - ) - tenancy_id = config["tenancy"] - identity = get_identity_client() - tenancy = identity.get_tenancy(tenancy_id).data - return { - "id": tenancy.id, - "name": tenancy.name, - "description": tenancy.description, - "home_region_key": tenancy.home_region_key, - } +def get_current_tenancy() -> Tenancy: + try: + client = get_identity_client() + + config = oci.config.from_file( + profile_name=os.getenv("OCI_CONFIG_PROFILE", oci.config.DEFAULT_PROFILE) + ) + tenancy_id = config["tenancy"] + + response: oci.response.Response = client.get_tenancy(tenancy_id) + data: oci.identity.models.Tenancy = response.data + logger.info("Found Tenancy") + return map_tenancy(data) + + except Exception as e: + logger.error(f"Error in get_tenancy tool: {str(e)}") + raise e @mcp.tool -def create_auth_token(user_id: str) -> dict: - identity = get_identity_client() - token = identity.create_auth_token(user_id=user_id).data - return { - "token": token.token, - "description": token.description, - "lifecycle_state": token.lifecycle_state, - } +def create_auth_token( + user_id: str = Field(..., description="The OCID of the user"), + description: Optional[str] = Field( + "", description="The description of the auth token" + ), +) -> AuthToken: + try: + client = get_identity_client() + + create_auth_token_details = oci.identity.models.CreateAuthTokenDetails( + description=description + ) + + response: oci.response.Response = client.create_auth_token( + user_id=user_id, + create_auth_token_details=create_auth_token_details, + ) + data: oci.identity.models.AuthToken = response.data + logger.info("Created auth token") + return map_auth_token(data) + + except Exception as e: + logger.error(f"Error in create_auth_token tool: {str(e)}") + raise e @mcp.tool -def get_current_user() -> dict: - identity = get_identity_client() - config = oci.config.from_file( - profile_name=os.getenv("OCI_CONFIG_PROFILE", oci.config.DEFAULT_PROFILE) - ) +def get_current_user() -> User: + try: + client = get_identity_client() + config = oci.config.from_file( + profile_name=os.getenv("OCI_CONFIG_PROFILE", oci.config.DEFAULT_PROFILE) + ) - # Prefer explicit user from config if present - user_id = config.get("user") - - # Fallback: derive user OCID from the security token (session auth) - if not user_id: - token_file = config.get("security_token_file") - if token_file and os.path.exists(token_file): - with open(token_file, "r") as f: - token = f.read().strip() - - # Expect JWT-like token: header.payload.signature (base64url) - if "." in token: - try: - payload_b64 = token.split(".", 2)[1] - padding = "=" * (-len(payload_b64) % 4) - payload_json = base64.urlsafe_b64decode( - payload_b64 + padding - ).decode("utf-8") - payload = json.loads(payload_json) - # 'sub' typically contains the user OCID for session tokens; - # fallback to opc-user-id if present - user_id = payload.get("sub") or payload.get("opc-user-id") - except Exception: - user_id = None + # Prefer explicit user from config if present + user_id = config.get("user") + # Fallback: derive user OCID from the security token (session auth) if not user_id: - raise KeyError( - "Unable to determine current user OCID from config or security token" - ) - - user = identity.get_user(user_id).data - return { - "id": user.id, - "name": user.name, - "description": user.description, - } + token_file = config.get("security_token_file") + if token_file and os.path.exists(token_file): + with open(token_file, "r") as f: + token = f.read().strip() + + # Expect JWT-like token: header.payload.signature (base64url) + if "." in token: + try: + payload_b64 = token.split(".", 2)[1] + padding = "=" * (-len(payload_b64) % 4) + payload_json = base64.urlsafe_b64decode( + payload_b64 + padding + ).decode("utf-8") + payload = json.loads(payload_json) + # 'sub' typically contains the user OCID for session tokens; + # fallback to opc-user-id if present + user_id = payload.get("sub") or payload.get("opc-user-id") + except Exception: + user_id = None + + if not user_id: + raise KeyError( + "Unable to determine current user OCID from config or security token" + ) + + response: oci.response.Response = client.get_user(user_id) + data: oci.identity.models.User = response.data + logger.info("Found current user") + return map_user(data) + + except Exception as e: + logger.error(f"Error in get_current_user tool: {str(e)}") + raise e def main(): diff --git a/src/oci-identity-mcp-server/oracle/oci_identity_mcp_server/tests/test_identity_tools.py b/src/oci-identity-mcp-server/oracle/oci_identity_mcp_server/tests/test_identity_tools.py index b2841e68..0a446364 100644 --- a/src/oci-identity-mcp-server/oracle/oci_identity_mcp_server/tests/test_identity_tools.py +++ b/src/oci-identity-mcp-server/oracle/oci_identity_mcp_server/tests/test_identity_tools.py @@ -30,6 +30,8 @@ async def test_list_compartments(self, mock_get_client): time_created="1970-01-01T00:00:00", ) ] + mock_list_response.has_next_page = False + mock_list_response.next_page = None mock_client.list_compartments.return_value = mock_list_response async with Client(mcp) as client: @@ -37,7 +39,7 @@ async def test_list_compartments(self, mock_get_client): await client.call_tool( "list_compartments", { - "tenancy_id": "test_tenancy", + "compartment_id": "test_tenancy", }, ) ).structured_content["result"] @@ -66,7 +68,7 @@ async def test_list_availability_domains(self, mock_get_client): await client.call_tool( "list_availability_domains", { - "tenancy_id": "test_tenancy", + "compartment_id": "test_tenancy", }, ) ).structured_content["result"] @@ -76,7 +78,7 @@ async def test_list_availability_domains(self, mock_get_client): @pytest.mark.asyncio @patch("oracle.oci_identity_mcp_server.server.get_identity_client") - async def test_get_tenancy_info(self, mock_get_client): + async def test_get_tenancy(self, mock_get_client): mock_client = MagicMock() mock_get_client.return_value = mock_client @@ -90,14 +92,13 @@ async def test_get_tenancy_info(self, mock_get_client): mock_client.get_tenancy.return_value = mock_get_response async with Client(mcp) as client: - result = ( - await client.call_tool( - "get_tenancy_info", - { - "tenancy_id": "test_tenancy", - }, - ) - ).data + call_tool_result = await client.call_tool( + "get_tenancy", + { + "tenancy_id": "test_tenancy", + }, + ) + result = call_tool_result.structured_content assert result["id"] == "tenancy1" @@ -110,7 +111,7 @@ async def test_get_current_tenancy(self, mock_config_from_file, mock_get_client) mock_get_client.return_value = mock_client mock_get_response = create_autospec(oci.response.Response) - mock_get_response.data = MagicMock( + mock_get_response.data = oci.identity.models.Tenancy( id="tenancy1", name="Tenancy 1", description="Test tenancy", @@ -119,7 +120,11 @@ async def test_get_current_tenancy(self, mock_config_from_file, mock_get_client) mock_client.get_tenancy.return_value = mock_get_response async with Client(mcp) as client: - result = (await client.call_tool("get_current_tenancy", {})).data + call_tool_result = await client.call_tool( + "get_current_tenancy", + {}, + ) + result = call_tool_result.structured_content assert result["id"] == "tenancy1" @@ -136,14 +141,13 @@ async def test_create_auth_token(self, mock_get_client): mock_client.create_auth_token.return_value = mock_create_response async with Client(mcp) as client: - result = ( - await client.call_tool( - "create_auth_token", - { - "user_id": "test_user", - }, - ) - ).data + call_tool_result = await client.call_tool( + "create_auth_token", + { + "user_id": "test_user", + }, + ) + result = call_tool_result.structured_content assert result["token"] == "token1" @@ -156,12 +160,16 @@ async def test_get_current_user(self, mock_config_from_file, mock_get_client): mock_get_client.return_value = mock_client mock_get_response = create_autospec(oci.response.Response) - mock_get_response.data = MagicMock( + mock_get_response.data = oci.identity.models.User( id="user1", name="User 1", description="Test user" ) mock_client.get_user.return_value = mock_get_response async with Client(mcp) as client: - result = (await client.call_tool("get_current_user", {})).data + call_tool_result = await client.call_tool( + "get_current_user", + {}, + ) + result = call_tool_result.structured_content assert result["id"] == "user1" diff --git a/src/oci-identity-mcp-server/pyproject.toml b/src/oci-identity-mcp-server/pyproject.toml index 96c3777a..b68f406d 100644 --- a/src/oci-identity-mcp-server/pyproject.toml +++ b/src/oci-identity-mcp-server/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "oracle.oci-identity-mcp-server" -version = "1.0.2" +version = "2.0.0" description = "OCI Identity Service MCP server" readme = "README.md" requires-python = ">=3.13" diff --git a/src/oci-identity-mcp-server/uv.lock b/src/oci-identity-mcp-server/uv.lock index 1011c897..600de851 100644 --- a/src/oci-identity-mcp-server/uv.lock +++ b/src/oci-identity-mcp-server/uv.lock @@ -735,7 +735,7 @@ wheels = [ [[package]] name = "oracle-oci-identity-mcp-server" -version = "1.0.1" +version = "2.0.0" source = { editable = "." } dependencies = [ { name = "fastmcp" }, diff --git a/src/oci-migration-mcp-server/oracle/oci_migration_mcp_server/__init__.py b/src/oci-migration-mcp-server/oracle/oci_migration_mcp_server/__init__.py index c9442225..85632ee7 100644 --- a/src/oci-migration-mcp-server/oracle/oci_migration_mcp_server/__init__.py +++ b/src/oci-migration-mcp-server/oracle/oci_migration_mcp_server/__init__.py @@ -5,4 +5,4 @@ """ __project__ = "oracle.oci-migration-mcp-server" -__version__ = "1.0.2" +__version__ = "2.0.0" diff --git a/src/oci-migration-mcp-server/oracle/oci_migration_mcp_server/models.py b/src/oci-migration-mcp-server/oracle/oci_migration_mcp_server/models.py new file mode 100644 index 00000000..39eba853 --- /dev/null +++ b/src/oci-migration-mcp-server/oracle/oci_migration_mcp_server/models.py @@ -0,0 +1,198 @@ +""" +Copyright (c) 2025, Oracle and/or its affiliates. +Licensed under the Universal Permissive License v1.0 as shown at +https://oss.oracle.com/licenses/upl. +""" + +from datetime import datetime +from typing import Dict, Optional + +import oci +from pydantic import BaseModel, Field + + +def _oci_to_dict(obj): + """Best-effort conversion of OCI SDK model objects to plain dicts.""" + if obj is None: + return None + try: + from oci.util import to_dict as oci_to_dict + + return oci_to_dict(obj) + except Exception: + pass + if isinstance(obj, dict): + return obj + if hasattr(obj, "__dict__"): + return {k: v for k, v in obj.__dict__.items() if not k.startswith("_")} + return None + + +# region Migration + + +class Migration(BaseModel): + """ + Pydantic model mirroring the fields of oci.cloud_migrations.models.Migration. + This model has no nested custom types; all fields are primitives or dicts. + """ + + id: Optional[str] = Field( + None, description="Unique identifier that is immutable on creation" + ) + display_name: Optional[str] = Field( + None, description="Migration Identifier that can be renamed" + ) + compartment_id: Optional[str] = Field(None, description="Compartment Identifier") + lifecycle_state: Optional[str] = Field( + None, + description="The current state of migration.", + json_schema_extra={ + "enum": [ + "CREATING", + "UPDATING", + "NEEDS_ATTENTION", + "ACTIVE", + "DELETING", + "DELETED", + "FAILED", + ] + }, + ) + lifecycle_details: Optional[str] = Field( + None, + description="A message describing the current state in more detail. " + "For example, it can be used to provide actionable information for a resource in Failed state.", + ) + time_created: Optional[datetime] = Field( + None, + description="The time when the migration project was created. An RFC3339 formatted datetime string", + ) + time_updated: Optional[datetime] = Field( + None, + description="The time when the migration project was updated. An RFC3339 formatted datetime string", + ) + replication_schedule_id: Optional[str] = Field( + None, description="Replication schedule identifier" + ) + is_completed: Optional[bool] = Field( + None, description="Indicates whether migration is marked as completed." + ) + freeform_tags: Optional[Dict[str, str]] = Field( + None, + description="Simple key-value pair that is applied without any predefined name, type or scope. " + "It exists only for cross-compatibility.", + ) + defined_tags: Optional[Dict[str, Dict[str, object]]] = Field( + None, + description="Defined tags for this resource. Each key is predefined and scoped to a namespace.", + ) + system_tags: Optional[Dict[str, Dict[str, object]]] = Field( + None, + description="Usage of system tag keys. These predefined keys are scoped to namespaces.", + ) + + +def map_migration(migration_data: oci.cloud_migrations.models.Migration) -> Migration: + """ + Convert an oci.cloud_migrations.models.Migration to oracle.oci_migration_mcp_server.models.Migration. + Since there are no nested types, this is a direct mapping. + """ + return Migration( + id=getattr(migration_data, "id", None), + display_name=getattr(migration_data, "display_name", None), + compartment_id=getattr(migration_data, "compartment_id", None), + lifecycle_state=getattr(migration_data, "lifecycle_state", None), + lifecycle_details=getattr(migration_data, "lifecycle_details", None), + time_created=getattr(migration_data, "time_created", None), + time_updated=getattr(migration_data, "time_updated", None), + replication_schedule_id=getattr( + migration_data, "replication_schedule_id", None + ), + is_completed=getattr(migration_data, "is_completed", None), + freeform_tags=getattr(migration_data, "freeform_tags", None), + defined_tags=getattr(migration_data, "defined_tags", None), + system_tags=getattr(migration_data, "system_tags", None), + ) + + +# endregion + +# region MigrationSummary + + +class MigrationSummary(BaseModel): + """ + Pydantic model mirroring the fields of oci.cloud_migrations.models.MigrationSummary. + This model has no nested custom types; all fields are primitives or dicts. + """ + + id: Optional[str] = Field( + None, description="Unique identifier that is immutable on creation." + ) + display_name: Optional[str] = Field( + None, description="Migration identifier that can be renamed" + ) + compartment_id: Optional[str] = Field(None, description="Compartment identifier") + time_created: Optional[datetime] = Field( + None, + description="The time when the migration project was created. An RFC3339 formatted datetime string.", + ) + time_updated: Optional[datetime] = Field( + None, + description="The time when the migration project was updated. An RFC3339 formatted datetime string.", + ) + lifecycle_state: Optional[str] = Field( + None, description="The current state of migration." + ) + lifecycle_details: Optional[str] = Field( + None, + description="A message describing the current state in more detail. " + "For example, it can be used to provide actionable information for a resource in Failed state.", + ) + is_completed: Optional[bool] = Field( + None, description="Indicates whether migration is marked as complete." + ) + replication_schedule_id: Optional[str] = Field( + None, description="Replication schedule identifier" + ) + freeform_tags: Optional[Dict[str, str]] = Field( + None, + description="Simple key-value pair that is applied without any predefined name, type or scope. " + "It exists only for cross-compatibility.", + ) + defined_tags: Optional[Dict[str, Dict[str, object]]] = Field( + None, + description="Defined tags for this resource. Each key is predefined and scoped to a namespace.", + ) + system_tags: Optional[Dict[str, Dict[str, object]]] = Field( + None, + description="Usage of system tag keys. These predefined keys are scoped to namespaces.", + ) + + +def map_migration_summary( + summary_data: oci.cloud_migrations.models.MigrationSummary, +) -> MigrationSummary: + """ + Convert an oci.cloud_migrations.models.MigrationSummary to + oracle.oci_migration_mcp_server.models.MigrationSummary. + Since there are no nested types, this is a direct mapping. + """ + return MigrationSummary( + id=getattr(summary_data, "id", None), + display_name=getattr(summary_data, "display_name", None), + compartment_id=getattr(summary_data, "compartment_id", None), + time_created=getattr(summary_data, "time_created", None), + time_updated=getattr(summary_data, "time_updated", None), + lifecycle_state=getattr(summary_data, "lifecycle_state", None), + lifecycle_details=getattr(summary_data, "lifecycle_details", None), + is_completed=getattr(summary_data, "is_completed", None), + replication_schedule_id=getattr(summary_data, "replication_schedule_id", None), + freeform_tags=getattr(summary_data, "freeform_tags", None), + defined_tags=getattr(summary_data, "defined_tags", None), + system_tags=getattr(summary_data, "system_tags", None), + ) + + +# endregion diff --git a/src/oci-migration-mcp-server/oracle/oci_migration_mcp_server/server.py b/src/oci-migration-mcp-server/oracle/oci_migration_mcp_server/server.py index e8437367..aa0984f4 100644 --- a/src/oci-migration-mcp-server/oracle/oci_migration_mcp_server/server.py +++ b/src/oci-migration-mcp-server/oracle/oci_migration_mcp_server/server.py @@ -6,9 +6,17 @@ import os from logging import Logger +from typing import Literal, Optional import oci from fastmcp import FastMCP +from oracle.oci_migration_mcp_server.models import ( + Migration, + MigrationSummary, + map_migration, + map_migration_summary, +) +from pydantic import Field from . import __project__, __version__ @@ -33,49 +41,80 @@ def get_migration_client(): return oci.cloud_migrations.MigrationClient(config, signer=signer) -@mcp.tool -def get_migration(migration_id: str) -> dict: - """ - Get details for a specific Migration Project by OCID. - Args: - migration_id (str): OCID of the migration project. - Returns: - dict: Migration project details. - """ - client = get_migration_client() - return client.get_migration(migration_id).data - - -@mcp.tool -def list_migrations(compartment_id: str, lifecycle_state: str = None) -> list[dict]: - """ - List Migration Projects for a compartment, optionally filtered by lifecycle state. - Args: - compartment_id (str): OCID of the compartment. - lifecycle_state (str, optional): Filter by lifecycle state. - Returns: - list of dict: Each dict is a migration object. - """ - client = get_migration_client() - list_args = {"compartment_id": compartment_id} - - if lifecycle_state is not None: - list_args["lifecycle_state"] = lifecycle_state - - migrations = client.list_migrations(**list_args).data.items - return [ - { - "id": migration.id, - "display_name": migration.display_name, - "compartment_id": migration.compartment_id, - "lifecycle_state": migration.lifecycle_state, - "lifecycle_details": migration.lifecycle_details, - "time_created": migration.time_created, - "replication_schedule_id": migration.replication_schedule_id, - "is_completed": migration.is_completed, - } - for migration in migrations - ] +@mcp.tool(description="Get details for a specific Migration Project by OCID") +def get_migration( + migration_id: str = Field(..., description="OCID of the migration project") +) -> Migration: + try: + client = get_migration_client() + + response: oci.response.Response = client.get_migration(migration_id) + data: oci.cloud_migrations.models.Migration = response.data + logger.info("Found Migration") + return map_migration(data) + + except Exception as e: + logger.error(f"Error in get_migration tool: {str(e)}") + raise e + + +@mcp.tool( + description="List Migration Projects for a compartment, optionally filtered by lifecycle state" +) +def list_migrations( + compartment_id: str = Field(..., description="The OCID of the compartment"), + limit: Optional[int] = Field( + None, + description="The maximum amount of migrations to return. If None, there is no limit.", + ge=1, + ), + lifecycle_state: Optional[ + Literal[ + "CREATING", + "UPDATING", + "NEEDS_ATTENTION", + "ACTIVE", + "DELETING", + "DELETED", + "FAILED", + ] + ] = Field(None, description="The lifecycle state of the migration to filter on"), +) -> list[MigrationSummary]: + migrations: list[Migration] = [] + + try: + client = get_migration_client() + + response: oci.response.Response = None + has_next_page = True + next_page: str = None + + while has_next_page and (limit is None or len(migrations) < limit): + kwargs = { + "compartment_id": compartment_id, + "page": next_page, + "limit": limit, + } + + if lifecycle_state is not None: + kwargs["lifecycle_state"] = lifecycle_state + + response = client.list_migrations(**kwargs) + has_next_page = response.has_next_page + next_page = response.next_page if hasattr(response, "next_page") else None + + data: list[oci.cloud_migrations.models.MigrationSummary] = ( + response.data.items + ) + for d in data: + migrations.append(map_migration_summary(d)) + + logger.info(f"Found {len(migrations)} Migrations") + return migrations + + except Exception as e: + logger.error(f"Error in list_migrations tool: {str(e)}") + raise e def main(): diff --git a/src/oci-migration-mcp-server/oracle/oci_migration_mcp_server/tests/test_migration_tools.py b/src/oci-migration-mcp-server/oracle/oci_migration_mcp_server/tests/test_migration_tools.py index 2cea8b18..277e77c5 100644 --- a/src/oci-migration-mcp-server/oracle/oci_migration_mcp_server/tests/test_migration_tools.py +++ b/src/oci-migration-mcp-server/oracle/oci_migration_mcp_server/tests/test_migration_tools.py @@ -13,30 +13,25 @@ class TestMigrationTools: - # @pytest.mark.asyncio - # @patch("oracle.oci_migration_mcp_server.server.get_migration_client") - # async def test_get_migration(self, mock_get_client): - # mock_client = MagicMock() - # mock_get_client.return_value = mock_client + @pytest.mark.asyncio + @patch("oracle.oci_migration_mcp_server.server.get_migration_client") + async def test_get_migration(self, mock_get_client): + mock_client = MagicMock() + mock_get_client.return_value = mock_client - # mock_get_response = create_autospec(oci.response.Response) - # mock_get_response.data = oci.cloud_migrations.models.Migration( - # id="migration1", - # display_name="Migration 1", - # ) - # mock_client.get_migration.return_value = mock_get_response + mock_get_response = create_autospec(oci.response.Response) + mock_get_response.data = oci.cloud_migrations.models.Migration( + id="migration1", display_name="Migration 1", lifecycle_state="ACTIVE" + ) + mock_client.get_migration.return_value = mock_get_response - # async with Client(mcp) as client: - # result = ( - # await client.call_tool( - # "get_migration", - # { - # "migration_id": "migration1", - # }, - # ) - # ).data + async with Client(mcp) as client: + call_tool_result = await client.call_tool( + "get_migration", {"migration_id": "migration1"} + ) + result = call_tool_result.structured_content - # assert result.id == "migration1" + assert result["id"] == "migration1" @pytest.mark.asyncio @patch("oracle.oci_migration_mcp_server.server.get_migration_client") @@ -55,6 +50,8 @@ async def test_list_migrations(self, mock_get_client): ) ] ) + mock_list_response.has_next_page = False + mock_list_response.next_page = None mock_client.list_migrations.return_value = mock_list_response async with Client(mcp) as client: diff --git a/src/oci-migration-mcp-server/pyproject.toml b/src/oci-migration-mcp-server/pyproject.toml index 9ffd1c54..d46b14cf 100644 --- a/src/oci-migration-mcp-server/pyproject.toml +++ b/src/oci-migration-mcp-server/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "oracle.oci-migration-mcp-server" -version = "1.0.2" +version = "2.0.0" description = "Add your description here" readme = "README.md" requires-python = ">=3.13" diff --git a/src/oci-network-load-balancer-mcp-server/oracle/oci_network_load_balancer_mcp_server/__init__.py b/src/oci-network-load-balancer-mcp-server/oracle/oci_network_load_balancer_mcp_server/__init__.py index dfbad298..a68bad65 100644 --- a/src/oci-network-load-balancer-mcp-server/oracle/oci_network_load_balancer_mcp_server/__init__.py +++ b/src/oci-network-load-balancer-mcp-server/oracle/oci_network_load_balancer_mcp_server/__init__.py @@ -5,4 +5,4 @@ """ __project__ = "oracle.oci-network-load-balancer-mcp-server" -__version__ = "1.0.2" +__version__ = "2.0.0" diff --git a/src/oci-network-load-balancer-mcp-server/oracle/oci_network_load_balancer_mcp_server/models.py b/src/oci-network-load-balancer-mcp-server/oracle/oci_network_load_balancer_mcp_server/models.py new file mode 100644 index 00000000..4d233729 --- /dev/null +++ b/src/oci-network-load-balancer-mcp-server/oracle/oci_network_load_balancer_mcp_server/models.py @@ -0,0 +1,446 @@ +""" +Copyright (c) 2025, Oracle and/or its affiliates. +Licensed under the Universal Permissive License v1.0 as shown at +https://oss.oracle.com/licenses/upl. +""" + +from datetime import datetime +from typing import Any, Dict, List, Literal, Optional + +import oci +from pydantic import BaseModel, Field + + +# Helper function +def _oci_to_dict(obj): + """Best-effort conversion of OCI SDK model objects to plain dicts.""" + if obj is None: + return None + try: + from oci.util import to_dict as oci_to_dict + + return oci_to_dict(obj) + except Exception: + pass + if isinstance(obj, dict): + return obj + if hasattr(obj, "__dict__"): + return {k: v for k, v in obj.__dict__.items() if not k.startswith("_")} + return None + + +# Nested models + + +class ReservedIP(BaseModel): + """Reserved IP details.""" + + id: Optional[str] = Field( + None, description="The OCID of the reserved public IP address." + ) + + +class IpAddress(BaseModel): + """IP address details.""" + + ip_address: Optional[str] = Field(None, description="The IP address.") + is_public: Optional[bool] = Field( + None, description="Whether the IP address is public." + ) + ip_version: Optional[Literal["IPV4", "IPV6"]] = Field( + None, description="IP version." + ) + reserved_ip: Optional[ReservedIP] = Field( + None, description="Reserved IP information." + ) + + +class Backend(BaseModel): + """Backend server details.""" + + name: Optional[str] = Field(None, description="The name of the backend.") + ip_address: Optional[str] = Field( + None, description="The IP address of the backend server." + ) + target_id: Optional[str] = Field( + None, + description="The IP OCID/Instance OCID associated with the backend server.", + ) + port: Optional[int] = Field( + None, description="The communication port for the backend server." + ) + weight: Optional[int] = Field( + None, + description="The network load balancing policy weight assigned to the server.", + ) + is_drain: Optional[bool] = Field( + None, description="Whether the network load balancer should drain this server." + ) + is_backup: Optional[bool] = Field( + None, + description="Whether the network load balancer should treat this server as a backup unit.", + ) + is_offline: Optional[bool] = Field( + None, + description="Whether the network load balancer should treat this server as offline.", + ) + + +class DnsHealthCheckerDetails(BaseModel): + """DNS health checker details.""" + + transport_protocol: Optional[Literal["UDP", "TCP"]] = Field( + None, description="DNS transport protocol." + ) + domain_name: Optional[str] = Field( + None, + description="The absolute fully-qualified domain name to perform periodic DNS queries.", + ) + query_class: Optional[Literal["IN", "CH"]] = Field( + None, description="The class of the DNS health check query." + ) + query_type: Optional[Literal["A", "TXT", "AAAA"]] = Field( + None, description="The type of the DNS health check query." + ) + rcodes: Optional[List[str]] = Field( + None, description="Acceptable RCODE values for DNS query response." + ) + + +class HealthChecker(BaseModel): + """Health checker configuration.""" + + protocol: Optional[Literal["HTTP", "HTTPS", "TCP", "UDP", "DNS"]] = Field( + None, description="The protocol the health check must use." + ) + port: Optional[int] = Field( + None, + description="The backend server port against which to run the health check.", + ) + retries: Optional[int] = Field( + None, + description="The number of retries to attempt before a backend server is considered unhealthy.", + ) + timeout_in_millis: Optional[int] = Field( + None, + description="The maximum time, in milliseconds, to wait for a reply to a health check.", + ) + interval_in_millis: Optional[int] = Field( + None, description="The interval between health checks, in milliseconds." + ) + url_path: Optional[str] = Field( + None, description="The path against which to run the health check." + ) + response_body_regex: Optional[str] = Field( + None, + description="A regular expression for parsing the response body from the backend server.", + ) + return_code: Optional[int] = Field( + None, description="The status code a healthy backend server should return." + ) + request_data: Optional[str] = Field( + None, + description="Base64 encoded pattern to be sent as UDP or TCP health check probe.", + ) + response_data: Optional[str] = Field( + None, + description="Base64 encoded pattern to be validated as UDP or TCP health check probe response.", + ) + dns: Optional[DnsHealthCheckerDetails] = Field( + None, description="DNS health checker details." + ) + + +class Listener(BaseModel): + """Listener configuration.""" + + name: Optional[str] = Field(None, description="A friendly name for the listener.") + default_backend_set_name: Optional[str] = Field( + None, description="The name of the associated backend set." + ) + port: Optional[int] = Field( + None, description="The communication port for the listener." + ) + protocol: Optional[Literal["ANY", "TCP", "UDP", "TCP_AND_UDP", "L3IP"]] = Field( + None, + description="The protocol on which the listener accepts connection requests.", + ) + ip_version: Optional[Literal["IPV4", "IPV6"]] = Field( + None, description="IP version associated with the listener." + ) + is_ppv2_enabled: Optional[bool] = Field( + None, description="Property to enable/disable PPv2 feature for this listener." + ) + tcp_idle_timeout: Optional[int] = Field( + None, description="The duration for TCP idle timeout in seconds." + ) + udp_idle_timeout: Optional[int] = Field( + None, description="The duration for UDP idle timeout in seconds." + ) + l3_ip_idle_timeout: Optional[int] = Field( + None, description="The duration for L3IP idle timeout in seconds." + ) + + +class BackendSet(BaseModel): + """Backend set configuration.""" + + name: Optional[str] = Field( + None, description="A user-friendly name for the backend set." + ) + policy: Optional[Literal["TWO_TUPLE", "THREE_TUPLE", "FIVE_TUPLE"]] = Field( + None, description="The network load balancer policy for the backend set." + ) + is_preserve_source: Optional[bool] = Field( + None, + description="If enabled, the network load balancer preserves the source IP of the packet.", + ) + is_fail_open: Optional[bool] = Field( + None, + description="If enabled, the network load balancer will continue " + "to distribute traffic if all backends are unhealthy.", + ) + is_instant_failover_enabled: Optional[bool] = Field( + None, + description="If enabled, existing connections will be forwarded to an " + "alternative healthy backend as soon as current backend becomes unhealthy.", + ) + is_instant_failover_tcp_reset_enabled: Optional[bool] = Field( + None, + description="If enabled along with instant failover, the network load balancer " + "will send TCP RST to the clients for the existing connections.", + ) + are_operationally_active_backends_preferred: Optional[bool] = Field( + None, description="If enabled, NLB supports active-standby backends." + ) + ip_version: Optional[Literal["IPV4", "IPV6"]] = Field( + None, description="IP version associated with the backend set." + ) + backends: Optional[List[Backend]] = Field(None, description="An array of backends.") + health_checker: Optional[HealthChecker] = Field( + None, description="The health check policy configuration." + ) + + +class NetworkLoadBalancer(BaseModel): + """Network load balancer.""" + + id: Optional[str] = Field( + None, description="The OCID of the network load balancer." + ) + compartment_id: Optional[str] = Field( + None, + description="The OCID of the compartment containing the network load balancer.", + ) + display_name: Optional[str] = Field(None, description="A user-friendly name.") + lifecycle_state: Optional[ + Literal["CREATING", "UPDATING", "ACTIVE", "DELETING", "DELETED", "FAILED"] + ] = Field(None, description="The current state of the network load balancer.") + lifecycle_details: Optional[str] = Field( + None, description="A message describing the current state in more detail." + ) + nlb_ip_version: Optional[Literal["IPV4", "IPV4_AND_IPV6", "IPV6"]] = Field( + None, description="IP version associated with the NLB." + ) + time_created: Optional[datetime] = Field( + None, description="The date and time the network load balancer was created." + ) + time_updated: Optional[datetime] = Field( + None, description="The time the network load balancer was updated." + ) + ip_addresses: Optional[List[IpAddress]] = Field( + None, description="An array of IP addresses." + ) + is_private: Optional[bool] = Field( + None, + description="Whether the network load balancer has a " + "virtual cloud network-local (private) IP address.", + ) + is_preserve_source_destination: Optional[bool] = Field( + None, + description="When enabled, the skipSourceDestinationCheck parameter is " + "automatically enabled on the load balancer VNIC.", + ) + is_symmetric_hash_enabled: Optional[bool] = Field( + None, + description="This can only be enabled when NLB is working in transparent " + "mode with source destination header preservation enabled.", + ) + subnet_id: Optional[str] = Field( + None, description="The subnet in which the network load balancer is spawned." + ) + network_security_group_ids: Optional[List[str]] = Field( + None, + description="An array of network security groups OCIDs associated with the network load balancer.", + ) + listeners: Optional[Dict[str, Listener]] = Field( + None, description="Listeners associated with the network load balancer." + ) + backend_sets: Optional[Dict[str, BackendSet]] = Field( + None, description="Backend sets associated with the network load balancer." + ) + freeform_tags: Optional[Dict[str, str]] = Field( + None, description="Free-form tags for this resource." + ) + security_attributes: Optional[Dict[str, Dict[str, Any]]] = Field( + None, description="ZPR tags for this resource." + ) + defined_tags: Optional[Dict[str, Dict[str, Any]]] = Field( + None, description="Defined tags for this resource." + ) + system_tags: Optional[Dict[str, Dict[str, Any]]] = Field( + None, + description="Key-value pair representing system tags' keys and values scoped to a namespace.", + ) + + +# Mapping functions + + +def map_reserved_ip(obj) -> ReservedIP | None: + if not obj: + return None + return ReservedIP(id=getattr(obj, "id", None)) + + +def map_ip_address(obj) -> IpAddress | None: + if not obj: + return None + return IpAddress( + ip_address=getattr(obj, "ip_address", None), + is_public=getattr(obj, "is_public", None), + ip_version=getattr(obj, "ip_version", None), + reserved_ip=map_reserved_ip(getattr(obj, "reserved_ip", None)), + ) + + +def map_backend(obj) -> Backend | None: + if not obj: + return None + return Backend( + name=getattr(obj, "name", None), + ip_address=getattr(obj, "ip_address", None), + target_id=getattr(obj, "target_id", None), + port=getattr(obj, "port", None), + weight=getattr(obj, "weight", None), + is_drain=getattr(obj, "is_drain", None), + is_backup=getattr(obj, "is_backup", None), + is_offline=getattr(obj, "is_offline", None), + ) + + +def map_dns_health_checker_details(obj) -> DnsHealthCheckerDetails | None: + if not obj: + return None + return DnsHealthCheckerDetails( + transport_protocol=getattr(obj, "transport_protocol", None), + domain_name=getattr(obj, "domain_name", None), + query_class=getattr(obj, "query_class", None), + query_type=getattr(obj, "query_type", None), + rcodes=getattr(obj, "rcodes", None), + ) + + +def map_health_checker(obj) -> HealthChecker | None: + if not obj: + return None + return HealthChecker( + protocol=getattr(obj, "protocol", None), + port=getattr(obj, "port", None), + retries=getattr(obj, "retries", None), + timeout_in_millis=getattr(obj, "timeout_in_millis", None), + interval_in_millis=getattr(obj, "interval_in_millis", None), + url_path=getattr(obj, "url_path", None), + response_body_regex=getattr(obj, "response_body_regex", None), + return_code=getattr(obj, "return_code", None), + request_data=getattr(obj, "request_data", None), + response_data=getattr(obj, "response_data", None), + dns=map_dns_health_checker_details(getattr(obj, "dns", None)), + ) + + +def map_listener(obj) -> Listener | None: + if not obj: + return None + return Listener( + name=getattr(obj, "name", None), + default_backend_set_name=getattr(obj, "default_backend_set_name", None), + port=getattr(obj, "port", None), + protocol=getattr(obj, "protocol", None), + ip_version=getattr(obj, "ip_version", None), + is_ppv2_enabled=getattr(obj, "is_ppv2_enabled", None), + tcp_idle_timeout=getattr(obj, "tcp_idle_timeout", None), + udp_idle_timeout=getattr(obj, "udp_idle_timeout", None), + l3_ip_idle_timeout=getattr(obj, "l3_ip_idle_timeout", None), + ) + + +def map_backend_set(obj) -> BackendSet | None: + if not obj: + return None + backends = ( + [map_backend(b) for b in getattr(obj, "backends", [])] + if getattr(obj, "backends", None) + else None + ) + return BackendSet( + name=getattr(obj, "name", None), + policy=getattr(obj, "policy", None), + is_preserve_source=getattr(obj, "is_preserve_source", None), + is_fail_open=getattr(obj, "is_fail_open", None), + is_instant_failover_enabled=getattr(obj, "is_instant_failover_enabled", None), + is_instant_failover_tcp_reset_enabled=getattr( + obj, "is_instant_failover_tcp_reset_enabled", None + ), + are_operationally_active_backends_preferred=getattr( + obj, "are_operationally_active_backends_preferred", None + ), + ip_version=getattr(obj, "ip_version", None), + backends=backends, + health_checker=map_health_checker(getattr(obj, "health_checker", None)), + ) + + +def map_network_load_balancer( + obj: oci.network_load_balancer.models.NetworkLoadBalancer, +) -> NetworkLoadBalancer: + """Map OCI NetworkLoadBalancer to custom Pydantic model.""" + ip_addresses = ( + [map_ip_address(ip) for ip in getattr(obj, "ip_addresses", [])] + if getattr(obj, "ip_addresses", None) + else None + ) + listeners = ( + {k: map_listener(v) for k, v in getattr(obj, "listeners", {}).items()} + if getattr(obj, "listeners", None) + else None + ) + backend_sets = ( + {k: map_backend_set(v) for k, v in getattr(obj, "backend_sets", {}).items()} + if getattr(obj, "backend_sets", None) + else None + ) + return NetworkLoadBalancer( + id=getattr(obj, "id", None), + compartment_id=getattr(obj, "compartment_id", None), + display_name=getattr(obj, "display_name", None), + lifecycle_state=getattr(obj, "lifecycle_state", None), + lifecycle_details=getattr(obj, "lifecycle_details", None), + nlb_ip_version=getattr(obj, "nlb_ip_version", None), + time_created=getattr(obj, "time_created", None), + time_updated=getattr(obj, "time_updated", None), + ip_addresses=ip_addresses, + is_private=getattr(obj, "is_private", None), + is_preserve_source_destination=getattr( + obj, "is_preserve_source_destination", None + ), + is_symmetric_hash_enabled=getattr(obj, "is_symmetric_hash_enabled", None), + subnet_id=getattr(obj, "subnet_id", None), + network_security_group_ids=getattr(obj, "network_security_group_ids", None), + listeners=listeners, + backend_sets=backend_sets, + freeform_tags=getattr(obj, "freeform_tags", None), + security_attributes=getattr(obj, "security_attributes", None), + defined_tags=getattr(obj, "defined_tags", None), + system_tags=getattr(obj, "system_tags", None), + ) diff --git a/src/oci-network-load-balancer-mcp-server/oracle/oci_network_load_balancer_mcp_server/server.py b/src/oci-network-load-balancer-mcp-server/oracle/oci_network_load_balancer_mcp_server/server.py index c22f4e55..a6b298fd 100644 --- a/src/oci-network-load-balancer-mcp-server/oracle/oci_network_load_balancer_mcp_server/server.py +++ b/src/oci-network-load-balancer-mcp-server/oracle/oci_network_load_balancer_mcp_server/server.py @@ -6,10 +6,21 @@ import os from logging import Logger -from typing import Annotated +from typing import Literal, Optional import oci from fastmcp import FastMCP +from oracle.oci_network_load_balancer_mcp_server.models import ( + Backend, + BackendSet, + Listener, + NetworkLoadBalancer, + map_backend, + map_backend_set, + map_listener, + map_network_load_balancer, +) +from pydantic import Field from . import __project__ @@ -38,124 +49,313 @@ def get_nlb_client(): description="Lists the network load balancers from the given compartment", ) def list_network_load_balancers( - compartment_id: Annotated[str, "compartment ocid"], -) -> list[dict]: - nlb_client = get_nlb_client() - nlbs = nlb_client.list_network_load_balancers(compartment_id).data.items - return [ - { - "nlb_id": nlb.id, - "display_name": nlb.display_name, - "lifecycle_state": nlb.lifecycle_state, - "public_ips": [ip.ip_address for ip in nlb.ip_addresses if ip.is_public], - "private_ips": [ - ip.ip_address for ip in nlb.ip_addresses if not ip.is_public - ], - } - for nlb in nlbs - ] + compartment_id: str = Field(..., description="The OCID of the compartment"), + limit: Optional[int] = Field( + None, + description="The maximum amount of network load balancers to return. If None, there is no limit.", + ge=1, + ), + lifecycle_state: Optional[ + Literal[ + "CREATING", + "UPDATING", + "ACTIVE", + "DELETING", + "DELETED", + "FAILED", + ] + ] = Field( + None, + description="The lifecycle state of the network load balancer to filter on", + ), +) -> list[NetworkLoadBalancer]: + nlbs: list[NetworkLoadBalancer] = [] + + try: + client = get_nlb_client() + + response: oci.response.Response = None + has_next_page = True + next_page: str = None + + while has_next_page and (limit is None or len(nlbs) < limit): + kwargs = { + "compartment_id": compartment_id, + "page": next_page, + "limit": limit, + } + + if lifecycle_state is not None: + kwargs["lifecycle_state"] = lifecycle_state + + response = client.list_network_load_balancers(**kwargs) + has_next_page = response.has_next_page + next_page = response.next_page if hasattr(response, "next_page") else None + + data: list[oci.network_load_balancer.models.NetworkLoadBalancer] = ( + response.data.items + ) + for d in data: + nlbs.append(map_network_load_balancer(d)) + + logger.info(f"Found {len(nlbs)} Network Load Balancers") + return nlbs + + except Exception as e: + logger.error(f"Error in list_network_load_balancers tool: {str(e)}") + raise e @mcp.tool( name="get_network_load_balancer", description="Get network load balancer details" ) -def get_network_load_balancer(network_load_balancer_id: Annotated[str, "nlb id"]): - nlb_client = get_nlb_client() - return nlb_client.get_network_load_balancer(network_load_balancer_id).data - - -@mcp.tool(name="list_network_load_balancer_listeners") -def list_listeners(network_load_balancer_id: str) -> list[dict]: - """Lists the listeners from the given network load balancer""" - nlb_client = get_nlb_client() - listeners = nlb_client.list_listeners(network_load_balancer_id).data.items - return [ - { - "name": listener.name, - "ip_version": listener.ip_version, - "protocol": listener.protocol, - "port": listener.port, - "is_ppv2_enabled": listener.is_ppv2_enabled, - } - for listener in listeners - ] - - -@mcp.tool(name="get_network_load_balancer_listener") +def get_network_load_balancer( + network_load_balancer_id: str = Field( + ..., description="The OCID of the network load balancer" + ) +): + try: + client = get_nlb_client() + + response: oci.response.Response = client.get_network_load_balancer( + network_load_balancer_id + ) + data: oci.network_load_balancer.models.NetworkLoadBalancer = response.data + logger.info("Found Network Load Balancer") + return map_network_load_balancer(data) + + except Exception as e: + logger.error(f"Error in get_network_load_balancer tool: {str(e)}") + raise e + + +@mcp.tool( + name="list_network_load_balancer_listeners", + description="Lists the listeners from the given network load balancer", +) +def list_listeners( + network_load_balancer_id: str = Field( + ..., description="The OCID of the network load balancer to list listeners from" + ), + limit: Optional[int] = Field( + None, + description="The maximum amount of listeners to return. If None, there is no limit.", + ge=1, + ), +) -> list[Listener]: + listeners: list[Listener] = [] + + try: + client = get_nlb_client() + + response: oci.response.Response = None + has_next_page = True + next_page: str = None + + while has_next_page and (limit is None or len(listeners) < limit): + kwargs = { + "network_load_balancer_id": network_load_balancer_id, + "page": next_page, + "limit": limit, + } + + response = client.list_listeners(**kwargs) + has_next_page = response.has_next_page + next_page = response.next_page if hasattr(response, "next_page") else None + + data: list[oci.network_load_balancer.models.Listener] = response.data.items + for d in data: + listeners.append(map_listener(d)) + + logger.info(f"Found {len(listeners)} Listeners") + return listeners + + except Exception as e: + logger.error(f"Error in list_network_load_balancer_listeners tool: {str(e)}") + raise e + + +@mcp.tool( + name="get_network_load_balancer_listener", + description="Gets the listener with the given listener name" + "from the given network load balancer", +) def get_listener( - network_load_balancer_id: str, - listener_name: str, + network_load_balancer_id: str = Field( + ..., + description="The OCID of the network load balancer to get the listener from", + ), + listener_name: str = Field(..., description="The name of the listener"), ): - """Gets the listener with the given listener name - from the given network load balancer""" - nlb_client = get_nlb_client() - return nlb_client.get_listener(network_load_balancer_id, listener_name).data - - -@mcp.tool(name="list_network_load_balancer_backend_sets") -def list_backend_sets(network_load_balancer_id: str) -> list[dict]: - """Lists the backend sets from the given network load balancer""" - nlb_client = get_nlb_client() - backend_sets = nlb_client.list_backend_sets(network_load_balancer_id).data.items - return [ - { - "name": backend_set.name, - "ip_version": backend_set.ip_version, - "is_preemptive": backend_set.are_operationally_active_backends_preferred, - "load_balancing_policy": backend_set.policy, - "number_of_backends": len(backend_set.backends), - } - for backend_set in backend_sets - ] - - -@mcp.tool(name="get_network_load_balancer_backend_set") + try: + client = get_nlb_client() + + response: oci.response.Response = client.get_listener( + network_load_balancer_id, listener_name + ) + data: oci.network_load_balancer.models.Listener = response.data + logger.info("Found Listener") + return map_listener(data) + + except Exception as e: + logger.error(f"Error in get_network_load_balancer_listener tool: {str(e)}") + raise e + + +@mcp.tool( + name="list_network_load_balancer_backend_sets", + description="Lists the backend sets from the given network load balancer", +) +def list_backend_sets( + network_load_balancer_id: str = Field( + ..., + description="The OCID of the network load balancer to list backend sets from", + ), + limit: Optional[int] = Field( + None, + description="The maximum amount of backend sets to return. If None, there is no limit.", + ge=1, + ), +) -> list[BackendSet]: + backend_sets: list[BackendSet] = [] + + try: + client = get_nlb_client() + + response: oci.response.Response = None + has_next_page = True + next_page: str = None + + while has_next_page and (limit is None or len(backend_sets) < limit): + kwargs = { + "network_load_balancer_id": network_load_balancer_id, + "page": next_page, + "limit": limit, + } + + response = client.list_backend_sets(**kwargs) + has_next_page = response.has_next_page + next_page = response.next_page if hasattr(response, "next_page") else None + + data: list[oci.network_load_balancer.models.BackendSet] = ( + response.data.items + ) + for d in data: + backend_sets.append(map_backend_set(d)) + + logger.info(f"Found {len(backend_sets)} Backend Sets") + return backend_sets + + except Exception as e: + logger.error(f"Error in list_network_load_balancer_backend_sets tool: {str(e)}") + raise e + + +@mcp.tool( + name="get_network_load_balancer_backend_set", + description="Gets the backend set with the given backend set name" + "from the given network load balancer", +) def get_backend_set( - network_load_balancer_id: str, - backend_set_name: str, + network_load_balancer_id: str = Field( + ..., + description="The OCID of the network load balancer to get the backend set from", + ), + backend_set_name: str = Field(..., description="The name of the backend set"), ): - """Gets the backend set with the given backend set name - from the given network load balancer""" - nlb_client = get_nlb_client() - return nlb_client.get_backend_set(network_load_balancer_id, backend_set_name).data + try: + client = get_nlb_client() + + response: oci.response.Response = client.get_backend_set( + network_load_balancer_id, backend_set_name + ) + data: oci.network_load_balancer.models.BackendSet = response.data + logger.info("Found Backend Set") + return map_backend_set(data) + + except Exception as e: + logger.error(f"Error in get_network_load_balancer_backend_set tool: {str(e)}") + raise e -@mcp.tool(name="list_network_load_balancer_backends") +@mcp.tool( + name="list_network_load_balancer_backends", + description="Lists the backends from the given backend set and network load balancer", +) def list_backends( - network_load_balancer_id: str, - backend_set_name: str, -) -> list[dict]: - """Lists the backends from the given backend set and network load balancer""" - nlb_client = get_nlb_client() - backends = nlb_client.list_backends( - network_load_balancer_id, backend_set_name - ).data.items - return [ - { - "name": backend.name, - "ip_address": backend.ip_address, - "port": backend.port, - "weight": backend.weight, - "is_drain": backend.is_drain, - "is_backup": backend.is_backup, - "is_offline": backend.is_offline, - } - for backend in backends - ] - - -@mcp.tool(name="get_network_load_balancer_backend") + network_load_balancer_id: str = Field( + ..., + description="The OCID of the network load balancer to list the backends from", + ), + backend_set_name: str = Field( + ..., description="The name of the backend set to list the backends from" + ), + limit: Optional[int] = Field( + None, + description="The maximum amount of backends to return. If None, there is no limit.", + ge=1, + ), +) -> list[Backend]: + backends: list[Backend] = [] + + try: + client = get_nlb_client() + + response: oci.response.Response = None + has_next_page = True + next_page: str = None + + while has_next_page and (limit is None or len(backends) < limit): + kwargs = { + "network_load_balancer_id": network_load_balancer_id, + "backend_set_name": backend_set_name, + "page": next_page, + "limit": limit, + } + + response = client.list_backends(**kwargs) + has_next_page = response.has_next_page + next_page = response.next_page if hasattr(response, "next_page") else None + + data: list[oci.network_load_balancer.models.Backend] = response.data.items + for d in data: + backends.append(map_backend(d)) + + logger.info(f"Found {len(backends)} Backends") + return backends + + except Exception as e: + logger.error(f"Error in list_network_load_balancer_backends tool: {str(e)}") + raise e + + +@mcp.tool( + name="get_network_load_balancer_backend", + description="Gets the backend with the given backend name" + "from the given backend set and network load balancer", +) def get_backend( - network_load_balancer_id: str, - backend_set_name: str, - backend_name: str, + network_load_balancer_id: str = Field( + ..., description="The OCID of the network load balancer to get the backend from" + ), + backend_set_name: str = Field( + ..., description="The name of the backend set to get the backend from" + ), + backend_name: str = Field(..., description="The name of the backend"), ): - """Gets the backend with the given backend name - from the given backend set and network load balancer""" - nlb_client = get_nlb_client() - return nlb_client.get_backend( - network_load_balancer_id, backend_set_name, backend_name - ).data + try: + client = get_nlb_client() + + response: oci.response.Response = client.get_backend( + network_load_balancer_id, backend_set_name, backend_name + ) + data: oci.network_load_balancer.models.Backend = response.data + logger.info("Found Backend") + return map_backend(data) + + except Exception as e: + logger.error(f"Error in get_network_load_balancer_backend tool: {str(e)}") + raise e def main(): diff --git a/src/oci-network-load-balancer-mcp-server/oracle/oci_network_load_balancer_mcp_server/tests/test_network_load_balancer_tools.py b/src/oci-network-load-balancer-mcp-server/oracle/oci_network_load_balancer_mcp_server/tests/test_network_load_balancer_tools.py index 647fc865..0bcc1b8f 100644 --- a/src/oci-network-load-balancer-mcp-server/oracle/oci_network_load_balancer_mcp_server/tests/test_network_load_balancer_tools.py +++ b/src/oci-network-load-balancer-mcp-server/oracle/oci_network_load_balancer_mcp_server/tests/test_network_load_balancer_tools.py @@ -39,6 +39,8 @@ async def test_list_nlbs(self, mock_get_client): ] ) ) + mock_list_response.has_next_page = False + mock_list_response.next_page = None mock_client.list_network_load_balancers.return_value = mock_list_response async with Client(mcp) as client: @@ -50,7 +52,7 @@ async def test_list_nlbs(self, mock_get_client): ).structured_content["result"] assert len(result) == 1 - assert result[0]["nlb_id"] == "nlb1" + assert result[0]["id"] == "nlb1" @pytest.mark.asyncio @patch("oracle.oci_network_load_balancer_mcp_server.server.get_nlb_client") @@ -64,12 +66,14 @@ async def test_list_listeners(self, mock_get_client): oci.network_load_balancer.models.ListenerSummary( name="Listener 1", ip_version="IPV4", - protocol="HTTP", + protocol="ANY", port=8008, is_ppv2_enabled=False, ) ] ) + mock_list_response.has_next_page = False + mock_list_response.next_page = None mock_client.list_listeners.return_value = mock_list_response async with Client(mcp) as client: @@ -96,11 +100,13 @@ async def test_list_backend_sets(self, mock_get_client): name="Backend Set 1", ip_version="IPV4", are_operationally_active_backends_preferred=False, - policy="3-TUPLE", + policy="THREE_TUPLE", backends=[], ) ] ) + mock_list_response.has_next_page = False + mock_list_response.next_page = None mock_client.list_backend_sets.return_value = mock_list_response async with Client(mcp) as client: @@ -126,6 +132,7 @@ async def test_list_backends(self, mock_get_client): oci.network_load_balancer.models.BackendSummary( name="Backend 1", ip_address="192.168.1.1", + target_id="target1", port=8008, weight=0, is_drain=False, @@ -134,6 +141,8 @@ async def test_list_backends(self, mock_get_client): ) ] ) + mock_list_response.has_next_page = False + mock_list_response.next_page = None mock_client.list_backends.return_value = mock_list_response async with Client(mcp) as client: diff --git a/src/oci-network-load-balancer-mcp-server/pyproject.toml b/src/oci-network-load-balancer-mcp-server/pyproject.toml index cb60126d..314bd4cd 100644 --- a/src/oci-network-load-balancer-mcp-server/pyproject.toml +++ b/src/oci-network-load-balancer-mcp-server/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "oracle.oci-network-load-balancer-mcp-server" -version = "1.0.2" +version = "2.0.0" description = "OCI Network Load Balancer MCP server" readme = "README.md" requires-python = ">=3.13" diff --git a/src/oci-registry-mcp-server/oracle/oci_registry_mcp_server/__init__.py b/src/oci-registry-mcp-server/oracle/oci_registry_mcp_server/__init__.py index 415728ac..ba67d60e 100644 --- a/src/oci-registry-mcp-server/oracle/oci_registry_mcp_server/__init__.py +++ b/src/oci-registry-mcp-server/oracle/oci_registry_mcp_server/__init__.py @@ -5,4 +5,4 @@ """ __project__ = "oracle.oci-registry-mcp-server" -__version__ = "1.0.2" +__version__ = "2.0.0" diff --git a/src/oci-registry-mcp-server/oracle/oci_registry_mcp_server/models.py b/src/oci-registry-mcp-server/oracle/oci_registry_mcp_server/models.py new file mode 100644 index 00000000..434cf246 --- /dev/null +++ b/src/oci-registry-mcp-server/oracle/oci_registry_mcp_server/models.py @@ -0,0 +1,288 @@ +""" +Copyright (c) 2025, Oracle and/or its affiliates. +Licensed under the Universal Permissive License v1.0 as shown at +https://oss.oracle.com/licenses/upl. +""" + +from datetime import datetime +from typing import Any, Dict, Literal, Optional + +import oci +from pydantic import BaseModel, Field + + +# Utility function +def _oci_to_dict(obj): + """Best-effort conversion of OCI SDK model objects to plain dicts.""" + if obj is None: + return None + try: + from oci.util import to_dict as oci_to_dict + + return oci_to_dict(obj) + except Exception: + pass + if isinstance(obj, dict): + return obj + if hasattr(obj, "__dict__"): + return {k: v for k, v in obj.__dict__.items() if not k.startswith("_")} + return None + + +# region ContainerRepository + + +class ContainerRepositoryReadme(BaseModel): + """Container repository readme.""" + + content: Optional[str] = Field( + None, description="Readme content. Avoid entering confidential information." + ) + format: Optional[Literal["TEXT_MARKDOWN", "TEXT_PLAIN", "UNKNOWN_ENUM_VALUE"]] = ( + Field( + None, + description="Readme format. Supported formats are text/plain and text/markdown.", + ) + ) + + +def map_container_repository_readme(readme) -> ContainerRepositoryReadme | None: + if not readme: + return None + return ContainerRepositoryReadme( + content=getattr(readme, "content", None), + format=getattr(readme, "format", None), + ) + + +# Based on oci.artifacts.models.ContainerRepository +class ContainerRepository(BaseModel): + """ + Pydantic model mirroring the fields of oci.artifacts.models.ContainerRepository. + Nested OCI model types are represented as Pydantic classes (above). + """ + + compartment_id: Optional[str] = Field( + None, + description="The OCID of the compartment in which the container repository exists.", + ) + created_by: Optional[str] = Field( + None, description="The id of the user or principal that created the resource." + ) + display_name: Optional[str] = Field( + None, description="The container repository name." + ) + id: Optional[str] = Field(None, description="The OCID of the container repository.") + image_count: Optional[int] = Field(None, description="Total number of images.") + is_immutable: Optional[bool] = Field( + None, + description="Whether the repository is immutable. " + "Images cannot be overwritten in an immutable repository.", + ) + is_public: Optional[bool] = Field( + None, + description="Whether the repository is public. A public repository allows unauthenticated access.", + ) + layer_count: Optional[int] = Field(None, description="Total number of layers.") + layers_size_in_bytes: Optional[int] = Field( + None, description="Total storage in bytes consumed by layers." + ) + lifecycle_state: Optional[ + Literal["AVAILABLE", "DELETING", "DELETED", "UNKNOWN_ENUM_VALUE"] + ] = Field(None, description="The current state of the container repository.") + readme: Optional[ContainerRepositoryReadme] = Field( + None, description="The repository readme." + ) + time_created: Optional[datetime] = Field( + None, + description="An RFC 3339 timestamp indicating when the repository was created.", + ) + time_last_pushed: Optional[datetime] = Field( + None, + description="An RFC 3339 timestamp indicating when an image was last pushed to the repository.", + ) + billable_size_in_gbs: Optional[int] = Field( + None, description="Total storage size in GBs that will be charged." + ) + namespace: Optional[str] = Field( + None, description="The tenancy namespace used in the container repository path." + ) + freeform_tags: Optional[Dict[str, str]] = Field( + None, description="Free-form tags for this resource." + ) + defined_tags: Optional[Dict[str, Dict[str, Any]]] = Field( + None, description="Defined tags for this resource." + ) + system_tags: Optional[Dict[str, Dict[str, Any]]] = Field( + None, description="The system tags for this resource." + ) + + +def map_container_repository( + repo: oci.artifacts.models.ContainerRepository, +) -> ContainerRepository: + """ + Convert an oci.artifacts.models.ContainerRepository to + oracle.oci_registry_mcp_server.models.ContainerRepository, + including all nested types. + """ + return ContainerRepository( + compartment_id=getattr(repo, "compartment_id", None), + created_by=getattr(repo, "created_by", None), + display_name=getattr(repo, "display_name", None), + id=getattr(repo, "id", None), + image_count=getattr(repo, "image_count", None), + is_immutable=getattr(repo, "is_immutable", None), + is_public=getattr(repo, "is_public", None), + layer_count=getattr(repo, "layer_count", None), + layers_size_in_bytes=getattr(repo, "layers_size_in_bytes", None), + lifecycle_state=getattr(repo, "lifecycle_state", None), + readme=map_container_repository_readme(getattr(repo, "readme", None)), + time_created=getattr(repo, "time_created", None), + time_last_pushed=getattr(repo, "time_last_pushed", None), + billable_size_in_gbs=getattr(repo, "billable_size_in_gbs", None), + namespace=getattr(repo, "namespace", None), + freeform_tags=getattr(repo, "freeform_tags", None), + defined_tags=getattr(repo, "defined_tags", None), + system_tags=getattr(repo, "system_tags", None), + ) + + +# endregion + +# region Response (oci.response.Response) + + +class Request(BaseModel): + """ + Pydantic model mirroring the fields of oci.request.Request. + """ + + method: Optional[str] = Field(None, description="The HTTP method.") + url: Optional[str] = Field(None, description="URL that will serve the request.") + query_params: Optional[Dict[str, Any]] = Field( + None, description="Query parameters in the URL." + ) + header_params: Optional[Dict[str, Any]] = Field( + None, description="Request header parameters." + ) + body: Optional[Any] = Field(None, description="Request body.") + response_type: Optional[str] = Field( + None, description="Expected response data type." + ) + enforce_content_headers: Optional[bool] = Field( + None, + description=( + "Whether content headers should be added for PUT and POST requests when not present." # noqa + ), + ) + + +class Response(BaseModel): + """ + Pydantic model mirroring the fields of oci.response.Response. + Includes derived fields next_page, request_id, and has_next_page. + """ + + status: Optional[int] = Field(None, description="The HTTP status code.") + headers: Optional[Dict[str, Any]] = Field( + None, description="The HTTP headers (case-insensitive keys)." + ) + data: Optional[Any] = Field( + None, description="The response data. Type depends on the request." + ) + request: Optional[Request] = Field( + None, description="The corresponding request for this response." + ) + next_page: Optional[str] = Field( + None, description="The value of the opc-next-page response header." + ) + request_id: Optional[str] = Field( + None, description="The ID of the request that generated this response." + ) + has_next_page: Optional[bool] = Field( + None, description="Whether there is a next page of results." + ) + + +def map_request(req) -> Request | None: + if not req: + return None + return Request( + method=getattr(req, "method", None), + url=getattr(req, "url", None), + query_params=getattr(req, "query_params", None), + header_params=getattr(req, "header_params", None), + body=getattr(req, "body", None), + response_type=getattr(req, "response_type", None), + enforce_content_headers=getattr(req, "enforce_content_headers", None), + ) + + +def _map_headers(headers) -> Dict[str, Any] | None: + if headers is None: + return None + try: + # requests.structures.CaseInsensitiveDict is convertible to dict + return dict(headers) + except Exception: + try: + return {k: v for k, v in headers.items()} + except Exception: + return _oci_to_dict(headers) or None + + +def _map_response_data(data: Any) -> Any: + """ + Best-effort mapping of Response.data to Pydantic-friendly structures. + Recognizes common repository models; otherwise falls back to to_dict. + """ + # Handle sequences + if isinstance(data, (list, tuple)): + return [_map_response_data(x) for x in data] + + # Already a plain type + if data is None or isinstance(data, (str, int, float, bool)): + return data + if isinstance(data, dict): + return data + + # Known OCI repository models + try: + if isinstance(data, oci.artifacts.models.ContainerRepository): + return map_container_repository(data) + except Exception: + # Ignore import/type detection issues and fall through to generic handling + pass + + # Fallback: attempt to convert OCI SDK models or other objects to dict + coerced = _oci_to_dict(data) + return coerced if coerced is not None else data + + +def map_response(resp: oci.response.Response) -> Response | None: + if resp is None: + return None + + headers = _map_headers(getattr(resp, "headers", None)) + next_page = getattr(resp, "next_page", None) + request_id = getattr(resp, "request_id", None) + + # Derive from headers if not already present + if next_page is None and headers is not None: + next_page = headers.get("opc-next-page") + if request_id is None and headers is not None: + request_id = headers.get("opc-request-id") + + return Response( + status=getattr(resp, "status", None), + headers=headers, + data=_map_response_data(getattr(resp, "data", None)), + request=map_request(getattr(resp, "request", None)), + next_page=next_page, + request_id=request_id, + has_next_page=(next_page is not None), + ) + + +# endregion diff --git a/src/oci-registry-mcp-server/oracle/oci_registry_mcp_server/server.py b/src/oci-registry-mcp-server/oracle/oci_registry_mcp_server/server.py index 125b7e32..b9bd352f 100644 --- a/src/oci-registry-mcp-server/oracle/oci_registry_mcp_server/server.py +++ b/src/oci-registry-mcp-server/oracle/oci_registry_mcp_server/server.py @@ -6,9 +6,17 @@ import os from logging import Logger +from typing import Optional import oci from fastmcp import FastMCP +from oracle.oci_registry_mcp_server.models import ( + ContainerRepository, + Response, + map_container_repository, + map_response, +) +from pydantic import Field from . import __project__, __version__ @@ -33,76 +41,122 @@ def get_ocir_client(): return oci.artifacts.ArtifactsClient(config, signer=signer) -@mcp.tool -def create_container_repository( - compartment_id: str, repository_name: str, is_public: bool = False -): - ocir_client = get_ocir_client() - create_repository_details = oci.artifacts.models.CreateContainerRepositoryDetails( - compartment_id=compartment_id, display_name=repository_name, is_public=is_public - ) +@mcp.tool(description="List container repositories in the given compartment") +def list_container_repositories( + compartment_id: str = Field(..., description="The OCID of the compartment"), + limit: Optional[int] = Field( + None, + description="The maximum amount of conatiner repositories to return. If None, there is no limit.", + ge=1, + ), +) -> list[ContainerRepository]: + container_repositories: list[ContainerRepository] = [] + try: - repository = ocir_client.create_container_repository( - create_repository_details - ).data - return { - "repository_name": repository.display_name, - "id": repository.id, - "is_public": repository.is_public, - } - except oci.exceptions.ServiceError as e: - logger.error(f"Failed to create container repository: {e}") - return {"error": str(e)} + client = get_ocir_client() + + response: oci.response.Response = None + has_next_page = True + next_page: str = None + + while has_next_page and (limit is None or len(container_repositories) < limit): + kwargs = { + "compartment_id": compartment_id, + "page": next_page, + "limit": limit, + } + + response = client.list_container_repositories(**kwargs) + has_next_page = response.has_next_page + next_page = response.next_page if hasattr(response, "next_page") else None + + data: list[oci.artifacts.models.ContainerRepository] = response.data.items + for d in data: + container_repositories.append(map_container_repository(d)) + + logger.info(f"Found {len(container_repositories)} Container Repositories") + return container_repositories + + except Exception as e: + logger.error(f"Error in list_container_repositories tool: {str(e)}") + raise e @mcp.tool -def list_container_repositories(compartment_id: str): - ocir_client = get_ocir_client() +def get_container_repository( + repository_id: str = Field(..., description="The OCID of the container repository") +) -> ContainerRepository: try: - repositories = ocir_client.list_container_repositories( - compartment_id=compartment_id - ).data.items - return [ - { - "repository_name": repo.display_name, - "id": repo.id, - "is_public": repo.is_public, - } - for repo in repositories - ] - except oci.exceptions.ServiceError as e: - logger.error(f"Failed to list container repositories: {e}") - return {"error": str(e)} + client = get_ocir_client() + + response: oci.response.Response = client.get_container_repository(repository_id) + data: oci.artifacts.models.ContainerRepository = response.data + logger.info("Found Container Repository") + return map_container_repository(data) + + except Exception as e: + logger.error(f"Error in get_container_repository tool: {str(e)}") + raise e @mcp.tool -def get_container_repo_details(repository_id: str): - ocir_client = get_ocir_client() +def create_container_repository( + compartment_id: str = Field( + ..., + description="This is the ocid of the compartment to create the instance in." + 'Must begin with "ocid". If the user specifies a compartment name, ' + "then you may use the list_compartments tool in order to map the " + "compartment name to its ocid", + ), + repository_name: str = Field( + ..., + description="The name of the repository", + min_length=1, + max_length=255, + ), + is_public: bool = Field( + False, description="Whether or not the repository is public" + ), +) -> ContainerRepository: try: - repository = ocir_client.get_container_repository( - repository_id=repository_id - ).data - return { - "repository_name": repository.display_name, - "id": repository.id, - "is_public": repository.is_public, - "compartment_id": repository.compartment_id, - "time_created": repository.time_created.isoformat(), - } - except oci.exceptions.ServiceError as e: - logger.error(f"Failed to get container repository details: {e}") - return {"error": str(e)} + client = get_ocir_client() + + create_repository_details = ( + oci.artifacts.models.CreateContainerRepositoryDetails( + compartment_id=compartment_id, + display_name=repository_name, + is_public=is_public, + ) + ) + + response: oci.response.Response = client.create_container_repository( + create_repository_details + ) + data: oci.artifacts.models.ContainerRepository = response.data + logger.info("Created Container Repository") + return map_container_repository(data) + + except Exception as e: + logger.error(f"Error in create_container_repository tool: {str(e)}") + raise e @mcp.tool -def delete_container_repository(repository_id: str): - ocir_client = get_ocir_client() +def delete_container_repository( + repository_id: str = Field(..., description="The OCID of the container repository") +) -> Response: try: - ocir_client.delete_container_repository(repository_id=repository_id) - return {"success": True} - except oci.exceptions.ServiceError as e: - logger.error(f"Failed to delete container repository: {e}") - return {"error": str(e), "success": False} + client = get_ocir_client() + + response: oci.response.Response = client.delete_container_repository( + repository_id + ) + logger.info("Deleted Container Repository") + return map_response(response) + + except Exception as e: + logger.error(f"Error in delete_container_repository tool: {str(e)}") + raise e def main(): diff --git a/src/oci-registry-mcp-server/oracle/oci_registry_mcp_server/tests/test_registry_tools.py b/src/oci-registry-mcp-server/oracle/oci_registry_mcp_server/tests/test_registry_tools.py index c754d180..aaaf6308 100644 --- a/src/oci-registry-mcp-server/oracle/oci_registry_mcp_server/tests/test_registry_tools.py +++ b/src/oci-registry-mcp-server/oracle/oci_registry_mcp_server/tests/test_registry_tools.py @@ -15,62 +15,43 @@ class TestRegistryTools: @pytest.mark.asyncio @patch("oracle.oci_registry_mcp_server.server.get_ocir_client") - async def test_create_container_repository(self, mock_get_client): + async def test_list_container_repositories(self, mock_get_client): mock_client = MagicMock() mock_get_client.return_value = mock_client - mock_create_response = create_autospec(oci.response.Response) - mock_create_response.data = oci.artifacts.models.ContainerRepository( - display_name="repo1", id="repo1_id", is_public=False + mock_list_response = create_autospec(oci.response.Response) + mock_list_response.data = oci.artifacts.models.ContainerRepositoryCollection( + items=[ + oci.artifacts.models.ContainerRepositorySummary( + display_name="repo1", + id="repo1_id", + is_public=False, + compartment_id="compartment1", + ) + ] ) - mock_client.create_container_repository.return_value = mock_create_response + mock_list_response.has_next_page = False + mock_list_response.next_page = None + mock_client.list_container_repositories.return_value = mock_list_response async with Client(mcp) as client: - result = ( - await client.call_tool( - "create_container_repository", - { - "compartment_id": "compartment1", - "repository_name": "repo1", - }, - ) - ).structured_content + call_tool_result = await client.call_tool( + "list_container_repositories", + {"compartment_id": "compartment1"}, + ) + result = call_tool_result.structured_content["result"] - assert result["repository_name"] == "repo1" - - # @pytest.mark.asyncio - # @patch("oracle.oci_registry_mcp_server.server.get_ocir_client") - # async def test_list_container_repositories(self, mock_get_client): - # mock_client = MagicMock() - # mock_get_client.return_value = mock_client - - # mock_list_response = create_autospec(oci.response.Response) - # mock_list_response.data = oci.artifacts.models.ContainerRepositoryCollection( - # items=[MagicMock(display_name="repo1", id="repo1_id", is_public=False)] - # ) - # mock_client.list_container_repositories.return_value = mock_list_response - - # async with Client(mcp) as client: - # result = ( - # await client.call_tool( - # "list_container_repositories", - # { - # "compartment_id": "compartment1", - # }, - # ) - # ).structured_content - - # assert len(result) == 1 - # assert result[0]["repository_name"] == "repo1" + assert len(result) == 1 + assert result[0]["display_name"] == "repo1" @pytest.mark.asyncio @patch("oracle.oci_registry_mcp_server.server.get_ocir_client") - async def test_get_container_repo_details(self, mock_get_client): + async def test_get_container_repository(self, mock_get_client): mock_client = MagicMock() mock_get_client.return_value = mock_client mock_get_response = create_autospec(oci.response.Response) - mock_get_response.data = MagicMock( + mock_get_response.data = oci.artifacts.models.ContainerRepository( display_name="repo1", id="repo1_id", is_public=False, @@ -81,14 +62,39 @@ async def test_get_container_repo_details(self, mock_get_client): async with Client(mcp) as client: result = ( await client.call_tool( - "get_container_repo_details", + "get_container_repository", { "repository_id": "repo1_id", }, ) - ).data + ).structured_content - assert result["repository_name"] == "repo1" + assert result["display_name"] == "repo1" + + @pytest.mark.asyncio + @patch("oracle.oci_registry_mcp_server.server.get_ocir_client") + async def test_create_container_repository(self, mock_get_client): + mock_client = MagicMock() + mock_get_client.return_value = mock_client + + mock_create_response = create_autospec(oci.response.Response) + mock_create_response.data = oci.artifacts.models.ContainerRepository( + display_name="repo1", id="repo1_id", is_public=False + ) + mock_client.create_container_repository.return_value = mock_create_response + + async with Client(mcp) as client: + result = ( + await client.call_tool( + "create_container_repository", + { + "compartment_id": "compartment1", + "repository_name": "repo1", + }, + ) + ).structured_content + + assert result["display_name"] == "repo1" @pytest.mark.asyncio @patch("oracle.oci_registry_mcp_server.server.get_ocir_client") @@ -97,6 +103,7 @@ async def test_delete_container_repository(self, mock_get_client): mock_get_client.return_value = mock_client mock_delete_response = create_autospec(oci.response.Response) + mock_delete_response.status = 204 mock_client.delete_container_repository.return_value = mock_delete_response async with Client(mcp) as client: @@ -107,6 +114,6 @@ async def test_delete_container_repository(self, mock_get_client): "repository_id": "repo1_id", }, ) - ).data + ).structured_content - assert result["success"] + assert result["status"] == 204 diff --git a/src/oci-registry-mcp-server/pyproject.toml b/src/oci-registry-mcp-server/pyproject.toml index 2c8b5ac4..0d21a02c 100644 --- a/src/oci-registry-mcp-server/pyproject.toml +++ b/src/oci-registry-mcp-server/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "oracle.oci-registry-mcp-server" -version = "1.0.2" +version = "2.0.0" description = "OCI Registry Service MCP server" readme = "README.md" requires-python = ">=3.13" diff --git a/src/oci-resource-search-mcp-server/oracle/oci_resource_search_mcp_server/__init__.py b/src/oci-resource-search-mcp-server/oracle/oci_resource_search_mcp_server/__init__.py index e9f58be3..cb0ff8d7 100644 --- a/src/oci-resource-search-mcp-server/oracle/oci_resource_search_mcp_server/__init__.py +++ b/src/oci-resource-search-mcp-server/oracle/oci_resource_search_mcp_server/__init__.py @@ -5,4 +5,4 @@ """ __project__ = "oracle.oci-resource-search-mcp-server" -__version__ = "1.0.2" +__version__ = "2.0.0" diff --git a/src/oci-resource-search-mcp-server/oracle/oci_resource_search_mcp_server/models.py b/src/oci-resource-search-mcp-server/oracle/oci_resource_search_mcp_server/models.py new file mode 100644 index 00000000..e8548197 --- /dev/null +++ b/src/oci-resource-search-mcp-server/oracle/oci_resource_search_mcp_server/models.py @@ -0,0 +1,147 @@ +""" +Copyright (c) 2025, Oracle and/or its affiliates. +Licensed under the Universal Permissive License v1.0 as shown at +https://oss.oracle.com/licenses/upl. +""" + +from datetime import datetime +from typing import Any, Dict, List, Optional + +import oci +from pydantic import BaseModel, Field + + +def _oci_to_dict(obj): + """Best-effort conversion of OCI SDK model objects to plain dicts.""" + if obj is None: + return None + try: + from oci.util import to_dict as oci_to_dict + + return oci_to_dict(obj) + except Exception: + pass + if isinstance(obj, dict): + return obj + if hasattr(obj, "__dict__"): + return {k: v for k, v in obj.__dict__.items() if not k.startswith("_")} + return None + + +# region SearchContext + + +class SearchContext(BaseModel): + """ + Contains search context, such as highlighting, for found resources. + """ + + highlights: Optional[Dict[str, List[str]]] = Field( + None, + description="Describes what in each field matched the search criteria by showing highlighted values, " + "but only for free text searches or for structured queries that use a MATCHING clause. " + "The list of strings represents fragments of values that matched the query conditions. " + "Highlighted values are wrapped with