diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 3ddf7a0e..edc19c5b 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -3,8 +3,8 @@ import sbt._ object Dependencies { val antlrVersion = "4.5.3" - val akkaVersion = "2.5.3" - val akkaHttpVersion = "10.0.9" + val akkaVersion = "2.5.4" + val akkaHttpVersion = "10.0.10" val commonsCodecVersion = "1.10" val httpClientVersion = "4.3.3" val javaDriverVersion = "3.1.1" @@ -40,6 +40,7 @@ object Dependencies { val gson = "com.google.code.gson" % "gson" % gsonVersion val akkaTestkit = "com.typesafe.akka" %% "akka-testkit" % akkaVersion + val akkaTypedTestkit = "com.typesafe.akka" %% "akka-typed-testkit" % akkaVersion val akkaHttpTestkit = "com.typesafe.akka" %% "akka-http-testkit" % akkaHttpVersion val akkaTypedTestKit = "com.typesafe.akka" %% "akka-typed-testkit" % akkaVersion @@ -58,8 +59,8 @@ object Dependencies { val jarjar = "com.googlecode.jarjar" % "jarjar" % "1.3" val junitInterface = "com.novocode" % "junit-interface" % "0.11" - val serverDeps = Seq(cats, antlrRuntime, logback, akkaHttpCors, akkaHttp, akkaHttpSpray, akkaActor, akkaRemote, akkaSlf4j, akkaTyped, typesafeLogging, guava) - val serverTestDeps = Seq(akkaTestkit, akkaHttpTestkit, akkaTypedTestKit, scalaTest, mockito, cassandraDriver, pegdown).map(_ % "test") + val serverDeps = Seq(cats, antlrRuntime, logback, akkaHttpCors, akkaHttp, akkaHttpSpray, akkaActor, akkaTyped, akkaRemote, akkaSlf4j, akkaTyped, typesafeLogging, guava) + val serverTestDeps = Seq(akkaTestkit, akkaTypedTestkit, akkaHttpTestkit, akkaTypedTestKit, scalaTest, mockito, cassandraDriver, pegdown).map(_ % "test") val codecDeps = Seq(scodec, guava) val codecTestDeps = Seq(scalaTest, scalaCheck).map(_ % "test") diff --git a/server/src/main/scala/org/scassandra/server/ScassandraNativeServer.scala b/server/src/main/scala/org/scassandra/server/ScassandraNativeServer.scala index 4ccade97..d999fb07 100644 --- a/server/src/main/scala/org/scassandra/server/ScassandraNativeServer.scala +++ b/server/src/main/scala/org/scassandra/server/ScassandraNativeServer.scala @@ -15,14 +15,17 @@ */ package org.scassandra.server -import akka.actor.{ Actor, ActorRef, ActorSystem, PoisonPill, Props } +import akka.actor.{ Actor, ActorRef, ActorSystem, PoisonPill, Props, Scheduler } import akka.http.scaladsl.Http import akka.pattern.{ ask, pipe } import akka.stream.ActorMaterializer import akka.util.Timeout +import akka.typed.scaladsl.adapter._ +import akka.typed.{ ActorRef => TActorRef } import com.typesafe.scalalogging.LazyLogging +import org.scassandra.server.actors.ActivityLogTyped.ActivityLogCommand import org.scassandra.server.actors.priming.{ PreparedPrimesActor, PrimeBatchStoreActor, PrimePreparedStoreActor, PrimeQueryStoreActor } -import org.scassandra.server.actors.{ ActivityLogActor, TcpServer } +import org.scassandra.server.actors.{ ActivityLogActor, ActivityLogTyped, TcpServer } import org.scassandra.server.priming.AllRoutes import org.scassandra.server.priming.prepared._ @@ -70,10 +73,10 @@ class ScassandraServer( private val preparedLookup: ActorRef = context.actorOf(Props(classOf[PreparedPrimesActor], List(primePreparedStore, primePreparedPatternStore, primePreparedMultiStore))) - val primeBatchStore: ActorRef = context.actorOf(Props[PrimeBatchStoreActor]) + val activityLogTyped: TActorRef[ActivityLogCommand] = context.spawn(ActivityLogTyped.activityLog, "TypedActivityLog") val activityLog: ActorRef = context.actorOf(Props[ActivityLogActor]) + val primeBatchStore: ActorRef = context.actorOf(Props[PrimeBatchStoreActor]) val primeQueryStore: ActorRef = context.actorOf(Props[PrimeQueryStoreActor]) - val primingReadyListener: ActorRef = context.actorOf(Props(classOf[ServerReadyListener]), "PrimingReadyListener") val tcpReadyListener: ActorRef = context.actorOf(Props(classOf[ServerReadyListener]), "TcpReadyListener") val tcpServer: ActorRef = @@ -81,6 +84,7 @@ class ScassandraServer( preparedLookup, primeBatchStore, tcpReadyListener, activityLog), "BinaryTcpListener") implicit val ec: ExecutionContext = context.dispatcher + val scheduler: Scheduler = context.system.scheduler val actorTimeout: Timeout = Timeout(2 seconds) implicit val system: ActorSystem = context.system diff --git a/server/src/main/scala/org/scassandra/server/actors/ActivityLog.scala b/server/src/main/scala/org/scassandra/server/actors/ActivityLog.scala index 8da500b5..09190bc6 100644 --- a/server/src/main/scala/org/scassandra/server/actors/ActivityLog.scala +++ b/server/src/main/scala/org/scassandra/server/actors/ActivityLog.scala @@ -83,7 +83,7 @@ object ActivityLogActor { case class RecordExecution(prepare: PreparedStatementExecution) case class RecordBatch(batch: BatchExecution) - private case class ActivityLog( + case class ActivityLog( queries: List[Query] = List(), connections: List[Connection] = List(), prepares: List[PreparedStatementPreparation] = List(), diff --git a/server/src/main/scala/org/scassandra/server/actors/ActivityLogTyped.scala b/server/src/main/scala/org/scassandra/server/actors/ActivityLogTyped.scala new file mode 100644 index 00000000..7108b302 --- /dev/null +++ b/server/src/main/scala/org/scassandra/server/actors/ActivityLogTyped.scala @@ -0,0 +1,68 @@ +/* + * Copyright (C) 2017 Christopher Batey and Dogan Narinc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.scassandra.server.actors + +import akka.typed.{ ActorRef, Behavior } +import akka.typed.scaladsl.Actor +import org.scassandra.server.actors.Activity.Query + +object ActivityLogTyped { + + val activityLog: Behavior[ActivityLogCommand] = activityLogState(DynamicActivityLog()) + + def activityLogState(state: DynamicActivityLog): Behavior[ActivityLogCommand] = { + Actor.immutable[ActivityLogCommand] { (ctx, msg) => + msg match { + case GetQueries(sender) => + sender ! TQueries(state.get(QueryType)) + Actor.same + case RecordQuery(query, sender) => + sender ! Recorded + activityLogState(state.store(QueryType)(query)) + } + } + } + + sealed trait ActivityLogCommand + final case class GetQueries(replyTo: ActorRef[TQueries]) extends ActivityLogCommand + final case class RecordQuery(query: Query, replyTo: ActorRef[ActivityLogResponse]) extends ActivityLogCommand + + sealed trait ActivityLogResponse + case class TQueries(queries: List[Query]) extends ActivityLogResponse + + final case object Recorded extends ActivityLogResponse + + private[actors] sealed trait ActivityType { + type RecordType + } + + private[actors] case object QueryType extends ActivityType { + type RecordType = Query + } + + class DynamicActivityLog private (data: Map[ActivityType, List[Any]]) { + def get(key: ActivityType): List[key.RecordType] = + data.getOrElse(key, List.empty[key.RecordType]).asInstanceOf[List[key.RecordType]].reverse + def store(key: ActivityType)(value: key.RecordType): DynamicActivityLog = { + val x: List[Any] = value :: data.getOrElse(key, List()) + DynamicActivityLog(data + (key -> x)) + } + } + + object DynamicActivityLog { + def apply(data: Map[ActivityType, List[Any]] = Map[ActivityType, List[Any]]()): DynamicActivityLog = new DynamicActivityLog(data) + } +} diff --git a/server/src/main/scala/org/scassandra/server/actors/priming/PreparedPrimesActor.scala b/server/src/main/scala/org/scassandra/server/actors/priming/PreparedPrimesActor.scala index 21b7cf6f..0d940c85 100644 --- a/server/src/main/scala/org/scassandra/server/actors/priming/PreparedPrimesActor.scala +++ b/server/src/main/scala/org/scassandra/server/actors/priming/PreparedPrimesActor.scala @@ -1,3 +1,18 @@ +/* + * Copyright (C) 2017 Christopher Batey and Dogan Narinc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.scassandra.server.actors.priming import akka.actor.{ Actor, ActorRef } @@ -21,7 +36,7 @@ class PreparedPrimesActor(primeStores: List[ActorRef]) extends Actor { private implicit val timeout: Timeout = Timeout(1 second) - def receive = { + def receive: Receive = { case msg => val primeResponse: Future[PrimeMatch] = primeStores .map(store => (store ? msg).mapTo[PrimeMatch]) diff --git a/server/src/main/scala/org/scassandra/server/priming/routes/ActivityVerificationRoute.scala b/server/src/main/scala/org/scassandra/server/priming/routes/ActivityVerificationRoute.scala index 8312dc07..d14a65d6 100644 --- a/server/src/main/scala/org/scassandra/server/priming/routes/ActivityVerificationRoute.scala +++ b/server/src/main/scala/org/scassandra/server/priming/routes/ActivityVerificationRoute.scala @@ -15,19 +15,23 @@ */ package org.scassandra.server.priming.routes -import akka.actor.ActorRef +import akka.actor.{ActorRef, Scheduler} import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport import akka.http.scaladsl.model.StatusCodes import akka.http.scaladsl.server.Directives._ import akka.http.scaladsl.server._ import akka.pattern.ask +import akka.typed.{ActorRef => TActorRef} +import akka.typed.scaladsl.AskPattern._ import akka.util.Timeout import ch.megard.akka.http.cors.scaladsl.CorsDirectives._ import com.typesafe.scalalogging.LazyLogging +import org.scassandra.server.actors.Activity import org.scassandra.server.actors.ActivityLogActor._ +import org.scassandra.server.actors.ActivityLogTyped.{ActivityLogCommand, GetQueries, TQueries} import org.scassandra.server.priming.json.PrimingJsonImplicits -import scala.concurrent.ExecutionContext +import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration._ //todo make deletes return once actor has confirmed @@ -35,9 +39,13 @@ trait ActivityVerificationRoute extends LazyLogging with SprayJsonSupport { import PrimingJsonImplicits._ - implicit val activityLog: ActorRef + val activityLog: ActorRef + val activityLogTyped: TActorRef[ActivityLogCommand] + implicit val ec: ExecutionContext - private implicit val timoeut = Timeout(250 milliseconds) + + private implicit val timoeut = Timeout(10 seconds) + implicit val scheduler: Scheduler val activityVerificationRoute: Route = cors() { @@ -59,7 +67,8 @@ trait ActivityVerificationRoute extends LazyLogging with SprayJsonSupport { get { complete { logger.debug("Request for recorded queries") - (activityLog ? GetAllQueries).mapTo[Queries].map(_.list) + // (activityLog ? GetAllQueries).mapTo[Queries].map(_.list) + (activityLogTyped ? (GetQueries(_: TActorRef[TQueries]))).map(_.queries) } } ~ delete { @@ -88,7 +97,7 @@ trait ActivityVerificationRoute extends LazyLogging with SprayJsonSupport { path("prepared-statement-execution") { get { complete { - logger.debug("Request for recorded prepared statement executions") + logger.debug("Request for recorded prepared sment executions") (activityLog ? GetAllExecutions).mapTo[Executions].map(_.list) } } ~ diff --git a/server/src/test/scala/org/scassandra/server/actors/ActivityLogActorTest.scala b/server/src/test/scala/org/scassandra/server/actors/ActivityLogActorTest.scala index 3b05a092..03f46ddf 100644 --- a/server/src/test/scala/org/scassandra/server/actors/ActivityLogActorTest.scala +++ b/server/src/test/scala/org/scassandra/server/actors/ActivityLogActorTest.scala @@ -1,3 +1,18 @@ +/* + * Copyright (C) 2017 Christopher Batey and Dogan Narinc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.scassandra.server.actors import akka.actor.Props diff --git a/server/src/test/scala/org/scassandra/server/actors/ActivityLogTypedTest.scala b/server/src/test/scala/org/scassandra/server/actors/ActivityLogTypedTest.scala new file mode 100644 index 00000000..b1c54b8f --- /dev/null +++ b/server/src/test/scala/org/scassandra/server/actors/ActivityLogTypedTest.scala @@ -0,0 +1,61 @@ +/* + * Copyright (C) 2017 Christopher Batey and Dogan Narinc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.scassandra.server.actors + +import akka.typed.ActorSystem +import akka.typed.testkit.scaladsl.TestProbe +import akka.typed.testkit.{ EffectfulActorContext, TestKitSettings } +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpec } +import org.scassandra.codec.Consistency +import org.scassandra.server.actors.Activity.Query +import org.scassandra.server.actors.ActivityLogTyped._ + +import scala.language.postfixOps + +class ActivityLogTypedTest extends WordSpec with BeforeAndAfterAll with ScalaFutures with Matchers { + implicit private val al: ActorSystem[ActivityLogCommand] = ActorSystem(ActivityLogTyped.activityLog, "ActivityLogTest") + implicit private val settings: TestKitSettings = TestKitSettings(al) + private val testProbe: TestProbe[ActivityLogResponse] = TestProbe[ActivityLogResponse]() + private val ctx = new EffectfulActorContext[ActivityLogCommand]("something", ActivityLogTyped.activityLog, 1, al) + + "activity log initially" must { + "have no stored queries" in { + ctx.run(GetQueries(testProbe.ref)) + testProbe.expectMsg(TQueries(List())) + } + } + + "activity log recording queries must" must { + "store and return them" in { + val query1 = Query("select 1", Consistency.ONE, None) + val query2 = Query("select 2", Consistency.ONE, None) + + ctx.run(RecordQuery(query1, testProbe.ref)) + testProbe.expectMsg(Recorded) + ctx.run(RecordQuery(query2, testProbe.ref)) + testProbe.expectMsg(Recorded) + + ctx.run(GetQueries(testProbe.ref)) + testProbe.expectMsg(TQueries(List(query1, query2))) + } + } + + override def afterAll(): Unit = { + al.terminate() + } +} + diff --git a/server/src/test/scala/org/scassandra/server/priming/routes/ActivityVerificationRouteTest.scala b/server/src/test/scala/org/scassandra/server/priming/routes/ActivityVerificationRouteTest.scala index 18043c17..a7247d2f 100644 --- a/server/src/test/scala/org/scassandra/server/priming/routes/ActivityVerificationRouteTest.scala +++ b/server/src/test/scala/org/scassandra/server/priming/routes/ActivityVerificationRouteTest.scala @@ -15,28 +15,61 @@ */ package org.scassandra.server.priming.routes -import akka.actor.ActorRef -import akka.http.scaladsl.testkit.ScalatestRouteTest -import akka.testkit.{ TestActor, TestProbe } +import akka.NotUsed +import akka.actor.{ActorRef, ActorSystem, Scheduler} +import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest} +import akka.testkit.TestActor._ +import akka.testkit.{TestActor, TestDuration, TestProbe} +import akka.typed.scaladsl.{Actor => TActor} +import akka.typed.testkit.{StubbedActorContext, TestKitSettings} +import akka.typed.{Behavior, ActorRef => TActorRef, ActorSystem => TActorSystem} +import akka.util.Timeout +import com.typesafe.scalalogging.LazyLogging import org.scalatest._ import org.scassandra.codec.Consistency.ONE import org.scassandra.codec.messages.BatchQueryKind.Simple import org.scassandra.codec.messages.BatchType._ +import org.scassandra.server.actors.Activity._ import org.scassandra.server.actors.ActivityLogActor._ -import org.scassandra.server.priming._ -import org.scassandra.server.priming.json.PrimingJsonImplicits +import org.scassandra.server.actors.ActivityLogTyped.{ActivityLogCommand, GetQueries, TQueries} import spray.json.JsonParser -import akka.testkit.TestActor._ -import org.scassandra.server.actors.Activity._ -class ActivityVerificationRouteTest extends FunSpec with BeforeAndAfterEach with Matchers with ScalatestRouteTest with ActivityVerificationRoute { +import scala.concurrent.Await +import scala.concurrent.duration._ +import scala.language.postfixOps + + +class ActivityVerificationRouteTest extends FunSpec with Matchers with ScalatestRouteTest with ActivityVerificationRoute with LazyLogging { + + implicit val actorRefFactory: ActorSystem = system + implicit val actorSystemTyped: TActorSystem[_] = TActorSystem(Behavior.empty, "ActivityVerificationRouteTest") + implicit private val settings: TestKitSettings = TestKitSettings(actorSystemTyped) + val scheduler: Scheduler = actorRefFactory.scheduler + implicit val ctx: StubbedActorContext[NotUsed] = new StubbedActorContext[NotUsed]("Test", 10, actorSystemTyped) - implicit def actorRefFactory = system val ec = scala.concurrent.ExecutionContext.global - val activityLogProbe = TestProbe() - implicit val activityLog = activityLogProbe.ref + val activityLogProbe = TestProbe("ActivityLogProbe") + val activityLog = activityLogProbe.ref + + implicit val timeout: RouteTestTimeout = RouteTestTimeout(5.seconds dilated) + + class TypedStub[C](system: TActorSystem[_]) { + var currentBehaviour: C => Unit = (_: C) => () + private val testBehaviour = TActor.immutable[C] { (_, msg) => + currentBehaviour(msg) + TActor.same + } - import PrimingJsonImplicits._ + val testActor: TActorRef[C] = { + implicit val timeout: Timeout = Timeout(1.seconds) + val futRef = system.systemActorOf(testBehaviour, s"TypedStub") + Await.result(futRef, timeout.duration + 1.second) + } + } + + val activityLoggedTypedFake = new TypedStub[ActivityLogCommand](actorSystemTyped) + + val activityLogTyped: TActorRef[ActivityLogCommand] = activityLoggedTypedFake.testActor def respondWith(m: Any): Unit = { activityLogProbe.setAutoPilot(new AutoPilot { @@ -55,6 +88,7 @@ class ActivityVerificationRouteTest extends FunSpec with BeforeAndAfterEach with respondWith(Connections(List(Connection()))) Get("/connection") ~> activityVerificationRoute ~> check { val response: String = responseAs[String] + logger.error("Reply has happened") val connectionList = JsonParser(response).convertTo[List[Connection]] connectionList.size should equal(1) activityLogProbe.expectMsg(GetAllConnections) @@ -79,26 +113,28 @@ class ActivityVerificationRouteTest extends FunSpec with BeforeAndAfterEach with } describe("Retrieving query activity") { - it("Should return queries from ActivityLog - no queries") { - respondWith(Queries(List())) + activityLoggedTypedFake.currentBehaviour = { + case GetQueries(sender) => sender.tell(TQueries(List())) + } + Get("/query") ~> activityVerificationRoute ~> check { val response: String = responseAs[String] val queryList = JsonParser(response).convertTo[List[Query]] queryList.size should equal(0) - activityLogProbe.expectMsg(GetAllQueries) } } it("Should return queries from ActivityLog - single query") { - val queries = List(Query("select 1", ONE, None)) - respondWith(Queries(queries)) + activityLoggedTypedFake.currentBehaviour = { + case GetQueries(sender) => sender.tell(TQueries(queries)) + } + Get("/query") ~> activityVerificationRoute ~> check { val response: String = responseAs[String] val queryList = JsonParser(response).convertTo[List[Query]] queryList should equal(queries) - activityLogProbe.expectMsg(GetAllQueries) } }