Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prototype of stopping after N rows loaded from taps #8364

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 46 additions & 11 deletions src/meltano/core/block/extract_load.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import asyncio
import logging
import os
import typing as t
from contextlib import asynccontextmanager, closing

Expand Down Expand Up @@ -38,6 +39,29 @@
logger = structlog.getLogger(__name__)


# Optional cap on the number of tap output lines, read once at import time
# from the ``TAP_ROW_LIMIT`` environment variable.
#
# ``TAP_ROW_LIMIT`` is always bound (``None`` when the variable is unset or is
# not a valid integer) so that referencing it can never raise ``NameError``;
# ``TAP_ROW_LIMITED`` tells callers whether a usable limit was parsed.
TAP_ROW_LIMIT = None
TAP_ROW_LIMITED = False
try:
    TAP_ROW_LIMIT = int(os.environ["TAP_ROW_LIMIT"])
except (KeyError, ValueError):
    # Variable is unset (KeyError) or not an integer (ValueError):
    # leave row limiting disabled.
    pass
else:
    TAP_ROW_LIMITED = True


class GotEnoughRowsException(Exception):
    """Signal that the configured row limit has been exceeded.

    Raised by ``RowLimitHandler.writeline`` once its line count goes past
    the limit it was constructed with.
    """


class RowLimitHandler:
    """Output handler that counts lines and aborts once a limit is passed.

    Every call to ``writeline`` counts as one line regardless of the line's
    content; the content itself is never inspected.
    """

    def __init__(self, row_limit):
        """Create a handler that tolerates up to ``row_limit`` lines.

        Args:
            row_limit: Number of lines that may be written before
                ``writeline`` starts raising.
        """
        self.row_limit = row_limit
        self.linecount = 0

    def writeline(self, line):
        """Count one more line and raise if the limit has been exceeded.

        Args:
            line: The line being written; unused apart from being counted.

        Raises:
            GotEnoughRowsException: When the running count is greater than
                ``row_limit``.
        """
        self.linecount += 1
        if self.linecount <= self.row_limit:
            return
        raise GotEnoughRowsException

Check warning on line 62 in src/meltano/core/block/extract_load.py

View check run for this annotation

Codecov / codecov/patch

src/meltano/core/block/extract_load.py#L62

Added line #L62 was not covered by tests


class BlockSetHasNoStateError(Exception):
"""Block has no state."""

Expand Down Expand Up @@ -223,6 +247,10 @@
plugin_invoker=self.invoker_for(ctx),
plugin_args=plugin_args,
)

if TAP_ROW_LIMITED:
pi.add_output_handler("stdout", RowLimitHandler(TAP_ROW_LIMIT))

Check warning on line 252 in src/meltano/core/block/extract_load.py

View check run for this annotation

Codecov / codecov/patch

src/meltano/core/block/extract_load.py#L252

Added line #L252 was not covered by tests

self._blocks.append(block)
self._env.update(ctx.env)
return block
Expand Down Expand Up @@ -734,7 +762,10 @@
line_length_limit=self.line_length_limit,
stream_buffer_size=self.stream_buffer_size,
)
raise output_futures_failed.exception() # noqa: RSE102
if not isinstance(ex, GotEnoughRowsException):
raise output_futures_failed.exception() # noqa: RSE102

Check warning on line 766 in src/meltano/core/block/extract_load.py

View check run for this annotation

Codecov / codecov/patch

src/meltano/core/block/extract_load.py#L766

Added line #L766 was not covered by tests
else:
logger.debug("Target Rows Reached")

Check warning on line 768 in src/meltano/core/block/extract_load.py

View check run for this annotation

Codecov / codecov/patch

src/meltano/core/block/extract_load.py#L768

Added line #L768 was not covered by tests
else:
# If all the output handlers completed without raising an
# exception, we still need to wait for all the underlying block
Expand All @@ -754,16 +785,20 @@
else:
logger.warning("Intermediate block in sequence failed.")
await self._stop_all_blocks(start_idx)
raise RunnerError(
(
"Unexpected completion sequence in ExtractLoadBlock set. "
"Intermediate block (likely a mapper) failed."
),
{
PluginType.EXTRACTORS: 1,
PluginType.LOADERS: 1,
},
)

if not isinstance(
output_futures_failed.exception(), GotEnoughRowsException
):
raise RunnerError(

Check warning on line 792 in src/meltano/core/block/extract_load.py

View check run for this annotation

Codecov / codecov/patch

src/meltano/core/block/extract_load.py#L792

Added line #L792 was not covered by tests
(
"Unexpected completion sequence in ExtractLoadBlock set. "
"Intermediate block (likely a mapper) failed."
),
{
PluginType.EXTRACTORS: 1,
PluginType.LOADERS: 1,
},
)

async def _handle_head_completed(
self,
Expand Down