Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Newer
Older
100644 153 lines (123 sloc) 4.928 kb
b7aa815 @kmx initial version
authored
1 #!/usr/bin/env perl
2
3 use threads;
4 use threads::shared;
5
85bd13a @kmx better load balancing + improving performance
authored
6 use 5.010;
7 use strict;
b7aa815 @kmx initial version
authored
8 use warnings;
9
10 use Socket qw(IPPROTO_TCP TCP_NODELAY);
11 use IO::Select;
12 use IO::Socket::INET;
13 use Thread::Queue::Duplex;
14 use Time::HiRes 'time';
15 use Time::Piece 'localtime';
16 use Getopt::Long;
17 use IO::Socket::SSL; # to be thread-safe has to loaded in the main thread
18
19 ### global variables
20
21 my $addr = 'localhost';
22 my $port = 3000;
23 my $workers = 4;
24
25 ### functions
26
27 sub thread_dispatcher {
28 my ($TQ_handles, $TQ_logs) = @_;
29
85bd13a @kmx better load balancing + improving performance
authored
30 $TQ_handles->[$_]->wait_for_listener() for (0..$workers-1);
31 $TQ_logs->wait_for_listener();
b7aa815 @kmx initial version
authored
32
33 my $listener = IO::Socket::INET->new(
85bd13a @kmx better load balancing + improving performance
authored
34 LocalAddr =>$addr,
35 LocalPort =>$port,
36 Proto =>'tcp',
37 Listen =>SOMAXCONN,
38 ReuseAddr =>1,
39 Blocking => 0,
40 Type => SOCK_STREAM,
41 ) or die "FATAL: Can't start listening: $!";
42
43 my $read_selector = IO::Select->new();
b7aa815 @kmx initial version
authored
44 $read_selector->add($listener);
45
85bd13a @kmx better load balancing + improving performance
authored
46 my $accept_count = 0;
47 my %clients;
b7aa815 @kmx initial version
authored
48
49 $TQ_logs->enqueue(threads->tid, "Info", "entering dispatcher loop, listening at '$addr:$port'");
50
85bd13a @kmx better load balancing + improving performance
authored
51 while (1) {
52 my @a = $read_selector->can_read(0.25);
53 for my $s (@a) {
b7aa815 @kmx initial version
authored
54 my $fd = $s->accept();
55 next unless $fd;
85bd13a @kmx better load balancing + improving performance
authored
56 my $w = $accept_count++ % $workers;
57 my $id = $TQ_handles->[$w]->enqueue('newfd', $fd->fileno);
58 $clients{"$w.$id"} = $fd;
b7aa815 @kmx initial version
authored
59 }
85bd13a @kmx better load balancing + improving performance
authored
60 for my $w (0..$workers-1) {
61 my @i = $TQ_handles->[$w]->available;
62 next unless defined $i[0]; #if none available: @i = (undef);
63 for my $id (@i) {
64 $TQ_handles->[$w]->dequeue_response($id);
65 delete $clients{"$w.$id"}; # will be destroyed&closed automatically
66 }
b7aa815 @kmx initial version
authored
67 }
68 }
85bd13a @kmx better load balancing + improving performance
authored
69
b7aa815 @kmx initial version
authored
70 }
71
72 sub thread_logger {
73 my ($TQ_logs) = @_;
74 $TQ_logs->listen();
75 while (1) {
76 my ($id, $tid, $type, $msg) = @{$TQ_logs->dequeue()};
77 $msg =~ s/[\n\r]*$//;
78 print STDERR sprintf("#ID=%03d:TID=%03d# %s: %s\n", $id, $tid, $type, $msg);
79 }
80 }
81
82 sub thread_worker {
83 my ($wid, $TQ_handles, $TQ_logs, $mojo_app_path) = @_;
84 my $tid = threads->tid();
85
86 $TQ_logs->wait_for_listener();
87 $TQ_handles->listen();
88
89 $ENV{MOJO_REACTOR} = 'Mojo::Reactor::Poll'; # prevent Mojo::Reactor::EV as it is not thread-safe
90 require Mojo::Server::Daemon;
acaa404 @kmx force M::S::Daemon not to listen
authored
91 my $mojosrv = Mojo::Server::Daemon->new(listen =>[]); # do not listen
b7aa815 @kmx initial version
authored
92 $mojosrv->load_app($mojo_app_path);
12eb8b9 @kmx force Mojo::Server::Daemon not to listen
authored
93
85bd13a @kmx better load balancing + improving performance
authored
94 $mojosrv->ioloop->recurring(0.02 =>
b7aa815 @kmx initial version
authored
95 sub {
96 while (my $dq = $TQ_handles->dequeue_nb) { # NON-BLOCKING get socket from TQ_handles queue
85bd13a @kmx better load balancing + improving performance
authored
97 my ($id, $cmd, $fn) = @$dq;
b7aa815 @kmx initial version
authored
98 my $socket = IO::Socket::INET->new_from_fd($fn, '+>');
85bd13a @kmx better load balancing + improving performance
authored
99 $TQ_handles->respond($id, 'handle taken'); # tell dispatcher that we took the socket
b7aa815 @kmx initial version
authored
100 if ($socket) {
101 $socket->blocking(0);
102 setsockopt($socket, IPPROTO_TCP, TCP_NODELAY, 1);
103
104 #XXX-HACK-BEGIN: this was ripped with help of sri from Mojo::Server::Daemon::_listen
105 my $id = $mojosrv->ioloop->stream(my $stream = Mojo::IOLoop::Stream->new($socket));
106 # Add new connection
107 my $c = $mojosrv->{connections}{$id} = { tls=>{} };
108 $stream->timeout($mojosrv->inactivity_timeout);
109 # Events
110 $stream->on(close =>sub { $mojosrv->_close($id) });
111 $stream->on(error =>sub { return unless $mojosrv; $mojosrv->app->log->error(pop); $mojosrv->_close($id)});
112 $stream->on(read =>sub { $mojosrv->_read($id => pop) });
113 $stream->on(timeout=>sub { $mojosrv->app->log->debug('Inactivity timeout.') if $c->{tx} });
114 #XXX-HACK-END:
115 }
116 }
117 }
118 );
119
120 $TQ_logs->enqueue(threads->tid, "Info", "worker[$wid] entering mojo loop");
121 $mojosrv->run;
122 }
123
124 ### main
125
126 GetOptions( 'a|address=s' => \$addr, 'p|port=n' => \$port );
127 die <<"EOF" if !(my $mojo_app_path = shift);
128 usage: $0 [OPTIONS] [APPLICATION]
129
130 metyl script/myapp
131 metyl myapp.pl
132 metyl -a 127.0.0.1 -p 80 myapp.pl
133
134 These options are available:
135 -a, --address <ip> Set IP adress you want to listen on (default: localhost)
b7139fb @kmx cosmetics
authored
136 -p, --port <num> Set port number you want to listen on (default: 3000)
b7aa815 @kmx initial version
authored
137 -w, --workers <num> Set number of worker threads (default: 4)
138 EOF
139
85bd13a @kmx better load balancing + improving performance
authored
140 my @TQ_handles;
141 my $TQ_logs = Thread::Queue::Duplex->new(ListenerRequired => 1, MaxPending => 100);
b7aa815 @kmx initial version
authored
142 my $TH_logger = threads->new(\&thread_logger, $TQ_logs);
143
85bd13a @kmx better load balancing + improving performance
authored
144 for my $w (0..$workers-1) {
145 $TQ_handles[$w] = Thread::Queue::Duplex->new(ListenerRequired => 1, MaxPending => 1000);
146 threads->new(\&thread_worker, $w, $TQ_handles[$w], $TQ_logs, $mojo_app_path)->detach;
147 }
148
149 my $TH_dispatcher = threads->new(\&thread_dispatcher, \@TQ_handles, $TQ_logs);
b7aa815 @kmx initial version
authored
150 $TH_dispatcher->join;
85bd13a @kmx better load balancing + improving performance
authored
151
b7aa815 @kmx initial version
authored
152 die "FATAL: We should never reach this point!";
Something went wrong with that request. Please try again.