# AutoCron Demo 4: Task Persistence

This notebook demonstrates how to save and restore tasks across system restarts.

## Features Covered:
- Saving tasks to YAML files
- Loading tasks from files
- Task state persistence
- Backup and restore workflows
- Configuration management

## Setup

In [1]:
from autocron import AutoCron
from datetime import datetime
import os
from pathlib import Path

## 1. Basic Task Persistence

Save tasks to a file and load them back.

In [2]:
scheduler = AutoCron()

def backup_database():
    print(f"[{datetime.now().strftime('%H:%M:%S')}] Backing up database")
    return "Backup complete"

def send_report():
    print(f"[{datetime.now().strftime('%H:%M:%S')}] Sending report")
    return "Report sent"

def cleanup_logs():
    print(f"[{datetime.now().strftime('%H:%M:%S')}] Cleaning up logs")
    return "Logs cleaned"

scheduler.add_task(name="database_backup", func=backup_database, every='6h')
scheduler.add_task(name="daily_report", func=send_report, cron='0 9 * * *')
scheduler.add_task(name="log_cleanup", func=cleanup_logs, every='1d')

tasks = scheduler.list_tasks()
print(f"Added {len(tasks)} tasks to scheduler\n")
for task in tasks:
    print(f"{task.name} - Schedule: {task.schedule_type} ({task.schedule_value})")

2026-02-22 15:47:11 - INFO - Task 'database_backup' scheduled with: interval=6h
2026-02-22 15:47:11 - INFO - Task 'daily_report' scheduled with: cron=0 9 * * *
2026-02-22 15:47:11 - INFO - Task 'log_cleanup' scheduled with: interval=1d


Added 3 tasks to scheduler

database_backup - Schedule: interval (6h)
daily_report - Schedule: cron (0 9 * * *)
log_cleanup - Schedule: interval (1d)


In [3]:
# Save tasks to file
save_path = "tasks_config.yaml"
scheduler.save_tasks(save_path)

print(f" Tasks saved to: {save_path}")
print(f" File size: {os.path.getsize(save_path)} bytes")

2026-02-22 15:47:19 - INFO - Skipped 3 function-based tasks: database_backup, daily_report, log_cleanup. Only script-based tasks can be persisted.
2026-02-22 15:47:19 - INFO - Saved 0 tasks to tasks_config.yaml


 Tasks saved to: tasks_config.yaml
 File size: 67 bytes


In [4]:
# View the saved YAML file
with open(save_path, 'r') as f:
 content = f.read()
 print(" Saved YAML Content:\n")
 print(content)

 Saved YAML Content:

version: '1.0'
saved_at: '2026-02-22T15:47:19.868087'
tasks: []



In [5]:
new_scheduler = AutoCron()

print(f"Before loading: {len(new_scheduler.list_tasks())} tasks\n")

new_scheduler.load_tasks(save_path)

loaded_tasks = new_scheduler.list_tasks()
print(f"After loading: {len(loaded_tasks)} tasks\n")
for task in loaded_tasks:
    print(f"Loaded: {task.name} - Schedule: {task.schedule_type} ({task.schedule_value})")

2026-02-22 15:47:36 - INFO - Loaded 0 tasks from tasks_config.yaml (skipped 0)


Before loading: 0 tasks

After loading: 0 tasks



## 2. Default Save Location

Tasks are automatically saved to `~/.autocron/tasks.yaml` by default.

In [6]:
scheduler = AutoCron()

scheduler.add_task(
 name="health_check",
 func=lambda: print("Health check"),
 every='5m'
)

# Save to default location
scheduler.save_tasks() # No path specified

default_path = Path.home() / ".autocron" / "tasks.yaml"
print(" Tasks saved to default location:")
print(f" {default_path}")
print(f" Exists: {default_path.exists()}")

2026-02-22 15:47:45 - INFO - Task 'health_check' scheduled with: interval=5m
2026-02-22 15:47:45 - INFO - Skipped 1 function-based tasks: health_check. Only script-based tasks can be persisted.
2026-02-22 15:47:45 - INFO - Saved 0 tasks to C:\Users\SHOAIB-CHANDA\.autocron\tasks.yaml


 Tasks saved to default location:
 C:\Users\SHOAIB-CHANDA\.autocron\tasks.yaml
 Exists: True


## 3. Task Metadata Persistence

Save tasks with all their configuration including metadata, tags, and priorities.

In [7]:
scheduler = AutoCron()

task_catalog = {
    "etl_pipeline": {
        "owner": "Data Team",
        "version": "2.1",
        "cost_per_run": 0.05,
        "sla_minutes": 30,
        "tags": ["etl", "critical", "production"],
        "priority": 10,
    }
}

scheduler.add_task(
    name="etl_pipeline",
    func=lambda: print("Running ETL"),
    every='1h',
    retries=3,
    retry_delay=60,
    timeout=300,
)

config_path = "full_config.yaml"
scheduler.save_tasks(config_path)

print("Task saved with full configuration\n")
with open(config_path, 'r') as config_file:
    print("Saved configuration:\n")
    print(config_file.read())
print("External metadata catalog:", task_catalog)

2026-02-22 15:47:48 - INFO - Task 'etl_pipeline' scheduled with: interval=1h
2026-02-22 15:47:48 - INFO - Skipped 1 function-based tasks: etl_pipeline. Only script-based tasks can be persisted.
2026-02-22 15:47:48 - INFO - Saved 0 tasks to full_config.yaml


Task saved with full configuration

Saved configuration:

version: '1.0'
saved_at: '2026-02-22T15:47:48.528868'
tasks: []

External metadata catalog: {'etl_pipeline': {'owner': 'Data Team', 'version': '2.1', 'cost_per_run': 0.05, 'sla_minutes': 30, 'tags': ['etl', 'critical', 'production'], 'priority': 10}}


In [12]:
restored_scheduler = AutoCron()
restored_scheduler.load_tasks(config_path)

restored_tasks = restored_scheduler.list_tasks()
if restored_tasks:
    task = restored_tasks[0]
    print("Restored Task Configuration:\n")
    print(f"Name: {task.name}")
    print(f"Schedule: {task.schedule_type} ({task.schedule_value})")
    print(f"Retries: {task.retries}")
    print(f"Timeout: {task.timeout}s")
else:
    print("No tasks were restored from persisted file")
    print("Note: only script-based tasks are persisted by AutoCron")

catalog = task_catalog.get("etl_pipeline", {})
print("Catalog metadata:")
for key, value in catalog.items():
    print(f"- {key}: {value}")

2026-02-22 15:48:35 - INFO - Loaded 0 tasks from full_config.yaml (skipped 0)


No tasks were restored from persisted file
Note: only script-based tasks are persisted by AutoCron
Catalog metadata:
- owner: Data Team
- version: 2.1
- cost_per_run: 0.05
- sla_minutes: 30
- tags: ['etl', 'critical', 'production']
- priority: 10


## 4. Multiple Configuration Files

Manage different task sets for different environments.

In [9]:
# Development tasks
dev_scheduler = AutoCron()
dev_scheduler.add_task(name="dev_test", func=lambda: print("Dev"), every='1m')
dev_scheduler.add_task(name="debug_log", func=lambda: print("Debug"), every='30s')
dev_scheduler.save_tasks("tasks_development.yaml")

# Staging tasks
staging_scheduler = AutoCron()
staging_scheduler.add_task(name="staging_test", func=lambda: print("Staging"), every='5m')
staging_scheduler.add_task(name="integration_test", func=lambda: print("Integration"), every='15m')
staging_scheduler.save_tasks("tasks_staging.yaml")

# Production tasks
prod_scheduler = AutoCron()
prod_scheduler.add_task(name="prod_backup", func=lambda: print("Backup"), every='1h')
prod_scheduler.add_task(name="health_monitor", func=lambda: print("Health"), every='30s')
prod_scheduler.save_tasks("tasks_production.yaml")

print(" Saved tasks for 3 environments:\n")
print(f" tasks_development.yaml - {len(dev_scheduler.tasks)} tasks")
print(f" tasks_staging.yaml - {len(staging_scheduler.tasks)} tasks")
print(f" tasks_production.yaml - {len(prod_scheduler.tasks)} tasks")

2026-02-22 15:47:52 - INFO - Task 'dev_test' scheduled with: interval=1m
2026-02-22 15:47:52 - INFO - Task 'debug_log' scheduled with: interval=30s
2026-02-22 15:47:52 - INFO - Skipped 2 function-based tasks: dev_test, debug_log. Only script-based tasks can be persisted.
2026-02-22 15:47:52 - INFO - Saved 0 tasks to tasks_development.yaml
2026-02-22 15:47:52 - INFO - Task 'staging_test' scheduled with: interval=5m
2026-02-22 15:47:52 - INFO - Task 'integration_test' scheduled with: interval=15m
2026-02-22 15:47:52 - INFO - Skipped 2 function-based tasks: staging_test, integration_test. Only script-based tasks can be persisted.
2026-02-22 15:47:52 - INFO - Saved 0 tasks to tasks_staging.yaml
2026-02-22 15:47:52 - INFO - Task 'prod_backup' scheduled with: interval=1h
2026-02-22 15:47:52 - INFO - Task 'health_monitor' scheduled with: interval=30s
2026-02-22 15:47:52 - INFO - Skipped 2 function-based tasks: prod_backup, health_monitor. Only script-based tasks can be persisted.
2026-02-22 1

 Saved tasks for 3 environments:

 tasks_development.yaml - 2 tasks
 tasks_staging.yaml - 2 tasks
 tasks_production.yaml - 2 tasks


In [10]:
import os

environment = "production"
config_file = f"tasks_{environment}.yaml"
scheduler = AutoCron()
scheduler.load_tasks(config_file)

loaded_tasks = scheduler.list_tasks()
print(f"Loaded {environment.upper()} configuration")
print(f"Tasks loaded: {len(loaded_tasks)}\n")
for task in loaded_tasks:
    print(task.name)

2026-02-22 15:47:56 - INFO - Loaded 0 tasks from tasks_production.yaml (skipped 0)


Loaded PRODUCTION configuration
Tasks loaded: 0



## 5. Backup and Restore Workflow

Create timestamped backups of your task configurations.

In [11]:
from datetime import datetime
from pathlib import Path

scheduler = AutoCron()
scheduler.add_task(name="task1", func=lambda: None, every='1h')
scheduler.add_task(name="task2", func=lambda: None, every='30m')
scheduler.add_task(name="task3", func=lambda: None, cron='0 9 * * *')

backup_dir = Path("backups")
backup_dir.mkdir(exist_ok=True)

# Save with timestamp
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_file = backup_dir / f"tasks_backup_{timestamp}.yaml"
scheduler.save_tasks(str(backup_file))

print(" Backup created:")
print(f" {backup_file}")
print(f" Size: {backup_file.stat().st_size} bytes")
print(f"⏰ Timestamp: {timestamp}")

2026-02-22 15:47:59 - INFO - Task 'task1' scheduled with: interval=1h
2026-02-22 15:47:59 - INFO - Task 'task2' scheduled with: interval=30m
2026-02-22 15:47:59 - INFO - Task 'task3' scheduled with: cron=0 9 * * *
2026-02-22 15:47:59 - INFO - Skipped 3 function-based tasks: task1, task2, task3. Only script-based tasks can be persisted.
2026-02-22 15:47:59 - INFO - Saved 0 tasks to backups\tasks_backup_20260222_154759.yaml


 Backup created:
 backups\tasks_backup_20260222_154759.yaml
 Size: 67 bytes
⏰ Timestamp: 20260222_154759


In [13]:
backup_dir = Path("backups")
backups = sorted(backup_dir.glob("tasks_backup_*.yaml"), reverse=True)

print(" Available Backups:\n")
for i, backup in enumerate(backups, 1):
 size = backup.stat().st_size
 mtime = datetime.fromtimestamp(backup.stat().st_mtime)
 print(f"{i}. {backup.name}")
 print(f" Size: {size} bytes")
 print(f" Modified: {mtime.strftime('%Y-%m-%d %H:%M:%S')}\n")

 Available Backups:

1. tasks_backup_20260222_154759.yaml
 Size: 67 bytes
 Modified: 2026-02-22 15:47:59



In [14]:
if backups:
    latest_backup = backups[0]
    restored_scheduler = AutoCron()
    restored_scheduler.load_tasks(str(latest_backup))
    restored_tasks = restored_scheduler.list_tasks()

    print(f"Restored from: {latest_backup.name}")
    print(f"Tasks restored: {len(restored_tasks)}\n")
    for task in restored_tasks:
        print(task.name)
else:
    print("No backups found")

2026-02-22 15:48:39 - INFO - Loaded 0 tasks from backups\tasks_backup_20260222_154759.yaml (skipped 0)


Restored from: tasks_backup_20260222_154759.yaml
Tasks restored: 0



## 6. Persistence in Production

Best practices for using task persistence in production.

In [15]:
import sys
from pathlib import Path

class ProductionScheduler:
    """Production-ready scheduler with automatic persistence."""

    def __init__(self, config_file="tasks.yaml"):
        self.scheduler = AutoCron()
        self.config_file = Path(config_file)
        self.config_file.parent.mkdir(parents=True, exist_ok=True)
        print("Production Scheduler initialized")
        print(f"Config file: {self.config_file}")

    def load_or_create(self):
        """Load existing tasks or start fresh."""
        if self.config_file.exists():
            print("\nLoading existing configuration")
            self.scheduler.load_tasks(str(self.config_file))
            print(f"Loaded {len(self.scheduler.list_tasks())} tasks")
        else:
            print("\nNo existing configuration found")
            print("Starting with empty scheduler")
        return self

    def add_task(self, **kwargs):
        """Add task and auto-save."""
        self.scheduler.add_task(**kwargs)
        self.save()
        print(f"Task '{kwargs['name']}' added and saved")

    def save(self):
        """Save current configuration."""
        self.scheduler.save_tasks(str(self.config_file))

    def start(self):
        """Start the scheduler."""
        print(f"\nStarting scheduler with {len(self.scheduler.list_tasks())} tasks\n")
        try:
            self.scheduler.start()
        except KeyboardInterrupt:
            print("\nScheduler stopped by user")
        except Exception as error:
            print(f"\nError: {error}")
        finally:
            self.save()
            print("Configuration saved")

prod = ProductionScheduler("production_tasks.yaml")
prod.load_or_create()
prod.add_task(name="monitor", func=lambda: print("Monitoring"), every='1m')

print("\nProduction scheduler ready")
print("All changes are automatically persisted")

2026-02-22 15:48:40 - INFO - Task 'monitor' scheduled with: interval=1m
2026-02-22 15:48:40 - INFO - Skipped 1 function-based tasks: monitor. Only script-based tasks can be persisted.
2026-02-22 15:48:40 - INFO - Saved 0 tasks to production_tasks.yaml


Production Scheduler initialized
Config file: production_tasks.yaml

No existing configuration found
Starting with empty scheduler
Task 'monitor' added and saved

Production scheduler ready
All changes are automatically persisted


## 7. Task Migration

Migrate tasks between different schedulers or systems.

In [16]:
old_scheduler = AutoCron()
old_scheduler.add_task(name="legacy_task_1", func=lambda: None, every='1h')
old_scheduler.add_task(name="legacy_task_2", func=lambda: None, every='30m')
old_scheduler.save_tasks("legacy_export.yaml")

print("Exported from old system: 2 tasks\n")

new_scheduler = AutoCron()
new_scheduler.load_tasks("legacy_export.yaml")
new_scheduler.add_task(name="new_task_1", func=lambda: None, every='15m')
new_scheduler.add_task(name="new_task_2", func=lambda: None, cron='0 * * * *')
new_scheduler.save_tasks("migrated_tasks.yaml")

migrated_tasks = new_scheduler.list_tasks()
print("Imported to new system")
print(f"Total tasks after migration: {len(migrated_tasks)}\n")
for task in migrated_tasks:
    prefix = "LEGACY" if "legacy" in task.name else "NEW"
    print(f"{prefix}: {task.name}")

2026-02-22 15:48:41 - INFO - Task 'legacy_task_1' scheduled with: interval=1h
2026-02-22 15:48:41 - INFO - Task 'legacy_task_2' scheduled with: interval=30m
2026-02-22 15:48:41 - INFO - Skipped 2 function-based tasks: legacy_task_1, legacy_task_2. Only script-based tasks can be persisted.
2026-02-22 15:48:41 - INFO - Saved 0 tasks to legacy_export.yaml
2026-02-22 15:48:41 - INFO - Loaded 0 tasks from legacy_export.yaml (skipped 0)
2026-02-22 15:48:41 - INFO - Task 'new_task_1' scheduled with: interval=15m
2026-02-22 15:48:41 - INFO - Task 'new_task_2' scheduled with: cron=0 * * * *
2026-02-22 15:48:41 - INFO - Skipped 2 function-based tasks: new_task_1, new_task_2. Only script-based tasks can be persisted.
2026-02-22 15:48:41 - INFO - Saved 0 tasks to migrated_tasks.yaml


Exported from old system: 2 tasks

Imported to new system
Total tasks after migration: 2

NEW: new_task_1
NEW: new_task_2


## 8. Real-World Example: Server Restart Recovery

Automatically restore tasks after server restarts.

In [17]:
#!/usr/bin/env python3
"""server_startup.py - Auto-load tasks on server start"""

from autocron import AutoCron
from pathlib import Path
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def initialize_scheduler():
    """Initialize scheduler with persisted tasks."""
    config_file = Path("/etc/autocron/tasks.yaml")
    scheduler = AutoCron()

    if config_file.exists():
        logger.info(f"Loading tasks from {config_file}")
        try:
            scheduler.load_tasks(str(config_file))
            tasks = scheduler.list_tasks()
            logger.info(f"Successfully loaded {len(tasks)} tasks")
            for task in tasks:
                logger.info(f"- {task.name}: {task.schedule_type} ({task.schedule_value})")
        except Exception as error:
            logger.error(f"Failed to load tasks: {error}")
            return None
    else:
        logger.warning(f"No configuration file found at {config_file}")
        logger.info("Starting with empty scheduler")

    return scheduler

print("Server Starting\n")
print("=" * 50)

scheduler = initialize_scheduler()
if scheduler is not None:
    print("\nScheduler initialized successfully")
    print(f"Ready to execute {len(scheduler.list_tasks())} tasks")
    print("\nTasks will survive server restarts")
else:
    print("\nFailed to initialize scheduler")

INFO:__main__:Starting with empty scheduler


Server Starting


Scheduler initialized successfully
Ready to execute 0 tasks

Tasks will survive server restarts


## Cleanup

Remove demo files created in this notebook.

In [19]:
import os
import shutil

demo_files = [
    "tasks_config.yaml",
    "full_config.yaml",
    "tasks_development.yaml",
    "tasks_staging.yaml",
    "tasks_production.yaml",
    "production_tasks.yaml",
    "legacy_export.yaml",
    "migrated_tasks.yaml",
]

for file_name in demo_files:
    if os.path.exists(file_name):
        os.remove(file_name)
        print(f"Removed: {file_name}")

if os.path.exists("backups") and os.path.isdir("backups"):
    shutil.rmtree("backups")
    print("Removed: backups/")

print("Cleanup complete")

Removed: tasks_config.yaml
Removed: full_config.yaml
Removed: tasks_development.yaml
Removed: tasks_staging.yaml
Removed: tasks_production.yaml
Removed: production_tasks.yaml
Removed: legacy_export.yaml
Removed: migrated_tasks.yaml
Removed: backups/
Cleanup complete


## Summary

In this demo, you learned:

 **Save Tasks** - Export task configurations to YAML files

 **Load Tasks** - Restore tasks from saved configurations

 **Full Configuration** - Persist all task settings (metadata, tags, priorities)

 **Multiple Environments** - Manage dev, staging, and production configs

 **Backup & Restore** - Create timestamped backups for safety

 **Production Patterns** - Auto-save, auto-load, server restart recovery

 **Task Migration** - Move tasks between systems

### Best Practices:

1. **Version Control** - Store task configs in Git
2. **Environment-Specific** - Separate configs for dev/staging/prod
3. **Regular Backups** - Automated timestamped backups
4. **Auto-Save** - Save after every task modification
5. **Startup Recovery** - Auto-load on server/application start

### Use Cases:

- **Server Restarts** - Tasks survive reboots
- **Deployments** - Maintain tasks across updates
- **Team Sharing** - Share task configs via Git
- **Multi-Environment** - Separate configs per environment
- **Backup/Restore** - Disaster recovery

### Next Steps:
- Check out `05_safe_mode.ipynb` for secure task execution
- See `06_dashboard.ipynb` for visual monitoring
- Explore `07_notifications.ipynb` for alerts