Permalink
Browse files

Add scio bridge

  • Loading branch information...
alexarchambault committed Nov 26, 2016
1 parent aa5a275 commit 4f28452ae9a5f99f109b052df3f59e1c3a1fd3c0
View
@@ -133,6 +133,24 @@ lazy val `flink-yarn` = project
)
)
// sbt module for the scio bridge. It depends on `scala-api` in the "provided"
// configuration and shares `commonSettings` with the other modules of this build.
lazy val scio = project
  .dependsOn(`scala-api` % "provided")
  .settings(commonSettings)
  .settings(
    libraryDependencies ++= {
      // Full Scala version, shared by the compiler and reflect artifacts.
      val scalaFull = scalaVersion.value
      // Single version used for both scio artifacts.
      val scioVersion = "0.2.6"
      Seq(
        "org.slf4j" % "slf4j-simple" % "1.7.21",
        // NOTE(review): the jline version is taken from the Scala *binary* version -
        // confirm this is intended rather than a fixed jline release.
        "jline" % "jline" % scalaBinaryVersion.value,
        "org.scala-lang" % "scala-compiler" % scalaFull,
        "org.scala-lang" % "scala-reflect" % scalaFull,
        "com.nrinaudo" %% "kantan.csv" % "0.1.12",
        "org.scalamacros" % "paradise" % "2.1.0" cross CrossVersion.full,
        "com.spotify" %% "scio-core" % scioVersion,
        "com.spotify" %% "scio-extra" % scioVersion
      )
    }
  )
lazy val `jupyter-scala` = project
.in(file("."))
@@ -147,7 +165,8 @@ lazy val `jupyter-scala` = project
spark,
`spark-tests`,
flink,
`flink-yarn`
`flink-yarn`,
scio
)
@@ -0,0 +1,107 @@
package com.spotify.scio.jupyter
import java.io.{File, FileInputStream}
import java.nio.file.{Files, Path}
import ammonite.repl.RuntimeAPI
import ammonite.runtime.InterpAPI
import com.google.api.client.auth.oauth2.Credential
import com.google.api.client.googleapis.auth.oauth2.GoogleCredential
import com.google.api.services.dataflow.DataflowScopes
import com.google.cloud.dataflow.sdk.options.{DataflowPipelineOptions, PipelineOptions, PipelineOptionsFactory}
import com.spotify.scio.{ScioContext, ScioResult}
import scala.collection.JavaConverters._
/**
 * `ScioContext` for Jupyter sessions, similar to `com.spotify.scio.repl.ReplScioContext`.
 *
 * Defined under the `com.spotify.scio` namespace to access `private[scio]` things.
 *
 * At construction time, `replJarPath` and every classpath entry of the current session
 * frames are passed to `addArtifacts`. A callback is also registered through
 * `interpApi.load.onJarAdded`, so that jars reported to it are passed to `addArtifacts` too.
 *
 * @param options     pipeline options handed to the underlying `ScioContext`
 * @param replJarPath file the session jar is dumped to when this context is closed
 * @param interpApi   interpreter API, used to register the jar-added callback
 * @param runtimeApi  runtime API, giving access to the session frames and session jar
 */
class JupyterScioContext(
  options: PipelineOptions,
  replJarPath: Path
)(implicit
  interpApi: InterpAPI,
  runtimeApi: RuntimeAPI
) extends ScioContext(options, Nil) {

  // Initial artifacts: the (future) session jar, followed by the current session classpath.
  addArtifacts(
    replJarPath.toAbsolutePath.toString ::
      runtimeApi.sess.frames
        .flatMap(_.classpath)
        .map(_.getAbsolutePath)
  )

  // Keep the artifact list in sync with jars loaded after this context was created.
  interpApi.load.onJarAdded {
    case Seq() => // just in case
    case jars =>
      addArtifacts(jars.map(_.getAbsolutePath).toList)
  }

  /** Sets `credential` as the GCP credential of the Dataflow view of `options`. */
  def setGcpCredential(credential: Credential): Unit =
    options.as(classOf[DataflowPipelineOptions]).setGcpCredential(credential)

  /**
   * Reads a Google credential from the file at `path`, scopes it to
   * `DataflowScopes.CLOUD_PLATFORM`, and sets it as the GCP credential.
   *
   * @param path path of the credential file
   */
  def setGcpCredential(path: String): Unit = {
    val in = new FileInputStream(new File(path))
    val credential =
      try GoogleCredential.fromStream(in)
      // Close the stream ourselves rather than relying on the callee to do it;
      // closing an already-closed FileInputStream is harmless.
      finally in.close()
    setGcpCredential(
      credential.createScoped(List(DataflowScopes.CLOUD_PLATFORM).asJava)
    )
  }

  /** Same as `setGcpCredential(credential)`, returning this context for chaining. */
  def withGcpCredential(credential: Credential): this.type = {
    setGcpCredential(credential)
    this
  }

  /** Same as `setGcpCredential(path)`, returning this context for chaining. */
  def withGcpCredential(path: String): this.type = {
    setGcpCredential(path)
    this
  }

  /** Enhanced version that dumps the REPL session jar to `replJarPath` before closing. */
  override def close(): ScioResult = {
    runtimeApi.sess.sessionJarFile(replJarPath.toFile)
    super.close()
  }

  /** Fails with "ScioContext already closed" if closed, else defers to the parent check. */
  private[scio] override def requireNotClosed[T](body: => T) = {
    require(!isClosed, "ScioContext already closed")
    super.requireNotClosed(body)
  }
}
/** Factory methods for `JupyterScioContext`. */
object JupyterScioContext {

  /**
   * Creates a context from key/value pairs. Each pair is rendered as a `--key=value`
   * argument, parsed by `PipelineOptionsFactory.fromArgs`, and viewed as
   * `DataflowPipelineOptions`. The session jar path comes from `nextReplJarPath()`.
   */
  def apply(args: (String, String)*)(implicit
    interpApi: InterpAPI,
    runtimeApi: RuntimeAPI
  ): JupyterScioContext = {
    val cliArgs = args.map(kv => s"--${kv._1}=${kv._2}").toArray
    val dataflowOptions =
      PipelineOptionsFactory.fromArgs(cliArgs).as(classOf[DataflowPipelineOptions])
    apply(dataflowOptions, nextReplJarPath())
  }

  /** Creates a context from `options`, with a session jar path from `nextReplJarPath()`. */
  def apply(options: PipelineOptions)(implicit
    interpApi: InterpAPI,
    runtimeApi: RuntimeAPI
  ): JupyterScioContext = {
    val jarPath = nextReplJarPath()
    apply(options, jarPath)
  }

  /** Creates a context from `options`, using `replJarPath` as the session jar path. */
  def apply(
    options: PipelineOptions,
    replJarPath: Path
  )(implicit
    interpApi: InterpAPI,
    runtimeApi: RuntimeAPI
  ): JupyterScioContext = {
    val context = new JupyterScioContext(options, replJarPath)
    context
  }

  /**
   * Creates a new temporary file and returns its path.
   *
   * @param prefix prefix of the temporary file name
   * @param suffix suffix of the temporary file name
   */
  def nextReplJarPath(prefix: String = "jupyter-scala-scio-", suffix: String = ".jar"): Path = {
    val tempJar = Files.createTempFile(prefix, suffix)
    tempJar
  }
}
@@ -0,0 +1,24 @@
package jupyter
import _root_.scala.tools.nsc.interpreter.Helper
import java.io.File
import com.google.api.client.auth.oauth2.Credential
import com.spotify.scio.bigquery.BigQueryClient
/** Entry points exposed under `jupyter.scio`. */
package object scio {

  /** Alias of the `com.spotify.scio.jupyter.JupyterScioContext` companion object. */
  val JupyterScioContext: com.spotify.scio.jupyter.JupyterScioContext.type = {
    com.spotify.scio.jupyter.JupyterScioContext
  }

  /** Forwards to `Helper.bigQueryClient(project)`. */
  def bigQueryClient(project: String): BigQueryClient = {
    Helper.bigQueryClient(project)
  }

  /** Forwards to `Helper.bigQueryClient(project, credential)`. */
  def bigQueryClient(project: String, credential: Credential): BigQueryClient = {
    Helper.bigQueryClient(project, credential)
  }

  /** Forwards to `Helper.bigQueryClient(project, secretFile)`. */
  def bigQueryClient(project: String, secretFile: File): BigQueryClient = {
    Helper.bigQueryClient(project, secretFile)
  }
}
@@ -0,0 +1,28 @@
package scala.tools.nsc.interpreter
import java.io.File
import com.google.api.client.auth.oauth2.Credential
import com.spotify.scio.bigquery.BigQueryClient
/**
 * Temporary workaround to make scio's BigQueryClient think we're in a REPL
 * (requires scala.tools.nsc.interpreter somewhere in the stack).
 *
 * That is why this object is declared in the `scala.tools.nsc.interpreter` package.
 */
object Helper {

  /**
   * Creates a client for `project`. If the system property named by
   * `BigQueryClient.SECRET_KEY` is set, its value is used as the path of the secret file.
   * Otherwise the client is created from the project alone.
   */
  def bigQueryClient(project: String): BigQueryClient =
    sys.props.get(BigQueryClient.SECRET_KEY) match {
      case Some(secretPath) => BigQueryClient(project, new File(secretPath))
      case None             => BigQueryClient(project)
    }

  /** Creates a client for `project` using the given `credential`. */
  def bigQueryClient(project: String, credential: Credential): BigQueryClient = {
    BigQueryClient(project, credential)
  }

  /** Creates a client for `project` using the given secret file. */
  def bigQueryClient(project: String, secretFile: File): BigQueryClient = {
    BigQueryClient(project, secretFile)
  }
}

0 comments on commit 4f28452

Please sign in to comment.