pgxpool support for LISTEN #1121

Open
Jille opened this issue Dec 6, 2021 · 15 comments

@Jille (Contributor) commented Dec 6, 2021

I want to use LISTEN together with pgxpool. pgx has WaitForNotification() and pgconn has Config.OnNotification.

I'd like to use one connection of a pool to keep a LISTEN open; if that connection dies, another one should run the LISTEN command. We probably need a callback to the application so it knows that notifications may have been missed in that gap.

A caveat we should document: notifications are not delivered during transactions, so if the application uses the pool for long transactions, notifications might be delayed.

Alternatively I could build this myself using AfterConnect, but we'd need a BeforeDisconnect or AfterDisconnect to start the LISTEN on another connection.

Without either option, we'd have to set up the listening on every connection in the pool and do deduplication, but I guess the pool doesn't guarantee there's always at least one connection open.
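
For illustration, a rough sketch of that last approach, using v5-style API names (the channel, the handler, and the function are made up here, and the deduplication plus the missed-notification gap are left out):

func newListeningPool(ctx context.Context, databaseURL string, handleNotification func(*pgconn.Notification)) (*pgxpool.Pool, error) {
	cfg, err := pgxpool.ParseConfig(databaseURL)
	if err != nil {
		return nil, err
	}
	// Dispatch notifications through pgconn's callback; the handler would have to
	// deduplicate, because every connection in the pool ends up listening.
	cfg.ConnConfig.OnNotification = func(_ *pgconn.PgConn, n *pgconn.Notification) {
		handleNotification(n)
	}
	// Run LISTEN on every new connection the pool opens.
	cfg.AfterConnect = func(ctx context.Context, conn *pgx.Conn) error {
		_, err := conn.Exec(ctx, "listen my_channel")
		return err
	}
	return pgxpool.NewWithConfig(ctx, cfg)
}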

@jackc (Owner) commented Dec 10, 2021

In my own applications I handle LISTEN by acquiring a dedicated connection from the pool and never returning it. Here is code similar to what I use for background email sending.

// ListenAndSend listens for PostgreSQL notifications of new outbound emails and sends them. Errors are logged but do
// not terminate the method. The only way to terminate this method is via context cancellation.
func (es *EmailSender) ListenAndSend(ctx context.Context) {
	for {
		es.SendQueuedEmails(ctx) // send any emails queued prior to listening for notifications
		es.listenAndSendOneConn(ctx)

		select {
		case <-ctx.Done():
			return
		default:
			// If listenAndSendOneConn returned and ctx has not been cancelled that means there was a fatal database error.
			// Wait a while to avoid busy-looping while the database is unreachable.
			time.Sleep(time.Minute)
		}
	}
}

func (es *EmailSender) listenAndSendOneConn(ctx context.Context) {
	conn, err := es.db.Acquire(ctx)
	if err != nil {
		if !pgconn.Timeout(err) {
			current.Logger(ctx).Err(err).Msg("failed to acquire database connection to listen for queued emails")
		}
		return
	}
	defer conn.Release()

	_, err = conn.Exec(ctx, "listen outbound_email_queued")
	if err != nil {
		if !pgconn.Timeout(err) {
			current.Logger(ctx).Err(err).Msg("failed to listen to outbound_email_queued")
		}
		return
	}

	for {
		notification, err := conn.Conn().WaitForNotification(ctx)
		if err != nil {
			if !pgconn.Timeout(err) {
				current.Logger(ctx).Err(err).Msg("failed while waiting for notification of outbound_email_queued")
			}
			return
		}

		// Parse notification and send email.
	}
}

IMO this style is simpler / more direct than dealing with an abstraction built on pool.

@Jille (Contributor, Author) commented Dec 14, 2021

(Your code snippet isn't unlistening before returning the connection to the pool, causing a memory leak -- not sure if an error can occur that doesn't cause the connection to be closed though)

Definitely simpler, but I was hoping not to use up an extra connection for only listening.

@jackc (Owner) commented Dec 14, 2021

(Your code snippet isn't unlistening before returning the connection to the pool, causing a memory leak -- not sure if an error can occur that doesn't cause the connection to be closed though)

I don't think any non-fatal errors are possible, but I guess explicitly closing the connection wouldn't hurt.
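
For example, something roughly like this in place of the plain defer conn.Release() in the snippet above (just a sketch):

	defer func() {
		// Close the underlying connection so the pool discards it rather than
		// handing out a connection that still has an active LISTEN.
		conn.Conn().Close(ctx)
		conn.Release()
	}()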

Definitely simpler, but I was hoping not to use up an extra connection for only listening.

I don't think the additional complexity is worth saving a single connection. As you mentioned in your original post there are a number of sticky edge cases -- some of which I don't think are actually solvable.

@EmilLaursen (Contributor) commented Dec 16, 2021

Is it expected that WaitForNotification receives an unexpected EOF error exactly every hour? Looking at pgxpool, I see that the default connection lifetime is 1 hour. The error makes sense if the underlying connection is closed. However, this also happens with a single connection (using pgx.ConnectConfig), where I cannot find a default connection lifetime.

I would prefer a connection that stays open for the duration of the application, modulo any fatal errors of course.

@jackc (Owner) commented Dec 18, 2021

pgxpool will only close connections that are in the pool. It will never close a connection that is acquired. And there is no connection lifetime for a standalone connection. My guess is that something in the network is closing idle or long-lived connections.
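
For reference, the pool lifetimes are configurable on pgxpool.Config; a rough sketch (v5 names, example values), keeping in mind these only apply to connections sitting in the pool:

	cfg, err := pgxpool.ParseConfig("postgres://user:pass@localhost:5432/db")
	if err != nil {
		log.Fatal(err)
	}
	cfg.MaxConnLifetime = 4 * time.Hour    // defaults to 1 hour
	cfg.MaxConnIdleTime = 30 * time.Minute // idle pooled connections are closed after this
	pool, err := pgxpool.NewWithConfig(context.Background(), cfg)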

@G2G2G2G commented Oct 18, 2022

@jackc in that code, is it safe if a bunch of messages get sent at the same time? Will they all go through?
I see they eventually get to a channel in pgconn.go, so I assume that is the case, i.e. that they would queue and be handled?
In my receiving switch I execute the handling all concurrently so the switch should free up pretty fast, but I just want to be sure since I'm a dummy.
Thanks

@jackc (Owner) commented Oct 18, 2022

Yes, it will buffer the messages until they are handled.

@nkreiger commented Jan 9, 2023

@jackc Any chance you know how to flush notifications that were missed? Say my service crashes and I miss 10 notifications. If I restart, it starts waiting for new notifications, but it looks like it leaves the missed ones in the channel.

If I use a tool like psql it picks them up still.

@jackc (Owner) commented Jan 14, 2023

@nkreiger

Any chance you know how to flush notifications that were missed? Say my service crashes and I miss 10 notifications. If I restart, it starts waiting for new notifications, but it looks like it leaves the missed ones in the channel.

If I use a tool like psql it picks them up still.

I don't know how this would work. The PostgreSQL server sends the notice to connected clients. If the service has crashed it isn't connected.

What I do is write jobs / messages to a table and send the notice. When a service starts it checks the table and handles anything previously recorded while it was disconnected.
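
A rough sketch of that pattern (the table, channel, and function names here are just illustrative):

// Enqueue: record the job and send the notice in the same transaction.
func enqueueJob(ctx context.Context, db *pgxpool.Pool, payload string) error {
	tx, err := db.Begin(ctx)
	if err != nil {
		return err
	}
	defer tx.Rollback(ctx)

	if _, err := tx.Exec(ctx, "insert into jobs (payload) values ($1)", payload); err != nil {
		return err
	}
	if _, err := tx.Exec(ctx, "select pg_notify('job_queued', '')"); err != nil {
		return err
	}
	return tx.Commit(ctx)
}

// On startup, and again whenever a notification arrives, drain whatever is already
// recorded in the table. Anything queued while the service was disconnected is
// picked up here.
func processPendingJobs(ctx context.Context, db *pgxpool.Pool) error {
	rows, err := db.Query(ctx, "select id, payload from jobs where processed_at is null order by id")
	if err != nil {
		return err
	}
	defer rows.Close()

	for rows.Next() {
		var id int64
		var payload string
		if err := rows.Scan(&id, &payload); err != nil {
			return err
		}
		// handle payload, then mark the row processed ...
	}
	return rows.Err()
}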

@nkreiger

@jackc maybe I'm wrong and need to check again. But after sending n+ messages and then starting psql with LISTEN, it picks up the messages that were sent before it was listening. I was hoping to do something like that on startup.

@jackc (Owner) commented Jan 25, 2023

maybe I'm wrong and need to check again. But after sending n+ messages and then starting psql with LISTEN, it picks up the messages that were sent before it was listening. I was hoping to do something like that on startup.

@nkreiger If you can do it I'd like to know how, but AFAIK it doesn't work that way.

@konart commented Mar 9, 2023

@jackc do I need to specify some additional options to pgxpool with this example?

I'm trying to listen to notifications from postgres (basically N instances of a service write to a table and postgres notifies the instances about any changes); however, after some time (sometimes 15 minutes, sometimes 90 minutes...) I get two errors from WaitForNotification():

unexpected EOF
conn closed

(note that I have an endless loop there for testing right now, otherwise I'd have returned after getting unexpected EOF)

I've checked everything I possibly can on the postgres side and it seems like the client is dropping the connection:

could not receive data from client: Connection reset by peer
closing because: client unexpected eof

PS: notify may be idle (as in no new messages) for any amount of time in theory

@jackc (Owner) commented Mar 9, 2023

do I need to specify some additional options to pgxpool with this example?

@konart I don't think so. Perhaps there is something on the network closing the connection?

@0xBradock commented Jun 4, 2023

Hello,

I am having difficulty understanding the correct way to use your package for my case.
Any help is very much appreciated.

Context

I need to listen for notifications on a few channels, so I will need a (*pgx.Conn) imported from "github.com/jackc/pgx/v5", issue LISTEN with Exec, then block with WaitForNotification. This can't be done with pgxpool (as far as I understood).
Then, I want to launch a goroutine to execute a query with a parameter from each notification. So I will also need github.com/jackc/pgx/v5/pgxpool to create a pool for the queries, Acquire a connection, QueryRow (because I know that I will get only one row), then finally Release.

Questions

  1. Is it correct to use pgx.Conn for the notifications + pgxpool to handle the queries?
  2. During dev I am using AcquireCount to keep track of memory. Does the code below implement the correct way to manage the connection pool?
  3. Suppose the result from my query is complex; is it possible to get the response as a []byte instead of needing to create a struct that represents the data?

Code

I am omitting error handling for brevity.

import (
	"context"
	"encoding/json"
	"fmt"
	"log"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgconn"
	"github.com/jackc/pgx/v5/pgxpool"
)

type Config struct {
	pgUrl         string
	notifications []string
}

type NotificationPayload struct {
	ID     string `json:"id"`
	Action string `json:"action"`
}

func main() {
	config := &Config{
		pgUrl:         "postgres://root:secret@localhost:5432/postgres",
		notifications: []string{"orders_notification", "contacts_notification"},
	}

	ctx := context.Background()

	// Pool for queries
	pool, _ := pgxpool.New(ctx, config.pgUrl)
	defer pool.Close()

	// mainConn for notifications
	mainConn, _ := pgx.Connect(ctx, config.pgUrl)
	defer mainConn.Close(ctx)

	for _, notificationName := range config.notifications {
		// 👇 listen to notifications (pgx)
		mainConn.Exec(ctx, fmt.Sprintf("listen %s", notificationName))
	}

	for {
		log.Println("waiting for notification")
		// 👇 block until a notification arrives (pgx)
		n, _ := mainConn.WaitForNotification(ctx)

		go func(n *pgconn.Notification) {
			// 👇 manage pool (pgxpool)
			poolConn, _ := pool.Acquire(ctx)
			handleNotification(ctx, poolConn, n)
			poolConn.Release()
		}(n)
	}
}

func handleNotification(ctx context.Context, conn *pgxpool.Conn, pgNotif *pgconn.Notification) {
	payload := &NotificationPayload{}
	json.Unmarshal([]byte(pgNotif.Payload), payload)

	// ✅ this works if I specify the column
	var result string
	conn.QueryRow(ctx, "SELECT col_with_string_value FROM orders WHERE id=$1", payload.ID).Scan(&result)
	log.Printf("row: %s", result) // row: <string_value>

	// 🎯 this is what I would like to do
	var raw []byte
	conn.QueryRow(ctx, "SELECT * FROM orders WHERE id=$1", payload.ID).Scan(&raw)
	log.Printf("row: %+v", raw) // row: ...values from all columns
}

Thanks,

@jackc (Owner) commented Jun 5, 2023

@Kmelow

Your overall idea is correct. But here are a few suggestions / other possibilities.

Is it correct to use pgx.Conn for the notifications + pgxpool to handle the queries?

Yes, but mainConn could be acquired from the pool and then call https://pkg.go.dev/github.com/jackc/pgx/v5/pgxpool#Conn.Hijack instead of a separate Connect call. No real difference in functionality, but might be simpler.
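
Roughly something like this (a sketch; the function and channel names are just illustrative):

func acquireListenConn(ctx context.Context, pool *pgxpool.Pool, channel string) (*pgx.Conn, error) {
	poolConn, err := pool.Acquire(ctx)
	if err != nil {
		return nil, err
	}
	// Hijack removes the connection from the pool's management; the caller owns it now.
	conn := poolConn.Hijack()
	if _, err := conn.Exec(ctx, "listen "+channel); err != nil {
		conn.Close(ctx)
		return nil, err
	}
	return conn, nil
}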

During dev I am using AcquireCount to keep track of memory. Does the code below implement the correct way to manage the connection pool?

I don't understand exactly what you are trying to do with AcquireCount, but the pool management appears correct. However, you should probably use a defer to release poolConn in case handleNotification panics.

Suppose the result from my query is complex; is it possible to get the response as a []byte instead of needing to create a struct that represents the data?

I suppose you could use RawValues on the Rows object. Or maybe row_to_json on the server side to bundle the results together. I suspect scanning to a struct would be cleaner though...
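
For example, something like this for the []byte case (just a sketch, reusing the table and payload names from your snippet):

	var rowJSON []byte
	err := conn.QueryRow(ctx, "SELECT row_to_json(o) FROM orders o WHERE id=$1", payload.ID).Scan(&rowJSON)
	if err != nil {
		// handle error
	}
	log.Printf("row: %s", rowJSON) // the whole row as one JSON object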


You might also take a look at https://github.com/jackc/pgxlisten for some ideas. I don't publicize it much because I don't want to imply there is any support or that the API won't change, but it is what I use in production for listen / notify.
