# Distributed Writes with Lance

Lance provides low-level APIs which can be integrated with distributed frameworks.
In this tutorial, we'll show how to use with PySpark, but the same principles apply
to other frameworks.

## Creating a new dataset

In [1]:
import pyspark
import lance
import pyarrow as pa
import pickle

# Create a SparkSession
spark = pyspark.sql.SparkSession.builder.getOrCreate()

# Create a Spark DataFrame
df = spark.createDataFrame(
    [ (i, f"value_{i}") for i in range(100) ],
    ("id", "value"),
)
df = df.repartition(10)
df.head(5)

23/08/22 15:13:59 WARN Utils: Your hostname, Wills-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.1.13 instead (on interface en0)
23/08/22 15:13:59 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/08/22 15:14:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

[Row(id=10, value='value_10'),
 Row(id=2, value='value_2'),
 Row(id=21, value='value_21'),
 Row(id=35, value='value_35'),
 Row(id=24, value='value_24')]

In [2]:
dataset_uri = "my_dataset"

def write_fragment(data_iter):
    table = pa.Table.from_batches(list(data_iter))
    fragment_meta = lance.fragment.LanceFragment.create(
        dataset_uri,
        table,
    )
    data = pickle.dumps(fragment_meta)
    yield pa.RecordBatch.from_arrays([pa.array([data])], ["fragment_meta"])

# https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.mapInArrow.html
write_results = df.mapInArrow(write_fragment, schema="fragment_meta binary").collect()

                                                                                

In [3]:
fragments = [pickle.loads(row.fragment_meta)
             for row in write_results]
fragments

[Fragment { id: 0, files: [DataFile { path: "4f40324c-70e1-42e3-93b5-77e30d4073cf.lance", fields: [0, 1] }], deletion_file: None },
 Fragment { id: 0, files: [DataFile { path: "ecc2ee2d-2eaf-48ab-8124-76fc90c2f8fa.lance", fields: [0, 1] }], deletion_file: None },
 Fragment { id: 0, files: [DataFile { path: "a4a2d1c1-df46-42b1-805d-786a5a9d04e4.lance", fields: [0, 1] }], deletion_file: None },
 Fragment { id: 0, files: [DataFile { path: "53d501f1-4137-42d0-9240-ed8da9c0e1f3.lance", fields: [0, 1] }], deletion_file: None },
 Fragment { id: 0, files: [DataFile { path: "1e34e06a-d4f1-4344-b172-a35a21ab84ec.lance", fields: [0, 1] }], deletion_file: None },
 Fragment { id: 0, files: [DataFile { path: "221d6cf3-70e4-4821-8f95-c0353d62e49e.lance", fields: [0, 1] }], deletion_file: None },
 Fragment { id: 0, files: [DataFile { path: "41fd6c30-a6b4-4743-9142-c904d799abd4.lance", fields: [0, 1] }], deletion_file: None },
 Fragment { id: 0, files: [DataFile { path: "280b155a-61ef-4cb0-b944-50ef83a

In [4]:
schema = pa.schema([
    pa.field("id", pa.int64()),
    pa.field("value", pa.string()),
])
dataset = lance.LanceDataset._commit(
    dataset_uri,
    schema,
    fragments,
    mode="create"
)
dataset.to_table().to_pandas()

Unnamed: 0,id,value
0,10,value_10
1,2,value_2
2,21,value_21
3,35,value_35
4,24,value_24
...,...,...
95,45,value_45
96,52,value_52
97,67,value_67
98,83,value_83


## Appending to the dataset

To append to a dataset, you can extend the fragments list with new fragments.
Write the new fragments just as before, then assign them new ids, and finally
call commit with the combined list of fragments.

In [5]:
def dataset_append(dataset, df):
    write_results = df.mapInArrow(write_fragment, schema="fragment_meta binary").collect()
    new_fragments = [pickle.loads(row.fragment_meta)
                     for row in write_results]
    fragments = [fragment.metadata for fragment in dataset.get_fragments()]

    # Need to assign new fragment ids
    max_fragment_id = max(fragment.id for fragment in fragments)
    for fragment in new_fragments:
        max_fragment_id += 1
        fragment.id = max_fragment_id
    
    fragments.extend(new_fragments)

    return lance.LanceDataset._commit(
        dataset.uri,
        dataset.schema,
        fragments
    )

dataset = dataset_append(dataset, df)
dataset.to_table().to_pandas()

Unnamed: 0,id,value
0,10,value_10
1,2,value_2
2,21,value_21
3,35,value_35
4,24,value_24
...,...,...
195,45,value_45
196,52,value_52
197,67,value_67
198,83,value_83


## Adding a new column

One operation that is unique to Lance is the ability to add a new column to an
existing table without rewriting the entire table. This can even be done in a
distributed fashion.

In [6]:
df = spark.createDataFrame(
    [(fragment.metadata.id,) for fragment in dataset.get_fragments()],
    ("id",),
)
# Put each fragment in it's own partition
df = df.repartition('id')

In [7]:
# Function to run for each chunk of the table to generate the new column(s)
def add_column(batch: pa.RecordBatch) -> pa.RecordBatch:
    import pyarrow.compute as pc
    array = pc.multiply(batch.column(0), 2)
    return pa.RecordBatch.from_arrays([array], names=["double_id"])

def add_double_id(data_iter):
    # Get the fragment from the id
    row = list(data_iter)[0]
    fragment_id = row["id"][0].as_py()
    fragment = lance.fragment.LanceFragment(dataset, fragment_id)

    # Call `add_columns()` on the fragment
    fragment_meta = fragment.add_columns(add_column, columns=['id'])

    # Save the fragment metadata
    data = pickle.dumps(fragment_meta)
    yield pa.RecordBatch.from_arrays([pa.array([data])], ["fragment_meta"])

write_results = df.mapInArrow(add_double_id, schema="fragment_meta binary").collect()
fragments = [pickle.loads(row.fragment_meta) for row in write_results]

dataset = lance.LanceDataset._commit(
    dataset.uri,
    dataset.schema, # This is flawed because of the schema argument
    fragments
)

dataset.to_table().to_pandas()

Unnamed: 0,id,value
0,10,value_10
1,2,value_2
2,21,value_21
3,35,value_35
4,24,value_24
5,46,value_46
6,55,value_55
7,61,value_61
8,73,value_73
9,87,value_87


## Retrying and idempotency

PySpark provides retries for failed tasks, but not all distributed frameworks
provide that automatically. But they are not aware of temporary files Lance might
create.