Skip to content
Permalink
Browse files

Add Addressable Directory etcd server, configure to use etcd for dire…

…ctory, correctly store leases
  • Loading branch information...
Brett Morien
Brett Morien committed Sep 27, 2019
1 parent 194448f commit 865bdd76145b321d277394eba6028eaf1b22cf20
@@ -11,3 +11,14 @@ services:
expose:
- 2379
- 2380
addressable-directory:
image: bitnami/etcd
environment:
ALLOW_NONE_AUTHENTICATION: "yes"
ETCD_ADVERTISE_CLIENT_URLS: http://etcd-server:2379
expose:
- 2379
- 2380
ports:
- "2381:2379"
- "2382:2380"
@@ -10,7 +10,8 @@ services:
environment:
ORBIT_HOST: "orbit-server-1"
ORBIT_PORT: 50056
ETCD_SERVER: http://node-directory:2379
NODE_DIRECTORY: http://node-directory:2379
ADDRESSABLE_DIRECTORY: http://addressable-directory:2379
entrypoint: sh ./opt/orbit/entrypoint.sh
volumes:
- ../src/orbit-application/build/libs:/opt/orbit/libs
@@ -24,7 +25,8 @@ services:
environment:
ORBIT_HOST: "orbit-server-2"
ORBIT_PORT: 50057
ETCD_SERVER: http://node-directory:2379
NODE_DIRECTORY: http://node-directory:2379
ADDRESSABLE_DIRECTORY: http://addressable-directory:2379
entrypoint: sh -c ./opt/orbit/entrypoint.sh
volumes:
- ../src/orbit-application/build/libs:/opt/orbit/libs
@@ -33,11 +35,14 @@ services:
environment:
ALLOW_NONE_AUTHENTICATION: "yes"
ETCD_ADVERTISE_CLIENT_URLS: http://etcd-server:2379
ports:
- "2379:2379"
- "2380:2380"
expose:
- 2379
- 2380

# ETCD_ROOT_PASSWORD: pass
addressable-directory:
image: bitnami/etcd
environment:
ALLOW_NONE_AUTHENTICATION: "yes"
ETCD_ADVERTISE_CLIENT_URLS: http://etcd-server:2379
expose:
- 2379
- 2380
@@ -86,8 +86,8 @@ class OrbitServer(private val config: OrbitServerConfig) {
definition<Router>()

injectedWithConfig(config.nodeDirectoryConfig)
injectedWithConfig(config.addressableDirectoryConfig)

definition<AddressableDirectory>(InMemoryAddressableDirectory::class.java)
definition<AddressablePlacementStrategy>(LocalFirstPlacementStrategy::class.java)

definition<Connections>()
@@ -9,8 +9,10 @@ package orbit.server
import kotlinx.coroutines.CoroutineDispatcher
import orbit.common.concurrent.Pools
import orbit.server.config.InjectedWithConfig
import orbit.server.etcd.EtcdAddressableDirectory
import orbit.server.etcd.EtcdNodeDirectory
import orbit.server.net.LeaseExpiration
import orbit.server.routing.AddressableDirectory
import orbit.server.routing.NodeDirectory
import java.time.Duration

@@ -75,8 +77,16 @@ data class OrbitServerConfig(
* Node directory configuration
*/
val nodeDirectoryConfig: InjectedWithConfig<NodeDirectory> = EtcdNodeDirectory.EtcdNodeDirectoryConfig(
url = System.getenv("ETCD_SERVER") ?: "http://localhost:2379",
url = System.getenv("NODE_DIRECTORY") ?: "http://localhost:2379",
clientExpiration = leaseExpiration,
serverExpiration = serverLeaseExpiration
),

val addressableDirectoryConfig: InjectedWithConfig<AddressableDirectory> = EtcdAddressableDirectory.EtcdAddressableDirectoryConfig(
url = System.getenv("ADDRESSABLE_DIRECTORY") ?: "http://localhost:2381",
expiration = LeaseExpiration(
duration = Duration.ofSeconds(60),
renew = Duration.ofSeconds(30)
)
)
)
@@ -38,14 +38,21 @@ class EtcdAddressableDirectory(private val config: EtcdAddressableDirectoryConfi

override suspend fun getLease(address: AddressableReference): AddressableLease? {
val response = client.get(getKey(address)).await()
val value = response.kvs.first().value
println("get node ${address} = $value")
val value = response.kvs.firstOrNull()?.value
println("get node $address = $value")

return AddressableLease.fromProto(Addressable.AddressableLease.parseFrom(value.bytes))
return if (value != null) AddressableLease.fromProto(Addressable.AddressableLease.parseFrom(value.bytes)) else null
}

override suspend fun setLocation(address: AddressableReference, node: NodeId) {
client.put(getKey(address), ByteSequence.from(node.value.toByteArray())).await()
override suspend fun setLocation(address: AddressableReference, nodeId: NodeId) {
val lease = AddressableLease(
address = address,
nodeId = nodeId,
expiresAt = Instant.now().plus(config.expiration.duration),
renewAt = Instant.now().plus(config.expiration.renew)
)

client.put(getKey(address), ByteSequence.from(lease.toProto().toByteArray())).await()
}

override suspend fun updateLease(address: AddressableReference): AddressableLease {
@@ -23,10 +23,10 @@ class InMemoryAddressableDirectory(val expiration: LeaseExpiration) : Addressabl
return directory[address]
}

override suspend fun setLocation(address: AddressableReference, node: NodeId) {
override suspend fun setLocation(address: AddressableReference, nodeId: NodeId) {
directory[address] = AddressableLease(
address = address,
nodeId = node,
nodeId = nodeId,
expiresAt = Instant.now().plus(expiration.duration),
renewAt = Instant.now().plus(expiration.renew)
)
@@ -12,6 +12,6 @@ import orbit.server.net.NodeId

interface AddressableDirectory {
suspend fun getLease(address: AddressableReference): AddressableLease?
suspend fun setLocation(address: AddressableReference, node: NodeId)
suspend fun setLocation(address: AddressableReference, nodeId: NodeId)
suspend fun updateLease(address: AddressableReference) : AddressableLease
}

0 comments on commit 865bdd7

Please sign in to comment.
You can’t perform that action at this time.