Skip to content
This repository has been archived by the owner on Jan 11, 2024. It is now read-only.

Commit

Permalink
Merge pull request apache#17187: [BEAM-14181] Make sure to evict conn…
Browse files Browse the repository at this point in the history
…ections from cache after closing them

(cherry picked from commit e7d7525)
  • Loading branch information
reuvenlax authored and youngoli committed Mar 29, 2022
1 parent 24982cf commit 8063330
Showing 1 changed file with 27 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,12 @@ public DestinationState(
this.useDefaultStream = useDefaultStream;
}

void teardown() {
if (streamAppendClient != null) {
runAsyncIgnoreFailure(closeWriterExecutor, streamAppendClient::unpin);
}
}

String getDefaultStreamName() {
return BigQueryHelpers.stripPartitionDecorator(tableUrn) + "/streams/_default";
}
Expand All @@ -188,12 +194,15 @@ StreamAppendClient getWriteStream() {
} else {
this.streamName = getDefaultStreamName();
}
this.streamAppendClient =
APPEND_CLIENTS.get(
streamName,
() ->
datasetService.getStreamAppendClient(
streamName, messageConverter.getSchemaDescriptor()));
synchronized (APPEND_CLIENTS) {
this.streamAppendClient =
APPEND_CLIENTS.get(
streamName,
() ->
datasetService.getStreamAppendClient(
streamName, messageConverter.getSchemaDescriptor()));
this.streamAppendClient.pin();
}
this.currentOffset = 0;
}
return streamAppendClient;
Expand All @@ -203,11 +212,13 @@ StreamAppendClient getWriteStream() {
}

void invalidateWriteStream() {
try {
runAsyncIgnoreFailure(closeWriterExecutor, streamAppendClient::close);
if (streamAppendClient != null) {
synchronized (APPEND_CLIENTS) {
// Unpin in a different thread, as it may execute a blocking close.
runAsyncIgnoreFailure(closeWriterExecutor, streamAppendClient::unpin);
APPEND_CLIENTS.invalidate(streamName);
}
streamAppendClient = null;
} catch (Exception e) {
throw new RuntimeException(e);
}
}

Expand Down Expand Up @@ -380,19 +391,22 @@ public void process(
@FinishBundle
public void finishBundle(FinishBundleContext context) throws Exception {
flushAll();
if (!useDefaultStream) {
for (DestinationState state : destinations.values()) {
for (DestinationState state : destinations.values()) {
if (!useDefaultStream) {
context.output(
KV.of(state.tableUrn, state.streamName),
BoundedWindow.TIMESTAMP_MAX_VALUE.minus(Duration.millis(1)),
GlobalWindow.INSTANCE);
}
state.teardown();
}
destinations.clear();
destinations = null;
}

@Teardown
public void teardown() {
destinations.clear();
destinations = null;
try {
if (datasetService != null) {
datasetService.close();
Expand Down

0 comments on commit 8063330

Please sign in to comment.