-
Notifications
You must be signed in to change notification settings - Fork 31
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactor switch statements to methods on enums #42
Conversation
* CompositeTransform.of(input -> input | ||
* .apply(PubsubIO.readMessagesWithAttributes().fromSubscription(options.getInput())) | ||
* .apply(DecodePubsubMessages.AlreadyDecoded)); | ||
* } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was surprised that the beam SDK itself didn't have some method for creating a PTransfrom from a lambda, so maybe I missed it. This allows us to cut the size of a few of the enum classes in half due to the reduced boilerplate.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this approach lgtm with a few comments
return CompositeTransform.of(input -> input | ||
.apply(OutputFileFormat.json.encode()) | ||
.apply(Foreach.string(System.out::println)) | ||
); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💯
@Override | ||
public PDone expand(PCollection<InputT> input) { | ||
input.apply(ParDo.of(new Fn())); | ||
return null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why null
instead of PDone.in(input.getPipeline())
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oversight. Just pushed a commit.
public PTransform<PBegin, PCollectionTuple> read(Sink.Options options) { | ||
return CompositeTransform.of(input -> input | ||
.apply(TextIO.read().from(options.getInput())) | ||
.apply(InputFileFormat.text.decode()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
presumably part of WIP, but file format shouldn't fixed except on error outputs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. Had forgotten, but need to change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pushed a commit.
|
||
void setErrorOutputType(ErrorOutputType value); | ||
void setErrorOutputType(OutputType value); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we need some sort of validation to assert that --errorOutputType
can't be BigQuery
, and we need to make options.getErrorOutputType().write(options)
use json encoding even when --outputType=text
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It caused some code duplication, bit I split out a separate ErrorOutputType. It uncovered some deeper logical errors in the reuse too, so good catch. I'll try to eliminate the code duplication as part of implementing dynamic writes.
.apply(TextIO.write().to(options.getErrorOutput()).withWindowedWrites()) | ||
); | ||
} | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ErrorOutputType
should support pubsub
ingestion-beam/src/main/java/com/mozilla/telemetry/options/OutputType.java
Show resolved
Hide resolved
ingestion-beam/src/main/java/com/mozilla/telemetry/transforms/PubsubMessageMixin.java
Outdated
Show resolved
Hide resolved
} catch (Throwable e) { | ||
throw new RuntimeException(); | ||
} | ||
}); | ||
} | ||
}; | ||
|
||
public static ObjectMapper objectMapper = new ObjectMapper(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Turns out we were using the PubsubMixin for reading JSON, but not for writing. We now use a single static PubsubMessageMixin.MAPPER
instance everywhere for consistency.
* analogous to fromText or fromJson to handle output from PubsubIO. | ||
* | ||
* The packaging of subclasses here follows the style guidelines as captured in | ||
* https://beam.apache.org/contribute/ptransform-style-guide/#packaging-a-family-of-transforms |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Factored out these anonymous subclasses to be named classes with static factory methods per style guide.
|
||
static public Duration parseWindowDuration(Sink.Options options) { | ||
return DurationUtils.parseDuration(options.getWindowDuration()); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added reusable static members here to reduce a bit of the duplication between OutputType and ErrorOutputType
d843543
to
1412bbe
Compare
4a394ca
to
40fe15c
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
* Static factory methods. | ||
*/ | ||
|
||
/** Decoder from non-json fromText to PubsubMessage. */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: i feel like we should drop the fromText
part of this comment to match the one on line 49
/** Decoder from non-json fromText to PubsubMessage. */ | ||
public static Text fromText() { | ||
return new Text(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i guess it doesn't matter, but why a method instead of a final variable?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mostly to match what I'm seeing in the built-in transforms. This fromText()
method is analogous to TextIO.write()
.
I guess to fully match naming conventions the methods should be text()
and json()
(since the class name is already a verb, the factory method should name the type of the input/output).
40fe15c
to
f66f7f8
Compare
f66f7f8
to
9509f65
Compare
Looking for early feedback about whether you're okay with this approach, @relud. If you're okay with going ahead, I'll add some additional Javadoc and make sure style, etc. is all passing.
This is a major refactor that leans into the Options class and the enums we've defined. Basically, each enum gains a method that returns a PTransform, and all the transformation logic is accessed via calling those enum methods.
This avoids needing switch statements, and instead the particular configured enum values determine the behavior. The actual Sink class is now tiny.