Commit 633003b

Merge branch 'linkedin:main' into dlo
teamurko committed May 15, 2024
2 parents d23f0b2 + ca05f77 commit 633003b
Showing 112 changed files with 3,008 additions and 731 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/build-tag-publish.yml
@@ -28,7 +28,7 @@ jobs:
uses: gradle/wrapper-validation-action@v1

- name: Build with Gradle
run: ./gradlew clean build --debug
run: ./gradlew clean build

- name: Start Docker Containers
run: docker compose -f infra/recipes/docker-compose/oh-only/docker-compose.yml up -d --build
@@ -52,7 +52,7 @@ jobs:

- name: Bump version and push tag
if: ${{ success() && github.ref == 'refs/heads/main' && github.repository == 'linkedin/openhouse' }}
uses: anothrNick/github-tag-action@1.67.0
uses: anothrNick/github-tag-action@1.69.0
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
WITH_V: true # prefix for tag "v"

2 changes: 1 addition & 1 deletion SETUP.md
@@ -15,7 +15,7 @@ OpenHouse locally on laptop. Script has been tested to work fine on MacOS.

Multiple recipes are provided for locally bringing up a docker-compose environment that can be used for testing.

Config| docker-dompose Directory |Notes
Config| docker-compose Directory |Notes
---|--------------------------|---
Run OpenHouse Services Only | oh-only | Stores data on local filesystem within the application container, with in-memory database. Least resource consuming.
Run OpenHouse Services on HDFS | oh-hadoop | Stores data on locally running Hadoop HDFS containers, with iceberg-backed database.

1 change: 1 addition & 0 deletions apps/spark/build.gradle
@@ -78,6 +78,7 @@ test.dependsOn ':integrations:spark:openhouse-spark-runtime_2.12:build'

shadowJar {
zip64 = true
archiveClassifier.set('uber')
mergeServiceFiles() // merge META-INF/services configuration files to allow FileSystem to be discovered
dependencies {
// unnecessary dependencies from iceberg-spark3-runtime

@@ -143,6 +143,7 @@ protected void run(JobConf.JobTypeEnum jobType, String taskType, boolean isDryRu
taskFutures.add(executorService.submit(taskList.get(taskIndex)));
}

int emptyStateJobCount = 0;
for (int taskIndex = 0; taskIndex < taskList.size(); ++taskIndex) {
Optional<JobState> jobState = Optional.empty();
OperationTask task = taskList.get(taskIndex);
@@ -171,16 +172,22 @@ protected void run(JobConf.JobTypeEnum jobType, String taskType, boolean isDryRu
jobStateCountMap.put(JobState.CANCELLED, jobStateCountMap.get(JobState.CANCELLED) + 1);
}
} finally {
if (jobState.isPresent() && jobStateCountMap.containsKey(jobState.get())) {
if (jobState.isPresent()) {
jobStateCountMap.put(jobState.get(), jobStateCountMap.get(jobState.get()) + 1);
} else {
emptyStateJobCount++;
}
}
}
log.info(
"Finishing scheduler, {} tasks completed successfully out of {} tasks, {} tasks cancelled due to timeout",
jobStateCountMap.get(JobState.SUCCEEDED),
"Finishing scheduler for job type {}, tasks stats: {} created, {} succeeded,"
+ " {} cancelled (timeout), {} failed, {} skipped (no state)",
jobType,
taskList.size(),
jobStateCountMap.get(JobState.CANCELLED));
jobStateCountMap.get(JobState.SUCCEEDED),
jobStateCountMap.get(JobState.CANCELLED),
jobStateCountMap.get(JobState.FAILED),
emptyStateJobCount);
executorService.shutdown();
METER.counterBuilder("scheduler_end_count").build().add(1);
reportSchedulerMetrics(jobStateCountMap, taskType, startTimeMillis);
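
For illustration, a minimal, self-contained sketch of the tallying pattern introduced above: tasks that report a state increment the corresponding counter, while tasks that finish without a state are tracked by a separate emptyStateJobCount. The JobState enum and the pre-initialized count map below are simplified stand-ins for the scheduler's actual types, chosen only to make the sketch runnable.

import java.util.Arrays;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class StateTallySketch {
  // Simplified stand-in for the scheduler's JobState enum.
  enum JobState { SUCCEEDED, CANCELLED, FAILED }

  public static void main(String[] args) {
    // Pre-initialize every state to zero so map.get(...) never returns null.
    Map<JobState, Integer> jobStateCountMap = new EnumMap<>(JobState.class);
    for (JobState state : JobState.values()) {
      jobStateCountMap.put(state, 0);
    }

    // Simulated task outcomes: two tasks with a reported state, one without.
    List<Optional<JobState>> outcomes = Arrays.asList(
        Optional.of(JobState.SUCCEEDED), Optional.<JobState>empty(), Optional.of(JobState.FAILED));

    int emptyStateJobCount = 0;
    for (Optional<JobState> jobState : outcomes) {
      if (jobState.isPresent()) {
        jobStateCountMap.put(jobState.get(), jobStateCountMap.get(jobState.get()) + 1);
      } else {
        emptyStateJobCount++;
      }
    }

    System.out.printf("%d created, %d succeeded, %d cancelled, %d failed, %d skipped (no state)%n",
        outcomes.size(),
        jobStateCountMap.get(JobState.SUCCEEDED),
        jobStateCountMap.get(JobState.CANCELLED),
        jobStateCountMap.get(JobState.FAILED),
        emptyStateJobCount);
  }
}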

@@ -3,23 +3,18 @@
import avro.shaded.com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.linkedin.openhouse.common.stats.model.IcebergTableStats;
import com.linkedin.openhouse.jobs.util.AppConstants;
import com.linkedin.openhouse.jobs.util.SparkJobUtil;
import com.linkedin.openhouse.jobs.util.TableStatsCollector;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.Meter;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
@@ -36,8 +31,6 @@
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import scala.collection.JavaConverters;

@@ -240,51 +233,10 @@ public ExpireSnapshots.Result expireSnapshots(Table table, long expireBeforeTime
*/
public void runRetention(
String fqtn, String columnName, String columnPattern, String granularity, int count) {
checkRecords(fqtn, columnName, columnPattern);
final String statement =
SparkJobUtil.createDeleteStatement(fqtn, columnName, columnPattern, granularity, count);
Dataset<Row> dsWithExpiredRows =
spark.sql(
SparkJobUtil.createSelectLimitStatement(
fqtn, columnName, columnPattern, granularity, count, 1));
if (!dsWithExpiredRows.isEmpty()) {
spark.sql(statement).show();
}
}

/**
* CheckRecords verifies a sample from fqtn against @columnPattern to validate parsing with
* columnPattern. The metrics emitted helps to verify if records in table are not adhering to
* DateTimeFormat standards
*
* @param fqtn - fully-qualified table name
* @param columnName - retention column name
* @param columnPattern - retention column pattern
*/
public void checkRecords(String fqtn, String columnName, String columnPattern) {
String quotedFqtn = SparkJobUtil.getQuotedFqtn(fqtn);
if (!StringUtils.isBlank(columnPattern)) {
String query =
String.format(
"SELECT COALESCE(to_timestamp(%s, '%s'), 'Invalid') AS parsed_timestamp FROM %s LIMIT 10",
columnName, columnPattern, quotedFqtn);
log.info("Pattern verification query {}", query);
List<Row> resultDF = spark.sql(query).collectAsList();
if (resultDF.stream()
.map(r -> r.getString(r.fieldIndex("parsed_timestamp")))
.collect(Collectors.toList())
.contains("Invalid")) {
meter
.counterBuilder(AppConstants.INCOMPATIBLE_DATE_COLUMN)
.build()
.add(1, Attributes.of(AttributeKey.stringKey(AppConstants.TABLE_NAME), fqtn));
log.warn(
"Failed to parse column {} with provided retention column pattern {} for table {}",
columnName,
columnPattern,
fqtn);
}
}
log.info("deleting records from table: {}", fqtn);
spark.sql(statement);
}

private Path getTrashPath(String path, String filePath, String trashDir) {
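
To make the simplified flow above concrete, runRetention now builds one DELETE statement via SparkJobUtil.createDeleteStatement and executes it directly, without the former checkRecords/select-limit pre-check. The sketch below renders the retention condition such a statement would embed. The template string is copied from the SparkJobUtil diff below; the column name, pattern, granularity, and count are hypothetical values for illustration only.

public class RetentionConditionSketch {
  public static void main(String[] args) {
    // Hypothetical inputs mirroring runRetention's parameters.
    String columnName = "datepartition";
    String columnPattern = "yyyy-MM-dd-HH";
    String granularity = "day";
    int count = 30; // keep roughly the last 30 days

    // New pattern-based retention condition template (see the SparkJobUtil diff below).
    String template =
        "%s < cast(date_format(current_timestamp() - INTERVAL %s %ss, '%s') as string)";
    String condition = String.format(template, columnName, count, granularity, columnPattern);

    // Prints:
    // datepartition < cast(date_format(current_timestamp() - INTERVAL 30 days, 'yyyy-MM-dd-HH') as string)
    System.out.println(condition);
  }
}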

@@ -47,7 +47,7 @@ private SparkJobUtil() {}
result: records will be filtered from deletion
*/
private static final String RETENTION_CONDITION_WITH_PATTERN_TEMPLATE =
"to_date(substring(%s, 0, CHAR_LENGTH('%s')), '%s') < date_trunc('%s', current_timestamp() - INTERVAL %s %ss)";
"%s < cast(date_format(current_timestamp() - INTERVAL %s %ss, '%s') as string)";

public static String createDeleteStatement(
String fqtn, String columnName, String columnPattern, String granularity, int count) {
@@ -59,11 +59,9 @@ public static String createDeleteStatement(
String.format(
RETENTION_CONDITION_WITH_PATTERN_TEMPLATE,
columnName,
columnPattern,
columnPattern,
granularity,
count,
granularity));
granularity,
columnPattern));
log.info(
"Table: {}. Column pattern: {}, columnName {}, granularity {}s, " + "retention query: {}",
fqtn,
@@ -89,47 +87,6 @@ public static String createDeleteStatement(
}
}

public static String createSelectLimitStatement(
String fqtn,
String columnName,
String columnPattern,
String granularity,
int count,
int limit) {
if (!StringUtils.isBlank(columnPattern)) {
String query =
String.format(
"SELECT * FROM %s WHERE %s limit %d",
getQuotedFqtn(fqtn),
String.format(
RETENTION_CONDITION_WITH_PATTERN_TEMPLATE,
columnName,
columnPattern,
columnPattern,
granularity,
count,
granularity),
limit);
log.info("Table: {} column pattern provided: {} selectQuery: {}", fqtn, columnPattern, query);
return query;
} else {
String query =
String.format(
"SELECT * FROM %s WHERE %s limit %d",
getQuotedFqtn(fqtn),
String.format(
RETENTION_CONDITION_TEMPLATE,
granularity,
columnName,
granularity,
count,
granularity),
limit);
log.info("Table: {} No column pattern provided: selectQuery: {}", fqtn, query);
return query;
}
}

public static String getQuotedFqtn(String fqtn) {
String[] fqtnTokens = fqtn.split("\\.");
// adding single quotes around fqtn for cases when db and/or tableName has special character(s),
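
The template change above swaps a to_date/date_trunc comparison for a plain string comparison against a date_format(...) cutoff. That only works when the column pattern is zero-padded and ordered from most to least significant unit (year, month, day, hour), so that lexicographic order matches chronological order. A small sketch of that assumption, using a hypothetical 'yyyy-MM-dd-HH' pattern:

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class StringOrderSketch {
  public static void main(String[] args) {
    // Hypothetical retention column pattern; zero-padded, most significant unit first.
    DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd-HH");

    String olderRow = LocalDateTime.of(2024, 4, 1, 5, 0).format(fmt);  // "2024-04-01-05"
    String cutoff = LocalDateTime.of(2024, 4, 15, 0, 0).format(fmt);   // "2024-04-15-00"

    // For such patterns, comparing the formatted strings agrees with comparing the timestamps,
    // which is what the new retention condition relies on.
    System.out.println(olderRow.compareTo(cutoff) < 0); // true: the older row falls below the cutoff
  }
}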

@@ -4,6 +4,7 @@

import com.linkedin.openhouse.common.stats.model.IcebergTableStats;
import java.io.IOException;
import java.util.Optional;
import java.util.stream.StreamSupport;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.fs.FileSystem;
@@ -82,20 +83,29 @@ protected static IcebergTableStats populateStatsForSnapshots(
*/
protected static IcebergTableStats populateStatsForSnapshots(
String fqtn, Table table, SparkSession spark, IcebergTableStats stats) {
long snapshotId = table.currentSnapshot().snapshotId();

Dataset<Row> allDataFiles = getAllDataFilesCount(table, spark, MetadataTableType.FILES);
long countOfDataFiles = allDataFiles.count();
Dataset<Row> dataFiles = getAllDataFilesCount(table, spark, MetadataTableType.FILES);
long countOfDataFiles = dataFiles.count();

// Calculate sum of file sizes in bytes in above allDataFiles
long sumOfFileSizeBytes = getSumOfFileSizeBytes(allDataFiles);
long sumOfFileSizeBytes = getSumOfFileSizeBytes(dataFiles);

Long currentSnapshotId =
Optional.ofNullable(table.currentSnapshot())
.map(snapshot -> snapshot.snapshotId())
.orElse(null);

Long currentSnapshotTimestamp =
Optional.ofNullable(table.currentSnapshot())
.map(snapshot -> snapshot.timestampMillis())
.orElse(null);

log.info(
"Table: {}, Count of total Data files: {}, Sum of file sizes in bytes: {} for snaphot: {}",
fqtn,
countOfDataFiles,
sumOfFileSizeBytes,
snapshotId);
currentSnapshotId);

// Find minimum timestamp of all snapshots where snapshots is iterator
Long oldestSnapshotTimestamp =
@@ -106,8 +116,8 @@

return stats
.toBuilder()
.currentSnapshotId(snapshotId)
.currentSnapshotTimestamp(table.currentSnapshot().timestampMillis())
.currentSnapshotId(currentSnapshotId)
.currentSnapshotTimestamp(currentSnapshotTimestamp)
.oldestSnapshotTimestamp(oldestSnapshotTimestamp)
.numCurrentSnapshotReferencedDataFiles(countOfDataFiles)
.totalCurrentSnapshotReferencedDataFilesSizeInBytes(sumOfFileSizeBytes)
@@ -168,11 +178,11 @@ protected static IcebergTableStats populateTableMetadata(Table table, IcebergTab
.tableType(table.properties().get(getCanonicalFieldName("tableType")))
.tableCreator((table.properties().get(getCanonicalFieldName("tableCreator"))))
.tableCreationTimestamp(
table.properties().containsKey("creationTime")
table.properties().containsKey(getCanonicalFieldName("creationTime"))
? Long.parseLong(table.properties().get(getCanonicalFieldName("creationTime")))
: 0)
.tableLastUpdatedTimestamp(
table.properties().containsKey("lastModifiedTime")
table.properties().containsKey(getCanonicalFieldName("lastModifiedTime"))
? Long.parseLong(table.properties().get(getCanonicalFieldName("lastModifiedTime")))
: 0)
.tableUUID(table.properties().get(getCanonicalFieldName("tableUUID")))
@@ -217,6 +227,8 @@ private static Dataset<Row> getAllDataFilesCount(
}

private static long getSumOfFileSizeBytes(Dataset<Row> allDataFiles) {
if (allDataFiles.isEmpty()) return 0;

return allDataFiles
.agg(org.apache.spark.sql.functions.sum("file_size_in_bytes"))
.first()
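
A minimal sketch of the null-safe snapshot handling introduced above: for a table that has no snapshots yet, currentSnapshot() returns null, so the id and timestamp are pulled through Optional.ofNullable and default to null instead of throwing a NullPointerException. The Snapshot interface below is a local stand-in with just the two accessors used here, not Iceberg's full API.

import java.util.Optional;

public class SnapshotStatsSketch {
  // Local stand-in exposing only the accessors used by the sketch.
  interface Snapshot {
    long snapshotId();
    long timestampMillis();
  }

  static void printSnapshotFields(Snapshot currentSnapshot) {
    // Same Optional.ofNullable pattern as in populateStatsForSnapshots above.
    Long currentSnapshotId =
        Optional.ofNullable(currentSnapshot).map(Snapshot::snapshotId).orElse(null);
    Long currentSnapshotTimestamp =
        Optional.ofNullable(currentSnapshot).map(Snapshot::timestampMillis).orElse(null);
    System.out.println(currentSnapshotId + " / " + currentSnapshotTimestamp);
  }

  public static void main(String[] args) {
    printSnapshotFields(null); // table with no snapshots: prints "null / null"
    printSnapshotFields(new Snapshot() {
      public long snapshotId() { return 42L; }
      public long timestampMillis() { return 1715731200000L; }
    }); // prints "42 / 1715731200000"
  }
}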
