I have included information about relevant versions
I have verified that the issue persists when using the master branch of Faust.
Steps to reproduce
I'm trying to experiment with some windowing logic in Faust, but I cannot run samples that other people say work for them. For example, I picked this code from issue #223:
import asyncio
from datetime import timedelta
import faust
import random
import string
import uuid
import time
from faust import windows


class PageView(faust.Record):
    id: str = None
    user: str = None
    occurred_at: float = None


start_time = time.time()
ttl = 4

app = faust.App(
    'page_view_windows_1',
    broker='kafka://localhost:9092',
    topic_partitions=1,
)

page_view_topic = app.topic('page_views_1', value_type=PageView, partitions=1)

active_users_table = app.Table(
    'active_users_1',
    default=None,
    partitions=1).tumbling(
        ttl,
        expires=timedelta(seconds=30),
        key_index=True
    ).relative_to_field(PageView.occurred_at)


@app.timer(interval=2, on_leader=True)
async def generator():
    user = ''.join(random.choices(string.ascii_uppercase + string.digits, k=7))
    page_view = PageView(str(uuid.uuid4()), user, time.time())
    await page_view_topic.send(value=page_view)


@app.agent(page_view_topic)
async def print_windowed_events(stream):
    async for page_view in stream:
        active_users_table[page_view.user] = page_view.occurred_at
        print('-- New Event (every 2 secs) --')
        print(f"seconds since start: {page_view.occurred_at - start_time}")
        print(f"{len(active_users_table.keys())}, {repr([k for (k, v) in active_users_table.items()])}")
        print(f"{repr([k for (k, v) in active_users_table.items().delta(ttl)])}")


if __name__ == '__main__':
    app.main()
Expected behavior
I want to see the output of the program
Actual behavior
I get this error:
ValueError: too many values to unpack (expected 2)
Full traceback
/foo/venv/bin/python /foo/faustwindow/faust_issue.py worker
+ƒaµS† v1.7.4-+-------------------------------------------------------------------+
| id | page_view_windows_1 |
| transport | [URL('kafka://localhost:9092')] |
| store | memory: |
| web | http://localhost:6066 |
| log | -stderr- (warn) |
| pid | 23320 |
| hostname | localhost |
| platform | CPython 3.7.3 (Linux x86_64) |
| drivers | |
| transport | aiokafka=1.0.3 |
| web | aiohttp=3.5.4 |
| datadir | /foo/faustwindow/page_view_windows_1-data |
| appdir | /foo/faustwindow/page_view_windows_1-data/v1 |
+-------------+-------------------------------------------------------------------+
OK ^
[2019-08-30 21:05:23,489: ERROR]: [^---Agent*: __main__.print_windowed_events]: Crashed reason=ValueError('too many values to unpack (expected 2)')
Traceback (most recent call last):
  File "/foo/venv/lib/python3.7/site-packages/faust/agents/agent.py", line 625, in _execute_task
    await coro
  File "/foo/faustwindow/faust_issue.py", line 50, in print_windowed_events
    active_users_table[page_view.user] = page_view.occurred_at
  File "/foo/venv/lib/python3.7/site-packages/faust/tables/wrappers.py", line 430, in __setitem__
    self.on_set_key(key, value)
  File "/foo/venv/lib/python3.7/site-packages/faust/tables/wrappers.py", line 438, in on_set_key
    key_index_table[key] = 1
  File "/foo/venv/lib/python3.7/site-packages/mode/utils/collections.py", line 505, in __setitem__
    self.on_key_set(key, value)
  File "/foo/venv/lib/python3.7/site-packages/faust/tables/table.py", line 79, in on_key_set
    self._maybe_set_key_ttl(key, partition)
  File "/foo/venv/lib/python3.7/site-packages/faust/tables/base.py", line 328, in _maybe_set_key_ttl
    _, window_range = key
ValueError: too many values to unpack (expected 2)
Sorry if this is a noob question.
I'm new to Faust and just want to learn more about its windows. I actually want to test sliding windows, but unfortunately I could not find any article or sample app about them in the docs.
Thanks.
Versions
Python version: 3.7.3
Faust version: 1.7.4
Operating system: Linux Mint 18.3, kernel: 4.8.0-53-generic
Kafka version: kafka_2.12-2.3.0
I went through your example and could not run it with either current master or the PyPI release. It looks like an issue with windowed tables; normal tables work fine. Exploring the code, I found that this line is reached if the table is windowed, but the key being passed is the same in both cases. Replacing it with range_end = self.window.size made the example run, although I am not sure this is the correct way to solve it!
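To illustrate why the unpacking in the last traceback frame fails, here is a minimal sketch in plain Python, without Faust. It assumes the failing code expects a windowed key shaped like (key, (start, end)), while the traceback suggests key_index_table[key] = 1 hands it the bare user string. The function names and the sample key values are hypothetical stand-ins, not Faust's actual implementation.

```python
def unpack_window_range(key):
    # Same unpacking statement as the last frame of the traceback.
    # It only works if `key` is an iterable of exactly two items.
    _, window_range = key
    return window_range


def try_unpack(key):
    """Return the window range, or the error message if unpacking fails."""
    try:
        return unpack_window_range(key)
    except ValueError as exc:
        return f"ValueError: {exc}"


# Assumed shape of a windowed key: (original key, (range start, range end)).
windowed_key = ("ABC1234", (0.0, 4.0))

# A bare 7-character user name, like the ones built in generator().
plain_key = "ABC1234"

print(try_unpack(windowed_key))  # (0.0, 4.0)
print(try_unpack(plain_key))     # ValueError: too many values to unpack (expected 2)
```

A 7-character string is itself an iterable of seven items, so unpacking it into two names raises the same message as in the report. If this reading is right, the error comes from the key index receiving an unwindowed key rather than from the event data.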
Thanks for your time, @sohaibfarooqi,
I've tested this example code and it worked fine.
It seems the problem occurs when an event attribute is used as the key, or maybe this is behavior that is not documented yet.
I'll test again with earlier versions of Faust to see if the problem persists.