Skip to content

Commit

Permalink
Adds support for KSqlDB session variables (#544)
Browse files Browse the repository at this point in the history
  • Loading branch information
nachomdo committed Oct 10, 2022
1 parent b1e7741 commit da33be1
Show file tree
Hide file tree
Showing 14 changed files with 113 additions and 27 deletions.
7 changes: 4 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@
<!-- dependencies -->
<jackson.version>2.13.4</jackson.version>
<log4j.version>2.17.1</log4j.version>
<zookeeper.version>3.5.7</zookeeper.version>
<zookeeper.version>3.8.0</zookeeper.version>
<commons.version>1.4</commons.version>
<mockito.version>3.6.0</mockito.version>
<junit.version>4.13.1</junit.version>
Expand All @@ -544,7 +544,8 @@
<jinjava.version>2.5.4</jinjava.version>
<aws.java.sdk.version>2.16.31</aws.java.sdk.version>
<gcp.java.sdk.version>19.2.1</gcp.java.sdk.version>
<ksqldb.version>0.17.0</ksqldb.version>
<ksqldb.version>0.27.1</ksqldb.version>
<ksqldb.client.version>7.0.0</ksqldb.client.version>
<typesafe.version>1.4.0</typesafe.version>
<lombok.version>1.18.22</lombok.version>
</properties>
Expand Down Expand Up @@ -626,7 +627,7 @@
<dependency>
<groupId>io.confluent.ksql</groupId>
<artifactId>ksqldb-api-client</artifactId>
<version>${confluent.version}</version>
<version>${ksqldb.client.version}</version>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.core</groupId>
Expand Down
19 changes: 19 additions & 0 deletions src/main/java/com/purbon/kafka/topology/ArtefactManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import com.purbon.kafka.topology.exceptions.RemoteValidationException;
import com.purbon.kafka.topology.model.Artefact;
import com.purbon.kafka.topology.model.Topology;
import com.purbon.kafka.topology.model.artefact.KsqlVarsArtefact;
import com.purbon.kafka.topology.model.artefact.TypeArtefact;
import java.io.IOException;
import java.util.*;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -36,6 +38,12 @@ public ArtefactManager(
this.topologyFileOrDir = topologyFileOrDir;
}

private boolean findKsqlVarsArtefact(Artefact artefact){
return Optional.ofNullable(artefact.getClass().getAnnotation(TypeArtefact.class))
.map(x -> x.name().equals("VARS"))
.orElse(false);
}

@Override
public void updatePlan(ExecutionPlan plan, Map<String, Topology> topologies) throws IOException {
Collection<? extends Artefact> currentArtefacts = loadActualClusterStateIfAvailable(plan);
Expand All @@ -44,17 +52,28 @@ public void updatePlan(ExecutionPlan plan, Map<String, Topology> topologies) thr

for (Topology topology : topologies.values()) {
Set<? extends Artefact> entryArtefacts = parseNewArtefacts(topology);

final var kSqlVarsArtefact =
((Optional<KsqlVarsArtefact>)
entryArtefacts.stream()
.filter(this::findKsqlVarsArtefact)
.findFirst())
.orElseGet(()->new KsqlVarsArtefact(Collections.emptyMap()));
entryArtefacts.removeIf(this::findKsqlVarsArtefact);

for (Artefact artefact : entryArtefacts) {
Optional<? extends Artefact> existingArtefactOpt =
currentArtefacts.stream().filter(ea -> ea.equals(artefact)).findAny();
if (existingArtefactOpt.isEmpty()) {
ArtefactClient client = selectClient(artefact);

if (client == null) {
throw new IOException(
"The Artefact "
+ artefact.getName()
+ " require a non configured client, please check our configuration");
}
client.addSessionVars(kSqlVarsArtefact.getSessionVars());
plan.add(new CreateArtefactAction(client, rootPath(), currentArtefacts, artefact));
} else {
Artefact existingArtefact = existingArtefactOpt.get();
Expand Down
14 changes: 4 additions & 10 deletions src/main/java/com/purbon/kafka/topology/KSqlArtefactManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,7 @@
import java.io.PrintStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.*;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -140,7 +132,9 @@ Set<Artefact> parseNewArtefacts(Topology topology) {
(Function<Project, Stream<Artefact>>)
project -> {
KsqlArtefacts kSql = project.getKsqlArtefacts();
return Stream.concat(kSql.getStreams().stream(), kSql.getTables().stream());
return Stream.concat(
Stream.concat(kSql.getStreams().stream(), kSql.getTables().stream()),
Collections.singletonList(kSql.getVars()).stream());
})
.sorted()
.collect(Collectors.toCollection(LinkedHashSet::new));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ public String getServer() {
return server.toString();
}

@Override
public void addSessionVars(Map<String, String> vars) {
vars.forEach(this.client::define);
}

@Override
public Map<String, Object> add(String sql) throws IOException {
try {
Expand All @@ -72,6 +77,7 @@ public Map<String, Object> add(String sql) throws IOException {
+ "- "
+ sql.substring(0, 40));
}

var result = client.executeStatement(sql).get();
return new QueryResponse(result).asMap();
} catch (InterruptedException | ExecutionException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ public interface ArtefactClient {

Map<String, Object> add(String content) throws IOException;

default void addSessionVars(Map<String, String> sessionVars) {
// empty body
}

default Map<String, Object> add(String name, String config) throws IOException {
throw new IOException("Not implemented");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,28 @@
package com.purbon.kafka.topology.model.artefact;

import com.purbon.kafka.topology.model.Artefacts;
import java.util.ArrayList;
import java.util.List;

import java.util.*;

public class KsqlArtefacts implements Artefacts {

private List<KsqlStreamArtefact> streams;
private List<KsqlTableArtefact> tables;
final private KsqlVarsArtefact vars;

public KsqlArtefacts() {
this(new ArrayList<>(), new ArrayList<>());
this(new ArrayList<>(), new ArrayList<>(), new KsqlVarsArtefact(Collections.emptyMap()));
}

public KsqlArtefacts(List<KsqlStreamArtefact> streams, List<KsqlTableArtefact> tables) {
public KsqlArtefacts(
List<KsqlStreamArtefact> streams, List<KsqlTableArtefact> tables, KsqlVarsArtefact vars) {
this.streams = streams;
this.tables = tables;
this.vars = vars;
}

public KsqlVarsArtefact getVars() {
return this.vars;
}

public List<KsqlStreamArtefact> getStreams() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.purbon.kafka.topology.model.artefact;


import java.util.Map;

@TypeArtefact(name = "VARS")
public class KsqlVarsArtefact extends KsqlArtefact {
private Map<String, String> sessionVars;
private static final String KSQLDB_VARS_NAME = "SESSION_VARS";

public KsqlVarsArtefact(Map<String, String> sessionVars) {
super("", "", KSQLDB_VARS_NAME);
this.sessionVars = sessionVars;
}

public Map<String, String> getSessionVars() {
return this.sessionVars;
}

public void setSessionVars(Map<String, String> sessionVars) {
this.sessionVars = sessionVars;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,7 @@
import com.purbon.kafka.topology.model.*;
import com.purbon.kafka.topology.model.Impl.ProjectImpl;
import com.purbon.kafka.topology.model.Impl.TopologyImpl;
import com.purbon.kafka.topology.model.artefact.KConnectArtefacts;
import com.purbon.kafka.topology.model.artefact.KafkaConnectArtefact;
import com.purbon.kafka.topology.model.artefact.KsqlArtefacts;
import com.purbon.kafka.topology.model.artefact.KsqlStreamArtefact;
import com.purbon.kafka.topology.model.artefact.KsqlTableArtefact;
import com.purbon.kafka.topology.model.artefact.*;
import com.purbon.kafka.topology.model.users.Connector;
import com.purbon.kafka.topology.model.users.Consumer;
import com.purbon.kafka.topology.model.users.KSqlApp;
Expand Down Expand Up @@ -73,6 +69,7 @@ public class TopologyCustomDeserializer extends StdDeserializer<Topology> {
private static final String ARTIFACTS = "artifacts";
private static final String STREAMS_NODE = "streams";
private static final String TABLES_NODE = "tables";
private static final String VARS_NODE = "vars";

private static final String SPECIAL_TOPICS_NODE = "special_topics";

Expand Down Expand Up @@ -380,6 +377,8 @@ private Optional<PlatformSystem> doKSqlElements(JsonParser parser, JsonNode node
new JsonSerdesUtils<KSqlApp>().parseApplicationUser(parser, acNode, KSqlApp.class);
List<KsqlStreamArtefact> streamArtefacts = new ArrayList<>();
List<KsqlTableArtefact> tableArtefacts = new ArrayList<>();
KsqlVarsArtefact varsArtefacts = new KsqlVarsArtefact(Collections.emptyMap());

if (node.has(ARTEFACTS) || node.has(ARTIFACTS)) {
String key = node.has(ARTEFACTS) ? ARTEFACTS : ARTIFACTS;
JsonNode artefactsNode = node.get(key);
Expand All @@ -396,10 +395,17 @@ private Optional<PlatformSystem> doKSqlElements(JsonParser parser, JsonNode node
.parseApplicationUser(
parser, artefactsNode.get(TABLES_NODE), KsqlTableArtefact.class);
}

if (artefactsNode.has(VARS_NODE)) {
artefactsNode.get(VARS_NODE);
varsArtefacts.setSessionVars(
parser.getCodec().treeToValue(artefactsNode.get(VARS_NODE), Map.class));
}
}

return Optional.of(
new PlatformSystem(ksqls, new KsqlArtefacts(streamArtefacts, tableArtefacts)));
new PlatformSystem(
ksqls, new KsqlArtefacts(streamArtefacts, tableArtefacts, varsArtefacts)));
}

private Optional<PlatformSystem> doStreamsElements(JsonParser parser, JsonNode node)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.purbon.kafka.topology.model.Artefact;
import com.purbon.kafka.topology.model.PlanMap;
import com.purbon.kafka.topology.model.Topology;
import com.purbon.kafka.topology.model.artefact.TypeArtefact;
import com.purbon.kafka.topology.serdes.TopologySerdes;
import com.purbon.kafka.topology.utils.TestUtils;
import java.util.Collections;
Expand Down Expand Up @@ -59,6 +60,10 @@ public void testArtefactGenerationOrder() {
KSqlArtefactManager m = new KSqlArtefactManager(client, config, topologyFilePath);

var artefacts = m.parseNewArtefacts(topology);
assertThat(
artefacts.removeIf(
x -> x.getClass().getAnnotation(TypeArtefact.class).name().equals("VARS")))
.isTrue();

assertThat(artefacts).hasSize(4);
var ksqlArtefacts = topology.getProjects().get(0).getKsqlArtefacts();
Expand All @@ -83,6 +88,10 @@ public void testArtefactsForDeletionOrder() {
KSqlArtefactManager m = new KSqlArtefactManager(client, config, topologyFilePath);

var artefacts = m.parseNewArtefacts(topology);
assertThat(
artefacts.removeIf(
x -> x.getClass().getAnnotation(TypeArtefact.class).name().equals("VARS")))
.isTrue();

System.out.println(artefacts);

Expand Down
11 changes: 11 additions & 0 deletions src/test/java/com/purbon/kafka/topology/TopologySerdesTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,17 @@ public void testKsqlSerdes() {
assertThat(tableArtefact.getName()).isEqualTo("users");
}

@Test
public void testKsqlWithVarsSerdes() {
Topology topology = parser.deserialise(TestUtils.getResourceFile("/descriptor.yaml"));
Project project = topology.getProjects().get(0);

KsqlArtefacts artefacts = project.getKsqlArtefacts();
assertThat(artefacts.getVars().getSessionVars()).hasSize(2);
String firstVar = artefacts.getVars().getSessionVars().get("foo");
assertThat(firstVar).isEqualTo("bar");
}

@Test
public void testPlatformProcessing() {
Topology topology = parser.deserialise(TestUtils.getResourceFile("/descriptor.yaml"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.purbon.kafka.topology.integration.containerutils.KsqlContainer;
import com.purbon.kafka.topology.integration.containerutils.SaslPlaintextKafkaContainer;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import org.junit.After;
import org.junit.Before;
Expand Down Expand Up @@ -41,12 +42,12 @@ public void testStreamTableCreateAndDelete() throws IOException {
new KsqlApiClient(KsqlClientConfig.builder().setServer(ksqlContainer.getUrl()).build());

String streamName = "riderLocations";

client.addSessionVars(Collections.singletonMap("partitions", "1"));
String sql =
"CREATE STREAM "
+ streamName
+ " (profileId VARCHAR, latitude DOUBLE, longitude DOUBLE)\n"
+ " WITH (kafka_topic='locations', value_format='json', partitions=1);";
+ " WITH (kafka_topic='locations', value_format='json', partitions=${partitions});";

client.add(sql);

Expand All @@ -69,7 +70,7 @@ public void testStreamTableCreateAndDelete() throws IOException {
+ " region_id VARCHAR\n"
+ " ) WITH (\n"
+ " KAFKA_TOPIC = 'my-users-topic', \n"
+ " KEY_FORMAT='KAFKA', PARTITIONS=2, REPLICAS=1,"
+ " KEY_FORMAT='KAFKA', PARTITIONS=${partitions}, REPLICAS=1,"
+ " VALUE_FORMAT = 'JSON'\n"
+ " );";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
public class KsqlContainer extends GenericContainer<KsqlContainer> {

private static final DockerImageName DEFAULT_IMAGE =
DockerImageName.parse("confluentinc/ksqldb-server").withTag("0.17.0");
DockerImageName.parse("confluentinc/ksqldb-server").withTag("0.21.0");

public static final int KSQL_PORT = 8088;

Expand Down
2 changes: 2 additions & 0 deletions src/test/resources/descriptor-ksql-multiple.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ projects:
- name: "foo"
ksql:
artefacts:
vars:
foo: "bar"
streams:
- path: "ksql-streams/riderlocations.sql"
name: "riderLocations"
Expand Down
3 changes: 3 additions & 0 deletions src/test/resources/descriptor.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ projects:
idempotence: "true"
ksql:
artefacts:
vars:
foo: "bar"
lorem: "ipsum"
streams:
- path: "ksql-streams/riderlocations.sql"
name: "riderLocations"
Expand Down

0 comments on commit da33be1

Please sign in to comment.