tomasvotava/hyperion

Hyperion

A headless ETL / ELT / data pipeline and integration SDK for Python.

Features

  • Data Catalog System: Manage and organize data assets across S3 buckets
  • Schema Management: Validate and store schema definitions for data assets
  • AWS Infrastructure Abstractions: Simplified interfaces for S3, DynamoDB, SQS, and Secrets Manager
  • Source Framework: Define data sources that extract data and store it in the catalog
  • Caching: In-memory, local file, and DynamoDB caching options
  • Asynchronous Processing: Utilities for async operations and task queues
  • Geo Utilities: Location-based services with Google Maps integration

Core Concepts

Assets

Assets are the fundamental units of data in Hyperion. Each asset represents a dataset stored in a specific location with a defined schema. Hyperion supports three types of assets:

DataLakeAsset

  • Represents raw, immutable data stored in a data lake
  • Time-partitioned by date
  • Each partition has a schema version
  • Example use cases: raw API responses, event logs, or any source data that needs to be preserved in its original form

FeatureAsset

  • Represents processed feature data with time resolution
  • Used for analytics, machine learning features, and derived datasets
  • Supports different time resolutions (seconds, minutes, hours, days, weeks, months, years)
  • Can include additional partition keys for finer-grained organization
  • Example use cases: aggregated metrics, processed signals, ML features
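The usage examples in this README express a resolution either as a value and unit pair, such as `TimeResolution(1, "d")`, or as a string such as `"1d"`. The sketch below shows one way such a string could be parsed and turned into a `timedelta`. The `Resolution` class and its unit table are hypothetical stand-ins written for illustration, not Hyperion's `TimeResolution`; months and years are left out because they have no fixed length.

```python
import re
from dataclasses import dataclass
from datetime import timedelta

# Hypothetical unit table for this sketch; Hyperion's own TimeResolution may
# use different unit letters and also supports months and years.
_UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600, "d": 86400, "w": 604800}


@dataclass(frozen=True)
class Resolution:
    """Illustrative stand-in, not Hyperion's TimeResolution class."""

    value: int
    unit: str

    @classmethod
    def parse(cls, text: str) -> "Resolution":
        # Accept an integer followed by one of the fixed-length unit letters.
        match = re.fullmatch(r"(\d+)([smhdw])", text)
        if match is None:
            raise ValueError(f"Unsupported resolution: {text!r}")
        return cls(int(match.group(1)), match.group(2))

    def to_timedelta(self) -> timedelta:
        return timedelta(seconds=self.value * _UNIT_SECONDS[self.unit])

    def __str__(self) -> str:
        return f"{self.value}{self.unit}"


resolution = Resolution.parse("1d")
print(resolution, resolution.to_timedelta())
```
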

PersistentStoreAsset

  • Represents persistent data storage without time partitioning
  • Used for reference data, lookup tables, or any data that doesn't change frequently
  • Example use cases: reference data, configuration settings, master data

Schema Management

All assets in Hyperion have associated schemas that define their structure:

  • Schema Store: The SchemaStore manages asset schemas in Avro format
  • Schema Validation: All data is validated against its schema during storage
  • Schema Versioning: Assets include a schema version to support evolution over time
  • Schema Storage: Schemas can be stored in the local filesystem or S3

If a schema is missing for an asset:

  1. An error is raised when you attempt to store or retrieve the asset
  2. Define the schema in Avro format and add it to the schema store
  3. Name the schema file according to the pattern: {asset_type}/{asset_name}.v{version}.avro.json
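As a small illustration of the naming pattern, the helper below builds a schema location from its three parts. The function name `schema_key` is hypothetical and not part of Hyperion; the asset type string `data_lake` is the one used by the schema example in this README.

```python
def schema_key(asset_type: str, asset_name: str, version: int) -> str:
    """Build a location following {asset_type}/{asset_name}.v{version}.avro.json."""
    return f"{asset_type}/{asset_name}.v{version}.avro.json"


print(schema_key("data_lake", "customer_data", 1))
# prints: data_lake/customer_data.v1.avro.json
```
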

Catalog

The Catalog is the central component that manages asset storage and retrieval:

  • Storage Location: Maps asset types to their appropriate storage buckets
  • Asset Retrieval: Provides methods to retrieve assets by name, date, and schema version
  • Partitioning: Handles partitioning logic for different asset types
  • Notifications: Can send notifications when new assets arrive

Source Framework

Sources are responsible for extracting data from external systems and storing it in the catalog:

  • Standardized Interface: All sources implement a common interface
  • AWS Lambda Support: Easy integration with AWS Lambda for scheduled extraction
  • Backfill Capability: Support for historical data backfill
  • Incremental Processing: Extract data with date-based filtering
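To make the backfill and date-based filtering ideas concrete, here is a minimal sketch that splits a historical range into consecutive one-day windows. The `daily_windows` helper is hypothetical and independent of Hyperion; each yielded pair could be handed to a source as its start and end dates.

```python
from datetime import datetime, timedelta, timezone
from typing import Iterator, Tuple


def daily_windows(
    start: datetime, end: datetime
) -> Iterator[Tuple[datetime, datetime]]:
    """Yield consecutive (window_start, window_end) pairs of at most one day."""
    cursor = start
    while cursor < end:
        # The last window is clipped so it never runs past the requested end.
        window_end = min(cursor + timedelta(days=1), end)
        yield cursor, window_end
        cursor = window_end


for window_start, window_end in daily_windows(
    datetime(2023, 1, 1, tzinfo=timezone.utc),
    datetime(2023, 1, 3, 12, tzinfo=timezone.utc),
):
    print(window_start.isoformat(), "->", window_end.isoformat())
```

Processing one window at a time keeps each extraction small and makes a failed window easy to retry on its own.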

Installation

Hyperion uses Poetry for dependency management:

# Clone the repository
git clone https://github.com/tomasvotava/hyperion.git
cd hyperion

# Install dependencies
poetry install

Configuration

Hyperion is configured through environment variables. You can use a .env file for local development:

# Common settings
HYPERION_COMMON_LOG_PRETTY=True
HYPERION_COMMON_LOG_LEVEL=INFO
HYPERION_COMMON_SERVICE_NAME=my-service

# Storage settings
HYPERION_STORAGE_DATA_LAKE_BUCKET=my-data-lake-bucket
HYPERION_STORAGE_FEATURE_STORE_BUCKET=my-feature-store-bucket
HYPERION_STORAGE_PERSISTENT_STORE_BUCKET=my-persistent-store-bucket
HYPERION_STORAGE_SCHEMA_PATH=s3://my-schema-bucket/schemas
HYPERION_STORAGE_MAX_CONCURRENCY=5

# Queue settings
HYPERION_QUEUE_URL=https://sqs.us-east-1.amazonaws.com/123456789012/my-queue

# Secrets settings
HYPERION_SECRETS_BACKEND=AWSSecretsManager

# HTTP settings (optional)
HYPERION_HTTP_PROXY_HTTP=http://proxy:8080
HYPERION_HTTP_PROXY_HTTPS=http://proxy:8080

# Geo settings (optional)
HYPERION_GEO_GMAPS_API_KEY=your-google-maps-api-key

Until full documentation is available, see hyperion/config.py for all available configuration options. Hyperion uses EnvProxy for configuration.
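Because a missing bucket variable would otherwise surface only when an asset is stored, a pre-flight check can be handy during local development. The sketch below reads the three bucket variables from the example above and fails fast if any is unset. It does not reflect how Hyperion or EnvProxy load configuration internally; the `resolve_buckets` helper and the asset-type labels used as dictionary keys are assumptions made for illustration.

```python
import os
from typing import Dict, Mapping

# Keys are hypothetical labels for this sketch; the variable names are the
# ones listed in the example .env file.
_BUCKET_VARIABLES = {
    "data_lake": "HYPERION_STORAGE_DATA_LAKE_BUCKET",
    "feature_store": "HYPERION_STORAGE_FEATURE_STORE_BUCKET",
    "persistent_store": "HYPERION_STORAGE_PERSISTENT_STORE_BUCKET",
}


def resolve_buckets(env: Mapping[str, str] = os.environ) -> Dict[str, str]:
    """Return a label -> bucket mapping, raising if any variable is unset."""
    missing = [var for var in _BUCKET_VARIABLES.values() if not env.get(var)]
    if missing:
        raise KeyError(f"Missing configuration: {', '.join(missing)}")
    return {label: env[var] for label, var in _BUCKET_VARIABLES.items()}


example_env = {
    "HYPERION_STORAGE_DATA_LAKE_BUCKET": "my-data-lake-bucket",
    "HYPERION_STORAGE_FEATURE_STORE_BUCKET": "my-feature-store-bucket",
    "HYPERION_STORAGE_PERSISTENT_STORE_BUCKET": "my-persistent-store-bucket",
}
print(resolve_buckets(example_env))
```
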

Usage Examples

Working with Assets

Creating and Storing a DataLakeAsset

from hyperion.catalog import Catalog
from hyperion.entities.catalog import DataLakeAsset
from datetime import datetime, timezone

# Initialize the catalog
catalog = Catalog.from_config()

# Create a data lake asset
asset = DataLakeAsset(
    name="customer_data",
    date=datetime.now(timezone.utc),
    schema_version=1
)

# Store data in the asset
data = [
    {"id": 1, "name": "Customer 1", "timestamp": datetime.now(timezone.utc)},
    {"id": 2, "name": "Customer 2", "timestamp": datetime.now(timezone.utc)},
]

catalog.store_asset(asset, data)

Working with FeatureAssets

from hyperion.catalog import Catalog
from hyperion.entities.catalog import FeatureAsset
from hyperion.dateutils import TimeResolution
from datetime import datetime, timezone

# Initialize the catalog
catalog = Catalog.from_config()

# Create a feature asset with daily resolution
resolution = TimeResolution(1, "d")  # 1 day resolution
asset = FeatureAsset(
    name="customer_activity",
    partition_date=datetime.now(timezone.utc),
    resolution=resolution,
    schema_version=1
)

# Store aggregated feature data
feature_data = [
    {"customer_id": 1, "activity_score": 87.5, "date": datetime.now(timezone.utc)},
    {"customer_id": 2, "activity_score": 92.1, "date": datetime.now(timezone.utc)},
]

catalog.store_asset(asset, feature_data)

# Retrieve feature data for a specific time period
from_date = datetime(2023, 1, 1, tzinfo=timezone.utc)
to_date = datetime(2023, 1, 31, tzinfo=timezone.utc)

for feature_asset in catalog.iter_feature_store_partitions(
    feature_name="customer_activity",
    resolution="1d",  # Can use string format too
    date_from=from_date,
    date_to=to_date
):
    data = catalog.retrieve_asset(feature_asset)
    for record in data:
        print(record)

Working with PersistentStoreAssets

from hyperion.catalog import Catalog
from hyperion.entities.catalog import PersistentStoreAsset

# Initialize the catalog
catalog = Catalog.from_config()

# Create a persistent store asset
asset = PersistentStoreAsset(
    name="product_catalog",
    schema_version=1
)

# Store reference data
products = [
    {"id": "P001", "name": "Product 1", "category": "Electronics"},
    {"id": "P002", "name": "Product 2", "category": "Clothing"},
]

catalog.store_asset(asset, products)

# Retrieve reference data
for product in catalog.retrieve_asset(asset):
    print(product)

Creating a Custom Source

import asyncio
from datetime import datetime, timezone
from typing import AsyncIterator

from hyperion.catalog import Catalog
from hyperion.entities.catalog import DataLakeAsset
from hyperion.sources.base import Source, SourceAsset


class MyCustomSource(Source):
    source = "my-custom-source"

    async def run(self, start_date=None, end_date=None) -> AsyncIterator[SourceAsset]:
        # Fetch your data (this is where you'd implement your data extraction logic)
        data = [
            {"id": 1, "name": "Item 1", "timestamp": datetime.now(timezone.utc)},
            {"id": 2, "name": "Item 2", "timestamp": datetime.now(timezone.utc)},
        ]

        # Create asset
        asset = DataLakeAsset(
            name="my-custom-data",
            date=datetime.now(timezone.utc)
        )

        # Yield source asset
        yield SourceAsset(asset=asset, data=data)


# Use with AWS Lambda
def lambda_handler(event, context):
    MyCustomSource.handle_aws_lambda_event(event, context)


# Use standalone
if __name__ == "__main__":
    asyncio.run(MyCustomSource._run(Catalog.from_config()))

Working with Schemas

To create and register a schema for an asset:

import json
from pathlib import Path

# Define schema in Avro format
schema = {
    "type": "record",
    "name": "CustomerData",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "name", "type": "string"},
        {"name": "timestamp", "type": {"type": "long", "logicalType": "timestamp-millis"}}
    ]
}

# Save schema to local file
schema_path = Path("schemas/data_lake/customer_data.v1.avro.json")
schema_path.parent.mkdir(parents=True, exist_ok=True)
with open(schema_path, "w") as f:
    json.dump(schema, f)

# Or upload to S3 if using S3SchemaStore
import boto3
s3_client = boto3.client('s3')
s3_client.put_object(
    Bucket="my-schema-bucket",
    Key="data_lake/customer_data.v1.avro.json",
    Body=json.dumps(schema)
)
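The schema above declares `timestamp` as a `long` with the Avro logical type `timestamp-millis`, which represents the number of milliseconds since the Unix epoch in UTC. The sketch below shows that mapping for a Python `datetime`. The two helper names are hypothetical; whether Hyperion or its Avro serializer performs this conversion for you is not stated here, and the storage examples in this README pass `datetime` objects directly.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_timestamp_millis(value: datetime) -> int:
    """Convert a timezone-aware datetime to milliseconds since the Unix epoch."""
    if value.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    # Integer division of timedeltas avoids floating-point rounding.
    return (value - EPOCH) // timedelta(milliseconds=1)


def from_timestamp_millis(millis: int) -> datetime:
    """Convert milliseconds since the Unix epoch back to a UTC datetime."""
    return EPOCH + timedelta(milliseconds=millis)


millis = to_timestamp_millis(datetime(2023, 1, 1, tzinfo=timezone.utc))
print(millis)  # prints: 1672531200000
print(from_timestamp_millis(millis).isoformat())
```
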

Advanced Features

Asset Collections

Asset collections provide a high-level interface for fetching and working with groups of assets. You can define a collection class that specifies the assets you need and fetch them all at once.

See docs/asset_collections.md for more information.

Repartitioning Data

from hyperion.catalog import Catalog
from hyperion.entities.catalog import DataLakeAsset
from hyperion.dateutils import TimeResolutionUnit
from datetime import datetime, timezone
import asyncio

async def repartition_data():
    catalog = Catalog.from_config()

    # Original asset with day-level partitioning
    asset = DataLakeAsset(
        name="web_logs",
        date=datetime.now(timezone.utc),
        schema_version=1
    )

    # Repartition by hour
    await catalog.repartition(
        asset,
        granularity=TimeResolutionUnit("h"),
        date_attribute="timestamp"
    )

asyncio.run(repartition_data())
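Conceptually, repartitioning by hour means bucketing each record by the hour of its date attribute. The sketch below illustrates that grouping step on in-memory records. It is not the implementation behind `catalog.repartition`; the `group_by_hour` helper is hypothetical and assumes every record holds a `datetime` under the given attribute.

```python
from collections import defaultdict
from datetime import datetime, timezone
from typing import Any, Dict, Iterable, List


def group_by_hour(
    records: Iterable[Dict[str, Any]], date_attribute: str
) -> Dict[datetime, List[Dict[str, Any]]]:
    """Group records by the start of the hour found in record[date_attribute]."""
    partitions: Dict[datetime, List[Dict[str, Any]]] = defaultdict(list)
    for record in records:
        moment: datetime = record[date_attribute]
        # Truncate to the hour to obtain the partition key.
        hour_start = moment.replace(minute=0, second=0, microsecond=0)
        partitions[hour_start].append(record)
    return dict(partitions)


logs = [
    {"path": "/", "timestamp": datetime(2023, 1, 1, 10, 5, tzinfo=timezone.utc)},
    {"path": "/a", "timestamp": datetime(2023, 1, 1, 10, 55, tzinfo=timezone.utc)},
    {"path": "/b", "timestamp": datetime(2023, 1, 1, 11, 1, tzinfo=timezone.utc)},
]
for hour_start, items in group_by_hour(logs, "timestamp").items():
    print(hour_start.isoformat(), len(items))
```
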

Caching

from hyperion.infrastructure.cache import Cache

# Get cache from configuration
cache = Cache.from_config()

# Store data in cache
cache.set("my-key", "my-value")

# Retrieve data from cache
value = cache.get("my-key")
print(value)  # prints: my-value

# Check if key exists
if cache.hit("my-key"):
    print("Cache hit!")

# Delete key
cache.delete("my-key")
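For unit tests that should not touch a file system or DynamoDB, a dict-backed stand-in exposing the same four methods used above may be enough. The `InMemoryCache` class below is a hypothetical sketch, not Hyperion's in-memory cache; in particular, returning `None` for a missing key and ignoring deletes of unknown keys are assumptions.

```python
from typing import Any, Dict, Optional


class InMemoryCache:
    """Hypothetical dict-backed stand-in with set, get, hit and delete."""

    def __init__(self) -> None:
        self._items: Dict[str, Any] = {}

    def set(self, key: str, value: Any) -> None:
        self._items[key] = value

    def get(self, key: str) -> Optional[Any]:
        # Assumption: a missing key yields None rather than raising.
        return self._items.get(key)

    def hit(self, key: str) -> bool:
        return key in self._items

    def delete(self, key: str) -> None:
        # Assumption: deleting an unknown key is a no-op.
        self._items.pop(key, None)


cache = InMemoryCache()
cache.set("my-key", "my-value")
print(cache.get("my-key"), cache.hit("my-key"))
cache.delete("my-key")
print(cache.hit("my-key"))
```
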

Geo Utilities

from hyperion.infrastructure.geo import GoogleMaps, Location

# Initialize Google Maps client
gmaps = GoogleMaps.from_config()

# Geocode an address
with gmaps:
    location = gmaps.geocode("1600 Amphitheatre Parkway, Mountain View, CA")
    print(f"Latitude: {location.latitude}, Longitude: {location.longitude}")

    # Reverse geocode a location
    named_location = gmaps.reverse_geocode(location)
    print(f"Address: {named_location.address}")
    print(f"Country: {named_location.country}")

Development

Setup Development Environment

# Install development dependencies
poetry install

# Install pre-commit hooks
poetry run pre-commit install

Running Tests

# Run all tests
poetry run pytest

# Run with coverage
poetry run pytest --cov=hyperion

# Run specific test files
poetry run pytest tests/test_asyncutils.py

Code Style

This project uses pre-commit hooks to enforce code style:

# Run pre-commit on all files
poetry run pre-commit run -a

The project uses:

Architecture

Core Components

  • Catalog: Manages data assets and their storage in S3
  • SchemaStore: Handles schema validation and storage
  • Source: Base class for implementing data sources
  • Infrastructure: Abstractions for AWS services (S3, DynamoDB, SQS, etc.)
  • Utils: Helper functions for dates, async operations, etc.

Asset Types

  • DataLakeAsset: Raw data stored in a data lake
  • FeatureAsset: Processed features with time resolution
  • PersistentStoreAsset: Persistent data storage

Contributing

See CONTRIBUTING.md for guidelines on contributing to this project.

License

This project is licensed under the MIT License - see the LICENSE file for details.
