
When relative_to_field is used with a datetime field delta windows are always empty #223

Open
2 tasks done
mstump opened this issue Nov 24, 2018 · 5 comments

Comments

@mstump
Contributor

mstump commented Nov 24, 2018

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

import asyncio
import faust
import random
import string
import uuid
import json
import dateutil.parser

from datetime import datetime, timedelta
from faust import windows
from faust.serializers import codecs
from typing import Any

class PageView(faust.Record, serializer='page_view_json'):
    id: str = None
    user: str = None
    occurred_at: datetime = None


    def _jsonSupport(*args):

        class JSONSupport(codecs.Codec):

            def _dumps(self, obj: Any) -> bytes:
                obj["occurred_at"] = obj["occurred_at"].isoformat()
                return json.dumps(obj).encode()

            def _loads(self, s: bytes) -> Any:
                obj = json.loads(s)
                obj["occurred_at"] = dateutil.parser.parse(obj["occurred_at"])
                return PageView(**obj)

        codecs.register('page_view_json', JSONSupport())

    _jsonSupport()

start_time = datetime.utcnow()
ttl = 4

app = faust.App(
    'page_views',
    broker='kafka://localhost:9092',
    topic_partitions=4,
)

page_view_topic = app.topic('page_views', value_type=PageView)

active_users_table = app.Table(
    'active_users',
    default=None).tumbling(
        ttl,
        expires=timedelta(seconds=30),
        key_index=True
    ).relative_to_field(PageView.occurred_at)

@app.timer(interval=2, on_leader=True)
async def generator():
    user = ''.join(random.choices(string.ascii_uppercase + string.digits, k=7))
    page_view = PageView(str(uuid.uuid4()), user, datetime.utcnow())
    await page_view_topic.send(value=page_view)

@app.agent(page_view_topic)
async def print_windowed_events(stream):
    async for page_view in stream:
        active_users_table[page_view.user] = page_view.occurred_at

        print('-- New Event (every 2 secs) --')
        print(f"seconds since start: {page_view.occurred_at - start_time}")
        print(f"{repr([k for (k,v) in active_users_table.items()])}")
        print(f"{repr([k for (k,v) in active_users_table.items().delta(ttl)])}")

Expected behavior

The delta window should include the items from the time window at the specified delta.

Actual behavior

When window.relative_to_field is used, the items in delta windows are always empty.

[2018-11-23 23:19:51,171: WARNING]: -- New Event (every 2 secs) --
[2018-11-23 23:19:51,171: WARNING]: seconds since start: -1 day, 23:57:36.936266
[2018-11-23 23:19:51,172: WARNING]: ['D599WCN']
[2018-11-23 23:19:51,172: WARNING]: []
[2018-11-23 23:19:51,783: WARNING]: -- New Event (every 2 secs) --
[2018-11-23 23:19:51,784: WARNING]: seconds since start: 0:00:18.157651
[2018-11-23 23:19:51,784: WARNING]: ['7PU9N73']
[2018-11-23 23:19:51,784: WARNING]: []
[2018-11-23 23:19:53,813: WARNING]: -- New Event (every 2 secs) --
[2018-11-23 23:19:53,814: WARNING]: seconds since start: 0:00:20.163283
[2018-11-23 23:19:53,814: WARNING]: ['MCPRP7E']
[2018-11-23 23:19:53,815: WARNING]: []
[2018-11-23 23:19:55,782: WARNING]: -- New Event (every 2 secs) --
[2018-11-23 23:19:55,783: WARNING]: seconds since start: 0:00:22.164461
[2018-11-23 23:19:55,783: WARNING]: ['MCPRP7E', '1834GWX']
[2018-11-23 23:19:55,783: WARNING]: []
[2018-11-23 23:19:57,789: WARNING]: -- New Event (every 2 secs) --
[2018-11-23 23:19:57,789: WARNING]: seconds since start: 0:00:24.170478
[2018-11-23 23:19:57,789: WARNING]: ['BKZ9XTO']
[2018-11-23 23:19:57,790: WARNING]: []
[2018-11-23 23:19:59,789: WARNING]: -- New Event (every 2 secs) --
[2018-11-23 23:19:59,790: WARNING]: seconds since start: 0:00:26.171417
[2018-11-23 23:19:59,790: WARNING]: ['BKZ9XTO', 'SB8SCH4']
[2018-11-23 23:19:59,790: WARNING]: []
[2018-11-23 23:20:01,793: WARNING]: -- New Event (every 2 secs) --
[2018-11-23 23:20:01,793: WARNING]: seconds since start: 0:00:28.175309
[2018-11-23 23:20:01,793: WARNING]: ['OH389W1']
[2018-11-23 23:20:01,794: WARNING]: []
[2018-11-23 23:20:03,798: WARNING]: -- New Event (every 2 secs) --
[2018-11-23 23:20:03,798: WARNING]: seconds since start: 0:00:30.180400
[2018-11-23 23:20:03,799: WARNING]: ['OH389W1', 'B0AQNYM']
[2018-11-23 23:20:03,799: WARNING]: []
[2018-11-23 23:20:05,804: WARNING]: -- New Event (every 2 secs) --
[2018-11-23 23:20:05,804: WARNING]: seconds since start: 0:00:32.185951
[2018-11-23 23:20:05,804: WARNING]: ['7EKFGZR']
[2018-11-23 23:20:05,805: WARNING]: []
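To make the expected behavior concrete, here is a simplified model of tumbling windows over float epoch seconds. It assumes an event's window starts at the nearest multiple of the window size at or below its timestamp, and that delta(d) looks up the window that contained timestamp - d. This is a sketch under those assumptions, not Faust's implementation. The helper names tumbling_range and delta_range are hypothetical, and Faust's own window boundaries may differ slightly.

```python
from typing import Tuple

# Hypothetical helpers: a simplified model of tumbling windows, not Faust's code.
WindowRange = Tuple[float, float]


def tumbling_range(timestamp: float, size: float) -> WindowRange:
    """Return the half-open window [start, end) that contains timestamp.

    Windows are assumed to be aligned to multiples of `size` seconds since the epoch.
    """
    start = timestamp - (timestamp % size)
    return (start, start + size)


def delta_range(timestamp: float, size: float, d: float) -> WindowRange:
    """Return the window that contained the instant d seconds before timestamp."""
    return tumbling_range(timestamp - d, size)


if __name__ == "__main__":
    ttl = 4
    now = 1002.5  # an event timestamp in float seconds
    print(tumbling_range(now, ttl))    # (1000.0, 1004.0): the current window
    print(delta_range(now, ttl, ttl))  # (996.0, 1000.0): the window delta(ttl) should read
```

In the setup above, events arrive every 2 seconds and ttl = 4, so under this model the previous window should usually hold at least one earlier key. That is why an empty list on every single event looks wrong.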

Versions

  • Python version: 3.7.1
  • Faust version: 1.3.2, and master at commit be0f5709cf6a533d039c33b49af70e2496b5ad30
  • Operating system: OSX
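  • Kafka version: 1.8.0_162 (this has the format of a Java version string, i.e. JDK 8 update 162, so the Kafka version itself is not stated)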
@mstump
Contributor Author

mstump commented Nov 25, 2018

It turns out that this is only reproducible when the field is a datetime, not when it is a float.

The code snippet below works as expected.

import asyncio
import faust
import random
import string
import uuid
import time

from datetime import timedelta
from faust import windows

class PageView(faust.Record):
    id: str = None
    user: str = None
    occurred_at: float = None

start_time = time.time()
ttl = 4

app = faust.App(
    'page_view_windows',
    broker='kafka://localhost:9092',
    topic_partitions=4,
)

page_view_topic = app.topic('page_views', value_type=PageView)

active_users_table = app.Table(
    'active_users',
    default=None).tumbling(
        ttl,
        expires=timedelta(seconds=30),
        key_index=True
    ).relative_to_field(PageView.occurred_at)

@app.timer(interval=2, on_leader=True)
async def generator():
    user = ''.join(random.choices(string.ascii_uppercase + string.digits, k=7))
    page_view = PageView(str(uuid.uuid4()), user, time.time())
    await page_view_topic.send(value=page_view)

@app.agent(page_view_topic)
async def print_windowed_events(stream):
    async for page_view in stream:
        active_users_table[page_view.user] = page_view.occurred_at

        print('-- New Event (every 2 secs) --')
        print(f"seconds since start: {page_view.occurred_at - start_time}")
        print(f"{len(active_users_table.keys())}, {repr([k for (k,v) in active_users_table.items()])}")
        print(f"{repr([k for (k,v) in active_users_table.items().delta(ttl)])}")
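Since the float version behaves as expected, one possible workaround is to keep occurred_at as float epoch seconds in the record and convert to and from datetime only at the edges. The helpers below (to_epoch_seconds and from_epoch_seconds are hypothetical names) assume that naive datetimes, such as the ones from datetime.utcnow() in the original report, represent UTC.

```python
from datetime import datetime, timezone


def to_epoch_seconds(dt: datetime) -> float:
    """Convert a datetime to POSIX seconds.

    Naive datetimes are assumed to be UTC (e.g. from datetime.utcnow()).
    Calling .timestamp() on a naive datetime directly would treat it as
    local time, which silently shifts the value on non-UTC machines.
    """
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return dt.timestamp()


def from_epoch_seconds(ts: float) -> datetime:
    """Convert POSIX seconds back to an aware UTC datetime."""
    return datetime.fromtimestamp(ts, tz=timezone.utc)


if __name__ == "__main__":
    occurred_at = datetime(2018, 11, 23, 23, 19, 51)
    ts = to_epoch_seconds(occurred_at)
    print(ts)
    print(from_epoch_seconds(ts))
```

This is mostly useful when upstream data already arrives as datetimes; when the producer controls the timestamp, time.time() as in the snippet above is simpler.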

@mstump changed the title from "When relative_to_field is used items in delta windows are always empty" to "When relative_to_field is used with a datetime field delta windows are always empty" on Nov 25, 2018
@ask
Contributor

ask commented Nov 26, 2018

Note: you can have datetime fields in JSON models by using the isodates option, like this:

from datetime import datetime

import faust

class PageView(faust.Record, isodates=True, serializer='json'):
    id: str = None
    user: str = None
    occurred_at: datetime = None
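For comparison with the hand-written JSONSupport codec in the original report: that codec only converts occurred_at to and from an ISO 8601 string. On Python 3.7+, the standard library can do this round trip without dateutil, because datetime.fromisoformat parses the output of datetime.isoformat. The sketch below works on plain dicts; the function names are hypothetical, and this is not how Faust implements isodates internally.

```python
import json
from datetime import datetime
from typing import Any, Dict


def dumps_page_view(obj: Dict[str, Any]) -> bytes:
    # Copy instead of mutating the caller's dict (the codec in the report
    # overwrites obj["occurred_at"] in place).
    data = dict(obj, occurred_at=obj["occurred_at"].isoformat())
    return json.dumps(data).encode()


def loads_page_view(s: bytes) -> Dict[str, Any]:
    data = json.loads(s)
    # fromisoformat accepts the format produced by isoformat()
    data["occurred_at"] = datetime.fromisoformat(data["occurred_at"])
    return data
```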

@mstump
Contributor Author

mstump commented Nov 29, 2018

Thanks for the note about isodates; I missed that in the docs until I started reading the model code.

@smboy

smboy commented Feb 25, 2019

@mstump, could you please post your page_view_json data? I'm using your example as a reference to build my own process, and I'm wondering what the structure of your incoming data looks like.
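The actual messages aren't posted in the thread, but the _dumps method in the original report determines their shape: a JSON object with the record's id, user, and occurred_at fields, where occurred_at is an ISO 8601 string. The sketch below builds one such payload with the standard library. The values are illustrative only (the user id and timestamp are borrowed from the log lines above), and any metadata Faust itself may add to the dict before calling the codec is not shown.

```python
import json
import uuid
from datetime import datetime

# Illustrative values only: user id and timestamp borrowed from the log above.
record = {
    "id": str(uuid.uuid4()),
    "user": "D599WCN",
    "occurred_at": datetime(2018, 11, 23, 23, 19, 51, 171000),
}

# Mirrors JSONSupport._dumps from the report: datetime -> ISO 8601 string.
payload = dict(record, occurred_at=record["occurred_at"].isoformat())
message = json.dumps(payload).encode()

print(message.decode())
```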

@forsberg

@mstump, does the problem go away if you disable Cython? You can do so by setting the environment variable NO_CYTHON=True.

Adir-Shemesh pushed a commit to Adir-Shemesh/faust that referenced this issue Jan 10, 2022
* Fix Recovery._resume_streams

* Discard fetched messages on rebalance

* update cython

* Add logs for skipped events

* QA fixes

* Track generation_id in more places

Co-authored-by: Eric Kerstens <ekerstens@expediagroup.com>