Refactor: Remove @LegacyFileIO and Make DelegationTokenRefresher use new cluster.yaml #100

Merged 3 commits on May 13, 2024
This file was deleted.

File: HdfsDelegationTokenRefresher.java (renamed from DelegationTokenRefresher.java)
@@ -1,4 +1,4 @@
package com.linkedin.openhouse.cluster.storage.filesystem;
package com.linkedin.openhouse.cluster.storage.hdfs;

import static org.apache.hadoop.security.UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;

@@ -7,6 +7,7 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.springframework.beans.factory.annotation.Autowired;
@@ -19,23 +20,36 @@
* basis and token file as pointed by HADOOP_TOKEN_FILE_LOCATION is always updated.
*/
@Slf4j
public class DelegationTokenRefresher {
public class HdfsDelegationTokenRefresher {

@Autowired private FsStorageProvider fsStorageProvider;
@Autowired HdfsStorage hdfsStorage;

/**
* Schedule credential refresh (hadoop delegation tokens) twice daily. The schedule cron
* expression represented by #{clusterProperties.clusterStorageHadoopTokenRefreshScheduleCron}
* sets the cron to run every 12 hours i.e. daily twice. Hadoop delegation token is valid for 24
* hours and hence the token must be refreshed before that. The hadoop delegation token file is
* pointed by environment variable i.e. HADOOP_TOKEN_FILE_LOCATION. The renewal of the delegation
* token must be done before it expires. This code assumes that hadoop delegation tokens are
* renewed on a regular basis and token file as pointed by HADOOP_TOKEN_FILE_LOCATION is always
* updated. So, this methods reads the token file and updates the current user
* UserGroupInformation (UGI) with the renewed token and this update is done daily twice.
* expression, given by the HdfsStorage-specific parameter "token.refresh.schedule.cron", sets the
* refresh to run every 12 hours, i.e., twice daily. A Hadoop delegation token is valid for 24
* hours, so it must be refreshed before it expires. The Hadoop delegation token file is pointed
* to by the environment variable HADOOP_TOKEN_FILE_LOCATION. This code assumes that Hadoop
* delegation tokens are renewed on a regular basis and that the token file pointed to by
* HADOOP_TOKEN_FILE_LOCATION is always kept up to date. Accordingly, this method reads the token
* file and updates the current user's UserGroupInformation (UGI) with the renewed token twice
* daily. The relevant configuration in the
* cluster YAML file is as follows:
*
* <pre>
* cluster:
* storages:
* hdfs:
* parameter:
* token.refresh.enabled: true
* token.refresh.schedule.cron: 0 0 0/12 * * ?
* </pre>
*/
@Scheduled(cron = "#{clusterProperties.clusterStorageHadoopTokenRefreshScheduleCron}")
@Scheduled(
cron =
"#{hdfsStorage.getProperties().getOrDefault('token.refresh.schedule.cron', '0 0 0/12 * * ?')}")
public void refresh() {
log.info("Refreshing HDFS delegation token");
String tokenFileLocation = System.getenv(HADOOP_TOKEN_FILE_LOCATION);
try {
log.info(
@@ -55,9 +69,8 @@ public void refresh() {
+ HADOOP_TOKEN_FILE_LOCATION
+ " not found");
}
Credentials cred =
Credentials.readTokenStorageFile(
tokenFile, fsStorageProvider.storageClient().getConf());
FileSystem fs = (FileSystem) hdfsStorage.getClient().getNativeClient();
Credentials cred = Credentials.readTokenStorageFile(tokenFile, fs.getConf());
log.info("Loaded {} tokens", cred.numberOfTokens());
UserGroupInformation.getCurrentUser().addCredentials(cred);
log.info(
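The new `@Scheduled` expression resolves its cron string through Spring SpEL against the `hdfsStorage` bean, falling back to the 12-hour schedule when `token.refresh.schedule.cron` is not set. A minimal, self-contained sketch of the same pattern; the `storageProps` bean and `RefreshJob` class are hypothetical stand-ins for HdfsStorage and the refresher, not OpenHouse code:

```java
import java.util.Map;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@SpringBootApplication
@EnableScheduling
public class CronFromPropertiesSketch {

  public static void main(String[] args) {
    SpringApplication.run(CronFromPropertiesSketch.class, args);
  }

  // Hypothetical stand-in for HdfsStorage#getProperties(): storage parameters as loaded
  // from cluster.yaml.
  @Bean("storageProps")
  Map<String, String> storageProps() {
    return Map.of("token.refresh.schedule.cron", "0 0 0/12 * * ?");
  }
}

// Scheduled job resolving its cron from the bean above, with the same getOrDefault fallback
// used by HdfsDelegationTokenRefresher.
@Component
class RefreshJob {
  @Scheduled(cron = "#{storageProps.getOrDefault('token.refresh.schedule.cron', '0 0 0/12 * * ?')}")
  void refresh() {
    System.out.println("refreshing delegation token");
  }
}
```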
File: HdfsDelegationTokenRefresherConfig.java (new file)
@@ -0,0 +1,55 @@
package com.linkedin.openhouse.cluster.storage.hdfs;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;

/**
* Configuration class to conditionally enable/disable creation of {@link
* HdfsDelegationTokenRefresher} bean and enable delegation token refresh scheduling.
*/
@Slf4j
@Configuration
@EnableScheduling
public class HdfsDelegationTokenRefresherConfig {

@Autowired private HdfsStorage hdfsStorage;

private static final String HDFS_TOKEN_REFRESH_ENABLED = "token.refresh.enabled";

/**
* Conditionally provide the HdfsDelegationTokenRefresher bean if the parameter for token refresh
* is enabled in the HdfsStorage properties. The relevant configuration in the cluster YAML file
* is as follows:
*
* <pre>
* cluster:
* storages:
* hdfs:
* parameter:
* token.refresh.enabled: true
* </pre>
*
* @return HdfsDelegationTokenRefresher
*/
@Bean
public HdfsDelegationTokenRefresher getDelegationTokenRefresher() {
if (!hdfsStorage.isConfigured()) {
log.debug(
"Hdfs storage is not configured, ignoring HdfsDelegationTokenRefresher bean creation");
return null;
}
String refreshEnabled =
hdfsStorage.getProperties().getOrDefault(HDFS_TOKEN_REFRESH_ENABLED, "false");
if (Boolean.parseBoolean(refreshEnabled)) {
log.info("Creating HdfsDelegationTokenRefresher bean");
return new HdfsDelegationTokenRefresher();
} else {
log.debug(
"Hdfs storage token refresh is not enabled, ignoring HdfsDelegationTokenRefresher bean creation");
return null;
}
}
}
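Because registration of the refresher hinges on `token.refresh.enabled`, the factory method above can be exercised directly. A rough unit-test sketch, assuming only the `isConfigured()` and `getProperties()` methods visible in this diff and that HdfsStorage is mockable; the test class and its wiring are hypothetical:

```java
package com.linkedin.openhouse.cluster.storage.hdfs;

import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.Collections;
import java.util.Map;
import org.junit.jupiter.api.Test;
import org.springframework.test.util.ReflectionTestUtils;

class HdfsDelegationTokenRefresherConfigSketchTest {

  @Test
  void refresherBeanCreatedOnlyWhenEnabled() {
    HdfsStorage storage = mock(HdfsStorage.class);
    when(storage.isConfigured()).thenReturn(true);

    HdfsDelegationTokenRefresherConfig config = new HdfsDelegationTokenRefresherConfig();
    // The production field is @Autowired and private; inject the mock reflectively for the test.
    ReflectionTestUtils.setField(config, "hdfsStorage", storage);

    // Parameter present and true: the bean is created.
    when(storage.getProperties()).thenReturn(Map.of("token.refresh.enabled", "true"));
    assertNotNull(config.getDelegationTokenRefresher());

    // Parameter absent: getOrDefault falls back to "false" and no bean is created.
    when(storage.getProperties()).thenReturn(Collections.emptyMap());
    assertNull(config.getDelegationTokenRefresher());
  }
}
```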
File: OpenHouseInternalCatalog.java
@@ -3,6 +3,8 @@
import static com.linkedin.openhouse.internal.catalog.InternalCatalogMetricsConstant.METRICS_PREFIX;

import com.linkedin.openhouse.cluster.metrics.micrometer.MetricsReporter;
import com.linkedin.openhouse.cluster.storage.StorageManager;
import com.linkedin.openhouse.internal.catalog.fileio.FileIOManager;
import com.linkedin.openhouse.internal.catalog.mapper.HouseTableMapper;
import com.linkedin.openhouse.internal.catalog.model.HouseTablePrimaryKey;
import com.linkedin.openhouse.internal.catalog.repository.HouseTableRepository;
@@ -21,7 +23,6 @@
import org.apache.iceberg.io.SupportsPrefixOperations;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

/**
@@ -35,9 +36,9 @@ public class OpenHouseInternalCatalog extends BaseMetastoreCatalog {

@Autowired HouseTableRepository houseTableRepository;

@Autowired
@Qualifier("LegacyFileIO")
FileIO fileIO;
@Autowired FileIOManager fileIOManager;

@Autowired StorageManager storageManager;

@Autowired SnapshotInspector snapshotInspector;

@@ -49,7 +50,7 @@ public class OpenHouseInternalCatalog extends BaseMetastoreCatalog {
protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
return new OpenHouseInternalTableOperations(
houseTableRepository,
fileIO,
fileIOManager.getFileIO(storageManager.getDefaultStorage().getType()),
snapshotInspector,
houseTableMapper,
tableIdentifier,
@@ -100,6 +101,7 @@ public boolean dropTable(TableIdentifier identifier, boolean purge) {
}
if (purge) {
// Delete data and metadata files from storage.
FileIO fileIO = fileIOManager.getFileIO(storageManager.getDefaultStorage().getType());
if (fileIO instanceof SupportsPrefixOperations) {
log.debug("Deleting files for table {}", tableLocation);
((SupportsPrefixOperations) fileIO).deletePrefix(tableLocation);
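Instead of a single injected FileIO qualified as "LegacyFileIO", the catalog now asks FileIOManager for the FileIO matching the cluster's default storage type at the point of use. The real FileIOManager lives in `com.linkedin.openhouse.internal.catalog.fileio`; the registry below is only an illustrative sketch of that lookup, with storage types kept as plain strings rather than the project's typed storage objects:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.iceberg.io.FileIO;

// Illustrative only: maps a storage type (e.g. "hdfs", "local") to the FileIO registered for it.
// The actual FileIOManager resolves these from Spring-managed beans and typed storage objects.
class FileIOManagerSketch {

  private final Map<String, FileIO> registry = new ConcurrentHashMap<>();

  void register(String storageType, FileIO fileIO) {
    registry.put(storageType, fileIO);
  }

  FileIO getFileIO(String storageType) {
    FileIO fileIO = registry.get(storageType);
    if (fileIO == null) {
      throw new IllegalArgumentException("No FileIO configured for storage type: " + storageType);
    }
    return fileIO;
  }
}
```

Resolving the FileIO at call time means `newTableOps` and `dropTable` always operate on whichever FileIO is registered for the cluster's default storage, rather than on one fixed at wiring time by a qualifier.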
File: internal catalog test Spring Boot application
@@ -1,6 +1,9 @@
package com.linkedin.openhouse.internal.catalog;

import com.linkedin.openhouse.cluster.storage.StorageManager;
import com.linkedin.openhouse.cluster.storage.filesystem.FsStorageProvider;
import com.linkedin.openhouse.internal.catalog.fileio.FileIOConfig;
import com.linkedin.openhouse.internal.catalog.fileio.FileIOManager;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.function.Consumer;
@@ -11,7 +14,6 @@
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.io.FileIO;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.mock.mockito.MockBean;
@@ -25,7 +27,11 @@ public static void main(String[] args) {

@MockBean Catalog openHouseInternalCatalog;

@MockBean FileIO fileIO;
@MockBean StorageManager storageManager;

@MockBean FileIOManager fileIOManager;

@MockBean FileIOConfig fileIOConfig;

@MockBean FsStorageProvider fsStorageProvider;

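With FileIO no longer a standalone mock, tests that previously relied on `@MockBean FileIO` need the manager and storage mocks to cooperate. Below is a sketch of one way to stub them so catalog code calling `fileIOManager.getFileIO(storageManager.getDefaultStorage().getType())` still receives a working HadoopFileIO. The helper class is hypothetical, and the deep-stub style is an assumption since the storage type's class is not visible in this diff:

```java
import static org.mockito.Mockito.when;

import com.linkedin.openhouse.cluster.storage.StorageManager;
import com.linkedin.openhouse.internal.catalog.fileio.FileIOManager;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.hadoop.HadoopFileIO;

final class CatalogMockStubbing {

  private CatalogMockStubbing() {}

  /**
   * storageManager must answer chained calls (for example a mock created with
   * RETURNS_DEEP_STUBS, or {@code @MockBean(answer = Answers.RETURNS_DEEP_STUBS)}), so that
   * getDefaultStorage().getType() yields a stable stub value rather than throwing.
   */
  static void stubDefaultFileIO(StorageManager storageManager, FileIOManager fileIOManager) {
    // Make the manager hand back a plain HadoopFileIO for whatever the default storage type is,
    // mirroring the lookup the catalog performs in newTableOps and dropTable.
    when(fileIOManager.getFileIO(storageManager.getDefaultStorage().getType()))
        .thenReturn(new HadoopFileIO(new Configuration()));
  }
}
```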
File: SnapshotInspectorTest.java
@@ -1,9 +1,6 @@
package com.linkedin.openhouse.internal.catalog;

import com.linkedin.openhouse.cluster.storage.StorageManager;
import com.linkedin.openhouse.internal.catalog.exception.InvalidIcebergSnapshotException;
import com.linkedin.openhouse.internal.catalog.fileio.FileIOConfig;
import com.linkedin.openhouse.internal.catalog.fileio.FileIOManager;
import com.linkedin.openhouse.internal.catalog.mapper.HouseTableMapperTest;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@@ -40,7 +37,6 @@
import org.mockito.Mockito;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.context.annotation.Import;

@SpringBootTest
@@ -51,11 +47,6 @@ class SnapshotInspectorTest {

@TempDir static Path tempDir;

@MockBean StorageManager storageManager;

@MockBean FileIOManager fileIOManager;

@MockBean FileIOConfig fileIOConfig;
private static final TableMetadata noSnapshotsMetadata =
TableMetadata.newTableMetadata(
new Schema(
File: HouseTableMapperTest.java
@@ -4,14 +4,10 @@
import com.linkedin.openhouse.housetables.client.invoker.ApiClient;
import com.linkedin.openhouse.internal.catalog.repository.HouseTableRepository;
import com.linkedin.openhouse.internal.catalog.repository.HouseTableRepositoryImpl;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.FileIO;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;

@SpringBootTest
public class HouseTableMapperTest {
@@ -35,13 +31,6 @@ public UserTableApi provideMockHtsApiInstance() {
public HouseTableRepository provideRealHtsRepository() {
return new HouseTableRepositoryImpl();
}

@Primary
@Bean
@Qualifier("LegacyFileIO")
public FileIO provideFileIO() {
return new HadoopFileIO();
}
}

@Autowired protected HouseTableMapper houseTableMapper;
File: HouseTableRepositoryImplTest.java
@@ -4,14 +4,11 @@
import static org.assertj.core.api.Assertions.*;

import com.google.gson.Gson;
import com.linkedin.openhouse.cluster.storage.StorageManager;
import com.linkedin.openhouse.housetables.client.api.UserTableApi;
import com.linkedin.openhouse.housetables.client.invoker.ApiClient;
import com.linkedin.openhouse.housetables.client.model.EntityResponseBodyUserTable;
import com.linkedin.openhouse.housetables.client.model.GetAllEntityResponseBodyUserTable;
import com.linkedin.openhouse.housetables.client.model.UserTable;
import com.linkedin.openhouse.internal.catalog.fileio.FileIOConfig;
import com.linkedin.openhouse.internal.catalog.fileio.FileIOManager;
import com.linkedin.openhouse.internal.catalog.mapper.HouseTableMapper;
import com.linkedin.openhouse.internal.catalog.model.HouseTable;
import com.linkedin.openhouse.internal.catalog.model.HouseTablePrimaryKey;
@@ -24,8 +21,6 @@
import java.util.List;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.FileIO;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
@@ -34,9 +29,7 @@
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.util.ReflectionUtils;

@@ -51,12 +44,6 @@ public class HouseTableRepositoryImplTest {

@Autowired HouseTableMapper houseTableMapper;

@MockBean StorageManager storageManager;

@MockBean FileIOManager fileIOManager;

@MockBean FileIOConfig fileIOConfig;

@TestConfiguration
public static class MockWebServerConfiguration {
/**
@@ -85,13 +72,6 @@ public UserTableApi provideMockHtsApiInstance() {
public HouseTableRepository provideRealHtsRepository() {
return new HouseTableRepositoryImpl();
}

@Primary
@Bean
@Qualifier("LegacyFileIO")
public FileIO provideFileIO() {
return new HadoopFileIO();
}
}

/**