nik hanselmann edited this page Apr 30, 2014 · 33 revisions

Each block is briefly detailed below, along with the rules that define it. To make a block in streamtools, double click anywhere on the page and type the name of the block as it appears below. For programmatic access, see the API docs.

Blocks rely on some general concepts:

  • gojee path: The path rules all use gojee syntax to specify which value you'd like to use in the block. Paths always start with a period, which indicates the top level of the message. To refer to the whole message, use a single period (.). To refer to a specific value, write its keys after the first period. So if you have a message that looks like

      {
          "user":{
              "username":"bob_the_user",
              "id": 1234
          }
      }
    

    and you'd like to refer to the username then the gojee path would be .user.username.

  • gojee expression: gojee also allows for expressions. So we can write expressions like .user.id > 1230, which are especially useful in the filter and map blocks.

  • duration string: We use Go's duration strings to specify time periods. They are a number followed by a unit and are pretty intuitive. So 10ms is 10 milliseconds; 5h is 5 hours and so on.

  • route: every block has a set of routes. Routes can either be inbound, query, or outbound routes. Inbound routes receive data from somewhere and send it to the block. Query routes are two-way: they accept an inbound query and return information back to the requester. Outbound routes send data from a block to a connection.

generator blocks

These blocks emit messages on their own.

  • ticker. This block emits the time regularly. The time between emissions is specified by the Interval.
    • Rules:
      • Interval: duration string (1s)
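
As a rough illustration of what the ticker block does with its Interval rule, the sketch below parses a Go duration string with time.ParseDuration and collects a few tick times from a time.Ticker. The collectTicks function is a hypothetical name used only here; it is not taken from the streamtools source.

```go
package main

import (
	"fmt"
	"time"
)

// collectTicks parses a Go duration string (for example "1s" or "10ms")
// and gathers the times of the first n ticks at that interval.
func collectTicks(interval string, n int) ([]time.Time, error) {
	d, err := time.ParseDuration(interval)
	if err != nil {
		return nil, err
	}
	if d <= 0 {
		// time.NewTicker panics on a non-positive duration.
		return nil, fmt.Errorf("interval %q must be positive", interval)
	}
	ticker := time.NewTicker(d)
	defer ticker.Stop()

	ticks := make([]time.Time, 0, n)
	for len(ticks) < n {
		ticks = append(ticks, <-ticker.C)
	}
	return ticks, nil
}

func main() {
	ticks, err := collectTicks("10ms", 3)
	if err != nil {
		panic(err)
	}
	for _, t := range ticks {
		fmt.Println(t.Format(time.RFC3339Nano))
	}
}
```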

flow blocks

These blocks are useful for shaping the stream in one way or another.

  • javascript. This block creates a Javascript VM and runs a bit of Javascript once per message. In order to get data in and out of Javascript, the block creates a global variable specified by MessageIn that contains the incoming message. Once the script is finished executing, the block takes the value from the global variable specified by MessageOut.

    • Rules:
      • MessageIn: string (input)
      • MessageOut: string (output)
      • Script: Javascript (output = input)
  • join. This block joins two streams together. It waits until it has seen a message on both its inputs, then emits the joined message.

  • map. This block maps inbound data onto outbound data. The Map rule needs to be valid JSON, where each key is a string and each value is a valid gojee expression.

    • Rules:
      • Map: gojee expression
      • Additive: (True)
  • mask. This block allows you to select a subset of the inbound message. To create a mask, you need to build up an empty JSON that looks like the message you'd like out. So, for example, if your inbound message looks like

      {
        "A":"foo",
        "B":"bar"
      }
    

    and you just want B to come out of the mask block then make the Mask rule:

      {
        "B":{}
      }
    

    You can supply any valid JSON to the Mask block. If you specify an empty JSON {} then all values will pass.

    • Rules:
      • Mask: mask JSON
  • filter. The filter block applies the provided rule to incoming messages. If the rule evaluates to true then the message is emitted. If the rule evaluates to false the message is discarded. The Filter rule can be any valid gojee expression. So, for example, if the inbound message looks like

      {
          "temperature": 43
      }
    

    and you only want to emit messages when the temperature value is above 50, then the filter rule would be

      .temperature > 50
    
    • Rules:
      • Filter: gojee expression (. != null)
  • pack. The pack block groups messages together based on a common value. This is almost like an online "group-by" operation, but care needs to be taken in the stream setting as we have to decide when to emit the "packed" message. Imagine you have messages like

      {
           "option": "a"
      } 
    

    where option can be a, b or c. We would like to make a stream that packs all the as together into a single message, and similarly for the bs and cs. We only emit the packed message for a particular value of option when we haven't seen any messages with that value for 20 seconds, at which point we emit the bunch all at once. Here we would specify 20s for the EmitAfter rule and .option as the Path rule. If we saw three a messages in that 20s the output of the pack block looks like

      {
           "pack":[{"option": "a"},{"option": "a"},{"option": "a"}]
      }
    

    Our main use case for this at the NYT is to create per-reader reading sessions. So we set the Path to our user-id and we emit after 20 minutes of not hearing anything from that reader's user-id. Every page-view our readers generate gets packed into a per-reader message, generating a stream of reading sessions.

    • Rules:
      • EmitAfter: duration string
      • Path: gojee path
  • unpack. The unpack block takes an array of objects and emits each object as a separate message. See the citibike example, where we unpack a big array of citibike stations into individual messages we can filter.

  • sync. The sync block takes a disordered stream and creates a properly timed, ordered stream at the expense of introducing a lag. To explain this block imagine you have a stream that looks like

      {"val":"a", "time":23 } ... {"val":"b", "time":14} ... {"val":"c", "time":10}
    

    Ideally you'd like the stream to be ordered by the timestamp in the message, so the c message comes first, the b message comes second and the a message comes third. In addition, you'd like the time between the messages to respect the timestamp inside the message.

    The sync block achieves this by storing the stream for a fixed amount of time (the Lag) and then emitting at the time inside the inbound messages plus the lag. This means we have to wait for a while to get our messages but when we do get them, they're in a stream whose dynamics reflect the timestamp inside the message.

    This can be very helpful if the plumbing between your sensor and streamtools introduces dynamics that would confuse your analysis. For example, it's quite common for a system to wait until it has a collection of messages from its sensor before it makes an HTTP request to post those messages to the next stage. This means that by the time those messages make it to a streamtools pattern, they're artificially grouped together into little pulses. You can use the sync block to recover the original stream generated by the sensor.

    • Rules:
      • Path: gojee path. This must point at a UNIX epoch time in milliseconds.
      • Lag: duration string
  • gethttp. The getHTTP block makes an HTTP GET request to a URL you specify in the inbound message. It is necessary for the HTTP endpoint to serve JSON. This block forms the backbone for any sort of polling pattern.

    • Rules:
      • Path: gojee path to a fully formed URL.
  • kullbackleibler. Calculates the Kullback Leibler divergence between two distributions p and q. The two distributions must mimic the output from the histogram block.

    • Rules:
      • QPath: gojee path to the q distribution.
      • PPath: gojee path to the p distribution.

source blocks

These blocks hook into another system and collect messages to be emitted into streamtools.

  • fromhttpstream. This block allows you to listen to a long-lived http stream. Each new JSON that appears on the stream is emitted into streamtools. Try using the 1.usa.gov endpoint, available at http://developer.usa.gov/1usagov.
    • Rules:
      • Endpoint: endpoint string
      • Auth: authorisation string
  • fromnsq. This block implements an NSQ reader. For more details on how to specify the rule for this block check out the NSQ docs.
    • Rules:
      • ReadTopic: topic to read from.
      • LookupdAddr: nsqlookupd address
      • ReadChannel: name of the channel
      • MaxInFlight: how many messages to take from the queue at a time. (0)
  • frompost. This block emits any message that is POSTed to its IN route. This block isn't strictly needed as you can POST JSON to any inbound route on any block. Having said that, sometimes it's a bit clearer to have a dedicated block that listens for data.
  • fromsqs. This block connects to an Amazon Simple Queue Service (SQS) queue. Messages from SQS are XML; this block extracts the message string from this XML, which it assumes is newline separated JSON. Each JSON is emitted into streamtools as a separate message. See the SQS docs for more information about the rules of this block.
    • Rules:
      • SignatureVersion: the version number of the signature hash Amazon is expecting for this queue (4)
      • AccessKey: your access key
      • MaxNumberOfMessages: how many messages to pull off the queue at a time (10)
      • APIVersion: what version of the API are you using (2012-11-05)
      • SQSEndpoint: the endpoint (ARN) of the SQS queue you are reading
      • WaitTimeSeconds: how long to wait between polling (0)
      • AccessSecret: your access secret
  • fromudp. Listens for messages sent over UDP. Each message is emitted into streamtools.
    • Rules:
      • ConnectionString:
  • fromwebsocket. Connects to an existing websocket. Each message it hears from the websocket is emitted into streamtools.
    • Rules:
      • url: address of the websocket.
  • fromemail. This block connects to the given IMAP server with the given credentials. Once connected, it idles on that connection and emits any unread emails into streamtools. Once messages have been pulled, the block marks them as read. All email messages will contain the from, to, subject, internal_date and body fields.
    • Rules:
      • Host: hostname of the IMAP server. Defaults to Gmail (imap.gmail.com)
      • Username: user for the email account.
      • Password: password for the email account.
      • Mailbox: the mailbox to pull email from. Defaults to 'INBOX' which is the main mailbox for Gmail.
  • fromHTTPGetRequest. When a GET request is made to this block's QUERY endpoint, the block emits that request into streamtools. The request can be handled by the toHTTPGetRequest block. This block, while deceptively simple, allows the creation of an API that can use streamtools as a backend. The request, should it ever be marshalled into JSON, is represented as {"channel":"1"} which indicates that the request is a channel with capacity 1.
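
The fromsqs entry above says the block treats each SQS message body as newline separated JSON and emits one message per line. A minimal sketch of that splitting step, assuming the message string has already been extracted from the XML envelope, might look like the following. The splitMessages name is hypothetical and the code is not taken from the streamtools source.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// splitMessages decodes a newline-separated JSON body into one object per
// line. Blank lines are skipped; the first invalid line aborts the call.
func splitMessages(body string) ([]map[string]interface{}, error) {
	var msgs []map[string]interface{}
	for _, line := range strings.Split(body, "\n") {
		line = strings.TrimSpace(line)
		if line == "" {
			continue
		}
		var msg map[string]interface{}
		if err := json.Unmarshal([]byte(line), &msg); err != nil {
			return nil, fmt.Errorf("bad JSON line %q: %w", line, err)
		}
		msgs = append(msgs, msg)
	}
	return msgs, nil
}

func main() {
	body := "{\"option\": \"a\"}\n{\"option\": \"b\"}\n"
	msgs, err := splitMessages(body)
	if err != nil {
		panic(err)
	}
	for _, m := range msgs {
		fmt.Println(m["option"])
	}
}
```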

sink blocks

These blocks send data to external systems.

  • toelasticsearch. Send JSON to an elasticsearch instance.
    • Rules:
      • Index:
      • Host:
      • IndexType:
      • Port:
  • tofile. Writes a message as JSON to a file. Each message becomes a new line of JSON.
    • Rules:
      • Filename: file to write to
  • tolog. Send messages to the log. This is a quick way to look at the data in your stream.
  • tonsq. Send messages to an existing NSQ system.
    • Rules:
      • Topic: topic you will write to
      • NsqdTCPAddrs: address of the NSQ daemon.
  • tobeanstalkd. Send jobs to an existing beanstalkd server.
    • Rules:
      • Host: the Host and port of the beanstalkd server e.g. 127.0.0.1:11300
      • TTR: Time to Run, an integer number of seconds to allow a worker to run this job. This time is counted from the moment a worker reserves the job. If the worker does not delete, release, or bury the job within TTR seconds, the job will time out and the server will release the job.
      • Tube: beanstalkd tube to send jobs to. If left blank, jobs are sent to the default tube.
  • toMongoDB. Saves messages to a MongoDB instance or a cluster. The messages can be saved as they come or in bulk depending on the user's needs.
    • Rules:
      • Host: The host string for a single instance, e.g. localhost:27017, or for a replica set or cluster, e.g. mongohost1.example.com:27017,mongohost2.example.com:27017,mongoarbiter1.example.com
      • Database: Database to which the documents should be written.
      • Collection: Collection, under the specified database, to which the documents should be written.
      • BatchSize: the number of documents to be written together in bulk. If the value is set to <= 1, the documents will be written one at a time.
  • tonsqmulti. Send messages to an NSQ system in batches. This is useful if you have a fast (>1 kHz) stream of data you need to send to NSQ. This block gathers messages for Interval time and then sends. It emits immediately if the block gets more than MaxBatch messages.
    • Rules:
      • Topic: topic you will write to
      • Interval: duration string (1s)
      • NsqdTCPAddrs: address of the NSQ daemon.
      • MaxBatch: size of largest batch (100)
  • toemail. Send messages to an email account. Useful if you want to send out an alert after some kind of event.
    • Rules:
      • Host: hostname of the SMTP server. Defaults to smtp.gmail.com for Gmail.
      • Port: port number for connection to SMTP server. Defaults to 587 for Gmail.
      • Username: user name of the email account.
      • Password: password of the email account.
      • toPath: the path of the email's to field within the received message.
      • fromPath: the path of the email's from field within the received message.
      • subjectPath: the path of the email's subject field within the received message.
      • bodyPath: the path of the email's body within the received message.
  • toHTTPGetRequest. This block responds to an HTTP GET request that has been generated by fromHTTPGetRequest. The inbound message needs to contain both the original request and the message you want to respond with.
    • Rules:
      • RespPath: path to the HTTP request.
      • MsgPath: path to the message you want to respond with on the HTTP request.

state blocks

These blocks maintain a state, storing something about the stream of data.

  • histogram. Build a non-stationary histogram of the inbound messages. Currently this only works with discrete values.
    • Rules:
      • Path: gojee path to the value over which you'd like to build a histogram.
      • Window: duration string specifying how long to retain messages in the histogram (0)
  • count. This block counts the number of messages it has seen over the specified Window.
    • Rules:
      • Window: duration string (0)
  • timeseries. This block stores an array of the value specified by Path along with the timestamp at the time the message arrived.
    • Rules:
      • Path: gojee path
      • NumSamples: how many samples to store (0)
  • set. This stores a set of values as specified by the block's Path. Add new members through the (idempotent) ADD route. If you send a message through the ISMEMBER route, the block will emit true or false. You can also query the cardinality of the set.
    • Rules:
  • movingaverage. Performs a moving average of the values specified by the Path over the duration of the Window.
    • Rules:
      • Path: gojee path
      • Window: duration string
  • cache. Stores string values against keys. Send a key to the lookup route and the value against that key will be emitted.
    • Rules:
      • KeyPath: gojee path to the element of the inbound message to use as key
      • ValuePath: gojee path to the element to store in the cache
  • queue. This block represents a FIFO queue. You can push new messages onto the queue via the PUSH in route. You can pop messages off the queue either by hitting the POP inbound route, causing the block to emit the next message on its OUT route, or you can make a GET request to the POP query route and the block will respond with the next message. You can also peek at the next message using the PEEK query route.

random number blocks

These blocks emit random numbers when polled. So to generate a stream of random numbers, connect a generator block (like a ticker) to a random number block's POLL endpoint. Each of these blocks emits JSON of the form:

{
    "sample": 1234
}
  • zipf. This block draws a random number from a Zipf-Mandelbrot distribution when polled.
    • Rules:
      • s: (2)
      • v: (5)
      • N: (99)
  • gaussian. This block draws a random number from the Gaussian distribution when polled.
    • Rules:
      • StdDev: (1)
      • Mean: (0)
  • poisson. This block draws a random number from a Poisson distribution when polled.
    • Rules:
      • Rate: (1)
  • categorical. This block draws a random number from a Categorical distribution when polled.
    • Rules:
      • Weights: ([1]) - a list of weighting parameters. The number drawn from this distribution corresponds to the index of this list. These weights are automatically normalised to sum to one.
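
To illustrate how the categorical block's Weights rule maps to an emitted index, here is a minimal sketch of categorical sampling in Go. It normalises the weights to sum to one and walks the cumulative distribution until it passes a uniform draw. The pick function is a hypothetical name, and the sketch assumes at least one positive weight; it is not the block's actual implementation.

```go
package main

import (
	"fmt"
	"math/rand"
)

// pick maps a uniform draw u in [0, 1) to an index, where index i is chosen
// with probability weights[i] divided by the sum of all weights.
func pick(weights []float64, u float64) int {
	total := 0.0
	for _, w := range weights {
		total += w
	}
	cumulative := 0.0
	for i, w := range weights {
		cumulative += w / total // normalise so the weights sum to one
		if u < cumulative {
			return i
		}
	}
	return len(weights) - 1 // guard against floating point rounding
}

func main() {
	weights := []float64{1, 1, 2} // index 2 is drawn half of the time
	idx := pick(weights, rand.Float64())
	fmt.Printf("{\"sample\": %d}\n", idx)
}
```

Passing the uniform draw in as an argument keeps the mapping deterministic and easy to check: with weights 1, 1, 2 a draw of 0.1 selects index 0, 0.3 selects index 1 and 0.9 selects index 2.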