Merge pull request #264 from taosdata/enh/xftan/TD-29403-3.0

enh: websocket reconnect

huskar-t authored Jun 13, 2024
2 parents e6eaad1 + dd12077 commit fef228d
Showing 16 changed files with 1,101 additions and 225 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/go.yml
@@ -144,6 +144,12 @@ jobs:
- name: checkout
uses: actions/checkout@v3

- name: copy taos cfg
run: |
sudo mkdir -p /etc/taos
sudo cp ./.github/workflows/taos.cfg /etc/taos/taos.cfg
sudo cp ./.github/workflows/taosadapter.toml /etc/taos/taosadapter.toml
- name: shell
run: |
cat >start.sh<<EOF
6 changes: 6 additions & 0 deletions .github/workflows/push.yml
@@ -104,6 +104,12 @@ jobs:
- name: checkout
uses: actions/checkout@v3

- name: copy taos cfg
run: |
sudo mkdir -p /etc/taos
sudo cp ./.github/workflows/taos.cfg /etc/taos/taos.cfg
sudo cp ./.github/workflows/taosadapter.toml /etc/taos/taosadapter.toml
- name: shell
run: |
cat >start.sh<<EOF
5 changes: 5 additions & 0 deletions .github/workflows/taos.cfg
@@ -0,0 +1,5 @@
fqdn localhost
firstEp localhost:6030
asyncLog 0
debugFlag 143
supportVnodes 256
115 changes: 115 additions & 0 deletions .github/workflows/taosadapter.toml
@@ -0,0 +1,115 @@
debug = true
taosConfigDir = ""
port = 6041
logLevel = "info"
httpCodeServerError = false
SMLAutoCreateDB = false

[cors]
allowAllOrigins = true

#[pool]
#maxConnect = 0
#maxIdle = 0
#idleTimeout = 0

[ssl]
enable = false
certFile = ""
keyFile = ""

[log]
#path = "/var/log/taos"
rotationCount = 30
rotationTime = "24h"
rotationSize = "1GB"
enableRecordHttpSql = false
sqlRotationCount = 2
sqlRotationTime = "24h"
sqlRotationSize = "1GB"

[monitor]
disable = true
collectDuration = "3s"
disableCollectClientIP = true
incgroup = false
pauseQueryMemoryThreshold = 70
pauseAllMemoryThreshold = 80
identity = ""
writeToTD = false
user = "root"
password = "taosdata"
writeInterval = "30s"

[uploadKeeper]
enable = false
url = "http://127.0.0.1:6043/adapter_report"
interval = "15s"
timeout = "5s"
retryTimes = 3
retryInterval = "5s"

[opentsdb]
enable = true

[influxdb]
enable = true

[statsd]
enable = false
port = 6044
db = "statsd"
user = "root"
password = "taosdata"
worker = 10
gatherInterval = "5s"
protocol = "udp"
maxTCPConnections = 250
tcpKeepAlive = false
allowPendingMessages = 50000
deleteCounters = true
deleteGauges = true
deleteSets = true
deleteTimings = true

[collectd]
enable = false
port = 6045
db = "collectd"
user = "root"
password = "taosdata"
worker = 10


[opentsdb_telnet]
enable = false
maxTCPConnections = 250
tcpKeepAlive = false
dbs = ["opentsdb_telnet", "collectd", "icinga2", "tcollector"]
ports = [6046, 6047, 6048, 6049]
user = "root"
password = "taosdata"
batchSize = 1
flushInterval = "0s"

[node_exporter]
enable = false
db = "node_exporter"
user = "root"
password = "taosdata"
urls = ["http://localhost:9100"]
responseTimeout = "5s"
httpUsername = ""
httpPassword = ""
httpBearerTokenString = ""
caCertFile = ""
certFile = ""
keyFile = ""
insecureSkipVerify = true
gatherDuration = "5s"

[prometheus]
enable = true

[tmq]
releaseIntervalMultiplierForAutocommit = 2
68 changes: 50 additions & 18 deletions ws/client/conn.go
@@ -3,6 +3,7 @@ package client
import (
"bytes"
"encoding/json"
"errors"
"sync"
"sync/atomic"
"time"
@@ -33,7 +34,7 @@ type EnvelopePool struct {
func (ep *EnvelopePool) Get() *Envelope {
epv := ep.p.Get()
if epv == nil {
return &Envelope{Msg: new(bytes.Buffer)}
return &Envelope{Msg: new(bytes.Buffer), ErrorChan: make(chan error, 1)}
}
return epv.(*Envelope)
}
@@ -44,14 +45,24 @@ func (ep *EnvelopePool) Put(epv *Envelope) {
}

type Envelope struct {
Type int
Msg *bytes.Buffer
Type int
Msg *bytes.Buffer
ErrorChan chan error
}

func (e *Envelope) Reset() {
e.Msg.Reset()
if e.Msg.Cap() > 64*1024 {
e.Msg = new(bytes.Buffer)
} else {
e.Msg.Reset()
}
if len(e.ErrorChan) > 0 {
e.ErrorChan = make(chan error, 1)
}
}

var ClosedError = errors.New("websocket closed")

type Client struct {
conn *websocket.Conn
status uint32
@@ -63,9 +74,10 @@ type Client struct {
TextMessageHandler func(message []byte)
BinaryMessageHandler func(message []byte)
ErrorHandler func(err error)
SendMessageHandler func(envelope *Envelope)
once sync.Once
errHandlerOnce sync.Once
//SendMessageHandler func(envelope *Envelope)
once sync.Once
errHandlerOnce sync.Once
err error
}

func NewClient(conn *websocket.Conn, sendChanLength uint) *Client {
@@ -80,9 +92,9 @@ func NewClient(conn *websocket.Conn, sendChanLength uint) *Client {
TextMessageHandler: func(message []byte) {},
BinaryMessageHandler: func(message []byte) {},
ErrorHandler: func(err error) {},
SendMessageHandler: func(envelope *Envelope) {
GlobalEnvelopePool.Put(envelope)
},
//SendMessageHandler: func(envelope *Envelope) {
// GlobalEnvelopePool.Put(envelope)
//},
}
}

@@ -117,41 +129,61 @@ func (c *Client) WritePump() {
defer func() {
ticker.Stop()
}()

for {
select {
case message, ok := <-c.sendChan:
if !ok {
return
if message == nil {
return
}
message.ErrorChan <- ClosedError
continue
}
c.conn.SetWriteDeadline(time.Now().Add(c.WriteWait))
err := c.conn.WriteMessage(message.Type, message.Msg.Bytes())
if err != nil {
message.ErrorChan <- err
c.handleError(err)
return
c.Close()
for message := range c.sendChan {
if message == nil {
return
}
message.ErrorChan <- ClosedError
}
}
c.SendMessageHandler(message)
message.ErrorChan <- nil
case <-ticker.C:
c.conn.SetWriteDeadline(time.Now().Add(c.WriteWait))
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
c.handleError(err)
return
c.Close()
for message := range c.sendChan {
if message == nil {
return
}
message.ErrorChan <- ClosedError
}
}
}
}
}

func (c *Client) Send(envelope *Envelope) {
func (c *Client) Send(envelope *Envelope) error {
if !c.IsRunning() {
return
return ClosedError
}
var err error
defer func() {
// maybe closed
if recover() != nil {

err = ClosedError
return
}
}()
c.sendChan <- envelope
return err
}

func (c *Client) GetEnvelope() *Envelope {
@@ -168,8 +200,8 @@ func (c *Client) IsRunning() bool {

func (c *Client) Close() {
c.once.Do(func() {
close(c.sendChan)
atomic.StoreUint32(&c.status, StatusStop)
close(c.sendChan)
if c.conn != nil {
c.conn.Close()
}
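A minimal usage sketch of the new error-propagation path, not taken from this commit: Send now returns ClosedError when the client has already shut down, and the outcome of each individual write comes back on the envelope's ErrorChan. The import paths, the websocket library, the dial address, the JSON payload, and the envelope reuse at the end are assumptions for illustration only.

package main

import (
	"fmt"
	"time"

	"github.com/gorilla/websocket"               // assumed websocket library
	"github.com/taosdata/driver-go/v3/ws/client" // assumed import path
)

func main() {
	// Dial taosAdapter's websocket endpoint; address and path are placeholders.
	conn, _, err := websocket.DefaultDialer.Dial("ws://127.0.0.1:6041/ws", nil)
	if err != nil {
		panic(err)
	}
	c := client.NewClient(conn, 8)
	go c.WritePump()

	envelope := c.GetEnvelope()
	envelope.Type = websocket.TextMessage
	envelope.Msg.WriteString(`{"action":"version"}`) // illustrative payload only

	// Send now fails fast with ClosedError instead of silently dropping the message.
	if err := c.Send(envelope); err != nil {
		panic(err)
	}

	// The write pump reports this write's result on the envelope's ErrorChan.
	select {
	case err := <-envelope.ErrorChan:
		if err != nil {
			fmt.Println("write failed:", err)
		}
	case <-time.After(5 * time.Second):
		fmt.Println("write timed out")
	}

	// With SendMessageHandler commented out, returning the envelope to the
	// pool is now up to the caller; the exact reuse contract is an assumption.
	client.GlobalEnvelopePool.Put(envelope)
	c.Close()
}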
41 changes: 31 additions & 10 deletions ws/schemaless/config.go
@@ -10,19 +10,22 @@ const (
)

type Config struct {
url string
chanLength uint
user string
password string
db string
readTimeout time.Duration
writeTimeout time.Duration
errorHandler func(error)
enableCompression bool
url string
chanLength uint
user string
password string
db string
readTimeout time.Duration
writeTimeout time.Duration
errorHandler func(error)
enableCompression bool
autoReconnect bool
reconnectIntervalMs int
reconnectRetryCount int
}

func NewConfig(url string, chanLength uint, opts ...func(*Config)) *Config {
c := Config{url: url, chanLength: chanLength}
c := Config{url: url, chanLength: chanLength, reconnectRetryCount: 3, reconnectIntervalMs: 2000}
for _, opt := range opts {
opt(&c)
}
@@ -71,3 +74,21 @@ func SetEnableCompression(enableCompression bool) func(*Config) {
c.enableCompression = enableCompression
}
}

func SetAutoReconnect(reconnect bool) func(*Config) {
return func(c *Config) {
c.autoReconnect = reconnect
}
}

func SetReconnectIntervalMs(reconnectIntervalMs int) func(*Config) {
return func(c *Config) {
c.reconnectIntervalMs = reconnectIntervalMs
}
}

func SetReconnectRetryCount(reconnectRetryCount int) func(*Config) {
return func(c *Config) {
c.reconnectRetryCount = reconnectRetryCount
}
}
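A hedged sketch of wiring up the new reconnect options when building a schemaless websocket config; only setters visible in this diff are used, the import path and endpoint are assumptions, and the constructor that would consume the config is only hinted at because it is not part of this hunk.

package main

import (
	"github.com/taosdata/driver-go/v3/ws/schemaless" // assumed import path
)

func main() {
	// NewConfig now defaults to 3 retries at a 2000 ms interval;
	// the options below enable reconnection and override those defaults.
	cfg := schemaless.NewConfig(
		"ws://localhost:6041", // placeholder endpoint
		16,                    // send-channel length
		schemaless.SetEnableCompression(false),
		schemaless.SetAutoReconnect(true),
		schemaless.SetReconnectIntervalMs(1000),
		schemaless.SetReconnectRetryCount(5),
	)

	// A constructor such as schemaless.NewSchemaless(cfg) (name assumed)
	// would consume this config to open the connection.
	_ = cfg
}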