diff --git a/libswk/GNUmakefile b/libswk/GNUmakefile index 346a98e..abcb931 100644 --- a/libswk/GNUmakefile +++ b/libswk/GNUmakefile @@ -82,6 +82,7 @@ LIBSWK_TESTS += swk_str_split_test LIBSWK_EXAMPLES =# LIBSWK_EXAMPLES += mr1.wordcount LIBSWK_EXAMPLES += mr2.wordcount +LIBSWK_EXAMPLES += mr3.wordcount LIBSWK_STATIC_OBJS = $(foreach item,$(LIBSWK_SRCS),src/$(item).o) LIBSWK_SHARED_OBJS = $(foreach item,$(LIBSWK_SRCS),src/$(item).lo) diff --git a/libswk/example/mr3.wordcount.cpp b/libswk/example/mr3.wordcount.cpp new file mode 100644 index 0000000..ca2885c --- /dev/null +++ b/libswk/example/mr3.wordcount.cpp @@ -0,0 +1,52 @@ +//#include +//#include +#include +#include +#include +#include +#include +#include + +void wordcount_mapper(const uint32_t& key, const std::string& value, + swk::mr3::mapper_context& ctx) +{ + SWK_DOUT << value; + std::vector tokens = swk::str_split(value); + for (std::vector::const_iterator it = tokens.begin(); + it != tokens.end(); + ++it) { + SWK_DVAR(*it); + ctx.push(*it, uint32_t(1)); + } +} + +void wordcount_reducer(const std::string& key, const std::vector& values, + swk::mr3::reducer_context& ctx) +{ + uint32_t sum = 0; + for (std::vector::const_iterator it = values.begin(); + it != values.end(); + ++it) { + sum += *it; + } + ctx.push(key, sum); +} + +int main() +{ + try { + std::auto_ptr wc(swk::mr3::make_job( + wordcount_mapper, wordcount_reducer + )); + wc->add_input_path("data1.txt"); + wc->add_input_path("data2.txt"); + wc->add_input_path("data3.txt"); + wc->set_output_dir("output"); + wc->run(); + return 0; + } + catch (std::exception e) { + return 1; + } +} + diff --git a/libswk/include/swk/mr3/job.hpp b/libswk/include/swk/mr3/job.hpp new file mode 100644 index 0000000..4db1f79 --- /dev/null +++ b/libswk/include/swk/mr3/job.hpp @@ -0,0 +1,92 @@ +#ifndef SWK_MR3_JOB_HPP_INCLUDED +#define SWK_MR3_JOB_HPP_INCLUDED + +#include +#include +#include +#include +#include +#include + +namespace swk { +namespace mr3 { + +class mr_context +{ +public: + + template + void push(const K& k, const V& v) + { + } +}; + +typedef mr_context mapper_context; +typedef mr_context reducer_context; + +class job +{ +public: + virtual void run() = 0; + + void add_input_path(const std::string& path) + { + input_paths.push_back(path); + } + + void set_output_dir(const std::string& dir) + { + output_dir = dir; + } + +protected: + + std::vector input_paths; + std::string output_dir; + +}; + +template +class generic_job : public job +{ +public: + + virtual void run() + { +// M m; +// mapper_context mc; +// for (std::vector::const_iterator ip = input_paths.begin(); // ip: input path +// ip != input_paths.end(); +// ++ip) { +// typename FS::ichannel ic(ip->c_str()); +// std::string line; +// uint32_t no = 0; +// while (std::getline(ic, line)) { +// SWK_DOUT << line; +// ++no; +// m(no, line, mc); +// } +// } +// SWK_DMVEC(mc.outputs_); +// +// R r; +// reducer_context rc; +// for (typename M::context::outputs_type::const_iterator me = mc.outputs_.begin(); // me: map entry +// me != mc.outputs_.end(); +// ++me) { +// r(me->first, me->second, rc); +// } + } +}; + +template +std::auto_ptr make_job(M m, R r) +{ + return std::auto_ptr(new generic_job()); +} + +} // namespace swk::mr3 +} // namespace swk + +#endif /* SWK_MR3_JOB_HPP_INCLUDED */ +