Skip to content

Commit

Permalink
fix(python): default session needs to be aware of g.set_option(num_wo…
Browse files Browse the repository at this point in the history
…rkers=...) (alibaba#3246)

Signed-off-by: Tao He <linzhu.ht@alibaba-inc.com>
  • Loading branch information
sighingnow committed Sep 21, 2023
1 parent 9f012b7 commit 52cac82
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 51 deletions.
7 changes: 5 additions & 2 deletions python/graphscope/client/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -1466,7 +1466,7 @@ def get_default_session() -> Session:
def get_session_by_id(handle):
"""Return the session by handle."""
if handle not in _session_dict:
raise ValueError("Session {} not exists.".format(handle))
raise ValueError(f"Session {handle} not exists.")
return _session_dict.get(handle)


Expand All @@ -1480,7 +1480,10 @@ def __init__(self):
def get_default(self) -> Session:
if not self.stack:
logger.info("Creating default session ...")
sess = session(cluster_type="hosts", num_workers=1)
sess = session(
cluster_type="hosts",
num_workers=gs_config.session.default_local_num_workers,
)
sess.as_default()
return self.stack[-1]

Expand Down
99 changes: 50 additions & 49 deletions python/graphscope/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,8 @@ class ResourceSpec:
"""Resource requirements for a container in kubernetes."""

cpu: Union[str, float, None] = None # CPU cores of container.
memory: Union[
str, None
] = None # Memory of container, suffix with ['Mi', 'Gi', 'Ti'].
# Memory of container, suffix with ['Mi', 'Gi', 'Ti'].
memory: Union[str, None] = None

def as_dict(self):
ret = {}
Expand Down Expand Up @@ -87,12 +86,12 @@ def make_burstable(cpu, memory):
class ImageConfig:
"""Image related stuffs."""

registry: Union[str, None] = "registry.cn-hongkong.aliyuncs.com"
# k8s image registry.
registry: Union[str, None] = "registry.cn-hongkong.aliyuncs.com"
repository: str = "graphscope" # k8s image repository.
tag: str = __version__ # k8s image tag.
pull_secrets: List[str] = field(default_factory=list)
# A list of secrets to pull image.
pull_secrets: List[str] = field(default_factory=list)
pull_policy: str = "IfNotPresent" # Kubernetes image pull policy.


Expand All @@ -114,47 +113,45 @@ class DatasetConfig:
"""A Dataset container could be shipped with GraphScope in kubernetes."""

enable: bool = False # Mount the aliyun dataset bucket as a volume by ossfs.
proxy: Union[str, None] = None
# A json string specifies the dataset proxy info. Available options of proxy: http_proxy, https_proxy, no_proxy.
proxy: Union[str, None] = None


@dataclass
class EngineConfig:
"""Engine configuration"""

enabled_engines: str = "gae,gie,gle" # A set of engines to enable.
node_selector: Union[
str, None
] = None # Node selector for engine pods, default is None.
# Node selector for engine pods, default is None.
node_selector: Union[str, None] = None

enable_gae: bool = True # Enable or disable analytical engine.
enable_gae_java: bool = (
False # Enable or disable analytical engine with java support.
)
# Enable or disable analytical engine with java support.
enable_gae_java: bool = False
enable_gie: bool = True # Enable or disable interactive engine.
enable_gle: bool = True # Enable or disable learning engine.

preemptive: bool = True

# Resource for analytical pod
gae_resource: ResourceConfig = field(
default_factory=lambda: ResourceConfig.make_burstable(1, "4Gi")
)
# Resource for analytical pod

# Resource for interactive executor pod
gie_executor_resource: ResourceConfig = field(
default_factory=lambda: ResourceConfig.make_burstable(1, "2Gi")
)
# Resource for interactive executor pod

# Resource for interactive frontend pod
gie_frontend_resource: ResourceConfig = field(
default_factory=lambda: ResourceConfig.make_burstable(0.5, "1Gi")
)
# Resource for interactive frontend pod

# Resource for learning pod
gle_resource: ResourceConfig = field(
default_factory=lambda: ResourceConfig.make_burstable(0.2, "1Gi")
)
# Resource for learning pod

def post_setup(self):
valid_engines = set(
Expand Down Expand Up @@ -190,10 +187,11 @@ class EtcdConfig:
If address is set, all other etcd configurations are ignored.
"""

listening_client_port: int = 2379
# The port that etcd server will bind to for accepting client connections. Defaults to 2379.
listening_peer_port: int = 2380
listening_client_port: int = 2379

# The port that etcd server will bind to for accepting peer connections. Defaults to 2380.
listening_peer_port: int = 2380

# Kubernetes related config
replicas: int = 1
Expand All @@ -203,20 +201,21 @@ class EtcdConfig:
class VineyardConfig:
"""Vineyard configuration"""

socket: Union[str, None] = None
# Vineyard IPC socket path, a socket suffixed by timestamp will be created in '/tmp' if not given.
socket: Union[str, None] = None
rpc_port: int = 9600 # Vineyard RPC port.

# Kubernetes related config
deployment_name: Union[str, None] = None

# The name of vineyard deployment, it should exist as expected.
deployment_name: Union[str, None] = None

image: str = "vineyardcloudnative/vineyardd:latest" # Image for vineyard container.

# Resource for vineyard sidecar container
resource: ResourceConfig = field(
default_factory=lambda: ResourceConfig.make_burstable(0.2, "256Mi")
)
# Resource for vineyard sidecar container


@dataclass
Expand All @@ -231,62 +230,63 @@ class CoordinatorConfig:
monitor_port: int = 9090 # Coordinator prometheus exporter service port.

# Kubernetes related config
deployment_name: Union[str, None] = None

# Name of the coordinator deployment and service.
node_selector: Union[str, None] = None
deployment_name: Union[str, None] = None
# Node selector for coordinator pod in kubernetes
node_selector: Union[str, None] = None
# Resource configuration of coordinator.
resource: ResourceConfig = field(
default_factory=lambda: ResourceConfig.make_burstable(0.5, "512Mi")
)
# Resource configuration of coordinator.

# For GraphScope operator
operator_mode: bool = False
# Launch coordinator only, do not let coordinator launch resources or delete resources.
# It would try to find existing resources and connect to it.
operator_mode: bool = False


@dataclass
class HostsLauncherConfig:
"""Local cluster configuration."""

hosts: List[str] = list_field("localhost")
# list of hostnames of graphscope engine workers.
etcd: EtcdConfig = field(default_factory=EtcdConfig)
hosts: List[str] = list_field("localhost")
# Etcd configuration. Only local session needs to configure etcd.
etcd: EtcdConfig = field(default_factory=EtcdConfig)

dataset_download_retries: int = 3
# The number of retries when downloading dataset from internet.
dataset_download_retries: int = 3


@dataclass
class KubernetesLauncherConfig:
"""Kubernetes cluster configuration."""

namespace: Union[str, None] = None
# The namespace to create all resource, which must exist in advance.
namespace: Union[str, None] = None
delete_namespace: bool = False # Delete the namespace that created by graphscope.

config_file: Union[str, None] = None # kube config file path

deployment_mode = "eager" # The deployment mode of engines on the kubernetes cluster, choose from 'eager' or 'lazy'.
# The deployment mode of engines on the kubernetes cluster, choose from 'eager' or 'lazy'.
deployment_mode: str = "eager"

service_type: str = "NodePort"
# Service type, choose from 'NodePort' or 'LoadBalancer'.
service_type: str = "NodePort"

volumes: Union[str, None] = None
# A base64 encoded json string specifies the kubernetes volumes to mount.
volumes: Union[str, None] = None

waiting_for_delete: bool = False
# Wait until the graphscope instance has been deleted successfully.
waiting_for_delete: bool = False

image: ImageConfig = field(default_factory=ImageConfig) # Image configuration.

engine: EngineConfig = field(default_factory=EngineConfig) # Engine configuration.

dataset: DatasetConfig = field(
default_factory=DatasetConfig
) # Dataset configuration.
# Dataset configuration.
dataset: DatasetConfig = field(default_factory=DatasetConfig)

mars: MarsConfig = field(default_factory=MarsConfig) # Mars configuration.

Expand All @@ -303,50 +303,50 @@ class SessionConfig:
"""Session configuration"""

num_workers: int = 2 # The number of graphscope engine workers.
# The number of graphscope engine workers when launch local workers.
default_local_num_workers: int = 1

reconnect: bool = False # Connect to an existed GraphScope Cluster
instance_id: Union[str, None] = None # Unique id for each GraphScope instance.

show_log: bool = False # Show log or not.
log_level: str = "info" # Log level, choose from 'info' or 'debug'.

# The length of time to wait before giving up launching graphscope.z
timeout_seconds: int = 600
# The length of time to wait before giving up launching graphscope.
dangling_timeout_seconds: int = 600
# The length of time to wait starting from client disconnected before killing the graphscope instance.
dangling_timeout_seconds: int = 600

retry_time_seconds: int = 1
# The length of time to wait before retrying to launch graphscope.
retry_time_seconds: int = 1

execution_mode: str = "eager" # The deploying mode of graphscope, eager or lazy.


@dataclass
class Config(Serializable):
launcher_type: str = "k8s"
# Launcher type, choose from 'hosts', 'k8s' or 'operator'.
launcher_type: str = "k8s"

session: SessionConfig = field(default_factory=SessionConfig)

coordinator: CoordinatorConfig = field(
default_factory=CoordinatorConfig
) # Coordinator configuration.
vineyard: VineyardConfig = field(
default_factory=VineyardConfig
) # Vineyard configuration.
# Coordinator configuration.
coordinator: CoordinatorConfig = field(default_factory=CoordinatorConfig)
# Vineyard configuration.
vineyard: VineyardConfig = field(default_factory=VineyardConfig)

hosts_launcher: HostsLauncherConfig = field(default_factory=HostsLauncherConfig)
# Local cluster configuration.
hosts_launcher: HostsLauncherConfig = field(default_factory=HostsLauncherConfig)

# Kubernetes cluster configuration.
kubernetes_launcher: KubernetesLauncherConfig = field(
default_factory=KubernetesLauncherConfig
)
# Kubernetes cluster configuration.

# Launcher used in operator mode.
operator_launcher: OperatorLauncherConfig = field(
default_factory=OperatorLauncherConfig
)
# Launcher used in operator mode.

def set_option(self, key, value): # noqa: C901
"""Forward set_option target to actual config fields"""
Expand Down Expand Up @@ -422,6 +422,7 @@ def set_option(self, key, value): # noqa: C901
self.kubernetes_launcher.waiting_for_delete = value
elif key == "num_workers":
self.session.num_workers = value
self.session.default_local_num_workers = value
elif key == "show_log":
self.session.show_log = value
elif key == "log_level":
Expand Down

0 comments on commit 52cac82

Please sign in to comment.