python/destinations/quixlake-timeseries/README.md (new file)

# Quix TS Datalake Sink

This connector consumes time-series data from a Kafka topic and writes it to S3 as Hive-partitioned Parquet files, with optional registration in the Quix catalog so the data can be queried through the data lake query API.

## Features

- **Hive Partitioning**: Automatically partition data by any columns (e.g., location, sensor type, year/month/day/hour)
- **Time-based Partitioning**: Extract year/month/day/hour from timestamp columns for efficient time-based queries
- **Quix Catalog Integration**: Optional table registration in a REST Catalog for seamless integration with analytics tools
- **Efficient Batching**: Configurable batch sizes and parallel S3 uploads for high throughput
- **Schema Evolution**: Automatic schema detection from data
- **Partition Validation**: Prevents data corruption by validating partition strategies against existing tables

## How to run

Create a [Quix](https://portal.cloud.quix.io/signup?xlink=github) account or log in and visit the `Connectors` tab to use this connector.

Clicking `Set up connector` allows you to enter your connection details and runtime parameters.

Then either:
* Click `Test connection & deploy` to deploy the pre-built and configured container into Quix
* Or click `Customise connector` to inspect or alter the code before deployment

## Environment Variables

### Required

- **`input`**: Name of the Kafka input topic to consume from
*Default*: `sensor-data`

- **`S3_BUCKET`**: S3 bucket name for storing Parquet files

### S3 Configuration

- **`S3_PREFIX`**: S3 prefix/path for data files
*Default*: `data`

- **`AWS_ACCESS_KEY_ID`**: AWS Access Key ID for S3 access
*Default*: `""` (uses IAM role if empty)

- **`AWS_SECRET_ACCESS_KEY`**: AWS Secret Access Key for S3 access
*Default*: `""` (uses IAM role if empty)

- **`AWS_REGION`**: AWS region for S3 bucket
*Default*: `us-east-1`

- **`AWS_ENDPOINT_URL`**: Custom S3 endpoint URL for non-AWS S3-compatible storage
*Examples*:
- MinIO: `http://minio.example.com:9000`
- Wasabi: `https://s3.wasabisys.com`
- DigitalOcean Spaces: `https://nyc3.digitaloceanspaces.com`
- Backblaze B2: `https://s3.us-west-004.backblazeb2.com`
*Default*: None (uses AWS S3)

### Data Organization

- **`TABLE_NAME`**: Table name for data organization and registration
*Default*: Uses the topic name if not specified

- **`HIVE_COLUMNS`**: Comma-separated list of columns for Hive partitioning. Include `year`, `month`, `day`, `hour` to extract from `TIMESTAMP_COLUMN`
*Example*: `location,year,month,day,sensor_type`
*Default*: `""` (no partitioning)

- **`TIMESTAMP_COLUMN`**: Column containing timestamp values to extract year/month/day/hour from
*Default*: `ts_ms`
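
For intuition, here is a minimal sketch (not the connector's exact code, and assuming `TIMESTAMP_COLUMN` holds epoch milliseconds, as the `ts_ms` default suggests) of how `year`/`month`/`day`/`hour` partition values can be derived:

```python
from datetime import datetime, timezone

def extract_time_parts(record: dict, timestamp_column: str = "ts_ms") -> dict:
    """Derive zero-padded year/month/day/hour partition values from an epoch-ms timestamp."""
    ts = datetime.fromtimestamp(record[timestamp_column] / 1000, tz=timezone.utc)
    return {
        "year": f"{ts.year:04d}",
        "month": f"{ts.month:02d}",
        "day": f"{ts.day:02d}",
        "hour": f"{ts.hour:02d}",
    }

# extract_time_parts({"ts_ms": 1705309200000})
# -> {"year": "2024", "month": "01", "day": "15", "hour": "09"}
```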

### Catalog Integration (Optional)

- **`CATALOG_URL`**: REST Catalog URL for optional table registration (leave empty to skip)
*Example*: `https://catalog.example.com/api/v1`

- **`CATALOG_AUTH_TOKEN`**: Auth token used to access the catalog (only needed when `CATALOG_URL` is set)

- **`AUTO_DISCOVER`**: Automatically register table in REST Catalog on first write
*Default*: `true`

- **`CATALOG_NAMESPACE`**: Catalog namespace for table registration
*Default*: `default`
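
When a catalog is configured, registration goes through the bundled `CatalogClient` helper (`catalog_client.py`, included with this connector). Below is a hedged sketch of what an `AUTO_DISCOVER`-style call could look like; the endpoint path and payload are hypothetical illustrations, not the sink's actual routes:

```python
from catalog_client import CatalogClient

client = CatalogClient("https://catalog.example.com/api/v1", auth_token="my-token")
resp = client.post(
    "/tables/register",  # hypothetical path, shown for illustration only
    json={
        "namespace": "default",
        "table": "sensor-data",
        "location": "s3://my-data-lake/data/sensor-data/",
    },
)
resp.raise_for_status()
```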

### Kafka Configuration

- **`CONSUMER_GROUP`**: Kafka consumer group name
*Default*: `s3_direct_sink_v1.0`

- **`AUTO_OFFSET_RESET`**: Where to start consuming if no offset exists
*Default*: `latest`
*Options*: `earliest`, `latest`

- **`KAFKA_KEY_DESERIALIZER`**: The key deserializer to use
*Default*: `str`

- **`KAFKA_VALUE_DESERIALIZER`**: The value deserializer to use
*Default*: `json`
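
As a rough guide, these settings would typically be wired into a Quix Streams `Application` as sketched below; the connector's actual setup may differ:

```python
import os
from quixstreams import Application

app = Application(
    broker_address="localhost:9092",  # placeholder; on Quix this comes from the platform
    consumer_group=os.environ.get("CONSUMER_GROUP", "s3_direct_sink_v1.0"),
    auto_offset_reset=os.environ.get("AUTO_OFFSET_RESET", "latest"),
)
# The deserializer variables presumably feed app.topic(); the JSON default is shown here.
topic = app.topic(os.environ.get("input", "sensor-data"), value_deserializer="json")
```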

### Performance Tuning

- **`BATCH_SIZE`**: Number of messages to batch before writing to S3
*Default*: `1000`

- **`COMMIT_INTERVAL`**: Kafka commit interval in seconds
*Default*: `30`

- **`MAX_WRITE_WORKERS`**: Maximum number of files uploaded to S3 in parallel
*Default*: `10`
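
For illustration, a minimal sketch of a `MAX_WRITE_WORKERS`-bounded parallel upload, assuming a boto3-style S3 client (not the connector's exact code):

```python
from concurrent.futures import ThreadPoolExecutor

def upload_batch(s3_client, bucket: str, files: list[tuple[str, bytes]], max_workers: int = 10) -> list[str]:
    """Upload (key, parquet_bytes) pairs to S3 in parallel; returns the written keys."""
    def _put(key: str, body: bytes) -> str:
        s3_client.put_object(Bucket=bucket, Key=key, Body=body)
        return key

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(_put, key, body) for key, body in files]
        return [f.result() for f in futures]
```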

### Application Settings

- **`LOGLEVEL`**: Set application logging level
*Default*: `INFO`
*Options*: `DEBUG`, `INFO`, `WARNING`, `ERROR`, `CRITICAL`

## Partitioning Strategy Examples

### Example 1: Time-based partitioning
```bash
HIVE_COLUMNS=year,month,day
TIMESTAMP_COLUMN=ts_ms
```
Creates: `s3://bucket/prefix/table/year=2024/month=01/day=15/data_*.parquet`

### Example 2: Multi-dimensional partitioning
```bash
HIVE_COLUMNS=location,sensor_type,year,month
TIMESTAMP_COLUMN=timestamp
```
Creates: `s3://bucket/prefix/table/location=NYC/sensor_type=temp/year=2024/month=01/data_*.parquet`

### Example 3: No partitioning
```bash
HIVE_COLUMNS=
```
Creates: `s3://bucket/prefix/table/data_*.parquet`
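
For reference, a minimal sketch of how a record's partition values map onto a Hive-style key (the `data_*.parquet` file-naming details are omitted, and this is not the connector's exact code):

```python
def hive_prefix(prefix: str, table: str, record: dict, hive_columns: list[str]) -> str:
    """Build a Hive-style partition prefix such as data/table/location=NYC/year=2024."""
    parts = [f"{col}={record[col]}" for col in hive_columns]
    return "/".join([prefix, table, *parts])

# hive_prefix("data", "sensor-data",
#             {"location": "NYC", "year": "2024", "month": "01"},
#             ["location", "year", "month"])
# -> "data/sensor-data/location=NYC/year=2024/month=01"
```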

## Using Non-AWS S3-Compatible Storage

This connector supports any S3-compatible storage service by setting the `AWS_ENDPOINT_URL` environment variable.
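
Internally, these variables configure the S3 client. A hedged sketch assuming boto3 (the connector may use a different S3 library); leaving `AWS_ENDPOINT_URL` unset falls back to AWS S3:

```python
import os
import boto3

s3 = boto3.client(
    "s3",
    endpoint_url=os.environ.get("AWS_ENDPOINT_URL") or None,  # None -> regular AWS S3
    aws_access_key_id=os.environ.get("AWS_ACCESS_KEY_ID") or None,  # None -> IAM role / default chain
    aws_secret_access_key=os.environ.get("AWS_SECRET_ACCESS_KEY") or None,
    region_name=os.environ.get("AWS_REGION", "us-east-1"),
)
```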

### MinIO Example
```bash
AWS_ENDPOINT_URL=http://minio.example.com:9000
AWS_ACCESS_KEY_ID=minioadmin
AWS_SECRET_ACCESS_KEY=minioadmin
AWS_REGION=us-east-1
S3_BUCKET=my-data-lake
```

### Wasabi Example
```bash
AWS_ENDPOINT_URL=https://s3.wasabisys.com
AWS_ACCESS_KEY_ID=your-wasabi-access-key
AWS_SECRET_ACCESS_KEY=your-wasabi-secret-key
AWS_REGION=us-east-1
S3_BUCKET=my-data-lake
```

### DigitalOcean Spaces Example
```bash
AWS_ENDPOINT_URL=https://nyc3.digitaloceanspaces.com
AWS_ACCESS_KEY_ID=your-spaces-access-key
AWS_SECRET_ACCESS_KEY=your-spaces-secret-key
AWS_REGION=nyc3
S3_BUCKET=my-data-lake
```

### Backblaze B2 Example
```bash
AWS_ENDPOINT_URL=https://s3.us-west-004.backblazeb2.com
AWS_ACCESS_KEY_ID=your-b2-key-id
AWS_SECRET_ACCESS_KEY=your-b2-application-key
AWS_REGION=us-west-004
S3_BUCKET=my-data-lake
```

## Architecture

The sink uses a batching architecture for high throughput:

1. **Consume**: Messages are consumed from Kafka in batches
2. **Transform**: Time-based columns are extracted if needed
3. **Partition**: Data is grouped by partition columns (see the sketch after this list)
4. **Upload**: Multiple files are uploaded to S3 in parallel
5. **Register**: Files are registered in the catalog (if configured)
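
A minimal sketch of the grouping step (3) above, not the connector's exact code:

```python
from collections import defaultdict

def group_by_partition(records: list[dict], hive_columns: list[str]) -> dict[tuple, list[dict]]:
    """Group a consumed batch by its partition values so each group becomes one Parquet file."""
    groups: dict[tuple, list[dict]] = defaultdict(list)
    for record in records:
        key = tuple(record.get(col) for col in hive_columns)
        groups[key].append(record)
    return dict(groups)
```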

## Requirements

- S3 bucket access:
- AWS S3, or
- Any S3-compatible storage (MinIO, Wasabi, DigitalOcean Spaces, Backblaze B2, etc.)
- Optional: Quix REST Catalog endpoint for data catalog integration

## Contribute

Submit forked projects to the Quix [GitHub](https://github.com/quixio/quix-samples) repo. Any new project that we accept will be attributed to you and you'll receive $200 in Quix credit.

## Open Source

This project is open source under the Apache 2.0 license and available in our [GitHub](https://github.com/quixio/quix-samples) repo. Please star us and mention us on social to show your appreciation.

python/destinations/quixlake-timeseries/catalog_client.py (new file)

import requests
from typing import Optional


class CatalogClient:
"""Simple HTTP client for REST Catalog API interactions"""

def __init__(self, base_url: str, auth_token: Optional[str] = None):
"""
Initialize the catalog client

Args:
base_url: Base URL of the REST Catalog API
auth_token: Optional authentication token for API requests
"""
self.base_url = base_url.rstrip('/')
self.auth_token = auth_token
self._session = requests.Session()

# Set up authentication header if token is provided
if self.auth_token:
self._session.headers['Authorization'] = f'Bearer {self.auth_token}'

def get(self, path: str, timeout: int = 30) -> requests.Response:
"""
Make a GET request to the catalog API

Args:
path: API endpoint path (will be appended to base_url)
timeout: Request timeout in seconds

Returns:
Response object from requests library
"""
url = f"{self.base_url}{path}"
return self._session.get(url, timeout=timeout)

    def post(self, path: str, json: Optional[dict] = None, timeout: int = 30) -> requests.Response:
"""
Make a POST request to the catalog API

Args:
path: API endpoint path (will be appended to base_url)
json: JSON payload to send in request body
timeout: Request timeout in seconds

Returns:
Response object from requests library
"""
url = f"{self.base_url}{path}"
return self._session.post(url, json=json, timeout=timeout)

    def put(self, path: str, json: Optional[dict] = None, timeout: int = 30) -> requests.Response:
"""
Make a PUT request to the catalog API

Args:
path: API endpoint path (will be appended to base_url)
json: JSON payload to send in request body
timeout: Request timeout in seconds

Returns:
Response object from requests library
"""
url = f"{self.base_url}{path}"
return self._session.put(url, json=json, timeout=timeout)

def __str__(self):
"""String representation showing the base URL"""
return self.base_url

python/destinations/quixlake-timeseries/dockerfile (new file)

FROM python:3.11.1-slim-buster

# Set environment variables to non-interactive and unbuffered output
ENV DEBIAN_FRONTEND=noninteractive \
PYTHONUNBUFFERED=1 \
PYTHONIOENCODING=UTF-8

# Set the working directory inside the container
WORKDIR /app

# Copy only the requirements file first to leverage the Docker layer cache
COPY ./requirements.txt ./

# Install dependencies
# Adding `--no-cache-dir` to avoid storing unnecessary files and potentially reduce image size
RUN pip install --no-cache-dir -r requirements.txt

# Copy the rest of the application
COPY . .

# Set the command to run your application
ENTRYPOINT ["python3", "main.py"]

python/destinations/quixlake-timeseries/icon.png (new binary file)