Skip to content

Commit

Permalink
[Java] Add warning to gcpTempLocation when its bucket has soft delete…
Browse files Browse the repository at this point in the history
… enabled (apache#31358)

* Add warning to gcpTempLocation when its bucket has soft delete policy enabled.

* Fix an issue reported by spotbugs.
  • Loading branch information
shunping committed May 21, 2024
1 parent c5b6475 commit a998107
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,15 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.beam.sdk.extensions.gcp.auth.CredentialFactory;
import org.apache.beam.sdk.extensions.gcp.auth.GcpCredentialFactory;
import org.apache.beam.sdk.extensions.gcp.auth.NullCredentialInitializer;
import org.apache.beam.sdk.extensions.gcp.storage.PathValidator;
import org.apache.beam.sdk.extensions.gcp.util.BackOffAdapter;
import org.apache.beam.sdk.extensions.gcp.util.GcsUtil;
import org.apache.beam.sdk.extensions.gcp.util.RetryHttpRequestInitializer;
import org.apache.beam.sdk.extensions.gcp.util.Transport;
import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
Expand Down Expand Up @@ -390,9 +392,41 @@ class GcpTempLocationFactory implements DefaultValueFactory<String> {
e);
}
}

if (isSoftDeletePolicyEnabled(options, tempLocation)) {
LOG.warn(
String.format(
"The bucket of gcpTempLocation %s has soft delete policy enabled."
+ " Dataflow jobs use Cloud Storage to store temporary files during pipeline"
+ " execution. To avoid being billed for unnecessary storage costs, turn off the soft"
+ " delete feature on buckets that your Dataflow jobs use for temporary storage."
+ " For more information, see"
+ " https://cloud.google.com/storage/docs/use-soft-delete#remove-soft-delete-policy.",
tempLocation));
}

return tempLocation;
}

@VisibleForTesting
static boolean isSoftDeletePolicyEnabled(PipelineOptions options, String tempLocation) {
GcsOptions gcsOptions = options.as(GcsOptions.class);
GcsUtil gcsUtil = gcsOptions.getGcsUtil();
try {
SoftDeletePolicy policy =
Objects.requireNonNull(gcsUtil.getBucket(GcsPath.fromUri(tempLocation)))
.getSoftDeletePolicy();
if (policy != null && policy.getRetentionDurationSeconds() > 0) {
return true;
}
} catch (Exception e) {
LOG.warn(
String.format(
"Failed to access bucket for gcpTempLocation: %s.%nCaused by %s", tempLocation, e));
}
return false;
}

@VisibleForTesting
static ImmutableList<String> getDefaultBucketNameStubs(
PipelineOptions options, CloudResourceManager crmClient, String bucketNamePrefix) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Matchers.any;
Expand All @@ -35,6 +37,7 @@
import com.google.api.services.cloudresourcemanager.CloudResourceManager.Projects.Get;
import com.google.api.services.cloudresourcemanager.model.Project;
import com.google.api.services.storage.model.Bucket;
import com.google.api.services.storage.model.Bucket.SoftDeletePolicy;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
Expand Down Expand Up @@ -244,6 +247,38 @@ public void testCreateBucket() throws Exception {
Long.valueOf(0L));
}

@Test
public void testTempLocationWithSoftDeletePolicy() throws IOException {
Bucket bucket = new Bucket();
bucket.setSoftDeletePolicy(new SoftDeletePolicy().setRetentionDurationSeconds(1L));
when(mockGcsUtil.getBucket(any(GcsPath.class))).thenReturn(bucket);

String tempLocation = "gs://bucket_with_soft_delete";
options.setTempLocation(tempLocation);
options.as(GcsOptions.class).setPathValidatorClass(NoopPathValidator.class);

GcpOptions gcpOptions = options.as(GcpOptions.class);
assertEquals(tempLocation, gcpOptions.getGcpTempLocation());

assertTrue(GcpTempLocationFactory.isSoftDeletePolicyEnabled(options, tempLocation));
}

@Test
public void testTempLocationWithoutSoftDeletePolicy() throws IOException {
Bucket bucket = new Bucket();
bucket.setSoftDeletePolicy(new SoftDeletePolicy().setRetentionDurationSeconds(0L));
when(mockGcsUtil.getBucket(any(GcsPath.class))).thenReturn(bucket);

String tempLocation = "gs://bucket_without_soft_delete";
options.setTempLocation(tempLocation);
options.as(GcsOptions.class).setPathValidatorClass(NoopPathValidator.class);

GcpOptions gcpOptions = options.as(GcpOptions.class);
assertEquals(tempLocation, gcpOptions.getGcpTempLocation());

assertFalse(GcpTempLocationFactory.isSoftDeletePolicyEnabled(options, tempLocation));
}

@Test
public void testCreateBucketProjectLookupFails() throws Exception {
doThrow(new IOException("badness")).when(mockGet).execute();
Expand Down

0 comments on commit a998107

Please sign in to comment.