Merge pull request #2 from wdias/develop
Initial Release Changes
gihankarunarathne committed Feb 15, 2018
2 parents 3df593a + 2d5f393 commit 6fdd5a6
Showing 46 changed files with 1,612 additions and 333 deletions.
15 changes: 8 additions & 7 deletions build.sbt
@@ -9,7 +9,7 @@ lazy val app = project in file(".") enablePlugins (Cinnamon)
cinnamon in run := true
cinnamon in test := true
// Set the Cinnamon Agent log level
cinnamonLogLevel := "INFO"
// cinnamonLogLevel := "INFO"

scalacOptions := Seq("-unchecked", "-deprecation", "-encoding", "utf8")

@@ -23,6 +23,10 @@ lazy val mysqlConnectorVersion = "5.1.36"
lazy val netcdfVersion = "4.6.11"

libraryDependencies ++= Seq(
// Use Coda Hale Metrics and Akka instrumentation
Cinnamon.library.cinnamonCHMetrics,
Cinnamon.library.cinnamonAkka,
Cinnamon.library.cinnamonCHMetricsElasticsearchReporter,
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
"com.typesafe.akka" %% "akka-stream" % akkaVersion,
"com.typesafe.akka" %% "akka-testkit" % akkaVersion,
@@ -36,14 +40,11 @@ libraryDependencies ++= Seq(
"com.typesafe.slick" %% "slick" % slickVersion,
"com.typesafe.slick" %% "slick-hikaricp" % slickVersion,
"mysql" % "mysql-connector-java" % mysqlConnectorVersion,
"edu.ucar" % "netcdfAll" % netcdfVersion,
"de.aktey.akka.visualmailbox" %% "collector" % "1.1.0",
// Use Coda Hale Metrics and Akka instrumentation
Cinnamon.library.cinnamonCHMetrics,
Cinnamon.library.cinnamonAkka,
Cinnamon.library.cinnamonCHMetricsElasticsearchReporter
"edu.ucar" % "netcdfAll" % netcdfVersion
)

resolvers += "Unidata Releases" at "https://artifacts.unidata.ucar.edu/repository/unidata-releases"

licenses := Seq(("CC0", url("http://creativecommons.org/publicdomain/zero/1.0")))

connectInput in run := true
28 changes: 18 additions & 10 deletions src/data/RF/MetaData.json
@@ -1,15 +1,23 @@
{
"type": "Observed",
"station": {
"name": "Hanwella",
"latitude": 6.909722222,
"longitude": 80.08166667
"moduleId": "HEC-HMS",
"valueType": "Scalar",
"parameter": {
"parameterId": "O.Precipitation",
"variable": "Precipitation",
"unit": "mm",
"parameterType": "Instantaneous"
},
"source": "HEC-HMS",
"unit": {
"unit": "m3/s",
"type": "Instantaneous"
"location": {
"locationId": "wdias_waga",
"name": "Waga",
"lat": 6.909722222,
"lon": 80.08166667
},
"timeSeriesType": "External Historical",
"timeStep": {
"timeStepId": "each_hour",
"unit": "Hour",
"multiplier": 1
},
"variable": "Precipitation",
"tags": ["Test1"]
}
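
For reference, the new metadata layout maps naturally onto nested case classes. The sketch below simply mirrors the field names in the JSON above; the project's real definitions live under org.wdias.constant and may differ.

// Hypothetical sketch of the new metadata shape; field names are taken from
// MetaData.json above, everything else (types, class names) is an assumption.
case class Parameter(parameterId: String, variable: String, unit: String, parameterType: String)
case class Location(locationId: String, name: String, lat: Double, lon: Double)
case class TimeStep(timeStepId: String, unit: String, multiplier: Int)
case class Metadata(
  moduleId: String,
  valueType: String,
  parameter: Parameter,
  location: Location,
  timeSeriesType: String,
  timeStep: TimeStep,
  tags: List[String]
)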
3 changes: 0 additions & 3 deletions src/data/RF/RF.csv
@@ -1,6 +1,3 @@
Location Names,Awissawella,Colombo
Location Ids,Awissawella,Colombo
Time,Rainfall,Rainfall
2017-05-17 00:00:00,0.01,0.19
2017-05-17 01:00:00,0.00,0.50
2017-05-17 02:00:00,0.00,2.32
5 changes: 3 additions & 2 deletions src/main/resources/application.conf
@@ -6,8 +6,9 @@ akka {

http {
interface = "0.0.0.0"
port-input = 8000
port-api = 9000
port-input = 9000
port-api = 9001
port-single = 8000
}

services {
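The hunk above splits the HTTP configuration into three ports. As a rough sketch, assuming the usual akka-http pattern of that era, one such binding could be wired like this; the route, object name, and health path are placeholders, not taken from this diff.

// Hypothetical sketch: binding one of the configured ports with akka-http.
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import com.typesafe.config.ConfigFactory

object InputServerSketch extends App {
  implicit val system: ActorSystem = ActorSystem("wdias")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  val conf = ConfigFactory.load()
  val route = path("health") { get { complete("OK") } } // placeholder route
  Http().bindAndHandle(route, conf.getString("akka.http.interface"), conf.getInt("akka.http.port-input"))
}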
src/main/scala/org/wdias/{adapter → adapters/extension_adapter}/ExtensionAdapter.scala
@@ -1,9 +1,9 @@
package org.wdias.adapter
package org.wdias.adapters.extension_adapter

import akka.actor.{Actor, ActorLogging}
import org.wdias.adapter.models._
import org.wdias.adapters.extension_adapter.ExtensionAdapter._
import org.wdias.adapters.metadata_adapter.models.{LocationsDAO, ParametersDAO, TimeSeriesMetadataDAO, TimeStepsDAO}
import org.wdias.constant._
import org.wdias.constant.TimeSeriesEnvelop

object ExtensionAdapter {

@@ -21,29 +21,21 @@

class ExtensionAdapter extends Actor with ActorLogging {

import ExtensionAdapter._

var validationConfigs = Set(
ValidationConfig("Hanwella", 0, 120),
ValidationConfig("Colombo", 0, 120),
ValidationConfig("Waga", 0, 120),
)

def receive: Receive = {
case GetValidationConfig(timeSeriesEnvelop) =>
log.info("GetValidationConfig:: {}, {}", timeSeriesEnvelop, validationConfigs.find(_.name == timeSeriesEnvelop.metaData.location.name))

val stationName = timeSeriesEnvelop.metaData.location.name
LocationsDAO.create(Location(stationName, stationName, 0, 0))
val p: ParameterObj = timeSeriesEnvelop.metaData.parameter.toParameterObj
ParametersDAO.create(p)
TimeStepsDAO.create(TimeStepObj("every_5_min", TimeStepUnit.Minute, Option(5)))
TimeSeriesMetadataDAO.create(MetadataIds("asdf", "WeatherStation", ValueType.Scalar, p.parameterId, stationName, TimeSeriesType.ExternalHistorical, "every_5_min"))

sender() ! ValidationConfigResult(validationConfigs.find(_.name == timeSeriesEnvelop.metaData.location.name), timeSeriesEnvelop)
log.info("<<<<")
case GetTransformationConfig(timeSeriesEnvelop) =>
log.info("GetTransformationConfig: {}", timeSeriesEnvelop)
case GetInterpolationConfig(timeSeriesEnvelop) =>
log.info("GetInterpolationConfig: {}", timeSeriesEnvelop)
}
}
}
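
The receive block above persists metadata through Slick-backed DAOs (LocationsDAO, ParametersDAO, TimeStepsDAO, TimeSeriesMetadataDAO). As a rough sketch under stated assumptions, LocationsDAO could look like the following; the table and column names are invented here, and the real DAOs live in org.wdias.adapters.metadata_adapter.models.

// Hypothetical Slick sketch of LocationsDAO; nothing below is taken from this diff.
import slick.jdbc.MySQLProfile.api._
import scala.concurrent.Future

case class Location(locationId: String, name: String, lat: Double, lon: Double)

class LocationsTable(tag: Tag) extends Table[Location](tag, "locations") {
  def locationId = column[String]("location_id", O.PrimaryKey)
  def name = column[String]("name")
  def lat = column[Double]("lat")
  def lon = column[Double]("lon")
  def * = (locationId, name, lat, lon) <> (Location.tupled, Location.unapply)
}

object LocationsDAO {
  private val db = Database.forConfig("mysql") // assumed config path
  private val locations = TableQuery[LocationsTable]

  def create(location: Location): Future[Int] = db.run(locations += location)
}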
186 changes: 186 additions & 0 deletions src/main/scala/org/wdias/adapters/grid_adapter/GridAdapter.scala
@@ -0,0 +1,186 @@
package org.wdias.adapters.grid_adapter

import java.time.format.DateTimeFormatter
import java.time.{LocalDateTime, ZoneId}

import akka.actor.{Actor, ActorIdentity, ActorLogging, ActorRef, Identify}
import akka.pattern.pipe
import akka.util.Timeout
import com.paulgoldbaum.influxdbclient.Parameter.Precision
import org.wdias.adapters.grid_adapter.GridAdapter._
import org.wdias.extensions.ExtensionHandler.ExtensionHandlerData
import ucar.ma2.DataType
import ucar.nc2.{Attribute, Dimension}
// Check to Run in Actor Context
import java.nio.file.{Files, Paths}
import java.util

import com.paulgoldbaum.influxdbclient.{InfluxDB, Point, QueryResult, Record}
import org.wdias.constant._

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object GridAdapter {

case class StoreTimeSeries(timeSeriesEnvelop: TimeSeriesEnvelop)
case class StoreValidatedTimeSeries(timeSeriesEnvelop: TimeSeriesEnvelop)

case class GetTimeSeries(metaData: Metadata)

case class StoreSuccess(metadata: Metadata)

case class Result(timeSeriesEnvelop: TimeSeriesEnvelop)

case class StoreFailure()

}

class GridAdapter extends Actor with ActorLogging {


implicit val timeout: Timeout = Timeout(15 seconds)

var extensionHandlerRef: ActorRef = _
context.actorSelection("/user/extensionHandler") ! Identify(None)

def createResponse(metaData: Metadata, result: QueryResult): TimeSeriesEnvelop = {
val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

var points: List[DataPoint] = List()
val valueIndex = result.series.head.columns.indexOf("value")
val records: List[Record] = result.series.head.records
records.foreach { record =>
log.info(record.allValues.toString())
val dateTimeStr: String = record.allValues.head.toString.split('Z')(0)
val dateTime = LocalDateTime.parse(dateTimeStr)
val value: Double = record.allValues(valueIndex).toString.toDouble
points = points :+ DataPoint(dateTime.format(formatter), value)
}
val timeSeries = Some(TimeSeries(points))
println("Created Response TimeSeries")
TimeSeriesEnvelop(metaData, timeSeries, None)
}

def createNetcdfFile(): Unit = {
val location = "/tmp/testWrite.nc"

if (Files.exists(Paths.get(location))) {
return
}
println(s"File does not exist. Creating new $location")
import ucar.nc2.NetcdfFileWriter
val writer = NetcdfFileWriter.createNew(NetcdfFileWriter.Version.netcdf3, location, null)

// add dimensions
val latDim = writer.addDimension(null, "lat", 64)
val lonDim = writer.addDimension(null, "lon", 128)

// add Variable double temperature(lat,lon)
val dims = new util.ArrayList[Dimension]
dims.add(latDim)
dims.add(lonDim)

val t = writer.addVariable(null, "temperature", DataType.DOUBLE, dims)
t.addAttribute(new Attribute("units", "K"))
// add a 1D integer attribute of length 3
import ucar.ma2.{Array => NetcdfArray}
val data = NetcdfArray.factory(classOf[Int], Array[Int](3), Array[Int](1, 2, 3))
t.addAttribute(new Attribute("scale", data))

// add a string-valued variable: char svar(80)
val svar_len: Dimension = writer.addDimension(null, "svar_len", 80)
writer.addVariable(null, "svar", DataType.CHAR, "svar_len")

// add a 2D string-valued variable: char names(names, 80)
val names: Dimension = writer.addDimension(null, "names", 3)
writer.addVariable(null, "names", DataType.CHAR, "names svar_len")

// add a scalar variable
writer.addVariable(null, "scalar", DataType.DOUBLE, new util.ArrayList[Dimension])

// add global attributes
writer.addGroupAttribute(null, new Attribute("yo", "face"))
writer.addGroupAttribute(null, new Attribute("versionD", 1.2))
writer.addGroupAttribute(null, new Attribute("versionF", 1.2.toFloat))
writer.addGroupAttribute(null, new Attribute("versionI", 1))
writer.addGroupAttribute(null, new Attribute("versionS", 2.toShort))
writer.addGroupAttribute(null, new Attribute("versionB", 3.toByte))

// create the file
try {
writer.create()
} catch {
case e: Exception => System.err.printf("ERROR creating file %s%n%s", location, e.getMessage)
}
writer.close()
}

def receive: Receive = {
case StoreTimeSeries(data) =>
log.info("StoringTimeSeries... {}", sender())
val influxdb = InfluxDB.connect("localhost", 8086)
val database = influxdb.selectDatabase("wdias")
val metaData: Metadata = data.metaData
var points: List[Point] = List()
val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
val zoneId = ZoneId.systemDefault
data.timeSeries.get.timeSeries.foreach { tt: DataPoint =>
val dateTime: LocalDateTime = LocalDateTime.parse(tt.time, formatter)
val p = Point("observed", dateTime.atZone(zoneId).toEpochSecond())
// Tags
.addTag("moduleId", metaData.moduleId)
.addTag("valueType", metaData.valueType)
.addTag("parameterId", metaData.parameter.parameterId)
.addTag("locationId", metaData.location.locationId)
.addTag("timeSeriesType", metaData.timeSeriesType)
.addTag("timeStepId", metaData.timeStep.timeStepId)
// Values
.addField("value", tt.value)

points = points :+ p
}
log.info("Created points {}", points)

pipe(database.bulkWrite(points, precision = Precision.SECONDS).mapTo[Boolean] map { isWritten =>
println("Written to the DB: " + isWritten)
if (isWritten) {
log.info("Data written to DB Success.")
log.info("Send Data to Extension Handler: {}", extensionHandlerRef)
extensionHandlerRef ! ExtensionHandlerData(data)
StoreSuccess(metaData)
} else {
StoreFailure()
}
}) to sender()

case StoreValidatedTimeSeries(timeSeriesEnvelop) =>
log.info("StoreValidatedTimeSeries... {}, {}", sender(), timeSeriesEnvelop)
createNetcdfFile()

case GetTimeSeries(query) =>
val influxdb = InfluxDB.connect("localhost", 8086)
val database = influxdb.selectDatabase("wdias")

val influxQuery = s"SELECT * FROM observed WHERE " +
s"moduleId = '${query.moduleId}' " +
s"AND valueType = '${query.valueType}' " +
s"AND parameterId = '${query.parameter.parameterId}' " +
s"AND locationId = '${query.location.locationId}' " +
s"AND timeSeriesType = '${query.timeSeriesType}' " +
s"AND timeStepId = '${query.timeStep.timeStepId}'"
log.info("Influx Query: {}", influxQuery)
val queryResult = database.query(influxQuery)

pipe(queryResult.mapTo[QueryResult] map { result =>
Result(createResponse(query, result))
}) to sender()

case ActorIdentity(_, Some(ref)) =>
log.info("Set Extension Handler Ref: {}", ref)
extensionHandlerRef = ref
case ActorIdentity(_, None) =>
context.stop(self)
case _ =>
log.info("Unknown message")
}
}
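
GridAdapter is driven entirely by messages, so a caller typically goes through the ask pattern. A minimal usage sketch follows; the actor system and actor name are assumptions, and `metadata` stands for an org.wdias.constant.Metadata value describing the series to fetch.

// Hypothetical usage sketch for the actor above.
import akka.actor.{ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Future
import scala.concurrent.duration._

import org.wdias.adapters.grid_adapter.GridAdapter
import org.wdias.adapters.grid_adapter.GridAdapter.{GetTimeSeries, Result}

object GridAdapterUsageSketch {
  implicit val system: ActorSystem = ActorSystem("wdias")
  implicit val timeout: Timeout = Timeout(15.seconds)

  val gridAdapter = system.actorOf(Props[GridAdapter], "gridAdapter")

  // `metadata` would come from the caller, e.g. parsed from MetaData.json.
  def fetch(metadata: org.wdias.constant.Metadata): Future[Result] =
    (gridAdapter ? GetTimeSeries(metadata)).mapTo[Result]
}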