-
Notifications
You must be signed in to change notification settings - Fork 30
/
output-sparkfun
executable file
·135 lines (117 loc) · 4.38 KB
/
output-sparkfun
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
#!/usr/bin/lua
--[[
Karl Palsson, 2015 <karlp@remake.is>
]]
local json = require("dkjson")
local uloop = require("uloop")
uloop.init()
local mosq = require("mosquitto")
--local posix = require("posix")
local ugly = require("remake.ugly_log")
local pl = require'pl.import_into'()
local args = pl.lapp [[
Takes the first phase of any mains and posts to a sparkfun/phant server.
if key_file is provided, the environment var is ignored.
the name of the environment var can be changed to allow multiple instances.
-H,--host (default "localhost") MQTT host to listen to
-p,--public (string) public key to use
-k,--key (default "PHANT_KEY") environment variable or file containing private key
-f,--key_is_file using this means that -k refers to a file, not an env var
-v,--verbose (0..7 default 4) Logging level, higher == more
]]
local cfg = {
APP_NAME = "output-sparkfun",
--MOSQ_CLIENT_ID = string.format("output-sparkfun-%d", posix.getpid()["pid"]),
MOSQ_CLIENT_ID = string.format("output-sparkfun-%d", 123),
MOSQ_IDLE_LOOP_MS = 100,
TOPIC_LISTEN = "status/local/json/device/#",
TEMPLATE_POST_URL="http://data.sparkfun.com/input/%s",
POST_INTERVAL = 60 * 1000, -- once a minute is enough...
}
local cache = {}
ugly.initialize(cfg.APP_NAME, args.verbose or 4)
if args.key_is_file then
ugly.debug("looking at file... %s", args.key)
if pl.path.isfile(args.key) then
args.key = pl.stringx.strip(pl.file.read(args.key))
else
pl.utils.quit(1, "key file specified does not exist: %s", args.key)
end
else
ugly.debug("looking at envvar... %s", args.key)
args.key_real = os.getenv(args.key)
end
if not args.key_real then
pl.utils.quit(1, "key must be provided by either env var or file: %s", args.key)
end
ugly.debug("Operating with private key: <%s>", args.key_real)
mosq.init()
local mqtt = mosq.new(cfg.MOSQ_CLIENT_ID, true)
for i = 1,3 do
local rv = mqtt:connect(args.host, 1883, 60)
if rv then
ugly.notice("Connected to " .. args.host)
break
else
ugly.notice("Failed to connect, retrying...")
--posix.sleep(1) -- let's not add more deps to get sub second sleeping
end
end
if not mqtt:subscribe(cfg.TOPIC_LISTEN, 0) then
-- We are not connected, just abort here and let monit restart us
ugly.err("Aborting, unable to make MQTT connection")
os.exit(1)
end
mqtt.ON_MESSAGE = function(mid, topic, jpayload, qos, retain)
local payload, err = json.decode(jpayload)
if not payload then
ugly.warning("Invalid json in message on topic: %s, %s", topic, err)
return
end
if not payload.hwc or not payload.phases then
ugly.notice("Ignoring unsuitable json format on topic: %s", topic);
return
end
if payload.error then ugly.notice("Ignoring failed reading"); return end
if payload.hwc.typeOfMeasurementPoints == "cbms" then ugly.debug("Ignoring bar reading"); return end
-- we simply keep the last remaining values.
cache.volts = payload.phases[1].voltage
cache.amps = payload.phases[1].current
cache.pf = payload.phases[1].pf
cache.kwh = payload.cumulative_wh / 1000
end
local mqtt_read = uloop.fd_add(mqtt:socket(), function(ufd, events)
mqtt:loop_read()
end, uloop.ULOOP_READ)
local mqtt_write = uloop.fd_add(mqtt:socket(), function(ufd, events)
mqtt:loop_write()
end, uloop.ULOOP_WRITE)
local mqtt_idle_timer
mqtt_idle_timer = uloop.timer(function()
-- just handle the mosquitto idle/misc loop
local success, errno, err = mqtt:loop_misc()
if not success then
local err = string.format("Lost MQTT connection: %s", err)
ugly.crit(err)
error(err)
end
mqtt_idle_timer:set(cfg.MOSQ_IDLE_LOOP_MS)
end, cfg.MOSQ_IDLE_LOOP_MS)
local timer_process_deltas
timer_process_deltas = uloop.timer(function()
if pl.tablex.size(cache) == 0 then
ugly.debug("Skipping post, no data in cache")
timer_process_deltas:set(cfg.POST_INTERVAL)
return
end
local url = string.format(cfg.TEMPLATE_POST_URL, args.public)
local cmd = string.format([[curl -X POST '%s' -H 'Phant-Private-Key: %s' ]], url, args.key_real)
local fields_args = pl.tablex.pairmap(function(k,v)
return string.format([[--data "%s=%s"]], k, v)
end, cache)
cmd = cmd .. table.concat(fields_args, " ")
ugly.debug("executing: %s", cmd)
os.execute(cmd)
timer_process_deltas:set(cfg.POST_INTERVAL)
end, cfg.POST_INTERVAL)
uloop.run()