Publish to Kafka

Alex Golshani edited this page Sep 6, 2020 · 4 revisions

Trubka is not only a general-purpose Kafka consumer. You can also use it to publish protocol buffer messages, plain text and arbitrary bytes to Kafka.

Publishing Plain Text

$ echo 'Random Data' | trubka produce plain <topic> --brokers <broker:port>

or

$ trubka produce plain <topic> 'Random Data' --brokers <broker:port>

Publishing Protobuf

The protocol buffer bytes can be provided to the proto producer using one of the following methods:

  • Hex/Base64 encoded string of the proto bytes
  • JSON template

The first method is straightforward: you already have the string representation of your proto bytes in hex or Base64 format, so all you need to do is feed it in:

$> echo 'Hex/Base64 encoded bytes' | trubka produce proto <topic> <contract> \
--proto-root <dir> --brokers <broker:port>

or

$> trubka produce proto <topic> <contract> 'Hex/Base64 encoded bytes' \
--proto-root <dir> --brokers <broker:port>

However, this method is only useful when you already have the protocol buffer bytes, which is usually not the case. To solve that problem, Trubka lets you provide the JSON representation of your message and automatically converts it to proto bytes, as long as the JSON is compatible with the proto schema.

For example, if you have the following contract living in your --proto-root directory:

syntax = "proto3";

package contracts;

option go_package = "git.ourdomain.com/contracts";
import "google/protobuf/timestamp.proto";

message EntityDefined {
  string entity_id = 1;
  google.protobuf.Timestamp defined_date_utc = 2;
  oneof authentication_oneof {
    string api_key = 3;
    string token = 4;
  }
}

You can use the JSON payload below to publish a proto message to Kafka:

{
  "entityId": "entity id",
  "definedDateUtc": "2020-02-17T23:36:13Z",
  "apiKey": "api key"
}
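The JSON keys above are the lowerCamelCase names defined by the proto3 JSON mapping: entity_id becomes entityId and defined_date_utc becomes definedDateUtc, while the Timestamp is written as an RFC 3339 string. The proto3 JSON specification also requires parsers to accept the original field names, which is why a snake_case payload such as the generated schema further down works too. A minimal Go sketch of the default name derivation (it ignores custom json_name options; jsonName is a hypothetical helper):

```go
package main

import (
	"fmt"
	"strings"
	"unicode"
)

// jsonName converts a proto field name to its default proto3 JSON name:
// underscores are dropped and the letter following each one is upper-cased.
func jsonName(protoName string) string {
	var sb strings.Builder
	upperNext := false
	for _, r := range protoName {
		switch {
		case r == '_':
			upperNext = true
		case upperNext:
			sb.WriteRune(unicode.ToUpper(r))
			upperNext = false
		default:
			sb.WriteRune(r)
		}
	}
	return sb.String()
}

func main() {
	for _, f := range []string{"entity_id", "defined_date_utc", "api_key", "token"} {
		fmt.Printf("%s -> %s\n", f, jsonName(f))
	}
}
```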

To make this even easier, Trubka comes with a command that generates the JSON schema from the protocol buffer contract:

$> trubka produce schema <contract> --proto-root /protocol_buffers_dir > EntityDefined.json
$> cat EntityDefined.json | trubka produce proto <topic> contracts.EntityDefined --proto-root /protocol_buffers_dir

The produce schema command goes through all the proto fields recursively and generates the JSON schema. It ignores deprecated proto fields, chooses the right JSON representation for each data type, and picks only one of the available options for each oneof field.
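The rules in the previous paragraph (recurse into nested messages, skip deprecated fields, emit a single member per oneof) can be pictured with the Go sketch below. It is an assumption about the approach, not Trubka's implementation: Field is a hypothetical, heavily simplified stand-in for a real proto descriptor, legacy_id is a made-up deprecated field, and keeping the first oneof member is an arbitrary choice.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Field is a hypothetical, simplified stand-in for a proto field descriptor.
type Field struct {
	Name       string
	Kind       string // "string", "int32", "bool", "timestamp" or "message"
	Deprecated bool
	Oneof      string  // name of the enclosing oneof, empty if none
	Fields     []Field // nested fields when Kind is "message"
}

// entityDefined mirrors the EntityDefined contract, plus a made-up
// deprecated field to show that it gets skipped.
var entityDefined = []Field{
	{Name: "entity_id", Kind: "string"},
	{Name: "defined_date_utc", Kind: "timestamp"},
	{Name: "api_key", Kind: "string", Oneof: "authentication_oneof"},
	{Name: "token", Kind: "string", Oneof: "authentication_oneof"},
	{Name: "legacy_id", Kind: "string", Deprecated: true},
}

// buildTemplate walks the fields recursively and returns a JSON-ready map.
func buildTemplate(fields []Field) map[string]interface{} {
	out := make(map[string]interface{})
	seenOneofs := make(map[string]bool)
	for _, f := range fields {
		if f.Deprecated {
			continue // deprecated fields are left out
		}
		if f.Oneof != "" {
			if seenOneofs[f.Oneof] {
				continue // only the first member of each oneof is kept
			}
			seenOneofs[f.Oneof] = true
		}
		out[f.Name] = sampleValue(f)
	}
	return out
}

// sampleValue picks a placeholder with the JSON type proto3 expects.
func sampleValue(f Field) interface{} {
	switch f.Kind {
	case "message":
		return buildTemplate(f.Fields) // recurse into nested messages
	case "int32":
		return 0
	case "bool":
		return false
	case "timestamp":
		return "2020-02-17T23:36:13Z" // Timestamp is an RFC 3339 string in JSON
	default:
		return ""
	}
}

func main() {
	b, err := json.MarshalIndent(buildTemplate(entityDefined), "", "  ")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b))
}
```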

Publishing Indefinitely

You can ask Trubka to keep publishing in a loop until it is manually stopped. To do so, set the --count (-c) flag to zero. You can also set the optional --sleep (-s) flag to put a gap between messages if you would rather not flood your cluster. The --sleep flag accepts any valid Go duration string, such as 500ms, 1s or 2m. For example, the following command publishes one message per second in a loop:

$> trubka -b <broker> produce plain <topic> 'Hello World' -c 0 -s 1s

Templates and Random Data Generators

So far we have seen how to publish static content, meaning every message sent to the topic was identical. But what if we want to publish random data for testing purposes: data that still complies with the contract, but with automatically generated values?

Trubka comes with a flexible templating language (based on the amazing gofakeit library) which lets you randomise the content of each message. Here is an example of publishing randomly generated plain text content to Kafka:

$> echo 'Time: Now(), IP: IP(v4), MAC: MacAddress()' | trubka -b <broker:port> produce plain -g <topic>

Notice the -g flag, which enables template parsing (the supported template functions are listed below).

For protocol buffer contracts, the produce schema command makes your life even easier. Run it with the -g flag and see the result for yourself:

$> trubka produce schema contracts.EntityDefined --proto-root /protocol_buffers_dir -g > EntityDefined.json
Contents of EntityDefined.json:
{
    "api_key": "Str(?????)",
    "defined_date_utc": "Now('2006-01-02T15:04:05Z07:00','UTC')",
    "entity_id": "Str(?????)"
}

When published with template parsing enabled, this template is translated into a random JSON payload like the one below before being converted to proto bytes:
{
    "api_key": "xhyed",
    "defined_date_utc": "2020-02-18T03:10:42Z",
    "entity_id": "mhaac"
}

As you can see, Trubka has automatically detected the field types and replaced their values with some relevant template functions. These are customisable placeholders which will be replaced with actual values at the time of publishing.

So, let's publish 100 random proto messages into our topic:

$> cat EntityDefined.json | trubka -b <broker:port> produce proto <topic> contracts.EntityDefined -g -c 100 --proto-root /protocol_buffers_dir

Once again, remember to pass the same -g flag to the produce command to enable template parsing; otherwise, you will publish the raw templates to Kafka.

Supported Template Functions

Here is a list of the template functions currently supported by Trubka:

Function(s) | Description | Usage/Example
--- | --- | ---
Str(????) | Generates a random string by replacing each ? with a random letter. | Str(??): gd
Int(####) | Generates a random integer by replacing each # with a random digit. | Int(##): 73; Int(2#1): 281
IntS(####) | Generates the string representation of an integer by replacing each # with a random digit. | IntS(##): "73"
Int(from,to) | Generates an integer between from and to. | Int(10,20): 14
IntS(from,to) | Generates the string representation of an integer between from and to. | IntS(10,20): "14"
Float(##.##) | Generates a random floating point number by replacing each # with a random digit. | Float(#.##): 1.18; Float(0.#2): 0.82
FloatS(##.##) | Generates the string representation of a random floating point number by replacing each # with a random digit. | FloatS(#.#): "1.2"; FloatS(1.#2): "1.72"
Float(from,to,[decimal places]) | Generates a floating point number between from and to, with an optional limit on the number of decimal places. | Float(0.1,1.5): 0.75
FloatS(from,to,[decimal places]) | Generates the string representation of a floating point number between from and to, with an optional limit on the number of decimal places. | FloatS(0.1,1.5): "0.75"
Bool() | Generates a random boolean. | true or false
BoolS() | Generates the string representation of a random boolean. | "true" or "false"
IP(v4) | Generates a random IPv4 address string. | "248.177.118.254"
IP(v6) | Generates a random IPv6 address string. | "fde8:4372:2fcc:86e5:ffff:ffff:ffff:ffff"
MacAddress() | Generates a random MAC address string. | "e1:74:cb:01:77:91"
Timestamp(['layout'],['timezone']) | Generates a timestamp. The layout must follow Go's time formatting standard (default RFC 3339), and the timezone can be either a standard IANA name or a UTC offset in UTC±hh:mm format. Both parameters must be enclosed in single quotes if provided. | Timestamp('2006-01-02T15:04:05 MST'); Timestamp('2006-01-02T15:04:05Z07:00','UTC'); Timestamp('2006-01-02T15:04:05','UTC+10')
Now(['layout'],['timezone']) | Generates the current time. See the Timestamp explanation above for more details. | Now('2006-01-02T15:04:05'); Now('15:04:05Z07:00','UTC'); Now('2006-01-02T15:04:05','UTC-8')
B64(...) | Generates the Base64 encoded value of its input. | B64(Str(???)); B64(IP(v4)); B64(random data)
Email()/EmailAddress() | Generates a random email address. | "address@domain.com"
Name() | Generates a random full name. | "John Smith"
FirstName() | Generates a random first name. | "John"
LastName() | Generates a random last name. | "Smith"
NamePrefix() | Generates a random title. | "Mr."
NameSuffix() | Generates a random name suffix. | "Jr."
Country() | Generates a random country name. | "Australia"
CountryAbr() | Generates a random country abbreviation. | "FI"
State() | Generates a random US state name. | "California"
StateAbr() | Generates a random US state abbreviation. | "CA"
Street() | Generates a random street address. | "364 East Rapidsborough"
StreetName() | Generates a random street name. | "View"
StreetPrefix() | Generates a random street prefix. | "Lake"
StreetSuffix() | Generates a random street suffix. | "land"
City() | Generates a random city name. | "Marcelside"
UUID() | Generates a random universally unique identifier (UUID). | "5d093de6-c2e3-423d-87ad-c31f31d0e341"
Color()/Colour() | Generates a random colour name. | "MediumOrchid"
HexColor()/HexColour() | Generates the hex code of a random colour. | "#a99fb4"
Currency() | Generates a random currency name. | "Australian Dollar"
CurrencyAbr() | Generates a random currency abbreviation. | "USD"
Gender() | Generates a random gender. | "male" or "female"
URL() | Generates a random URL. | "http://xitonix.io"
ProgrammingLanguage() | Generates a random programming language name. | "Go"
DomainName() | Generates a random domain name. | "google.com"
DomainSuffix() | Generates a random domain suffix. | "org"
UserAgent() | Generates a random browser user agent string. | "Mozilla/5.0 (Windows NT 5.0) AppleWebKit/5362 (KHTML, like Gecko) Chrome/37.0.834.0 Mobile Safari/5362"
Username() | Generates a random username. | "Alex1364"
TimeZone() | Generates a random timezone name. | "Kaliningrad Standard Time"
TimeZoneFull() | Generates a random full timezone name. | "(UTC+03:00) Kaliningrad, Minsk"
TimeZoneAbr() | Generates a random timezone abbreviation. | "KST"
Month() | Generates a random month name. | "January"
WeekDay() | Generates a random weekday. | "Friday"
HTTPMethod() | Generates a random HTTP verb. | "GET", "POST", etc.
Pick(args...) | Randomly chooses an item from the list. This function is useful for choosing among numeric values. | Pick(1,100) may choose 1 or 100
PickS(args...) | Randomly chooses the string representation of an item from the list. | PickS(1,Go,true) may choose "1", "Go" or "true"
PetName() | Generates a random pet name. | "Ozzy Pawsborne"
Animal() | Generates a random animal name. | "elk"
FarmAnimal() | Generates a random farm animal name. | "Chicken"
AnimalType() | Generates a random animal type. | "amphibians"
Cat() | Generates a random cat breed. | "Chausie"
Dog() | Generates a random dog breed. | "Norwich Terrier"
BeerName() | Generates a random beer name. | "Duvel"
BeerStyle() | Generates a random beer style. | "European Amber Lager"
BuzzWord() | Generates a random buzzword. | "Disintermediate"
CarMaker() | Generates a random car maker name. | "Nissan"
CarModel() | Generates a random car model. | "Aveo"
Company() | Generates a random company name. | "Moen, Pagac and Wuckert"
CompanySuffix() | Generates a random company suffix. | "Inc"
CreditCardCvv() | Generates a random credit card CVV code. | "043"
CreditCardExp() | Generates a random credit card expiry date. | "05/22"
CreditCardNumber() | Generates a random credit card number. | 4136459948995369
CreditCardNumberS() | Generates the string representation of a random credit card number. | "4136459948995369"
CreditCardNumberLuhn() | Generates a random credit card number (as an integer) that passes the Luhn check. | 2720996615546177
CreditCardNumberLuhnS() | Generates the string representation of a random credit card number that passes the Luhn check. | "2720996615546177"
CreditCardType() | Generates a random credit card type. | "Visa"
MimeType() | Generates a random MIME type. | "application/json"
Language() | Generates a random language name. | "French"
Phone() | Generates a random phone number. | "6136459948"
PhoneFormatted() | Generates a random formatted phone number. | "136-459-9489"
Sentence(number of words) | Generates a random sentence with the given number of words. | Sentence(5): "Quia quae repellat consequatur quidem."

Please note that template function names are case sensitive.

Combining Template Functions

You can also combine the template functions together whenever it makes sense. Here are a few examples:

{
  "user": {
    "id": "FirstName():LastName():IntS(10,50)",
    "machine": "MacAddress()-IP(v4)",
    "token": "B64(T-Str(????)-IntS(#####))"
  },
  "origin": "PickS(IP(v4), IP(v6))",
  "credit_card": "CreditCardNumberS() CreditCardCvv() CreditCardExp()"
}