Skip to content

Commit

Permalink
Reworks Storage Manager (#329)
Browse files Browse the repository at this point in the history
* Added file and directory info retrieval and stream merging

* Adds Livy integration via Jobs and Actor

* Added some facilities to read play configuration more easily

* Wired up the bulk download functionality, added logging within Actions and minor cleanup

* Removed OpenTSDB references and refactored sbt files

* Merged core and playApi and moved survivors to top level

* Added support for Kudu based export, export file cleanup, bug fixes and general improvements

* Adds doobie and draft json API

* Fixed final merge conflict

* Finalizes json readers and tests

* Updated README.md

* Added some WriterT instances and tests for the Select fragment

* Added where fragment writer + tests

* Implemented groupBy, having and limit + tests

* Added jdbc result conversion and tests

* Fixes some bugs and added unit test for JdbcQueryService

* Added copyrights, wired impala into query

* Fixed impala connectivity, excl. security

* Removed accidental leak of jdbc url

* Adds impersonation for both impala and hive

* Added scaladoc + minor cleanup

* Added deployment settings

* add ssl_keystore_pwd to the deployment

* Update readme.md

* Updates deployment setup

* Cleaned up configurations

* Fixed hadoop conf setting

* Added login for user from keytab

* Added loading keytab from secret

* Fixed content types and JSON representations

* Fixed problems with ticket renewal

* Fixes version numbers

* Updates default commons version

* Updated versions in deployment descriptors

* Added query explanation analysis

* Split the bulk API

* Split Query API

* Fixed location header, empty tail error for download, and cleaned up code

* Added port number to livy's host

* Changed the CA certs file into a secret

* Fixed typo in kerberos config class
  • Loading branch information
luzzu-lab committed Jul 19, 2018
1 parent 90f8440 commit f2ba367
Show file tree
Hide file tree
Showing 207 changed files with 9,298 additions and 3,015 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Expand Up @@ -10,6 +10,7 @@ logs
*/conf/prodLocal.conf
*/conf/testLocal.conf
*/localConfigs/*
test-dir/
iot_ingestion_manager/common/src/generated
.ipynb_checkpoints
catalog_manager/conf/cluster_application.conf
Expand All @@ -21,3 +22,4 @@ catalog_manager/conf/cross_spec
ingestion_manager/conf/
storage_manager_core
storage_manager/playApi/conf/production.conf
storage_manager/core/opendata
20 changes: 11 additions & 9 deletions catalog_manager/app/Module.scala
@@ -1,29 +1,31 @@
import com.google.inject.{AbstractModule, Singleton}
import it.gov.daf.catalogmanager.listeners.{IngestionListener, IngestionListenerImpl}
import it.gov.daf.common.sso.common.{CacheWrapper, LoginClient}
import com.google.inject.{ AbstractModule, Singleton }
import it.gov.daf.catalogmanager.listeners.{ IngestionListener, IngestionListenerImpl }
import it.gov.daf.common.sso.common.{ CacheWrapper, LoginClient }
import it.gov.daf.common.sso.client.LoginClientRemote
import play.api.{Configuration, Environment}
import org.slf4j.LoggerFactory
import play.api.{ Configuration, Environment }

@Singleton
class Module (environment: Environment, configuration: Configuration) extends AbstractModule {

private val logger = LoggerFactory.getLogger("it.gov.daf.catalogmanager.Module")

def configure() = {

println("executing module..")
logger.info { "executing module.." }

// REMEMBER TO LEAVE COMMENT FOR DEALING WITH Ingestion of file
bind(classOf[IngestionListener]).to(classOf[IngestionListenerImpl]).asEagerSingleton()

val cacheWrapper = new CacheWrapper(Option(30L), Option(0L))// cookie 30 min, credential not needed
bind(classOf[CacheWrapper]).toInstance(cacheWrapper)

bind(classOf[LoginClient]).to(classOf[LoginClientRemote])// for the initialization of SecuredInvocationManager

val securityManHost :Option[String] = configuration.getString("security.manager.host")
require(!securityManHost.isEmpty,"security.manager.host entry not provided")
val securityManHost: Option[String] = configuration.getString("security.manager.host")
require(securityManHost.nonEmpty,"security.manager.host entry not provided")

val loginClientRemote = new LoginClientRemote(securityManHost.get)
bind(classOf[LoginClientRemote]).toInstance(loginClientRemote)
bind(classOf[LoginClient]).to(classOf[LoginClientRemote])// for the initialization of SecuredInvocationManager
//private val secInvokManager = SecuredInvocationManager.init( LoginClientRemote.init(SEC_MANAGER_HOST) )

}
Expand Down
28 changes: 16 additions & 12 deletions common/build.sbt
Expand Up @@ -26,7 +26,7 @@ name := "common"

Seq(gitStampSettings: _*)

version in ThisBuild := sys.env.get("COMMON_VERSION").getOrElse("1.0.8-SNAPSHOT")
version in ThisBuild := sys.env.getOrElse("COMMON_VERSION", "1.0.9-SNAPSHOT")

//version := "1.0.1-SNAPSHOT"

Expand All @@ -44,7 +44,7 @@ scalacOptions ++= Seq(
"-Xfuture"
)

wartremoverErrors ++= Warts.allBut(Wart.Equals)
wartremoverErrors ++= Warts.allBut(Wart.Equals, Wart.ImplicitParameter, Wart.Overloading)

lazy val root = (project in file(".")).enablePlugins(AutomateHeaderPlugin)

Expand All @@ -61,16 +61,18 @@ val hadoopLibraries = Seq(
)
*/
val playLibraries = Seq(
"io.swagger" %% "swagger-play2" % "1.5.3",
"com.typesafe.play" %% "play-cache" % playVersion,
"com.typesafe.play" %% "filters-helpers" % playVersion,
"com.typesafe.play" %% "play-ws" % playVersion,
"com.github.cb372" %% "scalacache-guava" % "0.9.4",
"org.pac4j" % "play-pac4j" % playPac4jVersion,
"org.pac4j" % "pac4j-http" % pac4jVersion,
"org.pac4j" % "pac4j-jwt" % pac4jVersion exclude("commons-io", "commons-io"),
"org.pac4j" % "pac4j-ldap" % pac4jVersion,
"commons-codec" % "commons-codec" % "1.11"
"io.swagger" %% "swagger-play2" % "1.5.3",
"org.typelevel" %% "cats-core" % catsVersion,
"org.typelevel" %% "cats-effect" % catsEffectVersion,
"com.typesafe.play" %% "play-cache" % playVersion,
"com.typesafe.play" %% "filters-helpers" % playVersion,
"com.typesafe.play" %% "play-ws" % playVersion,
"com.github.cb372" %% "scalacache-guava" % "0.9.4",
"org.pac4j" % "play-pac4j" % playPac4jVersion,
"org.pac4j" % "pac4j-http" % pac4jVersion,
"org.pac4j" % "pac4j-jwt" % pac4jVersion exclude("commons-io", "commons-io"),
"org.pac4j" % "pac4j-ldap" % pac4jVersion,
"commons-codec" % "commons-codec" % "1.11"
)

//libraryDependencies ++= hadoopLibraries ++ playLibraries
Expand All @@ -96,5 +98,7 @@ publishTo := {

publishMavenStyle := true

autoAPIMappings := true

credentials += {if(isStaging) Credentials(Path.userHome / ".ivy2" / ".credentialsTest") else Credentials(Path.userHome / ".ivy2" / ".credentials")}

4 changes: 4 additions & 0 deletions common/project/Versions.scala
Expand Up @@ -7,4 +7,8 @@ object Versions {
val playPac4jVersion = "3.0.0"

val pac4jVersion = "2.0.0"

val catsVersion = "0.9.0"

val catsEffectVersion = "1.0.0-RC"
}
@@ -0,0 +1,21 @@
/*
* Copyright 2017 TEAM PER LA TRASFORMAZIONE DIGITALE
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package it.gov.daf.common.config

/**
  * Runtime exception reporting that a mandatory configuration entry is absent.
  *
  * @param key the configuration key that has no value; it is embedded verbatim in the exception message
  */
final case class ConfigMissingException(key: String)
  extends RuntimeException(s"Missing mandatory configuration [$key]")

/**
  * Runtime exception carrying a message together with the underlying cause.
  *
  * NOTE(review): judging by its name this is meant for failures raised while reading configuration values;
  * no usage is visible in this file, so confirm against callers.
  *
  * @param message the description of the failure, used as the exception message
  * @param cause   the underlying error, exposed through `getCause`
  */
final case class ConfigReadException(message: String, cause: Throwable)
  extends RuntimeException(message, cause)
138 changes: 138 additions & 0 deletions common/src/main/scala/it/gov/daf/common/config/ConfigReader.scala
@@ -0,0 +1,138 @@
/*
* Copyright 2017 TEAM PER LA TRASFORMAZIONE DIGITALE
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package it.gov.daf.common.config

import cats.syntax.traverse.toTraverseOps
import cats.instances.option.catsStdInstancesForOption
import cats.instances.try_.catsStdInstancesForTry
import play.api.Configuration

import scala.util.{ Try, Success, Failure }

/**
  * A deferred, failure-safe read of a value out of a `play.api.Configuration`. It exists mostly as a thin wrapper
  * that makes such reads composable, and deliberately offers only a small set of combinators.
  *
  * @note More general abstractions, such as a `Kleisli` over [[scala.util.Try]], could replace this class.
  *
  * @param read the function that attempts to bind a `Configuration` to an `A`
  * @tparam A the type of the value produced by a successful read
  */
sealed abstract class ConfigReader[A](val read: Configuration => Try[A]) {

  /**
    * Derives a reader of `B` from this one by means of the plain function `f`; the concrete behaviour is supplied
    * by the implementing class.
    *
    * @param f the function used to derive the new reader
    * @tparam B the type read by the resulting reader
    * @return a [[ConfigReader]] of `B`
    */
  def map[B](f: A => B): ConfigReader[B]

  /**
    * Derives a reader of `B` from this one by means of `f`, which itself yields a reader; the concrete behaviour
    * is supplied by the implementing class.
    *
    * @param f the function yielding the reader to continue with
    * @tparam B the type read by the resulting reader
    * @return a [[ConfigReader]] of `B`
    */
  def flatMap[B](f: A => ConfigReader[B]): ConfigReader[B]

  /**
    * Composes the fallible mapping `f` after this reader, which is equivalent to calling
    * [[scala.util.Try#flatMap]] on the outcome of this [[read]].
    *
    * @param f the function to compose
    * @tparam B the return type of the composition result
    * @return a [[ConfigReader]] instance whose read attempt will result in `B`
    */
  def mapAttempt[B](f: A => Try[B]): ConfigReader[B]

  /**
    * Alias for [[mapAttempt]] for the case in which this [[ConfigReader]] is known to produce a `Configuration`.
    * It makes drilling down through nested configuration objects read naturally.
    *
    * {{{
    * obj1 {
    *   arg1 = 1
    *   obj2 {
    *     arg2 = 2
    *   }
    * }
    * }}}
    *
    * One way to get to `arg2` is by
    *
    * {{{
    * readObj1 ~> readObj2 ~> readArg2
    * }}}
    *
    * @param otherReader the reader to compose after this one
    * @param ev implicit evidence that shows that this reader reads `play.api.Configuration`
    * @tparam B return type of `otherReader`
    * @return a reader that will first apply this reader's [[read]] and then `otherReader`'s
    */
  final def ~>[B](otherReader: ConfigReader[B])(implicit ev: A =:= Configuration): ConfigReader[B] =
    mapAttempt[B] { value => otherReader.read(ev(value)) }

  /**
    * Like [[~>]], except that this reader is expected to produce an ''optional'' configuration. When the fragment
    * is present `otherReader` is applied to it, so that its contents are still read as mandatory; when it is
    * absent the result is an empty option.
    *
    * @param otherReader the reader to compose after this one
    * @param ev implicit evidence that shows that this reader reads `Option[play.api.Configuration]`
    * @tparam B return type of `otherReader`
    * @return a reader that will first apply this reader's [[read]] and then `otherReader`'s
    */
  final def ~?>[B](otherReader: ConfigReader[B])(implicit ev: A =:= Option[Configuration]): ConfigReader[Option[B]] = {
    // Use the evidence to view this reader as one producing an optional configuration fragment.
    val fragmentReader: ConfigReader[Option[Configuration]] = map[Option[Configuration]](ev)
    // Traversing turns the Option[Try[B]] inside out into a Try[Option[B]].
    fragmentReader.mapAttempt[Option[B]] { maybeConfig => maybeConfig.traverse[Try, B](otherReader.read) }
  }

}

/**
  * [[ConfigReader]] implementation whose combinators all work by post-processing the outcome of the wrapped read
  * function `f`.
  *
  * @param f the read attempt from a `Configuration` to `A`
  * @tparam A the type of the value produced by a successful read
  */
sealed class MandatoryConfigReader[A](f: Configuration => Try[A]) extends ConfigReader[A](f) {

  /**
    * Applies `ff` to the value of a successful read; a failed read is left as it is.
    */
  def map[B](ff: A => B): MandatoryConfigReader[B] = new MandatoryConfigReader[B]({ configuration =>
    f(configuration).map(ff)
  })

  /**
    * Feeds the value of a successful read to `ff` and then runs the reader it yields against the very same
    * `Configuration` that this reader was given.
    */
  def flatMap[B](ff: A => ConfigReader[B]): MandatoryConfigReader[B] = new MandatoryConfigReader[B]({ configuration =>
    f(configuration).flatMap { value => ff(value).read(configuration) }
  })

  /**
    * Chains the fallible function `ff` after a successful read, by way of `Try#flatMap`.
    */
  def mapAttempt[B](ff: A => Try[B]): MandatoryConfigReader[B] = new MandatoryConfigReader[B]({ configuration =>
    f(configuration).flatMap(ff)
  })

}

/**
  * Reader of an optional value: the lookup function `f` is evaluated inside a `Try`, so a non-fatal exception
  * thrown by it becomes a failed read instead of propagating.
  *
  * @param key the configuration key, reported by [[ConfigMissingException]] when the value is forced with `!`
  * @param f the lookup that extracts the optional value from a `Configuration`
  * @tparam A the type of the value, when present
  */
sealed case class OptionalConfigReader[A](private val key: String)
                                         (private val f: Configuration => Option[A])
  extends MandatoryConfigReader[Option[A]](config => Try(f(config))) {

  /**
    * Makes this optional value mandatory.
    * @return a [[ConfigReader]] whose read fails with [[ConfigMissingException]] when the value is absent
    */
  def ! : ConfigReader[A] = new MandatoryConfigReader[A]({ configuration =>
    read(configuration).flatMap { maybeValue =>
      maybeValue.fold[Try[A]] { Failure[A](ConfigMissingException(key)) } { value => Success[A](value) }
    }
  })

  /**
    * Supplies a fallback for this reader.
    * @param a the fallback value, evaluated only when the value is missing
    * @return a [[ConfigReader]] that yields the configured value, or `a` when there is none
    */
  def default(a: => A): ConfigReader[A] = new MandatoryConfigReader[A]({ configuration =>
    read(configuration).map { maybeValue => maybeValue.getOrElse(a) }
  })

}

50 changes: 50 additions & 0 deletions common/src/main/scala/it/gov/daf/common/config/Read.scala
@@ -0,0 +1,50 @@
/*
* Copyright 2017 TEAM PER LA TRASFORMAZIONE DIGITALE
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package it.gov.daf.common.config

import play.api.Configuration

import scala.concurrent.duration._
import scala.collection.convert.decorateAsScala._

/**
  * Factory of [[OptionalConfigReader]] instances, one per supported value type. Every reader simply delegates to
  * the matching `getXxx(key)` accessor of `play.api.Configuration`.
  */
object Read {

  /** Reader for the nested configuration object at `key`, backed by `Configuration.getConfig`. */
  def config(key: String): OptionalConfigReader[Configuration] =
    OptionalConfigReader[Configuration](key) { configuration => configuration.getConfig(key) }

  /** Reader for the list of configuration objects at `key`, converted into a Scala `List`. */
  def configs(key: String): OptionalConfigReader[List[Configuration]] =
    OptionalConfigReader[List[Configuration]](key) { configuration =>
      configuration.getConfigList(key).map { javaList => javaList.asScala.toList }
    }

  /** Reader for the `Int` at `key`, backed by `Configuration.getInt`. */
  def int(key: String): OptionalConfigReader[Int] =
    OptionalConfigReader[Int](key) { configuration => configuration.getInt(key) }

  /** Reader for the `Long` at `key`, backed by `Configuration.getLong`. */
  def long(key: String): OptionalConfigReader[Long] =
    OptionalConfigReader[Long](key) { configuration => configuration.getLong(key) }

  /** Reader for the `String` at `key`, backed by `Configuration.getString`. */
  def string(key: String): OptionalConfigReader[String] =
    OptionalConfigReader[String](key) { configuration => configuration.getString(key) }

  /** Reader for the list of strings at `key`, converted into a Scala `List`. */
  def strings(key: String): OptionalConfigReader[List[String]] =
    OptionalConfigReader[List[String]](key) { configuration =>
      configuration.getStringList(key).map { javaList => javaList.asScala.toList }
    }

  /** Reader for the `Double` at `key`, backed by `Configuration.getDouble`. */
  def double(key: String): OptionalConfigReader[Double] =
    OptionalConfigReader[Double](key) { configuration => configuration.getDouble(key) }

  /** Reader for the `Boolean` at `key`, backed by `Configuration.getBoolean`. */
  def boolean(key: String): OptionalConfigReader[Boolean] =
    OptionalConfigReader[Boolean](key) { configuration => configuration.getBoolean(key) }

  /**
    * Reader for the duration at `key`: the value returned by `Configuration.getMilliseconds` is turned into a
    * `FiniteDuration` expressed in milliseconds.
    */
  def time(key: String): OptionalConfigReader[FiniteDuration] =
    OptionalConfigReader[FiniteDuration](key) { configuration =>
      configuration.getMilliseconds(key).map { millis => millis.milliseconds }
    }

}

0 comments on commit f2ba367

Please sign in to comment.