forked from akka/akka-grpc
-
Notifications
You must be signed in to change notification settings - Fork 0
/
NettyClientUtils.scala
99 lines (83 loc) · 3.63 KB
/
NettyClientUtils.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
/**
* Copyright (C) 2009-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.grpc.internal
import java.lang.reflect.Field
import java.util.concurrent.{ ThreadLocalRandom, TimeUnit }
import akka.annotation.InternalApi
import akka.discovery.SimpleServiceDiscovery
import akka.grpc.GrpcClientSettings
import io.grpc.netty.shaded.io.grpc.netty.{ GrpcSslContexts, NegotiationType, NettyChannelBuilder }
import io.grpc.netty.shaded.io.netty.handler.ssl._
import io.grpc.{ CallOptions, ManagedChannel }
import javax.net.ssl.SSLContext
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ ExecutionContext, Future }
/**
 * INTERNAL API
 *
 * Helpers for creating a Netty-based gRPC [[ManagedChannel]] and the [[CallOptions]]
 * for individual calls from a [[GrpcClientSettings]].
 */
@InternalApi
object NettyClientUtils {

  /**
   * INTERNAL API
   *
   * Resolves `settings.name` through `settings.serviceDiscovery`, picks one of the returned
   * addresses at random and builds a channel to it.
   *
   * Transport security:
   *  - `settings.useTls == false`: the channel is plaintext.
   *  - `settings.useTls == true`: TLS is always negotiated. If `settings.sslContext` is defined it
   *    is installed on the builder; otherwise no context is set and the builder's own default is
   *    used. `settings.overrideAuthority` is only applied in this TLS branch.
   *
   * @param settings client settings providing discovery, TLS and user-agent configuration
   * @param ec execution context on which the discovery result is transformed
   * @return a future of the built channel; failed with [[IllegalStateException]] when the lookup
   *         returns no addresses, or with whatever the lookup / builder throws
   */
  @InternalApi
  def createChannel(settings: GrpcClientSettings)(implicit ec: ExecutionContext): Future[ManagedChannel] =
    settings.serviceDiscovery.lookup(settings.name, settings.resolveTimeout).flatMap {
      targets: SimpleServiceDiscovery.Resolved =>
        if (targets.addresses.isEmpty) {
          Future.failed(new IllegalStateException(s"No targets returned for name: ${settings.name}"))
        } else {
          // Pick a random resolved address.
          val target = targets.addresses(ThreadLocalRandom.current().nextInt(targets.addresses.size))

          val base =
            NettyChannelBuilder
              .forAddress(target.host, target.port.getOrElse(settings.defaultPort))
              .flowControlWindow(NettyChannelBuilder.DEFAULT_FLOW_CONTROL_WINDOW)

          val secured =
            if (!settings.useTls) base.usePlaintext()
            else {
              // The caller asked for TLS: never downgrade to plaintext just because no custom
              // SSLContext was supplied.
              val tls = base.negotiationType(NegotiationType.TLS)
              val withContext =
                settings.sslContext.fold(tls)(javaCtx => tls.sslContext(nettyHttp2SslContext(javaCtx)))
              settings.overrideAuthority.fold(withContext)(authority => withContext.overrideAuthority(authority))
            }

          val configured = settings.userAgent.fold(secured)(agent => secured.userAgent(agent))

          // Exceptions thrown while building are captured by the surrounding flatMap as a failed Future.
          Future.successful(configured.build())
        }
    }

  /**
   * INTERNAL API
   *
   * Given a Java [[SSLContext]], create a Netty [[SslContext]] that can be used to build
   * a Netty HTTP/2 channel.
   *
   * @param javaSslContext the JDK context that should back the returned Netty context
   * @return a [[JdkSslContext]] whose internal `sslContext` field has been replaced by `javaSslContext`
   */
  @InternalApi
  private def nettyHttp2SslContext(javaSslContext: SSLContext): SslContext = {
    // FIXME: Create a JdkSslContext using a normal constructor. Need to work out sensible values for all args first.
    // In the meantime, use a Netty SslContextBuilder to create a JdkSslContext, then use reflection to patch the
    // object's internal SSLContext. It's not pretty, but it gets something working for now.

    // Create a Netty JdkSslContext object with all the correct ciphers, protocol settings, etc initialized.
    // The cast relies on the JDK provider being requested explicitly just above it.
    val nettySslContext: JdkSslContext = GrpcSslContexts
      .configure(GrpcSslContexts.forClient, SslProvider.JDK)
      .build.asInstanceOf[JdkSslContext]

    // Patch the SSLContext value inside the JdkSslContext object.
    // NOTE(review): depends on a private field named "sslContext" in the shaded Netty class;
    // a Netty upgrade that renames it will surface here as NoSuchFieldException.
    val nettySslContextField: Field = classOf[JdkSslContext].getDeclaredField("sslContext")
    nettySslContextField.setAccessible(true)
    nettySslContextField.set(nettySslContext, javaSslContext)

    nettySslContext
  }

  /**
   * INTERNAL API
   *
   * Builds the base [[CallOptions]] for a client.
   *
   * @param settings client settings; only `callCredentials` is read
   * @return `CallOptions.DEFAULT` with the call credentials attached when `settings.callCredentials`
   *         is defined, otherwise `CallOptions.DEFAULT` unchanged
   */
  @InternalApi def callOptions(settings: GrpcClientSettings): CallOptions =
    settings.callCredentials.map(CallOptions.DEFAULT.withCallCredentials).getOrElse(CallOptions.DEFAULT)

  /**
   * INTERNAL API
   *
   * Applies the configured deadline, if any, to the given options.
   *
   * @param defaultOptions options to start from
   * @param settings client settings; only `deadline` is read
   * @return `defaultOptions` with a deadline of `settings.deadline` (converted to whole milliseconds)
   *         from now when the deadline is a [[FiniteDuration]]; otherwise `defaultOptions` unchanged
   */
  @InternalApi private[akka] def callOptionsWithDeadline(defaultOptions: CallOptions, settings: GrpcClientSettings): CallOptions =
    settings.deadline match {
      case d: FiniteDuration => defaultOptions.withDeadlineAfter(d.toMillis, TimeUnit.MILLISECONDS)
      case _ => defaultOptions
    }
}