diff --git a/openapi/openapiv2.json b/openapi/openapiv2.json index bd0bc4351..13909d087 100644 --- a/openapi/openapiv2.json +++ b/openapi/openapiv2.json @@ -2184,6 +2184,98 @@ ] } }, + "/api/v1/namespaces/{namespace}/workers": { + "get": { + "summary": "ListWorkers is a visibility API to list worker status information in a specific namespace.", + "operationId": "ListWorkers2", + "responses": { + "200": { + "description": "A successful response.", + "schema": { + "$ref": "#/definitions/v1ListWorkersResponse" + } + }, + "default": { + "description": "An unexpected error response.", + "schema": { + "$ref": "#/definitions/rpcStatus" + } + } + }, + "parameters": [ + { + "name": "namespace", + "in": "path", + "required": true, + "type": "string" + }, + { + "name": "pageSize", + "in": "query", + "required": false, + "type": "integer", + "format": "int32" + }, + { + "name": "nextPageToken", + "in": "query", + "required": false, + "type": "string", + "format": "byte" + }, + { + "name": "query", + "description": "`query` in ListWorkers is used to filter workers based on worker status info.\nThe following worker status attributes are supported as part of the query:\n* WorkerId\n* WorkerIdentity\n* HostId\n* TaskQueue\n* DeploymentName\n* BuildId\n* SdkName\n* SdkVersion\n* StartTime\n* LastHeartbeatTime\n* Status\nCurrently metrics are not supported as a part of ListWorkers query.", + "in": "query", + "required": false, + "type": "string" + } + ], + "tags": [ + "WorkflowService" + ] + } + }, + "/api/v1/namespaces/{namespace}/workers/heartbeat": { + "post": { + "summary": "RecordWorkerHeartbeat receives a heartbeat request from the worker.", + "operationId": "RecordWorkerHeartbeat2", + "responses": { + "200": { + "description": "A successful response.", + "schema": { + "$ref": "#/definitions/v1RecordWorkerHeartbeatResponse" + } + }, + "default": { + "description": "An unexpected error response.", + "schema": { + "$ref": "#/definitions/rpcStatus" + } + } + }, + 
"parameters": [ + { + "name": "namespace", + "description": "Namespace this worker belongs to.", + "in": "path", + "required": true, + "type": "string" + }, + { + "name": "body", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/WorkflowServiceRecordWorkerHeartbeatBody" + } + } + ], + "tags": [ + "WorkflowService" + ] + } + }, "/api/v1/namespaces/{namespace}/workflow-count": { "get": { "summary": "CountWorkflowExecutions is a visibility API to count of workflow executions in a specific namespace.", @@ -5624,6 +5716,46 @@ ] } }, + "/namespaces/{namespace}/worker-heartbeat": { + "post": { + "summary": "RecordWorkerHeartbeat receives a heartbeat request from the worker.", + "operationId": "RecordWorkerHeartbeat", + "responses": { + "200": { + "description": "A successful response.", + "schema": { + "$ref": "#/definitions/v1RecordWorkerHeartbeatResponse" + } + }, + "default": { + "description": "An unexpected error response.", + "schema": { + "$ref": "#/definitions/rpcStatus" + } + } + }, + "parameters": [ + { + "name": "namespace", + "description": "Namespace this worker belongs to.", + "in": "path", + "required": true, + "type": "string" + }, + { + "name": "body", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/WorkflowServiceRecordWorkerHeartbeatBody" + } + } + ], + "tags": [ + "WorkflowService" + ] + } + }, "/namespaces/{namespace}/worker-task-reachability": { "get": { "summary": "Deprecated. 
Use `DescribeTaskQueue`.", @@ -5693,6 +5825,58 @@ ] } }, + "/namespaces/{namespace}/workers": { + "get": { + "summary": "ListWorkers is a visibility API to list worker status information in a specific namespace.", + "operationId": "ListWorkers", + "responses": { + "200": { + "description": "A successful response.", + "schema": { + "$ref": "#/definitions/v1ListWorkersResponse" + } + }, + "default": { + "description": "An unexpected error response.", + "schema": { + "$ref": "#/definitions/rpcStatus" + } + } + }, + "parameters": [ + { + "name": "namespace", + "in": "path", + "required": true, + "type": "string" + }, + { + "name": "pageSize", + "in": "query", + "required": false, + "type": "integer", + "format": "int32" + }, + { + "name": "nextPageToken", + "in": "query", + "required": false, + "type": "string", + "format": "byte" + }, + { + "name": "query", + "description": "`query` in ListWorkers is used to filter workers based on worker status info.\nThe following worker status attributes are supported as part of the query:\n* WorkerId\n* WorkerIdentity\n* HostId\n* TaskQueue\n* DeploymentName\n* BuildId\n* SdkName\n* SdkVersion\n* StartTime\n* LastHeartbeatTime\n* Status\nCurrently metrics are not supported as a part of ListWorkers query.", + "in": "query", + "required": false, + "type": "string" + } + ], + "tags": [ + "WorkflowService" + ] + } + }, "/namespaces/{namespace}/workflow-count": { "get": { "summary": "CountWorkflowExecutions is a visibility API to count of workflow executions in a specific namespace.", @@ -7385,6 +7569,18 @@ } } }, + "WorkflowServiceRecordWorkerHeartbeatBody": { + "type": "object", + "properties": { + "identity": { + "type": "string", + "description": "The identity of the client who initiated this request." 
+ }, + "workerInfo": { + "$ref": "#/definitions/v1WorkerInfo" + } + } + }, "WorkflowServiceRequestCancelWorkflowExecutionBody": { "type": "object", "properties": { @@ -11174,6 +11370,23 @@ } } }, + "v1ListWorkersResponse": { + "type": "object", + "properties": { + "workerInfo": { + "type": "array", + "items": { + "type": "object", + "$ref": "#/definitions/v1WorkerInfo" + } + }, + "nextPageToken": { + "type": "string", + "format": "byte", + "title": "Next page token" + } + } + }, "v1ListWorkflowExecutionsResponse": { "type": "object", "properties": { @@ -12507,6 +12720,9 @@ } } }, + "v1RecordWorkerHeartbeatResponse": { + "type": "object" + }, "v1RegisterNamespaceRequest": { "type": "object", "properties": { @@ -15053,6 +15269,174 @@ "default": "WORKER_DEPLOYMENT_VERSION_STATUS_UNSPECIFIED", "description": "Specify the status of a Worker Deployment Version.\nExperimental. Worker Deployments are experimental and might significantly change in the future.\n\n - WORKER_DEPLOYMENT_VERSION_STATUS_INACTIVE: The Worker Deployment Version has been created inside the Worker Deployment but is not used by any\nworkflow executions. These Versions can still have workflows if they have an explicit Versioning Override targeting\nthis Version. Such Versioning Override could be set at workflow start time, or at a later time via `UpdateWorkflowExecutionOptions`.\n - WORKER_DEPLOYMENT_VERSION_STATUS_CURRENT: The Worker Deployment Version is the current version of the Worker Deployment. All new workflow executions \nand tasks of existing unversioned or AutoUpgrade workflows are routed to this version.\n - WORKER_DEPLOYMENT_VERSION_STATUS_RAMPING: The Worker Deployment Version is the ramping version of the Worker Deployment. A subset of new Pinned workflow executions are \nrouted to this version. 
Moreover, a portion of existing unversioned or AutoUpgrade workflow executions are also routed to this version.\n - WORKER_DEPLOYMENT_VERSION_STATUS_DRAINING: The Worker Deployment Version is not used by new workflows but is still used by\nopen pinned workflows. The version cannot be decommissioned safely.\n - WORKER_DEPLOYMENT_VERSION_STATUS_DRAINED: The Worker Deployment Version is not used by new or open workflows, but might be still needed by\nQueries sent to closed workflows. The version can be decommissioned safely if user does\nnot query closed workflows. If the user does query closed workflows for some time x after\nworkflows are closed, they should decommission the version after it has been drained for that duration." }, + "v1WorkerHostInfo": { + "type": "object", + "properties": { + "hostName": { + "type": "string", + "description": "Worker host identifier." + }, + "processId": { + "type": "string", + "description": "Worker process identifier, should be unique for the host." + }, + "workerIdentity": { + "type": "string", + "description": "Worker identity, set by the client, may not be unique.\nUsually host_name+(user group name)+process_id, but can be overwritten by the user." + } + }, + "title": "Holds everything needed to identify the worker host/process context" + }, + "v1WorkerInfo": { + "type": "object", + "properties": { + "workerInstanceKey": { + "type": "string", + "description": "Worker identifier, should be unique for the namespace.\nIt is distinct from worker identity, which is not necessarily namespace-unique." + }, + "hostInfo": { + "$ref": "#/definitions/v1WorkerHostInfo", + "description": "Worker host information." + }, + "taskQueue": { + "type": "string", + "description": "Task queue this worker is polling for tasks." 
+ }, + "deploymentVersion": { + "$ref": "#/definitions/v1WorkerDeploymentVersion" + }, + "sdkName": { + "type": "string" + }, + "sdkVersion": { + "type": "string" + }, + "status": { + "$ref": "#/definitions/v1WorkerStatus", + "description": "Worker status. Defined by SDK." + }, + "startTime": { + "type": "string", + "format": "date-time", + "title": "Worker start time.\nIt can be used to determine worker uptime. (current time - start time)" + }, + "lastHeartbeatTime": { + "type": "string", + "format": "date-time", + "description": "Last heartbeat time, coming from the worker. Worker should set it to \"now\"." + }, + "elapsedSinceLastHeartbeat": { + "type": "string", + "description": "Elapsed time since the last heartbeat from the worker." + }, + "workflowTaskSlotsInfo": { + "$ref": "#/definitions/v1WorkerSlotsInfo" + }, + "activityTaskSlotsInfo": { + "$ref": "#/definitions/v1WorkerSlotsInfo" + }, + "nexusTaskSlotsInfo": { + "$ref": "#/definitions/v1WorkerSlotsInfo" + }, + "localActivitySlotsInfo": { + "$ref": "#/definitions/v1WorkerSlotsInfo" + }, + "workflowPollerInfo": { + "$ref": "#/definitions/v1WorkerPollerInfo" + }, + "workflowStickyPollerInfo": { + "$ref": "#/definitions/v1WorkerPollerInfo" + }, + "activityPollerInfo": { + "$ref": "#/definitions/v1WorkerPollerInfo" + }, + "nexusPollerInfo": { + "$ref": "#/definitions/v1WorkerPollerInfo" + }, + "currentHostCpuUsage": { + "type": "number", + "format": "float" + }, + "currentHostMemUsage": { + "type": "string", + "format": "int64" + }, + "totalStickyCacheHit": { + "type": "integer", + "format": "int32", + "description": "A Workflow Task found a cached Workflow Execution to run against." + }, + "totalStickyCacheMiss": { + "type": "integer", + "format": "int32", + "description": "A Workflow Task did not find a cached Workflow execution to run against." + }, + "currentStickyCacheSize": { + "type": "integer", + "format": "int32", + "description": "Current cache size, expressed in number of Workflow Executions." 
+ } + }, + "description": "Worker info message, contains information about the worker and its current state.\nAll information is provided by the worker itself." + }, + "v1WorkerPollerInfo": { + "type": "object", + "properties": { + "activePollers": { + "type": "integer", + "format": "int32" + }, + "lastSuccessfulPollTime": { + "type": "string", + "format": "date-time" + } + } + }, + "v1WorkerSlotsInfo": { + "type": "object", + "properties": { + "currentSlotsAvailable": { + "type": "integer", + "format": "int32", + "description": "Number of slots available for the worker to specific tasks.\nMay be -1 if the upper bound is not known." + }, + "currentSlotsUsed": { + "type": "integer", + "format": "int32", + "description": "Number of slots used by the worker for specific tasks." + }, + "totalProcessedTasks": { + "type": "integer", + "format": "int32", + "description": "Total number of tasks processed (completed both successfully and unsuccessfully, or any other way)\nby the worker since the worker started. This is a cumulative counter." + }, + "totalFailedTasks": { + "type": "integer", + "format": "int32", + "description": "Total number of failed tasks processed by the worker so far." + }, + "processedTasksLastInterval": { + "type": "integer", + "format": "int32", + "description": "Number of tasks processed since the last heartbeat from the worker.\nThis is a cumulative counter, and it is reset to 0 each time the worker sends a heartbeat.\nContains both successful and failed tasks." + }, + "failureTasksLastInterval": { + "type": "integer", + "format": "int32", + "description": "Number of failed tasks processed since the last heartbeat from the worker." 
+ } + } + }, + "v1WorkerStatus": { + "type": "string", + "enum": [ + "WORKER_STATUS_UNSPECIFIED", + "WORKER_STATUS_RUNNING", + "WORKER_STATUS_SHUTTING_DOWN", + "WORKER_STATUS_SHUTDOWN" + ], + "default": "WORKER_STATUS_UNSPECIFIED" + }, "v1WorkerVersionCapabilities": { "type": "object", "properties": { diff --git a/openapi/openapiv3.yaml b/openapi/openapiv3.yaml index a0dfa04d3..e801910c3 100644 --- a/openapi/openapiv3.yaml +++ b/openapi/openapiv3.yaml @@ -1982,6 +1982,92 @@ paths: application/json: schema: $ref: '#/components/schemas/Status' + /api/v1/namespaces/{namespace}/workers: + get: + tags: + - WorkflowService + description: ListWorkers is a visibility API to list worker status information in a specific namespace. + operationId: ListWorkers + parameters: + - name: namespace + in: path + required: true + schema: + type: string + - name: pageSize + in: query + schema: + type: integer + format: int32 + - name: nextPageToken + in: query + schema: + type: string + format: bytes + - name: query + in: query + description: |- + `query` in ListWorkers is used to filter workers based on worker status info. + The following worker status attributes are supported as part of the query: + * WorkerId + * WorkerIdentity + * HostId + * TaskQueue + * DeploymentName + * BuildId + * SdkName + * SdkVersion + * StartTime + * LastHeartbeatTime + * Status + Currently metrics are not supported as a part of ListWorkers query. + schema: + type: string + responses: + "200": + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/ListWorkersResponse' + default: + description: Default error response + content: + application/json: + schema: + $ref: '#/components/schemas/Status' + /api/v1/namespaces/{namespace}/workers/heartbeat: + post: + tags: + - WorkflowService + description: RecordWorkerHeartbeat receives a heartbeat request from the worker. 
+ operationId: RecordWorkerHeartbeat + parameters: + - name: namespace + in: path + description: Namespace this worker belongs to. + required: true + schema: + type: string + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/RecordWorkerHeartbeatRequest' + required: true + responses: + "200": + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/RecordWorkerHeartbeatResponse' + default: + description: Default error response + content: + application/json: + schema: + $ref: '#/components/schemas/Status' /api/v1/namespaces/{namespace}/workflow-count: get: tags: @@ -5049,6 +5135,38 @@ paths: application/json: schema: $ref: '#/components/schemas/Status' + /namespaces/{namespace}/worker-heartbeat: + post: + tags: + - WorkflowService + description: RecordWorkerHeartbeat receives a heartbeat request from the worker. + operationId: RecordWorkerHeartbeat + parameters: + - name: namespace + in: path + description: Namespace this worker belongs to. + required: true + schema: + type: string + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/RecordWorkerHeartbeatRequest' + required: true + responses: + "200": + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/RecordWorkerHeartbeatResponse' + default: + description: Default error response + content: + application/json: + schema: + $ref: '#/components/schemas/Status' /namespaces/{namespace}/worker-task-reachability: get: tags: @@ -5129,6 +5247,60 @@ paths: application/json: schema: $ref: '#/components/schemas/Status' + /namespaces/{namespace}/workers: + get: + tags: + - WorkflowService + description: ListWorkers is a visibility API to list worker status information in a specific namespace. 
+ operationId: ListWorkers + parameters: + - name: namespace + in: path + required: true + schema: + type: string + - name: pageSize + in: query + schema: + type: integer + format: int32 + - name: nextPageToken + in: query + schema: + type: string + format: bytes + - name: query + in: query + description: |- + `query` in ListWorkers is used to filter workers based on worker status info. + The following worker status attributes are supported as part of the query: + * WorkerId + * WorkerIdentity + * HostId + * TaskQueue + * DeploymentName + * BuildId + * SdkName + * SdkVersion + * StartTime + * LastHeartbeatTime + * Status + Currently metrics are not supported as a part of ListWorkers query. + schema: + type: string + responses: + "200": + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/ListWorkersResponse' + default: + description: Default error response + content: + application/json: + schema: + $ref: '#/components/schemas/Status' /namespaces/{namespace}/workflow-count: get: tags: @@ -8288,6 +8460,17 @@ components: - $ref: '#/components/schemas/WorkerDeploymentInfo_WorkerDeploymentVersionSummary' description: Summary of the ramping version of the Worker Deployment. description: A subset of WorkerDeploymentInfo + ListWorkersResponse: + type: object + properties: + workerInfo: + type: array + items: + $ref: '#/components/schemas/WorkerInfo' + nextPageToken: + type: string + description: Next page token + format: bytes ListWorkflowExecutionsResponse: type: object properties: @@ -9498,6 +9681,20 @@ components: description: |- Will be set to true if the activity was reset. Applies only to the current run. + RecordWorkerHeartbeatRequest: + type: object + properties: + namespace: + type: string + description: Namespace this worker belongs to. + identity: + type: string + description: The identity of the client who initiated this request. 
+ workerInfo: + $ref: '#/components/schemas/WorkerInfo' + RecordWorkerHeartbeatResponse: + type: object + properties: {} RegisterNamespaceRequest: type: object properties: @@ -12467,6 +12664,146 @@ components: - TASK_QUEUE_TYPE_NEXUS type: string format: enum + WorkerHostInfo: + type: object + properties: + hostName: + type: string + description: Worker host identifier. + processId: + type: string + description: Worker process identifier, should be unique for the host. + workerIdentity: + type: string + description: |- + Worker identity, set by the client, may not be unique. + Usually host_name+(user group name)+process_id, but can be overwritten by the user. + description: Holds everything needed to identify the worker host/process context + WorkerInfo: + type: object + properties: + workerInstanceKey: + type: string + description: |- + Worker identifier, should be unique for the namespace. + It is distinct from worker identity, which is not necessarily namespace-unique. + hostInfo: + allOf: + - $ref: '#/components/schemas/WorkerHostInfo' + description: Worker host information. + taskQueue: + type: string + description: Task queue this worker is polling for tasks. + deploymentVersion: + $ref: '#/components/schemas/WorkerDeploymentVersion' + sdkName: + type: string + sdkVersion: + type: string + status: + enum: + - WORKER_STATUS_UNSPECIFIED + - WORKER_STATUS_RUNNING + - WORKER_STATUS_SHUTTING_DOWN + - WORKER_STATUS_SHUTDOWN + type: string + description: Worker status. Defined by SDK. + format: enum + startTime: + type: string + description: |- + Worker start time. + It can be used to determine worker uptime. (current time - start time) + format: date-time + lastHeartbeatTime: + type: string + description: Last heartbeat time, coming from the worker. Worker should set it to "now". 
+ format: date-time + elapsedSinceLastHeartbeat: + pattern: ^-?(?:0|[1-9][0-9]{0,11})(?:\.[0-9]{1,9})?s$ + type: string + description: Elapsed time since the last heartbeat from the worker. + workflowTaskSlotsInfo: + $ref: '#/components/schemas/WorkerSlotsInfo' + activityTaskSlotsInfo: + $ref: '#/components/schemas/WorkerSlotsInfo' + nexusTaskSlotsInfo: + $ref: '#/components/schemas/WorkerSlotsInfo' + localActivitySlotsInfo: + $ref: '#/components/schemas/WorkerSlotsInfo' + workflowPollerInfo: + $ref: '#/components/schemas/WorkerPollerInfo' + workflowStickyPollerInfo: + $ref: '#/components/schemas/WorkerPollerInfo' + activityPollerInfo: + $ref: '#/components/schemas/WorkerPollerInfo' + nexusPollerInfo: + $ref: '#/components/schemas/WorkerPollerInfo' + currentHostCpuUsage: + type: number + format: float + currentHostMemUsage: + type: string + totalStickyCacheHit: + type: integer + description: A Workflow Task found a cached Workflow Execution to run against. + format: int32 + totalStickyCacheMiss: + type: integer + description: A Workflow Task did not find a cached Workflow execution to run against. + format: int32 + currentStickyCacheSize: + type: integer + description: Current cache size, expressed in number of Workflow Executions. + format: int32 + description: |- + Worker info message, contains information about the worker and its current state. + All information is provided by the worker itself. + (-- api-linter: core::0140::prepositions=disabled + aip.dev/not-precedent: Removing those words make names less clear. --) + WorkerPollerInfo: + type: object + properties: + activePollers: + type: integer + format: int32 + lastSuccessfulPollTime: + type: string + format: date-time + WorkerSlotsInfo: + type: object + properties: + currentSlotsAvailable: + type: integer + description: |- + Number of slots available for the worker to specific tasks. + May be -1 if the upper bound is not known. 
+ format: int32 + currentSlotsUsed: + type: integer + description: Number of slots used by the worker for specific tasks. + format: int32 + totalProcessedTasks: + type: integer + description: |- + Total number of tasks processed (completed both successfully and unsuccessfully, or any other way) + by the worker since the worker started. This is a cumulative counter. + format: int32 + totalFailedTasks: + type: integer + description: Total number of failed tasks processed by the worker so far. + format: int32 + processedTasksLastInterval: + type: integer + description: |- + Number of tasks processed since the last heartbeat from the worker. + This is a cumulative counter, and it is reset to 0 each time the worker sends a heartbeat. + Contains both successful and failed tasks. + format: int32 + failureTasksLastInterval: + type: integer + description: Number of failed tasks processed since the last heartbeat from the worker. + format: int32 WorkerVersionCapabilities: type: object properties: diff --git a/temporal/api/enums/v1/common.proto b/temporal/api/enums/v1/common.proto index c45174b77..192c1d75b 100644 --- a/temporal/api/enums/v1/common.proto +++ b/temporal/api/enums/v1/common.proto @@ -96,4 +96,13 @@ enum ApplicationErrorCategory { APPLICATION_ERROR_CATEGORY_UNSPECIFIED = 0; // Expected application error with little/no severity. APPLICATION_ERROR_CATEGORY_BENIGN = 1; -} \ No newline at end of file +} + +// (-- api-linter: core::0216::synonyms=disabled +// aip.dev/not-precedent: It seems we have both state and status, and status is a better fit for workers. 
--) +enum WorkerStatus { + WORKER_STATUS_UNSPECIFIED = 0; + WORKER_STATUS_RUNNING = 1; + WORKER_STATUS_SHUTTING_DOWN = 2; + WORKER_STATUS_SHUTDOWN = 3; +} diff --git a/temporal/api/worker/v1/message.proto b/temporal/api/worker/v1/message.proto new file mode 100644 index 000000000..d19a9a302 --- /dev/null +++ b/temporal/api/worker/v1/message.proto @@ -0,0 +1,110 @@ +syntax = "proto3"; + +package temporal.api.worker.v1; + +option go_package = "go.temporal.io/api/worker/v1;worker"; +option java_package = "io.temporal.api.worker.v1"; +option java_multiple_files = true; +option java_outer_classname = "MessageProto"; +option ruby_package = "Temporalio::Api::Worker::V1"; +option csharp_namespace = "Temporalio.Api.Worker.V1"; + +import "google/protobuf/duration.proto"; +import "google/protobuf/timestamp.proto"; +import "temporal/api/deployment/v1/message.proto"; +import "temporal/api/enums/v1/common.proto"; + +message WorkerPollerInfo { + + int32 active_pollers = 1; + + google.protobuf.Timestamp last_successful_poll_time = 2; +} + +message WorkerSlotsInfo { + + // Number of slots available for the worker to specific tasks. + // May be -1 if the upper bound is not known. + int32 current_slots_available = 1; + // Number of slots used by the worker for specific tasks. + int32 current_slots_used = 2; + + // Total number of tasks processed (completed both successfully and unsuccessfully, or any other way) + // by the worker since the worker started. This is a cumulative counter. + int32 total_processed_tasks = 3; + // Total number of failed tasks processed by the worker so far. + int32 total_failed_tasks = 4; + + // Number of tasks processed since the last heartbeat from the worker. + // This is a cumulative counter, and it is reset to 0 each time the worker sends a heartbeat. + // Contains both successful and failed tasks. + int32 processed_tasks_last_interval = 5; + // Number of failed tasks processed since the last heartbeat from the worker. 
+ int32 failure_tasks_last_interval = 6; +} + +// Holds everything needed to identify the worker host/process context +message WorkerHostInfo { + // Worker host identifier. + string host_name = 1; + + // Worker process identifier, should be unique for the host. + string process_id = 2; + + // Worker identity, set by the client, may not be unique. + // Usually host_name+(user group name)+process_id, but can be overwritten by the user. + string worker_identity = 3; +} + +// Worker info message, contains information about the worker and its current state. +// All information is provided by the worker itself. +// (-- api-linter: core::0140::prepositions=disabled +// aip.dev/not-precedent: Removing those words make names less clear. --) +message WorkerInfo { + // Worker identifier, should be unique for the namespace. + // It is distinct from worker identity, which is not necessarily namespace-unique. + string worker_instance_key = 1; + + // Worker host information. + WorkerHostInfo host_info = 2; + + // Task queue this worker is polling for tasks. + string task_queue = 3; + + temporal.api.deployment.v1.WorkerDeploymentVersion deployment_version = 4; + + string sdk_name = 5; + string sdk_version = 6; + + // Worker status. Defined by SDK. + temporal.api.enums.v1.WorkerStatus status = 7; + + // Worker start time. + // It can be used to determine worker uptime. (current time - start time) + google.protobuf.Timestamp start_time = 8; + + // Last heartbeat time, coming from the worker. Worker should set it to "now". + google.protobuf.Timestamp last_heartbeat_time = 9; + // Elapsed time since the last heartbeat from the worker. 
+ google.protobuf.Duration elapsed_since_last_heartbeat = 10; + + WorkerSlotsInfo workflow_task_slots_info = 11; + WorkerSlotsInfo activity_task_slots_info = 12; + WorkerSlotsInfo nexus_task_slots_info = 13; + WorkerSlotsInfo local_activity_slots_info = 14; + + WorkerPollerInfo workflow_poller_info = 15; + WorkerPollerInfo workflow_sticky_poller_info = 16; + WorkerPollerInfo activity_poller_info = 17; + WorkerPollerInfo nexus_poller_info = 18; + + float current_host_cpu_usage = 19; + int64 current_host_mem_usage = 20; + + // A Workflow Task found a cached Workflow Execution to run against. + int32 total_sticky_cache_hit = 21; + // A Workflow Task did not find a cached Workflow execution to run against. + int32 total_sticky_cache_miss = 22; + // Current cache size, expressed in number of Workflow Executions. + int32 current_sticky_cache_size = 23; +} diff --git a/temporal/api/workflowservice/v1/request_response.proto b/temporal/api/workflowservice/v1/request_response.proto index 544499b77..caaf6b1d0 100644 --- a/temporal/api/workflowservice/v1/request_response.proto +++ b/temporal/api/workflowservice/v1/request_response.proto @@ -40,6 +40,7 @@ import "temporal/api/batch/v1/message.proto"; import "temporal/api/sdk/v1/task_complete_metadata.proto"; import "temporal/api/sdk/v1/user_metadata.proto"; import "temporal/api/nexus/v1/message.proto"; +import "temporal/api/worker/v1/message.proto"; import "google/protobuf/duration.proto"; import "google/protobuf/field_mask.proto"; @@ -993,6 +994,8 @@ message ShutdownWorkerRequest { string sticky_task_queue = 2; string identity = 3; string reason = 4; + + temporal.api.worker.v1.WorkerInfo worker_info = 5; } message ShutdownWorkerResponse { @@ -2346,3 +2349,45 @@ message TriggerWorkflowRuleResponse { // True is the rule was applied, based on the rule conditions (predicate/visibility_query). bool applied = 1; } +message RecordWorkerHeartbeatRequest { + // Namespace this worker belongs to. 
+ string namespace = 1; + + // The identity of the client who initiated this request. + string identity = 2; + + temporal.api.worker.v1.WorkerInfo worker_info = 3; +} + +message RecordWorkerHeartbeatResponse { + +} + +message ListWorkersRequest { + string namespace = 1; + int32 page_size = 2; + bytes next_page_token = 3; + + // `query` in ListWorkers is used to filter workers based on worker status info. + // The following worker status attributes are supported as part of the query: + //* WorkerId + //* WorkerIdentity + //* HostId + //* TaskQueue + //* DeploymentName + //* BuildId + //* SdkName + //* SdkVersion + //* StartTime + //* LastHeartbeatTime + //* Status + // Currently metrics are not supported as a part of ListWorkers query. + string query = 4; +} + +message ListWorkersResponse { + repeated temporal.api.worker.v1.WorkerInfo worker_info = 1; + + // Next page token + bytes next_page_token = 2; +} diff --git a/temporal/api/workflowservice/v1/service.proto b/temporal/api/workflowservice/v1/service.proto index 865386506..45bcfa5c8 100644 --- a/temporal/api/workflowservice/v1/service.proto +++ b/temporal/api/workflowservice/v1/service.proto @@ -1173,4 +1173,25 @@ service WorkflowService { }; } + // RecordWorkerHeartbeat receives a heartbeat request from the worker. + rpc RecordWorkerHeartbeat(RecordWorkerHeartbeatRequest) returns (RecordWorkerHeartbeatResponse) { + option (google.api.http) = { + post: "/namespaces/{namespace}/worker-heartbeat" + body: "*" + additional_bindings { + post: "/api/v1/namespaces/{namespace}/workers/heartbeat" + body: "*" + } + }; + }; + + // ListWorkers is a visibility API to list worker status information in a specific namespace. + rpc ListWorkers (ListWorkersRequest) returns (ListWorkersResponse) { + option (google.api.http) = { + get: "/namespaces/{namespace}/workers" + additional_bindings { + get: "/api/v1/namespaces/{namespace}/workers" + } + }; + } }