Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

First stab at implementing IPC::ConcurrencyLimit::Lock::MySQL

  • Loading branch information...
commit 2e2779dc55c4e0a403b30204e2d66c371453b78e 1 parent 9c8e116
@tsee authored
View
5 Changes
@@ -0,0 +1,5 @@
+Revision history for Perl extension IPC::ConcurrencyLimit::Lock::MySQL
+
+0.01 Fri Sep 2 20:00 2011
+ - original version
+
View
67 Makefile.PL
@@ -0,0 +1,67 @@
+use 5.008001;
+use ExtUtils::MakeMaker;
+use strict;
+use Data::Dumper;
+
+my $conf_file = 'mysql.dat';
+
+my $host = 'localhost';
+my $port = '3306';
+my $db = 'test';
+my $usr = 'test';
+my $pwd = 'test';
+
+if (not -e $conf_file) {
+ print <<EOFTEXT;
+To test properly I will need access to a MySQL database
+with the priviledge to call GET_LOCK and RELEASE_LOCK.
+To do this I will need to ask some more questions to
+complete configuration.
+
+What you answer will be stored in the file '$conf_file'
+and will not be removed unless you do a make clean, thus
+you wont have to go through this process again even if you
+rerun this script. If you wish to change your answer
+delete the file (or run make clean) and rerun Makefile.PL.
+
+If you wish to set things up now the db, username, and
+password will by default be '$db', '$usr', '$pwd'.
+
+EOFTEXT
+
+ my $do_mysql_config = prompt "Run MySQL tests? [yes/no]", 'no';
+ my @connect;
+ if ($do_mysql_config=~/^y/i) {
+ $db = prompt "Please enter database name:", $db;
+ $host = prompt "Please enter hostname:", $host;
+ $port = prompt "Please enter port:", $port;
+ $usr = prompt "Please enter user name to connect to $db with:", $usr;
+ $pwd = prompt "Please enter password to use for $usr:", $pwd;
+ push @connect, $host, $port, $db, $usr, $pwd;
+ }
+ open my $fh, ">", $conf_file
+ or die "Cant open '$conf_file' for writing: $!";
+ print $fh Data::Dumper->Dump([@connect],[qw(*host *port *db *usr *pwd)]);
+ close $fh;
+}
+
+
+
+WriteMakefile(
+ NAME => 'IPC::ConcurrencyLimit::Lock::MySQL',
+ VERSION_FROM => 'lib/IPC/ConcurrencyLimit/Lock/MySQL.pm', # finds $VERSION
+ PREREQ_PM => {
+ 'Carp' => '0',
+ 'DBI' => 0,
+ 'DBD::mysql' => '0',
+ 'Class::XSAccessor' => '1.11',
+ 'Data::Dumper' => '0',
+ 'IPC::ConcurrencyLimit' => '0.01',
+ 'IPC::ConcurrencyLimit::Lock' => '0',
+ }, # e.g., Module::Name => 1.1
+ ($] >= 5.005 ? ## Add these new keywords supported since 5.005
+ (ABSTRACT_FROM => 'lib/IPC/ConcurrencyLimit/Lock/MySQL.pm', # retrieve abstract from module
+ AUTHOR => 'Steffen Mueller <smueller@cpan.org>') : ()),
+ dist => { COMPRESS => 'gzip -9f', SUFFIX => 'gz', },
+ clean => { FILES => "IPC-ConcurrencyLimit-Lock-MySQL-* *~ $conf_file" },
+);
View
165 lib/IPC/ConcurrencyLimit/Lock/MySQL.pm
@@ -0,0 +1,165 @@
+package IPC::ConcurrencyLimit::Lock::MySQL;
+use 5.008001;
+use strict;
+use warnings;
+
+our $VERSION = '0.01';
+
+use Carp qw(croak);
+use Class::XSAccessor {
+ accessors => [qw(dbh id)],
+ getters => [qw(make_new_dbh_callback lock_name)],
+};
+
+use IPC::ConcurrencyLimit::Lock;
+our @ISA = qw(IPC::ConcurrencyLimit::Lock);
+
+sub new {
+ my $class = shift;
+ my $opt = shift;
+
+ my $max_procs = $opt->{max_procs}
+ or croak("Need a 'max_procs' parameter");
+
+ my $dbh_callback = $opt->{make_new_dbh};
+ $dbh_callback && ref($dbh_callback) eq 'CODE'
+ or croak("Need a 'make_new_dbh' callback as parameter");
+
+ my $lock_name = $opt->{lock_name};
+ defined $lock_name
+ or croak("Need a 'lock_name' parameter for the lock");
+
+ my $h = {};
+ my $self = bless {
+ lock_name => $lock_name,
+ max_procs => $max_procs,
+ make_new_dbh_callback => $dbh_callback,
+ dbh => undef,
+ id => undef,
+ } => $class;
+
+ $self->_get_lock() or return undef;
+
+ return $self;
+}
+
+sub _get_dbh {
+ my $self = shift;
+ my $dbh = $self->dbh;
+
+ if (not defined $dbh) {
+ $dbh = $self->make_new_dbh_callback->($self);
+ die "Could not get a DB handle for getting a lock"
+ if not defined $dbh;
+ $self->dbh($dbh);
+ }
+
+ return $dbh;
+}
+
+sub _get_lock {
+ my $self = shift;
+
+ my $dbh = $self->_get_dbh;
+ my $lock_name_base = $self->lock_name;
+ my $timeout = 0;
+ for my $worker (1 .. $self->{max_procs}) {
+ my $lock_name = $lock_name_base . "_" . $worker;
+ my $query = "SELECT GET_LOCK(?, ?)";
+ my $res = $dbh->selectcol_arrayref($query, undef, $lock_name, $timeout);
+ if (not defined $res or not ref($res) eq 'ARRAY') {
+ die "Failed to execute query '$query': " . $dbh->errstr;
+ }
+ if (@$res && $res->[0]) {
+ $self->id($worker);
+ last;
+ }
+ }
+
+ return undef if not $self->{id};
+ return 1;
+}
+
+sub _release_lock {
+ my $self = shift;
+ my $dbh = $self->dbh;
+ return if not $dbh;
+ my $id = $self->id;
+ return if not $id;
+ my $query = "SELECT RELEASE_LOCK(?)";
+ $dbh->do($query, undef, $self->lock_name . "_" . $id);
+}
+
+sub DESTROY {
+ my $self = shift;
+ $self->_release_lock();
+}
+
+1;
+
+__END__
+
+
+=head1 NAME
+
+IPC::ConcurrencyLimit::Lock::MySQL - Locking via MySQL GET_LOCK
+
+=head1 SYNOPSIS
+
+ use IPC::ConcurrencyLimit;
+
+=head1 DESCRIPTION
+
+This locking strategy uses MySQL's C<GET_LOCK> to implement
+locking across multiple hosts.
+
+=head1 METHODS
+
+=head2 new
+
+Given a hash ref with options, attempts to obtain a lock in
+the pool. On success, returns the lock object, otherwise undef.
+
+Required options:
+
+=over 2
+
+=item C<lock_name>
+
+The name prefix for the named C<GET_LOCK> locks to use.
+Make sure this doesn't collide with any other locks.
+
+=item C<make_new_dbh>
+
+A code reference that, when called, will return a B<NEW>
+database handle for use in locking. If it returns a handle
+that is used for other purposes as well, there can be
+strange action at a distance since MySQL allow exactly one
+lock at a time per connection. If a second C<GET_LOCK>
+is issued for the same connection, the old lock will
+be silently released!
+
+=item C<max_procs>
+
+The maximum no. of locks (and thus usually processes)
+to allow at one time.
+
+=back
+
+=head1 AUTHOR
+
+Steffen Mueller, C<smueller@cpan.org>
+
+=head1 COPYRIGHT AND LICENSE
+
+ (C) 2011 Steffen Mueller. All rights reserved.
+
+ This code is available under the same license as Perl version
+ 5.8.1 or higher.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+
+=cut
+
View
5 t/01load.t
@@ -0,0 +1,5 @@
+use strict;
+use warnings;
+use Test::More tests => 1;
+BEGIN { use_ok('IPC::ConcurrencyLimit::Lock::MySQL') };
+
View
123 t/02basic.t
@@ -0,0 +1,123 @@
+use strict;
+use warnings;
+use File::Temp;
+use Test::More tests => 43;
+use IPC::ConcurrencyLimit;
+use IPC::ConcurrencyLimit::Lock::MySQL;
+use DBI;
+use DBD::mysql;
+
+$| = 1;
+
+our($db, $usr, $pwd, $host, $port);
+my $conf_file = 'mysql.dat';
+if (-e $conf_file) {
+ # eval connection parameters into existance
+ my $ok = do $conf_file;
+ defined $ok or die "Error loading $conf_file: ", $@||$!;
+
+ unless (defined $db) {
+ my $reason = 'no mysql connection details available';
+ eval 'use Test::More skip_all => $reason; 1;'
+ or die $@;
+ }
+}
+
+my %args = (
+ type => 'MySQL',
+ lock_name => 'testlock',
+ make_new_dbh => sub {
+ my $dbh = DBI->connect("DBI:mysql:database=$db;host=$host;port=$port", $usr, $pwd);
+ $dbh or die "Could not connect to database '$db'";
+ return $dbh;
+ },
+);
+
+SCOPE: {
+ my $limit = IPC::ConcurrencyLimit->new(%args);
+ isa_ok($limit, 'IPC::ConcurrencyLimit');
+
+ ok(!$limit->release_lock(), 'No lock to release yet');
+
+ my $id = $limit->get_lock;
+ is($id, 1, 'First and only lock has id 1');
+
+ $id = $limit->get_lock;
+ is($id, 1, 'Repeated call to lock returns same id');
+
+ ok($limit->is_locked, 'We have a lock');
+ ok($limit->lock_id, 'Lock id still 1');
+
+ ok($limit->release_lock(), 'Lock released');
+ ok(!$limit->is_locked, 'We do not have a lock');
+ is($limit->lock_id, undef, 'No lock');
+
+ $id = $limit->get_lock;
+ is($id, 1, 'New lock returns same id');
+
+ my $limit2 = IPC::ConcurrencyLimit->new(%args);
+ my $id2 = $limit2->get_lock();
+ is($id2, undef, 'Cannot get second lock');
+
+ is($limit->release_lock(), 1, 'Lock released');
+ $id2 = $limit2->get_lock();
+ is($id2, 1, 'Got other lock after first was released');
+}
+
+SCOPE: {
+ my $limit = IPC::ConcurrencyLimit->new(%args, max_procs => 2);
+ isa_ok($limit, 'IPC::ConcurrencyLimit');
+
+ my $id = $limit->get_lock;
+ ok($id, 'Got lock');
+
+ my $idr = $limit->get_lock;
+ is($idr, $id, 'Repeated call to lock returns same id');
+
+ my $id2;
+ INNER: {
+ my $limit2 = IPC::ConcurrencyLimit->new(%args, max_procs => 2);
+ $id2 = $limit2->get_lock();
+ ok($id2, 'Got second lock');
+
+ ok($id != $id2, 'Lock ids are not equal');
+
+ my $limit3 = IPC::ConcurrencyLimit->new(%args, max_procs => 2);
+ my $id3 = $limit3->get_lock();
+ is($id3, undef, 'Only two locks to go around');
+ } # end INNER
+
+ my $limit4 = IPC::ConcurrencyLimit->new(%args, max_procs => 2);
+ my $id4 = $limit4->get_lock();
+ is($id4, $id2, 'Lock 2 went out of scope, got new lock with same id');
+
+ my $limit5 = IPC::ConcurrencyLimit->new(%args, max_procs => 2);
+ my $id5 = $limit5->get_lock();
+ is($id5, undef, 'Only two lock to go around');
+
+ undef $limit;
+
+ my $limit6 = IPC::ConcurrencyLimit->new(%args, max_procs => 2);
+ my $id6 = $limit6->get_lock();
+ is($id6, $id, 'Recycled first lock');
+} # end SCOPE
+
+SCOPE: {
+ my $max = 20;
+ my @limits = map {
+ IPC::ConcurrencyLimit->new(%args, max_procs => $max)
+ } (1..$max+1);
+
+ foreach my $limitno (0..$#limits) {
+ my $limit = $limits[$limitno];
+ my $id = $limit->get_lock();
+ if ($limitno == $#limits) {
+ ok(!$id, 'One too many locks');
+ }
+ else {
+ ok($id, 'Got lock');
+ }
+ }
+}
+
+
Please sign in to comment.
Something went wrong with that request. Please try again.