Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KeyError while using tumbling window table #285

Closed
2 tasks done
shakeel-techie opened this issue Jan 28, 2019 · 2 comments
Closed
2 tasks done

KeyError while using tumbling window table #285

shakeel-techie opened this issue Jan 28, 2019 · 2 comments

Comments

@shakeel-techie
Copy link

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

from collections import Counter
import random
from datetime import timedelta, datetime
import faust

from faust.worker import logger
app = faust.App('windowing', broker='kafka://localhost:9092')


class Model(faust.Record, serializer='json'):
    username: str
    errorMessage: str
    source_address: str
    source_address_country_code: str


TOPIC = 'tumbling_topic_test'
tumbling_topic = app.topic(TOPIC, value_type=Model)
tumbling_table = app.Table('tumbling_table_new', default=list).tumbling(10, expires=timedelta(minutes=1))


def ip_address_counter(ip_list):
    count_dict = Counter(ip_list)
    for k, v in count_dict.items():
        # if IP same IP is more than 3 trigger alert
        logger.info('Count for the IP {0} is {1}'.format(k, v))


@app.agent(tumbling_topic)
async def my_stream(stream):
    async for event in stream:
        source_address = event.source_address
        error_str = event.errorMessage
        if error_str == "Failed authentication":
            # append source ip to table "ip_counter"
            tumbling_table['ip_counter'] += [source_address]
            # append error message to table "error_str_counter"
            tumbling_table['error_str_counter'] += [error_str]

            ip_list = list(tumbling_table['ip_counter'].value())
            logger.info("My list of IP{0}".format(ip_list))

            error_list = list(tumbling_table['error_str_counter'].value())
            logger.info("Error list {0}".format(error_list))

            if len(error_list) >= 3:
                ip_address_counter(ip_list)


@app.timer(3.0, on_leader=True)
async def publish_every_2secs():
    ip_list = ["0.0.0.0", "9.9.9.9", "1.1.1.1", "2.2.2.2"]
    ip = random.choice(ip_list)
    msg = Model(username="someuser", errorMessage="Failed authentication", source_address=ip,
                source_address_country_code="IN")
    await tumbling_topic.send(value=msg)


if __name__ == '__main__':
    app.main()

Expected behavior

Process a list, to check in the list(error_list) if there are same IP addresses occurring >= 3 times in the current window.

Actual behavior

ERROR: KeyError: ('error_str_counter', WindowRange(start=1548143270.0, end=1548143279.9))

Full traceback

[2019-01-22 13:25:51,924: ERROR]: [^--Table: tumbling_table_new]: Crashed reason=KeyError(('error_str_counter', WindowRange(start=1548143270.0, end=1548143279.9)),)
Traceback (most recent call last):
  File "C:\Users\vellore\Envs\env_pythonic\lib\site-packages\mode\services.py", line 744, in _execute_task
    await task
  File "C:\Users\vellore\Envs\env_pythonic\lib\site-packages\mode\services.py", line 447, in _and_transition
    return await fun(self, *args, **kwargs)
  File "C:\Users\vellore\Envs\env_pythonic\lib\site-packages\faust\tables\base.py", line 277, in _clean_data
    self._del_old_keys()
  File "C:\Users\vellore\Envs\env_pythonic\lib\site-packages\faust\tables\base.py", line 292, in _del_old_keys
    del self.data[key]
  File "C:\Users\vellore\Envs\env_pythonic\lib\site-packages\mode\utils\collections.py", line 82, in __delitem__
    del self.data[key]
KeyError: ('error_str_counter', WindowRange(start=1548143270.0, end=1548143279.9)) 

Versions

  • Python 3.6.5
  • Faust 1.4.5
  • Windows 10
  • Kafka 2.11-2.1.0
@ask
Copy link
Contributor

ask commented May 1, 2019

I think this was fixed in a recent version!

I ran your example and this seems to confirm it, but please let me know if you still have issues.

@ask ask closed this as completed May 1, 2019
@shakeel-techie
Copy link
Author

Thank you ask. It is working!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants