Skip to content
This repository has been archived by the owner on Nov 4, 2019. It is now read-only.

Commit

Permalink
Add nghttpx reverse-proxy in front of ext interface. This will be use…
Browse files Browse the repository at this point in the history
…d (1) to distinguish gRPC traffic from REST API traffic and (2) as TLS termination.
  • Loading branch information
vladaionescu committed Apr 27, 2016
1 parent 26a13e9 commit cfb6272
Show file tree
Hide file tree
Showing 12 changed files with 72 additions and 14 deletions.
3 changes: 2 additions & 1 deletion Makefile
Expand Up @@ -128,7 +128,7 @@ runcommon:
$(MAKE) \
upload-config \
init-db-tables
$(DOCKER_COMPOSE) up --force-recreate leveroshost
$(DOCKER_COMPOSE) up --force-recreate leveroshost nghttpxext

.PHONY: install-cli
install-cli: $(BIN_DIR)/lever
Expand Down Expand Up @@ -266,6 +266,7 @@ docker-%: $(SERVICES_DIR)/%/Dockerfile FORCE
$(DOCKER) build -t leveros/$(@:docker-%=%) $(dir $<)

docker-consul: | docker-base
docker-nghttpx: | docker-base
docker-levercontainer: | docker-base
docker-levercontainer: $(SERVICES_DIR)/levercontainer/js/leveros-server
docker-levercontainer: $(SERVICES_DIR)/levercontainer/js/leveros-common
Expand Down
8 changes: 8 additions & 0 deletions api/client.go
Expand Up @@ -328,5 +328,13 @@ func (client *Client) invokeChanInternal(
grpcStream.CloseSend()
return nil, err
}
// First message received must be empty.
fstMsg, err := grpcStream.Recv()
if err != nil {
return nil, err
}
if fstMsg.GetMessageOneof() != nil {
return nil, fmt.Errorf("First server message needs to be empty")
}
return newClientStream(grpcStream), nil
}
3 changes: 2 additions & 1 deletion api/server.go
Expand Up @@ -226,10 +226,11 @@ func (server *Server) HandleStreamingRPC(
if rpc == nil {
return fmt.Errorf("First message needs to have RPC field set")
}

if rpc.GetArgsOneof() == nil {
return fmt.Errorf("RPC has no args oneof")
}
// Reply with an empty msg (first response message must be empty).
grpcStream.Send(&core.StreamMessage{})

server.lock.RLock()
entry, handlerOK := server.streamingHandlers[leverURL.Method]
Expand Down
5 changes: 4 additions & 1 deletion core/leverrpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 16 additions & 1 deletion docker-compose.yml
Expand Up @@ -12,7 +12,6 @@ services:
- /var/run/docker.sock:/var/run/docker.sock:z
- ${LEVEROS_REPO_DIR}:/leveros/custcodetree:Z
ports:
- ${LEVEROS_IP_PORT}:8080
- 127.0.0.1:6514:6514 # Dev logger.
command: >-
--config leveroshost
Expand All @@ -23,6 +22,22 @@ services:
environment:
LEVEROS_IP_PORT: ${LEVEROS_IP_PORT}

nghttpxext:
image: leveros/nghttpx:latest
container_name: leverosnghttpxext
ports:
- ${LEVEROS_IP_PORT}:8080
command: >-
--frontend=*,8080;no-tls
--backend=leveroshost,3502;/core.LeverRPC/;proto=h2
--backend=leveroshost,3503
--backend-address-family=IPv4
--backend-keep-alive-timeout=30s
--strip-incoming-x-forwarded-for
--add-x-forwarded-for
--backend-http2-max-concurrent-streams=10000
--workers=4
consul:
image: leveros/consul:latest
container_name: leverosconsul
Expand Down
2 changes: 1 addition & 1 deletion host/proxy.go
Expand Up @@ -42,7 +42,7 @@ var (
PackageName, "envInListenPortFlag", "3500")
// EnvExtListenPortFlag is the listen port for external connections.
EnvExtListenPortFlag = config.DeclareString(
PackageName, "envExtListenPortFlag", "8080")
PackageName, "envExtListenPortFlag", "3502")
)

// LeverProxy is a proxy server that mediates Lever RPCs from the outside world
Expand Down
1 change: 0 additions & 1 deletion host/proxyextout.go
Expand Up @@ -10,7 +10,6 @@ import (
)

func (proxy *LeverProxy) serveExt() (err error) {
// TODO: Restrict to external network.
listenAddr := ":" + EnvExtListenPortFlag.Get()
proxy.extLogger.WithFields("listenAddr", listenAddr).Info("Listening")
proxy.extListener, _, err = proxy.inServer.Serve(
Expand Down
10 changes: 5 additions & 5 deletions js/leveros-server/lib/serve.js
Expand Up @@ -14,8 +14,6 @@ class Handler {
}

handleRpc(call, callback) {
// TODO RPC gateway
//call.metadata.get('x-lever-internal-rpc-gateway')[0];
setInternalRPCGateway(call.metadata);
let leverURL;
try {
Expand Down Expand Up @@ -50,8 +48,6 @@ class Handler {
}

handleStreamingRpc(call) {
// TODO RPC gateway
//call.metadata.get('x-lever-internal-rpc-gateway')[0];
setInternalRPCGateway(call.metadata);
let leverURL;
try {
Expand All @@ -66,7 +62,7 @@ class Handler {
};
call.on('error', onError);
call.once('data', (streamMsg) => {
call.removeListener(onError);
call.removeListener('error', onError);
if (streamMsg.message_oneof !== 'rpc') {
call.write({error: "First message was not rpc"});
call.end();
Expand All @@ -80,6 +76,10 @@ class Handler {
return;
}
const method = this._custHandler[leverURL.method];

// First message sent back must be empty.
call.write({});

const args = [new common.Stream(call)];
if (leverURL.resource !== "") {
args.push(leverURL.resource);
Expand Down
17 changes: 16 additions & 1 deletion js/leveros/lib/client.js
Expand Up @@ -149,7 +149,22 @@ class Client {

const call = sendStreamingLeverRPC(connection, leverURL);
call.write({rpc: common.jsToRpc(args)});
callback(null, new common.Stream(call));
const onError = (error) => {
callback(error);
};
call.on('error', onError);
call.once('data', (streamMsg) => {
call.removeListener('error', onError);
if (streamMsg.message_oneof !== null) {
const errorStr = "First message must be empty";
call.write({error: errorStr});
call.end();
callback(new Error(errorStr));
return;
}

callback(null, new common.Stream(call));
});
});
}
}
Expand Down
5 changes: 4 additions & 1 deletion protos/core/leverrpc.proto
Expand Up @@ -30,10 +30,13 @@ message StreamMessage {
// Note that the invokation details (rpc field) are part of the very first
// message sent by the client. It is an error for any other message to have
// that field set.
// Also, the very first message sent by the server must be an empty
// message. This is a workaround for some reverse-proxies which do funny
// things if there's no data frame.
oneof message_oneof {
// First message, client-to-server only.
RPC rpc = 1;
// Other messages (all server-to-client and non-first client-to-server).
// Non-first messages.
// These are passed to the Lever application's handlers.
JSON message = 2;
bytes byte_message = 3;
Expand Down
2 changes: 1 addition & 1 deletion services/leveroshost/Dockerfile
Expand Up @@ -2,7 +2,7 @@ FROM leveros/base:latest

COPY ./leveroshost /leveros/bin/

EXPOSE 3500 3501 3838 6514 8080
EXPOSE 3500 3501 3502 3838 6514
VOLUME /var/run/docker.sock

WORKDIR /leveros
Expand Down
13 changes: 13 additions & 0 deletions services/nghttpx/Dockerfile
@@ -0,0 +1,13 @@
FROM leveros/base:latest

RUN echo "http://dl-4.alpinelinux.org/alpine/edge/testing" >> /etc/apk/repositories && \
addgroup -S nghttpx && \
adduser -S -D -h /dev/null -s /sbin/nologin -G nghttpx nghttpx && \
apk add --update nghttp2 && \
rm -rf /var/cache/apk/*

EXPOSE 8080
USER nghttpx

ENTRYPOINT ["/usr/bin/nghttpx"]
CMD ["--help"]

0 comments on commit cfb6272

Please sign in to comment.