Fix transaction visibility problem #3036

Merged 3 commits on Nov 20, 2021
4 changes: 4 additions & 0 deletions src/v/kafka/server/handlers/fetch.cc
@@ -201,6 +201,10 @@ static ss::future<read_result> do_read_from_ntp(
ntp_config.cfg.isolation_level
== model::isolation_level::read_committed) {
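// read_committed fetches are capped at the last stable offset (LSO): the LSO
// is the first offset that may still belong to an open transaction, and since
// max_offset appears to be an inclusive upper bound, it is stepped back by one
// so in-flight transactional data is never returned.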
ntp_config.cfg.max_offset = kafka_partition->last_stable_offset();
if (ntp_config.cfg.max_offset > model::offset{0}) {
ntp_config.cfg.max_offset = ntp_config.cfg.max_offset
- model::offset{1};
}
}
}

2 changes: 1 addition & 1 deletion tests/java/tx-verifier/pom.xml
@@ -18,7 +18,7 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.2.0</version>
<version>3.0.0</version>
</dependency>
</dependencies>
<build>
289 changes: 289 additions & 0 deletions TxReadsWritesTest.java (new file, package io.vectorized.kafka)
@@ -0,0 +1,289 @@
package io.vectorized.kafka;

import java.io.*;
import java.lang.Thread;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidTxnStateException;

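// Concurrent transactional reads/writes check: a transactional producer and a
// read_committed consumer run in parallel. The producer aborts several
// transactions and then commits a final marker record; the consumer must only
// ever observe records whose value starts with "commit" and must catch up to
// the marker's offset.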
public class TxReadsWritesTest {
private Properties tx_producer_properties;
private Properties tx_consumer_properties;
private String topic;

private volatile String error;
private volatile long last_known_offset;
private volatile boolean has_errors = false;
private volatile boolean has_finished = false;
private volatile long finished_at_ms = 0;

public TxReadsWritesTest(String connection, String topic) {
this.topic = topic;

this.tx_producer_properties = new Properties();
this.tx_producer_properties.put(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, connection);
this.tx_producer_properties.put(ProducerConfig.ACKS_CONFIG, "all");
this.tx_producer_properties.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
this.tx_producer_properties.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
this.tx_producer_properties.put(
ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 10000);
this.tx_producer_properties.put(
ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 10000);
this.tx_producer_properties.put(ProducerConfig.LINGER_MS_CONFIG, 0);
this.tx_producer_properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000);
this.tx_producer_properties.put(
ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, 1000);
this.tx_producer_properties.put(
ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 50);
this.tx_producer_properties.put(
ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 10000);
this.tx_producer_properties.put(
ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100);
this.tx_producer_properties.put(
ProducerConfig.METADATA_MAX_AGE_CONFIG, 10000);
this.tx_producer_properties.put(
ProducerConfig.METADATA_MAX_IDLE_CONFIG, 10000);
this.tx_producer_properties.put(ProducerConfig.RETRIES_CONFIG, 5);
this.tx_producer_properties.put(
ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
this.tx_producer_properties.put(
ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx-id-1");

this.tx_consumer_properties = new Properties();
this.tx_consumer_properties.put(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, connection);
this.tx_consumer_properties.put(
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
this.tx_consumer_properties.put(
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
this.tx_consumer_properties.put(
ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
this.tx_consumer_properties.put(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
this.tx_consumer_properties.put(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
this.tx_consumer_properties.put(
ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 10000);
this.tx_consumer_properties.put(
ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 10000);
this.tx_consumer_properties.put(
ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
this.tx_consumer_properties.put(
ConsumerConfig.METADATA_MAX_AGE_CONFIG, 10000);
this.tx_consumer_properties.put(
ConsumerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, 1000);
this.tx_consumer_properties.put(
ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, 50);
this.tx_consumer_properties.put(
ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 10000);
this.tx_consumer_properties.put(
ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, 100);
}

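// Seeds the topic with one committed record so the reader has a known starting
// offset, then runs the reader and writer threads to completion and rethrows
// any error either of them recorded.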
public void run() throws Exception {
Producer<String, String> producer = null;
long first_offset = -1;
try {
producer = new KafkaProducer<>(this.tx_producer_properties);
producer.initTransactions();
producer.beginTransaction();
first_offset = producer
.send(new ProducerRecord<String, String>(
topic, "tx:0", "commit:record:0"))
.get()
.offset();
producer.commitTransaction();
} finally {
if (producer != null) {
try {
producer.close();
} catch (Exception e) {
}
}
}

last_known_offset = -1;

final long offset = first_offset;

var reader = new Thread(() -> {
try {
readProcess(offset);
} catch (Exception e) {
e.printStackTrace();
error = e.getMessage();
has_errors = true;
}
});
reader.start();

var writer = new Thread(() -> {
try {
writeProcess();
} catch (Exception e) {
e.printStackTrace();
error = e.getMessage();
has_errors = true;
}
});
writer.start();

writer.join();
reader.join();

if (has_errors) {
throw new Exception(error);
}
}

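// Reader thread: polls from first_offset with read_committed isolation and
// fails if it ever sees a record produced by an aborted transaction, or if it
// cannot reach the writer's final committed offset within 10 seconds of the
// writer finishing.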
private void readProcess(long first_offset) throws Exception {
var tp = new TopicPartition(topic, 0);
var tps = Collections.singletonList(tp);

KafkaConsumer<String, String> consumer = null;
try {
consumer = new KafkaConsumer<>(tx_consumer_properties);
consumer.assign(tps);
consumer.seek(tp, first_offset);

var is_active = true;
var last_observed_offset = -1L;

while (is_active) {
if (has_errors) {
break;
}

if (has_finished) {
if (last_observed_offset == last_known_offset) {
break;
}

if (System.currentTimeMillis() - finished_at_ms > 10000) {
error = "can't catchup with offset: " + last_known_offset
+ " last observed offset: " + last_observed_offset;
has_errors = true;
break;
}
}

ConsumerRecords<String, String> records
= consumer.poll(Duration.ofMillis(10000));
var it = records.iterator();
while (it.hasNext()) {
var record = it.next();

last_observed_offset = record.offset();
String key = record.key();
String value = record.value();

if (value.startsWith("commit")) {
continue;
}

error = "observed a record of an aborted transaction: " + key + "="
+ value + "@" + last_observed_offset;
has_errors = true;
break;
}
}
} catch (Exception e) {
e.printStackTrace();
error = e.getMessage();
has_errors = true;
} finally {
if (consumer != null) {
try {
consumer.close();
} catch (Exception e) {
}
}
}
}

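// Writer thread: produces and aborts four transactions of ten records each
// (recreating the producer on InvalidTxnStateException), then commits one
// final record and publishes its offset so the reader knows where to stop.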
private void writeProcess() throws Exception {
Producer<String, String> producer
= new KafkaProducer<>(tx_producer_properties);
producer.initTransactions();

for (int j = 0; j < 4; j++) {
try {
long last_offset = -1;
producer.beginTransaction();
var pending = new ArrayList<Future<RecordMetadata>>();
for (int i = 0; i < 10; i++) {
pending.add(producer.send(new ProducerRecord<String, String>(
topic, "tx:" + j, "abort:record:" + i)));
}
for (int i = 0; i < pending.size(); i++) {
pending.get(i).get();
}
producer.abortTransaction();
} catch (Exception e1) {
try {
producer.close();
} catch (Exception e2) {
}

if (e1 instanceof InvalidTxnStateException
|| (e1.getCause() != null
&& e1.getCause() instanceof InvalidTxnStateException)) {
producer = new KafkaProducer<>(tx_producer_properties);
producer.initTransactions();
continue;
}

e1.printStackTrace();
error = e1.getMessage();
has_errors = true;
break;
}
}

if (!has_errors) {
try {
producer.beginTransaction();
last_known_offset = producer
.send(new ProducerRecord<String, String>(
topic, "tx:" + 5, "commit:record:0"))
.get()
.offset();
producer.commitTransaction();
} catch (Exception e) {
e.printStackTrace();
error = e.getMessage();
has_errors = true;
}
}

try {
producer.close();
} catch (Exception e1) {
}

if (!has_errors) {
finished_at_ms = System.currentTimeMillis();
has_finished = true;
}
}
}
Verifier.java
@@ -46,7 +46,8 @@ public static interface StringAction {
"read-uncommitted-seek-reads-ongoing-tx",
Verifier::readUncommittedSeekDoesntRespectOngoingTx),
entry("set-group-start-offset", Verifier::setGroupStartOffsetPasses),
entry("read-process-write", Verifier::readProcessWrite));
entry("read-process-write", Verifier::readProcessWrite),
entry("concurrent-reads-writes", Verifier::txReadsWritesPasses));

public static void main(final String[] args) throws Exception {
if (args.length != 2) {
@@ -98,6 +99,11 @@ static void retry(String name, StringAction action, String connection)
}
}

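// Entry point for the new "concurrent-reads-writes" scenario registered above.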
static void txReadsWritesPasses(String connection) throws Exception {
var test = new TxReadsWritesTest(connection, topic1);
test.run();
}

static void initPasses(String connection) throws Exception {
initPasses(connection, txId1);
}