# Writing data

You might want to write your data in various output formats. Take a look at the
[Built-in I/O Transforms](https://beam.apache.org/documentation/io/built-in)
page for a list of all the available I/O transforms in Beam.

If none of those work for you, you might need to create your own output transform.

> ℹ️ For a more in-depth guide, take a look at the
[Developing a new I/O connector](https://beam.apache.org/documentation/io/developing-io-overview) page.


## Creating an output transform

The most straightforward way to write data would be to use a `Map` transform to write each element into our desired output format. In most cases, however, this adds a lot of overhead, because each element may require creating, connecting to, or deleting resources.

Instead, most data services are optimized to write _batches_ of elements at a time. A batch write connects to the service only once and can load many elements in a single operation.
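
To make the overhead concrete, here is a minimal pure-Python sketch that counts connections for per-element writes versus batched writes. `FakeService` is a hypothetical stand-in for a real data service, not a Beam or client-library API, and it assumes that each `write` call costs one connection.

```python
class FakeService:
  """Hypothetical stand-in for a data service; it only counts connections."""

  def __init__(self):
    self.connections = 0
    self.rows = []

  def write(self, elements):
    # Each call models one connect / write / disconnect round trip.
    self.connections += 1
    self.rows.extend(elements)


elements = [f"row {i}" for i in range(6)]

# Per-element writes: one connection for every element.
per_element = FakeService()
for element in elements:
  per_element.write([element])

# Batched writes: one connection for every batch of up to 3 elements.
batched = FakeService()
batch_size = 3
for start in range(0, len(elements), batch_size):
  batched.write(elements[start:start + batch_size])

print(per_element.connections)  # 6
print(batched.connections)  # 2
```

Both approaches store the same rows, but the batched version needs a third of the connections in this toy setup.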

Here, we discuss two common ways of batching elements for optimized writes: _fixed-sized batches_, and
_[windows](https://beam.apache.org/documentation/programming-guide/#windowing)
of elements_.

## Writing fixed-sized batches

If the order of the elements _is not_ important, we can simply create fixed-sized batches and write those independently.

We can use
[`GroupIntoBatches`](https://beam.apache.org/documentation/transforms/python/aggregation/groupintobatches)
to get fixed-sized batches. Note that it expects `(key, value)` pairs. Since `GroupIntoBatches` is an _aggregation_, all the elements in a batch _must_ fit into memory for each worker.
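
As a rough mental model of what a fixed-sized batch is, the following single-process sketch splits a list into batches of at most `batch_size` elements. It is only an illustration: `fixed_size_batches` is a hypothetical helper, it is not how Beam implements `GroupIntoBatches`, and in a real pipeline batches are formed per key with no guarantee on element order.

```python
from itertools import islice


def fixed_size_batches(elements, batch_size):
  """Yields lists of up to `batch_size` elements; the last one may be smaller."""
  iterator = iter(elements)
  while True:
    batch = list(islice(iterator, batch_size))
    if not batch:
      return
    yield batch


batches = list(fixed_size_batches(['a', 'b', 'c', 'd', 'e'], batch_size=2))
print(batches)  # [['a', 'b'], ['c', 'd'], ['e']]
```

Note that the last batch can hold fewer elements than `batch_size`, so downstream code should not assume that every batch is full.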

> ℹ️ `GroupIntoBatches` requires a `(key, value)` pair. For simplicity, this example uses a placeholder `None` key and discards it later. Depending on your data, there might be a key that makes more sense. Using a _balanced_ key, where each key is assigned roughly the same number of elements, may help parallelize the batching process.
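
One possible way to get a roughly balanced key, when the data has no natural one, is to hash each element into a fixed number of shards. The sketch below is an assumption rather than a Beam recommendation: `shard_key` and `num_shards` are hypothetical names, and how evenly the keys are spread depends on your data.

```python
import zlib
from collections import Counter


def shard_key(element, num_shards):
  """Maps an element to one of `num_shards` keys using a stable hash."""
  # zlib.crc32 is deterministic across processes, unlike the built-in hash()
  # for strings, which is randomized per interpreter run by default.
  return zlib.crc32(str(element).encode('utf-8')) % num_shards


elements = [f"element-{i}" for i in range(1000)]
counts = Counter(shard_key(element, num_shards=4) for element in elements)

# Show how many elements landed on each key.
for key, count in sorted(counts.items()):
  print(key, count)
```

In a pipeline, a function like this could replace the `None` key, for example with `beam.WithKeys(lambda element: shard_key(element, 4))`. Since `GroupIntoBatches` batches elements per key, using more keys can also produce more partially filled batches.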

Let's create a transform similar to `WriteToText`. To keep it simple, each file name contains a unique identifier instead of a file count.

In [None]:
import apache_beam as beam
import glob
import os
import uuid

class WriteBatchesToFiles(beam.PTransform):
  def __init__(self, file_name_prefix, file_name_suffix, batch_size):
    self.file_name_prefix = file_name_prefix
    self.file_name_suffix = file_name_suffix
    self.batch_size = batch_size

  @staticmethod
  def write_file(lines, file_name_prefix, file_name_suffix):
    file_name = f"{file_name_prefix}-{uuid.uuid4().hex}{file_name_suffix}"
    with open(file_name, 'w') as f:
      for line in lines:
        f.write(f"{line}\n")

  def expand(self, pcollection):

    return (
        pcollection
        # For simplicity we key with `None` and discard it.
        | 'Key with None' >> beam.WithKeys(lambda _: None)
        | 'Group into batches' >> beam.GroupIntoBatches(self.batch_size)
        | 'Discard key' >> beam.Values()
        | 'Write file' >> beam.Map(
            self.write_file,
            file_name_prefix=self.file_name_prefix,
            file_name_suffix=self.file_name_suffix,
        )
    )

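output_file_name_prefix = 'outputs/batch'
file_name_suffix = '.txt'
# Make sure the output directory exists, since `open` does not create it.
os.makedirs(os.path.dirname(output_file_name_prefix), exist_ok=True)
# Remove existing files matching the output file_name pattern.
for f in glob.glob(f"{output_file_name_prefix}*{file_name_suffix}"):
  os.remove(f)

with beam.Pipeline() as pipeline: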
  (
      pipeline
      | 'Create file lines' >> beam.Create([
          '1 Each element must be a string.',
          '2 It writes one element per line.',
          '3 There are no guarantees on the line order.',
          '4 The data might be written into multiple files.',
          '5 The data might be written into multiple files.',
      ])
      | 'Write batches to files' >> WriteBatchesToFiles(
          output_file_name_prefix,
          file_name_suffix=file_name_suffix,
          batch_size=2,
      )
  )

In [None]:
# Let's look at the output files and contents.
!head outputs/batch*.txt
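
The `!head` line above is IPython shell syntax, so it only works in a notebook-style environment. As a plain-Python alternative, the following sketch prints the first lines of every matching file. `preview_files` is a hypothetical helper written for this example, and it assumes that the output files are on the local file system.

```python
import glob
from itertools import islice


def preview_files(pattern, max_lines=10):
  """Returns {file_name: first `max_lines` lines} for files matching `pattern`."""
  previews = {}
  for file_name in sorted(glob.glob(pattern)):
    with open(file_name) as f:
      previews[file_name] = [line.rstrip('\n') for line in islice(f, max_lines)]
  return previews


# Print a header and the first lines of each output file, similar to `head`.
for file_name, lines in preview_files('outputs/batch*.txt').items():
  print(f"==> {file_name} <==")
  print('\n'.join(lines))
```

With `batch_size=2`, each file should contain at most two lines. The file names differ on every run because they include a random identifier.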