Adding Support for Complex Watermark Types #121

Merged
merged 13 commits into from May 15, 2015

sahilTakiar
Contributor

This is the first pull request for adding complex watermark types to Gobblin. This will replace the legacy system of tracking watermarks. The old system was decentralized and depended on passing custom configuration parameters between executions via a WorkUnitState.

The new implementation contains a new interface called Watermark, which extends the Comparable and Copyable interfaces. It contains only one method, increment(Object record), which defines how to increment the watermark for a given record.

Another class, WatermarkInterval, contains the logic for maintaining low and high watermarks. The corresponding changes to WorkUnit and WorkUnitState have been made. Since this pull request mainly focuses on defining the interfaces, the framework has not been migrated to them yet.
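A minimal sketch of the interface as first described, assuming each record carries its own Long offset. Copyable here is a local stand-in so the sketch compiles on its own, and LongOffsetWatermark is a hypothetical implementation, not code from this pull request.

```java
// Local stand-in for Gobblin's Copyable interface (assumed shape, for self-containment).
interface Copyable<T> {
  T copy();
}

// The interface as first proposed: comparable, copyable, and advanced per record.
interface Watermark extends Comparable<Watermark>, Copyable<Watermark> {
  void increment(Object record);
}

// Hypothetical implementation that assumes each record is its own Long offset.
class LongOffsetWatermark implements Watermark {
  private long offset;

  LongOffsetWatermark(long offset) {
    this.offset = offset;
  }

  long getOffset() {
    return offset;
  }

  @Override
  public void increment(Object record) {
    // Advance to the offset of the record just read, never moving backwards.
    offset = Math.max(offset, (Long) record);
  }

  @Override
  public int compareTo(Watermark other) {
    // Only meaningful against the same implementation; other types fail the cast.
    return Long.compare(offset, ((LongOffsetWatermark) other).offset);
  }

  @Override
  public LongOffsetWatermark copy() {
    return new LongOffsetWatermark(offset);
  }
}

// Holds the low and high watermarks for one unit of work.
class WatermarkInterval {
  private final Watermark lowWatermark;
  private final Watermark highWatermark;

  WatermarkInterval(Watermark lowWatermark, Watermark highWatermark) {
    this.lowWatermark = lowWatermark;
    this.highWatermark = highWatermark;
  }

  Watermark getLowWatermark() { return lowWatermark; }
  Watermark getHighWatermark() { return highWatermark; }
}
```

The cast inside compareTo shows one cost of the non-generic Comparable<Watermark> signature: comparing two different implementations fails at runtime rather than at compile time.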

sahilTakiar changed the title from "Complex watermarks" to "Adding Support for Complex Watermark Types" on May 4, 2015
import org.codehaus.jackson.annotate.JsonTypeInfo;

@JsonTypeInfo(use=JsonTypeInfo.Id.CLASS, include=JsonTypeInfo.As.PROPERTY, property="@class")
public interface Watermark extends Comparable<Watermark>, Copyable<Watermark> {
Contributor

There's already a Watermark interface in gobblin.source.extractor.watermark. Better to name this one ComplexWatermark or something like that.

Contributor

This interface would be better as a generic interface with a type parameter.

Contributor Author

The increment method is gone now, so I'm not sure if this still makes sense. A Watermark is no longer tied to a specific record type.

@sahilTakiar
Contributor Author

As per a conversation with Chavdar, I have removed increment(Object record) from the Watermark interface. It is now up to the Extractor class to update the Watermark. I have also removed the Jackson-based serialization of the Watermark class and have instead added toJson and initFromJson methods to the Watermark interface. Jackson is not that efficient and can be messy to deal with, so it's best to use a simpler serialization approach.
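A minimal sketch of this simpler serialization approach, with no JSON library involved. It assumes toJson() returns a String in this revision and uses the initFromJson(String json) signature from the diff; LongWatermark and its one-field JSON format are hypothetical, and Copyable is omitted for brevity.

```java
// Revised interface: no increment method, and an explicit JSON round trip.
// toJson() returning a String is an assumption for this revision.
interface Watermark extends Comparable<Watermark> {
  String toJson();

  void initFromJson(String json);
}

// Hypothetical watermark wrapping a single long value.
class LongWatermark implements Watermark {
  private long value;

  long getValue() { return value; }

  void setValue(long value) { this.value = value; }

  @Override
  public String toJson() {
    return "{\"value\":" + value + "}";
  }

  @Override
  public void initFromJson(String json) {
    // Minimal parser for exactly the format toJson() produces; not a general JSON parser.
    String number = json.substring(json.indexOf(':') + 1, json.lastIndexOf('}')).trim();
    value = Long.parseLong(number);
  }

  @Override
  public int compareTo(Watermark other) {
    return Long.compare(value, ((LongWatermark) other).value);
  }
}
```

Each implementation owns its wire format, so the framework only needs to store and hand back an opaque string.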

So now the expected e2e flow is as follows:

1: The Source class creates a series of WorkUnits; each WorkUnit contains a WatermarkInterval. The WatermarkInterval contains a low watermark, an expected high watermark, and an actual high watermark. A WorkUnit may not always be able to pull data all the way from its low watermark to its expected high watermark. Take Camus, for example, where map tasks only pull data for a (topic, partition) for "x" amount of time and then move on to the next (topic, partition).

2: Each WorkUnit is then serialized into a SequenceFile.

3: Each map task reads a series of WorkUnits from this SequenceFile, deserializes them, and then executes them.

4: The Extractor class is responsible for maintaining the following invariant: after each call to readRecord(), the Watermark returned by getCurrentHighWatermark() should cover all records up to and including the last record returned by readRecord(). getCurrentHighWatermark() may be called multiple times in order to report the percent completion of the WorkUnit.
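The invariant in step 4 can be sketched with a toy extractor over an in-memory list of offsets. OffsetExtractor, readRecord() returning null when exhausted, and LongWatermark are hypothetical; only the pairing of readRecord() with getCurrentHighWatermark() comes from the description above.

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical immutable watermark holding the highest offset covered so far.
class LongWatermark {
  private final long value;

  LongWatermark(long value) { this.value = value; }

  long getValue() { return value; }
}

// Toy extractor; records are assumed to arrive in ascending offset order.
class OffsetExtractor {
  private final Iterator<Long> records;
  // Starts at the WorkUnit's low watermark: nothing beyond it has been read yet.
  private long lastReturnedOffset;

  OffsetExtractor(List<Long> offsets, long lowWatermark) {
    this.records = offsets.iterator();
    this.lastReturnedOffset = lowWatermark;
  }

  // Returns the next record, or null when the WorkUnit is exhausted.
  Long readRecord() {
    if (!records.hasNext()) {
      return null;
    }
    Long record = records.next();
    // Advance before returning, so the invariant already holds when the caller sees the record.
    lastReturnedOffset = record;
    return record;
  }

  // Covers every record up to and including the last one returned by readRecord().
  LongWatermark getCurrentHighWatermark() {
    return new LongWatermark(lastReturnedOffset);
  }
}
```

Because the watermark is advanced before readRecord() returns, a progress reporter that calls getCurrentHighWatermark() between reads never sees a value behind the records already handed out.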

The following questions are still open:

1: I have added a new method to Extractor, which breaks a lot of other classes.

2: In order to avoid another state-store migration, I have added a hack to serialize the WatermarkInterval class. Should we discuss migrating away from SequenceFile formats?

*/
public interface Watermark extends Comparable<Watermark>, Copyable<Watermark> {

public void initFromJson(String json);
Contributor

fromJson is sufficient.

@liyinan926
Contributor

I want to share some of my thoughts on the Watermark and WatermarkInterval classes:

  1. Watermark should be generic with a type parameter and a method public T getValue().
  2. WatermarkInterval should be immutable and only contain two things, the low and high Watermarks. Then it's essentially a PairWritable.
  3. The logic of maintaining a currentWatermark and manipulating it should be left to the extractors. The runtime is only responsible for serialization/deserialization of the interval.
  4. Watermark does not need to extend Copyable given the above three points.
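Point 2 can be sketched as an immutable holder with two final fields. The empty Watermark interface below is a placeholder for whichever interface is settled on, and the null checks are an added assumption rather than something proposed in the thread.

```java
import java.util.Objects;

// Placeholder for the Watermark interface under discussion (assumed empty here).
interface Watermark {
}

// Immutable pair of low and high watermarks; nothing can change after construction.
final class WatermarkInterval {
  private final Watermark lowWatermark;
  private final Watermark highWatermark;

  WatermarkInterval(Watermark lowWatermark, Watermark highWatermark) {
    this.lowWatermark = Objects.requireNonNull(lowWatermark, "lowWatermark");
    this.highWatermark = Objects.requireNonNull(highWatermark, "highWatermark");
  }

  Watermark getLowWatermark() { return lowWatermark; }

  Watermark getHighWatermark() { return highWatermark; }
}
```

Since the interval never changes, callers can share it freely without defensive copies, which is the reasoning behind point 4; any moving "current" watermark lives in the extractor, per point 3.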

@zliu41
Contributor

zliu41 commented May 5, 2015

If Watermark is generic, then in addition to a MyWatermark class, I also need another class, MyWatermarkValueType, and must declare public class MyWatermark implements Watermark<MyWatermarkValueType>, which doesn't seem necessary. Why not put those values directly in MyWatermark?

It seems the getX method can be implemented in each individual Watermark class if needed, e.g., IntWatermark.getInt(), LongWatermark.getLong(), PairLongWatermark.getFirst(), PairLongWatermark.getSecond() etc.
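A sketch of the per-class accessor approach, using the example names from the comment above (the class bodies are hypothetical). It also shows the trade-off raised in reply: code holding only a Watermark must know the concrete class and cast before reading a value.

```java
// Non-generic marker interface; each implementation exposes its own typed getters.
interface Watermark {
}

class LongWatermark implements Watermark {
  private final long value;

  LongWatermark(long value) { this.value = value; }

  long getLong() { return value; }
}

class PairLongWatermark implements Watermark {
  private final long first;
  private final long second;

  PairLongWatermark(long first, long second) {
    this.first = first;
    this.second = second;
  }

  long getFirst() { return first; }

  long getSecond() { return second; }
}

class Caller {
  // Holding only the interface, the caller must cast; a wrong guess fails at runtime.
  static long readLong(Watermark watermark) {
    return ((LongWatermark) watermark).getLong();
  }
}
```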

@liyinan926
Contributor

I don't quite understand your point. Regardless of whether Watermark is generic or not, you still need to put some type of value into your MyWatermark class. With generics, the only additional thing you need to do is specify explicitly what the type of that value is. The benefit is that the compiler can enforce type checking, which is always better than using Object.

@liyinan926
Contributor

It's good to have a common method like getValue so that any code with access to the interface can get the value of a Watermark. If every implementation class implements its own way of getting the value, then code that needs the value must know the specific implementation class and will very likely need to cast before it can call that method.
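In contrast with per-class getters reached through a cast, a generic getValue() lets code that only holds the interface read the value directly, and a type mismatch becomes a compile error instead of a runtime ClassCastException. LongWatermark, FileWatermark, and its file-name value are hypothetical.

```java
// Generic interface along the lines proposed in this thread.
interface Watermark<T> {
  T getValue();
}

// Hypothetical implementations with different value types.
class LongWatermark implements Watermark<Long> {
  private final long value;

  LongWatermark(long value) { this.value = value; }

  @Override
  public Long getValue() { return value; }
}

class FileWatermark implements Watermark<String> {
  private final String lastFile;

  FileWatermark(String lastFile) { this.lastFile = lastFile; }

  @Override
  public String getValue() { return lastFile; }
}

class Caller {
  // The compiler tracks T, so no cast is needed and the result is already typed.
  static <T> T read(Watermark<T> watermark) {
    return watermark.getValue();
  }
}
```

For example, String s = Caller.read(new LongWatermark(1L)) does not compile, whereas the cast-based version would only fail when run.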

@sahilTakiar
Contributor Author

@liyinan926, regarding points 2 and 3: I like the idea of a WatermarkInterval containing only two Watermarks, a low and a high Watermark. However, I think it is still important that the runtime high watermark gets serialized along with the WorkUnitState. The Source class should have access to the WorkUnitStates of the previous run, along with some way to access the low, expected high, and actual high watermarks.

Perhaps, the WorkUnit can have a WatermarkInterval that contains the low and expected high Watermark, and the WorkUnitState can have the actual high Watermark. Thoughts?

@liyinan926
Contributor

@sahilTakiar, yeah, I think that's a good solution. So basically the source/extractor gives the WorkUnitState a reference to the actualHighWatermark and increments it as records get extracted. Upon completion of the task, the actualHighWatermark and the WatermarkInterval get serialized as part of the WorkUnitState. When serializing the actualHighWatermark, the WorkUnitState simply calls its toJson method. In addition, the immutable WatermarkInterval should be owned by the WorkUnit inside the WorkUnitState, whereas the actualHighWatermark should be owned by the WorkUnitState itself.

@sahilTakiar
Contributor Author

Updated. The Watermark interface now contains a fromJson(JsonElement json) and a JsonElement toJson() method.

I removed the new method from the Extractor interface so we don't break all the dependent classes. Instead, the WorkUnitState.setHighWatermark(Watermark) method is now responsible for setting the runtime high watermark. This method must be called during Extractor initialization so that the WorkUnitState can properly report the progress of the Watermark.

I also changed WatermarkInterval to contain just two Watermarks, and now the WorkUnit serializes a WatermarkInterval, and the WorkUnitState just serializes a single high Watermark.
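A sketch of the ownership split described here: the WorkUnit's immutable interval and the WorkUnitState's actual high watermark, registered by reference through setHighWatermark. To stay dependency-free, toJson() returns a String instead of a JsonElement, the WorkUnit is reduced to its interval, and the serialize() method and JSON layout are hypothetical.

```java
// Minimal stand-in for the Watermark interface; a String replaces JsonElement here.
interface Watermark {
  String toJson();
}

// Hypothetical mutable watermark that the extractor advances while reading.
class LongWatermark implements Watermark {
  private long value;

  LongWatermark(long value) { this.value = value; }

  void setValue(long value) { this.value = value; }

  @Override
  public String toJson() { return Long.toString(value); }
}

// Owned by the WorkUnit and never mutated after creation.
final class WatermarkInterval {
  private final Watermark lowWatermark;
  private final Watermark expectedHighWatermark;

  WatermarkInterval(Watermark low, Watermark expectedHigh) {
    this.lowWatermark = low;
    this.expectedHighWatermark = expectedHigh;
  }

  String toJson() {
    return "{\"low\":" + lowWatermark.toJson()
        + ",\"expectedHigh\":" + expectedHighWatermark.toJson() + "}";
  }
}

class WorkUnitState {
  private final WatermarkInterval interval; // comes from the WorkUnit
  private Watermark actualHighWatermark;    // owned by the state

  WorkUnitState(WatermarkInterval interval) { this.interval = interval; }

  // Called during Extractor initialization; keeps a reference, not a copy,
  // so later updates made by the extractor are visible at serialization time.
  void setHighWatermark(Watermark watermark) { this.actualHighWatermark = watermark; }

  // At task completion: the interval plus the single actual high watermark.
  String serialize() {
    return "{\"interval\":" + interval.toJson()
        + ",\"actualHigh\":" + actualHighWatermark.toJson() + "}";
  }
}
```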

@sahilTakiar
Contributor Author

@liyinan926 I am not entirely sure I understand why or how to make the Watermark generic. A Watermark may contain a variety of data structures used to track the progress of a WorkUnitState (e.g., maps, arrays, lists). The only requirements are that all these data structures can be converted into a JsonElement and that Watermarks can be compared.

@liyinan926
Contributor

@sahilTakiar, my thinking was that a Watermark is a container for a value of a certain type, and that type can be parameterized. I also thought Watermark would support getting the internal value it contains. An interface for classes that may contain values of any type deserves a type parameter. I am thinking of the following interface:

/**
 * @param <T> type of the watermark value
 */
public interface Watermark<T> {

  public T getValue();
}

With this interface, you can have something like:

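public class NumericalWatermark<T extends Number> implements Watermark<T> {
  private T value;  // set elsewhere, e.g. by a constructor (omitted)
  public T getValue() { return value; }
}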

As I said above, adding a type parameter lets the compiler do type checking and helps prevent abuse of Object, such as the following:

public class SimpleWatermark implements Watermark {
  public SimpleWatermark(Object obj) {
    ...
  }
}

@sahilTakiar
Contributor Author

Hey, so I think the Watermark interface could be used in a couple of different ways, depending on the source:

  • For MySQL, each table has a single Date value inside its Watermark
  • For SFTP, the Watermark interface contains a list of files that have already been pulled
  • For Kafka, the Watermark contains a Map from each Kafka (topic, partition) pair to an offset

For all of the above use cases, I think having a getValue() method would be good. For MySQL, you would have public class MySQL implements Watermark<Long> and the method public Long getValue().

However, if we take that approach, then we add a restriction to any class that implements the Watermark interface: the Watermark must be represented entirely by one data structure. While I can't think of any Source that would need multiple data structures to represent its Watermark, it is entirely possible.

Also, I think the toJson() method is already doing what the getValue() method would do. The toJson() method allows you to return the value of the Watermark in JSON format.
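To illustrate the Kafka case and the claim that toJson() already exposes the watermark's contents, here is a hypothetical KafkaWatermark backed by a single map. The class, the "topic:partition" key encoding, and the hand-rolled JSON (keys assumed to need no escaping) are assumptions for the sketch.

```java
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical Kafka watermark: one map from "topic:partition" to an offset.
class KafkaWatermark {
  private final Map<String, Long> offsets = new TreeMap<>();

  void setOffset(String topic, int partition, long offset) {
    offsets.put(topic + ":" + partition, offset);
  }

  // What a generic getValue() would return for this source: the whole map, read-only.
  Map<String, Long> getValue() {
    return Collections.unmodifiableMap(offsets);
  }

  // The same information as JSON; TreeMap keeps key order deterministic.
  String toJson() {
    StringBuilder sb = new StringBuilder("{");
    for (Map.Entry<String, Long> e : offsets.entrySet()) {
      if (sb.length() > 1) {
        sb.append(',');
      }
      sb.append('"').append(e.getKey()).append("\":").append(e.getValue());
    }
    return sb.append('}').toString();
  }
}
```

Here both methods expose the same single data structure; a watermark built from several structures would need getValue() to return a wrapper type, while toJson() would be unaffected.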

@chavdar
Contributor

chavdar commented May 6, 2015

Let's discuss this in person. I don't understand why we need getValue(). The Watermark should be the watermark implementation. We already have a container for watermarks -- the workunit.

@sahilTakiar
Contributor Author

@chavdar I still have some questions, so maybe we can sync up. Primarily, I realized that I will also have to change the calculatePercentCompletion method to take two JsonElements instead of Watermarks, since this method will get called in the map task.

This seems a little odd from a usability perspective; ideally, the user should not need to worry about the fact that the Watermark is getting serialized.
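A sketch of the usability concern: with live objects, percent completion is a typed method, but a JSON-taking variant forces every implementation to re-parse its inputs first. The calculatePercentCompletion(low, high) shape, the short return type, and LongWatermark are assumptions for illustration, and plain Strings stand in for JsonElement so the sketch has no dependencies.

```java
// Hypothetical watermark over a single long offset.
class LongWatermark {
  private final long value;

  LongWatermark(long value) { this.value = value; }

  long getValue() { return value; }

  // Typed version: percent of [low, high] covered by this watermark, clamped to 0..100.
  short calculatePercentCompletion(LongWatermark low, LongWatermark high) {
    long span = high.value - low.value;
    if (span <= 0) {
      return 100;
    }
    long done = Math.max(0, Math.min(span, value - low.value));
    return (short) (done * 100 / span);
  }

  // Serialized version the map task would call: deserialize first, then compute.
  short calculatePercentCompletion(String lowJson, String highJson) {
    return calculatePercentCompletion(fromJson(lowJson), fromJson(highJson));
  }

  // Assumes the trivial JSON form of a bare number, e.g. "42".
  static LongWatermark fromJson(String json) {
    return new LongWatermark(Long.parseLong(json.trim()));
  }
}
```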

@sahilTakiar
Contributor Author

Updated based on discussion with Chavdar.

@liyinan926 any more comments?

*/
public class WatermarkInterval {

private Watermark lowWatermark;
Contributor

Both can be final.

Contributor Author

Fixed.

@sahilTakiar
Contributor Author

Comments have been addressed, @liyinan926.

@liyinan926
Contributor

LGTM.

@sahilTakiar
Contributor Author

@chavdar any other comments?

sahilTakiar added a commit that referenced this pull request May 15, 2015
Adding Support for Complex Watermark Types
sahilTakiar merged commit ed4b4d5 into apache:master on May 15, 2015
@sahilTakiar
Contributor Author

Merged

sahilTakiar deleted the complexWatermarks branch on May 15, 2015 at 18:20
4 participants