Skip to content

Commit

Permalink
add delete_p, insert_p, query_p, select_p and update_p methods to Moj…
Browse files Browse the repository at this point in the history
…o::Pg::Database
  • Loading branch information
kraih committed Nov 2, 2017
1 parent 66cae12 commit 288faf0
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 16 deletions.
4 changes: 3 additions & 1 deletion Changes
@@ -1,5 +1,7 @@

4.02 2017-07-21
4.02 2017-11-02
- Added delete_p, insert_p, query_p, select_p and update_p methods to
Mojo::Pg::Database.

4.01 2017-07-20
- Decreased default max_connections from 5 to 1 in Mojo::Pg.
Expand Down
2 changes: 1 addition & 1 deletion Makefile.PL
Expand Up @@ -30,6 +30,6 @@ WriteMakefile(
},
},
PREREQ_PM =>
{'DBD::Pg' => 3.005001, Mojolicious => '7.32', 'SQL::Abstract' => '1.81'},
{'DBD::Pg' => 3.005001, Mojolicious => '7.51', 'SQL::Abstract' => '1.81'},
test => {TESTS => 't/*.t t/*/*.t'}
);
25 changes: 11 additions & 14 deletions lib/Mojo/Pg.pm
Expand Up @@ -183,20 +183,17 @@ Mojo::Pg - Mojolicious ♥ PostgreSQL
});
Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
# Concurrent non-blocking queries (synchronized with a delay)
Mojo::IOLoop->delay(
sub {
my $delay = shift;
$pg->db->query('select now() as now' => $delay->begin);
$pg->db->query('select * from names' => $delay->begin);
},
sub {
my ($delay, $time_err, $time, $names_err, $names) = @_;
if (my $err = $time_err || $names_err) { die $err }
say $time->hash->{now};
say $_->{name} for $names->hashes->each;
}
)->wait;
# Concurrent non-blocking queries (synchronized with promises)
my $now = $pg->db->query_p('select now() as now');
my $names = $pg->db->query_p('select * from names');
$now->all($names)->then(sub {
my ($now, $names) = @_;
say $now->[0]->hash->{now};
say $_->{name} for $names->[0]->hashes->each;
})->catch(sub {
my $err = shift;
warn "Something went wrong: $err";
})->wait;
# Send and receive notifications non-blocking
$pg->pubsub->listen(foo => sub {
Expand Down
92 changes: 92 additions & 0 deletions lib/Mojo/Pg/Database.pm
Expand Up @@ -18,6 +18,10 @@ for my $name (qw(delete insert select update)) {
my ($self, @cb) = (shift, ref $_[-1] eq 'CODE' ? pop : ());
return $self->query($self->pg->abstract->$name(@_), @cb);
};
monkey_patch __PACKAGE__, "${name}_p", sub {
my $self = shift;
return $self->query_p($self->pg->abstract->$name(@_));
};
}

sub DESTROY {
Expand Down Expand Up @@ -109,6 +113,14 @@ sub query {
$self->_watch;
}

sub query_p {
my $self = shift;
my $promise = Mojo::IOLoop->delay;
$self->query(
@_ => sub { $_[1] ? $promise->reject($_[1]) : $promise->resolve($_[2]) });
return $promise;
}

sub tables {
my @tables = shift->dbh->tables('', '', '', '');
return [grep { $_ !~ /^(?:pg_catalog|information_schema)\./ } @tables];
Expand Down Expand Up @@ -290,6 +302,22 @@ L<SQL::Abstract>.
# "delete from some_table where foo = 'bar' returning id"
$db->delete('some_table', {foo => 'bar'}, {returning => 'id'});
=head2 delete_p
my $promise = $db->delete_p($table, \%where, \%options);
Same as L</"delete">, but performs all operations non-blocking and returns a
L<Mojo::IOLoop::Delay> object to be used as a promise instead of accepting a
callback.
$db->delete_p('some_table')->then(sub {
my $results = shift;
...
})->catch(sub {
my $err = shift;
...
})->wait;
=head2 disconnect
$db->disconnect;
Expand Down Expand Up @@ -336,6 +364,22 @@ L<SQL::Abstract>.
# "insert into some_table (foo) values ('bar') returning id, foo"
$db->insert('some_table', {foo => 'bar'}, {returning => ['id', 'foo']});
=head2 insert_p
my $promise = $db->insert_p($table, \@values || \%fieldvals, \%options);
Same as L</"insert">, but performs all operations non-blocking and returns a
L<Mojo::IOLoop::Delay> object to be used as a promise instead of accepting a
callback.
$db->insert_p(some_table => {foo => 'bar'})->then(sub {
my $results = shift;
...
})->catch(sub {
my $err = shift;
...
})->wait;
=head2 is_listening
my $bool = $db->is_listening;
Expand Down Expand Up @@ -403,6 +447,22 @@ used to bind specific L<DBD::Pg> data types to placeholders.
use DBD::Pg ':pg_types';
$db->query('insert into bar values (?)', {type => PG_BYTEA, value => $bytes});
=head2 query_p
my $promise = $db->query_p('select * from foo');
Same as L</"query">, but performs all operations non-blocking and returns a
L<Mojo::IOLoop::Delay> object to be used as a promise instead of accepting a
callback.
$db->query_p('insert into foo values (?, ?, ?)' => @values)->then(sub {
my $results = shift;
...
})->catch(sub {
my $err = shift;
...
})->wait;
=head2 select
my $results = $db->select($source, $fields, $where, $order);
Expand Down Expand Up @@ -435,6 +495,22 @@ L<SQL::Abstract>.
# "select * from some_table where foo like '%test%'"
$db->select('some_table', undef, {foo => {-like => '%test%'}});
=head2 select_p
my $promise = $db->select_p($source, $fields, $where, $order);
Same as L</"select">, but performs all operations non-blocking and returns a
L<Mojo::IOLoop::Delay> object to be used as a promise instead of accepting a
callback.
$db->select_p(some_table => ['foo'] => {bar => 'yada'})->then(sub {
my $results = shift;
...
})->catch(sub {
my $err = shift;
...
})->wait;
=head2 tables
my $tables = $db->tables;
Expand Down Expand Up @@ -481,6 +557,22 @@ L<SQL::Abstract>.
# "update some_table set foo = 'bar' where id = 23 returning id"
$db->update('some_table', {foo => 'bar'}, {id => 23}, {returning => 'id'});
=head2 update_p
my $promise = $db->update_p($table, \%fieldvals, \%where, \%options);
Same as L</"update">, but performs all operations non-blocking and returns a
L<Mojo::IOLoop::Delay> object to be used as a promise instead of accepting a
callback.
$db->update_p(some_table => {foo => 'baz'} => {foo => 'bar'})->then(sub {
my $results = shift;
...
})->catch(sub {
my $err = shift;
...
})->wait;
=head1 SEE ALSO
L<Mojo::Pg>, L<Mojolicious::Guides>, L<http://mojolicious.org>.
Expand Down
37 changes: 37 additions & 0 deletions t/crud.t
Expand Up @@ -95,6 +95,43 @@ $db->update('crud_test3', {names => ['foo', 'bar', 'baz', 'yada']}, {id => 1});
is_deeply $db->select('crud_test3')->hashes->to_array,
[{id => 1, names => ['foo', 'bar', 'baz', 'yada']}], 'right structure';

# Promises
$result = undef;
$pg->db->insert_p('crud_test', {name => 'promise'}, {returning => '*'})
->then(sub { $result = shift->hash })->wait;
is $result->{name}, 'promise', 'right result';
$result = undef;
$db->select_p('crud_test', '*', {name => 'promise'})
->then(sub { $result = shift->hash })->wait;
is $result->{name}, 'promise', 'right result';
$result = undef;
my $first = $pg->db->query_p("select * from crud_test where name = 'promise'");
my $second = $pg->db->query_p("select * from crud_test where name = 'promise'");
$first->all($second)->then(
sub {
my ($first, $second) = @_;
$result = [$first->[0]->hash, $second->[0]->hash];
}
)->wait;
is $result->[0]{name}, 'promise', 'right result';
is $result->[1]{name}, 'promise', 'right result';
$result = undef;
$db->update_p(
'crud_test',
{name => 'promise_two'},
{name => 'promise'},
{returning => '*'}
)->then(sub { $result = shift->hash })->wait;
is $result->{name}, 'promise_two', 'right result';
$db->delete_p('crud_test', {name => 'promise_two'}, {returning => '*'})
->then(sub { $result = shift->hash })->wait;
is $result->{name}, 'promise_two', 'right result';

# Promises (rejected)
my $fail;
$db->dollar_only->query_p('does_not_exist')->catch(sub { $fail = shift })->wait;
like $fail, qr/does_not_exist/, 'right error';

# Clean up once we are done
$pg->db->query('drop schema mojo_crud_test cascade');

Expand Down

0 comments on commit 288faf0

Please sign in to comment.