Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-949967 Add an optional offset token parameter for openChannel #645

Merged
merged 1 commit into from
Oct 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ public enum OnErrorOption {
// Default timezone for TIMESTAMP_LTZ and TIMESTAMP_TZ columns
private final ZoneId defaultTimezone;

private final String offsetToken;
private final boolean isOffsetTokenProvided;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you explain why isOffsetTokenProvided is required?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is to distinguish null as an offset token vs. it not being set.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exactly, i want to support setting the offsetToken as null as well.

In order to distinguish between default null string vs user provided null, I am using this extra flag.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, could you add a IT test for NULL?


public static OpenChannelRequestBuilder builder(String channelName) {
return new OpenChannelRequestBuilder(channelName);
}
Expand All @@ -53,6 +56,9 @@ public static class OpenChannelRequestBuilder {
private OnErrorOption onErrorOption;
private ZoneId defaultTimezone;

private String offsetToken;
private boolean isOffsetTokenProvided = false;

public OpenChannelRequestBuilder(String channelName) {
this.channelName = channelName;
this.defaultTimezone = DEFAULT_DEFAULT_TIMEZONE;
Expand Down Expand Up @@ -83,6 +89,12 @@ public OpenChannelRequestBuilder setDefaultTimezone(ZoneId defaultTimezone) {
return this;
}

public OpenChannelRequestBuilder setOffsetToken(String offsetToken){
this.offsetToken = offsetToken;
this.isOffsetTokenProvided = true;
return this;
}

public OpenChannelRequest build() {
return new OpenChannelRequest(this);
}
Expand All @@ -102,6 +114,8 @@ private OpenChannelRequest(OpenChannelRequestBuilder builder) {
this.tableName = builder.tableName;
this.onErrorOption = builder.onErrorOption;
this.defaultTimezone = builder.defaultTimezone;
this.offsetToken = builder.offsetToken;
this.isOffsetTokenProvided = builder.isOffsetTokenProvided;
}

public String getDBName() {
Expand Down Expand Up @@ -131,4 +145,12 @@ public String getFullyQualifiedTableName() {
public OnErrorOption getOnErrorOption() {
return this.onErrorOption;
}

public String getOffsetToken() {
return this.offsetToken;
}

public boolean isOffsetTokenProvided() {
return this.isOffsetTokenProvided;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,9 @@ public SnowflakeStreamingIngestChannelInternal<?> openChannel(OpenChannelRequest
payload.put("schema", request.getSchemaName());
payload.put("write_mode", Constants.WriteMode.CLOUD_STORAGE.name());
payload.put("role", this.role);
if (request.isOffsetTokenProvided()){
payload.put("offset_token", request.getOffsetToken());
}

OpenChannelResponse response =
executeWithRetries(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,25 @@ public void testOpenChannelRequestCreationSuccess() {

Assert.assertEquals(
"STREAMINGINGEST_TEST.PUBLIC.T_STREAMINGINGEST", request.getFullyQualifiedTableName());
Assert.assertFalse(request.isOffsetTokenProvided());
}


@Test
public void testOpenChannelRequesCreationtWithOffsetToken() {
OpenChannelRequest request =
OpenChannelRequest.builder("CHANNEL")
.setDBName("STREAMINGINGEST_TEST")
.setSchemaName("PUBLIC")
.setTableName("T_STREAMINGINGEST")
.setOnErrorOption(OpenChannelRequest.OnErrorOption.CONTINUE)
.setOffsetToken("TEST_TOKEN")
.build();

Assert.assertEquals(
"STREAMINGINGEST_TEST.PUBLIC.T_STREAMINGINGEST", request.getFullyQualifiedTableName());
Assert.assertEquals("TEST_TOKEN", request.getOffsetToken());
Assert.assertTrue(request.isOffsetTokenProvided());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,41 @@ public void testMultiColumnIngest() throws Exception {
Assert.fail("Row sequencer not updated before timeout");
}

@Test
public void testOpenChannelOffsetToken() throws Exception {
String tableName = "offsetTokenTest";
jdbcConnection
.createStatement()
.execute(
String.format(
"create or replace table %s (s text);",
tableName));
OpenChannelRequest request1 =
OpenChannelRequest.builder("TEST_CHANNEL")
.setDBName(testDb)
.setSchemaName(TEST_SCHEMA)
.setTableName(tableName)
.setOnErrorOption(OpenChannelRequest.OnErrorOption.CONTINUE)
.setOffsetToken("TEST_OFFSET")
.build();

// Open a streaming ingest channel from the given client
SnowflakeStreamingIngestChannel channel1 = client.openChannel(request1);

// Close the channel after insertion
channel1.close().get();

for (int i = 1; i < 15; i++) {
if (channel1.getLatestCommittedOffsetToken() != null
&& channel1.getLatestCommittedOffsetToken().equals("TEST_OFFSET")) {
return;
} else {
Thread.sleep(2000);
}
}
Assert.fail("Row sequencer not updated before timeout");
}

@Test
public void testNullableColumns() throws Exception {
String multiTableName = "multi_column";
Expand Down
Loading