diff --git a/Makefile b/Makefile index 05ca6b0..e436527 100644 --- a/Makefile +++ b/Makefile @@ -128,7 +128,7 @@ runcommon: $(MAKE) \ upload-config \ init-db-tables - $(DOCKER_COMPOSE) up --force-recreate leveroshost + $(DOCKER_COMPOSE) up --force-recreate leveroshost nghttpxext .PHONY: install-cli install-cli: $(BIN_DIR)/lever @@ -266,6 +266,7 @@ docker-%: $(SERVICES_DIR)/%/Dockerfile FORCE $(DOCKER) build -t leveros/$(@:docker-%=%) $(dir $<) docker-consul: | docker-base +docker-nghttpx: | docker-base docker-levercontainer: | docker-base docker-levercontainer: $(SERVICES_DIR)/levercontainer/js/leveros-server docker-levercontainer: $(SERVICES_DIR)/levercontainer/js/leveros-common diff --git a/api/client.go b/api/client.go index 3ca979d..98d2d32 100644 --- a/api/client.go +++ b/api/client.go @@ -328,5 +328,13 @@ func (client *Client) invokeChanInternal( grpcStream.CloseSend() return nil, err } + // First message received must be empty. + fstMsg, err := grpcStream.Recv() + if err != nil { + return nil, err + } + if fstMsg.GetMessageOneof() != nil { + return nil, fmt.Errorf("First server message needs to be empty") + } return newClientStream(grpcStream), nil } diff --git a/api/server.go b/api/server.go index defa9f3..2940bdf 100644 --- a/api/server.go +++ b/api/server.go @@ -226,10 +226,11 @@ func (server *Server) HandleStreamingRPC( if rpc == nil { return fmt.Errorf("First message needs to have RPC field set") } - if rpc.GetArgsOneof() == nil { return fmt.Errorf("RPC has no args oneof") } + // Reply with an empty msg (first response message must be empty). + grpcStream.Send(&core.StreamMessage{}) server.lock.RLock() entry, handlerOK := server.streamingHandlers[leverURL.Method] diff --git a/core/leverrpc.pb.go b/core/leverrpc.pb.go index 7f57c90..8132d12 100644 --- a/core/leverrpc.pb.go +++ b/core/leverrpc.pb.go @@ -25,7 +25,7 @@ import math "math" import ( context "golang.org/x/net/context" - "google.golang.org/grpc" + grpc "google.golang.org/grpc" ) // Reference imports to suppress errors if they are not otherwise used. @@ -161,6 +161,9 @@ type StreamMessage struct { // Note that the invokation details (rpc field) are part of the very first // message sent by the client. It is an error for any other message to have // that field set. + // Also, the very first message sent by the server must be an empty + // message. This is a workaround for some reverse-proxies which do funny + // things if there's no data frame. // // Types that are valid to be assigned to MessageOneof: // *StreamMessage_Rpc diff --git a/docker-compose.yml b/docker-compose.yml index f531a21..e3909bb 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -12,7 +12,6 @@ services: - /var/run/docker.sock:/var/run/docker.sock:z - ${LEVEROS_REPO_DIR}:/leveros/custcodetree:Z ports: - - ${LEVEROS_IP_PORT}:8080 - 127.0.0.1:6514:6514 # Dev logger. command: >- --config leveroshost @@ -23,6 +22,22 @@ services: environment: LEVEROS_IP_PORT: ${LEVEROS_IP_PORT} + nghttpxext: + image: leveros/nghttpx:latest + container_name: leverosnghttpxext + ports: + - ${LEVEROS_IP_PORT}:8080 + command: >- + --frontend=*,8080;no-tls + --backend=leveroshost,3502;/core.LeverRPC/;proto=h2 + --backend=leveroshost,3503 + --backend-address-family=IPv4 + --backend-keep-alive-timeout=30s + --strip-incoming-x-forwarded-for + --add-x-forwarded-for + --backend-http2-max-concurrent-streams=10000 + --workers=4 + consul: image: leveros/consul:latest container_name: leverosconsul diff --git a/host/proxy.go b/host/proxy.go index b44419a..90cd9a8 100644 --- a/host/proxy.go +++ b/host/proxy.go @@ -42,7 +42,7 @@ var ( PackageName, "envInListenPortFlag", "3500") // EnvExtListenPortFlag is the listen port for external connections. EnvExtListenPortFlag = config.DeclareString( - PackageName, "envExtListenPortFlag", "8080") + PackageName, "envExtListenPortFlag", "3502") ) // LeverProxy is a proxy server that mediates Lever RPCs from the outside world diff --git a/host/proxyextout.go b/host/proxyextout.go index 9c59cfd..42a528a 100644 --- a/host/proxyextout.go +++ b/host/proxyextout.go @@ -10,7 +10,6 @@ import ( ) func (proxy *LeverProxy) serveExt() (err error) { - // TODO: Restrict to external network. listenAddr := ":" + EnvExtListenPortFlag.Get() proxy.extLogger.WithFields("listenAddr", listenAddr).Info("Listening") proxy.extListener, _, err = proxy.inServer.Serve( diff --git a/js/leveros-server/lib/serve.js b/js/leveros-server/lib/serve.js index 721fcb5..6aa8857 100644 --- a/js/leveros-server/lib/serve.js +++ b/js/leveros-server/lib/serve.js @@ -14,8 +14,6 @@ class Handler { } handleRpc(call, callback) { - // TODO RPC gateway - //call.metadata.get('x-lever-internal-rpc-gateway')[0]; setInternalRPCGateway(call.metadata); let leverURL; try { @@ -50,8 +48,6 @@ class Handler { } handleStreamingRpc(call) { - // TODO RPC gateway - //call.metadata.get('x-lever-internal-rpc-gateway')[0]; setInternalRPCGateway(call.metadata); let leverURL; try { @@ -66,7 +62,7 @@ class Handler { }; call.on('error', onError); call.once('data', (streamMsg) => { - call.removeListener(onError); + call.removeListener('error', onError); if (streamMsg.message_oneof !== 'rpc') { call.write({error: "First message was not rpc"}); call.end(); @@ -80,6 +76,10 @@ class Handler { return; } const method = this._custHandler[leverURL.method]; + + // First message sent back must be empty. + call.write({}); + const args = [new common.Stream(call)]; if (leverURL.resource !== "") { args.push(leverURL.resource); diff --git a/js/leveros/lib/client.js b/js/leveros/lib/client.js index bdcb01e..4f727e8 100644 --- a/js/leveros/lib/client.js +++ b/js/leveros/lib/client.js @@ -149,7 +149,22 @@ class Client { const call = sendStreamingLeverRPC(connection, leverURL); call.write({rpc: common.jsToRpc(args)}); - callback(null, new common.Stream(call)); + const onError = (error) => { + callback(error); + }; + call.on('error', onError); + call.once('data', (streamMsg) => { + call.removeListener('error', onError); + if (streamMsg.message_oneof !== null) { + const errorStr = "First message must be empty"; + call.write({error: errorStr}); + call.end(); + callback(new Error(errorStr)); + return; + } + + callback(null, new common.Stream(call)); + }); }); } } diff --git a/protos/core/leverrpc.proto b/protos/core/leverrpc.proto index 4e5a05a..ee5032b 100644 --- a/protos/core/leverrpc.proto +++ b/protos/core/leverrpc.proto @@ -30,10 +30,13 @@ message StreamMessage { // Note that the invokation details (rpc field) are part of the very first // message sent by the client. It is an error for any other message to have // that field set. + // Also, the very first message sent by the server must be an empty + // message. This is a workaround for some reverse-proxies which do funny + // things if there's no data frame. oneof message_oneof { // First message, client-to-server only. RPC rpc = 1; - // Other messages (all server-to-client and non-first client-to-server). + // Non-first messages. // These are passed to the Lever application's handlers. JSON message = 2; bytes byte_message = 3; diff --git a/services/leveroshost/Dockerfile b/services/leveroshost/Dockerfile index e35e41f..7a1fa57 100644 --- a/services/leveroshost/Dockerfile +++ b/services/leveroshost/Dockerfile @@ -2,7 +2,7 @@ FROM leveros/base:latest COPY ./leveroshost /leveros/bin/ -EXPOSE 3500 3501 3838 6514 8080 +EXPOSE 3500 3501 3502 3838 6514 VOLUME /var/run/docker.sock WORKDIR /leveros diff --git a/services/nghttpx/Dockerfile b/services/nghttpx/Dockerfile new file mode 100644 index 0000000..39ccd1f --- /dev/null +++ b/services/nghttpx/Dockerfile @@ -0,0 +1,13 @@ +FROM leveros/base:latest + +RUN echo "http://dl-4.alpinelinux.org/alpine/edge/testing" >> /etc/apk/repositories && \ + addgroup -S nghttpx && \ + adduser -S -D -h /dev/null -s /sbin/nologin -G nghttpx nghttpx && \ + apk add --update nghttp2 && \ + rm -rf /var/cache/apk/* + +EXPOSE 8080 +USER nghttpx + +ENTRYPOINT ["/usr/bin/nghttpx"] +CMD ["--help"]