Внешняя компонента может формировать файлы в бинарном формате AVRO, который поддерживает передачу данных по схемам, которые записываются в заголовок файла формата AVRO при его создании.
Сохранение схемы, которая будет использоваться для AVRO.
СохранитьСхемуAVRO(ИмяСхемыJSON, СхемаJSON);
ИмяСхемыJSON - Строка. Обязательный. Имя схемы.
СхемаJSON - Строка. Обязательный. Схема, по которой формируются данные (см. параметр ДанныеJSON процедуры ПреобразоватьВФорматAVRO). Схема должна быть представлена в следующем виде:
{
"type": "record",
"name": "user",
"fields": [
{
"name": "id",
"type": "string"
},
{
"name": "name",
"type": "string"
},
{
"name": "phone",
"type": [
"null",
"string"
]
}
]
}
Возвращает значение типа булево - "Истина", если операция выполнена успешно. "Ложь", если была ошибка. Текст ошибки можно получить функцией ПолучитьСообщениеОбОшибке.
Пример использования метода СохранитьСхемуAVRO:
ИмяСхемыJSON = "user";
СхемаJSON = "{
| ""type"": ""record"",
| ""name"": ""user"",
| ""fields"": [
| {
| ""name"": ""id"",
| ""type"": ""string""
| },
| {
| ""name"": ""name"",
| ""type"": ""string""
| },
| {
| ""name"": ""phone"",
| ""type"": [
| ""null"",
| ""string""
| ]
| }
| ]
|}";
Результат = Компонента.СохранитьСхемуAVRO(
ИмяСхемыJSON,
СхемаJSON
);
Если Не Результат Тогда
ВызватьИсключение Компонента.ПолучитьСообщениеОбОшибке();
КонецЕсли;
ПреобразоватьВФорматAVRO(ДанныеJSON, ИмяСхемыJSON);
ДанныеJSON - Строка. Обязательный. Данные в формате JSON, представленные в следующем виде:
{
"id": ["1", "2", "3"],
"name": ["Ivan", "Maksim", "Vitaly"],
"phone": ["+79115672342", null, "+79185172317"],
"age": [null, 45, 25]
}
Такой массив определяет 3 объекта:
[
{
"id": "1",
"name": "Vasiliy",
"phone": "+79115672342",
"age": null
},
{
"id": "2",
"name": "Maksim",
"phone": null,
"age": 45
},
{
"id": "3",
"name": "Vitaly",
"phone": "+79185172317",
"age": 25
}
]
Пример использования метода ПреобразоватьВФорматAVRO:
ДанныеJSON = "{
| ""id"": [""1"", ""2"", ""3""],
| ""name"": [""Ivan"", ""Maksim"", ""Vitaly""],
| ""phone"": [""+79115672342"", null, ""+79185172317""],
| ""age"": [null, 45, 25]
|}";
ИмяСхемыJSON = "user";
Результат = Компонента.ПреобразоватьВФорматAVRO(
ДанныеJSON,
ИмяСхемыJSON
);
Если Не Результат Тогда
ВызватьИсключение Компонента.ПолучитьСообщениеОбОшибке();
КонецЕсли;
Обратите внимание, что:
- в каждом поле должен быть массив с одинаковым количеством элементов;
- поля должны быть перечислены строго в той последовательности, в которой определены в схеме (описание схемы далее);
- поддерживаются следующие типы: AVRO_STRING, AVRO_LONG, AVRO_INT, AVRO_FLOAT, AVRO_BOOL, AVRO_NULL, AVRO_UNION.
ИмяСхемыJSON - Строка. Обязательный. Имя схемы, которая ранее был сохранена процедурой СохранитьСхемуAVRO
Возвращает значение типа булево - "Истина", если операция выполнена успешно. "Ложь", если была ошибка. Текст ошибки можно получить функцией ПолучитьСообщениеОбОшибке.
В случае успешного формирования файла AVRO данные можно отправить в соответствующий топик в Kafka при помощи методов.
Для асинхронной отправки сообщения необходимо использовать метод ОтправитьСообщениеAVRO. Метод возвращает булево значение "Истина" практически во всех случаях, кроме следующих кейсов:
- Размер сообщения превышает установленный максимальный размер: message.max.bytes
- Запрошенный раздел (партиция) неизвестен в кластере Kafka
- Указанная тема не найдена в кластере Kafka (только если отключено автосоздание топиков на брокере)
- Произошла исключительная ситуация. Текст ошибки можно получить функцией ПолучитьСообщениеОбОшибке.
ОтправитьСообщениеAVRO(Топик, Партиция, Ключ, Заголовки)
Топик - Строка.
Партиция - Число. Если не указано значение, то по умолчанию запись производится в 0 раздел.
Ключ - Строка. Необязательный параметр. Ключ сообщения, который может использоваться в темах с уплотнением сообщений по ключу.
Заголовки - Строка. Перечень заголовков, состоящих из ключа и значения в следующем формате "ключ1,значение1;ключ2,значение2"
В случае асинхронной отправки - результаты доставки можно получить с помощью следующих архитектурных подходов:
- При создании экземпляра продюсера, создаем так же экземпляр консьюмера, подписываемся на тему, куда производится отправка. После отправки каждого сообщения - получаем с помощью экземпляра консьюмера сообщение, вычисляем хэш отправленного и полученного сообщения (или же используем ключи сообщений). Сравниваем хэши или ключи - в случае совпадения - считаем сообщение доставленным.
- Настраиваем считывание из файла лога. Если при отправке был указан key сообщения, в лог будет выведен key и результат доставки. Если key не был указан - в лог будет выведен хэш (md5) сообщения.
Порядок полей в логе: Дата время: Хэш или ключ сообщения, Статус доставки:Причина, Размер сообщения в байтах, Топик, Смещение (-1001 если все плохо), Партиция (-1 если все плохо), Брокер (-1 если все плохо).
Примеры строк в логе доставки
2023-09-24 09:59:18.54614692 Key:205f03b5-9c64-4752-9fb4-0b8695789648, Status:Persisted, Details:Success, Size:28, Topic:testTopic, Offset:20553, Partition:0, BrokerID:0
2023-09-24 10:01:57.958599455 Hash:ed59c8ed6c3209e9e6e434c2d9fbe52a, Status:Persisted, Details:Success, Size:28, Topic:testTopic, Offset:20554, Partition:0, BrokerID:0
2023-09-24 10:18:32.495522885 Error: Configuration property "socket.blocking.max.ms" value 0 is outside allowed range 1..60000
2023-09-24 12:56:10.311876162 Hash:dbee3cf28689a286066db820f6a8583e, Status:NotPersisted, Details:Local Message timed out, Size:9546, Topic:testTopic, Offset:-1001, Partition:-1, BrokerID:-1
Пример использования метода ОтправитьСообщениеAVRO:
Топик = "USER";
Результат = Компонента.ОтправитьСообщениеAVRO(
Топик,
,
"key"
);
Если Не Результат Тогда
ВызватьИсключение Компонента.ПолучитьСообщениеОбОшибке();
КонецЕсли;
Для синхронной отправки необходимо использовать метод ОтправитьСообщениеAVROСОжиданиемРезультата, состав параметров аналогичен методу асинхронной отправки.
Метод возвращает значение типа булево - "Истина", если при доставке все брокеры вернули подтверждение о приемке сообщения, "Ложь", если хотя бы один брокер не подтвердил получение или не было получено подтверждение от брокеров в течении 20 секунд. Текст ошибки можно получить функцией ПолучитьСообщениеОбОшибке.
Дополнительно, в рамках синхронной отправки рекомендуется устанавливать параметры queue.buffering.max.ms и socket.blocking.max.ms в самые минимальные значения, чтобы во внутреннем буфере сообщений не происходило накопление данных и сообщения могли отправляться сразу, минуя буфер.
Так же следует обратить внимание на параметр message.timeout.ms. Данным параметром мы указываем, сколько времени в мс. мы должны ждать подтверждения от брокеров. Брокеры могут и не отправить подтверждения, если, к примеру, они не доступны. В этом случае, рекомендуется устанавливать данный параметр в диапозоне от 1000 до 20000 мс. В противном случае - недоступности брокеров и включеном логировании в компоненте - в логе продюсера не будет никакой информации об проблеме.
УстановитьПараметр("queue.buffering.max.ms", "1");
УстановитьПараметр("socket.blocking.max.ms", "1");
УстановитьПараметр("message.timeout.ms", "5000");
ОтправитьСообщениеAVROСОжиданиемРезультата(Сообщение, Топик, Партиция, Ключ, Заголовки)
Список параметров метода идентичен методу ОтправитьСообщениеAVRO
Файл AVRO можно сохранить в файл:
СохранитьФайлAVRO(ИмяФайла)
ИмяФайла - Строка. Обязательный.