Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.
Download ZIP

Comparing changes

Choose two branches to see what's changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
base fork: mogilefs/MogileFS-Server
base: 2b41220812
...
head fork: mogilefs/MogileFS-Server
compare: 662f33cbc5
Checking mergeability… Don't worry, you can still create the pull request.
  • 4 commits
  • 4 files changed
  • 0 commit comments
  • 2 contributors
Commits on Feb 29, 2012
Eric Wong enable TCP keepalives for accepted sockets
Some clients may remain connected and idle for hours/days at a
time.  If their connections drop due to catastrophic failure and
the TCP implementation is unable to notify the server, the
server could eventually run out of file descriptors and/or
memory for new clients.

This change allows catastrophic network failures to be detected
after roughly 2 hours on most setups.  Sysadmins are free to
tweak the TCP keepalive knobs in sysctl or similar.
91623de
Eric Wong replicate: retry on short writes to destination
Neither write(2), sendto(2), nor any similar syscall is ever
guaranteed to have write-in-full behavior on sockets, so we need
to manually retry on unwritten portions.  This is difficult to
reproduce consistently (because the sockets are already in
blocking mode), but occasionally gets triggered on weak
networks.
57fadef
@dormando dormando note written bytes on replication PUT failure 215dd9e
@dormando dormando reduce UPDATEs to device table
Each tracker would update all device rows every 15 seconds. For setups with
lots of trackers, lots of devices, or both, this could end up with a huge
number of UPDATEs per second.

Now it tries pretty hard to update each row at most once every 15 seconds, at
the expense of one extra DB read per monitor cycle.
662f33c
View
3  lib/MogileFS/Server.pm
@@ -27,7 +27,7 @@ use Time::HiRes ();
use Net::Netmask;
use LWP::UserAgent;
use List::Util;
-use Socket ();
+use Socket qw(SO_KEEPALIVE);
use MogileFS::Util qw(daemonize);
use MogileFS::Sys;
@@ -130,6 +130,7 @@ sub run {
Reuse => 1,
Listen => 1024 )
or die "Error creating socket: $@\n";
+ $server->sockopt(SO_KEEPALIVE, 1);
# save sub to accept a client
push @servers, $server;
View
34 lib/MogileFS/Worker/Monitor.pm
@@ -4,7 +4,6 @@ use warnings;
use base 'MogileFS::Worker';
use fields (
- 'last_db_update', # devid -> time. update db less often than poll interval.
'last_test_write', # devid -> time. time we last tried writing to a device.
'skip_host', # hostid -> 1 if already noted dead (reset every loop)
'seen_hosts', # IP -> 1 (reset every loop)
@@ -29,7 +28,6 @@ sub new {
my $self = fields::new($class);
$self->SUPER::new($psock);
- $self->{last_db_update} = {};
$self->{last_test_write} = {};
$self->{iow} = MogileFS::IOStatWatcher->new;
$self->{prev_data} = { domain => {}, class => {}, host => {},
@@ -75,6 +73,14 @@ sub usage_refresh {
debug("Monitor running; scanning usage files");
my $have_dbh = $self->validate_dbh;
+ my $updateable_devices;
+
+ # See if we should be allowed to update the device table rows.
+ if ($have_dbh && Mgd::get_store()->get_lock('mgfs:device_update', 0)) {
+ # Fetch the freshest list of entries, to avoid excessive writes.
+ $updateable_devices = { map { $_->{devid} => $_ }
+ Mgd::get_store()->get_all_devices };
+ }
$self->{skip_host} = {}; # hostid -> 1 if already noted dead.
$self->{seen_hosts} = {}; # IP -> 1
@@ -89,11 +95,16 @@ sub usage_refresh {
}
$cur_iow->{$dev->id} = $self->{devutil}->{cur}->{$dev->id};
next if $self->{skip_host}{$dev->hostid};
- $self->check_device($dev, $have_dbh) if $dev->dstate->should_monitor;
+ $self->check_device($dev, $have_dbh, $updateable_devices)
+ if $dev->dstate->should_monitor;
$self->still_alive; # Ping parent if needed so we don't time out
# given lots of devices.
}
+ if ($have_dbh) {
+ Mgd::get_store()->release_lock('mgfs:device_update');
+ }
+
$self->{devutil}->{prev} = $cur_iow;
# Set the IOWatcher hosts (once old monitor code has been disabled)
@@ -304,7 +315,7 @@ sub ua {
}
sub check_device {
- my ($self, $dev, $have_dbh) = @_;
+ my ($self, $dev, $have_dbh, $updateable_devices) = @_;
my $devid = $dev->id;
my $host = $dev->host;
@@ -367,14 +378,15 @@ sub check_device {
}
# only update database every ~15 seconds per device
- my $last_update = $self->{last_db_update}{$dev->id} || 0;
- my $next_update = $last_update + UPDATE_DB_EVERY;
my $now = time();
- if ($now >= $next_update && $have_dbh) {
- Mgd::get_store()->update_device_usage(mb_total => int($total / 1024),
- mb_used => int($used / 1024),
- devid => $devid);
- $self->{last_db_update}{$devid} = $now;
+ if ($have_dbh && $updateable_devices) {
+ my $devrow = $updateable_devices->{$devid};
+ my $last = ($devrow && $devrow->{mb_asof}) ? $devrow->{mb_asof} : 0;
+ if ($last + UPDATE_DB_EVERY < $now) {
+ Mgd::get_store()->update_device_usage(mb_total => int($total / 1024),
+ mb_used => int($used / 1024),
+ devid => $devid);
+ }
}
# next if we're not going to try this now
View
19 lib/MogileFS/Worker/Replicate.pm
@@ -643,11 +643,20 @@ sub http_copy {
$remain -= $bytes;
$bytes_to_read = $remain if $remain < $bytes_to_read;
- my $wbytes = $dsock->send($data);
- $written += $wbytes;
- return $dest_error->("Error: wrote $wbytes; expected to write $bytes; failed putting to $dpath")
- unless $wbytes == $bytes;
- $intercopy_cb->();
+ my $data_len = $bytes;
+ my $data_off = 0;
+ while (1) {
+ my $wbytes = syswrite($dsock, $data, $data_len, $data_off);
+ unless (defined $wbytes) {
+ return $dest_error->("Error: syswrite failed after $written bytes with: $!; failed putting to $dpath");
+ }
+ $written += $wbytes;
+ $intercopy_cb->();
+ last if ($data_len == $wbytes);
+
+ $data_len -= $wbytes;
+ $data_off += $wbytes;
+ }
die if $bytes_to_read < 0;
next if $bytes_to_read;
View
5 lib/Mogstored/SideChannelListener.pm
@@ -2,6 +2,7 @@ package Mogstored::SideChannelListener;
use strict;
use base 'Perlbal::TCPListener';
use Mogstored::SideChannelClient;
+use Socket qw(SO_KEEPALIVE);
sub new {
my ($class, $hostport) = @_;
@@ -9,7 +10,9 @@ sub new {
# exploding/warning. so we created this stub service above in our static
# config, just for this.
my $svc = Perlbal->service("mgmt") or die "Where is mgmt service?";
- return $class->SUPER::new($hostport, $svc);
+ my $self = $class->SUPER::new($hostport, $svc);
+ $self->{sock}->sockopt(SO_KEEPALIVE, 1);
+ return $self;
}
sub event_read {

No commit comments for this range

Something went wrong with that request. Please try again.