Skip to content

Commit

Permalink
Add comments to main modules (#38) (#39)
Browse files Browse the repository at this point in the history
* update comments on first 4 modules (alphabetically)

* update comments on all modules except opt

* fix rebase

* fix rebase 2

* comment voters

* add comments in opt

* fix missed rebase issues
  • Loading branch information
raggledodo committed Oct 4, 2019
1 parent 4a2d9ff commit ab4afa4
Show file tree
Hide file tree
Showing 56 changed files with 1,211 additions and 238 deletions.
2 changes: 1 addition & 1 deletion Doxyfile
Original file line number Diff line number Diff line change
Expand Up @@ -791,7 +791,7 @@ WARN_LOGFILE =
# spaces. See also FILE_PATTERNS and EXTENSION_MAPPING
# Note: If this tag is empty the current directory is searched.

INPUT = README.md teq dbg eteq opt pbm tag ccur layr
INPUT = README.md ccur dbg eteq layr opt pbm tag teq

# This tag can be used to specify the character encoding of the source files
# that doxygen parses. Internally doxygen uses the UTF-8 encoding. Doxygen uses
Expand Down
10 changes: 9 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,17 @@ This module marshals any TEQ graph, but requires data serialization functors whe

This module tags TEQ tensors with labels.

- [CCUR (ConCURrent session)](ccur/README_CCUR.md)

This module implements session that updates graph nodes concurrently

- [LAYR (LAYeR models)](layr/README_LAYR.md)

This module implements common machine learning models

## Tools and utility

- DBG (Debugger)
- DBG (DeBuGger)

## Building

Expand Down
19 changes: 16 additions & 3 deletions ccur/partition.hpp
Original file line number Diff line number Diff line change
@@ -1,17 +1,30 @@
///
/// partition.hpp
/// ccur
///
/// Purpose:
/// Implement the algorithm to separate all nodes under a series of graphs
/// into k groups as to minimize the size of each group while ensuring the
/// parents of every node is found under the same group
///

#include "teq/traveler.hpp"

#ifndef CCE_PARTITION_HPP
#define CCE_PARTITION_HPP
#ifndef CCUR_PARTITION_HPP
#define CCUR_PARTITION_HPP

namespace ccur
{

/// Groups of functors
using PartGroupsT = std::vector<std::vector<teq::iFunctor*>>;

/// Map functor opcode to the operation's weight value
using OpWeightT = std::unordered_map<size_t,double>;

/// Return k groups of graphs under roots given some weight
PartGroupsT k_partition (teq::TensptrsT roots, size_t k, OpWeightT weights = OpWeightT());

}

#endif // CCE_PARTITION_HPP
#endif // CCUR_PARTITION_HPP
37 changes: 30 additions & 7 deletions ccur/session.hpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
///
/// session.hpp
/// ccur
///
/// Purpose:
/// Implement session that runs functor updates concurrently
///

#include <atomic>

#include <boost/asio/thread_pool.hpp>
Expand All @@ -7,24 +15,33 @@

#include "ccur/partition.hpp"

#ifndef CCE_ASESS_HPP
#define CCE_ASESS_HPP
#ifndef CCUR_SESS_HPP
#define CCUR_SESS_HPP

namespace ccur
{

using LSessReqsT = std::list<std::pair<teq::iOperableFunc*,size_t>>;

/// Vector of operable functors and number of unique non-leaf children
/// Functors are ordered by dependency,
/// such that parents of any node always appears after the node in this vector
using SessReqsT = std::vector<std::pair<teq::iOperableFunc*,size_t>>;

/// Same as SessReqsT except as a list
using LSessReqsT = std::list<std::pair<teq::iOperableFunc*,size_t>>;

/// Map operable functors to the number of children updated in
/// any update/update_target call
using AtomicFulfilMapT = std::unordered_map<
teq::iOperableFunc*,std::atomic<long>>;

/// Session that updates operable functors concurrently
/// across specified a number of jobs
struct Session final : public eteq::iSession
{
Session (size_t nthreads = 2, OpWeightT weights = OpWeightT()) :
nthreads_(nthreads), weights_(weights) {}

/// Implementation of iSession
void track (teq::TensptrsT roots) override
{
tracked_.insert(roots.begin(), roots.end());
Expand Down Expand Up @@ -86,7 +103,7 @@ struct Session final : public eteq::iSession
}
}

// this function is expected to be called repeatedly during runtime
/// Implementation of iSession
void update (teq::TensSetT ignored = {}) override
{
size_t nthreads = requirements_.size();
Expand Down Expand Up @@ -169,7 +186,7 @@ struct Session final : public eteq::iSession
pool.join();
}

// this function is expected to be called repeatedly during runtime
/// Implementation of iSession
void update_target (teq::TensSetT target,
teq::TensSetT ignored = {}) override
{
Expand Down Expand Up @@ -253,6 +270,7 @@ struct Session final : public eteq::iSession
pool.join();
}

/// Apply input optimization rules using opt module, then re-track
void optimize (const opt::OptCtx& rules)
{
teq::TensptrsT tracked(tracked_.begin(), tracked_.end());
Expand All @@ -261,11 +279,16 @@ struct Session final : public eteq::iSession
track(tracked);
}

/// Set of all tensors input through tracked function
/// The set of roots of all session graphs is a possible subset
teq::TensptrSetT tracked_;

/// Map of tensor to the set of the tensor's parents
std::unordered_map<teq::iTensor*,
std::unordered_set<teq::iOperableFunc*>> parents_;

/// Vector of vectors of operable functors specific to each job
/// See SessReqsT
std::vector<SessReqsT> requirements_;

private:
Expand All @@ -278,4 +301,4 @@ struct Session final : public eteq::iSession

}

#endif // CCE_ASESS_HPP
#endif // CCUR_SESS_HPP
2 changes: 1 addition & 1 deletion ccur/src/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

#include "ccur/partition.hpp"

#ifdef CCE_PARTITION_HPP
#ifdef CCUR_PARTITION_HPP

namespace ccur
{
Expand Down
18 changes: 18 additions & 0 deletions dbg/eteq/custom_functor.hpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
///
/// custom_functor.hpp
/// dbg
///
/// Purpose:
/// Define custom functor version of eteq functor
///

#include "teq/iopfunc.hpp"

#include "eteq/generated/opcode.hpp"
Expand All @@ -12,15 +20,19 @@
namespace dbg
{

/// Arguments of raw data and shapes
template <typename T>
using DataMapT = std::vector<eteq::OpArg<T>>;

/// Custom functor to assign DataMap to Eigen tensor output
template <typename T>
using CustomOpF = std::function<void(eteq::TensorT<T>&,const DataMapT<T>&)>;

/// Functor that runs a custom functor instead of Eigen operators
template <typename T>
struct CustomFunctor final : public teq::iOperableFunc
{
/// Return a CustomFunctor with input function and meta arguments
static CustomFunctor<T>* get (CustomOpF<T> op, eteq::ArgsT<T> args);

CustomFunctor (const CustomFunctor<T>& other) = default;
Expand Down Expand Up @@ -124,26 +136,31 @@ struct CustomFunctor final : public teq::iOperableFunc
teq::ArgsT args_;
};

/// CustomFunctor's node wrapper
template <typename T>
struct CustomFunctorNode final : public eteq::iNode<T>
{
CustomFunctorNode (std::shared_ptr<CustomFunctor<T>> f) : func_(f) {}

/// Return deep copy of this instance (with a copied functor)
CustomFunctorNode<T>* clone (void) const
{
return static_cast<CustomFunctorNode<T>*>(clone_impl());
}

/// Implementation of iNode<T>
T* data (void) override
{
return (T*) func_->data();
}

/// Implementation of iNode<T>
void update (void) override
{
func_->update();
}

/// Implementation of iNode<T>
teq::TensptrT get_tensor (void) const override
{
return func_;
Expand Down Expand Up @@ -205,6 +222,7 @@ CustomFunctor<T>* CustomFunctor<T>::get (CustomOpF<T> op, eteq::ArgsT<T> args)
return new CustomFunctor<T>(op, shape, input_args);
}

/// Return custom functor node given custom function and arguments
template <typename T>
eteq::NodeptrT<T> make_functor (CustomOpF<T> op, eteq::ArgsT<T> args)
{
Expand Down
15 changes: 15 additions & 0 deletions dbg/grpc/client.hpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
///
/// client.hpp
/// dbg
///
/// Purpose:
/// Implement grpc client that create and update graphs
///

#include <chrono>

#include <grpc/grpc.h>
Expand All @@ -19,6 +27,7 @@ static const size_t max_attempts = 10;

static const size_t data_sync_interval = 50;

/// Configuration wrapper for creating the client
struct ClientConfig
{
ClientConfig (void) = default;
Expand All @@ -27,13 +36,16 @@ struct ClientConfig
std::chrono::duration<int64_t,std::milli> stream_duration) :
request_duration_(request_duration), stream_duration_(stream_duration) {}

/// Request timeout
std::chrono::duration<int64_t,std::milli> request_duration_ =
std::chrono::milliseconds(250);

/// Stream timeout
std::chrono::duration<int64_t,std::milli> stream_duration_ =
std::chrono::milliseconds(10000);
};

/// GRPC client that checks for server health and make graph creation and update calls
struct GraphEmitterClient final
{
GraphEmitterClient (std::shared_ptr<grpc::ChannelInterface> channel,
Expand Down Expand Up @@ -251,16 +263,19 @@ struct GraphEmitterClient final
}, std::move(requests), std::move(update_it));
}

/// Return true if the client is connected to the server
bool is_connected (void)
{
return connected_;
}

/// Wait until all request jobs are complete
void join (void)
{
sequential_jobs_.join();
}

/// Kill all request jobs
void clear (void)
{
sequential_jobs_.stop();
Expand Down
Loading

0 comments on commit ab4afa4

Please sign in to comment.