
PDP 44 - Lightweight Transactions #5072

Closed
shiveshr opened this issue Aug 14, 2020 · 9 comments · Fixed by #5084


@shiveshr
Contributor

shiveshr commented Aug 14, 2020

Problem description
Lightweight transactions proposal as described in https://github.com/pravega/pravega/wiki/PDP-44:-Single-Routing-Key-Transactions.

Problem location
Transactions

Suggestions for an improvement
Implement light weight transaction

@tkaitchuck
Member

  • I don't think "lightweight" is the right name; that is not their defining characteristic. I think they should be called "Segment transactions", as they are per-segment.
  • The API has begin and end method calls. We considered this approach when writing TransactionalEventStreamWriter and rejected it because it allowed invalid states to be represented in the code, i.e., it was possible to call methods in nonsensical orders. Instead we adopted the convention that creating a transaction returns a new object which has the write method on it. We should adopt the same approach here, both for the same reason and for consistency with the existing API (see the sketch after this list).
  • The proposal dictates how the events are to be sent over the wire. This is a cross-cutting concern and is handled by a lower layer in the client, so implementing this as specified would require rewriting that code or duplicating it into a new stack to provide the specified functionality. I don't think this makes sense because it is both complicated and limits future developments in the wire protocol. Instead we should specify that the client writes to the created transaction segment without specifying how this is done; that way we can simply reuse the existing classes.
  • The document is not clear on what happens if the stream is scaled when the client has written some but not all of the data.
  • I don't think this functionality should rely on or assume any particular features of how data is batched when sent over the wire.
  • How this interacts with retention needs to be spelled out, specifically when aborted transactions are deleted and how abandoned transactions are dealt with.
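
A minimal sketch, for illustration only, of the two API shapes contrasted in the second bullet; the interface names (BeginEndStyleWriter, TxnObjectStyleWriter) are hypothetical, not Pravega's actual interfaces:

    import java.util.concurrent.CompletableFuture;

    // Shape with explicit begin/end calls on the writer itself: nothing prevents a caller
    // from writing before begin() or after commit(), so nonsensical orderings are representable.
    interface BeginEndStyleWriter<T> {
        void begin();
        void writeEvent(T event);
        void commit();
        void abort();
    }

    // Shape used by TransactionalEventStreamWriter today: beginTxn() returns a new object
    // that owns the write method, so events can only be written against an open transaction.
    interface TxnObjectStyleWriter<T> {
        Txn<T> beginTxn();

        interface Txn<T> {
            CompletableFuture<Void> writeEvent(T event);
            void commit();
            void abort();
        }
    }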

@shiveshr
Contributor Author

Thanks @tkaitchuck

  1. We don't expose segments to the end users, so perhaps "SingleRoutingKeyTransaction" is more appropriate, as that matches the user's view better.

  2. I have updated the design doc with interfaces that follow the same paradigm as regular transactions.

  3. That is a valid point. To give a bit more detail: what I am doing is trying to abstract out the "automatic" batching in the lower layer so that batching can be explicitly governed by the top layer if desired. I will try to create common abstractions wherever possible so that it is more manageable.

  4. One clarification: this approach does not rely on a separate transaction segment (that is the approach used in PDP-43, large events).

Here we propose to write data into the original segment itself, relying on the append block protocol semantics to achieve it: SetupAppend initiates a new transaction and AppendBlockEnd corresponds to commit.

And it does not rely on automatic batching; instead it performs manual batching of its writes, as sketched below.
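
A minimal sketch of the begin/commit mapping onto the append block protocol described above. The Connection interface and SingleRoutingKeyTxnSketch class are hypothetical stand-ins, not Pravega's actual client code; only the wire-command names (SetupAppend, AppendBlock, AppendBlockEnd) come from the proposal:

    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.UUID;

    final class SingleRoutingKeyTxnSketch {
        // Stand-in for the client's segment connection; method names mirror the wire commands.
        interface Connection {
            void setupAppend(UUID writerId, String segment);           // begin a transaction
            void appendBlock(UUID writerId, List<ByteBuffer> events);  // the buffered batch
            void appendBlockEnd(UUID writerId, int eventCount);        // commit: block becomes visible atomically
            void close();                                              // abort: nothing becomes visible
        }

        private final Connection connection;
        private final UUID writerId = UUID.randomUUID();
        private final List<ByteBuffer> buffered = new ArrayList<>();

        SingleRoutingKeyTxnSketch(Connection connection, String segment) {
            this.connection = connection;
            connection.setupAppend(writerId, segment);   // begin
        }

        void writeEvent(ByteBuffer serializedEvent) {
            buffered.add(serializedEvent);               // buffered on the client; nothing sent yet
        }

        void commit() {
            connection.appendBlock(writerId, buffered);
            connection.appendBlockEnd(writerId, buffered.size());
        }

        void abort() {
            buffered.clear();
            connection.close();                          // in-flight events are simply discarded
        }
    }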

  5. I will update the document to articulate this better. The flow is similar to what happens to regular writes when they encounter a sealed segment: the client contacts the controller to get the successor segment and sends all the in-flight events to the successor. The difference here is that, since we disable automatic batching, we do not need to check with the previous segment about the event sequence number unless we have already issued AppendBlockEnd (a sketch follows this list).

  6. It disables the automatic batching entirely and manages the batching manually. That is the crux of the proposal.

  7. Aborting a transaction simply discards the in-flight events and drops its connection.
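
A minimal sketch of the sealed-segment handling described in point 5. The Controller and SegmentWriter interfaces here are illustrative stand-ins, not Pravega's real client classes; the point is only the order of operations:

    import java.nio.ByteBuffer;
    import java.util.List;

    final class ScaleHandlingSketch {
        interface Controller {
            String getSuccessorSegment(String sealedSegment, String routingKey);
        }

        interface SegmentWriter {
            void setupAppend(String segment);
            void resend(List<ByteBuffer> inflightEvents);
        }

        // Invoked when the server reports the segment as sealed before AppendBlockEnd was issued.
        static void onSegmentSealed(Controller controller, SegmentWriter writer,
                                    String sealedSegment, String routingKey,
                                    List<ByteBuffer> inflightEvents) {
            // Because automatic batching is disabled and AppendBlockEnd has not been sent,
            // nothing from this transaction is visible on the sealed segment, so the
            // in-flight events can simply be replayed against the successor.
            String successor = controller.getSuccessorSegment(sealedSegment, routingKey);
            writer.setupAppend(successor);
            writer.resend(inflightEvents);
        }
    }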

@andreipaduroiu
Member

> Here we propose to write data into the original segment itself, but we rely on the append block protocol semantics to achieve it. SetupAppend initiates a new transaction and append block end corresponds to commit.

It is this part that concerns me the most. How much are we going to buffer in the memory of the Segment Store? And for how long? Anything more than 1 second will be highly problematic, for a number of reasons:

  1. Segment Store memory is a finite resource. A few of these transactions done concurrently can eat away at what little heap and direct memory we have available.
  2. There is a maximum of 512 MB of outstanding data per process that we will accept across all incoming connections at any given time. That is, the sum of all received append bytes from the client that have not yet been acked back to the client cannot exceed 512 MB (and 128 MB per individual connection).
    • Just like in the previous item, if we want to buffer data in the Segment Store memory, that buffer will count against these thresholds, which will negatively affect the performance of the entire cluster.

How does your proposed solution handle both of these problems?

@shiveshr
Contributor Author

shiveshr commented Aug 19, 2020

@andreipaduroiu
The proposal was as follows:

  1. Size: buffer at most 16 MB per transaction.
  2. Time: buffer for a duration equivalent to the transaction timeout, typically small, O(seconds).

Point 1 is equivalent to the max append batch size, so these transactions are equivalent in that sense.
However, point 2 can potentially be larger than 1 second in this case, especially as the timeout applies to user inactivity (the time between the start of the transaction and the commit).

The client only opens a new channel and performs SetupAppend, but does not send any data to the server until the user issues a commit. This avoids any buffering on the server until we have the entire data accumulated on the client; the server only buffers for the time it takes for all the TCP frames of the append block + AppendBlockEnd to arrive, and that buffering would be at most 16 MB, as that is the limit we place on the transaction data. A sketch of this client-side flow follows the rationale below.

Rationale:

  • The client needs to buffer the entire transactional data anyway, for retransmissions.
  • Events in a txn have to become visible only after the user issues a commit, so we can send the entire data as part of the commit processing.
  • The limit on data sizes ensures that we do not accumulate huge amounts of data, which could lead to large buffers on the server.
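
A minimal sketch of those client-side buffering rules; the class name, fields, and the hard-coded 16 MB constant are illustrative of the proposal, not actual Pravega code:

    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.List;

    final class TxnBufferSketch {
        private static final int MAX_TXN_SIZE_BYTES = 16 * 1024 * 1024;  // proposed per-transaction cap

        private final List<ByteBuffer> buffered = new ArrayList<>();
        private int bufferedBytes = 0;

        void writeEvent(ByteBuffer serializedEvent) {
            if (bufferedBytes + serializedEvent.remaining() > MAX_TXN_SIZE_BYTES) {
                throw new IllegalStateException("Transaction exceeds the per-transaction size limit");
            }
            buffered.add(serializedEvent);
            bufferedBytes += serializedEvent.remaining();
            // Nothing is sent to the Segment Store here; data leaves the client only on commit.
        }

        List<ByteBuffer> drainForCommit() {
            // On commit the whole batch is sent as one append block, so the server only
            // buffers it for as long as the block's TCP frames take to arrive.
            List<ByteBuffer> batch = new ArrayList<>(buffered);
            buffered.clear();
            bufferedBytes = 0;
            return batch;
        }
    }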

@shiveshr
Contributor Author

@tkaitchuck

> The API has begin and end method calls. We considered this approach when writing TransactionalEventStreamWriter and rejected it because it allowed invalid states to be represented in the code, i.e., it was possible to call methods in nonsensical orders. Instead we adopted the convention that creating a transaction returns a new object which has the write method on it. We should adopt the same approach, both for the same reason and for consistency with the existing API.

Actually, the reason we have begin and end calls (and it is necessary) is that we want to have one open txn per writer. Otherwise, there is a suboptimal case where we may end up creating a new SegmentOutputStream object for each new transaction on the same segment, which can lead to a proliferation of writerIds in the segment attributes.

That is why it makes more sense to have a single active transaction per writer, which would have begin and end states and will need the state handling as originally proposed. I will revert the changes in the document to reflect this.
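
A minimal sketch of that per-writer state handling, with hypothetical names; a real implementation would reuse the writer's single SegmentOutputStream/writerId rather than this toy enum:

    final class SingleTxnPerWriterSketch<T> {
        private enum State { IDLE, OPEN }

        private State state = State.IDLE;

        synchronized void beginTxn() {
            if (state == State.OPEN) {
                throw new IllegalStateException("A transaction is already open on this writer");
            }
            state = State.OPEN;   // the same underlying SegmentOutputStream / writerId is reused
        }

        synchronized void writeEvent(T event) {
            if (state != State.OPEN) {
                throw new IllegalStateException("writeEvent called outside an open transaction");
            }
            // buffer the event against the currently open transaction
        }

        synchronized void commit() {
            if (state != State.OPEN) {
                throw new IllegalStateException("No open transaction to commit");
            }
            state = State.IDLE;   // the writer may now begin the next transaction
        }
    }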

@andreipaduroiu
Member

> The client only opens a new channel and performs SetupAppend but does not send any data to the server until the user issues commit

This is a big departure from our current API and what we guarantee through it. With the current API, once the writeEvent completes, we guarantee that the Event has been durably persisted. We cannot use the same API; we'd need something different where we clearly state this behavior.

@shiveshr
Contributor Author

> This is a big departure from our current API and what we guarantee through it. With the current API, once the writeEvent completes, we guarantee that the Event has been durably persisted. We cannot use the same API; we'd need something different where we clearly state this behavior.

That is why this is exposed via the SingleRoutingKeyTransaction object, which provides transaction-like semantics of atomically including all events. Until the user explicitly states that all events should be included, we do not write the message.

@shiveshr
Contributor Author

shiveshr commented Aug 20, 2020

Following is the draft PR (#5084), which demonstrates the approach and APIs.
Basically, the trick is all in this file, with this new method:

    public static PendingEvent withHeader(@NonNull String routingKey, @NonNull List<ByteBuffer> batch, @NonNull CompletableFuture<Void> ackFuture) {
        Preconditions.checkArgument(!batch.isEmpty(), "Batch cannot be empty");
        // Concatenate all serialized events into one composite buffer so that the whole
        // batch travels as a single PendingEvent (and hence a single append block).
        ByteBuf batchBuff = Unpooled.EMPTY_BUFFER;
        for (int i = 0; i < batch.size(); i++) {
            ByteBuf eventBuf = getByteBuf(batch.get(i));
            batchBuff = Unpooled.wrappedUnmodifiableBuffer(batchBuff, eventBuf);
        }
        // Enforce the maximum write size on the combined batch.
        Preconditions.checkArgument(batchBuff.readableBytes() <= MAX_WRITE_SIZE, "Batch size too large: %s", batchBuff.readableBytes());

        return new PendingEvent(routingKey, batchBuff, batch.size(), ackFuture);
    }

This allows us to write a batch of events, and we impose a limit on the batch size, so all these events are guaranteed to be sent in a single append block (note: with automatic batching there may be additional events batched alongside our "transaction" if they are on the same writer, but all "transaction" events will be written atomically).

So the above is used in the SegmentOutputStream writer to write not just a single event but possibly a batch of events.

And we can expose it in two separate ways:

  1. EventStreamWriterImpl API and implementation

  2. Single routing key transaction (with timeout/abort-like semantics): writer, transaction, and approximate implementation

@shiveshr
Contributor Author

And this is the usage example:

    EventStreamClientFactory clientFactory = EventStreamClientFactory.withScope(scope, clientConfig);
    JavaSerializer<String> serializer = new JavaSerializer<>();

    // region event writer
    EventStreamWriter<String> test = clientFactory.createEventWriter(
            "test", serializer, EventWriterConfig.builder().build());

    List<String> batch = Lists.newArrayList("a", "b", "c");
    test.writeEvents("routingkey", batch).get();
    // endregion

    // region txn writer
    SingleRoutingKeyTransactionWriter<String> txnWriter = clientFactory.createSingleRoutingKeyWriter(stream, "routingkey", serializer);
    SingleRoutingKeyTransaction<String> txn = txnWriter.beginTxn();
    txn.writeEvent("a");
    txn.writeEvent("b");
    txn.writeEvent("c");
    txn.commit();
    // endregion
