Skip to content

Commit

Permalink
resolving conflicts after yroot changes
Browse files Browse the repository at this point in the history
  • Loading branch information
srinisv123 committed May 31, 2011
2 parents 7ff9095 + 049b3c1 commit 194be94
Show file tree
Hide file tree
Showing 19 changed files with 389 additions and 13 deletions.
29 changes: 28 additions & 1 deletion bin/pogo-rexec
Expand Up @@ -296,6 +296,24 @@ sub main
push @{ $opts->{scp_options} }, ("-o", "PubkeyAuthentication=no");
push @{ $opts->{ssh_options} }, ("-o", "PubkeyAuthentication=no");
}
# split if hostname:rootname combination is present
my @target;
if ( $args->{host} =~ /:/ )
{
LOGDIE "Rootname present but missing command_root_transform property"
unless ( $args->{command_root_transform} );
@target = split( ":", $args->{host} );
$args->{host} = $target[0];
}

if ( defined $args->{command_root_transform} )
{
LOGDIE "\${command} substring not found in $args->{command_root_transform} \n"
unless $args->{command_root_transform} =~ /\${command}/;

LOGDIE "\${rootname} substring not found in $args->{command_root_transform} \n"
unless $args->{command_root_transform} =~ /\${rootname}/;
}

# insert POGO_JOB_ID into the remote environment
$remote_env->{POGO_JOB_ID} = $args->{job_id};
Expand All @@ -320,6 +338,15 @@ sub main
# that way we can pass args to our script
}

# substitute the command_root_transform with the rootname
# and the name of the pogo worker stub
if ( defined $args->{command_root_transform} && defined $target[1] )
{
$args->{command_root_transform} =~ s/\${rootname}/$target[1]/g;
$args->{command_root_transform} =~ s/\${command}/$args->{command}->[0]/g;
$args->{command}->[0] = $args->{command_root_transform};
}

# Prepend "sudo -u <user>" if we are running as someone else
if ( $args->{run_as} ne $args->{user} )
{
Expand All @@ -342,7 +369,7 @@ sub main
execute(
$args, "ssh", @{ $opts->{ssh_options} },
"--", $args->{user} . "@" . $args->{host},
"rm", $tmpfile
"rm -f", $tmpfile
);
}

Expand Down
6 changes: 6 additions & 0 deletions bin/pogo-worker-stub 100644 → 100755
Expand Up @@ -20,6 +20,7 @@ my $locked = 0;
my $lockfh = undef;
my $opts = undef;
my $tempfile = undef;
my ($stubdir, $stubname) = $0 =~ m/(.*)(\/.*\/.*)$/;

sub cleanup {
my $msg = shift;
Expand All @@ -32,6 +33,9 @@ sub cleanup {
close $lockfh;
unlink LOCKFILE;
}
unlink $stubname;
print "Removed the stub $stubname \n";

die "ERROR: pogo-worker died: $msg\n\n";
}

Expand Down Expand Up @@ -111,6 +115,8 @@ sub main {
unlink $tempfile;
}
unlink LOCKFILE;
unlink $stubname;
print "Removed the stub $stubname \n";
print "\n";
return $ret;
}
Expand Down
2 changes: 1 addition & 1 deletion lib/Pogo/API/V3.pm
Expand Up @@ -129,7 +129,7 @@ sub _rpc_run
my $opts = {};
foreach my $arg (
qw(invoked_as namespace target user run_as password pvt_key_passphrase client_private_key
timeout job_timeout command retry prehook posthook secrets email
timeout job_timeout command retry prehook posthook secrets email root_type
im_handle client requesthost concurrent exe_name exe_data)
)
{
Expand Down
76 changes: 76 additions & 0 deletions lib/Pogo/Dispatcher.pm
Expand Up @@ -70,6 +70,7 @@ sub run #{{
# start these puppies up
Pogo::Engine->init($instance);
Pogo::Dispatcher::AuthStore->init($instance);
load_root_transform();

# handle workers
tcp_server(
Expand Down Expand Up @@ -109,6 +110,81 @@ sub run #{{

# }}}

# Loads root transforms from plugins into Zookeeper
sub load_root_transform
{
eval { load_root_plugin(); };
LOGDIE $@ if $@;
my $path = "/pogo/root/";
while ( my ( $k, $v ) = each( %{ $instance->{root} } ) )
{
store->delete( $path . $k );
store->create( $path . $k, $v )
or LOGDIE "couldn't create '$path' node: " . store->get_error_name;
}
}

# Look into Pogo/Plugin/Root for root transform plugins
# The plugins should have the following interface
# sub root_type : returns root type string
# sub transform : return transform string
# sub priority : return priority integer
# Creates a hash of root_type:transform from the plugins
# along with an entry default:root_type
# which is the root_type with highest priority
sub load_root_plugin
{
DEBUG "loading plugin";
my $plugin_base_class = "Pogo::Plugin::Root";
( my $plugin_base_dir = $plugin_base_class ) =~ s#::#/#g;
my $rex = qr/(.*)\.pm$/;
my $dir;

for my $incdir (@INC)
{
if ( -d "$incdir/$plugin_base_dir" )
{
$dir = "$incdir/$plugin_base_dir";
if ( !<$dir/*> )
{
ERROR "Skipping empty plugin dir $dir";
next;
}
DEBUG "Found plugin dir: $dir";
last;
}
}

if ( !$dir )
{
INFO "No plugins found";
return;
}

$instance->{root} = {};
my @default_root;
opendir DIR, $dir or LOGDIE "Cannot open dir $dir \n";
for my $entry ( readdir DIR )
{
next if $entry !~ $rex;
my $plugin_class = "${plugin_base_class}::$1";
eval "require $plugin_class";

my $plugin = $plugin_class->new();

$instance->{root}->{ $plugin->root_type() } = $plugin->transform();
if ( $default_root[0] < $plugin->priority() )
{
$default_root[0] = $plugin->priority();
$default_root[1] = $plugin->root_type();
}
DEBUG "Loaded plugin " . $plugin_class;
}
$instance->{root}->{default} = $default_root[1];

closedir DIR;
}

sub purge_queue
{
my ( $class, $jobid ) = @_;
Expand Down
1 change: 1 addition & 0 deletions lib/Pogo/Dispatcher/WorkerConnection.pm
Expand Up @@ -136,6 +136,7 @@ sub queue_task
"execute",
{ job_id => $job->id,
command => $job->worker_command,
command_root_transform => $job->command_root_transform,
user => $job->user,
run_as => $job->run_as,
password => $job->password,
Expand Down
2 changes: 1 addition & 1 deletion lib/Pogo/Engine.pm
Expand Up @@ -511,7 +511,7 @@ sub run
my $opts = {};
foreach my $arg (
qw(invoked_as namespace range user run_as password pvt_key_passphrase client_private_key
timeout job_timeout command retry prehook posthook secrets email
timeout job_timeout command retry prehook posthook secrets email root_type
im_handle client requesthost concurrent exe_name exe_data)
)
{
Expand Down
19 changes: 16 additions & 3 deletions lib/Pogo/Engine/Job.pm
Expand Up @@ -25,6 +25,7 @@ use JSON;
use Log::Log4perl qw(:easy);
use MIME::Base64 qw(encode_base64);
use Time::HiRes qw(time);
use JSON::XS qw(encode_json);

use Pogo::Common;
use Pogo::Engine::Store qw(store);
Expand Down Expand Up @@ -518,6 +519,19 @@ sub command { return $_[0]->meta('command'); }
sub concurrent { return $_[0]->meta('concurrent'); }
sub state { return store->get( $_[0]->{path} ); }

# Returns the transform for a given root type
# root_type precedence :
# root_type param in client.conf >
# root_type param in namespace >
# zookeeper /pogo/root/default
sub command_root_transform
{
my $root = $_[0]->meta('root_type');
$root = $_[0]->{ns}->get_conf->{globals}->{root_type} unless $root;
$root = store->get("/pogo/root/default") unless $root;
return store->get( "/pogo/root/" . $root );
}

sub set_state
{
my ( $self, $state, $msg, @extra ) = @_;
Expand Down Expand Up @@ -573,7 +587,7 @@ sub start
};

my $fetch_cont = sub {
my( $hinfo ) = @_;
my ($hinfo) = @_;
local *__ANON__ = 'AE:cb:fetch_target_meta:cont';
DEBUG $self->id . ": adding hosts";
DEBUG $self->id . ": computing slots";
Expand All @@ -583,8 +597,7 @@ sub start
};

DEBUG "Calling fetch_target_meta for @$flat_targets";
$ns->fetch_target_meta( $flat_targets, $ns->name, $fetch_errc,
$fetch_cont );
$ns->fetch_target_meta( $flat_targets, $ns->name, $fetch_errc, $fetch_cont );
DEBUG "After fetch_target_meta";
return 1;
}
Expand Down
9 changes: 9 additions & 0 deletions lib/Pogo/Engine/Namespace.pm
Expand Up @@ -397,6 +397,15 @@ sub set_conf

my $name = $self->name;

# globals processing
foreach my $global ( keys %{ $conf_in->{globals} } )
{
DEBUG "processing '$name/global/$global'";
$conf->{globals}->{$global} = delete $conf_in->{globals}->{$global};
}

delete $conf_in->{globals};

# plugin processing
foreach my $plugin ( keys %{ $conf_in->{plugins} } )
{
Expand Down
3 changes: 2 additions & 1 deletion lib/Pogo/Engine/Store/ZooKeeper.pm
Expand Up @@ -80,7 +80,8 @@ sub new
$self->{handle}->{data_read_len} = 1048576;

# this is sorta ugly, but whatever
foreach my $path (qw{/pogo /pogo/ns /pogo/job /pogo/host /pogo/lock /pogo/stats /pogo/taskq})
foreach
my $path (qw{/pogo /pogo/ns /pogo/job /pogo/host /pogo/lock /pogo/stats /pogo/taskq /pogo/root})
{
if ( !$self->exists($path) )
{
Expand Down
30 changes: 30 additions & 0 deletions lib/Pogo/Plugin/Root/Dummyroot.pm
@@ -0,0 +1,30 @@
package Pogo::Plugin::Root::Dummyroot;

# Copyright (c) 2010-2011 Yahoo! Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

use 5.008;

sub new
{
my $self = {};
bless $self;
return $self;
}

sub root_type { return undef; }
sub transform { return undef; }
sub priority { return undef; }

1;
3 changes: 2 additions & 1 deletion lib/Pogo/Worker/Connection.pm
Expand Up @@ -27,6 +27,7 @@ use POSIX qw(WEXITSTATUS SIGTERM);
use Time::HiRes qw(time);
use IO::File;
use Scalar::Util qw(refaddr);
use File::Path;

sub new
{
Expand Down Expand Up @@ -211,7 +212,7 @@ sub execute
$n++;
} while ( -f $output_filename );

mkdir($save_dir) unless ( -d $save_dir );
mkpath($save_dir) unless ( -d $save_dir );

# Send args
$writer->print( encode_json( $task->{args} ) );
Expand Down
2 changes: 1 addition & 1 deletion t/50_smoke_zookeeper.t
Expand Up @@ -91,7 +91,7 @@ test_pogo
my ( $handle, $data ) = @_;
$data =~ m/Node count: (\d+)/;
my $nodes = $1;
ok( $nodes == 13, "node count $nodes" );
ok( $nodes == 18, "node count $nodes" );
$cv->send(1);
}
);
Expand Down
11 changes: 10 additions & 1 deletion t/53_smoke_dispatcher.t
Expand Up @@ -18,13 +18,14 @@ use 5.008;
use common::sense;

use Test::Exception;
use Test::More tests => 10;
use Test::More tests => 12;

use Carp qw(confess);
use FindBin qw($Bin);
use Log::Log4perl qw(:easy);
use Sys::Hostname qw(hostname);
use YAML::XS qw(LoadFile);
use Pogo::Engine::Store qw(store);

use lib "$Bin/lib";
use PogoTester;
Expand All @@ -34,7 +35,14 @@ test_pogo
{
my $t;

# root plugin
lives_ok { $t = store->get("/pogo/root/default"); } 'default plugin'
or diag explain $t;
is( $t, 'dummyroot3', 'plugins loaded')
or diag explain $t;

# ping
undef $t;
lives_ok { $t = client->ping(); } 'ping send'
or diag explain $t;
ok( $t->is_success, 'ping success ' . $t->status_msg )
Expand Down Expand Up @@ -66,6 +74,7 @@ test_pogo
or diag explain $t;
ok( $t->is_success, "loadconf success " . $t->status_msg )
or diag explain $t;

};

done_testing;
Expand Down

0 comments on commit 194be94

Please sign in to comment.