Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Respect UDP maximum packet size when sending batched points #185

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ jobs:
run: script/ci_setup.sh
- name: Install Boost
if: ${{ matrix.boost == true }}
run: apt-get install -y libboost-system-dev
run: |
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the switch from apt-get to apt? Did you experience any connection issues?

apt update
apt -o Acquire::Retries=3 install -y libboost-system-dev
- name: Build
run: script/ci_build.sh -DINFLUXCXX_WITH_BOOST=${{ matrix.boost }}
- name: Check deployment as cmake subdirectory
Expand Down Expand Up @@ -58,7 +60,8 @@ jobs:
- name: Setup
run: |
pip install -U conan
sudo apt-get install -y libcurl4-openssl-dev
sudo apt update
sudo apt -o Acquire::Retries=3 install -y libcurl4-openssl-dev
echo "~/.local/bin" >> $GITHUB_PATH
- name: Setup Conan
run: conan profile detect
Expand Down
3 changes: 3 additions & 0 deletions .github/workflows/systemtest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ jobs:
INFLUXDB_HTTP_AUTH_ENABLED: ${{ matrix.auth_enabled }}
INFLUXDB_ADMIN_USER: st_admin
INFLUXDB_ADMIN_PASSWORD: st_admin_pw
INFLUXDB_UDP_ENABLED: true
INFLUXDB_UDP_DATABASE: st_udp
INFLUXDB_UDP_BATCH_TIMEOUT: 10ms
env:
INFLUXDBCXX_SYSTEMTEST_HOST: influxdb
INFLUXDBCXX_SYSTEMTEST_USER: st_admin
Expand Down
3 changes: 0 additions & 3 deletions include/InfluxDB.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,6 @@ namespace influxdb
/// Underlying transport UDP/HTTP/Unix socket
std::unique_ptr<Transport> mTransport;

/// Transmits string over transport
void transmit(std::string&& point);

/// List of global tags
std::string mGlobalTags;

Expand Down
3 changes: 3 additions & 0 deletions include/Transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ namespace influxdb
/// Sends string blob
virtual void send(std::string&& message) = 0;

/// Maximum message size which can be sent using the send method
virtual std::size_t getMaxMessageSize() const = 0;

/// Sends request
virtual std::string query([[maybe_unused]] const std::string& query)
{
Expand Down
11 changes: 9 additions & 2 deletions src/BoostSupport.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,19 @@ namespace influxdb::internal

std::unique_ptr<Transport> withUdpTransport(const http::url& uri)
{
return std::make_unique<transports::UDP>(uri.host, uri.port);
static constexpr std::uint16_t INFLUXDB_UDP_PORT{8089};
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see a port as something a caller has to provide (like the host address) and not use any assumptions – unless there's a really, really good reason to do so.

(same for tcp below)

const std::uint16_t port{
http::url::PORT_NOT_SET == uri.port ? INFLUXDB_UDP_PORT : static_cast<std::uint16_t>(uri.port)};
return std::make_unique<transports::UDP>(uri.host, port);
}

std::unique_ptr<Transport> withTcpTransport(const http::url& uri)
{
return std::make_unique<transports::TCP>(uri.host, uri.port);
// Default QuestDB TCP port (TCP support was added for this purpose)
static constexpr std::uint16_t QUESTDB_TCP_PORT{9009};
const std::uint16_t port{
http::url::PORT_NOT_SET == uri.port ? QUESTDB_TCP_PORT : static_cast<std::uint16_t>(uri.port)};
return std::make_unique<transports::TCP>(uri.host, port);
}

std::unique_ptr<Transport> withUnixSocketTransport(const http::url& uri)
Expand Down
7 changes: 7 additions & 0 deletions src/HTTP.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
#include "HTTP.h"
#include "InfluxDBException.h"

#include <limits>

namespace influxdb::transports
{
namespace
Expand Down Expand Up @@ -107,6 +109,11 @@ namespace influxdb::transports
checkResponse(response);
}

std::size_t HTTP::getMaxMessageSize() const
{
return (std::numeric_limits<std::size_t>::max)();
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of handling a huge number, we could simple use 0 (= off) and skip any splitting logic.

}

void HTTP::setProxy(const Proxy& proxy)
{
session.SetProxies(cpr::Proxies{{"http", proxy.getProxy()}, {"https", proxy.getProxy()}});
Expand Down
3 changes: 3 additions & 0 deletions src/HTTP.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ namespace influxdb::transports
/// \throw InfluxDBException when send fails
void send(std::string&& lineprotocol) override;

/// Returns maximum message size
std::size_t getMaxMessageSize() const override;

/// Queries database
/// \throw InfluxDBException when query fails
std::string query(const std::string& query) override;
Expand Down
118 changes: 85 additions & 33 deletions src/InfluxDB.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,91 @@
#include "InfluxDBException.h"
#include "LineProtocol.h"
#include "BoostSupport.h"
#include <iostream>
#include <memory>
#include <string>

namespace influxdb
{
namespace
{
/// Group the points into the largest possible line-protocol messages that can be sent using the transport.
template <typename PointContainer>
void TransmitBatch(std::unique_ptr<Transport>& transport, const std::string& globalTags, PointContainer&& points)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the function doesn't change the unique_ptr itself, pass the transport by const-ref instead.

Also, please use transmitBatch() (lower case t) naming – I know this isn't used consistent everywhere (yet) :-)

(both same for the function below)

{
LineProtocol formatter{globalTags};
std::string lineProtocol;
bool appendNewLine{false};
bool messageSizeExceeded{false};

const auto maxMessageSize{transport->getMaxMessageSize()};
for (const auto& point : points)
{
auto formattedPoint{formatter.format(point)};
auto GetRequiredSize{[&appendNewLine](const std::string& fp) -> std::size_t
{
// Have to recalculate because the point may fit in a new message if
// it doesn't have a preceding newline.
return appendNewLine ? 1 + fp.size() : fp.size();
}};

while (maxMessageSize < lineProtocol.size() + GetRequiredSize(formattedPoint))
{
// Appending the current point would exceed the maximum message size.
// Flush the existing points and try again.
if (!lineProtocol.empty())
{
// If there is some existing content perhaps the current point will
// fit in a new message.
transport->send(std::move(lineProtocol));
lineProtocol.clear();
appendNewLine = false;
}
else
{
// Message is empty, so the current point is too large to be sent using this transport.
// Rather than throwing all the points away, we'll skip the current point and continue
// then raise an exception at the end.
messageSizeExceeded = true;
formattedPoint.clear();
break;
}
}

if (!formattedPoint.empty())
{
if (appendNewLine)
{
lineProtocol += '\n';
}
lineProtocol += formattedPoint;
appendNewLine = true;
}
}

// Send the last batch of points
if (!lineProtocol.empty())
{
transport->send(std::move(lineProtocol));
}

// If any points were too large to be sent using this transport, throw an exception.
if (messageSizeExceeded)
{
throw InfluxDBException{"One or more points exceeded the transport's maximum transmission size"};
}
}

void TransmitPoint(std::unique_ptr<Transport>& transport, const std::string& globalTags, Point&& point)
{
LineProtocol formatter{globalTags};
std::string formattedPoint{formatter.format(point)};
if (formattedPoint.size() > transport->getMaxMessageSize())
{
throw InfluxDBException{"Point is too large to be sent using this transport"};
}
transport->send(std::move(formattedPoint));
}
}

InfluxDB::InfluxDB(std::unique_ptr<Transport> transport)
: mPointBatch{},
Expand Down Expand Up @@ -69,25 +148,13 @@ namespace influxdb
{
if (mIsBatchingActivated && !mPointBatch.empty())
{
transmit(joinLineProtocolBatch());
// Make sure that mPointBatch is cleared even if an exception is thrown during transmission.
auto transmissionBatch{std::move(mPointBatch)};
mPointBatch.clear();
TransmitBatch(mTransport, mGlobalTags, std::move(transmissionBatch));
}
}

std::string InfluxDB::joinLineProtocolBatch() const
{
std::string joinedBatch;

LineProtocol formatter{mGlobalTags};
for (const auto& point : mPointBatch)
{
joinedBatch += formatter.format(point) + "\n";
}

joinedBatch.erase(std::prev(joinedBatch.end()));
return joinedBatch;
}


void InfluxDB::addGlobalTag(std::string_view name, std::string_view value)
{
Expand All @@ -100,11 +167,6 @@ namespace influxdb
mGlobalTags += LineProtocol::EscapeStringElement(LineProtocol::ElementType::TagValue, value);
}

void InfluxDB::transmit(std::string&& point)
{
mTransport->send(std::move(point));
}

void InfluxDB::write(Point&& point)
{
if (mIsBatchingActivated)
Expand All @@ -113,8 +175,7 @@ namespace influxdb
}
else
{
LineProtocol formatter{mGlobalTags};
transmit(formatter.format(point));
TransmitPoint(mTransport, mGlobalTags, std::move(point));
}
}

Expand All @@ -129,16 +190,7 @@ namespace influxdb
}
else
{
std::string lineProtocol;
LineProtocol formatter{mGlobalTags};

for (const auto& point : points)
{
lineProtocol += formatter.format(point) + "\n";
}

lineProtocol.erase(std::prev(lineProtocol.end()));
transmit(std::move(lineProtocol));
TransmitBatch(mTransport, mGlobalTags, points);
}
}

Expand Down
6 changes: 1 addition & 5 deletions src/InfluxDBFactory.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,7 @@ namespace influxdb
};

auto urlCopy = url;
http::url parsedUrl = http::ParseHttpUrl(urlCopy);
if (parsedUrl.protocol.empty())
{
throw InfluxDBException("Ill-formed URI");
}
http::url parsedUrl{http::ParseHttpUrl(urlCopy)};

const auto iterator = map.find(parsedUrl.protocol);
if (iterator == map.end())
Expand Down
8 changes: 7 additions & 1 deletion src/TCP.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,14 @@

#include "TCP.h"
#include "InfluxDBException.h"
#include <limits>
#include <string>

namespace influxdb::transports
{
namespace ba = boost::asio;

TCP::TCP(const std::string& hostname, int port)
TCP::TCP(const std::string& hostname, std::uint16_t port)
: mSocket(mIoService)
{
ba::ip::tcp::resolver resolver(mIoService);
Expand Down Expand Up @@ -72,4 +73,9 @@ namespace influxdb::transports
}
}

std::size_t TCP::getMaxMessageSize() const
{
return (std::numeric_limits<std::size_t>::max)();
}

} // namespace influxdb::transports
5 changes: 4 additions & 1 deletion src/TCP.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

#include "Transport.h"

#include <cstdint>
#include <boost/asio.hpp>
#include <chrono>
#include <string>
Expand All @@ -41,11 +42,13 @@ namespace influxdb::transports
{
public:
/// Constructor
TCP(const std::string& hostname, int port);
TCP(const std::string& hostname, std::uint16_t port);

/// Sends blob via TCP
void send(std::string&& message) override;

std::size_t getMaxMessageSize() const override;

/// check if socket is connected
bool is_connected() const;

Expand Down
45 changes: 41 additions & 4 deletions src/UDP.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,38 @@

#include "UDP.h"
#include "InfluxDBException.h"
#include <algorithm>
#include <limits>
#include <string>

namespace influxdb::transports
{
namespace
{
std::size_t GetSocketSendBufferSize(const boost::asio::ip::udp::socket& socket)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor; getSocketSendBufferSize() (see naming comment above).

{
boost::asio::ip::udp::socket::send_buffer_size sendBufferSizeOption;
socket.get_option(sendBufferSizeOption);
int sendBufferSize{sendBufferSizeOption.value()};
return (sendBufferSize >= 0 ? static_cast<std::size_t>(sendBufferSize) : 0U);
}
} // namespace


UDP::UDP(const std::string& hostname, int port)
UDP::UDP(const std::string& hostname, std::uint16_t port)
: mSocket(mIoService, boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), 0))
{
boost::asio::ip::udp::resolver resolver(mIoService);
boost::asio::ip::udp::resolver::query query(boost::asio::ip::udp::v4(), hostname, std::to_string(port));
boost::asio::ip::udp::resolver::iterator resolverInerator = resolver.resolve(query);
mEndpoint = *resolverInerator;
try
{
// "A successful call to this function is guaranteed to return a non-empty range."
// https://www.boost.org/doc/libs/1_81_0/doc/html/boost_asio/reference/ip__basic_resolver/resolve/overload7.html
mEndpoint = *(resolver.resolve(boost::asio::ip::udp::v4(), hostname, std::to_string(port)));
}
catch (const boost::system::system_error& e)
{
throw InfluxDBException(e.what());
}
}

void UDP::send(std::string&& message)
Expand All @@ -53,4 +73,21 @@ namespace influxdb::transports
}
}

std::size_t UDP::getMaxMessageSize() const
{
// UDP header has a 16-bit length field
static constexpr std::size_t maxLengthValue{(std::numeric_limits<std::uint16_t>::max)()};
static constexpr std::size_t udpHeaderSize{8};
// Currently only IPv4 is supported
static constexpr std::size_t ipv4HeaderSize{20};
// Max UDP data size for IPv4 is 65535 - 8 - 20 = 65507
static constexpr std::size_t maxUDPDataSize{maxLengthValue - udpHeaderSize - ipv4HeaderSize};

// MacOS has a default UDP send buffer size which is smaller than maxUDPDataSize
// this can be changed by setting the sysctl net.inet.udp.maxdgram or setting the
// SO_SNDBUF option on a per socket basis. For our purposes we can just use the
// smaller of maxUDPDataSize and the send buffer size for the socket.
return std::min(maxUDPDataSize, GetSocketSendBufferSize(mSocket));
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about other systems? Is the behaviour consistent across OS types?

}

} // namespace influxdb::transports
Loading