
Writing dispatch_to_files

Flavio Poletti edited this page Apr 25, 2016 · 5 revisions

In Writing to_files we saw how flexible to_files can be. Is it enough to cover all cases? Arguably not.

Suppose that you want to split your output into two groups: one for people whose nickname starts with a letter from a to m, and another for everyone else. How do you do this?

Dispatching manually

One interesting thing about the toolkit is that you can use its functions outside of a pipeline, if you need to. The summon function helps you import the function you need with minimal hassle.
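Conceptually, "DWIM require + import" means loading a module at runtime and then aliasing one of its functions into your package. Here's a minimal sketch of that idea in core Perl. summon_like is a hypothetical name, not part of Data::Tubes; the real summon presumably resolves 'Writer::to_files' to to_files in Data::Tubes::Plugin::Writer, while here the prefix is a parameter so that the sketch can be tried on the core module List::Util.

```perl
use strict;
use warnings;

# summon_like is a hypothetical, simplified stand-in for summon, NOT the
# Data::Tubes implementation. 'Util::sum' with prefix 'List::' means:
# load List::Util, then import its sum function into the caller's package.
sub summon_like {
   my ($spec, $prefix) = @_;
   my ($module, $function) = $spec =~ m{\A (.+) :: (\w+) \z}mxs
      or die "invalid specification '$spec'\n";
   $module = $prefix . $module;
   (my $file = "$module.pm") =~ s{::}{/}gmxs;
   require $file;    # the "require" part, done at runtime
   my $caller = caller;
   no strict 'refs';    # needed to alias the function by name
   *{"${caller}::$function"} = \&{"${module}::$function"};    # the "import" part
   return;
}

summon_like('Util::sum', 'List::');
my $total = sum(1, 2, 3);
print "$total\n";    # prints 6
```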

For example, you can do this:

use Data::Tubes qw< pipeline summon >; # importing 'summon' too

# pre-define two output channels, for lower and other initial chars
summon('Writer::to_files'); # just like DWIM require + import
my $lower = to_files(
   'output-lower-%02n.json',
   header => "[\n", footer => "\n]\n", interlude => ",\n",
   policy => {records_threshold => 2},
   binmode => ':raw',
);
my $other = to_files(
   'output-other-%02n.json',
   header => "[\n", footer => "\n]\n", interlude => ",\n",
   policy => {records_threshold => 2},
   binmode => ':raw',
);

pipeline(
   ['Source::iterate_files', open_file_args => {binmode => ':raw'}],
   'Reader::by_line',
   ['Parser::by_format', 'name;nick;age'],
   ['Renderer::with_template_perlish',
      '  {"name":"[% name %]","nick":"[% nick %]","age":[% age %]}'],

   # Printing, gets `rendered`, returns input unchanged
   sub { # wrapper!
      my $record = shift;
      my $first_char = substr $record->{structured}{nick}, 0, 1;
      return $lower->($record) if $first_char =~ m{[a-m]}mxs;
      return $other->($record);
   },

   # Options, just flush the output to the sink
   { tap => 'sink' },
)->([qw< mydata-04.txt >]);
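If you want to see the wrapper's dispatching logic in isolation, here's a core-Perl sketch that runs without Data::Tubes installed. The two to_files channels are replaced by hypothetical closures that just collect the rendered field, and the records are made up but shaped like the ones flowing through the pipeline.

```perl
use strict;
use warnings;

# Hypothetical stand-ins for the two to_files channels: instead of writing
# files, each one collects the `rendered` field and returns the record.
my (@lower, @other);
my $lower = sub { my $record = shift; push @lower, $record->{rendered}; return $record };
my $other = sub { my $record = shift; push @other, $record->{rendered}; return $record };

# same wrapper logic as above: pick a channel from the nickname's first char
my $wrapper = sub {
   my $record = shift;
   my $first_char = substr $record->{structured}{nick}, 0, 1;
   return $lower->($record) if $first_char =~ m{[a-m]}mxs;
   return $other->($record);
};

# made-up records with the same shape as in the pipeline
for my $nick (qw< alf zed mike nina >) {
   $wrapper->({structured => {nick => $nick}, rendered => "<$nick>"});
}
print "lower: @lower\n";    # lower: <alf> <mike>
print "other: @other\n";    # other: <zed> <nina>
```

Any sub reference that accepts a record can act as a channel here, which is what makes this manual approach open to arbitrary business logic.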

If you have some complicated business logic, you can always fall back on this technique! But there's more... read on.

Dispatching, done right

Since dispatching is quite a common need, you might guess that the toolkit has something to cover it. You guessed right!

Data::Tubes::Plugin::Writer provides dispatch_to_files, which streamlines what we saw in the previous section. Here's how:

use Data::Tubes qw< pipeline >;

pipeline(
   ['Source::iterate_files', open_file_args => {binmode => ':raw'}],
   'Reader::by_line',
   ['Parser::by_format', 'name;nick;age'],
   ['Renderer::with_template_perlish',
      '  {"name":"[% name %]","nick":"[% nick %]","age":[% age %]}'],

   # Printing, gets `rendered`, returns input unchanged
   [
      'Writer::dispatch_to_files',
      header    => "[\n",
      footer    => "\n]\n",
      interlude => ",\n",
      policy    => {records_threshold => 2},

      filename => 'output-[% key %]-%03n.json',
      selector => sub {
         my $record = shift;
         my $first_char = substr $record->{structured}{nick}, 0, 1;
         return 'lower' if $first_char =~ m{[a-m]}mxs;
         return 'other';
      },
   ],

   # Options, just flush the output to the sink
   { tap => 'sink' },
)->([qw< mydata-04.txt >]);

Most parameters are the same as in to_files (see also Writing to_files), so we already know about them. The filename looks familiar but is somewhat different, while the selector is definitely new.

The selector is a sub reference that receives the record as input and is supposed to return a key. Records with the same key always go to the same output channel chosen by the dispatcher. In this case, the selector returns one of two strings, namely lower and other.
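The key idea is that the same key always maps to the same channel. Here's a minimal sketch of how such a dispatcher might work, assuming channels are created lazily (once per key) and then cached. make_dispatcher and the counting factory are hypothetical, not Data::Tubes' actual implementation.

```perl
use strict;
use warnings;

# Hypothetical dispatcher (NOT Data::Tubes' code): channels are created
# lazily by a factory, once per key, then reused for every record with
# that key.
sub make_dispatcher {
   my ($selector, $factory) = @_;
   my %channel_for;    # key => channel (a sub reference)
   return sub {
      my $record = shift;
      my $key = $selector->($record);
      my $channel = $channel_for{$key} //= $factory->($key, $record);
      return $channel->($record);
   };
}

my %created;    # how many channels were created for each key
my %seen;       # nicknames received by each key's channel
my $dispatch = make_dispatcher(
   sub {    # the selector from the example above
      my $record = shift;
      my $first_char = substr $record->{structured}{nick}, 0, 1;
      return 'lower' if $first_char =~ m{[a-m]}mxs;
      return 'other';
   },
   sub {    # hypothetical factory, standing in for to_files
      my ($key) = @_;
      $created{$key}++;
      return sub { push @{$seen{$key}}, $_[0]{structured}{nick}; $_[0] };
   },
);

$dispatch->({structured => {nick => $_}}) for qw< bob zoe carl yan >;
print "$_: @{$seen{$_}} (channels created: $created{$_})\n" for sort keys %seen;
# lower: bob carl (channels created: 1)
# other: zoe yan (channels created: 1)
```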

The filename parameter extends the filename parameter of to_files: it lets you put additional things in the filename that is eventually passed to to_files. As you might have guessed already, it's a Template::Perlish-compatible template string, where you can expand the variable key. So:

  • if the selector returns lower, the filename template expands into output-lower-%03n.json;
  • if the selector returns other, the filename template expands into output-other-%03n.json.
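To make the expansion concrete, here's a simplified stand-in for it. It is not Template::Perlish: expand_filename is a hypothetical helper that only substitutes [% key %], leaving %03n untouched for to_files to handle.

```perl
use strict;
use warnings;

# Hypothetical, much simplified stand-in for the Template::Perlish
# expansion: only the `key` variable is supported.
sub expand_filename {
   my ($template, $key) = @_;
   (my $filename = $template) =~ s{\[% \s* key \s* %\]}{$key}gmxs;
   return $filename;
}

for my $key (qw< lower other >) {
   my $filename = expand_filename('output-[% key %]-%03n.json', $key);
   print "$filename\n";
}
# output-lower-%03n.json
# output-other-%03n.json
```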

Custom output channels

There will come a day when dispatching to different files is not enough. Imagine the following situation:

  • the QA team has provided a directory where you are supposed to put invalid records, and they will watch it. At first you thought about using symbolic links, but rumor has it that the program might have to be installed on filesystems that do not support them. Additionally, QA wants its files named after the UTC timestamp;
  • valid records are either consumed directly by you, or fed to another team for their analysis. The other team has provided a directory where they want their inputs, and they insist on a naming convention based on the local timestamp;
  • you and your boss agreed to put your outputs in a specific directory, with increasing numbers in a reasonably sane naming convention.
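The difference between the two timestamp conventions boils down to feeding gmtime or localtime to POSIX's strftime. A small sketch with the format used later on this page; a fixed epoch (0) replaces the current time so that the UTC result is predictable.

```perl
use strict;
use warnings;
use POSIX qw< strftime >;

# Both teams want YYYYMMDDhhmmss names, but QA wants UTC (gmtime) while
# the other team wants local time (localtime).
my $epoch = 0;
my $qa_name    = strftime('%Y%m%d%H%M%S', gmtime($epoch));
my $other_name = strftime('%Y%m%d%H%M%S', localtime($epoch));
print "QA:    $qa_name\n";       # QA:    19700101000000
print "other: $other_name\n";    # depends on the local time zone
```

These names have one-second resolution, so two runs started within the same second would produce the same name.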

To keep the example short, we will assume that QA takes all records whose nickname starts with a to e, you concentrate on those from f to p, and the rest is for the other team.

This is where filename_factory comes in handy:

use Data::Tubes qw< pipeline >;
use POSIX qw< strftime >;
my $dir_for_qa = '/path/to/qa/';
my $dir_for_us = '/fine/path/to/us/';
my $dir_for_other = '/other/location/';
pipeline(
   ['Source::iterate_files', open_file_args => {binmode => ':raw'}],
   'Reader::by_line',
   ['Parser::by_format', 'name;nick;age'],
   ['Renderer::with_template_perlish',
      '  {"name":"[% name %]","nick":"[% nick %]","age":[% age %]}'],

   # Printing, gets `rendered`, returns input unchanged
   [
      'Writer::dispatch_to_files',
      header    => "[\n",
      footer    => "\n]\n",
      interlude => ",\n",
      policy    => {records_threshold => 2},

      selector => sub {
         my $record = shift;
         my $first_char = substr $record->{structured}{nick}, 0, 1;
         return 'qa' if $first_char =~ m{[a-e]}mxs;
         return 'us' if $first_char =~ m{[f-p]}mxs;
         return 'other';
      },
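      filename_factory => sub {
         my ($key, $record) = @_;
         # QA: files named after the UTC timestamp
         return $dir_for_qa . strftime('%Y%m%d%H%M%S', gmtime())
           if $key eq 'qa';
         # other team: files named after the local timestamp
         return $dir_for_other . strftime('%Y%m%d%H%M%S', localtime())
           if $key eq 'other';
         # us: ourfile-aaaaaa, ourfile-aaaaab, ... in our own directory
         opendir my $dh, $dir_for_us
            or die "opendir('$dir_for_us'): $!";
         my @files =
            sort { $a cmp $b }
            grep { m{\A ourfile- [a-z]{6} \z}mxs && ! -d "$dir_for_us$_" }
            readdir $dh;
         closedir $dh;
         return $dir_for_us . 'ourfile-aaaaaa' unless @files;
         (my $last = $files[-1]) =~ s{\A ourfile-}{}mxs;
         $last++; # magic string increment on the bare suffix
         return $dir_for_us . 'ourfile-' . $last;
      },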
   ],

   # Options, just flush the output to the sink
   { tap => 'sink' },
)->([qw< mydata-04.txt >]);
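The 'us' branch of filename_factory relies on Perl's magic string increment, which (per perlop) only applies to non-empty strings matching /^[a-zA-Z]*[0-9]*$/. A full path such as /fine/path/to/us/ourfile-aaaaaa does not match, so ++ would treat it as a number instead. Here's a sketch of the naming logic on its own, incrementing just the suffix; next_name is a hypothetical helper.

```perl
use strict;
use warnings;

# Hypothetical helper: given the names found in the directory, return the
# next one in the sequence ourfile-aaaaaa, ourfile-aaaaab, ...
# Only the bare suffix is incremented, so the magic increment applies.
sub next_name {
   my @suffixes;
   for my $name (@_) {
      push @suffixes, $1 if $name =~ m{\A ourfile- ([a-z]{6}) \z}mxs;
   }
   return 'ourfile-aaaaaa' unless @suffixes;
   my ($last) = reverse sort @suffixes;
   $last++;    # e.g. 'aaaaab' -> 'aaaaac', 'aaaaaz' -> 'aaaaba'
   return "ourfile-$last";
}

my $first  = next_name();                                     # 'ourfile-aaaaaa'
my $second = next_name(qw< ourfile-aaaaaa ourfile-aaaaab >);  # 'ourfile-aaaaac'
my $third  = next_name(qw< ourfile-aaaaaz notes.txt >);       # 'ourfile-aaaaba'
print "$first $second $third\n";
```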

So there we are: different output directories, mutually incompatible naming conventions... and everyone is happy.

Note that the filename option is treated as a filename_factory when you pass it a sub reference.
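To picture how one option can cover both cases, here's a sketch: a sub reference is used as the factory directly, while a string is wrapped into a factory that expands it as a template. filename_factory_from is a hypothetical helper, not part of Data::Tubes, and its template handling only knows about [% key %].

```perl
use strict;
use warnings;

# Hypothetical normalization of a `filename` option (NOT Data::Tubes' code).
sub filename_factory_from {
   my ($filename) = @_;
   return $filename if ref($filename) eq 'CODE';    # already a factory
   return sub {                                      # wrap the template
      my ($key, $record) = @_;    # $record is unused in this simplified version
      (my $expanded = $filename) =~ s{\[% \s* key \s* %\]}{$key}gmxs;
      return $expanded;
   };
}

my $from_string = filename_factory_from('output-[% key %]-%03n.json');
my $from_sub    = filename_factory_from(sub { my ($key) = @_; "/custom/$key.json" });
my $name_one = $from_string->('lower', {});    # 'output-lower-%03n.json'
my $name_two = $from_sub->('qa', {});          # '/custom/qa.json'
print "$name_one\n$name_two\n";
```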