Matthieu Monsch edited this page Jan 1, 2016 · 33 revisions

This page is meant to provide a brief overview of avsc's API:

What is a Type?

Each Avro type maps to a corresponding JavaScript Type:

  • int maps to IntType.
  • arrays map to ArrayTypes.
  • records map to RecordTypes.
  • etc.

An instance of a Type knows how to decode and encode its corresponding objects. For example, the StringType knows how to handle JavaScript strings:

var stringType = new avsc.types.StringType();
var buf = stringType.toBuffer('Hi'); // Buffer containing 'Hi''s Avro encoding.
var str = stringType.fromBuffer(buf); // === 'Hi'

The toBuffer and fromBuffer methods above are convenience functions which encode and decode a single object into/from a standalone buffer.
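To make "Avro encoding" concrete, here is a minimal hand-written sketch of how the Avro specification lays out a string: its UTF-8 byte length as a zigzag variable-length integer, followed by the bytes themselves. The helper names (zigzag, varint, encodeString) are hypothetical and are not part of avsc; the sketch only illustrates the bytes we would expect toBuffer to produce for 'Hi'.

```javascript
// Hand-written sketch of Avro's binary string encoding; not avsc's code.

// Zigzag-encode a 32-bit integer so that values near zero map to small
// unsigned numbers (0 -> 0, -1 -> 1, 1 -> 2, ...).
function zigzag(n) {
  return ((n << 1) ^ (n >> 31)) >>> 0;
}

// Write an unsigned integer as 7-bit groups, least significant group first;
// the high bit of each byte flags that more bytes follow.
function varint(n) {
  var bytes = [];
  while (n > 0x7f) {
    bytes.push((n & 0x7f) | 0x80);
    n = n >>> 7;
  }
  bytes.push(n);
  return bytes;
}

// An Avro string is its UTF-8 byte length (as a zigzag varint) followed by
// the UTF-8 bytes.
function encodeString(str) {
  var data = Buffer.from(str, 'utf8');
  var prefix = Buffer.from(varint(zigzag(data.length)));
  return Buffer.concat([prefix, data]);
}

console.log(encodeString('Hi')); // <Buffer 04 48 69>
```

The leading 04 is the zigzag encoding of the length 2, and 48 69 are the UTF-8 bytes for 'H' and 'i'.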

Each type also provides other methods which can be useful. Here are a few (refer to the API documentation for the full list):

  • JSON-encoding:

    var jsonString = stringType.toString('Hi'); // === '"Hi"'
    var str = stringType.fromString(jsonString); // === 'Hi'
  • Validity checks:

    var b1 = stringType.isValid('hello'); // === true ('hello' is a valid string.)
    var b2 = stringType.isValid(-2); // === false (-2 is not.)
  • Random object generation:

    var s = stringType.random(); // A random string.

How do I get a Type?

It is possible to instantiate types directly by calling their constructors (available in the avsc.types namespace; this is what we used earlier), but in the vast majority of use-cases they will be automatically generated by parsing an existing schema.

avsc exposes a parse method to do the heavy lifting:

// Equivalent to what we did earlier.
var stringType = avsc.parse({type: 'string'});

// A slightly more complex type.
var mapType = avsc.parse({type: 'map', values: 'long'});

// The sky is the limit!
var personType = avsc.parse({
  name: 'Person',
  type: 'record',
  fields: [
    {name: 'name', type: 'string'},
    {name: 'phone', type: ['null', 'string'], default: null},
    {name: 'address', type: {
      name: 'Address',
      type: 'record',
      fields: [
        {name: 'city', type: 'string'},
        {name: 'zip', type: 'int'}
      ]
    }}
  ]
});

Of course, all the type methods are available. For example:

personType.isValid({
  name: 'Ann',
  phone: null,
  address: {city: 'Cambridge', zip: 2139} // No leading zero: 02139 is a syntax error in strict mode.
}); // === true

personType.isValid({
  name: 'Bob',
  phone: {string: '617-000-1234'},
  address: {city: 'Boston'}
}); // === false (Missing the zip code.)
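To show why the second object is rejected, here is a minimal sketch of a validity check written by hand for the small subset of Avro used by the Person schema (null, string, int, unions of primitive names, and records). The isValid function below is a hypothetical standalone helper, not avsc's implementation; it assumes union values other than null are wrapped in an object keyed by the branch name, as in the phone field above.

```javascript
// Hypothetical helper, not part of avsc: checks a value against a small
// subset of Avro schemas.
function isValid(schema, val) {
  if (Array.isArray(schema)) {
    // Union: null stands for itself; any other branch is wrapped in an
    // object keyed by the branch's type name, e.g. {string: 'abc'}.
    if (val === null) {
      return schema.indexOf('null') >= 0;
    }
    if (typeof val !== 'object') {
      return false;
    }
    var keys = Object.keys(val);
    return keys.length === 1 &&
      schema.indexOf(keys[0]) >= 0 &&
      isValid(keys[0], val[keys[0]]);
  }
  switch (schema.type || schema) {
    case 'null':
      return val === null;
    case 'string':
      return typeof val === 'string';
    case 'int':
      return typeof val === 'number' && val === (val | 0);
    case 'record':
      // Every declared field must be present and valid.
      return val !== null && typeof val === 'object' &&
        schema.fields.every(function (field) {
          return isValid(field.type, val[field.name]);
        });
    default:
      return false; // Other Avro types are out of scope for this sketch.
  }
}

var personSchema = {
  name: 'Person',
  type: 'record',
  fields: [
    {name: 'name', type: 'string'},
    {name: 'phone', type: ['null', 'string'], default: null},
    {name: 'address', type: {
      name: 'Address',
      type: 'record',
      fields: [
        {name: 'city', type: 'string'},
        {name: 'zip', type: 'int'}
      ]
    }}
  ]
};

console.log(isValid(personSchema, {
  name: 'Ann',
  phone: null,
  address: {city: 'Cambridge', zip: 2139}
})); // true

console.log(isValid(personSchema, {
  name: 'Bob',
  phone: {string: '617-000-1234'},
  address: {city: 'Boston'}
})); // false (address.zip is undefined, which is not an int)
```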

Since schemas are often stored in separate files, passing a path to parse will attempt to load a JSON-serialized schema from there:

var couponType = avsc.parse('./Coupon.avsc');

For advanced use-cases, parse also has a few options which are detailed in the API documentation.

What about Avro files?

Avro files (meaning Avro object container files) hold serialized Avro records along with their schema. Reading them is as simple as calling createFileDecoder:

var personStream = avsc.createFileDecoder('./persons.avro');

personStream is a readable stream of decoded records, which we can for example use as follows:

personStream.on('data', function (person) {
  if (person.address.city === 'San Francisco') {
    doSomethingWith(person);
  }
});

In case we need the records' type or the file's codec, they are available by listening to the 'metadata' event:

personStream.on('metadata', function (type, codec) { /* Something useful. */ });

To access a file's header synchronously, there also exists an extractFileHeader method:

var header = avsc.extractFileHeader('persons.avro');
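As background on what a header contains: per the Avro specification, an object container file starts with four magic bytes, the ASCII characters 'O', 'b', 'j' followed by the byte 1. The sketch below uses a hypothetical helper (looksLikeAvroFile, not part of avsc) to check a buffer for that prefix, which can be a cheap sanity check before handing a file to a decoder.

```javascript
// Hypothetical helper, not part of avsc.
var AVRO_MAGIC = Buffer.from([0x4f, 0x62, 0x6a, 0x01]); // 'O', 'b', 'j', 1

// Returns true if the buffer starts with the container file magic bytes.
function looksLikeAvroFile(buf) {
  return buf.length >= 4 && buf.slice(0, 4).equals(AVRO_MAGIC);
}

console.log(looksLikeAvroFile(Buffer.from('Obj\u0001rest of header'))); // true
console.log(looksLikeAvroFile(Buffer.from('{"not": "avro"}'))); // false
```

In practice the buffer would hold the first few bytes read from the file on disk.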

Writing to an Avro container file is possible using createFileEncoder:

var encoder = avsc.createFileEncoder('./processed.avro', type);

And RPC?

Finally, avsc provides an efficient and "type-safe" API for communicating with remote node processes via Protocols.

To enable this, we need only declare the types involved inside an Avro protocol schema. For example, consider the following simple protocol which supports two calls (saved as ./math.avpr):

{
  "protocol": "Math",
  "doc": "A sample protocol for performing simple math.",
  "messages": {
    "add": {
      "doc": "A call which adds integers, optionally after some delay.",
      "request": [
        {"name": "numbers", "type": {"type": "array", "items": "int"}},
        {"name": "delay", "type": "float", "default": 0}
      ],
      "response": "int"
    },
    "multiply": {
      "doc": "Another call, this time multiplying doubles.",
      "request": [
        {"name": "numbers", "type": {"type": "array", "items": "double"}}
      ],
      "response": "double"
    }
  }
}
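Since a protocol file is plain JSON, it can be inspected with nothing more than JSON.parse, which is also a quick way to catch syntax slips such as trailing commas. The sketch below inlines a trimmed copy of the protocol (docs omitted, and assuming multiply takes an array of doubles) and uses a hypothetical helper, describeMessages, to summarize each call; it is not part of avsc.

```javascript
// Inlined, trimmed copy of the protocol; in practice this text would be read
// from ./math.avpr.
var source = [
  '{',
  '  "protocol": "Math",',
  '  "messages": {',
  '    "add": {',
  '      "request": [',
  '        {"name": "numbers", "type": {"type": "array", "items": "int"}},',
  '        {"name": "delay", "type": "float", "default": 0}',
  '      ],',
  '      "response": "int"',
  '    },',
  '    "multiply": {',
  '      "request": [',
  '        {"name": "numbers", "type": {"type": "array", "items": "double"}}',
  '      ],',
  '      "response": "double"',
  '    }',
  '  }',
  '}'
].join('\n');

// Hypothetical helper: one 'name(field, ...) -> response' line per message.
function describeMessages(text) {
  var protocol = JSON.parse(text); // Throws a SyntaxError on malformed JSON.
  return Object.keys(protocol.messages).map(function (name) {
    var message = protocol.messages[name];
    var fields = message.request.map(function (field) { return field.name; });
    return name + '(' + fields.join(', ') + ') -> ' +
      JSON.stringify(message.response);
  });
}

describeMessages(source).forEach(function (line) { console.log(line); });
// add(numbers, delay) -> "int"
// multiply(numbers) -> "double"
```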

We can then use this protocol to communicate between any two node processes connected by binary streams. See below for a few different common use-cases.

Persistent streams

For example UNIX sockets, TCP sockets, WebSockets (and even stdin/stdout).

Server

var avsc = require('avsc'),
    net = require('net');

// Sample implementation of our calls.
var protocol = avsc.parse('./math.avpr')
  .on('add', function (req, ee, cb) {
    var sum = req.numbers.reduce(function (agg, el) { return agg + el; }, 0);
    setTimeout(function () { cb(null, sum); }, 1000 * req.delay);
  })
  .on('multiply', function (req, ee, cb) {
    var prod = req.numbers.reduce(function (agg, el) { return agg * el; }, 1);
    cb(null, prod);
  });

// Listen for connections.
net.createServer()
  .on('connection', function (con) { protocol.createListener(con); })
  .listen(8000);

Client

var avsc = require('avsc'),
    net = require('net');

// Connect to the server.
var protocol = avsc.parse('./math.avpr');
var socket = net.createConnection({port: 8000});
var ee = protocol.createEmitter(socket);

// A few sample calls.
protocol.emit('add', {numbers: [1, 3, 5], delay: 2}, ee, function (err, res) {
  console.log(res); // 9!
  socket.destroy(); // Allow the process to exit.
});
protocol.emit('multiply', {numbers: [4, 2]}, ee, function (err, res) {
  console.log(res); // 8!
});

Transient streams

For example HTTP requests/responses.

Server

Using express for example:

var avsc = require('avsc'),
    app = require('express')();

var protocol = avsc.parse('./math.avpr')
  .on('add', function (req, ee, cb) {
    var sum = req.numbers.reduce(function (agg, el) { return agg + el; }, 0);
    setTimeout(function () { cb(null, sum); }, 1000 * req.delay);
  })
  .on('multiply', function (req, ee, cb) {
    var prod = req.numbers.reduce(function (agg, el) { return agg * el; }, 1);
    cb(null, prod);
  });

app.post('/', function (req, res) {
  protocol.createListener(function (cb) { cb(res); return req; });
});

app.listen(3000);

Client

var avsc = require('avsc'),
    http = require('http');

var protocol = avsc.parse('./math.avpr');
var ee = protocol.createEmitter(function (cb) {
  return http.request({
    port: 3000,
    headers: {'content-type': 'avro/binary'},
    method: 'POST'
  }).on('response', function (res) { cb(res); });
});

protocol.emit('add', {numbers: [1, 3, 5], delay: 2}, ee, function (err, res) {
  console.log(res); // 9 again!
});
protocol.emit('multiply', {numbers: [4, 2]}, ee, function (err, res) {
  console.log(res); // 8 also!
});

Next steps

The API documentation provides a comprehensive list of available functions and their options. The Advanced usage section goes through a few examples to show how the API can be used.
