Skip to content

Commit

Permalink
Wrote a first test, doesn't quite work yet.
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexGilleran committed Dec 22, 2016
1 parent 63fe9f5 commit ddb74d0
Show file tree
Hide file tree
Showing 8 changed files with 108 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,15 @@ import org.elasticsearch.common.settings.Settings
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._

object ClientProvider {
trait ClientProvider {
def getClient(implicit scheduler: Scheduler, logger: LoggingAdapter, ec: ExecutionContext): Future[ElasticClientTrait]
}

class DefaultClientProvider extends ClientProvider {
private var clientFuture: Option[Future[ElasticClient]] = None

def getClient(implicit scheduler: Scheduler, logger: LoggingAdapter, ec: ExecutionContext): Future[ElasticClient] =
clientFuture match {
override def getClient(implicit scheduler: Scheduler, logger: LoggingAdapter, ec: ExecutionContext): Future[ElasticClientTrait] = {
val outerFuture = clientFuture match {
case Some(future) => future
case None =>
val future = retry(() => Future {
Expand All @@ -32,5 +36,8 @@ object ClientProvider {
future
}

def onRetry(logger: LoggingAdapter)(retriesLeft: Int) = logger.warning("Failed to make initial contact with ES server, {} retries left", retriesLeft)
outerFuture.map(new ElasticClientAdapter(_))
}

private def onRetry(logger: LoggingAdapter)(retriesLeft: Int) = logger.warning("Failed to make initial contact with ES server, {} retries left", retriesLeft)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package au.csiro.data61.magda.search.elasticsearch

import com.sksamuel.elastic4s.{ElasticClient, Executable, RichSearchResponse, SearchDefinition}
import org.elasticsearch.client.{AdminClient, Client}

import scala.concurrent.Future
import scala.concurrent.duration.Duration

trait ElasticClientTrait {
def execute[T, R, Q](t: T)(implicit executable: Executable[T, R, Q]): Future[Q]
def close(): Unit
def java: Client
def admin: AdminClient
def iterateSearch(query: SearchDefinition)(implicit timeout: Duration): Iterator[RichSearchResponse]
}

class ElasticClientAdapter(client: ElasticClient) extends ElasticClientTrait{
override def execute[T, R, Q](t: T)(implicit executable: Executable[T, R, Q]): Future[Q] = client.execute(t)(executable)
override def close(): Unit = client.close
override def java: Client = client.java
override def admin: AdminClient = client.admin
override def iterateSearch(query: SearchDefinition)(implicit timeout: Duration): Iterator[RichSearchResponse] = client.iterateSearch(query)(timeout)
}
3 changes: 2 additions & 1 deletion search-api/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ libraryDependencies ++= {
"org.scalaz" %% "scalaz-core" % "7.2.8",

"com.typesafe.akka" %% "akka-http-testkit" % akkaV,
"org.scalatest" %% "scalatest" % scalaTestV % "test"
"org.scalatest" %% "scalatest" % scalaTestV % "test",
"org.scalamock" %% "scalamock-scalatest-support" % "3.2.2" % "test"
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package au.csiro.data61.magda

import akka.actor.{Actor, ActorLogging, ActorSystem, DeadLetter, Props}
import akka.event.Logging
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import au.csiro.data61.magda.api.Api
import au.csiro.data61.magda.search.elasticsearch.{DefaultClientProvider, ElasticSearchQueryer}

object MagdaApp extends App {
implicit val system = ActorSystem()
Expand All @@ -20,7 +22,11 @@ object MagdaApp extends App {
system.eventStream.subscribe(listener, classOf[DeadLetter])

logger.debug("Starting API")
new Api()
implicit val clientProvider = new DefaultClientProvider
val searchQueryer = new ElasticSearchQueryer()
val api = new Api(logger, config, searchQueryer)

Http().bindAndHandle(api.routes, config.getString("http.interface"), config.getInt("http.port"))
}

class Listener extends Actor with ActorLogging {
Expand Down
17 changes: 5 additions & 12 deletions search-api/src/main/scala/au/csiro/data61/magda/api/Api.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package au.csiro.data61.magda.api
import java.util.concurrent.TimeUnit

import akka.actor.ActorSystem
import akka.event.Logging
import akka.event.{Logging, LoggingAdapter}
import akka.http.scaladsl.Http
import akka.http.scaladsl.coding.Gzip
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
Expand All @@ -12,27 +12,23 @@ import akka.http.scaladsl.model.StatusCodes._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.{ExceptionHandler, MethodRejection, RejectionHandler}
import akka.stream.Materializer
import akka.util.Timeout
import au.csiro.data61.magda.model.misc
import au.csiro.data61.magda.model.misc._
import au.csiro.data61.magda.api.{model => apimodel}
import au.csiro.data61.magda.search.elasticsearch.ElasticSearchQueryer
import au.csiro.data61.magda.search.SearchQueryer
import au.csiro.data61.magda.search.elasticsearch.{ClientProvider, ElasticSearchQueryer}
import ch.megard.akka.http.cors.{CorsDirectives, CorsSettings}
import com.typesafe.config.Config

import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration

class Api(implicit val config: Config, implicit val system: ActorSystem,
implicit val ec: ExecutionContext, implicit val materializer: Materializer) extends misc.Protocols with CorsDirectives with apimodel.Protocols {
val logger = Logging(system, getClass)
val searchQueryer = new ElasticSearchQueryer()

class Api(val logger: LoggingAdapter, val config: Config, val searchQueryer: SearchQueryer) extends misc.Protocols with CorsDirectives with apimodel.Protocols {
// Disallow credentials so that we return "Access-Control-Allow-Origin: *" instead of
// "Access-Control-Allow-Origin: foo.com". The latter is fine until Chrome decides to
// cache the response and re-use it for other origins, causing a CORS failure.
val corsSettings = CorsSettings.defaultSettings.copy(allowCredentials = false)
//

implicit def rejectionHandler = RejectionHandler.newBuilder()
.handleAll[MethodRejection] { rejections
val methods = rejections map (_.supported)
Expand Down Expand Up @@ -62,7 +58,6 @@ class Api(implicit val config: Config, implicit val system: ActorSystem,
}
}

implicit val timeout = Timeout(FiniteDuration(1, TimeUnit.SECONDS))
val routes =
encodeResponseWith(Gzip) {
cors(corsSettings) {
Expand Down Expand Up @@ -103,6 +98,4 @@ class Api(implicit val config: Config, implicit val system: ActorSystem,
}
}
}

Http().bindAndHandle(routes, config.getString("http.interface"), config.getInt("http.port"))
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import au.csiro.data61.magda.model.misc._

import scala.concurrent.Future

trait SearchProvider {
trait SearchQueryer {
def search(query: Query, start: Long, limit: Int): Future[SearchResult]
def searchFacets(facetType: FacetType, facetQuery: String, generalQuery: Query, start: Long, limit: Int): Future[FacetSearchResult]
def searchRegions(query: String, start: Long, limit: Int): Future[RegionSearchResult]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,11 @@ import akka.stream.Materializer
import au.csiro.data61.magda.api.Query
import au.csiro.data61.magda.api.model.{RegionSearchResult, SearchResult}
import au.csiro.data61.magda.model.misc._
import au.csiro.data61.magda.search.elasticsearch.ClientProvider.getClient
import au.csiro.data61.magda.search.elasticsearch.FacetDefinition.facetDefForType
import au.csiro.data61.magda.search.elasticsearch.Indexes._
import au.csiro.data61.magda.search.elasticsearch.Queries._
import au.csiro.data61.magda.search.elasticsearch.ElasticSearchImplicits._
import au.csiro.data61.magda.search.{MatchAll, MatchPart, SearchProvider, SearchStrategy}
import au.csiro.data61.magda.search.{MatchAll, MatchPart, SearchQueryer, SearchStrategy}
import au.csiro.data61.magda.util.ErrorHandling.CausedBy
import au.csiro.data61.magda.util.SetExtractor
import com.sksamuel.elastic4s.ElasticDsl._
Expand All @@ -22,11 +21,11 @@ import org.elasticsearch.search.sort.SortOrder
import scala.collection.JavaConverters._
import scala.concurrent.{ExecutionContext, Future}

class ElasticSearchQueryer(implicit val system: ActorSystem, implicit val ec: ExecutionContext, implicit val materializer: Materializer) extends SearchProvider {
class ElasticSearchQueryer(implicit val system: ActorSystem, implicit val ec: ExecutionContext, implicit val materializer: Materializer, implicit val clientProvider: ClientProvider) extends SearchQueryer {
private val logger = system.log
private val AGGREGATION_SIZE_LIMIT = 10

lazy val clientFuture: Future[ElasticClient] = getClient(system.scheduler, logger, ec)
lazy val clientFuture: Future[ElasticClientTrait] = clientProvider.getClient(system.scheduler, logger, ec)

override def search(query: Query, start: Long, limit: Int) = {
Future.sequence(query.regions.map(region => findRegion(region.regionType, region.regionId))).flatMap { regions =>
Expand Down
56 changes: 56 additions & 0 deletions search-api/src/test/scala/ai/csiro/data61/magda/api/ApiSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package ai.csiro.data61.magda.api

import akka.actor.Scheduler
import akka.event.{Logging, LoggingAdapter, NoLogging}
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import akka.http.scaladsl.model.ContentTypes._
import akka.http.scaladsl.model.StatusCodes._
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.http.scaladsl.testkit.ScalatestRouteTest
import akka.stream.scaladsl.Flow
import au.csiro.data61.magda.AppConfig
import au.csiro.data61.magda.MagdaApp.getClass
import au.csiro.data61.magda.api.Api
import au.csiro.data61.magda.search.elasticsearch.{ClientProvider, ElasticClientTrait, ElasticSearchQueryer}
import com.sksamuel.elastic4s.ElasticClient
import org.elasticsearch.client.ElasticsearchClient
import org.scalamock.proxy.ProxyMockFactory
import org.scalamock.scalatest.MockFactory
import org.scalatest._

import scala.concurrent.{ExecutionContext, Future}

class ApiSpec extends FlatSpec with Matchers with ScalatestRouteTest with MockFactory with ProxyMockFactory {
// override def testConfigSource = "akka.loglevel = DEBUG"
val logger = Logging(system, getClass)

implicit val config = AppConfig.conf

val mockEsClient = mock[ElasticClientTrait]
implicit object MockClientProvider extends ClientProvider {
override def getClient(implicit scheduler: Scheduler, logger: LoggingAdapter, ec: ExecutionContext): Future[ElasticClientTrait] = Future(mockEsClient)
}

val searchQueryer = new ElasticSearchQueryer()
val api = new Api(logger, config, searchQueryer)
val routes = api.routes

// override lazy val ipApiConnectionFlow = Flow[HttpRequest].map { request =>
// if (request.uri.toString().endsWith(ip1Info.query))
// HttpResponse(status = OK, entity = marshal(ip1Info))
// else if (request.uri.toString().endsWith(ip2Info.query))
// HttpResponse(status = OK, entity = marshal(ip2Info))
// else
// HttpResponse(status = BadRequest, entity = marshal("Bad ip format"))
// }

it should "respond to query" in {
Get(s"/datasets/search?query=hello") ~> routes ~> check {
status shouldBe OK
contentType shouldBe `application/json`
// responseAs[IpInfo] shouldBe ip1Info
}

}

}

0 comments on commit ddb74d0

Please sign in to comment.