Skip to content

Commit

Permalink
Router: ignore line breaks for blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
Albert Meltzer committed Mar 8, 2020
1 parent 47955c1 commit 242d091
Show file tree
Hide file tree
Showing 7,170 changed files with 32,813 additions and 65,102 deletions.
The diff you're trying to view is too large. We only load the first 3000 changed files.
Expand Up @@ -335,9 +335,8 @@ class Engine[TD, EI, PD, Q, P, A](

val algorithms = algoParamsList.map {
case (algoName, algoParams) => {
try {
Doer(algorithmClassMap(algoName), algoParams)
} catch {
try { Doer(algorithmClassMap(algoName), algoParams) }
catch {
case e: NoSuchElementException => {
if (algoName == "") {
logger.error(
Expand Down Expand Up @@ -647,16 +646,12 @@ object Engine {
logger.info(s"Preparator: $preparator")
logger.info(s"AlgorithmList: $algorithmList")

if (params.skipSanityCheck) {
logger.info("Data sanity check is off.")
} else {
logger.info("Data sanity check is on.")
}
if (params.skipSanityCheck) { logger.info("Data sanity check is off.") }
else { logger.info("Data sanity check is on.") }

val td =
try {
dataSource.readTrainingBase(sc)
} catch {
try { dataSource.readTrainingBase(sc) }
catch {
case e: StorageClientException =>
logger.error(
s"Error occured reading from data source. (Reason: " +
Expand Down Expand Up @@ -783,15 +778,11 @@ object Engine {
.mapValues { _.zipWithUniqueId().map(_.swap) }

val preparedMap: Map[EX, PD] = evalTrainMap.mapValues { td =>
{
preparator.prepareBase(sc, td)
}
{ preparator.prepareBase(sc, td) }
}

val algoModelsMap: Map[EX, Map[AX, Any]] = preparedMap.mapValues { pd =>
{
algoMap.mapValues(_.trainBase(sc, pd))
}
{ algoMap.mapValues(_.trainBase(sc, pd)) }
}

val suppQAsMap: Map[EX, RDD[(QX, (Q, A))]] = evalQAsMap.mapValues { qas =>
Expand All @@ -813,9 +804,7 @@ object Engine {
val rawPredicts: RDD[(QX, P)] =
algo.batchPredictBase(sc, model, qs)
val predicts: RDD[(QX, (AX, P))] = rawPredicts.map {
case (qx, p) => {
(qx, (ax, p))
}
case (qx, p) => { (qx, (ax, p)) }
}
predicts
}
Expand Down Expand Up @@ -856,9 +845,7 @@ object Engine {
}

(0 until evalCount).map { ex =>
{
(evalInfoMap(ex), servingQPAMap(ex))
}
{ (evalInfoMap(ex), servingQPAMap(ex)) }
}.toSeq
}
}
Expand Down
Expand Up @@ -266,9 +266,7 @@ object FastEvalEngineWorkflow {
}

val servingResult = (0 until evalQAsMap.size).map { ex =>
{
(evalInfoMap(ex), servingQPAMap(ex))
}
{ (evalInfoMap(ex), servingQPAMap(ex)) }
}.toSeq

cache += Tuple2(prefix, servingResult)
Expand Down
Expand Up @@ -121,11 +121,7 @@ abstract class LAlgorithm[PD, M: ClassTag, Q, P]
if (m.asInstanceOf[PersistentModel[Params]]
.save(modelId, algoParams, sc)) {
PersistentModelManifest(className = m.getClass.getName)
} else {
Unit
}
} else {
m
}
} else { Unit }
} else { m }
}
}
Expand Up @@ -36,9 +36,7 @@ abstract class LServing[Q, P] extends BaseServing[Q, P] {
@Experimental
def supplement(q: Q): Q = q

def serveBase(q: Q, ps: Seq[P]): P = {
serve(q, ps)
}
def serveBase(q: Q, ps: Seq[P]): P = { serve(q, ps) }

/** Implement this method to combine multiple algorithms' predictions to
* produce a single final prediction. The query is the original query sent to
Expand Down
Expand Up @@ -111,11 +111,7 @@ abstract class P2LAlgorithm[PD, M: ClassTag, Q: ClassTag, P]
if (m.asInstanceOf[PersistentModel[Params]]
.save(modelId, algoParams, sc)) {
PersistentModelManifest(className = m.getClass.getName)
} else {
Unit
}
} else {
m
}
} else { Unit }
} else { m }
}
}
Expand Up @@ -115,11 +115,7 @@ abstract class PAlgorithm[PD, M, Q, P] extends BaseAlgorithm[PD, M, Q, P] {
if (m.asInstanceOf[PersistentModel[Params]]
.save(modelId, algoParams, sc)) {
PersistentModelManifest(className = m.getClass.getName)
} else {
Unit
}
} else {
Unit
}
} else { Unit }
} else { Unit }
}
}
Expand Up @@ -28,9 +28,7 @@ import org.apache.spark.SparkContext
*/
abstract class PPreparator[TD, PD] extends BasePreparator[TD, PD] {

def prepareBase(sc: SparkContext, td: TD): PD = {
prepare(sc, td)
}
def prepareBase(sc: SparkContext, td: TD): PD = { prepare(sc, td) }

/** Implement this method to produce prepared data that is ready for model
* training.
Expand Down
Expand Up @@ -40,10 +40,7 @@ abstract class JavaEvaluation extends Evaluation {
*/
def setEngineMetric[EI, Q, P, A](
baseEngine: BaseEngine[EI, Q, P, A],
metric: Metric[EI, Q, P, A, _]) {

engineMetric = (baseEngine, metric)
}
metric: Metric[EI, Q, P, A, _]) { engineMetric = (baseEngine, metric) }

/** Set the [[BaseEngine]] and [[Metric]]s for this [[JavaEvaluation]]
*
Expand Down
Expand Up @@ -50,9 +50,7 @@ object CoreWorkflow {

val batch = if (params.batch.nonEmpty) {
s"{engineInstance.engineFactory} (${params.batch}})"
} else {
engineInstance.engineFactory
}
} else { engineInstance.engineFactory }
val sc = WorkflowContext(batch, env, params.sparkEnv, mode.capitalize)

try {
Expand Down Expand Up @@ -109,9 +107,7 @@ object CoreWorkflow {

val batch = if (params.batch.nonEmpty) {
s"{evaluation.getClass.getName} (${params.batch}})"
} else {
evaluation.getClass.getName
}
} else { evaluation.getClass.getName }
val sc = WorkflowContext(batch, env, params.sparkEnv, mode.capitalize)
val evaluationInstanceId = evaluationInstances.insert(evaluationInstance)

Expand Down
Expand Up @@ -191,12 +191,8 @@ object CreateServer extends Logging {
implicit val timeout = Timeout(5.seconds)
master ? StartServer()
actorSystem.awaitTermination
} getOrElse {
error(s"Invalid engine ID or version. Aborting server.")
}
} getOrElse {
error(s"Invalid engine instance ID. Aborting server.")
}
} getOrElse { error(s"Invalid engine ID or version. Aborting server.") }
} getOrElse { error(s"Invalid engine instance ID. Aborting server.") }
}
}

Expand All @@ -220,9 +216,7 @@ object CreateServer extends Logging {

val batch = if (engineInstance.batch.nonEmpty) {
s"${engineInstance.engineFactory} (${engineInstance.batch})"
} else {
engineInstance.engineFactory
}
} else { engineInstance.engineFactory }

val sparkContext = WorkflowContext(
batch = batch,
Expand Down Expand Up @@ -337,18 +331,14 @@ class MasterActor(
interface = sc.ip,
port = sc.port,
settings = Some(settings.copy(sslEncryption = true)))
} getOrElse {
log.error("Cannot bind a non-existing server backend.")
}
} getOrElse { log.error("Cannot bind a non-existing server backend.") }
case x: StopServer =>
log.info(s"Stop server command received.")
sprayHttpListener.map { l =>
log.info("Server is shutting down.")
l ! Http.Unbind(5.seconds)
system.shutdown
} getOrElse {
log.warning("No active server is running.")
}
} getOrElse { log.warning("No active server is running.") }
case x: ReloadServer =>
log.info("Reload server command received.")
val latestEngineInstance =
Expand Down Expand Up @@ -385,9 +375,7 @@ class MasterActor(
if (retry > 0) {
retry -= 1
log.error(s"Bind failed. Retrying... ($retry more trial(s))")
context.system.scheduler.scheduleOnce(1.seconds) {
self ! BindServer()
}
context.system.scheduler.scheduleOnce(1.seconds) { self ! BindServer() }
} else {
log.error("Bind failed. Shutting down.")
system.shutdown
Expand Down Expand Up @@ -459,9 +447,7 @@ class ServerActor[Q, P](
if (args.accessKey.isEmpty) {
log.error("Feedback loop cannot be enabled because accessKey is empty.")
false
} else {
true
}
} else { true }
} else false

def remoteLog(logUrl: String, logPrefix: String, message: String): Unit = {
Expand Down Expand Up @@ -566,9 +552,7 @@ class ServerActor[Q, P](
implicit val formats =
algorithms.headOption map { alg =>
alg.querySerializer
} getOrElse {
Utils.json4sDefaultFormats
}
} getOrElse { Utils.json4sDefaultFormats }
// val genPrId = Random.alphanumeric.take(64).mkString
def genPrId: String = Random.alphanumeric.take(64).mkString
val newPrId = prediction match {
Expand Down Expand Up @@ -626,9 +610,7 @@ class ServerActor[Q, P](
// - if it is not WithPrId, no prId injection
if (prediction.isInstanceOf[WithPrId]) {
predictionJValue merge parse(s"""{"prId" : "$newPrId"}""")
} else {
predictionJValue
}
} else { predictionJValue }
} else predictionJValue

val pluginResult =
Expand Down Expand Up @@ -696,9 +678,7 @@ class ServerActor[Q, P](
}
}
} ~
pathPrefix("assets") {
getFromResourceDirectory("assets")
} ~
pathPrefix("assets") { getFromResourceDirectory("assets") } ~
path("plugins.json") {
import EngineServerJson4sSupport._
get {
Expand Down Expand Up @@ -738,9 +718,7 @@ class ServerActor[Q, P](
case EngineServerPlugin.outputSniffer =>
pluginsActorRef ? PluginsActor.HandleREST(
pluginName = pluginName,
pluginArgs = pluginArgs) map {
_.asInstanceOf[String]
}
pluginArgs = pluginArgs) map { _.asInstanceOf[String] }
}
}
}
Expand Down
Expand Up @@ -133,9 +133,8 @@ object CreateWorkflow extends Logging {
WorkflowUtils.modifyLogging(wfc.verbose)

val evaluation = wfc.evaluationClass.map { ec =>
try {
WorkflowUtils.getEvaluation(ec, getClass.getClassLoader)._2
} catch {
try { WorkflowUtils.getEvaluation(ec, getClass.getClassLoader)._2 }
catch {
case e @ (_: ClassNotFoundException | _: NoSuchMethodException) =>
error(s"Unable to obtain evaluation $ec. Aborting workflow.", e)
sys.exit(1)
Expand Down Expand Up @@ -187,9 +186,8 @@ object CreateWorkflow extends Logging {
sys.exit(1)
}
val (engineLanguage, engineFactoryObj) =
try {
WorkflowUtils.getEngine(engineFactory, getClass.getClassLoader)
} catch {
try { WorkflowUtils.getEngine(engineFactory, getClass.getClassLoader) }
catch {
case e @ (_: ClassNotFoundException | _: NoSuchMethodException) =>
error(
s"Unable to obtain engine: ${e.getMessage}. Aborting workflow.")
Expand All @@ -216,9 +214,7 @@ object CreateWorkflow extends Logging {

val engineParams = if (wfc.engineParamsKey == "") {
trainableEngine.jValueToEngineParams(variantJson, wfc.jsonExtractor)
} else {
engineFactoryObj.engineParams(wfc.engineParamsKey)
}
} else { engineFactoryObj.engineParams(wfc.engineParamsKey) }

val engineInstance = EngineInstance(
id = "",
Expand Down
Expand Up @@ -62,12 +62,8 @@ object EngineServerPluginContext extends Logging {
if ((params \ "enabled").extractOrElse(false)) {
info(s"Plugin ${service.pluginName} is enabled.")
plugins(service.pluginType) += service.pluginName -> service
} else {
info(s"Plugin ${service.pluginName} is disabled.")
}
} getOrElse {
info(s"Plugin ${service.pluginName} is disabled.")
}
} else { info(s"Plugin ${service.pluginName} is disabled.") }
} getOrElse { info(s"Plugin ${service.pluginName} is disabled.") }
}
new EngineServerPluginContext(plugins, pluginParams, log)
}
Expand Down
Expand Up @@ -99,7 +99,5 @@ trait FakeRun extends Evaluation with EngineParamsGenerator {
}

def func: (SparkContext => Unit) = { (sc: SparkContext) => Unit }
def func_=(f: SparkContext => Unit) {
runner = new FakeRunner(f)
}
def func_=(f: SparkContext => Unit) { runner = new FakeRunner(f) }
}
Expand Up @@ -63,9 +63,8 @@ object JsonExtractor {

extractorOption match {
case JsonExtractorOption.Both =>
try {
extractWithJson4sNative(json, json4sFormats, clazz)
} catch {
try { extractWithJson4sNative(json, json4sFormats, clazz) }
catch {
case e: Exception =>
extractWithGson(json, clazz, gsonTypeAdapterFactories)
}
Expand Down

0 comments on commit 242d091

Please sign in to comment.