Skip to content

Commit

Permalink
Update usage of SQLAlchemy to apply changes from upgrade
Browse files Browse the repository at this point in the history
  • Loading branch information
nicolasramy committed Apr 26, 2024
1 parent b62869f commit 12a163d
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 7 deletions.
9 changes: 5 additions & 4 deletions dagster_toolbox/resources/postgres_io_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ def __init__(
f"postgresql+psycopg2://"
f"{self.username}:{self.password}@{self.hostname}:{self.port}/{self.dbname}" # noqa: E501
)
self.connection = self.engine.connect()

def _get_path(self, context) -> str:
if context.has_asset_key:
Expand Down Expand Up @@ -80,7 +81,7 @@ def _rm_object(self, key, obj):
sql_statement = f"DELETE FROM {self.schema_name} "
sql_statement += where_statement
self.logger.debug(f"Delete on {where_statement} for key {key}")
self.engine.execute(sql_statement)
self.connection.execute(sql_statement)

def _has_object(self, key, obj):

Expand All @@ -90,7 +91,7 @@ def _has_object(self, key, obj):
sql_statement += where_statement

try:
results = self.engine.execute(sql_statement).fetchall()
results = self.connection.execute(sql_statement).fetchall()
found_object = bool(len(results))

except SQLAlchemyProgrammingError as e:
Expand Down Expand Up @@ -138,7 +139,7 @@ def load_input(self, context) -> DataFrame:
if context.partition_key:
sql_statement += f"WHERE partition_key = '{context.partition_key}'"

obj = pandas.read_sql(sql_statement, con=self.engine)
obj = pandas.read_sql(sql_statement, con=self.connection)

return obj

Expand All @@ -162,7 +163,7 @@ def handle_output(self, context, obj):
self._rm_object(key, obj)
obj.to_sql(
self.schema_name,
con=self.engine,
con=self.connection,
index=False,
if_exists="append",
)
Expand Down
5 changes: 3 additions & 2 deletions dagster_toolbox/resources/postgres_partitioned_io_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ def __init__(
f"postgresql+psycopg2://"
f"{self.username}:{self.password}@{self.hostname}:{self.port}/{self.dbname}" # noqa: E501
)
self.connection = self.engine.connect()

def _get_path(self, context) -> str:
if context.has_asset_key:
Expand Down Expand Up @@ -85,7 +86,7 @@ def _rm_object(self, key, obj):
sql_statement = f"DELETE FROM {self.schema_name} "
sql_statement += where_statement
self.logger.debug(f"Delete on {where_statement} for key {key}")
self.engine.execute(sql_statement)
self.connection.execute(sql_statement)

def _has_object(self, key, obj):

Expand Down Expand Up @@ -143,7 +144,7 @@ def load_input(self, context) -> DataFrame:
if context.partition_key:
sql_statement += f"WHERE partition_key = '{context.partition_key}'"

obj = pandas.read_sql(sql_statement, con=self.engine)
obj = pandas.read_sql(sql_statement, con=self.connection)

return obj

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

setup(
name="dagster-toolbox",
version="0.0.12",
version="0.0.13",
packages=find_packages(),
author="Nicolas RAMY",
author_email="nicolas.ramy@darkelda.com",
Expand Down

0 comments on commit 12a163d

Please sign in to comment.