Skip to content
Merged
5 changes: 3 additions & 2 deletions build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ if [ -z "$sanitizer" ]; then
fi

cmake .. \
-DCMAKE_BUILD_TYPE=${build_type} \
-DCMAKE_BUILD_TYPE="$build_type" \
-DENABLE_PROTON_ALL=OFF \
-DENABLE_PROTON_SERVER=ON \
-DENABLE_PROTON_CLIENT=ON \
Expand Down Expand Up @@ -73,5 +73,6 @@ cmake .. \
-DENABLE_SNOWFLAKE_FUNCS=ON \
-DENABLE_ENCRYPT_DECRYPT_FUNCS=ON \
-DENABLE_DEBUG_FUNCS=ON \
-DENABLE_URL_FUNCS=ON
-DENABLE_URL_FUNCS=ON \
-DENABLE_AVRO=ON

1 change: 1 addition & 0 deletions docker/packager/packager
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ def parse_env_variables(build_type, compiler, sanitizer, package_type, image_typ
cmake_flags.append('-DENABLE_KRB5=ON')
cmake_flags.append('-DENABLE_BROTLI=ON')
cmake_flags.append('-DENABLE_S3=ON')
cmake_flags.append('-DENABLE_AVRO=ON')

# krb5: Disabled in environments other than Linux and native Darwin.
# Reference: contrib/krb5-cmake/CMakeLists.txt:3
Expand Down
2 changes: 2 additions & 0 deletions src/Core/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -769,6 +769,8 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(EnumComparingMode, format_capn_proto_enum_comparising_mode, FormatSettings::EnumComparingMode::BY_VALUES, "How to map proton Enum and CapnProto Enum", 0)\
M(String, rawstore_time_extraction_type, "", "_tp_time extraction type (string, json, regex)", 0) \
M(String, rawstore_time_extraction_rule, "", "_tp_time extraction rule (string, json, regex)", 0) \
M(URI, kafka_schema_registry_url, "", "For ProtobufSingle format: Kafka Schema Registry URL.", 0) \
M(String, kafka_schema_registry_credentials, "", "Credentials to be used to fetch schema from the `kafka_schema_registry_url`, with format '<username>:<password>'.", 0) \
/** proton: ends. */
// End of FORMAT_FACTORY_SETTINGS
// Please add settings non-related to formats into the COMMON_SETTINGS above.
Expand Down
4 changes: 4 additions & 0 deletions src/Formats/FormatFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
format_settings.schema.format_schema = settings.format_schema;
format_settings.schema.format_schema_path = context->getFormatSchemaPath();
format_settings.schema.is_server = context->hasGlobalContext() && (context->getGlobalContext()->getApplicationType() == Context::ApplicationType::SERVER);
/// proton: starts
format_settings.schema.kafka_schema_registry_url = settings.kafka_schema_registry_url.toString();
format_settings.schema.kafka_schema_registry_credentials = settings.kafka_schema_registry_credentials;
/// proton: ends
format_settings.skip_unknown_fields = settings.input_format_skip_unknown_fields;
format_settings.template_settings.resultset_format = settings.format_template_resultset;
format_settings.template_settings.row_between_delimiter = settings.format_template_rows_between_delimiter;
Expand Down
29 changes: 14 additions & 15 deletions src/Formats/FormatSchemaFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
#include <IO/ReadBufferFromString.h>
#include <Processors/Formats/ISchemaWriter.h>

#include <format>

namespace DB
{

Expand All @@ -18,24 +20,12 @@ extern const int INVALID_DATA;

namespace
{

/// Strips the directory part and the (last) extension from `path`,
/// e.g. "/schemas/foo.proto" -> "foo".
String basename(const std::filesystem::path & path)
{
    /// stem() == filename() with its extension removed — same result, one call.
    return path.stem().string();
}

/// Renders all schema validation errors as one human-readable,
/// "; "-separated string, e.g. "line: 1, columns: 2, error: ...; line: 3, ...".
String formatSchemaValidationErrors(SchemaValidationErrors errors)
{
    assert(!errors.empty());

    String result;
    bool first = true;
    for (const auto & err : errors)
    {
        if (!first)
            result.append("; ");
        first = false;
        result.append(fmt::format("line: {}, columns: {}, error: {}", err.line, err.col, err.error));
    }

    return result;
}
}

void FormatSchemaFactory::registerSchema(const String & schema_name, const String & format, std::string_view schema_body, ExistsOP exists_op, ContextPtr & context)
Expand All @@ -53,8 +43,17 @@ void FormatSchemaFactory::registerSchema(const String & schema_name, const Strin
std::lock_guard lock(mutex);
auto writer = FormatFactory::instance().getExternalSchemaWriter(format, schema_body, context, format_settings);
assert(writer); /* confirmed with checkSchemaType */
if (auto errors = writer->validate(); !errors.empty())
throw Exception(ErrorCodes::INVALID_DATA, "Invalid Protobuf schema, errors: {}", formatSchemaValidationErrors(errors));

try
{
writer->validate();
}
catch (DB::Exception & e)
{
e.addMessage(std::format("{} schema {} was invalid", format, schema_name));
e.rethrow();
}


auto result = writer->write(exists_op == ExistsOP::Replace);
if (!result && exists_op == ExistsOP::Throw)
Expand Down
4 changes: 4 additions & 0 deletions src/Formats/FormatSettings.h
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,10 @@ struct FormatSettings
std::string format_schema;
std::string format_schema_path;
bool is_server = false;
/// proton: starts
std::string kafka_schema_registry_url;
std::string kafka_schema_registry_credentials;
/// proton: ends
} schema;

struct
Expand Down
115 changes: 115 additions & 0 deletions src/Formats/KafkaSchemaRegistry.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
#include <Formats/KafkaSchemaRegistry.h>
#include <IO/HTTPCommon.h>
#include <IO/ReadHelpers.h>
#include <format>

#include <Poco/JSON/Parser.h>
#include <Poco/Net/HTTPBasicCredentials.h>
#include <boost/algorithm/string/predicate.hpp>

namespace DB
{

namespace ErrorCodes
{
extern const int INCORRECT_DATA;
/// Used by KafkaSchemaRegistry::readSchemaId() to detect a truncated message;
/// without this declaration the reference below does not compile.
extern const int CANNOT_READ_ALL_DATA;
}

/// \param credentials_ is expected to be "<username>:<password>"; when no ':'
/// is present the whole string is treated as the username (no password).
KafkaSchemaRegistry::KafkaSchemaRegistry(const String & base_url_, const String & credentials_)
    : base_url(base_url_)
    , logger(&Poco::Logger::get("KafkaSchemaRegistry"))
{
    assert(!base_url.empty());

    const auto separator_pos = credentials_.find(':');
    if (separator_pos == String::npos)
    {
        credentials.setUsername(credentials_);
    }
    else
    {
        credentials.setUsername(credentials_.substr(0, separator_pos));
        credentials.setPassword(credentials_.substr(separator_pos + 1));
    }
}

/// Fetches the schema body with the given `id` from `<base_url>/schemas/ids/<id>`
/// and returns the value of the "schema" field of the JSON response.
/// Throws DB::Exception (with "while fetching schema with id ..." context added)
/// on any HTTP, JSON, or Poco-level failure.
String KafkaSchemaRegistry::fetchSchema(UInt32 id)
{
    try
    {
        /// Inner try: translate Poco exceptions into DB::Exception;
        /// the outer catch then adds the schema-id context to either kind.
        try
        {
            Poco::URI url(base_url, std::format("/schemas/ids/{}", id));
            LOG_TRACE(logger, "Fetching schema id = {}", id);

            /// One second for connect/send/receive. Just in case.
            ConnectionTimeouts timeouts({1, 0}, {1, 0}, {1, 0});

            Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_GET, url.getPathAndQuery(), Poco::Net::HTTPRequest::HTTP_1_1);
            request.setHost(url.getHost());

            /// Attach HTTP Basic auth only when credentials were provided.
            if (!credentials.empty())
                credentials.authenticate(request);

            auto session = makePooledHTTPSession(url, timeouts, 1);
            std::istream * response_body{};
            try
            {
                session->sendRequest(request);

                Poco::Net::HTTPResponse response;
                response_body = receiveResponse(*session, request, response, false);
            }
            catch (const Poco::Exception & e)
            {
                /// We use the session data storage as storage for the exception text.
                /// Based on it we can decide whether to reconnect the session or re-resolve the session host.
                session->attachSessionData(e.message());
                throw;
            }
            /// Expect a JSON object with a "schema" string field; a malformed body
            /// raises a Poco exception which is converted below.
            Poco::JSON::Parser parser;
            auto json_body = parser.parse(*response_body).extract<Poco::JSON::Object::Ptr>();
            auto schema = json_body->getValue<std::string>("schema");
            LOG_TRACE(logger, "Successfully fetched schema id = {}\n{}", id, schema);
            return schema;
        }
        catch (const Exception &)
        {
            /// Already a DB::Exception — re-throw untouched so the outer handler can annotate it.
            throw;
        }
        catch (const Poco::Exception & e)
        {
            throw Exception(Exception::CreateFromPocoTag{}, e);
        }
    }
    catch (Exception & e)
    {
        e.addMessage(std::format("while fetching schema with id {}", id));
        throw;
    }
}

/// Reads the message framing from `in`: a single magic byte (must be 0x00)
/// followed by a big-endian 32-bit schema id, which is returned.
/// Throws INCORRECT_DATA when the framing is missing, truncated, or invalid.
UInt32 KafkaSchemaRegistry::readSchemaId(ReadBuffer & in)
{
    uint8_t magic_byte = 0;
    uint32_t id = 0;

    try
    {
        readBinaryBigEndian(magic_byte, in);
        readBinaryBigEndian(id, in);
    }
    catch (const Exception & e)
    {
        /// Anything other than running out of bytes is not ours to interpret.
        if (e.code() != ErrorCodes::CANNOT_READ_ALL_DATA)
            throw;

        /// empty or incomplete message without magic byte or schema id
        throw Exception(ErrorCodes::INCORRECT_DATA, "Missing magic byte or schema identifier.");
    }

    if (magic_byte != 0x00)
        throw Exception(ErrorCodes::INCORRECT_DATA, "Invalid magic byte before schema identifier."
            " Must be zero byte, found 0x{:x} instead", magic_byte);

    return id;
}

}
29 changes: 29 additions & 0 deletions src/Formats/KafkaSchemaRegistry.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#pragma once

#include <IO/ReadBuffer.h>

#include <Poco/Net/HTTPBasicCredentials.h>
#include <Poco/URI.h>

namespace DB
{

/// A helper class for working with a Kafka schema registry.
class KafkaSchemaRegistry final
{
public:
    /// Reads the message framing from \p in: one magic byte (must be zero)
    /// followed by a big-endian 32-bit schema id, which is returned.
    /// Throws INCORRECT_DATA when the framing is missing or invalid.
    static UInt32 readSchemaId(ReadBuffer & in);

    /// \param credentials_ is expected to be formatted in "<username>:<password>".
    KafkaSchemaRegistry(const String & base_url_, const String & credentials_);

    /// Fetches the schema body for \p id from `<base_url>/schemas/ids/<id>`.
    String fetchSchema(UInt32 id);

private:
    Poco::URI base_url;
    /// HTTP Basic auth; when left empty, no Authorization header is sent.
    Poco::Net::HTTPBasicCredentials credentials;

    Poco::Logger* logger;
};

}
46 changes: 14 additions & 32 deletions src/Formats/ProtobufSchemas.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
#include "config.h"

#if USE_PROTOBUF
# include <Common/Exception.h>
# include <Common/LRUCache.h>
# include <Formats/FormatSchemaInfo.h>
# include <Processors/Formats/ISchemaWriter.h>
# include <Formats/KafkaSchemaRegistry.h>
# include <Formats/ProtobufSchemas.h>
# include <Processors/Formats/ISchemaWriter.h>
# include <google/protobuf/compiler/importer.h>
# include <Common/Exception.h>


namespace DB
Expand All @@ -14,32 +16,7 @@ namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int CANNOT_PARSE_PROTOBUF_SCHEMA;
/// proton: starts
extern const int INVALID_DATA;
/// proton: ends
}

/// proton: starts
namespace
{
class ErrorCollector final: public google::protobuf::io::ErrorCollector
{
private:
SchemaValidationErrors errors;

public:
void AddError(int line, google::protobuf::io::ColumnNumber column, const std::string & message) override
{
errors.emplace_back(line, column, message);
}

const SchemaValidationErrors & getErrors() const
{
return errors;
}
};
}
/// proton: starts

ProtobufSchemas & ProtobufSchemas::instance()
{
Expand Down Expand Up @@ -106,17 +83,22 @@ const google::protobuf::Descriptor * ProtobufSchemas::getMessageTypeForFormatSch
}

/// proton: starts
SchemaValidationErrors ProtobufSchemas::validateSchema(std::string_view schema)
/// Overrides google::protobuf::io::ErrorCollector:
void ProtobufSchemas::AddError(int line, google::protobuf::io::ColumnNumber column, const std::string & message)
{
throw Exception(ErrorCodes::CANNOT_PARSE_PROTOBUF_SCHEMA,
"Cannot parse schema, found an error at line {}, column {}, error: {}", line, column, message);
}

void ProtobufSchemas::validateSchema(std::string_view schema)
{
google::protobuf::io::ArrayInputStream input{schema.data(), static_cast<int>(schema.size())};
ErrorCollector error_collector;
google::protobuf::io::Tokenizer tokenizer(&input, &error_collector);
google::protobuf::io::Tokenizer tokenizer(&input, this);
google::protobuf::FileDescriptorProto descriptor;
google::protobuf::compiler::Parser parser;

parser.RecordErrorsTo(&error_collector);
parser.RecordErrorsTo(this);
parser.Parse(&tokenizer, &descriptor);
return error_collector.getErrors();
}
/// proton: ends

Expand Down
21 changes: 14 additions & 7 deletions src/Formats/ProtobufSchemas.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,13 @@
#include <base/types.h>
#include <boost/noncopyable.hpp>

/// proton: starts
#include <Formats/KafkaSchemaRegistry.h>

#include <google/protobuf/descriptor.h>
#include <google/protobuf/io/tokenizer.h>
/// proton: ends


namespace google
{
Expand All @@ -21,28 +28,28 @@ namespace protobuf
namespace DB
{
class FormatSchemaInfo;
/// proton: starts
struct SchemaValidationError;
using SchemaValidationErrors = std::vector<SchemaValidationError>;
/// proton: ends

/** Keeps parsed google protobuf schemas parsed from files.
* This class is used to handle the "Protobuf" input/output formats.
*/
class ProtobufSchemas : private boost::noncopyable
class ProtobufSchemas : public google::protobuf::io::ErrorCollector /* proton: updated */
{
public:
static ProtobufSchemas & instance();

ProtobufSchemas();
~ProtobufSchemas();
~ProtobufSchemas() override; /* proton: updated */

/// Parses the format schema, then parses the corresponding proto file, and returns the descriptor of the message type.
/// The function never returns nullptr, it throws an exception if it cannot load or parse the file.
const google::protobuf::Descriptor * getMessageTypeForFormatSchema(const FormatSchemaInfo & info);

/// proton: starts
SchemaValidationErrors validateSchema(std::string_view schema);
void AddError(int line, google::protobuf::io::ColumnNumber column, const std::string & message) override;

/// Validates the given schema and throw a DB::Exception if the schema is invalid.
/// The exception will contain the first error encountered when validating the schema, i.e. there could be more errors.
void validateSchema(std::string_view schema);
/// proton: ends
private:
class ImporterWithSourceTree;
Expand Down
1 change: 1 addition & 0 deletions src/KafkaLog/KafkaWALSettings.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ struct KafkaWALSettings
settings.push_back(fmt::format("shared_subscription_flush_threshold_bytes={}", shared_subscription_flush_threshold_bytes));
settings.push_back(fmt::format("shared_subscription_flush_threshold_ms={}", shared_subscription_flush_threshold_ms));
settings.push_back(fmt::format("auth.security.protocol={}", auth.security_protocol));
settings.push_back(fmt::format("auth.sasl.mechanism={}", auth.sasl_mechanism));
settings.push_back(fmt::format("auth.username={}", auth.username));
settings.push_back(fmt::format("auth.password={}", auth.password));
settings.push_back(fmt::format("auth.ssl.ca.location={}", auth.ssl_ca_cert_file));
Expand Down
Loading