Skip to content

Commit

Permalink
refactor: buffer, edge, bucket (#733)
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Wang <whynowy@gmail.com>
  • Loading branch information
whynowy committed May 30, 2023
1 parent d57bfed commit af8e334
Show file tree
Hide file tree
Showing 68 changed files with 3,080 additions and 1,857 deletions.
134 changes: 99 additions & 35 deletions api/json-schema/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -17381,6 +17381,11 @@
"description": "NodeSelector is a selector which must be true for the pod to fit on a node. Selector which must match a node's labels for the pod to be scheduled on that node. More info: https://kubernetes.io/docs/concepts/configuration/assign-pod-node/",
"type": "object"
},
"partitions": {
"description": "Number of partitions of the vertex owned buffers. It applies to udf and sink vertices only.",
"format": "int32",
"type": "integer"
},
"priority": {
"description": "The priority value. Various system components use this field to find the priority of the Redis pod. When Priority Admission Controller is enabled, it prevents users from setting this field. The admission controller populates this field from PriorityClassName. The higher the value, the higher the priority. More info: https://kubernetes.io/docs/concepts/configuration/pod-priority-preemption/",
"format": "int32",
Expand Down Expand Up @@ -17470,30 +17475,74 @@
"description": "Blackhole is a sink to emulate /dev/null",
"type": "object"
},
"io.numaproj.numaflow.v1alpha1.Buffer": {
"io.numaproj.numaflow.v1alpha1.BufferServiceConfig": {
"properties": {
"Name": {
"type": "string"
"jetstream": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.JetStreamConfig"
},
"Type": {
"type": "string"
"redis": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.RedisConfig"
}
},
"required": [
"Name",
"Type"
],
"type": "object"
},
"io.numaproj.numaflow.v1alpha1.BufferServiceConfig": {
"io.numaproj.numaflow.v1alpha1.CombinedEdge": {
"description": "CombinedEdge is a combination of Edge and some other properties such as vertex type, partitions, limits. It's used to decorate the fromEdges and toEdges of the generated Vertex objects, so that in the vertex pod, it knows the properties of the connected vertices, for example, how many partitioned buffers I should write to, what is the write buffer length, etc.",
"properties": {
"jetstream": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.JetStreamConfig"
"conditions": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.ForwardConditions",
"description": "Conditional forwarding, only allowed when \"From\" is a Sink or UDF."
},
"redis": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.RedisConfig"
"from": {
"type": "string"
},
"fromVertexLimits": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.VertexLimits"
},
"fromVertexPartitions": {
"description": "The number of partitions of the from vertex, if not provided, the default value is set to \"1\".",
"format": "int32",
"type": "integer"
},
"fromVertexType": {
"description": "From vertex type.",
"type": "string"
},
"limits": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.DeprecatedEdgeLimits",
"description": "Deprecated, use vertex.spec.limits instead.\n\nLimits define the limitations such as buffer read batch size for the edge, will override pipeline level settings."
},
"onFull": {
"description": "OnFull specifies the behaviour for the write actions when the inter step buffer is full. There are currently two options, retryUntilSuccess and discardLatest. if not provided, the default value is set to \"retryUntilSuccess\"",
"type": "string"
},
"parallelism": {
"description": "Deprecated, use vertex.spec.partitions instead.\n\nParallelism is only effective when the \"to\" vertex is a reduce vertex, if it's not provided, the default value is set to \"1\". Parallelism is ignored when the \"to\" vertex is not a reduce vertex.",
"format": "int32",
"type": "integer"
},
"to": {
"type": "string"
},
"toVertexLimits": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.VertexLimits"
},
"toVertexPartitions": {
"description": "The number of partitions of the to vertex, if not provided, the default value is set to \"1\".",
"format": "int32",
"type": "integer"
},
"toVertexType": {
"description": "To vertex type.",
"type": "string"
}
},
"required": [
"from",
"to",
"fromVertexType",
"toVertexType"
],
"type": "object"
},
"io.numaproj.numaflow.v1alpha1.Container": {
Expand Down Expand Up @@ -17651,6 +17700,21 @@
},
"type": "object"
},
"io.numaproj.numaflow.v1alpha1.DeprecatedEdgeLimits": {
"properties": {
"bufferMaxLength": {
"description": "BufferMaxLength is used to define the max length of a buffer. It overrides the settings from pipeline limits.",
"format": "int64",
"type": "integer"
},
"bufferUsageLimit": {
"description": "BufferUsageLimit is used to define the percentage of the buffer usage limit, a valid value should be less than 100, for example, 85. It overrides the settings from pipeline limits.",
"format": "int64",
"type": "integer"
}
},
"type": "object"
},
"io.numaproj.numaflow.v1alpha1.Edge": {
"properties": {
"conditions": {
Expand All @@ -17661,15 +17725,15 @@
"type": "string"
},
"limits": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.EdgeLimits",
"description": "Limits define the limitations such as buffer read batch size for the edge, will override pipeline level settings."
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.DeprecatedEdgeLimits",
"description": "Deprecated, use vertex.spec.limits instead.\n\nLimits define the limitations such as buffer read batch size for the edge, will override pipeline level settings."
},
"onFull": {
"description": "OnFull specifies the behaviour for the write actions when the inter step buffer is full. There are currently two options, retryUntilSuccess and discardLatest. if not provided, the default value is set to \"retryUntilSuccess\"",
"type": "string"
},
"parallelism": {
"description": "Parallelism is only effective when the \"to\" vertex is a reduce vertex, if it's not provided, the default value is set to \"1\". Parallelism is ignored when the \"to\" vertex is not a reduce vertex.",
"description": "Deprecated, use vertex.spec.partitions instead.\n\nParallelism is only effective when the \"to\" vertex is a reduce vertex, if it's not provided, the default value is set to \"1\". Parallelism is ignored when the \"to\" vertex is not a reduce vertex.",
"format": "int32",
"type": "integer"
},
Expand All @@ -17683,21 +17747,6 @@
],
"type": "object"
},
"io.numaproj.numaflow.v1alpha1.EdgeLimits": {
"properties": {
"bufferMaxLength": {
"description": "BufferMaxLength is used to define the max length of a buffer. It overrides the settings from pipeline limits.",
"format": "int64",
"type": "integer"
},
"bufferUsageLimit": {
"description": "BufferUsageLimit is used to define the percentage of the buffer usage limit, a valid value should be less than 100, for example, 85. It overrides the settings from pipeline limits.",
"format": "int64",
"type": "integer"
}
},
"type": "object"
},
"io.numaproj.numaflow.v1alpha1.FixedWindow": {
"description": "FixedWindow describes a fixed window",
"properties": {
Expand Down Expand Up @@ -18315,7 +18364,7 @@
"auth": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.NatsAuth"
},
"bufferConfig": {
"streamConfig": {
"type": "string"
},
"tlsEnabled": {
Expand Down Expand Up @@ -19278,6 +19327,16 @@
},
"io.numaproj.numaflow.v1alpha1.VertexLimits": {
"properties": {
"bufferMaxLength": {
"description": "BufferMaxLength is used to define the max length of a buffer. It overrides the settings from pipeline limits.",
"format": "int64",
"type": "integer"
},
"bufferUsageLimit": {
"description": "BufferUsageLimit is used to define the percentage of the buffer usage limit, a valid value should be less than 100, for example, 85. It overrides the settings from pipeline limits.",
"format": "int64",
"type": "integer"
},
"readBatchSize": {
"description": "Read batch size from the source or buffer. It overrides the settings from pipeline limits.",
"format": "int64",
Expand Down Expand Up @@ -19338,7 +19397,7 @@
},
"fromEdges": {
"items": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.Edge"
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.CombinedEdge"
},
"type": "array"
},
Expand Down Expand Up @@ -19382,6 +19441,11 @@
"description": "NodeSelector is a selector which must be true for the pod to fit on a node. Selector which must match a node's labels for the pod to be scheduled on that node. More info: https://kubernetes.io/docs/concepts/configuration/assign-pod-node/",
"type": "object"
},
"partitions": {
"description": "Number of partitions of the vertex owned buffers. It applies to udf and sink vertices only.",
"format": "int32",
"type": "integer"
},
"pipelineName": {
"type": "string"
},
Expand Down Expand Up @@ -19429,7 +19493,7 @@
},
"toEdges": {
"items": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.Edge"
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.CombinedEdge"
},
"type": "array"
},
Expand Down
Loading

0 comments on commit af8e334

Please sign in to comment.