Implement VersionedKeyValueStore<K, V> #21

Closed
hartmut-co-uk opened this issue Jul 8, 2023 · 3 comments
hartmut-co-uk commented Jul 8, 2023

Support VersionedKeyValueStore<K, V>

Shipped with Kafka 3.5.0.
ref KIP-889: Versioned State Stores

Talk at Kafka Summit London 2023:
https://www.confluent.io/events/kafka-summit-london-2023/versioned-state-stores-in-kafka-streams/

Motivation

(quoted from KIP-889)

The state stores used by Kafka Streams today maintain only the latest value associated with each key. This prevents Kafka Streams from providing proper temporal join semantics for stream-table joins.

Consider the following example of A join B, where A is a stream and B is a table, and records (for a particular key) arrive in the following order:

                    B: (time=0, v=b0)
A: (time=1, v=a1)                       --> output 1: (a1, b0)
                    B: (time=3, v=b3)
A: (time=4, v=a4)                       --> output 2: (a4, b3)
A: (time=2, v=a2)                       --> output 3: (a2, b3) should be (a2, b0) instead

Note that the last record (a2) is out-of-order by timestamp (relative to the other messages in stream A). The first two join result records are as expected: the a1 record is joined against b0, and the a4 record is joined against b3. However, when the out-of-order a2 record arrives, it should be joined against the b0 record, since that's the latest record on the B-side as of its record timestamp time=2, but this is not what happens. The state store for table B has already been updated to contain b3, and the b0 record was replaced in the process. As a result, the only record available in the store for the a2 record to join against is the b3 record, which is why Kafka Streams outputs the join result (a2, b3) today.

For an example stream-table join application where proper temporal join semantics are critical, imagine that B is a table of real-time currency conversion rates and A is a stream of transactions. It's important that each transaction in A is joined with the conversion rate at the time the transaction was issued, not the latest conversion rate seen so far.

A similar temporal semantic gap exists with table-table foreign key joins today as well. The subscription store may not reflect an accurate set of foreign key records at the time of the join record's timestamp, resulting in incorrect join results.

To address this class of gaps in Kafka Streams stateful processing semantics, this KIP proposes to introduce versioned state stores. A versioned state store will track multiple record versions for the same key, rather than the single latest record per key as is the case for existing stores today.

Versioned Store Interface

package org.apache.kafka.streams.state;

import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.processor.StateStore;

/**
 * A key-value store that stores multiple record versions per key, and supports timestamp-based
 * retrieval operations to return the latest record (per key) as of a specified timestamp.
 * Only one record is stored per key and timestamp, i.e., a second call to
 * {@link #put(Object, Object, long)} with the same key and timestamp will replace the first.
 * <p>
 * Each store instance has an associated, fixed-duration "history retention" which specifies
 * how long old record versions should be kept for. In particular, a versioned store guarantees
 * to return accurate results for calls to {@link #get(Object, long)} where the provided timestamp
 * bound is within history retention of the current observed stream time. (Queries with timestamp
 * bound older than the specified history retention are considered invalid.)
 * <p>
 * The store's "history retention" also doubles as its "grace period," which determines how far
 * back in time writes to the store will be accepted. A versioned store will not accept writes
 * (inserts, updates, or deletions) if the timestamp associated with the write is older than the
 * current observed stream time by more than the grace period.
 *
 * @param <K> The key type
 * @param <V> The value type
 */
public interface VersionedKeyValueStore<K, V> extends StateStore {

    long PUT_RETURN_CODE_VALID_TO_UNDEFINED = -1L;
    long PUT_RETURN_CODE_NOT_PUT = Long.MIN_VALUE;

    /**
     * Add a new record version associated with the specified key and timestamp.
     * <p>
     * If the timestamp associated with the new record version is older than the store's
     * grace period (i.e., history retention) relative to the current observed stream time,
     * then the record will not be added.
     *
     * @param key       The key
     * @param value     The value; it can be {@code null}, in which case it is interpreted as a delete.
     * @param timestamp The timestamp for this record version
     * @return The validTo timestamp of the newly put record. Two special values, {@code -1} and
     *         {@code Long.MIN_VALUE} carry specific meanings. {@code -1} indicates that the
     *         record that was put is the latest record version for its key, and therefore the
     *         validTo timestamp is undefined. {@code Long.MIN_VALUE} indicates that the record
     *         was not put, due to the grace period having been exceeded.
     * @throws NullPointerException If {@code null} is used for key.
     * @throws InvalidStateStoreException if the store is not initialized
     */
    long put(K key, V value, long timestamp);

    /**
     * Delete the value associated with this key from the store, at the specified timestamp
     * (if there is such a value), and return the deleted value.
     * <p>
     * If the timestamp associated with this deletion is older than the store's grace period
     * (i.e., history retention) relative to the current observed stream time, then the deletion
     * will not be performed and {@code null} will be returned.
     * <p>
     * As a consequence of the above, the way to delete a record version is <i>not</i>
     * to first call {@link #get(Object) #get(key)} or {@link #get(Object, long) #get(key, timestamp)}
     * and use the returned {@link VersionedRecord#timestamp()} in a call to this
     * {@code delete(key, timestamp)} method, as the returned timestamp may be older than
     * the store's grace period (i.e., history retention), in which case the delete will
     * not take place. Instead, you should pass a timestamp inferred from your business
     * logic that specifies when the delete actually happens. For example, it could be the
     * timestamp of the currently processed input record or the current stream time.
     * <p>
     * This operation is semantically equivalent to {@link #get(Object, long) #get(key, timestamp)}
     * followed by {@link #put(Object, Object, long) #put(key, null, timestamp)}, with
     * a caveat that if the deletion timestamp is older than the store's grace period
     * (i.e., history retention) then the return value is always {@code null}, regardless
     * of what {@link #get(Object, long) #get(key, timestamp)} would return.
     *
     * @param key       The key
     * @param timestamp The timestamp for this delete
     * @return The value and timestamp of the record associated with this key as of
     *         the deletion timestamp (inclusive), or {@code null} if no such record exists
     *         (including if the deletion timestamp is older than this store's history
     *         retention time, i.e., the store no longer contains data for the provided
     *         timestamp). Note that the record timestamp {@code r.timestamp()} of the
     *         returned {@link VersionedRecord} may be smaller than the provided deletion
     *         timestamp.
     * @throws NullPointerException If {@code null} is used for key.
     * @throws InvalidStateStoreException if the store is not initialized
     */
    VersionedRecord<V> delete(K key, long timestamp);

    /**
     * Get the current (i.e., latest by timestamp) record associated with this key.
     *
     * @param key The key to fetch
     * @return The value and timestamp of the current record associated with this key, or
     *         {@code null} if there is no current record for this key.
     * @throws NullPointerException       If null is used for key.
     * @throws InvalidStateStoreException if the store is not initialized
     */
    VersionedRecord<V> get(K key);

    /**
     * Get the record associated with this key as of the specified timestamp (i.e.,
     * the existing record with the largest timestamp not exceeding the provided
     * timestamp bound).
     *
     * @param key           The key to fetch
     * @param asOfTimestamp The timestamp bound. This bound is inclusive; if a record
     *                      (for the specified key) exists with this timestamp, then
     *                      this is the record that will be returned.
     * @return The value and timestamp of the record associated with this key
     *         as of the provided timestamp, or {@code null} if no such record exists
     *         (including if the provided timestamp bound is older than this store's history
     *         retention time, i.e., the store no longer contains data for the provided
     *         timestamp). Note that the record timestamp {@code r.timestamp()} of the
     *         returned {@link VersionedRecord} may be smaller than the provided timestamp
     *         bound. Additionally, if the latest record version for the key is eligible
     *         for the provided timestamp bound, then that record will be returned even if
     *         the timestamp bound is older than the store's history retention.
     * @throws NullPointerException       If null is used for key.
     * @throws InvalidStateStoreException if the store is not initialized
     */
    VersionedRecord<V> get(K key, long asOfTimestamp);
}
hartmut-co-uk added the documentation and enhancement labels on Jul 9, 2023
hartmut-co-uk self-assigned this on Jul 9, 2023
@hartmut-co-uk

Initial design...

Draft table schema for keyValueStore

CREATE TABLE IF NOT EXISTS word_count_kstreams_store (
    partition int,
    key blob,
    validFrom timestamp,
    validTo timestamp,
    time timestamp,
    value blob,
    PRIMARY KEY ((partition), key, validTo)
) WITH CLUSTERING ORDER BY (key ASC, validTo DESC) AND compaction = { 'class' : 'LeveledCompactionStrategy' };

Draft table schema for globalKeyValueStore

Using defaults, for a state store named "clicks-global" the following CQL Schema applies:

CREATE TABLE IF NOT EXISTS clicks_global_kstreams_store (
    key blob,
    validFrom timestamp,
    validTo timestamp,
    time timestamp,
    value blob,
    PRIMARY KEY ((key), validTo)
) WITH CLUSTERING ORDER BY (validTo DESC) AND compaction = { 'class' : 'LeveledCompactionStrategy' };

Implementation details & query patterns

Follow the overall KIP-889 implementation -> use validTo=-1 as the sentinel for the 'latest' value (Cassandra primary key columns cannot be null).
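
For example (a sketch using the draft partitioned schema above; key and values are illustrative), the current version of a key is written and read via the validTo=-1 sentinel:

-- the 'latest' row carries validTo=-1 instead of null
INSERT INTO word_count_kstreams_store (partition, key, validFrom, validTo, time, value) VALUES (0, textAsBlob('a'), 10000, -1, 10000, intAsBlob(10));
-- fetch the current version directly via the sentinel
SELECT * FROM word_count_kstreams_store WHERE partition=0 AND key=textAsBlob('a') AND validTo=-1;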

get:

  • query the latest version, check whether its 'validFrom' satisfies 'asOfTimestamp'
    SELECT * FROM word_count_kstreams_store WHERE partition=0 AND key=textAsBlob('a') AND validTo=-1;
  • if not, query the top 1 matching historical version
    SELECT * FROM word_count_kstreams_store WHERE partition=0 AND key=textAsBlob('a') AND validTo<=123 ORDER BY validTo DESC LIMIT 1;
  • return as per the method definition

put:

  • validate the record timestamp against RETENTION_TIME (grace period)
  • get (returns latest, or versioned)
  • batch (see the sketch below):
    • upsert the previous version
    • upsert the new version
    • (?) delete all versions older than retention time
      (performance impact? Do separately, fire-and-forget?)
      DELETE FROM word_count_kstreams_store WHERE partition=0 AND key=textAsBlob('a') AND validTo>=0 AND validTo<123;
  • return as per the method definition
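
A minimal sketch of the in-order put batch (mirroring the live-coding scratch file in the comment below; timestamps and values are illustrative):

BEGIN BATCH
-- the new latest version overwrites the existing validTo=-1 row in place (same primary key)
INSERT INTO word_count_kstreams_store (partition, key, validFrom, validTo, time, value) VALUES (0, textAsBlob('a'), 15000, -1, 15000, intAsBlob(15));
-- re-insert the superseded version as a historical row with a concrete validTo
INSERT INTO word_count_kstreams_store (partition, key, validFrom, validTo, time, value) VALUES (0, textAsBlob('a'), 10000, 15000, 10000, intAsBlob(10));
APPLY BATCH;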

delete:

  • validate the record timestamp against RETENTION_TIME (grace period)
  • get (returns latest, or versioned)
  • when found, get the previous version
  • batch (see the sketch below):
    • delete the current version
    • upsert the previous version
  • return the deleted value, when found, as per the method definition
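
A minimal sketch of the delete batch (adapted from the live-coding scratch file in the comment below, where a version valid 12s..15s is deleted and its predecessor, valid 10s..12s, is extended across the gap; key and values are illustrative):

BEGIN BATCH
-- drop the predecessor's old row (its validTo changes from 12000 to 15000)
DELETE FROM word_count_kstreams_store WHERE partition=0 AND key=textAsBlob('a') AND validTo=12000;
-- re-insert the predecessor at validTo=15000, overwriting the deleted version's row in place
INSERT INTO word_count_kstreams_store (partition, key, validFrom, validTo, time, value) VALUES (0, textAsBlob('a'), 10000, 15000, 10000, intAsBlob(10));
APPLY BATCH;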


hartmut-co-uk commented Aug 5, 2023

Here's the scratch file from the live coding example (https://youtu.be/zuMvGdmRqfs)

CREATE KEYSPACE IF NOT EXISTS "test" WITH replication = {'class':'SimpleStrategy', 'replication_factor' : 1};

USE test;

CREATE TABLE IF NOT EXISTS word_count_kstreams_store
(
    partition int,
    key       blob,
    validFrom timestamp,
    validTo   timestamp,
    time      timestamp,
    value     blob,
    PRIMARY KEY ((partition), key, validTo)
) WITH compaction = { 'class' : 'LeveledCompactionStrategy' };
-- ) WITH CLUSTERING ORDER BY (key ASC, validTo ASC) AND compaction = { 'class' : 'LeveledCompactionStrategy' };
-- DONE: TBC should clustering order use `validTo ASC`? -> decision: use ASC, since that's what we need to use for our current known data access patterns


CREATE TABLE IF NOT EXISTS clicks_global_kstreams_store
(
    key   blob,
    validFrom timestamp,
    validTo   timestamp,
    time  timestamp,
    value blob,
    PRIMARY KEY ((key), validTo)
) WITH compaction = { 'class' : 'LeveledCompactionStrategy' };
-- ) WITH CLUSTERING ORDER BY (validTo ASC) AND compaction = { 'class' : 'LeveledCompactionStrategy' };
-- DONE: TBC should clustering order use `validTo ASC`? -> decision: use ASC, since that's what we need to use for our current known data access patterns


SELECT * FROM word_count_kstreams_store;

INSERT INTO word_count_kstreams_store (partition, key, validFrom, validTo, time, value) VALUES (0, textAsBlob('curry'), 0, 4000, 4000, intAsBlob(0));
INSERT INTO word_count_kstreams_store (partition, key, validFrom, validTo, time, value) VALUES (0, textAsBlob('curry'), 4000, 8000, 8000, intAsBlob(4));
INSERT INTO word_count_kstreams_store (partition, key, validFrom, validTo, time, value) VALUES (0, textAsBlob('curry'), 8000, 10000, 10000, intAsBlob(8));
INSERT INTO word_count_kstreams_store (partition, key, validFrom, validTo, time, value) VALUES (0, textAsBlob('curry'), 10000, -1, 60000, intAsBlob(10));

SELECT partition, blobAsText(key), blobAsInt(value), validFrom, validTo, time FROM word_count_kstreams_store;

SELECT partition, blobAsText(key), blobAsInt(value), validFrom, validTo, time FROM word_count_kstreams_store WHERE partition=0 AND key=textAsBlob('curry') AND validTo=-1;
-- current is validFrom 10000

-- WRONG: SELECT partition, blobAsText(key), blobAsInt(value), validFrom, validTo, time FROM word_count_kstreams_store WHERE partition=0 AND key=textAsBlob('curry') AND validTo<=5000 AND validTo>=0;

-- note: `validTo<=` is unnecessary, see following line .. SELECT partition, blobAsText(key), blobAsInt(value), validFrom, validTo, time FROM word_count_kstreams_store WHERE partition=0 AND key=textAsBlob('curry') AND validTo<=10000 AND validTo>=5000 ORDER BY validTo ASC LIMIT 1;
SELECT partition, blobAsText(key), blobAsInt(value), validFrom, validTo, time FROM word_count_kstreams_store WHERE partition=0 AND key=textAsBlob('curry') AND validTo>=5000 ORDER BY validTo ASC LIMIT 1;




-- new current -> insert at time 15, price 15$, at stream time 15
-- step 1: validate time is within the retention window (30s) to current stream time 15 - true
-- step 2: get current value
SELECT partition, blobAsText(key), blobAsInt(value), validFrom, validTo, time FROM word_count_kstreams_store WHERE partition=0 AND key=textAsBlob('curry') AND validTo=-1;
-- current is validFrom 10000
-- step 3: new record is after current - true
-- step 4: (logical) update prev. record & insert new record
--         (actual cql) upsert current (update value+validFrom) - insert the prev. record as new row
BEGIN BATCH
INSERT INTO word_count_kstreams_store (partition, key, validFrom, validTo, time, value) VALUES (0, textAsBlob('curry'), 15000, -1, 15000, intAsBlob(15));
INSERT INTO word_count_kstreams_store (partition, key, validFrom, validTo, time, value) VALUES (0, textAsBlob('curry'), 10000, 15000, 10000, intAsBlob(10));
APPLY BATCH;



-- new late record -> insert at time 12, price 12$, at stream time 16
-- step 1: validate time is within the retention window (30s) to current stream time 16 - true
-- step 2: get current value
SELECT partition, blobAsText(key), blobAsInt(value), validFrom, validTo, time FROM word_count_kstreams_store WHERE partition=0 AND key=textAsBlob('curry') AND validTo=-1;
-- current is validFrom 15000
-- step 3: new record is after current - false
-- step 4: fetch record `asOfTimestamp` 12
SELECT partition, blobAsText(key), blobAsInt(value), validFrom, validTo, time FROM word_count_kstreams_store WHERE partition=0 AND key=textAsBlob('curry') AND validTo>=12000 ORDER BY validTo ASC LIMIT 1;
-- record 10$ valid 10...15
-- step 5: update prev. record & insert new record
BEGIN BATCH
INSERT INTO word_count_kstreams_store (partition, key, validFrom, validTo, time, value) VALUES (0, textAsBlob('curry'), 10000, 12000, 10000, intAsBlob(10));
INSERT INTO word_count_kstreams_store (partition, key, validFrom, validTo, time, value) VALUES (0, textAsBlob('curry'), 12000, 15000, 16000, intAsBlob(12));
APPLY BATCH;


-- test... asOfTime 13
SELECT partition, blobAsText(key), blobAsInt(value), validFrom, validTo, time FROM word_count_kstreams_store WHERE partition=0 AND key=textAsBlob('curry') AND validTo>=13000 ORDER BY validTo ASC LIMIT 1;
-- assert 12$


-- delete value at time 13, at stream time 17
-- step 1: validate time is within the retention window (30s) to current stream time 17 - true
-- step 2: fetch current record and check if validFrom<=13
SELECT partition, blobAsText(key), blobAsInt(value), validFrom, validTo, time FROM word_count_kstreams_store WHERE partition=0 AND key=textAsBlob('curry') AND validTo=-1;
-- current is validFrom 15000
-- step 3: fetch record `asOfTimestamp` 13
SELECT partition, blobAsText(key), blobAsInt(value), validFrom, validTo, time FROM word_count_kstreams_store WHERE partition=0 AND key=textAsBlob('curry') AND validTo>=13000 ORDER BY validTo ASC LIMIT 1;
-- record 12$ valid 12...15
-- step 4: fetch previous record `asOfTimestamp` 12 (the `validFrom` of the record)
SELECT partition, blobAsText(key), blobAsInt(value), validFrom, validTo, time FROM word_count_kstreams_store WHERE partition=0 AND key=textAsBlob('curry') AND validTo>=12000 ORDER BY validTo ASC LIMIT 1;
-- record 10$ valid 10...12
-- step 5: delete versioned record, delete prev. versioned record, insert 'new previous'
BEGIN BATCH
-- not required, cause we're going to upsert anyway... DELETE FROM word_count_kstreams_store WHERE partition=0 AND key=textAsBlob('curry') AND validTo=15000;
DELETE FROM word_count_kstreams_store WHERE partition=0 AND key=textAsBlob('curry') AND validTo=12000;
INSERT INTO word_count_kstreams_store (partition, key, validFrom, validTo, time, value) VALUES (0, textAsBlob('curry'), 10000, 15000, 10000, intAsBlob(10));
APPLY BATCH;


-- test... asOfTime 13
SELECT partition, blobAsText(key), blobAsInt(value), validFrom, validTo, time FROM word_count_kstreams_store WHERE partition=0 AND key=textAsBlob('curry') AND validTo>=13000 ORDER BY validTo ASC LIMIT 1;
-- assert: 10$




-- all versioned records for key
SELECT partition, blobAsText(key), blobAsInt(value), validFrom, validTo, time FROM word_count_kstreams_store WHERE partition=0 AND key=textAsBlob('curry');
-- all versioned records for key, before validTo 6
SELECT partition, blobAsText(key), blobAsInt(value), validFrom, validTo, time FROM word_count_kstreams_store WHERE partition=0 AND key=textAsBlob('curry') AND validTo<=6000 AND validTo>=0;


-- cleanup in reg. to retention time
-- step 1: apply range delete
DELETE FROM word_count_kstreams_store WHERE partition=0 AND key=textAsBlob('curry') AND validTo<=9000 AND validTo>=0;






-- test example from Kafka Summit London 2023 Talk (slide 13) with versioned global store
SELECT * FROM clicks_global_kstreams_store;

-- inserts
INSERT INTO clicks_global_kstreams_store (key, validFrom, validTo, time, value) VALUES (textAsBlob('curry'), 0, 4000, 4000, intAsBlob(8));
INSERT INTO clicks_global_kstreams_store (key, validFrom, validTo, time, value) VALUES (textAsBlob('curry'), 4000, 17000, 8000, intAsBlob(10));
INSERT INTO clicks_global_kstreams_store (key, validFrom, validTo, time, value) VALUES (textAsBlob('curry'), 17000, 38000, 10000, intAsBlob(12));
INSERT INTO clicks_global_kstreams_store (key, validFrom, validTo, time, value) VALUES (textAsBlob('curry'), 38000, 53000, 10000, intAsBlob(11));
INSERT INTO clicks_global_kstreams_store (key, validFrom, validTo, time, value) VALUES (textAsBlob('curry'), 53000, -1, 60000, intAsBlob(9));

-- 30s retention time 'cleanup' applied at time 53s
DELETE FROM clicks_global_kstreams_store WHERE key=textAsBlob('curry') AND validTo<=23000 AND validTo>=0;

-- select examples
SELECT blobAsText(key), blobAsInt(value), validFrom, validTo FROM clicks_global_kstreams_store WHERE key=textAsBlob('curry');
SELECT blobAsText(key), blobAsInt(value), validFrom, validTo FROM clicks_global_kstreams_store WHERE key=textAsBlob('curry') AND validTo=-1;
SELECT blobAsText(key), blobAsInt(value), validFrom, validTo FROM clicks_global_kstreams_store WHERE key=textAsBlob('curry') AND validTo>=50000 ORDER BY validTo ASC LIMIT 1;
SELECT blobAsText(key), blobAsInt(value), validFrom, validTo FROM clicks_global_kstreams_store WHERE key=textAsBlob('curry') AND validTo>=35000 ORDER BY validTo ASC LIMIT 1;
SELECT blobAsText(key), blobAsInt(value), validFrom, validTo FROM clicks_global_kstreams_store WHERE key=textAsBlob('curry') AND validTo>=30000 ORDER BY validTo ASC LIMIT 1;

hartmut-co-uk added a commit that referenced this issue Aug 13, 2023
- (for versioned store)
- covers edge cases
@hartmut-co-uk

completed with #27
