Skip to content

Commit

Permalink
Format
Browse files Browse the repository at this point in the history
  • Loading branch information
palikar committed Jul 14, 2020
1 parent 6aaaf9b commit 557530d
Show file tree
Hide file tree
Showing 16 changed files with 106 additions and 127 deletions.
2 changes: 1 addition & 1 deletion src/alisp/include/alisp/alisp/async/action.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ struct async_action
{
}

ALObjectPtr future(async::AsyncS*)
ALObjectPtr future(async::AsyncS *)
{
if (pfunction(g_callback))
{
Expand Down
20 changes: 8 additions & 12 deletions src/alisp/include/alisp/alisp/async/asyncs.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,12 @@ struct Timer
{
using time_point = std::chrono::time_point<std::chrono::high_resolution_clock>;

static time_point now()
{
return std::chrono::high_resolution_clock::now();
}

static time_point now() { return std::chrono::high_resolution_clock::now(); }

time_point time;
ALObjectPtr callback;
al_callback internal_callback{};
ALObjectPtr periodic{Qnil};
ALObjectPtr periodic{ Qnil };
};

class AsyncS
Expand All @@ -87,13 +84,13 @@ class AsyncS

std::queue<event_type> m_event_queue;
mutable std::mutex event_queue_mutex;

std::queue<callback_type> m_callback_queue;
mutable std::mutex callback_queue_mutex;

std::vector<action_type> m_actions_queue;
mutable std::mutex action_queue_mutex;

std::atomic_uint32_t m_flags;
std::atomic_int m_asyncs{ 0 };

Expand All @@ -104,7 +101,7 @@ class AsyncS
thread_pool::ThreadPool m_thread_pool;

#ifndef MULTI_THREAD_EVENT_LOOP
std::thread m_event_loop;
std::thread m_event_loop;
mutable std::mutex event_loop_mutex;
mutable std::condition_variable event_loop_cv;
void event_loop();
Expand All @@ -116,7 +113,6 @@ class AsyncS
#endif



void execute_event(event_type call);

void execute_callback(callback_type call);
Expand All @@ -142,8 +138,8 @@ class AsyncS
void submit_future(uint32_t t_id, ALObjectPtr t_value, bool t_good = true);

void submit_timer(Timer::time_point time, ALObjectPtr function, ALObjectPtr periodic, al_callback internal = {});


void async_pending();

void async_reset_pending();
Expand Down
4 changes: 2 additions & 2 deletions src/alisp/include/alisp/alisp/async/event.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ struct ActionObject

std::atomic<bool> valid;
std::atomic<bool> executing;

template<class T> ActionObject(T t) { ptr_ = std::make_unique<WrappingCallback<T>>(std::move(t)); }

ALObjectPtr operator()(AsyncS *async) const { return ptr_->call(async); }
Expand All @@ -84,4 +84,4 @@ struct CallbackObject

} // namespace detail

}
} // namespace alisp::async
17 changes: 3 additions & 14 deletions src/alisp/include/alisp/alisp/async/fs_notify.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,27 +38,16 @@ namespace alisp::async
{



struct FSNotify
{
ALObjectPtr callback;



bool init()
{

}


ALObjectPtr operator()(AsyncS *async)
{
bool init() {}


return false;
}

ALObjectPtr operator()(AsyncS *async) { return false; }
};


}
} // namespace alisp::async
5 changes: 1 addition & 4 deletions src/alisp/include/alisp/alisp/async/future.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ struct Future
// c++ space things
std::function<void(ALObjectPtr)> internal{};

uint32_t next_in_line{0};
uint32_t next_in_line{ 0 };

inline static std::mutex future_mutex{};

Expand All @@ -74,15 +74,12 @@ struct Future
static ALObjectPtr future_resolved(uint32_t t_id);

static void merge(uint32_t t_next, uint32_t t_current);

};


inline management::Registry<Future, 0x05> future_registry{};




} // namespace async

} // namespace alisp
41 changes: 17 additions & 24 deletions src/alisp/include/alisp/alisp/async/thread_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,44 +34,40 @@
#include <functional>
#include <condition_variable>

namespace alisp::async::thread_pool {
namespace alisp::async::thread_pool
{


class Semaphore {
class Semaphore
{
public:
Semaphore(std::uint32_t value);
Semaphore(const Semaphore&) = delete;
const Semaphore& operator=(const Semaphore&) = delete;
~Semaphore() = default;
Semaphore(const Semaphore &) = delete;
const Semaphore &operator=(const Semaphore &) = delete;
~Semaphore() = default;

std::uint32_t value() const {
return value_;
}
std::uint32_t value() const { return value_; }

void wait();
void post();

private:

std::mutex mutex_;
std::condition_variable condition_;
std::uint32_t value_;
};

class ThreadPool {
class ThreadPool
{
public:

ThreadPool(std::uint32_t num_threads);
ThreadPool(const ThreadPool&) = delete;
const ThreadPool& operator=(const ThreadPool&) = delete;
ThreadPool(const ThreadPool &) = delete;
const ThreadPool &operator=(const ThreadPool &) = delete;
~ThreadPool();

size_t num_threads() const {
return threads_.size();
}
size_t num_threads() const { return threads_.size(); }

template<typename T, typename... Ts>
void submit(T&& routine, Ts&&... params)
template<typename T, typename... Ts> void submit(T &&routine, Ts &&... params)
{

auto task = [func = std::forward<T>(routine), args = std::make_tuple(std::forward<Ts>(params)...)]() {
Expand All @@ -84,13 +80,10 @@ class ThreadPool {

queue_sem_.post();
active_sem_.post();

}

private:


static void worker_thread(ThreadPool* thread_pool);
private:
static void worker_thread(ThreadPool *thread_pool);

std::vector<std::thread> threads_;

Expand All @@ -102,4 +95,4 @@ class ThreadPool {
std::atomic<bool> terminate_;
};

}
} // namespace alisp::async::thread_pool
52 changes: 21 additions & 31 deletions src/alisp/src/async/asyncs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,10 @@
#include <chrono>



namespace alisp::async
{

AsyncS::AsyncS(eval::Evaluator *t_eval, bool defer_init) : m_eval(t_eval), m_flags(0), m_thread_pool{2}
AsyncS::AsyncS(eval::Evaluator *t_eval, bool defer_init) : m_eval(t_eval), m_flags(0), m_thread_pool{ 2 }
{
AL_BIT_OFF(m_flags, INIT_FLAG);

Expand Down Expand Up @@ -70,7 +69,7 @@ void AsyncS::check_exit_condition()
return;
}

if (! m_event_queue.empty())
if (!m_event_queue.empty())
{
return;
}
Expand All @@ -80,12 +79,12 @@ void AsyncS::check_exit_condition()
return;
}

if( !m_eval->is_interactive())
if (!m_eval->is_interactive())
{
return;
}
if(AL_BIT_CHECK(m_flags, AWAIT_FLAG))

if (AL_BIT_CHECK(m_flags, AWAIT_FLAG))
{
return;
}
Expand All @@ -104,17 +103,16 @@ void AsyncS::check_exit_condition()
AL_BIT_OFF(m_flags, RUNNING_FLAG);
m_eval->reset_async_flag();
m_eval->callback_cv.notify_all();

}

void AsyncS::handle_timers()
{
std::lock_guard<std::mutex> guard{ timers_mutex };
auto it = m_timers.begin();
while(it != m_timers.end())
while (it != m_timers.end())
{

if(it->time < m_now)
if (it->time < m_now)
{
submit_callback(it->callback, nullptr, it->internal_callback);

Expand All @@ -123,34 +121,30 @@ void AsyncS::handle_timers()
it = m_timers.erase(it);
continue;
}

}

++it;
}
}

void AsyncS::handle_actions()
{

auto it = m_actions_queue.begin();
while(it != m_actions_queue.end())
auto it = m_actions_queue.begin();
while (it != m_actions_queue.end())
{
if(it->valid and !it->executing)

if (it->valid and !it->executing)
{
m_thread_pool.submit([&, action = it](){

m_thread_pool.submit([&, action = it]() {
action->executing = true;
action->operator()(this);
action->executing = false;
});

}

++it;
}

}

#ifndef MULTI_THREAD_EVENT_LOOP
Expand All @@ -163,20 +157,18 @@ void AsyncS::event_loop()
AL_BIT_ON(m_flags, INIT_FLAG);
while (AL_BIT_CHECK(m_flags, RUNNING_FLAG))
{

event_loop_cv.wait_for(el_lock, 10ms);

m_now = Timer::now();

m_thread_pool.submit([&](){
handle_timers();
});
m_thread_pool.submit([&]() { handle_timers(); });

if (!AL_BIT_CHECK(m_flags, RUNNING_FLAG))
{
return;
}

while (!m_event_queue.empty())
{
execute_event(std::move(m_event_queue.front()));
Expand All @@ -199,7 +191,6 @@ void AsyncS::event_loop()
}

check_exit_condition();

}
}

Expand Down Expand Up @@ -292,7 +283,6 @@ void AsyncS::spin_loop()
}



void AsyncS::submit_event(event_type t_callback)
{

Expand Down Expand Up @@ -356,7 +346,7 @@ void AsyncS::submit_future(uint32_t t_id, ALObjectPtr t_value, bool t_good)
{
init();
}

std::lock_guard<std::mutex> lock(Future::future_mutex);

if (!future_registry.belong(t_id))
Expand All @@ -380,7 +370,7 @@ void AsyncS::submit_future(uint32_t t_id, ALObjectPtr t_value, bool t_good)

if (pint(res) and future_registry.belong(object_to_resource(res)))
{
auto other = object_to_resource(res);
auto other = object_to_resource(res);
Future::merge(other, next);
return;
}
Expand Down Expand Up @@ -409,7 +399,7 @@ void AsyncS::submit_timer(Timer::time_point time, ALObjectPtr function, ALObject
}

std::lock_guard<std::mutex> guard{ timers_mutex };
m_timers.push_back({time + m_now.time_since_epoch(), function, internal, periodic});
m_timers.push_back({ time + m_now.time_since_epoch(), function, internal, periodic });
}


Expand Down Expand Up @@ -464,4 +454,4 @@ void AsyncS::dispose()
}


}
} // namespace alisp::async

0 comments on commit 557530d

Please sign in to comment.