Skip to content

Commit

Permalink
feat: update tcp client connections (#1429)
Browse files Browse the repository at this point in the history
Signed-off-by: Sidhant Kohli <sidhant.kohli@gmail.com>
  • Loading branch information
kohlisid committed Dec 22, 2023
1 parent 4f45e3a commit 9dc3bfd
Show file tree
Hide file tree
Showing 9 changed files with 27 additions and 44 deletions.
1 change: 0 additions & 1 deletion pkg/sdkclient/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package sdkclient

const (
UDS = "unix"
TcpAddr = ":55551"
MapAddr = "/var/run/numaflow/map.sock"
ReduceAddr = "/var/run/numaflow/reduce.sock"
ReduceStreamAddr = "/var/run/numaflow/reducestream.sock"
Expand Down
31 changes: 15 additions & 16 deletions pkg/sdkclient/grpc_resolver/client_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ import (

"github.com/numaproj/numaflow-go/pkg/info"
"google.golang.org/grpc/resolver"

"github.com/numaproj/numaflow/pkg/sdkclient"
)

const (
Expand All @@ -37,7 +35,7 @@ type MultiProcResolverBuilder struct {
addrsList []string
}

func NewMultiProcResolverBuilder(addrsList []string) *MultiProcResolverBuilder {
func newMultiProcResolverBuilder(addrsList []string) *MultiProcResolverBuilder {
return &MultiProcResolverBuilder{
addrsList: addrsList,
}
Expand Down Expand Up @@ -81,26 +79,27 @@ func (*multiProcResolver) ResolveNow(o resolver.ResolveNowOptions) {}
func (*multiProcResolver) Close() {}
func (*multiProcResolver) Resolve(target resolver.Target) {}

// BuildConnAddrs Populate the connection list for the clients
// Format (serverAddr, serverIdx) : (0.0.0.0:5551, 1)
func BuildConnAddrs(numCpu int) []string {
var conn = make([]string, numCpu)
for i := 0; i < numCpu; i++ {
conn[i] = ConnAddr + sdkclient.TcpAddr + "," + strconv.Itoa(i+1)
// buildConnAddrs Populate the connection list for the clients
// Format (serverAddr, serverIdx) : (0.0.0.0:5551, 1), (0.0.0.0:5552, 2)
func buildConnAddrs(servPorts []string) []string {
var conn = make([]string, len(servPorts))
for i := 0; i < len(servPorts); i++ {
// Use the server ports from the server info file and assign to each client
addr, _ := strconv.Atoi(servPorts[i])
// Format (serverAddr, serverIdx)
conn[i] = ConnAddr + ":" + strconv.Itoa(addr) + "," + strconv.Itoa(i+1)
}
return conn
}

// RegMultiProcResolver is used to populate the connection properties based
// on multiprocessing TCP or UDS connection
func RegMultiProcResolver(svrInfo *info.ServerInfo) error {
numCpu, err := strconv.Atoi(svrInfo.Metadata["CPU_LIMIT"])
if err != nil {
return err
}
log.Println("Num CPU:", numCpu)
conn := BuildConnAddrs(numCpu)
res := NewMultiProcResolverBuilder(conn)
// Extract the server ports from the server info file and convert it to a list
servPorts := strings.Split(svrInfo.Metadata["SERV_PORTS"], ",")
log.Println("Multiprocessing TCP Server Ports:", servPorts)
conn := buildConnAddrs(servPorts)
res := newMultiProcResolverBuilder(conn)
resolver.Register(res)
return nil
}
2 changes: 1 addition & 1 deletion pkg/sdkclient/mapper/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func New(inputOptions ...sdkclient.Option) (Client, error) {
}

// Connect to the server
conn, err := util.ConnectToServer(opts.UdsSockAddr(), opts.TcpSockAddr(), serverInfo, opts.MaxMessageSize())
conn, err := util.ConnectToServer(opts.UdsSockAddr(), serverInfo, opts.MaxMessageSize())
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sdkclient/mapstreamer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func New(inputOptions ...sdkclient.Option) (Client, error) {
}

// Connect to the server
conn, err := util.ConnectToServer(opts.UdsSockAddr(), opts.TcpSockAddr(), serverInfo, opts.MaxMessageSize())
conn, err := util.ConnectToServer(opts.UdsSockAddr(), serverInfo, opts.MaxMessageSize())
if err != nil {
return nil, err
}
Expand Down
14 changes: 0 additions & 14 deletions pkg/sdkclient/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,12 @@ package sdkclient
import "time"

type Options struct {
tcpSockAddr string
udsSockAddr string
maxMessageSize int
serverInfoFilePath string
serverInfoReadinessTimeout time.Duration
}

// TcpSockAddr returns the TCP sock addr.
func (o *Options) TcpSockAddr() string {
return o.tcpSockAddr
}

// UdsSockAddr returns the UDS sock addr.
func (o *Options) UdsSockAddr() string {
return o.udsSockAddr
Expand All @@ -56,7 +50,6 @@ func DefaultOptions(address string) *Options {
return &Options{
maxMessageSize: DefaultGRPCMaxMessageSize,
serverInfoFilePath: ServerInfoFilePath,
tcpSockAddr: TcpAddr,
udsSockAddr: address,
serverInfoReadinessTimeout: 120 * time.Second, // Default timeout is 120 seconds
}
Expand All @@ -72,13 +65,6 @@ func WithUdsSockAddr(addr string) Option {
}
}

// WithTcpSockAddr start the client with the given TCP sock addr. This is mainly used for testing purpose.
func WithTcpSockAddr(addr string) Option {
return func(opts *Options) {
opts.tcpSockAddr = addr
}
}

// WithMaxMessageSize sets the server max receive message size and the server max send message size to the given size.
func WithMaxMessageSize(size int) Option {
return func(opts *Options) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sdkclient/reducer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func New(inputOptions ...sdkclient.Option) (Client, error) {
}

// Connect to the server
conn, err := util.ConnectToServer(opts.UdsSockAddr(), opts.TcpSockAddr(), serverInfo, opts.MaxMessageSize())
conn, err := util.ConnectToServer(opts.UdsSockAddr(), serverInfo, opts.MaxMessageSize())
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sdkclient/sessionreducer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func New(inputOptions ...sdkclient.Option) (Client, error) {
}

// Connect to the server
conn, err := util.ConnectToServer(opts.UdsSockAddr(), opts.TcpSockAddr(), serverInfo, opts.MaxMessageSize())
conn, err := util.ConnectToServer(opts.UdsSockAddr(), serverInfo, opts.MaxMessageSize())
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sdkclient/sourcetransformer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func New(inputOptions ...sdkclient.Option) (Client, error) {
}

// Connect to the server
conn, err := util.ConnectToServer(opts.UdsSockAddr(), opts.TcpSockAddr(), serverInfo, opts.MaxMessageSize())
conn, err := util.ConnectToServer(opts.UdsSockAddr(), serverInfo, opts.MaxMessageSize())
if err != nil {
return nil, err
}
Expand Down
15 changes: 7 additions & 8 deletions pkg/shared/util/grpc_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,18 @@ func WaitForServerInfo(timeout time.Duration, filePath string) (*info.ServerInfo
}

// ConnectToServer connects to the server with the given socket address based on the server info protocol.
func ConnectToServer(udsSockAddr string, tcpSockAddr string, serverInfo *info.ServerInfo, maxMessageSize int) (*grpc.ClientConn, error) {
func ConnectToServer(udsSockAddr string, serverInfo *info.ServerInfo, maxMessageSize int) (*grpc.ClientConn, error) {
var conn *grpc.ClientConn
var err error
var sockAddr string

if serverInfo.Protocol == info.TCP {
sockAddr = getTcpSockAddr(tcpSockAddr)
log.Println("Multiprocessing TCP Client:", sockAddr)

// TCP connections are used for Multiprocessing server mode, here we have multiple servers forks
// and each server will listen on a different port.
// On the client side we will create a connection to each of these server instances.
// The client will use a custom resolver to resolve the server address.
// The custom resolver will return the list of server addresses from the server info file.
// The client will use the list of server addresses to create the multiple connections.
if err := resolver.RegMultiProcResolver(serverInfo); err != nil {
return nil, fmt.Errorf("failed to start Multiproc Client: %w", err)
}
Expand All @@ -113,10 +116,6 @@ func ConnectToServer(udsSockAddr string, tcpSockAddr string, serverInfo *info.Se
return conn, nil
}

func getTcpSockAddr(tcpSock string) string {
return fmt.Sprintf("%s%s", resolver.ConnAddr, tcpSock)
}

func getUdsSockAddr(udsSock string) string {
return fmt.Sprintf("%s:%s", "unix", udsSock)
}

0 comments on commit 9dc3bfd

Please sign in to comment.