Skip to content

Commit

Permalink
initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
sprsquish committed Jul 20, 2016
0 parents commit 86eb390
Show file tree
Hide file tree
Showing 8 changed files with 395 additions and 0 deletions.
14 changes: 14 additions & 0 deletions .gitignore
@@ -0,0 +1,14 @@
*.class
*.log

# sbt specific
.cache
.history
.lib/
dist/*
target/
lib_managed/
src_managed/
project/boot/
project/plugins/project/
run.sh
18 changes: 18 additions & 0 deletions build.sbt
@@ -0,0 +1,18 @@
import com.github.retronym.SbtOneJar._

oneJarSettings

name := "home-metrics"

version := "1.0"

scalaVersion := "2.11.8"

resolvers += "twitter-repo" at "https://maven.twttr.com"

libraryDependencies ++= Seq(
"com.twitter" %% "twitter-server" % "1.21.0",
"com.twitter" %% "finagle-stats" % "6.36.0",
"org.scala-lang.modules" %% "scala-java8-compat" % "0.7.0"
//"org.apache.commons" % "commons-io" % "1.3.2"
)
1 change: 1 addition & 0 deletions project/plugins.sbt
@@ -0,0 +1 @@
addSbtPlugin("org.scala-sbt.plugins" % "sbt-onejar" % "0.8")
62 changes: 62 additions & 0 deletions src/main/scala/InfluxDB.scala
@@ -0,0 +1,62 @@
package smick

import com.twitter.finagle.{ Http, Service }
import com.twitter.finagle.http._
import com.twitter.io.Buf
import com.twitter.util.Future
import java.net.URL

case class WriteFail(code: Int, body: String) extends Exception(s"$code: $body")

trait Store {
def write(vals: Seq[StoreEntry]): Future[Unit]
}

case class StoreEntry(
name: String,
value: Any,
tags: Map[String,Any] = Map.empty,
time: Option[Long] = None)

trait InfluxDB { self: SmickHome =>
val influxDest = flag("influxdb.dest", "hostname:8086", "Dest of influxDB")
val influxUser = flag("influxdb.user", "username", "Influx username")
val influxPass = flag("influxdb.pass", "password", "Influx password")
val influxDB = flag("influxdb.db", "database", "Influx Database")

class InfluxStore extends Store {
private[this] val url = new URL(
s"http://${influxUser()}:${influxPass()}@${influxDest()}/write?db=${influxDB()}")

@volatile private[this] var _client: Service[Request, Response] = _

private def client = if (_client != null) _client else {
_client = Http.newClient(influxDest()).toService
_client
}

private def escape(v: Any): String = v match {
case str: String =>
str.replaceAll(",", "\\\\,").replaceAll(" ", "\\\\ ")
case _ => v.toString
}

def write(vals: Seq[StoreEntry]): Future[Unit] =
if (vals.isEmpty) Future.Done else {
val body = vals map { case StoreEntry(name, value, tagMap, time) =>
val tags = tagMap map { case (k, v) => s"""$k="${escape(v)}"""" }
val pre = (Seq(name) ++ tags) mkString(",")
s"""$pre value=$value ${time.getOrElse("")}"""
} mkString("\n")

val req = RequestBuilder().url(url).buildPost(Buf.Utf8(body))

client(req) flatMap {
case rep if rep.statusCode < 200 || rep.statusCode >= 300 =>
Future.exception(WriteFail(rep.statusCode, rep.contentString))
case _ =>
Future.Done
}
}
}
}
50 changes: 50 additions & 0 deletions src/main/scala/Main.scala
@@ -0,0 +1,50 @@
package smick

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.twitter.finagle.util.DefaultTimer
import com.twitter.server.{ Closer, TwitterServer }
import com.twitter.util.{ Await, Duration, Future }
import java.net.URL

trait SmickHome extends TwitterServer with Closer {
implicit val timer = DefaultTimer.twitter

val json = new ObjectMapper with ScalaObjectMapper
json.registerModule(DefaultScalaModule)

val noLoop = flag("noLoop", false, "Do not start the updater loops")
protected def loopIt[T](what: String, delay: Duration, f: => Future[T]): Future[Unit] =
if (noLoop()) Future.Done else {
def loop(): Future[Unit] =
f onFailure(log.error(_, what)) transform(_ => Future.Done) delayed(delay) before loop()

val loopHandle = loop()
onExit { loopHandle.raise(Halt) }
loopHandle
}

protected def destStr(url: URL): String = {
val port = if (url.getPort < 0) url.getDefaultPort else url.getPort
s"${url.getHost}:${port}"
}
}

object Halt extends Exception

object Main extends SmickHome
with InfluxDB
with Nest
with ObserverIP
with WattVision
{
def main(): Unit = {
val store = new InfluxStore
val nest = nestLoop(store)
val observer = observerLoop(store)
val wattVision = wattVisionLoop(store)

Await.all(nest, observer, wattVision)
}
}
120 changes: 120 additions & 0 deletions src/main/scala/Nest.scala
@@ -0,0 +1,120 @@
package smick

import com.twitter.concurrent.AsyncStream
import com.twitter.conversions.time._
import com.twitter.finagle.http._
import com.twitter.finagle.{ Http, Service }
import com.twitter.io.Buf
import com.twitter.util.Future
import java.net.URL
import java.util.concurrent.ConcurrentHashMap
import scala.compat.java8.FunctionConverters._

object Nest {
type Data = Map[String, Map[String, Map[String, Any]]]
}

case class NestResult(path: String, data: Nest.Data)

trait Nest { self: SmickHome =>

val nestUrl = flag("nest.url", "https://developer-api.nest.com/devices.json", "URL for nest")
val nestAuth = flag("nest.auth", "authstring", "Nest auth code")

def nestLoop(store: Store): Future[Unit] =
loopIt("nest", 1.minute, process(store))

private def translate(v: Any): Any = v match {
// alarm state (ok is also battery health)
case "ok" => 0
case "warning" => 1
case "emergency" => 2

// battery health
case "replace" => 1

// hvac state (off is also hvac_mode)
case "off" => 0
case "cooling" => 1
case "heating" => 2

case "heat" => 1
case "cool" => 2
case "heat-cool" => 3

case str: String => s"""\"$str\""""
case _ => v
}

private[this] val protectMetrics =
Seq(
"battery_health",
"co_alarm_state",
"smoke_alarm_state",
"is_online")

private[this] val thermoMetrics =
Seq(
"humidity",
"ambient_temperature_f",
"hvac_mode",
"hvac_state",
"target_temperature_f",
"target_temperature_high_f",
"target_temperature_low_f",
"has_leaf")

private def process(store: Store): Future[Unit] =
request() flatMap { r =>
AsyncStream.fromReader(r.reader) foreach { case Buf.Utf8(body) =>
body.split("\n") match {
case Array("event: put", data) =>
val rec = json.readValue[NestResult](data.drop(6))
write(store, rec.data) onFailure println
case _ => ()
}
}
}

private[this] val dataDefs = Seq(
("smoke_co_alarms", "protect", protectMetrics),
("thermostats", "thermostat", thermoMetrics))

private def write(store: Store, data: Nest.Data): Future[Unit] = {
val entries = dataDefs flatMap { case (field, name, metrics) =>
data.get(field).toSeq flatMap { objs =>
objs flatMap { case (_, info) =>
val tags = Map("name" -> info.get("name").get, "type" -> name)
metrics flatMap { metric =>
info.get(metric) map { v => StoreEntry(metric, translate(v), tags) }
}
}
}
}
println(entries)
store.write(entries) respond println
}

private[this] val clients = new ConcurrentHashMap[URL, Service[Request, Response]]()
private[this] val newClient = asJavaFunction { url: URL =>
Http.client
.withTls(url.getHost)
.withStreaming(true)
.newClient(destStr(url))
.toService
}

private def request(urlStr: String = nestUrl()): Future[Response] = {
val url = new URL(urlStr)

val req = Request(url.getPath, "auth" -> nestAuth())
req.accept = "text/event-stream"
req.host = url.getHost

clients.computeIfAbsent(url, newClient)(req) flatMap {
case r if r.statusCode == 307 => request(r.location.get)
case r if r.statusCode == 200 => Future.value(r)
case r => Future.exception(new Exception("Can't handle " + r))
}
}
}
66 changes: 66 additions & 0 deletions src/main/scala/ObserverIP.scala
@@ -0,0 +1,66 @@
package smick

import com.twitter.conversions.time._
import com.twitter.finagle.{ Http, Service }
import com.twitter.finagle.http._
import com.twitter.util.Future
import java.net.URL

trait ObserverIP { self: SmickHome =>
val observerDest = flag("observer.dest", "hostname:80", "Meteobridge dest")
val observerFreq = flag("observer.freq", 10.seconds, "Meteobridge polling frequency")
val observerUser = flag("observer.user", "username", "Meteobridge username")
val observerPass = flag("observer.pass", "password", "Meteobridge password")

def observerLoop(store: Store): Future[Unit] =
loopIt("observer", observerFreq(), process(store))

@volatile private[this] var _url: URL = _
private def url = if (_url != null) _url else {
_url = new URL(s"http://${observerUser()}:${observerPass()}@${observerDest()}/cgi-bin/livedata.cgi")
_url
}

@volatile private[this] var _client: Service[Request, Response] = _
private def client = if (_client != null) _client else {
_client = Http.newClient(destStr(url)).toService
_client
}

private def process(store: Store): Future[Unit] = {
client(RequestBuilder().url(url).buildGet()) flatMap { res =>
val entries = res.contentString.split("\n") flatMap { line =>
line.split(" ").toList match {
case _ :: "thb0" :: temp :: hum :: _ :: press :: _ =>
Seq(StoreEntry("indoor_temp", temp),
StoreEntry("indoor_hum", hum),
StoreEntry("indoor_press", press))

case _ :: "th0" :: temp :: hum :: _ =>
Seq(StoreEntry("outdoor_temp", temp),
StoreEntry("outdoor_hum", hum))

case _ :: "wind0" :: dir :: gust :: speed :: _ =>
Seq(StoreEntry("wind_dir", dir),
StoreEntry("wind_gust", gust),
StoreEntry("wind_speed", speed))

case _ :: "sol0" :: rad :: _ =>
Seq(StoreEntry("solar_radiation", rad))

case _ :: "uv0" :: index :: _ =>
Seq(StoreEntry("uv_index", index))

case _ :: "rain0" :: rate :: total :: _ =>
Seq(StoreEntry("rain_rate", rate),
StoreEntry("rain_total", total))

case _ =>
Seq.empty
}
}

store.write(entries)
}
}
}

0 comments on commit 86eb390

Please sign in to comment.