Refactor: Remove @LegacyFileIO and Make DelegationRefreshToken use new cluster.yaml (#100)
HotSushi committed May 13, 2024
1 parent f9b2417 commit 81c0887
Showing 13 changed files with 287 additions and 134 deletions.
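At a glance, the core of the @LegacyFileIO removal is in OpenHouseInternalCatalog (full diff below): the qualifier-injected FileIO field is replaced by a lookup through FileIOManager, keyed by the default storage's type.

// Before this commit:
@Autowired
@Qualifier("LegacyFileIO")
FileIO fileIO;

// After this commit: FileIO is resolved per storage type.
@Autowired FileIOManager fileIOManager;
@Autowired StorageManager storageManager;
// ...
FileIO fileIO = fileIOManager.getFileIO(storageManager.getDefaultStorage().getType());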


DelegationTokenRefresher.java → HdfsDelegationTokenRefresher.java (moved from the filesystem package to the hdfs package)
@@ -1,4 +1,4 @@
package com.linkedin.openhouse.cluster.storage.filesystem;
package com.linkedin.openhouse.cluster.storage.hdfs;

import static org.apache.hadoop.security.UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;

@@ -7,6 +7,7 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.springframework.beans.factory.annotation.Autowired;
@@ -19,23 +20,36 @@
* basis and token file as pointed by HADOOP_TOKEN_FILE_LOCATION is always updated.
*/
@Slf4j
public class DelegationTokenRefresher {
public class HdfsDelegationTokenRefresher {

@Autowired private FsStorageProvider fsStorageProvider;
@Autowired HdfsStorage hdfsStorage;

/**
* Schedule credential refresh (hadoop delegation tokens) daily twice. The schedule cron
* expression represented by #{clusterProperties.clusterStorageHadoopTokenRefreshScheduleCron}
* sets the cron to run every 12 hours i.e. daily twice. Hadoop delegation token is valid for 24
* hours and hence the token must be refreshed before that. The hadoop delegation token file is
* pointed by environment variable i.e. HADOOP_TOKEN_FILE_LOCATION. The renewal of the delegation
* token must be done before it expires. This code assumes that hadoop delegation tokens are
* renewed on a regular basis and token file as pointed by HADOOP_TOKEN_FILE_LOCATION is always
* updated. So, this methods reads the token file and updates the current user
* UserGroupInformation (UGI) with the renewed token and this update is done daily twice.
* expression represented by HdfsStorage specific parameter "token.refresh.schedule.cron" sets the
* cron to run every 12 hours i.e. daily twice. Hadoop delegation token is valid for 24 hours and
* hence the token must be refreshed before that. The hadoop delegation token file is pointed by
* environment variable i.e. HADOOP_TOKEN_FILE_LOCATION. The renewal of the delegation token must
* be done before it expires. This code assumes that hadoop delegation tokens are renewed on a
* regular basis and token file as pointed by HADOOP_TOKEN_FILE_LOCATION is always updated. So,
* this methods reads the token file and updates the current user UserGroupInformation (UGI) with
* the renewed token and this update is done daily twice. The relevant configuration in the
* cluster YAML file is as follows:
*
* <pre>
* cluster:
* storages:
* hdfs:
* parameter:
* token.refresh.enabled: true
* token.refresh.schedule.cron: 0 0 0/12 * * ?
* </pre>
*/
@Scheduled(cron = "#{clusterProperties.clusterStorageHadoopTokenRefreshScheduleCron}")
@Scheduled(
cron =
"#{hdfsStorage.getProperties().getOrDefault('token.refresh.schedule.cron', '0 0 0/12 * * ?')}")
public void refresh() {
log.info("Refreshing HDFS delegation token");
String tokenFileLocation = System.getenv(HADOOP_TOKEN_FILE_LOCATION);
try {
log.info(
@@ -55,9 +69,8 @@ public void refresh() {
+ HADOOP_TOKEN_FILE_LOCATION
+ " not found");
}
Credentials cred =
Credentials.readTokenStorageFile(
tokenFile, fsStorageProvider.storageClient().getConf());
FileSystem fs = (FileSystem) hdfsStorage.getClient().getNativeClient();
Credentials cred = Credentials.readTokenStorageFile(tokenFile, fs.getConf());
log.info("Loaded {} tokens", cred.numberOfTokens());
UserGroupInformation.getCurrentUser().addCredentials(cred);
log.info(
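The @Scheduled cron is no longer read from clusterProperties but from the hdfs storage's parameter map in cluster.yaml. A minimal sketch of how that SpEL expression resolves, assuming HdfsStorage.getProperties() exposes the parameter block as a Map<String, String> (the sketch class and names are illustrative, not part of the commit):

import java.util.Map;

public class CronResolutionSketch {
  public static void main(String[] args) {
    // Stand-in for hdfsStorage.getProperties(), i.e. the cluster.yaml "parameter" block.
    Map<String, String> parameters =
        Map.of(
            "token.refresh.enabled", "true",
            "token.refresh.schedule.cron", "0 0 0/12 * * ?");
    // Mirrors the SpEL: getProperties().getOrDefault('token.refresh.schedule.cron', '0 0 0/12 * * ?')
    String cron = parameters.getOrDefault("token.refresh.schedule.cron", "0 0 0/12 * * ?");
    System.out.println("Token refresh cron: " + cron); // every 12 hours, i.e. twice daily
  }
}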
HdfsDelegationTokenRefresherConfig.java (new file)
@@ -0,0 +1,55 @@
package com.linkedin.openhouse.cluster.storage.hdfs;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;

/**
* Configuration class to conditionally enable/disable creation of {@link
* HdfsDelegationTokenRefresher} bean and enable delegation token refresh scheduling.
*/
@Slf4j
@Configuration
@EnableScheduling
public class HdfsDelegationTokenRefresherConfig {

@Autowired private HdfsStorage hdfsStorage;

private static final String HDFS_TOKEN_REFRESH_ENABLED = "token.refresh.enabled";

/**
* Conditionally provide the HdfsDelegationTokenRefresher bean if the parameter for token refresh
* is enabled in the HdfsStorage properties. The relevant configuration in the cluster YAML file
* is as follows:
*
* <pre>
* cluster:
* storages:
* hdfs:
* parameter:
* token.refresh.enabled: true
* </pre>
*
* @return HdfsDelegationTokenRefresher
*/
@Bean
public HdfsDelegationTokenRefresher getDelegationTokenRefresher() {
if (!hdfsStorage.isConfigured()) {
log.debug(
"Hdfs storage is not configured, ignoring HdfsDelegationTokenRefresher bean creation");
return null;
}
String refreshEnabled =
hdfsStorage.getProperties().getOrDefault(HDFS_TOKEN_REFRESH_ENABLED, "false");
if (Boolean.parseBoolean(refreshEnabled)) {
log.info("Creating HdfsDelegationTokenRefresher bean");
return new HdfsDelegationTokenRefresher();
} else {
log.debug(
"Hdfs storage token refresh is not enabled, ignoring HdfsDelegationTokenRefresher bean creation");
return null;
}
}
}
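Token refresh is therefore opt-in: no refresher bean is created unless hdfs storage is configured and token.refresh.enabled is set to true (the default is false). A minimal sketch of the same decision outside Spring (class and method names below are illustrative, not from the commit):

import java.util.Map;

public class TokenRefreshToggleSketch {
  // Mirrors HdfsDelegationTokenRefresherConfig: refresh requires a configured storage
  // and token.refresh.enabled=true; the parameter defaults to "false".
  static boolean refreshEnabled(boolean storageConfigured, Map<String, String> parameters) {
    return storageConfigured
        && Boolean.parseBoolean(parameters.getOrDefault("token.refresh.enabled", "false"));
  }

  public static void main(String[] args) {
    System.out.println(refreshEnabled(true, Map.of("token.refresh.enabled", "true")));  // true
    System.out.println(refreshEnabled(true, Map.of()));                                 // false: default applies
    System.out.println(refreshEnabled(false, Map.of("token.refresh.enabled", "true"))); // false: storage not configured
  }
}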
OpenHouseInternalCatalog.java
@@ -3,6 +3,8 @@
import static com.linkedin.openhouse.internal.catalog.InternalCatalogMetricsConstant.METRICS_PREFIX;

import com.linkedin.openhouse.cluster.metrics.micrometer.MetricsReporter;
import com.linkedin.openhouse.cluster.storage.StorageManager;
import com.linkedin.openhouse.internal.catalog.fileio.FileIOManager;
import com.linkedin.openhouse.internal.catalog.mapper.HouseTableMapper;
import com.linkedin.openhouse.internal.catalog.model.HouseTablePrimaryKey;
import com.linkedin.openhouse.internal.catalog.repository.HouseTableRepository;
@@ -21,7 +23,6 @@
import org.apache.iceberg.io.SupportsPrefixOperations;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

/**
@@ -35,9 +36,9 @@ public class OpenHouseInternalCatalog extends BaseMetastoreCatalog {

@Autowired HouseTableRepository houseTableRepository;

@Autowired
@Qualifier("LegacyFileIO")
FileIO fileIO;
@Autowired FileIOManager fileIOManager;

@Autowired StorageManager storageManager;

@Autowired SnapshotInspector snapshotInspector;

@@ -49,7 +50,7 @@ public class OpenHouseInternalCatalog extends BaseMetastoreCatalog {
protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
return new OpenHouseInternalTableOperations(
houseTableRepository,
fileIO,
fileIOManager.getFileIO(storageManager.getDefaultStorage().getType()),
snapshotInspector,
houseTableMapper,
tableIdentifier,
@@ -100,6 +101,7 @@ public boolean dropTable(TableIdentifier identifier, boolean purge) {
}
if (purge) {
// Delete data and metadata files from storage.
FileIO fileIO = fileIOManager.getFileIO(storageManager.getDefaultStorage().getType());
if (fileIO instanceof SupportsPrefixOperations) {
log.debug("Deleting files for table {}", tableLocation);
((SupportsPrefixOperations) fileIO).deletePrefix(tableLocation);
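This commit only shows the call site fileIOManager.getFileIO(storageManager.getDefaultStorage().getType()); the FileIOManager and StorageType APIs themselves are not part of this diff. A hedged sketch of one plausible shape, keyed by a plain string for illustration:

// Hypothetical sketch only; the real FileIOManager/StorageType signatures are not shown in this commit.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.FileIO;

public class FileIOManagerSketch {
  // One FileIO per storage type; the commit obtains the key from storageManager.getDefaultStorage().getType().
  private final Map<String, FileIO> fileIOByType = new ConcurrentHashMap<>();

  public FileIOManagerSketch() {
    // In a real deployment the FileIO would be configured from the cluster.yaml storage properties.
    fileIOByType.put("hdfs", new HadoopFileIO());
  }

  public FileIO getFileIO(String storageType) {
    FileIO fileIO = fileIOByType.get(storageType);
    if (fileIO == null) {
      throw new IllegalArgumentException("No FileIO registered for storage type: " + storageType);
    }
    return fileIO;
  }
}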
Test application class in com.linkedin.openhouse.internal.catalog (file name not shown)
@@ -1,6 +1,9 @@
package com.linkedin.openhouse.internal.catalog;

import com.linkedin.openhouse.cluster.storage.StorageManager;
import com.linkedin.openhouse.cluster.storage.filesystem.FsStorageProvider;
import com.linkedin.openhouse.internal.catalog.fileio.FileIOConfig;
import com.linkedin.openhouse.internal.catalog.fileio.FileIOManager;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.function.Consumer;
@@ -11,7 +14,6 @@
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.io.FileIO;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.mock.mockito.MockBean;
@@ -25,7 +27,11 @@ public static void main(String[] args) {

@MockBean Catalog openHouseInternalCatalog;

@MockBean FileIO fileIO;
@MockBean StorageManager storageManager;

@MockBean FileIOManager fileIOManager;

@MockBean FileIOConfig fileIOConfig;

@MockBean FsStorageProvider fsStorageProvider;

SnapshotInspectorTest.java
@@ -1,9 +1,6 @@
package com.linkedin.openhouse.internal.catalog;

import com.linkedin.openhouse.cluster.storage.StorageManager;
import com.linkedin.openhouse.internal.catalog.exception.InvalidIcebergSnapshotException;
import com.linkedin.openhouse.internal.catalog.fileio.FileIOConfig;
import com.linkedin.openhouse.internal.catalog.fileio.FileIOManager;
import com.linkedin.openhouse.internal.catalog.mapper.HouseTableMapperTest;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@@ -40,7 +37,6 @@
import org.mockito.Mockito;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.context.annotation.Import;

@SpringBootTest
@@ -51,11 +47,6 @@ class SnapshotInspectorTest {

@TempDir static Path tempDir;

@MockBean StorageManager storageManager;

@MockBean FileIOManager fileIOManager;

@MockBean FileIOConfig fileIOConfig;
private static final TableMetadata noSnapshotsMetadata =
TableMetadata.newTableMetadata(
new Schema(
HouseTableMapperTest.java
@@ -4,14 +4,10 @@
import com.linkedin.openhouse.housetables.client.invoker.ApiClient;
import com.linkedin.openhouse.internal.catalog.repository.HouseTableRepository;
import com.linkedin.openhouse.internal.catalog.repository.HouseTableRepositoryImpl;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.FileIO;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;

@SpringBootTest
public class HouseTableMapperTest {
@@ -35,13 +31,6 @@ public UserTableApi provideMockHtsApiInstance() {
public HouseTableRepository provideRealHtsRepository() {
return new HouseTableRepositoryImpl();
}

@Primary
@Bean
@Qualifier("LegacyFileIO")
public FileIO provideFileIO() {
return new HadoopFileIO();
}
}

@Autowired protected HouseTableMapper houseTableMapper;
HouseTableRepositoryImplTest.java
@@ -4,14 +4,11 @@
import static org.assertj.core.api.Assertions.*;

import com.google.gson.Gson;
import com.linkedin.openhouse.cluster.storage.StorageManager;
import com.linkedin.openhouse.housetables.client.api.UserTableApi;
import com.linkedin.openhouse.housetables.client.invoker.ApiClient;
import com.linkedin.openhouse.housetables.client.model.EntityResponseBodyUserTable;
import com.linkedin.openhouse.housetables.client.model.GetAllEntityResponseBodyUserTable;
import com.linkedin.openhouse.housetables.client.model.UserTable;
import com.linkedin.openhouse.internal.catalog.fileio.FileIOConfig;
import com.linkedin.openhouse.internal.catalog.fileio.FileIOManager;
import com.linkedin.openhouse.internal.catalog.mapper.HouseTableMapper;
import com.linkedin.openhouse.internal.catalog.model.HouseTable;
import com.linkedin.openhouse.internal.catalog.model.HouseTablePrimaryKey;
@@ -24,8 +21,6 @@
import java.util.List;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.FileIO;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
@@ -34,9 +29,7 @@
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.util.ReflectionUtils;

@@ -51,12 +44,6 @@ public class HouseTableRepositoryImplTest {

@Autowired HouseTableMapper houseTableMapper;

@MockBean StorageManager storageManager;

@MockBean FileIOManager fileIOManager;

@MockBean FileIOConfig fileIOConfig;

@TestConfiguration
public static class MockWebServerConfiguration {
/**
@@ -85,13 +72,6 @@ public UserTableApi provideMockHtsApiInstance() {
public HouseTableRepository provideRealHtsRepository() {
return new HouseTableRepositoryImpl();
}

@Primary
@Bean
@Qualifier("LegacyFileIO")
public FileIO provideFileIO() {
return new HadoopFileIO();
}
}

/**
