Permalink
Browse files

Handle simultaneous inserts of addresses by concurrent sessions.

Use a retry strategy with a plpgsql exception handler for PK violations,
or the INSERT... ON CONFLICT.. construct with postgres 9.5 and newer.
  • Loading branch information...
manitou-mail committed Jul 16, 2016
1 parent 7af8017 commit 76a860ed53fa713a3f75d781c7faacf34277b91a
Showing with 65 additions and 9 deletions.
  1. +65 −9 script/manitou-mdx
View
@@ -358,9 +358,35 @@ sub min {
sub db_reconnect {
db_connect();
+ create_db_temp_functions();
notice_log("Successful database reconnect");
}
+sub create_db_temp_functions {
+ # Insert a new email address, handling a potential concurrent session that
+ # has inserted the same address and not yet committed.
+ # Called only with PG 9.4 and older, otherwise INSERT...ON CONFLICT is used.
+ my $r = $dbh->do(q{
+ CREATE FUNCTION pg_temp.insert_address(in_email text, in_name text) returns integer as $$
+ DECLARE
+ v_addr_id int;
+ BEGIN
+ BEGIN
+ INSERT INTO addresses(addr_id,email_addr,name,last_recv_from,nb_recv_from)
+ VALUES(nextval('seq_addr_id'), in_email, in_name, now(), 1)
+ RETURNING addr_id
+ INTO v_addr_id;
+
+ EXCEPTION WHEN unique_violation THEN
+ SELECT addr_id FROM addresses WHERE email_addr=in_email
+ INTO v_addr_id;
+ END;
+
+ RETURN v_addr_id;
+ END $$ language plpgsql;
+ });
+
+}
# Extract one message from a mailbox and copy it into a temporary file
# The first line may be ^From_ but it may also be the next line
@@ -505,6 +531,7 @@ sub main_multi {
}
}
+ create_db_temp_functions();
if (defined($mailbox_file)) {
my $mail_cnt=0;
@@ -1452,9 +1479,28 @@ sub insert_addresses {
# scenario.
my $do_update_addr_last = getconf_bool("update_addresses_last", $mbox_name);
- my $sth = $dbh->prepare("SELECT addr_id,recv_pri FROM addresses WHERE email_addr=?") or die $dbh->errstr;
+ my $sth = $dbh->prepare("SELECT addr_id,recv_pri FROM addresses WHERE email_addr=?")
+ or die $dbh->errstr;
- my $sth_insert_ad = $dbh->prepare("INSERT INTO addresses(addr_id,email_addr,name) VALUES(nextval('seq_addr_id'),?,?) RETURNING addr_id") or die $dbh->errstr;
+ my $ins_query;
+
+ if ($dbh->{pg_server_version} < 90500) {
+ # use plpgsql function for potentially concurrent inserts
+ $ins_query = "select pg_temp.insert_address(?,?)";
+ }
+ else {
+ # use the ON CONFLICT mechanism of PG 9.5+ to handle the case of an address
+ # inserted but not yet committed by another transaction
+ $ins_query = qq{
+ INSERT INTO addresses(addr_id,email_addr,name)
+ VALUES(nextval('seq_addr_id'), ?, ?)
+ ON CONFLICT (email_addr)
+ DO NOTHING
+ RETURNING addr_id
+ };
+ }
+
+ my $sth_insert_ad = $dbh->prepare($ins_query);
# Collect all addresses, for all address types (from,to,cc...)
for my $addrtype (keys %AddrTypes) {
@@ -1483,13 +1529,23 @@ sub insert_addresses {
# Insert the addresses following the alphabetical order of email to suppress the risk
# of deadlocks with other parallel inserts
for my $addr (sort { $a->{email} cmp $b->{email} } @haddr) {
- $sth->execute($addr->{email});
- my ($id,$addr_pri) = $sth->fetchrow_array;
- if (!$id) {
- $sth_insert_ad->execute(substr($addr->{email},0,300),
- substr($addr->{name},0,300));
- ($id) = $sth_insert_ad->fetchrow_array;
- }
+ my ($id,$addr_pri);
+ do { # retry inserts
+ $sth->execute($addr->{email});
+ ($id, $addr_pri) = $sth->fetchrow_array;
+ if (!$id) {
+ $sth_insert_ad->execute($addr->{email}, $addr->{name});
+ ($id) = $sth_insert_ad->fetchrow_array;
+ }
+ # if $id is not set, the row was not inserrted
+ # Either the other session blocking us from inserting
+ # has committed or rolled back. In both cases, we need
+ # to retry the select and insert.
+ # In the worst case, we loop until no other session blocks us,
+ # and we can either read the addr_id assigned to this address by
+ # another session, or having our own session insert it.
+ } while (!$id);
+
$addr->{addr_id} = $id;
$addr->{prio} = $addr_pri;
# update addresses.last_recv_from

0 comments on commit 76a860e

Please sign in to comment.