Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: create schema and table on add_sink #1036

Merged
merged 28 commits into from
Oct 19, 2022
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
c0917b0
start on schema and table creation on
Oct 4, 2022
1af0a57
linting
Oct 4, 2022
5e41500
add default schema name
Oct 4, 2022
05ea897
add schema to table metadata
Oct 4, 2022
7bb0e70
Merge branch 'main' into kgpayne/issue1027
Oct 4, 2022
6281e7d
Merge branch 'main' into kgpayne/issue1027
Oct 4, 2022
19b3ccb
Merge branch 'main' into kgpayne/issue1027
Oct 4, 2022
6d7e156
Merge branch 'main' into kgpayne/issue1027
Oct 5, 2022
059301e
Merge branch 'main' into kgpayne/issue1027
edgarrmondragon Oct 5, 2022
e68c045
Add missing import for `singer_sdk.helpers._catalog`
edgarrmondragon Oct 5, 2022
0347950
Merge branch 'main' into kgpayne/issue1027
Oct 11, 2022
c7abd72
undo connection module
Oct 11, 2022
c59bd5e
fix copy-paste formatting
Oct 11, 2022
7fd3bb1
fix test
Oct 11, 2022
615e5a6
more connector changes
Oct 11, 2022
4171a95
fix docstring
Oct 11, 2022
5bf574a
Merge branch 'main' into kgpayne/issue1027
Oct 11, 2022
b60ddca
add schema creation test
Oct 12, 2022
1e28606
Merge branch 'kgpayne/issue1027' of github.com:meltano/sdk into kgpay…
Oct 12, 2022
b49ee49
Merge branch 'main' into kgpayne/issue1027
Oct 12, 2022
3e92f07
Merge branch 'main' into kgpayne/issue1027
Oct 13, 2022
9a94766
Merge branch 'main' into kgpayne/issue1027
edgarrmondragon Oct 14, 2022
6283762
Merge branch 'main' into kgpayne/issue1027
Oct 18, 2022
b64a7e3
Merge branch 'main' into kgpayne/issue1027
Oct 19, 2022
d33b822
remove create_table_with_records method
Oct 19, 2022
5233657
Merge branch 'kgpayne/issue1027' of github.com:meltano/sdk into kgpay…
Oct 19, 2022
e3e3a30
Update singer_sdk/sinks/sql.py
Oct 19, 2022
7139d08
Merge branch 'main' into kgpayne/issue1027
Oct 19, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 2 additions & 7 deletions singer_sdk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,8 @@
from singer_sdk.mapper_base import InlineMapper
from singer_sdk.plugin_base import PluginBase
from singer_sdk.sinks import BatchSink, RecordSink, Sink, SQLSink
from singer_sdk.streams import (
GraphQLStream,
RESTStream,
SQLConnector,
SQLStream,
Stream,
)
from singer_sdk.sql import SQLConnector
from singer_sdk.streams import GraphQLStream, RESTStream, SQLStream, Stream
from singer_sdk.tap_base import SQLTap, Tap
from singer_sdk.target_base import SQLTarget, Target

Expand Down
8 changes: 8 additions & 0 deletions singer_sdk/sinks/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,14 @@ def activate_version(self, new_version: int) -> None:
"Ignoring."
)

def setup(self) -> None:
"""Perform any setup actions at the beginning of a Stream.

Setup is executed once per Sink instance, after instantiation. If a Schema
change is detected, a new Sink is instantiated and this method is called again.
"""
pass

def clean_up(self) -> None:
"""Perform any clean up actions required at end of a stream.

Expand Down
87 changes: 53 additions & 34 deletions singer_sdk/sinks/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from singer_sdk.plugin_base import PluginBase
from singer_sdk.sinks.batch import BatchSink
from singer_sdk.streams.sql import SQLConnector
from singer_sdk.sql import SQLConnector


class SQLSink(BatchSink):
Expand Down Expand Up @@ -38,11 +38,7 @@ def __init__(
connector: Optional connector to reuse.
"""
self._connector: SQLConnector
if connector:
self._connector = connector
else:
self._connector = self.connector_class(dict(target.config))

self._connector = connector or self.connector_class(dict(target.config))
super().__init__(target, stream_name, schema, key_properties)

@property
Expand All @@ -65,36 +61,76 @@ def connection(self) -> sqlalchemy.engine.Connection:

@property
def table_name(self) -> str:
"""Returns the table name, with no schema or database part.
"""Return the table name, with no schema or database part.

Returns:
The target table name.
"""
parts = self.stream_name.split("-")

if len(parts) == 1:
return self.stream_name
else:
return parts[-1]
return self.stream_name if len(parts) == 1 else parts[-1]

@property
def schema_name(self) -> Optional[str]:
"""Returns the schema name or `None` if using names with no schema part.
"""Return the schema name or `None` if using names with no schema part.

Returns:
The target schema name.
"""
return None # Assumes single-schema target context.
parts = self.stream_name.split("-")
if len(parts) == 2:
return parts[0]
if len(parts) == 3:
return parts[1]
return None
kgpayne marked this conversation as resolved.
Show resolved Hide resolved

@property
def database_name(self) -> Optional[str]:
"""Returns the DB name or `None` if using names with no database part.
"""Return the DB name or `None` if using names with no database part.

Returns:
The target database name.
"""
return None # Assumes single-DB target context.

@property
def full_table_name(self) -> str:
"""Return the fully qualified table name.

Returns:
The fully qualified table name.
"""
return self.connector.get_fully_qualified_name(
table_name=self.table_name,
schema_name=self.schema_name,
db_name=self.database_name,
)

@property
def full_schema_name(self) -> str:
"""Return the fully qualified schema name.

Returns:
The fully qualified schema name.
"""
return self.connector.get_fully_qualified_name(
schema_name=self.schema_name, db_name=self.database_name
)

def setup(self) -> None:
"""Set up Sink.

This method is called on Sink creation, and creates the required Schema and
Table entities in the target database.
"""
if self.schema_name:
self.connector.prepare_schema(self.schema_name)
edgarrmondragon marked this conversation as resolved.
Show resolved Hide resolved
self.connector.prepare_table(
full_table_name=self.full_table_name,
schema=self.schema,
primary_keys=self.key_properties,
as_temp_table=False,
)

def process_batch(self, context: dict) -> None:
"""Process a batch with the given batch context.

Expand All @@ -106,31 +142,12 @@ def process_batch(self, context: dict) -> None:
"""
# If duplicates are merged, these can be tracked via
# :meth:`~singer_sdk.Sink.tally_duplicate_merged()`.
self.connector.prepare_table(
full_table_name=self.full_table_name,
schema=self.schema,
primary_keys=self.key_properties,
as_temp_table=False,
)
self.bulk_insert_records(
full_table_name=self.full_table_name,
schema=self.schema,
records=context["records"],
)

@property
def full_table_name(self) -> str:
"""Gives the fully qualified table name.

Returns:
The fully qualified table name.
"""
return self.connector.get_fully_qualified_name(
self.table_name,
self.schema_name,
self.database_name,
)

def create_table_with_records(
self,
full_table_name: Optional[str],
Expand All @@ -154,6 +171,8 @@ def create_table_with_records(
if primary_keys is None:
primary_keys = self.key_properties
partition_keys = partition_keys or None
# TODO: determine if this call to `prepare_table` is necessary
kgpayne marked this conversation as resolved.
Show resolved Hide resolved
# (in addition to in `setup` above)
self.connector.prepare_table(
full_table_name=full_table_name,
primary_keys=primary_keys,
Expand Down
4 changes: 4 additions & 0 deletions singer_sdk/sql/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
"""Module for helpers common to SQL streams/sinks."""
from .connector import SQLConnector

__all__ = ["SQLConnector"]
Loading