diff --git a/driver-pravega/build.gradle b/driver-pravega/build.gradle index 3924d2a7b..ad2ed5447 100644 --- a/driver-pravega/build.gradle +++ b/driver-pravega/build.gradle @@ -9,5 +9,5 @@ repositories { dependencies { api project(":sbk-api") - api "io.pravega:pravega-client:0.10.0" + api "io.pravega:pravega-client:0.12.0" } diff --git a/driver-pravega/src/main/java/io/sbk/Pravega/Pravega.java b/driver-pravega/src/main/java/io/sbk/Pravega/Pravega.java index e00a713ce..be9b364e2 100644 --- a/driver-pravega/src/main/java/io/sbk/Pravega/Pravega.java +++ b/driver-pravega/src/main/java/io/sbk/Pravega/Pravega.java @@ -61,6 +61,7 @@ public void addArgs(final InputOptions params) throws IllegalArgumentException { params.addOption("segments", true, "Number of segments, default :" + config.segmentCount); params.addOption("recreate", true, "If the stream is already existing, delete and recreate the same, default :" + config.recreate); + params.addOption("connpool", true, "Enable Connection pooling, default :" + config.connPooling); } @Override @@ -74,6 +75,8 @@ public void parseArgs(final ParameterOptions params) throws IllegalArgumentExcep } else { config.recreate = params.getWritersCount() > 0 && params.getReadersCount() > 0; } + config.connPooling = Boolean.parseBoolean(params.getOptionValue("connpool", + Boolean.toString(config.connPooling))); if (config.recreate) { rdGrpName = config.streamName + System.currentTimeMillis(); @@ -127,7 +130,7 @@ public void closeStorage(final ParameterOptions params) throws IOException { @Override public DataWriter createWriter(final int id, final ParameterOptions params) { try { - return new PravegaWriter(id, params, config.streamName, factory); + return new PravegaWriter(id, params, config.streamName, factory, config.connPooling); } catch (IOException ex) { ex.printStackTrace(); return null; diff --git a/driver-pravega/src/main/java/io/sbk/Pravega/PravegaConfig.java b/driver-pravega/src/main/java/io/sbk/Pravega/PravegaConfig.java index 68ad003c4..ae742f0a5 100644 --- a/driver-pravega/src/main/java/io/sbk/Pravega/PravegaConfig.java +++ b/driver-pravega/src/main/java/io/sbk/Pravega/PravegaConfig.java @@ -16,4 +16,5 @@ public class PravegaConfig { public String streamName; public int segmentCount; public boolean recreate; + public boolean connPooling; } diff --git a/driver-pravega/src/main/java/io/sbk/Pravega/PravegaWriter.java b/driver-pravega/src/main/java/io/sbk/Pravega/PravegaWriter.java index 0a6d4d740..014dab51c 100644 --- a/driver-pravega/src/main/java/io/sbk/Pravega/PravegaWriter.java +++ b/driver-pravega/src/main/java/io/sbk/Pravega/PravegaWriter.java @@ -29,10 +29,13 @@ public class PravegaWriter implements Writer { final EventStreamWriter producer; - public PravegaWriter(int id, ParameterOptions params, String streamName, EventStreamClientFactory factory) throws IOException { + public PravegaWriter(int id, ParameterOptions params, String streamName, EventStreamClientFactory factory, + boolean connectionPooling) throws IOException { this.producer = factory.createEventWriter(streamName, new ByteArraySerializer(), - EventWriterConfig.builder().build()); + EventWriterConfig.builder() + .enableConnectionPooling(connectionPooling) + .build()); } /** diff --git a/driver-pravega/src/main/resources/pravega.properties b/driver-pravega/src/main/resources/pravega.properties index 158fc023c..674a3e9c7 100644 --- a/driver-pravega/src/main/resources/pravega.properties +++ b/driver-pravega/src/main/resources/pravega.properties @@ -20,4 +20,7 @@ segmentCount=1 # Create the topic # if the writers and readers are supplied then it is set to true -recreate=false \ No newline at end of file +recreate=false + +#Disable Connection Pooling by default +connPooling=false \ No newline at end of file