diff --git a/.env b/.env index 1de5616..11e9604 100644 --- a/.env +++ b/.env @@ -16,4 +16,8 @@ MINIO_ROOT_USER=minioadmin MINIO_ROOT_PASSWORD=minioadmin MINIO_BUCKET_NAME=roman-coins MINIO_NEW_USER=roman-coins-user -MINIO_NEW_USER_PASSWORD=nonprodpasswd \ No newline at end of file +MINIO_NEW_USER_PASSWORD=nonprodpasswd + +# DuckDB database +DUCKDB_DATABASE=data/data.duckdb +LOADING_TABLE=raw_roman_coins \ No newline at end of file diff --git a/compose.test.yaml b/compose.test.yaml index f6bb6fb..40fc91b 100644 --- a/compose.test.yaml +++ b/compose.test.yaml @@ -97,8 +97,12 @@ services: dagster-webserver: depends_on: - - db - - airbyte-configurator + db: + condition: service_healthy + airbyte-configurator: + condition: service_completed_successfully + volumes: + - duckdb-data:/opt/dagster/app/data environment: - POSTGRES_USER=${POSTGRES_USER} - POSTGRES_PASSWORD=${POSTGRES_PASSWORD} @@ -107,6 +111,12 @@ services: - AIRBYTE_USERNAME=${AIRBYTE_USERNAME} - AIRBYTE_PASSWORD=${AIRBYTE_PASSWORD} - HOST=${HOST} + - MINIO_ENDPOINT=${HOST}:9000 + - DUCKDB_DATABASE=${DUCKDB_DATABASE} + - LOADING_TABLE=${LOADING_TABLE} + - MINIO_BUCKET_NAME=${MINIO_BUCKET_NAME} + - MINIO_USER=${MINIO_NEW_USER} + - MINIO_PASSWORD=${MINIO_NEW_USER_PASSWORD} build: context: ./extract-load-transform/orchestration dockerfile: Dockerfile @@ -125,8 +135,12 @@ services: dagster-daemon: depends_on: - - db - - airbyte-configurator + db: + condition: service_healthy + airbyte-configurator: + condition: service_completed_successfully + volumes: + - duckdb-data:/opt/dagster/app/data environment: - POSTGRES_USER=${POSTGRES_USER} - POSTGRES_PASSWORD=${POSTGRES_PASSWORD} @@ -135,6 +149,12 @@ services: - AIRBYTE_USERNAME=${AIRBYTE_USERNAME} - AIRBYTE_PASSWORD=${AIRBYTE_PASSWORD} - HOST=${HOST} + - MINIO_ENDPOINT=${HOST}:9000 + - DUCKDB_DATABASE=${DUCKDB_DATABASE} + - LOADING_TABLE=${LOADING_TABLE} + - MINIO_BUCKET_NAME=${MINIO_BUCKET_NAME} + - MINIO_USER=${MINIO_NEW_USER} + - 
MINIO_PASSWORD=${MINIO_NEW_USER_PASSWORD} build: context: ./extract-load-transform/orchestration dockerfile: Dockerfile diff --git a/compose.yaml b/compose.yaml index 8189140..6d3c0b0 100644 --- a/compose.yaml +++ b/compose.yaml @@ -97,8 +97,12 @@ services: dagster-webserver: depends_on: - - db - - airbyte-configurator + db: + condition: service_healthy + airbyte-configurator: + condition: service_completed_successfully + volumes: + - duckdb-data:/opt/dagster/app/data environment: - POSTGRES_USER=${POSTGRES_USER} - POSTGRES_PASSWORD=${POSTGRES_PASSWORD} @@ -107,6 +111,12 @@ services: - AIRBYTE_USERNAME=${AIRBYTE_USERNAME} - AIRBYTE_PASSWORD=${AIRBYTE_PASSWORD} - HOST=${HOST} + - MINIO_ENDPOINT=${HOST}:9000 + - DUCKDB_DATABASE=${DUCKDB_DATABASE} + - LOADING_TABLE=${LOADING_TABLE} + - MINIO_BUCKET_NAME=${MINIO_BUCKET_NAME} + - MINIO_USER=${MINIO_NEW_USER} + - MINIO_PASSWORD=${MINIO_NEW_USER_PASSWORD} build: context: ./extract-load-transform/orchestration dockerfile: Dockerfile @@ -125,8 +135,12 @@ services: dagster-daemon: depends_on: - - db - - airbyte-configurator + db: + condition: service_healthy + airbyte-configurator: + condition: service_completed_successfully + volumes: + - duckdb-data:/opt/dagster/app/data environment: - POSTGRES_USER=${POSTGRES_USER} - POSTGRES_PASSWORD=${POSTGRES_PASSWORD} @@ -135,6 +149,12 @@ services: - AIRBYTE_USERNAME=${AIRBYTE_USERNAME} - AIRBYTE_PASSWORD=${AIRBYTE_PASSWORD} - HOST=${HOST} + - MINIO_ENDPOINT=${HOST}:9000 + - DUCKDB_DATABASE=${DUCKDB_DATABASE} + - LOADING_TABLE=${LOADING_TABLE} + - MINIO_BUCKET_NAME=${MINIO_BUCKET_NAME} + - MINIO_USER=${MINIO_NEW_USER} + - MINIO_PASSWORD=${MINIO_NEW_USER_PASSWORD} build: context: ./extract-load-transform/orchestration dockerfile: Dockerfile @@ -147,3 +167,4 @@ volumes: db-data: web-scraper-flag: minio-data: + duckdb-data: diff --git a/extract-load-transform/orchestration/Dockerfile b/extract-load-transform/orchestration/Dockerfile index 9fa38fe..21441d2 100644 --- 
a/extract-load-transform/orchestration/Dockerfile +++ b/extract-load-transform/orchestration/Dockerfile @@ -2,7 +2,8 @@ FROM python:3.10-slim RUN mkdir -p /opt/dagster/dagster_home /opt/dagster/app -RUN pip install dagster dagster-webserver dagster-postgres dagster-airbyte +RUN pip install dagster dagster-webserver dagster-postgres +RUN pip install dagster-airbyte dagster-duckdb minio duckdb # Copy your code and workspace to /opt/dagster/app COPY . /opt/dagster/app/ diff --git a/extract-load-transform/orchestration/orchestration/__init__.py b/extract-load-transform/orchestration/orchestration/__init__.py index a51aa7b..5869301 100644 --- a/extract-load-transform/orchestration/orchestration/__init__.py +++ b/extract-load-transform/orchestration/orchestration/__init__.py @@ -1,16 +1,23 @@ -from dagster import Definitions +from dagster import Definitions, load_assets_from_modules from dagster_airbyte import load_assets_from_airbyte_instance -from .jobs import airbyte_sync_job -from .schedules import airbyte_sync_schedule -from .resources import airbyte_instance +from .assets import raw_roman_coins +from .resources import database_resource, airbyte_instance, minio_resource +from .jobs import airbyte_sync_job, loading_job +from .sensors import api_sensor, extracted_file_sensor airbyte_assets = load_assets_from_airbyte_instance(airbyte_instance) +loading_assets = load_assets_from_modules(modules=[raw_roman_coins], group_name="load") + +all_jobs = [airbyte_sync_job, loading_job] +all_sensors = [api_sensor, extracted_file_sensor] -all_jobs = [airbyte_sync_job] -all_schedules = [airbyte_sync_schedule] defs = Definitions( - assets=[airbyte_assets], + assets=[airbyte_assets, *loading_assets], + resources={ + "database":database_resource, + "storage":minio_resource + }, jobs=all_jobs, - schedules=all_schedules + sensors=all_sensors ) diff --git a/extract-load-transform/orchestration/orchestration/assets/__init__.py 
b/extract-load-transform/orchestration/orchestration/assets/__init__.py new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/extract-load-transform/orchestration/orchestration/assets/__init__.py @@ -0,0 +1 @@ + diff --git a/extract-load-transform/orchestration/orchestration/assets/raw_roman_coins.py b/extract-load-transform/orchestration/orchestration/assets/raw_roman_coins.py new file mode 100644 index 0000000..1e5eaf9 --- /dev/null +++ b/extract-load-transform/orchestration/orchestration/assets/raw_roman_coins.py @@ -0,0 +1,69 @@ +from dagster import asset +from dagster_duckdb import DuckDBResource +from ..resources import MinioResource +import os +import tempfile +import duckdb + +@asset( + deps=["roman_coin_api_stream"] +) +def processed_files_set(context, database: DuckDBResource) -> set: + """ + Fetches the set of filenames that have been processed and stored in the + database to avoid reprocessing. + """ + query = "SELECT filename FROM processed_files;" + + with database.get_connection() as conn: + with conn.cursor() as cursor: # Cursor needed for concurrency + try: + result = cursor.execute(query).fetchall() + processed_files_set = set([row[0] for row in result]) + except duckdb.CatalogException: + # Assuming the table doesn't exist, create it and return an empty set + cursor.execute("CREATE TABLE IF NOT EXISTS processed_files (filename VARCHAR);") + processed_files_set = set() + + return processed_files_set + +@asset +def unprocessed_files_list(storage: MinioResource, processed_files_set:set) -> list: + """ + Returns a list of files in minio which haven't been processed yet + """ + unprocessed_files_list = [] + client = storage.client() + objects = client.list_objects(os.getenv("MINIO_BUCKET_NAME"), recursive=True) + + for obj in objects: + if obj.object_name not in processed_files_set: + unprocessed_files_list.append(obj.object_name) + + return unprocessed_files_list + +@asset +def roman_coins(database:DuckDBResource, storage:MinioResource, 
unprocessed_files_list:list) -> None: + """ + The raw roman coins dataset, loaded from incremental csv files stored in + minio into a DuckDB database. + """ + client = storage.client() + table_name = os.getenv("LOADING_TABLE") + + with database.get_connection() as conn: + with conn.cursor() as cursor: # Cursor needed for concurrency + # Download the file to a temporary location + for file_name in unprocessed_files_list: + with tempfile.NamedTemporaryFile(delete=True) as temp_file: + client.fget_object(os.getenv("MINIO_BUCKET_NAME"), file_name, temp_file.name) + print(f"Loading {os.path.basename(file_name)}") + + # Load data from the temporary file into DuckDB + try: + cursor.execute(f"COPY {table_name} FROM '{temp_file.name}' (FORMAT 'csv', HEADER);") + except duckdb.CatalogException: + cursor.execute(f"CREATE TABLE IF NOT EXISTS {table_name} AS FROM read_csv('{temp_file.name}', AUTO_DETECT=TRUE);") + + # Mark the file as processed to avoid reprocessing + cursor.execute("INSERT INTO processed_files VALUES (?);", (file_name,)) diff --git a/extract-load-transform/orchestration/orchestration/jobs/__init__.py b/extract-load-transform/orchestration/orchestration/jobs/__init__.py index c431265..feeee5a 100644 --- a/extract-load-transform/orchestration/orchestration/jobs/__init__.py +++ b/extract-load-transform/orchestration/orchestration/jobs/__init__.py @@ -1,6 +1,14 @@ from dagster import AssetSelection, define_asset_job +airbyte_assets = AssetSelection.groups("roman_coins_api_minio") +loading_assets = AssetSelection.groups("load") + airbyte_sync_job = define_asset_job( name='airbyte_sync_job', - selection=AssetSelection.all() + selection=airbyte_assets ) + +loading_job = define_asset_job( + name="loading_job", + selection=loading_assets ) \ No newline at end of file diff --git a/extract-load-transform/orchestration/orchestration/resources/__init__.py b/extract-load-transform/orchestration/orchestration/resources/__init__.py index 127af13..a050fa6 100644 ---
a/extract-load-transform/orchestration/orchestration/resources/__init__.py +++ b/extract-load-transform/orchestration/orchestration/resources/__init__.py @@ -1,9 +1,38 @@ -from dagster import EnvVar +from dagster import EnvVar, ConfigurableResource from dagster_airbyte import AirbyteResource +from dagster_duckdb import DuckDBResource +from minio import Minio airbyte_instance = AirbyteResource( host=EnvVar("HOST"), port="8000", username=EnvVar("AIRBYTE_USERNAME"), password=EnvVar("AIRBYTE_PASSWORD"), + request_max_retries=6, + request_retry_delay=5 ) + +database_resource = DuckDBResource( + database=EnvVar("DUCKDB_DATABASE") +) + +class MinioResource(ConfigurableResource): + + endpoint:str + access_key:str + secret_key:str + session_token:str + secure:bool + region:str + + def client(self): + return Minio(self.endpoint, self.access_key, self.secret_key, + self.session_token, self.secure, self.region) + +minio_resource = MinioResource( + endpoint=EnvVar("MINIO_ENDPOINT"), + access_key=EnvVar("MINIO_USER"), + secret_key=EnvVar("MINIO_PASSWORD"), + session_token="", + secure=False, + region="") \ No newline at end of file diff --git a/extract-load-transform/orchestration/orchestration/schedules/__init__.py b/extract-load-transform/orchestration/orchestration/schedules/__init__.py deleted file mode 100644 index 4ca520f..0000000 --- a/extract-load-transform/orchestration/orchestration/schedules/__init__.py +++ /dev/null @@ -1,8 +0,0 @@ -from dagster import ScheduleDefinition, DefaultScheduleStatus -from ..jobs import airbyte_sync_job - -airbyte_sync_schedule = ScheduleDefinition( - job=airbyte_sync_job, - cron_schedule="*/30 * * * *", - default_status=DefaultScheduleStatus.RUNNING -) \ No newline at end of file diff --git a/extract-load-transform/orchestration/orchestration/sensors/__init__.py b/extract-load-transform/orchestration/orchestration/sensors/__init__.py new file mode 100644 index 0000000..7fec5ed --- /dev/null +++ 
b/extract-load-transform/orchestration/orchestration/sensors/__init__.py @@ -0,0 +1,55 @@ +from dagster import RunRequest, SensorResult, sensor, DefaultSensorStatus +from ..jobs import airbyte_sync_job, loading_job +from ..resources import MinioResource +import os +import requests + +@sensor( + job=airbyte_sync_job, + default_status=DefaultSensorStatus.RUNNING, + minimum_interval_seconds=60, + description="Triggers airbyte_sync_job when modified records are detected in the api." +) +def api_sensor(context): + previous_state = context.cursor if context.cursor else None + run_requests = [] + + endpoint = f'http://{os.getenv("HOST")}:8010/v1/coins/?sort_by=modified&desc=true' + response = requests.get(endpoint, timeout=30) + + try: + current_state = response.json()["data"][0]["modified"] + if current_state != previous_state: + run_requests.append(RunRequest()) + except (IndexError, KeyError): + # Assuming no data has been added yet + current_state = None + + return SensorResult( + run_requests=run_requests, + cursor=current_state + ) + +@sensor( + job=loading_job, + default_status=DefaultSensorStatus.RUNNING, + minimum_interval_seconds=60, + description="Triggers loading_job when new files are detected in storage."
+) +def extracted_file_sensor(context, storage:MinioResource): + previous_state = context.cursor if context.cursor else None + run_requests = [] + + client = storage.client() + objects = client.list_objects(os.getenv("MINIO_BUCKET_NAME"), recursive=True) + mod_times = [str(obj.last_modified) for obj in objects] + + current_state = max(mod_times, default=None) + + if current_state != previous_state: + run_requests.append(RunRequest()) + + return SensorResult( + run_requests=run_requests, + cursor=current_state + ) \ No newline at end of file diff --git a/extract-load-transform/orchestration/setup.py b/extract-load-transform/orchestration/setup.py index 8d233c9..c687399 100644 --- a/extract-load-transform/orchestration/setup.py +++ b/extract-load-transform/orchestration/setup.py @@ -6,7 +6,10 @@ install_requires=[ "dagster", "dagster-cloud", - "dagster-airbyte" + "dagster-airbyte", + "dagster-duckdb", + "minio", + "duckdb" ], extras_require={"dev": ["dagster-webserver", "pytest"]}, )