# Exploring LakeFS with PySpark

This notebook uses the [Everything Bagel](https://github.com/treeverse/lakeFS/tree/master/deployments/compose) Docker Compose environment.

[@rmoff](https://twitter.com/rmoff/) 

## Setup

### Install libraries

(could be built into the `Dockerfile`)

In [1]:
import sys
!{sys.executable} -m pip install lakefs_client

Collecting lakefs_client
  Downloading lakefs_client-0.98.0-py3-none-any.whl (312 kB)
     |████████████████████████████████| 312 kB 3.4 MB/s            
Installing collected packages: lakefs-client
Successfully installed lakefs-client-0.98.0


In [2]:
!{sys.executable} -m pip install deltalake

Collecting deltalake
  Downloading deltalake-0.8.1-cp37-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl (17.3 MB)
     |████████████████████████████████| 17.3 MB 27.8 MB/s            
Collecting pyarrow>=7
  Downloading pyarrow-11.0.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl (32.7 MB)
     |████████████████████████████████| 32.7 MB 9.5 MB/s             
Installing collected packages: pyarrow, deltalake
  Attempting uninstall: pyarrow
    Found existing installation: pyarrow 6.0.1
    Uninstalling pyarrow-6.0.1:
      Successfully uninstalled pyarrow-6.0.1
Successfully installed deltalake-0.8.1 pyarrow-11.0.0


In [3]:
import sys
print("Kernel:", sys.executable)
print("Python version:", sys.version)

import pyspark
print("PySpark version:", pyspark.__version__)


Kernel: /opt/conda/bin/python
Python version: 3.9.7 | packaged by conda-forge | (default, Oct 10 2021, 15:08:54) 
[GCC 9.4.0]
PySpark version: 3.2.0


### Spark

_With the necessary Delta Lake config too_

In [1]:
from pyspark.context import SparkContext
from pyspark import SparkFiles
from pyspark.sql.session import SparkSession
# sc = SparkContext('local[*]')
spark = (
    SparkSession.builder.master("local[*]")
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.0.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore")
    .getOrCreate()
)        

#### Test delta

In [2]:
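data = spark.range(0, 5)
# Overwrite any table left by an earlier run; without this, re-running the cell fails with
# "AnalysisException: Cannot write to already existent path file:/tmp/delta-table without setting OVERWRITE = 'true'."
data.write.format("delta").mode("overwrite").save("/tmp/delta-table")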

In [None]:
df = spark.read.format("delta").load("/tmp/delta-table")
df.show()

### LakeFS

In [None]:
import lakefs_client
from lakefs_client import models
from lakefs_client.client import LakeFSClient
from lakefs_client.api import branches_api
from lakefs_client.api import commits_api

# lakeFS credentials and endpoint
configuration = lakefs_client.Configuration()
configuration.username = 'AKIA-EXAMPLE-KEY'
configuration.password = 'EXAMPLE-SECRET'
configuration.host = 'http://lakefs:8000'

client = LakeFSClient(configuration)
api_client = lakefs_client.ApiClient(configuration)
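
Hard-coded credentials are fine for this demo environment. Elsewhere, reading them from environment variables may be preferable, and the following is a minimal sketch of that. The variable names (`DEMO_LAKEFS_HOST`, `DEMO_LAKEFS_ACCESS_KEY`, `DEMO_LAKEFS_SECRET_KEY`) and the helper `lakefs_settings` are hypothetical, invented for this illustration rather than defined by lakeFS. The defaults are the demo values used above.

```python
import os

# Hypothetical variable names chosen for this sketch; defaults are the demo values above.
DEFAULTS = {
    "host": ("DEMO_LAKEFS_HOST", "http://lakefs:8000"),
    "username": ("DEMO_LAKEFS_ACCESS_KEY", "AKIA-EXAMPLE-KEY"),
    "password": ("DEMO_LAKEFS_SECRET_KEY", "EXAMPLE-SECRET"),
}

def lakefs_settings(env=None):
    """Return host/username/password, preferring environment variables over defaults."""
    env = os.environ if env is None else env
    return {field: env.get(var, default) for field, (var, default) in DEFAULTS.items()}

settings = lakefs_settings()
# The values can then be applied to the configuration object, e.g.:
#   configuration.host = settings["host"]
```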

#### List the current branches in the repository

https://pydocs.lakefs.io/docs/BranchesApi.html#list_branches

In [None]:
repo='example'

In [None]:
for b in client.branches.list_branches(repo).results:
    display(b.id)
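
`.results` holds a single page of results, which is plenty for this small repository. For longer listings, the sketch below walks every page. It assumes the response exposes `pagination.has_more` and `pagination.next_offset`, and that the listing call accepts an `after` argument; both are worth checking against the API docs linked above. The helper `iter_results` is hypothetical.

```python
def iter_results(fetch_page):
    """Yield every item from a paginated listing.

    `fetch_page(after)` must return an object with `.results` and
    `.pagination.has_more` / `.pagination.next_offset` (assumed field names).
    """
    after = ""
    while True:
        page = fetch_page(after)
        yield from page.results
        if not page.pagination.has_more:
            break
        after = page.pagination.next_offset

# Possible use in this notebook (assumes `after` is an accepted keyword):
#   for b in iter_results(lambda after: client.branches.list_branches(repo, after=after)):
#       display(b.id)
```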

In [None]:
# The sample parquet file is Apache 2.0 licensed so perhaps include it in the Everything Bagel distribution? 
url='https://github.com/Teradata/kylo/blob/master/samples/sample-data/parquet/userdata1.parquet?raw=true'
# `sc` is never defined in this notebook, so use the session's own SparkContext
spark.sparkContext.addFile(url)
df = spark.read.parquet("file://" + SparkFiles.get("userdata1.parquet"))

How many rows of data?

In [None]:
display(df.count())

What does the data look like?

In [None]:
# show() prints the row itself and returns None, so there is nothing to pass to display()
df.show(n=1,vertical=True)

## Write data to S3 (on the `main` branch)

N.B. The connection to s3a is configured in the Docker Compose environment's `./etc/hive-site.xml` file.

In [None]:
branch='main'

In [None]:
df.write.mode('overwrite').parquet('s3a://'+repo+'/'+branch+'/demo/users')
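
The path above follows the pattern `s3a://<repository>/<branch>/<path>`, so switching branch changes only one path segment. As a small sketch, a helper (hypothetical name `lakefs_uri`) can build these paths in one place instead of repeating the string concatenation:

```python
def lakefs_uri(repo, branch, path):
    """Build an s3a URI of the form s3a://<repo>/<branch>/<path>."""
    return f"s3a://{repo}/{branch}/{path.lstrip('/')}"

print(lakefs_uri("example", "main", "demo/users"))  # s3a://example/main/demo/users
```

The write above could then read `df.write.mode('overwrite').parquet(lakefs_uri(repo, branch, 'demo/users'))`.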

### The data as seen from LakeFS

https://pydocs.lakefs.io/docs/ObjectsApi.html#list_objects

Note the `physical_address` and its match in the S3 output in the next step

In [None]:
client.objects.list_objects(repo,branch).results

In [None]:
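data = spark.range(0, 5)
# The table already exists from the earlier test, so overwrite it rather than fail
data.write.format("delta").mode("overwrite").save("/tmp/delta-table")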

### The data as seen from S3

In [None]:
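import boto3
# Assumption: MinIO is reachable at http://minio:9000 with the default
# minioadmin/minioadmin login; adjust to match your environment.
s3 = boto3.resource('s3', endpoint_url='http://minio:9000',
                    aws_access_key_id='minioadmin',
                    aws_secret_access_key='minioadmin')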

In [None]:
for o in s3.Bucket(repo).objects.all():
    print(o.last_modified, o.key, o.size)

### List diff of branch in LakeFS (this is kinda like a `git status`)

https://pydocs.lakefs.io/docs/BranchesApi.html#diff_branch

_Note that the files show **`'type': 'added'`**_

In [None]:
api_instance = branches_api.BranchesApi(api_client)

api_response = api_instance.diff_branch(repo, branch)
if api_response.pagination.results==0:
    display("Nothing to commit")
else:
    for r in api_response.results:
        display(r)
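
For a larger change set, a count per change type can be easier to scan than every entry. The sketch below assumes each diff entry exposes a `type` attribute, such as the `'added'` value noted above. The helper `summarize_diff` and the sample entries are stand-ins for illustration only, not real API output.

```python
from collections import Counter
from types import SimpleNamespace

def summarize_diff(results):
    """Count diff entries by their `type` attribute (e.g. 'added')."""
    return Counter(r.type for r in results)

# Stand-in entries with made-up paths; in the notebook pass api_response.results instead.
sample = [
    SimpleNamespace(type="added", path="demo/users/_SUCCESS"),
    SimpleNamespace(type="added", path="demo/users/part-00000.snappy.parquet"),
]
print(dict(summarize_diff(sample)))  # {'added': 2}
```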

### Commit the new file in `main`

https://pydocs.lakefs.io/docs/CommitsApi.html#commit

In [None]:
from lakefs_client.api import commits_api
from lakefs_client.model.commit import Commit
from lakefs_client.model.commit_creation import CommitCreation

api_instance = commits_api.CommitsApi(api_client)
commit_creation = CommitCreation(
    message="Everything Bagel - commit users data (original)",
    metadata={
        "foo": "bar",
    }
) 

api_instance.commit(repo, branch, commit_creation)

### List branch status again - nothing returned means that there is nothing uncommitted

In [None]:
api_instance = branches_api.BranchesApi(api_client)

api_response = api_instance.diff_branch(repo, branch)
if api_response.pagination.results==0:
    display("Nothing to commit")
else:
    for r in api_response.results:
        display(r)

_Similar to a `git status` showing `Your branch is up to date with 'main'` / `nothing to commit, working tree clean`_
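
The status check and the commit are repeated several times in this notebook. As a sketch, they could be folded into one helper that commits only when something is uncommitted. The name `commit_if_changed` is hypothetical; it relies only on the `diff_branch` and `commit` calls already used above.

```python
def commit_if_changed(branches, commits, repo, branch, commit_creation):
    """Commit only when the branch has uncommitted changes.

    `branches` and `commits` are the BranchesApi and CommitsApi instances
    created in the cells above. Returns the commit, or None if nothing changed.
    """
    diff = branches.diff_branch(repo, branch)
    if not diff.results:
        return None
    return commits.commit(repo, branch, commit_creation)

# Possible use in this notebook:
#   commit_if_changed(branches_api.BranchesApi(api_client),
#                     commits_api.CommitsApi(api_client),
#                     repo, branch, commit_creation)
```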

## Create a branch

https://pydocs.lakefs.io/docs/BranchesApi.html#create_branch

**TODO** Show that there's no additional object created on object store (http://localhost:9001/buckets/example/browse login `minioadmin`/`minioadmin`)

In [None]:
branch='add_more_user_data'

In [None]:
from lakefs_client.model.branch_creation import BranchCreation

api_instance = branches_api.BranchesApi(api_client)
branch_creation = BranchCreation(
    name=branch,
    source="main",
) 

api_response = api_instance.create_branch(repo, branch_creation)
display(api_response)

### List the current branches in the `example` repository

https://pydocs.lakefs.io/docs/BranchesApi.html#list_branches

In [None]:
for b in client.branches.list_branches(repo).results:
    display(b.id)

## Confirm that you can see the same data on the new branch

In [None]:
xform_df = spark.read.parquet('s3a://'+repo+'/'+branch+'/demo/users')

How many rows of data?

In [None]:
display(xform_df.count())

What does the data look like?

In [None]:
# show() prints the row itself and returns None, so there is nothing to pass to display()
xform_df.show(n=1,vertical=True)

### Note that on S3 there is still just the original 78k object - we've not duplicated any data for the new branch

In [None]:
for o in s3.Bucket(repo).objects.all():
    print(o.last_modified, o.key, o.size)

## Add some new data

In [None]:
# The sample parquet file is Apache 2.0 licensed so perhaps include it in the Everything Bagel distribution? 
url='https://github.com/Teradata/kylo/blob/master/samples/sample-data/parquet/userdata2.parquet?raw=true'
# `sc` is never defined in this notebook, so use the session's own SparkContext
spark.sparkContext.addFile(url)
df = spark.read.parquet("file://" + SparkFiles.get("userdata2.parquet"))

In [None]:
df.show(n=1,vertical=True)

## Write the data to the new branch and commit it

In [None]:
df.write.mode('append').parquet('s3a://'+repo+'/'+branch+'/demo/users')

LakeFS sees that there is an uncommitted change.

In [None]:
api_instance = branches_api.BranchesApi(api_client)

api_response = api_instance.diff_branch(repo, branch)
if api_response.pagination.results==0:
    display("Nothing to commit")
else:
    for r in api_response.results:
        display(r)

Commit it

In [None]:
from lakefs_client.api import commits_api
from lakefs_client.model.commit import Commit
from lakefs_client.model.commit_creation import CommitCreation

api_instance = commits_api.CommitsApi(api_client)
commit_creation = CommitCreation(
    message="Everything Bagel - add more user data",
    metadata={
        "foo": "bar",
    }
) 

api_instance.commit(repo, branch, commit_creation)

## Re-read `main` and `add_more_user_data` branches and count rows

Original branch (`main`):

In [None]:
main = spark.read.parquet('s3a://'+repo+'/main/demo/users')
display(main.count())

New branch (`add_more_user_data`):

In [None]:
add_more_user_data = spark.read.parquet('s3a://'+repo+'/add_more_user_data/demo/users')
display(add_more_user_data.count())

### Look at the view in LakeFS

#### `main`

In [None]:
client.objects.list_objects(repo,'main').results

#### `add_more_user_data`

In [None]:
client.objects.list_objects(repo,'add_more_user_data').results

### The data as seen from S3

Note that there are just two 78k files; there is no duplication of data shared by branches.

In [None]:
for o in s3.Bucket(repo).objects.all():
    print(o.last_modified, o.key, o.size)
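
A toy model may help show why branching adds no objects. It is not how lakeFS is actually implemented, and every name in it is hypothetical. Each branch is a mapping from logical paths to physical addresses, so creating a branch copies pointers rather than data.

```python
# Toy model only: real lakeFS metadata is more involved than plain dicts.
object_store = {}   # physical address -> stored bytes
branches = {}       # branch name -> {logical path: physical address}

def write(branch, path, data):
    """Store a new physical object and point the branch's path at it."""
    address = f"data/{len(object_store):04d}"
    object_store[address] = data
    branches.setdefault(branch, {})[path] = address

def create_branch(name, source):
    """Copy the source branch's pointers; no physical object is created."""
    branches[name] = dict(branches[source])

write("main", "demo/users/part-0", b"userdata1")
create_branch("add_more_user_data", "main")
print(len(object_store))                    # 1: branching added no objects

write("add_more_user_data", "demo/users/part-1", b"userdata2")
print(len(object_store))                    # 2: one new object for the new data
print(len(branches["main"]))                # 1: main is unaffected
print(len(branches["add_more_user_data"]))  # 2: shared object plus the new one
```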

## Create a new branch and test removing some data

In [None]:
branch='remove_pii'

In [None]:
from lakefs_client.model.branch_creation import BranchCreation

api_instance = branches_api.BranchesApi(api_client)
branch_creation = BranchCreation(
    name=branch,
    source="main",
) 

api_response = api_instance.create_branch(repo, branch_creation)
display(api_response)

### List the current branches in the `example` repository

https://pydocs.lakefs.io/docs/BranchesApi.html#list_branches

In [None]:
for b in client.branches.list_branches(repo).results:
    display(b.id)

### Confirm that you can see the same data on the new branch

In [None]:
xform_df = spark.read.parquet('s3a://'+repo+'/'+branch+'/demo/users')

How many rows of data? 

_Note that this shows 1000 rows, matching `main`, and not the 2000 on the `add_more_user_data` branch above, since that branch has not been merged into `main`._

In [None]:
display(xform_df.count())

If you read from and write to the same location, you need to use `.cache()`; otherwise the write will fail with an error like this:

```
Caused by: java.io.FileNotFoundException: 
No such file or directory: s3a://example/remove_pii/demo/users/part-00000-7a0bbe79-a3e2-4355-984e-bd8b950a4e0c-c000.snappy.parquet
```

[solution src](https://stackoverflow.com/a/65330116/350613)
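
The underlying reason is that Spark evaluates lazily. Without a cached copy, the overwrite removes the source files before the read has actually taken place. A minimal sketch of a helper (hypothetical name `materialize`) that caches a DataFrame and forces evaluation is shown below. It uses `count()` because that scans every partition, whereas `show(n=1)` may evaluate only part of the data. Caching is best-effort, so for large datasets writing to a different path may be the safer choice.

```python
def materialize(df):
    """Cache a DataFrame and force evaluation so a later write does not re-read the source."""
    cached = df.cache()
    cached.count()  # an action is needed; count() touches every partition
    return cached

# Possible use in this notebook:
#   df2 = materialize(xform_df.drop('ip_address', 'birthdate', 'salary', 'email'))
```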

### Transform the data

In [None]:
df2=xform_df.drop('ip_address','birthdate','salary','email').cache()
# You need to do something to access the DF otherwise the `cache()` won't have any effect
df2.show(n=1,vertical=True)

### Write data back to the branch

In [None]:
df2.write.mode('overwrite').parquet('s3a://'+repo+'/'+branch+'/demo/users')

### Commit changes

In [None]:
api_instance = commits_api.CommitsApi(api_client)
commit_creation = CommitCreation(
    message="Remove PII",
) 

api_instance.commit(repo, branch, commit_creation)

## Re-read all branches and inspect data for isolation

Original branch (`main`):

In [None]:
main = spark.read.parquet('s3a://'+repo+'/main/demo/users')
display(main.count())
main.show(n=1,vertical=True)

New branch (`add_more_user_data`):

In [None]:
add_more_user_data = spark.read.parquet('s3a://'+repo+'/add_more_user_data/demo/users')
display(add_more_user_data.count())
add_more_user_data.show(n=1,vertical=True)

New branch (`remove_pii`):

In [None]:
remove_pii = spark.read.parquet('s3a://'+repo+'/remove_pii/demo/users')
display(remove_pii.count())
remove_pii.show(n=1,vertical=True)
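
Rather than checking the printed rows by eye, the column lists can be compared programmatically. The sketch below uses a hypothetical helper, `remaining_pii`, and takes its PII column names from the `drop()` call above.

```python
PII_COLUMNS = ["ip_address", "birthdate", "salary", "email"]  # the columns dropped above

def remaining_pii(columns, pii=PII_COLUMNS):
    """Return the PII column names still present in `columns`, in `pii` order."""
    present = set(columns)
    return [c for c in pii if c in present]

# In the notebook, remaining_pii(remove_pii.columns) should be empty, while
# remaining_pii(main.columns) should list all four columns until the merge.
```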

### Look at the view in LakeFS

#### `main`

In [None]:
client.objects.list_objects(repo,'main').results

#### `add_more_user_data`

In [None]:
client.objects.list_objects(repo,'add_more_user_data').results

#### `remove_pii`

In [None]:
client.objects.list_objects(repo,'remove_pii').results

## Merge `remove_pii` into `main`

In [None]:
client.refs.merge_into_branch(repository=repo, source_ref='remove_pii', destination_branch='main')

Original branch (`main`):

In [None]:
main = spark.read.parquet('s3a://'+repo+'/main/demo/users')
display(main.count())
main.show(n=1,vertical=True)