In [None]:
#include <vector>

/// Abstract base for a job that applies a user-supplied operation to every
/// element of `array`.
///
/// This is the plain sequential form: scan() visits the elements one after
/// another in index order. Subclasses provide the per-element work through
/// on_scan(); on_save()/on_load() are declared as customization points but
/// are not called by anything in this class.
template <typename T>
class scanner{
public:
	/// @param master accepted for interface compatibility; not used here.
	scanner(bool master){}
	/// The class is meant to be derived from and has virtual functions, so
	/// destroying a derived object through a scanner* must be well-defined.
	virtual ~scanner() = default;
public:
	/// Calls on_scan() exactly once on each element of `array`, in order.
	void scan() {
		for(auto& element: array)// parallel for
			on_scan(element);
	}
	/// Elements to process; on_scan() receives them by mutable reference.
	std::vector<T> array;
protected:
	/// Processes one element in place.
	virtual void on_scan(T&element) = 0;
	/// Writes `element` to the stream; `scanned` is passed through to the
	/// implementation (not called by this class).
	virtual void on_save(const T&element, std::ostream&, bool scanned) = 0;
	/// Reads `element` from the stream; `scanned` is passed through to the
	/// implementation (not called by this class).
	virtual void on_load(T&element, std::istream&, bool scanned) = 0;
};

In [6]:
#include <set>
#include <time.h>

// code annotation //
// All of these macros expand to nothing: they only tag declarations in the
// class below and have no effect on the generated code.
#define _SHARED_STATE_
#define _STATE_CHANGER_
#define _IN_
#define _OUT_
#define _IO_
#define _CONST_

/// Annotated version of the scanner: the fields and methods that a
/// distributed version would have to share are tagged with the macros above.
///
/// Elements are processed in random order, picked from the set of indices
/// that have not been scanned yet. Everything here is still local to one
/// object; update_state() is an empty placeholder.
template <typename T>
class _SHARED_STATE_ scanner{
public:
	/// @param master when true, scan() publishes `array` and marks the state
	///               ready before scanning.
	scanner(bool master):is_master(master),ready_to_compute(false){}
	/// Polymorphic base: make destruction through a scanner* well-defined.
	virtual ~scanner() = default;
public:
	/// Runs on_scan() once on every element of `array`, in random order.
	///
	/// NOTE(review): update_state() does nothing in this version, so an
	/// instance constructed with master == false never leaves the wait loop.
	void scan() {
        srand(time(0));
        unsigned index = 0;

        if(is_master) {
            for(auto& element: array)share_element(index++,element);
            set_ready_to_compute();
        }

        while(!ready_to_compute) update_state();

        // `index` is overwritten by get_not_scanned() on every iteration.
        while(get_not_scanned(index,rand())){
            on_scan(array[index]);
            put_scanned(index,array[index]);
        }
	}
	/// Elements to process; on_scan() receives them by mutable reference.
	std::vector<T> array;
protected:
	/// Processes one element in place.
    virtual void on_scan(T&element) = 0;
	/// Writes `element` to the stream (not called by this version). Takes the
	/// element by const reference, matching the other scanner definitions.
	virtual void on_save(const T&element, std::ostream&, bool scanned) = 0;
	/// Reads `element` from the stream (not called by this version).
	virtual void on_load(T&element, std::istream&, bool scanned) = 0;
private:
    /// Placeholder: intentionally empty in this version.
    void _STATE_CHANGER_ update_state() _CONST_ {}

    /// Marks every current index of `array` as not yet scanned and raises the
    /// ready flag.
    void _STATE_CHANGER_ set_ready_to_compute() {
        not_scanned.clear();
        // unsigned counter: avoids the signed/unsigned comparison with size()
        for(unsigned i=0;i<array.size();i++)not_scanned.insert(i);
        ready_to_compute = true;
    }
    /// Stores `element` at `index`, growing `array` if the slot does not exist.
    void _STATE_CHANGER_ share_element(_IN_ unsigned index,_IN_ T&element){
            if(array.size()<=index) array.resize(index+1);
            array[index] = element;
    }
    /// Picks one pending index, using `random` to choose among them.
    /// @param index  receives the chosen index when the result is true.
    /// @param random any integer; reduced modulo the number of pending indices.
    /// @return false when nothing is left to scan (index is left untouched).
    bool _STATE_CHANGER_ get_not_scanned(_OUT_ unsigned& index,_IN_ int  random) _CONST_ {
        if(not_scanned.empty()) return false;
        // The modulo is evaluated in the set's unsigned size type, so the
        // number of steps is always within [0, size()).
        auto steps = random % not_scanned.size();
        auto it=not_scanned.begin();
        for(; steps!=0; --steps) ++it;
        index=*it;
        return true;
    }
    /// Records the result for `index` and removes it from the pending set;
    /// does nothing if `index` is not pending.
    void _STATE_CHANGER_ put_scanned(_IN_ unsigned index,_IN_ T&element){
        // erase() reports how many keys it removed, so one lookup is enough.
        if(not_scanned.erase(index)!=0){
            array[index] = element;
        }
    }
private:
    std::set<unsigned> not_scanned;  // indices still waiting for on_scan()
    bool is_master;
    bool ready_to_compute;
};

In [3]:
#include <vector>
#include <string>
#include <mutex>

#include <iostream>

/// Append-only, in-memory sequence of (tag, blob) records.
///
/// Both operations take the internal mutex, so one log object can be written
/// and read from several threads.
class write_ahead_log{
friend class shared_state;
public:
    /// Appends one record to the end of the log.
    void write(unsigned tag, const std::string& blob){
        const std::lock_guard<std::mutex> guard(mut);
        log.emplace_back(tag,blob);
    }
    /// Copies the record at position `index` into `tag` and `blob`.
    /// @return false (outputs untouched) when `index` is past the last record.
    bool read(unsigned index, unsigned& tag, std::string& blob){
        const std::lock_guard<std::mutex> guard(mut);
        if(index>=log.size()) return false;
        const auto& record=log[index];
        tag=record.first;
        blob=record.second;
        return true;
    }
private:
    std::vector<std::pair<unsigned,std::string>> log;  // records in write order
    std::mutex mut;                                     // guards `log`
};

In [4]:
#include <vector>
#include <sstream>

/// State that is kept in step with other instances by replaying a common
/// write_ahead_log.
///
/// Every mutation is modelled as a state_changer: change() serializes the
/// request into the log, and update() replays all records this instance has
/// not seen yet by handing each one to the changer with the matching tag.
class shared_state{
public:
    /// @param _wal log to write to and replay from; held by reference.
    shared_state(write_ahead_log&_wal):wal(_wal),wal_index(0){}
public:
    /// One kind of mutation. Registers itself with its shared_state on
    /// construction; its tag is its position in the registration order, so
    /// instances replaying the same log must register changers in the same
    /// order.
    class state_changer{
    friend class shared_state;
    public:
        state_changer(shared_state& s)
            :tag(static_cast<unsigned>(s.changers.size())),state(s)
            {s.changers.push_back(this);}
        /// Polymorphic base: make destruction through a base pointer
        /// well-defined.
        virtual ~state_changer() = default;
        /// Serializes the pending change with on_save(), appends it to the
        /// log, then replays the log (which applies it through on_load()).
        void change(){
            std::ostringstream out;
            on_save(out); state.wal.write(tag,out.str());
            state.update();
        }
    protected:
        /// Writes the arguments of the change to the stream.
        virtual void on_save(std::ostream&) = 0;
        /// Reads the arguments back and applies the change.
        virtual void on_load(std::istream&) = 0;
    private:
        unsigned tag;          // index of this changer in shared_state::changers
        shared_state& state;
    };    
public:
    /// Replays every log record from wal_index onwards, in log order.
    /// A record whose tag has no registered changer makes at() throw
    /// std::out_of_range instead of indexing past the end of the table.
    void update(){
        unsigned tag = 0; std::string blob;
        for(;wal.read(wal_index,tag,blob);wal_index++){
            state_changer& changer=*changers.at(tag);
            std::istringstream in(blob); changer.on_load(in);
        }
    }
private:
    write_ahead_log& wal;
    unsigned wal_index;                    // next log position to replay
    std::vector<state_changer*> changers;  // non-owning; indexed by tag
};


In [5]:
#include <set>
#include <time.h>

/// Scanner whose shared fields (the array, the set of pending indices and the
/// ready flag) are modified only by replaying records from a
/// write_ahead_log, through the shared_state base.
///
/// Each mutating operation is a state_changer member object; the public
/// entry point is scan().
template <typename T>
class scanner: public shared_state{
public:
	/// @param master when true, scan() publishes `array` to the log and marks
	///               the state ready before scanning.
	/// @param _wal   log passed to the shared_state base.
	///
	/// The changer objects are constructed in declaration order, which fixes
	/// their tags; the initializer list is written in that same order.
	scanner(bool master,write_ahead_log&_wal):
        shared_state(_wal),
        set_ready_to_compute_changer_obj(*this),
        share_element_changer_obj(*this),
        put_scanned_changer_obj(*this),
        is_master(master),ready_to_compute(false){}
	/// Polymorphic base: make destruction through a scanner* well-defined.
	virtual ~scanner() = default;
public:
	/// Publishes the array (master only), waits until the ready record has
	/// been replayed, then repeatedly picks a pending index at random,
	/// runs on_scan() on it and records the result in the log.
	///
	/// NOTE(review): every instance calls srand(time(0)) and rand(); these
	/// are process-wide C facilities — confirm this is acceptable when
	/// several instances run in threads of one process.
	void scan() {
        srand(time(0));
        unsigned index = 0;
        
        if(is_master) { 
            for(auto& element: array)share_element(index++,element);
            set_ready_to_compute();
        }
        
        while(!ready_to_compute) update_state();
        
        // `index` is overwritten by get_not_scanned() on every iteration.
        while(get_not_scanned(index,rand())){
            on_scan(array[index]);
            put_scanned(index,array[index]);
        }
	}
	/// Elements to process; on_scan() receives them by mutable reference.
	std::vector<T> array;
protected:	
	/// Processes one element in place.
    virtual void on_scan(T&element) = 0;
	/// Writes `element` to the stream. `scanned` is false for share_element
	/// records and true for put_scanned records.
	virtual void on_save(const T&element, std::ostream&, bool scanned) = 0;
	/// Reads into `element` from the stream; `scanned` as for on_save().
	/// The element passed in already holds the current value of its array
	/// slot whenever the slot will be written back, so an implementation may
	/// read only some of the fields.
	virtual void on_load(T&element, std::istream&, bool scanned) = 0;
private:
    //  void _STATE_CHANGER_ update_state() _CONST_
    void update_state() { update(); }
    
    // void _STATE_CHANGER_ set_ready_to_compute()
    /// On replay: marks every current index as pending and raises the ready flag.
    struct set_ready_to_compute_changer : public shared_state::state_changer{
        set_ready_to_compute_changer(scanner& s) : shared_state::state_changer(s), host(s){}
        void on_save(std::ostream&) override {}
        void on_load(std::istream&) override {
            host.not_scanned.clear();
            // unsigned counter: avoids the signed/unsigned comparison with size()
            for(unsigned i=0;i<host.array.size();i++)host.not_scanned.insert(i);
            host.ready_to_compute = true;    
        }
        scanner& host;
    } set_ready_to_compute_changer_obj;

    void set_ready_to_compute() {
        set_ready_to_compute_changer_obj.change();
    }
    
    // void _STATE_CHANGER_ share_element(_IN_ unsigned  index,_IN_ T&element)
    /// On replay: stores the transmitted element at `index`, growing the
    /// array if the slot does not exist yet.
    struct share_element_changer : public shared_state::state_changer{
        share_element_changer(scanner& s) : shared_state::state_changer(s), host(s){}
        void on_save(std::ostream& out) override { 
            out << index << ' '; host.on_save(element,out,false);
        }
        void on_load(std::istream& in) override {
            in >> index;
            if(host.array.size()<=index) host.array.resize(index+1);
            // Load straight into the slot: fields that on_load() does not
            // read keep the slot's value instead of being overwritten by
            // whatever the scratch `element` happened to contain.
            host.on_load(host.array[index],in,false);
        }
        scanner& host;
        unsigned index; // IN
        T      element; // IN
    } share_element_changer_obj;

    void share_element(unsigned index,T& element){
        share_element_changer_obj.index   = index;
        share_element_changer_obj.element = element;
        share_element_changer_obj.change();
    }
    
    // bool _STATE_CHANGER_ get_not_scanned(_OUT_ unsigned&  index,_IN_ int  random) _CONST_ 
    /// Replays the log, then picks one pending index using `random`.
    /// @return false when nothing is left to scan (index is left untouched).
    bool get_not_scanned(unsigned& index, int random) {
        update();
        if(not_scanned.empty()) return false;
        // The modulo is evaluated in the set's unsigned size type, so the
        // number of steps is always within [0, size()).
        auto steps = random % not_scanned.size();
        auto it=not_scanned.begin();
        for(; steps!=0; --steps) ++it;
        index=*it;
        return true;
    }

    // void _STATE_CHANGER_ put_scanned(_IN_ unsigned  index,_IN_ T&element)
    /// On replay: if `index` is still pending, stores the transmitted result
    /// there and removes the index from the pending set; otherwise the
    /// record is parsed and ignored.
    struct put_scanned_changer : public shared_state::state_changer{
        put_scanned_changer(scanner& s) : shared_state::state_changer(s), host(s){}
        void on_save(std::ostream& out) override {
            out << index << ' '; host.on_save(element,out,true);
        }
        void on_load(std::istream& in) override {
            in >> index;
            //if(host.not_scanned.contains(index))
            const bool pending = index<host.array.size()
                              && host.not_scanned.find(index)!=host.not_scanned.end();
            // Start from the slot's current value so that fields missing
            // from the record survive the write-back below.
            if(pending) element = host.array[index];
            host.on_load(element,in,true);
            if(pending){
                host.not_scanned.erase(index);
                host.array[index] = element;
            }
        }
        scanner& host;
        unsigned index; // IN
        T      element; // IN
    } put_scanned_changer_obj;

    void put_scanned(unsigned index, T&element){
        put_scanned_changer_obj.index = index;
        put_scanned_changer_obj.element = element;
        put_scanned_changer_obj.change();
    }
private:
    std::set<unsigned> not_scanned;  // indices still waiting for a result
    bool is_master;
    bool ready_to_compute;
};

In [6]:
/// A number and its square. Both fields start at zero so that a
/// default-constructed value never holds indeterminate ints; aggregate
/// initialization (`sqware_type{3, 9}`) still works.
struct sqware_type{
	int N = 0;    // input value
	int NxN = 0;  // N * N once computed
}; 

/// Scanner over sqware_type elements that fills in NxN from N.
///
/// Only one field travels in each record: N while an element is being
/// shared (scanned == false), NxN once it has been computed (scanned == true).
class calc_sqware_scanner : public  scanner<sqware_type>{
public:
	calc_sqware_scanner(bool master,write_ahead_log&wal):scanner(master,wal){}
private:
	/// Computes the square of item.N into item.NxN.
	void on_scan(sqware_type&item) override {
		const int n = item.N;
		item.NxN = n * n;
	}
	/// Writes NxN for a scanned element, N otherwise.
	void on_save(const sqware_type&item, std::ostream&os, bool scanned) override {
		os << (scanned ? item.NxN : item.N);
	}
	/// Reads NxN for a scanned element, N otherwise.
	void on_load(sqware_type&item, std::istream&is, bool scanned) override {
		is >> (scanned ? item.NxN : item.N);
	}
};

In [7]:
#include <iostream>
#include <thread>
#include <atomic>

// Demo: NUM_THREADS threads stand in for separate processes. Each one builds
// its own scanner on the common write-ahead log; the thread that draws
// pid 0 acts as master, fills the input and prints the results.
{
    std::atomic_int PID = 0;
    int NUM_THREADS = 10;
    int ARRAY_SIZE = 10;
    
    write_ahead_log wal;
    std::vector<std::thread> threads(NUM_THREADS);
    
    // [&] is safe here: every thread is joined before this scope ends.
    for(auto& t:threads)t=std::thread([&]{ int pid = PID++;
    //////////////// inside a 'process' ////////////////
        calc_sqware_scanner scanner_object(pid==0,wal);
        
        if(pid==0){
        	scanner_object.array.resize(ARRAY_SIZE);
        	// Fill exactly the elements that were allocated (the bound used
        	// to be a literal 10, independent of ARRAY_SIZE).
        	for(int i=0;i<ARRAY_SIZE;i++) scanner_object.array[i].N = i;
        }
                                          
        scanner_object.scan();

        if(pid==0){// .. or any other pid < NUM_THREADS 
            for(int i=0; i<ARRAY_SIZE; i++)
                std::cout << i << " * " << i  << " = " 
                    << scanner_object.array[i].NxN
                    << std::endl;
        }
    ////////////////////////////////////////////////////
    }); for(auto& t:threads) t.join();
    std::cout << "Success!" << std::endl;
}

0 * 0 = 0
1 * 1 = 1
2 * 2 = 4
3 * 3 = 9
4 * 4 = 16
5 * 5 = 25
6 * 6 = 36
7 * 7 = 49
8 * 8 = 64
9 * 9 = 81
Success!
