Skip to content

Commit

Permalink
feat: Transaction requests forwarder
Browse files Browse the repository at this point in the history
  • Loading branch information
efirs committed Jun 13, 2022
1 parent fc4548e commit 88bf0ae
Show file tree
Hide file tree
Showing 12 changed files with 134 additions and 45 deletions.
1 change: 0 additions & 1 deletion api/server/v1/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ func GetTransaction(ctx context.Context, req proto.Message) *TransactionCtx {
func GetTransactionLegacy(req proto.Message) *TransactionCtx {
switch r := req.(type) {
case *InsertRequest:
r.GetOptions().ProtoReflect()
if r.GetOptions() == nil || r.GetOptions().GetWriteOptions() == nil {
return nil
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ require (
github.com/hashicorp/go-multierror v1.1.1
github.com/json-iterator/go v1.1.12
github.com/m3db/prometheus_client_golang v1.12.8
github.com/mwitkow/grpc-proxy v0.0.0-20220126150247-db34e7bfee32
github.com/pkg/errors v0.9.1
github.com/rs/zerolog v1.26.1
github.com/santhosh-tekuri/jsonschema/v5 v5.0.0
Expand Down
7 changes: 7 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,8 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mwitkow/grpc-proxy v0.0.0-20220126150247-db34e7bfee32 h1:CC9KzU7WPrK6DTppkUGiwmttoHCNwOLT7Z+stp1eIpU=
github.com/mwitkow/grpc-proxy v0.0.0-20220126150247-db34e7bfee32/go.mod h1:MvMXoufZAtqExNexqi4cjrNYE9MefKddKylxjS+//n0=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
Expand Down Expand Up @@ -625,6 +627,7 @@ golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v
golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc=
golang.org/x/net v0.0.0-20210331212208-0fccb6fa2b5c/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20210423184538-5f58ad60dda6/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk=
golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk=
Expand Down Expand Up @@ -672,6 +675,7 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down Expand Up @@ -723,6 +727,7 @@ golang.org/x/sys v0.0.0-20210305230114-8fe3ee5dd75b/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210315160823-c6e025ad8005/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210331175145-43e1dd70ce54/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
Expand Down Expand Up @@ -922,6 +927,7 @@ google.golang.org/genproto v0.0.0-20210226172003-ab064af71705/go.mod h1:FWY/as6D
google.golang.org/genproto v0.0.0-20210303154014-9728d6b83eeb/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20210310155132-4ce2db91004e/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20210319143718-93e7006c17a6/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20210401141331-865547bb08e2/go.mod h1:9lPAdzaEmUacj36I+k7YKbEc5CXzPIeORRgDAUOu28A=
google.golang.org/genproto v0.0.0-20210402141018-6c239bbf2bb1/go.mod h1:9lPAdzaEmUacj36I+k7YKbEc5CXzPIeORRgDAUOu28A=
google.golang.org/genproto v0.0.0-20210513213006-bf773b8c8384/go.mod h1:P3QM42oQyzQSnHPnZ/vqoCdDmzH28fzWByN9asMeM8A=
google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c/go.mod h1:UODoCrxHCcBojKKwX1terBiRUaqAsFqJiF615XL43r0=
Expand Down Expand Up @@ -1044,6 +1050,7 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
honnef.co/go/tools v0.1.3/go.mod h1:NgwopIslSNH47DimFoV78dnkksY2EFtX0ajyb3K/las=
moul.io/http2curl v1.0.0 h1:6XwpyZOYsgZJrU8exnG87ncVkU1FVCcTRpwzOkTDUi8=
moul.io/http2curl v1.0.0/go.mod h1:f6cULg+e4Md/oW1cYmwW4IWQOVl2lGbmCNGOHvzX2kE=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
Expand Down
93 changes: 93 additions & 0 deletions server/midddleware/forwarder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package middleware

import (
"context"
"sync"

"github.com/mwitkow/grpc-proxy/proxy"
api "github.com/tigrisdata/tigris/api/server/v1"
"github.com/tigrisdata/tigris/server/types"
"github.com/tigrisdata/tigris/util"
"google.golang.org/grpc"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
)

var (
UserAgent = "tigris-forwarder/" + util.Version
)

type Forwarder struct {
clients sync.Map
}

var forwarder = Forwarder{}

// TODO: Sweep unused connections periodically
func getClient(ctx context.Context, origin string) (*grpc.ClientConn, error) {
if c, ok := forwarder.clients.Load(origin); ok {
return c.(*grpc.ClientConn), nil
}

opts := []grpc.DialOption{
grpc.FailOnNonTempDialError(true),
grpc.WithReturnConnectionError(),
grpc.WithUserAgent(UserAgent),
grpc.WithBlock(),
}

conn, err := grpc.DialContext(ctx, origin, opts...)
if err != nil {
return nil, err
}

forwarder.clients.Store(origin, conn)

return conn, nil
}

func proxyDirector(ctx context.Context, _ string) (context.Context, *grpc.ClientConn, error) {
txCtx := api.GetTransaction(ctx, nil)

client, err := getClient(ctx, txCtx.GetOrigin())

return ctx, client, err
}

func forwarderStreamServerInterceptor() grpc.StreamServerInterceptor {
return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
txCtx := api.GetTransaction(stream.Context(), nil)

if txCtx != nil && txCtx.GetOrigin() != types.MyOrigin {
return proxy.TransparentHandler(proxyDirector)(srv, stream)
}

return handler(srv, stream)
}
}

func forwardRequest(ctx context.Context, txCtx *api.TransactionCtx, method string, req proto.Message) (interface{}, error) {
client, err := getClient(ctx, txCtx.GetOrigin())
if err != nil {
return nil, err
}

resp := &anypb.Any{}
if err := client.Invoke(ctx, method, req, resp); err != nil {
return nil, err
}

return resp, nil
}

func forwarderUnaryServerInterceptor() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (iface interface{}, err error) {
txCtx := api.GetTransaction(ctx, req.(proto.Message))

if txCtx != nil && txCtx.GetOrigin() != types.MyOrigin {
return forwardRequest(ctx, txCtx, info.FullMethod, req.(proto.Message))
}

return handler(ctx, req)
}
}
5 changes: 4 additions & 1 deletion server/midddleware/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package middleware

import (
"context"

middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_zerolog "github.com/grpc-ecosystem/go-grpc-middleware/providers/zerolog/v2"
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
Expand All @@ -40,6 +41,7 @@ func Get(config *config.Config) (grpc.UnaryServerInterceptor, grpc.StreamServerI
// Note: we don't add validate here and rather call it in server code because the validator interceptor returns gRPC
// error which is not convertible to the internal rest error code.
stream := middleware.ChainStreamServer(
forwarderStreamServerInterceptor(),
grpc_ratelimit.StreamServerInterceptor(&RateLimiter{}),
grpc_auth.StreamServerInterceptor(authFunction),
grpc_logging.StreamServerInterceptor(grpc_zerolog.InterceptorLogger(log.Logger), []grpc_logging.Option{}...),
Expand All @@ -54,12 +56,13 @@ func Get(config *config.Config) (grpc.UnaryServerInterceptor, grpc.StreamServerI
// Note: we don't add validate here and rather call it in server code because the validator interceptor returns gRPC
// error which is not convertible to the internal rest error code.
unary := middleware.ChainUnaryServer(
forwarderUnaryServerInterceptor(),
pprofUnaryServerInterceptor(),
grpc_ratelimit.UnaryServerInterceptor(&RateLimiter{}),
grpc_auth.UnaryServerInterceptor(authFunction),
grpc_logging.UnaryServerInterceptor(grpc_zerolog.InterceptorLogger(log.Logger)),
validatorUnaryServerInterceptor(),
TimeoutUnaryServerInterceptor(DefaultTimeout),
timeoutUnaryServerInterceptor(DefaultTimeout),
UnaryMetricsServerInterceptor(),
grpc_opentracing.UnaryServerInterceptor(),
grpc_recovery.UnaryServerInterceptor(),
Expand Down
4 changes: 2 additions & 2 deletions server/midddleware/timeout.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ var (
MaximumTimeout = 5 * time.Second
)

// TimeoutUnaryServerInterceptor returns a new unary server interceptor
// timeoutUnaryServerInterceptor returns a new unary server interceptor
// that sets request timeout if it's not set in the context
func TimeoutUnaryServerInterceptor(timeout time.Duration) grpc.UnaryServerInterceptor {
func timeoutUnaryServerInterceptor(timeout time.Duration) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (iface interface{}, err error) {
var cancel context.CancelFunc

Expand Down
15 changes: 5 additions & 10 deletions server/grpc/grpc.go → server/muxer/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,32 +12,31 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package grpc
package muxer

import (
"github.com/rs/zerolog/log"
"github.com/soheilhy/cmux"
"github.com/tigrisdata/tigris/server/config"
middleware "github.com/tigrisdata/tigris/server/midddleware"
"github.com/tigrisdata/tigris/server/types"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)

type Server struct {
type GRPCServer struct {
*grpc.Server
}

func NewServer(cfg *config.Config) *Server {
s := &Server{}
func NewGRPCServer(cfg *config.Config) *GRPCServer {
s := &GRPCServer{}

unary, stream := middleware.Get(cfg)
s.Server = grpc.NewServer(grpc.StreamInterceptor(stream), grpc.UnaryInterceptor(unary))
reflection.Register(s)
return s
}

func (s *Server) Start(mux cmux.CMux) error {
func (s *GRPCServer) Start(mux cmux.CMux) error {
// MatchWithWriters is needed as it needs SETTINGS frame from the server otherwise the client will block
match := mux.MatchWithWriters(cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"))
go func() {
Expand All @@ -46,7 +45,3 @@ func (s *Server) Start(mux cmux.CMux) error {
}()
return nil
}

func (s *Server) GetType() string {
return types.GRPCServer
}
15 changes: 5 additions & 10 deletions server/http/http.go → server/muxer/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package http
package muxer

import (
"net/http"
Expand All @@ -23,18 +23,17 @@ import (
"github.com/soheilhy/cmux"
"github.com/tigrisdata/tigris/server/config"
middleware "github.com/tigrisdata/tigris/server/midddleware"
"github.com/tigrisdata/tigris/server/types"
)

type Server struct {
type HTTPServer struct {
Router chi.Router
httpS *http.Server
Inproc *inprocgrpc.Channel
}

func NewServer(cfg *config.Config) *Server {
func NewHTTPServer(cfg *config.Config) *HTTPServer {
r := chi.NewRouter()
s := &Server{
s := &HTTPServer{
Inproc: &inprocgrpc.Channel{},
Router: r,
httpS: &http.Server{
Expand All @@ -50,15 +49,11 @@ func NewServer(cfg *config.Config) *Server {
return s
}

func (s *Server) Start(mux cmux.CMux) error {
func (s *HTTPServer) Start(mux cmux.CMux) error {
match := mux.Match(cmux.HTTP1Fast())
go func() {
err := s.httpS.Serve(match)
log.Fatal().Err(err).Msg("start http server")
}()
return nil
}

func (s *Server) GetType() string {
return types.HTTPServer
}
18 changes: 7 additions & 11 deletions server/muxer/muxer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,13 @@ import (
"github.com/rs/zerolog/log"
"github.com/soheilhy/cmux"
"github.com/tigrisdata/tigris/server/config"
tgrpc "github.com/tigrisdata/tigris/server/grpc"
tHTTP "github.com/tigrisdata/tigris/server/http"
v1 "github.com/tigrisdata/tigris/server/services/v1"
"github.com/tigrisdata/tigris/server/types"
"github.com/tigrisdata/tigris/store/kv"
"github.com/tigrisdata/tigris/store/search"
)

type Server interface {
Start(mux cmux.CMux) error
GetType() string
}

type Muxer struct {
Expand All @@ -40,8 +36,8 @@ type Muxer struct {

func NewMuxer(cfg *config.Config) *Muxer {
var s []Server
s = append(s, tHTTP.NewServer(cfg))
s = append(s, tgrpc.NewServer(cfg))
s = append(s, NewHTTPServer(cfg))
s = append(s, NewGRPCServer(cfg))
m := &Muxer{
servers: s,
}
Expand All @@ -52,11 +48,11 @@ func NewMuxer(cfg *config.Config) *Muxer {
func (m *Muxer) RegisterServices(kvStore kv.KeyValueStore, searchStore search.Store) {
services := v1.GetRegisteredServices(kvStore, searchStore)
for _, r := range services {
for _, s := range m.servers {
if s.GetType() == types.GRPCServer {
_ = r.RegisterGRPC(s.(*tgrpc.Server).Server)
} else if s.GetType() == types.HTTPServer {
_ = r.RegisterHTTP(s.(*tHTTP.Server).Router, s.(*tHTTP.Server).Inproc)
for _, v := range m.servers {
if s, ok := v.(*GRPCServer); ok {
_ = r.RegisterGRPC(s.Server)
} else if s, ok := v.(*HTTPServer); ok {
_ = r.RegisterHTTP(s.Router, s.Inproc)
}
}
}
Expand Down
5 changes: 2 additions & 3 deletions server/transaction/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ package transaction

import (
"context"
"os"
"sync"

"github.com/google/uuid"
api "github.com/tigrisdata/tigris/api/server/v1"
"github.com/tigrisdata/tigris/internal"
"github.com/tigrisdata/tigris/keys"
"github.com/tigrisdata/tigris/schema"
"github.com/tigrisdata/tigris/server/types"
"github.com/tigrisdata/tigris/store/kv"
)

Expand Down Expand Up @@ -297,9 +297,8 @@ func (s *TxSession) Context() *SessionCtx {
}

func generateTransactionCtx() *api.TransactionCtx {
origin, _ := os.Hostname() // not necessarily it needs to be hostname, something sticky for routing
return &api.TransactionCtx{
Id: uuid.New().String(),
Origin: origin,
Origin: types.MyOrigin,
}
}
7 changes: 4 additions & 3 deletions server/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@

package types

import "os"

type ContentType string

const (
JSON ContentType = "application/json; charset=utf-8"
STREAMING ContentType = "application/x-json-stream; charset=utf-8"
)

const (
GRPCServer = "GRPC"
HTTPServer = "http"
var (
MyOrigin, _ = os.Hostname()
)
Loading

0 comments on commit 88bf0ae

Please sign in to comment.