Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subscription "hints" #37

Merged
merged 24 commits into from
Jul 26, 2023
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
ab582e1
Add !mermaid command
ryannedolan Jun 12, 2023
eeec93c
Merge branch 'main' into mermaid
ryannedolan Jun 12, 2023
9d50cc6
Add read/write Resources and Kafka ACL controller
ryannedolan Jun 20, 2023
5c5ec40
Add insert-into syntax to yaml and mermaid commands
ryannedolan Jun 21, 2023
d23e12d
Merge branch 'main' into sink-api
ryannedolan Jun 22, 2023
2d78575
Tweak Acl CRD
ryannedolan Jun 22, 2023
16b2845
Fix ACL template
ryannedolan Jun 22, 2023
929a8a4
Merge branch 'main' into sink-api
ryannedolan Jun 24, 2023
99f06d9
Add hints to Subscription CRD
ryannedolan Jun 24, 2023
7aefab0
Merge branch 'main' into hints
ryannedolan Jun 24, 2023
f602a5f
Add Subscription hints and default values
ryannedolan Jun 25, 2023
9f793c2
Merge branch 'sink-api' into hints
ryannedolan Jun 25, 2023
bae289a
Bug fixes
ryannedolan Jun 25, 2023
450a543
Fix default numPartitions
ryannedolan Jun 26, 2023
e16ab32
Change sample Subscription's numPartitions
ryannedolan Jun 26, 2023
63049d5
Drop accidentally resurrected SqlJob model files
ryannedolan Jun 26, 2023
df6d48c
License select files under Apache 2
ryannedolan Jul 19, 2023
aa5abc8
Merge branch 'main' into sink-api
ryannedolan Jul 19, 2023
a5dd664
Whoops.
ryannedolan Jul 19, 2023
7a392e4
Merge branch 'sink-api' into hints
ryannedolan Jul 19, 2023
5853fc4
Merge branch 'main' into hints
ryannedolan Jul 21, 2023
ab682c1
Rename magic template variable "sql" to "pipeline.sql"
ryannedolan Jul 21, 2023
f98bf00
Limit hints to sink resources
ryannedolan Jul 25, 2023
f296005
Bug fixes
ryannedolan Jul 26, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion bin/hoptimator
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
#!/bin/sh

kubectl exec -it hoptimator -c hoptimator -- ./hoptimator --isolation=TRANSACTION_NONE -n "" -p "" -u "jdbc:calcite:model=/etc/config/model.yaml" -nn hoptimator "$@"
kubectl exec -it hoptimator -c hoptimator -- ./hoptimator -n "" -p "" -u "jdbc:calcite:model=/etc/config/model.yaml" "$@"
3 changes: 2 additions & 1 deletion deploy/samples/subscriptions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ metadata:
spec:
sql: SELECT "quantity", "product_id" AS KEY FROM INVENTORY."products_on_hand"
database: RAWKAFKA

hints:
kafka.numPartitions: "2"
ryannedolan marked this conversation as resolved.
Show resolved Hide resolved
5 changes: 5 additions & 0 deletions deploy/subscriptions.crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ spec:
database:
description: The database in which to create the output/sink table.
type: string
hints:
description: Hints to adapters, which may disregard them.
ryannedolan marked this conversation as resolved.
Show resolved Hide resolved
type: object
additionalProperties:
type: string
required:
- sql
- database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,15 +148,19 @@ public interface Environment {
Environment EMPTY = new SimpleEnvironment();
Environment PROCESS = new ProcessEnvironment();

String get(String key);
String getOrDefault(String key, String defaultValue);
}

/** Basic Environment implementation */
public static class SimpleEnvironment implements Environment {
private final Map<String, String> vars;

public SimpleEnvironment(Map<String, String> vars) {
this.vars = vars;
if (vars != null) {
this.vars = vars;
} else {
this.vars = new HashMap<>();
}
}

public SimpleEnvironment() {
Expand All @@ -182,31 +186,38 @@ public SimpleEnvironment with(String key, String value) {
}

@Override
public String get(String key) {
if (!vars.containsKey(key)) {
public String getOrDefault(String key, String defaultValue) {
if (defaultValue == null && !vars.containsKey(key)) {
ryannedolan marked this conversation as resolved.
Show resolved Hide resolved
throw new IllegalArgumentException("No variable '" + key + "' found in the environment");
}
return vars.get(key);
return vars.getOrDefault(key, defaultValue);
}
}

/** Returns "{{key}}" for any key */
/** Returns "{{key}}" for any key without a default */
public static class DummyEnvironment implements Environment {
@Override
public String get(String key) {
return "{{" + key + "}}";
public String getOrDefault(String key, String defaultValue) {
if (defaultValue != null) {
return defaultValue;
} else {
return "{{" + key + "}}";
}
}
}

/** Provides access to the process's environment variables */
public static class ProcessEnvironment implements Environment {

@Override
public String get(String key) {
public String getOrDefault(String key, String defaultValue) {
String value = System.getenv(key);
if (value == null) {
value = System.getProperty(key);
}
if (value == null) {
value = defaultValue;
}
ryannedolan marked this conversation as resolved.
Show resolved Hide resolved
if (value == null) {
throw new IllegalArgumentException("Missing system property `" + key + "`");
}
Expand All @@ -222,11 +233,12 @@ public interface Template {
/**
* Replaces `{{var}}` in a template file with the corresponding variable.
*
* Resource-scoped variables take precedence over Environment-scoped variables.
* Resource-scoped variables take precedence over Environment-scoped
* variables. Default values can supplied with `{{var:default}}`.
*
* If `var` contains multiple lines, the behavior depends on context; specifically,
* whether the pattern appears within a list or comment (prefixed with `-` or `#`).
* For example, if the template includes:
* If `var` contains multiple lines, the behavior depends on context;
* specifically, whether the pattern appears within a list or comment
* (prefixed with `-` or `#`). For example, if the template includes:
*
* - {{var}}
*
Expand All @@ -235,8 +247,8 @@ public interface Template {
* - value line 1
* - value line 2
*
* To avoid this behavior (and just get a multiline string), use one of YAML's multiline
* markers, e.g.
* To avoid this behavior (and just get a multiline string), use one of
* YAML's multiline markers, e.g.
*
* - |
* {{var}}
Expand All @@ -255,17 +267,18 @@ public SimpleTemplate(Environment env, String template) {
@Override
public String render(Resource resource) {
StringBuffer sb = new StringBuffer();
Pattern p = Pattern.compile("([\\s\\-\\#]*)\\{\\{\\s*([\\w_\\-\\.]+)\\s*\\}\\}");
Pattern p = Pattern.compile("([\\s\\-\\#]*)\\{\\{\\s*([\\w_\\-\\.]+)\\s*(:([\\w_\\-\\.]+))?\\s*\\}\\}");
Matcher m = p.matcher(template);
while (m.find()) {
String prefix = m.group(1);
if (prefix == null) {
prefix = "";
}
String key = m.group(2);
String value = resource.getOrDefault(key, () -> env.get(key));
String defaultValue = m.group(4);
String value = resource.getOrDefault(key, () -> env.getOrDefault(key, defaultValue));
if (value == null) {
throw new IllegalArgumentException("No value for key " + key);
throw new IllegalArgumentException(template + " has no value for key " + key + ".");
}
String quotedPrefix = Matcher.quoteReplacement(prefix);
String quotedValue = Matcher.quoteReplacement(value);
Expand Down
3 changes: 2 additions & 1 deletion hoptimator-cli/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ java \
--add-opens java.base/java.util=ALL-UNNAMED \
--add-opens java.base/java.time=ALL-UNNAMED \
-classpath "/opt/plugins/*/lib/*:./hoptimator-cli-all.jar" \
-Dorg.slf4j.simpleLogger.defaultLogLevel=error \
ryannedolan marked this conversation as resolved.
Show resolved Hide resolved
$JAVA_OPTS \
com.linkedin.hoptimator.HoptimatorCliApp --verbose=true -nn hoptimator "$@"
com.linkedin.hoptimator.HoptimatorCliApp --verbose=true -nn hoptimator --isolation=TRANSACTION_NONE "$@"
18 changes: 9 additions & 9 deletions hoptimator-flink-adapter/src/main/resources/SqlJob.yaml.template
Original file line number Diff line number Diff line change
@@ -1,29 +1,29 @@
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
namespace: {{namespace}}
name: {{name}}-flink-job
name: {{pipeline.name}}-flink-job
namespace: {{pipeline.namespace}}
spec:
image: docker.io/library/hoptimator-flink-runner
imagePullPolicy: Never
flinkVersion: v1_16
flinkConfiguration:
taskmanager.numberOfTaskSlots: "1"
taskmanager.numberOfTaskSlots: "{{flink.taskmanager.numberOfTaskSlots:1}}"
serviceAccount: flink
jobManager:
resource:
memory: "2048m"
cpu: .1
memory: "{{flink.job.memory:2048m}}"
cpu: {{flink.job.cpu:.1}}
taskManager:
resource:
memory: "2048m"
cpu: .1
memory: "{{flink.task.memory:2048m}}"
cpu: {{flink.task.cpu:.1}}
job:
entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner
args:
- {{sql}}
- {{pipeline.sql}}
jarURI: local:///opt/hoptimator-flink-runner.jar
parallelism: 1
parallelism: {{flink.parallelism:1}}
upgradeMode: stateless
state: running

Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,9 @@
import java.util.Map;

class KafkaTopic extends Resource {
public KafkaTopic(String name, Integer numPartitions,
Map<String, String> clientOverrides) {
public KafkaTopic(String name, Map<String, String> clientOverrides) {
super("KafkaTopic");
export("name", name);
export("numPartitions", Optional.ofNullable(numPartitions)
.map(x -> Integer.toString(x)).orElse("null"));
export("clientOverrides", clientOverrides);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,12 @@ public Schema create(SchemaPlus parentSchema, String name, Map<String, Object> o
};
ConfigProvider topicConfigProvider = ConfigProvider.from(clientConfig);
TableResolver resolver = x -> rowType.rel();
Integer numPartitions = (Integer) operand.get("numPartitions");

ResourceProvider resources = ResourceProvider.empty()
.with(x -> new KafkaTopic(x, numPartitions, topicConfigProvider.config(x)))
.with(x -> new KafkaTopic(x, topicConfigProvider.config(x)))
.readWith(x -> new KafkaTopicAcl(x, principal, "Read"))
.writeWith(x -> new KafkaTopicAcl(x, principal, "Write"));

Database database = new Database(name, tableLister, resolver, connectorConfigProvider,
resources);
return new DatabaseSchema(database);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
apiVersion: hoptimator.linkedin.com/v1alpha1
kind: KafkaTopic
metadata:
name: {{name}}
namespace: {{namespace}}
name: {{pipeline.name}}-kafka-topic-{{id}}
namespace: {{pipeline.namespace}}
spec:
topicName: {{name}}
numPartitions: {{numPartitions}}
numPartitions: {{kafka.numPartitions:null}}
clientOverrides:
{{clientOverrides}}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ apiVersion: hoptimator.linkedin.com/v1alpha1
kind: Acl
metadata:
name: {{name}}-acl-{{id}}
namespace: {{namespace}}
namespace: {{pipeline.namespace}}
spec:
resource:
kind: KafkaTopic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* Access control rule (colloquially, an Acl)
*/
@ApiModel(description = "Access control rule (colloquially, an Acl)")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
public class V1alpha1Acl implements io.kubernetes.client.common.KubernetesObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
* AclList is a list of Acl
*/
@ApiModel(description = "AclList is a list of Acl")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
public class V1alpha1AclList implements io.kubernetes.client.common.KubernetesListObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
* A set of related ACL rules.
*/
@ApiModel(description = "A set of related ACL rules.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
public class V1alpha1AclSpec {
/**
* The resource access method.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* The resource being controlled.
*/
@ApiModel(description = "The resource being controlled.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
public class V1alpha1AclSpecResource {
public static final String SERIALIZED_NAME_KIND = "kind";
@SerializedName(SERIALIZED_NAME_KIND)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* Status, as set by the operator.
*/
@ApiModel(description = "Status, as set by the operator.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
public class V1alpha1AclStatus {
public static final String SERIALIZED_NAME_MESSAGE = "message";
@SerializedName(SERIALIZED_NAME_MESSAGE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* Kafka Topic
*/
@ApiModel(description = "Kafka Topic")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
public class V1alpha1KafkaTopic implements io.kubernetes.client.common.KubernetesObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
* KafkaTopicList is a list of KafkaTopic
*/
@ApiModel(description = "KafkaTopicList is a list of KafkaTopic")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
public class V1alpha1KafkaTopicList implements io.kubernetes.client.common.KubernetesListObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
* Desired Kafka topic configuration.
*/
@ApiModel(description = "Desired Kafka topic configuration.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
public class V1alpha1KafkaTopicSpec {
public static final String SERIALIZED_NAME_CLIENT_CONFIGS = "clientConfigs";
@SerializedName(SERIALIZED_NAME_CLIENT_CONFIGS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
/**
* V1alpha1KafkaTopicSpecClientConfigs
*/
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
public class V1alpha1KafkaTopicSpecClientConfigs {
public static final String SERIALIZED_NAME_CONFIG_MAP_REF = "configMapRef";
@SerializedName(SERIALIZED_NAME_CONFIG_MAP_REF)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* Reference to a ConfigMap to use for AdminClient configuration.
*/
@ApiModel(description = "Reference to a ConfigMap to use for AdminClient configuration.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
public class V1alpha1KafkaTopicSpecConfigMapRef {
public static final String SERIALIZED_NAME_NAME = "name";
@SerializedName(SERIALIZED_NAME_NAME)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* Current state of the topic.
*/
@ApiModel(description = "Current state of the topic.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
public class V1alpha1KafkaTopicStatus {
public static final String SERIALIZED_NAME_MESSAGE = "message";
@SerializedName(SERIALIZED_NAME_MESSAGE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* Hoptimator Subscription
*/
@ApiModel(description = "Hoptimator Subscription")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
public class V1alpha1Subscription implements io.kubernetes.client.common.KubernetesObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
* SubscriptionList is a list of Subscription
*/
@ApiModel(description = "SubscriptionList is a list of Subscription")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
public class V1alpha1SubscriptionList implements io.kubernetes.client.common.KubernetesListObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down