Commit 6cd0ea6

merging classes from storage_manager_extension

fabiana001 committed Nov 24, 2017
2 parents f261ac0 + 67b402a commit 6cd0ea6

Showing 11 changed files with 526 additions and 68 deletions.
3 changes: 3 additions & 0 deletions iot_ingestion_manager/README.md
@@ -31,3 +31,6 @@ Therefore, you have to run the command *sbt compile* another time.
> sbt compile publishLocal
## Check if datapoints are stored in HBase

> scan 'tsdb', { TIMERANGE => [1509121625000, 1509128825000], LIMIT => 10 }
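
The same datapoints can also be read back through the OpenTSDBContext API that the storage manager uses in this commit; the sketch below is only illustrative and assumes a metric named "speed" and a plain local SparkSession (Kerberos and salting settings omitted):

import org.apache.spark.opentsdb.OpenTSDBContext
import org.apache.spark.sql.SparkSession

object CheckDatapoints extends App {
  // Local session for illustration only; a real deployment configures keytab/principal
  // the same way PhysicalDatasetController does.
  val sparkSession = SparkSession.builder().master("local").appName("check-datapoints").getOrCreate()
  val openTSDBContext = new OpenTSDBContext(sparkSession)

  // Same time window as the HBase scan above, expressed in epoch seconds.
  val interval: Option[(Long, Long)] = Some((1509121625L, 1509128825L))
  val df = openTSDBContext.loadDataFrame("speed", Map.empty[String, String], interval)
  df.show(10)

  sparkSession.stop()
}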
@@ -45,46 +45,14 @@ import scala.annotation.tailrec
import scala.collection.convert.decorateAsJava._
import scala.language.postfixOps
import scala.util.{Failure,Success,Try}
import org.apache.kudu.ColumnSchema
import org.apache.kudu.client.CreateTableOptions
import org.apache.spark.opentsdb.DataPoint
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

/**
* This controller is re-generated after each change in the specification.
@@ -113,7 +83,12 @@ import org.apache.kudu.client.CreateTableOptions

package iot_ingestion_manager.yaml {
// ----- Start of unmanaged code area for package Iot_ingestion_managerYaml
@SuppressWarnings(
Array(
"org.wartremover.warts.While",
"org.wartremover.warts.Var",
@@ -0,0 +1,33 @@
package it.gov.daf.iotingestion.common

import it.gov.daf.iotingestion.common.SerializerDeserializer
import it.gov.daf.iotingestion.event.Event
import org.json4s.DefaultFormats
import org.json4s.native.JsonMethods._
import org.specs2.mutable.Specification

import scala.util.Success

class SerializerDeserializerSpec extends Specification {
"The SerDeser" should {
"serialize and deserialize the events correctly" in {
val jsonEvent =
"""{"id": "TorinoFDT",
|"ts": 1488532860000,
|"event_type_id": 1,
|"source": "-1965613475",
|"location": "45.06766-7.66662",
|"service": "http://opendata.5t.torino.it/get_fdt",
|"body": {"bytes": "<FDT_data period=\"5\" accuracy=\"100\" lng=\"7.66662\" lat=\"45.06766\" direction=\"positive\" offset=\"55\" Road_name=\"Corso Vinzaglio(TO)\" Road_LCD=\"40201\" lcd1=\"40202\" xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xmlns=\"http://www.5t.torino.it/simone/ns/traffic_data\">\n <speedflow speed=\"20.84\" flow=\"528.00\"/>\n </FDT_data>"},
|"attributes": {"period": "5", "offset": "55", "Road_name": "Corso Vinzaglio(TO)", "Road_LCD": "40201", "accuracy": "100", "FDT_data": "40202", "flow": "528.00", "speed": "20.84", "direction": "positive"}
|}""".stripMargin

implicit val formats = DefaultFormats
val event = parse(jsonEvent, true).extract[Event]
val eventBytes = SerializerDeserializer.serialize(event)
val Success(deserEvent) = SerializerDeserializer.deserialize(eventBytes)

event must be equalTo (deserEvent)
}
}
}
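
The round-trip spec above can be run on its own with something like `sbt "testOnly it.gov.daf.iotingestion.common.SerializerDeserializerSpec"`.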
2 changes: 2 additions & 0 deletions opentsdb/docker/start.sh
@@ -1,5 +1,7 @@
#!/usr/bin/env bash

rm -rf /tmp/opentsdb/*

kinit -kt /etc/opentsdb/daf.keytab daf@PLATFORM.DAF.LOCAL

export JVMARGS="-Djava.security.auth.login.config=/etc/opentsdb/jaas.conf"
111 changes: 108 additions & 3 deletions storage_manager/app/controllers/PhysicalDatasetController.scala
@@ -17,26 +17,32 @@
package controllers

import java.lang.reflect.UndeclaredThrowableException
import java.net.{URI, URLClassLoader}
import java.security.PrivilegedExceptionAction

import akka.stream.scaladsl.{Source, StreamConverters}
import akka.util.ByteString
import com.databricks.spark.avro.SchemaConverters
import com.google.inject.Inject
import com.typesafe.config.ConfigFactory
import io.swagger.annotations.{Api, ApiOperation, ApiParam, Authorization}
import it.gov.daf.common.authentication.Authentication
import org.apache.avro.SchemaBuilder
import org.apache.hadoop.fs._
import org.apache.hadoop.security.{AccessControlException, UserGroupInformation}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.pac4j.play.store.PlaySessionStore
import play.api.Configuration
import play.api.libs.json.Json
import play.api.mvc._
import play.mvc.Http
import org.apache.kudu.spark.kudu._
import org.apache.spark.opentsdb.OpenTSDBContext
import play.Logger
import play.Logger.ALogger

import scala.annotation.tailrec
import scala.language.postfixOps
import scala.util.{Failure, Success, Try}

@@ -62,12 +68,57 @@ class PhysicalDatasetController @Inject()(configuration: Configuration, val play
sparkConfig.set("spark.driver.memory", configuration.getString("spark_driver_memory").getOrElse("128M"))

private val sparkSession = SparkSession.builder().master("local").config(sparkConfig).getOrCreate()
// register every jar on the application classpath with the Spark context (see addClassPathJars below)
addClassPathJars(sparkSession.sparkContext, getClass.getClassLoader)

implicit private val alogger: ALogger = Logger.of(this.getClass.getCanonicalName)

System.setProperty("sun.security.krb5.debug", "true")

val keytab: Option[String] = configuration.getString("opentsdb.context.keytab")
alogger.info(s"OpenTSDBContext Keytab: $keytab")

val principal: Option[String] = configuration.getString("opentsdb.context.principal")
alogger.info(s"OpenTSDBContext Principal: $principal")

val keytabLocalTempDir: Option[String] = configuration.getString("opentsdb.context.keytablocaltempdir")
alogger.info(s"OpenTSDBContext Keytab Local Temp Dir: $keytabLocalTempDir")

val saltwidth: Option[Int] = configuration.getInt("opentsdb.context.saltwidth")
alogger.info(s"OpenTSDBContext SaltWidth: $saltwidth")

val saltbucket: Option[Int] = configuration.getInt("opentsdb.context.saltbucket")
alogger.info(s"OpenTSDBContext SaltBucket: $saltbucket")

val openTSDBContext: OpenTSDBContext = new OpenTSDBContext(sparkSession)
keytabLocalTempDir.foreach(openTSDBContext.keytabLocalTempDir = _)
keytab.foreach(openTSDBContext.keytab = _)
principal.foreach(openTSDBContext.principal = _)
saltwidth.foreach(OpenTSDBContext.saltWidth = _)
saltbucket.foreach(OpenTSDBContext.saltBuckets = _)


private val fileSystem: FileSystem = {
val conf = new org.apache.hadoop.conf.Configuration()
FileSystem.get(conf)
}

@tailrec
private def addClassPathJars(sparkContext: SparkContext, classLoader: ClassLoader): Unit = {
classLoader match {
case urlClassLoader: URLClassLoader =>
urlClassLoader.getURLs.foreach { classPathUrl =>
if (classPathUrl.toExternalForm.endsWith(".jar") && !classPathUrl.toExternalForm.contains("test-interface")) {
sparkContext.addJar(classPathUrl.toExternalForm)
}
}
case _ =>
}
if (classLoader.getParent != null) {
addClassPathJars(sparkContext, classLoader.getParent)
}
}

UserGroupInformation.loginUserFromSubject(null)

private val proxyUser = UserGroupInformation.getCurrentUser
@@ -117,24 +168,30 @@ class PhysicalDatasetController @Inject()(configuration: Configuration, val play
@ApiParam(value = "the dataset's format", required = true) format: String,
@ApiParam(value = "max number of rows/chunks to return", required = false) limit: Option[Int],
@ApiParam(value = "chunk size", required = false) chunk_size: Option[Int]): Action[AnyContent] =
//TODO the storage manager should know the input database format
Action {
CheckedAction(exceptionManager orElse hadoopExceptionManager) {
HadoopDoAsAction {
_ =>
val datasetURI = new URI(uri)
val locationURI = new URI(datasetURI.getSchemeSpecificPart)
val locationScheme = locationURI.getScheme
alogger.info(s"$datasetURI\t$locationScheme\t$locationURI")

alogger.info(s" Request for $locationScheme")
val actualFormat = format match {
case "avro" => "com.databricks.spark.avro"
case format: String => format
}
val res: Result = locationScheme match {
case "hdfs" if actualFormat == "text" =>
alogger.info(s"In HDFS with format $actualFormat")
val location = locationURI.getSchemeSpecificPart
val rdd = sparkSession.sparkContext.textFile(location)
val doc = rdd.take(limit.getOrElse(defaultLimit)).mkString("\n")
Ok(doc).as("text/plain")
case "hdfs" if actualFormat == "raw" =>
alogger.info(s"In HDFS with format $actualFormat")
val location = locationURI.getSchemeSpecificPart
val path = new Path(location)
if (fileSystem.isDirectory(path))
@@ -143,6 +200,7 @@ class PhysicalDatasetController @Inject()(configuration: Configuration, val play
val dataContent: Source[ByteString, _] = StreamConverters.fromInputStream(() => data, chunk_size.getOrElse(defaultChunkSize))
Ok.chunked(dataContent.take(limit.getOrElse(defaultLimit).asInstanceOf[Long])).as("application/octet-stream")
case "hdfs" =>
alogger.info(s"In HDFS with format $actualFormat")
val location = locationURI.getSchemeSpecificPart
val df = sparkSession.read.format(actualFormat).load(location)
val doc = s"[${
@@ -151,9 +209,55 @@ class PhysicalDatasetController @Inject()(configuration: Configuration, val play
}).mkString(",")
}]"
Ok(doc).as(JSON)
case "kudu" =>
//TODO the table name is encoded in the uri
alogger.info("In KUDU")
val table = "Events"
val master = configuration.getString("kudu.master").getOrElse("NO_kudu.master")
val df = sparkSession
.sqlContext
.read
.options(Map("kudu.master" -> master, "kudu.table" -> table)).kudu
val doc = s"[${
df.take(limit.getOrElse(defaultLimit)).map(row => {
Utility.rowToJson(df.schema)(row)
}).mkString(",")
}]"
Ok(doc).as(JSON)
case "opentsdb" =>
alogger.info("You are in OPENTSDB")
//TODO te table name is encoded in the uri
Logger.info(s"In OPENTSDB")
val metric = "speed"
val tags: Map[String, String] = Map()
val interval: Option[(Long, Long)] = Some((1510499181, 1510585560))


//val ts = Try(openTSDBContext.load(metric, tags, interval)).map(_.collect)
//val result = ts.collect()

Try(openTSDBContext.loadDataFrame(metric, tags, interval)) match {
case Success(df) =>
alogger.info(s" Total size ${df.count()}")
val doc = s"[${
df.take(limit.getOrElse(defaultLimit)).map(row => {
Utility.rowToJson(df.schema)(row)
}).mkString(",")
}]"
Ok(doc).as(JSON)
case Failure(ex) =>
alogger.error(s"OpenTSDB load failed: ${ex.getMessage}", ex)
Ok(ex.getMessage).as(JSON)
}


case scheme =>
alogger.info(s"Storage scheme $format not supported")
throw new NotImplementedError(s"storage scheme: $scheme not supported")
}

res
}
}
}
@@ -194,3 +298,4 @@ class PhysicalDatasetController @Inject()(configuration: Configuration, val play
}

}
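
The scheme dispatch in the controller above expects a nested URI whose scheme-specific part is itself a URI (hdfs, kudu or opentsdb); the sketch below only mirrors that parsing step, and the example URI is an assumption, not a value taken from this commit:

import java.net.URI

object UriParsingSketch extends App {
  // Hypothetical dataset URI; only the nested-scheme structure matches the controller.
  val uri = "dataset:hdfs:/daf/ordinary/example/events.parquet"

  val datasetURI = new URI(uri)
  val locationURI = new URI(datasetURI.getSchemeSpecificPart)
  val locationScheme = locationURI.getScheme        // "hdfs" -> selects the HDFS branch
  val location = locationURI.getSchemeSpecificPart  // "/daf/ordinary/example/events.parquet"

  println(s"$datasetURI\t$locationScheme\t$location")
}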
