Skip to content

Latest commit

 

History

History
77 lines (59 loc) · 4.1 KB

modern_consumer.md

File metadata and controls

77 lines (59 loc) · 4.1 KB

Чтение сообщений c использованием новых методов

	Инстанс = "Integration";
	РазрешеноСлушать = Константы.Слушать.Получить();

	Попытка
		Компонента = Новый(СтрШаблон("AddIn.%1.simpleKafka1C", Инстанс));   
	Исключение
		Подключено = ПодключитьВнешнююКомпоненту("/home/shmell/Source/Simple-Kafka_Adapter/build/libSimpleKafka1C.so", Инстанс, ТипВнешнейКомпоненты.Native, ТипПодключенияВнешнейКомпоненты.Изолированно);

		Если Подключено Тогда
			Компонента = Новый(СтрШаблон("AddIn.%1.simpleKafka1C", Инстанс));  	
		КонецЕсли;
	КонецПопытки;

	Если Компонента = Неопределено Тогда
		Возврат;	
	КонецЕсли;
	
	Компонента.УстановитьПараметр("group.id", "testCompUbuntu2");   
	Компонента.КаталогЛогов = "/home/shmell/tmp/";

	Брокер = "192.168.100.101:9092";
	Топики = "testTopic";

	Результат = Компонента.ИнициализироватьКонсьюмера(Брокер, Топики); 
	Если НЕ Результат Тогда
		ОписаниеОшибки = Компонента.ПолучитьСообщениеОбОшибке();
		ВызватьИсключение ОписаниеОшибки;	
	КонецЕсли;
	
	ВернутьДвоичныеДанные = Ложь; 
	
	Пока РазрешеноСлушать Цикл
		
		Попытка       
			СообщениеПрочитано = Компонента.ПрочитатьСообщение();
		Исключение   
			ОписаниеОшибки = Компонента.ПолучитьСообщениеОбОшибке();
			Компонента = Неопределено;                                                         
			ВызватьИсключение ОписаниеОшибки() + " " + ОписаниеОшибки;
		КонецПопытки;	 
		
		Если НЕ СообщениеПрочитано Тогда
			Продолжить;
		КонецЕсли;
				
		Сообщение = Компонента.ПолучитьДанныеСообщения(ВернутьДвоичныеДанные); // сообщение как оно есть (если передать Истина), если не указывать параметр - возвращается строка UTF-8
		Ключ = Компонента.ПолучитьКлючСообщения();
		Заголовки = ПолучитьЗаголовкиСообщения();
		Топик = ПолучитьТопикСообщения();
		БрокерИд = ПолучитьИдентификаторБрокераСообщения();
		ВременнаяМетка = ПолучитьВременнуюМеткуСообщения();
		Партиция = ПолучитьРазделСообщения();
		Смещение = ПолучитьСмещениеСообщения();
		
		// Полученное сообщение - по факту JSON
		ОбъектЧтение = Новый ЧтениеJSON;

		Если ТипЗнч(Сообщение) = Тип("ДвоичныеДанные") Тогда
			ЧтениеДанных = Новый ЧтениеДанных(Сообщение);   
			БуферДанных = ЧтениеДанных.ПрочитатьВБуферДвоичныхДанных();   
			
			ПотокДанных = Новый ПотокВПамяти(БуферДанных);   
			ОбъектЧтение.ОткрытьПоток(ПотокДанных);
		Иначе			
			ОбъектЧтение.УстановитьСтроку(Сообщение);		
		КонецЕсли;        
		
		СтруктураJSON = ПрочитатьJSON(ОбъектЧтение);
		ОбъектЧтение.Закрыть();
		
		РазрешеноСлушать = Константы.Слушать.Получить();
		
	КонецЦикла;