Add *withAttributes Pubsub I/O #546

Merged: 8 commits, Apr 19, 2017

Conversation

@samschlegel (Contributor) commented Apr 15, 2017

Resolves #535, but by adding new input and output functions, as discussed in #538

Open to renaming. Needs tests.

val elementCoder = pipeline.getCoderRegistry.getScalaCoder[T]
val outputCoder = pipeline.getCoderRegistry.getScalaCoder[(T, Map[String, String])]
val parseFn = Functions.simpleFn { msg: PubsubMessage =>
  val element = CoderUtils.decodeFromByteArray(elementCoder, msg.getMessage)
Contributor Author

This can throw a CoderException. In some places Beam appears to catch this and rethrow it as a RuntimeException. Not sure what we should do here.

Contributor

It only throws an exception for a corrupt message, right? In that case there's not much we can do, and it should be fine to let it escalate.

Contributor Author

Yep, only for corrupt messages.
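
The two options discussed in this thread (let the exception escalate, or rethrow it as a RuntimeException the way Beam does in places) can be sketched roughly as follows. This is only an illustration: `DecodeSketch`, `decode`, `parseEscalating`, and `parseWrapping` are hypothetical names, and `decode` is a stand-in for `CoderUtils.decodeFromByteArray`, not the real call. It throws `IOException` because Beam's `CoderException` is an `IOException` subclass.

```scala
import java.io.IOException
import java.nio.charset.StandardCharsets.UTF_8

object DecodeSketch {
  // Hypothetical stand-in for CoderUtils.decodeFromByteArray: throws on corrupt input.
  def decode(bytes: Array[Byte]): String = {
    if (bytes.isEmpty) throw new IOException("corrupt message: empty payload")
    new String(bytes, UTF_8)
  }

  // Option 1: do nothing special and let the exception escalate to the caller.
  // Scala has no checked exceptions, so no try/catch or throws clause is needed.
  def parseEscalating(bytes: Array[Byte]): String = decode(bytes)

  // Option 2: rethrow as an unchecked exception with some context attached.
  def parseWrapping(bytes: Array[Byte]): String =
    try decode(bytes)
    catch {
      case e: IOException =>
        throw new RuntimeException("failed to decode Pub/Sub payload", e)
    }
}
```

Since the failure only occurs for corrupt messages, option 1 matches what was agreed here; option 2 mainly adds a more descriptive message.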

val elementCoder = pipeline.getCoderRegistry.getScalaCoder[T]
val outputCoder = pipeline.getCoderRegistry.getScalaCoder[(T, Map[String, String])]
val parseFn = Functions.simpleFn { msg: PubsubMessage =>
  val element = CoderUtils.decodeFromByteArray(elementCoder, msg.getMessage)
Contributor Author

Same as above re: exceptions.

@samschlegel
Contributor Author

How should I add tests to this? Just reuse PubsubIO but pass in (String, Map[String, String]), or create a new TestIO type?

val outputCoder = pipeline.getCoderRegistry.getScalaCoder[(T, Map[String, String])]
val parseFn = Functions.simpleFn { msg: PubsubMessage =>
  val element = CoderUtils.decodeFromByteArray(elementCoder, msg.getMessage)
  val attributes = msg.getAttributeMap.asScala.toMap
Contributor

You can use a private class that extends Map[String, String] to do the lazy wrapping. It's fine since the underlying Java Map is never exposed.

https://github.com/spotify/scio/blob/master/scio-core/src/main/scala/com/spotify/scio/values/SideInput.scala#L98
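
A minimal sketch of the lazy wrapping suggested above, assuming Scala 2.13 collections (on the 2.11/2.12 versions current when this PR was opened, the methods to override were `+` and `-` rather than `updated` and `removed`). `JMapViewSketch`, `JMapView`, and `of` are hypothetical names and not the code in SideInput.scala.

```scala
import java.util.{Map => JMap}
import scala.collection.immutable.AbstractMap
import scala.jdk.CollectionConverters._

object JMapViewSketch {
  // Hypothetical wrapper: an immutable Scala Map backed by a Java map.
  private final class JMapView[K, V](underlying: JMap[K, V]) extends AbstractMap[K, V] {
    // Lookups and iteration delegate to the Java map without copying it.
    // Note: a key mapped to null is reported as absent.
    override def get(key: K): Option[V] = Option(underlying.get(key))
    override def iterator: Iterator[(K, V)] = underlying.asScala.iterator

    // "Modifying" operations copy into a regular immutable Map only when called.
    override def removed(key: K): Map[K, V] = underlying.asScala.toMap.removed(key)
    override def updated[V1 >: V](key: K, value: V1): Map[K, V1] =
      underlying.asScala.toMap.updated(key, value)
  }

  // The declared return type is the Scala interface, so the wrapper class stays private.
  def of[K, V](underlying: JMap[K, V]): Map[K, V] = new JMapView(underlying)
}
```

The conversion cost is paid only if a caller actually adds or removes entries, which is the trade-off debated in the replies below.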

Contributor

Isn't this a little too much for Pub/Sub attributes? If someone is using these methods specifically, they do care about getting attributes. Seems like an unnecessary optimization here?

Contributor Author

They do care about getting them, but modifying that map is probably unlikely, so delaying the conversion makes sense.

Contributor

Seems pretty common to wrap j.u.Map as immutable Map instead of the mutable one from asScala. We can move those from SideInput.scala to a util file and reuse them.

Contributor Author

The wrapper in SideInput also converts JIterable to Iterable, so would have to specialize for that anyways, no?

Contributor

No, that's specific to the IterableSideInput case. But it wouldn't hurt to pull it into a util file like the regular Map[K, V] wrapper.

Contributor Author

Since parseFn is a SimpleFunction<T, U>, which is invariant in U, it would have to leak the wrapper. Is that okay, or is there some way to get around that?

Contributor

What do you mean by leak? parseFn and its signature don't leave this method, right? Can you provide a snippet?

Contributor Author

The output type of parseFn is what's used as the output type of the read transform:

public Read<T> withAttributes(SimpleFunction<PubsubMessage, T> parseFn)

and thus the element type of the returned SCollection. Would casting the wrapper to an instance of Map[String, String] be the way to go?

* @group input
*/
def pubsubSubscriptionWithAttributes[T: ClassTag](sub: String,
idLabel: String = null,
Contributor

Nitpick: align arguments?

} else {
  val elementCoder = pipeline.getCoderRegistry.getScalaCoder[T]
  val outputCoder = pipeline.getCoderRegistry.getScalaCoder[(T, Map[String, String])]
  val parseFn = Functions.simpleFn { msg: PubsubMessage =>
Contributor

elementCoder & parseFn are identical in pubsubSubscriptionWithAttributes and pubsubTopicWithAttributes, put them in a helper method? You can even put almost everything in a helper method and parameterize only the gio.PubsubIO.read().subscription(topic) vs gio.PubsubIO.read().topic(topic) part.
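
The refactoring suggested above, one shared helper that takes only the source-specific step as a parameter, might look roughly like this. Everything here is a hypothetical stand-in: `Read` is a toy builder rather than the real `gio.PubsubIO` API, and `readWithAttributes`, `fromSubscription`, and `fromTopic` are illustrative names only.

```scala
object ReadHelperSketch {
  // Hypothetical stand-in for the Beam read builder; not the real gio.PubsubIO API.
  final case class Read(source: Option[String] = None, idLabel: Option[String] = None) {
    def subscription(s: String): Read = copy(source = Some(s"subscription:$s"))
    def topic(t: String): Read = copy(source = Some(s"topic:$t"))
    def withIdLabel(l: String): Read = copy(idLabel = Some(l))
  }

  // Shared logic lives here; callers supply only the step that selects the source.
  private def readWithAttributes(idLabel: String)(setSource: Read => Read): Read = {
    val base = setSource(Read())
    // idLabel defaults to null in the PR's signatures, so treat null as "not set".
    Option(idLabel).fold(base)(base.withIdLabel)
  }

  def fromSubscription(sub: String, idLabel: String = null): Read =
    readWithAttributes(idLabel)(_.subscription(sub))

  def fromTopic(topic: String, idLabel: String = null): Read =
    readWithAttributes(idLabel)(_.topic(topic))
}
```

With this shape, the coder lookup and parseFn construction would be written once inside the helper instead of being duplicated in both public methods.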

* Save this SCollection as a Pub/Sub topic using the given map as message attributes.
* @group output
*/
def saveAsPubsubWithAttributes[V: ClassTag](topic: String)
Contributor

I guess you want idLabel and timestamp args here too?

import scala.collection.JavaConverters._

private[scio] object JMapWrapper {
  def ofMultiMap[A, B](self: JMap[A, JIterable[B]]): Map[A, Iterable[B]] =
Contributor Author

ofJIterable? 🤷‍♂️

Contributor

Either way, I don't have a strong opinion. We can keep ofMultiMap.

@nevillelyh
Contributor

LGTM, but there are some conflicts against master. Can you fix them? Thanks.

@nevillelyh nevillelyh merged commit d399eef into spotify:master Apr 19, 2017
@nevillelyh
Contributor

Somehow squash works. Weird.
Anyway thanks for the great work!

@jonhpowell

Appreciate the work, guys!

Would you expect it to be immediately included in an upcoming release? If so, when?

@nevillelyh
Contributor

We don't have any immediate plans, and we're still waiting on a lot of upstream changes, like the Beam release and Scala 2.12 fixes. Can you give the snapshot a try first?

@jonhpowell

jonhpowell commented Apr 20, 2017 via email

@smadigan-dexcom

Hello, I work with jonhpowell and tried to build the current source. It fails with:
[error] /Users/smadigan/git/scio/scio-schemas/src/main/scala/com/spotify/scio/avro/AvroUtils.scala:54: not found: type TestRecord
[error] def newSpecificRecord(i: Int): TestRecord =
[error] ^
[error] /Users/smadigan/git/scio/scio-schemas/src/main/scala/com/spotify/scio/avro/AvroUtils.scala:55: not found: type TestRecord
[error] new TestRecord(i, i.toLong, i.toFloat, i.toDouble, true, "hello")
[error] ^
[error] two errors found
[error] (scio-schemas/compile:compileIncremental) Compilation failed

I tried working around that error and hit further errors.

@nevillelyh
Contributor

Are you building in IntelliJ? There's a known IntelliJ issue that can be worked around; see #543.
We also publish snapshots on every master commit, so you don't have to build it yourself.

@smadigan-dexcom

I built from the command line with sbt compile.
Where are the snapshots published? (Forgive my ignorance, please.)

@samschlegel
Contributor Author

They're published to the Sonatype Snapshots repo which you can add to your build.sbt with:

 resolvers += Resolver.sonatypeRepo("snapshots")

@nevillelyh
Contributor

@dgouyette

There is no unit test for saveAsPubsubWithAttributes, although there is one for saveAsPubsub.

The signature of saveAsPubsubWithAttributes is difficult to read. An example with a unit test would help.

@nevillelyh
Contributor

Filed #590. It would be great if you already have something and can submit a PR.


6 participants