Skip to content

Commit

Permalink
HOTFIX: Revert "KAFKA-12960: Enforcing strict retention time for Wind…
Browse files Browse the repository at this point in the history
…owStore and Sess… (apache#11211)" (apache#12745) (#57)

This reverts commit 07c1002 which broke trunk.

Reviewers: David Jacot <djacot@confluent.io>, Bill Bejeck <bbejeck@apache.org>

Co-authored-by: Bruno Cadonna <cadonna@apache.org>
  • Loading branch information
rutvijmehta-harness and cadonna committed Feb 9, 2024
1 parent ffe3a70 commit a7974e3
Show file tree
Hide file tree
Showing 19 changed files with 350 additions and 835 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg
protected final AbstractSegments<S> segments;
protected final KeySchema baseKeySchema;
protected final Optional<KeySchema> indexKeySchema;
private final long retentionPeriod;


protected ProcessorContext context;
Expand All @@ -67,44 +66,36 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg
AbstractDualSchemaRocksDBSegmentedBytesStore(final String name,
final KeySchema baseKeySchema,
final Optional<KeySchema> indexKeySchema,
final AbstractSegments<S> segments,
final long retentionPeriod) {
final AbstractSegments<S> segments) {
this.name = name;
this.baseKeySchema = baseKeySchema;
this.indexKeySchema = indexKeySchema;
this.segments = segments;
this.retentionPeriod = retentionPeriod;
}

@Override
public KeyValueIterator<Bytes, byte[]> all() {

final long actualFrom = getActualFrom(0, baseKeySchema instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema);

final List<S> searchSpace = segments.allSegments(true);
final Bytes from = baseKeySchema.lowerRange(null, actualFrom);
final Bytes from = baseKeySchema.lowerRange(null, 0);
final Bytes to = baseKeySchema.upperRange(null, Long.MAX_VALUE);

return new SegmentIterator<>(
searchSpace.iterator(),
baseKeySchema.hasNextCondition(null, null, actualFrom, Long.MAX_VALUE, true),
baseKeySchema.hasNextCondition(null, null, 0, Long.MAX_VALUE, true),
from,
to,
true);
}

@Override
public KeyValueIterator<Bytes, byte[]> backwardAll() {

final long actualFrom = getActualFrom(0, baseKeySchema instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema);

final List<S> searchSpace = segments.allSegments(false);
final Bytes from = baseKeySchema.lowerRange(null, actualFrom);
final Bytes from = baseKeySchema.lowerRange(null, 0);
final Bytes to = baseKeySchema.upperRange(null, Long.MAX_VALUE);

return new SegmentIterator<>(
searchSpace.iterator(),
baseKeySchema.hasNextCondition(null, null, actualFrom, Long.MAX_VALUE, false),
baseKeySchema.hasNextCondition(null, null, 0, Long.MAX_VALUE, false),
from,
to,
false);
Expand All @@ -128,15 +119,6 @@ public void remove(final Bytes rawBaseKey) {

abstract protected KeyValue<Bytes, byte[]> getIndexKeyValue(final Bytes baseKey, final byte[] baseValue);

// isTimeFirstWindowSchema true implies ON_WINDOW_CLOSE semantics. There's an edge case
// when retentionPeriod = grace Period. If we add 1, then actualFrom > to which would
// lead to no records being returned.
protected long getActualFrom(final long from, final boolean isTimeFirstWindowSchema) {
return isTimeFirstWindowSchema ? Math.max(from, observedStreamTime - retentionPeriod) :
Math.max(from, observedStreamTime - retentionPeriod + 1);

}

// For testing
void putIndex(final Bytes indexKey, final byte[] value) {
if (!hasIndex()) {
Expand Down Expand Up @@ -209,24 +191,7 @@ public void put(final Bytes rawBaseKey,

@Override
public byte[] get(final Bytes rawKey) {
final long timestampFromRawKey = baseKeySchema.segmentTimestamp(rawKey);
// check if timestamp is expired

if (baseKeySchema instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema) {
if (timestampFromRawKey < observedStreamTime - retentionPeriod) {
LOG.debug("Record with key {} is expired as timestamp from key ({}) < actual stream time ({})",
rawKey.toString(), timestampFromRawKey, observedStreamTime - retentionPeriod);
return null;
}
} else {
if (timestampFromRawKey < observedStreamTime - retentionPeriod + 1) {
LOG.debug("Record with key {} is expired as timestamp from key ({}) < actual stream time ({})",
rawKey.toString(), timestampFromRawKey, observedStreamTime - retentionPeriod + 1);
return null;
}
}

final S segment = segments.getSegmentForTimestamp(timestampFromRawKey);
final S segment = segments.getSegmentForTimestamp(baseKeySchema.segmentTimestamp(rawKey));
if (segment == null) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
private final String name;
private final AbstractSegments<S> segments;
private final String metricScope;
private final long retentionPeriod;
private final KeySchema keySchema;

private ProcessorContext context;
Expand All @@ -66,12 +65,10 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se

AbstractRocksDBSegmentedBytesStore(final String name,
final String metricScope,
final long retentionPeriod,
final KeySchema keySchema,
final AbstractSegments<S> segments) {
this.name = name;
this.metricScope = metricScope;
this.retentionPeriod = retentionPeriod;
this.keySchema = keySchema;
this.segments = segments;
}
Expand All @@ -94,30 +91,19 @@ KeyValueIterator<Bytes, byte[]> fetch(final Bytes key,
final long from,
final long to,
final boolean forward) {
final long actualFrom = getActualFrom(from);
final List<S> searchSpace = keySchema.segmentsToSearch(segments, from, to, forward);

if (keySchema instanceof WindowKeySchema && to < actualFrom) {
LOG.debug("Returning no records for key {} as to ({}) < actualFrom ({}) ", key.toString(), to, actualFrom);
return KeyValueIterators.emptyIterator();
}

final List<S> searchSpace = keySchema.segmentsToSearch(segments, actualFrom, to, forward);

final Bytes binaryFrom = keySchema.lowerRangeFixedSize(key, actualFrom);
final Bytes binaryFrom = keySchema.lowerRangeFixedSize(key, from);
final Bytes binaryTo = keySchema.upperRangeFixedSize(key, to);

return new SegmentIterator<>(
searchSpace.iterator(),
keySchema.hasNextCondition(key, key, actualFrom, to, forward),
keySchema.hasNextCondition(key, key, from, to, forward),
binaryFrom,
binaryTo,
forward);
}

private long getActualFrom(final long from) {
return Math.max(from, observedStreamTime - retentionPeriod + 1);
}

@Override
public KeyValueIterator<Bytes, byte[]> fetch(final Bytes keyFrom,
final Bytes keyTo,
Expand Down Expand Up @@ -147,48 +133,38 @@ KeyValueIterator<Bytes, byte[]> fetch(final Bytes keyFrom,
return KeyValueIterators.emptyIterator();
}

final long actualFrom = getActualFrom(from);
final List<S> searchSpace = keySchema.segmentsToSearch(segments, from, to, forward);

if (keySchema instanceof WindowKeySchema && to < actualFrom) {
LOG.debug("Returning no records for keys {}/{} as to ({}) < actualFrom ({}) ", keyFrom, keyTo, to, actualFrom);
return KeyValueIterators.emptyIterator();
}

final List<S> searchSpace = keySchema.segmentsToSearch(segments, actualFrom, to, forward);

final Bytes binaryFrom = keyFrom == null ? null : keySchema.lowerRange(keyFrom, actualFrom);
final Bytes binaryFrom = keyFrom == null ? null : keySchema.lowerRange(keyFrom, from);
final Bytes binaryTo = keyTo == null ? null : keySchema.upperRange(keyTo, to);

return new SegmentIterator<>(
searchSpace.iterator(),
keySchema.hasNextCondition(keyFrom, keyTo, actualFrom, to, forward),
keySchema.hasNextCondition(keyFrom, keyTo, from, to, forward),
binaryFrom,
binaryTo,
forward);
}

@Override
public KeyValueIterator<Bytes, byte[]> all() {
final long actualFrom = getActualFrom(0);
final List<S> searchSpace = keySchema.segmentsToSearch(segments, actualFrom, Long.MAX_VALUE, true);
final List<S> searchSpace = segments.allSegments(true);

return new SegmentIterator<>(
searchSpace.iterator(),
keySchema.hasNextCondition(null, null, actualFrom, Long.MAX_VALUE, true),
keySchema.hasNextCondition(null, null, 0, Long.MAX_VALUE, true),
null,
null,
true);
}

@Override
public KeyValueIterator<Bytes, byte[]> backwardAll() {
final long actualFrom = getActualFrom(0);

final List<S> searchSpace = keySchema.segmentsToSearch(segments, actualFrom, Long.MAX_VALUE, false);
final List<S> searchSpace = segments.allSegments(false);

return new SegmentIterator<>(
searchSpace.iterator(),
keySchema.hasNextCondition(null, null, actualFrom, Long.MAX_VALUE, false),
keySchema.hasNextCondition(null, null, 0, Long.MAX_VALUE, false),
null,
null,
false);
Expand All @@ -197,18 +173,11 @@ public KeyValueIterator<Bytes, byte[]> backwardAll() {
@Override
public KeyValueIterator<Bytes, byte[]> fetchAll(final long timeFrom,
final long timeTo) {
final long actualFrom = getActualFrom(timeFrom);

if (keySchema instanceof WindowKeySchema && timeTo < actualFrom) {
LOG.debug("Returning no records for as timeTo ({}) < actualFrom ({}) ", timeTo, actualFrom);
return KeyValueIterators.emptyIterator();
}

final List<S> searchSpace = segments.segments(actualFrom, timeTo, true);
final List<S> searchSpace = segments.segments(timeFrom, timeTo, true);

return new SegmentIterator<>(
searchSpace.iterator(),
keySchema.hasNextCondition(null, null, actualFrom, timeTo, true),
keySchema.hasNextCondition(null, null, timeFrom, timeTo, true),
null,
null,
true);
Expand All @@ -217,18 +186,11 @@ public KeyValueIterator<Bytes, byte[]> fetchAll(final long timeFrom,
@Override
public KeyValueIterator<Bytes, byte[]> backwardFetchAll(final long timeFrom,
final long timeTo) {
final long actualFrom = getActualFrom(timeFrom);

if (keySchema instanceof WindowKeySchema && timeTo < actualFrom) {
LOG.debug("Returning no records for as timeTo ({}) < actualFrom ({}) ", timeTo, actualFrom);
return KeyValueIterators.emptyIterator();
}

final List<S> searchSpace = segments.segments(actualFrom, timeTo, false);
final List<S> searchSpace = segments.segments(timeFrom, timeTo, false);

return new SegmentIterator<>(
searchSpace.iterator(),
keySchema.hasNextCondition(null, null, actualFrom, timeTo, false),
keySchema.hasNextCondition(null, null, timeFrom, timeTo, false),
null,
null,
false);
Expand Down Expand Up @@ -272,14 +234,7 @@ public void put(final Bytes key,

@Override
public byte[] get(final Bytes key) {
final long timestampFromKey = keySchema.segmentTimestamp(key);
// check if timestamp is expired
if (timestampFromKey < observedStreamTime - retentionPeriod + 1) {
LOG.debug("Record with key {} is expired as timestamp from key ({}) < actual stream time ({})",
key.toString(), timestampFromKey, observedStreamTime - retentionPeriod + 1);
return null;
}
final S segment = segments.getSegmentForTimestamp(timestampFromKey);
final S segment = segments.getSegmentForTimestamp(keySchema.segmentTimestamp(key));
if (segment == null) {
return null;
}
Expand Down

0 comments on commit a7974e3

Please sign in to comment.