## 001 - 01  

![Vogon Poetry - Zero Copy data processing over Columnar layouts](./images/vogon-poetry-v02.png)  

#### Columnar Layout  
#### &nbsp;&nbsp;&nbsp;&nbsp;Array of Structs  
#### Struct of Arrays  
#### &nbsp;&nbsp;&nbsp;&nbsp;Metadata Metadata  
#### Zero Copy  
#### &nbsp;&nbsp;&nbsp;&nbsp;Distributed  
#### Relational   
#### &nbsp;&nbsp;&nbsp;&nbsp;Data Engineering    

# **Vogon Poetry**   

## **tl;dr**                              
                             
### Spark (columnar execution vs row materialization)                              
                          
* Stay in DataFrame/Column expressions; avoid UDFs that force row materialization.                           
* Prefer vectorized readers/writers (Parquet/ORC) and column pruning.                          
* Use Arrow-based interchange only when it actually avoids conversion copies; beware that crossing boundaries (Spark <-> Python) can force materialization if types are not supported or if you request row objects.                          
                          
### Polars (Arrow-native columnar)                          
                          
* Favor lazy queries and scan_* APIs (scan_parquet, scan_ipc) to push projection/filter down.                          
* Slicing and projection are typically metadata-only.                          
* Be cautious with operations that require full materialization (sort, certain joins, explode) and with conversions to NumPy/Pandas that may copy.

# Zero-copy in columnar representations                          
                          
                                   
**Zero-copy** means an operation produces a result *without duplicating underlying data buffers*,   
typically by reusing the same memory and returning new metadata (offsets, lengths, indices, validity bitmaps) that describes a “view” of that data.                          
         

                 
Columnar context: a “DataFrame” is a set of column arrays. In a columnar engine, each column is usually represented as:                          
                          
* One or more contiguous value buffers (e.g., int32 values)                          
* A validity bitmap (nulls)                          
* Optional offsets buffer (for variable-length types like strings and lists)                          
* Optional dictionary buffers (dictionary encoding)                          
     

                     
A zero-copy operation does not duplicate those buffers. It may allocate small metadata structures (array headers, offset/length, selection vectors), but not the bulk data.                          
                          

**Columnar makes zero-copy practical**                          
                          
Columnar layouts separate values from row structure, so many operations can be expressed as:                          
                          
* Slice/view on a contiguous range                          
* Projection (choose a subset of columns)                          
* Filter as an index/selection vector referencing positions                          
* Dictionary remapping                          
* Chunked arrays referencing existing chunks                          
                          
Row-oriented systems often interleave fields per row, so extracting a column or slicing a subset of rows tends to require copying or re-packing.                          
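
The contrast can be sketched with nothing but the standard library. This is an illustration of the two layouts, not how any particular engine stores data: the column names `ids` and `xs` and the packed `(id, x)` row format are assumptions made up for the example.

```python
import struct
from array import array

# Struct of Arrays: each column is its own contiguous buffer
ids = array("i", range(6))
xs = array("i", [i * 10 for i in range(6)])

# Slicing rows 2..4 of one column is a view over the same memory (no copy)
ids_view = memoryview(ids)[2:5]
ids[2] = 99  # mutate the base buffer; the view observes the change
print("view sees base mutation:", ids_view.tolist())

# Array of Structs: fields are interleaved row by row as (id, x)
row_size = struct.calcsize("ii")
rows = b"".join(struct.pack("ii", i, i * 10) for i in range(6))

# Getting x as a contiguous column means visiting every row and re-packing
x_col = array(
    "i",
    (struct.unpack_from("ii", rows, off)[1] for off in range(0, len(rows), row_size)),
)
print("re-packed x column:", x_col.tolist())
```

The view reflects the write to the base array because it shares memory, while `x_col` is a fresh buffer built by walking the interleaved rows.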

## Common **zero-copy** (aka metadata-only) operations                          

We'll implement some of these at a "lower-level" so we can see the innards and grasp what's going on...

*(we are using PyArrow here, don't get too caught up in the details, we'll dive deep into PyArrow later, deep breaths...)*    
....
you're going to bug me about this, so nipitinthebud - PyArrow because:
1. its `Table.slice` is explicitly documented as a zero-copy slice.
2. we don't get caught up in lower level details and just focus on the concepts at hand.

### 0. Setup and helper functions

How the helpers help:
* `before` is a snapshot of buffer identities (addresses) for each column in a base `pyarrow.Table`
* `after` is another `Table` produced by an operation (slice, select, take, etc.).

Also, 
1. **"Aliased"** means **"shared underlying memory."**        
2. **"Buffers"** = **"contiguous blocks of memory"**
  
  
If two Arrow arrays (or tables) are aliased, they are not holding two separate copies of the data.    
They are two different objects that point at the same underlying buffers in memory.  
So if the objects from before and after an operation point at the same buffers, the operation was **zero-copy**.

In [1]:
import pyarrow as pa
import pyarrow.compute as pc

**7** simple helpers.  
The **2** in **bold** report whether zero-copy has occurred.  
1. `buf_addrs(arr)`
2. `table_buf_addrs(t)` 
3. **`show_aliasing(label, before, after)`**: prints a compact per-column count of how many output chunks match any baseline chunk fingerprint. A coarse check for buffer reuse, without printing offsets, sizes, or per-buffer role details. 
4. `_buf_info(arr)` 
5. `_chunk_sig(arr)` 
6. `snapshot_table_buffers(t)` 
7. **`show_more_aliasing_info(label, before_snap, after_table, max_chunks_print)`**: a more detailed comparison of the output table's underlying buffer addresses against a baseline snapshot to detect buffer reuse (aliasing). Helps us clearly see which operations are zero-copy views vs materializing new buffers.

In [2]:
# buf_addrs(arr) collects the raw memory addresses for each physical buffer backing a single Arrow Array (one chunk).
# A quick identity fingerprint to detect whether two arrays share the same underlying memory.

def buf_addrs(arr):
    # Return a tuple of raw memory addresses for the buffers that back one Arrow Array (one chunk)
    addrs = []
    # arr.buffers() returns the physical buffers for the array (validity, offsets, values, etc.)
    for b in arr.buffers():
        # A buffer can be None when that physical component is not present (e.g., no validity bitmap)
        if b is None:
            addrs.append(None)
        else:
            # Use the buffer address as a quick identity check for whether two arrays share memory
            addrs.append(int(b.address))
    # Use a tuple so it can be compared and used in sets if needed
    return tuple(addrs)

In [3]:
# table_buf_addrs(t) builds a per-column list of per-chunk buffer-address fingerprints for an Arrow Table.
# It lets you compare whole tables by checking whether output chunks reuse buffer addresses from an input snapshot.

def table_buf_addrs(t):
    # Build a per-column list of per-chunk buffer-address tuples for a Table
    out = {}
    for name in t.column_names:
        # Tables store each column as a ChunkedArray, so we snapshot each chunk separately
        out[name] = [buf_addrs(chunk) for chunk in t[name].chunks]
    return out

In [4]:
# show_aliasing(label, before, after) prints a compact per-column count of how many output chunks match any baseline chunk fingerprint.
# A fast, *coarse* check for buffer reuse, without printing offsets, sizes, or per-buffer role details.

def show_aliasing(label, before, after):
    # Print how many output chunks reuse buffer addresses from the baseline snapshot
    print(label)
    # Snapshot the output table once instead of once per column
    after_addrs = table_buf_addrs(after)
    for col in after.column_names:
        # Baseline buffer-address tuples for this column (captured from the "before" table)
        # Default to an empty list so new or renamed columns count as not aliased instead of raising
        b = before.get(col, [])
        # Output buffer-address tuples for this column (captured from the "after" table)
        a = after_addrs[col]
        # A chunk is counted as aliased if its full buffer-address tuple matches any baseline chunk tuple
        aliased = sum(1 for x in a if x in b)
        # Print a compact per-column summary of aliasing at the chunk level
        print(col, "aliased_chunks=", aliased, "after_chunks=", len(a))


In [5]:
# _buf_info(arr) returns detailed metadata for each buffer in one Arrow Array: buffer index, address, and size in bytes.
# It supports more informative reporting so you can see exactly which buffers are shared vs newly allocated.

def _buf_info(arr):
    # Return per-buffer address and size for one Arrow Array (one chunk)
    out = []
    # Arrow arrays are built from 0..N buffers depending on type (validity, offsets, values, etc.)
    bufs = arr.buffers()
    for i, b in enumerate(bufs):
        # Some buffer slots can be None (e.g., no validity bitmap if there are no nulls)
        if b is None:
            out.append({"i": i, "addr": None, "size": 0})
        else:
            # Use the raw memory address as an identity for "is this the same underlying buffer"
            out.append({"i": i, "addr": int(b.address), "size": int(b.size)})
    return out

In [6]:
# _chunk_sig(arr) builds a chunk signature from the array's buffers using (address, size) pairs for each buffer slot.
# It enables stable equality checks for "is this chunk backed by the same physical buffers as another chunk" at a coarse level.

def _chunk_sig(arr):
    # Build a simple "signature" for a chunk based on its buffers (address + size for each buffer)
    return tuple((x["addr"], x["size"]) for x in _buf_info(arr))

In [7]:
# snapshot_table_buffers(t) captures a reusable baseline of per-column, per-chunk metadata: type, length, offset, null_count, and buffer info/signature.
# It provides the reference point for later comparisons so you can determine whether subsequent operations reused or replaced buffers.

def snapshot_table_buffers(t):
    # Capture a snapshot of buffer identities and key metadata for every column chunk in a Table
    snap = {}
    for name in t.column_names:
        # Get the column as a ChunkedArray (Tables store each column as chunked)
        col = t[name]
        # Normalize into a list of chunks even if it is a single Array
        chunks = col.chunks if isinstance(col, pa.ChunkedArray) else [col]
        # Store per-chunk buffer signatures so we can compare "before" vs "after"
        snap[name] = []
        for c in chunks:
            # Record type and view metadata because slices can alias buffers with different offsets/lengths
            snap[name].append({
                "type": str(c.type),
                "length": int(len(c)),
                "offset": int(c.offset),
                "null_count": int(c.null_count),
                # Store per-buffer info for more detailed inspection/debugging
                "buffers": _buf_info(c),
                # Store a compact signature for quick alias checks
                "sig": _chunk_sig(c),
            })
    return snap

In [8]:
# show_more_aliasing_info compares the output table's underlying buffer addresses against a baseline snapshot to detect buffer reuse (aliasing).
# It prints per-column and per-chunk details (addresses, sizes, offsets) so you can see which operations are zero-copy views vs materializing new buffers.

def show_more_aliasing_info(label, before_snap, after_table, max_chunks_print=4):
    # Take a snapshot of the output table so we can compare its buffers to the baseline snapshot
    after_snap = snapshot_table_buffers(after_table)

    # Print a high-level header for this comparison
    print(label)
    print("columns", len(after_table.column_names), "rows", after_table.num_rows)

    # Accumulators for a summary at the end
    total_cols = 0
    total_after_chunks = 0
    total_aliased_chunks = 0
    total_new_bytes = 0

    for name in after_table.column_names:
        total_cols += 1

        # Base chunks are the "before" buffer signatures for this column
        b_chunks = before_snap.get(name, [])
        # Output chunks are the "after" buffer signatures for this column
        a_chunks = after_snap.get(name, [])
        # Use a set for O(1) lookup when checking if an output chunk matches any base chunk
        b_sigs = set(x["sig"] for x in b_chunks)

        aliased = 0
        new_bytes = 0

        for ac in a_chunks:
            total_after_chunks += 1
            # If the entire chunk signature matches, we treat it as buffer-aliased to the baseline
            if ac["sig"] in b_sigs:
                aliased += 1
                total_aliased_chunks += 1
            else:
                # If the chunk signature is new, estimate how many bytes exist in its buffers
                # This is a crude upper bound for "new bytes" because some buffers may be shared elsewhere
                for bi in ac["buffers"]:
                    if bi["addr"] is not None:
                        new_bytes += bi["size"]

        total_new_bytes += new_bytes

        # Column-level summary
        print("col", name, "type", a_chunks[0]["type"] if a_chunks else "NA")
        print("  chunks_after", len(a_chunks), "chunks_aliased", aliased, "new_bytes_est", new_bytes)

        # Print up to max_chunks_print chunks so the output stays readable on large chunk counts
        nprint = min(len(a_chunks), max_chunks_print)
        for i in range(nprint):
            c = a_chunks[i]
            # Offset is important: slices usually keep the same buffers but change the logical offset
            print("  chunk", i, "len", c["length"], "offset", c["offset"], "nulls", c["null_count"])
            for bi in c["buffers"]:
                # Provide a best-effort label for common buffer roles
                role = "buf"
                # Buffer 0 is typically validity bitmap when present
                if bi["i"] == 0:
                    role = "validity"
                # For variable-width types, buffer 1 is often offsets and buffer 2 is values
                elif bi["i"] == 1 and len(c["buffers"]) == 3:
                    role = "offsets"
                # For fixed-width types, buffer 1 is typically values
                elif bi["i"] == 1 and len(c["buffers"]) == 2:
                    role = "values"
                # For variable-width types, buffer 2 is typically values
                elif bi["i"] == 2:
                    role = "values"
                # Print raw address and size so you can see which buffers are shared vs new
                print("    ", role, "addr", bi["addr"], "size", bi["size"])

        # Indicate when we are truncating chunk output for readability
        if len(a_chunks) > nprint:
            print("  ...", len(a_chunks) - nprint, "more chunks omitted")

    # Overall summary across the whole table
    print("summary")
    print("  total_columns", total_cols)
    print("  total_after_chunks", total_after_chunks)
    print("  total_aliased_chunks", total_aliased_chunks)
    print("  total_new_bytes_est", total_new_bytes)

### 1. Projection (select subset of columns)                          
Projection is typically metadata-only: a new schema/column list referencing existing buffers.                          
* Selecting a subset of columns: result shares the same column buffers.                          
* Cost: create a new schema/column list.                          

In [9]:
t = pa.table(
    {
        "id": pa.array(range(10), type=pa.int32()),
        "s": pa.array([f"v{i%3}" for i in range(10)]),
        "x": pa.array([i * 10 for i in range(10)], type=pa.int32()),
    }
)
base = table_buf_addrs(t)
base_snap = snapshot_table_buffers(t)

In [10]:
# base, '----------', base_snap, '----------', t

In [11]:
p = t.select(["id", "s"])
show_aliasing("projection", base, p)

projection
id aliased_chunks= 1 after_chunks= 1
s aliased_chunks= 1 after_chunks= 1


### 2. Slicing (contiguous row range)                          
                          
* Return a view with (offset, length) into each column’s buffers.                          
* For fixed-width types, often pure metadata.                          
* For variable-width types (strings/lists), still typically metadata: adjust the logical offset into offsets buffer; values buffer is reused.                          

In [12]:
sl = t.slice(2, 5)
show_aliasing("slice", base, sl)


slice
id aliased_chunks= 1 after_chunks= 1
s aliased_chunks= 1 after_chunks= 1
x aliased_chunks= 1 after_chunks= 1


### 3. Renaming / schema changes                          
                          
* Metadata-only.                          

In [13]:
rn = t.rename_columns(["id2", "s2", "x2"])
md = t.replace_schema_metadata({b"owner": b"eng", b"purpose": b"bench"})

In [14]:
rn, md

(pyarrow.Table
 id2: int32
 s2: string
 x2: int32
 ----
 id2: [[0,1,2,3,4,5,6,7,8,9]]
 s2: [["v0","v1","v2","v0","v1","v2","v0","v1","v2","v0"]]
 x2: [[0,10,20,30,40,50,60,70,80,90]],
 pyarrow.Table
 id: int32
 s: string
 x: int32
 ----
 id: [[0,1,2,3,4,5,6,7,8,9]]
 s: [["v0","v1","v2","v0","v1","v2","v0","v1","v2","v0"]]
 x: [[0,10,20,30,40,50,60,70,80,90]])

### 4. Casting that is representation-compatible                          
                          
* Example: int32 to `date32`. A `date32` is stored as int32 days since the UNIX epoch, so the bits stay the same and only the logical type changes.                          

In [15]:
a = pa.array([1, 2, 3, 4], type=pa.int32())
validity_buf, data_buf = a.buffers()

In [16]:
date = pa.Array.from_buffers(
    pa.date32(),
    len(a),
    [validity_buf, data_buf],
    null_count=a.null_count,
    offset=a.offset,
)

In [17]:
print("int32_buffers", [int(b.address) if b else None for b in a.buffers()])
print("date32_buffers", [int(b.address) if b else None for b in date.buffers()])

int32_buffers [None, 3215510733184]
date32_buffers [None, 3215510733184]


### 5. Dictionary encoding reuse                          
                          
* Rebuilding a dictionary array from existing codes and dictionary values reuses both sets of buffers; only the wrapping metadata is new.                          

In [18]:
s = pa.array(["a", "b", "a", "c", "b", "a"])
d = pc.dictionary_encode(s)
print(d.type)

dictionary<values=string, indices=int32, ordered=0>


In [19]:
codes = d.indices
dict_values = d.dictionary

In [20]:
d2 = pa.DictionaryArray.from_arrays(codes, dict_values)
print("codes_alias", buf_addrs(codes) == buf_addrs(d2.indices))
print("dict_alias", buf_addrs(dict_values) == buf_addrs(d2.dictionary))

codes_alias True
dict_alias True


In [21]:
one = pa.scalar(1, type=pa.int32())
zero = pa.scalar(0, type=pa.int32())

In [22]:
mask = pc.equal(pc.bit_wise_and(t["id"], one), zero)
idx = pc.indices_nonzero(mask)

In [23]:
logical = (t, idx)  # table + selection vector
phys = t.take(idx)  # materialized compact result

In [24]:
print("logical_indices_type", idx.type)
show_aliasing("filter_materialize_take", base, phys)

logical_indices_type uint64
filter_materialize_take
id aliased_chunks= 0 after_chunks= 1
s aliased_chunks= 0 after_chunks= 1
x aliased_chunks= 0 after_chunks= 1


In [25]:
perm = pc.sort_indices(t, sort_keys=[("s", "ascending"), ("id", "descending")])
sorted_t = t.take(perm)

In [26]:
print("perm_type", perm.type)
show_aliasing("sort_materialize_take", base, sorted_t)

perm_type uint64
sort_materialize_take
id aliased_chunks= 0 after_chunks= 1
s aliased_chunks= 0 after_chunks= 1
x aliased_chunks= 0 after_chunks= 1


In [27]:
left = pa.table({"k": [1, 2, 3], "lv": ["a", "b", "c"]})
right = pa.table({"k": [2, 3, 4], "rv": ["B", "C", "D"]})

In [28]:
j = left.join(right, keys="k")
print(j)

pyarrow.Table
k: int64
lv: string
rv: string
----
k: [[2,3,1]]
lv: [["b","c","a"]]
rv: [["B","C",null]]


In [29]:
g = t.group_by("s").aggregate([("x", "sum"), ("id", "count")])
print(g)

pyarrow.Table
s: string
x_sum: int64
id_count: int64
----
s: [["v0","v1","v2"]]
x_sum: [[180,120,150]]
id_count: [[4,3,3]]


In [30]:
t1 = t.slice(0, 5)
t2 = t.slice(5, 5)

In [31]:
cat = pa.concat_tables([t1, t2], promote_options="none")
print([len(t1["id"].chunks), len(t2["id"].chunks), len(cat["id"].chunks)])

[1, 1, 2]


In [32]:
compact = cat.combine_chunks()
print([len(cat["id"].chunks), len(compact["id"].chunks)])

[2, 1]


In [33]:
sl = t.slice(1, 3)  # zero-copy buffers, new metadata object
idx = pc.indices_nonzero(pc.equal(t["id"], 3))  # allocates indices buffer, no base buffer rewrite

In [34]:
logical = (t, perm)      # logical sorted view
physical = t.take(perm)  # physical sorted table