Skip to content

Commit

Permalink
Try another map/reduce API design.
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffhung committed May 11, 2010
1 parent 142f14d commit 33b8526
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 0 deletions.
1 change: 1 addition & 0 deletions libswk/GNUmakefile
Expand Up @@ -82,6 +82,7 @@ LIBSWK_TESTS += swk_str_split_test
LIBSWK_EXAMPLES =#
LIBSWK_EXAMPLES += mr1.wordcount
LIBSWK_EXAMPLES += mr2.wordcount
LIBSWK_EXAMPLES += mr3.wordcount

LIBSWK_STATIC_OBJS = $(foreach item,$(LIBSWK_SRCS),src/$(item).o)
LIBSWK_SHARED_OBJS = $(foreach item,$(LIBSWK_SRCS),src/$(item).lo)
Expand Down
52 changes: 52 additions & 0 deletions libswk/example/mr3.wordcount.cpp
@@ -0,0 +1,52 @@
//#include <swk/mr2/mapper.hpp>
//#include <swk/mr2/reducer.hpp>
#include <swk/mr3/job.hpp>
#include <swk/fs_local.hpp>
#include <swk/str_split.hpp>
#include <string>
#include <vector>
#include <stdint.h>

void wordcount_mapper(const uint32_t& key, const std::string& value,
swk::mr3::mapper_context& ctx)
{
SWK_DOUT << value;
std::vector<std::string> tokens = swk::str_split(value);
for (std::vector<std::string>::const_iterator it = tokens.begin();
it != tokens.end();
++it) {
SWK_DVAR(*it);
ctx.push(*it, uint32_t(1));
}
}

void wordcount_reducer(const std::string& key, const std::vector<uint32_t>& values,
swk::mr3::reducer_context& ctx)
{
uint32_t sum = 0;
for (std::vector<uint32_t>::const_iterator it = values.begin();
it != values.end();
++it) {
sum += *it;
}
ctx.push(key, sum);
}

int main()
{
try {
std::auto_ptr<swk::mr3::job> wc(swk::mr3::make_job(
wordcount_mapper, wordcount_reducer
));
wc->add_input_path("data1.txt");
wc->add_input_path("data2.txt");
wc->add_input_path("data3.txt");
wc->set_output_dir("output");
wc->run();
return 0;
}
catch (std::exception e) {
return 1;
}
}

92 changes: 92 additions & 0 deletions libswk/include/swk/mr3/job.hpp
@@ -0,0 +1,92 @@
#ifndef SWK_MR3_JOB_HPP_INCLUDED
#define SWK_MR3_JOB_HPP_INCLUDED

#include <swk/dtool.hpp>
#include <swk/fs_local.hpp>
#include <string>
#include <vector>
#include <stdint.h>
#include <iostream>

namespace swk {
namespace mr3 {

class mr_context
{
public:

template <class K, class V>
void push(const K& k, const V& v)
{
}
};

typedef mr_context mapper_context;
typedef mr_context reducer_context;

class job
{
public:
virtual void run() = 0;

void add_input_path(const std::string& path)
{
input_paths.push_back(path);
}

void set_output_dir(const std::string& dir)
{
output_dir = dir;
}

protected:

std::vector<std::string> input_paths;
std::string output_dir;

};

template <class M, class R, class FS = fs_local>
class generic_job : public job
{
public:

virtual void run()
{
// M m;
// mapper_context mc;
// for (std::vector<std::string>::const_iterator ip = input_paths.begin(); // ip: input path
// ip != input_paths.end();
// ++ip) {
// typename FS::ichannel ic(ip->c_str());
// std::string line;
// uint32_t no = 0;
// while (std::getline(ic, line)) {
// SWK_DOUT << line;
// ++no;
// m(no, line, mc);
// }
// }
// SWK_DMVEC(mc.outputs_);
//
// R r;
// reducer_context rc;
// for (typename M::context::outputs_type::const_iterator me = mc.outputs_.begin(); // me: map entry
// me != mc.outputs_.end();
// ++me) {
// r(me->first, me->second, rc);
// }
}
};

template <class M, class R>
std::auto_ptr<job> make_job(M m, R r)
{
return std::auto_ptr<job>(new generic_job<M, R>());
}

} // namespace swk::mr3
} // namespace swk

#endif /* SWK_MR3_JOB_HPP_INCLUDED */

0 comments on commit 33b8526

Please sign in to comment.