Extract TestingKafka::sendMessages
kokosing committed Jan 5, 2021
1 parent 0e114d5 commit 1b671df
Showing 10 changed files with 152 additions and 181 deletions.
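The extracted helper itself lives in the shared testing module, which is not among the files excerpted here. From the call sites in the diff one can infer its shape: it takes a Stream of ProducerRecord, optionally a map of extra producer properties (as in insertData below), and returns the RecordMetadata of the last record sent, so callers can read broker-assigned timestamps. A minimal sketch under those assumptions — the class name, field, and configuration handling here are hypothetical; only the sendMessages call shapes come from the diff:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Stream;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class TestingKafkaSketch
{
    private final String connectString;

    public TestingKafkaSketch(String connectString)
    {
        this.connectString = connectString;
    }

    public <K, V> RecordMetadata sendMessages(Stream<ProducerRecord<K, V>> records)
    {
        return sendMessages(records, Map.of());
    }

    public <K, V> RecordMetadata sendMessages(Stream<ProducerRecord<K, V>> records, Map<String, String> extraProducerProperties)
    {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, connectString);
        // The real helper presumably also supplies default key/value serializers;
        // in this sketch they must arrive through extraProducerProperties.
        config.putAll(extraProducerProperties);

        try (KafkaProducer<K, V> producer = new KafkaProducer<>(config)) {
            Future<RecordMetadata> lastSend = null;
            for (ProducerRecord<K, V> record : (Iterable<ProducerRecord<K, V>>) records::iterator) {
                lastSend = producer.send(record);
            }
            producer.flush();
            try {
                // Block until the last record is acknowledged, so callers can read
                // its broker-assigned timestamp (see getTimestamp in the diff below).
                return lastSend == null ? null : lastSend.get();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            catch (ExecutionException e) {
                throw new RuntimeException(e.getCause());
            }
        }
    }
}
```

Waiting only on the last send mirrors the lastSendFuture.get() pattern the old per-test code used. The jsr305 dependency added to the pom below supplies the javax.annotation.Nullable annotation used by the new setHeader helper further down.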
6 changes: 6 additions & 0 deletions plugin/trino-kafka/pom.xml
@@ -63,6 +63,12 @@
             <artifactId>jackson-databind</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>com.google.code.findbugs</groupId>
+            <artifactId>jsr305</artifactId>
+            <optional>true</optional>
+        </dependency>
+
         <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaIntegrationPushDown.java

@@ -14,7 +14,6 @@
 package io.trino.plugin.kafka;
 
 import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.Futures;
 import io.trino.Session;
 import io.trino.execution.QueryInfo;
 import io.trino.spi.connector.SchemaTableName;
@@ -24,7 +23,6 @@
 import io.trino.testing.QueryRunner;
 import io.trino.testing.ResultWithQueryId;
 import io.trino.testing.kafka.BasicTestingKafka;
-import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.testng.annotations.Test;
@@ -34,15 +32,15 @@
 import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
 import java.util.UUID;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
+import java.util.stream.IntStream;
+import java.util.stream.LongStream;
+import java.util.stream.Stream;
 
 import static io.trino.plugin.kafka.util.TestUtils.createEmptyTopicDescription;
 import static java.lang.String.format;
 import static java.util.Objects.requireNonNull;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
 
 @Test(singleThreaded = true)
 public class TestKafkaIntegrationPushDown
@@ -89,7 +87,6 @@ protected QueryRunner createQueryRunner()
 
     @Test
     public void testPartitionPushDown()
-            throws ExecutionException, InterruptedException
     {
         createMessages(topicNamePartition);
         String sql = format("SELECT count(*) FROM default.%s WHERE _partition_id=1", topicNamePartition);
@@ -100,7 +97,6 @@ public void testPartitionPushDown()
 
     @Test
    public void testOffsetPushDown()
-            throws ExecutionException, InterruptedException
     {
         createMessages(topicNameOffset);
         DistributedQueryRunner queryRunner = getDistributedQueryRunner();
@@ -176,51 +172,37 @@ private RecordMessage createTimestampTestMessages(String topicName)
     {
         String startTime = null;
         String endTime = null;
-        Future<RecordMetadata> lastSendFuture = Futures.immediateFuture(null);
-        long lastTimeStamp = -1;
         // Avoid the previous test case impacting this one when invocationCount of @Test is enabled
         Thread.sleep(100);
-        try (KafkaProducer<Long, Object> producer = testingKafka.createProducer()) {
-            for (long messageNum = 0; messageNum < MESSAGE_NUM; messageNum++) {
-                long key = messageNum;
-                long value = messageNum;
-                lastSendFuture = producer.send(new ProducerRecord<>(topicName, key, value));
-                // Record timestamp to build expected timestamp
-                if (messageNum < TIMESTAMP_TEST_COUNT) {
-                    RecordMetadata r = lastSendFuture.get();
-                    assertTrue(lastTimeStamp != r.timestamp());
-                    lastTimeStamp = r.timestamp();
-                    if (messageNum == TIMESTAMP_TEST_START_INDEX) {
-                        startTime = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
-                                .format(LocalDateTime.ofInstant(Instant.ofEpochMilli(r.timestamp()), ZoneId.of("UTC")));
-                    }
-                    else if (messageNum == TIMESTAMP_TEST_END_INDEX) {
-                        endTime = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
-                                .format(LocalDateTime.ofInstant(Instant.ofEpochMilli(r.timestamp()), ZoneId.of("UTC")));
-                    }
-                    // Sleep for a while to ensure different timestamps for different messages.
-                    Thread.sleep(100);
-                }
-            }
-        }
-        lastSendFuture.get();
-        requireNonNull(startTime, "startTime result is none");
-        requireNonNull(endTime, "endTime result is none");
+        for (int messageNum = 0; messageNum < TIMESTAMP_TEST_COUNT; messageNum++) {
+            long key = messageNum;
+            long value = messageNum;
+            RecordMetadata recordMetadata = testingKafka.sendMessages(Stream.of(new ProducerRecord<>(topicName, key, value)));
+            if (messageNum == TIMESTAMP_TEST_START_INDEX) {
+                startTime = getTimestamp(recordMetadata);
+            }
+            else if (messageNum == TIMESTAMP_TEST_END_INDEX) {
+                endTime = getTimestamp(recordMetadata);
+            }
+
+            // Sleep for a while to ensure different timestamps for different messages.
+            Thread.sleep(100);
+        }
+        testingKafka.sendMessages(
+                LongStream.range(TIMESTAMP_TEST_COUNT, MESSAGE_NUM)
+                        .mapToObj(id -> new ProducerRecord<>(topicName, id, id)));
         return new RecordMessage(startTime, endTime);
     }
 
+    private static String getTimestamp(RecordMetadata recordMetadata)
+    {
+        return DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
+                .format(LocalDateTime.ofInstant(Instant.ofEpochMilli(recordMetadata.timestamp()), ZoneId.of("UTC")));
+    }
+
     private void createMessages(String topicName)
-            throws ExecutionException, InterruptedException
     {
-        Future<RecordMetadata> lastSendFuture = Futures.immediateFuture(null);
-        try (KafkaProducer<Long, Object> producer = testingKafka.createProducer()) {
-            for (long messageNum = 0; messageNum < MESSAGE_NUM; messageNum++) {
-                long key = messageNum;
-                long value = messageNum;
-                lastSendFuture = producer.send(new ProducerRecord<>(topicName, key, value));
-            }
-        }
-        lastSendFuture.get();
+        testingKafka.sendMessages(
+                IntStream.range(0, MESSAGE_NUM)
+                        .mapToObj(id -> new ProducerRecord<>(topicName, (long) id, (long) id)));
     }
 
     private static class RecordMessage
@@ -230,8 +212,8 @@ private static class RecordMessage
 
         public RecordMessage(String startTime, String endTime)
         {
-            this.startTime = startTime;
-            this.endTime = endTime;
+            this.startTime = requireNonNull(startTime, "startTime result is none");
+            this.endTime = requireNonNull(endTime, "endTime result is none");
         }
 
         public String getStartTime()
@@ -21,18 +21,19 @@
 import io.trino.testing.QueryRunner;
 import io.trino.testing.kafka.BasicTestingKafka;
 import io.trino.tpch.TpchTable;
-import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
+import javax.annotation.Nullable;
+
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Properties;
 import java.util.UUID;
+import java.util.stream.Stream;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.collect.ImmutableList.toImmutableList;
@@ -152,44 +153,44 @@ public void testColumnReferencedTwice()
 
     private void insertData(String topic, byte[] data)
     {
-        try (KafkaProducer<byte[], byte[]> producer = createProducer()) {
-            producer.send(new ProducerRecord<>(topic, data));
-            producer.flush();
-        }
+        testingKafka.sendMessages(Stream.of(new ProducerRecord<>(topic, data)), getProducerProperties());
     }
 
     private void createMessagesWithHeader(String topicName)
     {
-        try (KafkaProducer<byte[], byte[]> producer = createProducer()) {
-            // Messages without headers
-            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topicName, null, "1".getBytes(UTF_8));
-            producer.send(record);
-            record = new ProducerRecord<>(topicName, null, "2".getBytes(UTF_8));
-            producer.send(record);
-            // Message with simple header
-            record = new ProducerRecord<>(topicName, null, "3".getBytes(UTF_8));
-            record.headers()
-                    .add("notfoo", "some value".getBytes(UTF_8));
-            producer.send(record);
-            // Message with multiple same key headers
-            record = new ProducerRecord<>(topicName, null, "4".getBytes(UTF_8));
-            record.headers()
-                    .add("foo", "bar".getBytes(UTF_8))
-                    .add("foo", null)
-                    .add("foo", "baz".getBytes(UTF_8));
-            producer.send(record);
-        }
+        testingKafka.sendMessages(
+                Stream.of(
+                        // Messages without headers
+                        new ProducerRecord<>(topicName, null, "1".getBytes(UTF_8)),
+                        new ProducerRecord<>(topicName, null, "2".getBytes(UTF_8)),
+                        // Message with simple header
+                        setHeader(new ProducerRecord<>(topicName, null, "3".getBytes(UTF_8)), "notfoo", "some value"),
+                        // Message with multiple same key headers
+                        setHeader(
+                                setHeader(
+                                        setHeader(new ProducerRecord<>(topicName, null, "4".getBytes(UTF_8)), "foo", "bar"),
+                                        "foo",
+                                        null),
+                                "foo",
+                                "baz")),
+                getProducerProperties());
     }
 
-    private KafkaProducer<byte[], byte[]> createProducer()
+    private static <K, V> ProducerRecord<K, V> setHeader(ProducerRecord<K, V> record, String key, @Nullable String value)
     {
-        Properties properties = new Properties();
-        properties.put(BOOTSTRAP_SERVERS_CONFIG, testingKafka.getConnectString());
-        properties.put(ACKS_CONFIG, "all");
-        properties.put(KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
-        properties.put(VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+        record.headers()
+                .add(key, value != null ? value.getBytes(UTF_8) : null);
+        return record;
+    }
 
-        return new KafkaProducer<>(properties);
+    private Map<String, String> getProducerProperties()
+    {
+        return ImmutableMap.<String, String>builder()
+                .put(BOOTSTRAP_SERVERS_CONFIG, testingKafka.getConnectString())
+                .put(ACKS_CONFIG, "all")
+                .put(KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName())
+                .put(VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName())
+                .build();
     }
 
     @Test
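The triple-nested setHeader call above builds a single record carrying three headers under the key "foo"; spelled imperatively (as the removed code did), it is equivalent to the following fragment, where topicName and the UTF_8 static import are as in the surrounding test:

```java
// What setHeader(setHeader(setHeader(record, "foo", "bar"), "foo", null), "foo", "baz")
// constructs: record "4" with headers foo=bar, foo=null, foo=baz.
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topicName, null, "4".getBytes(UTF_8));
record.headers()
        .add("foo", "bar".getBytes(UTF_8))
        .add("foo", null)
        .add("foo", "baz".getBytes(UTF_8));
```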
@@ -19,13 +19,13 @@
 import io.trino.testing.AbstractTestQueryFramework;
 import io.trino.testing.QueryRunner;
 import io.trino.testing.kafka.BasicTestingKafka;
-import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.testng.annotations.Test;
 
-import java.util.UUID;
+import java.util.stream.LongStream;
 
 import static io.trino.plugin.kafka.util.TestUtils.createEmptyTopicDescription;
+import static java.util.UUID.randomUUID;
 import static org.testng.Assert.assertTrue;
 
 @Test(singleThreaded = true)
@@ -40,7 +40,7 @@ protected QueryRunner createQueryRunner()
             throws Exception
     {
         testingKafka = closeAfterClass(new BasicTestingKafka());
-        topicName = "test_" + UUID.randomUUID().toString().replaceAll("-", "_");
+        topicName = "test_" + randomUUID().toString().replaceAll("-", "_");
         QueryRunner queryRunner = KafkaQueryRunner.builder(testingKafka)
                 .setExtraTopicDescription(ImmutableMap.<SchemaTableName, KafkaTopicDescription>builder()
                         .put(createEmptyTopicDescription(topicName, new SchemaTableName("default", topicName)))
@@ -63,21 +63,9 @@ public void testTopicHasData()
     {
         assertQuery("SELECT count(*) FROM default." + topicName, "VALUES 0");
 
-        createMessages(topicName);
+        testingKafka.sendMessages(LongStream.range(0, 100000)
+                .mapToObj(id -> new ProducerRecord<>(topicName, id, ImmutableMap.of("id", Long.toString(id), "value", randomUUID().toString()))));
 
         assertQuery("SELECT count(*) FROM default." + topicName, "VALUES 100000L");
     }
-
-    private void createMessages(String topicName)
-    {
-        try (KafkaProducer<Long, Object> producer = testingKafka.createProducer()) {
-            int jMax = 10_000;
-            int iMax = 100_000 / jMax;
-            for (long i = 0; i < iMax; i++) {
-                for (long j = 0; j < jMax; j++) {
-                    producer.send(new ProducerRecord<>(topicName, i, ImmutableMap.of("id", Long.toString(i * iMax + j), "value", UUID.randomUUID().toString())));
-                }
-            }
-        }
-    }
 }
