
Incremental pull should set the _hoodie_commit_time automatically from configs in HoodieInputFormat #16

Closed
prazanna opened this issue Jan 4, 2017 · 2 comments

prazanna (Contributor) commented Jan 4, 2017

HoodieInputFormat should pick up the start_ts from the config and merge a `_hoodie_commit_time` predicate with any predicates already set. This allows the processing SQL to be exactly the same for backfill and incremental processing.

```sql
SET hoodie.<table_name>.consume.mode=LATEST
Select * from table1; -- this would run against all the data in table1

SET hoodie.<table_name>.consume.mode=INCREMENTAL
SET hoodie.<table_name>.start.timestamp=20170101132319
Select * from table1; -- this would run against only data inserted after 2017/01/01 13:23:19
```

Today we have to do:

```sql
SET hoodie.<table_name>.consume.mode=INCREMENTAL
Select * from table1 where _hoodie_commit_time > '20170101132319';
```
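For illustration, here is a minimal sketch of the config-driven behavior being asked for. The property-key construction and helper methods are hypothetical (the real lookup lives in HoodieHiveUtil); the point is that the input format can key off the same configs the SET commands above populate:

```java
import org.apache.hadoop.mapred.JobConf;

public class ConsumeModeSketch {

  // Hypothetical helpers; the actual key parsing is done by HoodieHiveUtil.
  static String consumeMode(JobConf job, String tableName) {
    return job.get("hoodie." + tableName + ".consume.mode", "LATEST");
  }

  static String startTimestamp(JobConf job, String tableName) {
    return job.get("hoodie." + tableName + ".start.timestamp");
  }

  public static void main(String[] args) {
    JobConf job = new JobConf();
    job.set("hoodie.table1.consume.mode", "INCREMENTAL");
    job.set("hoodie.table1.start.timestamp", "20170101132319");

    if ("INCREMENTAL".equals(consumeMode(job, "table1"))) {
      // Here the input format would build and push down the
      // _hoodie_commit_time > start.timestamp predicate itself,
      // so the user's SQL stays identical to the backfill query.
      System.out.println("push down: _hoodie_commit_time > "
          + startTimestamp(job, "table1"));
    }
  }
}
```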
vinothchandar (Member) commented

This is the dead code sitting in HoodieInputFormat:

```java
/**
 * Clears out the filter expression (if this is not done, then ParquetReader will override the
 * FilterPredicate set)
 */
private void clearOutExistingPredicate(JobConf job) {
  job.unset(TableScanDesc.FILTER_EXPR_CONF_STR);
}

/**
 * Constructs the predicate to push down to parquet storage. This creates the predicate for
 * `hoodie_commit_time` > 'start_commit_time' and ANDs with the existing predicate if one is
 * present already.
 */
private FilterPredicate constructHoodiePredicate(JobConf job, String tableName, InputSplit split)
    throws IOException {
  FilterPredicate commitTimePushdown = constructCommitTimePushdownPredicate(job, tableName);
  LOG.info("Commit time predicate - " + commitTimePushdown.toString());
  FilterPredicate existingPushdown = constructHQLPushdownPredicate(job, split);
  LOG.info("Existing predicate - " + existingPushdown);

  FilterPredicate hoodiePredicate;
  if (existingPushdown != null) {
    hoodiePredicate = and(existingPushdown, commitTimePushdown);
  } else {
    hoodiePredicate = commitTimePushdown;
  }
  LOG.info("Hoodie Predicate - " + hoodiePredicate);
  return hoodiePredicate;
}

private FilterPredicate constructHQLPushdownPredicate(JobConf job, InputSplit split)
    throws IOException {
  String serializedPushdown = job.get(TableScanDesc.FILTER_EXPR_CONF_STR);
  String columnNamesString = job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR);
  if (serializedPushdown == null || columnNamesString == null || serializedPushdown.isEmpty()
      || columnNamesString.isEmpty()) {
    return null;
  } else {
    SearchArgument sarg = SearchArgumentFactory
        .create(Utilities.deserializeExpression(serializedPushdown));
    final Path finalPath = ((FileSplit) split).getPath();
    final ParquetMetadata parquetMetadata = ParquetFileReader.readFooter(job, finalPath);
    final FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
    return ParquetFilterPredicateConverter.toFilterPredicate(sarg, fileMetaData.getSchema());
  }
}

private FilterPredicate constructCommitTimePushdownPredicate(JobConf job, String tableName)
    throws IOException {
  String lastIncrementalTs = HoodieHiveUtil.readStartCommitTime(Job.getInstance(job), tableName);
  Operators.BinaryColumn sequenceColumn = binaryColumn(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
  FilterPredicate p = gt(sequenceColumn, Binary.fromString(lastIncrementalTs));
  LOG.info("Setting predicate in InputFormat " + p.toString());
  return p;
}
```
This also causes most of the issues for Hive2 support, by bringing in some parquet dependencies. Removing it and simplifying life until we can fix this again.
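For context, here is a hedged sketch (assuming parquet-hadoop's public FilterApi and ParquetInputFormat, not the actual HoodieInputFormat wiring) of how such a commit-time predicate gets handed to Parquet, and why clearing Hive's filter expression matters:

```java
import static org.apache.parquet.filter2.predicate.FilterApi.binaryColumn;
import static org.apache.parquet.filter2.predicate.FilterApi.gt;

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.hadoop.ParquetInputFormat;
import org.apache.parquet.io.api.Binary;

public class CommitTimePushdownSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // "_hoodie_commit_time" is the value of HoodieRecord.COMMIT_TIME_METADATA_FIELD.
    FilterPredicate commitTimePushdown = gt(
        binaryColumn("_hoodie_commit_time"),
        Binary.fromString("20170101132319"));

    // ParquetInputFormat serializes the predicate into the job conf; if Hive's
    // own filter expression is still set, the reader can override it, which is
    // what clearOutExistingPredicate() above works around.
    ParquetInputFormat.setFilterPredicate(conf, commitTimePushdown);
    System.out.println("Pushed down: " + commitTimePushdown);
  }
}
```

These are exactly the parquet dependencies the comment above refers to, which is why removing the dead code simplifies Hive2 support.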
