New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add Spanner package #1491
add Spanner package #1491
Conversation
config.getInstanceId.get(), | ||
config.getDatabaseId.get(), | ||
List( | ||
s"CREATE TABLE ${tablePrefix}_1 (\n Key INT64, \n Value STRING(MAX) \n) PRIMARY KEY (Key)", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: do we really need the \n
s here? Kinda makes it hard to read.
import SpannerIOIT._ | ||
implicit val ec: ExecutionContext = ExecutionContext.global | ||
|
||
override def beforeAll(): Unit = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also make a helper function for table creation and move the logic to each individual tests? We can also change _[123]
to something more meaningful in each test to make it more readable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought it made since to batch all the Spanner CREATE statements for the sake of simplicity/performance -- I refactored it a bit to provide a Spanner table context per test that handles all the other setup logic
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 no strong preference. Either way works.
.set("Value").to("bar") | ||
.build()) | ||
|
||
val awaited = Await.result(result, 10.seconds) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the reason for these timeouts?
throw new IllegalStateException("SpannerRead is read-only") | ||
|
||
override def tap(params: ReadP): Tap[Nothing] = | ||
throw new NotImplementedError("SpannerRead tap is not implemented") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should be EmptyTap
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I chose the NotImplementedError
because I thought it was something that actually should be implemented in the future for the SpannerRead op. what do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Talked IRL. Database source/sinks are not immutable/atomic so reading a tap is not guaranteed to return the same data in the original SCollection
.
.withInstanceId("someInstance") | ||
} | ||
|
||
class SpannerIOTest extends PipelineSpec with Matchers { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have a ScioIOSpec
so you don't have to handcraft test jobs ;)
https://github.com/spotify/scio/blob/master/scio-test/src/main/scala/com/spotify/scio/testing/ScioIOSpec.scala
} | ||
|
||
sealed trait ReadMethod | ||
case class FromTable(tableName: String, columns: Seq[String]) extends ReadMethod |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
case class FromTable(tableName: String, columns: Seq[String]) extends ReadMethod | |
final case class FromTable(tableName: String, columns: Seq[String]) extends ReadMethod |
|
||
sealed trait ReadMethod | ||
case class FromTable(tableName: String, columns: Seq[String]) extends ReadMethod | ||
case class FromQuery(query: String) extends ReadMethod |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
case class FromQuery(query: String) extends ReadMethod | |
final case class FromQuery(query: String) extends ReadMethod |
|
||
override protected def write(data: SCollection[Mutation], | ||
params: WriteP): Future[Tap[Nothing]] = { | ||
var transform = BSpannerIO.write().withSpannerConfig(config) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@clairemcginty can you do it like this?
var transform = BSpannerIO.write().withSpannerConfig(config) | |
val transform = BSpannerIO | |
.write() | |
.withSpannerConfig(config) | |
.withBatchSizeBytes(params.batchSizeBytes) | |
.withFailureMode(params.failureMode) |
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig | ||
|
||
object Spanner { | ||
lazy val defaultInstance: Spanner = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lazy val defaultInstance: Spanner = { | |
private lazy val defaultInstance: Spanner = { |
SpannerOptions.newBuilder().build().getService | ||
} | ||
|
||
def getDatabaseClient(config: SpannerConfig, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
def getDatabaseClient(config: SpannerConfig, | |
def databaseClient(config: SpannerConfig, |
)) | ||
} | ||
|
||
def getAdminClient(project: String, instance: Spanner = defaultInstance): DatabaseAdminClient = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
def getAdminClient(project: String, instance: Spanner = defaultInstance): DatabaseAdminClient = | |
def adminClient(project: String, instance: Spanner = defaultInstance): DatabaseAdminClient = |
|
||
package object spanner { | ||
|
||
implicit class SpannerScioContext(@transient val self: ScioContext) extends Serializable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
implicit class SpannerScioContext(@transient val self: ScioContext) extends Serializable { | |
implicit class SpannerScioContext(@transient private val self: ScioContext) extends Serializable { |
} | ||
} | ||
|
||
implicit class SpannerSCollection(@transient val self: SCollection[Mutation]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
implicit class SpannerSCollection(@transient val self: SCollection[Mutation]) | |
implicit class SpannerSCollection(@transient private val self: SCollection[Mutation]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking good 👍 There are some suggestions.
Codecov Report
@@ Coverage Diff @@
## master #1491 +/- ##
==========================================
+ Coverage 79.05% 79.12% +0.07%
==========================================
Files 170 173 +3
Lines 5208 5251 +43
Branches 415 411 -4
==========================================
+ Hits 4117 4155 +38
- Misses 1091 1096 +5
Continue to review full report at Codecov.
|
scio-spanner/src/it/scala/com/spotify/scio/spanner/SpannerIOIT.scala
Outdated
Show resolved
Hide resolved
scio-spanner/src/it/scala/com/spotify/scio/spanner/SpannerIOIT.scala
Outdated
Show resolved
Hide resolved
* spanner IO first pass * clean up * add to root project * make scala 2.11 happy * remove redundant implicits * address comments * SpannerRead returns EmptyTap * refactor integration test
* spanner IO first pass * clean up * add to root project * make scala 2.11 happy * remove redundant implicits * address comments * SpannerRead returns EmptyTap * refactor integration test
Currently supports writes from
Mutation
s and reads intoStruct
s. I have the spanner type macros saved for sometime further down the line.notes:
testId
doesn't support a unit having 2 spanner reads/writes to the same DB since it's just keyed by theSpannerConfig
params, which don't include table name. For writes the table name is keyed into eachMutation
object and reads give you a choice of specifying a table name or a literal query