Fixed the OOM issue, still have to write tests, release notes, and de… #1678
…cide how to deal with rows that are too wide
Codecov Report
@@ Coverage Diff @@
## develop #1678 +/- ##
=============================================
+ Coverage 59.9% 59.92% +0.01%
+ Complexity 4740 4352 -388
=============================================
Files 732 735 +3
Lines 36141 36276 +135
Branches 3953 3967 +14
=============================================
+ Hits 21651 21738 +87
- Misses 12984 13029 +45
- Partials 1506 1509 +3
Continue to review full report at Codecov.
Still need to look at DbReadTable and below.
@@ -114,6 +114,8 @@ | |||
public class DbKvs extends AbstractKeyValueService { | |||
private static final Logger log = LoggerFactory.getLogger(DbKvs.class); | |||
|
|||
private static final long GET_RANGE_OF_TS_MAX_BATCH = 1000000L; |
nit: underscores to ease readability.
Also, it appears that this is hard-coded and can't be changed. Would be nice to add support for custom batch sizes.
try (ClosableIterator<AgnosticLightResultRow> rangeResults = table.getRange(range, timestamp, maxRows)) { | ||
while (rows.size() < maxRows && rangeResults.hasNext()) { | ||
byte[] rowName = rangeResults.next().getBytes("row_name"); | ||
byte[] rowName = rangeResults.next().getBlob("row_name"); |
Why was this changed?
`getBytes` was deprecated, replaced with `getBlob`, which looks like it returns empty arrays rather than null if the column doesn't exist.
see comments below, this may have some perf side effects, would be interested in before/after numbers of this change alone.
nextRow = RangeRequests.getNextStartRow(range.isReverse(), lastRow); | ||
mayHaveMoreResults = rows.size() == maxRows; | ||
byte[] nextRow = results.currentRow; | ||
boolean mayHaveMoreResults = !results.finishedRange; |
I would like to see this logic broken out into a place where it's unit testable, as it's quite complex.
if (!Arrays.equals(currentRow, cellResult.getBlob("row_name"))) { | ||
currentRow = cellResult.getBlob("row_name"); | ||
storeRow(rowBuffer); | ||
rowBuffer.clear(); |
It's very weird to have such a side-effect here. I can see why you'd want it - but maybe we should have this method return a boolean stating whether we stored the row, or alternatively split into methods `rowIsComplete` and `storeRow(cellResult, rowBuffer)`.
Or maybe the abstraction isn't quite right and we need a stateful object to cover the algorithm in `getTimestampsByCell`, to enforce the "store a row and then clear the buffer" requirement.
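A minimal sketch of what such a stateful object might look like, using only the JDK. The class name `RowGroupingBuffer` and its methods are hypothetical and not part of this PR; the real code works with `SetMultimap<Cell, Long>` and `AgnosticLightResultRow`, which are simplified here to a row name and a timestamp. The point is only that the "store the previous row, then start a fresh buffer" step happens inside one object, so callers cannot forget it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: groups timestamps by row and completes a row
// automatically when an entry for a different row arrives.
final class RowGroupingBuffer {
    private final List<List<Long>> completedRows = new ArrayList<>();
    private List<Long> currentTimestamps = new ArrayList<>();
    private byte[] currentRow = null;

    // Adds one entry; if it belongs to a new row, the previous row is stored first.
    void add(byte[] rowName, long timestamp) {
        if (currentRow != null && !Arrays.equals(currentRow, rowName)) {
            flush();
        }
        currentRow = rowName;
        currentTimestamps.add(timestamp);
    }

    // Stores the buffered row (if any) and starts a fresh buffer.
    void flush() {
        if (!currentTimestamps.isEmpty()) {
            completedRows.add(currentTimestamps);
            currentTimestamps = new ArrayList<>();
        }
    }

    List<List<Long>> completedRows() {
        return completedRows;
    }
}
```

A caller would feed every result row through `add` and call `flush` once at the end to store the final row.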
while (iterator.hasNext() && result.size() + rowBuffer.size() < batchSize) { | ||
AgnosticLightResultRow cellResult = iterator.next(); | ||
result.storePreviousRowIfComplete(cellResult, rowBuffer); | ||
Cell cell = Cell.create(cellResult.getBlob("row_name"), cellResult.getBlob("col_name")); |
Why are we using `getBlob` for columns that are not `BLOB`? We should run some perf tests comparing these for current state on Postgres & Oracle, but from a while back using `cellResult.getBytes("row_name")` was faster & lower overhead than `getBlob`.
That's a good point, I'll look into getting the numbers before this gets a green light. I only changed it in the first place because getBytes specifically says it's deprecated and getBlob should be used instead.
} | ||
|
||
void storePreviousRowIfComplete(AgnosticLightResultRow cellResult, SetMultimap<Cell, Long> rowBuffer) { | ||
if (!Arrays.equals(currentRow, cellResult.getBlob("row_name"))) { |
See above comment about `getBlob` vs. `getBytes`.
Should also pull the row name out to a local to avoid reading twice:
byte[] rowName = cellResult.getBytes("row_name");
if (!Arrays.equals(currentRow, rowName)) {
currentRow = rowName;
@@ -1313,4 +1329,34 @@ private static Integer getMaxRowsFromBatchHint(@Nullable Integer batchHint) { | |||
private interface ReadWriteTask<T> { | |||
T run(DbReadTable readTable, DbWriteTable writeTable); | |||
} | |||
|
|||
static class ResultWithToken { | |||
SetMultimap<Cell, Long> results; |
nit: final
} | ||
|
||
public ClosableIterator<AgnosticLightResultRow> getAllRows( | ||
Iterable<byte[]> rows, ColumnSelection columns, long ts, boolean includeValues, Boolean reverseRows) { |
Taking a `Boolean` arg here feels weird, why not make this a primitive or enum (ASCENDING, DESCENDING, UNDEFINED) for clarity as caller should expressly know desired order?
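A rough sketch of the enum idea, for illustration only. The three constants come from the comment above, and `fromBoolean` mirrors the `DbReadTable.Order.fromBoolean(isReverse)` call that appears later in this review; the `orderByClause` method is a hypothetical addition, and its SQL text is copied from the `addOrdering` snippet in this PR rather than from the final implementation.

```java
// Hypothetical ordering enum replacing a nullable Boolean / Optional<Boolean>.
enum Order {
    ASCENDING,
    DESCENDING,
    UNDEFINED;

    // Maps the existing "isReverse" flag onto an explicit ordering.
    static Order fromBoolean(boolean reverse) {
        return reverse ? DESCENDING : ASCENDING;
    }

    // Returns the ORDER BY fragment, or an empty string when no ordering is requested.
    String orderByClause() {
        switch (this) {
            case ASCENDING:
                return " ORDER BY m.row_name ASC, m.col_name, m.ts";
            case DESCENDING:
                return " ORDER BY m.row_name DESC, m.col_name, m.ts";
            default:
                return "";
        }
    }
}
```

With this shape, the four-argument `getAllRows` overload could pass `Order.UNDEFINED` instead of `null`, which makes the "no ordering" case explicit at the call site.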
Cell cell = Cell.create(row.getBytes("row_name"), row.getBytes("col_name")); | ||
long ts = row.getLong("ts"); | ||
results.put(cell, ts); | ||
private ResultWithToken getTimestampsByCell( |
I realise this is probably not in the scope you were going for, but I think Javadoc comments / multiline comments on these methods would be really nice for understanding.
That is actually very helpful for the future!
try (ClosableIterator<AgnosticLightResultRow> rangeResults = table.getRange(range, timestamp, maxRows)) { | ||
while (rows.size() < maxRows && rangeResults.hasNext()) { | ||
byte[] rowName = rangeResults.next().getBytes("row_name"); | ||
byte[] rowName = rangeResults.next().getBlob("row_name"); | ||
if (rowName != null) { |
Shouldn't need this check (I think) since getBlob returns nonnull
} | ||
} | ||
|
||
private FullQuery addOrdering(Optional<Boolean> reverseRows, FullQuery query) { |
Could we use Optional above in `getAllRows` or just a Boolean object here? I'd prefer for both to be consistent, if that's feasible.
Perhaps even better: an enum for ordering - undefined, descending, or ascending.
.getAllRows(rows, columns, timestamp, false, isReverse)) { | ||
ResultWithToken result = new ResultWithToken(); | ||
SetMultimap<Cell, Long> rowBuffer = HashMultimap.create(); | ||
while (iterator.hasNext() && result.size() + rowBuffer.size() < batchSize) { |
Probably worth a note here that `size()` is actually constant time; I was a bit concerned about perf here, but looked it up and seems fine (`AbstractMapBasedMultimap` keeps track of its total size).
@@ -62,16 +64,30 @@ public DbReadTable(ConnectionSupplier conns, DbQueryFactory queryFactory) { | |||
|
|||
public ClosableIterator<AgnosticLightResultRow> getAllRows( | |||
Iterable<byte[]> rows, ColumnSelection columns, long ts, boolean includeValues) { | |||
return getAllRows(rows, columns, ts, includeValues, null); |
Passing null here is really ugly - as DavidS said, I'd prefer some kind of enum here.
} | ||
} | ||
|
||
private FullQuery addOrdering(Optional<Boolean> reverseRows, FullQuery query) { | ||
String ordering = reverseRows | ||
.map(reverse -> " ORDER BY m.row_name" + (reverse ? " DESC" : " ASC") + ", m.col_name, m.ts") |
nit: the space should probably go in the `" ORDER BY m.row_name"` substring.
… some tests. Still have to add more tests, do some refactoring, add javadocs, and release notes.
Thanks for the reviews, I have addressed most of it, and added a fix for the wide rows case. There is no hurry to review, as I still have several things to do, so some refactors are expected as well! |
assertThat(results.getNextStartRow().isPresent()).isFalse(); | ||
assertThat(results.getCellsExamined()).isEqualTo(9); | ||
assertThat(results.getCellsDeleted()).isEqualTo(240); | ||
resetGetRangeOfTsMaxBatch(); |
`resetGetRangeOfTsMaxBatch` could be done in an `@After` method.
resetGetRangeOfTsMaxBatch(); | ||
} | ||
|
||
private void setGetRangeOfTsMaxBatch(long value) { |
Duplicate code (here until the end of the file). Would be nice to have this code in one place only, e.g. something like `DbKvsTestUtils`.
Makes sense, refactored
@@ -113,8 +114,12 @@ | |||
|
|||
public class DbKvs extends AbstractKeyValueService { | |||
private static final Logger log = LoggerFactory.getLogger(DbKvs.class); | |||
public static final String ROW = "row_name"; |
Private and public constants should be separate.
Long overflowId = hasOverflow ? row.getLongObject("overflow") : null; | ||
if (overflowId == null) { | ||
Value value = Value.create(row.getBytes("val"), row.getLong("ts")); | ||
Value value = Value.create(row.getBytes("val"), row.getLong(TIMESTAMP)); |
Did you consider extracting `"val"`, too?
|
||
private static final long GET_RANGE_OF_TS_MAX_BATCH = 1000000L; | ||
private long getRangeOfTsMaxBatch = INITIAL_GET_RANGE_OF_TS_BATCH; |
Perhaps call this `maxRangeOfTimestampsBatchSize`? And "INITIAL" -> "DEFAULT".
* 3. In the same column, with higher or equal timestamp (to repeat the last timestamp for sweep) | ||
*/ | ||
void moveForward(Token oldToken) { | ||
boolean forwarding = oldToken.shouldForward(); |
Perhaps rename `forwarding` to `movingForward` and `shouldForward` to `shouldMoveForward`?
Or skipping / shouldSkip / shouldFinishSkipping.
} | ||
.getAllRows(rows, columns, timestamp, false, DbReadTable.Order.fromBoolean(isReverse))) { | ||
ResultWithToken result = ResultWithToken.create(iterator); | ||
result.moveForward(token); |
It looks like ResultWithToken must be used in this way (calling `create`, `moveForward`, `getBatchOfTimestamps`, and `checkNextEntryAndCreateToken`, in that order), and the object doesn't really make sense until all that has been done.
We could enforce this temporal dependency by moving the other three methods inside `create`, which would then take the iterator, the previous token, and the batch size.
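One way to picture this suggestion is a class whose only entry point is a static factory, so the object is fully populated before any caller sees it. The sketch below is a simplified stand-in, not the PR's code: `BatchResult`, the `Integer` entries, and the `resumeAfter` parameter are all hypothetical substitutes for `ResultWithToken`, the result rows, and the previous token.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Hypothetical result object: every construction step runs inside create(),
// so callers cannot invoke the steps in the wrong order or skip one.
final class BatchResult {
    private final List<Integer> entries;
    private final boolean mayHaveMoreResults;

    private BatchResult(List<Integer> entries, boolean mayHaveMoreResults) {
        this.entries = Collections.unmodifiableList(entries);
        this.mayHaveMoreResults = mayHaveMoreResults;
    }

    static BatchResult create(Iterator<Integer> iterator, int resumeAfter, int batchSize) {
        List<Integer> batch = new ArrayList<>();
        // Skip entries already covered by the previous page, then fill the batch.
        while (batch.size() < batchSize && iterator.hasNext()) {
            int value = iterator.next();
            if (value > resumeAfter) {
                batch.add(value);
            }
        }
        // Anything left in the iterator means another page is needed.
        return new BatchResult(batch, iterator.hasNext());
    }

    List<Integer> getEntries() {
        return entries;
    }

    boolean mayHaveMoreResults() {
        return mayHaveMoreResults;
    }
}
```

Because the constructor is private and the fields are final, the result is also immutable once returned.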
Agree, makes much more sense
@Override | ||
public boolean hasNext() { | ||
if (delegate.hasNext()) { | ||
if (limit >= totalItems + returnOneIfNextRowIsNew()) { |
How about an int-returning method `getTotalItems()` that looks like:
if (nextRowIsNew()) {
return totalItems + 1;
} else {
return totalItems;
}
This was so obvious 🦆
|
||
@Override | ||
public void remove() { | ||
// not implemented |
Throw `NotImplementedException`?
@@ -40,6 +40,10 @@ develop | |||
* - Type | |||
- Change | |||
|
|||
* - |fixed| | |||
- Fixed DbKvs sweep OOM issue `#982 <https://github.com/palantir/atlasdb/issues/982>`__ caused by very wide rows. Now uses cell batch size that |
Incomplete release note!
Just a few nits left, mostly around `TimestampsByCellResultWithToken`.
boolean skipping = oldToken.shouldSkip(); | ||
while (skipping && iterator.hasNext()) { | ||
AgnosticLightResultRow nextResult = iterator.peek(); | ||
if (finishedForwarding(oldToken, nextResult)) { |
Oops, you missed a rename: `finishedForwarding` -> `finishedSkipping`
@@ -44,6 +44,10 @@ develop | |||
- Fixed DbKvs sweep OOM issue `#982 <https://github.com/palantir/atlasdb/issues/982>`__ caused by very wide rows. Now uses cell batch size that |
Release note still incomplete.
List<RowResult<Set<Long>>> finalResults = new ArrayList<>(); | ||
cellsByRow.entrySet().forEach(entry -> finalResults.add(RowResult.create(entry.getKey(), entry.getValue()))); | ||
List<RowResult<Set<Long>>> finalResults = cellsByRow.entrySet().stream() | ||
.map(entry -> RowResult.create(entry.getKey(), entry.getValue())).collect(Collectors.toList()); |
super-nit: should have `.map` and `.collect` on separate lines.
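For reference, the layout being asked for puts one stream operation per line. This is a small JDK-only illustration; `StreamFormattingExample` and `describeRows` are hypothetical names, and the strings stand in for the `RowResult.create(...)` call in the diff.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

final class StreamFormattingExample {
    // One operation per line keeps each step of the pipeline easy to scan.
    static List<String> describeRows(Map<String, Set<Long>> cellsByRow) {
        return cellsByRow.entrySet().stream()
                .map(entry -> entry.getKey() + "=" + entry.getValue())
                .collect(Collectors.toList());
    }
}
```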
import com.palantir.nexus.db.sql.AgnosticLightResultRow; | ||
|
||
final class TimestampsByCellResultWithToken { | ||
private static final String ROW = "row_name"; |
These are duplicates of `DbKvs.ROW` etc.
* 2. In a greater column | ||
* 3. In the same column, with higher or equal timestamp (to repeat the last timestamp for sweep) | ||
*/ | ||
TimestampsByCellResultWithToken moveForward(Token oldToken) { |
Can these methods (all except `create`) now be private?
private PeekingIterator<AgnosticLightResultRow> iterator; | ||
|
||
final SetMultimap<Cell, Long> entries; | ||
boolean mayHaveMoreResults = false; |
Should probably expose getters for these and make them private, to avoid modification.
} | ||
|
||
@Override | ||
public void remove() { | ||
// not implemented | ||
throw new RuntimeException("Method remove() is not implemented."); |
nit: `UnsupportedOperationException` (java.lang) is more descriptive.
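As a small illustration of this nit, a wrapping iterator that does not support removal could look like the sketch below. `ReadOnlyIterator` is a hypothetical name, not the iterator in this PR. Note that since Java 8 the default `Iterator.remove()` already throws `UnsupportedOperationException`, so dropping the override entirely would behave the same way.

```java
import java.util.Iterator;

// Hypothetical wrapper that forwards reads and rejects removal.
final class ReadOnlyIterator<T> implements Iterator<T> {
    private final Iterator<T> delegate;

    ReadOnlyIterator(Iterator<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public boolean hasNext() {
        return delegate.hasNext();
    }

    @Override
    public T next() {
        return delegate.next();
    }

    @Override
    public void remove() {
        // Signals that the operation is intentionally unsupported.
        throw new UnsupportedOperationException("remove() is not supported");
    }
}
```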
@@ -38,10 +38,10 @@ | |||
|
|||
@RunWith(Suite.class) | |||
@SuiteClasses({ | |||
DbkvsPostgresKeyValueServiceTest.class, | |||
DbkvsPostgresSerializableTransactionTest.class, | |||
// DbkvsPostgresKeyValueServiceTest.class, |
Make sure this is reverted before we merge!
@@ -576,6 +576,27 @@ public void testOom() { | |||
Assert.assertEquals(10000L, results.getCellsDeleted()); | |||
} | |||
|
|||
@Test | |||
public void wideRowTest() { |
How long does this test take to run?
Almost there. Last remaining things before we merge:
- one explanatory comment
- add ADR
62)) { | ||
EquivalenceCountingIterator<RowResult<Set<Long>>> iterator = | ||
new EquivalenceCountingIterator<>(rowResults, 10, SweepTaskRunnerImpl.sameRowEquivalence()); | ||
assertRowColumnsTimestamps(iterator.next(), 1, ImmutableSet.of(1, 2, 3, 4), |
I'm concerned that this call is hard to read. Might be good to add a comment that the arguments are: result, expected row key, expected columns, expected timestamps.
LGTM!
Looks like you have checkstyle errors, and need to update the dependency lock. |
…cide how to deal with rows that are too wide
Goals (and why):
Fix the DbKvs sweep OOM issue that arises from the combination of wide rows and large row batch size.
Implementation Description (bullets):
The OOM was happening because the previous implementation created a large multimap before doing the paging. Now we limit the size of the multimap to a predefined maximum (1M, chosen arbitrarily).
Concerns (what feedback would you like?):
We have to decide what to do with rows that are wider than the current GET_RANGE_OF_TS_MAX_BATCH. The current solution is to skip them; alternatively, we could process only the first GET_RANGE_OF_TS_MAX_BATCH columns. I cannot figure out how to do two pages on the same row, as the second set of timestamps seems to just be ignored.
Where should we start reviewing?:
getTimestampsByCell in DbKvs
Priority (whenever / two weeks / yesterday):
Couple of days, as I still have a bunch of things to do before it is ready.