Skip to content

Commit

Permalink
Merge pull request #419 from kmgowda/kmg-pravega-1
Browse files Browse the repository at this point in the history
Update pravega version and enable connection pooling

Signed-off-by: Keshava Munegowda <keshava.gowda@gmail.com>
  • Loading branch information
kmgowda committed Aug 30, 2023
2 parents 5e1df02 + c93d012 commit 9de9f64
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 5 deletions.
2 changes: 1 addition & 1 deletion driver-pravega/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@ repositories {

dependencies {
api project(":sbk-api")
api "io.pravega:pravega-client:0.10.0"
api "io.pravega:pravega-client:0.12.0"
}
5 changes: 4 additions & 1 deletion driver-pravega/src/main/java/io/sbk/Pravega/Pravega.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public void addArgs(final InputOptions params) throws IllegalArgumentException {
params.addOption("segments", true, "Number of segments, default :" + config.segmentCount);
params.addOption("recreate", true,
"If the stream is already existing, delete and recreate the same, default :" + config.recreate);
params.addOption("connpool", true, "Enable Connection pooling, default :" + config.connPooling);
}

@Override
Expand All @@ -74,6 +75,8 @@ public void parseArgs(final ParameterOptions params) throws IllegalArgumentExcep
} else {
config.recreate = params.getWritersCount() > 0 && params.getReadersCount() > 0;
}
config.connPooling = Boolean.parseBoolean(params.getOptionValue("connpool",
Boolean.toString(config.connPooling)));

if (config.recreate) {
rdGrpName = config.streamName + System.currentTimeMillis();
Expand Down Expand Up @@ -127,7 +130,7 @@ public void closeStorage(final ParameterOptions params) throws IOException {
@Override
public DataWriter<byte[]> createWriter(final int id, final ParameterOptions params) {
try {
return new PravegaWriter(id, params, config.streamName, factory);
return new PravegaWriter(id, params, config.streamName, factory, config.connPooling);
} catch (IOException ex) {
ex.printStackTrace();
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@ public class PravegaConfig {
public String streamName;
public int segmentCount;
public boolean recreate;
public boolean connPooling;
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,13 @@
public class PravegaWriter implements Writer<byte[]> {
final EventStreamWriter<byte[]> producer;

public PravegaWriter(int id, ParameterOptions params, String streamName, EventStreamClientFactory factory) throws IOException {
public PravegaWriter(int id, ParameterOptions params, String streamName, EventStreamClientFactory factory,
boolean connectionPooling) throws IOException {
this.producer = factory.createEventWriter(streamName,
new ByteArraySerializer(),
EventWriterConfig.builder().build());
EventWriterConfig.builder()
.enableConnectionPooling(connectionPooling)
.build());
}

/**
Expand Down
5 changes: 4 additions & 1 deletion driver-pravega/src/main/resources/pravega.properties
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,7 @@ segmentCount=1

# Create the topic
# if the writers and readers are supplied then it is set to true
recreate=false
recreate=false

#Disable Connection Pooling by default
connPooling=false

0 comments on commit 9de9f64

Please sign in to comment.