Skip to content

Commit

Permalink
Add mr2, the 2nd map/reduce coding style.
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffhung committed May 11, 2010
1 parent c4e52a5 commit 142f14d
Show file tree
Hide file tree
Showing 5 changed files with 221 additions and 0 deletions.
1 change: 1 addition & 0 deletions libswk/GNUmakefile
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ LIBSWK_TESTS += swk_str_split_test

LIBSWK_EXAMPLES =#
LIBSWK_EXAMPLES += mr1.wordcount
LIBSWK_EXAMPLES += mr2.wordcount

LIBSWK_STATIC_OBJS = $(foreach item,$(LIBSWK_SRCS),src/$(item).o)
LIBSWK_SHARED_OBJS = $(foreach item,$(LIBSWK_SRCS),src/$(item).lo)
Expand Down
66 changes: 66 additions & 0 deletions libswk/example/mr2.wordcount.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#include <swk/mr2/mapper.hpp>
#include <swk/mr2/reducer.hpp>
#include <swk/mr2/job.hpp>
#include <swk/fs_local.hpp>
#include <swk/str_split.hpp>
#include <string>
#include <vector>
#include <stdint.h>

class wordcount_mapper : public swk::mr2::mapper<uint32_t, std::string,
std::string, uint32_t>
{
public:

void operator()(const uint32_t& key,
const std::string& value,
context& ctx)
{
SWK_DOUT << value;
std::vector<std::string> tokens = swk::str_split(value);
for (std::vector<std::string>::const_iterator it = tokens.begin();
it != tokens.end();
++it) {
SWK_DVAR(*it);
ctx.push(*it, uint32_t(1));
}
}

}; // class wordcount

class wordcount_reducer : public swk::mr2::reducer<std::string, uint32_t,
std::string, uint32_t>
{
public:

void operator()(const std::string& key,
const std::vector<uint32_t>& values,
context& ctx)
{
uint32_t sum = 0;
for (std::vector<uint32_t>::const_iterator it = values.begin();
it != values.end();
++it) {
sum += *it;
}
ctx.push(key, sum);
}
};

int main()
{
try {
typedef swk::mr2::job<wordcount_mapper, wordcount_reducer> job_type;
job_type wc;
wc.add_input_path("data1.txt");
wc.add_input_path("data2.txt");
wc.add_input_path("data3.txt");
wc.set_output_dir("output");
wc.run();
return 0;
}
catch (std::exception e) {
return 1;
}
}

66 changes: 66 additions & 0 deletions libswk/include/swk/mr2/job.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#ifndef SWK_MR2_JOB_HPP_INCLUDED
#define SWK_MR2_JOB_HPP_INCLUDED

#include <swk/dtool.hpp>
#include <string>
#include <vector>
#include <stdint.h>
#include <iostream>

namespace swk {
namespace mr2 {

template <class M, class R, class FS = fs_local>
class job
{
public:

void add_input_path(const std::string& path)
{
input_paths.push_back(path);
}

void set_output_dir(const std::string& dir)
{
output_dir = dir;
}

void run()
{
M m;
typename M::context mc;
for (std::vector<std::string>::const_iterator ip = input_paths.begin(); // ip: input path
ip != input_paths.end();
++ip) {
typename FS::ichannel ic(ip->c_str());
std::string line;
uint32_t no = 0;
while (std::getline(ic, line)) {
SWK_DOUT << line;
++no;
m(no, line, mc);
}
}
SWK_DMVEC(mc.outputs_);

R r;
typename R::context rc;
for (typename M::context::outputs_type::const_iterator me = mc.outputs_.begin(); // me: map entry
me != mc.outputs_.end();
++me) {
r(me->first, me->second, rc);
}
}

private:

std::vector<std::string> input_paths;
std::string output_dir;

};

} // namespace swk::mr2
} // namespace swk

#endif /* SWK_MR2_JOB_HPP_INCLUDED */

47 changes: 47 additions & 0 deletions libswk/include/swk/mr2/mapper.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#ifndef SWK_MR2_MAPPER_HPP_INCLUDED
#define SWK_MR2_MAPPER_HPP_INCLUDED

#include <swk/fs_local.hpp>
#include <vector>
#include <map>
#include <utility>

namespace swk {
namespace mr2 {

template <class IK, class IV, class OK, class OV>
class mapper_context
{
public:

typedef std::map<OK, std::vector<OV> > outputs_type;

void push(const OK& ok, const OV& ov)
{
std::pair<typename outputs_type::iterator, bool> r
= outputs_.insert(make_pair(ok, std::vector<OV>()));
r.first->second.push_back(ov);
}

outputs_type outputs_;

}; // class swk::mapper_context

template <class IK, class IV, class OK, class OV>
class mapper
{
public:

typedef mapper_context<IK, IV, OK, OV> context;

// virtual void operator()(const IK& key,
// const IV& value,
// context& ctx) = 0;

}; // class swk::mapper

} // namespace swk::mr2
} // namespace swk

#endif /* SWK_MR2_MAPPER_HPP_INCLUDED */

41 changes: 41 additions & 0 deletions libswk/include/swk/mr2/reducer.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#ifndef SWK_MR2_REDUCER_HPP_INCLUDED
#define SWK_MR2_REDUCER_HPP_INCLUDED

#include <swk/dtool.hpp>
#include <swk/fs_local.hpp>
#include <vector>

namespace swk {
namespace mr2 {

template <class IK, class IV, class OK, class OV>
class reducer_context
{
public:

void push(const OK& ok, const OV& ov)
{
SWK_DOUT << ov << " <= " << ok;
}

}; // class swk::reducer_context

template <class IK, class IV, class OK, class OV>
class reducer
{
public:

typedef reducer_context<IK, IV, OK, OV> context;

// virtual void operator()(const IK& key,
// const std::vector<IV>& values,
// context& ctx) = 0;

}; // class swk::reducer

} // namespace mr2
} // namespace swk

#endif /* SWK_MR2_REDUCER_HPP_INCLUDED */


0 comments on commit 142f14d

Please sign in to comment.