Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: aziomq/aziomq
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: master
Choose a base ref
...
head repository: zeromq/azmq
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: master
Choose a head ref
Can’t automatically merge. Don’t worry, you can still create the pull request.

Commits on Nov 19, 2014

  1. Clean up sample code

    recent version of asio::buffer now handle std::array
    Thomas Rodgers committed Nov 19, 2014
    Copy the full SHA
    86501ac View commit details
  2. Merge pull request #3 from rodgert/master

    Clean up sample code
    rodgert committed Nov 19, 2014
    Copy the full SHA
    4c5e643 View commit details

Commits on Nov 20, 2014

  1. Initial support for CZMQ zthread like functionality

    Thomas Rodgers committed Nov 20, 2014
    Copy the full SHA
    f1359e5 View commit details
  2. Merge pull request #6 from rodgert/master

    Initial support for CZMQ zthread like functionality
    rodgert committed Nov 20, 2014
    Copy the full SHA
    96c5d75 View commit details

Commits on Nov 21, 2014

  1. Copy the full SHA
    55d89e5 View commit details
  2. Copy the full SHA
    896cb53 View commit details
  3. Copy the full SHA
    89bb4b2 View commit details
  4. Copy the full SHA
    120d14f View commit details
  5. ZeroMQ finder uses CMake variable ZMQ_ROOT if specified and otherwise…

    … environment variable ZMQ_ROOT
    oliora committed Nov 21, 2014
    Copy the full SHA
    8756db3 View commit details
  6. Copy the full SHA
    74014f6 View commit details
  7. Copy the full SHA
    3e14125 View commit details
  8. Use boost::mutex instead of std::mutex because last one crashes if us…

    …ed as static variable
    oliora committed Nov 21, 2014
    Copy the full SHA
    daf71a0 View commit details
  9. Copy the full SHA
    a797b80 View commit details
  10. Copy the full SHA
    ca05847 View commit details
  11. Copy the full SHA
    3fc1a87 View commit details
  12. Copy the full SHA
    53df51a View commit details
  13. Copy the full SHA
    3881842 View commit details
  14. Copy the full SHA
    d171919 View commit details
  15. resolves #1 add socket monitor functionality

    Also addresses some surprising socket clean up behavior with
    naive use of io_service.stop() and async operations which retain the
    socket by shared_ptr (creating a reference cycle).
    Thomas Rodgers committed Nov 21, 2014
    Copy the full SHA
    93f2fd3 View commit details
  16. fix #10 spurious failures on receive/receive_vec

    Thomas Rodgers committed Nov 21, 2014
    Copy the full SHA
    2034683 View commit details

Commits on Nov 22, 2014

  1. Merge pull request #8 from rodgert/master

    resolves #1 add socket monitor functionality
    rodgert committed Nov 22, 2014
    Copy the full SHA
    b684fb0 View commit details
  2. Merge branch 'pr/9'

    Conflicts:
    	src/lib/CMakeLists.txt
    Thomas Rodgers committed Nov 22, 2014
    Copy the full SHA
    b35ce39 View commit details

Commits on Nov 24, 2014

  1. Copy the full SHA
    3671a46 View commit details
  2. Memory leak on Linux fixed

    oliora committed Nov 24, 2014
    Copy the full SHA
    44c2a64 View commit details
  3. Merge pull request #12 from oliora/master

    Fixed Windows compilation errors and memory leak
    hintjens committed Nov 24, 2014
    Copy the full SHA
    2f243b6 View commit details
  4. Typo fixed

    oliora committed Nov 24, 2014
    Copy the full SHA
    6ee64ea View commit details
  5. Merge pull request #13 from oliora/master

    Fixed typo added in previous commit
    hintjens committed Nov 24, 2014
    Copy the full SHA
    a4756b9 View commit details
  6. Copy the full SHA
    fa9a2d8 View commit details
  7. Merge pull request #14 from oliora/master

    Rolling back changes made by mistake.
    rodgert committed Nov 24, 2014
    Copy the full SHA
    7895b05 View commit details
  8. Added myself to AUTHORS

    oliora committed Nov 24, 2014
    Copy the full SHA
    3b25401 View commit details
  9. Merge pull request #15 from oliora/master

    Added myself to AUTHORS
    rodgert committed Nov 24, 2014
    Copy the full SHA
    a648832 View commit details

Commits on Nov 25, 2014

  1. Expose thread options in thread namespace

    Minor refactoring of thread_service details
    Thomas Rodgers committed Nov 25, 2014
    Copy the full SHA
    a66a60a View commit details
  2. Merge remote-tracking branch 'upstream/master'

    Thomas Rodgers committed Nov 25, 2014
    Copy the full SHA
    72e149f View commit details
  3. Merge pull request #19 from rodgert/master

    Expose thread options in thread namespace
    hintjens committed Nov 25, 2014
    Copy the full SHA
    b54282d View commit details
  4. fix #17 Monitoring events comes empty

    Two issues, receive_buffer_op was capturing buffer by reference, it_ and
    end_ pointed to the supplied buffer argument, not the local instance.
    receive_buffer_op was not performing buffer copy.
    Thomas Rodgers committed Nov 25, 2014
    Copy the full SHA
    ef0eadd View commit details
  5. Merge pull request #20 from rodgert/master

    fix #17 Monitoring events comes empty
    hintjens committed Nov 25, 2014
    Copy the full SHA
    624b963 View commit details
  6. Remove unused file

    Thomas Rodgers committed Nov 25, 2014
    Copy the full SHA
    1fed877 View commit details
  7. Introduce inline v1 namespaces for public API

    Thomas Rodgers committed Nov 25, 2014
    Copy the full SHA
    cd22cae View commit details
  8. Rename scope guards AZIOMQ->AZMQ

    Thomas Rodgers committed Nov 25, 2014
    Copy the full SHA
    633b142 View commit details
  9. Copy the full SHA
    be13b35 View commit details
  10. Merge pull request #22 from oliora/master

    Tests build scripts cleaned up. All tests registered to run on 'make test'
    hintjens committed Nov 25, 2014
    Copy the full SHA
    31fe0ba View commit details
  11. Rename aziomq namespace/dir to azmq

    Thomas Rodgers committed Nov 25, 2014
    Copy the full SHA
    02938b9 View commit details
  12. Copy the full SHA
    58c3225 View commit details

Commits on Nov 26, 2014

  1. Update README.md to reflect name change

    Thomas Rodgers committed Nov 26, 2014
    Copy the full SHA
    ea6fb9e View commit details
  2. Merge remote-tracking branch 'upstream/master'

    Conflicts:
    	test/CMakeLists.txt
    	test/context_ops/CMakeLists.txt
    	test/message/CMakeLists.txt
    	test/socket/CMakeLists.txt
    	test/socket_ops/CMakeLists.txt
    	test/thread/CMakeLists.txt
    Thomas Rodgers committed Nov 26, 2014
    Copy the full SHA
    056ae49 View commit details
  3. More updates of AzioMQ -> AZMQ

    Thomas Rodgers committed Nov 26, 2014
    Copy the full SHA
    95b22af View commit details
  4. Merge pull request #23 from oliora/master

    README extended with Windows related info
    rodgert committed Nov 26, 2014
    Copy the full SHA
    ba8b78a View commit details
  5. Merge remote-tracking branch 'upstream/master'

    Thomas Rodgers committed Nov 26, 2014
    Copy the full SHA
    bf95a1f View commit details
  6. More README.md changes AzioMQ -> AZMQ

    Thomas Rodgers committed Nov 26, 2014
    Copy the full SHA
    717e182 View commit details
  7. Bump Boost minimum version to 1.54

    Thomas Rodgers committed Nov 26, 2014
    Copy the full SHA
    6c0a589 View commit details
Showing with 5,446 additions and 2,738 deletions.
  1. +16 −0 .cmake-format
  2. +88 −0 .github/workflows/cmake-superbuild/CMakeLists.txt
  3. +81 −0 .github/workflows/cmake.yml
  4. +3 −1 AUTHORS
  5. +33 −0 AzmqCPack.cmake
  6. +11 −0 AzmqCPackOptions.cmake.in
  7. +72 −21 CMakeLists.txt
  8. +2 −3 CONTRIBUTING.md
  9. +50 −0 FindAzmqLibzmq.cmake
  10. +57 −26 README.md
  11. +0 −64 aziomq/detail/reactor_op.hpp
  12. +0 −188 aziomq/detail/receive_op.hpp
  13. +0 −160 aziomq/detail/send_op.hpp
  14. +0 −244 aziomq/detail/socket_ops.hpp
  15. +0 −530 aziomq/detail/socket_service.hpp
  16. +0 −105 aziomq/endpoint.hpp
  17. +0 −28 aziomq/error.hpp
  18. +0 −699 aziomq/socket.hpp
  19. +81 −0 azmq/actor.hpp
  20. +10 −13 aziomq/io_service.hpp → azmq/context.hpp
  21. +289 −0 azmq/detail/actor_service.hpp
  22. +51 −0 azmq/detail/basic_io_object.hpp
  23. +27 −0 azmq/detail/config.hpp
  24. +26 −0 azmq/detail/config/condition_variable.hpp
  25. +28 −0 azmq/detail/config/lock_guard.hpp
  26. +25 −0 azmq/detail/config/mutex.hpp
  27. +24 −0 azmq/detail/config/thread.hpp
  28. +29 −0 azmq/detail/config/unique_lock.hpp
  29. +28 −10 {aziomq → azmq}/detail/context_ops.hpp
  30. +44 −0 azmq/detail/reactor_op.hpp
  31. +201 −0 azmq/detail/receive_op.hpp
  32. +147 −0 azmq/detail/send_op.hpp
  33. +38 −0 azmq/detail/service_base.hpp
  34. +32 −19 {aziomq → azmq}/detail/socket_ext.hpp
  35. +376 −0 azmq/detail/socket_ops.hpp
  36. +728 −0 azmq/detail/socket_service.hpp
  37. +52 −0 azmq/error.hpp
  38. +138 −72 {aziomq → azmq}/message.hpp
  39. +53 −11 {aziomq → azmq}/option.hpp
  40. +84 −0 azmq/signal.hpp
  41. +918 −0 azmq/socket.hpp
  42. +9 −9 {aziomq → azmq}/util/expected.hpp
  43. +7 −7 {aziomq → azmq}/util/scope_guard.hpp
  44. +23 −0 azmq/version.hpp
  45. +6 −0 azmqConfig.cmake
  46. +75 −0 cmake/CompilerWarnings.cmake
  47. +0 −45 config/FindAzioMQ.cmake
  48. +0 −45 config/FindZeroMQ.cmake
  49. +1 −0 doc/CMakeLists.txt
  50. +1 −0 doc/examples/CMakeLists.txt
  51. +5 −0 doc/examples/actor/CMakeLists.txt
  52. +127 −0 doc/examples/actor/main.cpp
  53. +0 −1 src/CMakeLists.txt
  54. +0 −8 src/lib/CMakeLists.txt
  55. +0 −37 src/lib/context_ops.cpp
  56. +0 −30 src/lib/error.cpp
  57. +0 −20 src/lib/socket_service.cpp
  58. +27 −12 test/CMakeLists.txt
  59. +7 −0 test/actor/CMakeLists.txt
  60. +68 −0 test/actor/main.cpp
  61. +0 −25 test/assert.ipp
  62. +7 −5 test/context_ops/CMakeLists.txt
  63. +27 −31 test/context_ops/main.cpp
  64. +41 −0 test/cpp20/socket/CMakeLists.txt
  65. +77 −0 test/cpp20/socket/main.cpp
  66. +5 −5 test/message/CMakeLists.txt
  67. +165 −70 test/message/main.cpp
  68. +7 −0 test/signal/CMakeLists.txt
  69. +26 −0 test/signal/main.cpp
  70. +9 −5 test/socket/CMakeLists.txt
  71. +737 −94 test/socket/main.cpp
  72. +7 −5 test/socket_ops/CMakeLists.txt
  73. +140 −90 test/socket_ops/main.cpp
16 changes: 16 additions & 0 deletions .cmake-format
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
format:
line_width: 123
tab_size: 4
max_subgroups_hwrap: 4
max_pargs_hwrap: 4
max_rows_cmdline: 8
separate_ctrl_name_with_space: true
separate_fn_name_with_space: false
dangle_parens: false
min_prefix_chars: 8
max_lines_hwrap: 3
line_ending: unix
markup:
bullet_char: '*'
enum_char: .
enable_markup: false
88 changes: 88 additions & 0 deletions .github/workflows/cmake-superbuild/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
cmake_minimum_required(VERSION 3.0)
project(azmq-super-build)

#options
set(BOOST_URL "system" CACHE STRING "boost source url")
set(ZMQ_TAG "v4.3.4" CACHE STRING "zmq tag")
option(ZMQ_BUILD_WITH_CMAKE "use cmake (default to autotools)" no)
option(BUILD_SHARED_LIBS "build shared (default to static)" no)

# setup configuration derived from the option
include(ProcessorCount)
ProcessorCount(CPU_COUNT)

if (BUILD_SHARED_LIBS)
message(STATUS " build with shared libraries")
set(ZMQ_BUILD_STATIC no)
set(ZMQ_BUILD_SHARED yes)
else ()
message(STATUS " build with static libraries")
set(ZMQ_BUILD_STATIC yes)
set(ZMQ_BUILD_SHARED no)
endif ()

include(ExternalProject)

set(BOOST_177 "https://boostorg.jfrog.io/artifactory/main/release/1.77.0/source/boost_1_77_0.tar.bz2")
set(BOOST_176 "https://boostorg.jfrog.io/artifactory/main/release/1.76.0/source/boost_1_76_0.tar.bz2")
set(BOOST_174 "https://boostorg.jfrog.io/artifactory/main/release/1.74.0/source/boost_1_74_0.tar.bz2")
set(BOOST_172 "https://boostorg.jfrog.io/artifactory/main/release/1.72.0/source/boost_1_72_0.tar.bz2")
set(BOOST_170 "https://boostorg.jfrog.io/artifactory/main/release/1.70.0/source/boost_1_70_0.tar.bz2")
set(BOOST_168 "https://boostorg.jfrog.io/artifactory/main/release/1.68.0/source/boost_1_68_0.tar.bz2")

set(DEVROOT ${CMAKE_BINARY_DIR}/devroot)

set(CMAKE_ARGS
-DCMAKE_BUILD_TYPE=Debug
-DCMAKE_PREFIX_PATH=${DEVROOT}
-DCMAKE_INSTALL_PREFIX=${DEVROOT}
-DCMAKE_FIND_ROOT_PATH=${DEVROOT}
-DBUILD_SHARED_LIBS=${BUILD_SHARED_LIBS})

set(LIBS "")
if (NOT (ZMQ_TAG STREQUAL "system"))
if (ZMQ_BUILD_WITH_CMAKE)
message(STATUS " build libzmq with cmake ")
ExternalProject_Add(
zmq
GIT_REPOSITORY https://github.com/zeromq/libzmq.git
GIT_TAG ${ZMQ_TAG}
CMAKE_ARGS ${CMAKE_ARGS}
-DZMQ_BUILD_TESTS=OFF
-DWITH_PERF_TOOL=OFF
-DWITH_LIBSODIUM=OFF
-DBUILD_STATIC=${ZMQ_BUILD_STATIC}
-DBUILD_SHARED=${ZMQ_BUILD_SHARED})
else ()
message(STATUS " build libzmq with autotools ")
ExternalProject_Add(
zmq
GIT_REPOSITORY https://github.com/zeromq/libzmq.git
GIT_TAG ${ZMQ_TAG}
PATCH_COMMAND ./autogen.sh
CONFIGURE_COMMAND <SOURCE_DIR>/configure --prefix=${DEVROOT} --enable-static=${ZMQ_BUILD_STATIC}
--enable-shared=${ZMQ_BUILD_SHARED}
BUILD_COMMAND make -j${CPU_COUNT}
INSTALL_COMMAND make install)
endif ()
list(APPEND LIBS zmq)
endif ()

if (NOT (BOOST_URL STREQUAL "system"))
ExternalProject_Add(
boost
URL ${${BOOST_URL}}
BUILD_IN_SOURCE 1
INSTALL_DIR ${DEVROOT}
CONFIGURE_COMMAND ./bootstrap.sh --prefix=<INSTALL_DIR>
BUILD_COMMAND ""
INSTALL_COMMAND ./b2 install --prefix=<INSTALL_DIR> -j${CPU_COUNT})
list(APPEND LIBS boost)
endif ()

ExternalProject_Add(
azmq
DEPENDS ${LIBS}
CMAKE_ARGS ${CMAKE_ARGS} -D AZMQ_BUILD_TESTS=YES
SOURCE_DIR ${CMAKE_SOURCE_DIR}/../../..
TEST_BEFORE_INSTALL YES)
81 changes: 81 additions & 0 deletions .github/workflows/cmake.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
name: CMake

on:
push:
branches:
- '**'
pull_request:
branches: [master]

jobs:
build:
strategy:
fail-fast: false
matrix:
config:
- {
name: "ubuntu22",
os: ubuntu-22.04,
boost_url: system,
zmq_tag: system,
system_packages: "libboost-all-dev libzmq3-dev ",
cc: "gcc",
cxx: "g++",
}
- {
name: "ubuntu20 stock",
os: ubuntu-20.04,
boost_url: system,
zmq_tag: system,
system_packages: "libboost-all-dev libzmq3-dev ",
cc: "gcc",
cxx: "g++",
}
- {
name: "macos12 ",
os: macos-12,
boost_url: system,
zmq_tag: system,
system_packages: "zmq boost",
cc: "clang",
cxx: "clang++",
}
- {
name: "macos11 ",
os: macos-11,
boost_url: system,
zmq_tag: system,
system_packages: "zmq boost",
cc: "clang",
cxx: "clang++",
}

runs-on: ${{ matrix.config.os }}

env:
CC: ${{ matrix.config.cc }}
CXX: ${{ matrix.config.cxx }}
CTEST_OUTPUT_ON_FAILURE: 1
PACKAGES: ${{ matrix.config.system_packages }}

steps:
- uses: actions/checkout@v3
- name: buildtools on ubuntu
if: startsWith(matrix.config.name, 'ubuntu')
shell: bash
run: |
if [ -n "$PACKAGES" ]; then
sudo apt-get install -y $PACKAGES
fi
- name: buildtools on macos
if: startsWith(matrix.config.name, 'macos') && matrix.config.*.system_packages
shell: bash
run: |
if [ -n "$PACKAGES" ]; then
brew install $PACKAGES
fi
- name: build
shell: bash
run: |
cmake -DBOOST_URL=${{matrix.config.boost_url}} -DZMQ_TAG=${{matrix.config.zmq_tag}} -S .github/workflows/cmake-superbuild -B build
cmake --build build
4 changes: 3 additions & 1 deletion AUTHORS
Original file line number Diff line number Diff line change
@@ -3,4 +3,6 @@ Copyright (c) 2013-2014 Thomas W Rodgers <rodgert@twrodgers.com>
Individual Contributors
=======================
Thomas W Rodgers <rodgert@twrodgers.com>
<trodgers@drw.com>
Andrey Upadyshev <oliora@gmail.com>
Tim Blechmann <tim@klingt.org>
Adam Boseley <adam.boseley@gmail.com>
33 changes: 33 additions & 0 deletions AzmqCPack.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
set(CPACK_PACKAGE_DESCRIPTION_SUMMARY "AZMQ Boost Asio + ZeroMQ")
set(CPACK_PACKAGE_DESCRIPTION
"The azmq library provides Boost Asio style bindings for ZeroMQ
This library is built on top of ZeroMQ's standard C interface and is
intended to work well with C++ applications which use the Boost libraries
in general, and Asio in particular.
The main abstraction exposed by the library is azmq::socket which
provides an Asio style socket interface to the underlying zeromq socket
and interfaces with Asio's io_service(). The socket implementation
participates in the io_service's reactor for asynchronous IO and
may be freely mixed with other Asio socket types (raw TCP/UDP/Serial/etc.).")
set(CPACK_PACKAGE_VERSION "1.1.0")

# Debian-specific packaging
set(CPACK_DEBIAN_PACKAGE_ARCHITECTURE "all")
set(CPACK_DEBIAN_PACKAGE_DEPENDS "libzmq3-dev (>= 4.0.0), libboost-dev (>= 1.68.0)")
set(CPACK_DEBIAN_PACKAGE_NAME "libazmq-dev")
set(CPACK_DEBIAN_PACKAGE_SECTION "libdevel")

# RPM-specific packaging
set(CPACK_RPM_PACKAGE_ARCHITECTURE "noarch")
set(CPACK_RPM_PACKAGE_DESCRIPTION "${CPACK_PACKAGE_DESCRIPTION}")
set(CPACK_RPM_PACKAGE_GROUP "Development/Libraries")
set(CPACK_RPM_PACKAGE_LICENSE "Boost")
set(CPACK_RPM_PACKAGE_NAME "azmq-devel")
set(CPACK_RPM_PACKAGE_REQUIRES "zeromq-devel >= 4.0.0")

configure_file("${azmq_SOURCE_DIR}/AzmqCPackOptions.cmake.in" "${azmq_BINARY_DIR}/AzmqCPackOptions.cmake" @ONLY)
set(CPACK_PROJECT_CONFIG_FILE "${azmq_BINARY_DIR}/AzmqCPackOptions.cmake")

include(CPack)
11 changes: 11 additions & 0 deletions AzmqCPackOptions.cmake.in
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# This file is configured at cmake time, and loaded at cpack time.
# To pass variables to cpack from cmake, they must be configured
# in this file.

if(CPACK_GENERATOR STREQUAL "DEB")
set(CPACK_PACKAGE_FILE_NAME "@CPACK_DEBIAN_PACKAGE_NAME@_@CPACK_PACKAGE_VERSION@_@CPACK_DEBIAN_PACKAGE_ARCHITECTURE@")
endif()

if(CPACK_GENERATOR MATCHES "RPM")
set(CPACK_PACKAGE_FILE_NAME "@CPACK_RPM_PACKAGE_NAME@-@CPACK_PACKAGE_VERSION@.@CPACK_RPM_PACKAGE_ARCHITECTURE@")
endif()
93 changes: 72 additions & 21 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,28 +1,79 @@
cmake_minimum_required(VERSION 2.8)
project(aziomq)
cmake_minimum_required(VERSION 3.16.3...3.25)
# -- v3.16.3 is the default version in the current ubuntu lts release

set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} --std=c++11")
project(azmq VERSION 1.1.0 LANGUAGES CXX)

if(USE_LIBCXX)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -stdlib=libc++")
set(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} -lc++abi")
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} -lc++abi")
set(ADDITIONAL_LIBS "pthread")
endif()
if (PROJECT_SOURCE_DIR STREQUAL CMAKE_SOURCE_DIR)
set(CMAKE_MASTER_PROJECT YES)
endif ()

# -- options
option(AZMQ_BUILD_TESTS "run unit tests" ${CMAKE_MASTER_PROJECT})
option(BUILD_SHARED_LIBS "use the libzmq static or shared" yes)
option(AZMQ_DEVELOPER "Add verbose warning for developers " ${CMAKE_MASTER_PROJECT})
option(AZMQ_BUILD_DOC "Add documentation example" ${CMAKE_MASTER_PROJECT})

# -- config settings --
set(CMAKE_CXX_STANDARD 11)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_CXX_EXTENSIONS OFF)

# --- dependencies --
find_package(
Boost 1.68
COMPONENTS system
date_time
thread
chrono
random
REQUIRED)

include(FindAzmqLibzmq.cmake)

set(CMAKE_THREAD_PREFER_PTHREAD TRUE)
find_package(Threads REQUIRED)

# --- create library --
add_library(${PROJECT_NAME} INTERFACE)
add_library(Azmq::${PROJECT_NAME} ALIAS ${PROJECT_NAME})

set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_SOURCE_DIR}/config")
target_link_libraries(${PROJECT_NAME} INTERFACE Azmq::libzmq Boost::boost ${CMAKE_THREAD_LIBS_INIT})
target_include_directories(${PROJECT_NAME} INTERFACE "$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}>"
"$<INSTALL_INTERFACE:include>")

find_package(Boost 1.53 COMPONENTS system log unit_test_framework REQUIRED)
find_package(ZeroMQ 4.0 REQUIRED)
if (AZMQ_DEVELOPER)
include(cmake/CompilerWarnings.cmake)
add_more_warnings_to(${PROJECT_NAME})
endif ()

include_directories(${Boost_INCLUDE_DIRS}
${ZeroMQ_INCLUDE_DIRS}
${PROJECT_SOURCE_DIR})
if (CMAKE_MASTER_PROJECT AND NOT CMAKE_SKIP_INSTALL_RULES)
include(AzmqCPack.cmake)

enable_testing()
add_subdirectory(src)
add_subdirectory(test)
#add_subdirectory(doc/examples)
include(CMakePackageConfigHelpers)

install(DIRECTORY ${PROJECT_SOURCE_DIR}/aziomq
DESTINATION include)
# generate the version file for the config file
write_basic_package_version_file(${CMAKE_CURRENT_BINARY_DIR}/${PROJECT_NAME}ConfigVersion.cmake
VERSION ${PROJECT_VERSION} COMPATIBILITY SameMajorVersion)

# install the public headers
install(DIRECTORY azmq DESTINATION include)

install(TARGETS ${PROJECT_NAME} EXPORT ${PROJECT_NAME}Targets RUNTIME DESTINATION bin ARCHIVE DESTINATION lib)
configure_file(${PROJECT_NAME}Config.cmake "${CMAKE_CURRENT_BINARY_DIR}/${PROJECT_NAME}Config.cmake" COPYONLY)
set(ConfigPackageLocation lib/cmake/${PROJECT_NAME})
install(EXPORT ${PROJECT_NAME}Targets NAMESPACE Azmq:: DESTINATION ${ConfigPackageLocation})
install(FILES ${CMAKE_CURRENT_BINARY_DIR}/${PROJECT_NAME}Config.cmake
${CMAKE_CURRENT_BINARY_DIR}/${PROJECT_NAME}ConfigVersion.cmake DESTINATION ${ConfigPackageLocation})

install(FILES "${CMAKE_CURRENT_SOURCE_DIR}/FindAzmqLibzmq.cmake" DESTINATION ${ConfigPackageLocation})
endif ()

if (AZMQ_BUILD_TESTS)
enable_testing()
message(STATUS "Building Tests. Be sure to check out test/constexpr_tests for constexpr testing")
add_subdirectory(test)
endif ()

if (AZMQ_BUILD_DOC)
add_subdirectory(doc)
endif()
5 changes: 2 additions & 3 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
@@ -63,9 +63,8 @@ but should otherwise not commit their own contributions.
All AUTHORS share in collective ownership of the code, there is no Copyright
Assignment process.

ZeroMQ and it's C4 process strongly favor the GPLv3, however this library exists
mainly to provide a Boost-consistent interface to ZeroMQ and therefore should be
usable under under the terms of the accompanying Boost license.
The library is licensed under the BOOST license, and all contributions must be
licensable under the terms outlined in the accompanying LICENSE-BOOST_1_0 file.

### Creating Stable Releases

50 changes: 50 additions & 0 deletions FindAzmqLibzmq.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Determine the Azmq::libzmq library
#
# This script is a wrapper around find_package and pkg-config
# so inclusion of azmq just works for both install types
#
# If you need different logic define the "Azmq::libzmq" target
# before calling add_subdirectory or find azmq

if (TARGET Azmq::libzmq)
# allow overriding from the calling script
message(STATUS "using provided Azmq::libzmq target")
elseif (TARGET libzmq AND BUILD_SHARED_LIBS)
message(STATUS "using existing shared library target")
add_library(Azmq::libzmq ALIAS libzmq)
elseif (TARGET libzmq-static AND NOT BUILD_SHARED_LIBS)
message(STATUS "using existing static library target")
add_library(Azmq::libzmq ALIAS libzmq-static)
else ()
# try finding the package
find_package(ZeroMQ QUIET)
if (ZeroMQ_FOUND)
# libzmq exports different targets depending on shared vs static
# select the right one based upon BUILD_SHARED_LIBS
if (BUILD_SHARED_LIBS)
if (TARGET libzmq)
message(STATUS "using cmake config libzmq")
add_library(Azmq::libzmq ALIAS libzmq)
else ()
message(FATAL_ERROR "libzmq not exported in the cmake configuration")
endif ()
else ()
if (TARGET libzmq-static)
message(STATUS "using cmake config static libzmq")
add_library(Azmq::libzmq ALIAS libzmq-static)
else ()
message(FATAL_ERROR "libzmq-static not exported in the cmake configuration")
endif ()
endif ()
else ()
# fallback to pkg-config
message(STATUS "CMake libzmq package not found, trying again with pkg-config (normal install of zeromq)")
find_package(PkgConfig REQUIRED)
pkg_check_modules(LIBZMQ REQUIRED IMPORTED_TARGET GLOBAL libzmq)
if (LIBZMQ_FOUND)
add_library(Azmq::libzmq ALIAS PkgConfig::LIBZMQ)
else ()
message(FAIL_ERROR "Can't find the required libzmq library")
endif ()
endif ()
endif ()
83 changes: 57 additions & 26 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,28 +1,41 @@
# aziomq
# AZMQ Boost Asio + ZeroMQ

## Welcome
The aziomq library provides Boost Asio style bindings for ZeroMQ
The azmq library provides Boost Asio style bindings for ZeroMQ

This library is built on top of ZeroMQ's standard C interface.
This library is built on top of ZeroMQ's standard C interface and is
intended to work well with C++ applications which use the Boost libraries
in general, and Asio in particular.

The main abstraction exposed by the library is aziomq::socket which
The main abstraction exposed by the library is azmq::socket which
provides an Asio style socket interface to the underlying zeromq socket
and interfaces with Asio's io_service(). The socket implementation
participates in the io_service's reactor for asynchronous IO and
may be freely mixed with other Asio socket types (raw TCP/UDP/Serial/etc.).

## Building and installation

Building requires a recent version of CMake (2.8 or later), and a C++ compiler
which supports '-std=c++11'. Currently this has been tested with -
* OSX10.9 Mavericks XCode6
* Arch Linux, Ubuntu w/GCC4.8
Building requires a recent version of CMake (2.8.12 or later for Visual Studio, 2.8 or later for the rest), and a C++ compiler
which supports C++11. Currently this has been tested with -
* Xcode 5.1 on OS X 10.8
* Xcode 6 on OS X 10.9
* Xcode 6.4 on OS X 10.10
* Xcode 7.1 on OS X 10.11
* GCC 4.8 + Boost 1.48 on CentOS 6
* GCC 4.8 + Boost 1.53 on CentOS 7
* GCC 4.8 on Arch Linux and Ubuntu
* GCC 4.8 on Ubuntu
* GCC 5.3 + Boost 1.60 on Ubuntu
* Microsoft Visual Studio 2013 on Windows Server 2008 R2

Library dependencies are -
* Boost 1.53 or later
* Boost 1.48 or later
* ZeroMQ 4.0.x

To build -
Tests and example code require -
* Boost 1.54 or later

To build on Linux / OS X -
```
$ mkdir build && cd build
$ cmake ..
@@ -31,50 +44,68 @@ $ make test
$ make install
```

To change the default install location use -DCMAKE_INSTALL_PREFIX when invoking cmake
You can also change where the build looks for Boost and CMake by setting -
To build on Windows -
```
> mkdir build
> cd build
> cmake ..
> cmake --build . --config Release
> ctest . -C Release
```
You can also open Visual Studio solution from `build` directory after invoking CMake.

To change the default install location use `-DCMAKE_INSTALL_PREFIX` when invoking CMake.

To change where the build looks for Boost and ZeroMQ use `-DBOOST_ROOT=<my custom Boost install>` and `-DZMQ_ROOT=<my custom ZeroMQ install>` when invoking CMake. Or set `BOOST_ROOT` and `ZMQ_ROOT` environment variables.

## Packaging via CPack

Building and packaging -
```
$ export BOOST_ROOT=<my custom Boost install>
$ export ZMQ_ROOT=<my custom ZeroMQ install>
$ mkdir build && cd build
$ cmake ..
$ make
$ ...
> mkdir build
> cd build
> cmake -DAZMQ_NO_TESTS=1 -DCPACK_PACKAGE_CONTACT=maintainer@example.com ..
> cpack -G TGZ
> cpack -G RPM
> cpack -G DEB
```

## Example Code
This is an aziomq version of the code presented in the ZeroMQ guide at
This is an azmq version of the code presented in the ZeroMQ guide at
http://zeromq.org/intro:read-the-manual

```
#include <aziomq/socket.hpp>
```cpp
#include <azmq/socket.hpp>
#include <boost/asio.hpp>
#include <array>

namespace asio = boost::asio;

int main(int argc, char** argv) {
asio::io_service ios;
aziomq::socket subscriber(ios, ZMQ_SUB);
azmq::sub_socket subscriber(ios);
subscriber.connect("tcp://192.168.55.112:5556");
subscriber.connect("tcp://192.168.55.201:7721");
subscriber.set_option(aziomq::socket::subscribe("NASDAQ"));
subscriber.set_option(azmq::socket::subscribe("NASDAQ"));

aziomq::socket publisher(ios, ZMQ_PUB);
azmq::pub_socket publisher(ios);
publisher.bind("ipc://nasdaq-feed");

std::array<char, 256> buf;
for (;;) {
auto size = subscriber.receive(asio::buffer(buf));
publisher.send(asio::buffer(const_cast<const char*>(buf.data()), size));
publisher.send(asio::buffer(buf));
}
return 0;
}
```

Further examples may be found in doc/examples

## Build status

![AZMQ build status](https://github.com/zeromq/azmq/actions/workflows/cmake.yml/badge.svg)

## Copying

Use of this software is granted under the the BOOST 1.0 license
@@ -83,5 +114,5 @@ included with the distribution.

## Contributing

AzioMQ uses the [C4.1 (Collective Code Construction Contract)](http://rfc.zeromq.org/spec:22) process for contributions.
AZMQ uses the [C4.1 (Collective Code Construction Contract)](http://rfc.zeromq.org/spec:22) process for contributions.
See the accompanying CONTRIBUTING file for more information.
64 changes: 0 additions & 64 deletions aziomq/detail/reactor_op.hpp

This file was deleted.

188 changes: 0 additions & 188 deletions aziomq/detail/receive_op.hpp

This file was deleted.

160 changes: 0 additions & 160 deletions aziomq/detail/send_op.hpp

This file was deleted.

244 changes: 0 additions & 244 deletions aziomq/detail/socket_ops.hpp

This file was deleted.

530 changes: 0 additions & 530 deletions aziomq/detail/socket_service.hpp

This file was deleted.

105 changes: 0 additions & 105 deletions aziomq/endpoint.hpp

This file was deleted.

28 changes: 0 additions & 28 deletions aziomq/error.hpp

This file was deleted.

699 changes: 0 additions & 699 deletions aziomq/socket.hpp

This file was deleted.

81 changes: 81 additions & 0 deletions azmq/actor.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
Copyright (c) 2013-2014 Contributors as noted in the AUTHORS file
This file is part of azmq
Distributed under the Boost Software License, Version 1.0. (See accompanying
file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*/
#ifndef AZMQ_ACTOR_HPP_
#define AZMQ_ACTOR_HPP_

#include "socket.hpp"
#include "detail/actor_service.hpp"

#include <boost/asio/io_service.hpp>

#include <functional>

namespace azmq { namespace actor {
AZMQ_V1_INLINE_NAMESPACE_BEGIN

using is_alive = detail::actor_service::is_alive;
using detached = detail::actor_service::detached;
using start = detail::actor_service::start;
using last_error = detail::actor_service::last_error;

/** \brief create an actor bound to one end of a pipe (pair of inproc sockets)
* \param peer io_service to associate the peer (caller) end of the pipe
* \param f Function accepting socket& as the first parameter and a
* number of additional args
* \returns peer socket
*
* \remark The newly created actor will run in a boost::thread, and will
* receive the 'server' end of the pipe as it's first argument. The actor
* will be attached to the lifetime of the returned socket and will run
* until it is destroyed.
*
* \remark Each actor has an associated io_service and the supplied socket
* will be created on this io_service. The actor may access this by calling
* get_io_service() on the supplied socket.
*
* \remark The associated io_service is configured to stop the spawned actor
* on SIG_KILL and SIG_TERM.
*
* \remark Termination:
* well behaved actors should ultimately call run() on the io_service
* associated with the supplied socket. This allows the 'client' end of
* the socket's lifetime to cleanly signal termination. If for some
* reason, this is not possible, the caller should set the 'detached'
* option on the 'client' socket. This detaches the actor's associated
* thread from the client socket so that it will not be joined at
* destruction time. It is then up to the caller to work out the termination
* signal for the background thread; for instance by sending a termination
* message.
*
* Also note, the default signal handling for the background thread is
* designed to call stop() on the associated io_service, so not calling
* run() in your handler means you are responsible for catching these
* signals in some other way.
*/
template<typename Function, typename... Args>
socket spawn(boost::asio::io_service & peer, bool defer_start, Function && f, Args&&... args) {
auto& t = boost::asio::use_service<detail::actor_service>(peer);
return t.make_pipe(defer_start, std::bind(std::forward<Function>(f),
std::placeholders::_1,
std::forward<Args>(args)...));
}

template<typename Function, typename... Args>
socket spawn(boost::asio::io_service & peer, Function && f, Args&&... args) {
auto& t = boost::asio::use_service<detail::actor_service>(peer);
return t.make_pipe(false, std::bind(std::forward<Function>(f),
std::placeholders::_1,
std::forward<Args>(args)...));
}

AZMQ_V1_INLINE_NAMESPACE_END
} // namespace actor
} // namespace azmq
#endif // AZMQ_ACTOR_HPP_

23 changes: 10 additions & 13 deletions aziomq/io_service.hpp → azmq/context.hpp
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
/*
Copyright (c) 2013-2014 Contributors as noted in the AUTHORS file
This file is part of aziomq
This file is part of azmq
Distributed under the Boost Software License, Version 1.0. (See accompanying
file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*/
#ifndef AZIOMQ_IO_SERVICE_HPP_
#define AZIOMQ_IO_SERVICE_HPP_
#ifndef AZMQ_CONTEXT_HPP_
#define AZMQ_CONTEXT_HPP_

#include "detail/socket_service.hpp"
#include "option.hpp"
@@ -16,10 +16,8 @@
#include <boost/asio/io_service.hpp>
#include <zmq.h>

namespace aziomq {
namespace io_service {
using service_type = detail::socket_service;

namespace azmq {
AZMQ_V1_INLINE_NAMESPACE_BEGIN
using io_threads = detail::context_ops::io_threads;
using max_sockets = detail::context_ops::max_sockets;
using ipv6 = detail::context_ops::ipv6;
@@ -33,8 +31,7 @@ namespace io_service {
boost::system::error_code set_option(boost::asio::io_service & io_service,
const Option & option,
boost::system::error_code & ec) {

return boost::asio::use_service<service_type>(io_service).set_option(option, ec);
return boost::asio::use_service<detail::socket_service>(io_service).set_option(option, ec);
}

/** \brief set options on the zeromq context.
@@ -58,7 +55,7 @@ namespace io_service {
boost::system::error_code get_option(boost::asio::io_service & io_service,
Option & option,
boost::system::error_code & ec) {
return boost::asio::use_service<service_type>(io_service).get_option(option, ec);
return boost::asio::use_service<detail::socket_service>(io_service).get_option(option, ec);
}

/** \brief get option from zeromq context
@@ -72,6 +69,6 @@ namespace io_service {
if (get_option(io_service, option))
throw boost::system::system_error(ec);
}
} // namespace io_service
} // namespace aziomq
#endif // AZIOMQ_IO_SERVICE_HPP_
AZMQ_V1_INLINE_NAMESPACE_END
} // namespace azmq
#endif // AZMQ_CONTEXT_HPP_
289 changes: 289 additions & 0 deletions azmq/detail/actor_service.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,289 @@
/*
Copyright (c) 2013-2014 Contributors as noted in the AUTHORS file
This file is part of azmq
Distributed under the Boost Software License, Version 1.0. (See accompanying
file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*/
#ifndef AZMQ_DETAIL_ACTOR_SERVICE_HPP_
#define AZMQ_DETAIL_ACTOR_SERVICE_HPP_

#include "../error.hpp"
#include "../socket.hpp"
#include "../option.hpp"
#include "service_base.hpp"
#include "socket_service.hpp"
#include "config/thread.hpp"
#include "config/mutex.hpp"
#include "config/unique_lock.hpp"
#include "config/condition_variable.hpp"

#include <boost/version.hpp>
#include <boost/assert.hpp>
#include <boost/asio/signal_set.hpp>
#include <boost/container/flat_map.hpp>

#if BOOST_VERSION < 107000
# define AZMQ_DETAIL_USE_IO_SERVICE 1
#endif

#include <string>
#include <vector>
#include <memory>
#include <atomic>
#include <sstream>
#include <exception>

namespace azmq {
namespace detail {
class actor_service
: public azmq::detail::service_base<actor_service> {
public:
inline static std::string get_uri(const char* pfx);

#ifdef AZMQ_DETAIL_USE_IO_SERVICE
actor_service(boost::asio::io_service & ios)
#else
actor_service(boost::asio::io_context & ios)
#endif
: azmq::detail::service_base<actor_service>(ios)
{ }

void shutdown_service() override { }

using is_alive = opt::boolean<static_cast<int>(opt::limits::lib_actor_min)>;
using detached = opt::boolean<static_cast<int>(opt::limits::lib_actor_min) + 1>;
using start = opt::boolean<static_cast<int>(opt::limits::lib_actor_min) + 2>;
using last_error = opt::exception_ptr<static_cast<int>(opt::limits::lib_actor_min) + 3>;

template<typename T>
socket make_pipe(bool defer_start, T&& data) {
#ifdef AZMQ_DETAIL_USE_IO_SERVICE
return make_pipe(get_io_service(), defer_start, std::forward<T>(data));
#else
return make_pipe(get_io_context(), defer_start, std::forward<T>(data));
#endif
}

template<typename T>
#ifdef AZMQ_DETAIL_USE_IO_SERVICE
static socket make_pipe(boost::asio::io_service & ios, bool defer_start, T&& data) {
#else
static socket make_pipe(boost::asio::io_context & ios, bool defer_start, T&& data) {
#endif
auto p = std::make_shared<model<T>>(std::forward<T>(data));
auto res = p->peer_socket(ios);
associate_ext(res, handler(std::move(p), defer_start));
return std::move(res);
}

private:
struct concept_ {
using ptr = std::shared_ptr<concept_>;

boost::asio::io_service io_service_;
boost::asio::signal_set signals_;
pair_socket socket_;
thread_t thread_;

using lock_type = unique_lock_t<mutex_t>;
mutable lock_type::mutex_type mutex_;
mutable condition_variable_t cv_;
bool ready_;
bool stopped_;
std::exception_ptr last_error_;

concept_()
: signals_(io_service_, SIGINT, SIGTERM)
, socket_(io_service_)
, ready_(false)
, stopped_(true)
{
socket_.bind(get_uri("pipe"));
}

virtual ~concept_() = default;

#ifdef AZMQ_DETAIL_USE_IO_SERVICE
pair_socket peer_socket(boost::asio::io_service & peer) {
#else
pair_socket peer_socket(boost::asio::io_context & peer) {
#endif
pair_socket res(peer);
auto uri = socket_.endpoint();
BOOST_ASSERT_MSG(!uri.empty(), "uri empty");
res.connect(uri);
return res;
}

bool joinable() const { return thread_.joinable(); }

void stop() {
if (!joinable()) return;
io_service_.stop();
thread_.join();
}

void stopped() {
lock_type l{ mutex_ };
stopped_ = true;
}

bool is_stopped() const {
lock_type l{ mutex_ };
return stopped_;
}

void ready() {
{
lock_type l{ mutex_ };
ready_ = true;
}
cv_.notify_all();
}

void detach() {
if (!joinable()) return; // already detached
lock_type l { mutex_ };
cv_.wait(l, [this] { return ready_; });
thread_.detach();
}

void set_last_error(std::exception_ptr last_error) {
lock_type l { mutex_ };
last_error_ = last_error;
}

std::exception_ptr last_error() const {
lock_type l { mutex_ };
return last_error_;
}

virtual void run() = 0;

static void run(ptr p) {
lock_type l { p->mutex_ };
p->signals_.async_wait([p](boost::system::error_code const&, int) {
p->io_service_.stop();
});
p->stopped_ = false;
p->thread_ = thread_t([p] {
p->ready();
try {
p->run();
} catch (...) {
p->set_last_error(std::current_exception());
}
p->stopped();
});
}
};

template<typename Function>
struct model : concept_ {
Function data_;

model(Function data)
: data_(std::move(data))
{ }

void run() override { data_(socket_); }
};

struct handler {
concept_::ptr p_;
bool defer_start_;

handler(concept_::ptr p, bool defer_start)
: p_(std::move(p))
, defer_start_(defer_start)
{ }

void on_install(boost::asio::io_service&, void*) {
if (defer_start_) return;
defer_start_ = false;
concept_::run(p_);
}

void on_remove() {
if (defer_start_) return;
p_->stop();
}

template<typename Option>
boost::system::error_code set_option(Option const& opt,
boost::system::error_code & ec) {
switch (opt.name()) {
case is_alive::static_name::value :
ec = make_error_code(boost::system::errc::no_protocol_option);
break;
case detached::static_name::value :
{
if (*static_cast<detached::value_t const*>(opt.data()))
p_->detach();
}
break;
case start::static_name::value :
{
if (*static_cast<start::value_t const*>(opt.data()) && defer_start_) {
defer_start_ = false;
concept_::run(p_);
}
}
break;
case last_error::static_name::value :
ec = make_error_code(boost::system::errc::no_protocol_option);
break;
default:
ec = make_error_code(boost::system::errc::not_supported);
break;
}
return ec;
}

template<typename Option>
boost::system::error_code get_option(Option & opt,
boost::system::error_code & ec) {
switch (opt.name()) {
case is_alive::static_name::value :
{
auto v = static_cast<is_alive::value_t*>(opt.data());
*v = !p_->is_stopped();
}
break;
case detached::static_name::value :
{
auto v = static_cast<detached::value_t*>(opt.data());
*v = !p_->joinable();
}
break;
case start::static_name::value :
ec = make_error_code(boost::system::errc::no_protocol_option);
break;
case last_error::static_name::value :
{
auto v = static_cast<last_error::value_t*>(opt.data());
*v = p_->last_error();
}
break;
default:
ec = make_error_code(boost::system::errc::not_supported);
break;
}
return ec;
}
};
};

std::string actor_service::get_uri(const char* pfx) {
static std::atomic_ulong id{ 0 };
std::ostringstream stm;
stm << "inproc://azmq-" << pfx << "-" << id++;
return stm.str();
}

} // namespace detail
} // namespace azmq
#endif // AZMQ_DETAIL_ACTOR_SERVICE_HPP_

51 changes: 51 additions & 0 deletions azmq/detail/basic_io_object.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
Copyright (c) 2013-2014 Contributors as noted in the AUTHORS file
This file is part of azmq
Distributed under the Boost Software License, Version 1.0. (See accompanying
file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*/
#ifndef AZMQ_DETAIL_BASIC_IO_OBJECT_HPP__
#define AZMQ_DETAIL_BASIC_IO_OBJECT_HPP__

#include <boost/asio/io_service.hpp>
#include <boost/asio/basic_io_object.hpp>

namespace azmq {
namespace detail {
template<typename Service>
class core_access {
public:
using service_type = Service;
using implementation_type = typename service_type::implementation_type;

template<typename T>
core_access(T & that)
: service_(that.get_service())
, impl_(that.get_implementation())
{ }

service_type & service() { return service_; }
implementation_type & implementation() { return impl_; }

private:
service_type & service_;
implementation_type & impl_;
};

template<typename Service>
class basic_io_object
: public boost::asio::basic_io_object<Service> {

friend class core_access<Service>;

public:
basic_io_object(boost::asio::io_service& ios)
: boost::asio::basic_io_object<Service>(ios)
{ }
};
} // namespace detail
} // namespace azmq
#endif // AZMQ_DETAIL_BASIC_IO_OBJECT_HPP__

27 changes: 27 additions & 0 deletions azmq/detail/config.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
Copyright (c) 2013-2014 Contributors as noted in the AUTHORS file
This file is part of azmq
Distributed under the Boost Software License, Version 1.0. (See accompanying
file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*/
#ifndef AZMQ_DETAIL_CONFIG_HPP_
#define AZMQ_DETAIL_CONFIG_HPP_

#if !defined AZMQ_USE_STANDALONE_ASIO
# include <boost/config.hpp>
# define AZMQ_NO_CX11_INLINE_NAMESPACES BOOST_NO_CXX11_INLINE_NAMESPACES
#else // AZMQ_USE_STANDALONG_ASIO
// Assume a competent C++11 implementation
# define ASIO_STANDALONE 1
#endif //!defined AZMQ_USE_STANDALONE_ASIO

#if !defined AZMQ_NO_CX11_INLINE_NAMESPACES
# define AZMQ_V1_INLINE_NAMESPACE_BEGIN inline namespace v1 {
# define AZMQ_V1_INLINE_NAMESPACE_END }
#else
# define AZMQ_V1_INLINE_NAMESPACE_BEGIN
# define AZMQ_V1_INLINE_NAMESPACE_END
#endif
#endif // AZMQ_DETAIL_CONFIG_HPP_
26 changes: 26 additions & 0 deletions azmq/detail/config/condition_variable.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
Copyright (c) 2013-2014 Contributors as noted in the AUTHORS file
This file is part of azmq
Distributed under the Boost Software License, Version 1.0. (See accompanying
file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*/
#ifndef AZMQ_DETAIL_CONFIG_CONDITION_VARIABLE_HPP_
#define AZMQ_DETAIL_CONFIG_CONDITION_VARIABLE_HPP_

#if !defined(AZMQ_DISABLE_STD_CONDITION_VARIABLE)
# include <condition_variable>
# define AZMQ_HAS_STD_CONDITION_VARIABLE 1
namespace azmq { namespace detail {
using condition_variable_t = std::condition_variable;
} }
#else // defined(AZMQ_DISABLE_STD_CONDITION_VARIABLE)
# include <boost/thread/condition_variable.hpp>
namespace azmq { namespace detail {
using condition_variable_t = boost::condition_variable;
} }
# endif // !defined(AZMQ_DISABLE_STD_CONDITION_VARIABLE)
#endif // !defined(AZMQ_HAS_STD_CONDITION_VARIABLE)


28 changes: 28 additions & 0 deletions azmq/detail/config/lock_guard.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
Copyright (c) 2013-2014 Contributors as noted in the AUTHORS file
This file is part of azmq
Distributed under the Boost Software License, Version 1.0. (See accompanying
file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*/
#ifndef AZMQ_DETAIL_CONFIG_LOCK_GUARD_HPP_
#define AZMQ_DETAIL_CONFIG_LOCK_GUARD_HPP_

#if !defined(AZMQ_DISABLE_STD_LOCK_GUARD)
# include <mutex>
# define AZMQ_HAS_STD_LOCK_GUARD 1
namespace azmq { namespace detail {
template<typename T>
using lock_guard_t = std::lock_guard<T>;
} }
#else // defined(AZMQ_DISABLE_STD_LOCK_GUARD)
# include <boost/thread/lock_guard.hpp>
namespace azmq { namespace detail {
template<typename T>
using lock_guard_t = boost::lock_guard<T>;
} }
# endif // !defined(AZMQ_DISABLE_STD_LOCK_GUARD)
#endif // !defined(AZMQ_HAS_STD_LOCK_GUARD)


25 changes: 25 additions & 0 deletions azmq/detail/config/mutex.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
Copyright (c) 2013-2014 Contributors as noted in the AUTHORS file
This file is part of azmq
Distributed under the Boost Software License, Version 1.0. (See accompanying
file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*/
#ifndef AZMQ_DETAIL_CONFIG_MUTEX_HPP_
#define AZMQ_DETAIL_CONFIG_MUTEX_HPP_

#if !defined(AZMQ_DISABLE_STD_MUTEX)
# include <mutex>
# define AZMQ_HAS_STD_MUTEX 1
namespace azmq { namespace detail {
using mutex_t = std::mutex;
} }
#else // defined(AZMQ_DISABLE_STD_MUTEX)
# include <boost/thread/mutex.hpp>
namespace azmq { namespace detail {
using mutex_t = boost::mutex;
} }
# endif // !defined(AZMQ_DISABLE_STD_MUTEX)
#endif // !defined(AZMQ_HAS_STD_MUTEX)

24 changes: 24 additions & 0 deletions azmq/detail/config/thread.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
Copyright (c) 2013-2014 Contributors as noted in the AUTHORS file
This file is part of azmq
Distributed under the Boost Software License, Version 1.0. (See accompanying
file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*/
#ifndef AZMQ_DETAIL_CONFIG_THREAD_HPP_
#define AZMQ_DETAIL_CONFIG_THREAD_HPP_

#if !defined(AZMQ_DISABLE_STD_THREAD)
# include <thread>
# define AZMQ_HAS_STD_THREAD 1
namespace azmq { namespace detail {
using thread_t = std::thread;
} }
#else // defined(AZMQ_DISABLE_STD_THREAD)
# include <boost/thread/thread.hpp>
namespace azmq { namespace detail {
using thread_t = boost::thread;
} }
# endif // !defined(AZMQ_DISABLE_STD_THREAD)
#endif // !defined(AZMQ_HAS_STD_THREAD)
29 changes: 29 additions & 0 deletions azmq/detail/config/unique_lock.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
Copyright (c) 2013-2014 Contributors as noted in the AUTHORS file
This file is part of azmq
Distributed under the Boost Software License, Version 1.0. (See accompanying
file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*/
#ifndef AZMQ_DETAIL_CONFIG_UNIQUE_LOCK_HPP_
#define AZMQ_DETAIL_CONFIG_UNIQUE_LOCK_HPP_

#if !defined(AZMQ_DISABLE_STD_UNIQUE_LOCK)
# include <mutex>
# define AZMQ_HAS_STD_UNIQUE_LOCK 1
namespace azmq { namespace detail {
template<typename T>
using unique_lock_t = std::unique_lock<T>;
} }
#else // defined(AZMQ_DISABLE_STD_UNIQUE_LOCK)
# include <boost/thread/unique_lock.hpp>
namespace azmq { namespace detail {
template<typename T>
using unique_lock_t = boost::unique_lock<T>;
} }
# endif // !defined(AZMQ_DISABLE_STD_UNIQUE_LOCK)
#endif // !defined(AZMQ_HAS_STD_UNIQUE_LOCK)



38 changes: 28 additions & 10 deletions aziomq/detail/context_ops.hpp → azmq/detail/context_ops.hpp
Original file line number Diff line number Diff line change
@@ -1,40 +1,59 @@
/*
Copyright (c) 2013-2014 Contributors as noted in the AUTHORS file
This file is part of aziomq
This file is part of azmq
Distributed under the Boost Software License, Version 1.0. (See accompanying
file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*/
#ifndef AZIOMQ_DETAIL_CONTEXT_OPS_HPP__
#define AZIOMQ_DETAIL_CONTEXT_OPS_HPP__
#ifndef AZMQ_DETAIL_CONTEXT_OPS_HPP__
#define AZMQ_DETAIL_CONTEXT_OPS_HPP__

#include "../error.hpp"
#include "../option.hpp"

#include <boost/assert.hpp>
#include <boost/system/error_code.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/locks.hpp>

#include <zmq.h>

#include <memory>

namespace aziomq {
namespace azmq {
namespace detail {
struct context_ops {
using context_type = std::shared_ptr<void>;
using lock_type = boost::lock_guard<boost::mutex>;

using io_threads = opt::integer<ZMQ_IO_THREADS>;
using max_sockets = opt::integer<ZMQ_MAXMSGSIZE>;
using max_sockets = opt::integer<ZMQ_MAX_SOCKETS>;
using ipv6 = opt::boolean<ZMQ_IPV6>;

static context_type get_context(bool create_new = false);
static context_type ctx_new() {
return context_type(zmq_ctx_new(), zmq_ctx_term);
}

static context_type get_context(bool create_new = false) {
static boost::mutex mtx;
static std::weak_ptr<void> ctx;

if (create_new) return ctx_new();

lock_type l{ mtx };
auto p = ctx.lock();
if (!p) ctx = p = ctx_new();
return p;
}

template<typename Option>
static boost::system::error_code set_option(context_type & ctx,
Option const& option,
boost::system::error_code & ec) {
BOOST_ASSERT_MSG(ctx, "context must not be null");
auto rc = zmq_ctx_set(ctx.get(), option.name(), option.value());
if (!rc)
if (rc < 0)
ec = make_error_code();
return ec;
}
@@ -52,7 +71,6 @@ namespace detail {
}
};
} // namespace detail
} // namespace aziomq

#endif // AZIOMQ_DETAIL_CONTEXT_OPS_HPP__
} // namespace azmq

#endif // AZMQ_DETAIL_CONTEXT_OPS_HPP__
44 changes: 44 additions & 0 deletions azmq/detail/reactor_op.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
Copyright (c) 2013-2014 Contributors as noted in the AUTHORS file
This file is part of azmq
Distributed under the Boost Software License, Version 1.0. (See accompanying
file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*/
#ifndef AZMQ_DETAIL_REACTOR_OP_HPP_
#define AZMQ_DETAIL_REACTOR_OP_HPP_

#include "../message.hpp"
#include "socket_ops.hpp"

#include <boost/optional.hpp>
#include <boost/asio/io_service.hpp>

namespace azmq {
namespace detail {
class reactor_op {
public:
using socket_type = socket_ops::socket_type;
using flags_type = socket_ops::flags_type;
boost::system::error_code ec_;
size_t bytes_transferred_ = 0;

virtual ~reactor_op() = default;
virtual bool do_perform(socket_type& socket) = 0;
virtual void do_complete() = 0;

static boost::system::error_code canceled() { return boost::asio::error::operation_aborted; }

protected:
bool try_again() const {
return ec_.value() == boost::system::errc::resource_unavailable_try_again;
}

bool is_canceled() const { return ec_ == canceled(); }
};

} // namespace detail
} // namespace azmq
#endif // AZMQ_DETAIL_REACTOR_OP_HPP_

201 changes: 201 additions & 0 deletions azmq/detail/receive_op.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
/*
Copyright (c) 2013-2014 Contributors as noted in the AUTHORS file
This file is part of azmq
Distributed under the Boost Software License, Version 1.0. (See accompanying
file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*/
#ifndef AZMQ_DETAIL_RECEIVE_OP_HPP_
#define AZMQ_DETAIL_RECEIVE_OP_HPP_

#include "../error.hpp"
#include "../message.hpp"
#include "socket_ops.hpp"
#include "reactor_op.hpp"

#include <boost/version.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/asio/dispatch.hpp>
#include <boost/asio/executor_work_guard.hpp>
#if BOOST_VERSION >= 107900
#include <boost/asio/recycling_allocator.hpp>
#include <boost/asio/bind_allocator.hpp>
#endif

#include <zmq.h>

#include <iterator>
#include <type_traits>

namespace azmq {
namespace detail {
template<typename MutableBufferSequence>
class receive_buffer_op_base : public reactor_op {
public:
receive_buffer_op_base(MutableBufferSequence const& buffers, flags_type flags)
: buffers_(buffers)
, flags_(flags)
{ }

virtual bool do_perform(socket_type& socket) override {
return do_perform_impl(socket);
}

private:
template<typename Buff = MutableBufferSequence>
typename std::enable_if<std::is_same<Buff, azmq::message>::value, bool>::type do_perform_impl(socket_type& socket)
{
ec_ = boost::system::error_code();
bytes_transferred_ += socket_ops::receive(const_cast<azmq::message&>(buffers_), socket, flags_ | ZMQ_DONTWAIT, ec_);
if (ec_)
return !try_again();
return true;
}

template<typename Buff = MutableBufferSequence>
typename std::enable_if<!std::is_same<Buff, azmq::message>::value, bool>::type do_perform_impl(socket_type& socket)
{
ec_ = boost::system::error_code();
bytes_transferred_ += socket_ops::receive(buffers_, socket, flags_ | ZMQ_DONTWAIT, ec_);
if (ec_)
return !try_again();
return true;
}

protected:
bool more() const {
return ec_ == boost::system::errc::no_buffer_space && bytes_transferred_;
}

private:
typename std::conditional<std::is_same<MutableBufferSequence, azmq::message>::value, MutableBufferSequence const&, MutableBufferSequence>::type buffers_;
flags_type flags_;
};

template<typename MutableBufferSequence,
typename Handler>
class receive_buffer_op : public receive_buffer_op_base<MutableBufferSequence> {
public:
receive_buffer_op(MutableBufferSequence const& buffers,
Handler handler,
socket_ops::flags_type flags)
: receive_buffer_op_base<MutableBufferSequence>(buffers, flags)
, handler_(std::move(handler))
, work_guard(boost::asio::make_work_guard(handler_))
{ }

virtual void do_complete() override {
#if BOOST_VERSION >= 107900
auto alloc = boost::asio::get_associated_allocator(
handler_, boost::asio::recycling_allocator<void>());
#endif
boost::asio::dispatch(work_guard.get_executor(),
#if BOOST_VERSION >= 107900
boost::asio::bind_allocator(alloc,
#endif
[ec_ = this->ec_, handler_ = std::move(handler_), bytes_transferred_ = this->bytes_transferred_]() mutable {
handler_(ec_, bytes_transferred_);
})
#if BOOST_VERSION >= 107900
)
#endif
;
}

private:
Handler handler_;
boost::asio::executor_work_guard<typename boost::asio::associated_executor<Handler>::type> work_guard;
};

template<typename MutableBufferSequence,
typename Handler>
class receive_more_buffer_op : public receive_buffer_op_base<MutableBufferSequence> {
public:
receive_more_buffer_op(MutableBufferSequence const& buffers,
Handler handler,
socket_ops::flags_type flags)
: receive_buffer_op_base<MutableBufferSequence>(buffers, flags)
, handler_(std::move(handler))
, work_guard(boost::asio::make_work_guard(handler_))
{ }

virtual void do_complete() override {
#if BOOST_VERSION >= 107900
auto alloc = boost::asio::get_associated_allocator(
handler_, boost::asio::recycling_allocator<void>());
#endif
boost::asio::dispatch(work_guard.get_executor(),
#if BOOST_VERSION >= 107900
boost::asio::bind_allocator(alloc,
#endif
[ec_ = this->ec_, handler_ = std::move(handler_), bytes_transferred_ = this->bytes_transferred_, more = this->more()]() mutable {
handler_(ec_, std::make_pair(bytes_transferred_, more));
})
#if BOOST_VERSION >= 107900
)
#endif
;
}

private:
Handler handler_;
boost::asio::executor_work_guard<typename boost::asio::associated_executor<Handler>::type> work_guard;
};

class receive_op_base : public reactor_op {
public:
receive_op_base(socket_ops::flags_type flags)
: flags_(flags)
{ }

virtual bool do_perform(socket_type& socket) override {
ec_ = boost::system::error_code();
bytes_transferred_ = socket_ops::receive(msg_, socket, flags_ | ZMQ_DONTWAIT, ec_);
if (ec_)
return !try_again();
return true;
}

protected:
message msg_;
flags_type flags_;
};

template<typename Handler>
class receive_op : public receive_op_base {
public:
receive_op(Handler handler,
socket_ops::flags_type flags)
: receive_op_base(flags)
, handler_(std::move(handler))
, work_guard(boost::asio::make_work_guard(handler_))
{ }

virtual void do_complete() override {
#if BOOST_VERSION >= 107900
auto alloc = boost::asio::get_associated_allocator(
handler_, boost::asio::recycling_allocator<void>());
#endif
boost::asio::dispatch(work_guard.get_executor(),
#if BOOST_VERSION >= 107900
boost::asio::bind_allocator(alloc,
#endif
[ec_ = this->ec_, handler_ = std::move(handler_), msg_ = std::move(msg_), bytes_transferred_ = this->bytes_transferred_]() mutable {
handler_(ec_, msg_, bytes_transferred_);
})
#if BOOST_VERSION >= 107900
)
#endif
;
}

private:
Handler handler_;
boost::asio::executor_work_guard<typename boost::asio::associated_executor<Handler>::type> work_guard;
};
} // namespace detail
} // namespace azmq
#endif // AZMQ_DETAIL_RECEIVE_OP_HPP_


147 changes: 147 additions & 0 deletions azmq/detail/send_op.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
Copyright (c) 2013-2014 Contributors as noted in the AUTHORS file
This file is part of azmq
Distributed under the Boost Software License, Version 1.0. (See accompanying
file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*/
#ifndef AZMQ_DETAIL_SEND_OP_HPP_
#define AZMQ_DETAIL_SEND_OP_HPP_
#include "../error.hpp"
#include "../message.hpp"
#include "socket_ops.hpp"
#include "reactor_op.hpp"

#include <boost/version.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/asio/dispatch.hpp>
#include <boost/asio/executor_work_guard.hpp>
#if BOOST_VERSION >= 107900
#include <boost/asio/recycling_allocator.hpp>
#include <boost/asio/bind_allocator.hpp>
#endif

#include <zmq.h>
#include <iterator>

namespace azmq {
namespace detail {

template<typename ConstBufferSequence>
class send_buffer_op_base : public reactor_op {
public:
send_buffer_op_base(ConstBufferSequence const& buffers, flags_type flags)
: buffers_(buffers)
, flags_(flags)
{ }

virtual bool do_perform(socket_type& socket) override {
ec_ = boost::system::error_code();
bytes_transferred_ += socket_ops::send(buffers_, socket, flags_ | ZMQ_DONTWAIT, ec_);
if (ec_) {
return !try_again();
}
return true;
}

private:
ConstBufferSequence buffers_;
flags_type flags_;
};

template<typename ConstBufferSequence,
typename Handler>
class send_buffer_op : public send_buffer_op_base<ConstBufferSequence> {
public:
send_buffer_op(ConstBufferSequence const& buffers,
Handler handler,
reactor_op::flags_type flags)
: send_buffer_op_base<ConstBufferSequence>(buffers, flags)
, handler_(std::move(handler))
, work_guard(boost::asio::make_work_guard(handler_))
{ }

virtual void do_complete() override {
#if BOOST_VERSION >= 107900
auto alloc = boost::asio::get_associated_allocator(
handler_, boost::asio::recycling_allocator<void>());
#endif
boost::asio::dispatch(work_guard.get_executor(),
#if BOOST_VERSION >= 107900
boost::asio::bind_allocator(alloc,
#endif
[ec_ = this->ec_, handler_ = std::move(handler_), bytes_transferred_ = this->bytes_transferred_]() mutable {
handler_(ec_, bytes_transferred_);
})
#if BOOST_VERSION >= 107900
)
#endif
;
}

private:
Handler handler_;
boost::asio::executor_work_guard<typename boost::asio::associated_executor<Handler>::type> work_guard;
};

class send_op_base : public reactor_op {
public:
send_op_base(message msg, flags_type flags)
: msg_(std::move(msg))
, flags_(flags)
{ }

virtual bool do_perform(socket_type & socket) override {
ec_ = boost::system::error_code();
bytes_transferred_ = socket_ops::send(msg_, socket, flags_ | ZMQ_DONTWAIT, ec_);
if (ec_)
return !try_again(); // some other error
return true;
};

private:
message msg_;
flags_type flags_;
};

template<typename Handler>
class send_op : public send_op_base {
public:
send_op(message msg,
Handler handler,
flags_type flags)
: send_op_base(std::move(msg), flags)
, handler_(std::move(handler))
, work_guard(boost::asio::make_work_guard(handler_))
{ }

virtual void do_complete() override {
#if BOOST_VERSION >= 107900
auto alloc = boost::asio::get_associated_allocator(
handler_, boost::asio::recycling_allocator<void>());
#endif
boost::asio::dispatch(work_guard.get_executor(),
#if BOOST_VERSION >= 107900
boost::asio::bind_allocator(alloc,
#endif
[ec_ = this->ec_, handler_ = std::move(handler_), bytes_transferred_ = this->bytes_transferred_]() mutable {
handler_(ec_, bytes_transferred_);
})
#if BOOST_VERSION >= 107900
)
#endif
;

}

private:
Handler handler_;
boost::asio::executor_work_guard<typename boost::asio::associated_executor<Handler>::type> work_guard;
};

} // namespace detail
} // namespace azmq
#endif // AZMQ_DETAIL_SEND_OP_HPP_


38 changes: 38 additions & 0 deletions azmq/detail/service_base.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
Copyright (c) 2013-2014 Contributors as noted in the AUTHORS file
This file is part of azmq
Distributed under the Boost Software License, Version 1.0. (See accompanying
file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*/
#ifndef AZMQ_DETAIL_SERVICE_BASE_HPP_
#define AZMQ_DETAIL_SERVICE_BASE_HPP_

#include <boost/asio/io_service.hpp>

namespace azmq {
namespace detail {
template <typename T>
class service_id
: public boost::asio::io_service::id
{ };

template<typename T>
class service_base
: public boost::asio::io_service::service {
public :
static azmq::detail::service_id<T> id;

// Constructor.
service_base(boost::asio::io_service& io_service)
: boost::asio::io_service::service(io_service)
{ }
};

template <typename T>
azmq::detail::service_id<T> service_base<T>::id;
} // namespace detail
} // namespace azmq
#endif // AZMQ_DETAIL_SERVICE_BASE_HPP_

51 changes: 32 additions & 19 deletions aziomq/detail/socket_ext.hpp → azmq/detail/socket_ext.hpp
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
/*
Copyright (c) 2013-2014 Contributors as noted in the AUTHORS file
This file is part of aziomq
This file is part of azmq
Distributed under the Boost Software License, Version 1.0. (See accompanying
file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*/
#ifndef AZIOMQ_DETAIL_SOCKET_EXT_HPP__
#define AZIOMQ_DETAIL_SOCKET_EXT_HPP__
#ifndef AZMQ_DETAIL_SOCKET_EXT_HPP__
#define AZMQ_DETAIL_SOCKET_EXT_HPP__
#include "../error.hpp"

#include <boost/assert.hpp>
@@ -16,32 +16,45 @@
#include <memory>
#include <typeindex>

namespace aziomq {
namespace azmq {
namespace detail {
struct socket_ext {
template<typename T>
socket_ext(T data) : ptr_(new model<T>(std::move(data)))
socket_ext(T data)
: ptr_(new model<T>(std::move(data)))
{ }

~socket_ext() { on_remove(); }

// MSVS 2013 can not generate move ctor/assignment

socket_ext(socket_ext&& op): ptr_(std::move(op.ptr_)) { }

socket_ext& operator= (socket_ext&& op) {
ptr_ = std::move(op.ptr_);
return *this;
}

void on_install(boost::asio::io_service& ios, void * socket) const {
BOOST_ASSERT_MSG(ptr_, "reusing moved instance of socket_ext");
ptr_->on_install(ios, socket);
}

void on_remove() const {
BOOST_ASSERT_MSG(ptr_, "reusing moved instance of socket_ext");
ptr_->on_remove();
void on_remove() {
if (ptr_)
ptr_->on_remove();
ptr_.reset();
}

template<typename Option>
boost::system::error_code set_option(Option const& opt, boost::system::error_code & ec) const {
BOOST_ASSERT_MSG(ptr_, "reusing moved instance of socket_ext");
BOOST_ASSERT_MSG(ptr_, "reusing (re)moved instance of socket_ext");
return ptr_->set_option(opt_model<Option>(const_cast<Option&>(opt)), ec);
}

template<typename Option>
boost::system::error_code get_option(Option & opt, boost::system::error_code & ec) const {
BOOST_ASSERT_MSG(ptr_, "reusing moved instance of socket_ext");
BOOST_ASSERT_MSG(ptr_, "reusing (re)moved instance of socket_ext");
return ptr_->set_option(opt_model<Option>(opt), ec);
}

@@ -63,25 +76,25 @@ namespace detail {

int name() const override { return data_.name(); }
void const* data() const override { return data_.data(); }
void* data() override { return data_.data(); }
size_t size() override { return data_.size(); }
void* data() override { return data_.data(); }
size_t size() override { return data_.size(); }
};

struct concept {
virtual ~concept() = default;
struct concept_ {
virtual ~concept_() = default;

virtual void on_install(boost::asio::io_service &, void *) = 0;
virtual void on_remove() = 0;
virtual boost::system::error_code set_option(opt_concept const&, boost::system::error_code &) = 0;
virtual boost::system::error_code get_option(opt_concept &, boost::system::error_code &) = 0;
};
std::unique_ptr<concept> ptr_;
std::unique_ptr<concept_> ptr_;

template<typename T>
struct model : concept {
struct model : concept_ {
T data_;

model(T data) : data_(std::move(data)) { }
model(T data): data_(std::move(data)) { }

void on_install(boost::asio::io_service & ios, void * socket) override { data_.on_install(ios, socket); }
void on_remove() override { data_.on_remove(); }
@@ -95,6 +108,6 @@ namespace detail {
};
};
} // namespace detail
} // namespace aziomq
#endif // AZIOMQ_DETAIL_SOCKET_EXT_HPP__
} // namespace azmq
#endif // AZMQ_DETAIL_SOCKET_EXT_HPP__

376 changes: 376 additions & 0 deletions azmq/detail/socket_ops.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,376 @@
/*
Copyright (c) 2013-2014 Contributors as noted in the AUTHORS file
This file is part of azmq
Distributed under the Boost Software License, Version 1.0. (See accompanying
file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*/
#ifndef AZMQ_DETAIL_SOCKET_OPS_HPP__
#define AZMQ_DETAIL_SOCKET_OPS_HPP__

#include "../error.hpp"
#include "../message.hpp"
#include "context_ops.hpp"

#include <boost/assert.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/asio/socket_base.hpp>
#if ! defined BOOST_ASIO_WINDOWS
#include <boost/asio/posix/stream_descriptor.hpp>
#else
#include <boost/asio/ip/tcp.hpp>
#endif
#include <boost/system/error_code.hpp>
#include <boost/random/taus88.hpp>
#include <boost/random/uniform_int_distribution.hpp>
#include <boost/range/metafunctions.hpp>

#include <zmq.h>

#include <cerrno>
#include <iterator>
#include <memory>
#include <regex>
#include <string>
#include <sstream>
#include <type_traits>

namespace azmq {
namespace detail {
struct socket_ops {
using endpoint_type = std::string;

struct socket_close {
void operator()(void* socket) {
int v = 0;
auto rc = zmq_setsockopt(socket, ZMQ_LINGER, &v, sizeof(int));
BOOST_ASSERT_MSG(rc == 0, "set linger=0 on shutdown"); (void)rc;
zmq_close(socket);
}
};

enum class dynamic_port : uint16_t {
first = 0xc000,
last = 0xffff
};

using raw_socket_type = void*;
using socket_type = std::unique_ptr<void, socket_close>;

#if ! defined BOOST_ASIO_WINDOWS
using posix_sd_type = boost::asio::posix::stream_descriptor;
using native_handle_type = boost::asio::posix::stream_descriptor::native_handle_type;
struct stream_descriptor_close {
void operator()(posix_sd_type* sd) {
sd->release();
delete sd;
}
};
using stream_descriptor = std::unique_ptr<posix_sd_type, stream_descriptor_close>;
#else
using native_handle_type = boost::asio::ip::tcp::socket::native_handle_type;
using stream_descriptor = std::unique_ptr<boost::asio::ip::tcp::socket>;
#endif
using flags_type = message::flags_type;
using more_result_type = std::pair<size_t, bool>;

static socket_type create_socket(context_ops::context_type context,
int type,
boost::system::error_code & ec) {
BOOST_ASSERT_MSG(context, "Invalid context");
auto res = zmq_socket(context.get(), type);
if (!res) {
ec = make_error_code();
return socket_type();
}
return socket_type(res);
}

static stream_descriptor get_stream_descriptor(boost::asio::io_service & io_service,
socket_type & socket,
boost::system::error_code & ec) {
BOOST_ASSERT_MSG(socket, "invalid socket");
native_handle_type handle = 0;
auto size = sizeof(native_handle_type);
stream_descriptor res;
auto rc = zmq_getsockopt(socket.get(), ZMQ_FD, &handle, &size);
if (rc < 0)
ec = make_error_code();
else {
#if ! defined BOOST_ASIO_WINDOWS
res.reset(new boost::asio::posix::stream_descriptor(io_service, handle));
#else
// Use duplicated SOCKET, because ASIO socket takes ownership over it so destroys one in dtor.
::WSAPROTOCOL_INFO pi;
::WSADuplicateSocket(handle, ::GetCurrentProcessId(), &pi);
handle = ::WSASocket(pi.iAddressFamily/*AF_INET*/, pi.iSocketType/*SOCK_STREAM*/, pi.iProtocol/*IPPROTO_TCP*/, &pi, 0, 0);
res.reset(new boost::asio::ip::tcp::socket(io_service, boost::asio::ip::tcp::v4(), handle));
#endif
}
return res;
}

static boost::system::error_code cancel_stream_descriptor(stream_descriptor & sd,
boost::system::error_code & ec) {
BOOST_ASSERT_MSG(sd, "invalid stream_descriptor");
return sd->cancel(ec);
}

static boost::system::error_code bind(socket_type & socket,
endpoint_type & ep,
boost::system::error_code & ec) {
BOOST_ASSERT_MSG(socket, "invalid socket");
static const std::regex simple_tcp("^tcp://.*:(\\d+)$");
static const std::regex dynamic_tcp("^(tcp://.*):([*!])(\\[(\\d+)?-(\\d+)?\\])?$");
std::smatch mres;
int rc = -1;
if (std::regex_match(ep, mres, simple_tcp)) {
if (zmq_bind(socket.get(), ep.c_str()) == 0)
rc = boost::lexical_cast<uint16_t>(mres.str(1));
} else if (std::regex_match(ep, mres, dynamic_tcp)) {
auto const& hostname = mres.str(1);
auto const& opcode = mres.str(2);
auto const& first_str = mres.str(4);
auto const& last_str = mres.str(5);
auto first = first_str.empty() ? static_cast<uint16_t>(dynamic_port::first)
: boost::lexical_cast<uint16_t>(first_str);
auto last = last_str.empty() ? static_cast<uint16_t>(dynamic_port::last)
: boost::lexical_cast<uint16_t>(last_str);
uint16_t port = first;
if (opcode[0] == '!') {
static boost::random::taus88 gen;
boost::random::uniform_int_distribution<> port_range(port, last);
port = port_range(gen);
}
auto attempts = last - first;
while (rc < 0 && attempts--) {
ep = hostname + ":" + std::to_string(port);
if (zmq_bind(socket.get(), ep.c_str()) == 0)
rc = port;
if (++port > last)
port = first;
}
} else {
rc = zmq_bind(socket.get(), ep.c_str());
}
if (rc < 0)
ec = make_error_code();
return ec;
}

static boost::system::error_code unbind(socket_type & socket,
endpoint_type const& ep,
boost::system::error_code & ec) {
BOOST_ASSERT_MSG(socket, "invalid socket");
auto rc = zmq_unbind(socket.get(), ep.c_str());
if (rc < 0)
ec = make_error_code();
return ec;
}

static boost::system::error_code connect(socket_type & socket,
endpoint_type const& ep,
boost::system::error_code & ec) {
BOOST_ASSERT_MSG(socket, "invalid socket");
auto rc = zmq_connect(socket.get(), ep.c_str());
if (rc < 0)
ec = make_error_code();
return ec;
}

static boost::system::error_code disconnect(socket_type & socket,
endpoint_type const& ep,
boost::system::error_code & ec) {
BOOST_ASSERT_MSG(socket, "invalid socket");
auto rc = zmq_disconnect(socket.get(), ep.c_str());
if (rc < 0)
ec = make_error_code();
return ec;
}

template<typename Option>
static boost::system::error_code set_option(socket_type & socket,
Option const& opt,
boost::system::error_code & ec) {
auto rc = zmq_setsockopt(socket.get(), opt.name(), opt.data(), opt.size());
if (rc < 0)
ec = make_error_code();
return ec;
}

template<typename Option>
static boost::system::error_code get_option(socket_type & socket,
Option & opt,
boost::system::error_code & ec) {
BOOST_ASSERT_MSG(socket, "invalid socket");
size_t size = opt.size();
auto rc = zmq_getsockopt(socket.get(), opt.name(), opt.data(), &size);
if (rc < 0)
ec = make_error_code();
return ec;
}

static int get_events(socket_type & socket,
boost::system::error_code & ec) {
BOOST_ASSERT_MSG(socket, "invalid socket");
int evs = 0;
size_t size = sizeof(evs);
for(;;) {
auto rc = zmq_getsockopt(socket.get(), ZMQ_EVENTS, &evs, &size);
if (rc < 0) {
if (errno == EINTR)
continue;
ec = make_error_code();
return 0;
}
break;
}
return evs;
}

static int get_socket_kind(socket_type & socket,
boost::system::error_code & ec) {
BOOST_ASSERT_MSG(socket, "invalid socket");
int kind = 0;
size_t size = sizeof(kind);
auto rc = zmq_getsockopt(socket.get(), ZMQ_TYPE, &kind, &size);
if (rc < 0)
ec = make_error_code();
return kind;
}

static bool get_socket_rcvmore(socket_type & socket) {
BOOST_ASSERT_MSG(socket, "invalid socket");
int more = 0;
size_t size = sizeof(more);
auto rc = zmq_getsockopt(socket.get(), ZMQ_RCVMORE, &more, &size);
if (rc == 0)
return more == 1;
return false;
}

static size_t send(message msg,
socket_type & socket,
flags_type flags,
boost::system::error_code & ec) {
BOOST_ASSERT_MSG(socket, "Invalid socket");
auto rc = zmq_msg_send(&msg.msg_, socket.get(), flags);
if (rc < 0) {
ec = make_error_code();
return 0;
}
return rc;
}

template<typename ConstBufferSequence>
static auto send(ConstBufferSequence const& buffers,
socket_type & socket,
flags_type flags,
boost::system::error_code & ec) ->
typename boost::enable_if<boost::has_range_const_iterator<ConstBufferSequence>, size_t>::type
{
size_t res = 0;
auto last = std::distance(std::begin(buffers), std::end(buffers)) - 1;
decltype(last) index = 0u;
for (auto it = std::begin(buffers); it != std::end(buffers); ++it, ++index) {
auto f = index == last ? flags
: flags | ZMQ_SNDMORE;
res += send(message(*it), socket, f, ec);
if (ec) return 0u;
}
return res;
}

static size_t receive(message & msg,
socket_type & socket,
flags_type flags,
boost::system::error_code & ec) {
BOOST_ASSERT_MSG(socket, "Invalid socket");
auto rc = zmq_msg_recv(&msg.msg_, socket.get(), flags);
if (rc < 0) {
ec = make_error_code();
return 0;
}
return rc;
}

template<typename MutableBufferSequence>
static auto receive(MutableBufferSequence const& buffers,
socket_type & socket,
flags_type flags,
boost::system::error_code & ec) ->
typename boost::enable_if<boost::has_range_const_iterator<MutableBufferSequence>, size_t>::type
{
size_t res = 0;
message msg;
auto it = std::begin(buffers);
do {
auto sz = receive(msg, socket, flags, ec);
if (ec)
return 0;

if (msg.buffer_copy(*it++) < sz) {
ec = make_error_code(boost::system::errc::no_buffer_space);
return 0;
}

res += sz;
flags |= ZMQ_RCVMORE;
} while ((it != std::end(buffers)) && msg.more());

if (msg.more())
ec = make_error_code(boost::system::errc::no_buffer_space);
return res;
}

static size_t receive_more(message_vector & vec,
socket_type & socket,
flags_type flags,
boost::system::error_code & ec) {
size_t res = 0;
message msg;
bool more = false;
do {
auto sz = receive(msg, socket, flags, ec);
if (ec)
return 0;
more = msg.more();
vec.emplace_back(std::move(msg));
res += sz;
flags |= ZMQ_RCVMORE;
} while (more);
return res;
}

static size_t flush(socket_type & socket,
boost::system::error_code & ec) {
size_t res = 0;
message msg;
while (get_socket_rcvmore(socket)) {
auto sz = receive(msg, socket, ZMQ_RCVMORE, ec);
if (ec)
return 0;
res += sz;
};
return res;
}

static std::string monitor(socket_type & socket,
int events,
boost::system::error_code & ec) {
BOOST_ASSERT_MSG(socket, "Invalid socket");
std::ostringstream stm;
stm << "inproc://monitor-" << socket.get();
auto addr = stm.str();
auto rc = zmq_socket_monitor(socket.get(), addr.c_str(), events);
if (rc < 0)
ec = make_error_code();
return addr;
}
};
} // namespace detail
} // namespace azmq
#endif // AZMQ_DETAIL_SOCKET_OPS_HPP__

728 changes: 728 additions & 0 deletions azmq/detail/socket_service.hpp

Large diffs are not rendered by default.

52 changes: 52 additions & 0 deletions azmq/error.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
Copyright (c) 2013-2014 Contributors as noted in the AUTHORS file
This file is part of azmq
Distributed under the Boost Software License, Version 1.0. (See accompanying
file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*/
#ifndef AZMQ_ERROR_HPP_
#define AZMQ_ERROR_HPP_
#include "detail/config.hpp"

#include <boost/system/error_code.hpp>
#include <boost/version.hpp>
#include <zmq.h>
#include <string>


#if BOOST_VERSION < 105400
#ifndef BOOST_NOEXCEPT
#ifdef BOOST_NO_CXX11_NOEXCEPT
#define BOOST_NOEXCEPT
#else
#define BOOST_NOEXCEPT noexcept
#endif
#endif
#define BOOST_SYSTEM_NOEXCEPT BOOST_NOEXCEPT
#endif

namespace azmq {
AZMQ_V1_INLINE_NAMESPACE_BEGIN
/** \brief custom error_category to map zeromq errors */
class error_category : public boost::system::error_category {
public:
const char* name() const BOOST_SYSTEM_NOEXCEPT override {
return "ZeroMQ";
}

std::string message(int ev) const override {
return std::string(zmq_strerror(ev));
}
};

inline boost::system::error_code make_error_code(int ev = errno) {
static error_category cat;

return boost::system::error_code(ev, cat);
}
AZMQ_V1_INLINE_NAMESPACE_END
} // namespace azmq
#endif // AZMQ_ERROR_HPP_

210 changes: 138 additions & 72 deletions aziomq/message.hpp → azmq/message.hpp
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
/*
Copyright (c) 2013-2014 Contributors as noted in the AUTHORS file
This file is part of aziomq
This file is part of azmq
Distributed under the Boost Software License, Version 1.0. (See accompanying
file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*/
#ifndef AZIOMQ_MESSAGE_HPP__
#define AZIOMQ_MESSAGE_HPP__
#ifndef AZMQ_MESSAGE_HPP__
#define AZMQ_MESSAGE_HPP__

#include "error.hpp"
#include "util/scope_guard.hpp"

#include <boost/assert.hpp>
#include <boost/version.hpp>
#if BOOST_VERSION >= 105300
#include <boost/utility/string_ref.hpp>
#endif
#include <boost/asio/buffer.hpp>
#include <boost/asio/buffers_iterator.hpp>
#include <boost/system/system_error.hpp>
@@ -21,22 +25,37 @@

#include <zmq.h>

#include <type_traits>
#include <memory>
#include <vector>
#include <ostream>
#include <cstring>

namespace aziomq {

namespace azmq {
namespace detail {
struct socket_ops;
}

inline namespace V1 {
AZMQ_V1_INLINE_NAMESPACE_BEGIN

struct nocopy_t {};

#ifdef BOOST_NO_CXX11_CONSTEXPR
const nocopy_t nocopy = nocopy_t{};
#else
constexpr nocopy_t nocopy = nocopy_t{};
#endif


struct message {
typedef void (free_fn) (void *data);

using flags_type = int;

message() noexcept {
message() BOOST_NOEXCEPT {
auto rc = zmq_msg_init(&msg_);
BOOST_ASSERT_MSG(rc == 0, "zmq_msg_init return non-zero");
BOOST_ASSERT_MSG(rc == 0, "zmq_msg_init return non-zero"); (void)rc;
}

explicit message(size_t size) {
@@ -54,36 +73,97 @@ inline namespace V1 {
buffer);
}

explicit message(std::string const& str)
: message(boost::asio::buffer(str))
message(nocopy_t, boost::asio::const_buffer const& buffer)
: message(nocopy,
boost::asio::mutable_buffer(
(void *)boost::asio::buffer_cast<const void*>(buffer),
boost::asio::buffer_size(buffer)),
nullptr,
nullptr)
{}

message(nocopy_t, boost::asio::mutable_buffer const& buffer, void* hint, zmq_free_fn* deleter)
{
auto rc = zmq_msg_init_data(&msg_,
boost::asio::buffer_cast<void*>(buffer),
boost::asio::buffer_size(buffer),
deleter, hint);
if (rc)
throw boost::system::system_error(make_error_code());
}

template<class Deleter, class Enabler =
typename std::enable_if<!std::is_convertible<Deleter, free_fn *>::value>::type
>
message(nocopy_t, boost::asio::mutable_buffer const& buffer, Deleter&& deleter)
{
using D = typename std::decay<Deleter>::type;

const auto call_deleter = [](void *buf, void *hint) {
std::unique_ptr<D> deleter(reinterpret_cast<D*>(hint));
BOOST_ASSERT_MSG(deleter, "!deleter");
(*deleter)(buf);
};

std::unique_ptr<D> d(new D(std::forward<Deleter>(deleter)));
auto rc = zmq_msg_init_data(&msg_,
boost::asio::buffer_cast<void*>(buffer),
boost::asio::buffer_size(buffer),
call_deleter, d.get());
if (rc)
throw boost::system::system_error(make_error_code());
d.release();
}

message(nocopy_t, boost::asio::mutable_buffer const& buffer, free_fn* deleter)
{
BOOST_ASSERT_MSG(deleter, "!deleter");

const auto call_deleter = [](void *buf, void *hint) {
free_fn *deleter(reinterpret_cast<free_fn*>(hint));
(*deleter)(buf);
};

auto rc = zmq_msg_init_data(&msg_,
boost::asio::buffer_cast<void*>(buffer),
boost::asio::buffer_size(buffer),
call_deleter, reinterpret_cast<void *>(deleter));
if (rc)
throw boost::system::system_error(make_error_code());
}

#if BOOST_VERSION >= 105300
explicit message(boost::string_ref str)
: message(boost::asio::buffer(str.data(), str.size()))
{ }
#endif

message(message && rhs) noexcept
message(message && rhs) BOOST_NOEXCEPT
: msg_(rhs.msg_)
{
auto rc = zmq_msg_init(&rhs.msg_);
BOOST_ASSERT_MSG(rc == 0, "zmq_msg_init return non-zero");
BOOST_ASSERT_MSG(rc == 0, "zmq_msg_init return non-zero"); (void)rc;
}

message& operator=(message && rhs) noexcept {
message& operator=(message && rhs) BOOST_NOEXCEPT {
msg_ = rhs.msg_;
auto rc = zmq_msg_init(&rhs.msg_);
BOOST_ASSERT_MSG(rc == 0, "zmq_msg_init return non-zero");
BOOST_ASSERT_MSG(rc == 0, "zmq_msg_init return non-zero"); (void)rc;

return *this;
}

message(message const& rhs) {
auto rc = zmq_msg_init(const_cast<zmq_msg_t*>(&msg_));
auto rc = zmq_msg_init(&msg_);
BOOST_ASSERT_MSG(rc == 0, "zmq_msg_init return non-zero");
rc = zmq_msg_copy(const_cast<zmq_msg_t*>(&msg_),
rc = zmq_msg_copy(&msg_,
const_cast<zmq_msg_t*>(&rhs.msg_));
if (rc)
throw boost::system::system_error(make_error_code());
}

message& operator=(message const& rhs) {
auto rc = zmq_msg_copy(const_cast<zmq_msg_t*>(&msg_),
auto rc = zmq_msg_copy(&msg_,
const_cast<zmq_msg_t*>(&rhs.msg_));
if (rc)
throw boost::system::system_error(make_error_code());
@@ -92,82 +172,68 @@ inline namespace V1 {

~message() { close(); }

operator boost::asio::const_buffer() const {
auto pv = zmq_msg_data(const_cast<zmq_msg_t*>(&msg_));
return boost::asio::buffer(pv, size());
boost::asio::const_buffer cbuffer() const BOOST_NOEXCEPT {
return boost::asio::buffer(data(), size());
}

operator boost::asio::mutable_buffer() {
if (is_shared())
deep_copy();
operator boost::asio::const_buffer() const BOOST_NOEXCEPT {
return cbuffer();
}

auto pv = zmq_msg_data(const_cast<zmq_msg_t*>(&msg_));
return boost::asio::buffer(pv, size());
boost::asio::const_buffer buffer() const BOOST_NOEXCEPT {
return cbuffer();
}

size_t buffer_copy(boost::asio::mutable_buffer const& target) const {
return boost::asio::buffer_copy(target, boost::asio::buffer(*this));
boost::asio::mutable_buffer buffer() {
if (is_shared())
deep_copy();

return boost::asio::buffer(const_cast<void *>(data()), size());
}

bool operator==(const message & rhs) const {
auto pa = reinterpret_cast<char*>(
zmq_msg_data(const_cast<zmq_msg_t*>(&msg_)));
auto pb = reinterpret_cast<char*>(
zmq_msg_data(const_cast<zmq_msg_t*>(&rhs.msg_)));
return std::equal(pa, pa + size(), pb);
template<typename T>
T const& buffer_cast() const {
return *boost::asio::buffer_cast<T const*>(buffer());
}

std::string string() const {
auto pv = zmq_msg_data(const_cast<zmq_msg_t*>(&msg_));
auto sz = size();
return std::string(reinterpret_cast<char const*>(pv), sz);
size_t buffer_copy(boost::asio::mutable_buffer const& target) const {
return boost::asio::buffer_copy(target, buffer());
}

friend
std::ostream& operator<<(std::ostream& stm, message const& that) {
return stm << "message{sz=" << that.size() << "}";
bool operator==(const message & rhs) const BOOST_NOEXCEPT {
return !operator!=(rhs);
}

size_t size() const { return zmq_msg_size(const_cast<zmq_msg_t*>(&msg_)); }
bool operator!=(const message & rhs) const BOOST_NOEXCEPT {
const auto sz = size();

void rebuild() {
close();
auto rc = zmq_msg_init(&msg_);
if (rc)
throw boost::system::system_error(make_error_code());
return sz != rhs.size()
|| 0 != std::memcmp(data(), rhs.data(), sz);
}

void rebuild(size_t size) {
close();
auto rc = zmq_msg_init_size(&msg_, size);
if (rc)
throw boost::system::system_error(make_error_code());
std::string string() const {
return std::string(static_cast<const char *>(data()), size());
}

void rebuild(boost::asio::const_buffer const& buffer) {
close();
auto sz = boost::asio::buffer_size(buffer);
auto rc = zmq_msg_init_size(&msg_, sz);
if (rc)
throw boost::system::system_error(make_error_code());
boost::asio::buffer_copy(boost::asio::buffer(zmq_msg_data(&msg_), sz),
buffer);
const void *data() const BOOST_NOEXCEPT {
return zmq_msg_data(const_cast<zmq_msg_t*>(&msg_));
}

void rebuild(std::string const& str) { rebuild(boost::asio::buffer(str)); }
size_t size() const BOOST_NOEXCEPT {
return zmq_msg_size(const_cast<zmq_msg_t*>(&msg_));
}

bool more() const {
return zmq_msg_more(const_cast<zmq_msg_t*>(&msg_));
bool more() const BOOST_NOEXCEPT {
return zmq_msg_more(const_cast<zmq_msg_t*>(&msg_)) ? true : false;
}

private:
friend detail::socket_ops;
zmq_msg_t msg_;

void close() {
void close() BOOST_NOEXCEPT {
auto rc = zmq_msg_close(&msg_);
if (rc)
throw boost::system::system_error(make_error_code());
BOOST_ASSERT_MSG(rc == 0, "zmq_msg_close return non-zero"); (void)rc;
}

// note, this is a bit fragile, currently the last two bytes in a
@@ -177,13 +243,13 @@ inline namespace V1 {
type_offset = sizeof(zmq_msg_t) - 2
};

uint8_t flags() const {
uint8_t flags() const BOOST_NOEXCEPT {
auto pm = const_cast<zmq_msg_t*>(&msg_);
auto p = reinterpret_cast<uint8_t*>(pm);
return p[flags_offset];
}

uint8_t type() const {
uint8_t type() const BOOST_NOEXCEPT {
auto pm = const_cast<zmq_msg_t*>(&msg_);
auto p = reinterpret_cast<uint8_t*>(pm);
return p[type_offset];
@@ -196,7 +262,7 @@ inline namespace V1 {
type_cmsg = 104
};

bool is_shared() const {
bool is_shared() const BOOST_NOEXCEPT {
// TODO use shared message property if libzmq version >= 4.1
return (flags() & flag_shared) || type() == type_cmsg;
}
@@ -217,7 +283,7 @@ inline namespace V1 {
if (rc)
throw boost::system::system_error(make_error_code());

auto pdst = zmq_msg_data(const_cast<zmq_msg_t*>(&msg_));
auto pdst = zmq_msg_data(&msg_);
auto psrc = zmq_msg_data(&tmp);
::memcpy(pdst, psrc, sz);
}
@@ -247,7 +313,7 @@ inline namespace V1 {
}

message& dereference() const {
msg_.rebuild(*it_);
msg_ = message(*it_);
return msg_;
}
};
@@ -278,6 +344,6 @@ inline namespace V1 {
message_vector to_message_vector(BufferSequence const& buffers) {
return message_vector(std::begin(buffers), std::end(buffers));
}
} // namespace V1
} // namespace aziomq
#endif // AZIOMQ_MESSAGE_HPP__
AZMQ_V1_INLINE_NAMESPACE_END
} // namespace azmq
#endif // AZMQ_MESSAGE_HPP__
64 changes: 53 additions & 11 deletions aziomq/option.hpp → azmq/option.hpp
Original file line number Diff line number Diff line change
@@ -1,38 +1,48 @@
/*
Copyright (c) 2013-2014 Contributors as noted in the AUTHORS file
This file is part of aziomq
This file is part of azmq
Distributed under the Boost Software License, Version 1.0. (See accompanying
file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*/
#ifndef AZIOMQ_OPTION_HPP_
#define AZIOMQ_OPTION_HPP_
#ifndef AZMQ_OPTION_HPP_
#define AZMQ_OPTION_HPP_

#include "error.hpp"

#include <zmq.h>
#include <boost/asio/buffer.hpp>
#include <boost/logic/tribool.hpp>

#include <vector>
#include <string>
#include <exception>

namespace aziomq { namespace opt {
// limits for user/aziomq-defined options (should be well outside of the valid ZMQ range)
namespace azmq { namespace opt {
AZMQ_V1_INLINE_NAMESPACE_BEGIN
// limits for user/azmq-defined options (should be well outside of the valid ZMQ range)
enum class limits : int {
lib_min = 1000000,
lib_ctx_min = lib_min,
lib_ctx_max = lib_ctx_min + 9999,
lib_socket_min,
lib_socket_max = lib_socket_min + 9999,
lib_max = lib_socket_max,
lib_actor_min,
lib_actor_max = lib_actor_min + 9999,
lib_max,
user_min = 2000000,
user_ctx_min = user_min,
user_ctx_max = user_ctx_min + 9999,
user_socket_min,
user_socket_max = user_socket_min + 9999,
user_max = user_socket_max
user_max
};

inline int operator+(limits l) {
return static_cast<int>(l);
}

template<typename T, int N>
struct base {
using static_name = std::integral_constant<int, N>;
@@ -60,6 +70,7 @@ namespace aziomq { namespace opt {
template<int N>
struct boolean {
using static_name = std::integral_constant<int, N>;
using value_t = int;
int value_;

boolean() : value_{ 0 } { }
@@ -70,23 +81,54 @@ namespace aziomq { namespace opt {
void* data() { return reinterpret_cast<void*>(&value_); }
size_t size() const { return sizeof(int); }

bool value() const { return value_; }
bool value() const { return 0 != value_; }
};

template<int N>
struct binary {
using static_name = std::integral_constant<int, N>;
using value_t = void*;
std::vector<char> data_;
void* pv_;
size_t size_;

binary() : pv_(nullptr), size_(0) { }
binary(void* pv, size_t size) : pv_(pv), size_(size) { }
binary(std::string const& str)
: data_(std::begin(str), std::end(str))
, pv_(data_.data())
, size_(data_.size())
{ }

binary(void const* pv, size_t size)
: data_(static_cast<char const*>(pv), static_cast<char const*>(pv) + size)
, pv_(data_.data())
, size_(size)
{ }

int name() const { return N; }
const void* data() const { pv_; }
const void* data() const { return pv_; }
void* data() { return pv_; }
size_t size() const { size_; }
size_t size() const { return size_; }
};

template<int N>
struct exception_ptr {
using static_name = std::integral_constant<int, N>;
using value_t = std::exception_ptr;
std::exception_ptr p_;

exception_ptr() { }
exception_ptr(std::exception_ptr p) : p_(p) { }

int name() const { return N; }
const void* data() const { return &p_; }
void* data() { return &p_; }
size_t size() const { return sizeof(p_); }

std::exception_ptr value() const { return p_; }
};
AZMQ_V1_INLINE_NAMESPACE_END
} }

#endif // AZIOMQ_OPTION_HPP_
#endif // AZMQ_OPTION_HPP_
84 changes: 84 additions & 0 deletions azmq/signal.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
Copyright (c) 2013-2014 Contributors as noted in the AUTHORS file
This file is part of azmq
Distributed under the Boost Software License, Version 1.0. (See accompanying
file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*/
#ifndef AZMQ_SIGNAL_HPP_
#define AZMQ_SIGNAL_HPP_

#include "socket.hpp"

namespace azmq {
namespace signal {
AZMQ_V1_INLINE_NAMESPACE_BEGIN
/** \brief Send a signal over a socket. A signal is a short message carrying a
* success/failure code (by convention, 0 means OK). Signals are encoded to be
* distinguishable from "normal" messages.
* \param s socket& to signal on
* \param status uint8_t to send
* \param ec boost::system::error_code&
* \return boost::system::error_code
*/
boost::system::error_code send(socket & s, uint8_t status,
boost::system::error_code & ec) {
uint64_t v = 0x77664433221100u + status;
auto buf = boost::asio::buffer(&v, sizeof(v));
s.send(buf, 0, ec);
return ec;
}

/** \brief Send a signal over a socket. A signal is a short message carrying a
* success/failure code (by convention, 0 means OK). Signals are encoded to be
* distinguishable from "normal" messages.
* \param s socket& to signal on
* \param status uint8_t to send
* \throw boost::system::system_error
*/
void send(socket & s, uint8_t status) {
boost::system::error_code ec;
if (send(s, status, ec))
throw boost::system::system_error(ec);
}

/** \brief Wait on a signal from a socket. Use this with signal() to coordiante
* over thread/actor pipes
* \param s socket& to receive signal from
* \param ec boost::system::error_code
* \return signal
*/
uint8_t wait(socket & s, boost::system::error_code & ec) {
message msg;
while (true) {
auto sz = s.receive(msg, 0, ec);
if (ec)
return 0;
if (sz == sizeof(uint64_t)) {
auto v = msg.buffer_cast<uint64_t>();
if ((v & 0xffffffffffff00u) == 0x77664433221100u)
return v & 255;
}
}
}

/** \brief Wait on a signal from a socket. Use this with signal() to coordiante
* over thread/actor pipes
* \param s socket& to receive signal from
* \return signal
* \throw boost::system::system_error
*/
uint8_t wait(socket & s) {
boost::system::error_code ec;
auto res = wait(s, ec);
if (ec)
throw boost::system::system_error(ec);
return res;
}

AZMQ_V1_INLINE_NAMESPACE_END
} // namespace signal
} // namespace azmq
#endif // AZMQ_SIGNAL_HPP_

918 changes: 918 additions & 0 deletions azmq/socket.hpp

Large diffs are not rendered by default.

18 changes: 9 additions & 9 deletions aziomq/util/expected.hpp → azmq/util/expected.hpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
Copyright (c) 2013-2014 Contributors as noted in the AUTHORS file
This file is part of aziomq
This file is part of azmq
Distributed under the Boost Software License, Version 1.0. (See accompanying
file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -10,8 +10,8 @@
from the "Systematic Error Handling in C++" talk given at C++ And
Beyond 2012
*/
#ifndef AZIOMQ_EXPECTED_HPP_
#define AZIOMQ_EXPECTED_HPP_
#ifndef AZMQ_EXPECTED_HPP_
#define AZMQ_EXPECTED_HPP_

#include <boost/assert.hpp>
#include <exception>
@@ -20,11 +20,11 @@
#include <stdexcept>
#include <cassert>

namespace aziomq { namespace util {
// define AZIOMQ_LOG_UNCHECKED *BEFORE* including expected.hpp to forward declare the following
namespace azmq { namespace util {
// define AZMQ_LOG_UNCHECKED *BEFORE* including expected.hpp to forward declare the following
// function to be called any time an exception is present and unchecked in an expected<T>
// when it's destructor is called
#ifdef AZIOMQ_LOG_UNCHECKED
#ifdef AZMQ_LOG_UNCHECKED
void log_expected_unchecked(std::exception_ptr err);
#endif

@@ -61,7 +61,7 @@ class expected {
}

~expected() {
#ifdef AZIOMQ_LOG_UNCHECKED
#ifdef AZMQ_LOG_UNCHECKED
if (unchecked_)
log_expected_unchecked(err_);
#else
@@ -139,5 +139,5 @@ class expected {
}
};
} // namespace util
} // namespace aziomq
#endif // AZIOMQ_EXPECTED_HPP_
} // namespace azmq
#endif // AZMQ_EXPECTED_HPP_
14 changes: 7 additions & 7 deletions aziomq/util/scope_guard.hpp → azmq/util/scope_guard.hpp
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
/*
Copyright (c) 2013-2014 Contributors as noted in the AUTHORS file
This file is part of aziomq
This file is part of azmq
Distributed under the Boost Software License, Version 1.0. (See accompanying
file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
This is an implementation of Andrei Alexandrescu's ScopeGuard type from
the "Systematic Error Handling in C++" talk given at C++ And Beyond 2012
*/
#ifndef AZIOMQ_SCOPE_GUARD_HPP_
#define AZIOMQ_SCOPE_GUARD_HPP_
#ifndef AZMQ_SCOPE_GUARD_HPP_
#define AZMQ_SCOPE_GUARD_HPP_

#include <utility>

namespace aziomq {
namespace azmq {
namespace util {
template<class F>
class scope_guard_t {
@@ -49,7 +49,7 @@ scope_guard_t<F> operator+(scope_guard_on_exit, F && func) {
return scope_guard<F>(std::forward<F>(func));
}
} // namespace util
} // namespace aziomq
} // namespace azmq

#define CONCATENATE_IMPL(s1, s2) s1##s2
#define CONCATENATE(s1, s2) CONCATENATE_IMPL(s1, s2)
@@ -62,5 +62,5 @@ scope_guard_t<F> operator+(scope_guard_on_exit, F && func) {
#endif // __COUNTER__

#define SCOPE_EXIT\
auto ANONYMOUS_VARIABLE(SCOPE_EXIT_STATE) = aziomq::util::scope_guard_on_exit() + [&]()
#endif // AZIOMQ_SCOPE_GUARD_HPP_
auto ANONYMOUS_VARIABLE(SCOPE_EXIT_STATE) = azmq::util::scope_guard_on_exit() + [&]()
#endif // AZMQ_SCOPE_GUARD_HPP_
23 changes: 23 additions & 0 deletions azmq/version.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
Copyright (c) 2013-2014 Contributors as noted in the AUTHORS file
This file is part of azmq
Distributed under the Boost Software License, Version 1.0. (See accompanying
file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*/

#ifndef AZMQ_VERSION_HPP_
#define AZMQ_VERSION_HPP_

/* Version macros for compile-time API version detection */
#define AZMQ_VERSION_MAJOR 1
#define AZMQ_VERSION_MINOR 0
#define AZMQ_VERSION_PATCH 2

#define AZMQ_MAKE_VERSION(major, minor, patch) \
((major) *10000 + (minor) *100 + (patch))
#define AZMQ_VERSION \
AZMQ_MAKE_VERSION (AZMQ_VERSION_MAJOR, AZMQ_VERSION_MINOR, AZMQ_VERSION_PATCH)

#endif // AZMQ_VERSION_HPP_
6 changes: 6 additions & 0 deletions azmqConfig.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
include(CMakeFindDependencyMacro)
find_package(Boost 1.68 COMPONENTS system date_time thread chrono random REQUIRED)
include(${CMAKE_CURRENT_LIST_DIR}/FindAzmqLibzmq.cmake)
set(CMAKE_THREAD_PREFER_PTHREAD TRUE)
find_package(Threads REQUIRED)
include("${CMAKE_CURRENT_LIST_DIR}/azmqTargets.cmake")
75 changes: 75 additions & 0 deletions cmake/CompilerWarnings.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
function(add_more_warnings_to project)
# inspired from Jason Turner's excellent reference
# https://github.com/lefticus/cppbestpractices/blob/master/02-Use_the_Tools_Available.md
set(MSVC_WARNINGS
/W4 # Baseline reasonable warnings
/w14242 # 'identifier': conversion from 'type1' to 'type1', possible loss of data
/w14254 # 'operator': conversion from 'type1:field_bits' to 'type2:field_bits', possible loss of data
/w14263 # 'function': member function does not override any base class virtual member function
/w14265 # 'classname': class has virtual functions, but destructor is not virtual instances of this class may not
# be destructed correctly
/w14287 # 'operator': unsigned/negative constant mismatch
/we4289 # nonstandard extension used: 'variable': loop control variable declared in the for-loop is used outside
# the for-loop scope
/w14296 # 'operator': expression is always 'boolean_value'
/w14311 # 'variable': pointer truncation from 'type1' to 'type2'
/w14545 # expression before comma evaluates to a function which is missing an argument list
/w14546 # function call before comma missing argument list
/w14547 # 'operator': operator before comma has no effect; expected operator with side-effect
/w14549 # 'operator': operator before comma has no effect; did you intend 'operator'?
/w14555 # expression has no effect; expected expression with side- effect
/w14619 # pragma warning: there is no warning number 'number'
/w14640 # Enable warning on thread un-safe static member initialization
/w14826 # Conversion from 'type1' to 'type_2' is sign-extended. This may cause unexpected runtime behavior.
/w14905 # wide string literal cast to 'LPSTR'
/w14906 # string literal cast to 'LPWSTR'
/w14928 # illegal copy-initialization; more than one user-defined conversion has been implicitly applied
/permissive- # standards conformance mode for MSVC compiler.
)

set(CLANG_WARNINGS
-Wall
-Wextra # reasonable and standard
-Wshadow # warn the user if a variable declaration shadows one from a parent context
-Wnon-virtual-dtor # warn the user if a class with virtual functions has a non-virtual destructor. This helps
# catch hard to track down memory errors
-Wold-style-cast # warn for c-style casts
-Wcast-align # warn for potential performance problem casts
-Wunused # warn on anything being unused
-Woverloaded-virtual # warn if you overload (not override) a virtual function
-Wpedantic # warn if non-standard C++ is used
-Wconversion # warn on type conversions that may lose data
-Wsign-conversion # warn on sign conversions
-Wnull-dereference # warn if a null dereference is detected
-Wdouble-promotion # warn if float is implicit promoted to double
-Wformat=2 # warn on security issues around functions that format output (ie printf)
-Wimplicit-fallthrough # warn on statements that fallthrough without an explicit annotation
)

if(WARNINGS_AS_ERRORS)
set(CLANG_WARNINGS ${CLANG_WARNINGS} -Werror)
set(MSVC_WARNINGS ${MSVC_WARNINGS} /WX)
endif()

set(GCC_WARNINGS
${CLANG_WARNINGS}
-Wmisleading-indentation # warn if indentation implies blocks where blocks do not exist
-Wduplicated-cond # warn if if / else chain has duplicated conditions
-Wduplicated-branches # warn if if / else branches have duplicated code
-Wlogical-op # warn about logical operations being used where bitwise were probably wanted
-Wuseless-cast # warn if you perform a cast to the same type
)

if(MSVC)
set(PROJECT_WARNINGS ${MSVC_WARNINGS})
elseif(CMAKE_CXX_COMPILER_ID MATCHES ".*Clang")
set(PROJECT_WARNINGS ${CLANG_WARNINGS})
elseif(CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
set(PROJECT_WARNINGS ${GCC_WARNINGS})
else()
message(AUTHOR_WARNING "No compiler warnings set for '${CMAKE_CXX_COMPILER_ID}' compiler.")
endif()

# XXX NO! target_compile_options(${project} INTERFACE ${PROJECT_WARNINGS})
add_compile_options(${PROJECT_WARNINGS})
endfunction()
45 changes: 0 additions & 45 deletions config/FindAzioMQ.cmake

This file was deleted.

45 changes: 0 additions & 45 deletions config/FindZeroMQ.cmake

This file was deleted.

1 change: 1 addition & 0 deletions doc/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
add_subdirectory(examples)
1 change: 1 addition & 0 deletions doc/examples/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
add_subdirectory(actor)
5 changes: 5 additions & 0 deletions doc/examples/actor/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
project(actor)

add_executable(${PROJECT_NAME} main.cpp)

target_link_libraries(${PROJECT_NAME} Azmq::azmq)
127 changes: 127 additions & 0 deletions doc/examples/actor/main.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
#include <azmq/actor.hpp>

#include <boost/utility/string_ref.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/asio/buffer.hpp>
#include <boost/asio/signal_set.hpp>
#include <boost/asio/deadline_timer.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>

#include <memory>
#include <array>
#include <atomic>
#include <iostream>

namespace asio = boost::asio;
namespace pt = boost::posix_time;

class server_t {
public:
server_t(asio::io_service & ios)
: pimpl_(std::make_shared<impl>())
, frontend_(azmq::actor::spawn(ios, run, pimpl_))
{ }

void ping() {
frontend_.send(asio::buffer("PING"));
frontend_.async_receive(asio::buffer(buf_), [this](boost::system::error_code const& ec, size_t bytes_transferred) {
if (ec)
return;
if (boost::string_ref(buf_.data(), bytes_transferred - 1) == "PONG")
pimpl_->pongs_++;
});
}

friend std::ostream & operator<<(std::ostream & stm, server_t const & that) {
return stm << "pings=" << that.pimpl_->pings_
<< ", pongs=" << that.pimpl_->pongs_;
}

private:
// for such a simple example, this is overkill, but is a useful pattern for
// real servers that need to maintain state
struct impl {
std::atomic<unsigned long> pings_;
std::atomic<unsigned long> pongs_;
std::array<char, 256> buf_;

impl()
: pings_(0)
, pongs_(0)
{ }
};
using ptr = std::shared_ptr<impl>;
ptr pimpl_;

// we schedule async receives for the backend socket here
static void do_receive(azmq::socket & backend, std::weak_ptr<impl> pimpl) {
if (auto p = pimpl.lock()) {
backend.async_receive(asio::buffer(p->buf_), [&backend, pimpl](boost::system::error_code const& ec, size_t bytes_transferred) {
if (ec)
return; // exit on error

if (auto p = pimpl.lock()) {
if (boost::string_ref(p->buf_.data(), bytes_transferred - 1) != "PING")
return; // exit if not PING
p->pings_++;
backend.send(asio::buffer("PONG"));

// schedule another receive
do_receive(backend, pimpl);
}
});
}
}

// This is the function run by the background thread
static void run(azmq::socket & backend, ptr pimpl) {
do_receive(backend, pimpl);
backend.get_io_service().run();
}

azmq::socket frontend_;
std::array<char, 256> buf_;
};


// ping every 250ms
void schedule_ping(asio::deadline_timer & timer, server_t & server) {
server.ping();

timer.expires_from_now(pt::milliseconds(250));
timer.async_wait([&](boost::system::error_code const& ec) {
if (ec)
return;
schedule_ping(timer, server);
});
};

int main(int argc, char** argv) {
asio::io_service ios;

std::cout << "Running...";
std::cout.flush();

// halt on SIGINT or SIGTERM
asio::signal_set signals(ios, SIGTERM, SIGINT);
signals.async_wait([&](boost::system::error_code const&, int) {
ios.stop();
});

server_t server(ios);

asio::deadline_timer timer(ios);
schedule_ping(timer, server);

// run for 5 secods
asio::deadline_timer deadline(ios, pt::seconds(5));
deadline.async_wait([&](boost::system::error_code const&) {
ios.stop();
});

ios.run();

std::cout << "Done. Results - " << server << std::endl;

return 0;
}
1 change: 0 additions & 1 deletion src/CMakeLists.txt

This file was deleted.

8 changes: 0 additions & 8 deletions src/lib/CMakeLists.txt

This file was deleted.

37 changes: 0 additions & 37 deletions src/lib/context_ops.cpp

This file was deleted.

30 changes: 0 additions & 30 deletions src/lib/error.cpp

This file was deleted.

20 changes: 0 additions & 20 deletions src/lib/socket_service.cpp

This file was deleted.

39 changes: 27 additions & 12 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,16 +1,31 @@
add_subdirectory(message)
add_subdirectory(context_ops)
add_subdirectory(socket_ops)
add_subdirectory(socket)
#include_directories("${PROJECT_SOURCE_DIR}/src")
#add_executable(test_aziomq main.cpp)
#target_link_libraries(test_aziomq ${Boost_LIBRARIES}
#${ZeroMQ_LIBRARIES}
#aziomq)
cmake_minimum_required(VERSION 3.16...3.25)
project(test_azmq LANGUAGES CXX)

#add_test(message ${CMAKE_CURRENT_BINARY_DIR}/test_context_ops)
#add_test(message ${CMAKE_CURRENT_BINARY_DIR}/test_socket_ops)
#add_test(message ${CMAKE_CURRENT_BINARY_DIR}/test_socket)
if (TEST_INSTALLED_VERSION)
find_package(azmq 1.1 REQUIRED)
enable_testing()
endif ()

# NOTE: current Catch2 version is 3.2.0 is not compatible! CK
find_package(Catch2 2.13 CONFIG QUIET)
if (NOT TARGET Catch2::Catch2)
include(FetchContent)
FetchContent_Declare(Catch2 GIT_REPOSITORY https://github.com/catchorg/Catch2.git GIT_TAG v2.13.7)
FetchContent_MakeAvailable(Catch2)
list(APPEND CMAKE_MODULE_PATH ${catch2_SOURCE_DIR}/contrib)
endif ()

macro (add_catch_test name)
# XXX not really needed? CK include(CTest)
target_link_libraries(${name} Catch2::Catch2)
include(Catch)
catch_discover_tests(${name})
endmacro ()

add_subdirectory(message)
add_subdirectory(context_ops)
add_subdirectory(socket_ops)
add_subdirectory(socket)
add_subdirectory(signal)
add_subdirectory(actor)
add_subdirectory(cpp20/socket)
7 changes: 7 additions & 0 deletions test/actor/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
project(test_actor)

add_executable(${PROJECT_NAME} main.cpp)

target_link_libraries(${PROJECT_NAME} Azmq::azmq)

add_catch_test(${PROJECT_NAME})
68 changes: 68 additions & 0 deletions test/actor/main.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
Copyright (c) 2013-2014 Contributors as noted in the AUTHORS file
This file is part of azmq
Distributed under the Boost Software License, Version 1.0. (See accompanying
file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*/
#include <azmq/actor.hpp>
#include <azmq/util/scope_guard.hpp>

#include <boost/asio/buffer.hpp>

#include <array>
#include <thread>
#include <iostream>

#define CATCH_CONFIG_MAIN
#include <catch2/catch.hpp>

std::array<boost::asio::const_buffer, 2> snd_bufs = {{
boost::asio::buffer("A"),
boost::asio::buffer("B")
}};

std::string subj(const char* name) {
return std::string("inproc://") + name;
}

TEST_CASE( "Async Send/Receive", "[actor]" ) {
boost::system::error_code ecc;
size_t btc = 0;

boost::system::error_code ecb;
size_t btb = 0;
{
std::array<char, 2> a;
std::array<char, 2> b;

std::array<boost::asio::mutable_buffer, 2> rcv_bufs = {{
boost::asio::buffer(a),
boost::asio::buffer(b)
}};

boost::asio::io_service ios;
auto s = azmq::actor::spawn(ios, [&](azmq::socket & ss) {
ss.async_receive(rcv_bufs, [&](boost::system::error_code const& ec, size_t bytes_transferred) {
ecb = ec;
btb = bytes_transferred;
ios.stop();
});
ss.get_io_service().run();
});

s.async_send(snd_bufs, [&] (boost::system::error_code const& ec, size_t bytes_transferred) {
ecc = ec;
btc = bytes_transferred;
});

boost::asio::io_service::work w(ios);
ios.run();
}

REQUIRE(ecc == boost::system::error_code());
REQUIRE(btc == 4);
REQUIRE(ecb == boost::system::error_code());
REQUIRE(btb == 4);
}
25 changes: 0 additions & 25 deletions test/assert.ipp

This file was deleted.

12 changes: 7 additions & 5 deletions test/context_ops/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#include_directories("${PROJECT_SOURCE_DIR}/src")
add_executable(test_context_ops main.cpp)
target_link_libraries(test_context_ops ${Boost_LIBRARIES}
${ZeroMQ_LIBRARIES}
aziomq)
project(test_context_ops)

add_executable(${PROJECT_NAME} main.cpp)

target_link_libraries(${PROJECT_NAME} Azmq::azmq)

add_catch_test(${PROJECT_NAME})
58 changes: 27 additions & 31 deletions test/context_ops/main.cpp
Original file line number Diff line number Diff line change
@@ -1,54 +1,50 @@
/*
Copyright (c) 2013-2014 Contributors as noted in the AUTHORS file
This file is part of aziomq
This file is part of azmq
Distributed under the Boost Software License, Version 1.0. (See accompanying
file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*/
#include <aziomq/detail/context_ops.hpp>
#include <azmq/detail/context_ops.hpp>

#define BOOST_ENABLE_ASSERT_HANDLER
#include <boost/assert.hpp>
#include <boost/system/error_code.hpp>

#include <string>
#include <iostream>
#include <exception>

#include "../assert.ipp"
#define CATCH_CONFIG_MAIN
#include <catch2/catch.hpp>

void test_context() {
auto ctx = aziomq::detail::context_ops::get_context();
auto ctx2 = aziomq::detail::context_ops::get_context(true);
BOOST_ASSERT_MSG(ctx != ctx2, "expecting ctx != ctx2");

auto ctx3 = aziomq::detail::context_ops::get_context();
BOOST_ASSERT_MSG(ctx == ctx3, "expecting ctx == ctx3");
TEST_CASE( "context_creation", "[context]") {
auto ctx = azmq::detail::context_ops::get_context();
auto ctx2 = azmq::detail::context_ops::get_context(true);
REQUIRE(ctx != ctx2);

auto ctx3 = azmq::detail::context_ops::get_context();
REQUIRE(ctx == ctx3);
}

void test_context_options() {
auto ctx = aziomq::detail::context_ops::get_context();
using io_threads = aziomq::detail::context_ops::io_threads;
TEST_CASE( "context_options", "[context]" ) {
auto ctx = azmq::detail::context_ops::get_context();
using io_threads = azmq::detail::context_ops::io_threads;
boost::system::error_code ec;
aziomq::detail::context_ops::set_option(ctx, io_threads(2), ec);
BOOST_ASSERT_MSG(!ec, "error setting io_threads option");
azmq::detail::context_ops::set_option(ctx, io_threads(2), ec);
REQUIRE(!ec);

io_threads res;
aziomq::detail::context_ops::get_option(ctx, res, ec);
BOOST_ASSERT_MSG(!ec, "error getting io_threads option");
BOOST_ASSERT(res.value() == 2);
io_threads res = 0;
azmq::detail::context_ops::get_option(ctx, res, ec);
REQUIRE(!ec);
REQUIRE(res.value() == 2);
}

int main(int argc, char **argv) {
std::cout << "Testing context operations...";
try {
test_context();
test_context_options();
} catch (std::exception const& e) {
std::cout << "Failure\n" << e.what() << std::endl;
return 1;
}
std::cout << "Success" << std::endl;
return 0;
TEST_CASE( "invalid option", "[context]" ) {
auto ctx = azmq::detail::context_ops::get_context();
using io_threads = azmq::detail::context_ops::io_threads;
boost::system::error_code ec;
azmq::detail::context_ops::set_option(ctx, io_threads(-1), ec);
REQUIRE(ec);
REQUIRE(ec.value() == EINVAL);
}
41 changes: 41 additions & 0 deletions test/cpp20/socket/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
project(test_socket_cpp20)
cmake_minimum_required(VERSION 3.16)

include(CheckIncludeFileCXX)
check_include_file_cxx(coroutine FOUND_COROUTINE_HEADER)
set(REQUIRES_MSG "coroutine support")
if (FOUND_COROUTINE_HEADER)
message(STATUS "${PROJECT_NAME} found ${REQUIRES_MSG}")
else()
message(WARNING "${PROJECT_NAME} requires ${REQUIRES_MSG}")
return()
endif ()

set(REQUIRES_MSG "a c++20 compiler")
if (cxx_std_20 IN_LIST CMAKE_CXX_COMPILE_FEATURES)
message(STATUS "${PROJECT_NAME} found ${REQUIRES_MSG}")
else()
message(WARNING "${PROJECT_NAME} requires ${REQUIRES_MSG}")
return()
endif ()

set(REQUIRES_MSG "boost version >= 107000")
if (Boost_VERSION_MACRO GREATER_EQUAL 107000)
message(STATUS "${PROJECT_NAME} found ${REQUIRES_MSG}")
else()
message(WARNING "${PROJECT_NAME} requires ${REQUIRES_MSG}, but has version : ${Boost_VERSION_MACRO}")
return()
endif ()

add_executable(${PROJECT_NAME} main.cpp)
target_link_libraries(${PROJECT_NAME}
Azmq::azmq
Boost::boost
Boost::thread
Boost::system
${ZeroMQ_LIBRARIES}
${CMAKE_THREAD_LIBS_INIT})

target_compile_features(${PROJECT_NAME} PUBLIC cxx_std_20)

add_catch_test(${PROJECT_NAME})
77 changes: 77 additions & 0 deletions test/cpp20/socket/main.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
Copyright (c) 2013-2014 Contributors as noted in the AUTHORS file
This file is part of azmq
Distributed under the Boost Software License, Version 1.0. (See accompanying
file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*/
#include <azmq/socket.hpp>

#include <boost/asio/io_context.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/awaitable.hpp>
#include <boost/asio/use_awaitable.hpp>

#include <boost/current_function.hpp>

#include <coroutine>
#include <array>
#include <string>

#define CATCH_CONFIG_MAIN
#include <catch2/catch.hpp>

std::string subj(const char *name) {
return std::string("inproc://") + name;
}

TEST_CASE("coroutine send/receive message", "[socket_cpp20]") {
boost::asio::io_context ioc;

azmq::socket sb(ioc, ZMQ_ROUTER);
sb.bind(subj(BOOST_CURRENT_FUNCTION));

azmq::socket sc(ioc, ZMQ_DEALER);
sc.connect(subj(BOOST_CURRENT_FUNCTION));

boost::optional<size_t> btc{};
boost::optional<size_t> btb{};

//sending coroutine
co_spawn(ioc, [&]() -> boost::asio::awaitable<void> {
std::array<boost::asio::const_buffer, 2> snd_bufs = {{
boost::asio::buffer("A"),
boost::asio::buffer("B")
}};

btc = co_await azmq::async_send(sc, snd_bufs, boost::asio::use_awaitable);
co_return;
}, boost::asio::detached);

//receiving coroutine
co_spawn(ioc, [&]() -> boost::asio::awaitable<void> {
std::array<char, 5> ident;
std::array<char, 2> a;
std::array<char, 2> b;

std::array<boost::asio::mutable_buffer, 3> rcv_bufs = {{
boost::asio::buffer(ident),
boost::asio::buffer(a),
boost::asio::buffer(b)
}};

btb = co_await azmq::async_receive(sb, rcv_bufs, boost::asio::use_awaitable);
co_return;
}, boost::asio::detached);

ioc.run();

REQUIRE(btb.has_value());
REQUIRE(btb.value() == 9);

REQUIRE(btc.has_value());
REQUIRE(btc.value() == 4);
}

10 changes: 5 additions & 5 deletions test/message/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
add_executable(test_message main.cpp)
target_link_libraries(test_message ${Boost_LIBRARIES}
${ZeroMQ_LIBRARIES}
aziomq)
project(test_message)

add_test(message ${CMAKE_CURRENT_BINARY_DIR}/test_message)
add_executable(${PROJECT_NAME} main.cpp)

target_link_libraries(${PROJECT_NAME} Azmq::azmq)

add_catch_test(${PROJECT_NAME})
235 changes: 165 additions & 70 deletions test/message/main.cpp
Original file line number Diff line number Diff line change
@@ -1,96 +1,209 @@
/*
Copyright (c) 2013-2014 Contributors as noted in the AUTHORS file
This file is part of aziomq
This file is part of azmq
Distributed under the Boost Software License, Version 1.0. (See accompanying
file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*/
#include <iostream>
#include <aziomq/message.hpp>
#include <azmq/message.hpp>

#define BOOST_ENABLE_ASSERT_HANDLER
#include <boost/assert.hpp>
#include <boost/asio/buffer.hpp>

#include <string>
#include <algorithm>
#include <array>
#include <iterator>

#include "../assert.ipp"
#define CATCH_CONFIG_MAIN
#include <catch2/catch.hpp>

void test_message_constructors() {

TEST_CASE( "message_constructors", "[message]" ) {
// default init range has 0 size
aziomq::message m;
BOOST_ASSERT(m.size() == 0);
azmq::message m;
REQUIRE(m.size() == 0);

// pre-sized message construction
aziomq::message mm(42);
BOOST_ASSERT(mm.size() == 42);
azmq::message mm(42);
REQUIRE(mm.size() == 42);

// implicit construction from asio::const_buffer
std::string s("This is a test");
aziomq::message mstr(boost::asio::buffer(s));
BOOST_ASSERT(s.size() == mstr.size());
BOOST_ASSERT(s == mstr.string());
azmq::message mstr(boost::asio::buffer(s));
REQUIRE(s.size() == mstr.size());
REQUIRE(s == mstr.string());

// construction from string
aziomq::message mmstr(s);
BOOST_ASSERT(s == mmstr.string());
azmq::message mmstr(s);
REQUIRE(s == mmstr.string());
}

char global_buf[1024];
int global_hint;
int global_ctr;


void free_fn1(void *buf)
{
REQUIRE(buf == &global_buf);
++global_ctr;
}

void test_message_buffer_operations() {
aziomq::message mm(42);
void free_fn2(void *buf, void *hint)
{
REQUIRE(buf == &global_buf);
REQUIRE(hint == &global_hint);
++global_ctr;
}


TEST_CASE( "deleter", "[message]" ) {
global_ctr = 0;

{
azmq::message m1(azmq::nocopy, boost::asio::buffer("static const buf"));
REQUIRE(17 == m1.size());
}

{
azmq::message m2(azmq::nocopy, boost::asio::buffer(global_buf), [](void *buf){
REQUIRE(buf == &global_buf);
++global_ctr;
});
REQUIRE(sizeof(global_buf) == m2.size());
}
REQUIRE(1 == global_ctr);

{
azmq::message m3(azmq::nocopy, boost::asio::buffer(global_buf), &global_hint, [](void *buf, void *hint){
REQUIRE(buf == global_buf);
REQUIRE(hint == &global_hint);
++global_ctr;
});
REQUIRE(sizeof(global_buf) == m3.size());
}
REQUIRE(2 == global_ctr);

{
char buf2[16];
int x = 42;
azmq::message m4(azmq::nocopy, boost::asio::buffer(buf2), [x, &buf2](void *buf){
REQUIRE(buf == buf2);
REQUIRE(x == 42);
++global_ctr;
});
REQUIRE(sizeof(buf2) == m4.size());
}
REQUIRE(3 == global_ctr);

{
azmq::message m5(azmq::nocopy, boost::asio::buffer(global_buf), &global_hint, free_fn2);
REQUIRE(sizeof(global_buf) == m5.size());
}
REQUIRE(4 == global_ctr);

{
azmq::message m6(azmq::nocopy, boost::asio::buffer(global_buf), &free_fn1);
REQUIRE(sizeof(global_buf) == m6.size());
}
REQUIRE(5 == global_ctr);


{
azmq::message m7;
{
azmq::message m8(azmq::nocopy, boost::asio::buffer(global_buf), free_fn1);
REQUIRE(sizeof(global_buf) == m8.size());
m7 = m8;
REQUIRE(sizeof(global_buf) == m7.size());

}
REQUIRE(5 == global_ctr); // msg is not deleted yet
}
REQUIRE(6 == global_ctr);
}

TEST_CASE( "message_buffer_operations", "[message]" ) {
azmq::message mm(42);
// implicit cast to const_buffer
boost::asio::const_buffer b = mm;
BOOST_ASSERT(boost::asio::buffer_size(b) == mm.size());
boost::asio::const_buffer b = mm.cbuffer();
REQUIRE(boost::asio::buffer_size(b) == mm.size());

// implicit cast to mutable_buffer
boost::asio::mutable_buffer bb = mm;
BOOST_ASSERT(boost::asio::buffer_size(bb) == mm.size());
boost::asio::mutable_buffer bb = mm.buffer();
REQUIRE(boost::asio::buffer_size(bb) == mm.size());
}

void test_message_copy_operations() {
aziomq::message m(42);
aziomq::message mm(m);
BOOST_ASSERT(m.size() == mm.size() && mm.size() == 42);
TEST_CASE( "message_copy_operations", "[message]" ) {
azmq::message m(42);
azmq::message mm(m);
REQUIRE(m.size() == 42);
REQUIRE(mm.size() == 42);

aziomq::message mmm = m;
BOOST_ASSERT(m.size() == mmm.size() && mmm.size() == 42);
azmq::message mmm = m;
REQUIRE(m.size() == 42);
REQUIRE(mmm.size() == 42);
}

void test_message_move_operations() {
aziomq::message m;
aziomq::message mm(42);
TEST_CASE( "message_move_operations", "[message]" ) {
azmq::message m;
azmq::message mm(42);

// move assignment
m = std::move(mm);
BOOST_ASSERT(m.size() == 42);
BOOST_ASSERT(mm.size() == 0);
REQUIRE(m.size() == 42);
REQUIRE(mm.size() == 0);

// move construction
aziomq::message mmm(std::move(m));
BOOST_ASSERT(m.size() == 0);
BOOST_ASSERT(mmm.size() == 42);
azmq::message mmm(std::move(m));
REQUIRE(m.size() == 0);
REQUIRE(mmm.size() == 42);
}

void test_write_through_mutable_buffer() {
aziomq::message m("This is a test");
TEST_CASE( "write_through_mutable_buffer", "[message]" ) {
azmq::message m("This is a test");

aziomq::message mm(m);
boost::asio::mutable_buffer bb = mm;
azmq::message mm(m);
boost::asio::mutable_buffer bb = mm.buffer();
auto pstr = boost::asio::buffer_cast<char*>(bb);
pstr[0] = 't';

auto s = mm.string();
BOOST_ASSERT(std::string("this is a test") == s);
REQUIRE(std::string("this is a test") == s);

auto ss = m.string();
BOOST_ASSERT(s != ss);
REQUIRE(s != ss);
}

void test_message_sequence() {
TEST_CASE( "comparison", "[message]" ) {
using boost::asio::buffer;

REQUIRE(azmq::message(buffer("bla-bla", 7)) == azmq::message(buffer("bla-bla", 7)));
REQUIRE_FALSE(azmq::message(buffer("bla-bla", 7)) != azmq::message(buffer("bla-bla", 7)));

REQUIRE(azmq::message(buffer("bla-bla", 7)) != azmq::message(buffer("bla-bla", 6)));
REQUIRE_FALSE(azmq::message(buffer("bla-bla", 7)) == azmq::message(buffer("bla-bla", 6)));

REQUIRE(azmq::message(buffer("bla-bla", 6)) != azmq::message(buffer("bla-bla", 7)));
REQUIRE_FALSE(azmq::message(buffer("bla-bla", 6)) == azmq::message(buffer("bla-bla", 7)));

REQUIRE_FALSE(azmq::message(buffer("bla-bla", 7)) == azmq::message(buffer("bla-BLB", 7)));
REQUIRE(azmq::message(buffer("bla-bla", 7)) != azmq::message(buffer("bla-BLB", 7)));

REQUIRE_FALSE(azmq::message(buffer("bla-BLB", 7)) == azmq::message(buffer("bla-bla", 7)));
REQUIRE(azmq::message(buffer("bla-BLB", 7)) != azmq::message(buffer("bla-bla", 7)));
}


TEST_CASE( "message_data", "[message]" ) {
azmq::message m("bla-bla");

REQUIRE(m.size() == 7);
REQUIRE(0 == memcmp(m.data(), "bla-bla", 7));
}

TEST_CASE( "message_sequence", "[message]" ) {
std::string foo("foo");
std::string bar("bar");

@@ -100,40 +213,22 @@ void test_message_sequence() {
}};

// make a message_vector from a range
auto res = aziomq::to_message_vector(bufs);
BOOST_ASSERT(res.size() == bufs.size());
BOOST_ASSERT(foo == res[0].string());
BOOST_ASSERT(bar == res[1].string());
auto res = azmq::to_message_vector(bufs);
REQUIRE(res.size() == bufs.size());
REQUIRE(foo == res[0].string());
REQUIRE(bar == res[1].string());

// implicit conversion
res.push_back(boost::asio::buffer("BAZ"));
BOOST_ASSERT(res.size() == bufs.size() + 1);
REQUIRE(res.size() == bufs.size() + 1);

// range of const_buffer -> range of message
auto range = aziomq::const_message_range(bufs);
BOOST_ASSERT(std::distance(std::begin(bufs), std::end(bufs)) ==
std::distance(std::begin(range), std::end(range)));
auto range = azmq::const_message_range(bufs);
REQUIRE(std::distance(std::begin(bufs), std::end(bufs)) ==
std::distance(std::begin(range), std::end(range)));

auto it = std::begin(range);
for(auto& buf : bufs) {
BOOST_ASSERT(aziomq::message(buf) == *it++);
REQUIRE(azmq::message(buf) == *it++);
}
}

int main(int argc, char **argv) {
std::cout << "Testing message operations...";
try {
test_message_constructors();
test_message_copy_operations();
test_message_move_operations();
test_message_buffer_operations();
test_write_through_mutable_buffer();
test_message_sequence();
} catch (std::exception const& e) {
std::cout << "Failure\n" << e.what() << std::endl;
return 1;
}
std::cout << "Success" << std::endl;
return 0;
}

7 changes: 7 additions & 0 deletions test/signal/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
project(test_signal)

add_executable(${PROJECT_NAME} main.cpp)

target_link_libraries(${PROJECT_NAME} Azmq::azmq)

add_catch_test(${PROJECT_NAME})
26 changes: 26 additions & 0 deletions test/signal/main.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
Copyright (c) 2013-2014 Contributors as noted in the AUTHORS file
This file is part of azmq
Distributed under the Boost Software License, Version 1.0. (See accompanying
file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*/
#include <azmq/signal.hpp>

#include <boost/asio/io_service.hpp>

#define CATCH_CONFIG_MAIN
#include <catch2/catch.hpp>

TEST_CASE( "Send/Receive a signal", "[signal]" ) {
boost::asio::io_service ios;
azmq::pair_socket sb(ios);
azmq::pair_socket sc(ios);

sb.bind("inproc://test");
sc.connect("inproc://test");

azmq::signal::send(sb, 123);
REQUIRE( azmq::signal::wait(sc) == 123);
}
14 changes: 9 additions & 5 deletions test/socket/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
#include_directories("${PROJECT_SOURCE_DIR}/src")
add_executable(test_socket main.cpp)
target_link_libraries(test_socket ${Boost_LIBRARIES}
${ZeroMQ_LIBRARIES}
aziomq)
project(test_socket)

add_executable(${PROJECT_NAME} main.cpp)

find_package(Boost COMPONENTS coroutine)

target_link_libraries(${PROJECT_NAME} Azmq::azmq Boost::coroutine)

add_catch_test(${PROJECT_NAME})
831 changes: 737 additions & 94 deletions test/socket/main.cpp

Large diffs are not rendered by default.

12 changes: 7 additions & 5 deletions test/socket_ops/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#include_directories("${PROJECT_SOURCE_DIR}/src")
add_executable(test_socket_ops main.cpp)
target_link_libraries(test_socket_ops ${Boost_LIBRARIES}
${ZeroMQ_LIBRARIES}
aziomq)
project(test_socket_ops)

add_executable(${PROJECT_NAME} main.cpp)

target_link_libraries(${PROJECT_NAME} Azmq::azmq)

add_catch_test(${PROJECT_NAME})
230 changes: 140 additions & 90 deletions test/socket_ops/main.cpp
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
#include <aziomq/detail/context_ops.hpp>
#include <aziomq/detail/socket_ops.hpp>
#include <azmq/detail/context_ops.hpp>
#include <azmq/detail/socket_ops.hpp>

#define BOOST_ENABLE_ASSERT_HANDLER
#include <boost/assert.hpp>
#include <boost/asio/buffer.hpp>
#include <boost/current_function.hpp>

#include <array>
#include <iostream>
#include <chrono>
#include <thread>

#include "../assert.ipp"
#define CATCH_CONFIG_MAIN
#include <catch2/catch.hpp>

auto ctx = aziomq::detail::context_ops::get_context();
auto ctx = azmq::detail::context_ops::get_context();

std::array<boost::asio::const_buffer, 2> snd_bufs = {{
boost::asio::buffer("A"),
@@ -21,54 +22,114 @@ std::string subj(const char* name) {
return std::string("inproc://") + name;
}

void test_send_receive_inproc_discrete_calls() {
TEST_CASE( "Tcp Dynamic Binding Expressions", "[socket_ops]" ) {
boost::system::error_code ec;
auto sb = aziomq::detail::socket_ops::create_socket(ctx, ZMQ_ROUTER, ec);
BOOST_ASSERT(!ec);
aziomq::detail::socket_ops::bind(sb, subj(__PRETTY_FUNCTION__), ec);
BOOST_ASSERT(!ec);

auto sc = aziomq::detail::socket_ops::create_socket(ctx, ZMQ_DEALER, ec);
BOOST_ASSERT(!ec);
aziomq::detail::socket_ops::connect(sc, subj(__PRETTY_FUNCTION__), ec);
BOOST_ASSERT(!ec);

// Send and receive one at a time
aziomq::detail::socket_ops::send(snd_bufs, sc, ZMQ_SNDMORE, ec);
BOOST_ASSERT(!ec);
auto sb = azmq::detail::socket_ops::create_socket(ctx, ZMQ_ROUTER, ec);
REQUIRE(ec == boost::system::error_code());

std::string uri{ "tcp://127.0.0.1:5560" };
azmq::detail::socket_ops::bind(sb, uri, ec);
REQUIRE(ec == boost::system::error_code());
azmq::detail::socket_ops::unbind(sb, uri, ec);
REQUIRE(ec == boost::system::error_code());
std::this_thread::sleep_for(std::chrono::milliseconds(100));

std::string orig_uri{ "tcp://127.0.0.1:*" };
uri = orig_uri;
azmq::detail::socket_ops::bind(sb, uri, ec);
REQUIRE(ec == boost::system::error_code());
REQUIRE(uri != orig_uri);
azmq::detail::socket_ops::unbind(sb, uri, ec);
REQUIRE(ec == boost::system::error_code());
std::this_thread::sleep_for(std::chrono::milliseconds(100));

orig_uri = "tcp://127.0.0.1:!";
uri = orig_uri;
azmq::detail::socket_ops::bind(sb, uri, ec);
REQUIRE(ec == boost::system::error_code());
REQUIRE(uri != orig_uri);
azmq::detail::socket_ops::unbind(sb, uri, ec);
REQUIRE(ec == boost::system::error_code());
std::this_thread::sleep_for(std::chrono::milliseconds(100));

orig_uri = "tcp://127.0.0.1:*[60000-]";
uri = orig_uri;
azmq::detail::socket_ops::bind(sb, uri, ec);
REQUIRE(ec == boost::system::error_code());
REQUIRE(uri != orig_uri);
azmq::detail::socket_ops::unbind(sb, uri, ec);
REQUIRE(ec == boost::system::error_code());
std::this_thread::sleep_for(std::chrono::milliseconds(100));

orig_uri = "tcp://127.0.0.1:![-60000]";
uri = orig_uri;
azmq::detail::socket_ops::bind(sb, uri, ec);
REQUIRE(ec == boost::system::error_code());
REQUIRE(uri != orig_uri);
azmq::detail::socket_ops::unbind(sb, uri, ec);
REQUIRE(ec == boost::system::error_code());
std::this_thread::sleep_for(std::chrono::milliseconds(100));

orig_uri = "tcp://127.0.0.1:![55000-55999]";
uri = orig_uri;
azmq::detail::socket_ops::bind(sb, uri, ec);
REQUIRE(ec == boost::system::error_code());
REQUIRE(uri != orig_uri);
azmq::detail::socket_ops::unbind(sb, uri, ec);
REQUIRE(ec == boost::system::error_code());
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}

aziomq::message msg;
TEST_CASE( "Inproc Send/Receive discrete calls", "[socket_ops]" ) {
boost::system::error_code ec;
auto sb = azmq::detail::socket_ops::create_socket(ctx, ZMQ_ROUTER, ec);
REQUIRE(ec == boost::system::error_code());
auto uri = subj(BOOST_CURRENT_FUNCTION);
azmq::detail::socket_ops::bind(sb, uri, ec);
REQUIRE(ec == boost::system::error_code());

auto sc = azmq::detail::socket_ops::create_socket(ctx, ZMQ_DEALER, ec);
REQUIRE(ec == boost::system::error_code());
azmq::detail::socket_ops::connect(sc, uri, ec);
REQUIRE(ec == boost::system::error_code());

// Send multipart message
azmq::detail::socket_ops::send(snd_bufs, sc, 0, ec);
REQUIRE(ec == boost::system::error_code());

azmq::message msg;
// Identity comes first
aziomq::detail::socket_ops::receive(msg, sb, 0, ec);
BOOST_ASSERT(!ec);
BOOST_ASSERT(msg.more());
azmq::detail::socket_ops::receive(msg, sb, 0, ec);
REQUIRE(ec == boost::system::error_code());
REQUIRE(msg.more() == true);

// Then first part
aziomq::detail::socket_ops::receive(msg, sb, 0, ec);
BOOST_ASSERT(!ec);
BOOST_ASSERT(msg.more());
azmq::detail::socket_ops::receive(msg, sb, 0, ec);
REQUIRE(ec == boost::system::error_code());
REQUIRE(msg.more() == true);

// Finally second part
aziomq::detail::socket_ops::receive(msg, sb, 0, ec);
BOOST_ASSERT(!ec);
BOOST_ASSERT(!msg.more());
azmq::detail::socket_ops::receive(msg, sb, 0, ec);
REQUIRE(ec == boost::system::error_code());
REQUIRE(msg.more() == false);
}

void test_send_receive_inproc_mutable_bufseq() {
TEST_CASE( "Inproc Send/Receive Buffer Sequence", "[socket_ops]" ) {
boost::system::error_code ec;
auto sb = aziomq::detail::socket_ops::create_socket(ctx, ZMQ_ROUTER, ec);
BOOST_ASSERT(!ec);
aziomq::detail::socket_ops::bind(sb, subj(__PRETTY_FUNCTION__), ec);
BOOST_ASSERT(!ec);
auto sb = azmq::detail::socket_ops::create_socket(ctx, ZMQ_ROUTER, ec);
REQUIRE(ec == boost::system::error_code());
auto uri = subj(BOOST_CURRENT_FUNCTION);
azmq::detail::socket_ops::bind(sb, uri, ec);
REQUIRE(ec == boost::system::error_code());

auto sc = aziomq::detail::socket_ops::create_socket(ctx, ZMQ_DEALER, ec);
BOOST_ASSERT(!ec);
aziomq::detail::socket_ops::connect(sc, subj(__PRETTY_FUNCTION__), ec);
BOOST_ASSERT(!ec);
auto sc = azmq::detail::socket_ops::create_socket(ctx, ZMQ_DEALER, ec);
REQUIRE(ec == boost::system::error_code());
azmq::detail::socket_ops::connect(sc, uri, ec);
REQUIRE(ec == boost::system::error_code());

// Send and receive all message parts as a mutable buffer sequence
aziomq::detail::socket_ops::send(snd_bufs, sc, ZMQ_SNDMORE, ec);
BOOST_ASSERT(!ec);
azmq::detail::socket_ops::send(snd_bufs, sc, 0, ec);
REQUIRE(ec == boost::system::error_code());

std::array<char, 5> ident;
std::array<char, 2> part_A;
@@ -79,48 +140,52 @@ void test_send_receive_inproc_mutable_bufseq() {
boost::asio::buffer(part_A),
boost::asio::buffer(part_B)
}};
aziomq::detail::socket_ops::receive(rcv_msg_seq, sb, 0, ec);
BOOST_ASSERT(!ec);
BOOST_ASSERT('A' == part_A[0]);
BOOST_ASSERT('B' == part_B[0]);

azmq::detail::socket_ops::receive(rcv_msg_seq, sb, 0, ec);
REQUIRE(ec == boost::system::error_code());
REQUIRE('A' == part_A[0]);
REQUIRE('B' == part_B[0]);
}

void test_send_receive_inproc_msg_vect() {
TEST_CASE( "Inproc Send/Receive message vector", "[socket_ops]" ) {
boost::system::error_code ec;
auto sb = aziomq::detail::socket_ops::create_socket(ctx, ZMQ_ROUTER, ec);
BOOST_ASSERT(!ec);
aziomq::detail::socket_ops::bind(sb, subj(__PRETTY_FUNCTION__), ec);
BOOST_ASSERT(!ec);
auto sb = azmq::detail::socket_ops::create_socket(ctx, ZMQ_ROUTER, ec);
REQUIRE(ec == boost::system::error_code());
auto uri = subj(BOOST_CURRENT_FUNCTION);
azmq::detail::socket_ops::bind(sb, uri, ec);
REQUIRE(ec == boost::system::error_code());

auto sc = aziomq::detail::socket_ops::create_socket(ctx, ZMQ_DEALER, ec);
BOOST_ASSERT(!ec);
aziomq::detail::socket_ops::connect(sc, subj(__PRETTY_FUNCTION__), ec);
BOOST_ASSERT(!ec);
auto sc = azmq::detail::socket_ops::create_socket(ctx, ZMQ_DEALER, ec);
REQUIRE(ec == boost::system::error_code());
azmq::detail::socket_ops::connect(sc, uri, ec);
REQUIRE(ec == boost::system::error_code());

// Send and receive all message parts as a vector
aziomq::detail::socket_ops::send(snd_bufs, sc, ZMQ_SNDMORE, ec);
BOOST_ASSERT(!ec);
azmq::detail::socket_ops::send(snd_bufs, sc, 0, ec);
REQUIRE(ec == boost::system::error_code());

aziomq::message_vector rcv_msgs;
aziomq::detail::socket_ops::receive_more(rcv_msgs, sb, 0, ec);
BOOST_ASSERT(!ec);
BOOST_ASSERT(rcv_msgs.size() == 3);
azmq::message_vector rcv_msgs;
azmq::detail::socket_ops::receive_more(rcv_msgs, sb, 0, ec);
REQUIRE(ec == boost::system::error_code());
REQUIRE(rcv_msgs.size() == 3);
}

void test_send_receive_inproc_not_enough_bufs() {
TEST_CASE( "Inproc Send/Receive not enough buffers", "[socket_ops]" ) {
boost::system::error_code ec;
auto sb = aziomq::detail::socket_ops::create_socket(ctx, ZMQ_ROUTER, ec);
BOOST_ASSERT(!ec);
aziomq::detail::socket_ops::bind(sb, subj(__PRETTY_FUNCTION__), ec);
BOOST_ASSERT(!ec);

auto sc = aziomq::detail::socket_ops::create_socket(ctx, ZMQ_DEALER, ec);
BOOST_ASSERT(!ec);
aziomq::detail::socket_ops::connect(sc, subj(__PRETTY_FUNCTION__), ec);
BOOST_ASSERT(!ec);
auto sb = azmq::detail::socket_ops::create_socket(ctx, ZMQ_ROUTER, ec);
REQUIRE(ec == boost::system::error_code());
auto uri = subj(BOOST_CURRENT_FUNCTION);
azmq::detail::socket_ops::bind(sb, uri, ec);
REQUIRE(ec == boost::system::error_code());

auto sc = azmq::detail::socket_ops::create_socket(ctx, ZMQ_DEALER, ec);
REQUIRE(ec == boost::system::error_code());
azmq::detail::socket_ops::connect(sc, uri, ec);
REQUIRE(ec == boost::system::error_code());

// Verify that we get an error on multipart with too few bufs in seq
aziomq::detail::socket_ops::send(snd_bufs, sc, ZMQ_SNDMORE, ec);
BOOST_ASSERT(!ec);
azmq::detail::socket_ops::send(snd_bufs, sc, 0, ec);
REQUIRE(ec == boost::system::error_code());

std::array<char, 5> ident;
std::array<char, 2> part_A;
@@ -129,21 +194,6 @@ void test_send_receive_inproc_not_enough_bufs() {
boost::asio::buffer(ident),
boost::asio::buffer(part_A)
}};
aziomq::detail::socket_ops::receive(rcv_msg_seq_2, sb, ZMQ_RCVMORE, ec);
BOOST_ASSERT(ec);
}

int main(int argc, char **argv) {
std::cout << "Testing basic socket operations...";
try {
test_send_receive_inproc_discrete_calls();
test_send_receive_inproc_mutable_bufseq();
test_send_receive_inproc_msg_vect();
test_send_receive_inproc_not_enough_bufs();
} catch (std::exception const& e) {
std::cout << "Failure\n" << e.what() << std::endl;
return 1;
}
std::cout << "Success" << std::endl;
return 0;
azmq::detail::socket_ops::receive(rcv_msg_seq_2, sb, 0, ec);
REQUIRE(ec != boost::system::error_code());
}