Skip to content

Commit

Permalink
support for Connection.Blocked removed
Browse files Browse the repository at this point in the history
  • Loading branch information
piotrmaslanka committed Apr 4, 2024
1 parent 6d7877e commit 3d56792
Show file tree
Hide file tree
Showing 12 changed files with 107 additions and 3,503 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ var/
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

resources/amqp0-9-1.xml
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ have been made so far, between releases.

# v1.2.17

* can support building from XML not containing Connection.Blocked and Connection.Unblocked
15 changes: 13 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,14 @@ CoolAMQP
[![Documentation Status](https://readthedocs.org/projects/coolamqp/badge/?version=latest)](http://coolamqp.readthedocs.io/en/latest/?badge=develop)
[![license](https://img.shields.io/github/license/mashape/apistatus.svg)]()

A **magical** AMQP 0.9.1 client, that uses **heavy sorcery** to achieve speeds that other pure-Python AMQP clients cannot even hope to match.
Additionally, it's traceable using **opentracing**.
Why CoolAMQP?
-------------

* AMQP 0.9.1 client that's native Python
* heavily optimized for speed
* geared towards interfacing with [RabbitMQ](https://www.rabbitmq.com/
* supports custom RabbitMQ commands and Connection.Blocked and Connection.Unblocked
* traceable using [opentracing](https://opentracing.io/)

Documentation (WIP) is available at [Read the Docs](http://coolamqp.readthedocs.io/).

Expand Down Expand Up @@ -89,6 +95,11 @@ In order to compile the definitions:
```python
python -m compile_definitions
```
* you can alternatively perform
```python
python -m compile_definitions --no-connection-blocked
```
To generate a variant not supporting Connection.Blocked and Connection.Unblocked commands
and you're all set. The only files modified is
[definitions.py](coolamqp/framing/definitions.py).

Expand Down
34 changes: 31 additions & 3 deletions compile_definitions/__main__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
from __future__ import division

import os
import sys
if sys.version.startswith('2.'):
raise RuntimeError('Cannot run under Python 2.7')

from urllib.request import urlopen
import collections
import math
import struct
import subprocess
from io import BytesIO
from xml.etree import ElementTree
from zipfile import ZipFile

import six

Expand All @@ -26,11 +34,26 @@
}


def compile_definitions(xml_file='resources/amqp0-9-1.extended.xml',
def get_xml(xml_file):
"""Download XML definition from OASIS's website"""

r = urlopen('https://raw.githubusercontent.com/postwait/node-amqp/master/amqp-0-9-1-rabbit.xml')
with open(xml_file, 'wb') as out:
out.write(r.read())


def compile_definitions(xml_file='resources/amqp0-9-1.xml',
out_file='coolamqp/framing/definitions.py'):
"""parse resources/amqp-0-9-1.xml into """
if not os.path.exists(xml_file):
get_xml(xml_file)

try:
xml = ElementTree.parse(xml_file)
except ElementTree.ParseError:
get_xml(xml_file)
xml = ElementTree.parse(xml_file)

xml = ElementTree.parse(xml_file)
out = open(out_file, 'wb')

out.write(u'''# coding=UTF-8
Expand Down Expand Up @@ -568,7 +591,12 @@ def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> %s


if __name__ == '__main__':
compile_definitions()
if '--no-connection-blocked' in sys.argv:
xml_file = 'resources/amqp0-9-1.xml'
print('Compiling without Connection.Blocked')
else:
xml_file = 'resources/amqp0-9-1.extended.xml'
compile_definitions(xml_file=xml_file)
proc = subprocess.check_output(['yapf', 'coolamqp/framing/definitions.py'])
with open('coolamqp/framing/definitions.py', 'wb') as f_out:
f_out.write(proc)
18 changes: 12 additions & 6 deletions coolamqp/attaches/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,15 @@
try:
# these extensions will be available
from coolamqp.framing.definitions import ConfirmSelect, ConfirmSelectOk, \
BasicNack, ChannelFlow, ChannelFlowOk, ConnectionBlocked, ConnectionUnblocked
BasicNack, ChannelFlow, ChannelFlowOk
except ImportError:
pass

try:
from coolamqp.framing.definitions import ConnectionUnblocked, ConnectionBlocked
except ImportError:
ConnectionBlocked, ConnectionUnblocked = None, None

from coolamqp.attaches.channeler import Channeler, ST_ONLINE, ST_OFFLINE
from coolamqp.uplink import PUBLISHER_CONFIRMS, MethodWatch, FailWatch
from coolamqp.attaches.utils import AtomicTagger, FutureConfirmableRejectable, \
Expand Down Expand Up @@ -108,9 +113,9 @@ def attach(self, connection):
connection.watch(FailWatch(self.on_fail))

def on_connection_blocked(self, payload):
if isinstance(payload, ConnectionBlocked):
if ConnectionBlocked is not None and isinstance(payload, ConnectionBlocked):
self.blocked = True
elif isinstance(payload, ConnectionUnblocked):
elif ConnectionUnblocked is not None and isinstance(payload, ConnectionUnblocked):
self.blocked = False

if self.content_flow:
Expand Down Expand Up @@ -321,9 +326,10 @@ def on_setup(self, payload):
mw = self.watch_for_method(ChannelFlow, self.on_flow_control)
mw.oneshot = False

mw = self.connection.watch_for_method(0, (ConnectionBlocked, ConnectionUnblocked),
self.on_connection_blocked)
mw.oneshot = False
if ConnectionBlocked is not None:
mw = self.connection.watch_for_method(0, (ConnectionBlocked, ConnectionUnblocked),
self.on_connection_blocked)
mw.oneshot = False

if self.mode == Publisher.MODE_CNPUB:
self.method_and_watch(ConfirmSelect(False), ConfirmSelectOk,
Expand Down
22 changes: 14 additions & 8 deletions coolamqp/clustering/single.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@
import logging
import typing as tp

from coolamqp.framing.definitions import ConnectionUnblocked, ConnectionBlocked
try:
from coolamqp.framing.definitions import ConnectionUnblocked, ConnectionBlocked
except ImportError:
ConnectionBlocked, ConnectionUnblocked = None, None

from coolamqp.objects import Callable
from coolamqp.uplink import Connection
from coolamqp.uplink.connection import MethodWatch
Expand Down Expand Up @@ -57,13 +61,15 @@ def connect(self, timeout=None): # type: (tp.Optional[float]) -> None
self.connection.finalize.add(self.on_fail)

# Register the on-blocking watches
mw = MethodWatch(0, (ConnectionBlocked,), lambda: self.on_blocked(True))
mw.oneshot = False
self.connection.watch(mw)

mw = MethodWatch(0, (ConnectionUnblocked,), lambda: self.on_blocked(False))
mw.oneshot = False
self.connection.watch(mw)
if ConnectionBlocked is not None:
mw = MethodWatch(0, (ConnectionBlocked,), lambda: self.on_blocked(True))
mw.oneshot = False
self.connection.watch(mw)

if ConnectionUnblocked is not None:
mw = MethodWatch(0, (ConnectionUnblocked,), lambda: self.on_blocked(False))
mw.oneshot = False
self.connection.watch(mw)

def _on_fail(self):
if self.terminating:
Expand Down
6 changes: 3 additions & 3 deletions coolamqp/tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ class LoggingFrameTracer(BaseFrameTracer):
"""
A frame tracer that outputs each frame to log
:param logger: the logger to log onto
:param logger: the logger to log onto (defaults to logging.getLogger(__name__))
:param log_level: the level of logging to log with
"""
def __init__(self, logger, log_level=logging.WARNING):
self.logger = logger
def __init__(self, logger=None, log_level=logging.WARNING):
self.logger = logger or logging.getLogger(__name__)
self.log_level = log_level

def on_frame(self, timestamp, frame, direction):
Expand Down
24 changes: 24 additions & 0 deletions docs/cluster.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
CoolAMQP cluster
================

.. autoclass:: coolamqp.clustering.Cluster
:members:


Publisher
---------

.. autoclass:: coolamqp.attaches.publisher.Publisher
:members:
:undoc-members:

Consumers
---------

.. autoclass:: coolamqp.attaches.consumer.BodyReceiveMode
:members:

.. autoclass:: coolamqp.attaches.consumer.Consumer
:members:
:undoc-members:

5 changes: 0 additions & 5 deletions docs/coolamqp/cluster.rst

This file was deleted.

2 changes: 1 addition & 1 deletion docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ Welcome to CoolAMQP's documentation!
:maxdepth: 2
:caption: Contents

coolamqp/cluster
cluster
tutorial
caveats
frames
Expand Down

0 comments on commit 3d56792

Please sign in to comment.