
Fork-join threading type does not work #10

Closed
FarzadT opened this issue Feb 4, 2016 · 8 comments


FarzadT commented Feb 4, 2016

Setting the threading type to 'fj' prints the following error:

code:

var ems = require('ems')(8, true, 'fj');

console:
node test.js
> EMS: Must declare number of nodes to use. Input:NaN


FarzadT commented Feb 4, 2016

OK, it looks like when the threadingType is set to fork-join, you need to specify the number of threads on the command line, i.e. node test.js 8. Is this a bug?
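For what it's worth, the NaN in the error above is consistent with a missing command-line argument; a plain-Node sketch of where it comes from (the processCount helper is hypothetical, not part of EMS):

```javascript
// Sketch (plain Node, no EMS): when no argument is given on the command
// line, process.argv[2] is undefined, and parseInt(undefined) is NaN --
// matching the "Input:NaN" message above.
function processCount(argv, fallback) {
    var n = parseInt(argv[2], 10);
    return isNaN(n) ? fallback : n;
}

console.log(processCount(['node', 'test.js'], 8));      // 8 (argument missing)
console.log(processCount(['node', 'test.js', '4'], 8)); // 4
```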

Furthermore, my program doesn't seem to be doing anything:

var a = {
    name: 'a',
    guid: 'f1ec1bba-cb46-4802-be99-70c76db40027'
};
var b = {
    name: 'b',
    guid: 'b16b4d65-a6cd-45f5-bcb9-cdda175bf2ff'
};
var c = {
    name: 'c',
    guid: '7b04ace0-86a7-413f-85c3-1eb7e9f414c7'
};
var d = {
    name: 'd',
    guid: '36b41b0b-fea9-4ee2-8f18-95847ac074f0'
};
var e = {
    name: 'e',
    guid: '17717b9c-8281-4c16-9f74-04a27e6c1784'
};
var f = {
    name: 'f',
    guid: '5738f651-2056-44ed-87f3-8f5a273a5500'
};
var input = [a,b,c,d,e];
var ems = require('ems')(8, false, 'fj');
var maxObjects = 1000;

var sharedData = ems.new({
    dimensions: [maxObjects],
    heapSize: maxObjects * maxObjects,
    useMap: true        
});

ems.parallel(function(){
    ems.parForEach(0, 5, function(num) {
        var object = input[num];
        sharedData.writeXF(object.guid, object);
    });
    if(ems.myID !== 0) {
      process.exit();
    }
});

for(var i in input) {
    console.log(sharedData.read(input[i].guid)); // This doesn't get printed
}

UPDATE:
It seems that the call to ems.new() is the culprit. If I do not instantiate an EMS array, the program executes to the end; otherwise it just hangs.


mogill commented Feb 5, 2016

Hi Farzad,

Due to the rules for forming closures in JavaScript, everything to be executed in the parallel context must appear inside the ems.parallel function. This refactoring of your program does what you expected:

var ems = require('ems')(process.argv[2], false, 'fj');

ems.parallel(function(){
    var a = {
        name: 'a',
        guid: 'f1ec1bba-cb46-4802-be99-70c76db40027'
    };
    var b = {
        name: 'b',
        guid: 'b16b4d65-a6cd-45f5-bcb9-cdda175bf2ff'
    };
    var c = {
        name: 'c',
        guid: '7b04ace0-86a7-413f-85c3-1eb7e9f414c7'
    };
    var d = {
        name: 'd',
        guid: '36b41b0b-fea9-4ee2-8f18-95847ac074f0'
    };
    var e = {
        name: 'e',
        guid: '17717b9c-8281-4c16-9f74-04a27e6c1784'
    };
    var f = {
        name: 'f',
        guid: '5738f651-2056-44ed-87f3-8f5a273a5500'
    };
    var input = [a,b,c,d,e];
    var maxObjects = 1000;
    var sharedData = ems.new({
        dimensions: [maxObjects],
        heapSize: maxObjects * maxObjects,
        useMap: true
    });

    ems.parForEach(0, input.length, function(num) {
        var object = input[num];
        sharedData.writeXF(object.guid, object);
    });
    if(ems.myID !== 0) {
        process.exit();
    }

    for(var i in input) {
        ems.diag(JSON.stringify(sharedData.read(input[i].guid)));
    }
});

[EDIT: I made a few mods to allow any number of processes]

Let me know if that doesn't work out in some larger context.


FarzadT commented Feb 5, 2016

Thanks for the quick response!

Due to the rules for forming closures in JavaScript, everything to be executed in the parallel context must appear inside the ems.parallel function

This is problematic for me, since I have objects declared outside of the parallel scope, and I need some work done on them in different processes before gathering the results and continuing with the rest of the program.

Maybe if I explain what I am trying to do, you can give me some more information on how to achieve my goal.

Basically, I have a REST server which handles requests by doing some work before serializing the data and sending the response back to the client. My problem is that the serialization part takes quite some time. It could be parallelized with multi-threading, but unfortunately Node.js is single-threaded.

This is when I came across your library, which seems to solve this issue for me; however, I can't find a proper way of utilizing it in my program.

So is there a way to spawn processes from the main thread so that they:

  1. Have access to objects declared before they were spawned
  2. Don't have any side effects on the main program
  3. Pick up where the fork happens without running the entire program all over again:
  • What I observed is that the child processes run from the beginning of the program, which is where the server is set up. They then crash with an EADDRINUSE error because there is already a server running.

Thank you in advance


mogill commented Feb 7, 2016

To make that possible I added the ability to pass arguments to the parallel region. I just pushed this as v1.1.0, which you can git pull or npm install. While I strive to make parallel programming transparent, there are limits to what can be hidden: parallel programs will, by necessity, be different from serial programs.

One difference is that global variables (including modules) cannot be shared between processes, so each process must instantiate its own global variables, meaning some setup code will need to move inside an EMS parallel region.

  1. Have access to objects declared before they were spawned

You can now pass these in as arguments to the function executed as the parallel region.

  2. Don't have any side effects on the main program

Each process only interacts with other processes via EMS shared memory and arguments passed to the parallel region from the master process.

  3. Pick up where the fork happens without running the entire program all over again:

Each process is persistent between parallel regions -- a global variable declared in one parallel region is still defined in that process in later parallel regions.

I added an example, Examples/web_server.js, that computes a response in parallel when a request is received. When the server receives a GET, each process appends its part of the results to a shared result; when all the processes are done, the results are returned as the response to the GET. The shared object is persistent between REST operations, and repeated access to the same URL will continue to append new results.

var ems = require('ems')(parseInt(process.argv[2]), true, 'fj');
var http = require('http');
var port = 8080;
var shared_data;

/* Connect to the shared data on every task.  The map key will be
 * the URL, the value will be the concatenation of work performed
 * by all the processes.  */
ems.parallel(function () {
    shared_data = ems.new({
        dimensions: [1000],
        heapSize: [100000],
        useExisting: false,
        useMap: true,
        setFEtags: 'full',
        filename: '/tmp/EMS_shared_web_data.ems'
    });
});


// When a request arrives, each process does some work and appends the results
// to shared memory
function handleRequest(request, response) {
    // If this URL has not yet been requested, the data is undefined
    // and must be initialized.
    // Alternatively, it may be initialized at ems.new() instead of here.
    shared_data.cas(request.url, undefined, "Response preamble.");

    // Enter a parallel region, each process does some work, and then
// appends the result to the value.
    ems.parallel(request.url, function (url) {
        // Do some work
        shared_data.faa(url, "  Work from process " + ems.myID + ".");
    });

    // Return the results, leaving them in shared memory for later updates
    response.end('Shared results from(' + request.url + "):" + shared_data.readFF(request.url));
}

// Create the Web server
http.createServer(handleRequest).listen(port, function () {
    ems.diag("Server listening on: http://localhost:" + port);
});
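The shared_data.cas(...) line above relies on compare-and-swap's initialize-once semantics. A plain-JavaScript sketch of the idiom, using an ordinary Map rather than EMS memory (the return-the-old-value convention is my assumption about EMS's cas, not confirmed here):

```javascript
// Plain-JS sketch of compare-and-swap: the write succeeds only if the key
// still holds the expected value (here undefined), so concurrent requests
// initialize a URL's entry exactly once.
var store = new Map();

function cas(key, expected, value) {
    var current = store.get(key);       // undefined when the key is absent
    if (current === expected) {
        store.set(key, value);
    }
    return current;                     // old value (assumed convention)
}

cas('/index', undefined, 'Response preamble.');
cas('/index', undefined, 'clobbered!'); // no-op: already initialized
console.log(store.get('/index'));       // "Response preamble."
```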

Let me know if this doesn't help with your application.

       -J

[EDIT: I slept on it and decided the barrier should be implied by the join at the end of a parallel region. Pushed as v1.1.1]


FarzadT commented Feb 8, 2016

Thanks! This helps quite a bit. However, it does raise a few questions:

  1. Since the shared_data EMS array is global, how do you protect against multiple requests to the same URL clobbering each other?
  2. How do you destroy an EMS array after using it?
  3. Is it possible to create these EMS arrays/processes when needed instead of doing it at the beginning of the program?

What I'm looking for is something like this:

var http = require('http');
var port = 8080;

// When a request arrives, each process does some work and appends the results
// to shared memory
function handleRequest(request, response) {
    // I want to fork from here, not from the beginning of the program. Is that possible?
    var ems = require('ems')(4, true, 'fj');

    var shared_data;
    ems.parallel(function(){
        shared_data = ems.new({
            dimensions: [1000],
            heapSize: [100000],
            useExisting: false,
            useMap: true,
            setFEtags: 'full',
            filename: '/tmp/EMS_shared_web_data.ems'
        });
    });

    // If this URL has not yet been requested, the data is undefined
    // and must be initialized.
    // Alternatively, it may be initialized at ems.new() instead of here.
    shared_data.cas(request.url, undefined, "Response preamble.");

    // Enter a parallel region, each process does some work, and then
    // appends the result to the value.
    ems.parallel(request.url, function (url) {
        // Do some work
        shared_data.faa(url, "  Work from process " + ems.myID + ".");
        if(ems.myID !== 0) {
          // exit the child processes because we do not need them anymore
          process.exit();
        }
    });

    // Return the results, leaving them in shared memory for later updates
    response.end('Shared results from(' + request.url + "):" + shared_data.readFF(request.url));

    // Destroy the shared object
    shared_data.destroy();
}

// Create the Web server
http.createServer(handleRequest).listen(port, function () {
    ems.diag("Server listening on: http://localhost:" + port);
});


mogill commented Feb 11, 2016

Thanks for taking the time to explain what you're trying to do, and my apologies for taking so long to respond. I have been looking at this and tried a few things that didn't work out, so I wanted to get back to you and let you know where things stand.

Since the shared_data EMS array is global, how do you protect against multiple requests to the same URL clobbering each other?

I have a modified example that uses atomic fetch-and-add to generate GUIDs, which can be used to distinguish different requests to the same URL. This will be in the next commit.
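The fetch-and-add idea can be illustrated in plain Node with Atomics; this is a sketch of the technique, not the EMS implementation:

```javascript
// Plain-Node sketch of fetch-and-add (the idea behind EMS's faa) used to
// mint unique IDs: Atomics.add returns the previous value and increments
// atomically, so two concurrent callers can never be handed the same ID.
var counter = new Int32Array(new SharedArrayBuffer(4));

function nextRequestId() {
    return Atomics.add(counter, 0, 1);  // old value; counter then advances
}

console.log(nextRequestId()); // 0
console.log(nextRequestId()); // 1
// A per-request key might then be request.url + '#' + nextRequestId(),
// keeping simultaneous requests to the same URL from clobbering each other.
```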

How do you destroy an EMS array after using it?

Presently EMS arrays always persist after the program exits, primarily because EMS was designed for persistent memory. Ephemeral EMS arrays didn't get much attention and the only two hooks to programmatically remove an EMS array are broken and commented out. There are potential parallel hazards when a new EMS array with the same name is created immediately after the first one is destroyed, so a naive implementation won't work.

In the meantime, if you want to delete a key-value pair in EMS, you can write undefined to the key, deleting any data that was there. In my new example, after the response is sent I write undefined to EMS to destroy the data.

Is it possible to create these EMS arrays/processes when needed instead of doing it at the beginning of the program?

I can see why encapsulating EMS entirely within a callback would help with integrating legacy code; however, I'm becoming less optimistic this can be made to work in EMS 1.x.

Initializing EMS is a fairly expensive operation -- almost certainly not something you'd want to do in the middle of generating a REST response. Like other Node modules, EMS is meant to be required once per program, during initialization, as a global variable. EMS starts all the processes during initialization and wakes them up when there is work; an ems.parallel region only wakes up existing processes, it never creates new ones.

Using the same process in different parallel regions is why global variables are persistent. It's worth repeating that the only variables that persist between parallel regions are global variables, not vars declared locally in the parallel region. If you don't store data in global variables, no data can leak between parallel regions, so there's no reason to exit and restart an EMS process.

One last note about global variables in EMS -- parallel loops are dynamically load balanced, meaning iteration N of a parallel loop may be executed by any process. Over a series of loops, a global variable instantiated by iteration N in one loop may not be present on iteration N of the next loop, because that iteration may run in a different process.

Over the weekend I should be able to finish up the new example demonstrating persistence, unique IDs, and maybe ephemeral EMS arrays. Thanks for your patience,

             -J


mogill commented Feb 16, 2016

I just pushed a few mods:

  • Added array.destroy([remove_file]), which will release ephemeral resources associated with the array and (optionally) remove the persistence file.
  • Updated the web_server.js example with a REST interface for persistence, GUIDs, allocating and destroying EMS arrays
  • Added a new example, kv_store.js, which presents a RESTful EMS API for a persistent EMS array. It uses the cluster module to support parallelism around a fully synchronous event loop.
  • Added some kludges to work around a bad early design decision that may have resulted in deadlocks

Initializing EMS more than once per program is still looking unlikely in EMS 1.x.

Hopefully the web_server and kv_store examples will give you some ideas for how to implement your REST server; let me know if the new features are still lacking.

                     -J


mogill commented Apr 6, 2016

Hi Farzad,

It's been a while since any comments or changes were made on this issue, and I think the original fork-join issues have since been resolved, so I'm closing it. Please do re-open it if any fork-join issues remain.
