/
handler.go
143 lines (124 loc) · 4.83 KB
/
handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
// Package zigbee2mqtt contains a handler for dealing with zigbee2mqtt messages
package zigbee2mqtt
import (
"context"
"encoding/json"
"fmt"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
// Handler handles zigbee2mqtt messages: it subscribes to the topics of the
// devices announced on the bridge's device list and persists their
// definitions and reported state to MongoDB.
type Handler struct {
// database is the MongoDB database that device definitions and state are
// written to.
database *mongo.Database
// logger is the base logger; per-device loggers are derived from it.
logger zerolog.Logger
// devices is keyed by IEEE address and is consulted to decide whether a
// device from the bridge's device list still needs a subscription.
devices map[string]*Device
}
// Connected is a handler which is called when the initial connection to the
// mqtt server is established. It subscribes to the bridge's device list topic
// with QoS 1 and routes those messages to deviceListChangedHandler. A failed
// subscription is reported through the handler's logger.
func (h *Handler) Connected(client mqtt.Client) {
	const topic = "zigbee2mqtt/bridge/devices"

	token := client.Subscribe(topic, 1, h.deviceListChangedHandler)
	if token.Wait() && token.Error() != nil {
		err := fmt.Errorf("subscribing to %s: %w", topic, token.Error())
		h.logger.Error().Err(err).Msg("failed to subscribe to device list")
	}
}
// DefaultPublished is the fallback handler for messages that have no explicit
// handler. It only logs the topic the message arrived on.
func (h *Handler) DefaultPublished(client mqtt.Client, message mqtt.Message) {
	event := h.logger.Info()
	event.Msgf("received message on topic: %s", message.Topic())
}
// deviceListChangedHandler handles messages published on the bridge's device
// list topic. The payload is decoded as a JSON array of devices; every device
// whose IEEE address is not yet known is subscribed to and then remembered, so
// that later device list updates do not subscribe to it a second time.
// A payload that cannot be decoded is logged and otherwise ignored.
func (h *Handler) deviceListChangedHandler(client mqtt.Client, message mqtt.Message) {
	var devices []Device
	if err := json.Unmarshal(message.Payload(), &devices); err != nil {
		err = fmt.Errorf("decoding payload of %s: %w", message.Topic(), err)
		h.logger.Error().Err(err).Msg("ignoring device list update")
		return
	}

	// A Handler that was not built by New has a nil map; writing to it would
	// panic, so create it on first use.
	if h.devices == nil {
		h.devices = make(map[string]*Device, len(devices))
	}

	for i := range devices {
		device := &devices[i] // point into the slice instead of copying the element
		if h.devices[device.IEEEAddress] != nil {
			continue
		}
		h.subscribeToDevice(client, device)
		// Record the device; without this every update would re-subscribe it.
		h.devices[device.IEEEAddress] = device
	}
}
// subscribeToDevice subscribes to the device's state topic
// ("zigbee2mqtt/<friendly name>") with QoS 1 and stores the device definition
// in MongoDB. Messages on that topic are handled by the handler returned from
// deviceEventHandler, which logs with the device's friendly name and IEEE
// address attached.
func (h *Handler) subscribeToDevice(client mqtt.Client, device *Device) {
	logger := h.logger.With().
		Str("friendly_name", device.FriendlyName).
		Str("device_id", device.IEEEAddress).
		Logger()
	logger.Info().Msg("subscribing to device")

	token := client.Subscribe("zigbee2mqtt/"+device.FriendlyName, 1, h.deviceEventHandler(device, &logger))
	// This method is reached from an mqtt message callback, which must not
	// block on the client, so the outcome of the subscription is awaited in
	// the background. The goroutine ends as soon as the token completes.
	go func() {
		if token.Wait() && token.Error() != nil {
			logger.Error().Msgf("failed to subscribe to device: %s", token.Error())
		}
	}()

	h.persistDeviceToMongodb(device)
}
// deviceEventHandler builds the message handler for a single device. The
// returned handler decodes each payload as a JSON object and hands it to
// persistStateToMongoDB; payloads that fail to decode are logged through the
// supplied logger and dropped.
func (h *Handler) deviceEventHandler(device *Device, logger *zerolog.Logger) mqtt.MessageHandler {
	onMessage := func(_ mqtt.Client, msg mqtt.Message) {
		logger.Info().Msgf("received new status message")

		var state map[string]interface{}
		if decodeErr := json.Unmarshal(msg.Payload(), &state); decodeErr != nil {
			logger.Error().Msgf("failed to unmarshall payload: %s", decodeErr.Error())
			return
		}

		h.persistStateToMongoDB(device, state)
	}
	return onMessage
}
// persistDeviceToMongodb upserts the device's descriptive fields (friendly
// name, type, network address, model, vendor and exposed attributes) into the
// "current_state" collection. The document id is "mqtt_" followed by the
// device's IEEE address. Failures are logged and not returned.
func (h *Handler) persistDeviceToMongodb(device *Device) {
	states := h.database.Collection("current_state")

	fields := bson.D{
		{Key: "friendly_name", Value: device.FriendlyName},
		{Key: "type", Value: device.Type},
		{Key: "network_address", Value: device.NetworkAddress},
		{Key: "model", Value: device.Definition.Model},
		{Key: "vendor", Value: device.Definition.Vendor},
		{Key: "attributes", Value: device.Definition.Exposes},
	}
	byID := bson.D{{Key: "_id", Value: "mqtt_" + device.IEEEAddress}}
	update := bson.D{{Key: "$set", Value: fields}}

	if _, err := states.UpdateOne(context.Background(), byID, update, options.Update().SetUpsert(true)); err != nil {
		h.logger.Error().Msgf("failed to persist to mongodb: %s", err.Error())
	}
}
// persistStateToMongoDB stores a state report for device in MongoDB.
//
// Only properties listed in device.Definition.Exposes that carry a non-nil
// value in state are stored. Two writes are made, both as upserts:
//
//   - "current_state": the document "mqtt_<IEEE address>" gets update_ts,
//     friendly_name and one "status.<property>" field per reported value.
//   - "historical_state": the document for this device and the current UTC
//     day gets a {ts, v} entry pushed onto "status.<property>" for each
//     reported value. This write is skipped when no value was reported.
//
// Failures are logged and not returned.
func (h *Handler) persistStateToMongoDB(device *Device, state map[string]interface{}) {
	// Read the clock once so that the timestamp and the day bucket always
	// agree, even when the report arrives around midnight UTC.
	now := time.Now().UTC()
	ts := now.Unix()
	y, m, d := now.Date()

	set := bson.D{{Key: "update_ts", Value: ts}, {Key: "friendly_name", Value: device.FriendlyName}}
	push := bson.D{}

	// Collect the reported values of the properties the device exposes.
	for _, attr := range device.Definition.Exposes {
		value, ok := state[attr.Property]
		if !ok || value == nil {
			continue
		}
		key := "status." + attr.Property
		set = append(set, bson.E{Key: key, Value: value})
		push = append(push, bson.E{Key: key, Value: bson.D{
			{Key: "ts", Value: ts}, {Key: "v", Value: value}}})
	}

	collection := h.database.Collection("current_state")
	filter := bson.D{{Key: "_id", Value: "mqtt_" + device.IEEEAddress}}
	_, err := collection.UpdateOne(context.Background(), filter, bson.D{{Key: "$set", Value: set}}, options.Update().SetUpsert(true))
	if err != nil {
		h.logger.Error().Msgf("failed to persist to mongodb: %s", err.Error())
	}

	// Nothing to record for history. An empty $push is rejected by MongoDB
	// servers before 5.0 and would otherwise only upsert an empty day document.
	if len(push) == 0 {
		return
	}

	collection = h.database.Collection("historical_state")
	filter = bson.D{
		{Key: "ieee_address", Value: device.IEEEAddress},
		{Key: "friendly_name", Value: device.FriendlyName},
		{Key: "y", Value: y},
		{Key: "m", Value: m},
		{Key: "d", Value: d},
	}
	_, err = collection.UpdateOne(context.Background(), filter, bson.D{{Key: "$push", Value: push}}, options.Update().SetUpsert(true))
	if err != nil {
		h.logger.Error().Msgf("failed to persist to mongodb: %s", err.Error())
	}
}
// Disconnected is called when the client disconnects from mqtt. The error
// passed in by the client is attached to the log entry so the reason for the
// disconnect is not lost; a nil error adds nothing to the entry.
func (h *Handler) Disconnected(client mqtt.Client, err error) {
	h.logger.Info().Err(err).Msg("disconnected from mqtt")
}
// New creates a new handler that logs through the global zerolog logger and
// persists to the "home" database of the given MongoDB client. The handler
// starts with an empty device map.
func New(client *mongo.Client) *Handler {
	h := new(Handler)
	h.devices = make(map[string]*Device)
	h.logger = log.Logger
	h.database = client.Database("home")
	return h
}