Commit

Refactor: Remove FsStorageProvider from Table Service (#104)
HotSushi committed May 15, 2024
1 parent 81c0887 commit ca05f77
Showing 23 changed files with 480 additions and 378 deletions.

@@ -19,4 +19,25 @@ public interface StorageClient<T> {
* @return the native client of the storage system
*/
T getNativeClient();

/**
* Get the endpoint of the storage system.
*
* <p>Example: For HDFS, the endpoint could be "hdfs://localhost:9000". For local file system, the
* endpoint could be "file://". For S3, the endpoint could be "s3://".
*
* @return the endpoint of the storage system
*/
String getEndpoint();

/**
* Get the root prefix for OpenHouse on the storage system.
*
* <p>Root prefix should include the bucket-name plus any additional path components. Example: For
* HDFS, the root prefix could be "/data/openhouse". For local file system, the root prefix could
* be "/tmp". For S3, the root prefix could be "/bucket-name/key/prefix/to/openhouse".
*
* @return the root prefix for OpenHouse on the storage system
*/
String getRootPrefix();
}
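A minimal usage sketch of the two new methods, assuming "client" is any StorageClient implementation; the variable names and example values are illustrative, not part of this commit:

// Combine the endpoint and root prefix into a fully qualified OpenHouse base location.
String endpoint = client.getEndpoint();      // e.g. "hdfs://localhost:9000"
String rootPrefix = client.getRootPrefix();  // e.g. "/data/openhouse"
String baseLocation = java.net.URI.create(endpoint + rootPrefix).normalize().toString();
// -> "hdfs://localhost:9000/data/openhouse"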

@@ -3,6 +3,7 @@
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import org.springframework.stereotype.Component;

/**
@@ -18,6 +19,7 @@ public class StorageType {

@AllArgsConstructor
@EqualsAndHashCode
@ToString(includeFieldNames = false)
@Getter
public static class Type {
private String value;

@@ -63,4 +63,14 @@ private void validateProperties() {
public FileSystem getNativeClient() {
return fs;
}

@Override
public String getEndpoint() {
return storageProperties.getTypes().get(HDFS_TYPE.getValue()).getEndpoint();
}

@Override
public String getRootPrefix() {
return storageProperties.getTypes().get(HDFS_TYPE.getValue()).getRootPath();
}
}
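Both overrides are thin lookups into the cluster storage properties keyed by the storage type value. A sketch of the equivalent lookup, assuming an injected StorageProperties bean with an "hdfs" entry configured; the values in comments are illustrative:

// Same lookups the getEndpoint()/getRootPrefix() overrides above perform.
String endpoint = storageProperties.getTypes().get(StorageType.HDFS.getValue()).getEndpoint();  // e.g. "hdfs://localhost:9000"
String rootPath = storageProperties.getTypes().get(StorageType.HDFS.getValue()).getRootPath();  // e.g. "/data/openhouse"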

@@ -28,10 +28,14 @@ public class LocalStorageClient implements StorageClient<FileSystem> {

private static final StorageType.Type LOCAL_TYPE = StorageType.LOCAL;

private static final String DEFAULT_ENDPOINT = "file://";
private static final String DEFAULT_ENDPOINT = "file:";

private static final String DEFAULT_ROOTPATH = "/tmp";

private String endpoint;

private String rootPath;

@Autowired private StorageProperties storageProperties;

/** Initialize the LocalStorageClient when the bean is accessed for the first time. */
@@ -57,19 +61,19 @@ public synchronized void init() throws URISyntaxException, IOException {
.getEndpoint()
.startsWith(DEFAULT_ENDPOINT),
"Storage properties endpoint was misconfigured for: " + LOCAL_TYPE.getValue());
try {
uri =
new URI(
storageProperties.getTypes().get(LOCAL_TYPE.getValue()).getEndpoint()
+ storageProperties.getTypes().get(LOCAL_TYPE.getValue()).getRootPath());
} catch (URISyntaxException e) {
throw new IllegalArgumentException(
"Storage properties 'endpoint', 'rootpath' was incorrectly configured for: "
+ LOCAL_TYPE.getValue(),
e);
}
endpoint = storageProperties.getTypes().get(LOCAL_TYPE.getValue()).getEndpoint();
rootPath = storageProperties.getTypes().get(LOCAL_TYPE.getValue()).getRootPath();
} else {
uri = new URI(DEFAULT_ENDPOINT + DEFAULT_ROOTPATH);
endpoint = DEFAULT_ENDPOINT;
rootPath = DEFAULT_ROOTPATH;
}
try {
uri = new URI(endpoint + rootPath);
} catch (URISyntaxException e) {
throw new IllegalArgumentException(
"Storage properties 'endpoint', 'rootpath' was incorrectly configured for: "
+ LOCAL_TYPE.getValue(),
e);
}
this.fs = FileSystem.get(uri, new org.apache.hadoop.conf.Configuration());
Preconditions.checkArgument(
@@ -81,4 +85,14 @@ public synchronized void init() throws URISyntaxException, IOException {
public FileSystem getNativeClient() {
return fs;
}

@Override
public String getEndpoint() {
return endpoint;
}

@Override
public String getRootPrefix() {
return rootPath;
}
}
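With the default endpoint changed from "file://" to "file:", the URI handed to FileSystem.get is now assembled in one place from the resolved endpoint and root path. A minimal, self-contained sketch of how the default values parse, assuming standard java.net.URI semantics:

import java.net.URI;
import java.net.URISyntaxException;

class LocalUriSketch {
  public static void main(String[] args) throws URISyntaxException {
    String endpoint = "file:"; // DEFAULT_ENDPOINT after this change
    String rootPath = "/tmp";  // DEFAULT_ROOTPATH
    URI uri = new URI(endpoint + rootPath); // "file:/tmp"
    System.out.println(uri.getScheme());    // prints "file"
    System.out.println(uri.getPath());      // prints "/tmp"
  }
}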

@@ -3,7 +3,10 @@
import static com.linkedin.openhouse.cluster.storage.StorageType.HDFS;
import static com.linkedin.openhouse.cluster.storage.StorageType.LOCAL;

import com.linkedin.openhouse.cluster.storage.Storage;
import com.linkedin.openhouse.cluster.storage.StorageType;
import com.linkedin.openhouse.cluster.storage.hdfs.HdfsStorage;
import com.linkedin.openhouse.cluster.storage.local.LocalStorage;
import java.util.Optional;
import java.util.function.Supplier;
import org.apache.iceberg.hadoop.HadoopFileIO;
@@ -30,6 +33,9 @@ public class FileIOManager {
@Qualifier("LocalFileIO")
FileIO localFileIO;

@Autowired HdfsStorage hdfsStorage;

@Autowired LocalStorage localStorage;
/**
* Returns the FileIO implementation for the given storage type.
*
@@ -48,4 +54,21 @@ public FileIO getFileIO(StorageType.Type storageType) throws IllegalArgumentException {
throw new IllegalArgumentException("FileIO not supported for storage type: " + storageType);
}
}

/**
* Returns the Storage implementation for the given FileIO.
*
* @param fileIO, the FileIO for which the Storage implementation is required
* @return Storage implementation for the given FileIO
* @throws IllegalArgumentException if the FileIO is not configured
*/
public Storage getStorage(FileIO fileIO) {
if (fileIO.equals(hdfsFileIO)) {
return hdfsStorage;
} else if (fileIO.equals(localFileIO)) {
return localStorage;
} else {
throw new IllegalArgumentException("Storage not supported for fileIO: " + fileIO);
}
}
}
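The new getStorage(FileIO) mapping lets callers that only hold a table's FileIO recover the owning Storage, and through it the endpoint and root prefix; this is how convertToTableDto further down qualifies table locations. A usage sketch, assuming an injected FileIOManager and an Iceberg Table named "table" (names are illustrative):

// Map the table's FileIO back to its Storage, then read the client-level location info.
Storage storage = fileIOManager.getStorage(table.io());
String endpoint = storage.getClient().getEndpoint();      // e.g. "hdfs://localhost:9000"
String rootPrefix = storage.getClient().getRootPrefix();  // e.g. "/data/openhouse"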

@@ -1,7 +1,6 @@
package com.linkedin.openhouse.internal.catalog;

import com.linkedin.openhouse.cluster.storage.StorageManager;
import com.linkedin.openhouse.cluster.storage.filesystem.FsStorageProvider;
import com.linkedin.openhouse.internal.catalog.fileio.FileIOConfig;
import com.linkedin.openhouse.internal.catalog.fileio.FileIOManager;
import java.io.IOException;
@@ -33,8 +32,6 @@ public static void main(String[] args) {

@MockBean FileIOConfig fileIOConfig;

@MockBean FsStorageProvider fsStorageProvider;

static final FsPermission perm = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);

/**

@@ -1,6 +1,7 @@
package com.linkedin.openhouse.housetables.config.db.iceberg;

import com.linkedin.openhouse.cluster.storage.filesystem.FsStorageProvider;
import com.linkedin.openhouse.cluster.storage.StorageManager;
import com.linkedin.openhouse.cluster.storage.StorageType;
import com.linkedin.openhouse.housetables.repository.HtsRepository;
import com.linkedin.openhouse.hts.catalog.model.jobtable.JobIcebergRow;
import com.linkedin.openhouse.hts.catalog.model.jobtable.JobIcebergRowPrimaryKey;
@@ -10,6 +11,7 @@
import java.nio.file.Paths;
import java.util.Collections;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.fs.FileSystem;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
@@ -34,7 +36,7 @@ public class IcebergProviderConfiguration {
private static final String HTS_USER_TBL_NAME = "userTable";
private static final String HTS_JOB_TBL_NAME = "jobTable";

@Autowired FsStorageProvider storageProvider;
@Autowired StorageManager storageManager;

private Catalog provideHadoopCatalogForHouseTables() {
HadoopCatalog catalog = new HadoopCatalog();
@@ -43,7 +45,7 @@ private Catalog provideHadoopCatalogForHouseTables() {
HTS_CATALOG_NAME,
Collections.singletonMap(
CatalogProperties.WAREHOUSE_LOCATION,
Paths.get(storageProvider.rootPath()).toString()));
Paths.get(storageManager.getDefaultStorage().getClient().getRootPrefix()).toString()));
return catalog;
}

@@ -70,7 +72,15 @@ private Catalog provideHadoopCatalogForHouseTables() {
}

private org.apache.hadoop.conf.Configuration getHadoopConfigurations() {
log.debug("Loading hadoop configuration from:" + storageProvider.name());
return storageProvider.storageClient().getConf();
log.debug("Loading hadoop configuration for:" + storageManager.getDefaultStorage().getType());
if (storageManager.getDefaultStorage().getType().equals(StorageType.HDFS)
|| storageManager.getDefaultStorage().getType().equals(StorageType.LOCAL)) {
return ((FileSystem) storageManager.getDefaultStorage().getClient().getNativeClient())
.getConf();
} else {
throw new UnsupportedOperationException(
"Unsupported storage type for Iceberg catalog: "
+ storageManager.getDefaultStorage().getType());
}
}
}

@@ -2,7 +2,8 @@

import com.linkedin.openhouse.cluster.metrics.TagUtils;
import com.linkedin.openhouse.cluster.storage.FsStorageUtils;
import com.linkedin.openhouse.cluster.storage.filesystem.FsStorageProvider;
import com.linkedin.openhouse.cluster.storage.StorageManager;
import com.linkedin.openhouse.cluster.storage.StorageType;
import com.linkedin.openhouse.common.config.BaseApplicationConfig;
import com.linkedin.openhouse.common.provider.HttpConnectionPoolProviderConfig;
import com.linkedin.openhouse.housetables.client.api.UserTableApi;
@@ -19,6 +20,8 @@
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.springdoc.core.customizers.OpenApiCustomiser;
@@ -32,14 +35,15 @@

/** Main Application Configuration to load cluster properties. */
@Configuration
@Slf4j
public class MainApplicationConfig extends BaseApplicationConfig {
public static final String APP_NAME = "tables";
private static final Pattern VERSION_PART_PATTERN = Pattern.compile("v[0-9]+");
private static final int IN_MEMORY_BUFFER_SIZE = 10 * 1000 * 1024;

private static final int DNS_QUERY_TIMEOUT_SECONDS = 10;

@Autowired protected FsStorageProvider fsStorageProvider;
@Autowired StorageManager storageManager;

/**
* When cluster properties are available, obtain hts base URI and inject API client
@@ -80,7 +84,17 @@ MeterRegistryCustomizer<MeterRegistry> provideMeterRegistry() {
Consumer<Supplier<Path>> provideFileSecurer() {
return pathSeqSupplier -> {
try {
FsStorageUtils.securePath(fsStorageProvider.storageClient(), pathSeqSupplier.get());
// TODO: This should use high-level storage api such as Storage::secureTableObject.
if (storageManager.getDefaultStorage().getType().equals(StorageType.HDFS)
|| storageManager.getDefaultStorage().getType().equals(StorageType.LOCAL)) {
FsStorageUtils.securePath(
(FileSystem) storageManager.getDefaultStorage().getClient().getNativeClient(),
pathSeqSupplier.get());
} else {
log.warn(
"No secure path implementation for storage type: {}",
storageManager.getDefaultStorage().getType());
}
} catch (IOException ioe) {
// Throwing unchecked exception and leave the handling explicitly to the caller.
throw new UncheckedIOException(

@@ -3,8 +3,10 @@
import static com.linkedin.openhouse.internal.catalog.mapper.HouseTableSerdeUtils.getCanonicalFieldName;

import com.google.common.annotations.VisibleForTesting;
import com.linkedin.openhouse.cluster.storage.filesystem.FsStorageProvider;
import com.linkedin.openhouse.cluster.storage.Storage;
import com.linkedin.openhouse.cluster.storage.StorageManager;
import com.linkedin.openhouse.common.schema.IcebergSchemaHelper;
import com.linkedin.openhouse.internal.catalog.fileio.FileIOManager;
import com.linkedin.openhouse.tables.dto.mapper.iceberg.PartitionSpecMapper;
import com.linkedin.openhouse.tables.dto.mapper.iceberg.PoliciesSpecMapper;
import com.linkedin.openhouse.tables.dto.mapper.iceberg.TableTypeMapper;
@@ -17,7 +19,6 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.Table;
import org.apache.iceberg.UpdateProperties;

@@ -34,8 +35,13 @@ private InternalRepositoryUtils() {
}

public static java.nio.file.Path constructTablePath(
FsStorageProvider fsStorageProvider, String databaseID, String tableId, String tableUUID) {
return Paths.get(fsStorageProvider.rootPath(), databaseID, tableId + "-" + tableUUID);
StorageManager storageManager, String databaseID, String tableId, String tableUUID) {
// TODO: Default storage is used here. Support for non-default storage type per table needs to
// be added.
return Paths.get(
storageManager.getDefaultStorage().getClient().getRootPrefix(),
databaseID,
tableId + "-" + tableUUID);
}

/**
@@ -120,13 +126,13 @@ public static boolean alterPoliciesIfNeeded(
@VisibleForTesting
static TableDto convertToTableDto(
Table table,
FsStorageProvider fsStorageProvider,
FileIOManager fileIOManager,
PartitionSpecMapper partitionSpecMapper,
PoliciesSpecMapper policiesMapper,
TableTypeMapper tableTypeMapper) {
/* Contains everything needed to populate dto */
final Map<String, String> megaProps = table.properties();

Storage storage = fileIOManager.getStorage(table.io());
TableDto tableDto =
TableDto.builder()
.tableId(megaProps.get(getCanonicalFieldName("tableId")))
@@ -135,9 +141,10 @@ static TableDto convertToTableDto(
.tableUri(megaProps.get(getCanonicalFieldName("tableUri")))
.tableUUID(megaProps.get(getCanonicalFieldName("tableUUID")))
.tableLocation(
fsStorageProvider
.storageClient()
.makeQualified(new Path(megaProps.get(getCanonicalFieldName("tableLocation"))))
URI.create(
storage.getClient().getEndpoint()
+ megaProps.get(getCanonicalFieldName("tableLocation")))
.normalize()
.toString())
.tableVersion(megaProps.get(getCanonicalFieldName("tableVersion")))
.tableCreator(megaProps.get(getCanonicalFieldName("tableCreator")))
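In convertToTableDto above, the table location is now qualified by prefixing the owning storage's endpoint and normalizing the result, instead of calling makeQualified on the removed FsStorageProvider's FileSystem client. A small sketch with illustrative values:

// Illustrative values only; the real endpoint comes from Storage and the location from table properties.
String endpoint = "hdfs://localhost:9000";
String tableLocation = "/data/openhouse/db1/tbl1-0000-1111";
String qualified = java.net.URI.create(endpoint + tableLocation).normalize().toString();
// -> "hdfs://localhost:9000/data/openhouse/db1/tbl1-0000-1111"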
