Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
tree: 246c4a9919
Fetching contributors…

Cannot retrieve contributors at this time

436 lines (339 sloc) 13.476 kb
#!/usr/bin/perl -w
######################################################################
# Copyright (c) 2012, Yahoo! Inc. All rights reserved.
#
# This program is free software. You may copy or redistribute it under
# the same terms as Perl itself. Please see the LICENSE.Artistic file
# included with this project for the terms of the Artistic License
# under which this project is licensed.
######################################################################
use strict;
use warnings;
use Chisel::Builder::Engine;
use Getopt::Long;
use Log::Log4perl ( ':easy' );
# Script entry: run the sync, then exit with status 0.
# main() signals failure by dying (usage errors, LOGDIE in the sync code),
# so "exit 0" is only reached when main() returns normally.
main();
exit 0;
# Entry point of the script.
#
# Steps:
#   1. set up the Chisel builder engine
#   2. parse -f/--file (required) and -P/--threads (optional)
#   3. take the 'sync-roles' lock
#   4. read a JSON document from STDIN with 'roles' and 'hosts' arrays
#   5. hand lower-cased roles/hosts to Main::RoleSync->sync
#
# Dies with the usage text when option parsing fails or -f is missing.
sub main {
    my $engine = Chisel::Builder::Engine->new( application => "doozer-checkout" );
    $engine->setup;

    # Command line options: bail out with the usage text on any problem
    my %opt;
    my $parsed_ok = GetOptions( \%opt, "f|file=s", "P|threads=i" );
    die usage() unless $parsed_ok;
    die usage() unless $opt{'f'};

    # Lock to protect integrity of the cache.
    # NOTE(review): the handle is kept in a lexical for the rest of main();
    # presumably the lock lives as long as the handle -- confirm in Engine->lock.
    my $lockfd = $engine->lock( 'sync-roles' );

    INFO "doozer sync-roles starting";

    # Slurp all of STDIN ($/ is undefined only inside this bare block)
    my $payload;
    {
        local $/;
        $payload = <STDIN>;
    }

    # The request is a JSON object; role and host names are normalised to lower case
    my $request = JSON->new->decode( $payload );
    my @roles   = map { lc } @{ $request->{'roles'} };
    my @hosts   = map { lc } @{ $request->{'hosts'} };

    # Build the syncer and update the cache
    my $syncer = Main::RoleSync->new( threads => $opt{'P'}, c => $engine->roles );
    $syncer->sync(
        hostnames => \@hosts,
        roles     => \@roles,
        file      => $opt{'f'},
    );
}
# Return the one-line usage message (program name taken from $0),
# terminated by a newline so die() does not append file/line information.
sub usage {
    return sprintf( "%s -f <cache-sqlite> [-P <threads>]\n", $0 );
}
package Main::RoleSync;
use AnyEvent;
use AnyEvent::Util;
use Date::Parse ();
use DBI;
use JSON;
use List::MoreUtils ( 'part', 'any', 'natatime' );
use Log::Log4perl ( ':easy' );
use Group::Client;
# Constructor.
#
# Named arguments:
#   c       => roles client object; stored under 'rocl' (see rocl())
#   threads => number of chunks the work lists are split into; optional.
#              Missing, zero or negative values fall back to 1, because the
#              value is later used as the right-hand side of a modulus
#              ("$i++ % threads"), which dies with "Illegal modulus zero"
#              when it is undef or 0.
#
# Returns the new blessed object.
sub new {
    my ( $class, %args ) = @_;

    my $threads = $args{'threads'};
    $threads = 1 if !defined $threads || $threads < 1;

    return bless { 'rocl' => $args{'c'}, 'threads' => $threads }, $class;
}
# Accessor: the roles client object handed to new() as 'c'.
sub rocl {
    my $self = shift;
    return $self->{'rocl'};
}
# Bring the SQLite cache up to date.
#
# Named arguments:
#   file      => path of the SQLite database file
#   roles     => array reference of role names we care about
#   hostnames => array reference of host names we care about
#
# Creates the "meta" and "host" tables if needed, then runs either
# sync_first (no usable meta row yet) or sync_delta inside a single
# transaction. On any error the transaction is rolled back and the
# error is re-raised through LOGDIE as "ROLLBACK [<error>]".
# Returns 1 on success.
sub sync {
    my ( $self, %args ) = @_;

    my $dbh = DBI->connect( "dbi:SQLite:dbname=$args{file}", undef, undef, { PrintError => 0, RaiseError => 1 } );
    my $want_roles     = $args{'roles'};
    my $want_hostnames = $args{'hostnames'};

    # Create tables
    $dbh->do( <<'EOT' );
CREATE TABLE IF NOT EXISTS meta (
uniq DEFAULT 1 PRIMARY KEY,
roles BLOB NOT NULL,
max_mtime INT NOT NULL
)
EOT
    $dbh->do( <<'EOT' );
CREATE TABLE IF NOT EXISTS host (
name BLOB NOT NULL UNIQUE,
roles BLOB DEFAULT NULL
)
EOT

    # Start transaction
    $dbh->begin_work;
    eval {
        my $meta = $dbh->selectall_arrayref( "SELECT roles, max_mtime FROM meta WHERE uniq = 1" );
        if( !@$meta || !$meta->[0][1] ) {
            # This is the first sync, we need to establish a base.
            $self->sync_first( $dbh, $want_hostnames, $want_roles, $meta );
        } else {
            # Update an existing cache
            $self->sync_delta( $dbh, $want_hostnames, $want_roles, $meta );
        }

        # Commit transaction
        $dbh->commit;
        INFO "Cache updated";
        1;
    } or do {
        # Capture the original error first: the nested eval below resets $@.
        chomp( my $err = $@ );

        # RaiseError is on, so rollback itself may throw. Guard it so that a
        # failing rollback cannot replace the error that caused it.
        eval { $dbh->rollback; 1 } or ERROR "rollback failed: $@";

        LOGDIE "ROLLBACK [$err]";
    };
}
# Algorithm for establishing a base:
# 1) Fetch mtimes for all "roles". Record maximum mtime.
# 2) Fetch roles for all "hosts". Record host -> role mapping.
#
# Arguments:
#   $dbh        - open database handle (caller owns the transaction)
#   $want_hosts - array reference of host names to cache
#   $want_roles - array reference of role names to track
#
# Writes the host rows via update_host_roles and stores the "meta" row
# (uniq = 1) holding the maximum mtime and the JSON-encoded role list.
# Returns nothing.
sub sync_first {
    my ( $self, $dbh, $want_hosts, $want_roles ) = @_;

    # Fetch maximum mtime across all $want_roles.
    # get_roles_info may return undef for an empty role list, hence "|| {}".
    my $max_mtime  = 1;
    my $roles_info = $self->get_roles_info( $want_roles ) || {};
    for my $rec ( values %$roles_info ) {
        # Roles that do not exist are reported as undef
        next if !defined $rec;

        # Skip timestamps that cannot be parsed instead of comparing against undef
        my $mtime = Date::Parse::str2time( "$rec->{mtime} UTC" );
        next if !defined $mtime;

        $max_mtime = $mtime if $max_mtime < $mtime;
    }

    # Fetch roles for all $want_hosts
    $self->update_host_roles( $dbh, $want_hosts );

    # Now set max_mtime so we can use sync_delta next time.
    # sync() also sends us here when a meta row exists but its max_mtime is
    # false; a plain INSERT would then violate the primary key on "uniq",
    # so replace the row instead.
    $dbh->do( "REPLACE INTO meta (uniq, max_mtime, roles) VALUES (1, ?, ?)",
        undef, $max_mtime, JSON->new->encode( $want_roles ) );

    return;
}
# Algorithm for updating:
# 1) RolesDB query for roles with mtime_after our recorded maximum mtime. Record new maximum mtime.
# 2) For each role that is both updated (new mtime) AND is in the "roles" list, get members.
# 3) For each hostname that is NOT IN BOTH OLD/NEW version of that role, fetch roles. Record host -> role mapping.
#
# Arguments:
#   $dbh        - open database handle (the caller, sync(), owns the transaction)
#   $want_hosts - array reference of host names we care about
#   $want_roles - array reference of role names we care about
#   $meta       - rows selected from "meta"; $meta->[0] is [ roles JSON, max_mtime ]
#
# Side effects: deletes unwanted rows from "host", refreshes changed hosts via
# update_host_roles, and updates "roles" and "max_mtime" in "meta".
# Dies (LOGDIE) when the cached max_mtime is missing or not positive.
sub sync_delta {
my ( $self, $dbh, $want_hosts, $want_roles, $meta ) = @_;
# Create roles client and JSON encoder
my $json_enc = JSON->new;
my $c = $self->rocl;
# Unpack the cached state: the role list stored by the previous sync and its max_mtime.
# Bail if max_mtime is not set
my ( $cache_roles_json, $max_mtime ) = @{ $meta->[0] };
my $cache_roles = $json_enc->decode( $cache_roles_json );
if( !$max_mtime || $max_mtime <= 0 ) {
LOGDIE "max_mtime is not set in our cache! cannot continue.";
}
# Lookup table for $want_roles and $want_hosts
my %want_hosts_lookup = map { $_ => 1 } @$want_hosts;
my %want_roles_lookup = map { $_ => 1 } @$want_roles;
# Find roles that have been updated since our last max_mtime.
# The lookup is keyed by the lower-cased "ns.name" of each returned role record.
INFO "Scanning roles updated since max_mtime = $max_mtime";
my %updated_roles_lookup =
map { lc "$_->{ns}.$_->{name}" => $_ } @{ $c->roles( mtime_after => $max_mtime ) };
my @updated_wanted_roles;
foreach my $updated_role ( values %updated_roles_lookup ) {
# Keep max_mtime updated
my $mtime = Date::Parse::str2time( "$updated_role->{mtime} UTC" );
if( $max_mtime < $mtime ) {
$max_mtime = $mtime;
}
# If this is a role we are interested in, remember that we need to check its members later.
my $updated_role_name = lc "$updated_role->{ns}.$updated_role->{name}";
if( $want_roles_lookup{$updated_role_name} ) {
push @updated_wanted_roles, $updated_role_name;
}
}
# Include any new roles (i.e. in $want_roles, but not in $cache_roles)
do {
my %cache_roles_lookup = map { $_ => 1 } @{$cache_roles};
foreach my $new_role ( grep { !$cache_roles_lookup{$_} } @$want_roles ) {
# Pretend it's been updated
# (the lookup value is a plain 1 here rather than a role record; from this point on
# the lookup is only tested for truth)
$updated_roles_lookup{$new_role} = 1;
push @updated_wanted_roles, $new_role;
}
};
# Build a list of hosts we need to update...
# %updated_hosts maps host => { role => 1 } for roles whose membership of that host may have changed.
my %updated_hosts;
# ... by checking OLD members of the updated roles ...
my %cache_hosts_lookup; # On the side, maintain this lookup table of which hosts are in the cache
my $sth_host = $dbh->prepare( "SELECT name, roles FROM host" );
$sth_host->execute;
while( my ( $host, $host_roles_json ) = $sth_host->fetchrow_array ) {
# Delete this host if it is not in @$want_hosts
if( ! $want_hosts_lookup{$host} ) {
INFO "Remove host: $host";
$dbh->do( "DELETE FROM host WHERE name = ?", undef, $host );
next;
}
$cache_hosts_lookup{$host} = 1;
# A NULL roles column means no role list was recorded for this host; nothing to compare then.
if( defined $host_roles_json ) {
my $host_roles = $json_enc->decode( $host_roles_json );
# Mark every cached role of this host that is both updated and wanted
$updated_hosts{$host}{$_} = 1 for grep { $updated_roles_lookup{$_} && $want_roles_lookup{$_} } @$host_roles;
}
}
# and also NEW members of the updated roles.
my $updated_wanted_roles_members = $self->get_roles_info( \@updated_wanted_roles, 'members' );
while( my ( $role, $role_info ) = each %$updated_wanted_roles_members ) {
# If get_roles_info reported that a role does not exist, treat it as if it has no members
my $role_hosts = defined $role_info ? [ map { lc $_ } @{ $role_info->{'members'} } ] : [];
foreach my $host ( grep { $want_hosts_lookup{$_} } @$role_hosts ) {
# Toggle the (host, role) mark so that only hosts in exactly one of OLD/NEW stay marked:
# - Remove host if it was also in the OLD memberlist
# - Add host if it was not
if( $updated_hosts{$host}{$role} ) {
delete $updated_hosts{$host}{$role};
} else {
$updated_hosts{$host}{$role} = 1;
}
}
}
# Also include new hosts (i.e. in $want_hosts, but not in the cache "host" table)
foreach my $new_host ( @$want_hosts ) {
if( !$cache_hosts_lookup{$new_host} ) {
INFO "New host: $new_host";
# The 'dummy' key only serves to make this host's hash non-empty so the grep below selects it
$updated_hosts{$new_host}{'dummy'} = 1; # XXX stupid
}
}
# Update hosts we thought we needed to update
# (hosts whose mark hash ended up empty, e.g. after the toggle above, are skipped)
$self->update_host_roles( $dbh, [ grep { %{ $updated_hosts{$_} } } keys %updated_hosts ] );
# Update "roles we care about" list in the cache
# (no WHERE clause: the visible code only ever reads/writes the single uniq = 1 row)
$dbh->do( "UPDATE meta SET roles = ?", undef, $json_enc->encode( $want_roles ) );
# Update "max_mtime" in the cache
$dbh->do( "UPDATE meta SET max_mtime = ?", undef, $max_mtime );
return;
}
# Get info for some list of roles
# Optionally request a particular resource ($rsrc)
#
# Arguments:
#   $want_roles - array reference of role names
#   $rsrc       - optional resource name passed through to the client's role() call
#
# Returns a hash reference mapping each role name to whatever the client
# returned for it, or to undef if the role does not exist. An empty role
# list yields an empty hash reference, so callers can always dereference
# the result.
#
# The role list is split round-robin into $self->{'threads'} chunks; each
# chunk is fetched through AnyEvent::Util::fork_call and the partial results
# are merged in the callback. Dies (LOGDIE) if any chunk failed.
sub get_roles_info {
    my ( $self, $want_roles, $rsrc ) = @_;

    INFO "Scanning " . ( scalar @$want_roles ) . " roles [get_roles_info]";

    # Nothing to fetch: return an empty result rather than undef, because
    # both callers immediately iterate over %$result.
    return {} if !@$want_roles;

    my $c = $self->rocl;

    # Round-robin partition of the roles into one chunk per worker
    my $ichunk = 0;
    my @chunks = part { $ichunk++ % $self->{'threads'} } @$want_roles;

    my $cv = AnyEvent->condvar;
    my $danger_danger;    # counts chunks whose worker failed
    my $return_me = {};

    foreach my $chunk ( @chunks ) {
        $cv->begin;
        fork_call {
            my $return_me_chunk = {};
            foreach my $want_role ( @$chunk ) {
                INFO "Scanning role $want_role";
                eval {
                    $return_me_chunk->{$want_role} = $c->role( $want_role, $rsrc );
                    1;
                } or do {
                    if( "$@" =~ /role .+ does not exist/ ) {
                        # This error can be converted to a retval of undef
                        $return_me_chunk->{$want_role} = undef;
                    } else {
                        # Other errors need to be rethrown
                        die $@;
                    }
                };
            }
            return $return_me_chunk;
        }
        sub {
            my ( $return_me_chunk ) = @_;
            if( !defined $return_me_chunk ) {
                # No result came back for this chunk: remember the failure, report it below
                $danger_danger++;
                ERROR "get_roles_info: " . ( "$@" || "$!" );
            } else {
                $return_me->{$_} = $return_me_chunk->{$_} for keys %$return_me_chunk;
            }
            $cv->end;
        };
    }

    # Wait until every chunk's callback has run
    $cv->recv;

    if( $danger_danger ) {
        LOGDIE "get_roles_info bailing out!";
    }

    return $return_me;
}
# Fill cache with hosts' roles
#
# Arguments:
#   $dbh        - open database handle
#   $want_hosts - array reference of host names to refresh
#
# The host list is split round-robin into $self->{'threads'} batches; each
# batch is looked up via get_host_roles inside fork_call, and the "host"
# table is written from the callback. Dies (LOGDIE) if any batch failed.
sub update_host_roles {
    my ( $self, $dbh, $want_hosts ) = @_;

    INFO "Scanning roles for " . ( scalar @$want_hosts ) . " hosts [update_host_roles]";
    return if !@$want_hosts;

    my $client  = $self->rocl;
    my $encoder = JSON->new;

    my $position = 0;
    my @batches  = part { $position++ % $self->{'threads'} } @$want_hosts;

    my $all_done = AnyEvent->condvar;
    my $failures;

    for my $batch ( @batches ) {
        $all_done->begin;
        fork_call {
            $self->get_host_roles( $batch, $client );
        }
        sub {
            my ( $found ) = @_;
            if( defined $found ) {
                # Store the role list of every host RolesDB told us about
                for my $hostname ( keys %$found ) {
                    $dbh->do( "REPLACE INTO host (name,roles) VALUES (?, ?)",
                        undef, $hostname, $encoder->encode( $found->{$hostname} ) );
                }

                # Hosts RolesDB did not mention: keep an existing row untouched,
                # otherwise create a row without roles so the host is not
                # treated as new again on the next sync.
                for my $hostname ( @$batch ) {
                    $dbh->do( "INSERT OR IGNORE INTO host (name) VALUES (?)", undef, $hostname );
                }
            }
            else {
                $failures++;
                ERROR "get_host_roles: " . ( "$@" || "$!" );
            }
            $all_done->end;
        };
    }

    # Block until every batch callback has finished
    $all_done->recv;

    LOGDIE "update_host_roles bailing out!" if $failures;
}
# Helper for update_host_roles
#
# Arguments:
#   $want_hosts - array reference of host names to look up
#   (callers may pass further arguments; they are ignored, the client is
#   always taken from $self->rocl)
#
# Queries the roles client in batches of 200 host names and returns a hash
# reference mapping lower-cased host name => array reference of lower-cased
# "ns.name" role names. Hosts the client did not report are absent from the
# result.
sub get_host_roles {
    my ( $self, $want_hosts ) = @_;

    INFO "Scanning roles for " . ( scalar @$want_hosts ) . " hosts [get_host_roles]";

    my $c = $self->rocl;
    my $return_me = {};

    # Fetch roles for all $want_hosts
    my $want_hosts_iter = natatime( 200, @$want_hosts );
    while( my @want_hosts_part = $want_hosts_iter->() ) {
        INFO "Scanning batch of " . scalar @want_hosts_part;
        my $hosts = $c->hostname( \@want_hosts_part, "roles", lax => 1 );

        # roles_client may return undef if none of the hostnames are found
        if( !defined $hosts ) {
            WARN
                "roles_client returned undef, which may mean that an entire batch of nodes was not in any roles. Leaving their cached records (if any) unchanged.";
            $hosts = [];
        }

        # Update host -> role mappings for anything RolesDB told us about
        foreach my $host ( @$hosts ) {
            my @host_roles = map { lc "$_->{ns}.$_->{name}" } @{ $host->{'roles'} };

            # Key by the lower-cased name: requested hosts, role members and
            # role names are all lower-cased elsewhere in this file, and the
            # cache is matched against those lower-cased names.
            $return_me->{ lc $host->{'name'} } = \@host_roles;
        }
    }

    return $return_me;
}
Jump to Line
Something went wrong with that request. Please try again.