A headless ETL / ELT / data pipeline and integration SDK for Python.
- Data Catalog System: Manage and organize data assets across S3 buckets
- Schema Management: Validate and store schema definitions for data assets
- AWS Infrastructure Abstractions: Simplified interfaces for S3, DynamoDB, SQS, and Secrets Manager
- Source Framework: Define data sources that extract data and store it in the catalog
- Caching: In-memory, local file, and DynamoDB caching options
- Asynchronous Processing: Utilities for async operations and task queues
- Geo Utilities: Location-based services with Google Maps integration
Assets are the fundamental units of data in Hyperion. Each asset represents a dataset stored in a specific location with a defined schema. Hyperion supports three types of assets:
Data Lake assets (DataLakeAsset):
- Represents raw, immutable data stored in a data lake
- Time-partitioned by date
- Each partition has a schema version
- Example use cases: raw API responses, event logs, or any source data that needs to be preserved in its original form

Feature assets (FeatureAsset):
- Represents processed feature data with a time resolution
- Used for analytics, machine learning features, and derived datasets
- Supports different time resolutions (seconds, minutes, hours, days, weeks, months, years)
- Can include additional partition keys for finer-grained organization
- Example use cases: aggregated metrics, processed signals, ML features

Persistent store assets (PersistentStoreAsset):
- Represents persistent data storage without time partitioning
- Used for reference data, lookup tables, or any data that doesn't change frequently
- Example use cases: reference data, configuration settings, master data
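As a quick orientation, here is a minimal sketch of how each asset type is constructed (the constructor arguments mirror the usage examples later in this README; the asset names are arbitrary examples):

from datetime import datetime, timezone
from hyperion.dateutils import TimeResolution
from hyperion.entities.catalog import DataLakeAsset, FeatureAsset, PersistentStoreAsset

now = datetime.now(timezone.utc)

# Raw, immutable data, partitioned by date
raw_events = DataLakeAsset(name="raw_events", date=now, schema_version=1)

# Processed features with an explicit time resolution (here: 1 day)
daily_activity = FeatureAsset(
    name="customer_activity",
    partition_date=now,
    resolution=TimeResolution(1, "d"),
    schema_version=1,
)

# Reference data without time partitioning
product_catalog = PersistentStoreAsset(name="product_catalog", schema_version=1)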
All assets in Hyperion have associated schemas that define their structure:
- Schema Store: The SchemaStore manages asset schemas in Avro format
- Schema Validation: All data is validated against its schema during storage
- Schema Versioning: Assets include a schema version to support evolution over time
- Schema Storage: Schemas can be stored in the local filesystem or S3
If a schema is missing for an asset:
- An error will be raised when attempting to store or retrieve the asset
- You need to define the schema in Avro format and store it in the schema store
- The schema should be named according to the pattern:
{asset_type}/{asset_name}.v{version}.avro.json
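For example, version 1 of the schema for a data lake asset named customer_data would live at data_lake/customer_data.v1.avro.json.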
The Catalog is the central component that manages asset storage and retrieval:
- Storage Location: Maps asset types to their appropriate storage buckets
- Asset Retrieval: Provides methods to retrieve assets by name, date, and schema version
- Partitioning: Handles partitioning logic for different asset types
- Notifications: Can send notifications when new assets arrive
Sources are responsible for extracting data from external systems and storing it in the catalog:
- Standardized Interface: All sources implement a common interface
- AWS Lambda Support: Easy integration with AWS Lambda for scheduled extraction
- Backfill Capability: Support for historical data backfill
- Incremental Processing: Extract data with date-based filtering
Hyperion uses Poetry for dependency management:
# Clone the repository
git clone https://github.com/tomasvotava/hyperion.git
cd hyperion
# Install dependencies
poetry install
Hyperion is configured through environment variables. You can use a .env
file for local development:
# Common settings
HYPERION_COMMON_LOG_PRETTY=True
HYPERION_COMMON_LOG_LEVEL=INFO
HYPERION_COMMON_SERVICE_NAME=my-service
# Storage settings
HYPERION_STORAGE_DATA_LAKE_BUCKET=my-data-lake-bucket
HYPERION_STORAGE_FEATURE_STORE_BUCKET=my-feature-store-bucket
HYPERION_STORAGE_PERSISTENT_STORE_BUCKET=my-persistent-store-bucket
HYPERION_STORAGE_SCHEMA_PATH=s3://my-schema-bucket/schemas
HYPERION_STORAGE_MAX_CONCURRENCY=5
# Queue settings
HYPERION_QUEUE_URL=https://sqs.us-east-1.amazonaws.com/123456789012/my-queue
# Secrets settings
HYPERION_SECRETS_BACKEND=AWSSecretsManager
# HTTP settings (optional)
HYPERION_HTTP_PROXY_HTTP=http://proxy:8080
HYPERION_HTTP_PROXY_HTTPS=http://proxy:8080
# Geo settings (optional)
HYPERION_GEO_GMAPS_API_KEY=your-google-maps-api-key
Until proper documentation is written, you can check the hyperion/config.py file for all available configuration options. Hyperion uses EnvProxy for configuration.
from hyperion.catalog import Catalog
from hyperion.entities.catalog import DataLakeAsset
from datetime import datetime, timezone
# Initialize the catalog
catalog = Catalog.from_config()
# Create a data lake asset
asset = DataLakeAsset(
    name="customer_data",
    date=datetime.now(timezone.utc),
    schema_version=1,
)

# Store data in the asset
data = [
    {"id": 1, "name": "Customer 1", "timestamp": datetime.now(timezone.utc)},
    {"id": 2, "name": "Customer 2", "timestamp": datetime.now(timezone.utc)},
]
catalog.store_asset(asset, data)
from hyperion.catalog import Catalog
from hyperion.entities.catalog import FeatureAsset
from hyperion.dateutils import TimeResolution
from datetime import datetime, timezone
# Initialize the catalog
catalog = Catalog.from_config()
# Create a feature asset with daily resolution
resolution = TimeResolution(1, "d") # 1 day resolution
asset = FeatureAsset(
    name="customer_activity",
    partition_date=datetime.now(timezone.utc),
    resolution=resolution,
    schema_version=1,
)

# Store aggregated feature data
feature_data = [
    {"customer_id": 1, "activity_score": 87.5, "date": datetime.now(timezone.utc)},
    {"customer_id": 2, "activity_score": 92.1, "date": datetime.now(timezone.utc)},
]
catalog.store_asset(asset, feature_data)
# Retrieve feature data for a specific time period
from_date = datetime(2023, 1, 1, tzinfo=timezone.utc)
to_date = datetime(2023, 1, 31, tzinfo=timezone.utc)
for feature_asset in catalog.iter_feature_store_partitions(
    feature_name="customer_activity",
    resolution="1d",  # Can use string format too
    date_from=from_date,
    date_to=to_date,
):
    data = catalog.retrieve_asset(feature_asset)
    for record in data:
        print(record)
from hyperion.catalog import Catalog
from hyperion.entities.catalog import PersistentStoreAsset
# Initialize the catalog
catalog = Catalog.from_config()
# Create a persistent store asset
asset = PersistentStoreAsset(
    name="product_catalog",
    schema_version=1,
)

# Store reference data
products = [
    {"id": "P001", "name": "Product 1", "category": "Electronics"},
    {"id": "P002", "name": "Product 2", "category": "Clothing"},
]
catalog.store_asset(asset, products)
# Retrieve reference data
for product in catalog.retrieve_asset(asset):
    print(product)
import asyncio
from datetime import datetime, timezone
from typing import AsyncIterator
from hyperion.catalog import Catalog
from hyperion.entities.catalog import DataLakeAsset
from hyperion.sources.base import Source, SourceAsset
class MyCustomSource(Source):
    source = "my-custom-source"

    async def run(self, start_date=None, end_date=None) -> AsyncIterator[SourceAsset]:
        # Fetch your data (this is where you'd implement your data extraction logic)
        data = [
            {"id": 1, "name": "Item 1", "timestamp": datetime.now(timezone.utc)},
            {"id": 2, "name": "Item 2", "timestamp": datetime.now(timezone.utc)},
        ]

        # Create asset
        asset = DataLakeAsset(
            name="my-custom-data",
            date=datetime.now(timezone.utc),
        )

        # Yield source asset
        yield SourceAsset(asset=asset, data=data)


# Use with AWS Lambda
def lambda_handler(event, context):
    MyCustomSource.handle_aws_lambda_event(event, context)


# Use standalone
if __name__ == "__main__":
    asyncio.run(MyCustomSource._run(Catalog.from_config()))
To create and register a schema for an asset:
import json
from pathlib import Path
# Define schema in Avro format
schema = {
    "type": "record",
    "name": "CustomerData",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "name", "type": "string"},
        {"name": "timestamp", "type": {"type": "long", "logicalType": "timestamp-millis"}},
    ],
}

# Save schema to local file
schema_path = Path("schemas/data_lake/customer_data.v1.avro.json")
schema_path.parent.mkdir(parents=True, exist_ok=True)
with open(schema_path, "w") as f:
    json.dump(schema, f)
# Or upload to S3 if using S3SchemaStore
import boto3
s3_client = boto3.client('s3')
s3_client.put_object(
    Bucket="my-schema-bucket",
    Key="data_lake/customer_data.v1.avro.json",
    Body=json.dumps(schema),
)
Asset collections provide a high-level interface for fetching and working with groups of assets. You can define a collection class that specifies the assets you need and fetch them all at once.
See docs/asset_collections.md
for more information.
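The exact collection interface is documented there; purely as an illustration of the idea (the class below is hand-rolled for this example and is not Hyperion's collection API), a job can bundle the assets it needs and fetch them through the catalog in one place:

from datetime import datetime, timezone

from hyperion.catalog import Catalog
from hyperion.entities.catalog import DataLakeAsset, PersistentStoreAsset


class DailyReportAssets:
    """Hypothetical bundle of the assets one job needs (illustration only)."""

    def __init__(self, date: datetime) -> None:
        self.customers = DataLakeAsset(name="customer_data", date=date, schema_version=1)
        self.products = PersistentStoreAsset(name="product_catalog", schema_version=1)

    def fetch_all(self, catalog: Catalog) -> dict:
        # Retrieve every asset in the bundle in a single call
        return {
            "customers": list(catalog.retrieve_asset(self.customers)),
            "products": list(catalog.retrieve_asset(self.products)),
        }


catalog = Catalog.from_config()
collection = DailyReportAssets(date=datetime.now(timezone.utc))
data = collection.fetch_all(catalog)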
from hyperion.catalog import Catalog
from hyperion.entities.catalog import DataLakeAsset
from hyperion.dateutils import TimeResolutionUnit
from datetime import datetime, timezone
import asyncio
async def repartition_data():
    catalog = Catalog.from_config()

    # Original asset with day-level partitioning
    asset = DataLakeAsset(
        name="web_logs",
        date=datetime.now(timezone.utc),
        schema_version=1,
    )

    # Repartition by hour
    await catalog.repartition(
        asset,
        granularity=TimeResolutionUnit("h"),
        date_attribute="timestamp",
    )


asyncio.run(repartition_data())
from hyperion.infrastructure.cache import Cache
# Get cache from configuration
cache = Cache.from_config()
# Store data in cache
cache.set("my-key", "my-value")
# Retrieve data from cache
value = cache.get("my-key")
print(value) # "my-value"
# Check if key exists
if cache.hit("my-key"):
    print("Cache hit!")
# Delete key
cache.delete("my-key")
from hyperion.infrastructure.geo import GoogleMaps, Location
# Initialize Google Maps client
gmaps = GoogleMaps.from_config()
# Geocode an address
with gmaps:
    location = gmaps.geocode("1600 Amphitheatre Parkway, Mountain View, CA")
    print(f"Latitude: {location.latitude}, Longitude: {location.longitude}")

    # Reverse geocode a location
    named_location = gmaps.reverse_geocode(location)
    print(f"Address: {named_location.address}")
    print(f"Country: {named_location.country}")
# Install development dependencies
poetry install
# Install pre-commit hooks
poetry run pre-commit install
# Run all tests
poetry run pytest
# Run with coverage
poetry run pytest --cov=hyperion
# Run specific test files
poetry run pytest tests/test_asyncutils.py
This project uses pre-commit hooks to enforce code style:
# Run pre-commit on all files
poetry run pre-commit run -a
The project uses:
- ruff for linting
- mypy for type checking
- commitizen for standardized commits
- Catalog: Manages data assets and their storage in S3
- SchemaStore: Handles schema validation and storage
- Source: Base class for implementing data sources
- Infrastructure: Abstractions for AWS services (S3, DynamoDB, SQS, etc.)
- Utils: Helper functions for dates, async operations, etc.
- DataLakeAsset: Raw data stored in a data lake
- FeatureAsset: Processed features with time resolution
- PersistentStoreAsset: Persistent data storage
See CONTRIBUTING.md for guidelines on contributing to this project.
This project is licensed under the MIT License - see the LICENSE file for details.