Skip to content

Commit

Permalink
[#156] Share ActorSystem between Scassandra instances. (#157)
Browse files Browse the repository at this point in the history
* [#156] Share ActorSystem between Scassandra instances.

* [#158] Use akka-slf4j to allow logging configuration via slf4j.

Also reduce logging on server tests, remove application.conf

* Speed up tests by closing Cluster instances quickly.

Each test method was delayed in completion for 2 seconds due to the
default graceful shutdown wait nature of netty.  Since there should be
no ongoing activity on the event loop when our tests finish, it is safe
to close.

* Cache dependencies downloaded via gradle.

* Make tests more resilient by bumping timeouts.

* Process review feedback.

- Add endline to ScassandraServer.scala.
- Use Future.sequence instead of for comprehension.
- Lower timeouts in tests.
- Rename ConnectionsClientBuilder to CurrentClientBuilder to match
  CurrentClient.
  • Loading branch information
tolbertam authored and chbatey committed Jun 2, 2016
1 parent af0f863 commit 077dda8
Show file tree
Hide file tree
Showing 21 changed files with 248 additions and 1,439 deletions.
4 changes: 4 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,7 @@ script:
- ./gradlew check -x signArchives -x java-it-tests:common:check -x java-it-tests:driver20:check -x java-it-tests:driver21:check -x java-it-tests:driver30:check
- jdk_switcher use oraclejdk8
- ./gradlew java-it-tests:common:check java-it-tests:driver20:check java-it-tests:driver21:check java-it-tests:driver30:check -x signArchives

cache:
directories:
- "$HOME/.gradle/caches"
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public static class ActivityClientBuilder {
private String host = "localhost";

private int port = 8043;
private int socketTimeout = 1000;
private int socketTimeout = 5000;
private ActivityClientBuilder() {
}
public ActivityClientBuilder withHost(String host) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,24 +45,24 @@ public class CurrentClient {

private final Type map = new TypeToken<Map<String, Boolean>>() { }.getType();

public static class ConnectionsClientBuilder {
public static class CurrentClientBuilder {

private String host = "localhost";

private int port = 8043;
private int socketTimeout = 1000;
private int socketTimeout = 5000;

public ConnectionsClientBuilder withHost(String host) {
public CurrentClientBuilder withHost(String host) {
this.host = host;
return this;
}

public ConnectionsClientBuilder withPort(int port) {
public CurrentClientBuilder withPort(int port) {
this.port = port;
return this;
}

public ConnectionsClientBuilder withSocketTimeout(int timeout) {
public CurrentClientBuilder withSocketTimeout(int timeout) {
this.socketTimeout = timeout;
return this;
}
Expand All @@ -72,8 +72,8 @@ public CurrentClient build() {
}
}

public static ConnectionsClientBuilder builder() {
return new ConnectionsClientBuilder();
public static CurrentClientBuilder builder() {
return new CurrentClientBuilder();
}

private static final Logger LOGGER = LoggerFactory.getLogger(CurrentClient.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ public void testServerHanging() {
stubFor(get(urlEqualTo(connectionUrl))
.willReturn(aResponse().withFixedDelay(5000)));
//when
ActivityClient underTest = ActivityClient.builder()
.withHost("localhost").withPort(PORT).withSocketTimeout(1000).build();
underTest.retrieveConnections();
//then
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.scassandra.http.client.CurrentClient.ConnectionsClientBuilder;
import org.scassandra.http.client.CurrentClient.CurrentClientBuilder;

import static com.github.tomakehurst.wiremock.client.WireMock.*;
import static com.google.common.collect.Lists.newArrayList;
Expand All @@ -41,10 +41,12 @@ public class CurrentClientTest {

private CurrentClient underTest;

private CurrentClient lowTimeoutClient;

@Before
public void setup() {
ConnectionsClientBuilder builder = CurrentClient.builder();
underTest = builder.withHost("localhost").withPort(PORT).build();
underTest = CurrentClient.builder().withHost("localhost").withPort(PORT).build();
lowTimeoutClient = CurrentClient.builder().withHost("localhost").withPort(PORT).withSocketTimeout(1000).build();
}

@Test
Expand Down Expand Up @@ -200,7 +202,7 @@ public void testServerHanging() {
//given
stubFor(get(urlEqualTo(connectionsUrl)).willReturn(aResponse().withFixedDelay(5000)));
//when
underTest.getConnections();
lowTimeoutClient.getConnections();
}

@Test
Expand Down Expand Up @@ -369,7 +371,7 @@ public void testCloseOfConnectionsBySocketAddress() {
public void testServerHangingWhileCloseOfConnections() {
stubFor(delete(urlEqualTo(connectionsUrl + "?type=close")).willReturn(aResponse().withFixedDelay(5000)));
//when
underTest.closeConnections(CLOSE);
lowTimeoutClient.closeConnections(CLOSE);
//then
}

Expand Down Expand Up @@ -434,7 +436,7 @@ public void testListenerServerHanging() {
//given
stubFor(put(urlEqualTo(listenerUrl)).willReturn(aResponse().withFixedDelay(5000)));
//when
underTest.enableListener();
lowTimeoutClient.enableListener();
}

@Test(expected = ListenerRequestFailed.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
import java.net.InetSocketAddress;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import com.google.common.base.Optional;
import common.*;
import io.netty.channel.EventLoopGroup;
import org.scassandra.http.client.BatchType;
import org.scassandra.http.client.Result;
import org.scassandra.http.client.WriteTypePrime;
Expand All @@ -26,8 +28,16 @@ public class CassandraExecutor20 implements CassandraExecutor {
private Session session;

public CassandraExecutor20() {
NettyOptions closeQuickly = new NettyOptions() {
public void onClusterClose(EventLoopGroup eventLoopGroup) {
//Shutdown immediately, since we close cluster when finished, we know nothing new coming through.
eventLoopGroup.shutdownGracefully(0, 1000, TimeUnit.MILLISECONDS).syncUninterruptibly();
}
};
cluster = Cluster.builder().addContactPoint(Config.NATIVE_HOST)
.withPort(Config.NATIVE_PORT).build();
.withPort(Config.NATIVE_PORT)
.withNettyOptions(closeQuickly)
.build();
session = cluster.connect(Config.KEYSPACE);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@

import com.google.common.base.Optional;
import common.*;
import io.netty.channel.EventLoopGroup;
import org.scassandra.http.client.BatchType;
import org.scassandra.http.client.Result;
import org.scassandra.http.client.WriteTypePrime;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
Expand All @@ -27,7 +29,15 @@ public class CassandraExecutor21 implements CassandraExecutor {
private Session session;

public CassandraExecutor21() {
cluster = Cluster.builder().addContactPoint(Config.NATIVE_HOST).withPort(Config.NATIVE_PORT)
NettyOptions closeQuickly = new NettyOptions() {
public void onClusterClose(EventLoopGroup eventLoopGroup) {
//Shutdown immediately, since we close cluster when finished, we know nothing new coming through.
eventLoopGroup.shutdownGracefully(0, 1000, TimeUnit.MILLISECONDS).syncUninterruptibly();
}
};
cluster = Cluster.builder().addContactPoint(Config.NATIVE_HOST)
.withPort(Config.NATIVE_PORT)
.withNettyOptions(closeQuickly)
.build();
session = cluster.connect(KEYSPACE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@

import com.google.common.base.Optional;
import common.*;
import io.netty.channel.EventLoopGroup;
import org.scassandra.http.client.BatchType;
import org.scassandra.http.client.Result;
import org.scassandra.http.client.WriteTypePrime;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import static org.scassandra.http.client.Result.*;
Expand All @@ -27,8 +29,16 @@ public class CassandraExecutor30 implements CassandraExecutor {
private Session session;

public CassandraExecutor30() {
cluster = Cluster.builder().addContactPoint(Config.NATIVE_HOST).withPort(Config.NATIVE_PORT)
NettyOptions closeQuickly = new NettyOptions() {
public void onClusterClose(EventLoopGroup eventLoopGroup) {
//Shutdown immediately, since we close cluster when finished, we know nothing new coming through.
eventLoopGroup.shutdownGracefully(0, 1000, TimeUnit.MILLISECONDS).syncUninterruptibly();
}
};
cluster = Cluster.builder().addContactPoint(Config.NATIVE_HOST)
.withPort(Config.NATIVE_PORT)
.withQueryOptions(new QueryOptions().setConsistencyLevel(ConsistencyLevel.ONE))
.withNettyOptions(closeQuickly)
.build();
session = cluster.connect(Config.KEYSPACE);
}
Expand Down
1 change: 1 addition & 0 deletions server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ dependencies {
compile "ch.qos.logback:logback-classic:1.0.13"
compile "com.typesafe.akka:akka-actor_2.11:2.3.9"
compile "com.typesafe.akka:akka-remote_2.11:2.3.9"
compile "com.typesafe.akka:akka-slf4j_2.11:2.3.9"
compile "io.spray:spray-json_2.11:1.3.2"
compile "io.spray:spray-can_2.11:1.3.3"
compile "io.spray:spray-routing-shapeless2_2.11:1.3.3"
Expand Down
4 changes: 2 additions & 2 deletions server/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ akka {

# Loggers to register at boot time (akka.event.Logging$DefaultLogger logs
# to STDOUT)
loggers = ["akka.event.Logging$DefaultLogger"]
loggers = ["akka.event.slf4j.Slf4jLogger"]

# Deprecated, use akka.loggers.
event-handlers = []
Expand Down Expand Up @@ -62,7 +62,7 @@ akka {
extensions = []

# Toggles whether threads created by this ActorSystem should be daemons or not
daemonic = off
daemonic = on

# JVM shutdown, System.exit(-1), in case of a fatal error,
# such as OutOfMemoryError
Expand Down
92 changes: 92 additions & 0 deletions server/src/main/scala/org/scassandra/server/ScassandraServer.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright (C) 2014 Christopher Batey and Dogan Narinc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.scassandra.server

import akka.actor.{PoisonPill, Props, Actor, ActorRef}
import akka.pattern.{ask, pipe}
import akka.util.Timeout
import com.typesafe.scalalogging.LazyLogging
import org.scassandra.server.actors.TcpServer
import org.scassandra.server.priming.{PrimingServer, ActivityLog}
import org.scassandra.server.priming.batch.PrimeBatchStore
import org.scassandra.server.priming.prepared.{CompositePreparedPrimeStore, PrimePreparedMultiStore, PrimePreparedPatternStore, PrimePreparedStore}
import org.scassandra.server.priming.query.PrimeQueryStore

import scala.concurrent.Future
import scala.util.{Failure, Success, Try}

/**
* Used to wait on startup of listening http and tcp interfaces.
* @param timeout Up to how long to wait for startup before timing out.
*/
case class AwaitStartup(timeout: Timeout)

/**
* Used to shutdown the server by first unbinding the priming and tcp server listeners
* and then sending a {@link PoisonPill} to itself.
* @param timeout Up to how long to wait for shutdown before timing out.
*/
case class ShutdownServer(timeout: Timeout)

/**
* Sent to {@link PrimingServer} and {@link TcpServer} instances to indicate that they should
* unbind their listeners and then subsequently shutdown.
*/
case object Shutdown

class ScassandraServer(val primedResults: PrimeQueryStore, val binaryListenAddress: String, val binaryPortNumber: Int, val adminListenAddress: String, val adminPortNumber: Int) extends Actor with LazyLogging {

val primePreparedStore = new PrimePreparedStore
val primePreparedPatternStore = new PrimePreparedPatternStore
val primePreparedMultiStore = new PrimePreparedMultiStore
val primeBatchStore = new PrimeBatchStore
val activityLog = new ActivityLog
val preparedLookup = new CompositePreparedPrimeStore(primePreparedStore, primePreparedPatternStore, primePreparedMultiStore)
val primingReadyListener = context.actorOf(Props(classOf[ServerReadyListener]), "PrimingReadyListener")
val tcpReadyListener = context.actorOf(Props(classOf[ServerReadyListener]), "TcpReadyListener")
val tcpServer = context.actorOf(Props(classOf[TcpServer], binaryListenAddress, binaryPortNumber, primedResults, preparedLookup, primeBatchStore, tcpReadyListener, activityLog), "BinaryTcpListener")
val primingServer = context.actorOf(Props(classOf[PrimingServer], adminListenAddress, adminPortNumber, primedResults, primePreparedStore,
primePreparedPatternStore, primePreparedMultiStore, primeBatchStore, primingReadyListener, activityLog, tcpServer), "PrimingServer")

override def receive: Receive = {
case AwaitStartup(timeout) => {
implicit val t: Timeout = timeout
import context.dispatcher

// Create a future that completes when both listeners ready.
val f = Future.sequence(
primingReadyListener ? OnServerReady ::
tcpReadyListener ? OnServerReady ::
Nil
)

f pipeTo sender
}
case ShutdownServer(timeout) => {
implicit val t: Timeout = timeout
import context.dispatcher

// Send shutdown message to each sender and on complete send a PoisonPill to self.
val f = Future.sequence(
primingServer ? Shutdown ::
tcpServer ? Shutdown ::
Nil
).map { _ => self ? PoisonPill }

f pipeTo sender
}
}
}

This file was deleted.

Loading

0 comments on commit 077dda8

Please sign in to comment.