Skip to content

Commit

Permalink
Clarify non-deterministic column initialization code
Browse files Browse the repository at this point in the history
  • Loading branch information
tools4origins committed Sep 26, 2021
1 parent b87e861 commit 6d60eb2
Showing 1 changed file with 8 additions and 7 deletions.
15 changes: 8 additions & 7 deletions pysparkling/sql/internals.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,20 +358,21 @@ def select(self, *exprs):

def select_mapper(partition_index, partition):
# Initialize non deterministic functions so that they are reproducible
initialized_cols = [col.initialize(partition_index) for col in cols]
generators = [col for col in initialized_cols if col.may_output_multiple_rows]
non_generators = [col for col in initialized_cols if not col.may_output_multiple_rows]
for col in cols:
col.initialize(partition_index)
generators = [col for col in cols if col.may_output_multiple_rows]
non_generators = [col for col in cols if not col.may_output_multiple_rows]
number_of_generators = len(generators)
if number_of_generators > 1:
raise Exception(
"Only one generator allowed per select clause"
f" but found {number_of_generators}: {', '.join(generators)}"
f" but found {number_of_generators}: {', '.join(str(g) for g in generators)}"
)

return self.get_select_output_field_lists(
partition,
non_generators,
initialized_cols,
cols,
generators[0] if generators else None
)

Expand Down Expand Up @@ -423,8 +424,8 @@ def filter(self, condition):
condition = parse(condition)

def mapper(partition_index, partition):
initialized_condition = condition.initialize(partition_index)
return (row for row in partition if initialized_condition.eval(row, self.bound_schema))
condition.initialize(partition_index)
return (row for row in partition if condition.eval(row, self.bound_schema))

return self._with_rdd(
self._rdd.mapPartitionsWithIndex(mapper),
Expand Down

0 comments on commit 6d60eb2

Please sign in to comment.