Merge pull request #48 from rovio/hdfs_storage_support
Added support for HDFS as deep storage
vivek-balakrishnan-rovio committed Nov 17, 2023
2 parents fa823a4 + 2824aac commit 48dc6e4
Showing 4 changed files with 39 additions and 3 deletions.
README.md (2 additions & 1 deletion)
@@ -333,8 +333,9 @@ These are the options for `DruidSource`, to be passed with `write.options()`.
 | `druid.exclude_dimensions` | Comma separated list of Spark input columns that have to be excluded in Druid ingestion | |
 | `druid.segment.max_rows` | Max number of rows per segment | `5000000` |
 | `druid.memory.max_rows` | Max number of rows to keep in memory in spark data writer | `75000` |
-| `druid.segment_storage.type` | Type of Deep Storage to use. Allowed values: `s3`, `local`. | `s3` |
+| `druid.segment_storage.type` | Type of Deep Storage to use. Allowed values: `s3`, `local`, `hdfs`. | `s3` |
 | `druid.segment_storage.s3.disableacl` | Whether to disable ACL in S3 config. | `false` |
+| `druid.segment_storage.hdfs.dir` | Hdfs segment storage location | `""` |
 | `druid.datasource.init` | Boolean flag for (re-)initializing Druid datasource. If `true`, any pre-existing segments for the datasource is marked as unused. | `false` |
 | `druid.bitmap_factory` | Compression format for bitmap indexes. Possible values: `concise`, `roaring`. For type `roaring`, the boolean property compressRunOnSerialization is always set to `true`. `rovio-ingest` uses `concise` by default regardless of Druid library version. | `concise` |
 | `druid.segment.rollup` | Whether to rollup data during ingestion | `true` |
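For context, a minimal sketch of how these options might be passed from Spark when targeting HDFS deep storage. Only the storage-related options from the table are shown (a real job also needs the other required `DruidSource` options), the datasource name and HDFS path are placeholders, and the fully qualified format name is assumed:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

public class HdfsWriteExample {
    // Writes a prepared Dataset<Row> to Druid with segments stored in HDFS.
    public static void write(Dataset<Row> df) {
        df.write()
          .format("com.rovio.ingest.DruidSource")                        // assumed fully qualified format name
          .option("druid.datasource", "target_datasource")               // placeholder datasource name
          .option("druid.segment_storage.type", "hdfs")
          .option("druid.segment_storage.hdfs.dir", "/druid/segments")   // placeholder HDFS directory
          .mode(SaveMode.Overwrite)
          .save();
    }
}
```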
pom.xml (5 additions & 0 deletions)
@@ -153,6 +153,11 @@
             <artifactId>druid-s3-extensions</artifactId>
             <version>${druid.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.druid.extensions</groupId>
+            <artifactId>druid-hdfs-storage</artifactId>
+            <version>${druid.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.druid.extensions</groupId>
             <artifactId>druid-datasketches</artifactId>
src/main/java/com/rovio/ingest/WriterContext.java (14 additions & 2 deletions)
@@ -63,6 +63,7 @@ public class WriterContext implements Serializable {
     private final String dimensionsSpec;
     private final String metricsSpec;
     private final String transformSpec;
+    private String getHdfsStorageDir;

     private WriterContext(CaseInsensitiveStringMap options, String version) {
         this.dataSource = getOrThrow(options, ConfKeys.DATA_SOURCE);
@@ -95,9 +96,10 @@ private WriterContext(CaseInsensitiveStringMap options, String version) {
         this.s3BaseKey = options.getOrDefault(ConfKeys.DEEP_STORAGE_S3_BASE_KEY, null);
         this.s3DisableAcl = options.getBoolean(ConfKeys.DEEP_STORAGE_S3_DISABLE_ACL, false);
         this.localDir = options.getOrDefault(ConfKeys.DEEP_STORAGE_LOCAL_DIRECTORY, null);
+        this.getHdfsStorageDir = options.getOrDefault(ConfKeys.DEEP_STORAGE_HDFS_DIRECTORY, null);

         this.deepStorageType = options.getOrDefault(ConfKeys.DEEP_STORAGE_TYPE, DEFAULT_DRUID_DEEP_STORAGE_TYPE);
-        Preconditions.checkArgument(Arrays.asList("s3", "local").contains(this.deepStorageType),
+        Preconditions.checkArgument(Arrays.asList("s3", "local", "hdfs").contains(this.deepStorageType),
                 String.format("Invalid %s: %s", ConfKeys.DEEP_STORAGE_TYPE, this.deepStorageType));

         this.initDataSource = options.getBoolean(ConfKeys.DATASOURCE_INIT, false);
@@ -206,6 +208,10 @@ public boolean isLocalDeepStorage() {
         return "local".equals(deepStorageType);
     }

+    public boolean isHdfsDeepStorage() {
+        return "hdfs".equals(deepStorageType);
+    }
+
     public boolean isRollup() {
         return rollup;
     }
@@ -226,7 +232,11 @@ public String getTransformSpec() {
         return transformSpec;
     }

-    public static class ConfKeys {
+    public String getHdfsStorageDir() {
+        return getHdfsStorageDir;
+    }
+
+    public static class ConfKeys {
         public static final String DATASOURCE_INIT = "druid.datasource.init";
         // Segment config
         public static final String DATA_SOURCE = "druid.datasource";
@@ -256,5 +266,7 @@ public static class ConfKeys {
         public static final String DEEP_STORAGE_S3_DISABLE_ACL = "druid.segment_storage.s3.disableacl";
         // Local config (only for testing)
         public static final String DEEP_STORAGE_LOCAL_DIRECTORY = "druid.segment_storage.local.dir";
+        // HDFS config
+        public static final String DEEP_STORAGE_HDFS_DIRECTORY = "druid.segment_storage.hdfs.dir";
     }
 }
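To make the new keys concrete, a hedged sketch of the kind of option map the private constructor above consumes when HDFS storage is selected; the directory value is a placeholder and only the keys touched by this change are included:

```java
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

import java.util.HashMap;
import java.util.Map;

public class HdfsOptionsExample {
    // Builds the case-insensitive options that map to DEEP_STORAGE_TYPE and DEEP_STORAGE_HDFS_DIRECTORY.
    public static CaseInsensitiveStringMap hdfsOptions() {
        Map<String, String> opts = new HashMap<>();
        opts.put("druid.segment_storage.type", "hdfs");
        opts.put("druid.segment_storage.hdfs.dir", "/druid/segments"); // placeholder path
        return new CaseInsensitiveStringMap(opts);
    }
}
```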
src/main/java/com/rovio/ingest/util/SegmentStorageUpdater.java (18 additions & 0 deletions)
@@ -25,21 +25,29 @@
 import org.apache.druid.segment.loading.LocalDataSegmentKiller;
 import org.apache.druid.segment.loading.LocalDataSegmentPusher;
 import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
+import org.apache.druid.storage.hdfs.HdfsDataSegmentKiller;
+import org.apache.druid.storage.hdfs.HdfsDataSegmentPusher;
+import org.apache.druid.storage.hdfs.HdfsDataSegmentPusherConfig;
 import org.apache.druid.storage.s3.NoopServerSideEncryption;
 import org.apache.druid.storage.s3.S3DataSegmentKiller;
 import org.apache.druid.storage.s3.S3DataSegmentPusher;
 import org.apache.druid.storage.s3.S3DataSegmentPusherConfig;
 import org.apache.druid.storage.s3.S3InputDataConfig;
 import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
+import org.apache.hadoop.conf.Configuration;

 import java.io.File;

+import static com.rovio.ingest.DataSegmentCommitMessage.MAPPER;
+
 public class SegmentStorageUpdater {

     public static DataSegmentPusher createPusher(WriterContext param) {
         Preconditions.checkNotNull(param);
         if (param.isLocalDeepStorage()) {
             return new LocalDataSegmentPusher(getLocalConfig(param.getLocalDir()));
+        } else if (param.isHdfsDeepStorage()) {
+            return new HdfsDataSegmentPusher(getHdfsConfig(param.getHdfsStorageDir()), new Configuration(), MAPPER);
         } else {
             ServerSideEncryptingAmazonS3 serverSideEncryptingAmazonS3 = getAmazonS3().get();
             S3DataSegmentPusherConfig s3Config = new S3DataSegmentPusherConfig();
@@ -55,6 +63,8 @@ public static DataSegmentKiller createKiller(WriterContext param) {
         Preconditions.checkNotNull(param);
         if (param.isLocalDeepStorage()) {
             return new LocalDataSegmentKiller(getLocalConfig(param.getLocalDir()));
+        } else if (param.isHdfsDeepStorage()) {
+            return new HdfsDataSegmentKiller(new Configuration(), getHdfsConfig(param.getHdfsStorageDir()));
         } else {
             Supplier<ServerSideEncryptingAmazonS3> serverSideEncryptingAmazonS3 = getAmazonS3();
             S3DataSegmentPusherConfig s3Config = new S3DataSegmentPusherConfig();
@@ -81,4 +91,12 @@ private static LocalDataSegmentPusherConfig getLocalConfig(String localDir) {
             return config;
         }).get();
     }
+
+    private static HdfsDataSegmentPusherConfig getHdfsConfig(String hdfsStorageDir) {
+        return Suppliers.memoize(() -> {
+            HdfsDataSegmentPusherConfig hdfsSegmentPusherConfig = new HdfsDataSegmentPusherConfig();
+            hdfsSegmentPusherConfig.setStorageDirectory(hdfsStorageDir);
+            return hdfsSegmentPusherConfig;
+        }).get();
+    }
 }
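As a usage note, here is a standalone sketch (not the project's API) that mirrors the two `hdfs` branches added above, showing how the pusher and killer are assembled from a storage directory; the directory URI is a placeholder, and a plain `ObjectMapper` stands in for the shared `MAPPER` the project passes in:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.storage.hdfs.HdfsDataSegmentKiller;
import org.apache.druid.storage.hdfs.HdfsDataSegmentPusher;
import org.apache.druid.storage.hdfs.HdfsDataSegmentPusherConfig;
import org.apache.hadoop.conf.Configuration;

public class HdfsSegmentStorageExample {
    public static void main(String[] args) {
        HdfsDataSegmentPusherConfig config = new HdfsDataSegmentPusherConfig();
        config.setStorageDirectory("hdfs://namenode:8020/druid/segments"); // placeholder URI

        // new Configuration() reads the Hadoop config files (e.g. core-site.xml) found on the
        // classpath, so the Spark executors' Hadoop configuration determines the target cluster.
        Configuration hadoopConf = new Configuration();

        DataSegmentPusher pusher = new HdfsDataSegmentPusher(config, hadoopConf, new ObjectMapper());
        DataSegmentKiller killer = new HdfsDataSegmentKiller(hadoopConf, config);

        System.out.println("HDFS deep storage at " + config.getStorageDirectory());
        System.out.println("Pusher: " + pusher.getClass().getSimpleName()
                + ", Killer: " + killer.getClass().getSimpleName());
    }
}
```

The `getHdfsConfig` helper above follows the same `Suppliers.memoize(...).get()` pattern as the existing `getLocalConfig`, so both storage types are wired the same way.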
