Skip to content

Commit

Permalink
feat: controller changes for Side Inputs support (#866)
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Wang <whynowy@gmail.com>
  • Loading branch information
whynowy committed Jul 18, 2023
1 parent 92db62a commit 6d14998
Show file tree
Hide file tree
Showing 74 changed files with 12,139 additions and 1,503 deletions.
212 changes: 199 additions & 13 deletions api/json-schema/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -17334,7 +17334,8 @@
"type": "boolean"
},
"containerTemplate": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.ContainerTemplate"
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.ContainerTemplate",
"description": "Container template for the main numa container."
},
"dnsConfig": {
"$ref": "#/definitions/io.k8s.api.core.v1.PodDNSConfig",
Expand All @@ -17354,10 +17355,11 @@
"x-kubernetes-patch-strategy": "merge"
},
"initContainerTemplate": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.ContainerTemplate"
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.ContainerTemplate",
"description": "Container template for all the vertex pod init containers spawned by numaflow, excluding the ones specified by the user."
},
"initContainers": {
"description": "List of init containers belonging to the pod. More info: https://kubernetes.io/docs/concepts/workloads/pods/init-containers/",
"description": "List of customized init containers belonging to the pod. More info: https://kubernetes.io/docs/concepts/workloads/pods/init-containers/",
"items": {
"$ref": "#/definitions/io.k8s.api.core.v1.Container"
},
Expand Down Expand Up @@ -17411,8 +17413,19 @@
"description": "ServiceAccountName applied to the pod",
"type": "string"
},
"sideInputs": {
"description": "Names of the side inputs used in this vertex.",
"items": {
"type": "string"
},
"type": "array"
},
"sideInputsContainerTemplate": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.ContainerTemplate",
"description": "Container template for the side inputs watcher container."
},
"sidecars": {
"description": "List of sidecar containers belonging to the pod.",
"description": "List of customized sidecar containers belonging to the pod.",
"items": {
"$ref": "#/definitions/io.k8s.api.core.v1.Container"
},
Expand Down Expand Up @@ -18050,7 +18063,7 @@
],
"type": "object"
},
"io.numaproj.numaflow.v1alpha1.GetVertexPodSpecReq": {
"io.numaproj.numaflow.v1alpha1.GetSideInputDeploymentReq": {
"properties": {
"Env": {
"items": {
Expand All @@ -18076,6 +18089,36 @@
],
"type": "object"
},
"io.numaproj.numaflow.v1alpha1.GetVertexPodSpecReq": {
"properties": {
"Env": {
"items": {
"$ref": "#/definitions/io.k8s.api.core.v1.EnvVar"
},
"type": "array"
},
"ISBSvcType": {
"type": "string"
},
"Image": {
"type": "string"
},
"PullPolicy": {
"type": "string"
},
"SideInputsStoreName": {
"type": "string"
}
},
"required": [
"ISBSvcType",
"Image",
"PullPolicy",
"Env",
"SideInputsStoreName"
],
"type": "object"
},
"io.numaproj.numaflow.v1alpha1.GroupBy": {
"description": "GroupBy indicates it is a reducer UDF",
"properties": {
Expand Down Expand Up @@ -18735,7 +18778,7 @@
"io.numaproj.numaflow.v1alpha1.PipelineLimits": {
"properties": {
"bufferMaxLength": {
"description": "BufferMaxLength is used to define the max length of a buffer Only applies to UDF and Source vertices as only they do buffer write. It can be overridden by the settings in vertex limits.",
"description": "BufferMaxLength is used to define the max length of a buffer. Only applies to UDF and Source vertices as only they do buffer write. It can be overridden by the settings in vertex limits.",
"format": "int64",
"type": "integer"
},
Expand All @@ -18745,7 +18788,7 @@
"type": "integer"
},
"readBatchSize": {
"description": "Read batch size for all the vertices in the pipeline, can be overridden by the vertex's limit settings",
"description": "Read batch size for all the vertices in the pipeline, can be overridden by the vertex's limit settings.",
"format": "int64",
"type": "integer"
},
Expand Down Expand Up @@ -18801,6 +18844,13 @@
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.PipelineLimits",
"description": "Limits define the limitations such as buffer read batch size for all the vertices of a pipeline, they could be overridden by each vertex's settings"
},
"sideInputs": {
"description": "SideInputs defines the Side Inputs of a pipeline.",
"items": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.SideInput"
},
"type": "array"
},
"templates": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.Templates",
"description": "Templates is used to customize additional kubernetes resources required for the Pipeline"
Expand Down Expand Up @@ -19059,6 +19109,125 @@
},
"type": "object"
},
"io.numaproj.numaflow.v1alpha1.SideInput": {
"description": "SideInput defines information of a Side Input",
"properties": {
"container": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.Container"
},
"name": {
"type": "string"
},
"trigger": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.SideInputTrigger"
},
"volumes": {
"items": {
"$ref": "#/definitions/io.k8s.api.core.v1.Volume"
},
"type": "array",
"x-kubernetes-patch-merge-key": "name",
"x-kubernetes-patch-strategy": "merge"
}
},
"required": [
"name",
"container",
"trigger"
],
"type": "object"
},
"io.numaproj.numaflow.v1alpha1.SideInputTrigger": {
"properties": {
"interval": {
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.Duration"
},
"schedule": {
"type": "string"
},
"timezone": {
"type": "string"
}
},
"type": "object"
},
"io.numaproj.numaflow.v1alpha1.SideInputsManagerTemplate": {
"properties": {
"affinity": {
"$ref": "#/definitions/io.k8s.api.core.v1.Affinity",
"description": "The pod's scheduling constraints More info: https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/"
},
"automountServiceAccountToken": {
"description": "AutomountServiceAccountToken indicates whether a service account token should be automatically mounted.",
"type": "boolean"
},
"containerTemplate": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.ContainerTemplate",
"description": "Template for the side inputs manager numa container"
},
"dnsConfig": {
"$ref": "#/definitions/io.k8s.api.core.v1.PodDNSConfig",
"description": "Specifies the DNS parameters of a pod. Parameters specified here will be merged to the generated DNS configuration based on DNSPolicy."
},
"dnsPolicy": {
"description": "Set DNS policy for the pod. Defaults to \"ClusterFirst\". Valid values are 'ClusterFirstWithHostNet', 'ClusterFirst', 'Default' or 'None'. DNS parameters given in DNSConfig will be merged with the policy selected with DNSPolicy. To have DNS options set along with hostNetwork, you have to specify DNS policy explicitly to 'ClusterFirstWithHostNet'.",
"type": "string"
},
"imagePullSecrets": {
"description": "ImagePullSecrets is an optional list of references to secrets in the same namespace to use for pulling any of the images used by this PodSpec. If specified, these secrets will be passed to individual puller implementations for them to use. For example, in the case of docker, only DockerConfig type secrets are honored. More info: https://kubernetes.io/docs/concepts/containers/images#specifying-imagepullsecrets-on-a-pod",
"items": {
"$ref": "#/definitions/io.k8s.api.core.v1.LocalObjectReference"
},
"type": "array",
"x-kubernetes-patch-merge-key": "name",
"x-kubernetes-patch-strategy": "merge"
},
"initContainerTemplate": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.ContainerTemplate",
"description": "Template for the side inputs manager init container"
},
"metadata": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.Metadata",
"description": "Metadata sets the pods's metadata, i.e. annotations and labels"
},
"nodeSelector": {
"additionalProperties": {
"type": "string"
},
"description": "NodeSelector is a selector which must be true for the pod to fit on a node. Selector which must match a node's labels for the pod to be scheduled on that node. More info: https://kubernetes.io/docs/concepts/configuration/assign-pod-node/",
"type": "object"
},
"priority": {
"description": "The priority value. Various system components use this field to find the priority of the Redis pod. When Priority Admission Controller is enabled, it prevents users from setting this field. The admission controller populates this field from PriorityClassName. The higher the value, the higher the priority. More info: https://kubernetes.io/docs/concepts/configuration/pod-priority-preemption/",
"format": "int32",
"type": "integer"
},
"priorityClassName": {
"description": "If specified, indicates the Redis pod's priority. \"system-node-critical\" and \"system-cluster-critical\" are two special keywords which indicate the highest priorities with the former being the highest priority. Any other name must be defined by creating a PriorityClass object with that name. If not specified, the pod priority will be default or zero if there is no default. More info: https://kubernetes.io/docs/concepts/configuration/pod-priority-preemption/",
"type": "string"
},
"runtimeClassName": {
"description": "RuntimeClassName refers to a RuntimeClass object in the node.k8s.io group, which should be used to run this pod. If no RuntimeClass resource matches the named class, the pod will not be run. If unset or empty, the \"legacy\" RuntimeClass will be used, which is an implicit class with an empty definition that uses the default runtime handler. More info: https://git.k8s.io/enhancements/keps/sig-node/585-runtime-class",
"type": "string"
},
"securityContext": {
"$ref": "#/definitions/io.k8s.api.core.v1.PodSecurityContext",
"description": "SecurityContext holds pod-level security attributes and common container settings. Optional: Defaults to empty. See type description for default values of each field."
},
"serviceAccountName": {
"description": "ServiceAccountName applied to the pod",
"type": "string"
},
"tolerations": {
"description": "If specified, the pod's tolerations.",
"items": {
"$ref": "#/definitions/io.k8s.api.core.v1.Toleration"
},
"type": "array"
}
},
"type": "object"
},
"io.numaproj.numaflow.v1alpha1.Sink": {
"properties": {
"blackhole": {
Expand Down Expand Up @@ -19169,11 +19338,15 @@
"properties": {
"daemon": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.DaemonTemplate",
"description": "DaemonTemplate is used to customize the Daemon Deployment"
"description": "DaemonTemplate is used to customize the Daemon Deployment."
},
"job": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.JobTemplate",
"description": "JobTemplate is used to customize Jobs"
"description": "JobTemplate is used to customize Jobs."
},
"sideInputsManager": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.SideInputsManagerTemplate",
"description": "SideInputsManagerTemplate is used to customize the Side Inputs Manager."
}
},
"type": "object"
Expand Down Expand Up @@ -19347,7 +19520,8 @@
"type": "boolean"
},
"containerTemplate": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.ContainerTemplate"
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.ContainerTemplate",
"description": "Container template for the main numa container."
},
"dnsConfig": {
"$ref": "#/definitions/io.k8s.api.core.v1.PodDNSConfig",
Expand All @@ -19373,10 +19547,11 @@
"x-kubernetes-patch-strategy": "merge"
},
"initContainerTemplate": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.ContainerTemplate"
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.ContainerTemplate",
"description": "Container template for all the vertex pod init containers spawned by numaflow, excluding the ones specified by the user."
},
"initContainers": {
"description": "List of init containers belonging to the pod. More info: https://kubernetes.io/docs/concepts/workloads/pods/init-containers/",
"description": "List of customized init containers belonging to the pod. More info: https://kubernetes.io/docs/concepts/workloads/pods/init-containers/",
"items": {
"$ref": "#/definitions/io.k8s.api.core.v1.Container"
},
Expand Down Expand Up @@ -19440,8 +19615,19 @@
"description": "ServiceAccountName applied to the pod",
"type": "string"
},
"sideInputs": {
"description": "Names of the side inputs used in this vertex.",
"items": {
"type": "string"
},
"type": "array"
},
"sideInputsContainerTemplate": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.ContainerTemplate",
"description": "Container template for the side inputs watcher container."
},
"sidecars": {
"description": "List of sidecar containers belonging to the pod.",
"description": "List of customized sidecar containers belonging to the pod.",
"items": {
"$ref": "#/definitions/io.k8s.api.core.v1.Container"
},
Expand Down

0 comments on commit 6d14998

Please sign in to comment.