Skip to content

t0mer/py-zerobyte

Repository files navigation

Zerobyte SDK

A Python SDK for the Zerobyte API - Manage volumes, backups, snapshots, and more.

Installation

Install via pip:

pip install py-zerobyte

Or install from source:

git clone https://github.com/t0mer/py-zerobyte.git
cd py-zerobyte
pip install -e .

Quick Start

from py_zerobyte import ZerobyteClient

# Initialize the client with URL, username, and password
client = ZerobyteClient(
    url="http://localhost:4096",
    username="admin",
    password="your-password"
)

# The client automatically logs in on initialization
# Get current user information
user = client.auth.get_me()
print(f"Logged in as: {user['user']['username']}")

# List all volumes
volumes = client.volumes.list()
for volume in volumes:
    print(f"Volume: {volume['name']}")

# List repositories for a volume
repositories = client.repositories.list(volume_id=1)
for repo in repositories:
    print(f"Repository: {repo['name']}")

Features

  • Authentication: Register, login, logout, password management
  • Volumes: Create, update, delete, mount/unmount volumes
  • Repositories: Manage backup repositories
  • Snapshots: List, restore, and manage snapshots
  • Backup Schedules: Create and manage automated backup schedules
  • Notifications: Configure notification destinations
  • System: Get system information and settings

Configuration

The SDK requires three parameters during initialization:

  • url: Base URL of the Zerobyte API (e.g., http://localhost:4096)
  • username: Your username
  • password: Your password

By default, the client automatically logs in when initialized. You can disable this:

client = ZerobyteClient(
    url="http://localhost:4096",
    username="admin",
    password="your-password",
    auto_login=False
)

# Login manually
client.login()

Usage Examples

Authentication

# Register a new user
response = client.auth.register("newuser", "password123")

# Login
response = client.auth.login("admin", "password123")

# Get current user
user = client.auth.get_me()

# Change password
client.auth.change_password("old-password", "new-password")

# Check if system has users (first-time setup check)
status = client.auth.get_status()
if not status['hasUsers']:
    # First time setup - register initial user
    client.auth.register("admin", "initial-password")

# Logout
client.auth.logout()

Volumes

# List all volumes
volumes = client.volumes.list()

# Create a new volume
volume = client.volumes.create({
    "name": "my-backup-volume",
    "device": "/dev/sdb1",
    "mountPoint": "/mnt/backup",
    "filesystem": "ext4",
    "autoRemount": True,
    "readonly": False,
    "options": []
})

# Get a specific volume
volume = client.volumes.get(volume_id=1)

# Update a volume
updated = client.volumes.update(
    volume_id=1,
    volume_data={"name": "updated-name"}
)

# Mount a volume
client.volumes.mount(volume_id=1)

# Unmount a volume
client.volumes.unmount(volume_id=1)

# Check volume health
health = client.volumes.health_check(volume_id=1)

# List files in a volume
files = client.volumes.list_files(volume_id=1, path="/backups")

# Browse filesystem
listing = client.volumes.browse_filesystem(path="/mnt")

# List rclone remotes
remotes = client.volumes.list_rclone_remotes()

# Delete a volume
client.volumes.delete(volume_id=1)

Repositories

# List repositories for a volume
repositories = client.repositories.list(volume_id=1)

# Create a new repository
repo = client.repositories.create(
    volume_id=1,
    repository_data={
        "name": "my-backup-repo",
        "type": "local",
        "config": {
            "path": "/backups/repo1"
        }
    }
)

# Get a specific repository
repo = client.repositories.get(volume_id=1, repository_id=1)

# Update a repository
updated = client.repositories.update(
    volume_id=1,
    repository_id=1,
    repository_data={"name": "updated-repo-name"}
)

# Run doctor command on repository
result = client.repositories.doctor(volume_id=1, repository_id=1)

# Delete a repository
client.repositories.delete(volume_id=1, repository_id=1)

Snapshots

# List all snapshots in a repository
snapshots = client.snapshots.list(volume_id=1, repository_id=1)

# Get snapshot details
details = client.snapshots.get_details(
    volume_id=1,
    repository_id=1,
    snapshot_id="abc123"
)

# List files in a snapshot
files = client.snapshots.list_files(
    volume_id=1,
    repository_id=1,
    snapshot_id="abc123",
    path="/data"
)

# Restore a snapshot
response = client.snapshots.restore(
    volume_id=1,
    repository_id=1,
    snapshot_id="abc123",
    restore_data={
        "target": "/restore/path",
        "include": ["/data"],
        "exclude": ["/data/temp"]
    }
)

# Delete a snapshot
client.snapshots.delete(
    volume_id=1,
    repository_id=1,
    snapshot_id="abc123"
)

Backup Schedules

# List backup schedules for a repository
schedules = client.backup_schedules.list(volume_id=1, repository_id=1)

# Create a new backup schedule
schedule = client.backup_schedules.create(
    volume_id=1,
    repository_id=1,
    schedule_data={
        "name": "Daily Backup",
        "schedule": "0 2 * * *",  # Every day at 2 AM
        "enabled": True,
        "backupPaths": ["/data"],
        "excludePaths": ["/data/temp", "/data/cache"],
        "retention": {
            "keepLast": 7,
            "keepDaily": 7,
            "keepWeekly": 4,
            "keepMonthly": 12,
            "keepYearly": 3
        },
        "tags": ["daily", "production"]
    }
)

# Get a specific schedule
schedule = client.backup_schedules.get(
    volume_id=1,
    repository_id=1,
    schedule_id=1
)

# Update a schedule
updated = client.backup_schedules.update(
    volume_id=1,
    repository_id=1,
    schedule_id=1,
    schedule_data={"enabled": False}
)

# Run a backup immediately
client.backup_schedules.run_now(
    volume_id=1,
    repository_id=1,
    schedule_id=1
)

# Stop a running backup
client.backup_schedules.stop_backup(
    volume_id=1,
    repository_id=1,
    schedule_id=1
)

# Run forget command (apply retention policy)
client.backup_schedules.run_forget(
    volume_id=1,
    repository_id=1,
    schedule_id=1
)

# Get notification settings
notifications = client.backup_schedules.get_notifications(
    volume_id=1,
    repository_id=1,
    schedule_id=1
)

# Update notification settings
client.backup_schedules.update_notifications(
    volume_id=1,
    repository_id=1,
    schedule_id=1,
    notifications_data={
        "onSuccess": True,
        "onFailure": True,
        "destinations": [1, 2]
    }
)

# Get mirror settings
mirrors = client.backup_schedules.get_mirrors(
    volume_id=1,
    repository_id=1,
    schedule_id=1
)

# Update mirror settings
client.backup_schedules.update_mirrors(
    volume_id=1,
    repository_id=1,
    schedule_id=1,
    mirrors_data={
        "enabled": True,
        "repositories": [2, 3]
    }
)

# Check mirror compatibility
compatibility = client.backup_schedules.get_mirror_compatibility(
    volume_id=1,
    repository_id=1,
    schedule_id=1
)

# Reorder schedules
client.backup_schedules.reorder(
    volume_id=1,
    repository_id=1,
    order_data={"scheduleIds": [3, 1, 2]}
)

# Delete a schedule
client.backup_schedules.delete(
    volume_id=1,
    repository_id=1,
    schedule_id=1
)

Notifications

# List all notification destinations
destinations = client.notifications.list_destinations()

# Create an email notification destination
email_dest = client.notifications.create_destination({
    "name": "Admin Email",
    "type": "email",
    "config": {
        "to": "admin@example.com",
        "from": "backup@example.com",
        "smtpHost": "smtp.gmail.com",
        "smtpPort": 587,
        "username": "backup@example.com",
        "password": "app-password"
    }
})

# Create a Slack notification destination
slack_dest = client.notifications.create_destination({
    "name": "Team Slack",
    "type": "slack",
    "config": {
        "webhookUrl": "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"
    }
})

# Get a notification destination
dest = client.notifications.get_destination(destination_id=1)

# Update a destination
updated = client.notifications.update_destination(
    destination_id=1,
    destination_data={"name": "Updated Email Alerts"}
)

# Test a notification destination
result = client.notifications.test_destination(destination_id=1)
if result['success']:
    print("Test notification sent successfully!")

# Delete a destination
client.notifications.delete_destination(destination_id=1)

System

# Get system information
info = client.system.get_info()
print(f"Version: {info['version']}")
print(f"Platform: {info['platform']}")

# Download Restic password
password = client.system.download_restic_password()
with open('restic-password.txt', 'w') as f:
    f.write(password)

Error Handling

The SDK provides custom exceptions for different error scenarios:

from py_zerobyte import (
    ZerobyteClient,
    ZerobyteError,
    AuthenticationError,
    APIError,
    NotFoundError,
    ValidationError
)

try:
    client = ZerobyteClient(
        url="http://localhost:4096",
        username="admin",
        password="wrong-password"
    )
except AuthenticationError as e:
    print(f"Authentication failed: {e}")
except ZerobyteError as e:
    print(f"Error: {e}")

try:
    volume = client.volumes.get(volume_id=999)
except NotFoundError as e:
    print(f"Volume not found: {e}")
    print(f"Status code: {e.status_code}")

try:
    volume = client.volumes.create({"name": ""})  # Invalid data
except ValidationError as e:
    print(f"Validation error: {e}")

try:
    # Some API call
    result = client.volumes.list()
except APIError as e:
    print(f"API error: {e}")
    print(f"Status code: {e.status_code}")

Complete Example

Here's a complete example that demonstrates common workflows:

from py_zerobyte import ZerobyteClient, AuthenticationError

# Initialize client
client = ZerobyteClient(
    url="http://localhost:4096",
    username="admin",
    password="your-password"
)

# 1. Create a volume
print("Creating volume...")
volume = client.volumes.create({
    "name": "backup-drive",
    "device": "/dev/sdb1",
    "mountPoint": "/mnt/backup",
    "filesystem": "ext4",
    "autoRemount": True,
    "readonly": False,
    "options": []
})
volume_id = volume['id']
print(f"Volume created: {volume['name']}")

# 2. Mount the volume
print("Mounting volume...")
client.volumes.mount(volume_id)
print("Volume mounted successfully")

# 3. Create a repository
print("Creating repository...")
repo = client.repositories.create(
    volume_id=volume_id,
    repository_data={
        "name": "production-backup",
        "type": "local",
        "config": {
            "path": "/backups/production"
        }
    }
)
repo_id = repo['id']
print(f"Repository created: {repo['name']}")

# 4. Create a backup schedule
print("Creating backup schedule...")
schedule = client.backup_schedules.create(
    volume_id=volume_id,
    repository_id=repo_id,
    schedule_data={
        "name": "Daily Production Backup",
        "schedule": "0 2 * * *",
        "enabled": True,
        "backupPaths": ["/var/www", "/etc"],
        "excludePaths": ["/var/www/cache", "/var/www/tmp"],
        "retention": {
            "keepLast": 7,
            "keepDaily": 7,
            "keepWeekly": 4,
            "keepMonthly": 12
        }
    }
)
schedule_id = schedule['id']
print(f"Backup schedule created: {schedule['name']}")

# 5. Create a notification destination
print("Creating notification destination...")
notification = client.notifications.create_destination({
    "name": "Admin Alerts",
    "type": "email",
    "config": {
        "to": "admin@example.com",
        "from": "backup@example.com",
        "smtpHost": "smtp.gmail.com",
        "smtpPort": 587
    }
})
notification_id = notification['id']
print(f"Notification destination created: {notification['name']}")

# 6. Configure notifications for the schedule
print("Configuring backup notifications...")
client.backup_schedules.update_notifications(
    volume_id=volume_id,
    repository_id=repo_id,
    schedule_id=schedule_id,
    notifications_data={
        "onSuccess": True,
        "onFailure": True,
        "destinations": [notification_id]
    }
)
print("Notifications configured")

# 7. Run the backup immediately
print("Running backup now...")
client.backup_schedules.run_now(
    volume_id=volume_id,
    repository_id=repo_id,
    schedule_id=schedule_id
)
print("Backup started")

# 8. List all snapshots
print("\nListing snapshots...")
snapshots = client.snapshots.list(
    volume_id=volume_id,
    repository_id=repo_id
)
for snapshot in snapshots:
    print(f"  - {snapshot['id']}: {snapshot['time']}")

print("\nSetup complete!")

Development

Setting up development environment

# Clone the repository
git clone https://github.com/t0mer/py-zerobyte.git
cd py-zerobyte

# Install in development mode with dev dependencies
pip install -e ".[dev]"

Running tests

pytest

Code formatting

black py_zerobyte/

License

MIT License

Support

For issues, questions, or contributions, please visit: https://github.com/t0mer/py-zerobyte

Changelog

Version 1.0.0

  • Initial release
  • Full API coverage for Zerobyte API v1
  • Authentication, Volumes, Repositories, Snapshots, Backup Schedules, Notifications, and System APIs
  • Comprehensive error handling
  • Type hints support

About

Python SDK for Zerobyte API - Manage volumes, backups, and snapshots

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors

Languages