How to use timeFinalized? #1347

Closed
jkm opened this Issue Dec 7, 2015 · 14 comments

3 participants
jkm (Contributor) commented Dec 7, 2015

When an HTTPServerResponse is finalized (by calling finalize()), its member m_timeFinalized is set. I want to access it through timeFinalized(), but I don't know how, because the response is finalized by the driver. Do I start another task via runTask? I believe I tried that, and it resulted in a segfault. What do I do in the multi-threaded case? I need to make sure that one task has finished (finalized) before the other runs. I read somewhere about signaling between fibers. Is that the way to go?

s-ludwig (Member) commented Dec 7, 2015

There is currently no hook for that specific place (probably applies to #1327). However, as long as -version=VibeManualMemoryManagement is not set, the task approach should work (I have to admit that I didn't try it; if it indeed fails, I'll look into it):

void handleRequest(HTTPServerRequest req, HTTPServerResponse res)
{
    // ...
    auto t = Task.getThis();
    runTask({
        t.join(); // wait for the response to finish
        logInfo("%s", res.timeFinalized);
    });
}

In case of multi-threaded request processing (HTTPServerOption.distribute), that should work as well, because the new task will always run in the same thread as the request task.


jkm (Contributor) commented Dec 7, 2015

I'm exploring this. Currently it seems that the HTTPServerResponse gets corrupted: I get a segfault in Object's invariant. I'm still on 0.7.25, but 0.7.26 fails as well.

import vibe.http.server;

void someFunction(HTTPServerRequest req, HTTPServerResponse res)
{
    import vibe.core.core;
    auto t = Task.getThis();
    import vibe.core.log;
    logInfo("%s", req.clientAddress.family);
    assert(req.clientAddress.family != 0);
    res.writeBody("hello");
    runTask(
    {
        t.join(); // wait for the response to finish
        logInfo("%s", res.timeFinalized);
    });
}

int main(string[] args)
{
    import vibe.core.log : setLogLevel, LogLevel;
    setLogLevel(LogLevel.trace);

    import vibe.http.server : HTTPServerSettings;
    auto settings = new HTTPServerSettings;
    settings.port = 8080;

    import vibe.http.router;
    auto router = new URLRouter;
    router.get("/some", &someFunction);

    listenHTTP(settings, router);

    import vibe.core.log;
    logInfo("Running event loop ...");
    import vibe.core.core;
    runEventLoop();

    return 0;
}

Run dub run and then access http://localhost:8080/some. Count slowly to ten; by then the server should have crashed.


etcimon (Contributor) commented Dec 8, 2015

The HTTPServerResponse object is allocated in a FreeListRef, which is backed by a FreeListObjectAlloc, which is in turn backed by the manualAllocator. In other words, before the task joins, the HTTPServerResponse's memory block will be recycled in the freelists, and there's currently no way to hook into finalize events before that happens.

On the other hand, if you use writeBody, the finalize time should be the current time on the next instruction, because finalize is called inside writeBody, and vibe.d uses fibers that block while the thread stays non-blocking to handle other fibers and I/O events. So Clock.currTime() taken right after writeBody will be reliable. You can also launch your next task at that moment if you want; since there are no other blocking operations in this task, the other one will start after this one.

My personal use of vibe.d doesn't involve the .distribute option or multi-threading within it. I recommend running multiple processes of the same application (Linux will round-robin the incoming connections if you set the SO_REUSEADDR option) and storing your "globals" in Redis. This design will also scale to multiple servers (cloud computing?) and make any possible GC contention insignificant.
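For illustration, a minimal sketch of this timing approach (the handler name and log message are made up; assumes the vibe.d 0.7.x API discussed in this thread):

```d
import std.datetime : Clock;
import vibe.core.core : runTask;
import vibe.core.log : logInfo;
import vibe.http.server : HTTPServerRequest, HTTPServerResponse;

void handler(HTTPServerRequest req, HTTPServerResponse res)
{
    res.writeBody("hello");           // finalize() is called inside writeBody
    auto finished = Clock.currTime(); // effectively the finalization time
    runTask({
        // safe: `finished` is a plain SysTime copy, not a reference
        // into the recycled HTTPServerResponse
        logInfo("response finalized at %s", finished);
    });
}
```

The key point is that the timestamp is copied by value into the new task, so nothing references the response object after its memory is recycled.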


s-ludwig (Member) commented Dec 8, 2015

> The HTTPServerResponse object is allocated in a FreeListRef, which is backed by a FreeListObjectAlloc, which is in turn backed by the manualAllocator. In other words, before the task joins, the HTTPServerResponse's memory block will be recycled in the freelists, and there's currently no way to hook into finalize events before that happens.

You are right; I assumed that we'd still have everything in a single memory pool that is reclaimed by the GC, but the FreeListRefs eagerly destroy everything anyway. This increases the priority of the scope transition. Very unfortunate that scope's semantics are just not clear yet...

The other problem is keep-alive, which renders the whole approach pretty useless anyway: all those tasks would be called at the same time when the connection gets closed, instead of after each request.

> On the other hand, if you use writeBody, the finalize time should be the current time on the next instruction, because finalize is called inside writeBody, and vibe.d uses fibers that block while the thread stays non-blocking to handle other fibers and I/O events. So Clock.currTime() taken right after writeBody will be reliable.

Usually this will work, but if you have, for example, compression plus chunked output, there is no way to send the last chunk before exiting the request handler scope. I think the finalize method should simply be made public and should then ignore redundant calls. Alternatively, there could be another wrapper stream around bodyWriter with a finalize method that calls the private HTTPResponse.finalize. That would use a little more memory per request and add another virtual call on top of every bodyWriter function call.

> My personal use of vibe.d doesn't involve the .distribute option or multi-threading within it. I recommend running multiple processes of the same application (Linux will round-robin the incoming connections if you set the SO_REUSEADDR option) and storing your "globals" in Redis.

Right, even a little GC use will render any multi-threading attempt almost useless (Amdahl's law kicks in really soon). I usually only use it for benchmarking purposes. All actual services/apps run single-threaded.

Makes me wonder if it would be a good idea to add something like a processDistribute option that uses fork to clone some worker processes.


s-ludwig added a commit that referenced this issue Dec 8, 2015

Make HTTPServerResponse.finalize public. Fixes #1347.
Also ignores any redundant calls to it.
s-ludwig (Member) commented Dec 8, 2015

Now works like this:

res.writeBody("hello");
res.finalize();
logInfo("%s", res.timeFinalized);

@s-ludwig s-ludwig closed this in 77284b9 Dec 8, 2015

jkm (Contributor) commented Dec 8, 2015

I see. That should work. I'll try.

Where do I pass SO_REUSEADDR to listenHTTP? I see no way to specify it.
processDistribute sounds even better. I didn't know that letting Linux do round robin was the way to go, so having a function that does that would be better. Can't you just fork after creating the listening socket? Do you need SO_REUSEADDR for that?
BTW, since Linux 3.9 there is SO_REUSEPORT, which should simplify things.

In my case I want two threads (and two kinds of tasks). The first kind handles web requests and the second kind handles background jobs. The first should not be handled by the same thread as the second, because I guess that would slow down the first. How would you achieve such a setup?


s-ludwig (Member) commented Dec 8, 2015

Fork after listen should in theory work; it would reuse the same socket descriptor in that case, but I've never tried it. For the libevent backend, SO_REUSEADDR is used by default, and I think libasync does the same (but Etienne will know for sure).

Regarding the background jobs, runWorkerTask/runWorkerTaskH is probably the best solution (without setting HTTPServerOption.distribute, so that only the main thread handles requests).
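A hedged sketch of that setup (the function names and payload are illustrative; assumes HTTPServerOption.distribute is not set, so only the main thread handles requests):

```d
import vibe.core.core : runWorkerTask;
import vibe.core.log : logInfo;
import vibe.http.server : HTTPServerRequest, HTTPServerResponse;

// Background job running in a worker thread. Arguments passed to
// runWorkerTask must be safe to move across threads, so only plain
// copied data is handed over, never req/res themselves.
void backgroundJob(string payload)
{
    logInfo("worker processing: %s", payload);
}

void handleRequest(HTTPServerRequest req, HTTPServerResponse res)
{
    // copy the needed data out of the request before handing it off
    runWorkerTask(&backgroundJob, req.path.idup);
    res.writeBody("job queued");
}
```

The request handler returns immediately; the job continues in the worker thread pool while the main thread stays free to serve requests.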


etcimon (Contributor) commented Dec 8, 2015

> I think libasync does the same (but Etienne will know for sure).

Yes, this is the default in libasync as well.

> The first should not be handled by the same thread as the second, because I guess that would slow down the first.

If your background jobs are very intensive, you might as well run them in a new process too; it could even use std.process. Linux has some good mechanisms to schedule CPU time for these background jobs. I think Sönke had a Task waiter for that?

I would use runWorkerTask for these background jobs if they are in static/dynamic libraries.


jkm (Contributor) commented Dec 8, 2015

Thanks, that works. Though runWorkerTask is kind of restricted: how would you pass an HTTPServerRequest or an HTTPServerResponse? I tried std.typecons.scoped. I understand that I need to make a copy somehow; there seems to be no way to avoid the copy, because the request and response are destroyed. I'm now copying all the data I need and passing it to runWorkerTask.

On a related note: any plans to give some fine-grained control over the worker threads? Currently it creates core.cpuid.threadsPerCPU of them in addition to the main thread. I'm thinking about passing an array of Threads instead, so the user has more control. Maybe that's not needed; I will think about it. This mixing of threads and fork may be no good.


s-ludwig (Member) commented Dec 9, 2015

Usually you would extract the parts of req/res that the worker task actually needs, to avoid a complete copy. It is also important to note that using req.bodyReader or res.bodyWriter (or res.writeBody etc.) from another thread is illegal, because the I/O objects are bound to the thread in which they were created. Having said that, if you really need direct access to req/res, you can do this:

void handleRequest(HTTPServerRequest req, HTTPServerResponse res)
{
    import vibe.core.concurrency : send, receiveOnly;

    auto t = runWorkerTaskH((sreq, sres) {
        scope (exit) owner.send(true);
        auto req = cast(HTTPServerRequest)sreq;
        auto res = cast(HTTPServerResponse)sres;
        // ...
    }, cast(shared)req, cast(shared)res);

    // wait for task exit
    t.receiveOnly!bool();

    // write response...
}

Joining tasks across threads isn't implemented, yet. Otherwise this would work, too:

void handleRequest(HTTPServerRequest req, HTTPServerResponse res)
{
    auto t = runWorkerTaskH((sreq, sres) {
        auto req = cast(HTTPServerRequest)sreq;
        auto res = cast(HTTPServerResponse)sres;
        // ...
    }, cast(shared)req, cast(shared)res);

    t.join();

    // write response...
}

The downside is that the request handler will have to block for the length of the background operation. If the response should instead be sent earlier, you have no choice but to copy the important bits in the request handler.


jkm (Contributor) commented Dec 9, 2015

I went with copying for now.


jkm (Contributor) commented Dec 10, 2015

Probably I'm doing it wrong, but you seemed to suggest that I keep vibe single-threaded, fork off processes, and let Linux round-robin the connections. Like so:

// setting up router and settings

listenHTTP(settings, router);

// forking a child here

runEventLoop();

The thing is that the forked process dies with a libevent assertion:

[err] evmap.c:401: Assertion ctx failed in evmap_io_active

So what's the trick?


s-ludwig (Member) commented Dec 11, 2015

According to the libevent docs, this may require a call to event_reinit. Try: import deimos.event2.event; event_init((cast(LibeventEventDriver)getEventDriver).eventLoop);

Looks like forking will have to be a feature of the EventDriver.


jkm (Contributor) commented Dec 11, 2015

Many thanks, that works. I fixed some typos in your suggested code, and the child process(es) execute now:

import deimos.event2.event : event_reinit;
import vibe.core.drivers.libevent2 : Libevent2Driver;
import vibe.core.core : getEventDriver;
event_reinit((cast(Libevent2Driver)getEventDriver).eventLoop);

Then I can see that a request may be handled by a different process.

You may want to add a reinit function to the EventDriver interface.

One thing is strange, though: the forked child does not have vibe's signal handling, so I need to kill the child processes manually via SIGKILL.

