Skip to content

Commit

Permalink
Added tests for subquerying #586
Browse files Browse the repository at this point in the history
  • Loading branch information
sksamuel committed Feb 5, 2017
1 parent 0d52cbf commit 2526643
Show file tree
Hide file tree
Showing 10 changed files with 87 additions and 9 deletions.
Expand Up @@ -5,6 +5,6 @@ package com.sksamuel.elastic4s
*
* @tparam U the type of the class supported
*/
trait JsonFormat[+U] {
trait JsonFormat[U] {
def fromJson(string: String): U
}
Expand Up @@ -5,7 +5,7 @@ import com.sksamuel.elastic4s.mappings.MappingDefinition

trait CreateIndexApi {

def createIndex(name: String) = CreateIndexDefinition(name)
def createIndex(name: String): CreateIndexDefinition = CreateIndexDefinition(name)

def analyzers(analyzers: AnalyzerDefinition*) = new AnalyzersWrapper(analyzers)
def tokenizers(tokenizers: Tokenizer*) = new TokenizersWrapper(tokenizers)
Expand Down
Expand Up @@ -15,8 +15,8 @@ case class CreateIndexDefinition(name: String,

require(!name.contains("/"), "Index should not contain / when creating mappings. Specify the type as the mapping")

def singleShard() = shards(1)
def singleReplica() = replicas(1)
def singleShard(): CreateIndexDefinition = shards(1)
def singleReplica(): CreateIndexDefinition = replicas(1)

def waitForActiveShards(shards: Int): CreateIndexDefinition = copy(waitForActiveShards = shards.some)

Expand Down
Expand Up @@ -229,6 +229,13 @@ case class SearchDefinition(indexesTypes: IndexesAndTypes,

def version(version: Boolean): SearchDefinition = copy(version = version.some)

/**
* The maximum number of documents to collect for each shard,
* upon reaching which the query execution will terminate early.
* If set, the response will have a boolean field terminated_early
* to indicate whether the query execution has actually terminated
* early. Defaults to no.
*/
def terminateAfter(terminateAfter: Int): SearchDefinition = copy(terminateAfter = terminateAfter.some)

def indexBoost(map: Map[String, Double]): SearchDefinition = indexBoost(map.toList: _*)
Expand Down
Expand Up @@ -62,11 +62,13 @@ object HttpClient extends Logging {
* @tparam U the type of the response object returned by this handler
*/
trait HttpExecutable[T, U] extends Logging {

def execute(client: RestClient, request: T, format: JsonFormat[U]): Future[U]

// convience method that registers a listener with the function and the response json
// is then marshalled into the type U
protected def executeAsyncAndMapResponse(listener: ResponseListener => Any, format: JsonFormat[U]): Future[U] = {
protected def executeAsyncAndMapResponse(listener: ResponseListener => Any,
format: JsonFormat[U]): Future[U] = {
val p = Promise[U]()
listener(new ResponseListener {
override def onSuccess(r: Response): Unit = {
Expand Down
Expand Up @@ -4,19 +4,39 @@ import com.sksamuel.elastic4s.http.search.queries.{QueryBuilderFn, SortContentBu
import com.sksamuel.elastic4s.searches.SearchDefinition
import org.elasticsearch.common.bytes.BytesArray
import org.elasticsearch.common.xcontent.{XContentBuilder, XContentFactory}
import scala.collection.JavaConverters._

object SearchContentBuilder {

def apply(request: SearchDefinition): XContentBuilder = {

val builder = XContentFactory.jsonBuilder()
builder.startObject()

request.query.map(QueryBuilderFn.apply).foreach(x => builder.rawField("query", new BytesArray(x.string)))

if (request.sorts.nonEmpty) {
builder.startArray("sort")
request.sorts.foreach { sort =>
builder.rawValue(new BytesArray(SortContentBuilder(sort).string))
}
builder.endArray()
}

// source filtering
request.fetchContext foreach { context =>
if (context.fetchSource) {
if (context.includes.nonEmpty || context.excludes.nonEmpty) {
builder.startObject("_source")
builder.field("includes", context.includes.toList.asJava)
builder.field("excludes", context.excludes.toList.asJava)
builder.endObject()
}
} else {
builder.field("_source", false)
}
}

builder.endObject()
builder
}
Expand Down
Expand Up @@ -58,7 +58,7 @@ trait SearchImplicits {
request.pref.foreach(params.put("preference", _))
request.size.map(_.toString).foreach(params.put("size", _))
request.searchType.map(_.toString).foreach(params.put("search_type", _))
request.requestCache.map(_.toString).foreach(params.put("search_type", _))
request.requestCache.map(_.toString).foreach(params.put("request_cache", _))
request.terminateAfter.map(_.toString).foreach(params.put("terminate_after", _))
request.version.map(_.toString).foreach(params.put("version", _))

Expand Down
Expand Up @@ -11,9 +11,10 @@ object ElasticJackson {

object Implicits extends Logging {

implicit val mapper = JacksonSupport.mapper
implicit val mapper: ObjectMapper = JacksonSupport.mapper

implicit def format[T: Manifest](implicit mapper: ObjectMapper): JsonFormat[T] = new JsonFormat[T] {
implicit def format[T](implicit mapper: ObjectMapper,
manifest: Manifest[T]): JsonFormat[T] = new JsonFormat[T] {
override def fromJson(json: String): T = {
val t = manifest.runtimeClass.asInstanceOf[Class[T]]
logger.debug(s"Deserializing $json to $t")
Expand All @@ -25,7 +26,8 @@ object ElasticJackson {
override def json(t: T): String = mapper.writeValueAsString(t)
}

implicit def JacksonJsonHitReader[T: Manifest](implicit mapper: ObjectMapper): HitReader[T] = new HitReader[T] {
implicit def JacksonJsonHitReader[T](implicit mapper: ObjectMapper,
manifest: Manifest[T]): HitReader[T] = new HitReader[T] {
override def read(hit: Hit): Either[Throwable, T] = {
try {
val node = mapper.readTree(hit.sourceAsString).asInstanceOf[ObjectNode]
Expand Down
@@ -0,0 +1,37 @@
package com.sksamuel.elastic4s.search

import com.sksamuel.elastic4s.ElasticsearchClientUri
import com.sksamuel.elastic4s.http.{ElasticDsl, HttpClient}
import com.sksamuel.elastic4s.testkit.SharedElasticSugar
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy
import org.scalatest.{FlatSpec, Matchers}

class MatchQueryTest
extends FlatSpec
with SharedElasticSugar
with Matchers
with ElasticDsl {

import com.sksamuel.elastic4s.jackson.ElasticJackson.Implicits._

val http = HttpClient(ElasticsearchClientUri("elasticsearch://" + node.ipAndPort))

http.execute {
createIndex("units")
}.await

http.execute {
bulk(
indexInto("units/base") fields("name" -> "candela", "scientist.name" -> "Jules Violle", "scientist.country" -> "France")
).refresh(RefreshPolicy.IMMEDIATE)
}.await

"a match query" should "support selecting nested properties" in {

val resp = http.execute {
search("units") query matchQuery("name", "candela") sourceInclude "scientist.name"
}.await

resp.hits.hits.head.sourceAsMap shouldBe Map("scientist.name" -> "Jules Violle")
}
}
Expand Up @@ -112,6 +112,16 @@ class SearchHttpTest
search("chess" / "pieces") query matchAllQuery() sortBy fieldSort("name") size 3
}.await.to[Piece] shouldBe Vector(Piece("bishop", 3, 2), Piece("king", 0, 1), Piece("knight", 3, 2))
}
"support source includes" in {
http.execute {
search("chess" / "pieces") query matchAllQuery() sourceInclude "count"
}.await.hits.hits.map(_.sourceAsString).toSet shouldBe Set("{\"count\":1}", "{\"count\":2}", "{\"count\":8}")
}
"support source excludes" in {
http.execute {
search("chess" / "pieces") query matchAllQuery() sourceExclude "count"
}.await.hits.hits.map(_.sourceAsString).toSet shouldBe Set("{\"name\":\"pawn\",\"value\":1}", "{\"name\":\"knight\",\"value\":3}", "{\"name\":\"king\",\"value\":0}", "{\"name\":\"rook\",\"value\":5}", "{\"name\":\"queen\",\"value\":10}", "{\"name\":\"bishop\",\"value\":3}")
}
}
}

Expand Down

0 comments on commit 2526643

Please sign in to comment.