Skip to content
This repository has been archived by the owner on Dec 22, 2021. It is now read-only.

Commit

Permalink
PERL-1012 Clear session pool on reconnect after fork/thread
Browse files Browse the repository at this point in the history
  • Loading branch information
xdg committed Oct 22, 2018
1 parent 78b1540 commit 72f169f
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 7 deletions.
5 changes: 3 additions & 2 deletions lib/MongoDB/MongoClient.pm
Original file line number Diff line number Diff line change
Expand Up @@ -1421,14 +1421,15 @@ sub disconnect {
$client->reconnect;
This method closes all connections to the server, as if L</disconnect> were
called, and then immediately reconnects. Use this after forking or spawning
off a new thread.
called, and then immediately reconnects. It also clears the session
cache. Use this after forking or spawning off a new thread.
=cut

sub reconnect {
my ($self) = @_;
$self->_topology->close_all_links;
$self->_server_session_pool->reset_pool;
$self->_topology->scan_all_servers(1);
return 1;
}
Expand Down
10 changes: 10 additions & 0 deletions lib/MongoDB/_ServerSession.pm
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,16 @@ has transaction_id => (
default => sub { Math::BigInt->new('0') },
);

# pool_epoch
#
# tracks which pool the session came from; sessions won't be checked into
# a newer pool

has pool_epoch => (
is => 'ro',
default => -1,
);

# update_last_use
#
# $server_session->update_last_use;
Expand Down
19 changes: 18 additions & 1 deletion lib/MongoDB/_SessionPool.pm
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,15 @@ has _server_session_pool => (
is => 'lazy',
isa => ArrayRef[InstanceOf['MongoDB::_ServerSession']],
init_arg => undef,
clearer => 1,
builder => sub { [] },
);

has _pool_epoch => (
is => 'rwp',
init_arg => undef,
default => 0,
);

# Returns a L<MongoDB::ServerSession> that was at least one minute remaining
# before session times out. Returns undef if no sessions available.
Expand All @@ -62,7 +68,7 @@ sub get_server_session {
return $session;
}
}
return MongoDB::_ServerSession->new;
return MongoDB::_ServerSession->new( pool_epoch => $self->_pool_epoch );
}

# Place a session back into the pool for use. Will check that there is at least
Expand All @@ -75,6 +81,8 @@ sub get_server_session {
sub retire_server_session {
my ( $self, $server_session ) = @_;

return if $server_session->pool_epoch != $self->_pool_epoch;

my $session_timeout = $self->topology->logical_session_timeout_minutes;

# Expire old sessions from back of queue
Expand Down Expand Up @@ -112,6 +120,15 @@ sub end_all_sessions {
}
}

# When reconnecting a client after a fork, we need to clear the pool
# without ending sessions with the server and increment the pool epoch
# so existing sessions aren't checked back in.
sub reset_pool {
my ( $self ) = @_;
$self->_clear_server_session_pool;
$self->_set__pool_epoch( $self->_pool_epoch + 1 );
}

sub DEMOLISH {
my ( $self, $in_global_destruction ) = @_;

Expand Down
10 changes: 6 additions & 4 deletions t/threads/basic.t
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ $col->drop;
}

{
$col->drop;
my ($n_threads, $n_inserts) = $ENV{AUTOMATED_TESTING} ? (10,1000) : (5, 100);
note "inserting $n_inserts items each in $n_threads threads";
my @threads = map {
Expand All @@ -66,17 +67,18 @@ $col->drop;
my @ids = map { $col->insert_one({ foo => threads->self->tid })->inserted_id } 1 .. $n_inserts;
# reading our writes while still in the thread should ensure
# inserts are globally visible before the thread exits
$col->find_one({ _id => $_}) for @ids;
my @docs = map { $col->find_one({ _id => $_}) } @ids;
return @ids;
})
} 1 .. $n_threads;

my @vals = map { ( $_->tid ) x $n_inserts } @threads;
my @ids = map { $_->join } @threads;

my $expected = scalar @ids;
is scalar keys %{ { map { ($_ => undef) } @ids } }, $expected,
"we got $expected unique OIDs";
my $expected_n = $n_threads * $n_inserts;
is( $col->count_documents({}), $expected_n, "expected number of docs inserted" );
is( scalar(@ids), $expected_n, "expected number of ids returned" );
is( scalar keys %{ { map { ($_ => undef) } @ids } }, $expected_n, "ids are unique" );

is_deeply(
[map { $col->find_one({ _id => $_ })->{foo} } @ids],
Expand Down
84 changes: 84 additions & 0 deletions t/threads/sessions.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
# Copyright 2010 - present MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

use strict;
use warnings;
use Config;
use if $Config{usethreads}, 'threads';
use Test::More;

BEGIN { plan skip_all => 'requires threads' unless $Config{usethreads} }

BEGIN { plan skip_all => 'threads not supported before Perl 5.8.5' unless $] ge "5.008005" }

BEGIN { plan skip_all => 'threads tests flaky on older Windows Perls' if $^O eq "MSWin32" && $] lt "5.020000" }

use MongoDB;
use Try::Tiny;

use lib "t/lib";
use MongoDBTest qw/skip_unless_mongod skip_unless_sessions build_client get_test_db/;

skip_unless_mongod();
skip_unless_sessions();

my $client = build_client();
my $testdb = get_test_db($client);

# Session test #11: test that pool can be cleared

subtest "clear the session pool" => sub {
my $session = $client->start_session;
my $lsid = $session->session_id->{id};
$session->end_session;

threads->create(
sub {
my $session = shift;
$client->reconnect;
my $session2 = $client->start_session;
isnt( $session2->session_id->{id}, $lsid, "child got new session id" );
},
$session
)->join;

my $session2 = $client->start_session;
is( $session2->session_id->{id}, $lsid, "parent cached session id" );
};

# Session test #12: test that pool won't accept old sessions after reset.
# This is just like #11 except the initial session is ended in both the
# parent *and* the child thread.

subtest "pool has epochs" => sub {
my $session = $client->start_session;
my $lsid = $session->session_id->{id};

threads->create(
sub {
my $session = shift;
$client->reconnect;
$session->end_session;
my $session2 = $client->start_session;
isnt( $session2->session_id->{id}, $lsid, "child got new session id" );
},
$session
)->join;

$session->end_session;
my $session2 = $client->start_session;
is( $session2->session_id->{id}, $lsid, "parent cached session id" );
};

done_testing();

0 comments on commit 72f169f

Please sign in to comment.