# Welcome to pachypy

## Getting started

First, let's create a `PrettyPachydermClient` instance, which is optimized for use in a Jupyter notebook. It inherits from [`PachydermClient`][1] and only adds nicer output formatting. Outside of a Jupyter notebook, you probably want to use [`PachydermClient`][1] directly.

[1]: https://pachypy.readthedocs.io/en/latest/apidoc/pachypy.html#pachypy.client.PachydermClient

In [None]:
import pachypy

client = pachypy.PrettyPachydermClient(
    # If not specified, host and port are inferred from the PACHD_ADDRESS
    # environment variable (like pachctl), or default to localhost:30650.
    host=None,
    port=None,
    
    # Add image digests when creating/updating pipelines.
    # This forces new images to be used instead of cached ones,
    # even if image name and tag remain the same.
    add_image_digests=True,
    
    # Automatically build Docker images when creating/updating pipelines.
    # Only applies for pipelines that have the transform.dockerfile or
    # transform.dockerfile_path field set. Images will also be pushed to the registry.
    build_images=True,
    
    # Specify one or multiple glob patterns to find your pipeline specification files.
    # Pipeline specifications can be read from YAML and JSON files (see pipelines.yaml example).
    pipeline_spec_files=['../pipelines/*.yaml', '../more_pipelines/*.json'],
    
    # You can specify a custom pipeline spec transformer function,
    # which will be applied to every pipeline specification read.
    # For example, you could define a function that adds secrets to all pipelines without
    # having to specify them individually in every pipeline specification.
    # Note: add_secrets_to_pipeline is a user-defined function and must exist before this cell runs.
    pipeline_spec_transformer=add_secrets_to_pipeline,
    
    # pachypy shows you localized timestamps based on these arguments.
    # By default, pachypy tries to infer both time zones automatically,
    # but you can also specify them explicitly.
    user_timezone='Australia/Sydney',
    pachd_timezone='UTC',
)
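The cell above passes `add_secrets_to_pipeline` without defining it, so a function with that name has to exist first. The following is a minimal sketch of what such a transformer could look like, assuming pachypy calls it with one pipeline specification as a dictionary and uses the returned dictionary in its place. The secret name, key, and environment variable are hypothetical placeholders; the `transform.secrets` layout follows Pachyderm's pipeline specification format.

```python
import copy

# Hypothetical secret reference; replace with a Kubernetes secret from your cluster.
EXAMPLE_SECRET = {
    'name': 'my-secret',
    'key': 'api_token',
    'env_var': 'API_TOKEN',
}


def add_secrets_to_pipeline(pipeline_spec: dict) -> dict:
    """Return a copy of the spec with EXAMPLE_SECRET listed under transform.secrets.

    Assumption: the transformer receives one pipeline spec (a dict) and its
    return value replaces the original spec.
    """
    spec = copy.deepcopy(pipeline_spec)
    secrets = spec.setdefault('transform', {}).setdefault('secrets', [])
    # Avoid adding the same secret twice if the spec already lists it.
    if EXAMPLE_SECRET not in secrets:
        secrets.append(dict(EXAMPLE_SECRET))
    return spec
```

Working on a deep copy keeps the function free of side effects, so it is safe to apply even if the same specification is read more than once.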

pachypy uses a [`DockerClient`](https://docker-py.readthedocs.io/en/stable/client.html#docker.client.DockerClient) to build images, push them to registries, and retrieve image digests. By default, the `DockerClient` is initialized [from environment variables](https://docker-py.readthedocs.io/en/stable/client.html#docker.client.from_env), but you can always create your own instance and tell pachypy to use that one instead.

In [None]:
import docker

# This is not normally necessary if you have Docker running locally with default settings
client.docker_client = docker.DockerClient(
    base_url='tcp://12.34.56.78:1234',
    tls=True,
)

If you use Amazon ECR as a private container registry, pachypy has some functionality built in to simplify your workflow. It automatically retrieves an authorization token from AWS and logs your `DockerClient` in. It also retrieves image digests more efficiently by using the special AWS ECR API instead of the standard Docker API.

This is done using the [`boto3`](https://github.com/boto/boto3) package. By default, it [reads your AWS credentials](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/quickstart.html#configuration) from `~/.aws/credentials` or environment variables. You can also specify your AWS credentials manually, as shown below, although that is not recommended because it puts secrets into the notebook.

In [None]:
# Only needed if you must specify AWS credentials manually
client.amazon_ecr.set_credentials(
    aws_access_key_id='<YOUR_KEY_ID>',
    aws_secret_access_key='<YOUR_SECRET_ACCESS_KEY>',
)

## Listing objects

When listing Pachyderm objects, pachypy allows you to use wildcard patterns to specify what you want to retrieve. Objects are matched against these patterns using [`fnmatch`](https://docs.python.org/3/library/fnmatch.html#fnmatch.fnmatch).
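To make the matching rules concrete, here is a small standalone sketch of `fnmatch`-style filtering. The helper `match_names` and the repository names are hypothetical and only illustrate the pattern semantics; this is not pachypy's internal code.

```python
from fnmatch import fnmatch


def match_names(names, patterns):
    """Return the names that match at least one wildcard pattern, in input order."""
    if isinstance(patterns, str):
        patterns = [patterns]
    return [name for name in names if any(fnmatch(name, p) for p in patterns)]


repos = ['example1', 'example2', 'test_data', 'production']

print(match_names(repos, 'example*'))             # ['example1', 'example2']
print(match_names(repos, ['example*', 'test*']))  # ['example1', 'example2', 'test_data']
print(match_names(repos, 'example?'))             # ['example1', 'example2']
```

`*` matches any number of characters and `?` matches exactly one, so a pattern without wildcards only matches a name that is identical to it.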

The `PrettyPachydermClient` also outputs tables with nice formatting and shows progress bars for larger operations.

In [None]:
# Get the last 20 jobs for all pipelines starting with 'example'
client.list_jobs('example*', n=20)

In [None]:
# List all pipelines starting with 'example'
client.list_pipelines('example*')

In [None]:
# List all repos starting with 'example'
client.list_repos('example*')

In [None]:
# List all CSV files in the staging branch of repos starting with 'example'
client.list_files('example*', branch='staging', glob='**.csv')

In [None]:
# List the last 10 commits in each repo starting with 'example'
client.list_commits('example*', n=10)

In [None]:
# List all datums for the job with ID 'abcd1234' (no wildcards here)
client.list_datums('abcd1234')

In [None]:
# Print user logs from the last jobs of all pipelines starting with 'example'
# in a nicely formatted way, grouped by pipeline and worker
client.get_logs('example*', last_job_only=True, user_only=True, master=False)

The list commands return an object that contains both the HTML output and the underlying pandas DataFrame, which is available through its `raw` attribute:

In [None]:
# Get the mean duration of the last 100 jobs of pipeline 'example1'
output = client.list_jobs('example1', n=100)
output.raw['Duration'].mean()

## Inspecting objects

In [None]:
# Inspect pipeline
client.inspect_pipeline('example')

In [None]:
# Inspect job
client.inspect_job('abcd1234')

In [None]:
# Inspect a datum for a job
client.inspect_datum('abcd1234', 'abcdefgh12345678')

## Manipulating objects

Wildcards can also be used in many of the manipulation functions for batch operations.

In [None]:
# Create one or multiple Pachyderm repositories
client.create_repos('example1')
client.create_repos(['example2', 'example3'])

In [None]:
# Delete all repositories starting with 'example'
client.delete_repos('example*')

# Delete all repositories starting with 'example' or 'test'
client.delete_repos(['example*', 'test*'])

In [None]:
# Mark a commit as branch 'staging' in the 'example1' repository
client.create_branch('example1', commit='abcd1234', branch='staging')

In [None]:
# Remove the 'staging' branch in the 'example1' repository
# (while keeping the referenced commit intact)
client.delete_branch('example1', 'staging')

Before we get into creating pipelines, check out the [`pipelines.yaml`](https://github.com/itssimon/pachypy/blob/master/examples/pipelines.yaml) example file to see what pipeline specifications can look like when using pachypy.

In [None]:
# Create all pipelines starting with 'example'.
# This will go through the pipeline specification files specified above
# to find matching pipeline specs.
client.create_pipelines('example*')

# Like above, but delete existing pipelines before recreating them.
client.create_pipelines('example*', recreate=True)

In [None]:
# Update all pipelines starting with 'example'
client.update_pipelines('example*')

# Like above, but also reprocesses all the data (see Pachyderm docs)
client.update_pipelines('example*', reprocess=True)

In [None]:
# Delete all pipelines starting with 'example'
client.delete_pipelines('example*')

In [None]:
# Start and stop all pipelines starting with 'example'
client.start_pipelines('example*')
client.stop_pipelines('example*')

In [None]:
# Trigger a job for the pipeline 'example1' (no wildcards)
# by committing a timestamp file into its cron input repository.
client.trigger_pipeline('example1')

# Same as above, but block until the job has finished and results are available
client.trigger_pipeline('example1', flush=True)

In [None]:
# Delete job with ID 'abcd1234' (no wildcards)
client.delete_job('abcd1234')

## Getting files

In [None]:
# Download all files in the master branch of the 'example1' repository to the current directory
client.get_files('example1')

# Download all CSV files under '/some/path' in the 'example1' repository (master branch)
# to a local folder 'example_files'
client.get_files('example1', path='/some/path', glob='*.csv', destination='./example_files/')

In [None]:
# Retrieve the content of a single file as a string
# while automatically detecting the character encoding
client.get_file_content('example1', '/some/path/to/file.csv', encoding='auto')

# Retrieve the content of a single file as a bytes object
client.get_file_content('example1', '/some/path/to/image.png')

## Commits

pachypy uses a context manager for commits, so you don't have to start and finish a commit manually. If an exception is raised inside the `with` block and not caught there, the commit is automatically cancelled.

In [None]:
# Start a new commit in branch 'staging' of repo 'example1',
# delete file 'some_file' and upload a local file with the same name.
# The commit is automatically finished by the context manager.
with client.commit('example1', branch='staging') as commit:
    commit.delete_file('/some_file')
    commit.put_file('./local_folder/some_file')
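The finish-or-cancel behaviour described above can be pictured with a small standalone sketch. It is not pachypy's implementation: `FakeCommit` and this `commit` function are hypothetical stand-ins that only record what would happen, so the example runs without a Pachyderm cluster.

```python
from contextlib import contextmanager


class FakeCommit:
    """Stand-in for a commit handle; records operations instead of talking to pachd."""

    def __init__(self, repo, branch):
        self.repo = repo
        self.branch = branch
        self.operations = []
        self.state = 'open'

    def put_file(self, path):
        self.operations.append(('put', path))

    def delete_file(self, path):
        self.operations.append(('delete', path))


@contextmanager
def commit(repo, branch='master'):
    handle = FakeCommit(repo, branch)  # corresponds to starting the commit
    try:
        yield handle
    except Exception:
        handle.state = 'cancelled'     # an uncaught error discards the open commit
        raise                          # the error still reaches the caller
    else:
        handle.state = 'finished'      # a clean exit finishes the commit


with commit('example1', branch='staging') as c:
    c.delete_file('/some_file')
    c.put_file('./local_folder/some_file')

print(c.state)  # finished
```

The exception is re-raised after cancelling so that a failed upload is never silently ignored.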

In [None]:
# Start a new commit in the master branch of repo 'example1',
# upload multiple CSV files while keeping one level of the directory structure.
# Finish the commit and block until all jobs that may have been triggered by
# this commit have completed (flush).
with client.commit('example1', flush=True) as commit:
    commit.put_files('./local_folder/*/*.csv', keep_structure=True, base_path='./local_folder')
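One way to read `keep_structure=True` together with `base_path` is that each file's path in the repository is its local path relative to `base_path`. The sketch below illustrates that interpretation only; it is an assumption based on the comment in the cell above, and the helper `repo_path` is hypothetical, not part of pachypy.

```python
from pathlib import PurePosixPath


def repo_path(local_path, keep_structure=False, base_path='.'):
    """Map a local file path to a destination path inside a repository.

    Assumed behaviour: without keep_structure only the file name is kept;
    with it, the path relative to base_path is preserved.
    """
    local = PurePosixPath(local_path)
    if not keep_structure:
        return '/' + local.name
    return '/' + str(local.relative_to(PurePosixPath(base_path)))


print(repo_path('./local_folder/a/data.csv'))
# /data.csv
print(repo_path('./local_folder/a/data.csv', keep_structure=True, base_path='./local_folder'))
# /a/data.csv
```

Under this reading, the glob `./local_folder/*/*.csv` with `base_path='./local_folder'` keeps exactly one directory level in front of each file name.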