diff --git a/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py b/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py index a49ae883f94d2..0ce4742decd7b 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py @@ -37,7 +37,6 @@ class AbstractSource(Source, ABC): in this class to create an Airbyte Specification compliant Source. """ - SLICE_LOG_PREFIX = "slice:" @abstractmethod def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]: @@ -237,11 +236,7 @@ def _read_incremental( has_slices = False for _slice in slices: has_slices = True - if self.log_slice_message(logger): - yield AirbyteMessage( - type=MessageType.LOG, - log=AirbyteLogMessage(level=Level.INFO, message=f"{self.SLICE_LOG_PREFIX}{json.dumps(_slice, default=str)}"), - ) + yield from stream_instance.start_of_slice_messages(logger, _slice) records = stream_instance.read_records( sync_mode=SyncMode.incremental, stream_slice=_slice, @@ -298,11 +293,7 @@ def _read_full_refresh( ) total_records_counter = 0 for _slice in slices: - if self.log_slice_message(logger): - yield AirbyteMessage( - type=MessageType.LOG, - log=AirbyteLogMessage(level=Level.INFO, message=f"{self.SLICE_LOG_PREFIX}{json.dumps(_slice, default=str)}"), - ) + yield from stream_instance.start_of_slice_messages(logger, _slice) record_data_or_messages = stream_instance.read_records( stream_slice=_slice, sync_mode=SyncMode.full_refresh, diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_stream.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_stream.py index c3ccdd3affe57..0a448b81474f8 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_stream.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_stream.py @@ -1,7 +1,7 @@ # # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # - +import logging from dataclasses import InitVar, dataclass, field from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union @@ -129,3 +129,9 @@ def stream_slices( """ # this is not passing the cursor field because it is known at init time return self.retriever.stream_slices(sync_mode=sync_mode, stream_state=stream_state) + + +@dataclass +class DeclarativeStreamTestReadDecorator(DeclarativeStream): + def log_slice_message(self, logger: logging.Logger): + return True diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py index 690678a87e362..9ddb5c8819b91 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py @@ -118,10 +118,6 @@ def read( self._configure_logger_level(logger) yield from super().read(logger, config, catalog, state) - def log_slice_message(self, logger: logging.Logger): - # Commented so the unit tests fail - return self._constructor._emit_connector_builder_messages or super(self).log_slice_message(logger) - def _configure_logger_level(self, logger: logging.Logger): """ Set the log level to logging.DEBUG if debug mode is enabled diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index c86a848b129cc..b67e5b64a5bbd 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -19,7 +19,7 @@ ) from airbyte_cdk.sources.declarative.checks import CheckStream from airbyte_cdk.sources.declarative.datetime import MinMaxDatetime -from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream +from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream, DeclarativeStreamTestReadDecorator from airbyte_cdk.sources.declarative.decoders import JsonDecoder from airbyte_cdk.sources.declarative.extractors import DpathExtractor, RecordFilter, RecordSelector from airbyte_cdk.sources.declarative.incremental import DatetimeBasedCursor @@ -458,16 +458,29 @@ def create_declarative_stream(self, model: DeclarativeStreamModel, config: Confi if model.transformations: for transformation_model in model.transformations: transformations.append(self._create_component_from_model(model=transformation_model, config=config)) - return DeclarativeStream( - name=model.name, - primary_key=primary_key, - retriever=retriever, - schema_loader=schema_loader, - stream_cursor_field=cursor_field or "", - transformations=transformations, - config=config, - parameters=model.parameters, - ) + if self._emit_connector_builder_messages: + return DeclarativeStreamTestReadDecorator( + name=model.name, + primary_key=primary_key, + retriever=retriever, + schema_loader=schema_loader, + stream_cursor_field=cursor_field or "", + transformations=transformations, + config=config, + parameters=model.parameters, + ) + else: + return DeclarativeStream( + name=model.name, + primary_key=primary_key, + retriever=retriever, + schema_loader=schema_loader, + stream_cursor_field=cursor_field or "", + transformations=transformations, + config=config, + parameters=model.parameters, + ) + def _merge_stream_slicers(self, model: DeclarativeStreamModel, config: Config) -> Optional[StreamSlicer]: incremental_sync = ( diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/core.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/core.py index 17c0076c3cf57..a01933c9f0c96 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/core.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/core.py @@ -4,6 +4,7 @@ import inspect +import json import logging import typing from abc import ABC, abstractmethod @@ -11,7 +12,8 @@ from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Union import airbyte_cdk.sources.utils.casing as casing -from airbyte_cdk.models import AirbyteLogMessage, AirbyteStream, AirbyteTraceMessage, SyncMode +from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, AirbyteStream, AirbyteTraceMessage, Level, SyncMode +from airbyte_cdk.models import Type as MessageType # list of all possible HTTP methods which can be used for sending of request bodies from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader @@ -75,6 +77,8 @@ class Stream(ABC): Base abstract class for an Airbyte Stream. Makes no assumption of the Stream's underlying transport protocol. """ + SLICE_LOG_PREFIX = "slice:" + # Use self.logger in subclasses to log any messages @property def logger(self): @@ -247,6 +251,21 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late """ return {} + def start_of_slice_messages(self, logger: logging.Logger, stream_slice: Mapping[str, Any]) -> typing.Iterator[AirbyteMessage]: + if self.log_slice_message(logger): + yield AirbyteMessage( + type=MessageType.LOG, + log=AirbyteLogMessage(level=Level.INFO, message=f"{self.SLICE_LOG_PREFIX}{json.dumps(stream_slice, default=str)}"), + ) + + def log_slice_message(self, logger: logging.Logger): + """ + + :param logger: + :return: + """ + return logger.isEnabledFor(logging.DEBUG) + @staticmethod def _wrapped_primary_key(keys: Optional[Union[str, List[str], List[List[str]]]]) -> Optional[List[List[str]]]: """