Skip to content

Commit

Permalink
NIFI-12731: Ensure state is updated in GetHBase whenever the session …
Browse files Browse the repository at this point in the history
…is committed
  • Loading branch information
mattyb149 committed Feb 2, 2024
1 parent c6f1d77 commit d77df13
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public class GetHBase extends AbstractProcessor implements VisibilityFetchSuppor
.build();

private final AtomicReference<ScanResult> lastResult = new AtomicReference<>();
private volatile List<Column> columns = new ArrayList<>();
private final List<Column> columns = new ArrayList<>();
private volatile String previousTable = null;

@Override
Expand Down Expand Up @@ -201,11 +201,11 @@ public void parseColumns(final ProcessContext context) throws IOException {
for (final String column : columns) {
if (column.contains(":")) {
final String[] parts = column.split(":");
final byte[] cf = parts[0].getBytes(Charset.forName("UTF-8"));
final byte[] cq = parts[1].getBytes(Charset.forName("UTF-8"));
final byte[] cf = parts[0].getBytes(StandardCharsets.UTF_8);
final byte[] cq = parts[1].getBytes(StandardCharsets.UTF_8);
this.columns.add(new Column(cf, cq));
} else {
final byte[] cf = column.getBytes(Charset.forName("UTF-8"));
final byte[] cf = column.getBytes(StandardCharsets.UTF_8);
this.columns.add(new Column(cf, null));
}
}
Expand Down Expand Up @@ -307,11 +307,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
final byte[] cellValue = Arrays.copyOfRange(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength() + cell.getValueOffset());

final String rowHash = new String(rowValue, StandardCharsets.UTF_8);
Set<String> cellHashes = cellsMatchingTimestamp.get(rowHash);
if (cellHashes == null) {
cellHashes = new HashSet<>();
cellsMatchingTimestamp.put(rowHash, cellHashes);
}
Set<String> cellHashes = cellsMatchingTimestamp.computeIfAbsent(rowHash, k -> new HashSet<>());
cellHashes.add(new String(cellValue, StandardCharsets.UTF_8));
}
}
Expand All @@ -336,40 +332,11 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
rowsPulledHolder.set(++rowsPulled);

if (++rowsPulled % getBatchSize() == 0) {
session.commitAsync();
updateStateAndCommit(session, latestTimestampHolder.get(), cellsMatchingTimestamp);
}
});

final ScanResult scanResults = new ScanResult(latestTimestampHolder.get(), cellsMatchingTimestamp);

final ScanResult latestResult = lastResult.get();
if (latestResult == null || scanResults.getTimestamp() > latestResult.getTimestamp()) {
session.setState(scanResults.toFlatMap(), Scope.CLUSTER);
session.commitAsync(() -> updateScanResultsIfNewer(scanResults));
} else if (scanResults.getTimestamp() == latestResult.getTimestamp()) {
final Map<String, Set<String>> combinedResults = new HashMap<>(scanResults.getMatchingCells());

// copy the results of result.getMatchingCells() to combinedResults.
// do a deep copy because the Set may be modified below.
for (final Map.Entry<String, Set<String>> entry : scanResults.getMatchingCells().entrySet()) {
combinedResults.put(entry.getKey(), new HashSet<>(entry.getValue()));
}

// combined the results from 'lastResult'
for (final Map.Entry<String, Set<String>> entry : latestResult.getMatchingCells().entrySet()) {
final Set<String> existing = combinedResults.get(entry.getKey());
if (existing == null) {
combinedResults.put(entry.getKey(), new HashSet<>(entry.getValue()));
} else {
existing.addAll(entry.getValue());
}
}

final ScanResult scanResult = new ScanResult(scanResults.getTimestamp(), combinedResults);
session.setState(scanResult.toFlatMap(), Scope.CLUSTER);

session.commitAsync(() -> updateScanResultsIfNewer(scanResult));
}
updateStateAndCommit(session, latestTimestampHolder.get(), cellsMatchingTimestamp);
} catch (final IOException e) {
getLogger().error("Failed to receive data from HBase due to {}", e);
session.rollback();
Expand All @@ -380,6 +347,39 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
}
}

/**
 * Stores the outcome of the scan so far in cluster-scoped state and commits the session.
 *
 * <ul>
 *   <li>If nothing is cached in {@code lastResult}, or the supplied timestamp is strictly newer than the cached one,
 *       the supplied cells are stored as-is.</li>
 *   <li>If the timestamps are equal, the supplied cells are merged with the cached cells (row by row) and the
 *       union is stored.</li>
 *   <li>If the supplied timestamp is older than the cached one, this method neither stores state nor commits.</li>
 * </ul>
 *
 * When state is stored, {@code lastResult} is refreshed through {@link #updateScanResultsIfNewer(ScanResult)}
 * in the callback passed to {@code commitAsync}.
 *
 * @param session the session whose state is updated and which is committed
 * @param latestTimestamp the timestamp to record for this scan
 * @param cellsMatchingTimestamp row to cell-value strings to record alongside {@code latestTimestamp}
 * @throws IOException declared by the signature; the throwing call is not visible in this block
 *         (presumably the state update - confirm)
 */
private void updateStateAndCommit(final ProcessSession session, final long latestTimestamp, final Map<String, Set<String>> cellsMatchingTimestamp) throws IOException {
    final ScanResult scanResults = new ScanResult(latestTimestamp, cellsMatchingTimestamp);
    final ScanResult latestResult = lastResult.get();

    final ScanResult resultToStore;
    if (latestResult == null || scanResults.getTimestamp() > latestResult.getTimestamp()) {
        // NOTE(review): this branch hands the caller's map to ScanResult without copying it here, and the
        // commit callback publishes that ScanResult to lastResult later. If ScanResult keeps a reference to the
        // map and the caller keeps mutating it after this call, the cached result can change - confirm.
        resultToStore = scanResults;
    } else if (scanResults.getTimestamp() == latestResult.getTimestamp()) {
        // Deep-copy the new cells first: the per-row sets are extended below and must not alias the input.
        final Map<String, Set<String>> combinedResults = new HashMap<>();
        for (final Map.Entry<String, Set<String>> entry : scanResults.getMatchingCells().entrySet()) {
            combinedResults.put(entry.getKey(), new HashSet<>(entry.getValue()));
        }

        // Fold in the cells already cached for the same timestamp.
        for (final Map.Entry<String, Set<String>> entry : latestResult.getMatchingCells().entrySet()) {
            combinedResults.computeIfAbsent(entry.getKey(), k -> new HashSet<>()).addAll(entry.getValue());
        }

        resultToStore = new ScanResult(scanResults.getTimestamp(), combinedResults);
    } else {
        // Older than what is already cached: leave state and session untouched.
        return;
    }

    session.setState(resultToStore.toFlatMap(), Scope.CLUSTER);
    session.commitAsync(() -> updateScanResultsIfNewer(resultToStore));
}

/**
 * Replaces the value cached in {@code lastResult} with the given scan result when nothing is cached yet or
 * the given result has a strictly greater timestamp; otherwise the cached value is left in place.
 *
 * @param scanResult the candidate result to cache
 */
private void updateScanResultsIfNewer(final ScanResult scanResult) {
    lastResult.getAndUpdate(existing -> {
        // Keep what is cached unless the candidate is strictly newer.
        if (existing != null && scanResult.getTimestamp() <= existing.getTimestamp()) {
            return existing;
        }
        return scanResult;
    });
}
Expand Down Expand Up @@ -495,11 +495,7 @@ public static ScanResult fromFlatMap(final Map<String, String> map) {
final String rowIndex = matcher.group(1);
final String cellIndex = matcher.group(3);

Set<String> cellHashes = rowIndexToMatchingCellHashes.get(rowIndex);
if (cellHashes == null) {
cellHashes = new HashSet<>();
rowIndexToMatchingCellHashes.put(rowIndex, cellHashes);
}
Set<String> cellHashes = rowIndexToMatchingCellHashes.computeIfAbsent(rowIndex, k -> new HashSet<>());

if (cellIndex == null) {
// this provides a Row ID.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@
*/
package org.apache.nifi.hbase.scan;

import java.io.IOException;

/**
* Handles a single row from an HBase scan.
*/
public interface ResultHandler {

void handle(byte[] row, ResultCell[] resultCells);
void handle(byte[] row, ResultCell[] resultCells) throws IOException;

}

0 comments on commit d77df13

Please sign in to comment.