Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unwrap internal SCollection coders to be reused in transforms #4690

Merged
merged 2 commits into from
Feb 3, 2023

Conversation

RustedBones
Copy link
Contributor

When NullableCoders is tuned on, users could get the following error

java.lang.IllegalArgumentException: Failed to extract key-value coders from Coder[(K, V)]: 
  NullableCoder(NullableCode(...

This happens when keyBy is used. It only requires the key coder as implicit. The value coder is propagated for the materialized coder set on the internal collection.
When using pre-materialized coders, We should make sure those are completely unwrapped.

@@ -27,7 +27,7 @@ private[values] trait PCollectionWrapper[T] extends TransformNameable {
/** The [[org.apache.beam.sdk.values.PCollection PCollection]] being wrapped internally. */
val internal: PCollection[T]

implicit def coder: Coder[T] = Coder.beam(internal.getCoder)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we were propagating wrapped coders

@codecov
Copy link

codecov bot commented Feb 2, 2023

Codecov Report

Merging #4690 (d744b5a) into main (470c80f) will decrease coverage by 0.01%.
The diff coverage is 77.77%.

@@            Coverage Diff             @@
##             main    #4690      +/-   ##
==========================================
- Coverage   60.64%   60.63%   -0.01%     
==========================================
  Files         286      286              
  Lines       10453    10456       +3     
  Branches      694      669      -25     
==========================================
+ Hits         6339     6340       +1     
- Misses       4114     4116       +2     
Impacted Files Coverage Δ
...ain/scala/com/spotify/scio/coders/BeamCoders.scala 76.19% <75.00%> (-1.59%) ⬇️
...a/com/spotify/scio/values/PCollectionWrapper.scala 100.00% <100.00%> (ø)
.../com/spotify/scio/parquet/read/ParquetReadFn.scala 84.90% <0.00%> (-0.95%) ⬇️

Help us with your feedback. Take ten seconds to tell us how you rate us. Have a feature suggestion? Share it here.

@RustedBones RustedBones requested review from a team and lcguerrerocovo and removed request for a team February 3, 2023 09:41
case c: RecordCoder[(K, V)] =>
(
c.cs.find(_._1 == "_1").get._2.asInstanceOf[beam.Coder[K]],
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prefer to rely on type position that on the field name

@RustedBones RustedBones merged commit bf0d77a into main Feb 3, 2023
@RustedBones RustedBones deleted the scollection-internal-coder branch February 3, 2023 15:45
case _ =>
throw new IllegalArgumentException(
s"Failed to extract key-value coders from Coder[(K, V)]: $coder"
)
}
(Coder.beam(k), Coder.beam(v))
(Coder.beam(unwrap(k)), Coder.beam(unwrap(v)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is calling unwrap required everywhere Coder.beam is called?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. Most of the usage of Coder.beam are not with unwrap like here.
We have to unwrap when moving back from materilalized coder back to scio coders. (reuse of internal coders in partial transformation with keyBy, mapValues, ...)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants