From 028113fe36be3d36b778126b5441a86679b51cb7 Mon Sep 17 00:00:00 2001 From: Tomas Neubauer Date: Thu, 20 Nov 2025 17:19:26 +0100 Subject: [PATCH 1/5] Adds Quixlake Timeseries destination connector This commit introduces a new destination connector that writes time-series data from Kafka to S3 as Hive-partitioned Parquet files. Key features: - Supports Hive partitioning by any column, including time-based partitioning from timestamp columns. - Offers optional integration with a REST Catalog for table registration. - Includes configurable batch sizes and parallel uploads for optimal performance. - Validates partition strategies against existing tables to prevent data corruption. --- .../quixlake-timeseries/README.md | 150 +++++ .../quixlake-timeseries/catalog_client.py | 70 +++ .../quixlake-timeseries/dockerfile | 23 + .../destinations/quixlake-timeseries/icon.png | Bin 0 -> 4851 bytes .../quixlake-timeseries/library.json | 194 ++++++ .../destinations/quixlake-timeseries/main.py | 75 +++ .../quixlake-timeseries/quixlake_sink.py | 557 ++++++++++++++++++ .../quixlake-timeseries/requirements.txt | 6 + python/destinations/s3-file/requirements.txt | 6 +- .../docker-compose.test.yml | 196 ++++++ .../mock-catalog/Dockerfile | 16 + .../quixlake-timeseries/mock-catalog/app.py | 190 ++++++ .../mock-catalog/requirements.txt | 1 + .../quixlake-timeseries/produce_test_data.py | 88 +++ .../quixlake-timeseries/verify_catalog.py | 234 ++++++++ .../quixlake-timeseries/verify_output.py | 293 +++++++++ 16 files changed, 2098 insertions(+), 1 deletion(-) create mode 100644 python/destinations/quixlake-timeseries/README.md create mode 100644 python/destinations/quixlake-timeseries/catalog_client.py create mode 100644 python/destinations/quixlake-timeseries/dockerfile create mode 100644 python/destinations/quixlake-timeseries/icon.png create mode 100644 python/destinations/quixlake-timeseries/library.json create mode 100644 python/destinations/quixlake-timeseries/main.py create mode 100644 python/destinations/quixlake-timeseries/quixlake_sink.py create mode 100644 python/destinations/quixlake-timeseries/requirements.txt create mode 100644 tests/destinations/quixlake-timeseries/docker-compose.test.yml create mode 100644 tests/destinations/quixlake-timeseries/mock-catalog/Dockerfile create mode 100644 tests/destinations/quixlake-timeseries/mock-catalog/app.py create mode 100644 tests/destinations/quixlake-timeseries/mock-catalog/requirements.txt create mode 100644 tests/destinations/quixlake-timeseries/produce_test_data.py create mode 100644 tests/destinations/quixlake-timeseries/verify_catalog.py create mode 100644 tests/destinations/quixlake-timeseries/verify_output.py diff --git a/python/destinations/quixlake-timeseries/README.md b/python/destinations/quixlake-timeseries/README.md new file mode 100644 index 000000000..24b7379fa --- /dev/null +++ b/python/destinations/quixlake-timeseries/README.md @@ -0,0 +1,150 @@ +# Quix TS Datalake Sink + +This connector consumes time-series data from a Kafka topic and writes it to S3 as Hive-partitioned Parquet files, with optional Quix catalog registration for data lake query API. 
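+
+A typical input record looks like the one below (field names mirror the bundled test data producer and are only illustrative):
+
+```json
+{
+  "location": "NYC",
+  "sensor_type": "temperature",
+  "value": 21.5,
+  "ts_ms": 1705320000000
+}
+```
+
+With `HIVE_COLUMNS=location,year,month,day` and `TIMESTAMP_COLUMN=ts_ms`, this record would be written under `location=NYC/year=2024/month=01/day=15/` in S3.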
+ +## Features + +- **Hive Partitioning**: Automatically partition data by any columns (e.g., location, sensor type, year/month/day/hour) +- **Time-based Partitioning**: Extract year/month/day/hour from timestamp columns for efficient time-based queries +- **Quix Catalog Integration**: Optional table registration in a REST Catalog for seamless integration with analytics tools +- **Efficient Batching**: Configurable batch sizes and parallel S3 uploads for high throughput +- **Schema Evolution**: Automatic schema detection from data +- **Partition Validation**: Prevents data corruption by validating partition strategies against existing tables + +## How to run + +Create a [Quix](https://portal.platform.quix.io/signup?xlink=github) account or log in and visit the `Connectors` tab to use this connector. + +Clicking `Set up connector` allows you to enter your connection details and runtime parameters. + +Then either: +* Click `Test connection & deploy` to deploy the pre-built and configured container into Quix +* Or click `Customise connector` to inspect or alter the code before deployment + +## Environment Variables + +### Required + +- **`input`**: Name of the Kafka input topic to consume from + *Default*: `sensor-data` + +- **`S3_BUCKET`**: S3 bucket name for storing Parquet files + +### S3 Configuration + +- **`S3_PREFIX`**: S3 prefix/path for data files + *Default*: `data` + +- **`AWS_ACCESS_KEY_ID`**: AWS Access Key ID for S3 access + *Default*: `""` (uses IAM role if empty) + +- **`AWS_SECRET_ACCESS_KEY`**: AWS Secret Access Key for S3 access + *Default*: `""` (uses IAM role if empty) + +- **`AWS_REGION`**: AWS region for S3 bucket + *Default*: `us-east-1` + +- **`AWS_ENDPOINT_URL`**: S3 endpoint URL (for non-AWS S3-compatible storage like MinIO) + *Default*: None + +### Data Organization + +- **`TABLE_NAME`**: Table name for data organization and registration + *Default*: Uses the topic name if not specified + +- **`HIVE_COLUMNS`**: Comma-separated list of columns for Hive partitioning. 
Include `year`, `month`, `day`, `hour` to extract from `TIMESTAMP_COLUMN` + *Example*: `location,year,month,day,sensor_type` + *Default*: `""` (no partitioning) + +- **`TIMESTAMP_COLUMN`**: Column containing timestamp values to extract year/month/day/hour from + *Default*: `ts_ms` + +### Catalog Integration (Optional) + +- **`CATALOG_URL`**: REST Catalog URL for optional table registration (leave empty to skip) + *Example*: `https://catalog.example.com/api/v1` + +- **`CATALOG_AUTH_TOKEN`**: If using a catalog, the respective auth token to access it + +- **`AUTO_DISCOVER`**: Automatically register table in REST Catalog on first write + *Default*: `true` + +- **`CATALOG_NAMESPACE`**: Catalog namespace for table registration + *Default*: `default` + +### Kafka Configuration + +- **`CONSUMER_GROUP`**: Kafka consumer group name + *Default*: `s3_direct_sink_v1.0` + +- **`AUTO_OFFSET_RESET`**: Where to start consuming if no offset exists + *Default*: `latest` + *Options*: `earliest`, `latest` + +- **`KAFKA_KEY_DESERIALIZER`**: The key deserializer to use + *Default*: `str` + +- **`KAFKA_VALUE_DESERIALIZER`**: The value deserializer to use + *Default*: `json` + +### Performance Tuning + +- **`BATCH_SIZE`**: Number of messages to batch before writing to S3 + *Default*: `1000` + +- **`COMMIT_INTERVAL`**: Kafka commit interval in seconds + *Default*: `30` + +- **`MAX_WRITE_WORKERS`**: How many files can be written in parallel to S3 at once + *Default*: `10` + +### Application Settings + +- **`LOGLEVEL`**: Set application logging level + *Default*: `INFO` + *Options*: `DEBUG`, `INFO`, `WARNING`, `ERROR`, `CRITICAL` + +## Partitioning Strategy Examples + +### Example 1: Time-based partitioning +```bash +HIVE_COLUMNS=year,month,day +TIMESTAMP_COLUMN=ts_ms +``` +Creates: `s3://bucket/prefix/table/year=2024/month=01/day=15/data_*.parquet` + +### Example 2: Multi-dimensional partitioning +```bash +HIVE_COLUMNS=location,sensor_type,year,month +TIMESTAMP_COLUMN=timestamp +``` +Creates: `s3://bucket/prefix/table/location=NYC/sensor_type=temp/year=2024/month=01/data_*.parquet` + +### Example 3: No partitioning +```bash +HIVE_COLUMNS= +``` +Creates: `s3://bucket/prefix/table/data_*.parquet` + +## Architecture + +The sink uses a batching architecture for high throughput: + +1. **Consume**: Messages are consumed from Kafka in batches +2. **Transform**: Time-based columns are extracted if needed +3. **Partition**: Data is grouped by partition columns +4. **Upload**: Multiple files are uploaded to S3 in parallel +5. **Register**: Files are registered in the catalog (if configured) + +## Requirements + +- S3 bucket access (AWS S3 or S3-compatible storage) +- Optional: Quix REST Catalog endpoint for data catalog integration + +## Contribute + +Submit forked projects to the Quix [GitHub](https://github.com/quixio/quix-samples) repo. Any new project that we accept will be attributed to you and you'll receive $200 in Quix credit. + +## Open Source + +This project is open source under the Apache 2.0 license and available in our [GitHub](https://github.com/quixio/quix-samples) repo. Please star us and mention us on social to show your appreciation. 
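+
+## Appendix: Partitioning Logic (Illustrative)
+
+The snippet below is a minimal, standalone sketch of the partitioning described in the Architecture section. It reuses the same pandas/pyarrow approach as `quixlake_sink.py` but involves no Kafka consumer or S3 client; the `data` prefix, the `sensor-data` table name and the record values are example assumptions only.
+
+```python
+import uuid
+import pandas as pd
+import pyarrow as pa
+import pyarrow.parquet as pq
+
+# Example records (ts_ms is epoch milliseconds, as in the test data generator)
+records = [
+    {"location": "NYC", "sensor_type": "temperature", "value": 20.0, "ts_ms": 1705320000000},
+    {"location": "LA", "sensor_type": "humidity", "value": 21.5, "ts_ms": 1705323600000},
+]
+
+df = pd.DataFrame(records)
+df["ts_ms"] = pd.to_datetime(df["ts_ms"], unit="ms")
+df["year"] = df["ts_ms"].dt.year.astype(str)
+df["month"] = df["ts_ms"].dt.month.astype(str).str.zfill(2)  # zero-padded, as in the sink
+df["day"] = df["ts_ms"].dt.day.astype(str).str.zfill(2)
+
+hive_columns = ["location", "year", "month", "day"]  # HIVE_COLUMNS
+prefix, table = "data", "sensor-data"                # S3_PREFIX and table name (examples)
+
+for values, group in df.groupby(hive_columns):
+    partition_path = "/".join(f"{col}={val}" for col, val in zip(hive_columns, values))
+    key = f"{prefix}/{table}/{partition_path}/data_{uuid.uuid4().hex}.parquet"
+    buf = pa.BufferOutputStream()
+    # Hive convention: partition values live in the path, not in the file
+    pq.write_table(pa.Table.from_pandas(group.drop(columns=hive_columns)), buf)
+    print(key, "->", len(buf.getvalue().to_pybytes()), "bytes")
+```
+
+The real sink performs the same grouping per Kafka batch, uploads each buffer through a boto3 `TransferManager`, and then optionally registers the written files in the REST Catalog manifest.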
diff --git a/python/destinations/quixlake-timeseries/catalog_client.py b/python/destinations/quixlake-timeseries/catalog_client.py new file mode 100644 index 000000000..594aae629 --- /dev/null +++ b/python/destinations/quixlake-timeseries/catalog_client.py @@ -0,0 +1,70 @@ +import requests +from typing import Optional + + +class CatalogClient: + """Simple HTTP client for REST Catalog API interactions""" + + def __init__(self, base_url: str, auth_token: Optional[str] = None): + """ + Initialize the catalog client + + Args: + base_url: Base URL of the REST Catalog API + auth_token: Optional authentication token for API requests + """ + self.base_url = base_url.rstrip('/') + self.auth_token = auth_token + self._session = requests.Session() + + # Set up authentication header if token is provided + if self.auth_token: + self._session.headers['Authorization'] = f'Bearer {self.auth_token}' + + def get(self, path: str, timeout: int = 30) -> requests.Response: + """ + Make a GET request to the catalog API + + Args: + path: API endpoint path (will be appended to base_url) + timeout: Request timeout in seconds + + Returns: + Response object from requests library + """ + url = f"{self.base_url}{path}" + return self._session.get(url, timeout=timeout) + + def post(self, path: str, json: dict = None, timeout: int = 30) -> requests.Response: + """ + Make a POST request to the catalog API + + Args: + path: API endpoint path (will be appended to base_url) + json: JSON payload to send in request body + timeout: Request timeout in seconds + + Returns: + Response object from requests library + """ + url = f"{self.base_url}{path}" + return self._session.post(url, json=json, timeout=timeout) + + def put(self, path: str, json: dict = None, timeout: int = 30) -> requests.Response: + """ + Make a PUT request to the catalog API + + Args: + path: API endpoint path (will be appended to base_url) + json: JSON payload to send in request body + timeout: Request timeout in seconds + + Returns: + Response object from requests library + """ + url = f"{self.base_url}{path}" + return self._session.put(url, json=json, timeout=timeout) + + def __str__(self): + """String representation showing the base URL""" + return self.base_url diff --git a/python/destinations/quixlake-timeseries/dockerfile b/python/destinations/quixlake-timeseries/dockerfile new file mode 100644 index 000000000..d3cd4959a --- /dev/null +++ b/python/destinations/quixlake-timeseries/dockerfile @@ -0,0 +1,23 @@ +FROM python:3.11.1-slim-buster + +# Set environment variables to non-interactive and unbuffered output +ENV DEBIAN_FRONTEND=noninteractive \ + PYTHONUNBUFFERED=1 \ + PYTHONIOENCODING=UTF-8 + +# Set the working directory inside the container +WORKDIR /app + +# Copy only the requirements file(s) to leverage Docker cache +# Assuming all requirements files are in the root or subdirectories +COPY ./requirements.txt ./ + +# Install dependencies +# Adding `--no-cache-dir` to avoid storing unnecessary files and potentially reduce image size +RUN pip install --no-cache-dir -r requirements.txt + +# Copy the rest of the application +COPY . . 
+ +# Set the command to run your application +ENTRYPOINT ["python3", "main.py"] \ No newline at end of file diff --git a/python/destinations/quixlake-timeseries/icon.png b/python/destinations/quixlake-timeseries/icon.png new file mode 100644 index 0000000000000000000000000000000000000000..2940139df862529596b3601ee8abfbd96e436f60 GIT binary patch literal 4851 zcmV!el(7juaywH~YOqPAC2l%A=ctW4z~ z68r|*e4W|`xTpj$5uP9-?77&1lM;Xg-$|S2;R?P|!9@7NWEw%8Xa@=Ze%kDZEBs2v zR%-h#S&oZwH2|Hk9j3~6ArBrVO|X6JhZhjQ6~5HCfCQrmV5y34!iCIWo8WQ#%?JT3 zgOA62*R7Kq>^82k4a0IHkbAx>eEGpLe%^i~G=SyjzXBId!Yv#purz!#Wp6u7PxRKRx<%Q;lim{%{D*;4s!qKenHmnsMow@*q=KHQPu#C z6MjiAV&8L5EiZ)G_e(KK$TQpG4%`StNA>>0A!?Y~6z8=?LPP zEP|SbnuasMg+OFu%K3_Q`Vr!cZjW$@!eJS6BS8H&A!-b4C}{|G$ys5C=?6nAm#S3kwsf({R> zy8UtRaixva5tvGqrDs=aH{sYlm9tFLKDhi|o$O=D`%mMEmM$vZ+ZSTB^*bDy)UsEFj zH*Z4meb=KHZbqmkjQIrapE$s zR2rYeUivKL)i699MfL~ZMcwN!BXG~Bp{`pm_Rj|SXI%Fl{MnE3+&6Z>r>Vl_G8LRX zd&f}|fKt{&rVEV|IEgrsY+k&N4(6cj z{2%1PRanNoZ{-RMUU?(p$rOsDW<6@<`}F)%F}V_S&o$I$n%Wg*ev9G5qal#a;zTM7 zLXG!NpmC?+it-|I9fg@6Aw(L+Nkot? z0%P}b?$KgPpYS-E(hVfY)8GWryio!>YQ4y+CwT+gKykJv51A$Ar$En z{izI2CDQoi@m_JA>nI+veo-4PuL}!;)P{njDe_1aic(D;Ry+Y%m<19tCx^yM)YGYV z=L|CdlQ)sXdd49+d|Yxcq*1&4+sc%y#+H9jxetApsMuEvd|9o8*ufkxq4 zT@n^lGy#OVCqcAQFYRU3PAZ;P`NXn0WQh=L>{Zj)9?(3aT*S#lC(;AxUx4N(eAJjo z{p4|+PG<1t>9ct1E#Rj1W;BpikYN(5;xzz=mv3KjIqvazktOwfCY=_a<%j@DQrW*F zBkyP=h5=IXWCjR}pW_S62aUV{k7h3zQH1LyGy*k0?W-ZxP{sRcA8QFeOhYnG6FNM6 z$bgC`01Got0h1Wa)jd^SFTy^b8D^$`>)P8yHZP}kr}8VAIBybT^Ps`SX!OYns;#2b3K@vQH7Vge-ZCmA^WejJ5`f(s6JmLt}fEVgVU)2eZW@g>s8z z`Rg>b%GQ1!Lo|z+plyU=1^tNxHm_KQM>pR_UPM+z`Ghw?ueo+WEK?8*Lh~h^;tK_& zDcDq?Yp3Wq{u`|{Zy-Z3sNw@bb-@6B_WNV#A=i$bHm>lIBY}x6hk!|d9#s?WqQ{B> za4evS1jz1TgyIHF7WyQJ`e<=sA7m( zmCqwIj|`}M8-UmNQIGWz5ys01y43!ujJbwUwuG!u(jf~Ry*k*1O902iC4j@>62Reb z3E*(J1aLSUO#pW)$mvC*;F7@cLx9s3f{cZ012`Nm0UQpO01k&s0EfdRfWzSuz~OKS z;BZtt0cInZGqv&)m5GYa3FHi2&fDXJP~9kv5JI!EcRBl4tPUP2z&MsN#`#KL7DP6& zYM#$E0mZefGGLq;6J}k%bAb`B)fEDdT-ya4Ico0ThtQ6%nWNIdL>b*NA6xBD1*0?{ilN z&j+xu>O(=aR#%B$tBHL7l1T34eMPSF!O5OQT`UG=A@Y*2-e?>u$-Xb1z?paoL$r_6 z!G}^Aky6eTLUeJBlE^UT6mX$gtiKWP$e{%!C=dZt$ohqenu@v7LsaDS`Bag{tu>m9 zJZghMynf;o{%y~1Q6CHlbeR_hjeJ3Cdrmy$p8w7lv3%pjyuhx%6VD6%Q%P z#EAz)mq}BsAR=LNh4P@!Bi=_cdGw{zqHK&PB*pauVGa-SEc>`Pq*)zoKjG7B^=x+e z*c0F~FD%f3R2qjPF&rEmMt3Y>TKaSzuP8c8WZ?30A+8Ce5J@z%evndDsar$~46{s( z;;s&!KVTu&kx$NbfzDBnKAlYCED8HmJc&Mf-Iq+^cr-z-UR{N*># zRYD>K1H(iYCXA1ZLJ=7P8oyTIS50q1qvNfx$`s)so$ z5X4|IFc;YW=OQ26+mkelXuEq(5X(LZ%jjQqm}pk75Hp?vx*>qS<6`w z)4Uwy;U{b=jbU^&x5HTfm&l*{7PO%#qyazkfXmS2i`yZ;i=jxMMvH8OD2DmaO2x3;N|O*AejdyDi**6J=2&ykhv3Jy0h{iGqNg09c*i8?%tzQWf)j%F1#7}s#BKSC3wG-q zWa4mhDuwE723XVqerEbykbX|QfKYwC*v@4-5~q9f3?b%#Xd028TOb%5_4!D0BS?2{GN=uAaj3)L62Re*g8-s-|Lm!A zeqe+wNug2@Amk%&wqS;4XAh-EVljkzoWrle}1*k zpUSnf@(3ATMZ=1_?uGg-&IovD$?=5fcC z598`g4C!2M{K*pFZ{o zvU=%!=+=(2V`2jEmX8;?e{}f5jT@2K^f3e%E|Ny*<&cU1W06azv1iBE(DUN=(VfVO z3}0C5{Kq8#2{&CRVlnl2Psb7{pMMBiOPjcsjKXI~O#oYHiQyp}cPgAa_97=Dx5R=B7{?4kCF@-crbCd~@w z6RF)nFE>!z1s9cqJ=8Xk>~5J%_>5^~K3E!I1GO#GI<4jo5pxSx@jWo-bhv%TitbJc Ze*{Ic4`|4H{L=sc002ovPDHLkV1k0Y0c8LH literal 0 HcmV?d00001 diff --git a/python/destinations/quixlake-timeseries/library.json b/python/destinations/quixlake-timeseries/library.json new file mode 100644 index 000000000..8cca517f2 --- /dev/null +++ b/python/destinations/quixlake-timeseries/library.json @@ -0,0 +1,194 @@ +{ + "libraryItemId": "starter-destination", + "name": "Quix TS Datalake Sink", + "language": "Python", + "tags": { + "Pipeline Stage": ["Destination"], + "Type": ["Connectors"], + "Category": ["File Store"] + }, + 
"shortDescription": "Consume data from a Kafka topic and write it to an AWS S3 bucket path.", + "DefaultFile": "main.py", + "EntryPoint": "dockerfile", + "RunEntryPoint": "main.py", + "IconFile": "icon.png", + "Variables": [ + { + "Name": "input", + "Type": "EnvironmentVariable", + "InputType": "InputTopic", + "Description": "Name of the Kafka input topic to consume from", + "DefaultValue": "sensor-data", + "Required": true + }, + { + "Name": "S3_BUCKET", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "S3 bucket name for storing Parquet files", + "Required": true + }, + { + "Name": "S3_PREFIX", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "S3 prefix/path for data files", + "DefaultValue": "data", + "Required": false + }, + { + "Name": "TABLE_NAME", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "Table name for data organization and registration, else defaults to topic name", + "Required": false + }, + { + "Name": "HIVE_COLUMNS", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "Comma-separated list of columns for Hive partitioning. Include year/month/day/hour to extract from TIMESTAMP_COLUMN (e.g., location,year,month,day,sensor_type)", + "DefaultValue": "", + "Required": false + }, + { + "Name": "TIMESTAMP_COLUMN", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "Column containing timestamp values to extract year/month/day/hour from", + "DefaultValue": "ts_ms", + "Required": false + }, + { + "Name": "CATALOG_URL", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "REST Catalog URL for optional table registration (leave empty to skip)", + "Required": false + }, + { + "Name": "CATALOG_AUTH_TOKEN", + "Type": "EnvironmentVariable", + "InputType": "Secret", + "Description": "If using a catalog, the respective auth token to access it", + "Required": false + }, + { + "Name": "AUTO_DISCOVER", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "Automatically register table in REST Catalog on first write", + "DefaultValue": "true", + "Required": false + }, + { + "Name": "CATALOG_NAMESPACE", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "Catalog namespace for table registration", + "DefaultValue": "default", + "Required": false + }, + { + "Name": "BATCH_SIZE", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "Number of messages to batch before writing to S3", + "DefaultValue": "1000", + "Required": false + }, + { + "Name": "COMMIT_INTERVAL", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "Kafka commit interval in seconds", + "DefaultValue": "30", + "Required": false + }, + { + "Name": "KAFKA_KEY_DESERIALIZER", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "The key deserializer to use", + "DefaultValue": "str", + "Required": false + }, + { + "Name": "KAFKA_VALUE_DESERIALIZER", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "the value deserializer to use", + "DefaultValue": "json", + "Required": false + }, + { + "Name": "CONSUMER_GROUP", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "Kafka consumer group name", + "DefaultValue": "s3_direct_sink_v1.0", + "Required": false + }, + { + "Name": "AUTO_OFFSET_RESET", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "Where to start consuming if no offset 
exists", + "DefaultValue": "latest", + "Required": false + }, + { + "Name": "AWS_ACCESS_KEY_ID", + "Type": "EnvironmentVariable", + "InputType": "Secret", + "Description": "AWS Access Key ID for S3 access", + "DefaultValue": "", + "Required": false + }, + { + "Name": "AWS_SECRET_ACCESS_KEY", + "Type": "EnvironmentVariable", + "InputType": "Secret", + "Description": "AWS Secret Access Key for S3 access", + "Required": false + }, + { + "Name": "AWS_REGION", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "AWS region for S3 bucket", + "DefaultValue": "us-east-1", + "Required": false + }, + { + "Name": "AWS_ENDPOINT_URL", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "S3 endpoint url (for non-AWS endpoints)", + "Required": false + }, + { + "Name": "LOGLEVEL", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "set application logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)", + "DefaultValue": "INFO" + }, + { + "Name": "MAX_WRITE_WORKERS", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "How many files can be written in parallel to S3 at once (based on partitioning + batch size)", + "DefaultValue": "10", + "Required": false + } + ], + "DeploySettings": { + "DeploymentType": "Service", + "CpuMillicores": 200, + "MemoryInMb": 500, + "Replicas": 1, + "PublicAccess": false, + "ValidateConnection": false + } +} \ No newline at end of file diff --git a/python/destinations/quixlake-timeseries/main.py b/python/destinations/quixlake-timeseries/main.py new file mode 100644 index 000000000..9869719ae --- /dev/null +++ b/python/destinations/quixlake-timeseries/main.py @@ -0,0 +1,75 @@ +""" +Quix TS Datalake Sink - Main Entry Point + +This application consumes data from a Kafka topic and writes it to S3 as +Hive-partitioned Parquet files with optional Iceberg catalog registration. +""" +import os +import logging + +from quixstreams import Application +from quixlake_sink import QuixLakeSink + +# Configure logging +logging.basicConfig( + level=os.getenv("LOGLEVEL", "INFO"), + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + + +def parse_hive_columns(columns_str: str) -> list: + """ + Parse comma-separated list of partition columns. 
+ + Args: + columns_str: Comma-separated column names (e.g., "year,month,day") + + Returns: + List of column names, or empty list if input is empty + """ + if not columns_str or columns_str.strip() == "": + return [] + return [col.strip() for col in columns_str.split(",") if col.strip()] + + +# Initialize Quix Streams Application +app = Application( + consumer_group=os.getenv("CONSUMER_GROUP", "s3_direct_sink_v1.0"), + auto_offset_reset=os.getenv("AUTO_OFFSET_RESET", "latest"), + commit_interval=int(os.getenv("COMMIT_INTERVAL", "30")) +) + +# Parse configuration +hive_columns = parse_hive_columns(os.getenv("HIVE_COLUMNS", "")) +auto_discover = os.getenv("AUTO_DISCOVER", "true").lower() == "true" +table_name = os.getenv("TABLE_NAME") or os.environ["input"] + +# Initialize QuixLakeSink +s3_sink = QuixLakeSink( + s3_bucket=os.environ["S3_BUCKET"], + s3_prefix=os.getenv("S3_PREFIX", "data"), + table_name=table_name, + hive_columns=hive_columns, + timestamp_column=os.getenv("TIMESTAMP_COLUMN", "ts_ms"), + catalog_url=os.getenv("CATALOG_URL"), + catalog_auth_token=os.getenv("CATALOG_AUTH_TOKEN"), + auto_discover=auto_discover, + namespace=os.getenv("CATALOG_NAMESPACE", "default"), + auto_create_bucket=True, + max_workers=int(os.getenv("MAX_WRITE_WORKERS", "10")) +) + +# Create streaming dataframe and attach sink +sdf = app.dataframe(topic=app.topic(os.environ["input"])) + +# Attach sink (batching is handled by BatchingSink) +sdf.sink(s3_sink) + +logger.info("Starting Quix TS Datalake Sink") +logger.info(f" Input topic: {os.environ['input']}") +logger.info(f" S3 destination: s3://{os.environ['S3_BUCKET']}/{os.getenv('S3_PREFIX', 'data')}/{table_name}") +logger.info(f" Partitioning: {hive_columns if hive_columns else 'none'}") + +if __name__ == "__main__": + app.run() diff --git a/python/destinations/quixlake-timeseries/quixlake_sink.py b/python/destinations/quixlake-timeseries/quixlake_sink.py new file mode 100644 index 000000000..b597ff39c --- /dev/null +++ b/python/destinations/quixlake-timeseries/quixlake_sink.py @@ -0,0 +1,557 @@ +from quixstreams.sinks import BatchingSink, SinkBatch +import boto3 +from botocore.exceptions import ClientError +from s3transfer.manager import TransferManager, TransferConfig +import pandas as pd +import pyarrow as pa +import pyarrow.parquet as pq +import time +import logging +import uuid +import os +from typing import List, Dict, Any, Optional +from datetime import datetime, timezone +from io import BytesIO +from catalog_client import CatalogClient + + +TIMESTAMP_COL_MAPPER = { + "year": lambda col: col.dt.year.astype(str), + "month": lambda col: col.dt.month.astype(str).str.zfill(2), + "day": lambda col: col.dt.day.astype(str).str.zfill(2), + "hour": lambda col: col.dt.hour.astype(str).str.zfill(2) +} + +logger = logging.getLogger('quixstreams') + + +class QuixLakeSink(BatchingSink): + """ + Writes Kafka batches directly to S3 as Hive-partitioned Parquet files, + then optionally registers the table using the discover endpoint. 
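+
+    Typical wiring, matching main.py (the bucket and table names here are placeholders;
+    see __init__ for the full set of options):
+
+        sink = QuixLakeSink(
+            s3_bucket="my-bucket",
+            s3_prefix="data",
+            table_name="sensor-data",
+            hive_columns=["location", "year", "month", "day"],
+            timestamp_column="ts_ms",
+        )
+        sdf.sink(sink)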
+ """ + + def __init__( + self, + s3_bucket: str, + s3_prefix: str, + table_name: str, + hive_columns: List[str] = None, + timestamp_column: str = "ts_ms", + catalog_url: Optional[str] = None, + catalog_auth_token: Optional[str] = None, + auto_discover: bool = True, + namespace: str = "default", + auto_create_bucket: bool = True, + max_workers: int = 10 + ): + """ + Initialize S3 Direct Sink + + Args: + s3_bucket: S3 bucket name + s3_prefix: S3 prefix/path for data files + table_name: Table name for registration + hive_columns: List of columns to use for Hive partitioning. Include 'year', 'month', + 'day', 'hour' to extract these from timestamp_column + timestamp_column: Column containing timestamp to extract time partitions from + catalog_url: Optional REST Catalog URL for table registration + catalog_auth_token: If using REST Catalog, the respective auth token for it + auto_discover: Whether to auto-register table on first write + namespace: Catalog namespace (default: "default") + auto_create_bucket: if True, create bucket in S3 if missing. + max_workers: Maximum number of parallel upload threads (default: 10) + """ + self._aws_region = os.getenv('AWS_REGION', 'us-east-1') + self._aws_access_key_id = os.getenv("AWS_ACCESS_KEY_ID") + self._aws_secret_access_key = os.getenv("AWS_SECRET_ACCESS_KEY") + self._aws_endpoint_url = os.getenv("AWS_ENDPOINT_URL", None) + self._credentials = { + "region_name": self._aws_region, + "aws_access_key_id": self._aws_access_key_id, + "aws_secret_access_key": self._aws_secret_access_key, + "endpoint_url": self._aws_endpoint_url, + } + self.s3_bucket = s3_bucket + self.s3_prefix = s3_prefix + self.table_name = table_name + self.hive_columns = hive_columns or [] + self.timestamp_column = timestamp_column + self._catalog = CatalogClient(catalog_url, catalog_auth_token) if catalog_url else None + self.auto_discover = auto_discover + self.namespace = namespace + self.table_registered = False + + # S3 client will be initialized in setup() + self.s3_client = None + self._ts_hive_columns = {'year', 'month', 'day', 'hour'} & set(self.hive_columns) + self._auto_create_bucket = auto_create_bucket + self._max_workers = max_workers + + # Batch upload tracking with TransferManager + self._pending_futures = [] + self._transfer_manager = None + + super().__init__() + + def setup(self): + """Initialize S3 client and test connection""" + logger.info("Starting S3 Direct Sink...") + logger.info(f"S3 Target: s3://{self.s3_bucket}/{self.s3_prefix}/{self.table_name}") + logger.info(f"Partitioning: hive_columns={self.hive_columns}") + + if self._catalog and self.auto_discover: + logger.info(f"Table will be auto-registered in REST Catalog on first write") + + try: + # Initialize S3 client + self.s3_client = boto3.client( + 's3', + **self._credentials + ) + + # Initialize TransferManager for concurrent uploads + transfer_config = TransferConfig(max_request_concurrency=self._max_workers) + self._transfer_manager = TransferManager(self.s3_client, config=transfer_config) + + # Confirm bucket connection + self._ensure_bucket() + + # Test Catalog connection if configured + if self._catalog: + try: + response = self._catalog.get("/health", timeout=5) + response.raise_for_status() + logger.info("Successfully connected to REST Catalog at %s", self._catalog) + except Exception as e: + logger.warning("Could not connect to REST Catalog: %s. 
Table registration disabled.", e) + self.auto_discover = False + + # Check if table already exists in S3 and validate partition strategy + self._validate_existing_table_structure() + + except Exception as e: + logger.error("Failed to setup S3 connection: %s", e) + raise + + def _ensure_bucket(self): + bucket = self.s3_bucket + try: + self.s3_client.head_bucket(Bucket=bucket) + except ClientError as e: + error_code = int(e.response["Error"]["Code"]) + if error_code == 404 and self._auto_create_bucket: + # Bucket does not exist, create it + logger.debug(f"⚠️ Bucket '{bucket}' not found. Creating it...") + self.s3_client.create_bucket(Bucket=self.s3_bucket) + logger.info(f"✅ Bucket '{bucket}' created.") + else: + raise + logger.info("Successfully connected to S3 bucket: %s", bucket) + + def write(self, batch: SinkBatch): + """Write batch directly to S3""" + # Register table before first write if auto-discover is enabled + if self.auto_discover and not self.table_registered and self._catalog: + self._register_table() + + attempts = 3 + while attempts: + start = time.perf_counter() + try: + self._write_batch(batch) + elapsed_ms = (time.perf_counter() - start) * 1000 + logger.info("✔ wrote %d rows to S3 in %.1f ms", batch.size, elapsed_ms) + return + except Exception as exc: + attempts -= 1 + if attempts == 0: + raise + logger.warning("Write failed (%s) – retrying …", exc) + time.sleep(3) + + def _write_batch(self, batch: SinkBatch): + """Convert batch to Parquet and write to S3 with Hive partitioning""" + if not batch: + return + + # Convert batch to list of dictionaries + rows = [] + for item in batch: + row = item.value.copy() + # Add timestamp and key if not present + # This ensures we have a timestamp column for time-based partitioning + if self.timestamp_column not in row: + row[self.timestamp_column] = item.timestamp + row["__key"] = item.key + rows.append(row) + + # Convert to DataFrame for easier manipulation + df = pd.DataFrame(rows) + + # Add time-based partition columns (year/month/day/hour) if they're specified in hive_columns + # These are extracted from the timestamp_column + if self._ts_hive_columns: + df = self._add_timestamp_columns(df) + + # Use only the explicitly specified partition columns + if partition_columns := self.hive_columns.copy(): + # Group by partition columns and write each partition separately + # This creates the Hive-style directory structure: col1=val1/col2=val2/file.parquet + for group_values, group_df in df.groupby(partition_columns): + # Ensure group_values is always a tuple for consistent handling + if not isinstance(group_values, tuple): + group_values = (group_values,) + + # Build S3 key with Hive partitioning (col=value format) + # Example: s3://bucket/prefix/table/year=2024/month=01/day=15/data_abc123.parquet + partition_parts = [f"{col}={val}" for col, val in zip(partition_columns, group_values)] + s3_key = f"{self.s3_prefix}/{self.table_name}/" + "/".join(partition_parts) + f"/data_{uuid.uuid4().hex}.parquet" + + # Remove partition columns from data (Hive style - partition values are in the path, not the data) + data_df = group_df.drop(columns=partition_columns, errors='ignore') + + # Write to S3 + self._write_parquet_to_s3(data_df, s3_key, partition_columns, group_values) + else: + # No partitioning - write as single file directly under table directory + s3_key = f"{self.s3_prefix}/{self.table_name}/data_{uuid.uuid4().hex}.parquet" + self._write_parquet_to_s3(df, s3_key, [], ()) + + # Wait for all uploads to complete and register files in catalog 
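+        # _finalize_writes() blocks on every pending TransferManager future,
+        # re-raising the first upload failure it encounters, and then registers
+        # the uploaded files in the catalog manifest (only when a catalog is
+        # configured and the table has been registered).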
+ self._finalize_writes() + + def _write_parquet_to_s3( + self, + df: pd.DataFrame, + s3_key: str, + partition_columns: List[str], + partition_values: tuple + ): + # Convert to Arrow table and prepare buffer + self._null_empty_dicts(df) + table = pa.Table.from_pandas(df) + + buf = pa.BufferOutputStream() + pq.write_table(table, buf) + parquet_bytes = buf.getvalue().to_pybytes() + + # Submit upload to TransferManager + future = self._transfer_manager.upload( + BytesIO(parquet_bytes), + self.s3_bucket, + s3_key + ) + + self._pending_futures.append({ + 'future': future, + 'key': s3_key, + 'row_count': len(df), + 'file_size': len(parquet_bytes), + 'partition_columns': partition_columns, + 'partition_values': partition_values + }) + + def _finalize_writes(self): + """Wait for all pending uploads to complete and register files in catalog""" + if not self._pending_futures: + return + + count = len(self._pending_futures) + logger.debug(f"Waiting for {count} upload(s) to complete...") + + # Wait for all uploads to complete + for item in self._pending_futures: + try: + item['future'].result() # Wait and raise on error + logger.debug("✓ Uploaded %d rows to s3://%s/%s", + item['row_count'], self.s3_bucket, item['key']) + except Exception as e: + logger.error("✗ Failed to upload s3://%s/%s: %s", + self.s3_bucket, item['key'], e) + raise + + logger.info(f"✓ Successfully uploaded {count} file(s)") + + # Register all files in catalog manifest if configured + if self._catalog and self.table_registered: + self._register_files_in_manifest() + + # Clear the futures list + self._pending_futures.clear() + + def _null_empty_dicts(self, df: pd.DataFrame): + """ + Convert empty dictionaries to null values before writing to Parquet. + + Parquet format has limitations with empty maps/structs - they cannot be written + properly and will cause serialization errors. This method scans all columns + that contain dictionaries and replaces empty dicts ({}) with None/null values. + + This is done in-place to avoid copying the DataFrame. 
+ """ + for col in df.columns: + # Check if column contains any dictionary values + if df[col].apply(lambda x: isinstance(x, dict)).any(): + # Replace empty dicts with None; keeps non-empty dicts as-is + df[col] = df[col].apply(lambda x: x or None) + + def _register_table(self): + """Register the table in REST Catalog""" + if not self._catalog: + return + + try: + # First check if table already exists + check_response = self._catalog.get( + f"/namespaces/{self.namespace}/tables/{self.table_name}", + timeout=5 + ) + + if check_response.status_code == 200: + logger.info("Table '%s' already exists in catalog", self.table_name) + self.table_registered = True + # Validate partition strategy matches + self._validate_partition_strategy(check_response.json()) + return + + # Table doesn't exist, create it + s3_path = f"s3://{self.s3_bucket}/{self.s3_prefix}/{self.table_name}" + + # Define partition spec based on configuration + # For dynamic partition discovery, create table without partition spec + # The partition spec will be set when first files are added + partition_spec = [] # Empty spec for dynamic discovery + + # Create table with minimal schema (will be inferred from data) + create_response = self._catalog.put( + f"/namespaces/{self.namespace}/tables/{self.table_name}", + json={ + "location": s3_path, + "partition_spec": partition_spec, # Empty for dynamic discovery + "properties": { + "created_by": "quix-lake-sink", + "auto_discovered": "false", + "expected_partitions": self.hive_columns.copy() # Store expected partitions in properties + } + }, + timeout=30 + ) + + if create_response.status_code in [200, 201]: + logger.info( + "Successfully created table '%s' in REST Catalog. Partitions will be set dynamically to: %s", + self.table_name, + self.hive_columns + ) + self.table_registered = True + else: + logger.warning( + "Failed to create table '%s': %s", + self.table_name, + create_response.text + ) + + except Exception as e: + logger.warning("Failed to register table '%s': %s", self.table_name, e) + + def _add_timestamp_columns(self, df: pd.DataFrame) -> pd.DataFrame: + """ + Add timestamp-based columns (year/month/day/hour) for time-based partitioning. + + This method extracts time components from the timestamp column and adds them + as separate columns that can be used for Hive partitioning. 
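+
+        For example, ts_ms=1705320000000 (2024-01-15 12:00:00 UTC, in milliseconds)
+        yields year='2024', month='01', day='15' and hour='12' for whichever of
+        these columns are listed in HIVE_COLUMNS.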
+ """ + # Convert to datetime if needed (handles numeric timestamps) + if not pd.api.types.is_datetime64_any_dtype(df[self.timestamp_column]): + sample_value = float(df[self.timestamp_column].iloc[0] if not df[self.timestamp_column].empty else 0) + + # Auto-detect timestamp unit by inspecting the magnitude of the value + # Typical timestamp ranges: + # - Seconds: ~1.7e9 (since epoch 1970) + # - Milliseconds: ~1.7e12 + # - Microseconds: ~1.7e15 + # - Nanoseconds: ~1.7e18 + if sample_value > 1e17: + # Nanoseconds (Java/Kafka timestamps) + df[self.timestamp_column] = pd.to_datetime(df[self.timestamp_column], unit='ns') + elif sample_value > 1e14: + # Microseconds + df[self.timestamp_column] = pd.to_datetime(df[self.timestamp_column], unit='us') + elif sample_value > 1e11: + # Milliseconds (common in JavaScript/Kafka) + df[self.timestamp_column] = pd.to_datetime(df[self.timestamp_column], unit='ms') + else: + # Seconds (Unix timestamp) + df[self.timestamp_column] = pd.to_datetime(df[self.timestamp_column], unit='s') + + # Extract time-based columns (year, month, day, hour) from the timestamp + timestamp_col = df[self.timestamp_column] + + # Only add columns that are specified in _ts_hive_columns + # TIMESTAMP_COL_MAPPER handles proper formatting (e.g., zero-padding for months/days) + for col in self._ts_hive_columns: + df[col] = TIMESTAMP_COL_MAPPER[col](timestamp_col) + + return df + + def _validate_partition_strategy(self, table_metadata: Dict[str, Any]): + """Validate that the sink's partition strategy matches the existing table""" + existing_partition_spec = table_metadata.get("partition_spec", []) + + # Build expected partition spec from sink configuration + expected_partition_spec = self.hive_columns.copy() + + # Special case: If table has no partition spec yet (empty list), + # it will be set when first files are added + if not existing_partition_spec: + logger.info( + "Table '%s' has no partition spec yet. Will be set to %s on first write.", + self.table_name, + expected_partition_spec + ) + return + + # Check if partition strategies match + if set(existing_partition_spec) != set(expected_partition_spec): + error_msg = ( + f"Partition strategy mismatch for table '{self.table_name}'. " + f"Existing table has partitions: {existing_partition_spec}, " + f"but sink is configured with: {expected_partition_spec}. " + "This would corrupt the folder structure. Please ensure the sink partition " + "configuration matches the existing table." + ) + logger.error(error_msg) + raise ValueError(error_msg) + + # Also check the order of partitions + if existing_partition_spec != expected_partition_spec: + warning_msg = ( + f"Partition column order differs for table '{self.table_name}'. " + f"Existing: {existing_partition_spec}, Configured: {expected_partition_spec}. " + "While this won't corrupt data, it may lead to suboptimal query performance." + ) + logger.warning(warning_msg) + + def _validate_existing_table_structure(self): + """ + Check if table already exists in S3 and validate partition structure. + + This prevents data corruption by ensuring that if a table already exists in S3, + the sink's partition configuration matches what's already on disk. Mismatched + partition strategies would result in a corrupted folder structure that would + make the data unqueryable. 
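+
+        For example, if S3 already holds objects under location=NYC/year=2024/...
+        but the sink is configured with HIVE_COLUMNS=year,month, the mismatch is
+        detected here and a ValueError is raised instead of writing files with a
+        conflicting layout.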
+ """ + table_prefix = f"{self.s3_prefix}/{self.table_name}/" + + try: + # List objects to see if table exists (sample first 100 files) + response = self.s3_client.list_objects_v2( + Bucket=self.s3_bucket, + Prefix=table_prefix, + MaxKeys=100 + ) + + if 'Contents' not in response: + # Table doesn't exist yet, no validation needed + return + + # Detect existing partition columns from S3 directory structure + # We parse the S3 paths to extract partition columns from Hive-style paths + detected_partition_columns = [] + for obj in response['Contents']: + if obj['Key'].endswith('.parquet'): + # Extract path after table prefix + # Example: "year=2024/month=01/day=15/data.parquet" -> ["year=2024", "month=01", "day=15", "data.parquet"] + relative_path = obj['Key'][len(table_prefix):] + path_parts = relative_path.split('/') + + # Look for Hive-style partitions (col=value format) + for part in path_parts[:-1]: # Exclude filename + if '=' in part: + # Extract column name from "col=value" + col_name = part.split('=')[0] + # Maintain order of first appearance + if col_name not in detected_partition_columns: + detected_partition_columns.append(col_name) + + if detected_partition_columns: + # Build expected partition spec from sink configuration + expected_partition_spec = self.hive_columns.copy() + + # Check if partition strategies match + # Using set comparison to ignore order first + if set(detected_partition_columns) != set(expected_partition_spec): + error_msg = ( + f"Partition strategy mismatch for table '{self.table_name}'. " + f"Existing table in S3 has partitions: {detected_partition_columns}, " + f"but sink is configured with: {expected_partition_spec}. " + "This would corrupt the folder structure. Please ensure the sink partition " + "configuration matches the existing table." + ) + logger.error(error_msg) + raise ValueError(error_msg) + + logger.info( + "Validated partition strategy for existing table '%s'. Partitions: %s", + self.table_name, + detected_partition_columns + ) + + except self.s3_client.exceptions.NoSuchBucket: + raise + except ValueError: + raise + except Exception as e: + logger.warning( + "Could not validate existing table structure: %s. 
Proceeding with caution.", e + ) + + def _register_files_in_manifest(self): + """Register multiple newly written files in the catalog manifest""" + if not (file_items := self._pending_futures): + return + + try: + # Build file entries for all files + file_entries = [] + for item in file_items: + s3_key = item['key'] + row_count = item['row_count'] + file_size = item['file_size'] + partition_columns = item['partition_columns'] + partition_values = item['partition_values'] + + # Build S3 URL + file_path = f"s3://{self.s3_bucket}/{s3_key}" + + # Build partition values dict + partition_dict = {} + if partition_columns and partition_values: + for col, val in zip(partition_columns, partition_values): + partition_dict[col] = str(val) + + # Create file entry + file_entries.append({ + "file_path": file_path, + "file_size": file_size, + "last_modified": datetime.now(tz=timezone.utc).isoformat(), + "partition_values": partition_dict, + "row_count": row_count + }) + + # Send all files to catalog in a single request + response = self._catalog.post( + f"/namespaces/{self.namespace}/tables/{self.table_name}/manifest/add-files", + json={"files": file_entries}, + timeout=10 + ) + + if response.status_code == 200: + logger.info(f"✓ Registered {len(file_entries)} file(s) in catalog manifest") + else: + logger.warning("Failed to register files in manifest: %s", response.text) + + except Exception as e: + # Don't fail the write if manifest registration fails + logger.warning("Failed to register files in manifest: %s", e) \ No newline at end of file diff --git a/python/destinations/quixlake-timeseries/requirements.txt b/python/destinations/quixlake-timeseries/requirements.txt new file mode 100644 index 000000000..170a474f4 --- /dev/null +++ b/python/destinations/quixlake-timeseries/requirements.txt @@ -0,0 +1,6 @@ +quixstreams==3.23.1 +python-dotenv +requests +pandas +boto3 +pyarrow \ No newline at end of file diff --git a/python/destinations/s3-file/requirements.txt b/python/destinations/s3-file/requirements.txt index 5f3f3920e..170a474f4 100644 --- a/python/destinations/s3-file/requirements.txt +++ b/python/destinations/s3-file/requirements.txt @@ -1,2 +1,6 @@ -quixstreams[s3]==3.23.1 +quixstreams==3.23.1 python-dotenv +requests +pandas +boto3 +pyarrow \ No newline at end of file diff --git a/tests/destinations/quixlake-timeseries/docker-compose.test.yml b/tests/destinations/quixlake-timeseries/docker-compose.test.yml new file mode 100644 index 000000000..abc79db83 --- /dev/null +++ b/tests/destinations/quixlake-timeseries/docker-compose.test.yml @@ -0,0 +1,196 @@ +# Test configuration for Quix Lake Timeseries destination +# Uses only public Docker images + mock catalog service +# timeout: 90 + +services: + # MinIO for S3-compatible storage + minio: + image: minio/minio:latest + command: server /data --console-address ":9001" + environment: + - MINIO_ROOT_USER=minioadmin + - MINIO_ROOT_PASSWORD=minioadmin + networks: + - test-network + healthcheck: + test: ["CMD", "mc", "ready", "local"] + interval: 3s + timeout: 5s + retries: 10 + stop_grace_period: 3s + + # Initialize MinIO with test bucket + minio-init: + image: minio/mc:latest + depends_on: + minio: + condition: service_healthy + entrypoint: > + /bin/sh -c " + mc alias set myminio http://minio:9000 minioadmin minioadmin; + mc mb myminio/test-bucket --ignore-existing; + mc rm --recursive --force myminio/test-bucket/data/ || true; + echo 'MinIO bucket created and cleaned'; + echo 'Keeping minio-init alive...'; + tail -f /dev/null + " + networks: + - 
test-network + + # Kafka broker (using Redpanda) + kafka: + image: docker.redpanda.com/redpandadata/redpanda:v24.2.4 + command: + - redpanda + - start + - --kafka-addr internal://0.0.0.0:9092 + - --advertise-kafka-addr internal://kafka:9092 + - --mode dev-container + - --smp 1 + healthcheck: + test: ["CMD-SHELL", "rpk cluster health | grep -E 'Healthy:.+true' || exit 1"] + interval: 5s + timeout: 10s + retries: 10 + networks: + - test-network + stop_grace_period: 3s + + # Mock REST Catalog service for testing + mock-catalog: + build: + context: ./mock-catalog + dockerfile: Dockerfile + networks: + - test-network + healthcheck: + test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:5001/health')"] + interval: 3s + timeout: 5s + retries: 10 + stop_grace_period: 3s + + # Quix Lake Timeseries destination service + quixlake-timeseries: + build: + context: ../../../python/destinations/quixlake-timeseries + dockerfile: dockerfile + environment: + # Quix/Kafka configuration + - Quix__Broker__Address=kafka:9092 + - Quix__Consumer__Group=quixlake-ts-test + - Quix__Deployment__Id=test-quixlake-ts + + # Input topic + - input=test-quixlake-input + + # S3 configuration + - S3_BUCKET=test-bucket + - S3_PREFIX=data + - AWS_ACCESS_KEY_ID=minioadmin + - AWS_SECRET_ACCESS_KEY=minioadmin + - AWS_REGION=us-east-1 + - AWS_ENDPOINT_URL=http://minio:9000 + + # Table configuration + - TABLE_NAME=test-quixlake-input + - HIVE_COLUMNS=location,year,month,day + - TIMESTAMP_COLUMN=ts_ms + + # Catalog configuration (using mock catalog) + - CATALOG_URL=http://mock-catalog:5001 + - CATALOG_AUTH_TOKEN= + - AUTO_DISCOVER=true + - CATALOG_NAMESPACE=default + + # Performance settings + - BATCH_SIZE=5 + - COMMIT_INTERVAL=5 + - MAX_WRITE_WORKERS=3 + + # Kafka consumer settings + - CONSUMER_GROUP=quixlake-ts-test-consumer + - AUTO_OFFSET_RESET=earliest + - KAFKA_KEY_DESERIALIZER=str + - KAFKA_VALUE_DESERIALIZER=json + + # Logging + - LOGLEVEL=DEBUG + networks: + - test-network + depends_on: + minio: + condition: service_healthy + kafka: + condition: service_healthy + minio-init: + condition: service_started + mock-catalog: + condition: service_healthy + stop_grace_period: 3s + + # Test runner - produces test data and verifies output + test-runner: + build: + context: ../../framework + dockerfile: Dockerfile + environment: + # Kafka configuration + - Quix__Broker__Address=kafka:9092 + + # Test configuration + - TEST_INPUT_TOPIC=test-quixlake-input + - TEST_MESSAGE_COUNT=10 + + # S3 configuration for verification + - AWS_ENDPOINT_URL=http://minio:9000 + - AWS_ACCESS_KEY_ID=minioadmin + - AWS_SECRET_ACCESS_KEY=minioadmin + - S3_BUCKET=test-bucket + - S3_PREFIX=data + + # Expected table configuration + - TABLE_NAME=test-quixlake-input + - HIVE_COLUMNS=location,year,month,day + + # Mock catalog URL for verification + - CATALOG_URL=http://mock-catalog:5001 + command: > + sh -c " + echo '=== Installing test dependencies ===' && + pip install boto3 pyarrow requests > /dev/null 2>&1 && + echo '✓ Dependencies installed' && + echo '' && + echo '=== Producing test messages to Kafka ===' && + python /tests/produce_test_data.py && + echo '' && + echo '=== Waiting for quixlake-timeseries to process messages ===' && + sleep 25 && + echo '' && + echo '=== Verifying Parquet data in S3 ===' && + python /tests/verify_output.py && + echo '' && + echo '=== Verifying catalog integration ===' && + python /tests/verify_catalog.py + " + volumes: + - ./produce_test_data.py:/tests/produce_test_data.py:ro + - 
./verify_output.py:/tests/verify_output.py:ro + - ./verify_catalog.py:/tests/verify_catalog.py:ro + working_dir: / + networks: + - test-network + depends_on: + minio: + condition: service_healthy + kafka: + condition: service_healthy + mock-catalog: + condition: service_healthy + quixlake-timeseries: + condition: service_started + stop_grace_period: 3s + +networks: + test-network: + driver: bridge diff --git a/tests/destinations/quixlake-timeseries/mock-catalog/Dockerfile b/tests/destinations/quixlake-timeseries/mock-catalog/Dockerfile new file mode 100644 index 000000000..6875155f5 --- /dev/null +++ b/tests/destinations/quixlake-timeseries/mock-catalog/Dockerfile @@ -0,0 +1,16 @@ +FROM python:3.11-slim + +WORKDIR /app + +# Copy requirements and install dependencies +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +# Copy application +COPY app.py . + +# Expose port +EXPOSE 5001 + +# Run the application +CMD ["python", "app.py"] diff --git a/tests/destinations/quixlake-timeseries/mock-catalog/app.py b/tests/destinations/quixlake-timeseries/mock-catalog/app.py new file mode 100644 index 000000000..200120485 --- /dev/null +++ b/tests/destinations/quixlake-timeseries/mock-catalog/app.py @@ -0,0 +1,190 @@ +""" +Mock REST Catalog Service for Testing + +This service mimics an Iceberg REST Catalog to verify that the QuixLake sink +correctly calls the catalog endpoints with proper data. + +It logs all requests and stores them for verification. +""" +from flask import Flask, request, jsonify +import json +import logging +from datetime import datetime + +app = Flask(__name__) + +# Configure logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + +# In-memory storage for tracking calls +catalog_state = { + 'tables': {}, + 'requests': [], + 'manifest_files': [] +} + + +def log_request(endpoint, method, data=None): + """Log an incoming request for verification""" + request_info = { + 'timestamp': datetime.utcnow().isoformat(), + 'endpoint': endpoint, + 'method': method, + 'data': data + } + catalog_state['requests'].append(request_info) + logger.info(f"{method} {endpoint}") + if data: + logger.info(f" Data: {json.dumps(data, indent=2)}") + + +@app.route('/health', methods=['GET']) +def health(): + """Health check endpoint""" + return jsonify({'status': 'healthy'}), 200 + + +@app.route('/namespaces//tables/', methods=['GET']) +def get_table(namespace, table): + """Get table metadata""" + log_request(f'/namespaces/{namespace}/tables/{table}', 'GET') + + table_key = f"{namespace}.{table}" + + if table_key in catalog_state['tables']: + logger.info(f" → Table exists: {table_key}") + return jsonify(catalog_state['tables'][table_key]), 200 + else: + logger.info(f" → Table not found: {table_key}") + return jsonify({'error': 'Table not found'}), 404 + + +@app.route('/namespaces//tables/
', methods=['PUT']) +def create_table(namespace, table): + """Create a new table""" + data = request.get_json() + log_request(f'/namespaces/{namespace}/tables/{table}', 'PUT', data) + + table_key = f"{namespace}.{table}" + + # Store the table metadata + catalog_state['tables'][table_key] = { + 'name': table, + 'namespace': namespace, + 'location': data.get('location'), + 'partition_spec': data.get('partition_spec', []), + 'properties': data.get('properties', {}), + 'created_at': datetime.utcnow().isoformat() + } + + logger.info(f" → Table created: {table_key}") + logger.info(f" → Location: {data.get('location')}") + logger.info(f" → Partitions: {data.get('partition_spec', [])}") + + return jsonify(catalog_state['tables'][table_key]), 201 + + +@app.route('/namespaces//tables/
/manifest/add-files', methods=['POST']) +def add_files_to_manifest(namespace, table): + """Add files to table manifest""" + data = request.get_json() + log_request(f'/namespaces/{namespace}/tables/{table}/manifest/add-files', 'POST', data) + + table_key = f"{namespace}.{table}" + files = data.get('files', []) + + # Store file registrations + for file_info in files: + file_entry = { + 'table': table_key, + 'file_path': file_info.get('file_path'), + 'file_size': file_info.get('file_size'), + 'row_count': file_info.get('row_count'), + 'partition_values': file_info.get('partition_values', {}), + 'registered_at': datetime.utcnow().isoformat() + } + catalog_state['manifest_files'].append(file_entry) + + logger.info(f" → Registered {len(files)} file(s) for table {table_key}") + for i, file_info in enumerate(files[:3]): # Log first 3 files + logger.info(f" File {i+1}: {file_info.get('file_path')}") + logger.info(f" Size: {file_info.get('file_size')} bytes") + logger.info(f" Rows: {file_info.get('row_count')}") + logger.info(f" Partitions: {file_info.get('partition_values', {})}") + + if len(files) > 3: + logger.info(f" ... and {len(files) - 3} more file(s)") + + return jsonify({'success': True, 'files_added': len(files)}), 200 + + +@app.route('/catalog/state', methods=['GET']) +def get_catalog_state(): + """Get the current catalog state (for testing/debugging)""" + return jsonify({ + 'tables': catalog_state['tables'], + 'total_requests': len(catalog_state['requests']), + 'total_files': len(catalog_state['manifest_files']), + 'recent_requests': catalog_state['requests'][-10:] # Last 10 requests + }), 200 + + +@app.route('/catalog/verify', methods=['GET']) +def verify_catalog(): + """Verify catalog received expected calls""" + table_count = len(catalog_state['tables']) + file_count = len(catalog_state['manifest_files']) + request_count = len(catalog_state['requests']) + + # Check if we received table creation requests + table_creation_requests = [r for r in catalog_state['requests'] if r['method'] == 'PUT' and '/tables/' in r['endpoint']] + + # Check if we received file registration requests + file_registration_requests = [r for r in catalog_state['requests'] if 'manifest/add-files' in r['endpoint']] + + verification = { + 'success': True, + 'tables_created': table_count, + 'files_registered': file_count, + 'total_requests': request_count, + 'table_creation_calls': len(table_creation_requests), + 'file_registration_calls': len(file_registration_requests), + 'issues': [] + } + + # Validate we got expected calls + if table_count == 0: + verification['success'] = False + verification['issues'].append('No tables were created') + + if file_count == 0: + verification['success'] = False + verification['issues'].append('No files were registered') + + # Log verification results + logger.info("="*60) + logger.info("CATALOG VERIFICATION RESULTS") + logger.info("="*60) + logger.info(f"Tables created: {table_count}") + logger.info(f"Files registered: {file_count}") + logger.info(f"Total requests: {request_count}") + logger.info(f"Table creation calls: {len(table_creation_requests)}") + logger.info(f"File registration calls: {len(file_registration_requests)}") + + if verification['success']: + logger.info("✓ Catalog verification PASSED") + else: + logger.info("✗ Catalog verification FAILED") + for issue in verification['issues']: + logger.info(f" - {issue}") + + logger.info("="*60) + + return jsonify(verification), 200 if verification['success'] else 400 + + +if __name__ == '__main__': + logger.info("Starting 
Mock REST Catalog...") + logger.info("Listening on port 5001") + app.run(host='0.0.0.0', port=5001, debug=False) diff --git a/tests/destinations/quixlake-timeseries/mock-catalog/requirements.txt b/tests/destinations/quixlake-timeseries/mock-catalog/requirements.txt new file mode 100644 index 000000000..5bd19d39d --- /dev/null +++ b/tests/destinations/quixlake-timeseries/mock-catalog/requirements.txt @@ -0,0 +1 @@ +flask==3.0.0 diff --git a/tests/destinations/quixlake-timeseries/produce_test_data.py b/tests/destinations/quixlake-timeseries/produce_test_data.py new file mode 100644 index 000000000..495fa9ba6 --- /dev/null +++ b/tests/destinations/quixlake-timeseries/produce_test_data.py @@ -0,0 +1,88 @@ +""" +Produce test data for Quix Lake Timeseries destination tests. + +Generates test messages with timestamps and partitionable fields (location, sensor_type) +to verify Hive partitioning and time-based partitioning functionality. +""" +import os +import time +import json +from datetime import datetime, timezone +from quixstreams import Application + + +def main(): + broker_address = os.getenv("Quix__Broker__Address", "kafka:9092") + topic_name = os.getenv("TEST_INPUT_TOPIC", "test-quixlake-input") + message_count = int(os.getenv("TEST_MESSAGE_COUNT", "10")) + + print(f"Producing {message_count} test messages to topic: {topic_name}") + + app = Application( + broker_address=broker_address, + producer_extra_config={ + "allow.auto.create.topics": "true" + } + ) + + topic = app.topic(topic_name) + + # Generate test data with multiple partitionable dimensions + locations = ["NYC", "LA", "CHI"] + sensor_types = ["temperature", "humidity", "pressure"] + + # Use a fixed timestamp for consistent testing (2024-01-15 12:00:00 UTC) + base_timestamp_ms = int(datetime(2024, 1, 15, 12, 0, 0, tzinfo=timezone.utc).timestamp() * 1000) + + with app.get_producer() as producer: + for i in range(message_count): + # Distribute messages across different partitions + location = locations[i % len(locations)] + sensor_type = sensor_types[i % len(sensor_types)] + + # Add some time variation (each message is 1 hour apart) + timestamp_ms = base_timestamp_ms + (i * 3600 * 1000) + + message = { + "id": i, + "location": location, + "sensor_type": sensor_type, + "value": round(20.0 + (i * 1.5), 2), # Incrementing sensor value + "ts_ms": timestamp_ms, + "status": "active", + "metadata": { + "device_id": f"device_{i % 3}", + "firmware": "v1.2.3" + } + } + + # Convert timestamp to human-readable for logging + dt = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc) + print(f"Producing message {i}: location={location}, sensor_type={sensor_type}, " + f"ts={dt.isoformat()}, value={message['value']}") + + serialized = json.dumps(message).encode('utf-8') + + producer.produce( + topic=topic.name, + key=f"key_{location}_{sensor_type}_{i}", + value=serialized + ) + + producer.flush() + + print(f"\nSuccessfully produced {message_count} messages") + print(f"Messages span from {datetime.fromtimestamp(base_timestamp_ms / 1000, tz=timezone.utc).isoformat()}") + print(f" to {datetime.fromtimestamp((base_timestamp_ms + (message_count - 1) * 3600 * 1000) / 1000, tz=timezone.utc).isoformat()}") + print(f"Locations: {locations}") + print(f"Sensor types: {sensor_types}") + + +if __name__ == "__main__": + try: + main() + except Exception as e: + print(f"Error: {e}") + import traceback + traceback.print_exc() + exit(1) diff --git a/tests/destinations/quixlake-timeseries/verify_catalog.py 
b/tests/destinations/quixlake-timeseries/verify_catalog.py new file mode 100644 index 000000000..1eabce9c0 --- /dev/null +++ b/tests/destinations/quixlake-timeseries/verify_catalog.py @@ -0,0 +1,234 @@ +""" +Verify catalog integration for Quix Lake Timeseries destination tests. + +Tests that the sink correctly interacts with the REST Catalog: +1. Tables are created/registered +2. Files are added to the manifest +3. Correct partition information is sent +4. Proper metadata is included +""" +import os +import sys +import requests +import time + + +def wait_for_catalog(catalog_url: str, max_attempts: int = 20) -> bool: + """ + Wait for the catalog service to be ready. + + Args: + catalog_url: Base URL of the catalog + max_attempts: Maximum number of retry attempts + + Returns: + True if catalog is ready, False otherwise + """ + print(f"Waiting for catalog at {catalog_url} to be ready...") + + for attempt in range(max_attempts): + try: + response = requests.get(f"{catalog_url}/health", timeout=5) + if response.status_code == 200: + print(f"✓ Catalog is ready") + return True + except requests.exceptions.RequestException as e: + if attempt == 0: + print(f" Catalog not ready yet: {e}") + + if attempt < max_attempts - 1: + time.sleep(2) + + return False + + +def get_catalog_state(catalog_url: str): + """Get the current state of the catalog""" + try: + response = requests.get(f"{catalog_url}/catalog/state", timeout=10) + if response.status_code == 200: + return response.json() + else: + print(f"✗ Failed to get catalog state: {response.status_code}") + print(f" Response: {response.text}") + return None + except Exception as e: + print(f"✗ Error getting catalog state: {e}") + return None + + +def verify_catalog(catalog_url: str): + """Verify the catalog received expected calls""" + try: + response = requests.get(f"{catalog_url}/catalog/verify", timeout=10) + result = response.json() + + if response.status_code == 200: + return result, True + else: + return result, False + except Exception as e: + print(f"✗ Error verifying catalog: {e}") + return None, False + + +def main(): + # Configuration from environment + catalog_url = os.getenv("CATALOG_URL", "http://mock-catalog:5001") + table_name = os.getenv("TABLE_NAME", "test-quixlake-input") + expected_message_count = int(os.getenv("TEST_MESSAGE_COUNT", "10")) + + print("="*60) + print("Catalog Integration Verification") + print("="*60) + print(f"Catalog URL: {catalog_url}") + print(f"Expected table: {table_name}") + print(f"Expected messages: {expected_message_count}") + print() + + # Wait for catalog to be ready + if not wait_for_catalog(catalog_url): + print(f"\n✗ FAILED: Catalog at {catalog_url} did not become ready") + sys.exit(1) + + # Get catalog state + print("\n" + "="*60) + print("Fetching Catalog State") + print("="*60) + + state = get_catalog_state(catalog_url) + + if not state: + print(f"✗ FAILED: Could not retrieve catalog state") + sys.exit(1) + + print(f"Total requests received: {state['total_requests']}") + print(f"Tables created: {len(state['tables'])}") + print(f"Files registered: {state['total_files']}") + + # Verify catalog interactions + print("\n" + "="*60) + print("Verifying Catalog Interactions") + print("="*60) + + verification, success = verify_catalog(catalog_url) + + if not verification: + print(f"✗ FAILED: Could not verify catalog") + sys.exit(1) + + # Print verification results + print(f"\nVerification Results:") + print(f" Tables created: {verification['tables_created']}") + print(f" Files registered: 
{verification['files_registered']}") + print(f" Table creation calls: {verification['table_creation_calls']}") + print(f" File registration calls: {verification['file_registration_calls']}") + + if not success or not verification['success']: + print(f"\n✗ VERIFICATION FAILED") + if verification.get('issues'): + print(f"\nIssues:") + for issue in verification['issues']: + print(f" - {issue}") + sys.exit(1) + + # Detailed validation + print("\n" + "="*60) + print("Detailed Validation") + print("="*60) + + # Check table was created + if verification['tables_created'] == 0: + print(f"✗ FAILED: No tables were created") + sys.exit(1) + + print(f"✓ Table creation verified ({verification['tables_created']} table(s))") + + # Check table metadata + if state['tables']: + for table_key, table_info in state['tables'].items(): + print(f"\nTable: {table_key}") + print(f" Location: {table_info['location']}") + print(f" Partition spec: {table_info['partition_spec']}") + print(f" Properties: {table_info['properties']}") + print(f" Created at: {table_info['created_at']}") + + # Verify expected partition columns are in partition spec + expected_partitions = os.getenv("HIVE_COLUMNS", "location,year,month,day").split(',') + actual_partitions = table_info.get('partition_spec', []) + + # Note: The partition spec might be empty initially (dynamic discovery) + # or might contain the expected partitions + if actual_partitions: + print(f" ✓ Partition spec configured: {actual_partitions}") + else: + print(f" ℹ Partition spec empty (dynamic discovery mode)") + + # Check files were registered + if verification['files_registered'] == 0: + print(f"\n✗ FAILED: No files were registered in manifest") + sys.exit(1) + + print(f"\n✓ File registration verified ({verification['files_registered']} file(s))") + + # Check file metadata + if state['total_files'] > 0: + print(f"\nSample registered files:") + for i, file_entry in enumerate(state.get('manifest_files', [])[:3]): + print(f"\n File {i+1}:") + print(f" Path: {file_entry['file_path']}") + print(f" Size: {file_entry['file_size']} bytes") + print(f" Rows: {file_entry['row_count']}") + print(f" Partitions: {file_entry['partition_values']}") + print(f" Registered: {file_entry['registered_at']}") + + # Verify partition values are present + if not file_entry['partition_values']: + print(f" ⚠ Warning: No partition values (expected for non-partitioned tables)") + else: + print(f" ✓ Partition values present") + + if state['total_files'] > 3: + print(f"\n ... and {state['total_files'] - 3} more file(s)") + + # Check minimum file registration count + # Should be at least 1 file per partition combination + # With location=3 values × year=1 × month=1 × day=1 = 3 partitions minimum + min_expected_files = 1 + if verification['files_registered'] < min_expected_files: + print(f"\n⚠ Warning: Expected at least {min_expected_files} file(s), got {verification['files_registered']}") + + # Display recent requests for debugging + if state.get('recent_requests'): + print(f"\n" + "="*60) + print("Recent Catalog Requests (last 10)") + print("="*60) + for i, req in enumerate(state['recent_requests'][-5:]): # Show last 5 + print(f"\n{i+1}. 
{req['method']} {req['endpoint']}") + print(f" Timestamp: {req['timestamp']}") + if req.get('data'): + print(f" Data keys: {list(req['data'].keys())}") + + # Final summary + print("\n" + "="*60) + print("✓ ALL CATALOG VERIFICATIONS PASSED") + print("="*60) + print(f"✓ Sink successfully registered table in catalog") + print(f"✓ Sink successfully registered {verification['files_registered']} file(s) in manifest") + print(f"✓ Catalog received {verification['total_requests']} total request(s)") + print() + + sys.exit(0) + + +if __name__ == "__main__": + try: + main() + except KeyboardInterrupt: + print("\n\nTest interrupted by user") + sys.exit(1) + except Exception as e: + print(f"\n✗ Unexpected error: {e}") + import traceback + traceback.print_exc() + sys.exit(1) diff --git a/tests/destinations/quixlake-timeseries/verify_output.py b/tests/destinations/quixlake-timeseries/verify_output.py new file mode 100644 index 000000000..43518a3a5 --- /dev/null +++ b/tests/destinations/quixlake-timeseries/verify_output.py @@ -0,0 +1,293 @@ +""" +Verify output for Quix Lake Timeseries destination tests. + +Validates that: +1. Parquet files are written to S3 with correct Hive partitioning +2. Data integrity is maintained +3. Partition structure matches configuration +4. All expected records are present +""" +import boto3 +import os +import sys +import time +import pyarrow.parquet as pq +from io import BytesIO + + +def parse_partition_from_path(path, table_name): + """ + Extract partition values from Hive-style S3 path. + + Example: prefix/table_name/year=2024/month=01/day=15/file.parquet + Returns: {'year': '2024', 'month': '01', 'day': '15'} + """ + partitions = {} + + # Find the table name in the path and process everything after it + if table_name in path: + parts = path.split(f"{table_name}/", 1) + if len(parts) > 1: + partition_path = parts[1] + # Split by '/' and look for key=value pairs + for part in partition_path.split('/'): + if '=' in part and not part.endswith('.parquet'): + key, value = part.split('=', 1) + partitions[key] = value + + return partitions + + +def main(): + # Configuration from environment + minio_endpoint = os.getenv("AWS_ENDPOINT_URL", "http://minio:9000") + access_key = os.getenv("AWS_ACCESS_KEY_ID", "minioadmin") + secret_key = os.getenv("AWS_SECRET_ACCESS_KEY", "minioadmin") + bucket_name = os.getenv("S3_BUCKET", "test-bucket") + s3_prefix = os.getenv("S3_PREFIX", "data") + table_name = os.getenv("TABLE_NAME", "test-quixlake-input") + hive_columns = os.getenv("HIVE_COLUMNS", "location,year,month,day") + expected_message_count = int(os.getenv("TEST_MESSAGE_COUNT", "10")) + + print(f"Configuration:") + print(f" Endpoint: {minio_endpoint}") + print(f" Bucket: {bucket_name}") + print(f" Prefix: {s3_prefix}") + print(f" Table: {table_name}") + print(f" Expected partitions: {hive_columns}") + print(f" Expected messages: {expected_message_count}") + + # Parse expected partition columns + expected_partition_columns = [col.strip() for col in hive_columns.split(',') if col.strip()] + + # Create S3 client + print(f"\nConnecting to S3 at {minio_endpoint}...") + s3_client = boto3.client( + 's3', + endpoint_url=minio_endpoint, + aws_access_key_id=access_key, + aws_secret_access_key=secret_key, + region_name='us-east-1' + ) + + # Build the full table prefix + table_prefix = f"{s3_prefix}/{table_name}/" + + max_attempts = 30 + found_files = [] + + print(f"\nLooking for Parquet files in s3://{bucket_name}/{table_prefix}") + + # Retry logic with polling + for attempt in range(max_attempts): + 
found_files = [] + + try: + # List all objects under the table prefix + paginator = s3_client.get_paginator('list_objects_v2') + pages = paginator.paginate(Bucket=bucket_name, Prefix=table_prefix) + + for page in pages: + if 'Contents' in page: + for obj in page['Contents']: + key = obj['Key'] + # Only include .parquet files (skip directory markers) + if key.endswith('.parquet'): + found_files.append(key) + print(f"Found file: {key} (size: {obj['Size']} bytes)") + + except Exception as e: + print(f"Error listing objects: {e}") + + if len(found_files) > 0: + print(f"\n✓ Found {len(found_files)} Parquet file(s)") + break + + print(f"Attempt {attempt + 1}/{max_attempts}: No files found yet, waiting...") + time.sleep(2) + + if len(found_files) == 0: + print(f"\n✗ FAILED: No Parquet files found after {max_attempts} attempts") + sys.exit(1) + + # Validate partition structure + print("\n--- Validating Partition Structure ---") + partition_paths = set() + + for file_key in found_files: + partitions = parse_partition_from_path(file_key, table_name) + + if expected_partition_columns: + # Verify all expected partition columns are present + actual_columns = set(partitions.keys()) + expected_columns = set(expected_partition_columns) + + if actual_columns != expected_columns: + print(f"✗ ERROR: Partition mismatch in {file_key}") + print(f" Expected columns: {expected_columns}") + print(f" Actual columns: {actual_columns}") + sys.exit(1) + + # Build partition path string for tracking unique partitions + partition_path = "/".join([f"{col}={partitions[col]}" for col in expected_partition_columns]) + partition_paths.add(partition_path) + print(f"✓ Valid partition: {partition_path}") + else: + # No partitioning expected + if partitions: + print(f"✗ ERROR: Found unexpected partitions in {file_key}: {partitions}") + sys.exit(1) + print(f"✓ No partitioning (as expected)") + + print(f"\n✓ All partition structures valid") + if partition_paths: + print(f"✓ Found {len(partition_paths)} unique partition(s):") + for path in sorted(partition_paths): + print(f" - {path}") + + # Read and validate Parquet data + print("\n--- Validating Parquet Data ---") + all_records = [] + + for file_key in found_files: + print(f"\nReading: {file_key}") + + try: + # Download Parquet file + obj_response = s3_client.get_object(Bucket=bucket_name, Key=file_key) + parquet_bytes = obj_response['Body'].read() + + # Read Parquet file with pyarrow + parquet_file = pq.read_table(BytesIO(parquet_bytes)) + + # Convert to list of dictionaries + records = parquet_file.to_pylist() + + print(f" Records in file: {len(records)}") + print(f" Columns: {parquet_file.schema.names}") + + all_records.extend(records) + + # Show first record as sample + if records: + print(f" Sample record: {records[0]}") + + except Exception as e: + print(f"✗ ERROR reading Parquet file {file_key}: {e}") + import traceback + traceback.print_exc() + sys.exit(1) + + print(f"\n✓ Successfully read {len(all_records)} total records from {len(found_files)} file(s)") + + # Validate record count + if len(all_records) < expected_message_count: + print(f"✗ ERROR: Expected at least {expected_message_count} records, found {len(all_records)}") + sys.exit(1) + + print(f"✓ Record count matches expectation ({len(all_records)} >= {expected_message_count})") + + # Validate record structure and content + print("\n--- Validating Record Content ---") + + # Partition columns are stored in the directory structure (Hive-style), not in the data + # So we exclude them from expected fields in the Parquet 
files + all_expected_fields = {'id', 'location', 'sensor_type', 'value', 'ts_ms', 'status', 'metadata', '__key'} + expected_fields = all_expected_fields - set(expected_partition_columns) + + print(f"Expected fields in Parquet data: {expected_fields}") + print(f"Partition columns (in path only): {expected_partition_columns}") + + found_ids = set() + + for i, record in enumerate(all_records[:expected_message_count]): # Check first N records + # Verify required fields exist + actual_fields = set(record.keys()) + missing_fields = expected_fields - actual_fields + + if missing_fields: + print(f"✗ ERROR: Record {i} missing fields: {missing_fields}") + print(f" Record: {record}") + sys.exit(1) + + # Verify field types + if not isinstance(record['id'], int): + print(f"✗ ERROR: Record {i} 'id' is not an integer: {type(record['id'])}") + sys.exit(1) + + if not isinstance(record['value'], (int, float)): + print(f"✗ ERROR: Record {i} 'value' is not numeric: {type(record['value'])}") + sys.exit(1) + + # ts_ms might be datetime after pyarrow conversion, check for both + if not isinstance(record['ts_ms'], (int, float)) and not hasattr(record['ts_ms'], 'timestamp'): + print(f"✗ ERROR: Record {i} 'ts_ms' is not a valid timestamp type: {type(record['ts_ms'])}") + sys.exit(1) + + # Track IDs + found_ids.add(record['id']) + + if i < 3: # Show first 3 records + print(f"✓ Record {i}: id={record['id']}, " + f"sensor_type={record['sensor_type']}, value={record['value']}") + + # Verify we have all expected IDs (0 through expected_message_count - 1) + expected_ids = set(range(expected_message_count)) + if found_ids != expected_ids: + print(f"✗ ERROR: Missing or extra IDs") + print(f" Expected: {sorted(expected_ids)}") + print(f" Found: {sorted(found_ids)}") + missing = expected_ids - found_ids + extra = found_ids - expected_ids + if missing: + print(f" Missing: {sorted(missing)}") + if extra: + print(f" Extra: {sorted(extra)}") + sys.exit(1) + + print(f"\n✓ All {len(all_records)} records validated successfully") + print(f"✓ All expected IDs present: {sorted(found_ids)[:5]}...{sorted(found_ids)[-2:]}") + + # Validate time-based partitioning if year/month/day columns are used + if 'year' in expected_partition_columns: + print("\n--- Validating Time-based Partitioning ---") + + # Check that partition values match the expected date (2024-01-15) + expected_year = "2024" + expected_month = "01" + expected_day = "15" + + for file_key in found_files: + partitions = parse_partition_from_path(file_key, table_name) + + if 'year' in partitions and partitions['year'] != expected_year: + print(f"✗ ERROR: Unexpected year in {file_key}: {partitions['year']} (expected {expected_year})") + sys.exit(1) + + if 'month' in partitions and partitions['month'] != expected_month: + print(f"✗ ERROR: Unexpected month in {file_key}: {partitions['month']} (expected {expected_month})") + sys.exit(1) + + if 'day' in partitions and partitions['day'] != expected_day: + print(f"✗ ERROR: Unexpected day in {file_key}: {partitions['day']} (expected {expected_day})") + sys.exit(1) + + print(f"✓ All time-based partitions match expected date: {expected_year}-{expected_month}-{expected_day}") + + print("\n" + "="*60) + print("✓ ALL VALIDATIONS PASSED") + print("="*60) + sys.exit(0) + + +if __name__ == "__main__": + try: + main() + except KeyboardInterrupt: + print("\n\nTest interrupted by user") + sys.exit(1) + except Exception as e: + print(f"\n✗ Unexpected error: {e}") + import traceback + traceback.print_exc() + sys.exit(1) From 
1ee38b7e3174c2678cb4245c4e126e700f80606d Mon Sep 17 00:00:00 2001 From: Tomas Neubauer Date: Thu, 20 Nov 2025 17:20:48 +0100 Subject: [PATCH 2/5] Renames library item ID Updates the library item ID to be more descriptive of the destination. --- python/destinations/quixlake-timeseries/library.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/destinations/quixlake-timeseries/library.json b/python/destinations/quixlake-timeseries/library.json index 8cca517f2..9c2673b25 100644 --- a/python/destinations/quixlake-timeseries/library.json +++ b/python/destinations/quixlake-timeseries/library.json @@ -1,5 +1,5 @@ { - "libraryItemId": "starter-destination", + "libraryItemId": "quixlake-timeseries-destination", "name": "Quix TS Datalake Sink", "language": "Python", "tags": { From 6d7acd6024080d7a00cafe711514faa655d13078 Mon Sep 17 00:00:00 2001 From: Tomas Neubauer Date: Fri, 21 Nov 2025 10:43:07 +0100 Subject: [PATCH 3/5] Improves Quixlake TS and S3 destination tests Refactors test configurations for Quixlake Timeseries and S3 File destinations. Updates test parameters such as batch sizes, commit intervals, worker counts, and message counts to optimize test execution time and reliability. Adds `mypy-boto3-s3` dependency to s3-file destination. Renames "Quix TS Datalake Sink" to "Quix DataLake Timeseries Sink" for clarity. --- python/destinations/quixlake-timeseries/library.json | 2 +- python/destinations/s3-file/requirements.txt | 3 ++- .../quixlake-timeseries/docker-compose.test.yml | 10 +++++----- tests/destinations/s3-file/docker-compose.test.yml | 6 +++--- tests/destinations/s3-file/verify_output.py | 2 +- 5 files changed, 12 insertions(+), 11 deletions(-) diff --git a/python/destinations/quixlake-timeseries/library.json b/python/destinations/quixlake-timeseries/library.json index 9c2673b25..c2910840e 100644 --- a/python/destinations/quixlake-timeseries/library.json +++ b/python/destinations/quixlake-timeseries/library.json @@ -1,6 +1,6 @@ { "libraryItemId": "quixlake-timeseries-destination", - "name": "Quix TS Datalake Sink", + "name": "Quix DataLake Timeseries Sink", "language": "Python", "tags": { "Pipeline Stage": ["Destination"], diff --git a/python/destinations/s3-file/requirements.txt b/python/destinations/s3-file/requirements.txt index 170a474f4..db0a482d1 100644 --- a/python/destinations/s3-file/requirements.txt +++ b/python/destinations/s3-file/requirements.txt @@ -3,4 +3,5 @@ python-dotenv requests pandas boto3 -pyarrow \ No newline at end of file +pyarrow +mypy-boto3-s3 \ No newline at end of file diff --git a/tests/destinations/quixlake-timeseries/docker-compose.test.yml b/tests/destinations/quixlake-timeseries/docker-compose.test.yml index abc79db83..07f381f74 100644 --- a/tests/destinations/quixlake-timeseries/docker-compose.test.yml +++ b/tests/destinations/quixlake-timeseries/docker-compose.test.yml @@ -1,6 +1,6 @@ # Test configuration for Quix Lake Timeseries destination # Uses only public Docker images + mock catalog service -# timeout: 90 +# timeout: 120 services: # MinIO for S3-compatible storage @@ -105,8 +105,8 @@ services: # Performance settings - BATCH_SIZE=5 - - COMMIT_INTERVAL=5 - - MAX_WRITE_WORKERS=3 + - COMMIT_INTERVAL=1 + - MAX_WRITE_WORKERS=5 # Kafka consumer settings - CONSUMER_GROUP=quixlake-ts-test-consumer @@ -140,7 +140,7 @@ services: # Test configuration - TEST_INPUT_TOPIC=test-quixlake-input - - TEST_MESSAGE_COUNT=10 + - TEST_MESSAGE_COUNT=6 # S3 configuration for verification - AWS_ENDPOINT_URL=http://minio:9000 @@ -165,7 +165,7 
@@ services: python /tests/produce_test_data.py && echo '' && echo '=== Waiting for quixlake-timeseries to process messages ===' && - sleep 25 && + sleep 10 && echo '' && echo '=== Verifying Parquet data in S3 ===' && python /tests/verify_output.py && diff --git a/tests/destinations/s3-file/docker-compose.test.yml b/tests/destinations/s3-file/docker-compose.test.yml index 3ca994d50..e2cfaf9e8 100644 --- a/tests/destinations/s3-file/docker-compose.test.yml +++ b/tests/destinations/s3-file/docker-compose.test.yml @@ -1,4 +1,4 @@ -# timeout: 60 +# timeout: 90 services: minio: image: minio/minio:latest @@ -83,7 +83,7 @@ services: environment: - Quix__Broker__Address=kafka:9092 - TEST_INPUT_TOPIC=test-s3-input - - TEST_MESSAGE_COUNT=3 + - TEST_MESSAGE_COUNT=2 - MINIO_ENDPOINT=minio:9000 - MINIO_ACCESS_KEY=minioadmin - MINIO_SECRET_KEY=minioadmin @@ -96,7 +96,7 @@ services: echo 'Producing test messages to Kafka...' && python /tests/produce_test_data.py && echo 'Waiting for s3-file-dest to process messages...' && - sleep 15 && + sleep 8 && echo 'Verifying data in S3...' && python /tests/verify_output.py " diff --git a/tests/destinations/s3-file/verify_output.py b/tests/destinations/s3-file/verify_output.py index 18cb49ef3..cb734b5da 100644 --- a/tests/destinations/s3-file/verify_output.py +++ b/tests/destinations/s3-file/verify_output.py @@ -89,7 +89,7 @@ def main(): print(f"\nTotal records found: {len(all_records)}") # Verify we have the expected number of records - expected_message_count = 3 + expected_message_count = int(os.getenv("TEST_MESSAGE_COUNT", "3")) if len(all_records) < expected_message_count: print(f"ERROR: Expected {expected_message_count} records, found {len(all_records)}") sys.exit(1) From 9948aa528185613e2b7523266d2b6c50d9955851 Mon Sep 17 00:00:00 2001 From: Tomas Neubauer Date: Wed, 26 Nov 2025 14:12:24 +0100 Subject: [PATCH 4/5] Adds S3-compatible storage support Enables the connector to support S3-compatible storage services like MinIO, Wasabi, DigitalOcean Spaces, and Backblaze B2, by allowing users to set a custom endpoint URL. Also updates the Quix platform link. --- .../quixlake-timeseries/README.md | 55 +++++++++++++++++-- .../destinations/quixlake-timeseries/main.py | 4 ++ .../quixlake-timeseries/quixlake_sink.py | 21 +++++-- 3 files changed, 71 insertions(+), 9 deletions(-) diff --git a/python/destinations/quixlake-timeseries/README.md b/python/destinations/quixlake-timeseries/README.md index 24b7379fa..b657d7b65 100644 --- a/python/destinations/quixlake-timeseries/README.md +++ b/python/destinations/quixlake-timeseries/README.md @@ -13,7 +13,7 @@ This connector consumes time-series data from a Kafka topic and writes it to S3 ## How to run -Create a [Quix](https://portal.platform.quix.io/signup?xlink=github) account or log in and visit the `Connectors` tab to use this connector. +Create a [Quix](https://portal.cloud.quix.io/signup?xlink=github) account or log in and visit the `Connectors` tab to use this connector. Clicking `Set up connector` allows you to enter your connection details and runtime parameters. 
@@ -44,8 +44,13 @@ Then either: - **`AWS_REGION`**: AWS region for S3 bucket *Default*: `us-east-1` -- **`AWS_ENDPOINT_URL`**: S3 endpoint URL (for non-AWS S3-compatible storage like MinIO) - *Default*: None +- **`AWS_ENDPOINT_URL`**: Custom S3 endpoint URL for non-AWS S3-compatible storage + *Examples*: + - MinIO: `http://minio.example.com:9000` + - Wasabi: `https://s3.wasabisys.com` + - DigitalOcean Spaces: `https://nyc3.digitaloceanspaces.com` + - Backblaze B2: `https://s3.us-west-004.backblazeb2.com` + *Default*: None (uses AWS S3) ### Data Organization @@ -126,6 +131,46 @@ HIVE_COLUMNS= ``` Creates: `s3://bucket/prefix/table/data_*.parquet` +## Using Non-AWS S3-Compatible Storage + +This connector supports any S3-compatible storage service by setting the `AWS_ENDPOINT_URL` environment variable. + +### MinIO Example +```bash +AWS_ENDPOINT_URL=http://minio.example.com:9000 +AWS_ACCESS_KEY_ID=minioadmin +AWS_SECRET_ACCESS_KEY=minioadmin +AWS_REGION=us-east-1 +S3_BUCKET=my-data-lake +``` + +### Wasabi Example +```bash +AWS_ENDPOINT_URL=https://s3.wasabisys.com +AWS_ACCESS_KEY_ID=your-wasabi-access-key +AWS_SECRET_ACCESS_KEY=your-wasabi-secret-key +AWS_REGION=us-east-1 +S3_BUCKET=my-data-lake +``` + +### DigitalOcean Spaces Example +```bash +AWS_ENDPOINT_URL=https://nyc3.digitaloceanspaces.com +AWS_ACCESS_KEY_ID=your-spaces-access-key +AWS_SECRET_ACCESS_KEY=your-spaces-secret-key +AWS_REGION=nyc3 +S3_BUCKET=my-data-lake +``` + +### Backblaze B2 Example +```bash +AWS_ENDPOINT_URL=https://s3.us-west-004.backblazeb2.com +AWS_ACCESS_KEY_ID=your-b2-key-id +AWS_SECRET_ACCESS_KEY=your-b2-application-key +AWS_REGION=us-west-004 +S3_BUCKET=my-data-lake +``` + ## Architecture The sink uses a batching architecture for high throughput: @@ -138,7 +183,9 @@ The sink uses a batching architecture for high throughput: ## Requirements -- S3 bucket access (AWS S3 or S3-compatible storage) +- S3 bucket access: + - AWS S3, or + - Any S3-compatible storage (MinIO, Wasabi, DigitalOcean Spaces, Backblaze B2, etc.) 
- Optional: Quix REST Catalog endpoint for data catalog integration ## Contribute diff --git a/python/destinations/quixlake-timeseries/main.py b/python/destinations/quixlake-timeseries/main.py index 9869719ae..57faac8e1 100644 --- a/python/destinations/quixlake-timeseries/main.py +++ b/python/destinations/quixlake-timeseries/main.py @@ -50,6 +50,10 @@ def parse_hive_columns(columns_str: str) -> list: s3_bucket=os.environ["S3_BUCKET"], s3_prefix=os.getenv("S3_PREFIX", "data"), table_name=table_name, + aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"), + aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"), + aws_region=os.getenv("AWS_REGION", "us-east-1"), + s3_endpoint_url=os.getenv("AWS_ENDPOINT_URL"), hive_columns=hive_columns, timestamp_column=os.getenv("TIMESTAMP_COLUMN", "ts_ms"), catalog_url=os.getenv("CATALOG_URL"), diff --git a/python/destinations/quixlake-timeseries/quixlake_sink.py b/python/destinations/quixlake-timeseries/quixlake_sink.py index b597ff39c..e5abf4540 100644 --- a/python/destinations/quixlake-timeseries/quixlake_sink.py +++ b/python/destinations/quixlake-timeseries/quixlake_sink.py @@ -8,7 +8,6 @@ import time import logging import uuid -import os from typing import List, Dict, Any, Optional from datetime import datetime, timezone from io import BytesIO @@ -36,6 +35,10 @@ def __init__( s3_bucket: str, s3_prefix: str, table_name: str, + aws_access_key_id: Optional[str] = None, + aws_secret_access_key: Optional[str] = None, + aws_region: str = "us-east-1", + s3_endpoint_url: Optional[str] = None, hive_columns: List[str] = None, timestamp_column: str = "ts_ms", catalog_url: Optional[str] = None, @@ -52,6 +55,11 @@ def __init__( s3_bucket: S3 bucket name s3_prefix: S3 prefix/path for data files table_name: Table name for registration + aws_access_key_id: AWS access key ID + aws_secret_access_key: AWS secret access key + aws_region: AWS region (default: "us-east-1") + s3_endpoint_url: Custom S3 endpoint URL for non-AWS S3-compatible storage + (e.g., MinIO, Wasabi, DigitalOcean Spaces) hive_columns: List of columns to use for Hive partitioning. Include 'year', 'month', 'day', 'hour' to extract these from timestamp_column timestamp_column: Column containing timestamp to extract time partitions from @@ -62,10 +70,10 @@ def __init__( auto_create_bucket: if True, create bucket in S3 if missing. 
max_workers: Maximum number of parallel upload threads (default: 10) """ - self._aws_region = os.getenv('AWS_REGION', 'us-east-1') - self._aws_access_key_id = os.getenv("AWS_ACCESS_KEY_ID") - self._aws_secret_access_key = os.getenv("AWS_SECRET_ACCESS_KEY") - self._aws_endpoint_url = os.getenv("AWS_ENDPOINT_URL", None) + self._aws_region = aws_region + self._aws_access_key_id = aws_access_key_id + self._aws_secret_access_key = aws_secret_access_key + self._aws_endpoint_url = s3_endpoint_url self._credentials = { "region_name": self._aws_region, "aws_access_key_id": self._aws_access_key_id, @@ -100,6 +108,9 @@ def setup(self): logger.info(f"S3 Target: s3://{self.s3_bucket}/{self.s3_prefix}/{self.table_name}") logger.info(f"Partitioning: hive_columns={self.hive_columns}") + if self._aws_endpoint_url: + logger.info(f"Using custom S3 endpoint: {self._aws_endpoint_url}") + if self._catalog and self.auto_discover: logger.info(f"Table will be auto-registered in REST Catalog on first write") From 33ac8b326a6324c2c606534e1436c0d899021406 Mon Sep 17 00:00:00 2001 From: Tomas Neubauer Date: Wed, 26 Nov 2025 14:13:09 +0100 Subject: [PATCH 5/5] Enables support for S3-compatible storage Allows the connector to use non-AWS S3-compatible storage services like MinIO, Wasabi, DigitalOcean Spaces, and Backblaze B2 by providing a custom endpoint URL. This change makes the connector more versatile and allows users to leverage alternative storage solutions. --- .../quixlake-timeseries/README.md | 55 +++++++++++++++++-- .../destinations/quixlake-timeseries/main.py | 4 ++ .../quixlake-timeseries/quixlake_sink.py | 21 +++++-- 3 files changed, 71 insertions(+), 9 deletions(-) diff --git a/python/destinations/quixlake-timeseries/README.md b/python/destinations/quixlake-timeseries/README.md index 24b7379fa..b657d7b65 100644 --- a/python/destinations/quixlake-timeseries/README.md +++ b/python/destinations/quixlake-timeseries/README.md @@ -13,7 +13,7 @@ This connector consumes time-series data from a Kafka topic and writes it to S3 ## How to run -Create a [Quix](https://portal.platform.quix.io/signup?xlink=github) account or log in and visit the `Connectors` tab to use this connector. +Create a [Quix](https://portal.cloud.quix.io/signup?xlink=github) account or log in and visit the `Connectors` tab to use this connector. Clicking `Set up connector` allows you to enter your connection details and runtime parameters. @@ -44,8 +44,13 @@ Then either: - **`AWS_REGION`**: AWS region for S3 bucket *Default*: `us-east-1` -- **`AWS_ENDPOINT_URL`**: S3 endpoint URL (for non-AWS S3-compatible storage like MinIO) - *Default*: None +- **`AWS_ENDPOINT_URL`**: Custom S3 endpoint URL for non-AWS S3-compatible storage + *Examples*: + - MinIO: `http://minio.example.com:9000` + - Wasabi: `https://s3.wasabisys.com` + - DigitalOcean Spaces: `https://nyc3.digitaloceanspaces.com` + - Backblaze B2: `https://s3.us-west-004.backblazeb2.com` + *Default*: None (uses AWS S3) ### Data Organization @@ -126,6 +131,46 @@ HIVE_COLUMNS= ``` Creates: `s3://bucket/prefix/table/data_*.parquet` +## Using Non-AWS S3-Compatible Storage + +This connector supports any S3-compatible storage service by setting the `AWS_ENDPOINT_URL` environment variable. 
+ +### MinIO Example +```bash +AWS_ENDPOINT_URL=http://minio.example.com:9000 +AWS_ACCESS_KEY_ID=minioadmin +AWS_SECRET_ACCESS_KEY=minioadmin +AWS_REGION=us-east-1 +S3_BUCKET=my-data-lake +``` + +### Wasabi Example +```bash +AWS_ENDPOINT_URL=https://s3.wasabisys.com +AWS_ACCESS_KEY_ID=your-wasabi-access-key +AWS_SECRET_ACCESS_KEY=your-wasabi-secret-key +AWS_REGION=us-east-1 +S3_BUCKET=my-data-lake +``` + +### DigitalOcean Spaces Example +```bash +AWS_ENDPOINT_URL=https://nyc3.digitaloceanspaces.com +AWS_ACCESS_KEY_ID=your-spaces-access-key +AWS_SECRET_ACCESS_KEY=your-spaces-secret-key +AWS_REGION=nyc3 +S3_BUCKET=my-data-lake +``` + +### Backblaze B2 Example +```bash +AWS_ENDPOINT_URL=https://s3.us-west-004.backblazeb2.com +AWS_ACCESS_KEY_ID=your-b2-key-id +AWS_SECRET_ACCESS_KEY=your-b2-application-key +AWS_REGION=us-west-004 +S3_BUCKET=my-data-lake +``` + ## Architecture The sink uses a batching architecture for high throughput: @@ -138,7 +183,9 @@ The sink uses a batching architecture for high throughput: ## Requirements -- S3 bucket access (AWS S3 or S3-compatible storage) +- S3 bucket access: + - AWS S3, or + - Any S3-compatible storage (MinIO, Wasabi, DigitalOcean Spaces, Backblaze B2, etc.) - Optional: Quix REST Catalog endpoint for data catalog integration ## Contribute diff --git a/python/destinations/quixlake-timeseries/main.py b/python/destinations/quixlake-timeseries/main.py index 9869719ae..57faac8e1 100644 --- a/python/destinations/quixlake-timeseries/main.py +++ b/python/destinations/quixlake-timeseries/main.py @@ -50,6 +50,10 @@ def parse_hive_columns(columns_str: str) -> list: s3_bucket=os.environ["S3_BUCKET"], s3_prefix=os.getenv("S3_PREFIX", "data"), table_name=table_name, + aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"), + aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"), + aws_region=os.getenv("AWS_REGION", "us-east-1"), + s3_endpoint_url=os.getenv("AWS_ENDPOINT_URL"), hive_columns=hive_columns, timestamp_column=os.getenv("TIMESTAMP_COLUMN", "ts_ms"), catalog_url=os.getenv("CATALOG_URL"), diff --git a/python/destinations/quixlake-timeseries/quixlake_sink.py b/python/destinations/quixlake-timeseries/quixlake_sink.py index b597ff39c..e5abf4540 100644 --- a/python/destinations/quixlake-timeseries/quixlake_sink.py +++ b/python/destinations/quixlake-timeseries/quixlake_sink.py @@ -8,7 +8,6 @@ import time import logging import uuid -import os from typing import List, Dict, Any, Optional from datetime import datetime, timezone from io import BytesIO @@ -36,6 +35,10 @@ def __init__( s3_bucket: str, s3_prefix: str, table_name: str, + aws_access_key_id: Optional[str] = None, + aws_secret_access_key: Optional[str] = None, + aws_region: str = "us-east-1", + s3_endpoint_url: Optional[str] = None, hive_columns: List[str] = None, timestamp_column: str = "ts_ms", catalog_url: Optional[str] = None, @@ -52,6 +55,11 @@ def __init__( s3_bucket: S3 bucket name s3_prefix: S3 prefix/path for data files table_name: Table name for registration + aws_access_key_id: AWS access key ID + aws_secret_access_key: AWS secret access key + aws_region: AWS region (default: "us-east-1") + s3_endpoint_url: Custom S3 endpoint URL for non-AWS S3-compatible storage + (e.g., MinIO, Wasabi, DigitalOcean Spaces) hive_columns: List of columns to use for Hive partitioning. 
Include 'year', 'month', 'day', 'hour' to extract these from timestamp_column timestamp_column: Column containing timestamp to extract time partitions from @@ -62,10 +70,10 @@ def __init__( auto_create_bucket: if True, create bucket in S3 if missing. max_workers: Maximum number of parallel upload threads (default: 10) """ - self._aws_region = os.getenv('AWS_REGION', 'us-east-1') - self._aws_access_key_id = os.getenv("AWS_ACCESS_KEY_ID") - self._aws_secret_access_key = os.getenv("AWS_SECRET_ACCESS_KEY") - self._aws_endpoint_url = os.getenv("AWS_ENDPOINT_URL", None) + self._aws_region = aws_region + self._aws_access_key_id = aws_access_key_id + self._aws_secret_access_key = aws_secret_access_key + self._aws_endpoint_url = s3_endpoint_url self._credentials = { "region_name": self._aws_region, "aws_access_key_id": self._aws_access_key_id, @@ -100,6 +108,9 @@ def setup(self): logger.info(f"S3 Target: s3://{self.s3_bucket}/{self.s3_prefix}/{self.table_name}") logger.info(f"Partitioning: hive_columns={self.hive_columns}") + if self._aws_endpoint_url: + logger.info(f"Using custom S3 endpoint: {self._aws_endpoint_url}") + if self._catalog and self.auto_discover: logger.info(f"Table will be auto-registered in REST Catalog on first write")
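
For context, patches 4 and 5 move AWS configuration from environment lookups inside the sink to explicit constructor arguments passed in from `main.py`. Below is a minimal sketch of what the resulting constructor call looks like for an S3-compatible endpoint. It assumes the class defined in `quixlake_sink.py` is exported as `QuixLakeSink` (the class name is not visible in these hunks), and all endpoint, bucket, and credential values are illustrative placeholders, not values from this patch series.

```python
import os

from quixlake_sink import QuixLakeSink  # class name assumed; the module itself is added by patch 1

# Example wiring that mirrors the main.py hunk above; endpoint and credential
# values are illustrative only (e.g. a local MinIO instance).
sink = QuixLakeSink(
    s3_bucket=os.environ["S3_BUCKET"],                        # required
    s3_prefix=os.getenv("S3_PREFIX", "data"),
    table_name=os.getenv("TABLE_NAME", "sensor_data"),        # example; main.py falls back to the topic name
    aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),          # empty/None -> IAM role per the README
    aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
    aws_region=os.getenv("AWS_REGION", "us-east-1"),
    s3_endpoint_url=os.getenv("AWS_ENDPOINT_URL"),             # e.g. "http://minio:9000"; None means AWS S3
    hive_columns=["location", "year", "month", "day"],         # or parsed from HIVE_COLUMNS as in main.py
    timestamp_column=os.getenv("TIMESTAMP_COLUMN", "ts_ms"),
    catalog_url=os.getenv("CATALOG_URL"),                      # optional REST Catalog registration
)
```

The remaining constructor options visible in the docstring hunks (`auto_create_bucket`, `max_workers`) keep their defaults in this sketch; the docker-compose test configuration shows how the corresponding environment variables are tuned for CI.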