Skip to content

Commit

Permalink
Add implement entities and api for multi search
Browse files Browse the repository at this point in the history
  • Loading branch information
neitomic committed Jun 21, 2017
1 parent cf70479 commit a982642
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 10 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -1,4 +1,4 @@
#Elasticsearch Java Http client wrapper
# Elasticsearch Java Http client wrapper
> An Elasticsearch RestClient wrapped with entities and async method for Scala
[![Build Status](https://travis-ci.org/thanhtien522/es-http-client.svg?branch=master)](https://travis-ci.org/thanhtien522/es-http-client)
31 changes: 23 additions & 8 deletions src/main/scala/org/elasticsearch/client/http/ESHttpClient.scala
Expand Up @@ -3,13 +3,13 @@ package org.elasticsearch.client.http
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.commons.logging.LogFactory
import org.apache.http.HttpHost
import org.apache.http.auth.{AuthScope, UsernamePasswordCredentials}
import org.apache.http.client.methods._
import org.apache.http.entity.ContentType
import org.apache.http.impl.client.BasicCredentialsProvider
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder
import org.apache.http.nio.entity.NStringEntity
import org.apache.http.nio.entity.{NByteArrayEntity, NStringEntity}
import org.apache.http.{Consts, HttpHost}
import org.elasticsearch.client.http.entities._
import org.elasticsearch.client.{RestClient, RestClientBuilder}

Expand All @@ -22,6 +22,8 @@ class ESHttpClient(servers: Seq[String], authInfo: AuthInfo) {

private val logger = LogFactory.getLog(classOf[ESHttpClient])

private final val APPLICATION_X_NDJSON = ContentType.create("application/x-ndjson", Consts.UTF_8)

private val objectMapper: ObjectMapper = new ObjectMapper().registerModule(DefaultScalaModule)

private val client = {
Expand Down Expand Up @@ -55,8 +57,6 @@ class ESHttpClient(servers: Seq[String], authInfo: AuthInfo) {
logger.info(s"Lucene version: ${versionInfo.version.luceneVersion}")
logger.info("===============================================")



def getClient: RestClient = client

def index(index: String, `type`: String, id: Option[String], source: String): IndexResponse = {
Expand Down Expand Up @@ -93,21 +93,35 @@ class ESHttpClient(servers: Seq[String], authInfo: AuthInfo) {
objectMapper.readValue(resp.getEntity.getContent, classOf[DeleteResponse])
}

def bulk(index: Option[String], `type`: Option[String], requests: Seq[DocRequest]): BulkResponse = {

}

/**
* Search
*
* @param indies Set of index name
* @param types Set of type name
* @param query ElasticSearch json query
* @param types Set of type name
* @param query ElasticSearch json query
* @return
*/
def search(indies: Set[String], types: Set[String], query: String): SearchResponse = {
val resp = client.performRequest(HttpPost.METHOD_NAME,
s"""${indies.mkString(",")}/${types.mkString(",")}/_search""",
(if (indies.isEmpty) "" else indies.mkString(",") + "/") + (if (indies.isEmpty) "" else indies.mkString(",") + "/") + "_search",
Map.empty[String, String],
new NStringEntity(query, ContentType.APPLICATION_JSON))
objectMapper.readValue(resp.getEntity.getContent, classOf[SearchResponse])
}

def msearch(indies: Set[String], types: Set[String], requests: Seq[SearchRequest]): MultiSearchResponse = {
val resp = client.performRequest(HttpPost.METHOD_NAME,
(if (indies.isEmpty) "" else indies.mkString(",") + "/") + (if (indies.isEmpty) "" else indies.mkString(",") + "/") + "_msearch",
Map.empty[String, String],
new NByteArrayEntity(requests.map(_.toMultiSearchNDJson).mkString("\n").getBytes(Consts.UTF_8), APPLICATION_X_NDJSON)
)
objectMapper.readValue(resp.getEntity.getContent, classOf[MultiSearchResponse])
}

def search(index: String, `type`: String, query: String): SearchResponse =
search(Set(index), Set(`type`), query)

Expand All @@ -124,6 +138,7 @@ class ESHttpClient(servers: Seq[String], authInfo: AuthInfo) {

/**
* Delete one or multiple index
*
* @param indies single index, multiple index with comma separated,
* all indies with _all, wildcard expression
* @return
Expand All @@ -133,7 +148,7 @@ class ESHttpClient(servers: Seq[String], authInfo: AuthInfo) {
objectMapper.readValue(resp.getEntity.getContent, classOf[AckResponse])
}

def indexExist(index: String) : Boolean = {
def indexExist(index: String): Boolean = {
val resp = client.performRequest(HttpHead.METHOD_NAME, s"$index")
resp.getStatusLine.getStatusCode match {
case 200 => true
Expand Down
Expand Up @@ -4,7 +4,84 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties
import com.fasterxml.jackson.databind.PropertyNamingStrategy.SnakeCaseStrategy
import com.fasterxml.jackson.databind.annotation.JsonNaming

//
trait DocRequest {

/**
* This function used for build bulk request body
*
* @return The ndjson format with the first line is meta data, the second line is document data
*/
def toBulkJson(): String
}

case class DocIndexRequest(__index: Option[String], __type: Option[String], __id: Option[String], source: String) extends DocRequest {
override def toBulkJson(): String = {
val meta = {
val tmp = Seq(
__index.map(v => s""""_index" : "$v""""),
__type.map(v => s""""_type" : "$v""""),
__id.map(v => s""""_id" : "$v"""")
).mkString(", ")
s"""{ "index" : { $tmp } }"""
}
meta + "\n" + source.replaceAll("[\r\n]", "")
}
}

case class DocDeleteRequest(__index: Option[String], __type: Option[String], __id: String) extends DocRequest {
override def toBulkJson(): String = {
val meta = {
val tmp = Seq(
__index.map(v => s""""_index" : "$v""""),
__type.map(v => s""""_type" : "$v""""),
s""""_index" : "$__id""""
).mkString(", ")
s"""{ "delete" : { $tmp } }"""
}
meta
}
}

/**
*
* @param __index optional index of request, the index can be specified by API path
* @param __type optional index of request, the index can be specified by API path
* @param __id id of document
* @param source the document source
*/
case class DocUpdateRequest(__index: Option[String], __type: Option[String], __id: String, source: String) extends DocRequest {
override def toBulkJson(): String = {
val meta = {
val tmp = Seq(
__index.map(v => s""""_index" : "$v""""),
__type.map(v => s""""_type" : "$v""""),
s""""_index" : "$__id""""
).mkString(", ")
s"""{ "update" : { $tmp } }"""
}
meta + "\n" + s"""{ "doc": ${source.replaceAll("[\r\n]", "")}}"""
}
}

case class SearchRequest(searchQuery: String, __index: Option[String] = None, __type: Option[String] = None, searchType: Option[String] = None) {

def toMultiSearchNDJson: String = {
val header = {
val tmp = Seq(
__index.map(v => s""""_index" : "$v""""),
__type.map(v => s""""_type" : "$v""""),
searchType.map(v => s""""search_type" : "$v"""")
).mkString(", ")
s"""{ $tmp }"""
}
header + "\n" + searchQuery.replaceAll("[\r\n]", "")
}
}

@JsonIgnoreProperties(ignoreUnknown = true)
@JsonNaming(classOf[SnakeCaseStrategy])
case class GetRequest(__index: Option[String], __type: Option[String], __id: String)

@JsonIgnoreProperties(ignoreUnknown = true)
@JsonNaming(classOf[SnakeCaseStrategy])
case class IndexResponse(__index: String, __type: String, __id: String, __version: Long, created: Boolean) {
Expand All @@ -31,6 +108,9 @@ case class UpdateResponse(__index: String, __type: String, __id: String, __versi
def getVersion: Long = __version
}

case class BulkResponse(took: Long, items: Seq[BulkItemResponse])
case class BulkItemResponse()

@JsonIgnoreProperties(ignoreUnknown = true)
@JsonNaming(classOf[SnakeCaseStrategy])
case class GetResponse(__index: String, __type: String, __id: String, __version: Long, found: Boolean, __source: Map[String, Any]) {
Expand Down Expand Up @@ -59,6 +139,8 @@ case class DeleteResponse(__index: String, __type: String, __id: String, __versi
def isFound: Boolean = found
}

case class MultiSearchResponse(responses: Seq[SearchResponse])

@JsonIgnoreProperties(ignoreUnknown = true)
@JsonNaming(classOf[SnakeCaseStrategy])
case class SearchResponse(timeOut: Boolean, took: Long, hits: SearchHits)
Expand Down
@@ -0,0 +1,13 @@
package org.elasticsearch.client.http.entities

import com.fasterxml.jackson.core.JsonParser
import com.fasterxml.jackson.databind.{DeserializationContext, JsonDeserializer}

/**
* Created by user on 6/21/17.
*/
class BulkItemResponseDeserializer extends JsonDeserializer[BulkItemResponse]{

override def deserialize(p: JsonParser, ctxt: DeserializationContext): BulkItemResponse = {
}
}

0 comments on commit a982642

Please sign in to comment.