Fail to update table in 2 different stream topic #531

Open
2 tasks done
tienlx93 opened this issue Feb 17, 2020 · 5 comments

Comments

@tienlx93

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

Given one Faust Table, two topic Agents:

partition_no = 4

topic_1 = app.topic('topic_1', partitions=partition_no, internal=True)
topic_2 = app.topic('topic_1', key_type=str, value_type=InfoDetail, partitions=partition_no, internal=True)
count_table = app.Table('count_table', default=int, partitions=partition_no)

First, feed data to the agent on topic_1 to give count_table some initial values (run once, on first start):

def get_group(line):
    return line['group_name']

@app.agent(topic_1)
async def process_topic_1(data):
    async for line in data.group_by(get_group, name='group_name', partitions=partition_no):
        count_table[GROUP] = line['init_value']

Then the agent on topic_2 adds to count_table from real-time data:

@app.agent(topic_2)
async def process_topic_2(data):
    async for line in data.group_by(InfoDetail.GROUP, partitions=partition_no):
        count_table[GROUP] += 1

Expected behavior

count_table[GROUP] in process_topic_2 should start from the line['init_value'] value and count up.

Actual behavior

count_table[GROUP] in process_topic_2 starts from 0 and counts up.

Full traceback

No error observed

Versions

  • Python version: 3.7.3
  • Faust version: 1.10.2
  • Operating system: Ubuntu 18.04
  • Kafka version: 2.3.0
  • RocksDB version (if applicable): 6.3.6
@tienlx93
Author

Update: I think it failed because the repartition topics for the Table work differently for the two topics.

My question is: how do I generate the initial data for the table used by topic_2? The initial data is in a different format (dict) than InfoDetail, so I cannot send it to topic_2.

@StephenSorriaux
Contributor

Hi,

Do you need to first consume topic_1 to initialize count_table and then consume the same topic_1 to update count_table (or is there a typo in the topic_2 variable initialization)?

@tienlx93
Author

Hi @StephenSorriaux ,

I mean topic_1 is for running the real-time flow. I need initial data for count_table with values other than 0.

Currently, the solution of creating another topic, topic_2, to initialize the data of count_table fails because the repartition topics differ. If I use only topic_1, do you have any suggestions for how to control the initial state?

Thanks.

@ask
Contributor

ask commented Feb 27, 2020

It would be nice if we had a command to feed initial data to a table.

I'm not sure this is the best way of going about it. If we had a command for this,
it would probably send values to the changelog topic directly.

You also don't have to populate the table with initial values; instead you can
just use table.setdefault(key, default_value):

from typing import Mapping

#: Default count for groups where we have no default in COUNT_DEFAULTS.
COUNT_DEFAULT: int = 300

#: Default count by group name.
COUNT_DEFAULTS: Mapping[str, int] = {
    'foo': 600,
}

def get_count_or_default(group: str) -> int:
    # Store the group's default on first access, then return the stored value.
    return count_table.setdefault(group, COUNT_DEFAULTS.get(group, COUNT_DEFAULT))

@app.agent()
async def s(stream):
    async for line in stream:
        count_table[line.group_name] = get_count_or_default(line.group_name) + 1

If you have the same default for all groups, it's even easier:

count_table = app.Table('count_table', default=lambda: 300, partitions=partition_no)
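
To make the difference between the two approaches concrete, here is a minimal sketch that uses plain Python dictionaries instead of a Faust table, so it runs without Kafka. It assumes the table behaves like a dict for setdefault and like a defaultdict when a default callable is given; the bump helper is a hypothetical name used only for illustration.

```python
from collections import defaultdict
from typing import Dict, Mapping

COUNT_DEFAULT: int = 300
COUNT_DEFAULTS: Mapping[str, int] = {'foo': 600}


def bump(table: Dict[str, int], group: str) -> int:
    """Increment the count for a group, seeding it from the defaults on first use."""
    start = table.setdefault(group, COUNT_DEFAULTS.get(group, COUNT_DEFAULT))
    table[group] = start + 1
    return table[group]


# Per-group defaults: 'foo' starts at 600, every other group at 300.
per_group: Dict[str, int] = {}
bump(per_group, 'foo')  # 601
bump(per_group, 'foo')  # 602
bump(per_group, 'bar')  # 301

# One default for every group: a default factory is enough.
uniform: Dict[str, int] = defaultdict(lambda: 300)
uniform['foo'] += 1
uniform['bar'] += 1
```

With per-group defaults the first update for a key starts from its own seed value; with a single default factory every key starts from the same number.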

@ask
Contributor

ask commented Feb 28, 2020

Here's another way to set initial values: send them to the changelog topic directly:

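import asyncio

INITIAL_COUNT = {
    'foo': 300,
    'bar': 600,
}


@app.command()
async def init():
    async with app.producer:
        # Make sure the changelog topic exists before writing to it.
        await count_table.changelog_topic.maybe_declare()
        await asyncio.sleep(1)
        for key, value in INITIAL_COUNT.items():
            # The first await queues the message; the second waits for delivery.
            pending = await count_table.changelog_topic.send(key=key, value=value)
            await pending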
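
As a rough illustration of why writing to the changelog seeds the table, the sketch below models a changelog as an ordered list of (key, value) records and rebuilds the table state by replaying it. This is a simplified model in plain Python, not Faust's actual recovery code; the replay function and the record type are hypothetical. In a real deployment the seeded records presumably also need to land on the partitions the table expects for each key.

```python
from typing import Dict, Iterable, Tuple

# A changelog record is modelled as (key, value); real records are serialized.
ChangelogRecord = Tuple[str, int]


def replay(changelog: Iterable[ChangelogRecord]) -> Dict[str, int]:
    """Rebuild table state from a changelog: the last write for each key wins."""
    table: Dict[str, int] = {}
    for key, value in changelog:
        table[key] = value
    return table


# Records written by the init command, followed by later agent updates.
seed = [('foo', 300), ('bar', 600)]
updates = [('foo', 301), ('foo', 302)]

state = replay(seed + updates)
```

Because the seed records come first, later increments build on the seeded values instead of starting from the table's default.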
