Skip to content

Commit

Permalink
SNOW-747848: Cleanup BG threads with exceptions (#479)
Browse files Browse the repository at this point in the history
- Improve some comments
- Cleanup BG threads with exceptions in CTOR, reported by customer
  • Loading branch information
sfc-gh-tzhang committed Jun 2, 2023
1 parent 9d1ab3d commit 8f78df6
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,8 @@ String getFilePath(Calendar calendar, String clientPrefix) {
int minute = calendar.get(Calendar.MINUTE);
long time = TimeUnit.MILLISECONDS.toSeconds(calendar.getTimeInMillis());
long threadId = Thread.currentThread().getId();
String fileName =
// Create the file short name, the clientPrefix contains the deployment id
String fileShortName =
Long.toString(time, 36)
+ "_"
+ clientPrefix
Expand All @@ -611,7 +612,7 @@ String getFilePath(Calendar calendar, String clientPrefix) {
+ this.counter.getAndIncrement()
+ "."
+ BLOB_EXTENSION_TYPE;
return year + "/" + month + "/" + day + "/" + hour + "/" + minute + "/" + fileName;
return year + "/" + month + "/" + day + "/" + hour + "/" + minute + "/" + fileShortName;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,9 @@ void collectRowSize(float rowSize) {
}

/**
* Get the latest committed offset token from Snowflake
* Get the latest committed offset token from Snowflake, an exception will be thrown if the
* channel becomes invalid due to errors and the channel needs to be reopened in order to return a
* valid offset token
*
* @return the latest committed offset token
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,6 @@ public class SnowflakeStreamingIngestClientInternal<T> implements SnowflakeStrea
// Name of the client
private final String name;

private String accountName;

// Snowflake role for the client to use
private String role;

Expand Down Expand Up @@ -162,7 +160,7 @@ public class SnowflakeStreamingIngestClientInternal<T> implements SnowflakeStrea
this.parameterProvider = new ParameterProvider(parameterOverrides, prop);

this.name = name;
this.accountName = accountURL == null ? null : accountURL.getAccount();
String accountName = accountURL == null ? null : accountURL.getAccount();
this.isTestMode = isTestMode;
this.httpClient = httpClient == null ? HttpUtil.getHttpClient(accountName) : httpClient;
this.channelCache = new ChannelCache<>();
Expand Down Expand Up @@ -191,7 +189,13 @@ public class SnowflakeStreamingIngestClientInternal<T> implements SnowflakeStrea
this.setupMetricsForClient();
}

this.flushService = new FlushService<>(this, this.channelCache, this.isTestMode);
try {
this.flushService = new FlushService<>(this, this.channelCache, this.isTestMode);
} catch (Exception e) {
// Need to clean up the resources before throwing any exceptions
cleanUpResources();
throw e;
}

logger.logInfo(
"Client created, name={}, account={}. isTestMode={}, parameters={}",
Expand Down Expand Up @@ -617,14 +621,8 @@ public void close() throws Exception {
} catch (InterruptedException | ExecutionException e) {
throw new SFException(e, ErrorCode.RESOURCE_CLEANUP_FAILURE, "client close");
} finally {
if (this.telemetryWorker != null) {
this.telemetryWorker.shutdown();
}
this.flushService.shutdown();
if (this.requestBuilder != null) {
this.requestBuilder.closeResources();
}
HttpUtil.shutdownHttpConnectionManagerDaemonThread();
cleanUpResources();
}
}

Expand Down Expand Up @@ -890,4 +888,17 @@ private void reportStreamingIngestTelemetryToSF() {
telemetryService.reportCpuMemoryUsage(this.cpuHistogram);
}
}

/** Cleanup any resource during client closing or failures */
private void cleanUpResources() {
if (this.telemetryWorker != null) {
this.telemetryWorker.shutdown();
}
if (this.requestBuilder != null) {
this.requestBuilder.closeResources();
}
if (!this.isTestMode) {
HttpUtil.shutdownHttpConnectionManagerDaemonThread();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1213,4 +1213,26 @@ public void testVerifyChannelsAreFullyCommittedSuccess() throws Exception {

client.close();
}

@Test(expected = IllegalArgumentException.class)
public void testFlushServiceException() throws Exception {
CloseableHttpClient httpClient = Mockito.mock(CloseableHttpClient.class);
RequestBuilder requestBuilder =
Mockito.spy(
new RequestBuilder(TestUtils.getHost(), TestUtils.getUser(), TestUtils.getKeyPair()));

// Set IO_TIME_CPU_RATIO to MAX_VALUE in order to generate an exception
Map<String, Object> parameterMap = new HashMap<>();
parameterMap.put(ParameterProvider.IO_TIME_CPU_RATIO, Integer.MAX_VALUE);

SnowflakeStreamingIngestClientInternal<?> client =
new SnowflakeStreamingIngestClientInternal<>(
"client",
new SnowflakeURL("snowflake.dev.local:8082"),
null,
httpClient,
true,
requestBuilder,
parameterMap);
}
}

0 comments on commit 8f78df6

Please sign in to comment.