
[Bug]: Better connection pooling for SQLTaps #1352

Closed
qbatten opened this issue Jan 26, 2023 · 6 comments
Labels
kind/Bug (Something isn't working) · valuestream/SDK

Comments

@qbatten
Contributor

qbatten commented Jan 26, 2023

Singer SDK Version

0.18.0

Python Version

3.6 (deprecated)

Bug scope

Taps (catalog, state, stream maps, etc.)

Operating System

N/A

Description

Right now, SQLTaps are doing some non-optimal stuff with their connections. This is causing issues with tap-postgres (see issue 6). The basic issue is that too many connections are being opened, and they're not being closed, nor are they being reused. I think there are several good options to solve this.

There's some previous discussion of this problem on tap-postgres issue 6. There are also some important notes on why things currently are the way they are in an older SDK issue, #21. I'd love to hear from the Meltano team about whether any of the issues noted in #21 have changed, or whether the state of things is pretty similar to what's described there.

Notes on an ideal end state

  • A single tap manages all its connections together in one pool (this can be done by the tap having a single Engine; or by using several engines and a QueuePool they all share)
  • SQLAlchemy recommends keeping the Engine at a super high level: "the Engine is most efficient when created just once at the module level of an application, not per-object or per-function call." (src). My uneducated gut guess is that one Engine should exist per Tap instance, although that might conflict with other aspects of the way the Tap>Stream>Connector objects are designed, so I'm curious to hear what the Meltano team thinks.
  • Connections should be closed when they're finished, either by opening them with a context manager (ideal) or by some other method.
  • I know the Meltano team has spent a lot of time thinking about the right way for Taps, Streams, and Connectors to relate to each other. We of course can't mess with any of that. I'm missing a lot of that context, so I hope to hear from them about those requirements (Connection pooling at the Tap level (esp. Database-type streams) #21 has some of this, I'm just not sure whether it's all up to date).

Summary of problems, as I see it

(These all overlap / are closely related)

  • A new Engine is being created ~every time a new connection is created (this is called whenever a SQLConnector connects for the first time, and only the connection is held onto as an attr, not the Engine).
  • The connection, and the associated SQLAlchemy Engine, is attached to the SQLConnector, so each SQLConnector (and therefore each stream) is opening its own separate Engine that doesn't share any resources (e.g. a Pool) with the other Engines created by the Tap.
  • Connections aren't being closed or released after they're opened.

Solutions & Improvements

Items 1, 2, and 3 are problems to be solved; the sub-sections under each are possible solutions to choose between for that problem.

1. Connection pooling

These solutions are somewhat separate from the issue of just making sure connections get closed. Most of these will make that a lot easier though!

A. SQLTap manages the Engine

  • SQLTap has an engine attr. That gets passed to its streams in discover_streams, or whenever they're initialized, and then on from streams to connectors when the stream initializes the connector.
  • Streams would optionally accept an Engine and just pass it to the Connector on init if there is an engine attached to the stream
  • Connectors, similarly, optionally accept an Engine, but also retain all their own engine creation & handling logic.

The engine could be created on Tap init, or treated the way _connection on SQLConnector is right now. Creating an Engine alone does not result in any interaction with the DB ("The create_engine() call itself does not establish any actual DBAPI connections directly." (src)).

I think this hews most closely to SQLAlchemy's suggested usage.
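
As a rough sketch of what A could look like (the class names and constructor signatures below are illustrative stand-ins, not the SDK's actual API):

import sqlalchemy


class ExampleSQLTap:  # illustrative stand-in for a SQLTap subclass
    def __init__(self, sqlalchemy_url: str):
        # One Engine per Tap instance. create_engine() by itself does not
        # open any DBAPI connections, so this is cheap to do up front.
        self.engine = sqlalchemy.create_engine(sqlalchemy_url)

    def discover_streams(self):
        # The single Engine is handed to every stream; each stream then hands
        # it to its connector, so everything shares one connection pool.
        return [ExampleSQLStream(tap=self, engine=self.engine)]


class ExampleSQLStream:  # illustrative stand-in for a SQLStream
    def __init__(self, tap, engine=None):
        self.connector = ExampleSQLConnector(engine=engine)


class ExampleSQLConnector:  # illustrative stand-in for SQLConnector
    def __init__(self, engine=None, sqlalchemy_url=None):
        # Accept an Engine if one was passed down, otherwise fall back to the
        # connector's own engine-creation logic.
        self.engine = engine or sqlalchemy.create_engine(sqlalchemy_url)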

B. SQLTap has a Pool that it passes down to streams & connectors

This is basically the same as above, but instead of an Engine, it's a QueuePool. This would also serve to manage the connection pool across all streams etc. Not sure about the pros & cons vs. option A here, except that SQLAlchemy says engines should be instantiated once per module if possible, so it seems like the higher up we can push the Engine, the better (that makes me lean towards A).
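
A sketch of what B might look like, assuming (per the SQLAlchemy docs) that an already-constructed pool can be passed to create_engine() via its pool argument; the driver and DSN here are placeholders:

import psycopg2  # placeholder DBAPI driver
import sqlalchemy
from sqlalchemy.pool import QueuePool

# One pool owned by the tap; its creator callable returns raw DBAPI connections.
shared_pool = QueuePool(
    lambda: psycopg2.connect("dbname=example user=example"),  # placeholder DSN
    pool_size=5,
    max_overflow=10,
)

# Each connector could still build its own Engine, but every Engine checks
# connections out of the same shared pool.
engine_a = sqlalchemy.create_engine("postgresql+psycopg2://", pool=shared_pool)
engine_b = sqlalchemy.create_engine("postgresql+psycopg2://", pool=shared_pool)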

2. Hold onto the Engine

Regardless of whether any of the solutions to problem 1 get adopted, I think that at minimum, SQLConnector should create an engine on init and hold onto its Engine as an attr. Then, create_sqlalchemy_connection should just make a connection using the existing engine.
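
Roughly, treating the SDK's method name loosely (this is a sketch, not the actual implementation):

import sqlalchemy


class ExampleSQLConnector:  # illustrative stand-in for SQLConnector
    def __init__(self, sqlalchemy_url: str):
        # Build the Engine once on init and keep it around. This does not
        # open a database connection yet.
        self._engine = sqlalchemy.create_engine(sqlalchemy_url)

    def create_sqlalchemy_connection(self):
        # Reuse the long-lived Engine (and its pool) instead of constructing
        # a brand-new Engine for every connection.
        return self._engine.connect()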

3. Closing connections when they're not being used

A. Use a context manager

If we make the Engine more broadly available, we can just use SQLAlchemy's recommended strategy for using a connection, which is as a context manager: `with engine.connect() as connection:`
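
For example (a minimal, self-contained illustration using an in-memory SQLite engine):

import sqlalchemy

engine = sqlalchemy.create_engine("sqlite://")  # stand-in engine

# The connection is released back to the pool when the block exits,
# even if the query raises.
with engine.connect() as connection:
    result = connection.execute(sqlalchemy.text("SELECT 1"))
    print(result.scalar())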

B. Just close em

Wherever a connection gets opened, just remember to call connection.close().

C. Yield

Not sure how crazy this is, but if we're attached to the structure of create_sqlalchemy_connection returning a connection, we could yield here instead of return and then clean up after the yield. That means less restructuring, although there might be other implications of this change that I'm not aware of.
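
A minimal sketch of the yield idea, using contextlib.contextmanager so call sites stay close to their current shape (names are illustrative):

from contextlib import contextmanager

import sqlalchemy


class ExampleSQLConnector:  # illustrative stand-in for SQLConnector
    def __init__(self, sqlalchemy_url: str):
        self._engine = sqlalchemy.create_engine(sqlalchemy_url)

    @contextmanager
    def create_sqlalchemy_connection(self):
        # Yield the connection to the caller, then always clean up afterwards.
        connection = self._engine.connect()
        try:
            yield connection
        finally:
            connection.close()

Callers would then write with connector.create_sqlalchemy_connection() as connection: ..., which guarantees cleanup without having to pass an Engine around.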

@qbatten added the kind/Bug and valuestream/SDK labels Jan 26, 2023
@edgarrmondragon
Collaborator

My preference is for the design in 1 (A and B may just be implementation details). I think more generally we need to centralize connection pooling (SQL, HTTP, etc.) instead of having the stream classes re-instantiate the connections/sessions on their own. You correctly pointed to #21.

This will be necessary for log-replication AFAICT, since a central connection would be in charge of yielding all the records from different streams.

@qbatten
Contributor Author

qbatten commented Jan 26, 2023

Cool! Re: handling connection pooling for both HTTP and SQL, does it make sense to handle those separately in their respective subclasses of Tap, Stream, and Connector? (e.g. I don't think there's a Tap subclass for HTTP right now.) In other words, should I proceed with adding an Engine attr to SQLTap, or do you want to do a more general solution that'd apply to SQL and other taps at the base Tap class level?

@qbatten
Contributor Author

qbatten commented Jan 26, 2023

Ok, spent some time pairing on this today with @teej (thank you!). He had some awesome insights which clarified for me what's going on here, and he offered some nice ways to resolve the issue. Will just summarize/interpret his thoughts here, with some minor editorializing on my part.

Seems like the underlying problem here is that the lifecycle of the connection is not being managed. It's not really connection pooling that's the main issue (that's a secondary issue), it's that each Stream assumes that it has an ongoing, open database connection, and there's no management of when connections are opened or closed. That might be fine with e.g. SQLite, but it doesn't work with Postgres.

The first fix here is probably to make one of these objects fully own the connection, and be responsible for the full lifecycle of each connection, especially making sure that a single connection is opened and then closed for each unit of work. I think that SQLConnector is probably the right object to be doing that. This would mean two significant changes:

  1. Make a SQLConnector (private?) method that can be used as a context manager, and use that every single time a connection is opened. (An alternative is just to make sure that any connection opened gets closed as soon as it's done, but I think that's not as nice and is more prone to error.)
  2. Only the SQLConnector can use the connection. No other objects should access or be aware of the SQLConnector's connection; and anything that other objects ask the SQLConnector to do that involves interaction with the database should be accessible via a SQLConnector method that correctly opens and closes the connection for that db interaction.
    • The main place I see the connection being directly accessed by another object is in SQLStream. I think a simple solution here would be to have a new method on SQLConnector that returns a generator of results, and SQLStream.get_records grabs that iterator and iterates through it, something along those lines (see the sketch after this list).
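
Here's a rough sketch of those two changes, with illustrative names (not the SDK's actual methods):

from contextlib import contextmanager
from typing import Any, Dict, Iterator

import sqlalchemy


class ExampleSQLConnector:  # illustrative stand-in for SQLConnector
    def __init__(self, sqlalchemy_url: str):
        self._engine = sqlalchemy.create_engine(sqlalchemy_url)

    @contextmanager
    def _connect(self) -> Iterator[sqlalchemy.engine.Connection]:
        # Change 1: a single (private) entry point that owns the connection
        # lifecycle; every database interaction goes through it.
        with self._engine.connect() as connection:
            yield connection

    def get_result_records(self, query) -> Iterator[Dict[str, Any]]:
        # Change 2: other objects never touch the connection directly; they
        # ask the connector for an iterator of results instead. The connection
        # is opened and closed around this one unit of work.
        with self._connect() as connection:
            for row in connection.execute(query).mappings():
                yield dict(row)

SQLStream.get_records could then just iterate whatever get_result_records (or its equivalent) yields, without ever seeing the connection itself.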

Any thoughts, corrections, additions? Does that seem like a good direction to head in?

@kgpayne
Contributor

kgpayne commented Jan 27, 2023

This discussion fits well with our pre-1.0 aspiration to refactor how connectors work in general 🙌 I have created an issue for the general case in #1355, which I think helps resolve this example too.

The TL;DR is that in some circumstances (like this one) it is appropriate to create a singleton Connector for use by the Tap and Streams for all interactions with the remote system (to manage auth, connections, throttling/backoff, request limit quotas, etc.). Many of these concerns are difficult to manage if each Stream has its own helper methods for those functions. Therefore, we can resolve these issues by centralising all interactions onto a Connector (as described above) and then allowing the developer to decide if a singleton or multiple instances are passed down to individual streams. I offered a code example in #1355, but here it is again for illustration:

from typing import List

from singer_sdk import RESTStream, Stream, Tap
from singer_sdk.connector import Connector, RESTConnector

from .streams import UsersStream, GroupsStream

STREAM_TYPES = [
    UsersStream,
    GroupsStream,
]

class ExampleTap(Tap):
    _connector = None

    def get_connector(self) -> Connector:
        """Get a configured connector for this Tap.
        
        Connector returned can either be a singleton (one instance used by the Tap and Streams)
        or a new instance on every call, according to the particular use case and remote system.
        """
        # create a singleton connector on first access
        if not self._connector:
            self._connector = RESTConnector(
                **self.config
            )
        return self._connector

    def discover_streams(self) -> List[Stream]:
        """Return a list of discovered streams.

        This method could optionally make use of `self.get_connector()` to make requests to
        the remote system to discover streams.
        """
        return [stream_class(tap=self, connector=self.get_connector()) for stream_class in STREAM_TYPES]

Needless to say, each stream would then use self.connector to access methods for submitting requests 🙂
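
For instance, a stream might end up looking something like this (the connector's paginate method is hypothetical, since the Connector interface in #1355 is still a proposal):

class UsersStream:  # illustrative stand-in for a RESTStream subclass
    name = "users"
    path = "/users"

    def __init__(self, tap, connector):
        # The connector instance is shared across streams (see ExampleTap above).
        self.tap = tap
        self.connector = connector

    def get_records(self, context=None):
        # Auth, backoff, throttling, and quotas all live on the shared
        # connector; the stream just asks it for records.
        yield from self.connector.paginate(self.path)  # hypothetical method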

@qbatten
Contributor Author

qbatten commented Jan 27, 2023

@kgpayne Ahh I like that! Thank you. Will proceed that way.

@qbatten
Contributor Author

qbatten commented Feb 14, 2023

Resolved by #1394; another sub-component of this issue was resolved in tap-postgres by MeltanoLabs/tap-postgres#22.

@qbatten closed this as completed Feb 14, 2023