Skip to content

Commit

Permalink
Merge branch 'main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
mochi-co authored Apr 30, 2024
2 parents 38895e1 + 6fc4027 commit b263293
Show file tree
Hide file tree
Showing 11 changed files with 93 additions and 20 deletions.
2 changes: 1 addition & 1 deletion README-CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func main() {
_ = server.AddHook(new(auth.AllowHook), nil)

// 在标1883端口上创建一个 TCP 服务端。
tcp := listeners.NewTCP("t1", ":1883", nil)
tcp := listeners.NewTCP(listeners.Config{ID: "t1", Address: ":1883"})
err := server.AddListener(tcp)
if err != nil {
log.Fatal(err)
Expand Down
2 changes: 1 addition & 1 deletion README-JP.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func main() {
_ = server.AddHook(new(auth.AllowHook), nil)

// Create a TCP listener on a standard port.
tcp := listeners.NewTCP("t1", ":1883", nil)
tcp := listeners.NewTCP(listeners.Config{ID: "t1", Address: ":1883"})
err := server.AddListener(tcp)
if err != nil {
log.Fatal(err)
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func main() {
_ = server.AddHook(new(auth.AllowHook), nil)

// Create a TCP listener on a standard port.
tcp := listeners.NewTCP("t1", ":1883", nil)
tcp := listeners.NewTCP(listeners.Config{ID: "t1", Address: ":1883"})
err := server.AddListener(tcp)
if err != nil {
log.Fatal(err)
Expand Down
27 changes: 24 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ package config

import (
"encoding/json"
"log/slog"
"os"

"github.com/mochi-mqtt/server/v2/hooks/auth"
"github.com/mochi-mqtt/server/v2/hooks/debug"
Expand All @@ -21,9 +23,27 @@ import (

// config defines the structure of configuration data to be parsed from a config source.
type config struct {
Options mqtt.Options
Listeners []listeners.Config `yaml:"listeners" json:"listeners"`
HookConfigs HookConfigs `yaml:"hooks" json:"hooks"`
Options mqtt.Options
Listeners []listeners.Config `yaml:"listeners" json:"listeners"`
HookConfigs HookConfigs `yaml:"hooks" json:"hooks"`
LoggingConfig LoggingConfig `yaml:"logging" json:"logging"`
}

type LoggingConfig struct {
Level string
}

func (lc LoggingConfig) ToLogger() *slog.Logger {
var level slog.Level
if err := level.UnmarshalText([]byte(lc.Level)); err != nil {
level = slog.LevelInfo
}

leveler := new(slog.LevelVar)
leveler.Set(level)
return slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
Level: leveler,
}))
}

// HookConfigs contains configurations to enable individual hooks.
Expand Down Expand Up @@ -149,6 +169,7 @@ func FromBytes(b []byte) (*mqtt.Options, error) {
o = c.Options
o.Hooks = c.HookConfigs.ToHooks()
o.Listeners = c.Listeners
o.Logger = c.LoggingConfig.ToLogger()

return &o, nil
}
6 changes: 5 additions & 1 deletion config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
package config

import (
"log/slog"
"os"
"testing"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -60,7 +62,6 @@ options:
}
}
`)

parsedOptions = mqtt.Options{
Listeners: []listeners.Config{
{
Expand All @@ -81,6 +82,9 @@ options:
RestoreSysInfoOnRestart: true,
},
},
Logger: slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
Level: new(slog.LevelVar),
})),
}
)

Expand Down
2 changes: 2 additions & 0 deletions examples/config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,5 @@ options:
always_return_response_info: false
restore_sys_info_on_restart: false
no_inherited_properties_on_ack: false
logging:
level: INFO
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ require (
github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9 // indirect
go.opencensus.io v0.22.5 // indirect
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/protobuf v1.33.0 // indirect
)
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -350,8 +350,8 @@ golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81R
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs=
golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
Expand Down Expand Up @@ -409,17 +409,17 @@ golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
Expand Down
26 changes: 26 additions & 0 deletions packets/tpackets.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ const (
TDisconnect
TDisconnectTakeover
TDisconnectMqtt5
TDisconnectMqtt5DisconnectWithWillMessage
TDisconnectSecondConnect
TDisconnectReceiveMaximum
TDisconnectDropProperties
Expand Down Expand Up @@ -3781,6 +3782,31 @@ var TPacketData = map[byte]TPacketCases{
},
},
},
{
Case: TDisconnectMqtt5DisconnectWithWillMessage,
Desc: "mqtt5 disconnect with will message",
Primary: true,
RawBytes: append([]byte{
Disconnect << 4, 38, // fixed header
CodeDisconnectWillMessage.Code, // Reason Code
36, // Properties Length
17, 0, 0, 0, 120, // Session Expiry Interval (17)
31, 0, 28, // Reason String (31)
}, []byte(CodeDisconnectWillMessage.Reason)...),
Packet: &Packet{
ProtocolVersion: 5,
FixedHeader: FixedHeader{
Type: Disconnect,
Remaining: 22,
},
ReasonCode: CodeDisconnectWillMessage.Code,
Properties: Properties{
ReasonString: CodeDisconnectWillMessage.Reason,
SessionExpiryInterval: 120,
SessionExpiryIntervalFlag: true,
},
},
},
{
Case: TDisconnectSecondConnect,
Desc: "second connect packet mqtt5",
Expand Down
6 changes: 5 additions & 1 deletion server.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ type Options struct {
// ClientNetReadBufferSize specifies the size of the client *bufio.Reader read buffer.
ClientNetReadBufferSize int `yaml:"client_net_read_buffer_size" json:"client_net_read_buffer_size"`

// Logger specifies a custom configured implementation of zerolog to override
// Logger specifies a custom configured implementation of log/slog to override
// the servers default logger configuration. If you wish to change the log level,
// of the default logger, you can do so by setting:
// server := mqtt.New(nil)
Expand Down Expand Up @@ -1393,6 +1393,10 @@ func (s *Server) processDisconnect(cl *Client, pk packets.Packet) error {
cl.Properties.Props.SessionExpiryIntervalFlag = true
}

if pk.ReasonCode == packets.CodeDisconnectWillMessage.Code { // [MQTT-3.1.2.5] Non-normative comment
return packets.CodeDisconnectWillMessage
}

s.loop.willDelayed.Delete(cl.ID) // [MQTT-3.1.3-9] [MQTT-3.1.2-8]
cl.Stop(packets.CodeDisconnect) // [MQTT-3.14.4-2]

Expand Down
22 changes: 19 additions & 3 deletions server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2258,7 +2258,7 @@ func TestPublishToSubscribersExhaustedSendQuota(t *testing.T) {
require.True(t, subbed)

// coverage: subscriber publish errors are non-returnable
// can we hook into zerolog ?
// can we hook into log/slog ?
_ = r.Close()
pkx := *packets.TPacketData[packets.Publish].Get(packets.TPublishQos1).Packet
pkx.PacketID = 0
Expand All @@ -2279,7 +2279,7 @@ func TestPublishToSubscribersExhaustedPacketIDs(t *testing.T) {
require.True(t, subbed)

// coverage: subscriber publish errors are non-returnable
// can we hook into zerolog ?
// can we hook into log/slog ?
_ = r.Close()
pkx := *packets.TPacketData[packets.Publish].Get(packets.TPublishQos1).Packet
pkx.PacketID = 0
Expand All @@ -2296,7 +2296,7 @@ func TestPublishToSubscribersNoConnection(t *testing.T) {
require.True(t, subbed)

// coverage: subscriber publish errors are non-returnable
// can we hook into zerolog ?
// can we hook into log/slog ?
_ = r.Close()
s.publishToSubscribers(*packets.TPacketData[packets.Publish].Get(packets.TPublishBasic).Packet)
time.Sleep(time.Millisecond)
Expand Down Expand Up @@ -3138,6 +3138,22 @@ func TestServerProcessPacketDisconnectNonZeroExpiryViolation(t *testing.T) {
require.ErrorIs(t, err, packets.ErrProtocolViolationZeroNonZeroExpiry)
}

func TestServerProcessPacketDisconnectDisconnectWithWillMessage(t *testing.T) {
s := newServer()
cl, _, _ := newTestClient()
cl.Properties.Props.SessionExpiryInterval = 30
cl.Properties.ProtocolVersion = 5

s.loop.willDelayed.Add(cl.ID, packets.Packet{TopicName: "a/b/c", Payload: []byte("hello")})
require.Equal(t, 1, s.loop.willDelayed.Len())

err := s.processPacket(cl, *packets.TPacketData[packets.Disconnect].Get(packets.TDisconnectMqtt5DisconnectWithWillMessage).Packet)
require.Error(t, err)

require.Equal(t, 1, s.loop.willDelayed.Len())
require.False(t, cl.Closed())
}

func TestServerProcessPacketAuth(t *testing.T) {
s := newServer()
cl, r, w := newTestClient()
Expand Down

0 comments on commit b263293

Please sign in to comment.