ansq - Async NSQ

An NSQ client package written with native asyncio.

Installation

python -m pip install ansq

Overview

  • Reader — high-level class for building consumers with nsqlookupd support
  • Writer — high-level producer class supporting async publishing of messages to nsqd over the TCP protocol
  • NSQConnection — low-level class representing a TCP connection to nsqd:
    • full TCP wrapper
    • one connection for sub and pub
    • self-healing: when the connection is lost, it reconnects, re-sends the identify and auth commands, and resubscribes to the previous topic/channel
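
The self-healing behaviour listed above can be illustrated with plain asyncio. This is a minimal sketch under stated assumptions, not ansq's actual implementation: `SelfHealingConnection`, the `connect` callable, and the recorded step names are all hypothetical, and the exponential backoff values are arbitrary.

```python
import asyncio
from typing import Awaitable, Callable, List, Optional, Tuple


class SelfHealingConnection:
    """Hypothetical sketch: reconnect with backoff, then replay session state."""

    def __init__(
        self,
        connect: Callable[[], Awaitable[None]],
        base_delay: float = 0.1,
        max_delay: float = 5.0,
    ) -> None:
        self._connect = connect  # hypothetical coroutine that opens the TCP link
        self._base_delay = base_delay
        self._max_delay = max_delay
        self.subscription: Optional[Tuple[str, str]] = None
        self.replayed: List[str] = []  # steps performed after the last reconnect

    async def reconnect(self, max_attempts: int = 5) -> int:
        """Retry connect() with exponential backoff; return the attempts used."""
        delay = self._base_delay
        for attempt in range(1, max_attempts + 1):
            try:
                await self._connect()
            except OSError:
                if attempt == max_attempts:
                    raise
                await asyncio.sleep(delay)
                delay = min(delay * 2, self._max_delay)
                continue
            # The link is back: restore the session in protocol order.
            self.replayed = ["identify", "auth"]
            if self.subscription is not None:
                topic, channel = self.subscription
                self.replayed.append(f"subscribe {topic}/{channel}")
            return attempt
        raise RuntimeError("max_attempts must be at least 1")


async def demo() -> Tuple[int, List[str]]:
    failures = 2  # simulate two refused attempts before a successful one

    async def flaky_connect() -> None:
        nonlocal failures
        if failures > 0:
            failures -= 1
            raise ConnectionRefusedError("nsqd unavailable")

    conn = SelfHealingConnection(flaky_connect, base_delay=0.01)
    conn.subscription = ("example_topic", "example_channel")
    attempts = await conn.reconnect()
    return attempts, conn.replayed
```

Running `asyncio.run(demo())` exercises the retry loop against a simulated flaky server. The point of the sketch is the ordering: identify and auth are replayed before the previous subscription is restored.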

Features

  • SUB
  • PUB
  • Discovery
  • Backoff
  • TLS
  • Deflate
  • Snappy
  • Sampling
  • AUTH

Usage

Consumer

A simple consumer reads messages from "example_topic" and prints them to stdout.

import asyncio

import ansq


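async def main():
    # Create a reader subscribed to the topic/channel pair.
    reader = await ansq.create_reader(
        topic="example_topic",
        channel="example_channel",
    )

    # Iterate over messages as they arrive.
    async for message in reader.messages():
        print(f"Message: {message.body}")
        # Mark the message as finished so that nsqd does not redeliver it.
        await message.fin()

    # Close the reader and release its connections.
    await reader.close()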


if __name__ == "__main__":
    asyncio.run(main())
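
The consumer above finishes every message unconditionally. A common refinement is to finish a message only when processing succeeds and to requeue it otherwise. The sketch below shows that pattern. It assumes the message object exposes an awaitable `req()` alongside `fin()`. `handle_message` and `FakeMessage` are hypothetical names used only for illustration, and `FakeMessage` stands in for a real message so the example runs without nsqd.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def handle_message(
    message: Any,
    handler: Callable[[bytes], Awaitable[None]],
) -> bool:
    """Run handler on the message body; fin on success, req on failure."""
    try:
        await handler(message.body)
    except Exception:
        # Assumption: the message object offers an awaitable req() to requeue.
        await message.req()
        return False
    await message.fin()
    return True


class FakeMessage:
    """Stand-in for a real message, used to exercise the sketch offline."""

    def __init__(self, body: bytes) -> None:
        self.body = body
        self.calls = []

    async def fin(self) -> None:
        self.calls.append("fin")

    async def req(self) -> None:
        self.calls.append("req")


async def demo():
    async def handler(body: bytes) -> None:
        if body == b"bad":
            raise ValueError("cannot process")

    good, bad = FakeMessage(b"ok"), FakeMessage(b"bad")
    results = [await handle_message(m, handler) for m in (good, bad)]
    return results, good.calls, bad.calls
```

In a real consumer, `handle_message(message, handler)` would replace the `print` and `fin()` calls inside the `async for` loop.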

Producer

A simple producer sends a "Hello, world!" message to "example_topic".

import asyncio

import ansq


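async def main():
    # Create a writer connected to nsqd.
    writer = await ansq.create_writer()
    # Publish a single message to the topic.
    await writer.pub(
        topic="example_topic",
        message="Hello, world!",
    )
    # Close the writer and release its connections.
    await writer.close()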


if __name__ == "__main__":
    asyncio.run(main())
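
For readers curious about what a publish looks like on the wire, the NSQ TCP protocol frames a PUB command as the line `PUB <topic>\n`, followed by a 4-byte big-endian body size and the body itself. The helper below is a minimal sketch of that framing. `encode_pub` is a hypothetical name and is not claimed to be how ansq builds the command internally.

```python
import struct


def encode_pub(topic: str, body: bytes) -> bytes:
    """Encode an NSQ PUB command: 'PUB <topic>\\n' + 4-byte size + body."""
    header = f"PUB {topic}\n".encode("ascii")
    # The size prefix is an unsigned 32-bit integer in network byte order.
    return header + struct.pack(">I", len(body)) + body


frame = encode_pub("example_topic", b"Hello, world!")
```

The resulting `frame` is the byte string that would be written to the nsqd TCP socket for the producer example above.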

Contributing

Create and activate a development virtual environment.

python -m venv venv
source venv/bin/activate

Install ansq in editable mode and its testing dependencies.

python -m pip install -e ".[testing]"

Run tests.

pytest