Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ exclude = [
"docs",
"*.tests",
"*.tests",
"tests",
"*.tests.*",
"tests.*",
"legacy",
Expand Down
49 changes: 49 additions & 0 deletions src/zenml/cli/served_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
print_served_model_configuration,
warning,
)
from zenml.console import console
from zenml.enums import CliCategories, StackComponentType
from zenml.model_deployers import BaseModelDeployer
from zenml.repository import Repository
Expand Down Expand Up @@ -269,3 +270,51 @@ def delete_model_service(

warning(f"No model with uuid: '{served_model_uuid}' could be found.")
return


@served_models.command("logs")
@click.argument("served_model_uuid", type=click.STRING)
@click.option(
    "--follow",
    "-f",
    is_flag=True,
    help="Continue to output new log data as it becomes available.",
)
@click.option(
    "--tail",
    "-t",
    type=click.INT,
    default=None,
    help="Only show the last NUM lines of log output.",
)
@click.option(
    "--raw",
    "-r",
    is_flag=True,
    help="Show raw log contents (don't pretty-print logs).",
)
@click.pass_obj
def get_model_service_logs(
    model_deployer: "BaseModelDeployer",
    served_model_uuid: str,
    follow: bool,
    tail: Optional[int],
    raw: bool,
) -> None:
    """Display the logs for a model server."""
    # NOTE: the docstring above doubles as the click help text, so the
    # parameter documentation lives in these comments instead.
    #
    # model_deployer: model deployer injected through the click context.
    # served_model_uuid: textual UUID of the model server to inspect.
    # follow: keep streaming log lines as they are produced.
    # tail: if set, limit the output to the last NUM lines.
    # raw: bypass the rich console and emit lines with plain `print`.

    service_id = uuid.UUID(served_model_uuid)
    matches = model_deployer.find_model_server(service_uuid=service_id)
    if not matches:
        warning(f"No model with uuid: '{served_model_uuid}' could be found.")
        return

    log_stream = model_deployer.get_model_server_logs(
        matches[0].uuid, follow=follow, tail=tail
    )
    for entry in log_stream:
        # Lines that begin with an ANSI escape sequence are already styled,
        # so they are written out untouched, exactly like in raw mode.
        emit_plain = raw or entry.startswith("\x1b[")
        if emit_plain:
            print(entry)
            continue
        console.print(entry)
88 changes: 87 additions & 1 deletion src/zenml/integrations/seldon/seldon_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import json
import re
import time
from typing import Dict, List, Optional
from typing import Dict, Generator, List, Optional

from kubernetes import client as k8s_client
from kubernetes import config as k8s_config
Expand Down Expand Up @@ -829,6 +829,92 @@ def find_deployments(
f"with labels '{labels or ''}' and field '{fields or ''}'"
) from e

def get_deployment_logs(
    self,
    name: str,
    follow: bool = False,
    tail: Optional[int] = None,
) -> Generator[str, bool, None]:
    """Get the logs of a Seldon Core deployment resource.

    This is a generator function: nothing is fetched (and no error is
    raised) until the first item is requested. The consumer may `send` a
    truthy value into the generator to stop the stream early; the
    underlying HTTP connection is released when the generator finishes or
    is closed.

    Args:
        name: the name of the Seldon Core deployment to get logs for.
        follow: if True, the logs will be streamed as they are written
        tail: only retrieve the last NUM lines of log output.

    Yields:
        The log lines, one at a time, without the trailing newline. Empty
        log lines are yielded as empty strings.

    Raises:
        SeldonClientError: if no pod (or no container) is associated with
            the deployment, or if an unknown error occurs while fetching
            the logs.
    """
    logger.debug(f"Retrieving logs for SeldonDeployment resource: {name}")
    try:
        response = self._core_api.list_namespaced_pod(
            namespace=self._namespace,
            label_selector=f"seldon-deployment-id={name}",
        )
        logger.debug("Kubernetes API response: %s", response)
        pods = response.items
        if not pods:
            raise SeldonClientError(
                f"The Seldon Core deployment {name} is not currently "
                f"running: no Kubernetes pods associated with it were found"
            )
        pod = pods[0]
        pod_name = pod.metadata.name

        # The Kubernetes client reports optional list fields as None when
        # they are absent (e.g. no init containers, or a pod that has not
        # been scheduled yet), so normalize them to empty lists.
        containers = [c.name for c in pod.spec.containers or []]
        init_containers = [c.name for c in pod.spec.init_containers or []]
        container_statuses = {
            c.name: c.started or c.restart_count
            for c in pod.status.container_statuses or []
        }
        if not containers:
            raise SeldonClientError(
                f"The Seldon Core deployment {name} is not currently "
                f"running: the pod `{pod_name}` has no containers"
            )

        container = "default"
        if container not in containers:
            container = containers[0]
        # some containers might not be running yet and have no logs to show,
        # so fall back to the first init container when there is one
        if not container_statuses.get(container) and init_containers:
            container = init_containers[0]

        logger.info(
            f"Retrieving logs for pod: `{pod_name}` and container "
            f"`{container}` in namespace `{self._namespace}`"
        )
        response = self._core_api.read_namespaced_pod_log(
            name=pod_name,
            namespace=self._namespace,
            container=container,
            follow=follow,
            tail_lines=tail,
            _preload_content=False,
        )
    except k8s_client.rest.ApiException as e:
        logger.error(
            "Exception when fetching logs for SeldonDeployment resource "
            "%s: %s",
            name,
            str(e),
        )
        raise SeldonClientError(
            f"Unexpected exception when fetching logs for SeldonDeployment "
            f"resource: {name}"
        ) from e

    try:
        while True:
            raw_line = response.readline()
            # End-of-stream is signalled by an empty read. This must be
            # checked before stripping the newline, otherwise a blank log
            # line ("\n") would be mistaken for the end of the logs.
            if not raw_line:
                return
            # Log output is arbitrary bytes; never let a bad byte sequence
            # abort the stream.
            line = raw_line.decode("utf-8", errors="replace").rstrip("\n")
            stop = yield line
            if stop:
                return
    finally:
        response.release_conn()

def create_or_update_secret(
self,
name: str,
Expand Down
22 changes: 21 additions & 1 deletion src/zenml/integrations/seldon/services/seldon_deployment.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# permissions and limitations under the License.

import os
from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple
from typing import TYPE_CHECKING, Any, Dict, Generator, Optional, Tuple
from uuid import UUID

import numpy as np
Expand Down Expand Up @@ -311,6 +311,26 @@ def deprovision(self, force: bool = False) -> None:
except SeldonDeploymentNotFoundError:
pass

def get_logs(
    self,
    follow: bool = False,
    tail: Optional[int] = None,
) -> Generator[str, bool, None]:
    """Retrieve the logs of this Seldon Core model deployment.

    The call is delegated to the Seldon client's `get_deployment_logs`,
    using the name of the Seldon deployment backing this service.

    Args:
        follow: if True, the logs will be streamed as they are written
        tail: only retrieve the last NUM lines of log output.

    Returns:
        The generator produced by the Seldon client, which can be iterated
        to get the service logs.
    """
    fetch_logs = self._get_client().get_deployment_logs
    deployment_name = self.seldon_deployment_name
    return fetch_logs(deployment_name, follow=follow, tail=tail)

@property
def prediction_url(self) -> Optional[str]:
"""The prediction URI exposed by the prediction service.
Expand Down
20 changes: 19 additions & 1 deletion src/zenml/model_deployers/base_model_deployer.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# or implied. See the License for the specific language governing
# permissions and limitations under the License.
from abc import ABC, abstractmethod
from typing import ClassVar, Dict, List, Optional
from typing import ClassVar, Dict, Generator, List, Optional
from uuid import UUID

from zenml.enums import StackComponentType
Expand Down Expand Up @@ -198,3 +198,21 @@ def delete_model_server(
deprovisioning the service, without waiting for it to stop.
force: if True, force the service to stop.
"""

def get_model_server_logs(
    self,
    uuid: UUID,
    follow: bool = False,
    tail: Optional[int] = None,
) -> Generator[str, bool, None]:
    """Fetch the logs of the model server identified by a UUID.

    Args:
        uuid: UUID of the model server to get the logs of.
        follow: if True, the logs will be streamed as they are written
        tail: only retrieve the last NUM lines of log output.

    Returns:
        The log generator returned by the `get_logs` method of the first
        matching service.

    Raises:
        RuntimeError: if no model server with the given UUID is found.
    """
    matches = self.find_model_server(service_uuid=uuid)
    if not matches:
        raise RuntimeError(f"No model server found with UUID {uuid}")
    target = matches[0]
    return target.get_logs(follow=follow, tail=tail)
42 changes: 41 additions & 1 deletion src/zenml/services/local/local_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@
import subprocess
import sys
import tempfile
import time
from abc import abstractmethod
from typing import Dict, List, Optional, Tuple
from typing import Dict, Generator, List, Optional, Tuple

import psutil
from pydantic import Field
Expand Down Expand Up @@ -366,6 +367,45 @@ def provision(self) -> None:
def deprovision(self, force: bool = False) -> None:
self._stop_daemon(force)

def get_logs(
    self, follow: bool = False, tail: Optional[int] = None
) -> Generator[str, bool, None]:
    """Retrieve the service logs from the local log file.

    The consumer may `send` a truthy value into the generator to stop the
    stream early. If the service has no log file, or the file does not
    exist, nothing is yielded.

    Args:
        follow: if True, the logs will be streamed as they are written
        tail: only retrieve the last NUM lines of log output.

    Yields:
        The log lines, one at a time, without the trailing newline.
    """
    log_file = self.status.log_file
    if not log_file or not os.path.exists(log_file):
        return

    with open(log_file, "r") as f:
        if tail:
            # TODO[LOW]: implement a more efficient tailing mechanism that
            #   doesn't read the entire file
            lines = f.readlines()[-tail:]
            for line in lines:
                stop = yield line.rstrip("\n")
                if stop:
                    return
            if not follow:
                return
        # `line` accumulates partial reads: while following, the writer may
        # not have flushed a complete line yet when we hit the end of file.
        line = ""
        while True:
            partial_line = f.readline()
            if partial_line:
                line += partial_line
                if line.endswith("\n"):
                    stop = yield line.rstrip("\n")
                    if stop:
                        break
                    line = ""
            elif follow:
                time.sleep(1)
            else:
                # End of file and not following: a final line that lacks a
                # trailing newline must still be emitted, not dropped.
                if line:
                    yield line
                break

@abstractmethod
def run(self) -> None:
"""Run the service daemon process associated with this service.
Expand Down
19 changes: 18 additions & 1 deletion src/zenml/services/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

import time
from abc import abstractmethod
from typing import Any, ClassVar, Dict, Optional, Tuple, Type, cast
from typing import Any, ClassVar, Dict, Generator, Optional, Tuple, Type, cast
from uuid import UUID, uuid4

from pydantic import Field
Expand Down Expand Up @@ -174,6 +174,23 @@ def check_status(self) -> Tuple[ServiceState, str]:
service status).
"""

@abstractmethod
def get_logs(
self, follow: bool = False, tail: Optional[int] = None
) -> Generator[str, bool, None]:
"""Retrieve the service logs.

This method should be overridden by subclasses that implement
concrete service tracking functionality.

Args:
follow: if True, the logs will be streamed as they are written
tail: only retrieve the last NUM lines of log output.

Returns:
A generator that can be accessed to get the service logs. Per the
return annotation, it yields `str` items and accepts a `bool` sent
back into it.
NOTE(review): the sent value presumably asks the generator to stop
early -- confirm against the concrete implementations.
"""

def update_status(self) -> None:
"""Check the current operational state of the external service
and update the local operational status information to reflect it.
Expand Down