
Basic tasks (Mapper, Reducer and Combiner) for hadoopcompatibility #833

Closed
wants to merge 3 commits

Conversation

atsikiridis (Contributor)

Wrappers for basic tasks (Mapper, Reducer, Combiner), a new interface for OutputCollectors, and a test case with a complete Hadoop WordCount. With these in place, along with HadoopDataSource and HadoopDataSink, the ground is set to start working seriously on the Hadoop abstraction layer (which, by the way, is my Google Summer of Code project and officially starts today :))

Note that in some places there is code that might be generalised or refactored very soon.
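
For context, a minimal sketch of the shape such a Mapper wrapper can take, built only on Hadoop's mapred interfaces (the class name and the omission of the Stratosphere side are illustrative, not the PR's actual code):

import java.io.IOException;

import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Illustrative wrapper: delegates each input pair to a user-supplied Hadoop
// Mapper, handing it an OutputCollector that forwards results downstream.
public class HadoopMapperWrapperSketch<K1, V1, K2, V2> {

    private final Mapper<K1, V1, K2, V2> hadoopMapper;

    public HadoopMapperWrapperSketch(Mapper<K1, V1, K2, V2> hadoopMapper) {
        this.hadoopMapper = hadoopMapper;
    }

    // Called once per input pair by the framework driver.
    public void map(K1 key, V1 value, OutputCollector<K2, V2> out) throws IOException {
        hadoopMapper.map(key, value, out, Reporter.NULL);
    }
}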

@Override
@SuppressWarnings("unchecked")
public void map(Record record, Collector<Record> out) throws Exception {
    output.wrapStratosphereCollector(out);
atsikiridis (Contributor, Author)

This one is not particularly great... A Stratosphere Collector is wrapped every time map and reduce are called, which is not elegant. As far as I am aware, the implementation of the Collector interface is not user-configurable in Stratosphere (as it is in Hadoop), and one would have to work at the job-driver level (the caller of the reduce function) to utilise it. I can do that, but what do you think would be the best approach?
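
One common way around the per-call allocation, sketched here under assumed names rather than the PR's actual classes, is to keep a single wrapper per task and swap only the delegate on each invocation:

import java.io.IOException;

import org.apache.hadoop.mapred.OutputCollector;

// Hypothetical reusable wrapper: constructed once per task; only the
// downstream target is swapped on each map()/reduce() call, avoiding a
// fresh wrapper allocation per record.
class ReusableOutputCollector<K, V> implements OutputCollector<K, V> {

    // Stand-in for the Stratosphere Collector the wrapper forwards to.
    interface Downstream<K, V> {
        void emit(K key, V value);
    }

    private Downstream<K, V> downstream;

    void setDownstream(Downstream<K, V> downstream) {
        this.downstream = downstream;
    }

    @Override
    public void collect(K key, V value) throws IOException {
        downstream.emit(key, value);
    }
}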

@rmetzger (Member)

Hey @atsikiridis, great to see your first contribution after 5 hours of GSoC.
I'll take a look at your code soon!

@twalthr (Contributor) commented May 19, 2014

Hey @atsikiridis, please also have a look at my PR #777. I have refactored the complete hadoop compatibility package in order to support our new Java API as well as both the Hadoop mapred and the newer mapreduce APIs. As far as I know, type conversions (e.g. through StratosphereTypeConverter) are no longer necessary, since the new Java API supports Writables.
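
To illustrate: with native Writable support, Hadoop types can sit directly in a typed tuple with no converter round-trip (the package name is assumed from the Stratosphere 0.5 era; it later became org.apache.flink.api.java.tuple):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import eu.stratosphere.api.java.tuple.Tuple2; // assumed Stratosphere-era package

public class WritableTupleExample {
    public static void main(String[] args) {
        // Hadoop Writables used directly as tuple fields -- no
        // StratosphereTypeConverter needed.
        Tuple2<Text, IntWritable> pair =
                new Tuple2<Text, IntWritable>(new Text("hello"), new IntWritable(1));
        System.out.println(pair.f0 + " -> " + pair.f1.get());
    }
}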

@atsikiridis (Contributor, Author)

Hello @twalthr. Thanks for mentioning it, and very nice work! PR #777 actually changes a lot for hadoop-compatibility. But there is not much code in my pull request, so I can always refactor it later on top of your changes if the integrators think so.

Artem Tsikiridis added 3 commits May 19, 2014 23:34
* wrappers for Mapper, Reducer and Combiner (as a local Reducer)
* interface for Wrappers of OutputCollectors and a default implementation
* New full example of Wordcount using mapred Mapper and Reducer
* Updated test case
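
The WordCount example added in these commits builds on classic mapred user code; a Mapper of that shape (a sketch, not the PR's test code verbatim) looks like this:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Classic mapred WordCount Mapper: emits (word, 1) for every token. This is
// the kind of unmodified Hadoop user code the new wrappers can execute.
public class WordCountMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, IntWritable> {

    private final IntWritable one = new IntWritable(1);
    private final Text word = new Text();

    @Override
    public void map(LongWritable key, Text value,
                    OutputCollector<Text, IntWritable> out, Reporter reporter)
            throws IOException {
        StringTokenizer tokens = new StringTokenizer(value.toString());
        while (tokens.hasMoreTokens()) {
            word.set(tokens.nextToken());
            out.collect(word, one);
        }
    }
}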
}

@Override
public Plan getPlan(String... args) {
Contributor

You are using Stratosphere's old "Record" Java API here. We are moving away from it and will probably deprecate it in the 0.6 release. Please check out the new Java API.

@fhueske (Contributor) commented May 19, 2014

@atsikiridis Nice work so far! 👍
Let's try to rebase your commits onto @twalthr's branch and port them to the new Java API. A couple of things should be easier now (typed Tuple2 instead of Record, native support for Writables, ...).
Can you push the result to a new branch and post the link to it?

Thanks!

@atsikiridis (Contributor, Author)

Hello @fhueske! Thanks for the feedback. OK, I will rebase on #777; actually, I should have done this from the beginning... Well, I'll post the link here once it's ready.

Thanks.

@atsikiridis (Contributor, Author)

I have ported the code of this branch to the new Java API (basically rebased it on the branch in #777). Here is the link:
https://github.com/atsikiridis/stratosphere/tree/HadoopCompatibilityJAPIReady/stratosphere-addons/hadoop-compatibility

Due to some limitations of the TypeExtractor (Java's type erasure, as described in #845, and its consequences), the implementation is not as generic as it could be, so this probably shouldn't be a pull request yet. However, there is a test case implementing the identity function with mapreduce that can be taken as a proof of concept, and it won't be difficult to make it generic as soon as the TypeExtractor supports it.

By the way, if we don't need the initial code for the old Record API, maybe this PR should be closed. It's ported to the new API anyway, and I'll submit a new PR very soon. Thanks!
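
For reference, the identity function in the newer mapreduce API is just a pass-through Mapper along these lines (a sketch of the kind of proof-of-concept test described above, not the branch's actual code):

import java.io.IOException;

import org.apache.hadoop.mapreduce.Mapper;

// Identity Mapper in the mapreduce API: writes every (key, value) pair
// through unchanged.
public class IdentityMapper<K, V> extends Mapper<K, V, K, V> {

    @Override
    protected void map(K key, V value, Context context)
            throws IOException, InterruptedException {
        context.write(key, value);
    }
}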

@zentol commented May 23, 2014

You should be able to support non-identical generic input/output types already.
I had the same issue while working on the Python interface.
When writing a plan, the user can simply write:

data
   .map(new HadoopMapFunction<Tuple2<SomeWritableComparable, SomeWritable>, SomeOutput>(jobconf) {})
   ...

Will this approach not work for you?
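
The trailing {} is what makes this work: it creates an anonymous subclass, and a subclass's concrete type arguments are recorded in its class metadata, where they survive erasure and can be read back reflectively. A minimal, self-contained illustration:

import java.lang.reflect.ParameterizedType;
import java.util.ArrayList;

public class ErasureTrickDemo {
    public static void main(String[] args) {
        // Plain instance: the String argument is erased at runtime and
        // cannot be recovered from the object itself.
        ArrayList<String> plain = new ArrayList<String>();
        System.out.println(plain.getClass().getGenericSuperclass());

        // Anonymous subclass: ArrayList<String> is the recorded generic
        // superclass, so the type argument is recoverable via reflection.
        ArrayList<String> typed = new ArrayList<String>() {};
        ParameterizedType t =
                (ParameterizedType) typed.getClass().getGenericSuperclass();
        System.out.println(t.getActualTypeArguments()[0]); // class java.lang.String
    }
}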

@atsikiridis (Contributor, Author)

@zentol Hi, yes, this works. Probably @twalthr meant that earlier as well, but I thought it was a solution to a different problem. My bad :/ So now we can have more generic wrappers very soon. Thanks! :)

@uce (Contributor) commented Jun 25, 2014

I think this PR is subsumed by apache/flink#37.

@uce closed this Jun 25, 2014