Skip to content


Latest commit





Folders and files

Last commit message
Last commit date

parent directory

/*
This example describes the use of an important operator that is highly applicable
in many telco scenarios. That operator is called DeDuplicate; it eliminates
duplicate tuples for a specified duration of time. It has two output ports:
all the non-duplicate tuples are sent to the first output port, and all the
duplicate tuples are sent to the optional second output port.
*/
namespace my.sample;

// Main composite of the DeDuplicate example.
//
// Data flow:
//   Beacon -> Custom (fills in book attributes) -> Custom (re-submits every
//   other tuple a second time) -> DeDuplicate -> two Custom sinks that print
//   the non-duplicate and the duplicate tuples respectively.
composite Main {
	type
		// Schema shared by every stream in this application.
		bookInfo = tuple<rstring title, rstring isbn, rstring author, rstring publisher, float32 price>;

	graph
		// Generate 100 tuples; their attributes are filled in by the next operator.
		stream <bookInfo> DefaultBoolInfo1 = Beacon() {
			param
				iterations: 100u;
		} // End of DefaultBoolInfo1 = Beacon()

		// In a custom operator, let us set the attributes of the BookInfo1 tuples.
		stream<bookInfo> BookInfo1 = Custom(DefaultBoolInfo1 as DBF1) {
			logic
				state: mutable uint32 cnt = 0u;

				onTuple DBF1: {
					// Advance the counter so that every generated book gets
					// its own title, ISBN, author and publisher.
					++cnt;

					DBF1 = {
						title = "Title" + (rstring)cnt,
						isbn = "ISBN" + (rstring)cnt,
						author = "Author" + (rstring)cnt,
						publisher = "Publisher" + (rstring)cnt,
						price = (float32)random()*(float32)100.0
					};

					submit(DBF1, BookInfo1);
				} // End of onTuple DBF1
		} // End of BookInfo1 = Custom(DefaultBoolInfo1)

		// Forward every tuple, and send every second tuple twice so that the
		// downstream DeDuplicate operator has duplicates to detect.
		stream <bookInfo> BookInfo2 = Custom(BookInfo1) {
			logic
				state: mutable uint32 cnt = 0u;

				onTuple BookInfo1: {
					// Submit once.
					submit(BookInfo1, BookInfo2);

					// We will duplicate every other tuple.
					if (++cnt%2u == 0u) {
						submit(BookInfo1, BookInfo2);
					} // End of if (++cnt%2u == 0u)
				} // End of onTuple BookInfo1
		} // End of BookInfo2 = Custom(BookInfo1)

		// Let us filter the duplicate tuples now.
		// BookInfo3 (first output port) carries the non-duplicate tuples and
		// BookInfo4 (optional second output port) carries the duplicates.
		(stream <bookInfo> BookInfo3; stream <bookInfo> BookInfo4) = DeDuplicate(BookInfo2) {
			param
				// If you don't specify the timeOut parameter, it will deduplicate for 10 minutes by default.
				timeOut: 120.0;
				// If you don't specify the key parameter, then it will use the whole tuple for duplicate comparison algorithm.
				key: title, isbn, author, publisher, price;
		} // End of (BookInfo3; BookInfo4) = DeDuplicate(BookInfo2)

		// Now let us connect to the non-duplicate tuples sent by the Deduplicate operator.
		() as ScreenWriter1= Custom(BookInfo3) {
			logic
				state: mutable int32 nonDuplicateTupleCnt = 0;

				onTuple BookInfo3: {
					// Print the section header once, just before the first tuple.
					if (nonDuplicateTupleCnt++ == 0) {
						printStringLn("\na)Non-Duplicate tuples sent by the Deduplicate operator:");
					} // End of if (nonDuplicateTupleCnt++ == 0)

					printStringLn ((rstring) nonDuplicateTupleCnt + "a)" + (rstring) BookInfo3);
				} // End of onTuple BookInfo3
		} // End of ScreenWriter1 = Custom(BookInfo3)

		// Now let us connect to the duplicate tuples sent by the Deduplicate operator.
		() as ScreenWriter2= Custom(BookInfo4) {
			logic
				state: mutable int32 duplicateTupleCnt = 0;

				onTuple BookInfo4: {
					// Print the section header once, just before the first tuple.
					if (duplicateTupleCnt++ == 0) {
						printStringLn("\nb)Duplicate tuples sent by the Deduplicate operator:");
					} // End of if (duplicateTupleCnt++ == 0)

					printStringLn ((rstring) duplicateTupleCnt + "b)" + (rstring) BookInfo4);
				} // End of onTuple BookInfo4
		} // End of ScreenWriter2 = Custom(BookInfo4)
} // End of the main composite.