Skip to content
Browse files

first version

  • Loading branch information...
0 parents commit 5d13857679a32f6d9582833b771dc291e6a13e69 Tyler Riddle committed
Showing with 407 additions and 0 deletions.
  1. +8 −0 .gitignore
  2. +57 −0 Build
  3. +9 −0 MANIFEST
  4. +40 −0 example.pl
  5. +179 −0 lib/Acme/Threads.pm
  6. +104 −0 lib/Acme/Threads/Proxy.pm
  7. +10 −0 t/00-load.t
8 .gitignore
@@ -0,0 +1,8 @@
+/_build
+/blib
+/perl
+/MYMETA.yml
+/setup.sh
+/Build.PL
+/.project
+/.includepath
57 Build
@@ -0,0 +1,57 @@
+#! /Users/tyler/work/eclipse-workspace/VoiceConnectionEngine/perl/bin/perl
+
+use strict;
+use Cwd;
+use File::Basename;
+use File::Spec;
+
+sub magic_number_matches {
+ return 0 unless -e '_build/magicnum';
+ local *FH;
+ open FH, '_build/magicnum' or return 0;
+ my $filenum = <FH>;
+ close FH;
+ return $filenum == 519220;
+}
+
+my $progname;
+my $orig_dir;
+BEGIN {
+ $^W = 1; # Use warnings
+ $progname = basename($0);
+ $orig_dir = Cwd::cwd();
+ my $base_dir = '/Users/tyler/work/eclipse-workspace/Acme-Threads';
+ if (!magic_number_matches()) {
+ unless (chdir($base_dir)) {
+ die ("Couldn't chdir($base_dir), aborting\n");
+ }
+ unless (magic_number_matches()) {
+ die ("Configuration seems to be out of date, please re-run 'perl Build.PL' again.\n");
+ }
+ }
+ unshift @INC,
+ (
+ 'lib'
+ );
+}
+
+close(*DATA) unless eof(*DATA); # ensure no open handles to this script
+
+use Module::Build;
+
+# Some platforms have problems setting $^X in shebang contexts, fix it up here
+$^X = Module::Build->find_perl_interpreter;
+
+if (-e 'Build.PL' and not Module::Build->up_to_date('Build.PL', $progname)) {
+ warn "Warning: Build.PL has been altered. You may need to run 'perl Build.PL' again.\n";
+}
+
+# This should have just enough arguments to be able to bootstrap the rest.
+my $build = Module::Build->resume (
+ properties => {
+ config_dir => '_build',
+ orig_dir => $orig_dir,
+ },
+);
+
+$build->dispatch;
9 MANIFEST
@@ -0,0 +1,9 @@
+example.pl
+
+lib/Acme/Threads.pm
+lib/Acme/Threads/Proxy.pm
+
+t/00-load.t
+
+Build.PL
+MANIFEST
40 example.pl
@@ -0,0 +1,40 @@
+#currently hangs when put under much load
+
+use strict;
+use warnings;
+
+use Acme::Threads;
+use IO::File;
+use Time::HiRes qw(setitimer ITIMER_REAL);
+
+#unhangs the system, weird
+$SIG{ALRM} = sub { };
+setitimer(ITIMER_REAL, 1, 1);
+
+my $input = Acme::Threads->spawn('Input', 'new');
+
+#this leaks objects
+#while(defined(my $line = $input->fh->getline)) {
+# print $line;
+#}
+
+#but this leaks less
+while(1) {
+ my $fh = $input->fh;
+ my $line = $fh->getline;
+
+ print $line;
+}
+
+package Input;
+
+use strict;
+use warnings;
+
+sub new {
+ return bless({}, $_[0]);
+}
+
+sub fh {
+ return \*STDIN;
+}
179 lib/Acme/Threads.pm
@@ -0,0 +1,179 @@
+package Acme::Threads;
+
+our $VERSION = '0.0.0_00';
+
+use strict;
+use warnings;
+use threads;
+use threads::shared;
+use Thread::Queue;
+use Data::Dumper;
+use Time::HiRes qw (setitimer);
+
+use Acme::Threads::Proxy;
+
+our %threadIdToRPCBuffer : shared;
+my %threadHeap;
+
+sub create {
+ my ($class, $entry, @args) = @_;
+ my $queue = Thread::Queue->new;
+ my $threadIsReady : shared;
+ my ($proxy, $thread);
+
+ $thread = threads->create(sub {
+ $class->init_new_thread;
+ $class->set_rpc_buffer($queue);
+
+ {
+ lock($threadIsReady);
+ $threadIsReady = 1;
+ cond_signal($threadIsReady);
+ }
+
+ return $entry->(@args);
+ });
+
+ lock($threadIsReady);
+
+ while(1) {
+ last if defined $threadIsReady;
+ cond_wait($threadIsReady);
+ }
+
+ return $thread;
+}
+
+sub spawn {
+ my ($ourClass, $wantedClass, $constructor, @args) = @_;
+ my $thread;
+ my %stash : shared;
+
+ $thread = Acme::Threads->create(sub {
+ my $self = $wantedClass->$constructor(@args);
+
+ {
+ lock(%stash);
+
+ $stash{proxy} = Acme::Threads->proxy_for_object($self);
+ cond_signal(%stash);
+ }
+
+ #there's got to be something better to block on to hold the thread open
+ while(1) { sleep(.001); }
+ });
+
+ lock(%stash);
+
+ while(1) {
+ last if defined $stash{proxy};
+ cond_wait(%stash);
+ }
+
+ return $stash{proxy};
+}
+
+sub init_new_thread {
+ my ($class) = @_;
+
+ $SIG{HUP} = sub { $class->check_rpc_buffer };
+
+ $class->init_thread_globals;
+}
+
+sub init_thread_globals {
+ my ($class) = @_;
+
+ undef(%threadHeap);
+}
+
+sub set_rpc_buffer {
+ my ($class, $buffer) = @_;
+
+ $threadIdToRPCBuffer{threads->tid} = $buffer;
+}
+
+sub check_rpc_buffer {
+ my ($class) = @_;
+ my $queue = $threadIdToRPCBuffer{threads->tid};
+ my @items;
+
+ while(my $rpcInvocation = $queue->dequeue_nb) {
+ $class->process_rpc_request($rpcInvocation);
+ }
+}
+
+sub process_rpc_request {
+ my ($class, $request) = @_;
+
+ lock(%$request);
+
+ if (defined($request->{method})) {
+ return $class->rpc_method_invocation($request);
+ } elsif (defined($request->{release})) {
+ die "attempt to free reference that does not exist" unless delete $threadHeap{objectRefs}->{$request->{refNumber}};
+ }
+}
+
+sub rpc_method_invocation {
+ my ($class, $request) = @_;
+ my @return;
+
+ eval {
+ my $method = $request->{method};
+ my $object = $threadHeap{objectRefs}->{$request->{refNumber}};
+
+ unless(defined $object) {
+ print Dumper(\%threadHeap);
+ die "could not get original object for reference number ", $request->{refNumber};
+ }
+
+ unless (defined($request->{context})) {
+ $object->$method(@{$request->{args}});
+ } elsif ($request->{context}) {
+ @return = $object->$method(@{$request->{args}});
+ } else {
+ @return = scalar($object->$method(@{$request->{args}}));
+ }
+ };
+
+ if ($@) {
+ $request->{die} = $@;
+ } else {
+ $request->{return} = $class->proxy_unshareable(@return);
+ }
+
+ cond_signal(%$request);
+
+}
+
+sub proxy_unshareable {
+ my ($class, @returned) = @_;
+
+ for(my $i = 0; $i <= $#returned; $i++) {
+ my $refType = ref($returned[$i]);
+
+ next if $refType eq '' || $refType eq 'HASH' || $refType eq 'ARRAY' || $refType eq 'SCALAR';
+
+ $returned[$i] = $class->proxy_for_object($returned[$i]);
+ }
+
+ return shared_clone(\@returned);
+}
+
+sub proxy_for_object {
+ my ($class, $object) = @_;
+ my $refNumber = ++$threadHeap{refCounter};
+
+ $threadHeap{objectRefs}{$refNumber} = $object;
+
+ return Acme::Threads::Proxy->new($refNumber);
+}
+
+sub rpc_buffer {
+ my ($class) = @_;
+
+ return $threadIdToRPCBuffer{threads->tid};
+}
+
+1;
104 lib/Acme/Threads/Proxy.pm
@@ -0,0 +1,104 @@
+package Acme::Threads::Proxy;
+
+use strict;
+use warnings;
+use Data::Dumper;
+
+use threads::shared;
+
+our $AUTOLOAD;
+
+sub new {
+ my ($class, $refNumber) = @_;
+ my $self = shared_clone({});
+
+ $self->{threadId} = threads->tid;
+ $self->{rpcBuffer} = Acme::Threads->rpc_buffer;
+ $self->{refNumber} = $refNumber;
+
+ bless($self, $class);
+
+ return $self;
+}
+
+sub DESTROY {
+ my ($self) = @_;
+ my %invocation : shared;
+
+ return unless threads->tid != $self->{threadId};
+
+ $invocation{release} = 1;
+ $invocation{refNumber} = $self->{refNumber};
+
+ Acme::Threads::Proxy::Subs::add_to_queue($self, \%invocation);
+
+ return;
+}
+
+sub AUTOLOAD {
+ my ($self, @args) = @_;
+ my $name = $AUTOLOAD;
+ my $context = wantarray;
+ my %invocation : shared;
+
+ $name =~ s/.*://; # strip fully-qualified portion
+
+ $invocation{args} = Acme::Threads->proxy_unshareable(@args);
+ $invocation{method} = $name;
+ $invocation{context} = $context;
+ $invocation{refNumber} = $self->{refNumber};
+
+ Acme::Threads::Proxy::Subs::add_to_queue($self, \%invocation);
+ Acme::Threads::Proxy::Subs::wait_for_remote_side($self, \%invocation);
+
+ if (exists $invocation{die}) {
+ die "method invocation died on thread ", $self->{threadId}, ": ", $invocation{die};
+ }
+
+ #handle void context
+ return unless defined $context;
+ #handle list context
+ return @{$invocation{return}} if $context;
+ #handle scalar context
+ return $invocation{return}->[0];
+}
+
+package Acme::Threads::Proxy::Subs;
+
+use strict;
+use warnings;
+
+use threads::shared;
+use Data::Dumper;
+
+sub add_to_queue {
+ my ($self, $item) = @_;
+ my $queue = $self->{rpcBuffer};
+ my $threadId = $self->{threadId};
+
+ $self->{rpcBuffer}->enqueue($item);
+
+ my $thread = threads->object($threadId);
+
+ $thread->kill('HUP');
+
+ return;
+}
+
+sub wait_for_remote_side {
+ my ($self, $invocation) = @_;
+
+ lock(%$invocation);
+
+ while(1) {
+ cond_wait(%$invocation);
+
+ last if exists $invocation->{return};
+ last if exists $invocation->{die};
+ }
+
+ return;
+
+}
+
+1;
10 t/00-load.t
@@ -0,0 +1,10 @@
+use threads;
+
+use Test::More tests => 2;
+
+BEGIN {
+ use_ok('Acme::Threads::Proxy');
+ use_ok( 'Acme::Threads' );
+}
+
+diag( "Testing MediaWiki::DumpFile $Acme::Threads::VERSION, Perl $], $^X" );

0 comments on commit 5d13857

Please sign in to comment.
Something went wrong with that request. Please try again.