Skip to content

Commit 52adca9

Browse files
committed
feat(filesystems): add support for Google Cloud Storage (#121)
This commit adds classes to read object files from a GCS bucket: - GcsStorage - GcsFileSystemListing - GcsAvroFileInputReader - GcsBytesArrayInputReader - GcsRowFileInputReader - GcsFileInputMetadataReader - GcsXMLFileInputReader Resolves: #121
1 parent e7242f9 commit 52adca9

File tree

17 files changed

+1206
-6
lines changed

17 files changed

+1206
-6
lines changed

connect-file-pulse-filesystems/filepulse-commons-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/text/NonBlockingBufferReader.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
import java.util.List;
3434

3535
/**
36-
* A BufferedReader wrapper to read lines in non-blocking way.
36+
* A {@link BufferedReader} wrapper to read lines in non-blocking way.
3737
*/
3838
public class NonBlockingBufferReader implements AutoCloseable {
3939

@@ -134,12 +134,12 @@ public List<TextBlock> readLines(final int minRecords,
134134
// Instead we have to manage splitting lines ourselves, using simple backoff when no new value
135135
// is available.
136136
final List<TextBlock> records = new LinkedList<>();
137-
// Number of bytes read during last iteration.
138-
int nread = 0;
139137

140138
boolean maxNumRecordsNotReached = true;
141139

142-
while ( !(isEOF = !reader.ready() || nread == -1) &&
140+
// Number of bytes read during last iteration.
141+
int nread = 0;
142+
while ( !(isEOF = nread == -1) &&
143143
(records.isEmpty() || records.size() < minRecords)
144144
) {
145145
nread = reader.read(buffer, bufferOffset, buffer.length - bufferOffset);
@@ -211,8 +211,7 @@ public boolean remaining() {
211211

212212
public boolean hasNext() {
213213
try {
214-
boolean ready = reader.ready();
215-
if (ready && (!isEOF || stream.available() > 1)) return true;
214+
if (!isEOF || stream.available() > 1) return true;
216215
return remaining() && containsLine();
217216
} catch (IOException e) {
218217
LOG.error("Error while checking for remaining bytes to read: {}", e.getLocalizedMessage());
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<!-- Maven module descriptor for the Google Cloud Storage filesystem support of Kafka Connect FilePulse. -->
<modelVersion>4.0.0</modelVersion>
6+
7+
<parent>
8+
<groupId>io.streamthoughts</groupId>
9+
<artifactId>kafka-connect-filepulse-filesystems</artifactId>
10+
<version>2.0.0-SNAPSHOT</version>
11+
</parent>
12+
13+
<artifactId>kafka-connect-filepulse-google-cloud-storage-fs</artifactId>
14+
15+
<description>Kafka Connect FilePulse - FileSystem - Support for Google Cloud Storage</description>
16+
17+
<properties>
18+
<!-- Checkstyle configuration is looked up one directory above the parent module's base directory. -->
<checkstyle.config.location>${project.parent.basedir}/..</checkstyle.config.location>
19+
</properties>
20+
21+
<!-- Imports the Google Cloud libraries BOM; google-cloud-storage below declares no version of its own
     and is presumably resolved through this BOM (TODO confirm). -->
<dependencyManagement>
22+
<dependencies>
23+
<dependency>
24+
<groupId>com.google.cloud</groupId>
25+
<artifactId>libraries-bom</artifactId>
26+
<version>19.2.1</version>
27+
<type>pom</type>
28+
<scope>import</scope>
29+
</dependency>
30+
</dependencies>
31+
</dependencyManagement>
32+
33+
<dependencies>
34+
<dependency>
35+
<groupId>com.google.cloud</groupId>
36+
<artifactId>google-cloud-storage</artifactId>
37+
</dependency>
38+
<!-- Sibling module of this project; kept at the same version as this module. -->
<dependency>
39+
<groupId>io.streamthoughts</groupId>
40+
<artifactId>kafka-connect-filepulse-commons-fs</artifactId>
41+
<version>${project.version}</version>
42+
</dependency>
43+
<!-- No version is declared for the Kafka and Avro artifacts below; presumably managed by the
     parent POM (TODO confirm). -->
<dependency>
44+
<groupId>org.apache.kafka</groupId>
45+
<artifactId>kafka-clients</artifactId>
46+
</dependency>
47+
<dependency>
48+
<groupId>org.apache.kafka</groupId>
49+
<artifactId>connect-api</artifactId>
50+
</dependency>
51+
<dependency>
52+
<groupId>org.apache.avro</groupId>
53+
<artifactId>avro</artifactId>
54+
</dependency>
55+
<!-- Test scope only; declared with an explicit version rather than relying on the imported BOM. -->
<dependency>
56+
<groupId>com.google.cloud</groupId>
57+
<artifactId>google-cloud-nio</artifactId>
58+
<version>0.122.11</version>
59+
<scope>test</scope>
60+
</dependency>
61+
</dependencies>
62+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
/*
2+
* Copyright 2021 StreamThoughts.
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package io.streamthoughts.kafka.connect.filepulse.storage.gcp;
20+
21+
import org.apache.kafka.common.config.AbstractConfig;
22+
import org.apache.kafka.common.config.ConfigDef;
23+
import org.apache.kafka.common.config.ConfigException;
24+
import org.apache.kafka.common.config.types.Password;
25+
26+
import java.util.Map;
27+
28+
/**
29+
* The Google Cloud Storage client's configuration.
30+
*/
31+
public class GcsClientConfig extends AbstractConfig {
32+
33+
private static final String GROUP_GCS = "GCS";
34+
35+
public static final String GCS_CREDENTIALS_PATH_CONFIG = "gcs.credentials.path";
36+
37+
public static final String GCS_CREDENTIALS_JSON_CONFIG = "gcs.credentials.json";
38+
39+
private static final String GCS_CREDENTIALS_JSON_DOC = "The GCP credentials as JSON string. "
40+
+ "Cannot be set when \"" + GCS_CREDENTIALS_PATH_CONFIG + "\" is provided. "
41+
+ "If no credentials is specified the client library will look for credentials via "
42+
+ "the environment variable GOOGLE_APPLICATION_CREDENTIALS.";
43+
44+
private static final String GCS_CREDENTIALS_PATH_DOC = "The path to GCP credentials file. "
45+
+ "Cannot be set when \"" + GCS_CREDENTIALS_JSON_CONFIG + "\" is provided. "
46+
+ "If no credentials is specified the client library will look for credentials via "
47+
+ "the environment variable GOOGLE_APPLICATION_CREDENTIALS.";
48+
49+
public static final String GCS_BUCKET_NAME_CONFIG = "gcs.bucket.name";
50+
private static final String GCS_BUCKET_NAME_DOC = "The GCS bucket name to download the object files from.";
51+
52+
public static final String GCS_BLOBS_FILTER_PREFIX_CONFIG = "gcs.blobs.filter.prefix";
53+
public static final String GCS_BLOBS_FILTER_PREFIX_DOC = "The prefix to be used for filtering blobs "
54+
+ "whose names begin with it.";
55+
56+
/**
57+
* Creates a new {@link GcsClientConfig} instance.
58+
*
59+
* @param originals the original configuration map.
60+
*/
61+
public GcsClientConfig(final Map<?, ?> originals) {
62+
super(configDef(), originals, false);
63+
validate();
64+
}
65+
66+
private void validate() {
67+
final Password credentialsJson = getCredentialsJson();
68+
final String credentialsPath = getCredentialsPath();
69+
70+
if (credentialsPath != null && credentialsJson != null) {
71+
throw new ConfigException(String.format(
72+
"\"%s\" and \"%s\" are mutually exclusive options, but both are set.",
73+
GCS_CREDENTIALS_PATH_CONFIG,
74+
GCS_CREDENTIALS_JSON_CONFIG)
75+
);
76+
}
77+
}
78+
79+
public static ConfigDef configDef() {
80+
int gscGroupCounter = 0;
81+
82+
return new ConfigDef()
83+
.define(
84+
GCS_CREDENTIALS_PATH_CONFIG,
85+
ConfigDef.Type.STRING,
86+
null,
87+
ConfigDef.Importance.HIGH,
88+
GCS_CREDENTIALS_PATH_DOC,
89+
GROUP_GCS,
90+
gscGroupCounter++,
91+
ConfigDef.Width.NONE,
92+
GCS_CREDENTIALS_PATH_CONFIG
93+
)
94+
95+
.define(
96+
GCS_CREDENTIALS_JSON_CONFIG,
97+
ConfigDef.Type.PASSWORD,
98+
null,
99+
ConfigDef.Importance.HIGH,
100+
GCS_CREDENTIALS_JSON_DOC,
101+
GROUP_GCS,
102+
gscGroupCounter++,
103+
ConfigDef.Width.NONE,
104+
GCS_CREDENTIALS_JSON_CONFIG
105+
)
106+
107+
.define(
108+
GCS_BUCKET_NAME_CONFIG,
109+
ConfigDef.Type.STRING,
110+
ConfigDef.NO_DEFAULT_VALUE,
111+
new ConfigDef.NonEmptyString(),
112+
ConfigDef.Importance.HIGH,
113+
GCS_BUCKET_NAME_DOC,
114+
GROUP_GCS,
115+
gscGroupCounter++,
116+
ConfigDef.Width.NONE,
117+
GCS_BUCKET_NAME_CONFIG
118+
)
119+
120+
.define(
121+
GCS_BLOBS_FILTER_PREFIX_CONFIG,
122+
ConfigDef.Type.STRING,
123+
null,
124+
ConfigDef.Importance.MEDIUM,
125+
GCS_BLOBS_FILTER_PREFIX_DOC,
126+
GROUP_GCS,
127+
gscGroupCounter++,
128+
ConfigDef.Width.NONE,
129+
GCS_BLOBS_FILTER_PREFIX_CONFIG
130+
);
131+
}
132+
133+
public final String getBlobsPrefix() {
134+
return getString(GCS_BLOBS_FILTER_PREFIX_CONFIG);
135+
}
136+
137+
public final String getCredentialsPath() {
138+
return getString(GCS_CREDENTIALS_PATH_CONFIG);
139+
}
140+
141+
public final Password getCredentialsJson() {
142+
return getPassword(GCS_CREDENTIALS_JSON_CONFIG);
143+
}
144+
145+
public final String getBucketName() {
146+
return getString(GCS_BUCKET_NAME_CONFIG);
147+
}
148+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
/*
2+
* Copyright 2021 StreamThoughts.
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package io.streamthoughts.kafka.connect.filepulse.storage.gcp;
20+
21+
import com.google.auth.Credentials;
22+
import com.google.auth.oauth2.GoogleCredentials;
23+
import com.google.cloud.storage.Storage;
24+
import com.google.cloud.storage.StorageOptions;
25+
import org.apache.kafka.common.config.ConfigException;
26+
import org.apache.kafka.common.config.types.Password;
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
29+
30+
import java.io.ByteArrayInputStream;
31+
import java.io.FileInputStream;
32+
import java.io.IOException;
33+
import java.io.InputStream;
34+
35+
/**
36+
* The {@code GcsAuthenticationUtils} can be used to build a new {@link Storage} instance from a
37+
* {@link GcsClientConfig} object.
38+
*/
39+
public class GcsClientUtils {
40+
41+
private static final Logger LOG = LoggerFactory.getLogger(GcsClientUtils.class);
42+
43+
/**
44+
* Helper method to creates a new {@link Storage} object.
45+
*
46+
* @param config The Google Cloud Storage configurations
47+
* @return a new {@link Storage}.
48+
*/
49+
public static Storage createStorageService(final GcsClientConfig config) {
50+
final String credentialsPath = config.getCredentialsPath();
51+
final Password credentialsJsonPwd = config.getCredentialsJson();
52+
try {
53+
String credentialsJson = null;
54+
if (credentialsJsonPwd != null) {
55+
credentialsJson = credentialsJsonPwd.value();
56+
}
57+
58+
if (credentialsPath != null && credentialsJson != null) {
59+
throw new IllegalArgumentException("Both credentialsPath and credentialsJson cannot be non-null.");
60+
}
61+
62+
if (credentialsPath != null) {
63+
LOG.info("Creating new Google Cloud Storage service using the "
64+
+ "the credentials file that was passed "
65+
+ "through the connector's configuration");
66+
return getStorageServiceForCredentials(
67+
getCredentialsFromPath(credentialsPath)
68+
);
69+
}
70+
71+
if (credentialsJson != null) {
72+
LOG.info("Creating new Google Cloud Storage service using the "
73+
+ "the credentials JSON that was passed "
74+
+ "through the connector's configuration");
75+
return getStorageServiceForCredentials(
76+
getCredentialsFromJson(credentialsJson)
77+
);
78+
}
79+
80+
LOG.info("No credentials were passed through the connector's configuration." +
81+
" Use default Google Cloud Storage service");
82+
} catch (final Exception e) {
83+
throw new ConfigException(
84+
"Failed to Google Cloud Storage service using the connector's configuration",
85+
e
86+
);
87+
}
88+
89+
return StorageOptions.getDefaultInstance().getService();
90+
}
91+
92+
private static Storage getStorageServiceForCredentials(final Credentials credentials) {
93+
return StorageOptions.newBuilder().setCredentials(credentials).build().getService();
94+
}
95+
96+
private static GoogleCredentials getCredentialsFromPath(final String credentialsPath) throws IOException {
97+
try (final InputStream stream = new FileInputStream(credentialsPath)) {
98+
return GoogleCredentials.fromStream(stream);
99+
} catch (final IOException e) {
100+
LOG.error("Failed to read credentials from JSON string", e);
101+
throw e;
102+
}
103+
}
104+
105+
private static GoogleCredentials getCredentialsFromJson(final String credentialsJson) throws IOException {
106+
try (final InputStream stream = new ByteArrayInputStream(credentialsJson.getBytes())) {
107+
return GoogleCredentials.fromStream(stream);
108+
} catch (final IOException e) {
109+
LOG.error("Failed to read credentials from JSON string", e);
110+
throw e;
111+
}
112+
}
113+
}

0 commit comments

Comments
 (0)