# Introduction to Apache Arrow

In this lab, we will briefly introduce the Apache Arrow columnar in-memory format and the associated Python library `pyarrow`.

The learning objectives of this lab are as follows. You will:

- Learn about the in-memory layout of high-level languages and their run-time platforms, such that you become aware of its limitations for big data analytics applications.
- Learn about the columnar in-memory layout of Apache Arrow, such that you understand how it can help to perform better for many big data analytics use-cases.
- Learn to use `pyarrow` to store tabular data structures in a columnar fashion, such that many typical operations are executed more efficiently.

*Technical disclaimer: this lab assumes you are using the CPython implementation of Python. If you don't know what this means, you are very likely using CPython. Sometimes, when design decisions of Python are discussed, they actually refer to the CPython implementation of the Python language, but for the sake of keeping the material concise, we will ignore the difference.*

## 1: A data-centric view of Python

When you're using software languages with a high level of abstraction, chances are you don't have to think much about how the data objects you are working with are stored in memory.
Typically, the in-memory layout of data in high-level languages and the virtual machines or interpreters that they run on can be quite different from what you may have seen in languages such as C, C++ or Rust.

Let's first look at a typical way to store a sequence of integers in computer memory.
Suppose we want to store the following sequence $S$ of integers in memory:
$$
S = (0, 3, 3, -7)
$$

Most CPUs are designed to store (and operate on) such integers in their two's complement form.
CPUs have arithmetic units that operate on such integers with a pre-defined number of bits.
This is usually a power-of-two number of bits, starting from eight, since memories are almost always addressed per byte (eight bits).
For example, if we choose to store the integers of this sequence in a byte-addressable memory starting at address zero, using 16 bits per integer, we end up with:

| Sequence index | Value | Size (bytes) | Address | Value (binary) |
|----------------|-------|--------------|---------|----------------|
|              0 |     0 |            2 |       0 |       00000000 |
|                |       |              |       1 |       00000000 |
|              1 |     3 |            2 |       2 |       00000011 |
|                |       |              |       3 |       00000000 |
|              2 |     3 |            2 |       4 |       00000011 |
|                |       |              |       5 |       00000000 |
|              3 |    -7 |            2 |       6 |       11111001 |
|                |       |              |       7 |       11111111 |

**<center>Table 1: Example of how the C language typically stores sequence S in memory.</center>**

Note that we've stored the integers in a *little-endian* manner (we store the least-significant byte of the integer at the lowest address).
Indeed, this is how a "low-level language" like C would store this sequence of 16-bit integers on little-endian machines such as `x86`, `arm` or `ppc64le` (`le` stands for little-endian, since `ppc64` is originally big-endian).
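The layout in Table 1 can be reproduced from Python with the standard `struct` module. This is a minimal sketch: the format string `"<4h"` (four 16-bit signed integers, little-endian) is chosen here to match the table, and is not how Python stores integers by default.

```python
import struct

S = [0, 3, 3, -7]

# "<" selects little-endian byte order, "h" a 16-bit signed integer.
raw = struct.pack("<4h", *S)

# Print one row per byte address, as in Table 1.
for address, byte in enumerate(raw):
    print("{:>7} | {:08b}".format(address, byte))
```

The last two bytes come out as `11111001` and `11111111`, the two's complement form of -7 with the least-significant byte first.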

Let's take a look at how a high-level language like Python does this in the next example.

In [None]:
# We first define a few utility functions that help us print the size and address of a Python object.
import sys


def addr(obj):
    """Returns the Python ID of an object, which for CPython is its start address in memory. """
    return id(obj)


def sizeof(obj):
    """Returns the size of a Python object in bytes."""
    return sys.getsizeof(obj)


# We typically use a Python list to store a sequence of integers, as follows:
S = [0, 3, 3, -7]

# In Python, everything is an object.
# S is an object, and the integers that S holds are also objects.
# Let's take a closer look:

print("The sequence S is at address {} and has a size of {} bytes.".format(addr(S), sizeof(S)))

fmt = "{:>5}, {:>5}, {:>12}, {:>13}, {:>14}"
print(fmt.format("Index", "Value", "Size (bytes)", "Type", "Address"))
for i, n in enumerate(S):
    print(fmt.format(i, n, sizeof(n), str(type(n)), addr(n)))

The previous cell should output something like ...

```
The sequence S is at address 140600455194240 and has a size of 120 bytes.
Index, Value, Size (bytes),          Type,        Address
    0,     0,           24, <class 'int'>, 140600553568528
    1,     3,           28, <class 'int'>, 140600553568624
    2,     3,           28, <class 'int'>, 140600553568624
    3,    -7,           28, <class 'int'>, 140600455489552
```
**<center>Output 1: Output of inspecting a Python list with integers.</center>**

... (although the addresses will be different every time you reset the Notebook).

When we compare Output 1 to Table 1, a few things are important to notice:

1. The type of each integer is a ***class*** `int` - everything is an object in Python.
2. In Output 1, each integer takes 24 or 28 bytes, compared to 2 bytes for each integer in C. Python integers are larger because every Python object has a header that is used for e.g. type information and memory management. Also, the Python `int` object has a variable size that grows with the magnitude of the value it represents.
3. At indices 1 and 2, where the values are equal, the addresses are the same. CPython keeps a cache of small integer objects (values from -5 through 256) and reuses them instead of allocating a new object each time. Since the values at these indices are both 3, both list entries point to the same object in memory.

We also see the list object `S` being at a much different address than the list values. A Python list object in memory actually holds the list size and a pointer to an array of pointers to the values (those brave enough can check out [the CPython source code for the list object]).
If you add some numbers to the sequence in the code above, you will see that the reported size of `S` changes only in multiples of the pointer size (8 bytes on a 64-bit system), and sometimes not at all, because CPython may allocate spare slots in the array of pointers. `sys.getsizeof` counts the list header and the array of pointers, but not the integer objects that the pointers refer to.
Because of the approach Python has taken, there are two levels of indirection to get from the list object to one of its actual values!
The object `S`, its internal array, and the values of `S` may be stored all over the memory.
So, consecutive sequence values may not be stored at contiguous addresses in memory.
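To get a feeling for the overhead, the sketch below adds up the list and the integer objects it refers to, and compares that with the same values packed back to back using the standard `array` module. The variable names are made up for this illustration, and the exact numbers depend on your Python version and platform.

```python
import sys
from array import array

S = [0, 3, 3, -7]

# sys.getsizeof(S) covers the list header and its pointer array only;
# the int objects are counted separately, once per distinct object.
distinct_ints = {id(n): n for n in S}.values()
list_total = sys.getsizeof(S) + sum(sys.getsizeof(n) for n in distinct_ints)

# array("h", ...) stores raw signed shorts contiguously, like Table 1.
packed = array("h", S)
payload = packed.itemsize * len(packed)

print("list + int objects:", list_total, "bytes")
print("packed payload    :", payload, "bytes")
```

The packed payload ignores the small fixed header of the `array` object itself, so this is a comparison of the data, not of the complete objects.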

###### The takeaway is ...
... that when you're using a high-level language like Python, R, or even Java, the in-memory format of your data may be much different than you expect!
When you're writing a script to automate some boring tasks, this is usually not really a problem.
However, when you have to do high-performance computing or big data analytics, you are likely to run into performance issues quickly with Python (or R, or even Java), if you don't do something about it.

[the CPython source code for the list object]: https://github.com/python/cpython/blob/main/Include/cpython/listobject.h

## 2: Apache Arrow

Based on the previous section, you may think Python is not so good, because it uses more space and more indirections (pointers) in the memory to store our data. Traversing our data structure, e.g. iterating over the list, may take longer than for other languages.
However, keep in mind what Python was designed for: portability, programmer productivity, and readability for scripts that automate everyday tasks, but not necessarily to push a CPU to its limit in doing useful work.
Because Python is so [productive] and readable, it became very popular; so popular that people even want to use it for high-performance computing and big data analytics!
These are application domains that the language was not necessarily designed for, but where performance is at least as important as programmer productivity.

Luckily, there are work-arounds to make Python a lot more efficient.
One example you may have worked with before is the [numpy] library.
This is a Python library that helps to speed up computations on and decrease the in-memory size of multidimensional arrays and matrices.
It does so by wrapping around a C implementation of these data structures and operations, unlocking the raw computational performance of natively compiled code for modern CPUs that have e.g. SIMD instructions.

Numpy is mainly designed to store and operate on numeric data. In big data analytics, we are often interested in working with tabular data structures that not only hold numbers, but also more complicated types, such as strings, nested lists, and many others.

One project that can help us here is [Apache Arrow]. At the base of the project, there is a language-agnostic in-memory format specification for tabular data. This describes how tables can be laid out in memory in a way that's quite efficient for most applications and architectures (even for GPU and FPGA accelerators!).

Let's first take a brief look at how to use `pyarrow`, the Python implementation of the Arrow library.

[numpy]: https://numpy.org/
[Apache Arrow]: https://arrow.apache.org/
[productive]: https://xkcd.com/353/

In [None]:
# We first import the pyarrow library.
import pyarrow as pa

# Let's create something a bit bigger than our previous example S.
# Let's make a sequence of dummy data that holds 10^8 64-bit integer values counting up from 0 to 10^8-1.
# We'll call this sequence P for the *P*ython list representation of the sequence.
P = list(range(int(10 ** 8)))

# We can print the first couple of values (printing everything would probably crash the notebook)
print(P[0:10])

In [None]:
# We will now convert this Python list to an Arrow array which we will call A.
A = pa.array(P)

# Let's print the Arrow Array. We don't have to just print a slice, since Arrow is built for big data, 
# and understands we probably don't want to see all 10^8 values.
print(A)

In [None]:
# Let's sum up all values in the Python list, and then sum up all values in the Arrow array.
# We will measure how long it takes (wall clock time).

import time

t0 = time.perf_counter()
P_result = sum(P)
t1 = time.perf_counter()
print("Python: {:.4f} s. Result: {}".format(t1 - t0, P_result))

t0 = time.perf_counter()
A_result = A.sum()
t1 = time.perf_counter()
print("Arrow : {:.4f} s. Result: {}".format(t1 - t0, A_result))

The output should look something like:

```
Python: 0.2569 s. Result: 4999999950000000
Arrow : 0.0305 s. Result: 4999999950000000
```

The result may be quite different depending on your computer, but if Arrow did its job, summing the Arrow array should be faster than summing the Python list. This is because the layout of this particular data structure in Arrow (a sequence of 64-bit integers) looks more like what we showed in Table 1. This in-memory layout is much more suitable to be processed by modern CPUs.

More technically, there are likely several components of your CPU that help make this faster in the Arrow case:
- **Cache**: Loading data from main memory is done on a per-cacheline basis. Even if we load just one integer, we get a whole cacheline that typically holds multiple integers in our cache. Since in the Arrow format, integers are placed in the memory at consecutive addresses, whenever the first integer is loaded into the cache, a number of consecutive integers are also loaded into the cache, making access to them faster next time we load them, because we don't have to go all the way to main memory anymore. The CPU can thus benefit from what is called 'spatial locality' of our data structure and sum operation. For the Python case, the objects may be scattered around the memory, which kind of breaks the 'spatial locality' of the data. In other words, when we load one Python integer into the cache, it's less likely that we load another integer that we want to use into the cache with the same amount of work.
- **Pre-fetchers**: Also because the integers are placed in the memory at consecutive addresses, a so-called prefetcher can easily predict what data to load next. While the CPU is busy calculating the sum, the prefetcher can already load the next cachelines into the cache, in parallel to the busy CPU. Predicting what data to load next is very hard in the Python case, as the integers are potentially scattered around the memory, making it hard to correctly predict which data to fetch from the memory while the CPU is computing.
- **SIMD**: Your CPU probably has SIMD instructions that sum up multiple (for AVX-512: 8) 64-bit integers at once, much faster than when using "normal" instructions that only sum up two integers at a time.
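
The spatial-locality argument can be made concrete with a little arithmetic. The sketch below uses the standard `array` module as a stand-in for a contiguous buffer of 64-bit integers, and assumes a 64-byte cacheline, which is typical for current x86 CPUs but not guaranteed on your machine.

```python
from array import array

CACHE_LINE_BYTES = 64  # assumption: typical cacheline size, check your CPU

values = array("q", range(16))  # 64-bit signed integers, stored contiguously
base_address, count = values.buffer_info()

# Element i lives at base_address + i * itemsize, so neighbours share cachelines.
addresses = [base_address + i * values.itemsize for i in range(count)]
per_line = CACHE_LINE_BYTES // values.itemsize

print("stride between elements:", addresses[1] - addresses[0], "bytes")
print("integers per cacheline :", per_line)
```

Under this assumption, loading one integer brings seven of its neighbours into the cache along with it, which is also the number of 64-bit values an AVX-512 register holds.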

An [interesting talk] by the 2017 Turing Award recipients shows a similar example of how much performance there is to gain when we really attempt to utilize all the advances of modern processor architecture by being smart about our implementation within a specific application domain.

###### The takeaway is ...

... that in the design of many software languages, people have focused on productivity, which very often (but not always) comes at the cost of performance. If you are aware of this and if performance is an issue but you are not willing to implement your software in a language that is more performant, there are often various work-arounds such as numpy or Arrow that alleviate some of the bottlenecks (but will do so at the cost of having to write your code a bit differently).


[interesting talk]: https://www.youtube.com/watch?v=3LVeEjsn8Ts&t=2243s

## 3: Arrow RecordBatch

The previous example is not very illustrative of the advantages of Arrow over, say, numpy, because numpy would be able to sum a sequence of integers just as well. However, as mentioned before, Arrow is good at tabular data structures with more complicated types - something numpy is less good at.

The Arrow `Array` that we used is typically used to represent a column in a tabular data structure called a RecordBatch. Take a look at the following example, where we put a more complex tabular data structure into memory in the Arrow in-memory format using a RecordBatch.

In [None]:
from datetime import datetime

# Let's consider a bunch of message records on a social-media platform.
# Each message record has an id, a user, the contents, and some metadata.
messages = [
    {
        'id': 42,
        'user': 'Ronald Chevalier',
        'contents': 'Hello Arrow! This is awesome!',
        'meta': {
            'date': datetime(2021, 10, 11, 12, 13, 14, 0),
            'ref-id': None,
            'liked-by': ["Benjamin Purvis", "Tabatha Jenkins"]
        }
    },
    {
        'id': 1337,
        'user': 'Tabatha Jenkins',
        'contents': "IKR? Can't wait to use Fletcher!",
        'meta': {
            'date': datetime(2021, 10, 11, 12, 13, 14, 0),
            'ref-id': 42,
            'liked-by': ['Lonnie Donaho']
        }
    },
]

# To get this data into Arrow, we use Pandas, which provides functions that do the conversion for us.
# In a typical analytics pipeline, this data would come from storage and would be loaded directly as 
# an Arrow RecordBatch. Recent Pandas versions can also use Arrow-backed data types.
import pandas as pd
mdf = pd.DataFrame.from_records(messages, index='id')

# Let's take a look at the data in a tabular form:
display(mdf)

In [None]:
# We will now create an Arrow RecordBatch; a batch of records of the same type.
batch = pa.RecordBatch.from_pandas(mdf)

# Let's display some information about the batch:
print(batch)

In the output above, we can see that `batch` is a `pyarrow` RecordBatch.
Arrow prints the name and type of each (nested) record field.
This set of field names and their type is called the Arrow *schema*.
The schema determines how to interpret the bytes that are stored in each column.
In the next section, we'll continue to discover more of the internals of Arrow.
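
The role of a schema can be illustrated without Arrow at all. The sketch below takes eight raw bytes and reads them under three different declared types using the standard `struct` module; the three interpretations are hypothetical stand-ins for schema field types, not actual Arrow types.

```python
import struct

# Eight raw bytes, as they might sit in a column buffer.
raw = struct.pack("<q", 42)

# The same bytes, read under three different declared types.
as_int64 = struct.unpack("<q", raw)   # one 64-bit integer
as_int32 = struct.unpack("<2i", raw)  # two 32-bit integers
as_bytes = struct.unpack("<8B", raw)  # eight unsigned bytes

print(as_int64, as_int32, as_bytes)
```

The bytes never change; only the declared type decides whether they mean one value, two values, or eight. This is why a buffer without its schema is not enough to recover the data.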

Let us first perform some computations on the RecordBatch:

In [None]:
# In this example, we will split the strings in the user column to obtain first and last name:

# We can select the column as follows: batch.column('user')

# Arrow also provides a library with compute functionality.
# See: https://arrow.apache.org/docs/python/api/compute.html
# In this case, we will use the split_pattern function to split the strings.
# The compute module is imported explicitly, so this cell does not depend on earlier cells having loaded it.
import pyarrow.compute as pc

split_names = pc.split_pattern(batch.column('user'), pattern=' ')

# Our result now holds a list of lists:
print(split_names)

## Exercise 3.1: How much heavier did the earth get?

In this exercise, we will load data from a file into an Apache Arrow RecordBatch, and perform a simple calculation on it.

Suppose we are interested in learning how much mass the earth has gained from meteorites that crashed into it.
We will use the [Meteorite Landings] dataset from NASA to calculate how much mass the earth has gained.
This dataset is provided as the CSV file `meteorite.csv` in the folder of this Notebook.

Your goal is to:

1. Load the data as an Apache Arrow RecordBatch.

Hints: you can use `pyarrow.csv.read_csv(...)` for this. You'll have to import csv from pyarrow separately. This returns an Arrow Table, which works similarly to a RecordBatch (but may be split up over multiple parts in the memory). If you want, you can view the contents of the table using Pandas with: `display(table.to_pandas())`

2. Sum meteorite mass up to find how much mass the earth has gained.

Hints: you can use `pyarrow.compute.sum(...)` for this. This will return a `pyarrow.DoubleScalar` object.

3. Print the result as a Python float.

Hints: you can use `pyarrow.DoubleScalar.as_py()` for this. This converts the value from an Arrow representation of this value to a Python value.

*Disclaimer: this exercise by no means represents a correct scientific view of the problem of calculating the difference in mass of the Earth over time.*

[Meteorite Landings]: https://data.nasa.gov/Space-Science/Meteorite-Landings/ak9y-cwf9

In [None]:
from pyarrow import csv

# Please keep the variable names on the left hand side intact.

# 1. Load the data as an Apache Arrow RecordBatch. (Should not take more than one line)
table = ...

# 2. Sum meteorite mass up to find how much mass the earth has gained. (Should not take more than one line)
total_mass = ...

# 3. Print the result as a Python float. (Should not take more than one line)
print(...)

In [None]:
# Result check. If this assertion fails, something went wrong with your code!
assert(total_mass.equals(pa.scalar(605281210.638)))

###### The takeaway is ...
... that Apache Arrow can work with all kinds of tabular data, not just numbers, but also strings, structs, lists, and many others. It also provides various functions to perform computations on columns with these data types very efficiently.

## 4: Columnar in-memory format

When reading up on Arrow, you will often find the mention of it being a "columnar" in-memory format.
What does that mean?

Arrow stores tabular data in a column-oriented fashion (sometimes called struct-of-arrays).
To explain what this means, let's first look at the usual/straightforward way of storing this data in memory, which would be called row-oriented (sometimes called array-of-structs).

Consider the following struct in C:

```C
struct Person {
   char name[4];    // People can only have very short names to make the table below small!
   uint8_t salary;
};
```

A C-array of this struct is laid out in memory as follows:

| Address | Person / field / part |
|---------|-----------------------|
|       0 | Person 0 name character 0 |
|       1 | Person 0 name character 1 |
|       2 | Person 0 name character 2 |
|       3 | Person 0 name character 3 |
|       4 | Person 0 salary |
|       5 | Person 1 name character 0 |
|       6 | Person 1 name character 1 |
|       7 | Person 1 name character 2 |
|       8 | Person 1 name character 3 |
|       9 | Person 1 salary |
| ... | ... |

**<center>Table 2: An example of an array of structures, or row-oriented in-memory format.</center>**

*Technical disclaimer: this may differ depending on the compiler - another reason to use a language-agnostic in-memory format like Arrow!*

Suppose we would now like to perform a vectorized sum of the salaries of each person to obtain the total salary some company has to pay to its employees.
This is not immediately possible, because the name data is interleaved with the salary data. To do a vectorized sum, we need the salary data to be stored contiguously in the memory. 
The above example shows a row-oriented (array-of-structs) in-memory format for our data.

We could also take another approach. Suppose we have only two people:

```C
#define NUM_PEOPLE 2

struct People {
    uint8_t salaries[NUM_PEOPLE];
    char name[4 * NUM_PEOPLE];
};
```

We would end up with the following in-memory format:

| Address | Person / field / part |
|---------|-----------------------|
|       0 | Person 0 salary |
|       1 | Person 1 salary |
|       2 | Person 0 name character 0 |
|       3 | Person 0 name character 1 |
|       4 | Person 0 name character 2 |
|       5 | Person 0 name character 3 |
|       6 | Person 1 name character 0 |
|       7 | Person 1 name character 1 |
|       8 | Person 1 name character 2 |
|       9 | Person 1 name character 3 |

**<center>Table 3: An example of a structure of arrays, or column-oriented in-memory format.</center>**

Now, if we would like to sum all salaries, their values are placed contiguously in the memory.
This allows vector instructions to sum up multiple salaries at once.

When looking at this data structure as a table, where each struct field of `struct Person` in the array of `struct Person` is considered a column, it should now become clear that we store columns separately in the case of `struct People`, rather than interleaved as in the case of an array of `struct Person`.

Back to Arrow. Arrow uses the second approach - each column is stored separately from other columns to make sure the data in a column is stored contiguously. This way, we can perform all sorts of operations much faster (remember our previous little experiment!), especially those that work on one or a few columns, which is often the case in big data analytics.
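
The two layouts from Table 2 and Table 3 can be mimicked in Python with plain byte strings. This is only a sketch: the names and salaries are made-up example data, and `struct` stands in for what a C compiler would do with the two struct definitions.

```python
import struct

people = [(b"Anna", 10), (b"Bert", 20), (b"Cleo", 30)]  # made-up example data

# Row-oriented: each record is 4 name bytes followed by 1 salary byte (Table 2).
rows = b"".join(struct.pack("<4sB", name, salary) for name, salary in people)

# Column-oriented: all salaries first, then all names (Table 3).
salaries = bytes(salary for _, salary in people)
names = b"".join(name for name, _ in people)

# In the row layout the salaries sit 5 bytes apart;
# in the column layout they are adjacent.
total_from_rows = sum(rows[4::5])
total_from_columns = sum(salaries)

print(total_from_rows, total_from_columns)
```

Both sums give the same answer, but only the column layout hands the salaries over as one contiguous run of bytes, which is what vector instructions need.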

###### The takeaway is...
... that tabular data structures can be stored in memory in a column-oriented fashion, such that all values in a column are in a contiguous piece of memory that allows operations to be performed on it faster.

## 5: Variable-length data

So far we've mainly looked at how fixed-size data is stored in memory, but what about variable-size data?
The Arrow format is designed to store variable-sized data in such a way that it is easily accessible in parallel.

Consider an Arrow column with the strings "abc" and "de" on the first and second row.
We could store them contiguously in memory in the C way:

| Address | Character | Comment |
|---------|-----------|---------|
|       0 | 'a'  | Start of the first string. |
|       1 | 'b'  | |
|       2 | 'c'  | |
|       3 | '\0' | String terminator character. |
|       4 | 'd'  | Start of the second string. |
|       5 | 'e'  | |
|       6 | '\0' | String terminator character. |

Note that C strings have a terminator character, so that we can know at run-time where the end of the string is.

Now suppose we would like to perform a transformation on these strings (e.g. we would like to calculate their hash).
Imagine we have an ideal system that can run two software threads simultaneously without any overhead.
To distribute the work over these two threads, we could give each thread one string to work on.
For the first thread, we would tell it to start working from address 0, where the first string is located.
However, during run-time, we would not know where the second string starts without first finding the first occurrence of the terminator character `\0`. This means we first have to scan the data for the terminator character, which incurs some overhead, in order to find where the second string starts. For this small example, this would be negligible, but when talking about datasets of terabytes in size, this will become very significant.
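
The scan described above can be sketched in a few lines of Python. The helper name `c_string_starts` is made up for this illustration; it walks over the bytes of the table above and has to visit every string to find out where the next one begins.

```python
# The two strings from the table above, stored back to back in the C way.
data = b"abc\x00de\x00"


def c_string_starts(buffer):
    """Find where each string starts by scanning for terminators."""
    starts, position = [], 0
    while position < len(buffer):
        starts.append(position)
        # Linear scan: look for the next terminator, then step past it.
        position = buffer.index(b"\x00", position) + 1
    return starts


print(c_string_starts(data))
```

The work is proportional to the total number of bytes, and it is inherently sequential: the start of string *n* is only known after strings 0 to *n*-1 have been scanned.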

To reduce the overhead associated with working with variable-length data in parallel, Arrow takes a different approach.
So far we've worked with Arrow arrays, and looked at a few that, in terms of their in-memory layout, are identical to arrays in C.
However, Arrow arrays are only a logical view of the combination of various Arrow *buffers* where the Arrow array type determines how to interpret the buffers.
Arrays can also be nested, although we will not go into details in this lab.

To understand the previous paragraph, let's look at an example of the same data stored using Arrow.

One part will be stored in what is called the "values" buffer:

| Address | Character | Comment |
|---------|-----|-|
|       0 | 'a' | Start of the first string. |
|       1 | 'b' | |
|       2 | 'c' | |
|       3 | 'd' | Start of the second string. |
|       4 | 'e' | |
|     ... | ... | |

Another part will be stored in what is called the "offsets" buffer:

| Address | Offset | Comment |
|---------|--------|---------|
|       0 |      0 | Offset into the "values" buffer for the first string. |
|       4 |      3 | Offset into the "values" buffer for the second string. |
|       8 |      5 | End of the second string: one past the last used position in the "values" buffer. |
|     ... |    ... | |

Here we have two Arrow buffers, which are C-like arrays that, when combined, form an Arrow `StringArray`.
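The offsets buffer holds offsets into the values buffer that point to where each string starts. A string ends where the next one starts, which is why there is one more offset than there are strings.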
Note that offsets are (by default) 32-bit integers, so they take up four bytes worth of address space.
This way, we don't have to traverse all characters to find the terminator character to know where a string starts, so it's easy and fast to slice the `StringArray` up into multiple parts so we can process it in parallel.
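
The two buffers above can be rebuilt by hand to see how a lookup works. This is a plain-Python sketch of the idea, not `pyarrow` code; the helper `get_string` is a made-up name, and the offsets are packed as 32-bit little-endian integers to match the addresses in the table.

```python
import struct

strings = ["abc", "de"]

# Values buffer: all characters back to back, without terminators.
values = "".join(strings).encode("utf-8")

# Offsets buffer: one 32-bit integer per string, plus a final end offset.
offsets = [0]
for s in strings:
    offsets.append(offsets[-1] + len(s.encode("utf-8")))
offsets_buffer = struct.pack("<{}i".format(len(offsets)), *offsets)


def get_string(index):
    """Read string `index` directly, without scanning any other string."""
    start, end = struct.unpack_from("<2i", offsets_buffer, 4 * index)
    return values[start:end].decode("utf-8")


print(offsets, [get_string(i) for i in range(len(strings))])
```

Looking up string `i` takes two offset reads and one slice, regardless of how many strings come before it, so a second thread can start on its share of the column immediately.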

Let's take a look at the RecordBatch with the messages again to confirm that a `StringArray` indeed works this way.

In [None]:
# We take the 'user' column from our RecordBatch
string_array = batch.column('user')

# After running this, you will see this is a pyarrow StringArray, and it holds two strings.
display(string_array)

# We can check out the underlying Arrow buffers. There are three.
# At the first index, there is `None`, meaning there is no buffer.
# This slot is reserved for the "validity buffer", a bitmap that marks which entries are null.
# Our column contains no nulls, so the buffer is left out.
print("Validity buffer (unused for this example):")
display(string_array.buffers()[0])

# At the second index, there is the offsets buffer. 
# We will pretend it is an Arrow int32 array so we can view it.
print("Offsets buffer contents:")
display(pa.Int32Array.from_buffers(pa.int32(), 3, [None, string_array.buffers()[1]]))

# At the third index, there is the values buffer.
print("Values buffer contents:")
display(['{:2d} : {}'.format(i,chr(x)) for i,x in enumerate(string_array.buffers()[2].to_pybytes())])

From the above example, we can see how the offsets and values buffers together represent the column of strings 'user' from our message dataset example in section 3. 

When variable-sized data types are nested, such as lists of lists, there will be two offset buffers, the first one pointing into the second one, and the second one pointing into the values buffer.
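
The two-level scheme can be sketched in plain Python for a column whose rows are lists of strings. The data and all names below are hypothetical, and real Arrow buffers would hold packed integers rather than Python lists.

```python
# Hypothetical column with two rows, each a list of strings.
rows = [["ab", "c"], ["de"]]

values = bytearray()  # all characters of all strings
string_offsets = [0]  # second offsets buffer: points into `values`
list_offsets = [0]    # first offsets buffer: points into `string_offsets`

for row in rows:
    for s in row:
        values += s.encode("utf-8")
        string_offsets.append(len(values))
    list_offsets.append(len(string_offsets) - 1)


def get_row(index):
    """Follow both offset levels to rebuild one row."""
    first, last = list_offsets[index], list_offsets[index + 1]
    return [
        values[string_offsets[i]:string_offsets[i + 1]].decode("utf-8")
        for i in range(first, last)
    ]


print(list_offsets, string_offsets, bytes(values))
```

Each level of nesting adds one offsets buffer, while the characters themselves still sit in a single contiguous values buffer.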

###### The takeaway is ...

... that Arrow arrays can consist of multiple buffers, that work together to represent columns of all sorts of complex and variable-size data types. Arrow deals with variable-size data using offset buffers.

## Exercise answers

In [None]:
# Answer 3.1
from pyarrow import csv
import pyarrow.compute as pc

table = csv.read_csv("meteorites.csv")
total_mass = pc.sum(table.column('mass (g)'))
print(total_mass.as_py())