Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge #350

Open
wants to merge 32 commits into
base: main
Choose a base branch
from
Open

Merge #350

Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
bb84a68
Change 更改afterBindCallbacks调用位置
urionz Sep 14, 2023
947f0a7
fix conflict
urionz Oct 7, 2023
b9d3956
fix fix session drain loop
urionz Oct 9, 2023
40d5a92
update pb version to v3
urionz Oct 9, 2023
bcd4a1d
fix test
urionz Oct 9, 2023
ac3934c
fix test
urionz Oct 9, 2023
e5a8e59
fix test
urionz Oct 9, 2023
f7a4cf7
fix: skip etcd go leak detection
urionz Oct 10, 2023
8fdff63
fix kick bug
urionz Oct 27, 2023
361be7a
add ectd auth config
urionz Oct 27, 2023
b9cde42
Merge branch 'topfreegames:main' into main
urionz Dec 19, 2023
e0cebc8
Add ws acceptor allow setting path
urionz Feb 29, 2024
77a1802
Fix fix conflict
urionz Feb 29, 2024
8e3e79d
Fix fix etcd auth config
urionz Feb 29, 2024
9e56b32
Fix fix ws_acceptor test
urionz Feb 29, 2024
71c6281
Fix fix test
urionz Feb 29, 2024
ec67971
Fix fix config
urionz Feb 29, 2024
13cff0c
optmize optimize WSAcceptor parameters
urionz Mar 4, 2024
d313780
optmize optimize WSAcceptor parameters
urionz Mar 4, 2024
a54b1f4
fix fix test
urionz Mar 4, 2024
02c2e05
update docs
urionz Mar 11, 2024
136a1cf
update docs
urionz Mar 11, 2024
45180df
update docs
urionz Mar 11, 2024
0298649
Change the route registration method, add rw locks to the route map, …
urionz Mar 22, 2024
af632af
fix test
urionz Mar 22, 2024
e347b58
fix session get number value bug
urionz May 28, 2024
cd1ad62
fix conflict
urionz May 28, 2024
7fe8f5a
update support adding routes at runtime
urionz May 28, 2024
ee04f5b
Add password support for worker config
urionz Jun 13, 2024
4984011
Fix worker redis address configuration mapping bug.
urionz Jun 13, 2024
5fe2982
add session bind callback error log
urionz Jul 12, 2024
fb048ee
Merge remote-tracking branch 'upstream/main'
urionz Jul 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 41 additions & 23 deletions acceptor/ws_acceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,40 +37,52 @@ import (
// WSAcceptor struct
type WSAcceptor struct {
addr string
path string
connChan chan PlayerConn
listener net.Listener
certFile string
keyFile string
running bool
}

// NewWSAcceptor returns a new instance of WSAcceptor
func NewWSAcceptor(addr string, certs ...string) *WSAcceptor {
keyFile := ""
certFile := ""
if len(certs) != 2 && len(certs) != 0 {
panic(constants.ErrInvalidCertificates)
} else if len(certs) == 2 {
certFile = certs[0]
keyFile = certs[1]
// WSAcceptorOption is a function that returns a WSAcceptorOption
type WSAcceptorOption func(*WSAcceptor)

func WithWSAcceptorPath(path string) WSAcceptorOption {
return func(ac *WSAcceptor) {
ac.path = path
}
}

func WithWSAcceptorCerts(certFile, keyFile string) WSAcceptorOption {
return func(ac *WSAcceptor) {
ac.certFile = certFile
ac.keyFile = keyFile
}
}

// NewWSAcceptor returns a new instance of WSAcceptor
func NewWSAcceptor(addr string, opts ...WSAcceptorOption) *WSAcceptor {
w := &WSAcceptor{
addr: addr,
connChan: make(chan PlayerConn),
certFile: certFile,
keyFile: keyFile,
running: false,
path: "/",
}

for _, opt := range opts {
opt(w)
}

return w
}

func (w *WSAcceptor) IsRunning() bool {
return w.running
return w.running
}

func (w *WSAcceptor) GetConfiguredAddress() string {
return w.addr
return w.addr
}

// GetAddr returns the addr the acceptor will listen on
Expand All @@ -93,21 +105,26 @@ func (w *WSAcceptor) EnableProxyProtocol() {
type connHandler struct {
upgrader *websocket.Upgrader
connChan chan PlayerConn
path string
}

func (h *connHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
conn, err := h.upgrader.Upgrade(rw, r, nil)
if err != nil {
logger.Log.Errorf("Upgrade failure, URI=%s, Error=%s", r.RequestURI, err.Error())
return
}
if r.URL.Path == h.path && r.Method == http.MethodGet {
conn, err := h.upgrader.Upgrade(rw, r, nil)
if err != nil {
logger.Log.Errorf("Upgrade failure, URI=%s, Error=%s", r.RequestURI, err.Error())
return
}

c, err := NewWSConn(conn)
if err != nil {
logger.Log.Errorf("Failed to create new ws connection: %s", err.Error())
return
c, err := NewWSConn(conn)
if err != nil {
logger.Log.Errorf("Failed to create new ws connection: %s", err.Error())
return
}
h.connChan <- c
} else {
http.Error(rw, "Not found", http.StatusNotFound)
}
h.connChan <- c
}

func (w *WSAcceptor) hasTLSCertificates() bool {
Expand Down Expand Up @@ -165,6 +182,7 @@ func (w *WSAcceptor) serve(upgrader *websocket.Upgrader) {
http.Serve(w.listener, &connHandler{
upgrader: upgrader,
connChan: w.connChan,
path: w.path,
})
}

Expand Down
7 changes: 2 additions & 5 deletions acceptor/ws_acceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@ var wsAcceptorTables = []struct {
// TODO change to allocatable ports
{"test_1", "0.0.0.0:0", []byte{0x01, 0x02}, []string{"./fixtures/server.crt", "./fixtures/server.key"}, nil},
{"test_2", "127.0.0.1:0", []byte{0x00}, []string{"./fixtures/server.crt", "./fixtures/server.key"}, nil},
{"test_3", "0.0.0.0:0", []byte{0x00}, []string{"wqodij"}, constants.ErrInvalidCertificates},
{"test_4", "0.0.0.0:0", []byte{0x00}, []string{"wqodij", "qwdo", "wod"}, constants.ErrInvalidCertificates},
{"test_4", "0.0.0.0:0", []byte{0x00}, []string{}, nil},
}

func TestNewWSAcceptor(t *testing.T) {
Expand All @@ -34,12 +31,12 @@ func TestNewWSAcceptor(t *testing.T) {
t.Run(table.name, func(t *testing.T) {
if table.panicErr != nil {
assert.PanicsWithValue(t, table.panicErr, func() {
NewWSAcceptor(table.addr, table.certs...)
NewWSAcceptor(table.addr, WithWSAcceptorCerts(table.certs[0], table.certs[1]))
})
} else {
var w *WSAcceptor
assert.NotPanics(t, func() {
w = NewWSAcceptor(table.addr, table.certs...)
w = NewWSAcceptor(table.addr, WithWSAcceptorCerts(table.certs[0], table.certs[1]))
})

if len(table.certs) == 2 {
Expand Down
13 changes: 11 additions & 2 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,8 +386,17 @@ func (a *agentImpl) Kick(ctx context.Context) error {
if err != nil {
return err
}
_, err = a.conn.Write(p)
return err
pWrite := pendingWrite{
ctx: ctx,
data: p,
}

select {
case a.chSend <- pWrite:
case <-a.chDie:
}

return nil
}

// SetLastAt sets the last at to now
Expand Down
24 changes: 16 additions & 8 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,21 +130,29 @@ func TestKick(t *testing.T) {
dieChan := make(chan bool)
hbTime := time.Second

expectedBytes := []byte("kick")

mockConn := mocks.NewMockPlayerConn(ctrl)
mockEncoder.EXPECT().Encode(gomock.Any(), gomock.Nil()).Do(
func(typ packet.Type, d []byte) {
assert.EqualValues(t, packet.Kick, typ)
})
mockConn.EXPECT().Write(gomock.Any()).Return(0, nil)
mockEncoder.EXPECT().Encode(packet.Type(packet.Kick), gomock.Nil()).Return(expectedBytes, nil)
messageEncoder := message.NewMessagesEncoder(false)

mockSerializer.EXPECT().GetName()

ctx := context.Background()

exceptedWrite := pendingWrite{
ctx: ctx,
data: expectedBytes,
}

sessionPool := session.NewSessionPool()
ag := newAgent(mockConn, mockDecoder, mockEncoder, mockSerializer, hbTime, 10, dieChan, messageEncoder, nil, sessionPool)
c := context.Background()
err := ag.Kick(c)
ag := newAgent(mockConn, mockDecoder, mockEncoder, mockSerializer, hbTime, 10, dieChan, messageEncoder, nil, sessionPool).(*agentImpl)

err := ag.Kick(ctx)
assert.NoError(t, err)

recvData := helpers.ShouldEventuallyReceive(t, ag.chSend).(pendingWrite)
assert.Equal(t, exceptedWrite, recvData)
}

func TestAgentSend(t *testing.T) {
Expand Down
3 changes: 0 additions & 3 deletions app.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,9 +436,6 @@ func (app *App) AddRoute(
routingFunction router.RoutingFunc,
) error {
if app.router != nil {
if app.running {
return constants.ErrChangeRouteWhileRunning
}
app.router.AddRoute(serverType, routingFunction)
} else {
return constants.ErrRouterNotInitialized
Expand Down
6 changes: 0 additions & 6 deletions app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,12 +189,6 @@ func TestAddRoute(t *testing.T) {
return nil, nil
})
assert.NoError(t, err)

app.running = true
err = app.AddRoute("somesv", func(ctx context.Context, route *route.Route, payload []byte, servers map[string]*cluster.Server) (*cluster.Server, error) {
return nil, nil
})
assert.EqualError(t, constants.ErrChangeRouteWhileRunning, err.Error())
}

func TestShutdown(t *testing.T) {
Expand Down
18 changes: 9 additions & 9 deletions cluster/etcd_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,14 @@ func NewEtcdServiceDiscovery(
client = cli[0]
}
sd := &etcdServiceDiscovery{
running: false,
server: server,
serverMapByType: make(map[string]map[string]*Server),
listeners: make([]SDListener, 0),
stopChan: make(chan bool),
stopLeaseChan: make(chan bool),
appDieChan: appDieChan,
cli: client,
running: false,
server: server,
serverMapByType: make(map[string]map[string]*Server),
listeners: make([]SDListener, 0),
stopChan: make(chan bool),
stopLeaseChan: make(chan bool),
appDieChan: appDieChan,
cli: client,
syncServersRunning: make(chan bool),
}

Expand Down Expand Up @@ -300,7 +300,7 @@ func (sd *etcdServiceDiscovery) GetServersByType(serverType string) (map[string]
// Create a new map to avoid concurrent read and write access to the
// map, this also prevents accidental changes to the list of servers
// kept by the service discovery.
ret := make(map[string]*Server,len(sd.serverMapByType[serverType]))
ret := make(map[string]*Server, len(sd.serverMapByType[serverType]))
for k, v := range sd.serverMapByType[serverType] {
ret[k] = v
}
Expand Down
24 changes: 16 additions & 8 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,9 +461,9 @@ func newDefaultClusterSDConfig() *ClusterSDConfig {
// WorkerConfig provides worker configuration
type WorkerConfig struct {
Redis struct {
ServerURL string `mapstructure:"serverurl"`
Pool string `mapstructure:"pool"`
Password string `mapstructure:"password"`
Address string `mapstructure:"address"`
Pool string `mapstructure:"pool"`
Password string `mapstructure:"password"`
} `mapstructure:"redis"`
Namespace string `mapstructure:"namespace"`
Concurrency int `mapstructure:"concurrency"`
Expand All @@ -474,12 +474,12 @@ type WorkerConfig struct {
func newDefaultWorkerConfig() *WorkerConfig {
return &WorkerConfig{
Redis: struct {
ServerURL string `mapstructure:"serverurl"`
Pool string `mapstructure:"pool"`
Password string `mapstructure:"password"`
Address string `mapstructure:"address"`
Pool string `mapstructure:"pool"`
Password string `mapstructure:"password"`
}{
ServerURL: "localhost:6379",
Pool: "10",
Address: "localhost:6379",
Pool: "10",
},
Concurrency: 1,
Retry: *newDefaultEnqueueOpts(),
Expand Down Expand Up @@ -522,6 +522,8 @@ func newDefaultMemoryGroupConfig() *MemoryGroupConfig {
type EtcdGroupServiceConfig struct {
DialTimeout time.Duration `mapstructure:"dialtimeout"`
Endpoints []string `mapstructure:"endpoints"`
User string `mapstructure:"user"`
Pass string `mapstructure:"pass"`
Prefix string `mapstructure:"prefix"`
TransactionTimeout time.Duration `mapstructure:"transactiontimeout"`
}
Expand All @@ -531,6 +533,8 @@ func newDefaultEtcdGroupServiceConfig() *EtcdGroupServiceConfig {
return &EtcdGroupServiceConfig{
DialTimeout: time.Duration(5 * time.Second),
Endpoints: []string{"localhost:2379"},
User: "",
Pass: "",
Prefix: "pitaya/",
TransactionTimeout: time.Duration(5 * time.Second),
}
Expand Down Expand Up @@ -562,6 +566,8 @@ func newDefaultGroupsConfig() *GroupsConfig {
type ETCDBindingConfig struct {
DialTimeout time.Duration `mapstructure:"dialtimeout"`
Endpoints []string `mapstructure:"endpoints"`
User string `mapstructure:"user"`
Pass string `mapstructure:"pass"`
Prefix string `mapstructure:"prefix"`
LeaseTTL time.Duration `mapstructure:"leasettl"`
}
Expand All @@ -571,6 +577,8 @@ func newDefaultETCDBindingConfig() *ETCDBindingConfig {
return &ETCDBindingConfig{
DialTimeout: time.Duration(5 * time.Second),
Endpoints: []string{"localhost:2379"},
User: "",
Pass: "",
Prefix: "pitaya/",
LeaseTTL: time.Duration(5 * time.Hour),
}
Expand Down
9 changes: 8 additions & 1 deletion config/viper_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ func (c *Config) fillDefaultValues() {
"pitaya.cluster.rpc.server.nats.buffer.push": pitayaConfig.Cluster.RPC.Server.Nats.Buffer.Push,
"pitaya.cluster.sd.etcd.dialtimeout": pitayaConfig.Cluster.SD.Etcd.DialTimeout,
"pitaya.cluster.sd.etcd.endpoints": pitayaConfig.Cluster.SD.Etcd.Endpoints,
"pitaya.cluster.sd.etcd.user": pitayaConfig.Cluster.SD.Etcd.User,
"pitaya.cluster.sd.etcd.pass": pitayaConfig.Cluster.SD.Etcd.Pass,
"pitaya.cluster.sd.etcd.prefix": pitayaConfig.Cluster.SD.Etcd.Prefix,
"pitaya.cluster.sd.etcd.grantlease.maxretries": pitayaConfig.Cluster.SD.Etcd.GrantLease.MaxRetries,
"pitaya.cluster.sd.etcd.grantlease.retryinterval": pitayaConfig.Cluster.SD.Etcd.GrantLease.RetryInterval,
Expand All @@ -96,6 +98,8 @@ func (c *Config) fillDefaultValues() {
"pitaya.defaultpipelines.structvalidation.enabled": pitayaConfig.DefaultPipelines.StructValidation.Enabled,
"pitaya.groups.etcd.dialtimeout": pitayaConfig.Groups.Etcd.DialTimeout,
"pitaya.groups.etcd.endpoints": pitayaConfig.Groups.Etcd.Endpoints,
"pitaya.groups.etcd.user": pitayaConfig.Groups.Etcd.User,
"pitaya.groups.etcd.pass": pitayaConfig.Groups.Etcd.Pass,
"pitaya.groups.etcd.prefix": pitayaConfig.Groups.Etcd.Prefix,
"pitaya.groups.etcd.transactiontimeout": pitayaConfig.Groups.Etcd.TransactionTimeout,
"pitaya.groups.memory.tickduration": pitayaConfig.Groups.Memory.TickDuration,
Expand All @@ -113,6 +117,8 @@ func (c *Config) fillDefaultValues() {
"pitaya.metrics.statsd.rate": pitayaConfig.Metrics.Statsd.Rate,
"pitaya.modules.bindingstorage.etcd.dialtimeout": pitayaConfig.Modules.BindingStorage.Etcd.DialTimeout,
"pitaya.modules.bindingstorage.etcd.endpoints": pitayaConfig.Modules.BindingStorage.Etcd.Endpoints,
"pitaya.modules.bindingstorage.etcd.user": pitayaConfig.Modules.BindingStorage.Etcd.User,
"pitaya.modules.bindingstorage.etcd.pass": pitayaConfig.Modules.BindingStorage.Etcd.Pass,
"pitaya.modules.bindingstorage.etcd.leasettl": pitayaConfig.Modules.BindingStorage.Etcd.LeaseTTL,
"pitaya.modules.bindingstorage.etcd.prefix": pitayaConfig.Modules.BindingStorage.Etcd.Prefix,
"pitaya.conn.ratelimiting.limit": pitayaConfig.Conn.RateLimiting.Limit,
Expand All @@ -124,7 +130,8 @@ func (c *Config) fillDefaultValues() {
"pitaya.session.drain.period": pitayaConfig.Session.Drain.Period,
"pitaya.worker.concurrency": pitayaConfig.Worker.Concurrency,
"pitaya.worker.redis.pool": pitayaConfig.Worker.Redis.Pool,
"pitaya.worker.redis.url": pitayaConfig.Worker.Redis.ServerURL,
"pitaya.worker.redis.address": pitayaConfig.Worker.Redis.Address,
"pitaya.worker.redis.password": pitayaConfig.Worker.Redis.Password,
"pitaya.worker.retry.enabled": pitayaConfig.Worker.Retry.Enabled,
"pitaya.worker.retry.exponential": pitayaConfig.Worker.Retry.Exponential,
"pitaya.worker.retry.max": pitayaConfig.Worker.Retry.Max,
Expand Down
Loading
Loading