Skip to content

Commit

Permalink
Updated readme for http client details
Browse files Browse the repository at this point in the history
  • Loading branch information
sksamuel committed Feb 1, 2017
1 parent 0ddd12d commit 7744a12
Show file tree
Hide file tree
Showing 7 changed files with 121 additions and 115 deletions.
208 changes: 107 additions & 101 deletions README.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package com.sksamuel.elastic4s.embedded
import java.io.File
import java.nio.file.{Path, Paths}

import com.sksamuel.elastic4s.TcpClient$
import com.sksamuel.elastic4s.TcpClient
import com.sksamuel.exts.Logging
import org.elasticsearch.client.Client
import org.elasticsearch.common.settings.Settings
Expand All @@ -29,15 +29,15 @@ class LocalNode(settings: Settings, plugins: List[Class[_ <: Plugin]])
}
})

val nodeId = client().admin().cluster().prepareState().get().getState.getNodes.getLocalNodeId
val nodeId: String = client().admin().cluster().prepareState().get().getState.getNodes.getLocalNodeId

val ipAndPort = client().admin().cluster().prepareNodesInfo(nodeId).get()
val ipAndPort: String = client().admin().cluster().prepareNodesInfo(nodeId).get()
.getNodes.iterator().next().getHttp.address().publishAddress().toString
logger.info(s"LocalNode started @ $ipAndPort")

val ip: Int = ipAndPort.dropWhile(_ != ':').drop(1).toInt

def stop(removeData: Boolean = false) = {
def stop(removeData: Boolean = false): Any = {
super.close()

def deleteDir(dir: File): Unit = {
Expand Down Expand Up @@ -95,7 +95,7 @@ object LocalNode {

// creates a LocalNode with all settings provided by the user
// and using default plugins
def apply(settings: Settings) = {
def apply(settings: Settings): LocalNode = {
require(settings.getAsMap.containsKey("cluster.name"))
require(settings.getAsMap.containsKey("path.home"))
require(settings.getAsMap.containsKey("path.data"))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
package com.sksamuel.elastic4s.streams

import akka.actor._
import com.sksamuel.elastic4s.TcpClient$
import com.sksamuel.elastic4s.ElasticDsl._
import com.sksamuel.elastic4s.TcpClient
import com.sksamuel.elastic4s.bulk.{BulkCompatibleDefinition, BulkDefinition, RichBulkItemResponse, RichBulkResponse}
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy
import org.reactivestreams.{Subscriber, Subscription}

import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
import scala.util.{Failure, Success}
import com.sksamuel.elastic4s.ElasticDsl._

/**
* An implementation of the reactive API Subscriber.
Expand Down Expand Up @@ -86,7 +86,7 @@ class BulkActor[T](client: TcpClient,
typedConfig: TypedSubscriberConfig[T]) extends Actor {

import context.{dispatcher, system}
import typedConfig.{ baseConfig => config }
import typedConfig.{baseConfig => config}

private val buffer = new ArrayBuffer[T]()
buffer.sizeHint(config.batchSize)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package com.sksamuel.elastic4s.streams

import akka.actor.ActorRefFactory
import com.sksamuel.elastic4s.searches.SearchDefinition
import com.sksamuel.elastic4s.{TcpClient$, IndexesAndTypes}
import com.sksamuel.elastic4s.{IndexesAndTypes, TcpClient}

import scala.concurrent.duration._
import scala.language.implicitConversions
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.sksamuel.elastic4s.streams

import akka.actor.{Actor, ActorRefFactory, PoisonPill, Props, Stash}
import com.sksamuel.elastic4s.TcpClient$
import com.sksamuel.elastic4s.TcpClient
import com.sksamuel.elastic4s.searches.{RichSearchHit, RichSearchResponse, SearchDefinition}
import com.sksamuel.elastic4s.streams.PublishActor.Ready
import org.elasticsearch.ElasticsearchException
Expand Down Expand Up @@ -41,7 +41,7 @@ class ScrollPublisher private[streams](client: TcpClient,
class ScrollSubscription(client: TcpClient, query: SearchDefinition, s: Subscriber[_ >: RichSearchHit], max: Long)
(implicit actorRefFactory: ActorRefFactory) extends Subscription {

val actor = actorRefFactory.actorOf(Props(new PublishActor(client, query, s, max)))
private val actor = actorRefFactory.actorOf(Props(new PublishActor(client, query, s, max)))

private[streams] def ready(): Unit = {
actor ! PublishActor.Ready
Expand Down Expand Up @@ -85,7 +85,7 @@ class PublishActor(client: TcpClient,
// rule 1.03 the subscription should not send any results until the onSubscribe call has returned
// even tho the user might call request in the onSubscribe, we can't start sending the results yet.
// this ready method signals to the actor that its ok to start sending data. In the meantime we just stash requests.
override def receive = {
override def receive: PartialFunction[Any, Unit] = {
case Ready =>
context become ready
unstashAll()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.sksamuel.elastic4s.bulk

import com.sksamuel.elastic4s.TcpClient$
import com.sksamuel.elastic4s.TcpClient
import com.sksamuel.exts.OptionImplicits._
import org.elasticsearch.action.bulk.BulkProcessor.Listener
import org.elasticsearch.action.bulk.{BackoffPolicy, BulkRequest, BulkResponse}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package com.sksamuel.elastic4s.xpack.security

import java.net.InetSocketAddress

import com.sksamuel.elastic4s.{TcpClient$, ElasticsearchClientUri}
import com.sksamuel.elastic4s.{ElasticsearchClientUri, TcpClient}
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.transport.InetSocketTransportAddress
import org.elasticsearch.plugins.Plugin
Expand Down

0 comments on commit 7744a12

Please sign in to comment.