Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: create schema and table on add_sink #1036

Merged
merged 28 commits into from
Oct 19, 2022
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
c0917b0
start on schema and table creation on
Oct 4, 2022
1af0a57
linting
Oct 4, 2022
5e41500
add default schema name
Oct 4, 2022
05ea897
add schema to table metadata
Oct 4, 2022
7bb0e70
Merge branch 'main' into kgpayne/issue1027
Oct 4, 2022
6281e7d
Merge branch 'main' into kgpayne/issue1027
Oct 4, 2022
19b3ccb
Merge branch 'main' into kgpayne/issue1027
Oct 4, 2022
6d7e156
Merge branch 'main' into kgpayne/issue1027
Oct 5, 2022
059301e
Merge branch 'main' into kgpayne/issue1027
edgarrmondragon Oct 5, 2022
e68c045
Add missing import for `singer_sdk.helpers._catalog`
edgarrmondragon Oct 5, 2022
0347950
Merge branch 'main' into kgpayne/issue1027
Oct 11, 2022
c7abd72
undo connection module
Oct 11, 2022
c59bd5e
fix copy-paste formatting
Oct 11, 2022
7fd3bb1
fix test
Oct 11, 2022
615e5a6
more connector changes
Oct 11, 2022
4171a95
fix docstring
Oct 11, 2022
5bf574a
Merge branch 'main' into kgpayne/issue1027
Oct 11, 2022
b60ddca
add schema creation test
Oct 12, 2022
1e28606
Merge branch 'kgpayne/issue1027' of github.com:meltano/sdk into kgpay…
Oct 12, 2022
b49ee49
Merge branch 'main' into kgpayne/issue1027
Oct 12, 2022
3e92f07
Merge branch 'main' into kgpayne/issue1027
Oct 13, 2022
9a94766
Merge branch 'main' into kgpayne/issue1027
edgarrmondragon Oct 14, 2022
6283762
Merge branch 'main' into kgpayne/issue1027
Oct 18, 2022
b64a7e3
Merge branch 'main' into kgpayne/issue1027
Oct 19, 2022
d33b822
remove create_table_with_records method
Oct 19, 2022
5233657
Merge branch 'kgpayne/issue1027' of github.com:meltano/sdk into kgpay…
Oct 19, 2022
e3e3a30
Update singer_sdk/sinks/sql.py
Oct 19, 2022
7139d08
Merge branch 'main' into kgpayne/issue1027
Oct 19, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions singer_sdk/sinks/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,14 @@ def activate_version(self, new_version: int) -> None:
"Ignoring."
)

def setup(self) -> None:
"""Perform any setup actions at the beginning of a Stream.

Setup is executed once per Sink instance, after instantiation. If a Schema
change is detected, a new Sink is instantiated and this method is called again.
"""
pass

def clean_up(self) -> None:
"""Perform any clean up actions required at end of a stream.

Expand Down
87 changes: 53 additions & 34 deletions singer_sdk/sinks/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from singer_sdk.plugin_base import PluginBase
from singer_sdk.sinks.batch import BatchSink
from singer_sdk.streams.sql import SQLConnector
from singer_sdk.streams import SQLConnector


class SQLSink(BatchSink):
Expand Down Expand Up @@ -38,11 +38,7 @@ def __init__(
connector: Optional connector to reuse.
"""
self._connector: SQLConnector
if connector:
self._connector = connector
else:
self._connector = self.connector_class(dict(target.config))

self._connector = connector or self.connector_class(dict(target.config))
super().__init__(target, stream_name, schema, key_properties)

@property
Expand All @@ -65,36 +61,76 @@ def connection(self) -> sqlalchemy.engine.Connection:

@property
def table_name(self) -> str:
"""Returns the table name, with no schema or database part.
"""Return the table name, with no schema or database part.

Returns:
The target table name.
"""
parts = self.stream_name.split("-")

if len(parts) == 1:
return self.stream_name
else:
return parts[-1]
return self.stream_name if len(parts) == 1 else parts[-1]

@property
def schema_name(self) -> Optional[str]:
"""Returns the schema name or `None` if using names with no schema part.
"""Return the schema name or `None` if using names with no schema part.

Returns:
The target schema name.
"""
return None # Assumes single-schema target context.
parts = self.stream_name.split("-")
if len(parts) == 2:
return parts[0]
if len(parts) == 3:
return parts[1]
return None
kgpayne marked this conversation as resolved.
Show resolved Hide resolved

@property
def database_name(self) -> Optional[str]:
"""Returns the DB name or `None` if using names with no database part.
"""Return the DB name or `None` if using names with no database part.

Returns:
The target database name.
"""
return None # Assumes single-DB target context.

@property
def full_table_name(self) -> str:
"""Return the fully qualified table name.

Returns:
The fully qualified table name.
"""
return self.connector.get_fully_qualified_name(
table_name=self.table_name,
schema_name=self.schema_name,
db_name=self.database_name,
)

@property
def full_schema_name(self) -> str:
"""Return the fully qualified schema name.

Returns:
The fully qualified schema name.
"""
return self.connector.get_fully_qualified_name(
schema_name=self.schema_name, db_name=self.database_name
)

def setup(self) -> None:
"""Set up Sink.

This method is called on Sink creation, and creates the required Schema and
Table entities in the target database.
"""
if self.schema_name:
self.connector.prepare_schema(self.schema_name)
edgarrmondragon marked this conversation as resolved.
Show resolved Hide resolved
self.connector.prepare_table(
full_table_name=self.full_table_name,
schema=self.schema,
primary_keys=self.key_properties,
as_temp_table=False,
)

def process_batch(self, context: dict) -> None:
"""Process a batch with the given batch context.

Expand All @@ -106,31 +142,12 @@ def process_batch(self, context: dict) -> None:
"""
# If duplicates are merged, these can be tracked via
# :meth:`~singer_sdk.Sink.tally_duplicate_merged()`.
self.connector.prepare_table(
full_table_name=self.full_table_name,
schema=self.schema,
primary_keys=self.key_properties,
as_temp_table=False,
)
self.bulk_insert_records(
full_table_name=self.full_table_name,
schema=self.schema,
records=context["records"],
)

@property
def full_table_name(self) -> str:
"""Gives the fully qualified table name.

Returns:
The fully qualified table name.
"""
return self.connector.get_fully_qualified_name(
self.table_name,
self.schema_name,
self.database_name,
)

def create_table_with_records(
self,
full_table_name: Optional[str],
Expand All @@ -154,6 +171,8 @@ def create_table_with_records(
if primary_keys is None:
primary_keys = self.key_properties
partition_keys = partition_keys or None
# TODO: determine if this call to `prepare_table` is necessary
kgpayne marked this conversation as resolved.
Show resolved Hide resolved
# (in addition to in `setup` above)
self.connector.prepare_table(
full_table_name=full_table_name,
primary_keys=primary_keys,
Expand Down
73 changes: 52 additions & 21 deletions singer_sdk/streams/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ class SQLConnector:
The connector class serves as a wrapper around the SQL connection.

The functions of the connector are:

- connecting to the source
- generating SQLAlchemy connection and engine objects
- discovering schema catalog entries
Expand Down Expand Up @@ -76,6 +75,7 @@ def create_sqlalchemy_connection(self) -> sqlalchemy.engine.Connection:

By default this will create using the sqlalchemy `stream_results=True` option
described here:

https://docs.sqlalchemy.org/en/14/core/connections.html#using-server-side-cursors-a-k-a-stream-results

Developers may override this method if their provider does not support
Expand Down Expand Up @@ -191,7 +191,6 @@ def to_sql_type(jsonschema_type: dict) -> sqlalchemy.types.TypeEngine:

Developers may override this method to accept additional input argument types,
to support non-standard types, or to provide custom typing logic.

If overriding this method, developers should call the default implementation
from the base class for all unhandled cases.

Expand All @@ -205,7 +204,7 @@ def to_sql_type(jsonschema_type: dict) -> sqlalchemy.types.TypeEngine:

@staticmethod
def get_fully_qualified_name(
table_name: str,
table_name: str | None = None,
schema_name: str | None = None,
db_name: str | None = None,
delimiter: str = ".",
Expand All @@ -219,23 +218,23 @@ def get_fully_qualified_name(
delimiter: Generally: '.' for SQL names and '-' for Singer names.

Raises:
ValueError: If table_name is not provided or if neither schema_name or
db_name are provided.
ValueError: If all 3 name parts not supplied.

Returns:
The fully qualified name as a string.
"""
if db_name and schema_name:
result = delimiter.join([db_name, schema_name, table_name])
elif db_name:
result = delimiter.join([db_name, table_name])
elif schema_name:
result = delimiter.join([schema_name, table_name])
elif table_name:
result = table_name
else:
parts = []

if db_name:
parts.append(db_name)
if schema_name:
parts.append(schema_name)
if table_name:
parts.append(table_name)

if not parts:
raise ValueError(
"Could not generate fully qualified name for stream: "
"Could not generate fully qualified name: "
+ ":".join(
[
db_name or "(unknown-db)",
Expand All @@ -245,7 +244,7 @@ def get_fully_qualified_name(
)
)

return result
return delimiter.join(parts)

@property
def _dialect(self) -> sqlalchemy.engine.Dialect:
Expand Down Expand Up @@ -487,6 +486,18 @@ def table_exists(self, full_table_name: str) -> bool:
sqlalchemy.inspect(self._engine).has_table(full_table_name),
)

def schema_exists(self, schema_name: str) -> bool:
"""Determine if the target database schema already exists.

Args:
schema_name: The target database schema name.

Returns:
True if the database schema exists, False if not.
"""
schema_names = sqlalchemy.inspect(self._engine).get_schema_names()
return schema_name in schema_names

def get_table_columns(
self, full_table_name: str, column_names: list[str] | None = None
) -> dict[str, sqlalchemy.Column]:
Expand Down Expand Up @@ -547,6 +558,14 @@ def column_exists(self, full_table_name: str, column_name: str) -> bool:
"""
return column_name in self.get_table_columns(full_table_name)

def create_schema(self, schema_name: str) -> None:
"""Create target schema.

Args:
schema_name: The target schema to create.
"""
self._engine.execute(sqlalchemy.schema.CreateSchema(schema_name))

def create_empty_table(
self,
full_table_name: str,
Expand All @@ -573,7 +592,8 @@ def create_empty_table(

_ = partition_keys # Not supported in generic implementation.

meta = sqlalchemy.MetaData()
_, schema_name, table_name = self.parse_full_table_name(full_table_name)
meta = sqlalchemy.MetaData(schema=schema_name)
columns: list[sqlalchemy.Column] = []
primary_keys = primary_keys or []
try:
Expand All @@ -592,7 +612,7 @@ def create_empty_table(
)
)

_ = sqlalchemy.Table(full_table_name, meta, *columns)
_ = sqlalchemy.Table(table_name, meta, *columns)
meta.create_all(self._engine)

def _create_empty_column(
Expand Down Expand Up @@ -630,6 +650,16 @@ def _create_empty_column(
)
)

def prepare_schema(self, schema_name: str) -> None:
"""Create the target database schema.

Args:
schema_name: The target schema name.
"""
schema_exists = self.schema_exists(schema_name)
if not schema_exists:
self.create_schema(schema_name)

def prepare_table(
self,
full_table_name: str,
Expand Down Expand Up @@ -788,6 +818,7 @@ def _sort_types(

For example, [Smallint, Integer, Datetime, String, Double] would become
[Unicode, String, Double, Integer, Smallint, Datetime].

String types will be listed first, then decimal types, then integer types,
then bool types, and finally datetime and date. Higher precision, scale, and
length will be sorted earlier.
Expand Down Expand Up @@ -823,7 +854,7 @@ def _get_type_sort_key(
def _get_column_type(
self, full_table_name: str, column_name: str
) -> sqlalchemy.types.TypeEngine:
"""Gets the SQL type of the declared column.
"""Get the SQL type of the declared column.

Args:
full_table_name: The name of the table.
Expand Down Expand Up @@ -937,7 +968,7 @@ def _singer_catalog_entry(self) -> CatalogEntry:

@property
def connector(self) -> SQLConnector:
"""The connector object.
"""Return a connector object.

Returns:
The connector object.
Expand All @@ -946,7 +977,7 @@ def connector(self) -> SQLConnector:

@property
def metadata(self) -> MetadataMapping:
"""The Singer metadata.
"""Return the Singer metadata.

Metadata from an input catalog will override standard metadata.

Expand Down
7 changes: 4 additions & 3 deletions singer_sdk/target_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,14 +224,15 @@ def add_sink(
"""
self.logger.info(f"Initializing '{self.name}' target sink...")
sink_class = self.get_sink_class(stream_name=stream_name)
result = sink_class(
sink = sink_class(
target=self,
stream_name=stream_name,
schema=schema,
key_properties=key_properties,
)
self._sinks_active[stream_name] = result
return result
sink.setup()
self._sinks_active[stream_name] = sink
return sink

def _assert_sink_exists(self, stream_name: str) -> None:
"""Raise a RecordsWithoutSchemaException exception if stream doesn't exist.
Expand Down
Loading