Skip to content

Commit 92e3341

Browse files
qgeffardfhussonnois
authored andcommitted
feat(filesystems): add support for Azure Blob Storage (#112)
This commit adds the new module filepulse-azure-blob-storage-fs that provides: - AzureBlobStorageAvroFileInputReader - AzureBlobStorageBytesArrayInputReader - AzureBlobStorageRowFileInputReader - AzureBlobStorageXMLFileInputReader - AzureBlobStorageFileSystemListing Delete ScELLexer.interp refacto
1 parent 9011c7f commit 92e3341

File tree

14 files changed

+677
-2
lines changed

14 files changed

+677
-2
lines changed

connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/FileObjectMeta.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
import java.util.Objects;
3030

3131
/**
32-
* An object regrouping metadata about an input ile manipulate by the connector.
32+
* An object regrouping metadata about an input file manipulate by the connector.
3333
*/
3434
public interface FileObjectMeta extends Serializable, Comparable<FileObjectMeta> {
3535

connect-file-pulse-filesystems/filepulse-amazons3-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/storage/aws/AmazonS3ClientConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ static ConfigDef getConf() {
134134
GROUP_AWS,
135135
awsGroupCounter++,
136136
ConfigDef.Width.NONE,
137-
AWS_S3_BUCKET_NAME_CONFIG
137+
AWS_S3_BUCKET_PREFIX_CONFIG
138138
)
139139

140140
.define(
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<modelVersion>4.0.0</modelVersion>
6+
7+
<parent>
8+
<groupId>io.streamthoughts</groupId>
9+
<artifactId>kafka-connect-filepulse-filesystems</artifactId>
10+
<version>2.0.0-SNAPSHOT</version>
11+
</parent>
12+
13+
<groupId>io.streamthoughts</groupId>
14+
<artifactId>kafka-filepulse-azure-blob-storage-fs</artifactId>
15+
<version>2.0.0-SNAPSHOT</version>
16+
<description>Kafka Connect FilePulse - FileSystem - Support for Azure Blob Storage</description>
17+
18+
<properties>
19+
<checkstyle.config.location>${project.parent.basedir}/..</checkstyle.config.location>
20+
<azure-storage-blob.version>12.6.0</azure-storage-blob.version>
21+
</properties>
22+
23+
<dependencies>
24+
<dependency>
25+
<groupId>com.azure</groupId>
26+
<artifactId>azure-storage-blob</artifactId>
27+
<version>${azure-storage-blob.version}</version>
28+
</dependency>
29+
<dependency>
30+
<groupId>io.streamthoughts</groupId>
31+
<artifactId>kafka-connect-filepulse-commons-fs</artifactId>
32+
<version>${project.version}</version>
33+
</dependency>
34+
<dependency>
35+
<groupId>org.apache.kafka</groupId>
36+
<artifactId>kafka-clients</artifactId>
37+
</dependency>
38+
<dependency>
39+
<groupId>org.apache.kafka</groupId>
40+
<artifactId>connect-api</artifactId>
41+
</dependency>
42+
<dependency>
43+
<groupId>org.apache.avro</groupId>
44+
<artifactId>avro</artifactId>
45+
</dependency>
46+
</dependencies>
47+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Copyright 2019-2021 StreamThoughts.
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package io.streamthoughts.kafka.connect.filepulse.storage.azure;
20+
21+
import org.apache.kafka.common.config.AbstractConfig;
22+
import org.apache.kafka.common.config.ConfigDef;
23+
import java.util.Map;
24+
25+
26+
public class AzureBlobStorageClientConfig extends AbstractConfig {
27+
28+
private static final String GROUP_AZURE = "AZURE";
29+
30+
public static final String AZURE_BLOB_STORAGE_CONNECTION_STRING_CONFIG = "azure.blob.storage.connection.string";
31+
private static final String AZURE_BLOB_STORAGE_CONNECTION_STRING_DOC = "Azure storage account connection string";
32+
33+
public static final String AZURE_BLOB_STORAGE_CONTAINER_NAME_CONFIG = "azure.blob.storage.container.name";
34+
private static final String AZURE_BLOB_STORAGE_CONTAINER_NAME_DOC = "The container name";
35+
36+
public final static String AZURE_BLOB_STORAGE_PREFIX_CONFIG = "azure.blob.storage.prefix";
37+
private final static String AZURE_BLOB_STORAGE_PREFIX_DOC = "The prefix to be used for restricting the listing of the blobs in the container";
38+
39+
public AzureBlobStorageClientConfig(final Map<String, ?> originals) {
40+
super(getConf(), originals, false);
41+
}
42+
43+
/**
44+
* @return the {@link ConfigDef}.
45+
*/
46+
static ConfigDef getConf() {
47+
int azureGroupCounter = 0;
48+
49+
return new ConfigDef()
50+
.define(
51+
AZURE_BLOB_STORAGE_CONNECTION_STRING_CONFIG,
52+
ConfigDef.Type.STRING,
53+
ConfigDef.Importance.HIGH,
54+
AZURE_BLOB_STORAGE_CONNECTION_STRING_DOC,
55+
GROUP_AZURE,
56+
azureGroupCounter++,
57+
ConfigDef.Width.NONE,
58+
AZURE_BLOB_STORAGE_CONNECTION_STRING_CONFIG
59+
)
60+
.define(
61+
AZURE_BLOB_STORAGE_CONTAINER_NAME_CONFIG,
62+
ConfigDef.Type.STRING,
63+
ConfigDef.Importance.HIGH,
64+
AZURE_BLOB_STORAGE_CONTAINER_NAME_DOC,
65+
GROUP_AZURE,
66+
azureGroupCounter++,
67+
ConfigDef.Width.NONE,
68+
AZURE_BLOB_STORAGE_CONTAINER_NAME_CONFIG
69+
)
70+
.define(
71+
AZURE_BLOB_STORAGE_PREFIX_CONFIG,
72+
ConfigDef.Type.STRING,
73+
ConfigDef.Importance.HIGH,
74+
AZURE_BLOB_STORAGE_PREFIX_DOC,
75+
GROUP_AZURE,
76+
azureGroupCounter++,
77+
ConfigDef.Width.NONE,
78+
AZURE_BLOB_STORAGE_PREFIX_CONFIG
79+
);
80+
}
81+
82+
public String getConnectionString() {
83+
return getString(AZURE_BLOB_STORAGE_CONNECTION_STRING_CONFIG);
84+
}
85+
86+
public String getContainerName() {
87+
return getString(AZURE_BLOB_STORAGE_CONTAINER_NAME_CONFIG);
88+
}
89+
90+
public String getAzureBlobStoragePrefix() {return getString(AZURE_BLOB_STORAGE_PREFIX_CONFIG);}
91+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright 2019-2021 StreamThoughts.
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package io.streamthoughts.kafka.connect.filepulse.storage.azure;
20+
21+
import com.azure.storage.blob.BlobContainerClient;
22+
import com.azure.storage.blob.BlobServiceClientBuilder;
23+
24+
/**
25+
* Utility class for creating new {@link BlobContainerClient} client.
26+
*/
27+
public class AzureBlobStorageClientUtils {
28+
29+
public static BlobContainerClient createBlobContainerClient(AzureBlobStorageClientConfig config){
30+
// Create a BlobServiceClient object which is used to create a container client
31+
return new BlobServiceClientBuilder()
32+
.connectionString(config.getConnectionString()).buildClient()
33+
.getBlobContainerClient(config.getContainerName());
34+
}
35+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Copyright 2019-2021 StreamThoughts.
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package io.streamthoughts.kafka.connect.filepulse.storage.azure.blob;
20+
21+
import com.azure.storage.blob.BlobClient;
22+
import com.azure.storage.blob.BlobClientBuilder;
23+
import com.azure.storage.blob.BlobContainerClient;
24+
import com.azure.storage.blob.models.BlobItem;
25+
import io.streamthoughts.kafka.connect.filepulse.fs.reader.Storage;
26+
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta;
27+
import io.streamthoughts.kafka.connect.filepulse.source.GenericFileObjectMeta;
28+
29+
import java.io.InputStream;
30+
import java.net.URI;
31+
import java.util.Arrays;
32+
import java.util.HashMap;
33+
34+
public class AzureBlobStorage implements Storage {
35+
36+
private final BlobContainerClient containerClient;
37+
38+
public AzureBlobStorage(BlobContainerClient blobContainerClient) {
39+
this.containerClient = blobContainerClient;
40+
}
41+
42+
@Override
43+
public FileObjectMeta getObjectMetadata(URI uri) {
44+
BlobClient blobClient = getBlobClient(uri);
45+
46+
return new GenericFileObjectMeta.Builder()
47+
.withUri(uri)
48+
.withName(blobClient.getBlobName())
49+
.withContentLength(blobClient.getProperties().getBlobSize())
50+
.withLastModified(blobClient.getProperties().getLastModified().toInstant())
51+
.withContentDigest(
52+
new FileObjectMeta.ContentDigest(
53+
Arrays.toString(blobClient.getProperties().getContentMd5()), "MD5"))
54+
.withUserDefinedMetadata(new HashMap<>(blobClient.getProperties().getMetadata()))
55+
.build();
56+
}
57+
58+
public FileObjectMeta getObjectMetadata(BlobItem blobItem, URI uri) {
59+
return new GenericFileObjectMeta.Builder()
60+
.withUri(uri)
61+
.withName(blobItem.getName())
62+
.withContentLength(blobItem.getProperties().getContentLength())
63+
.withLastModified(blobItem.getProperties().getLastModified().toInstant())
64+
.withContentDigest(
65+
new FileObjectMeta.ContentDigest(
66+
Arrays.toString(blobItem.getProperties().getContentMd5()), "MD5"))
67+
.withUserDefinedMetadata(new HashMap<>(blobItem.getMetadata()))
68+
.build();
69+
}
70+
71+
@Override
72+
public boolean exists(URI uri) {
73+
return getBlobClient(uri).exists();
74+
}
75+
76+
@Override
77+
public InputStream getInputStream(URI uri) {
78+
return getBlobClient(uri).openInputStream();
79+
}
80+
81+
private BlobClient getBlobClient(URI uri) {
82+
return containerClient
83+
.getBlobClient(new BlobClientBuilder().endpoint(uri.toString()).buildClient().getBlobName());
84+
}
85+
86+
public BlobContainerClient getBlobContainerClient() {
87+
return containerClient;
88+
}
89+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Copyright 2019-2021 StreamThoughts.
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package io.streamthoughts.kafka.connect.filepulse.storage.azure.blob;
20+
21+
import com.azure.storage.blob.BlobContainerClient;
22+
import com.azure.storage.blob.models.BlobListDetails;
23+
import com.azure.storage.blob.models.ListBlobsOptions;
24+
import io.streamthoughts.kafka.connect.filepulse.fs.FileListFilter;
25+
import io.streamthoughts.kafka.connect.filepulse.fs.FileSystemListing;
26+
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta;
27+
import io.streamthoughts.kafka.connect.filepulse.storage.azure.AzureBlobStorageClientConfig;
28+
import io.streamthoughts.kafka.connect.filepulse.storage.azure.AzureBlobStorageClientUtils;
29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
31+
32+
import java.net.URI;
33+
import java.net.URISyntaxException;
34+
import java.util.Collection;
35+
import java.util.List;
36+
import java.util.Map;
37+
import java.util.stream.Collectors;
38+
39+
import static io.streamthoughts.kafka.connect.filepulse.internal.StringUtils.isNotBlank;
40+
41+
public class AzureBlobStorageFileSystemListing implements FileSystemListing {
42+
43+
private static final Logger LOG = LoggerFactory.getLogger(AzureBlobStorageFileSystemListing.class);
44+
45+
private FileListFilter filter;
46+
private AzureBlobStorage storage;
47+
private AzureBlobStorageClientConfig config;
48+
49+
@Override
50+
public void configure(final Map<String, ?> configs) {
51+
this.config = new AzureBlobStorageClientConfig(configs);
52+
storage = new AzureBlobStorage(
53+
AzureBlobStorageClientUtils.createBlobContainerClient(config));
54+
}
55+
56+
@Override
57+
public Collection<FileObjectMeta> listObjects() {
58+
BlobContainerClient blobContainerClient = storage.getBlobContainerClient();
59+
ListBlobsOptions options = new ListBlobsOptions();
60+
if (isNotBlank(config.getAzureBlobStoragePrefix()))
61+
options.setPrefix(config.getAzureBlobStoragePrefix());
62+
List<FileObjectMeta> fileObjectMetaList = blobContainerClient.listBlobs(options, null)
63+
.stream()
64+
.map(blobItem -> {
65+
// URI construction, as the azure sdk sucks...
66+
String blobUrl = blobContainerClient.getBlobClient(blobItem.getName()).getBlobUrl();
67+
URI uri = null;
68+
try {
69+
uri = new URI(blobUrl);
70+
} catch (URISyntaxException e) {
71+
LOG.error(
72+
"Failed to construct the blob URI from the given URL : '{}'. ",
73+
blobUrl,
74+
e
75+
);
76+
}
77+
return storage.getObjectMetadata(blobItem, uri);
78+
}).collect(Collectors.toList());
79+
return filter == null ? fileObjectMetaList : filter.filterFiles(fileObjectMetaList);
80+
}
81+
82+
@Override
83+
public void setFilter(FileListFilter filter) {
84+
this.filter = filter;
85+
}
86+
}

0 commit comments

Comments
 (0)