fully automated extraction and loading: add sensors to trigger airbyte sync and loading jobs, closes #95
vbalalian committed Mar 5, 2024
1 parent a70772e commit 783a812
Showing 12 changed files with 239 additions and 29 deletions.
6 changes: 5 additions & 1 deletion .env
@@ -16,4 +16,8 @@ MINIO_ROOT_USER=minioadmin
 MINIO_ROOT_PASSWORD=minioadmin
 MINIO_BUCKET_NAME=roman-coins
 MINIO_NEW_USER=roman-coins-user
-MINIO_NEW_USER_PASSWORD=nonprodpasswd
+MINIO_NEW_USER_PASSWORD=nonprodpasswd
+
+# DuckDB database
+DUCKDB_DATABASE=data/data.duckdb
+LOADING_TABLE=raw_roman_coins
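Once the loading job has run, the new variables can be checked end to end; a minimal sketch (not part of this commit), using the paths configured above:

import duckdb

# Open the database read-only to avoid contending with the running Dagster services.
con = duckdb.connect("data/data.duckdb", read_only=True)
print(con.execute("SELECT COUNT(*) FROM raw_roman_coins").fetchone())
con.close()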
28 changes: 24 additions & 4 deletions compose.test.yaml
@@ -97,8 +97,12 @@ services:
 
   dagster-webserver:
     depends_on:
-      - db
-      - airbyte-configurator
+      db:
+        condition: service_healthy
+      airbyte-configurator:
+        condition: service_completed_successfully
+    volumes:
+      - duckdb-data:/opt/dagster/app/data
     environment:
       - POSTGRES_USER=${POSTGRES_USER}
       - POSTGRES_PASSWORD=${POSTGRES_PASSWORD}
@@ -107,6 +111,12 @@ services:
       - AIRBYTE_USERNAME=${AIRBYTE_USERNAME}
       - AIRBYTE_PASSWORD=${AIRBYTE_PASSWORD}
       - HOST=${HOST}
+      - MINIO_ENDPOINT=${HOST}:9000
+      - DUCKDB_DATABASE=${DUCKDB_DATABASE}
+      - LOADING_TABLE=${LOADING_TABLE}
+      - MINIO_BUCKET_NAME=${MINIO_BUCKET_NAME}
+      - MINIO_USER=${MINIO_NEW_USER}
+      - MINIO_PASSWORD=${MINIO_NEW_USER_PASSWORD}
     build:
       context: ./extract-load-transform/orchestration
       dockerfile: Dockerfile
@@ -125,8 +135,12 @@ services:
 
   dagster-daemon:
     depends_on:
-      - db
-      - airbyte-configurator
+      db:
+        condition: service_healthy
+      airbyte-configurator:
+        condition: service_completed_successfully
+    volumes:
+      - duckdb-data:/opt/dagster/app/data
     environment:
       - POSTGRES_USER=${POSTGRES_USER}
       - POSTGRES_PASSWORD=${POSTGRES_PASSWORD}
@@ -135,6 +149,12 @@ services:
       - AIRBYTE_USERNAME=${AIRBYTE_USERNAME}
       - AIRBYTE_PASSWORD=${AIRBYTE_PASSWORD}
       - HOST=${HOST}
+      - MINIO_ENDPOINT=${HOST}:9000
+      - DUCKDB_DATABASE=${DUCKDB_DATABASE}
+      - LOADING_TABLE=${LOADING_TABLE}
+      - MINIO_BUCKET_NAME=${MINIO_BUCKET_NAME}
+      - MINIO_USER=${MINIO_NEW_USER}
+      - MINIO_PASSWORD=${MINIO_NEW_USER_PASSWORD}
     build:
       context: ./extract-load-transform/orchestration
       dockerfile: Dockerfile
29 changes: 25 additions & 4 deletions compose.yaml
@@ -97,8 +97,12 @@ services:
 
   dagster-webserver:
     depends_on:
-      - db
-      - airbyte-configurator
+      db:
+        condition: service_healthy
+      airbyte-configurator:
+        condition: service_completed_successfully
+    volumes:
+      - duckdb-data:/opt/dagster/app/data
     environment:
       - POSTGRES_USER=${POSTGRES_USER}
       - POSTGRES_PASSWORD=${POSTGRES_PASSWORD}
@@ -107,6 +111,12 @@ services:
       - AIRBYTE_USERNAME=${AIRBYTE_USERNAME}
      - AIRBYTE_PASSWORD=${AIRBYTE_PASSWORD}
       - HOST=${HOST}
+      - MINIO_ENDPOINT=${HOST}:9000
+      - DUCKDB_DATABASE=${DUCKDB_DATABASE}
+      - LOADING_TABLE=${LOADING_TABLE}
+      - MINIO_BUCKET_NAME=${MINIO_BUCKET_NAME}
+      - MINIO_USER=${MINIO_NEW_USER}
+      - MINIO_PASSWORD=${MINIO_NEW_USER_PASSWORD}
     build:
       context: ./extract-load-transform/orchestration
       dockerfile: Dockerfile
@@ -125,8 +135,12 @@ services:
 
   dagster-daemon:
     depends_on:
-      - db
-      - airbyte-configurator
+      db:
+        condition: service_healthy
+      airbyte-configurator:
+        condition: service_completed_successfully
+    volumes:
+      - duckdb-data:/opt/dagster/app/data
     environment:
       - POSTGRES_USER=${POSTGRES_USER}
       - POSTGRES_PASSWORD=${POSTGRES_PASSWORD}
@@ -135,6 +149,12 @@ services:
       - AIRBYTE_USERNAME=${AIRBYTE_USERNAME}
       - AIRBYTE_PASSWORD=${AIRBYTE_PASSWORD}
       - HOST=${HOST}
+      - MINIO_ENDPOINT=${HOST}:9000
+      - DUCKDB_DATABASE=${DUCKDB_DATABASE}
+      - LOADING_TABLE=${LOADING_TABLE}
+      - MINIO_BUCKET_NAME=${MINIO_BUCKET_NAME}
+      - MINIO_USER=${MINIO_NEW_USER}
+      - MINIO_PASSWORD=${MINIO_NEW_USER_PASSWORD}
     build:
       context: ./extract-load-transform/orchestration
       dockerfile: Dockerfile
@@ -147,3 +167,4 @@ volumes:
   db-data:
   web-scraper-flag:
   minio-data:
+  duckdb-data:
3 changes: 2 additions & 1 deletion extract-load-transform/orchestration/Dockerfile
@@ -2,7 +2,8 @@ FROM python:3.10-slim
 
 RUN mkdir -p /opt/dagster/dagster_home /opt/dagster/app
 
-RUN pip install dagster dagster-webserver dagster-postgres dagster-airbyte
+RUN pip install dagster dagster-webserver dagster-postgres
+RUN pip install dagster-airbyte dagster-duckdb minio duckdb
 
 # Copy your code and workspace to /opt/dagster/app
 COPY . /opt/dagster/app/
23 changes: 15 additions & 8 deletions extract-load-transform/orchestration/orchestration/__init__.py
@@ -1,16 +1,23 @@
-from dagster import Definitions
+from dagster import Definitions, load_assets_from_modules
 from dagster_airbyte import load_assets_from_airbyte_instance
-from .jobs import airbyte_sync_job
-from .schedules import airbyte_sync_schedule
-from .resources import airbyte_instance
+from .assets import raw_roman_coins
+from .resources import database_resource, airbyte_instance, minio_resource
+from .jobs import airbyte_sync_job, loading_job
+from .sensors import api_sensor, extracted_file_sensor
 
 airbyte_assets = load_assets_from_airbyte_instance(airbyte_instance)
+loading_assets = load_assets_from_modules(modules=[raw_roman_coins], group_name="load")
 
-all_jobs = [airbyte_sync_job]
-all_schedules = [airbyte_sync_schedule]
+all_jobs = [airbyte_sync_job, loading_job]
+all_sensors = [api_sensor, extracted_file_sensor]
 
 defs = Definitions(
-    assets=[airbyte_assets],
+    assets=[airbyte_assets, *loading_assets],
+    resources={
+        "database":database_resource,
+        "storage":minio_resource
+    },
     jobs=all_jobs,
-    schedules=all_schedules
+    sensors=all_sensors
 )
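This wiring can be confirmed to resolve (imports, resources, selections) without starting the webserver; a rough sketch for illustration, assuming the orchestration package is importable:

from orchestration import defs

# Both lookups raise if the definitions fail to resolve.
print(defs.get_job_def("loading_job").name)
print(defs.get_sensor_def("extracted_file_sensor").name)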
1 change: 1 addition & 0 deletions extract-load-transform/orchestration/orchestration/assets/__init__.py
@@ -0,0 +1 @@

69 changes: 69 additions & 0 deletions extract-load-transform/orchestration/orchestration/assets/raw_roman_coins.py
@@ -0,0 +1,69 @@
from dagster import asset
from dagster_duckdb import DuckDBResource
from ..resources import MinioResource
import os
import tempfile
import duckdb

@asset(
    deps=["roman_coin_api_stream"]
)
def processed_files_set(context, database: DuckDBResource) -> set:
    """
    Fetches the set of filenames that have been processed and stored in the
    database to avoid reprocessing.
    """
    query = "SELECT filename FROM processed_files;"

    with database.get_connection() as conn:
        with conn.cursor() as cursor: # Cursor needed for concurrency
            try:
                result = cursor.execute(query).fetchall()
                processed_files_set = set([row[0] for row in result])
            except duckdb.CatalogException:
                # Assuming the table doesn't exist, create it and return an empty set
                cursor.execute("CREATE TABLE IF NOT EXISTS processed_files (filename VARCHAR);")
                processed_files_set = set()

    return processed_files_set

@asset
def unprocessed_files_list(storage: MinioResource, processed_files_set:set) -> list:
    """
    Returns a list of files in minio which haven't been processed yet
    """
    unprocessed_files_list = []
    client = storage.client()
    objects = client.list_objects(os.getenv("MINIO_BUCKET_NAME"), recursive=True)

    for obj in objects:
        if obj.object_name not in processed_files_set:
            unprocessed_files_list.append(obj.object_name)

    return unprocessed_files_list

@asset
def roman_coins(database:DuckDBResource, storage:MinioResource, unprocessed_files_list:list) -> None:
    """
    The raw roman coins dataset, loaded from incremental csv files stored in
    minio into a DuckDB database.
    """
    client = storage.client()
    table_name = os.getenv("LOADING_TABLE")

    with database.get_connection() as conn:
        with conn.cursor() as cursor: # Cursor needed for concurrency
            # Download the file to a temporary location
            for file_name in unprocessed_files_list:
                with tempfile.NamedTemporaryFile(delete=True) as temp_file:
                    client.fget_object(os.getenv("MINIO_BUCKET_NAME"), file_name, temp_file.name)
                    print(f"Loading {file_name.split('/')[2]}")

                    # Load data from the temporary file into DuckDB
                    try:
                        cursor.execute(f"COPY {table_name} FROM '{temp_file.name}' (FORMAT 'csv', HEADER);")
                    except duckdb.CatalogException:
                        cursor.execute(f"CREATE TABLE IF NOT EXISTS {table_name} AS FROM read_csv('{temp_file.name}', AUTO_DETECT=TRUE);")

                    # Mark the file as processed to avoid reprocessing
                    cursor.execute("INSERT INTO processed_files VALUES (?);", (file_name,))
10 changes: 9 additions & 1 deletion extract-load-transform/orchestration/orchestration/jobs.py
@@ -1,6 +1,14 @@
 from dagster import AssetSelection, define_asset_job
 
+airbyte_assets = AssetSelection.groups("roman_coins_api_minio")
+loading_assets = AssetSelection.groups("load")
+
 airbyte_sync_job = define_asset_job(
     name='airbyte_sync_job',
-    selection=AssetSelection.all()
+    selection=airbyte_assets
 )
+
+loading_job = define_asset_job(
+    name="loading_job",
+    selection=loading_assets
+)
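Selecting by group keeps the two jobs disjoint: airbyte_sync_job covers only the Airbyte-managed assets, loading_job only the "load" group registered in __init__.py. Either can be exercised in process; a sketch for illustration:

from orchestration import defs

# Resources are already bound by the Definitions object.
result = defs.get_job_def("loading_job").execute_in_process()
print(result.success)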
31 changes: 30 additions & 1 deletion extract-load-transform/orchestration/orchestration/resources.py
@@ -1,9 +1,38 @@
-from dagster import EnvVar
+from dagster import EnvVar, ConfigurableResource
 from dagster_airbyte import AirbyteResource
+from dagster_duckdb import DuckDBResource
+from minio import Minio
 
 airbyte_instance = AirbyteResource(
     host=EnvVar("HOST"),
     port="8000",
     username=EnvVar("AIRBYTE_USERNAME"),
     password=EnvVar("AIRBYTE_PASSWORD"),
+    request_max_retries=6,
+    request_retry_delay=5
 )
 
+database_resource = DuckDBResource(
+    database=EnvVar("DUCKDB_DATABASE")
+)
+
+class MinioResource(ConfigurableResource):
+
+    endpoint:str
+    access_key:str
+    secret_key:str
+    session_token:str
+    secure:bool
+    region:str
+
+    def client(self):
+        return Minio(self.endpoint, self.access_key, self.secret_key,
+                     self.session_token, self.secure, self.region)
+
+minio_resource = MinioResource(
+    endpoint=EnvVar("MINIO_ENDPOINT"),
+    access_key=EnvVar("MINIO_USER"),
+    secret_key=EnvVar("MINIO_PASSWORD"),
+    session_token="",
+    secure=False,
+    region="")

9 changes: 0 additions & 9 deletions extract-load-transform/orchestration/orchestration/schedules.py
This file was deleted.

55 changes: 55 additions & 0 deletions extract-load-transform/orchestration/orchestration/sensors.py
@@ -0,0 +1,55 @@
from dagster import RunRequest, SensorResult, sensor, DefaultSensorStatus
from ..jobs import airbyte_sync_job, loading_job
from ..resources import MinioResource
import os
import requests

@sensor(
    job=airbyte_sync_job,
    default_status=DefaultSensorStatus.RUNNING,
    minimum_interval_seconds=60,
    description="Triggers airbyte_sync_job when modified records are detected in the api."
)
def api_sensor(context):
    previous_state = context.cursor if context.cursor else None
    run_requests = []

    endpoint = f'http://{os.getenv("HOST")}:8010/v1/coins/?sort_by=modified&desc=true'
    response = requests.get(endpoint)

    try:
        current_state = response.json()["data"][0]["modified"]
        if current_state != previous_state:
            run_requests.append(RunRequest())
    except IndexError as e:
        # Assuming no data has been added yet
        current_state = None

    return SensorResult(
        run_requests=run_requests,
        cursor=current_state
    )

@sensor(
    job=loading_job,
    default_status=DefaultSensorStatus.RUNNING,
    minimum_interval_seconds=60,
    description="Triggers loading_job when new files are detected in storage."
)
def extracted_file_sensor(context, storage:MinioResource):
    previous_state = context.cursor if context.cursor else None
    run_requests = []

    client = storage.client()
    objects = client.list_objects(os.getenv("MINIO_BUCKET_NAME"), recursive=True)
    mod_times = [str(obj.last_modified) for obj in objects]

    current_state = max(mod_times, default=None)

    if current_state != previous_state:
        run_requests.append(RunRequest())

    return SensorResult(
        run_requests=run_requests,
        cursor=current_state
    )
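Both sensors are cursor-based: each persists a high-water mark (the latest modified timestamp from the API, or the newest object mtime in the bucket) and emits a RunRequest only when it advances. The cursor logic can be unit-tested with Dagster's context helper; a sketch (not from this commit), assuming the API container is reachable at HOST:8010:

from dagster import build_sensor_context

from orchestration.sensors import api_sensor

# First tick: empty cursor, so any existing record should yield one RunRequest.
context = build_sensor_context(cursor=None)
result = api_sensor(context)
print(result.cursor, len(result.run_requests))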
5 changes: 4 additions & 1 deletion extract-load-transform/orchestration/setup.py
@@ -6,7 +6,10 @@
     install_requires=[
         "dagster",
         "dagster-cloud",
-        "dagster-airbyte"
+        "dagster-airbyte",
+        "dagster-duckdb",
+        "minio",
+        "duckdb"
     ],
     extras_require={"dev": ["dagster-webserver", "pytest"]},
 )
