#include #include #include #include #include #include #include #include #include template class lock_free_queue_mpmc { private: struct node; struct counted_node_ptr { int external_count; node* ptr; }; std::atomic head; std::atomic tail; struct node_counter { unsigned internal_count:30; unsigned external_counters:2; }; struct node { std::atomic data; std::atomic count; counted_node_ptr next; node() { node_counter new_count; new_count.internal_count = 0; new_count.external_counters = 2; count.store( new_count ); next.ptr = nullptr; next.external_count = 0; data.store( nullptr ); } void release_ref() { node_counter old_counter = count.load( std::memory_order_relaxed ); node_counter new_counter; do { new_counter=old_counter; --new_counter.internal_count; } while( !count.compare_exchange_strong( old_counter, new_counter, std::memory_order_acquire, std::memory_order_relaxed ) ); if ( !new_counter.internal_count && !new_counter.external_counters ) { delete this; } } }; static void increase_external_count( std::atomic& counter, counted_node_ptr& old_counter) { counted_node_ptr new_counter; do { new_counter=old_counter; ++new_counter.external_count; } while( !counter.compare_exchange_strong(old_counter, new_counter, std::memory_order_acquire, std::memory_order_relaxed) ); old_counter.external_count = new_counter.external_count; } static void free_external_counter( counted_node_ptr &old_node_ptr ) { node* const ptr = old_node_ptr.ptr; int const count_increase=old_node_ptr.external_count-2; node_counter old_counter = ptr->count.load(std::memory_order_relaxed); node_counter new_counter; do { new_counter=old_counter; --new_counter.external_counters; new_counter.internal_count+=count_increase; } while( !ptr->count.compare_exchange_strong( old_counter, new_counter, std::memory_order_acquire, std::memory_order_relaxed ) ); if( !new_counter.internal_count && !new_counter.external_counters ) { delete ptr; } } public: lock_free_queue_mpmc() { counted_node_ptr sDummy; node * sDummyNode = new node; sDummy.external_count = 1; sDummy.ptr = sDummyNode; tail.store( sDummy ); head.store( sDummy ); } lock_free_queue_mpmc( const lock_free_queue_mpmc& other )=delete; lock_free_queue_mpmc& operator=( const lock_free_queue_mpmc& other )=delete; ~lock_free_queue_mpmc() { while( pop() != std::unique_ptr() ); } void push(T new_value) { std::unique_ptr new_data( new T(new_value) ); counted_node_ptr new_next; new_next.ptr = new node; new_next.external_count = 1; counted_node_ptr old_tail = tail.load(); for(;;) { increase_external_count( tail, old_tail ); T* old_data = nullptr; if( old_tail.ptr->data.compare_exchange_strong( old_data, new_data.get() ) ) { old_tail.ptr->next = new_next; old_tail = tail.exchange( new_next ); free_external_counter( old_tail ); new_data.release(); break; } old_tail.ptr->release_ref(); } } std::unique_ptr pop() { counted_node_ptr old_head = head.load( std::memory_order_relaxed ); for(;;) { increase_external_count( head, old_head ); node* const ptr = old_head.ptr; if ( ptr == tail.load().ptr ) { ptr->release_ref(); return std::unique_ptr(); } if ( head.compare_exchange_strong( old_head, ptr->next ) ) { T* const res = ptr->data.exchange( nullptr ); free_external_counter( old_head ); return std::unique_ptr( res ); } ptr->release_ref(); } } }; void push(lock_free_queue_mpmc* q) { for (int i = 0; i < 1000000; ++i) { // printf("pushing %d\n", i); q->push(i); } } void pop(lock_free_queue_mpmc* q) { int i = 0; while (i < 1000000) { std::shared_ptr p = q->pop(); if (p) { // printf("poping %d\n", *p); ++i; } } } int main() { lock_free_queue_mpmc q; std::thread t1(push, &q); std::thread t2(pop, &q); std::thread t3(push, &q); std::thread t4(pop, &q); std::thread t5(push, &q); std::thread t6(pop, &q); t1.join(); t2.join(); t3.join(); t4.join(); t5.join(); t6.join(); return 0; }