Matthieu Monsch edited this page Jan 6, 2016 · 33 revisions

This page is meant to provide a brief overview of avsc's API:

What is a Type?

Each Avro type maps to a corresponding JavaScript Type:

  • int maps to IntType.
  • arrays map to ArrayTypes.
  • records map to RecordTypes.
  • etc.

An instance of a Type knows how to encode and decode its corresponding objects. For example, the StringType knows how to handle JavaScript strings:

var stringType = new avsc.types.StringType();
var buf = stringType.toBuffer('Hi'); // Buffer containing 'Hi''s Avro encoding.
var str = stringType.fromBuffer(buf); // === 'Hi'

The toBuffer and fromBuffer methods above are convenience functions which encode and decode a single object into/from a standalone buffer.
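
To make "Avro encoding" more concrete: per the Avro specification, a string is written as its UTF-8 byte length (a zig-zag, variable-length integer) followed by the UTF-8 bytes themselves. The sketch below reproduces this by hand, without avsc. The helper names (encodeInt, decodeInt, encodeString, decodeString) are invented for illustration, are not part of avsc's API, and only handle values that fit in 32 bits.

```javascript
// Hand-rolled sketch of Avro's binary string encoding (not avsc's code).
// Limited to values that fit in a signed 32-bit integer.

// Zig-zag encode `n`, then write it 7 bits at a time, low-order group first.
function encodeInt(n) {
  var z = ((n << 1) ^ (n >> 31)) >>> 0;
  var bytes = [];
  while (z > 0x7f) {
    bytes.push((z & 0x7f) | 0x80); // High bit set: more bytes follow.
    z >>>= 7;
  }
  bytes.push(z);
  return Buffer.from(bytes);
}

// Read a zig-zag varint starting at `pos`; return the value and next offset.
function decodeInt(buf, pos) {
  var z = 0, shift = 0, b;
  do {
    b = buf[pos++];
    z |= (b & 0x7f) << shift;
    shift += 7;
  } while (b & 0x80);
  z >>>= 0;
  return {value: (z >>> 1) ^ -(z & 1), pos: pos};
}

// A string is its UTF-8 byte length followed by the bytes.
function encodeString(str) {
  var bytes = Buffer.from(str, 'utf8');
  return Buffer.concat([encodeInt(bytes.length), bytes]);
}

function decodeString(buf) {
  var len = decodeInt(buf, 0);
  return buf.toString('utf8', len.pos, len.pos + len.value);
}

var buf = encodeString('Hi');
console.log(buf);               // <Buffer 04 48 69>
console.log(decodeString(buf)); // Hi
```

The leading 04 byte is the zig-zag encoding of the length 2; 48 and 69 are the UTF-8 bytes for 'H' and 'i'.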

Each type also provides other methods which can be useful. Here are a few (refer to the API documentation for the full list):

  • JSON-encoding:

    var jsonString = stringType.toString('Hi'); // === '"Hi"'
    var str = stringType.fromString(jsonString); // === 'Hi'
  • Validity checks:

    var b1 = stringType.isValid('hello'); // === true ('hello' is a valid string.)
    var b2 = stringType.isValid(-2); // === false (-2 is not.)
  • Random object generation:

    var s = stringType.random(); // A random string.

How do I get a Type?

It is possible to instantiate types directly by calling their constructors (available in the avsc.types namespace; this is what we used earlier), but in the vast majority of use-cases they will be automatically generated by parsing an existing schema.

avsc exposes a parse method to do the heavy lifting:

// Equivalent to what we did earlier.
var stringType = avsc.parse({type: 'string'});

// A slightly more complex type.
var mapType = avsc.parse({type: 'map', values: 'long'});

// The sky is the limit!
var personType = avsc.parse({
  name: 'Person',
  type: 'record',
  fields: [
    {name: 'name', type: 'string'},
    {name: 'phone', type: ['null', 'string'], default: null},
    {name: 'address', type: {
      name: 'Address',
      type: 'record',
      fields: [
        {name: 'city', type: 'string'},
        {name: 'zip', type: 'int'}
      ]
    }}
  ]
});

Of course, all the type methods are available. For example:

personType.isValid({
  name: 'Ann',
  phone: null,
  address: {city: 'Cambridge', zip: 2139} // ZIP 02139; an int cannot keep the leading zero.
}); // === true

personType.isValid({
  name: 'Bob',
  phone: {string: '617-000-1234'},
  address: {city: 'Boston'}
}); // === false (Missing the zip code.)
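
To see why the two checks above come out the way they do, here is a minimal sketch of a schema-driven validity check written without avsc. It is not avsc's implementation: sketchIsValid is a hypothetical helper that only understands 'null', 'string', 'int', records, and unions of primitive names, and it assumes the union convention used in the example above (null is passed bare, any other branch is wrapped in a single-key object such as {string: '...'}).

```javascript
// Illustrative only: a tiny validity check for a subset of Avro schemas.
function sketchIsValid(schema, val) {
  if (Array.isArray(schema)) {
    // Union: `null` is passed bare, other branches as {branchName: value}.
    if (val === null) {
      return schema.indexOf('null') >= 0;
    }
    if (typeof val !== 'object') {
      return false;
    }
    var keys = Object.keys(val);
    if (keys.length !== 1 || keys[0] === 'null') {
      return false;
    }
    return schema.indexOf(keys[0]) >= 0 && sketchIsValid(keys[0], val[keys[0]]);
  }
  var typeName = typeof schema === 'string' ? schema : schema.type;
  switch (typeName) {
    case 'null':
      return val === null;
    case 'string':
      return typeof val === 'string';
    case 'int':
      return typeof val === 'number' && val === (val | 0);
    case 'record':
      // Every declared field must be present and valid.
      return val !== null && typeof val === 'object' &&
        schema.fields.every(function (field) {
          return sketchIsValid(field.type, val[field.name]);
        });
    default:
      throw new Error('Type not supported by this sketch: ' + typeName);
  }
}

var personSchema = {
  name: 'Person',
  type: 'record',
  fields: [
    {name: 'name', type: 'string'},
    {name: 'phone', type: ['null', 'string'], default: null},
    {name: 'address', type: {
      name: 'Address',
      type: 'record',
      fields: [
        {name: 'city', type: 'string'},
        {name: 'zip', type: 'int'}
      ]
    }}
  ]
};

var ann = {name: 'Ann', phone: null, address: {city: 'Cambridge', zip: 2139}};
var bob = {name: 'Bob', phone: {string: '617-000-1234'}, address: {city: 'Boston'}};

console.log(sketchIsValid(personSchema, ann)); // true
console.log(sketchIsValid(personSchema, bob)); // false (no zip in the address)
```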

Since schemas are often stored in separate files, passing a path to parse will attempt to load a JSON-serialized schema from there:

var couponType = avsc.parse('./Coupon.avsc');

For advanced use-cases, parse also has a few options, which are detailed in the API documentation.

What about Avro files?

Avro files (meaning Avro object container files) hold serialized Avro records along with their schema. Reading them is as simple as calling createFileDecoder:

var personStream = avsc.createFileDecoder('./persons.avro');

personStream is a readable stream of decoded records, which we can for example use as follows:

personStream.on('data', function (person) {
  if (person.address.city === 'San Francisco') {
    doSomethingWith(person);
  }
});
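
The same pattern can be tried without an Avro file at hand. In the sketch below, a plain Node.js object-mode stream (built with Readable.from, available in recent Node.js versions) stands in for the decoder; the sample records, the matches array and the collectSanFranciscans handler are assumptions made up for this example.

```javascript
var Readable = require('stream').Readable;

// Stand-in for avsc.createFileDecoder('./persons.avro'): an object-mode
// stream which emits already-decoded records.
var personStream = Readable.from([
  {name: 'Ann', address: {city: 'San Francisco', zip: 94105}},
  {name: 'Bob', address: {city: 'Boston', zip: 2108}}
]);

var matches = [];

// Keep only the records we are interested in.
function collectSanFranciscans(person) {
  if (person.address.city === 'San Francisco') {
    matches.push(person);
  }
}

personStream
  .on('data', collectSanFranciscans)
  .on('error', function (err) { console.error('Stream failed:', err); })
  .on('end', function () {
    // Runs once every record has been seen.
    console.log(matches.length + ' matching record(s)');
  });
```

Listening for 'error' and 'end' in addition to 'data' is ordinary Node.js stream practice, so the same handlers should carry over to a real decoder stream.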

In case we need the records' type or the file's codec, they are available by listening to the 'metadata' event:

personStream.on('metadata', function (type, codec) { /* Something useful. */ });

To access a file's header synchronously, there also exists an extractFileHeader method:

var header = avsc.extractFileHeader('persons.avro');
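
For background, the Avro specification defines a container file's header as four magic bytes ('O', 'b', 'j', followed by the byte 1), then the file metadata (which includes the schema and codec) and a 16-byte sync marker. A cheap sanity check on the magic bytes can be sketched as follows; looksLikeAvroFile is a hypothetical helper, not an avsc function, and it does not parse the rest of the header.

```javascript
// The first four bytes of every Avro object container file.
var AVRO_MAGIC = Buffer.from([0x4f, 0x62, 0x6a, 0x01]); // 'O', 'b', 'j', 1

// Return true if `buf` starts with the container file magic bytes.
function looksLikeAvroFile(buf) {
  return buf.length >= 4 && buf.subarray(0, 4).equals(AVRO_MAGIC);
}

console.log(looksLikeAvroFile(Buffer.from([0x4f, 0x62, 0x6a, 0x01, 0x04]))); // true
console.log(looksLikeAvroFile(Buffer.from('{"type": "string"}')));           // false
```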

Writing to an Avro container file is possible using createFileEncoder:

var encoder = avsc.createFileEncoder('./processed.avro', type);

And RPC?

Finally, avsc provides an efficient and "type-safe" API for communicating with remote node processes via Protocols.

To enable this, we first declare the types involved inside an Avro protocol. For example, consider the following simple protocol which supports two calls (saved as ./math.avpr):

{
  "protocol": "Math",
  "doc": "A sample interface for performing math.",
  "messages": {
    "multiply": {
      "doc": "A call for multiplying doubles.",
      "request": [
        {"name": "numbers", "type": {"type": "array", "items": "double"}}
      ],
      "response": "double"
    },
    "add": {
      "doc": "A call which adds integers, optionally after some delay.",
      "request": [
        {"name": "numbers", "type": {"type": "array", "items": "int"}},
        {"name": "delay", "type": "float", "default": 0}
      ],
      "response": "int"
    }
  }
}

Servers and clients then share the same protocol and respectively:

  • Implement interface calls (servers):

    var protocol = avsc.parse('./math.avpr')
      .on('add', function (req, ee, cb) {
        var sum = req.numbers.reduce(function (agg, el) { return agg + el; }, 0);
        setTimeout(function () { cb(null, sum); }, 1000 * req.delay);
      })
      .on('multiply', function (req, ee, cb) {
        var prod = req.numbers.reduce(function (agg, el) { return agg * el; }, 1);
        cb(null, prod);
      });
  • Call the interface (clients):

    var protocol = avsc.parse('./math.avpr');
    var ee; // Message emitter, see below for various instantiation examples.
    
    protocol.emit('add', {numbers: [1, 3, 5], delay: 2}, ee, function (err, res) {
      console.log(res); // 9!
    });
    protocol.emit('multiply', {numbers: [4, 2]}, ee, function (err, res) {
      console.log(res); // 8!
    });
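
Since the handlers above are ordinary functions with a (req, ee, cb) signature, one option is to define them under a name so they can be exercised directly, without any stream or emitter. This is a sketch of that approach; the names addHandler and multiplyHandler are assumptions, and passing null for ee only works because these particular handlers never use it.

```javascript
// Same logic as the server handlers, pulled out into named functions.
function addHandler(req, ee, cb) {
  var sum = req.numbers.reduce(function (agg, el) { return agg + el; }, 0);
  setTimeout(function () { cb(null, sum); }, 1000 * req.delay);
}

function multiplyHandler(req, ee, cb) {
  var prod = req.numbers.reduce(function (agg, el) { return agg * el; }, 1);
  cb(null, prod);
}

// Direct calls, bypassing the protocol entirely.
multiplyHandler({numbers: [4, 2]}, null, function (err, res) {
  console.log(res); // 8
});
addHandler({numbers: [1, 3, 5], delay: 0}, null, function (err, res) {
  console.log(res); // 9 (on a later tick, because of setTimeout)
});

// On the server these would be registered as in the example above, e.g.
// protocol.on('add', addHandler).on('multiply', multiplyHandler);
```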

avsc supports communication between any two node processes connected by binary streams. See below for a few different common use-cases.

Persistent streams

For example UNIX sockets, TCP sockets, WebSockets, and even stdin/stdout.

Client

var net = require('net');

var ee = protocol.createEmitter(net.createConnection({port: 8000}));

Server

var net = require('net');

net.createServer()
  .on('connection', function (con) { protocol.createListener(con); })
  .listen(8000);

Transient streams

For example HTTP requests/responses.

Client

var http = require('http');

var ee = protocol.createEmitter(function (cb) {
  return http.request({
    port: 3000,
    headers: {'content-type': 'avro/binary'},
    method: 'POST'
  }).on('response', function (res) { cb(res); });
});

Server

Using express for example:

var app = require('express')();

app.post('/', function (req, res) {
  protocol.createListener(function (cb) { cb(res); return req; });
});

app.listen(3000);

Next steps

The API documentation provides a comprehensive list of available functions and their options. The Advanced usage section goes through a few examples to show how the API can be used.
