Revert "Merge pull request apache#15441 from [BEAM-8823] Make FnApiRu…
Browse files Browse the repository at this point in the history
…nner work by executing ready elements instead of stages"

This reverts commit ef43645.
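(Reverting a merge commit requires choosing a mainline parent; presumably this was created with something like "git revert -m 1 ef43645", where "-m 1" keeps the first-parent side of the merge. The exact invocation is an assumption, not recorded in the commit itself.)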
robertwb committed Oct 13, 2021
1 parent a9120e0 commit a2f08e5
Showing 8 changed files with 229 additions and 645 deletions.
12 changes: 5 additions & 7 deletions sdks/python/apache_beam/io/iobase.py
@@ -1148,13 +1148,11 @@ def expand(self, pcoll):
| 'Extract' >> core.FlatMap(lambda x: x[1]))
# PreFinalize should run before FinalizeWrite, and the two should not be
# fused.
-    pre_finalize_coll = (
-        do_once
-        | 'PreFinalize' >> core.FlatMap(
-            _pre_finalize,
-            self.sink,
-            AsSingleton(init_result_coll),
-            AsIter(write_result_coll)))
+    pre_finalize_coll = do_once | 'PreFinalize' >> core.FlatMap(
+        _pre_finalize,
+        self.sink,
+        AsSingleton(init_result_coll),
+        AsIter(write_result_coll))
return do_once | 'FinalizeWrite' >> core.FlatMap(
_finalize_write,
self.sink,
357 changes: 75 additions & 282 deletions sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py

Large diffs are not rendered by default.

442 changes: 141 additions & 301 deletions sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py

Large diffs are not rendered by default.

@@ -294,11 +294,6 @@ def cross_product(elem, sides):
pcoll | beam.FlatMap(cross_product, beam.pvalue.AsList(pcoll)),
equal_to([('a', 'a'), ('a', 'b'), ('b', 'a'), ('b', 'b')]))

-  def test_pardo_unfusable_side_inputs_with_separation(self):
-    def cross_product(elem, sides):
-      for side in sides:
-        yield elem, side
-
with self.create_pipeline() as p:
pcoll = p | beam.Create(['a', 'b'])
derived = ((pcoll, ) | beam.Flatten()
@@ -25,9 +25,6 @@
import itertools
import logging
import operator
-from builtins import object
-from typing import TYPE_CHECKING
-from typing import Any
from typing import Callable
from typing import Collection
from typing import Container
@@ -37,8 +34,6 @@
from typing import Iterable
from typing import Iterator
from typing import List
-from typing import MutableMapping
-from typing import NamedTuple
from typing import Optional
from typing import Set
from typing import Tuple
@@ -49,17 +44,11 @@
from apache_beam.internal import pickler
from apache_beam.portability import common_urns
from apache_beam.portability import python_urns
-from apache_beam.portability.api import beam_fn_api_pb2
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.runners.worker import bundle_processor
from apache_beam.transforms import combiners
from apache_beam.transforms import core
from apache_beam.utils import proto_utils
-from apache_beam.utils import timestamp

-if TYPE_CHECKING:
-  from apache_beam.runners.portability.fn_api_runner.execution import ListBuffer
-  from apache_beam.runners.portability.fn_api_runner.execution import PartitionableBuffer
-
T = TypeVar('T')

Expand Down Expand Up @@ -88,15 +77,14 @@
IMPULSE_BUFFER = b'impulse'

# TimerFamilyId is identified by transform name + timer family
-# TODO(pabloem): Rename this type to express this name is unique per pipeline.
TimerFamilyId = Tuple[str, str]

-BufferId = bytes
-
# SideInputId is identified by a consumer ParDo + tag.
SideInputId = Tuple[str, str]
SideInputAccessPattern = beam_runner_api_pb2.FunctionSpec

+DataOutput = Dict[str, bytes]
+
# A map from a PCollection coder ID to a Safe Coder ID
# A safe coder is a coder that can be used on the runner-side of the FnApi.
# A safe coder receives a byte string, and returns a type that can be
@@ -108,27 +96,6 @@
# (MultiMap / Iterable).
DataSideInput = Dict[SideInputId, Tuple[bytes, SideInputAccessPattern]]

-DataOutput = Dict[str, BufferId]
-
-# A map of [Transform ID, Timer Family ID] to [Buffer ID, Time Domain for timer]
-# The time domain comes from beam_runner_api_pb2.TimeDomain. It may be
-# EVENT_TIME or PROCESSING_TIME.
-OutputTimers = MutableMapping[TimerFamilyId, Tuple[BufferId, Any]]
-
-# A map of [Transform ID, Timer Family ID] to [Buffer CONTENTS, Timestamp]
-OutputTimerData = MutableMapping[TimerFamilyId,
-                                 Tuple['PartitionableBuffer',
-                                       timestamp.Timestamp]]
-
-BundleProcessResult = Tuple[beam_fn_api_pb2.InstructionResponse,
-                            List[beam_fn_api_pb2.ProcessBundleSplitResponse]]
-
-
-# TODO(pabloem): Change tha name to a more representative one
-class DataInput(NamedTuple):
-  data: MutableMapping[str, 'PartitionableBuffer']
-  timers: MutableMapping[TimerFamilyId, 'PartitionableBuffer']
-

class Stage(object):
"""A set of Transforms that can be sent to the worker for processing."""
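For orientation, the deleted aliases above defined how the pre-revert runner keyed a bundle's inputs. A minimal sketch of the shapes involved (hypothetical transform and timer names; plain lists stand in for PartitionableBuffer):

    from typing import MutableMapping, NamedTuple, Tuple

    TimerFamilyId = Tuple[str, str]  # (transform name, timer family name)

    class DataInput(NamedTuple):
      data: MutableMapping[str, list]
      timers: MutableMapping[TimerFamilyId, list]

    # One bundle's input: elements for a data channel, plus a timer firing
    # keyed by (transform, timer family), as the deleted comments describe.
    bundle_input = DataInput(
        data={'read_transform': [b'encoded-element']},
        timers={('pardo_transform', 'my_timer_family'): [b'encoded-timer']})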
@@ -50,7 +50,7 @@ def set_watermark(self, wm: timestamp.Timestamp):

def upstream_watermark(self):
if self.producers:
-      return min(p.output_watermark() for p in self.producers)
+      return min(p.input_watermark() for p in self.producers)
else:
return timestamp.MAX_TIMESTAMP

@@ -71,16 +71,13 @@ def __init__(self, name):

def __str__(self):
return 'StageNode<inputs=%s,side_inputs=%s' % (
-        [
-            '%s(%s, upstream:%s)' %
-            (i.name, i.watermark(), i.upstream_watermark())
-            for i in self.inputs
-        ], ['%s(%s)' % (i.name, i.watermark()) for i in self.side_inputs])
+        [i.name for i in self.inputs], [i.name for i in self.side_inputs])

def output_watermark(self):
-    if not self.inputs:
-      return timestamp.MAX_TIMESTAMP
-    return min(i.watermark() for i in self.inputs)
+    if not self.outputs:
+      return self.input_watermark()
+    else:
+      return min(o.watermark() for o in self.outputs)

def input_watermark(self):
if not self.inputs:
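Taken together, the two hunks above restore the original propagation rules: a PCollection's upstream watermark is gated on its producing stages' input watermarks, and a stage's output watermark follows its output PCollections. A minimal runnable sketch of those rules, with simplified names rather than the actual Beam classes (the body of input_watermark is cut off above, so its definition here is an assumption):

    MAX_TIMESTAMP = float('inf')

    class PCollectionNode:
      def __init__(self, name):
        self.name = name
        self.producers = []  # StageNodes that write this PCollection
        self._watermark = float('-inf')

      def watermark(self):
        return self._watermark

      def upstream_watermark(self):
        if self.producers:
          # Gated on the producing stages' *input* watermarks.
          return min(p.input_watermark() for p in self.producers)
        # A root (impulse) collection is always fully advanced.
        return MAX_TIMESTAMP

    class StageNode:
      def __init__(self, name):
        self.name = name
        self.inputs = []   # PCollectionNodes consumed by this stage
        self.outputs = []  # PCollectionNodes produced by this stage

      def input_watermark(self):
        # Assumed definition: least-advanced input, MAX if the stage has none.
        if not self.inputs:
          return MAX_TIMESTAMP
        return min(i.upstream_watermark() for i in self.inputs)

      def output_watermark(self):
        if not self.outputs:
          return self.input_watermark()
        else:
          return min(o.watermark() for o in self.outputs)

    # A root collection feeding one stage: the stage's input side is fully ready.
    impulse, out, stage = PCollectionNode('impulse'), PCollectionNode('out'), StageNode('s1')
    stage.inputs.append(impulse)
    stage.outputs.append(out)
    out.producers.append(stage)
    assert out.upstream_watermark() == MAX_TIMESTAMP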
@@ -966,9 +966,6 @@ def append(self, item):
self._overlay[self._key] = list(self._underlying[self._key])
self._overlay[self._key].append(item)

-    def extend(self, other: Buffer) -> None:
-      raise NotImplementedError()
-
StateType = Union[CopyOnWriteState, DefaultDict[bytes, Buffer]]

def __init__(self):
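The deleted extend stub belonged to a copy-on-write wrapper over shared state. A minimal sketch of that pattern, using a hypothetical CopyOnWriteList rather than the actual Beam classes: the first write copies the shared value into a private overlay, so the underlying mapping is never mutated.

    class CopyOnWriteList:
      def __init__(self, underlying, overlay, key):
        self._underlying = underlying  # shared, treated as read-only
        self._overlay = overlay        # private to this view
        self._key = key

      def append(self, item):
        if self._key not in self._overlay:
          # First write: copy the shared value before mutating.
          self._overlay[self._key] = list(self._underlying[self._key])
        self._overlay[self._key].append(item)

    shared = {b'k': [1, 2]}
    local = {}
    CopyOnWriteList(shared, local, b'k').append(3)
    assert shared[b'k'] == [1, 2] and local[b'k'] == [1, 2, 3]

Reads that miss the overlay would fall through to the underlying mapping; only the write path is shown here.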
3 changes: 0 additions & 3 deletions sdks/python/apache_beam/transforms/util.py
@@ -44,7 +44,6 @@
from apache_beam.transforms import window
from apache_beam.transforms.combiners import CountCombineFn
from apache_beam.transforms.core import CombinePerKey
-from apache_beam.transforms.core import Create
from apache_beam.transforms.core import DoFn
from apache_beam.transforms.core import FlatMap
from apache_beam.transforms.core import Flatten
@@ -160,8 +159,6 @@ def _extract_input_pvalues(self, pvalueish):
return pcolls, pcolls

def expand(self, pcolls):
-    if not pcolls:
-      pcolls = (self.pipeline | Create([]), )
if isinstance(pcolls, dict):
if all(isinstance(tag, str) and len(tag) < 10 for tag in pcolls.keys()):
# Small, string tags. Pass them as data.
