From 9dc3bfd1d2ac5705e8f834299f4432d15145e972 Mon Sep 17 00:00:00 2001 From: Sidhant Kohli Date: Fri, 22 Dec 2023 00:26:08 -0800 Subject: [PATCH] feat: update tcp client connections (#1429) Signed-off-by: Sidhant Kohli --- pkg/sdkclient/const.go | 1 - .../grpc_resolver/client_resolver.go | 31 +++++++++---------- pkg/sdkclient/mapper/client.go | 2 +- pkg/sdkclient/mapstreamer/client.go | 2 +- pkg/sdkclient/options.go | 14 --------- pkg/sdkclient/reducer/client.go | 2 +- pkg/sdkclient/sessionreducer/client.go | 2 +- pkg/sdkclient/sourcetransformer/client.go | 2 +- pkg/shared/util/grpc_utils.go | 15 +++++---- 9 files changed, 27 insertions(+), 44 deletions(-) diff --git a/pkg/sdkclient/const.go b/pkg/sdkclient/const.go index 8588b0aa4..cc7344661 100644 --- a/pkg/sdkclient/const.go +++ b/pkg/sdkclient/const.go @@ -2,7 +2,6 @@ package sdkclient const ( UDS = "unix" - TcpAddr = ":55551" MapAddr = "/var/run/numaflow/map.sock" ReduceAddr = "/var/run/numaflow/reduce.sock" ReduceStreamAddr = "/var/run/numaflow/reducestream.sock" diff --git a/pkg/sdkclient/grpc_resolver/client_resolver.go b/pkg/sdkclient/grpc_resolver/client_resolver.go index 6e4188650..abb0ce915 100644 --- a/pkg/sdkclient/grpc_resolver/client_resolver.go +++ b/pkg/sdkclient/grpc_resolver/client_resolver.go @@ -23,8 +23,6 @@ import ( "github.com/numaproj/numaflow-go/pkg/info" "google.golang.org/grpc/resolver" - - "github.com/numaproj/numaflow/pkg/sdkclient" ) const ( @@ -37,7 +35,7 @@ type MultiProcResolverBuilder struct { addrsList []string } -func NewMultiProcResolverBuilder(addrsList []string) *MultiProcResolverBuilder { +func newMultiProcResolverBuilder(addrsList []string) *MultiProcResolverBuilder { return &MultiProcResolverBuilder{ addrsList: addrsList, } @@ -81,12 +79,15 @@ func (*multiProcResolver) ResolveNow(o resolver.ResolveNowOptions) {} func (*multiProcResolver) Close() {} func (*multiProcResolver) Resolve(target resolver.Target) {} -// BuildConnAddrs Populate the connection list for the clients -// Format (serverAddr, serverIdx) : (0.0.0.0:5551, 1) -func BuildConnAddrs(numCpu int) []string { - var conn = make([]string, numCpu) - for i := 0; i < numCpu; i++ { - conn[i] = ConnAddr + sdkclient.TcpAddr + "," + strconv.Itoa(i+1) +// buildConnAddrs Populate the connection list for the clients +// Format (serverAddr, serverIdx) : (0.0.0.0:5551, 1), (0.0.0.0:5552, 2) +func buildConnAddrs(servPorts []string) []string { + var conn = make([]string, len(servPorts)) + for i := 0; i < len(servPorts); i++ { + // Use the server ports from the server info file and assign to each client + addr, _ := strconv.Atoi(servPorts[i]) + // Format (serverAddr, serverIdx) + conn[i] = ConnAddr + ":" + strconv.Itoa(addr) + "," + strconv.Itoa(i+1) } return conn } @@ -94,13 +95,11 @@ func BuildConnAddrs(numCpu int) []string { // RegMultiProcResolver is used to populate the connection properties based // on multiprocessing TCP or UDS connection func RegMultiProcResolver(svrInfo *info.ServerInfo) error { - numCpu, err := strconv.Atoi(svrInfo.Metadata["CPU_LIMIT"]) - if err != nil { - return err - } - log.Println("Num CPU:", numCpu) - conn := BuildConnAddrs(numCpu) - res := NewMultiProcResolverBuilder(conn) + // Extract the server ports from the server info file and convert it to a list + servPorts := strings.Split(svrInfo.Metadata["SERV_PORTS"], ",") + log.Println("Multiprocessing TCP Server Ports:", servPorts) + conn := buildConnAddrs(servPorts) + res := newMultiProcResolverBuilder(conn) resolver.Register(res) return nil } diff --git a/pkg/sdkclient/mapper/client.go b/pkg/sdkclient/mapper/client.go index c199a7eb3..0b13c006c 100644 --- a/pkg/sdkclient/mapper/client.go +++ b/pkg/sdkclient/mapper/client.go @@ -53,7 +53,7 @@ func New(inputOptions ...sdkclient.Option) (Client, error) { } // Connect to the server - conn, err := util.ConnectToServer(opts.UdsSockAddr(), opts.TcpSockAddr(), serverInfo, opts.MaxMessageSize()) + conn, err := util.ConnectToServer(opts.UdsSockAddr(), serverInfo, opts.MaxMessageSize()) if err != nil { return nil, err } diff --git a/pkg/sdkclient/mapstreamer/client.go b/pkg/sdkclient/mapstreamer/client.go index f88ab3ca0..0c5a2c272 100644 --- a/pkg/sdkclient/mapstreamer/client.go +++ b/pkg/sdkclient/mapstreamer/client.go @@ -55,7 +55,7 @@ func New(inputOptions ...sdkclient.Option) (Client, error) { } // Connect to the server - conn, err := util.ConnectToServer(opts.UdsSockAddr(), opts.TcpSockAddr(), serverInfo, opts.MaxMessageSize()) + conn, err := util.ConnectToServer(opts.UdsSockAddr(), serverInfo, opts.MaxMessageSize()) if err != nil { return nil, err } diff --git a/pkg/sdkclient/options.go b/pkg/sdkclient/options.go index d8908edca..cb6c8e9ee 100644 --- a/pkg/sdkclient/options.go +++ b/pkg/sdkclient/options.go @@ -19,18 +19,12 @@ package sdkclient import "time" type Options struct { - tcpSockAddr string udsSockAddr string maxMessageSize int serverInfoFilePath string serverInfoReadinessTimeout time.Duration } -// TcpSockAddr returns the TCP sock addr. -func (o *Options) TcpSockAddr() string { - return o.tcpSockAddr -} - // UdsSockAddr returns the UDS sock addr. func (o *Options) UdsSockAddr() string { return o.udsSockAddr @@ -56,7 +50,6 @@ func DefaultOptions(address string) *Options { return &Options{ maxMessageSize: DefaultGRPCMaxMessageSize, serverInfoFilePath: ServerInfoFilePath, - tcpSockAddr: TcpAddr, udsSockAddr: address, serverInfoReadinessTimeout: 120 * time.Second, // Default timeout is 120 seconds } @@ -72,13 +65,6 @@ func WithUdsSockAddr(addr string) Option { } } -// WithTcpSockAddr start the client with the given TCP sock addr. This is mainly used for testing purpose. -func WithTcpSockAddr(addr string) Option { - return func(opts *Options) { - opts.tcpSockAddr = addr - } -} - // WithMaxMessageSize sets the server max receive message size and the server max send message size to the given size. func WithMaxMessageSize(size int) Option { return func(opts *Options) { diff --git a/pkg/sdkclient/reducer/client.go b/pkg/sdkclient/reducer/client.go index 78d84a973..8e1207b41 100644 --- a/pkg/sdkclient/reducer/client.go +++ b/pkg/sdkclient/reducer/client.go @@ -54,7 +54,7 @@ func New(inputOptions ...sdkclient.Option) (Client, error) { } // Connect to the server - conn, err := util.ConnectToServer(opts.UdsSockAddr(), opts.TcpSockAddr(), serverInfo, opts.MaxMessageSize()) + conn, err := util.ConnectToServer(opts.UdsSockAddr(), serverInfo, opts.MaxMessageSize()) if err != nil { return nil, err } diff --git a/pkg/sdkclient/sessionreducer/client.go b/pkg/sdkclient/sessionreducer/client.go index 832ccc564..ea6a7260c 100644 --- a/pkg/sdkclient/sessionreducer/client.go +++ b/pkg/sdkclient/sessionreducer/client.go @@ -54,7 +54,7 @@ func New(inputOptions ...sdkclient.Option) (Client, error) { } // Connect to the server - conn, err := util.ConnectToServer(opts.UdsSockAddr(), opts.TcpSockAddr(), serverInfo, opts.MaxMessageSize()) + conn, err := util.ConnectToServer(opts.UdsSockAddr(), serverInfo, opts.MaxMessageSize()) if err != nil { return nil, err } diff --git a/pkg/sdkclient/sourcetransformer/client.go b/pkg/sdkclient/sourcetransformer/client.go index 5ee6fad18..d85f56bbf 100644 --- a/pkg/sdkclient/sourcetransformer/client.go +++ b/pkg/sdkclient/sourcetransformer/client.go @@ -53,7 +53,7 @@ func New(inputOptions ...sdkclient.Option) (Client, error) { } // Connect to the server - conn, err := util.ConnectToServer(opts.UdsSockAddr(), opts.TcpSockAddr(), serverInfo, opts.MaxMessageSize()) + conn, err := util.ConnectToServer(opts.UdsSockAddr(), serverInfo, opts.MaxMessageSize()) if err != nil { return nil, err } diff --git a/pkg/shared/util/grpc_utils.go b/pkg/shared/util/grpc_utils.go index eeedb5742..8c453f611 100644 --- a/pkg/shared/util/grpc_utils.go +++ b/pkg/shared/util/grpc_utils.go @@ -79,15 +79,18 @@ func WaitForServerInfo(timeout time.Duration, filePath string) (*info.ServerInfo } // ConnectToServer connects to the server with the given socket address based on the server info protocol. -func ConnectToServer(udsSockAddr string, tcpSockAddr string, serverInfo *info.ServerInfo, maxMessageSize int) (*grpc.ClientConn, error) { +func ConnectToServer(udsSockAddr string, serverInfo *info.ServerInfo, maxMessageSize int) (*grpc.ClientConn, error) { var conn *grpc.ClientConn var err error var sockAddr string if serverInfo.Protocol == info.TCP { - sockAddr = getTcpSockAddr(tcpSockAddr) - log.Println("Multiprocessing TCP Client:", sockAddr) - + // TCP connections are used for Multiprocessing server mode, here we have multiple servers forks + // and each server will listen on a different port. + // On the client side we will create a connection to each of these server instances. + // The client will use a custom resolver to resolve the server address. + // The custom resolver will return the list of server addresses from the server info file. + // The client will use the list of server addresses to create the multiple connections. if err := resolver.RegMultiProcResolver(serverInfo); err != nil { return nil, fmt.Errorf("failed to start Multiproc Client: %w", err) } @@ -113,10 +116,6 @@ func ConnectToServer(udsSockAddr string, tcpSockAddr string, serverInfo *info.Se return conn, nil } -func getTcpSockAddr(tcpSock string) string { - return fmt.Sprintf("%s%s", resolver.ConnAddr, tcpSock) -} - func getUdsSockAddr(udsSock string) string { return fmt.Sprintf("%s:%s", "unix", udsSock) }