# Using `LakeFS` to Drive Value in Development

LakeFS is an essential tool for modern development teams working with data lakes (S3, Azure Data Lake). LakeFS provides version control, backup, and workflow management solutions that allow technical teams to:
- `Experiment`: Safely experiment with copies of the production data lake without risking data lake contamination
- `Collaborate`: Collaborate with other engineering teams on the development of engineering workflows

When working with a data lake without LakeFS, engineering teams face a tough choice between:
- Slowing down development by prohibiting in-situ experimentation and testing with production data
- Copying the data lake, multiplying storage and hosting costs
- Risking contamination of the data lake, resulting in expensive rollback procedures, loss of newly generated data, and duplicated storage

![Image](https://lakefs.io/wp-content/uploads/2022/03/Share-image_1200x630-2.png)

LakeFS provides highly scalable, format-agnostic, zero-copy operations that allow developers and engineering teams to manage their data like code. This demonstration will cover the following topics:

1. Configuration of the LakeFS Client / Overview of the LakeFS Admin UI 
2. Initializing repositories and creating new branches 
3. Adding data to branches 
    - Adding data, committing 
    - Version differencing
    - Merge operations
    - Version Tagging
4. Data Ops Cycle with LakeFS
5. Recovering from Production Data Loss 


### 1. Configure the LakeFS Client and Connect
----
In this section we'll demonstrate using the Python LakeFS API (`lakefs_client`) to interface with the LakeFS deployment. We'll create an instance of the `LakeFSClient` object, which allows us to communicate with and manipulate the state of the LakeFS instance using Python.

For this demo we will primarily use the Python interface, but LakeFS has developed Software Development Kits (SDKs) for:
- Python
- Java
- Go

These SDKs allow developers to programmatically access and integrate with LakeFS.


In [1]:
HOST = 'https://cosmic-bat.lakefs-demo.io'
USERNAME = "AKIAJP5F7GBGE7V6OKZQ"
PASSWORD = "AqInl1Ugb9tIAVVMHEQIKYaW0Feo3XhF7xiy4kgj"
REPO_NAME = 'demo-data'
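
The cell above embeds the access key and secret directly in the notebook. A minimal sketch of an alternative, assuming the credentials are exported as environment variables before the notebook starts. The variable names and the helper `load_lakefs_settings` are hypothetical and not defined by LakeFS:

```python
import os

# Hypothetical variable names chosen for this sketch; use whatever your team standardizes on.
REQUIRED_VARS = ("LAKEFS_HOST", "LAKEFS_ACCESS_KEY_ID", "LAKEFS_SECRET_ACCESS_KEY")

def load_lakefs_settings(env=None):
    """Return (host, username, password) read from environment variables."""
    env = os.environ if env is None else env
    # Fail early with a clear message instead of connecting with empty credentials
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise KeyError(f"Missing environment variables: {', '.join(missing)}")
    return tuple(env[name] for name in REQUIRED_VARS)

# Usage (uncomment once the variables are set):
# HOST, USERNAME, PASSWORD = load_lakefs_settings()
```

Keeping secrets out of the notebook means the file can be shared or committed without exposing the LakeFS deployment.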

In [2]:
# Import required libraries and change working directory
%cd "C:\Users\rskin\lakefs-demo"
import os
from pathlib import Path, PurePosixPath
import lakefs_client
from lakefs_client import models
from lakefs_client.client import LakeFSClient

C:\Users\rskin\lakefs-demo


In [3]:
# Configure the LakeFS client to connect to the service
configuration = lakefs_client.Configuration()
configuration.host = HOST
configuration.username = USERNAME
configuration.password = PASSWORD
client = LakeFSClient(configuration)

### 2. Initialize a new Repository and create a new branch
----

In this section we'll create a new repository, `demo-data`. We'll then create a branch called `data-upload` that we'll use to load our first set of Exchange Traded Fund (ETF) data. This section will cover the following concepts:
- Initializing a new repository
- Creating a new branch 
- Creating a protected branch 


Branches are used to create **isolated environments for data upload and experimentation**. This allows development teams to safely ingest data and test for data quality before releasing it.

In [4]:
# Create a repository 
repo = models.RepositoryCreation(
    name= REPO_NAME, 
    storage_namespace='s3://treeverse-demo-lakefs-storage-production/user_cosmic-bat/demo-data', 
    default_branch='main')
client.repositories.create_repository(repo)

{'creation_date': 1650366765,
 'default_branch': 'main',
 'id': 'demo-data',
 'storage_namespace': 's3://treeverse-demo-lakefs-storage-production/user_cosmic-bat/demo-data'}

In [5]:
""" 
Creating a new branch from the latest commit (main) named data-upload. Creating a new branch allows developers/data engineers to 
easily track changes between branches. 
"""
client.branches.create_branch(
    repository=REPO_NAME, 
    branch_creation=models.BranchCreation(name='data-upload', source='main')
)

'57d8bb999de708da4b3a3be049496b246ed6d96c68e516343fe2bce9923a3c7c'

### 3. Adding Data to branches

----


We've created helper functions to upload data: `upload_data` uploads a single file, and `broken_upload_dir` / `fixed_upload_dir` upload the contents of a directory. The broken variant builds object paths with the operating system's path separator, which produces backslash-delimited paths on Windows; the fixed variant always builds POSIX-style paths. These functions will be used to upload all of the ETF data inside our `stock-data` directory.

Once the data is uploaded, we'll verify that it has been loaded to the branch, check the uncommitted changes to confirm we've uploaded the data we want, and commit the change.



In [6]:
def upload_data(branch:str, fname:str, lfs_client:LakeFSClient = client, repository:str = REPO_NAME):
    """Add data to the specified LakeFS repository / branch"""
    # Open in binary mode so the file's bytes are uploaded unchanged
    with open(fname, 'rb') as f:
        # Use the client passed in rather than the module-level one
        lfs_client.objects.upload_object(repository=repository, branch=branch, path=fname, content=f)

def broken_upload_dir(directory:str, branch:str,  repository:str = REPO_NAME):
    """Upload files in a directory to LakeFS (broken: object paths use the OS path separator)"""
    directory = Path(directory)
    dummy_counter = 0
    for filename in os.listdir(directory):
        # Stop after 21 files
        if dummy_counter > 20: 
            break
        # On Windows this yields backslash-separated object paths
        path = os.path.join(directory / filename)
        path = str(path)
        upload_data(branch=branch, fname=path, repository=repository)
        dummy_counter += 1

In [None]:
# Upload data with a broken path
broken_upload_dir('stock-data/ETFs', 'data-upload')

In [7]:
def fixed_upload_dir(directory:str, branch:str,  repository:str = REPO_NAME):
    """Upload files in a directory to LakeFS using POSIX-style object paths"""
    directory = Path(directory)
    dummy_counter = 0 
    for filename in os.listdir(directory):
        # Stop after 21 files
        if dummy_counter > 20: 
            break
        # Forward slashes keep the object path consistent across operating systems
        path = PurePosixPath(directory / filename)
        path = str(path)
        upload_data(branch=branch, fname=path, repository=repository)
        dummy_counter += 1 

In [8]:
fixed_upload_dir('stock-data/ETFs', 'data-upload')
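
Before committing, it helps to check what is staged on the branch. A minimal sketch, assuming the `lakefs_client` branches API exposes `diff_branch` and that each result carries a `type` field such as `'added'` or `'removed'`; `summarize_uncommitted` is a hypothetical helper name:

```python
from collections import Counter

def summarize_uncommitted(lfs_client, repository, branch):
    """Count the uncommitted changes on a branch by change type."""
    # diff_branch lists changes staged on the branch but not yet committed.
    # Only the first page of results is inspected in this sketch.
    diff = lfs_client.branches.diff_branch(repository=repository, branch=branch)
    return Counter(change.type for change in diff.results)

# Usage with the notebook's client:
# summarize_uncommitted(client, REPO_NAME, 'data-upload')
```

A count of unexpected `'removed'` or `'changed'` entries is a cheap signal that the upload touched more than intended.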

Now that we have the Exchange Traded Fund data loaded, we'll have our development teams add the stock data to the `data-upload` branch.

In [10]:
# Uploading the remaining stock data
fixed_upload_dir('stock-data/Stocks','data-upload')

# Programmatically committing the changes on the branch
client.commits.commit(
    repository = REPO_NAME,
    branch='data-upload',
    commit_creation={
        "message":"Added stock data", 
        "metadata":{
            'type':'data-upload'
            }
        }
    )

{'committer': 'admin',
 'creation_date': 1650366812,
 'id': '6a650f0166480c10faac7bb828b65a08dfccaed75337e160d3442d1db97a6720',
 'message': 'Added stock data',
 'meta_range_id': '',
 'metadata': {'type': 'data-upload'},
 'parents': ['9a9b15e690d8457ef3653fe54dfe64c197a77dfac013f97e7582ed642b6821fa']}

We've loaded both the ETF data and the stock data to our `data-upload` branch; it's now time to merge the `data-upload` branch into our production branch `main`.

By adding the data to the `data-upload` branch first, we can ensure that the production release is updated atomically. This simplifies the data lineage, ensures that all the data is added together, and protects the main production release from having only half the data loaded.
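
The merge itself can be sketched as follows, assuming the `lakefs_client` refs API exposes `diff_refs` and `merge_into_branch` with the keyword arguments shown; `merge_if_changed` is a hypothetical helper name:

```python
def merge_if_changed(lfs_client, repository, source_ref, destination_branch):
    """Merge source_ref into destination_branch; return None if nothing differs."""
    # Compare the two refs first so that an empty merge is skipped
    diff = lfs_client.refs.diff_refs(
        repository=repository, left_ref=destination_branch, right_ref=source_ref)
    if not diff.results:
        return None
    # The merge applies all committed changes from source_ref in one operation
    return lfs_client.refs.merge_into_branch(
        repository=repository, source_ref=source_ref, destination_branch=destination_branch)

# Usage with the notebook's client:
# merge_if_changed(client, REPO_NAME, 'data-upload', 'main')
```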

##  3. Demonstrating S3 API Consistency
----

In this section we'll demonstrate using the most common Python S3 interface (`boto3`) to programmatically access the object storage and metadata layers of our LakeFS storage. This allows teams to access, manipulate, and download data using a common API, changing only the endpoint, target bucket, and authentication. With LakeFS, the repository name takes the place of the bucket, and the branch, tag, or commit ID is the first component of the object key. LakeFS has been designed to work with S3 interfaces, so engineering teams can keep their existing data upload processes and end-user teams can keep their existing data connections. Other applications that use the S3 API include but aren't limited to:
 - Spark 
 - Kafka 
 - Hadoop
 - Hudi


![LakeFS Architecture](https://docs.lakefs.io/assets/img/arch.png)



In [11]:
import boto3
s3 = boto3.client(
    's3', 
    endpoint_url = HOST, 
    aws_access_key_id = USERNAME, 
    aws_secret_access_key = PASSWORD
    )

In [12]:
# Retrieve the metadata of the first object listed on the main branch
list_resp = s3.list_objects(Bucket=REPO_NAME, Prefix='main/')
list_resp['Contents'][0]

{'Key': 'main/stock-data/ETFs/aadr.us.txt',
 'LastModified': datetime.datetime(2022, 4, 19, 11, 12, 55, 639000, tzinfo=tzutc()),
 'ETag': '"dfcd8314aabce0eadc2362b663c4327d"',
 'Size': 70908,
 'StorageClass': 'STANDARD'}

In [14]:
list_resp = s3.list_objects(Bucket=REPO_NAME, Prefix='latest-tag/')
list_resp['Contents'][0]

{'Key': 'latest-tag/stock-data/ETFs/aadr.us.txt',
 'LastModified': datetime.datetime(2022, 4, 19, 11, 12, 55, 639000, tzinfo=tzutc()),
 'ETag': '"dfcd8314aabce0eadc2362b663c4327d"',
 'Size': 70908,
 'StorageClass': 'STANDARD'}
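
Each `list_objects` call returns a single page of results (S3 caps a page at 1,000 keys), so listings of larger branches need pagination. A minimal sketch using the boto3 paginator, assuming the S3 gateway of your LakeFS deployment supports `list_objects_v2`; `iter_keys` is a hypothetical helper name:

```python
def iter_keys(s3_client, bucket, prefix):
    """Yield every object key under a prefix, following pagination."""
    paginator = s3_client.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # 'Contents' is absent from a page when nothing matches the prefix
        for obj in page.get('Contents', []):
            yield obj['Key']

# Usage with the notebook's boto3 client:
# for key in iter_keys(s3, REPO_NAME, 'main/'):
#     print(key)
```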

## 4. Recovering from Production Data Loss
----

LakeFS is an essential tool for preventing data loss within data lakes. In this section we'll demonstrate the following:
 - Recovering from the deletion of production data by reverting an uncommitted change
 - Recovering production data after a committed deletion by reverting the commit that removed it
 - Preventing the loss of production data by setting up a protected branch


In [17]:
def delete_production():
    """Delete all the data on the main branch, creating a loss of production data situation""" 
    objects = s3.list_objects(Bucket=REPO_NAME, Prefix='main/')
    for obj in objects['Contents']: 
        s3.delete_object(Bucket=REPO_NAME, Key=obj['Key'])



def really_delete_production():
    """Delete all the data on the main (production) branch and commit the change"""
    objects = s3.list_objects(Bucket=REPO_NAME, Prefix='main/')
    for obj in objects['Contents']: 
        s3.delete_object(Bucket=REPO_NAME, Key=obj['Key'])
    
    client.commits.commit(
        repository=REPO_NAME, 
        branch='main',
        commit_creation={'message':"Removing data from the production branch"})


In [18]:
# Delete the production environment without committing. 
# We'll then use the LakeFS UI to revert the uncommitted changes
delete_production()
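
The uncommitted deletion can also be discarded from Python instead of the UI. A minimal sketch, assuming the `lakefs_client` branches API exposes `reset_branch` and accepts a plain dict for `reset_creation` in the same way the notebook passes a dict for `commit_creation`; `discard_uncommitted` is a hypothetical helper name:

```python
def discard_uncommitted(lfs_client, repository, branch):
    """Reset a branch to its last commit, dropping all uncommitted changes."""
    # Assumption: type 'reset' discards every staged change on the branch.
    # models.ResetCreation(type='reset') would be the explicit form of this dict.
    return lfs_client.branches.reset_branch(
        repository=repository, branch=branch, reset_creation={'type': 'reset'})

# Usage with the notebook's client:
# discard_uncommitted(client, REPO_NAME, 'main')
```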

In [19]:
# Delete and commit, simulating a production branch whose latest commit contains no data
really_delete_production()

In [20]:
# Revert the deletion commit to restore production data without copying any objects
from lakefs_client.model.revert_creation import RevertCreation
revert_creation = RevertCreation(
    ref='0e7223c8f343112d1a242b5674bb82ceb891b9875db4fabac7fe358e7c0ec4ea',
    parent_number=1
    )

client.branches.revert_branch(
    repository=REPO_NAME, 
    branch='main', 
    revert_creation=revert_creation
)

Without LakeFS's branching and commit system we would have the following options:
- Rerun all pipelines or undertake a major engineering effort to ensure the data lake is restored successfully
- Restore from backup S3 buckets, incurring significant recovery cost and extended recovery time
    - Plus increased storage cost for the duplicated data
- Accept the data loss

LakeFS's commit and branching system allows engineering teams to recover from disastrous production data loss with a single revert operation.

### Preventing the loss of Production Data 

To prevent disruption to the business, we can define protected branches using LakeFS. A protected branch cannot be written to or deleted from directly, and can only be changed via a merge. This allows the development teams to create protected branches that cannot be edited by end-users, preserving the integrity of the data lake.

No other service allows the explicit protection of data lake objects without redundant duplication.

In [21]:
from lakefs_client.model.branch_protection_rule import BranchProtectionRule

branch_protection_rule = BranchProtectionRule(pattern="main")
client.repositories.create_branch_protection_rule(
    repository=REPO_NAME,
    branch_protection_rule=branch_protection_rule
    )

We'll attempt to run our `really_delete_production` function, but will be blocked by the branch protection rule we just created.

In [22]:
try: 
    really_delete_production()
except Exception as e: 
    print("encountered an error attempting to delete production data due to protected branch")

encountered an error attempting to delete production data due to protected branch


## 5. Replicate the Data Lake at the Time of an Error
----

One key feature of LakeFS is its ability to let teams "time travel" through the data lake and access its state at a given point in time. This is critical in allowing development teams to isolate and stabilize a particular branch while hunting a bug.

In this example we'll demonstrate pulling data from a specific commit ID and show how LakeFS enables users to quickly get a copy of the data lake as it was at that commit.

In [None]:
import shutil

def load_data_at_ref(ref:str, target_dir:str = 'isolated-environment'):
    """Download every object at the given ref (branch, tag, or commit ID) into a clean local directory"""
    # Start from an empty directory so files from a previous ref don't linger
    shutil.rmtree(target_dir, ignore_errors=True)
    # The ref is the first component of every object key
    objects = s3.list_objects(Bucket=REPO_NAME, Prefix=ref + '/')
    for obj in objects['Contents']: 
        fname = Path(target_dir, obj['Key'])
        # Create the local directory tree that mirrors the object key
        fname.parent.mkdir(parents=True, exist_ok=True)
        with open(fname, 'wb') as f: 
            s3.download_fileobj(REPO_NAME, obj['Key'], f)


In [None]:
load_data_at_ref('128160ce76f70c2797c178d2a0b0fcec41b56a92cbc44f7e50ca1b219cc5aa98')

This feature is a critical enabler in large data lakes, allowing engineers to test functionality, test issues with deployment pipelines, and experiment on the exact state of the data at a given commit. 

Without this functionality, engineering teams would need to develop custom pipelines to regenerate the expected state of the data (if that is even feasible, depending on the statefulness of the pipelines). With LakeFS, getting a perfect copy of the data lake at a given commit is a trivial operation.
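
When hunting a bug, it is often enough to know which objects differ between two refs before downloading anything. A minimal sketch that compares two `list_objects` responses, assuming an object's `ETag` changes whenever its content changes; `changed_keys` is a hypothetical helper name:

```python
def changed_keys(listing_a, listing_b, prefix_a, prefix_b):
    """Return the relative keys that differ between two S3 listings taken at different refs."""
    def by_relative_key(listing, prefix):
        # Strip the ref prefix so the same object lines up across both refs
        return {obj['Key'][len(prefix):]: obj['ETag'] for obj in listing.get('Contents', [])}

    a = by_relative_key(listing_a, prefix_a)
    b = by_relative_key(listing_b, prefix_b)
    # A key counts as changed if it is missing on one side or its ETag differs
    return sorted(key for key in a.keys() | b.keys() if a.get(key) != b.get(key))

# Usage with the notebook's boto3 client (the commit ID is a placeholder):
# main_listing = s3.list_objects(Bucket=REPO_NAME, Prefix='main/')
# old_listing = s3.list_objects(Bucket=REPO_NAME, Prefix='<commit-id>/')
# changed_keys(main_listing, old_listing, 'main/', '<commit-id>/')
```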

# Experimenting and Creating New Feature Branches

In this section we'll demonstrate how data engineering teams can use LakeFS to test development directly on production data without risking damage to the production release. 

We'll be converting data from an inefficient format (.txt) into a more scalable, data-lake-friendly format (.parquet).

**Directly editing on the `/parquet-conversion` branch**
![Directly Editing](deploy-photos/directly-editing.drawio.png)

We'll do this operation in place on an exact replica of the production data. The LakeFS branching model and its copy-on-write approach give us an exact duplicate of the data in `/main` without risking the production data or duplicating storage.

In [141]:
# Create the parquet-conversion branch from main
from lakefs_client.model.branch_creation import BranchCreation
import io
import pandas as pd

branch_creation = BranchCreation(
    name = 'parquet-conversion',
    source='main'
)
try: 
    client.branches.create_branch(REPO_NAME, branch_creation)
except Exception:
    # Assume a failure here means the branch already exists (e.g. when the cell is re-run)
    pass

In [None]:
def parquet_reformat(): 
    """First attempt: converts each file to Parquet but overwrites the original .txt key"""
    # List the objects on the parquet-conversion branch
    bucket_objs = s3.list_objects(Bucket=REPO_NAME, Prefix='parquet-conversion/')

    for obj in bucket_objs['Contents']: 
        key = obj['Key']
        # Fetch the object and read the response body into a DataFrame
        s3_response = s3.get_object(Bucket=REPO_NAME, Key = key)
        content = s3_response['Body'].read()
        df = pd.read_csv(io.BytesIO(content))

        # Bug: the Parquet bytes are written back under the original .txt key
        s3.put_object(Body=df.to_parquet(), Bucket=REPO_NAME, Key=key)

In [None]:
parquet_reformat()

In [142]:
def fixed_parquet_reformat():
    """Convert each .txt object on the branch to a .parquet object and remove the original"""
    # List the objects on the parquet-conversion branch
    bucket_objs = s3.list_objects(Bucket=REPO_NAME, Prefix='parquet-conversion/')

    for obj in bucket_objs['Contents']: 
        key = obj['Key']
        
        # Fetch the object and read the response body into a DataFrame
        s3_response = s3.get_object(Bucket=REPO_NAME, Key = key)
        content = s3_response['Body'].read()
        df = pd.read_csv(io.BytesIO(content))
        
        # Generate new key name
        basename = os.path.basename(key)
        dirname = os.path.dirname(key)
        basename = basename.replace('.txt', '.parquet')
        parquet_key = PurePosixPath(dirname, basename)

        # Put the new parquet file back
        s3.put_object(
            Body=df.to_parquet(), 
            Bucket=REPO_NAME, 
            Key=str(parquet_key))
        
        # Delete the old text files 
        s3.delete_object(Bucket= REPO_NAME, Key=key)



In [143]:
fixed_parquet_reformat()
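
The key-renaming step is the part that went wrong in the first attempt, so it is worth isolating and checking without touching the lake. A minimal sketch; `parquet_key_for` is a hypothetical helper name, and unlike the `str.replace` call above it only swaps the final extension:

```python
from pathlib import PurePosixPath

def parquet_key_for(key):
    """Return the object key with its final extension replaced by .parquet."""
    # PurePosixPath keeps forward slashes on every OS, which object keys require
    return str(PurePosixPath(key).with_suffix('.parquet'))

print(parquet_key_for('parquet-conversion/stock-data/ETFs/aadr.us.txt'))
# parquet-conversion/stock-data/ETFs/aadr.us.parquet
```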

### Alternative 1: Duplicate the Object Storage
Without the LakeFS branching model, we could develop and test our conversion function on production data by duplicating the object storage and manipulating the duplicate.

![duplicating storage](deploy-photos/duplicating.drawio.png)

**Risks:** 
- Expensive 
    - Duplicated storage cost
    - 3x read/write costs
    - Expensive compute pipeline for large lakes
- Time consuming for large lakes 
- Requires Data Engineering effort to implement for larger buckets

### Alternative 2: Develop and Test using Representative Subset

The second alternative is to develop and test the function (`fixed_parquet_reformat`) using a representative sample of the data lake. This method involves sampling from the data lake and duplicating a small portion to test the function on before applying it to the production dataset.


![Representative Subset](deploy-photos/representitive-subset.drawio.png)


This alternative avoids the expensive copy and read/write operations by creating a small representative dataset. It still presents significant development risks.

**Risks:**
- Development effort required to generate representative data
- Risk of missing important data structures / edge conditions in the sample
- At the end of the pipeline the function must still directly edit the base data lake
    - Increases the criticality and extent of verification and validation (V&V) testing
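
To make the sampling step of Alternative 2 concrete, here is a minimal sketch that draws a reproducible subset of object keys while taking at least one key from every directory. The helper `sample_keys`, its parameters, and the per-directory strategy are assumptions for illustration; the sketch expects `fraction` to be between 0 and 1:

```python
import random
from collections import defaultdict
from pathlib import PurePosixPath

def sample_keys(keys, fraction, seed=0):
    """Pick a reproducible sample of keys, with at least one key per directory."""
    by_dir = defaultdict(list)
    for key in keys:
        by_dir[str(PurePosixPath(key).parent)].append(key)

    rng = random.Random(seed)  # fixed seed so the same subset is drawn on every run
    sample = []
    for directory in sorted(by_dir):
        members = sorted(by_dir[directory])
        # Never drop a directory entirely, even when the fraction rounds to zero
        count = max(1, round(len(members) * fraction))
        sample.extend(rng.sample(members, count))
    return sample
```

Even with per-directory sampling, rare edge cases inside a directory can still be missed, which is the second risk listed above.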