Skip to content

Commit

Permalink
Push and pop commands in client are now thread-safe.
Browse files Browse the repository at this point in the history
  • Loading branch information
abisxir committed Jan 20, 2016
1 parent a381b06 commit 84413ff
Show file tree
Hide file tree
Showing 7 changed files with 222 additions and 37 deletions.
38 changes: 38 additions & 0 deletions examples/stress_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from turbomq import TurboClient
from threading import Thread
import time

thread_count = 64
message_size = 512
message_count_per_thread = 25000

def send(count, size):
c = TurboClient('tcp://127.0.0.1:33444')
q = c.get_queue('stress')

message = 'M' * size;

for i in xrange(count):
q.push('in', message)

c = TurboClient('tcp://127.0.0.1:33444')
q = c.get_queue('stress')


start_time = time.time()
threads = []
for i in xrange(thread_count):
t = Thread(target=send, args=(message_count_per_thread, message_size))
t.start()
threads.append(t)

for t in threads:
t.join()

total_message_count = message_count_per_thread * thread_count
elapsed_time = time.time() - start_time
tps = total_message_count / elapsed_time

print('TPS:{}'.format(tps))

q.push('in', 'shutdown-{}'.format(total_message_count))
40 changes: 40 additions & 0 deletions examples/stress_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from turbomq import TurboEngine
from threading import Thread
import time

def process(engine):
q = engine.get_queue('stress')
total_bytes = 0
total_messages = 0
start_time = 0
unit = (1.0 / (1024 * 1024))
total_sent_messages = 0

while True:
message = q.pop('in', 60)
if message is not None:
data = message.content
if start_time == 0:
start_time = time.time()
if data.startswith('shutdown'):
total_sent_messages = int(data.split('-')[1])
break
total_messages += 1
total_bytes += len(data)

elapsed_time = time.time() - start_time
tps = total_messages / elapsed_time
throughput = total_bytes / elapsed_time

print('Total sent messages: {}'.format(total_sent_messages))
print('Total received messages: {}'.format(total_messages))
print('Error: {}\n'.format(total_sent_messages - total_messages))

print('Elapsed time: {:.3f}s'.format(elapsed_time))
print('Total transferred data: {:.1f} MB'.format(unit * total_bytes))
print('TPS: {:.0f} messages/second'.format(tps))
print('Throughput: {:.1f} mbps'.format(unit * throughput))

e = TurboEngine('tcp://127.0.0.1:33444')
e.run()
process(e)
110 changes: 105 additions & 5 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,23 +47,123 @@
include_dirs=['./include'])
]

long_description = open('README.md').read()
long_description = """
TurboMQ
=======
**TurboMQ** is a simple message queue system. I hope it is fast enough
to merit the name. In our test it could provide and consume millions of
messages in a second. But we delegate the final judgment to the
developers who use the library. Consider that , currently, it is too
experimental and there will be dramatical change in both functionality
and protocols.
Why TurboMQ is developed?
=========================
First, I want to explain why a new message queue system is developed.
There are many message queue systems available and some of them are
popular and stable like **RabbitMQ** or **ZMQ**. The most important
reason behind this implementation is that most of message queue systems
are designed to handle backend processing like distributing jobs between
nodes to process huge amount of data or just complete the remained part
of a business transaction. Certainly, TurboMQ can be used to distribute
works between nodes. Moreover, it originally designed to support
millions of providers and consumers working with millions of queues and
topics.
The most close (as queue functionality) system is **Redis**. It has a
remarkable IO mechanism to handle network connections. However, it can
just utilize one core for one instance. Do we really want to use just
one core of for example 8 available cores? Or do we want to configure
clustering inside one machine to just use all available cores?
**ZMQ** is a good library. It is fast, stable and useful for many
purposes. Nonetheless, there is a serious problem in topic-based PUB-SUB
queues. The consumers (subscribers) has to be connected before providers
`(missing message problem solver)`_ otherwise the message is going to be
lost.
Technical information
=====================
**TurboMQ** is a python module. To avoid GIL problems, it is developed
using pure **C** and **Cython**. It uses its own event loop system. The
benefit is that it is a real multi-threaded event loop and can exploit
all available cores. The drawback is that it does not support windows.
Are the bad news finished? No, kqueue has not implemented yet and it
uses (slow) posix POLL in BSD families. Is there any other good news?
Yes, windows and kqueue support is going to be implemented very soon.
Installation
============
Installation is easy. The package can be installed by pip:
$ sudo pip install turbomq
You need to download or clone it and then type the python magic:
$ sudo python setup.py install
Usage
============
To use **TurboMQ** just import and run the server. The following code runs a server for 10 minutes.
.. code-block:: python
from turbomq import TurboEngine
import time
# You can pass the thread count as a second parameter.
# Otherwise, it will automatically selects 4 threads per core.
e = TurboEngine('tcp://127.0.0.1:33444')
e.run()
# "run" method will not block the main thread.
# So you need to simply wait or run your own loop as you want.
time.sleep(10.0 * 60)
# "stop" method just shuts TCP sockets down.
e.stop()
# After destroy all resources will be freed.
# Then you can not use this instance anymore.
e.destroy()
This code sends a message to server and receives it again.
.. code-block:: python
from turbomq import TurboClient
# Connects to the server.
c = TurboClient('tcp://127.0.0.1:33444')
# Creates a mirror queue in client side.
q = c.get_queue('test')
# Both topic key and data is mandatory in push.
q.push('hello', 'turbo')
# In pop you need to determine a timeout.
# So this will wait two seconds. If timeout is exceeded, it will return None.
print(q.pop('hello', 2))
"""

setup(
name='turbomq',
ext_modules=cythonize(extensions),
version='0.1.5',
version='0.1.7',
description='TurboMQ - Message Queue System',
long_description=long_description,
author='Abi M.Sangarab',
author_email='abi@singiro.com',
url='https://github.com/turbomq/engine',
download_url='https://github.com/turbomq/engine/archive/0.1.5.tar.gz',
download_url='https://github.com/turbomq/engine/archive/0.1.7.tar.gz',
classifiers=[
'Development Status :: 3 - Alpha',
'License :: OSI Approved :: GNU General Public License',
'Programming Language :: Python :: 2.7',
'Topic :: Message Queue :: Message Processing',
'Programming Language :: C'
],
keywords='turbomq message queue amqp',
license='GNU General Public',
Expand Down
26 changes: 20 additions & 6 deletions src/client.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ include "common.pyx"

import socket
import time
from threading import Lock

cdef int64_t PUSH_COMMAND = 0
cdef int64_t POP_COMMAND = 1
Expand Down Expand Up @@ -63,6 +64,7 @@ cdef class TurboClient:
def __cinit__(self, bytes url):
self.socket = None
self.url = url
self.lock = Lock()
self.connect()

def connect(self):
Expand Down Expand Up @@ -100,9 +102,13 @@ cdef class TurboClient:
turbo_out_stream_append_str(out, topic)
turbo_out_stream_append(out, &content_size, sizeof(int32_t))
turbo_out_stream_append(out, content, content_size)
sent = turbo_out_stream_send(out)
if sent <= 0:
result = -1
with self.lock:
with nogil:
sent = turbo_out_stream_send(out)
if sent <= 0:
result = -1

with nogil:
turbo_out_stream_destroy(&out)

return result
Expand All @@ -119,17 +125,25 @@ cdef class TurboClient:
turbo_out_stream_append_str(out, qname)
turbo_out_stream_append_str(out, topic)
turbo_out_stream_append(out, &timeout, sizeof(int8_t))
if turbo_out_stream_send(out) <= 0:
result = -1

with self.lock:
with nogil:
if turbo_out_stream_send(out) <= 0:
result = -1

with nogil:
turbo_out_stream_destroy(&out)

if result < 0:
return NULL

input = turbo_in_stream_create(self.socket_fd)

with self.lock:
with nogil:
turbo_in_stream_recv(input);

with nogil:
turbo_in_stream_recv(input);
return turbo_in_stream_read_message(input)

cdef TurboMessage pop(self, char* qname, char* topic, int8_t timeout):
Expand Down
35 changes: 14 additions & 21 deletions src/engine.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
#include "array_list.h"
#include "command.h"

typedef struct
typedef struct
{
turbo_engine_t parent;
turbo_hashmap_t* queues;
Expand All @@ -51,11 +51,9 @@ int engine_command_push(turbo_command_t* command, turbo_remote_client_t* client)
turbo_message_t* message;
turbo_queue_t* queue;
turbo_engine_t* engine = turbo_command_get_context(command);

turbo_in_stream_read_str(client->in_stream, qname);
//printf("\tQueue:[%s].\n", qname);
turbo_in_stream_read_str(client->in_stream, topic);
//printf("\tTopic:[%s].\n", topic);
turbo_in_stream_read_int32(client->in_stream, &size);
content = malloc(size);
if(content == NULL)
Expand All @@ -65,7 +63,6 @@ int engine_command_push(turbo_command_t* command, turbo_remote_client_t* client)
return -1;
}
turbo_in_stream_read(client->in_stream, content, size);
//printf("\tContent size:[%d].\n", size);
queue = turbo_engine_get_queue(engine, qname);
message = turbo_message_create_ex(client->ip, content, size);
turbo_queue_push(queue, topic, message);
Expand All @@ -82,39 +79,36 @@ int engine_command_pop(turbo_command_t* command, turbo_remote_client_t* client)
turbo_queue_t* queue;
int result;
turbo_engine_t* engine = turbo_command_get_context(command);

turbo_in_stream_read_str(client->in_stream, qname);
//printf("\tQueue:[%s].\n", qname);
turbo_in_stream_read_str(client->in_stream, topic);
//printf("\tTopic:[%s].\n", topic);
turbo_in_stream_read(client->in_stream, &timeout, sizeof(timeout));
//printf("\tTimeout:[%d].\n", timeout);

queue = turbo_engine_get_queue(engine, qname);
message = turbo_queue_pop(queue, topic, timeout);

if(client->out_stream != NULL)
{
turbo_out_stream_destroy(&client->out_stream);
}

client->out_stream = turbo_out_stream_create(client->socketfd);
if(client->out_stream == NULL)
{
print_system_error();
return -1;
}

result = turbo_out_stream_append_message(client->out_stream, message);

turbo_message_destroy(&message);

if(result == -1)
{
print_system_error();
return result;
}

return 0;
}

Expand All @@ -133,17 +127,17 @@ turbo_engine_t* turbo_engine_create(const char* protocol, const char* host, int
engine->ioloop = turbo_ioloop_create((turbo_engine_t*)engine, protocol, host, port, num_threads);
engine->queues = turbo_hashmap_str_create();
engine->commands = turbo_array_list_create(engine_release_command);

/*
* Adding push command
*/
turbo_array_list_append(engine->commands, turbo_command_create(0, engine_command_push, engine));

/*
* Adding pop command
*/
turbo_array_list_append(engine->commands, turbo_command_create(1, engine_command_pop, engine));

pthread_mutex_init(&engine->lock, NULL);
return (turbo_engine_t*)engine;
}
Expand Down Expand Up @@ -223,7 +217,7 @@ turbo_queue_t* turbo_engine_get_queue(turbo_engine_t* base, const char* name)
* Locks the engine to create a new queue
*/
pthread_mutex_lock(&engine->lock);

/*
* We are going to double check it
*/
Expand All @@ -236,12 +230,11 @@ turbo_queue_t* turbo_engine_get_queue(turbo_engine_t* base, const char* name)
queue = turbo_queue_create(name);
turbo_hashmap_put(engine->queues, strdup(name), queue);
}

/*
* Unlock the engine
*/
pthread_mutex_unlock(&engine->lock);

return queue;
}

Loading

0 comments on commit 84413ff

Please sign in to comment.