Skip to content

Commit

Permalink
feat: support sqlalchemy engine kwargs (#517)
Browse files Browse the repository at this point in the history
  • Loading branch information
reata committed Dec 31, 2023
1 parent 337cc4a commit e5c1300
Showing 1 changed file with 22 additions and 14 deletions.
36 changes: 22 additions & 14 deletions sqllineage/core/metadata/sqlalchemy.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging
from typing import List
from typing import Any, Dict, List, Optional

from sqlalchemy import MetaData, Table, create_engine, inspect, make_url
from sqlalchemy import MetaData, Table, create_engine, make_url
from sqlalchemy.exc import NoSuchModuleError, NoSuchTableError, OperationalError

from sqllineage.core.metadata_provider import MetaDataProvider
Expand All @@ -16,11 +16,17 @@ class SQLAlchemyMetaDataProvider(MetaDataProvider):
SQLAlchemyMetaDataProvider queries metadata from database using SQLAlchemy
"""

def __init__(self, url: str):
def __init__(self, url: str, engine_kwargs: Optional[Dict[str, Any]] = None):
"""
:param url: sqlalchemy url
:param engine_kwargs: a dictionary of keyword arguments that will be passed to sqlalchemy create_engine
"""
super().__init__()
self.metadata_obj = MetaData()
try:
self.engine = create_engine(url)
if engine_kwargs is None:
engine_kwargs = {}
self.engine = create_engine(url, **engine_kwargs)
except NoSuchModuleError as e:
u = make_url(url)
raise MetaDataProviderException(
Expand All @@ -33,14 +39,16 @@ def __init__(self, url: str):

def _get_table_columns(self, schema: str, table: str, **kwargs) -> List[str]:
columns = []
if inspect(self.engine).has_schema(schema):
try:
sqlalchemy_table = Table(
table, self.metadata_obj, schema=schema, autoload_with=self.engine
)
columns = [c.name for c in sqlalchemy_table.columns]
except NoSuchTableError:
logger.warning("%s does not exist in database %s", table, schema)
else:
logger.warning("Schema %s does not exist", schema)
try:
sqlalchemy_table = Table(
table, self.metadata_obj, schema=schema, autoload_with=self.engine
)
columns = [c.name for c in sqlalchemy_table.columns]
except (NoSuchTableError, OperationalError):
logger.warning(
"error listing columns for table %s.%s in %s, return empty list instead",
schema,
table,
self.engine.url,
)
return columns

0 comments on commit e5c1300

Please sign in to comment.