In [1]:
include("setup.jl");

[32m[1m  Activating[22m[39m project at `~/docs/JuliaCon/2023/Data`
[36m[1m[ [22m[39m[36m[1mInfo: [22m[39mPrecompiling DTables [20c56dc6-594c-4682-91cf-1d46875b1eba]


![](https://github.com/JuliaParallel/Dagger.jl/raw/master/docs/logo.jpg)

## Dagger for Data Processing

#### Written by: Julian Samaroo, Research Software Engineer at MIT's JuliaLab, maintainer of Dagger.jl

In [2]:
using Dagger, DTables

### What is Dagger?

- A Julia library for parallel computing
- Manages data movement and conversions
- Supports out-of-core data processing
- Has a Tables-compatible implementation: DTables.jl

### Why another table library (DTables.jl), and why Dagger?
- Dagger has a real model of data location and movement
- Distributed and multithreaded parallelism
- Wraps other tables to gain their advantages
- Out-of-core support for arbitrary file formats (CSV, Arrow, Parquet, etc.)
- Automatic swap-to-disk for big data processing
- Tables.jl and DataFrames.jl compatible

I'm going to show you how to use Dagger and DTables for data processing through a set of examples.

In [2]:
# Generate a bunch of big Arrow files (much bigger than RAM), about 20GB total
path = "/home/jpsamaroo/docs/JuliaCon/2023/Data/data"
if !isdir(path)
    mkdir(path)
    i = 120
    sz = 0
    while true
        file = joinpath(path, "$(lpad(repr(i), 5, '0')).arrow")
        tbl = (;a=rand(1:10, 10_000_000), b=rand('a':'d', 10_000_000))
        Arrow.write(file, tbl)
        sz += Base.summarysize(tbl)
        println("At $file ($(Base.format_bytes(sz)))")
        if sz > 20 * (1024^3) # 20GB
            break
        end
        i += 1
    end
end

In [4]:
# ... and let's load them "into memory"
bigtbl = DTable([Dagger.File(joinpath(path, f);
                          serialize=Arrow.write,
                          deserialize=Arrow.Table,
                          use_io=true) for f in readdir(path)];
             tabletype=DataFrame)

DTable with 172 partitions
Tabletype: DataFrame

Notice:
- No problem loading everything into one table (just like JuliaDB)
- Loading is fast (really, it's lazy!)
- Data is partitioned by file (but this is flexible)

How it works:
- `Dagger.File` specifies files by path and how to read/write them (here, Arrow)
- `DTable` consumes these files lazily
- We'll see the effect of `tabletype=DataFrame` soon!
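The lazy-loading idea above can be sketched without Dagger at all. This is a minimal illustration, not how `Dagger.File` is implemented: `LazyFile` and `load` are hypothetical names, and the `Serialization` standard library stands in for Arrow so the block runs on its own.

```julia
using Serialization  # stdlib; stands in for Arrow in this sketch

# Hypothetical stand-in for a lazily-read file: it holds a path and a reader,
# and touches the disk only when `load` is called.
struct LazyFile{F}
    path::String
    deserialize::F
end

load(f::LazyFile) = f.deserialize(f.path)

# Write two small column tables to a temporary directory.
dir = mktempdir()
for (i, tbl) in enumerate([(a = [1, 2, 3],), (a = [4, 5],)])
    serialize(joinpath(dir, "$(lpad(i, 5, '0')).jls"), tbl)
end

# Building the list of lazy files reads nothing; it only records paths.
files = [LazyFile(joinpath(dir, f), deserialize) for f in readdir(dir)]

# Data is read one partition at a time, and only when requested.
first_partition = load(files[1])
total_rows = sum(length(load(f).a) for f in files)
println(first_partition, " ", total_rows)
```

Because each file is read only on demand, constructing the collection costs about the same no matter how large the files are.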

We can do regular operations:

In [5]:
reduce(+, bigtbl; cols=[:a]) |> fetch # fetch because `reduce` returns a lazy result

(a = 9460058042,)
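One way to picture a reduction over a partitioned table is in two steps: reduce each partition on its own, then combine the partial results. The sketch below uses plain in-memory `NamedTuple`s as stand-in partitions; it shows the general pattern and is not necessarily how DTables schedules the work.

```julia
# Two in-memory partitions standing in for the chunks of a partitioned table.
partitions = [(a = [1, 2, 3],), (a = [4, 5, 6],)]

# Step 1: reduce each partition independently (this part could run in parallel).
partials = [reduce(+, p.a) for p in partitions]

# Step 2: combine the per-partition results into one value.
result = (a = reduce(+, partials),)
println(result)  # prints (a = 21,)
```

This only gives the same answer as a single pass when the operator is associative, as `+` is here.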

What's inside?
- A bunch of "partitions" or "chunks"
- Each chunk can be any table type

Let's see what's under the hood:

In [6]:
fetch(bigtbl.chunks[1]) # Let's get the first partition

Arrow.Table with 10000000 rows, 2 columns, and schema:
 :a  Int64
 :b  Char

In [7]:
@assert all(Tables.istable(fetch(c)) for c in bigtbl.chunks)
unique(map(c->typeof(fetch(c)), bigtbl.chunks))

1-element Vector{DataType}:
 Arrow.Table

Notice I don't ever call `DTable |> DataFrame`: that would attempt to fully materialize the table in memory, but this data is too big to fit!

N.B.:
- Out-of-core is still WIP
- Often exceeds memory limit; `--heap-size-hint` helps a bit
- Arrow is way better than CSV for out-of-core

To avoid some known issues, we'll use a smaller table for the rest of this talk:

In [3]:
tbl = DTable([Dagger.File(joinpath(path, f);
                          serialize=Arrow.write,
                          deserialize=Arrow.Table,
                          use_io=true) for f in readdir(path)[1:2]]; # Just load 2 files (~200MB)
             tabletype=DataFrame)

DTable with 2 partitions
Tabletype: DataFrame

`DTable`s are easy to transform:

In [4]:
tbl2 = map(row->(;a=row.a * 2), tbl)

DTable with 2 partitions
Tabletype: DataFrame
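As a rough mental model, a row-wise `map` over a partitioned table can be applied to each partition separately, so the number of partitions is unchanged. The helper `map_rows` below is hypothetical and works on non-empty plain column tables (a `NamedTuple` of vectors), not on real `DTable` chunks.

```julia
# Apply a row function to every row of a column table (NamedTuple of vectors)
# and collect the results back into a column table.
# Assumes the partition has at least one row.
function map_rows(f, tbl::NamedTuple)
    nrows = length(first(tbl))
    # Build each row as a NamedTuple with the same column names, then apply `f`.
    rows = [f(map(col -> col[i], tbl)) for i in 1:nrows]
    cols = keys(first(rows))
    return NamedTuple{cols}(Tuple([getfield(r, n) for r in rows] for n in cols))
end

partitions = [(a = [1, 2], b = ['x', 'y']), (a = [3], b = ['z'])]

# Each partition is transformed on its own; the partitioning is preserved.
out = [map_rows(row -> (; a = row.a * 2), p) for p in partitions]
println(out)
```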

The `tabletype` chosen above specifies the output table type for new `DTable`s:

In [5]:
unique(map(c->typeof(fetch(c)), tbl2.chunks))

1-element Vector{DataType}:
 NamedTuple{(:a,), Tuple{Vector{Int64}}}

If the `DTable` is small enough, we can convert it to another table type:

In [7]:
tbl2 |> DataFrame

| Row | a (Int64) |
|-----|-----------|
| 1 | 16 |
| 2 | 20 |
| 3 | 16 |
| 4 | 2 |
| 5 | 12 |
| 6 | 12 |
| 7 | 2 |
| 8 | 8 |
| 9 | 16 |
| 10 | 4 |


Joins work too:

In [8]:
df = DataFrame(a=collect(1:10), c=collect('a':'j'))
fetch(innerjoin(tbl2, df, on=:a))[1:10,:]

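| Row | a (Int64) | c (Char) |
|-----|-----------|----------|
| 1 | 2 | b |
| 2 | 2 | b |
| 3 | 8 | h |
| 4 | 4 | d |
| 5 | 10 | j |
| 6 | 2 | b |
| 7 | 10 | j |
| 8 | 2 | b |
| 9 | 4 | d |
| 10 | 2 | b |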
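When one side of a join is small, a common approach is to build a lookup from the small table and probe it once per partition of the large table. The sketch below illustrates that pattern on plain column tables; `join_partition` is a hypothetical helper, it assumes the join keys in the small table are unique, and it is not a description of the DTables implementation.

```julia
# Small lookup table, indexed by the join key `a` (keys assumed unique).
small = (a = [2, 4, 8, 10], c = ['b', 'd', 'h', 'j'])
lookup = Dict(zip(small.a, small.c))

# Inner-join one partition against the lookup: keep only rows whose key matches.
function join_partition(p, lookup)
    keep = [i for i in eachindex(p.a) if haskey(lookup, p.a[i])]
    return (a = p.a[keep], c = [lookup[k] for k in p.a[keep]])
end

partitions = [(a = [2, 3, 8],), (a = [10, 5],)]

# Each partition is joined independently, so the work parallelizes naturally.
joined = [join_partition(p, lookup) for p in partitions]
println(joined)
```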


In [9]:
df2 = DataFrame(a=collect(1:10), c=collect('a':'j'))
fetch(leftjoin(tbl2, df2, on=:a))[1:10,:]

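| Row | a (Int64) | c (Char?) |
|-----|-----------|-----------|
| 1 | 2 | b |
| 2 | 2 | b |
| 3 | 8 | h |
| 4 | 4 | d |
| 5 | 10 | j |
| 6 | 2 | b |
| 7 | 10 | j |
| 8 | 2 | b |
| 9 | 4 | d |
| 10 | 2 | b |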


In [None]:
fetch(innerjoin(tbl, tbl2, on=:a))[1:10,:]

### Heterogeneous tables

Let's see how the DTable fares with more kinds of tables:
- Arrow
- CSV
- Parquet
- SQLite
- Julia Serialization (NamedTuple of Vectors)

In [10]:
# Let's generate some files...
tbl3 = DataFrame(tbl2)[1:10000,:]
tbl3 |> Arrow.write("tbl3.arrow")
tbl3 |> CSV.write("tbl3.csv")
Parquet2.writefile("tbl3.parquet", tbl3)
db = SQLite.DB("tbl3.sqlite"); tbl3 |> SQLite.load!(db, "tbl3")
serialize("tbl3.jls", tbl3 |> Tables.columntable)

In [11]:
# ...and read them back in!
# SQLite.load! takes the source table first, then the database and table name
sqlite_serialize(path::String, x) = SQLite.load!(x, SQLite.DB(path), "tbl3")
sqlite_deserialize(path::String) = DBInterface.execute(SQLite.DB(path), "SELECT * FROM tbl3") |> DataFrame
for (file, ser, des) in [
        ("tbl3.arrow", Arrow.write, Arrow.Table),
        ("tbl3.csv", CSV.write, CSV.File),
        ("tbl3.parquet", Parquet2.writefile, Parquet2.Dataset),
        ("tbl3.sqlite", sqlite_serialize, sqlite_deserialize),
        ("tbl3.jls", serialize, deserialize)
    ]
    println("Reading $file")
    tbl3_read = DTable([Dagger.File(file;
                                    serialize=ser,
                                    deserialize=des,
                                    use_io=false)])
    df = DataFrame(tbl3_read)
    println(df[1:10,:])
    @assert all(tbl3_read.a .== tbl3.a)
end

Reading tbl3.arrow
10×1 DataFrame
 Row │ a
     │ Int64
─────┼───────
   1 │    16
   2 │    20
   3 │    16
   4 │     2
   5 │    12
   6 │    12
   7 │     2
   8 │     8
   9 │    16
  10 │     4
Reading tbl3.csv
10×1 DataFrame
 Row │ a
     │ Int64
─────┼───────
   1 │    16
   2 │    20
   3 │    16
   4 │     2
   5 │    12
   6 │    12
   7 │     2
   8 │     8
   9 │    16
  10 │     4
Reading tbl3.parquet
10×1 DataFrame
 Row │ a
     │ Int64
─────┼───────
   1 │    16
   2 │    20
   3 │    16
   4 │     2
   5 │    12
   6 │    12
   7 │     2
   8 │     8
   9 │    16
  10 │     4
Reading tbl3.sqlite
10×1 DataFrame
 Row │ a
     │ Int64
─────┼───────
   1 │    16
   2 │    20
   3 │    16
   4 │     2
   5 │    12
   6 │    12
   7 │     2
   8 │     8
   9 │    16
  10 │     4
Reading tbl3.jls
10×1 DataFrame

### What kinds of data can Dagger manage?

- Partitioned tables (`DTables.DTable`)
- Partitioned arrays (`Dagger.DArray`)
- Arbitrary in-memory data (`Dagger.Chunk`/`Dagger.Shard`)
- Arbitrary file data (`Dagger.File`)

### How does Dagger manage data on disk?

Everything goes through `Dagger.Chunk`! Let's look at how Dagger stores some in-memory data:

In [9]:
x = Dagger.@mutable 42 # Create a piece of data to inspect, get back a Chunk

Dagger.Chunk{Int64, MemPool.DRef, OSProc, ProcessScope}(Int64, UnitDomain(), MemPool.DRef(1, 9, 0x0000000000000008), OSProc(1), ProcessScope: worker == 1, false)

In [10]:
x.processor # Where the data lives (worker 1)

OSProc(1)

In [11]:
dump(x.handle) # The low-level remote handle to the data

MemPool.DRef
  owner: Int64 1
  id: Int64 9
  size: UInt64 0x0000000000000008


In [12]:
MemPool.poolget(x.handle) # How to ask MemPool for the data

42
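The handle-based access shown above can be mimicked with a toy store to make the idea concrete. Everything in this sketch (`Handle`, `Pool`, `store_put`, `store_get`) is hypothetical and only loosely modeled on the `owner`/`id` fields printed by `dump`; it runs in a single process and is not MemPool's actual implementation.

```julia
# Hypothetical handle: which process owns the data, and an id within its store.
struct Handle
    owner::Int
    id::Int
end

# Hypothetical single-process store mapping ids to values.
mutable struct Pool
    data::Dict{Int,Any}
    next_id::Int
end
Pool() = Pool(Dict{Int,Any}(), 0)

# Store a value and hand back a small handle instead of the value itself.
function store_put(pool::Pool, value; owner = 1)
    pool.next_id += 1
    pool.data[pool.next_id] = value
    return Handle(owner, pool.next_id)
end

# Resolve a handle back to its value.
store_get(pool::Pool, h::Handle) = pool.data[h.id]

pool = Pool()
h = store_put(pool, 42)
println(h, " => ", store_get(pool, h))
```

Passing around a small handle rather than the data itself is what lets a scheduler decide later where, and whether, the data actually needs to move.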

### Powerful and Extensible

- Single implementation, but many abstractions
- Support for arbitrary file and data formats
- Distributed awareness
- Lazy loading and data movement
- Customizable data movement and conversion
- Customizable 9

### What comes next?
- In-place table operations
- Streaming data support
- GPU processing