Skip to content

Commit

Permalink
change topic dict on dataframe
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-quix committed Oct 9, 2023
1 parent b527018 commit 18fc140
Showing 1 changed file with 5 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,9 @@ def __init__(
self._real_consumer: Optional[RowConsumerProto] = None
self._real_producer: Optional[RowProducerProto] = None
if not topics_in:
raise ValueError("Topic list cannot be empty")
self._topics_in = {t.real_name: t for t in topics_in}
self._topics_out = {t.real_name: t for t in topics_out or []}
raise ValueError("Topic Input list cannot be empty")
self._topics_in = {t.name: t for t in topics_in}
self._topics_out = {t.name: t for t in topics_out or []}

def apply(self, func: Callable[[Row], Optional[Union[Row, List[Row]]]]) -> Self:
"""
Expand Down Expand Up @@ -131,15 +131,15 @@ def topics_in(self) -> Mapping[str, Topic]:
Get a mapping with Topics for the StreamingDataFrame input topics
:return: dict of {<topic_name>: <Topic>}
"""
return {t.name: t for t in self._topics_in.values()}
return self._topics_in

@property
def topics_out(self) -> Mapping[str, Topic]:
"""
Get a mapping with Topics for the StreamingDataFrame output topics
:return: dict of {<topic_name>: <Topic>}
"""
return {t.name: t for t in self._topics_out.values()}
return self._topics_out

@property
def consumer(self) -> RowConsumerProto:
Expand Down

0 comments on commit 18fc140

Please sign in to comment.