diff --git a/heroic-all/pom.xml b/heroic-all/pom.xml
index 400049718..902dfec13 100644
--- a/heroic-all/pom.xml
+++ b/heroic-all/pom.xml
@@ -91,10 +91,6 @@
-
- com.spotify.heroic.rpc
- heroic-rpc-nativerpc
-
com.spotify.heroic.rpc
heroic-rpc-grpc
diff --git a/heroic-all/src/main/java/com/spotify/heroic/HeroicModules.java b/heroic-all/src/main/java/com/spotify/heroic/HeroicModules.java
index a9982cc8b..096ba161e 100644
--- a/heroic-all/src/main/java/com/spotify/heroic/HeroicModules.java
+++ b/heroic-all/src/main/java/com/spotify/heroic/HeroicModules.java
@@ -66,7 +66,6 @@ public class HeroicModules {
new com.spotify.heroic.consumer.collectd.Module(),
- new com.spotify.heroic.rpc.nativerpc.Module(),
new com.spotify.heroic.rpc.grpc.Module(),
new com.spotify.heroic.rpc.jvm.Module(),
diff --git a/heroic-all/src/main/java/com/spotify/heroic/profile/ClusterProfile.java b/heroic-all/src/main/java/com/spotify/heroic/profile/ClusterProfile.java
index bbd013fd6..1e79e734f 100644
--- a/heroic-all/src/main/java/com/spotify/heroic/profile/ClusterProfile.java
+++ b/heroic-all/src/main/java/com/spotify/heroic/profile/ClusterProfile.java
@@ -29,7 +29,6 @@
import com.spotify.heroic.cluster.discovery.simple.SrvRecordDiscoveryModule;
import com.spotify.heroic.cluster.discovery.simple.StaticListDiscoveryModule;
import com.spotify.heroic.rpc.grpc.GrpcRpcProtocolModule;
-import com.spotify.heroic.rpc.nativerpc.NativeRpcProtocolModule;
import java.net.URI;
import java.net.URISyntaxException;
@@ -51,9 +50,6 @@ public HeroicConfig.Builder build(final ExtraParameters params) throws Exception
case "grpc":
module.protocols(ImmutableList.of(GrpcRpcProtocolModule.builder().build()));
break;
- case "nativerpc":
- module.protocols(ImmutableList.of(NativeRpcProtocolModule.builder().build()));
- break;
default:
throw new IllegalArgumentException("illegal value for protocol (" + protocol + ")");
}
diff --git a/heroic-dist/src/test/java/com/spotify/heroic/HeroicConfigurationTest.java b/heroic-dist/src/test/java/com/spotify/heroic/HeroicConfigurationTest.java
index 6b9145e6f..0770b5dc5 100644
--- a/heroic-dist/src/test/java/com/spotify/heroic/HeroicConfigurationTest.java
+++ b/heroic-dist/src/test/java/com/spotify/heroic/HeroicConfigurationTest.java
@@ -23,7 +23,6 @@ public void testAll() throws Exception {
"com.spotify.heroic.metric.bigtable.BigtableBackend",
"com.spotify.heroic.metric.datastax.DatastaxBackend",
"com.spotify.heroic.rpc.grpc.GrpcRpcProtocolServer",
- "com.spotify.heroic.rpc.nativerpc.NativeRpcProtocolServer",
"com.spotify.heroic.shell.ShellServer",
"com.spotify.heroic.suggest.elasticsearch.SuggestBackendKV"
);
@@ -38,7 +37,6 @@ public void testAll() throws Exception {
"com.spotify.heroic.metric.bigtable.BigtableBackend",
"com.spotify.heroic.metric.datastax.DatastaxBackend",
"com.spotify.heroic.rpc.grpc.GrpcRpcProtocolServer",
- "com.spotify.heroic.rpc.nativerpc.NativeRpcProtocolServer",
"com.spotify.heroic.shell.ShellServer",
"com.spotify.heroic.suggest.elasticsearch.SuggestBackendKV"
);
diff --git a/heroic-dist/src/test/resources/heroic-all.yml b/heroic-dist/src/test/resources/heroic-all.yml
index e566fffb5..de7abc76c 100644
--- a/heroic-dist/src/test/resources/heroic-all.yml
+++ b/heroic-dist/src/test/resources/heroic-all.yml
@@ -2,7 +2,6 @@ cluster:
tags:
role: test
protocols:
- - type: nativerpc
- type: grpc
metadata:
diff --git a/pom.xml b/pom.xml
index b47abb683..1425393dd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -32,7 +32,6 @@
aggregation/cardinality
consumer/kafka
consumer/collectd
- rpc/nativerpc
rpc/grpc
rpc/jvm
heroic-dist
@@ -381,11 +380,6 @@
heroic-statistics-semantic
${project.version}
-
- com.spotify.heroic.rpc
- heroic-rpc-nativerpc
- ${project.version}
-
com.spotify.heroic.rpc
heroic-rpc-grpc
diff --git a/reporting/pom.xml b/reporting/pom.xml
index dcd706699..ab820ef77 100644
--- a/reporting/pom.xml
+++ b/reporting/pom.xml
@@ -103,11 +103,6 @@
-
- com.spotify.heroic.rpc
- heroic-rpc-nativerpc
-
-
com.spotify.heroic.rpc
heroic-rpc-grpc
diff --git a/rpc/nativerpc/pom.xml b/rpc/nativerpc/pom.xml
deleted file mode 100644
index fceed2f28..000000000
--- a/rpc/nativerpc/pom.xml
+++ /dev/null
@@ -1,58 +0,0 @@
-
- 4.0.0
-
-
- com.spotify.heroic
- heroic-parent
- 0.0.1-SNAPSHOT
- ../..
-
-
- com.spotify.heroic.rpc
- heroic-rpc-nativerpc
- jar
-
- Heroic: Native RPC Protocol
-
-
-
- org.projectlombok
- lombok
- provided
-
-
-
- com.spotify.heroic
- heroic-component
-
-
-
- io.netty
- netty-transport
-
-
- io.netty
- netty-codec
-
-
-
- org.msgpack
- msgpack
- 0.6.11
-
-
-
-
- junit
- junit
- test
-
-
-
- org.mockito
- mockito-core
- test
-
-
-
diff --git a/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/Module.java b/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/Module.java
deleted file mode 100644
index 7b27913d0..000000000
--- a/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/Module.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Copyright (c) 2015 Spotify AB.
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.spotify.heroic.rpc.nativerpc;
-
-import com.spotify.heroic.HeroicConfigurationContext;
-import com.spotify.heroic.HeroicModule;
-import com.spotify.heroic.dagger.LoadingComponent;
-
-public class Module implements HeroicModule {
- @Override
- public Runnable setup(LoadingComponent loading) {
- final HeroicConfigurationContext config = loading.heroicConfigurationContext();
-
- return () -> {
- config.registerType("nativerpc", NativeRpcProtocolModule.class);
- };
- }
-}
diff --git a/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/NativeEncoding.java b/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/NativeEncoding.java
deleted file mode 100644
index 15e849407..000000000
--- a/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/NativeEncoding.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Copyright (c) 2015 Spotify AB.
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.spotify.heroic.rpc.nativerpc;
-
-public enum NativeEncoding {
- NONE, GZIP
-}
diff --git a/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/NativeRpc.java b/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/NativeRpc.java
deleted file mode 100644
index d22598a61..000000000
--- a/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/NativeRpc.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Copyright (c) 2015 Spotify AB.
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.spotify.heroic.rpc.nativerpc;
-
-public class NativeRpc {
- public static final byte REQUEST = 1;
- public static final byte RESPONSE = 2;
- public static final byte ERR_RESPONSE = 3;
- public static final byte HEARTBEAT = 4;
-}
diff --git a/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/NativeRpcClient.java b/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/NativeRpcClient.java
deleted file mode 100644
index 6f04f13e7..000000000
--- a/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/NativeRpcClient.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Copyright (c) 2015 Spotify AB.
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.spotify.heroic.rpc.nativerpc;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.spotify.heroic.rpc.nativerpc.message.NativeOptions;
-import com.spotify.heroic.rpc.nativerpc.message.NativeRpcEmptyBody;
-import com.spotify.heroic.rpc.nativerpc.message.NativeRpcRequest;
-import eu.toolchain.async.AsyncFramework;
-import eu.toolchain.async.AsyncFuture;
-import eu.toolchain.async.ResolvableFuture;
-import io.netty.bootstrap.Bootstrap;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.util.Timeout;
-import io.netty.util.Timer;
-import io.netty.util.TimerTask;
-import lombok.RequiredArgsConstructor;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-@RequiredArgsConstructor
-public class NativeRpcClient {
- private final AsyncFramework async;
- private final EventLoopGroup group;
- private final int maxFrameSize;
- private final InetSocketAddress address;
- private final ObjectMapper mapper;
- private final Timer timer;
- private final long sendTimeout;
- private final long heartbeatInterval;
- private final NativeEncoding encoding;
-
- private static final NativeRpcEmptyBody EMPTY = new NativeRpcEmptyBody();
-
- public AsyncFuture request(
- final String endpoint, final Q entity, final Class expected
- ) {
- byte[] body;
-
- try {
- body = mapper.writeValueAsBytes(entity);
- } catch (JsonProcessingException e) {
- return async.failed(e);
- }
-
- final int size = body.length;
-
- final NativeOptions options = new NativeOptions(encoding);
-
- try {
- body = NativeUtils.encodeBody(options, body);
- } catch (IOException e) {
- return async.failed(e);
- }
-
- final NativeRpcRequest request =
- new NativeRpcRequest(endpoint, heartbeatInterval, options, size, body);
-
- return sendRequest(expected, request);
- }
-
- public AsyncFuture request(String endpoint, Class expected) {
- return request(endpoint, EMPTY, expected);
- }
-
- @Override
- public String toString() {
- return "nativerpc://" + address.getHostString() +
- (address.getPort() != -1 ? ":" + address.getPort() : "");
- }
-
- private AsyncFuture sendRequest(
- final Class expected, final NativeRpcRequest request
- ) {
- final ResolvableFuture future = async.future();
- final AtomicReference heartbeatTimeout = new AtomicReference<>();
-
- final Bootstrap b = new Bootstrap();
- b.channel(NioSocketChannel.class);
- b.group(group);
- b.handler(
- new NativeRpcClientSession(mapper, timer, heartbeatInterval, maxFrameSize, address,
- heartbeatTimeout, future, expected));
-
- // timeout for how long we are allowed to spend attempting to send a request.
- final Timeout sendTimeout = timer.newTimeout(new TimerTask() {
- @Override
- public void run(Timeout timeout) throws Exception {
- future.fail(new Exception("sending of request timed out"));
- }
- }, this.sendTimeout, TimeUnit.MILLISECONDS);
-
- b
- .connect(address)
- .addListener(handleConnect(request, future, heartbeatTimeout, sendTimeout));
-
- return future;
- }
-
- private ChannelFutureListener handleConnect(
- final NativeRpcRequest request, final ResolvableFuture future,
- final AtomicReference heartbeatTimeout, final Timeout requestTimeout
- ) {
- return new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture f) throws Exception {
- if (!f.isSuccess()) {
- future.fail(f.cause());
- return;
- }
-
- f
- .channel()
- .writeAndFlush(request)
- .addListener(handleRequestSent(future, heartbeatTimeout, requestTimeout));
- }
- };
- }
-
- private ChannelFutureListener handleRequestSent(
- final ResolvableFuture future, final AtomicReference heartbeatTimeout,
- final Timeout requestTimeout
- ) {
- return new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture f) throws Exception {
- requestTimeout.cancel();
-
- if (!f.isSuccess()) {
- future.fail(f.cause());
- return;
- }
-
- final Timeout timeout =
- timer.newTimeout(heartbeatTimeout(f.channel(), future), heartbeatInterval,
- TimeUnit.MILLISECONDS);
-
- heartbeatTimeout.set(timeout);
- }
- };
- }
-
- private TimerTask heartbeatTimeout(final Channel ch, final ResolvableFuture> future) {
- return new TimerTask() {
- @Override
- public void run(Timeout timeout) throws Exception {
- future.fail(new Exception("missing heartbeat, request timed out"));
- ch.close();
- }
- };
- }
-}
diff --git a/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/NativeRpcClientSession.java b/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/NativeRpcClientSession.java
deleted file mode 100644
index 9c6ae14e2..000000000
--- a/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/NativeRpcClientSession.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Copyright (c) 2015 Spotify AB.
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.spotify.heroic.rpc.nativerpc;
-
-import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.databind.JsonMappingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.spotify.heroic.rpc.nativerpc.message.NativeRpcError;
-import com.spotify.heroic.rpc.nativerpc.message.NativeRpcHeartBeat;
-import com.spotify.heroic.rpc.nativerpc.message.NativeRpcResponse;
-import eu.toolchain.async.ResolvableFuture;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelPipeline;
-import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-import io.netty.handler.codec.LengthFieldPrepender;
-import io.netty.util.Timeout;
-import io.netty.util.Timer;
-import io.netty.util.TimerTask;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-@Slf4j
-@RequiredArgsConstructor
-public class NativeRpcClientSession extends ChannelInitializer {
- private final ObjectMapper mapper;
- private final Timer timer;
- private final long heartbeatInterval;
- private final int maxFrameSize;
- private final InetSocketAddress address;
- private final AtomicReference heartbeatTimeout;
-
- private final ResolvableFuture future;
- private final Class expected;
-
- @Override
- protected void initChannel(final Channel ch) throws Exception {
- final ChannelPipeline pipeline = ch.pipeline();
-
- pipeline.addLast(new ChannelInboundHandlerAdapter() {
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- bumpTimeout(ctx);
- ctx.fireChannelRead(msg);
- }
- });
-
- // first four bytes are length prefix of message, strip first four bytes.
- pipeline.addLast(new LengthFieldBasedFrameDecoder(maxFrameSize, 0, 4, 0, 4));
- pipeline.addLast(new NativeRpcDecoder());
- pipeline.addLast(new SimpleChannelInboundHandler() {
- @Override
- protected void channelRead0(final ChannelHandlerContext ctx, final Object msg)
- throws Exception {
- if (msg instanceof NativeRpcError) {
- final NativeRpcError error = (NativeRpcError) msg;
-
- if (log.isTraceEnabled()) {
- log.trace("[{}] remote error: {}", ctx.channel(), error.getMessage());
- }
-
- future.fail(new NativeRpcRemoteException(address, error.getMessage()));
- ctx.channel().close();
- return;
- }
-
- if (msg instanceof NativeRpcResponse) {
- if (log.isTraceEnabled()) {
- log.trace("[{}] response: cancelling heartbeat", ctx.channel());
- }
-
- try {
- handleResponse((NativeRpcResponse) msg);
- } catch (Exception e) {
- future.fail(new Exception("Failed to handle response", e));
- }
-
- return;
- }
-
- if (msg instanceof NativeRpcHeartBeat) {
- if (log.isTraceEnabled()) {
- log.trace("[{}] heartbeat: delaying timeout by {}ms", ctx.channel(),
- heartbeatInterval);
- }
-
- bumpTimeout(ctx);
- return;
- }
-
- throw new IllegalArgumentException("unable to handle type: " + msg);
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
- throws Exception {
- future.fail(cause);
- ctx.channel().close();
- }
- });
-
- pipeline.addLast(new LengthFieldPrepender(4));
- pipeline.addLast(new NativeRpcEncoder());
- }
-
- private void unsetTimeout() {
- final Timeout old = heartbeatTimeout.getAndSet(null);
-
- if (old != null) {
- old.cancel();
- }
- }
-
- private void bumpTimeout(final ChannelHandlerContext ctx) {
- final Timeout timeout =
- timer.newTimeout(heartbeatTimeout(ctx.channel(), future), heartbeatInterval,
- TimeUnit.MILLISECONDS);
-
- final Timeout old = heartbeatTimeout.getAndSet(timeout);
-
- if (old != null) {
- old.cancel();
- }
- }
-
- private void handleResponse(final NativeRpcResponse response)
- throws IOException, JsonParseException, JsonMappingException {
- unsetTimeout();
-
- final byte[] bytes =
- NativeUtils.decodeBody(response.getOptions(), response.getSize(), response.getBody());
-
- final R responseBody = mapper.readValue(bytes, expected);
-
- future.resolve(responseBody);
- }
-
- private TimerTask heartbeatTimeout(final Channel ch, final ResolvableFuture> future) {
- return new TimerTask() {
- @Override
- public void run(Timeout timeout) throws Exception {
- future.fail(new Exception("request timed out (missing heartbeat)"));
- ch.close();
- }
- };
- }
-}
diff --git a/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/NativeRpcContainer.java b/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/NativeRpcContainer.java
deleted file mode 100644
index dc5cdb8d5..000000000
--- a/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/NativeRpcContainer.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Copyright (c) 2015 Spotify AB.
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.spotify.heroic.rpc.nativerpc;
-
-import com.fasterxml.jackson.core.type.TypeReference;
-import eu.toolchain.async.AsyncFuture;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class NativeRpcContainer {
- private final Map> endpoints = new HashMap<>();
-
- @SuppressWarnings("unchecked")
- public void register(final String endpoint, final NativeRpcEndpoint, ?> handle) {
- if (endpoints.containsKey(endpoint)) {
- throw new IllegalStateException("Endpoint already registered: " + endpoint);
- }
-
- endpoints.put(endpoint, (EndpointSpec) handle);
- }
-
- public EndpointSpec get(String endpoint) {
- return endpoints.get(endpoint);
- }
-
- public static interface EndpointSpec {
- public AsyncFuture handle(final Q request) throws Exception;
-
- public TypeReference requestType();
- }
-}
diff --git a/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/NativeRpcDecoder.java b/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/NativeRpcDecoder.java
deleted file mode 100644
index 1a3adca2e..000000000
--- a/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/NativeRpcDecoder.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Copyright (c) 2015 Spotify AB.
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.spotify.heroic.rpc.nativerpc;
-
-import com.spotify.heroic.rpc.nativerpc.message.NativeRpcError;
-import com.spotify.heroic.rpc.nativerpc.message.NativeRpcHeartBeat;
-import com.spotify.heroic.rpc.nativerpc.message.NativeRpcRequest;
-import com.spotify.heroic.rpc.nativerpc.message.NativeRpcResponse;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufInputStream;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.ByteToMessageDecoder;
-import org.msgpack.MessagePack;
-import org.msgpack.unpacker.Unpacker;
-
-import java.util.List;
-
-public class NativeRpcDecoder extends ByteToMessageDecoder {
- private final MessagePack messagePack = new MessagePack();
-
- @Override
- protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final List out)
- throws Exception {
- final int length = in.readableBytes();
-
- if (length == 0) {
- return;
- }
-
- try (final ByteBufInputStream stream = new ByteBufInputStream(in)) {
- final Unpacker unpacker = messagePack.createUnpacker(stream);
-
- final byte type = unpacker.readByte();
-
- switch (type) {
- case NativeRpc.HEARTBEAT:
- out.add(NativeRpcHeartBeat.unpack(unpacker));
- return;
- case NativeRpc.REQUEST:
- out.add(NativeRpcRequest.unpack(unpacker));
- return;
- case NativeRpc.RESPONSE:
- out.add(NativeRpcResponse.unpack(unpacker));
- return;
- case NativeRpc.ERR_RESPONSE:
- out.add(NativeRpcError.unpack(unpacker));
- return;
- default:
- throw new IllegalArgumentException("Invalid RPC message type: " + type);
- }
- }
- }
-}
diff --git a/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/NativeRpcEncoder.java b/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/NativeRpcEncoder.java
deleted file mode 100644
index d45c6da99..000000000
--- a/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/NativeRpcEncoder.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Copyright (c) 2015 Spotify AB.
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.spotify.heroic.rpc.nativerpc;
-
-import com.spotify.heroic.rpc.nativerpc.message.NativeRpcError;
-import com.spotify.heroic.rpc.nativerpc.message.NativeRpcHeartBeat;
-import com.spotify.heroic.rpc.nativerpc.message.NativeRpcRequest;
-import com.spotify.heroic.rpc.nativerpc.message.NativeRpcResponse;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufOutputStream;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.MessageToByteEncoder;
-import org.msgpack.MessagePack;
-import org.msgpack.packer.Packer;
-
-public class NativeRpcEncoder extends MessageToByteEncoder {
- private final MessagePack messagePack = new MessagePack();
-
- @Override
- protected void encode(final ChannelHandlerContext ctx, final Object in, final ByteBuf out)
- throws Exception {
- try (final ByteBufOutputStream stream = new ByteBufOutputStream(out)) {
- try (final Packer packer = messagePack.createPacker(stream)) {
- if (in instanceof NativeRpcHeartBeat) {
- packer.write(NativeRpc.HEARTBEAT);
- NativeRpcHeartBeat.pack((NativeRpcHeartBeat) in, packer);
- return;
- }
-
- if (in instanceof NativeRpcRequest) {
- packer.write(NativeRpc.REQUEST);
- NativeRpcRequest.pack((NativeRpcRequest) in, packer);
- return;
- }
-
- if (in instanceof NativeRpcResponse) {
- packer.write(NativeRpc.RESPONSE);
- NativeRpcResponse.pack((NativeRpcResponse) in, packer);
- return;
- }
-
- if (in instanceof NativeRpcError) {
- packer.write(NativeRpc.ERR_RESPONSE);
- NativeRpcError.pack((NativeRpcError) in, packer);
- return;
- }
- }
- }
-
- throw new IllegalArgumentException("Unable to encode object: " + in);
- }
-}
diff --git a/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/NativeRpcEndpoint.java b/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/NativeRpcEndpoint.java
deleted file mode 100644
index 371472335..000000000
--- a/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/NativeRpcEndpoint.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Copyright (c) 2015 Spotify AB.
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.spotify.heroic.rpc.nativerpc;
-
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.spotify.heroic.rpc.nativerpc.NativeRpcContainer.EndpointSpec;
-import lombok.RequiredArgsConstructor;
-
-@RequiredArgsConstructor
-public abstract class NativeRpcEndpoint extends TypeReference
- implements EndpointSpec {
- public TypeReference requestType() {
- return this;
- }
-}
diff --git a/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/NativeRpcProtocol.java b/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/NativeRpcProtocol.java
deleted file mode 100644
index 4ae98aa93..000000000
--- a/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/NativeRpcProtocol.java
+++ /dev/null
@@ -1,277 +0,0 @@
-/*
- * Copyright (c) 2015 Spotify AB.
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.spotify.heroic.rpc.nativerpc;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.spotify.heroic.cluster.ClusterNode;
-import com.spotify.heroic.cluster.NodeMetadata;
-import com.spotify.heroic.cluster.RpcProtocol;
-import com.spotify.heroic.cluster.TracingClusterNodeGroup;
-import com.spotify.heroic.common.Grouped;
-import com.spotify.heroic.common.UsableGroupManager;
-import com.spotify.heroic.metadata.CountSeries;
-import com.spotify.heroic.metadata.DeleteSeries;
-import com.spotify.heroic.metadata.FindKeys;
-import com.spotify.heroic.metadata.FindSeries;
-import com.spotify.heroic.metadata.FindTags;
-import com.spotify.heroic.metadata.WriteMetadata;
-import com.spotify.heroic.metric.FullQuery;
-import com.spotify.heroic.metric.WriteMetric;
-import com.spotify.heroic.suggest.KeySuggest;
-import com.spotify.heroic.suggest.TagKeyCount;
-import com.spotify.heroic.suggest.TagSuggest;
-import com.spotify.heroic.suggest.TagValueSuggest;
-import com.spotify.heroic.suggest.TagValuesSuggest;
-import eu.toolchain.async.AsyncFramework;
-import eu.toolchain.async.AsyncFuture;
-import eu.toolchain.async.ResolvableFuture;
-import io.netty.channel.EventLoopGroup;
-import io.netty.util.Timer;
-import lombok.Data;
-import lombok.RequiredArgsConstructor;
-import lombok.ToString;
-
-import javax.inject.Inject;
-import javax.inject.Named;
-import java.net.Inet6Address;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.util.Optional;
-import java.util.function.BiFunction;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-@ToString(of = {})
-public class NativeRpcProtocol implements RpcProtocol {
- public static final String METADATA = "metadata";
- public static final String METRICS_QUERY = "metrics:query";
- public static final String METRICS_WRITE = "metrics:write";
- public static final String METADATA_FIND_TAGS = "metadata:findTags";
- public static final String METADATA_FIND_KEYS = "metadata:findKeys";
- public static final String METADATA_FIND_SERIES = "metadata:findSeries";
- public static final String METADATA_COUNT_SERIES = "metadata:countSeries";
- public static final String METADATA_DELETE_SERIES = "metadata:deleteSeries";
- public static final String METADATA_WRITE = "metadata:writeSeries";
- public static final String SUGGEST_TAG_KEY_COUNT = "suggest:tagKeyCount";
- public static final String SUGGEST_KEY = "suggest:key";
- public static final String SUGGEST_TAG = "suggest:tag";
- public static final String SUGGEST_TAG_VALUES = "suggest:tagValues";
- public static final String SUGGEST_TAG_VALUE = "suggest:tagValue";
-
- private final AsyncFramework async;
- private final EventLoopGroup workerGroup;
- private final ObjectMapper mapper;
- private final Timer timer;
- private final NativeEncoding encoding;
- private final ResolvableFuture bindFuture;
-
- private final int defaultPort;
- private final int maxFrameSize;
- private final long sendTimeout;
- private final long heartbeatReadInterval;
-
- @Inject
- public NativeRpcProtocol(
- AsyncFramework async, @Named("worker") EventLoopGroup workerGroup,
- @Named("application/json+internal") ObjectMapper mapper, Timer timer,
- NativeEncoding encoding,
- @Named("bindFuture") ResolvableFuture bindFuture,
- @Named("defaultPort") int defaultPort, @Named("maxFrameSize") int maxFrameSize,
- @Named("sendTimeout") long sendTimeout,
- @Named("heartbeatReadInterval") long heartbeatReadInterval
- ) {
- this.async = async;
- this.workerGroup = workerGroup;
- this.mapper = mapper;
- this.timer = timer;
- this.encoding = encoding;
- this.bindFuture = bindFuture;
- this.defaultPort = defaultPort;
- this.maxFrameSize = maxFrameSize;
- this.sendTimeout = sendTimeout;
- this.heartbeatReadInterval = heartbeatReadInterval;
- }
-
- @Override
- public AsyncFuture connect(final URI uri) {
- final InetSocketAddress address =
- new InetSocketAddress(uri.getHost(), uri.getPort() == -1 ? defaultPort : uri.getPort());
- final NativeRpcClient client =
- new NativeRpcClient(async, workerGroup, maxFrameSize, address, mapper, timer,
- sendTimeout, heartbeatReadInterval, encoding);
-
- return client
- .request(METADATA, NodeMetadata.class)
- .directTransform(m -> new NativeRpcClusterNode(uri, client, m));
- }
-
- @Override
- public AsyncFuture getListenURI() {
- return bindFuture.directTransform(s -> {
- if (s.getAddress() instanceof Inet6Address) {
- return String.format("nativerpc://[%s]:%d", s.getAddress().getHostAddress(),
- s.getPort());
- }
-
- return String.format("nativerpc://%s:%d", s.getHostString(), s.getPort());
- });
- }
-
- @RequiredArgsConstructor
- public class NativeRpcClusterNode implements ClusterNode {
- private final URI uri;
- private final NativeRpcClient client;
- private final NodeMetadata metadata;
-
- @Override
- public NodeMetadata metadata() {
- return metadata;
- }
-
- @Override
- public AsyncFuture fetchMetadata() {
- return client.request(METADATA, NodeMetadata.class);
- }
-
- @Override
- public AsyncFuture close() {
- return async.resolved(null);
- }
-
- @Override
- public ClusterNode.Group useOptionalGroup(Optional group) {
- return new TracingClusterNodeGroup(uri.toString(), new Group(group));
- }
-
- @Override
- public String toString() {
- return client.toString();
- }
-
- @RequiredArgsConstructor
- private class Group implements ClusterNode.Group {
- private final Optional group;
-
- @Override
- public ClusterNode node() {
- return NativeRpcClusterNode.this;
- }
-
- @Override
- public AsyncFuture query(final FullQuery.Request request) {
- return request(METRICS_QUERY, request, FullQuery.class);
- }
-
- @Override
- public AsyncFuture writeMetric(WriteMetric.Request request) {
- return request(METRICS_WRITE, request, WriteMetric.class);
- }
-
- @Override
- public AsyncFuture findTags(final FindTags.Request request) {
- return request(METADATA_FIND_TAGS, request, FindTags.class);
- }
-
- @Override
- public AsyncFuture findKeys(final FindKeys.Request request) {
- return request(METADATA_FIND_KEYS, request, FindKeys.class);
- }
-
- @Override
- public AsyncFuture findSeries(final FindSeries.Request request) {
- return request(METADATA_FIND_SERIES, request, FindSeries.class);
- }
-
- @Override
- public AsyncFuture countSeries(final CountSeries.Request request) {
- return request(METADATA_COUNT_SERIES, request, CountSeries.class);
- }
-
- @Override
- public AsyncFuture deleteSeries(final DeleteSeries.Request request) {
- return request(METADATA_DELETE_SERIES, request, DeleteSeries.class);
- }
-
- @Override
- public AsyncFuture writeSeries(final WriteMetadata.Request request) {
- return request(METADATA_WRITE, request, WriteMetadata.class);
- }
-
- @Override
- public AsyncFuture tagKeyCount(final TagKeyCount.Request request) {
- return request(SUGGEST_TAG_KEY_COUNT, request, TagKeyCount.class);
- }
-
- @Override
- public AsyncFuture tagSuggest(final TagSuggest.Request request) {
- return request(SUGGEST_TAG, request, TagSuggest.class);
- }
-
- @Override
- public AsyncFuture keySuggest(final KeySuggest.Request request) {
- return request(SUGGEST_KEY, request, KeySuggest.class);
- }
-
- @Override
- public AsyncFuture tagValuesSuggest(
- final TagValuesSuggest.Request request
- ) {
- return request(SUGGEST_TAG_VALUES, request, TagValuesSuggest.class);
- }
-
- @Override
- public AsyncFuture tagValueSuggest(
- final TagValueSuggest.Request request
- ) {
- return request(SUGGEST_TAG_VALUE, request, TagValueSuggest.class);
- }
-
- private AsyncFuture request(String endpoint, T body, Class expected) {
- final GroupedQuery grouped = new GroupedQuery<>(group, body);
- return client.request(endpoint, grouped, expected);
- }
- }
- }
-
- @Data
- public static class GroupedQuery {
- private final Optional group;
- private final T query;
-
- @JsonCreator
- public GroupedQuery(
- @JsonProperty("group") Optional group, @JsonProperty("query") T query
- ) {
- this.group = group;
- this.query = checkNotNull(query, "query");
- }
-
- public R apply(
- UsableGroupManager manager, BiFunction function
- ) {
- return function.apply(group.map(manager::useGroup).orElseGet(manager::useDefaultGroup),
- query);
- }
- }
-}
diff --git a/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/NativeRpcProtocolModule.java b/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/NativeRpcProtocolModule.java
deleted file mode 100644
index b37613847..000000000
--- a/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/NativeRpcProtocolModule.java
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- * Copyright (c) 2015 Spotify AB.
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.spotify.heroic.rpc.nativerpc;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.spotify.heroic.cluster.RpcProtocolComponent;
-import com.spotify.heroic.cluster.RpcProtocolModule;
-import com.spotify.heroic.lifecycle.LifeCycle;
-import com.spotify.heroic.lifecycle.LifeCycleManager;
-import dagger.Component;
-import dagger.Module;
-import dagger.Provides;
-import eu.toolchain.async.AsyncFramework;
-import eu.toolchain.async.ResolvableFuture;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.util.HashedWheelTimer;
-import io.netty.util.Timer;
-import lombok.Data;
-import lombok.RequiredArgsConstructor;
-
-import javax.inject.Named;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.Optional;
-
-@Data
-public class NativeRpcProtocolModule implements RpcProtocolModule {
- private static final String DEFAULT_HOST = "0.0.0.0";
- private static final int DEFAULT_PORT = 1394;
- private static final int DEFAULT_PARENT_THREADS = 2;
- private static final int DEFAULT_CHILD_THREADS = 100;
- private static final int DEFAULT_MAX_FRAME_SIZE = 10 * 1000000;
- private static final long DEFAULT_SEND_TIMEOUT = 5000;
- private static final long DEFAULT_HEARTBEAT_INTERVAL = 1000;
-
- private final InetSocketAddress address;
- private final int parentThreads;
- private final int childThreads;
- private final int maxFrameSize;
- private final long sendTimeout;
- private final long heartbeatInterval;
- private final NativeEncoding encoding;
-
- @JsonCreator
- public NativeRpcProtocolModule(
- @JsonProperty("host") String host, @JsonProperty("port") Integer port,
- @JsonProperty("parentThreads") Integer parentThreads,
- @JsonProperty("childThreads") Integer childThreads,
- @JsonProperty("maxFrameSize") Integer maxFrameSize,
- @JsonProperty("heartbeatInterval") Long heartbeatInterval,
- @JsonProperty("sendTimeout") Long sendTimeout,
- @JsonProperty("encoding") Optional encoding
- ) {
- this.address = new InetSocketAddress(Optional.ofNullable(host).orElse(DEFAULT_HOST),
- Optional.ofNullable(port).orElse(DEFAULT_PORT));
- this.parentThreads = Optional.ofNullable(parentThreads).orElse(DEFAULT_PARENT_THREADS);
- this.childThreads = Optional.ofNullable(childThreads).orElse(DEFAULT_CHILD_THREADS);
- this.maxFrameSize = Optional.ofNullable(maxFrameSize).orElse(DEFAULT_MAX_FRAME_SIZE);
- this.heartbeatInterval =
- Optional.ofNullable(heartbeatInterval).orElse(DEFAULT_HEARTBEAT_INTERVAL);
- this.sendTimeout = Optional.ofNullable(sendTimeout).orElse(DEFAULT_SEND_TIMEOUT);
- this.encoding = encoding.orElse(NativeEncoding.GZIP);
- }
-
- @Override
- public RpcProtocolComponent module(final Dependencies dependencies) {
- final C c = DaggerNativeRpcProtocolModule_C
- .builder()
- .dependencies(dependencies)
- .m(new M())
- .build();
-
- return c;
- }
-
- @NativeRpcScope
- @Component(modules = M.class, dependencies = Dependencies.class)
- interface C extends RpcProtocolComponent {
- @Override
- NativeRpcProtocol rpcProtocol();
-
- @Override
- LifeCycle life();
- }
-
- @RequiredArgsConstructor
- @Module
- class M {
- @Provides
- @NativeRpcScope
- @Named("bindFuture")
- public ResolvableFuture bindFuture(final AsyncFramework async) {
- return async.future();
- }
-
- @Provides
- @NativeRpcScope
- @Named("boss")
- public EventLoopGroup bossGroup() {
- return new NioEventLoopGroup(parentThreads);
- }
-
- @Provides
- @NativeRpcScope
- @Named("worker")
- public EventLoopGroup workerGroup() {
- return new NioEventLoopGroup(childThreads);
- }
-
- @Provides
- @NativeRpcScope
- public Timer timer() {
- return new HashedWheelTimer(
- new ThreadFactoryBuilder().setNameFormat("nativerpc-timer-%d").build());
- }
-
- @Provides
- @NativeRpcScope
- public NativeEncoding encoding() {
- return encoding;
- }
-
- @Provides
- @NativeRpcScope
- @Named("bindAddress")
- SocketAddress bindAddress() {
- return address;
- }
-
- @Provides
- @NativeRpcScope
- @Named("defaultPort")
- int defaultPort() {
- return DEFAULT_PORT;
- }
-
- @Provides
- @NativeRpcScope
- @Named("maxFrameSize")
- int maxFrameSize() {
- return maxFrameSize;
- }
-
- @Provides
- @NativeRpcScope
- @Named("sendTimeout")
- long sendTimeout() {
- return sendTimeout;
- }
-
- @Provides
- @NativeRpcScope
- @Named("heartbeatReadInterval")
- long heartbeatReadInterval() {
- return heartbeatInterval;
- }
-
- @Provides
- @NativeRpcScope
- LifeCycle server(LifeCycleManager manager, NativeRpcProtocolServer server) {
- return manager.build(server);
- }
- }
-
- @Override
- public String scheme() {
- return "nativerpc";
- }
-
- public static Builder builder() {
- return new Builder();
- }
-
- public static class Builder {
- private String host = DEFAULT_HOST;
- private int port = DEFAULT_PORT;
- private int parentThreads = DEFAULT_PARENT_THREADS;
- private int childThreads = DEFAULT_CHILD_THREADS;
- private int maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
- private long sendTimeout = DEFAULT_SEND_TIMEOUT;
- private long heartbeatInterval = DEFAULT_HEARTBEAT_INTERVAL;
- private NativeEncoding encoding = NativeEncoding.GZIP;
-
- public Builder host(final String host) {
- this.host = host;
- return this;
- }
-
- public Builder port(final int port) {
- this.port = port;
- return this;
- }
-
- public Builder parentThreads(final int parentThreads) {
- this.parentThreads = parentThreads;
- return this;
- }
-
- public Builder childThreads(final int childThreads) {
- this.childThreads = childThreads;
- return this;
- }
-
- public Builder maxFrameSize(final int maxFrameSize) {
- this.maxFrameSize = maxFrameSize;
- return this;
- }
-
- public Builder sendTimeout(final long sendtimeout) {
- this.sendTimeout = sendtimeout;
- return this;
- }
-
- public Builder heartbeatInterval(final int heartbeatInterval) {
- this.heartbeatInterval = heartbeatInterval;
- return this;
- }
-
- public Builder encoding(final NativeEncoding encoding) {
- this.encoding = encoding;
- return this;
- }
-
- public NativeRpcProtocolModule build() {
- return new NativeRpcProtocolModule(host, port, parentThreads, childThreads,
- maxFrameSize, sendTimeout, heartbeatInterval, Optional.of(encoding));
- }
- }
-}
diff --git a/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/NativeRpcProtocolServer.java b/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/NativeRpcProtocolServer.java
deleted file mode 100644
index 89723296f..000000000
--- a/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/NativeRpcProtocolServer.java
+++ /dev/null
@@ -1,343 +0,0 @@
-/*
- * Copyright (c) 2015 Spotify AB.
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.spotify.heroic.rpc.nativerpc;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.ImmutableList;
-import com.spotify.heroic.cluster.NodeMetadata;
-import com.spotify.heroic.lifecycle.LifeCycleRegistry;
-import com.spotify.heroic.lifecycle.LifeCycles;
-import com.spotify.heroic.metadata.CountSeries;
-import com.spotify.heroic.metadata.DeleteSeries;
-import com.spotify.heroic.metadata.FindKeys;
-import com.spotify.heroic.metadata.FindSeries;
-import com.spotify.heroic.metadata.FindTags;
-import com.spotify.heroic.metadata.MetadataBackend;
-import com.spotify.heroic.metadata.MetadataManager;
-import com.spotify.heroic.metadata.WriteMetadata;
-import com.spotify.heroic.metric.FullQuery;
-import com.spotify.heroic.metric.MetricBackend;
-import com.spotify.heroic.metric.MetricBackendGroup;
-import com.spotify.heroic.metric.MetricManager;
-import com.spotify.heroic.metric.WriteMetric;
-import com.spotify.heroic.rpc.nativerpc.NativeRpcProtocol.GroupedQuery;
-import com.spotify.heroic.rpc.nativerpc.message.NativeRpcEmptyBody;
-import com.spotify.heroic.suggest.KeySuggest;
-import com.spotify.heroic.suggest.SuggestBackend;
-import com.spotify.heroic.suggest.SuggestManager;
-import com.spotify.heroic.suggest.TagKeyCount;
-import com.spotify.heroic.suggest.TagSuggest;
-import com.spotify.heroic.suggest.TagValueSuggest;
-import com.spotify.heroic.suggest.TagValuesSuggest;
-import eu.toolchain.async.AsyncFramework;
-import eu.toolchain.async.AsyncFuture;
-import eu.toolchain.async.ResolvableFuture;
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.util.Timer;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.FutureListener;
-
-import javax.inject.Inject;
-import javax.inject.Named;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.atomic.AtomicReference;
-
-public class NativeRpcProtocolServer implements LifeCycles {
- private final AsyncFramework async;
- private final MetricManager metrics;
- private final MetadataManager metadata;
- private final SuggestManager suggest;
- private final NodeMetadata localMetadata;
- private final ObjectMapper mapper;
- private final Timer timer;
- private final EventLoopGroup bossGroup;
- private final EventLoopGroup workerGroup;
- private final NativeEncoding encoding;
- private final ResolvableFuture bindFuture;
-
- private final SocketAddress address;
- private final int maxFrameSize;
-
- private final AtomicReference serverChannel = new AtomicReference<>();
- private final NativeRpcContainer container = new NativeRpcContainer();
-
- @Inject
- public NativeRpcProtocolServer(
- AsyncFramework async, MetricManager metrics, MetadataManager metadata,
- SuggestManager suggest, NodeMetadata localMetadata,
- @Named("application/json+internal") ObjectMapper mapper, Timer timer,
- @Named("boss") EventLoopGroup bossGroup, @Named("worker") EventLoopGroup workerGroup,
- NativeEncoding encoding,
- @Named("bindFuture") ResolvableFuture bindFuture,
- @Named("bindAddress") SocketAddress address, @Named("maxFrameSize") int maxFrameSize
- ) {
- this.async = async;
- this.metrics = metrics;
- this.metadata = metadata;
- this.suggest = suggest;
- this.localMetadata = localMetadata;
- this.mapper = mapper;
- this.timer = timer;
- this.bossGroup = bossGroup;
- this.workerGroup = workerGroup;
- this.encoding = encoding;
- this.bindFuture = bindFuture;
- this.address = address;
- this.maxFrameSize = maxFrameSize;
- }
-
- @Override
- public void register(LifeCycleRegistry registry) {
- registry.start(this::start);
- registry.stop(this::stop);
- }
-
- {
- container.register("metadata", new NativeRpcEndpoint() {
- @Override
- public AsyncFuture handle(final NativeRpcEmptyBody request)
- throws Exception {
- return async.resolved(localMetadata);
- }
- });
-
- container.register(NativeRpcProtocol.METRICS_QUERY,
- new NativeRpcEndpoint, FullQuery>() {
- @Override
- public AsyncFuture handle(final GroupedQuery g)
- throws Exception {
- return g.apply(metrics, MetricBackendGroup::query);
- }
- });
-
- container.register(NativeRpcProtocol.METRICS_WRITE,
- new NativeRpcEndpoint, WriteMetric>() {
- @Override
- public AsyncFuture handle(final GroupedQuery g)
- throws Exception {
- return g.apply(metrics, MetricBackend::write);
- }
- });
-
- container.register(NativeRpcProtocol.METADATA_FIND_TAGS,
- new NativeRpcEndpoint, FindTags>() {
- @Override
- public AsyncFuture handle(final GroupedQuery g)
- throws Exception {
- return g.apply(metadata, MetadataBackend::findTags);
- }
- });
-
- container.register(NativeRpcProtocol.METADATA_FIND_KEYS,
- new NativeRpcEndpoint, FindKeys>() {
- @Override
- public AsyncFuture handle(final GroupedQuery g)
- throws Exception {
- return g.apply(metadata, MetadataBackend::findKeys);
- }
- });
-
- container.register(NativeRpcProtocol.METADATA_FIND_SERIES,
- new NativeRpcEndpoint, FindSeries>() {
- @Override
- public AsyncFuture handle(final GroupedQuery g)
- throws Exception {
- return g.apply(metadata, MetadataBackend::findSeries);
- }
- });
-
- container.register(NativeRpcProtocol.METADATA_COUNT_SERIES,
- new NativeRpcEndpoint, CountSeries>() {
- @Override
- public AsyncFuture handle(final GroupedQuery g)
- throws Exception {
- return g.apply(metadata, MetadataBackend::countSeries);
- }
- });
-
- container.register(NativeRpcProtocol.METADATA_WRITE,
- new NativeRpcEndpoint, WriteMetadata>() {
- @Override
- public AsyncFuture handle(
- final GroupedQuery g
- ) throws Exception {
- return g.apply(metadata, MetadataBackend::write);
- }
- });
-
- container.register(NativeRpcProtocol.METADATA_DELETE_SERIES,
- new NativeRpcEndpoint, DeleteSeries>() {
- @Override
- public AsyncFuture handle(final GroupedQuery g)
- throws Exception {
- return g.apply(metadata, MetadataBackend::deleteSeries);
- }
- });
-
- container.register(NativeRpcProtocol.SUGGEST_TAG_KEY_COUNT,
- new NativeRpcEndpoint, TagKeyCount>() {
- @Override
- public AsyncFuture handle(final GroupedQuery g)
- throws Exception {
- return g.apply(suggest, SuggestBackend::tagKeyCount);
- }
- });
-
- container.register(NativeRpcProtocol.SUGGEST_TAG,
- new NativeRpcEndpoint, TagSuggest>() {
- @Override
- public AsyncFuture handle(final GroupedQuery g)
- throws Exception {
- return g.apply(suggest, SuggestBackend::tagSuggest);
- }
- });
-
- container.register(NativeRpcProtocol.SUGGEST_KEY,
- new NativeRpcEndpoint, KeySuggest>() {
- @Override
- public AsyncFuture handle(final GroupedQuery g)
- throws Exception {
- return g.apply(suggest, SuggestBackend::keySuggest);
- }
- });
-
- container.register(NativeRpcProtocol.SUGGEST_TAG_VALUES,
- new NativeRpcEndpoint, TagValuesSuggest>() {
- @Override
- public AsyncFuture handle(
- final GroupedQuery g
- ) throws Exception {
- return g.apply(suggest, SuggestBackend::tagValuesSuggest);
- }
- });
-
- container.register(NativeRpcProtocol.SUGGEST_TAG_VALUE,
- new NativeRpcEndpoint, TagValueSuggest>() {
- @Override
- public AsyncFuture handle(
- final GroupedQuery g
- ) throws Exception {
- return g.apply(suggest, SuggestBackend::tagValueSuggest);
- }
- });
- }
-
- private AsyncFuture start() {
- final ServerBootstrap s = new ServerBootstrap();
- s.channel(NioServerSocketChannel.class);
- s.group(bossGroup, workerGroup);
- s.childHandler(
- new NativeRpcServerSession(timer, mapper, container, maxFrameSize, encoding));
-
- final ChannelFuture bind = s.bind(address);
-
- bind.addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(final ChannelFuture f) throws Exception {
- if (!f.isSuccess()) {
- bindFuture.fail(f.cause());
- return;
- }
-
- serverChannel.set(f.channel());
- final InetSocketAddress address = (InetSocketAddress) f.channel().localAddress();
- bindFuture.resolve(address);
- }
- });
-
- return bindFuture.directTransform(a -> null);
- }
-
- private Callable teardownServer() {
- return new Callable() {
- @Override
- public Void call() throws Exception {
- final Channel ch = serverChannel.getAndSet(null);
-
- if (ch != null) {
- ch.close();
- }
-
- return null;
- }
- };
- }
-
- private List> teardownThreadpools() {
- final ResolvableFuture worker = async.future();
-
- workerGroup.shutdownGracefully().addListener(new FutureListener() {
- @Override
- public void operationComplete(Future f) throws Exception {
- if (!f.isSuccess()) {
- worker.fail(f.cause());
- return;
- }
-
- worker.resolve(null);
- }
- });
-
- final ResolvableFuture boss = async.future();
-
- bossGroup.shutdownGracefully().addListener(new FutureListener() {
- @Override
- public void operationComplete(Future f) throws Exception {
- if (!f.isSuccess()) {
- boss.fail(f.cause());
- return;
- }
-
- boss.resolve(null);
- }
- });
-
- return ImmutableList.>of(worker, boss);
- }
-
- private Callable teardownTimer() {
- return new Callable() {
- @Override
- public Void call() throws Exception {
- timer.stop();
- return null;
- }
- };
- }
-
- private AsyncFuture stop() {
- final List> callbacks = new ArrayList<>();
- callbacks.add(async.call(teardownServer()));
- callbacks.addAll(teardownThreadpools());
- callbacks.add(async.call(teardownTimer()));
- return async.collectAndDiscard(callbacks);
- }
-}
diff --git a/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/NativeRpcRemoteException.java b/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/NativeRpcRemoteException.java
deleted file mode 100644
index 1275b34c5..000000000
--- a/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/NativeRpcRemoteException.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Copyright (c) 2015 Spotify AB.
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.spotify.heroic.rpc.nativerpc;
-
-import lombok.Getter;
-import lombok.ToString;
-
-import java.net.InetSocketAddress;
-
-@ToString(of = {"address", "message"})
-public class NativeRpcRemoteException extends Exception {
- private static final long serialVersionUID = -664905544594225316L;
-
- @Getter
- public final InetSocketAddress address;
-
- public final String message;
-
- public NativeRpcRemoteException(InetSocketAddress address, String message) {
- super(address + ": " + message);
- this.address = address;
- this.message = message;
- }
-}
diff --git a/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/NativeRpcScope.java b/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/NativeRpcScope.java
deleted file mode 100644
index e34f2b704..000000000
--- a/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/NativeRpcScope.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Copyright (c) 2015 Spotify AB.
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.spotify.heroic.rpc.nativerpc;
-
-import javax.inject.Scope;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-
-@Scope
-@Retention(RetentionPolicy.RUNTIME)
-public @interface NativeRpcScope {
-}
diff --git a/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/NativeRpcServerSession.java b/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/NativeRpcServerSession.java
deleted file mode 100644
index 781dbb19f..000000000
--- a/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/NativeRpcServerSession.java
+++ /dev/null
@@ -1,254 +0,0 @@
-/*
- * Copyright (c) 2015 Spotify AB.
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.spotify.heroic.rpc.nativerpc;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.spotify.heroic.rpc.nativerpc.message.NativeOptions;
-import com.spotify.heroic.rpc.nativerpc.message.NativeRpcError;
-import com.spotify.heroic.rpc.nativerpc.message.NativeRpcHeartBeat;
-import com.spotify.heroic.rpc.nativerpc.message.NativeRpcRequest;
-import com.spotify.heroic.rpc.nativerpc.message.NativeRpcResponse;
-import eu.toolchain.async.AsyncFuture;
-import eu.toolchain.async.FutureDone;
-import eu.toolchain.async.Transform;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelPipeline;
-import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-import io.netty.handler.codec.LengthFieldPrepender;
-import io.netty.util.Timeout;
-import io.netty.util.Timer;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-
-import java.nio.charset.Charset;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-@Slf4j
-@RequiredArgsConstructor
-public class NativeRpcServerSession extends ChannelInitializer {
- private static final Charset UTF8 = Charset.forName("UTF-8");
-
- private final Timer timer;
- private final ObjectMapper mapper;
- private final NativeRpcContainer container;
- private final int maxFrameSize;
- private final NativeEncoding encoding;
-
- @Override
- protected void initChannel(final SocketChannel ch) throws Exception {
- final AtomicReference heartbeatTimeout = new AtomicReference<>();
-
- final ChannelPipeline pipeline = ch.pipeline();
- // first four bytes are length prefix of message, strip first four bytes.
- pipeline.addLast(new LengthFieldBasedFrameDecoder(maxFrameSize, 0, 4, 0, 4));
- pipeline.addLast(new NativeRpcDecoder());
- pipeline.addLast(new ChannelHandler(heartbeatTimeout));
-
- pipeline.addLast(new LengthFieldPrepender(4));
- pipeline.addLast(new NativeRpcEncoder());
- }
-
- /**
- * Stop any current timeout, if pending.
- */
- private void stopCurrentTimeout(final AtomicReference heartbeatTimeout) {
- final Timeout old = heartbeatTimeout.getAndSet(null);
-
- if (old != null) {
- old.cancel();
- }
- }
-
- @RequiredArgsConstructor
- private class ChannelHandler extends SimpleChannelInboundHandler {
- private final AtomicReference heartbeatTimeout;
-
- @Override
- protected void channelRead0(final ChannelHandlerContext ctx, final Object msg)
- throws Exception {
- if (msg instanceof NativeRpcRequest) {
- try {
- handleRequest(ctx.channel(), (NativeRpcRequest) msg);
- } catch (Exception e) {
- log.error("Failed to handle request", e);
- sendError(ctx.channel(), e.getMessage()).addListener(closeListener());
- return;
- }
-
- return;
- }
-
- throw new IllegalArgumentException("Invalid request: " + msg);
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- log.error("{}: exception in channel, closing", ctx.channel(), cause);
- stopCurrentTimeout(heartbeatTimeout);
- ctx.channel().close();
- }
-
- private void handleRequest(final Channel ch, NativeRpcRequest msg) throws Exception {
- final NativeRpcRequest request = (NativeRpcRequest) msg;
- final NativeRpcContainer.EndpointSpec handle =
- container.get(request.getEndpoint());
-
- if (handle == null) {
- sendError(ch, "No such endpoint: " + request.getEndpoint()).addListener(
- closeListener());
- return;
- }
-
- if (log.isTraceEnabled()) {
- log.trace("request[{}:{}ms] {}", request.getEndpoint(),
- request.getHeartbeatInterval(), new String(request.getBody(), UTF8));
- }
-
- final long heartbeatInterval = calculcateHeartbeatInterval(msg);
-
- if (heartbeatInterval > 0) {
- // start sending heartbeat since we are now processing a request.
- setupHeartbeat(ch, heartbeatInterval);
- }
-
- final byte[] bytes =
- NativeUtils.decodeBody(request.getOptions(), request.getSize(), request.getBody());
-
- final Object body = mapper.readValue(bytes, handle.requestType());
-
- final AsyncFuture handleFuture = handle.handle(body);
-
- // Serialize in a separate thread on the async thread pool.
- // this also neatly catches errors for us in the next step.
- // Stop sending heartbeats immediately when the future has been finished.
- // this will cause the other end to time out if a response is available, but its unable
- // to pass the network.
- handleFuture
- .directTransform(serialize(request))
- .onFinished(() -> stopCurrentTimeout(heartbeatTimeout))
- .onDone(sendResponseHandle(ch));
- }
-
- private long calculcateHeartbeatInterval(NativeRpcRequest msg) {
- if (msg.getHeartbeatInterval() <= 0) {
- return 0;
- }
-
- return msg.getHeartbeatInterval() / 2;
- }
-
- private void setupHeartbeat(final Channel ch, final long heartbeatInterval) {
- scheduleHeartbeat(ch, heartbeatInterval);
-
- ch.closeFuture().addListener(future -> {
- stopCurrentTimeout(heartbeatTimeout);
- });
- }
-
- private void scheduleHeartbeat(final Channel ch, final long heartbeatInterval) {
- final Timeout timeout = timer.newTimeout(t -> {
- sendHeartbeat(ch).addListener((final ChannelFuture future) -> {
- scheduleHeartbeat(ch, heartbeatInterval);
- });
- }, heartbeatInterval, TimeUnit.MILLISECONDS);
-
- final Timeout old = heartbeatTimeout.getAndSet(timeout);
-
- if (old != null) {
- old.cancel();
- }
- }
-
- private Transform serialize(final NativeRpcRequest request) {
- return (Object result) -> {
- byte[] body = mapper.writeValueAsBytes(result);
-
- if (log.isTraceEnabled()) {
- log.trace("response[{}]: {}", request.getEndpoint(), new String(body, UTF8));
- }
-
- final int bodySize = body.length;
- final NativeOptions options = new NativeOptions(encoding);
- return new NativeRpcResponse(options, bodySize,
- NativeUtils.encodeBody(options, body));
- };
- }
-
- private FutureDone sendResponseHandle(final Channel ch) {
- return new FutureDone() {
- @Override
- public void cancelled() throws Exception {
- log.error("{}: request cancelled", ch);
- ch
- .writeAndFlush(new NativeRpcError("request cancelled"))
- .addListener(closeListener());
- }
-
- @Override
- public void failed(final Throwable e) throws Exception {
- log.error("{}: request failed", ch, e);
- ch
- .writeAndFlush(new NativeRpcError(e.getMessage()))
- .addListener(closeListener());
- }
-
- @Override
- public void resolved(final NativeRpcResponse response) throws Exception {
- sendHeartbeat(ch).addListener(f -> {
- if (!f.isSuccess()) {
- sendError(ch, f.cause() == null ? "send of tail heartbeat failed"
- : f.cause().getMessage()).addListener(closeListener());
- return;
- }
-
- ch.writeAndFlush(response).addListener(closeListener());
- });
- }
- };
- }
-
- private ChannelFuture sendHeartbeat(final Channel ch) {
- return ch.writeAndFlush(new NativeRpcHeartBeat());
- }
-
- private ChannelFuture sendError(final Channel ch, final String error) {
- return ch.writeAndFlush(new NativeRpcError(error));
- }
-
- /**
- * Listener that shuts down the connection after its promise has been resolved.
- */
- private ChannelFutureListener closeListener() {
- // immediately stop sending heartbeats.
- stopCurrentTimeout(heartbeatTimeout);
-
- return future -> future.channel().close();
- }
- }
-};
diff --git a/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/NativeUtils.java b/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/NativeUtils.java
deleted file mode 100644
index 3aa2ffa9e..000000000
--- a/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/NativeUtils.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Copyright (c) 2015 Spotify AB.
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.spotify.heroic.rpc.nativerpc;
-
-import com.spotify.heroic.rpc.nativerpc.message.NativeOptions;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.util.zip.GZIPInputStream;
-import java.util.zip.GZIPOutputStream;
-
-public abstract class NativeUtils {
- static byte[] decodeBody(final NativeOptions options, final int bodySize, final byte[] body)
- throws IOException {
- switch (options.getEncoding()) {
- case NONE:
- return body;
- case GZIP:
- return gzipDecompress(bodySize, body);
- default:
- throw new IllegalStateException("Unsupported encoding: " + options);
- }
- }
-
- static byte[] gzipDecompress(final int bodySize, final byte[] body) throws IOException {
- final byte[] bytes = new byte[bodySize];
-
- try (final GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(body))) {
- int offset = 0;
-
- while (offset < bodySize) {
- int read = in.read(bytes, offset, bodySize - offset);
-
- if (read < 0) {
- throw new EOFException();
- }
-
- offset += read;
- }
- }
-
- return bytes;
- }
-
- static byte[] encodeBody(final NativeOptions options, final byte[] body) throws IOException {
- switch (options.getEncoding()) {
- case NONE:
- return body;
- case GZIP:
- return gzipCompress(body);
- default:
- throw new IllegalStateException("Unsupported encoding: " + options.getEncoding());
- }
- }
-
- static byte[] gzipCompress(byte[] body) throws IOException {
- try (final ByteArrayOutputStream output = new ByteArrayOutputStream()) {
- try (final GZIPOutputStream out = new GZIPOutputStream(output)) {
- out.write(body);
- }
-
- return output.toByteArray();
- }
- }
-}
diff --git a/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/message/NativeOptions.java b/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/message/NativeOptions.java
deleted file mode 100644
index 673ce3f98..000000000
--- a/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/message/NativeOptions.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Copyright (c) 2015 Spotify AB.
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.spotify.heroic.rpc.nativerpc.message;
-
-import com.spotify.heroic.rpc.nativerpc.NativeEncoding;
-import lombok.Data;
-import org.msgpack.packer.Packer;
-import org.msgpack.unpacker.Unpacker;
-
-import java.io.IOException;
-
-/**
- * A dynamic set of options that are part of a native rpc request.
- */
-@Data
-public class NativeOptions {
- /**
- * Indicates the encoding that the body is encoded using.
- */
- static final String ENCODING = "encoding";
-
- private final NativeEncoding encoding;
-
- public static NativeOptions unpack(final Unpacker unpacker) throws IOException {
- NativeEncoding encoding = NativeEncoding.NONE;
-
- final int size = unpacker.readInt();
-
- for (int i = 0; i < size; i++) {
- switch (unpacker.readString()) {
- case ENCODING:
- encoding = NativeEncoding.valueOf(unpacker.readString());
- break;
- default: // ignore unknown options
- break;
- }
- }
-
- return new NativeOptions(encoding);
- }
-
- public static void pack(NativeOptions options, Packer out) throws IOException {
- // number of options
- out.write(1);
-
- out.write(ENCODING);
- out.write(options.getEncoding().toString());
- }
-}
diff --git a/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/message/NativeRpcEmptyBody.java b/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/message/NativeRpcEmptyBody.java
deleted file mode 100644
index 422e5c0bf..000000000
--- a/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/message/NativeRpcEmptyBody.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Copyright (c) 2015 Spotify AB.
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.spotify.heroic.rpc.nativerpc.message;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-
-public class NativeRpcEmptyBody {
- @JsonCreator
- public NativeRpcEmptyBody() {
- }
-}
diff --git a/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/message/NativeRpcError.java b/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/message/NativeRpcError.java
deleted file mode 100644
index b86bc4330..000000000
--- a/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/message/NativeRpcError.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Copyright (c) 2015 Spotify AB.
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.spotify.heroic.rpc.nativerpc.message;
-
-import lombok.Data;
-import org.msgpack.packer.Packer;
-import org.msgpack.unpacker.Unpacker;
-
-import java.io.IOException;
-
-@Data
-public class NativeRpcError {
- private final String message;
-
- public static NativeRpcError unpack(final Unpacker unpacker) throws IOException {
- final String message = unpacker.readString();
- return new NativeRpcError(message);
- }
-
- public static void pack(final NativeRpcError in, final Packer out) throws IOException {
- out.write(in.getMessage());
- }
-}
diff --git a/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/message/NativeRpcHeartBeat.java b/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/message/NativeRpcHeartBeat.java
deleted file mode 100644
index 2ff48a881..000000000
--- a/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/message/NativeRpcHeartBeat.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Copyright (c) 2015 Spotify AB.
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.spotify.heroic.rpc.nativerpc.message;
-
-import org.msgpack.packer.Packer;
-import org.msgpack.unpacker.Unpacker;
-
-import java.io.IOException;
-
-public class NativeRpcHeartBeat {
- public static NativeRpcHeartBeat unpack(final Unpacker unpacker) throws IOException {
- return new NativeRpcHeartBeat();
- }
-
- public static void pack(NativeRpcHeartBeat in, Packer out) throws IOException {
- }
-}
diff --git a/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/message/NativeRpcRequest.java b/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/message/NativeRpcRequest.java
deleted file mode 100644
index 2da735d29..000000000
--- a/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/message/NativeRpcRequest.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Copyright (c) 2015 Spotify AB.
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.spotify.heroic.rpc.nativerpc.message;
-
-import lombok.Data;
-import org.msgpack.packer.Packer;
-import org.msgpack.unpacker.Unpacker;
-
-import java.io.IOException;
-
-@Data
-public class NativeRpcRequest {
- private final String endpoint;
- private final long heartbeatInterval;
- private final NativeOptions options;
- private final int size;
- private final byte[] body;
-
- public static NativeRpcRequest unpack(final Unpacker unpacker) throws IOException {
- final String endpoint = unpacker.readString();
- final long heartbeatInterval = unpacker.readLong();
-
- final NativeOptions options = NativeOptions.unpack(unpacker);
-
- final int size = unpacker.readInt();
- final byte[] body = unpacker.readByteArray();
- return new NativeRpcRequest(endpoint, heartbeatInterval, options, size, body);
- }
-
- public static void pack(final NativeRpcRequest in, final Packer out) throws IOException {
- out.write(in.getEndpoint());
- out.write(in.getHeartbeatInterval());
-
- // OPTIONS
- NativeOptions.pack(in.getOptions(), out);
-
- out.write(in.getSize());
- out.write(in.getBody());
- }
-}
diff --git a/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/message/NativeRpcResponse.java b/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/message/NativeRpcResponse.java
deleted file mode 100644
index 626afe7ab..000000000
--- a/rpc/nativerpc/src/main/java/com/spotify/heroic/rpc/nativerpc/message/NativeRpcResponse.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Copyright (c) 2015 Spotify AB.
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.spotify.heroic.rpc.nativerpc.message;
-
-import lombok.Data;
-import org.msgpack.packer.Packer;
-import org.msgpack.unpacker.Unpacker;
-
-import java.io.IOException;
-
-@Data
-public class NativeRpcResponse {
- private final NativeOptions options;
- private final int size;
- private final byte[] body;
-
- public static NativeRpcResponse unpack(final Unpacker unpacker) throws IOException {
- final NativeOptions options = NativeOptions.unpack(unpacker);
- final int size = unpacker.readInt();
- final byte[] body = unpacker.readByteArray();
-
- return new NativeRpcResponse(options, size, body);
- }
-
- public static void pack(NativeRpcResponse in, Packer out) throws IOException {
- // OPTIONS
- NativeOptions.pack(in.getOptions(), out);
-
- out.write(in.getSize());
- out.write(in.getBody());
- }
-}
diff --git a/rpc/nativerpc/src/test/java/com/spotify/heroic/rpc/nativerpc/NativeRpcContainerTest.java b/rpc/nativerpc/src/test/java/com/spotify/heroic/rpc/nativerpc/NativeRpcContainerTest.java
deleted file mode 100644
index 4ca419778..000000000
--- a/rpc/nativerpc/src/test/java/com/spotify/heroic/rpc/nativerpc/NativeRpcContainerTest.java
+++ /dev/null
@@ -1,32 +0,0 @@
-package com.spotify.heroic.rpc.nativerpc;
-
-import eu.toolchain.async.AsyncFuture;
-import org.junit.Test;
-
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
-import java.util.Map;
-
-import static org.junit.Assert.assertTrue;
-
-public class NativeRpcContainerTest {
- @Test
- public void testTypeReference() {
- final NativeRpcEndpoint, String> endpoint =
- new NativeRpcEndpoint, String>() {
- @Override
- public AsyncFuture handle(Map request) throws Exception {
- return null;
- }
- };
-
- final Type type = endpoint.requestType().getType();
- assertTrue(type instanceof ParameterizedType);
-
- final ParameterizedType t = (ParameterizedType) type;
- assertTrue(t.getRawType() == Map.class);
- assertTrue(t.getActualTypeArguments().length == 2);
- assertTrue(t.getActualTypeArguments()[0] == String.class);
- assertTrue(t.getActualTypeArguments()[1] == Integer.class);
- }
-}
diff --git a/rpc/nativerpc/src/test/java/com/spotify/heroic/rpc/nativerpc/NativeUtilsTest.java b/rpc/nativerpc/src/test/java/com/spotify/heroic/rpc/nativerpc/NativeUtilsTest.java
deleted file mode 100644
index 043cd6a79..000000000
--- a/rpc/nativerpc/src/test/java/com/spotify/heroic/rpc/nativerpc/NativeUtilsTest.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package com.spotify.heroic.rpc.nativerpc;
-
-import org.junit.Test;
-
-import java.io.IOException;
-
-import static org.junit.Assert.assertArrayEquals;
-
-public class NativeUtilsTest {
- // sufficiently large to force compression to use multiple reads.
- public static final int SIZE = 4096 * 200;
-
- @Test
- public void testGzipUtilities() throws IOException {
- byte[] reference = new byte[SIZE];
-
- for (int i = 0; i < reference.length; i++) {
- reference[i] = (byte) (i % 10);
- }
-
- final byte[] compressed = NativeUtils.gzipCompress(reference);
- final byte[] result = NativeUtils.gzipDecompress(SIZE, compressed);
-
- assertArrayEquals(reference, result);
- }
-}