Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fully automated extraction and loading: add sensors to trigger airbyt… #100

Merged
merged 3 commits into from
Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion .env
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,8 @@ MINIO_ROOT_USER=minioadmin
MINIO_ROOT_PASSWORD=minioadmin
MINIO_BUCKET_NAME=roman-coins
MINIO_NEW_USER=roman-coins-user
MINIO_NEW_USER_PASSWORD=nonprodpasswd
MINIO_NEW_USER_PASSWORD=nonprodpasswd

# DuckDB database
DUCKDB_DATABASE=data/data.duckdb
LOADING_TABLE=raw_roman_coins
29 changes: 25 additions & 4 deletions compose.test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,12 @@ services:

dagster-webserver:
depends_on:
- db
- airbyte-configurator
test_db:
condition: service_healthy
airbyte-configurator:
condition: service_started
volumes:
- duckdb-data:/opt/dagster/app/data
environment:
- POSTGRES_USER=${POSTGRES_USER}
- POSTGRES_PASSWORD=${POSTGRES_PASSWORD}
Expand All @@ -107,6 +111,12 @@ services:
- AIRBYTE_USERNAME=${AIRBYTE_USERNAME}
- AIRBYTE_PASSWORD=${AIRBYTE_PASSWORD}
- HOST=${HOST}
- MINIO_ENDPOINT=${HOST}:9000
- DUCKDB_DATABASE=${DUCKDB_DATABASE}
- LOADING_TABLE=${LOADING_TABLE}
- MINIO_BUCKET_NAME=${MINIO_BUCKET_NAME}
- MINIO_USER=${MINIO_NEW_USER}
- MINIO_PASSWORD=${MINIO_NEW_USER_PASSWORD}
build:
context: ./extract-load-transform/orchestration
dockerfile: Dockerfile
Expand All @@ -125,8 +135,12 @@ services:

dagster-daemon:
depends_on:
- db
- airbyte-configurator
test_db:
condition: service_healthy
airbyte-configurator:
condition: service_started
volumes:
- duckdb-data:/opt/dagster/app/data
environment:
- POSTGRES_USER=${POSTGRES_USER}
- POSTGRES_PASSWORD=${POSTGRES_PASSWORD}
Expand All @@ -135,6 +149,12 @@ services:
- AIRBYTE_USERNAME=${AIRBYTE_USERNAME}
- AIRBYTE_PASSWORD=${AIRBYTE_PASSWORD}
- HOST=${HOST}
- MINIO_ENDPOINT=${HOST}:9000
- DUCKDB_DATABASE=${DUCKDB_DATABASE}
- LOADING_TABLE=${LOADING_TABLE}
- MINIO_BUCKET_NAME=${MINIO_BUCKET_NAME}
- MINIO_USER=${MINIO_NEW_USER}
- MINIO_PASSWORD=${MINIO_NEW_USER_PASSWORD}
build:
context: ./extract-load-transform/orchestration
dockerfile: Dockerfile
Expand All @@ -159,3 +179,4 @@ volumes:
db-data:
web-scraper-flag:
minio-data:
duckdb-data:
29 changes: 25 additions & 4 deletions compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,12 @@ services:

dagster-webserver:
depends_on:
- db
- airbyte-configurator
db:
condition: service_healthy
airbyte-configurator:
condition: service_completed_successfully
volumes:
- duckdb-data:/opt/dagster/app/data
environment:
- POSTGRES_USER=${POSTGRES_USER}
- POSTGRES_PASSWORD=${POSTGRES_PASSWORD}
Expand All @@ -107,6 +111,12 @@ services:
- AIRBYTE_USERNAME=${AIRBYTE_USERNAME}
- AIRBYTE_PASSWORD=${AIRBYTE_PASSWORD}
- HOST=${HOST}
- MINIO_ENDPOINT=${HOST}:9000
- DUCKDB_DATABASE=${DUCKDB_DATABASE}
- LOADING_TABLE=${LOADING_TABLE}
- MINIO_BUCKET_NAME=${MINIO_BUCKET_NAME}
- MINIO_USER=${MINIO_NEW_USER}
- MINIO_PASSWORD=${MINIO_NEW_USER_PASSWORD}
build:
context: ./extract-load-transform/orchestration
dockerfile: Dockerfile
Expand All @@ -125,8 +135,12 @@ services:

dagster-daemon:
depends_on:
- db
- airbyte-configurator
db:
condition: service_healthy
airbyte-configurator:
condition: service_completed_successfully
volumes:
- duckdb-data:/opt/dagster/app/data
environment:
- POSTGRES_USER=${POSTGRES_USER}
- POSTGRES_PASSWORD=${POSTGRES_PASSWORD}
Expand All @@ -135,6 +149,12 @@ services:
- AIRBYTE_USERNAME=${AIRBYTE_USERNAME}
- AIRBYTE_PASSWORD=${AIRBYTE_PASSWORD}
- HOST=${HOST}
- MINIO_ENDPOINT=${HOST}:9000
- DUCKDB_DATABASE=${DUCKDB_DATABASE}
- LOADING_TABLE=${LOADING_TABLE}
- MINIO_BUCKET_NAME=${MINIO_BUCKET_NAME}
- MINIO_USER=${MINIO_NEW_USER}
- MINIO_PASSWORD=${MINIO_NEW_USER_PASSWORD}
build:
context: ./extract-load-transform/orchestration
dockerfile: Dockerfile
Expand All @@ -147,3 +167,4 @@ volumes:
db-data:
web-scraper-flag:
minio-data:
duckdb-data:
3 changes: 2 additions & 1 deletion extract-load-transform/orchestration/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ FROM python:3.10-slim

RUN mkdir -p /opt/dagster/dagster_home /opt/dagster/app

RUN pip install dagster dagster-webserver dagster-postgres dagster-airbyte
RUN pip install dagster dagster-webserver dagster-postgres
RUN pip install dagster-airbyte dagster-duckdb minio duckdb

# Copy your code and workspace to /opt/dagster/app
COPY . /opt/dagster/app/
Expand Down
23 changes: 15 additions & 8 deletions extract-load-transform/orchestration/orchestration/__init__.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
from dagster import Definitions
from dagster import Definitions, load_assets_from_modules
from dagster_airbyte import load_assets_from_airbyte_instance
from .jobs import airbyte_sync_job
from .schedules import airbyte_sync_schedule
from .resources import airbyte_instance
from .assets import raw_roman_coins
from .resources import database_resource, airbyte_instance, minio_resource
from .jobs import airbyte_sync_job, loading_job
from .sensors import api_sensor, extracted_file_sensor

airbyte_assets = load_assets_from_airbyte_instance(airbyte_instance)
loading_assets = load_assets_from_modules(modules=[raw_roman_coins], group_name="load")

all_jobs = [airbyte_sync_job, loading_job]
all_sensors = [api_sensor, extracted_file_sensor]

all_jobs = [airbyte_sync_job]
all_schedules = [airbyte_sync_schedule]

defs = Definitions(
assets=[airbyte_assets],
assets=[airbyte_assets, *loading_assets],
resources={
"database":database_resource,
"storage":minio_resource
},
jobs=all_jobs,
schedules=all_schedules
sensors=all_sensors
)
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
from dagster import asset
from dagster_duckdb import DuckDBResource
from ..resources import MinioResource
import os
import tempfile
import duckdb

@asset(
    deps=["roman_coin_api_stream"]
)
def processed_files_set(context, database: DuckDBResource) -> set:
    """
    Fetches the set of filenames that have been processed and stored in the
    database to avoid reprocessing.

    If the processed_files tracking table does not exist yet (first run),
    it is created and an empty set is returned.
    """
    query = "SELECT filename FROM processed_files;"

    with database.get_connection() as conn:
        with conn.cursor() as cursor:  # Cursor needed for concurrency
            try:
                result = cursor.execute(query).fetchall()
                # Set comprehension instead of set([...]) — same result, idiomatic.
                processed_files_set = {row[0] for row in result}
            except duckdb.CatalogException:
                # Assuming the table doesn't exist, create it and return an empty set
                cursor.execute("CREATE TABLE IF NOT EXISTS processed_files (filename VARCHAR);")
                processed_files_set = set()

    return processed_files_set

@asset
def unprocessed_files_list(storage: MinioResource, processed_files_set: set) -> list:
    """
    Returns a list of files in minio which haven't been processed yet.

    Compares every object name in the bucket against the processed_files_set
    asset; set membership keeps each check O(1).
    """
    client = storage.client()
    objects = client.list_objects(os.getenv("MINIO_BUCKET_NAME"), recursive=True)

    # Comprehension replaces the manual append loop (same order, same contents).
    return [obj.object_name for obj in objects if obj.object_name not in processed_files_set]

@asset
def roman_coins(database: DuckDBResource, storage: MinioResource, unprocessed_files_list: list) -> None:
    """
    The raw roman coins dataset, loaded from incremental csv files stored in
    minio into a DuckDB database.

    Each unprocessed object is downloaded to a temporary file, copied into the
    loading table (created from the csv schema on first load), and then
    recorded in processed_files so it is never loaded twice.
    """
    client = storage.client()
    table_name = os.getenv("LOADING_TABLE")
    # Hoist the invariant bucket lookup out of the loop.
    bucket_name = os.getenv("MINIO_BUCKET_NAME")

    with database.get_connection() as conn:
        with conn.cursor() as cursor:  # Cursor needed for concurrency
            for file_name in unprocessed_files_list:
                # Download the file to a temporary location
                with tempfile.NamedTemporaryFile(delete=True) as temp_file:
                    client.fget_object(bucket_name, file_name, temp_file.name)
                    # [-1] logs the basename for any nesting depth; the original
                    # [2] raised IndexError for object keys with fewer than
                    # three path segments.
                    print(f"Loading {file_name.split('/')[-1]}")

                    # Load data from the temporary file into DuckDB
                    try:
                        cursor.execute(f"COPY {table_name} FROM '{temp_file.name}' (FORMAT 'csv', HEADER);")
                    except duckdb.CatalogException:
                        # First load: table doesn't exist yet — create it from the csv.
                        cursor.execute(f"CREATE TABLE IF NOT EXISTS {table_name} AS FROM read_csv('{temp_file.name}', AUTO_DETECT=TRUE);")

                    # Mark the file as processed to avoid reprocessing
                    cursor.execute("INSERT INTO processed_files VALUES (?);", (file_name,))
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
from dagster import AssetSelection, define_asset_job

airbyte_assets = AssetSelection.groups("roman_coins_api_minio")
loading_assets = AssetSelection.groups("load")

airbyte_sync_job = define_asset_job(
name='airbyte_sync_job',
selection=AssetSelection.all()
selection=airbyte_assets
)

loading_job = define_asset_job(
name="loading_job",
selection=loading_assets
)
Original file line number Diff line number Diff line change
@@ -1,9 +1,38 @@
from dagster import EnvVar
from dagster import EnvVar, ConfigurableResource
from dagster_airbyte import AirbyteResource
from dagster_duckdb import DuckDBResource
from minio import Minio

airbyte_instance = AirbyteResource(
host=EnvVar("HOST"),
port="8000",
username=EnvVar("AIRBYTE_USERNAME"),
password=EnvVar("AIRBYTE_PASSWORD"),
request_max_retries=6,
request_retry_delay=5
)

database_resource = DuckDBResource(
database=EnvVar("DUCKDB_DATABASE")
)

class MinioResource(ConfigurableResource):
    """Dagster resource wrapping a minio object-storage client.

    The configured fields mirror the minio.Minio constructor arguments.
    """

    endpoint: str
    access_key: str
    secret_key: str
    session_token: str
    secure: bool
    region: str

    def client(self):
        # Build a fresh Minio client from the configured fields; keyword
        # arguments map one-to-one onto the positional order used upstream.
        return Minio(
            self.endpoint,
            access_key=self.access_key,
            secret_key=self.secret_key,
            session_token=self.session_token,
            secure=self.secure,
            region=self.region,
        )

minio_resource = MinioResource(
endpoint=EnvVar("MINIO_ENDPOINT"),
access_key=EnvVar("MINIO_USER"),
secret_key=EnvVar("MINIO_PASSWORD"),
session_token="",
secure=False,
region="")

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
from dagster import RunRequest, SensorResult, sensor, DefaultSensorStatus
from ..jobs import airbyte_sync_job, loading_job
from ..resources import MinioResource
import os
import requests

@sensor(
    job=airbyte_sync_job,
    default_status=DefaultSensorStatus.RUNNING,
    minimum_interval_seconds=60,
    description="Triggers airbyte_sync_job when modified records are detected in the api."
)
def api_sensor(context):
    """
    Polls the api for the most recently modified record and requests a run
    whenever that timestamp differs from the cursor saved on the last tick.
    """
    previous_state = context.cursor if context.cursor else None
    run_requests = []

    endpoint = f'http://{os.getenv("HOST")}:8010/v1/coins/?sort_by=modified&desc=true'
    # Explicit timeout so a stalled api cannot hang the sensor tick forever.
    response = requests.get(endpoint, timeout=30)

    try:
        current_state = response.json()["data"][0]["modified"]
        if current_state != previous_state:
            run_requests.append(RunRequest())
    except (IndexError, KeyError):
        # Assuming no data has been added yet (empty or missing "data" list)
        current_state = None

    return SensorResult(
        run_requests=run_requests,
        cursor=current_state
    )

@sensor(
    job=loading_job,
    default_status=DefaultSensorStatus.RUNNING,
    minimum_interval_seconds=60,
    description="Triggers loading_job when new files are detected in storage."
)
def extracted_file_sensor(context, storage: MinioResource):
    """
    Watches the minio bucket and requests a loading run whenever the newest
    object modification time differs from the cursor saved on the last tick.
    """
    last_seen = context.cursor if context.cursor else None

    bucket_objects = storage.client().list_objects(
        os.getenv("MINIO_BUCKET_NAME"), recursive=True
    )
    # Newest modification time across the bucket, or None when it is empty.
    latest = max((str(obj.last_modified) for obj in bucket_objects), default=None)

    requests_to_run = [RunRequest()] if latest != last_seen else []

    return SensorResult(
        run_requests=requests_to_run,
        cursor=latest
    )
5 changes: 4 additions & 1 deletion extract-load-transform/orchestration/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
install_requires=[
"dagster",
"dagster-cloud",
"dagster-airbyte"
"dagster-airbyte",
"dagster-duckdb",
"minio",
"duckdb"
],
extras_require={"dev": ["dagster-webserver", "pytest"]},
)
Loading