Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Commit

Permalink
Add test framework to be able to run unit test. (#17)
Browse files Browse the repository at this point in the history
This PR is based on 
[Produce request](a09a805), which is in PR #14, please review and merge that first.

changes:
- change to sn pulsar version, so we could call getter/setter for parent class in `KafkaBrokerService` and `KafkaService`. Or it is not able to do the mock in test framework.
- add test framework, 
- add a simple produce request test, and pass write test.
  • Loading branch information
jiazhai authored and sijie committed Jul 24, 2019
1 parent ee0ffb2 commit daa19e1
Show file tree
Hide file tree
Showing 8 changed files with 600 additions and 97 deletions.
31 changes: 30 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
<lombok.version>1.18.4</lombok.version>
<mockito.version>2.22.0</mockito.version>
<netty.version>4.1.32.Final</netty.version>
<pulsar.version>2.4.0</pulsar.version>
<pulsar.version>2.5.0-2cc34afc0</pulsar.version>
<slf4j.version>1.7.25</slf4j.version>
<spotbugs-annotations.version>3.1.8</spotbugs-annotations.version>
<testcontainers.version>1.11.2</testcontainers.version>
Expand Down Expand Up @@ -196,6 +196,30 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-broker</artifactId>
<version>${pulsar.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>managed-ledger-original</artifactId>
<version>${pulsar.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>


</dependencies>

<build>
Expand Down Expand Up @@ -264,6 +288,11 @@
<artifactId>maven-surefire-plugin</artifactId>
<version>${maven-surefire-plugin.version}</version>
<configuration>
<argLine> -Xmx2G
-Dpulsar.allocator.pooled=false
-Dpulsar.allocator.leak_detection=Advanced
-Dlog4j.configurationFile="log4j2.xml"
</argLine>
<reuseForks>false</reuseForks>
<forkCount>1</forkCount>
<redirectTestOutputToFile>${redirectTestOutputToFile}</redirectTestOutputToFile>
Expand Down
67 changes: 50 additions & 17 deletions src/main/java/io/streamnative/kop/KafkaBrokerService.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,17 @@
package io.streamnative.kop;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.ChannelOption;
import io.streamnative.kop.utils.ReflectionUtils;
import io.netty.handler.ssl.SslContext;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceUtil;
import org.apache.pulsar.broker.service.DistributedIdGenerator;
import org.apache.pulsar.broker.service.PulsarChannelInitializer;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.util.netty.EventLoopUtil;

Expand All @@ -48,17 +50,18 @@ public void start() throws Exception {
kafkaService.getZkClient(),
"/counters/producer-name",
kafkaService.getConfiguration().getClusterName());
ReflectionUtils.setField(this, "producerNameGenerator", producerNameGenerator);

setProducerNameGenerator(producerNameGenerator);

ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.childOption(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT);
bootstrap.group(
ReflectionUtils.getField(this, "acceptorGroup"),
ReflectionUtils.getField(this, "workerGroup"));
getAcceptorGroup(),
getWorkerGroup());
bootstrap.childOption(ChannelOption.TCP_NODELAY, true);

bootstrap.channel(EventLoopUtil.getServerSocketChannelClass(
ReflectionUtils.getField(this, "workerGroup")
getWorkerGroup()
));
EventLoopUtil.enableTriggeredMode(bootstrap);

Expand All @@ -76,22 +79,52 @@ public void start() throws Exception {
log.info("Started Kop Broker service on port {}", port.get());
}


// start original Pulsar Broker service
ServerBootstrap pulsarBootstrap = new ServerBootstrap();
pulsarBootstrap.childOption(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT);
pulsarBootstrap.group(
getAcceptorGroup(),
getWorkerGroup());
pulsarBootstrap.childOption(ChannelOption.TCP_NODELAY, true);
pulsarBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR,
new AdaptiveRecvByteBufAllocator(1024, 16 * 1024, 1 * 1024 * 1024));

pulsarBootstrap.channel(EventLoopUtil.getServerSocketChannelClass(getWorkerGroup()));
EventLoopUtil.enableTriggeredMode(pulsarBootstrap);

pulsarBootstrap.childHandler(new PulsarChannelInitializer(kafkaService, false));

Optional<Integer> pulsarPort = serviceConfig.getBrokerServicePort();
if (port.isPresent()) {
// Bind and start to accept incoming connections.
InetSocketAddress addr = new InetSocketAddress(kafkaService.getBindAddress(), pulsarPort.get());
try {
pulsarBootstrap.bind(addr).sync();
} catch (Exception e) {
throw new IOException("Failed to bind Pulsar broker on " + addr, e);
}
log.info("Started Pulsar Broker service on port {}", pulsarPort.get());
}

Optional<Integer> tlsPort = serviceConfig.getBrokerServicePortTls();
if (tlsPort.isPresent()) {
ServerBootstrap tlsBootstrap = pulsarBootstrap.clone();
tlsBootstrap.childHandler(new PulsarChannelInitializer(kafkaService, true));
tlsBootstrap.bind(new InetSocketAddress(kafkaService.getBindAddress(), tlsPort.get())).sync();
log.info("Started Pulsar Broker TLS service on port {} - TLS provider: {}", tlsPort.get(),
SslContext.defaultServerProvider());
}

// start other housekeeping functions
BrokerServiceUtil.startStatsUpdater(
this,
serviceConfig.getStatsUpdateInitialDelayInSecs(),
serviceConfig.getStatsUpdateFrequencyInSecs());
ReflectionUtils.callNoArgVoidMethod(
this, "startInactivityMonitor"
);
ReflectionUtils.callNoArgVoidMethod(
this, "startMessageExpiryMonitor"
);
ReflectionUtils.callNoArgVoidMethod(
this, "startCompactionMonitor"
);
ReflectionUtils.callNoArgVoidMethod(
this, "startBacklogQuotaChecker"
);

startInactivityMonitor();
startMessageExpiryMonitor();
startCompactionMonitor();
startBacklogQuotaChecker();
}
}
5 changes: 3 additions & 2 deletions src/main/java/io/streamnative/kop/KafkaRequestHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,6 @@ private ByteBuf messageToByteBuf(Message<byte[]> message) {
return buf;
}


protected void handleFindCoordinatorRequest(KafkaHeaderAndRequest findCoordinator) {
throw new NotImplementedException("handleFindCoordinatorRequest");
}
Expand Down Expand Up @@ -601,7 +600,9 @@ private CompletableFuture<PartitionMetadata> findBroker(KafkaService kafkaServic
log.debug("Find broker: {} for topicName: {}", uri, topic);
}

Node node = newNode(new InetSocketAddress(uri.getHost(), uri.getPort()));
Node node = newNode(new InetSocketAddress(
uri.getHost(),
kafkaService.getKafkaConfig().getKafkaServicePort().get()));
resultFuture.complete(newPartitionMetadata(topic, node));
return;
} else {
Expand Down
92 changes: 17 additions & 75 deletions src/main/java/io/streamnative/kop/KafkaService.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package io.streamnative.kop;

import com.google.common.collect.Maps;
import io.streamnative.kop.utils.ReflectionUtils;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
Expand All @@ -26,7 +25,6 @@
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.stats.MetricsGenerator;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
Expand All @@ -51,7 +49,7 @@ public KafkaService(KafkaServiceConfiguration config) {

@Override
public void start() throws PulsarServerException {
ReentrantLock lock = ReflectionUtils.getField(this, "mutex");
ReentrantLock lock = getMutex();

lock.lock();

Expand All @@ -77,62 +75,33 @@ public void start() throws PulsarServerException {
new LocalZooKeeperConnectionService(getZooKeeperClientFactory(),
kafkaConfig.getZookeeperServers(), kafkaConfig.getZooKeeperSessionTimeoutMillis());

ReflectionUtils.setField(
this,
"localZooKeeperConnectionProvider",
localZooKeeperConnectionService
);
setLocalZooKeeperConnectionProvider(localZooKeeperConnectionService);
localZooKeeperConnectionService.start(getShutdownService());


// Initialize and start service to access configuration repository.
ReflectionUtils.callNoArgVoidMethod(
this,
"startZkCacheService"
);
startZkCacheService();

BookKeeperClientFactory bkClientFactory = newBookKeeperClientFactory();
ReflectionUtils.setField(
this,
"bkClientFactory",
bkClientFactory
);
ReflectionUtils.setField(
this,
"managedLedgerClientFactory",
new ManagedLedgerClientFactory(kafkaConfig, getZkClient(), bkClientFactory)
);
ReflectionUtils.setField(
this,
"brokerService",
new KafkaBrokerService(this)
);
setBkClientFactory(bkClientFactory);
setManagedLedgerClientFactory(
new ManagedLedgerClientFactory(kafkaConfig, getZkClient(), bkClientFactory));
setBrokerService(new KafkaBrokerService(this));

// Start load management service (even if load balancing is disabled)
getLoadManager().set(LoadManager.create(this));

// Start the leader election service
ReflectionUtils.callNoArgVoidMethod(
this,
"startLeaderElectionService"
);
startLeaderElectionService();

// needs load management service
ReflectionUtils.callNoArgVoidMethod(
this,
"startNamespaceService"
);
startNamespaceService();

ReflectionUtils.setField(
this,
"offloader",
createManagedLedgerOffloader(kafkaConfig)
);
setOffloader(createManagedLedgerOffloader(kafkaConfig));

getBrokerService().start();

WebService webService = new WebService(this);
ReflectionUtils.setField(this, "webService", webService);
setWebService(webService);
Map<String, Object> attributeMap = Maps.newHashMap();
attributeMap.put(WebService.ATTRIBUTE_PULSAR_NAME, this);
Map<String, Object> vipAttributeMap = Maps.newHashMap();
Expand Down Expand Up @@ -171,36 +140,22 @@ public Boolean get() {
webService.addStaticResources("/static", "/static");

// Register heartbeat and bootstrap namespaces.
ReflectionUtils.<NamespaceService>getField(
this, "nsService"
).registerBootstrapNamespaces();
getNsService().registerBootstrapNamespaces();

ReflectionUtils.setField(
this,
"schemaRegistryService",
SchemaRegistryService.create(this)
);
setSchemaRegistryService(SchemaRegistryService.create(this));

webService.start();

ReflectionUtils.setField(
this,
"metricsGenerator",
new MetricsGenerator(this)
);
setMetricsGenerator(new MetricsGenerator(this));

// By starting the Load manager service, the broker will also become visible
// to the rest of the broker by creating the registration z-node. This needs
// to be done only when the broker is fully operative.
ReflectionUtils.callNoArgVoidMethod(
this,
"startLoadManagementService");
startLoadManagementService();

reflectSetState(State.Started);
setState(State.Started);

ReflectionUtils.callNoArgVoidMethod(
this,
"acquireSLANamespace");
acquireSLANamespace();

final String bootstrapMessage = "bootstrap service "
+ (kafkaConfig.getWebServicePort().isPresent()
Expand All @@ -221,17 +176,4 @@ public Boolean get() {
lock.unlock();
}
}

protected void reflectSetState(State state) {
try {
ReflectionUtils.setField(
this,
"state",
state
);
} catch (IllegalAccessException | NoSuchFieldException e) {
throw new RuntimeException("Unable to set broker set to " + state, e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@


import static org.apache.pulsar.common.naming.TopicName.PARTITIONED_TOPIC_SUFFIX;
import static org.mockito.Matchers.anyObject;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.CALLS_REAL_METHODS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -155,7 +155,7 @@ public void testChannelRead() throws Exception {
handler.channelActive(ctx);
handler.channelRead(mock(ChannelHandlerContext.class), inputBuf);

verify(handler, times(1)).handleApiVersionsRequest(anyObject());
verify(handler, times(1)).handleApiVersionsRequest(any());
}

@Test
Expand Down
Loading

0 comments on commit daa19e1

Please sign in to comment.