Skip to content

Commit

Permalink
Merge pull request #93 from vbalalian/dagster-integration-1
Browse files Browse the repository at this point in the history
Dagster integration 1
  • Loading branch information
vbalalian committed Feb 19, 2024
2 parents 6a6d812 + ca16865 commit 7b416b2
Show file tree
Hide file tree
Showing 13 changed files with 192 additions and 15 deletions.
2 changes: 1 addition & 1 deletion .env
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@

HOST=http://host.docker.internal
HOST=host.docker.internal

# Database name and credentials
POSTGRES_USER=postgres
Expand Down
42 changes: 33 additions & 9 deletions airbyte-api-minio-connection/airbyte_connection_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ def cancel_job(self, jobId:str):
destination_config = {
"destinationType": "s3",
"s3_bucket_region": "us-west-2",
"format": {"format_type": "CSV", "flattening": "No flattening",
"format": {"format_type": "CSV", "flattening": "Root level flattening",
"compression": { "compression_type": "No Compression" }},
"access_key_id": os.getenv("MINIO_NEW_USER", "roman-coins-user"),
"secret_access_key": os.getenv("MINIO_NEW_USER_PASSWORD", "nonprodpasswd"),
Expand All @@ -279,11 +279,35 @@ def cancel_job(self, jobId:str):
destination_config=destination_config)

print(f"AIRBYTE STATUS: {airbyte.status()}")
print('ADDING CUSTOM SOURCE...')
print(airbyte.add_custom_source())
print('CREATING SOURCE...')
print(airbyte.create_source(start_date=os.getenv("CONNECTOR_START_DATE", "2024-01-01")))
print('CREATING DESTINATION...')
print(airbyte.create_destination())
print('CREATING CONNECTION...')
print(airbyte.create_connection())

# Idempotent Airbyte bootstrap: each configuration object is created only if
# an object with the matching display name is not already present on the
# Airbyte instance, so re-running this script is safe.
#
# Each step: (display name to look for, callable listing existing objects,
#             creation message, creation callable, skip message).
setup_steps = [
    ('Roman Coins API', airbyte.instanceSources,
     'ADDING CUSTOM SOURCE...', airbyte.add_custom_source,
     'CUSTOM SOURCE CONNECTOR ALREADY EXISTS.'),
    ('Roman Coins API', airbyte.activeSources,
     'CREATING SOURCE...',
     lambda: airbyte.create_source(start_date=os.getenv("CONNECTOR_START_DATE", "2024-01-01")),
     'SOURCE ALREADY EXISTS.'),
    ('MinIO', airbyte.activeDestinations,
     'CREATING DESTINATION...', airbyte.create_destination,
     'DESTINATION ALREADY EXISTS.'),
    ('Roman Coins API - MinIO', airbyte.activeConnections,
     'CREATING CONNECTION...', airbyte.create_connection,
     'CONNECTION ALREADY EXISTS.'),
]

for target_name, list_existing, create_msg, create, skip_msg in setup_steps:
    # The listing call may return None; treat that the same as "nothing yet".
    already_exists = any(item['name'] == target_name for item in (list_existing() or []))
    if already_exists:
        print(skip_msg)
    else:
        print(create_msg)
        print(create())
65 changes: 60 additions & 5 deletions compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ services:
condition: service_started
db:
condition: service_healthy

web_scraper:
build: ./web_scraping
environment:
Expand All @@ -23,8 +24,9 @@ services:
depends_on:
db:
condition: service_healthy

db:
image: postgres:latest
image: postgres
restart: unless-stopped
volumes:
- db-data:/var/lib/postgresql/data
Expand All @@ -39,8 +41,9 @@ services:
interval: 10s
timeout: 5s
retries: 5

minio:
image: minio/minio:latest
image: minio/minio
volumes:
- minio-data:/data
environment:
Expand All @@ -50,8 +53,9 @@ services:
ports:
- 9000:9000
- 9090:9090

minio-setup:
image: minio/mc:latest
image: minio/mc
depends_on:
- minio
entrypoint: >
Expand All @@ -64,14 +68,16 @@ services:
mc admin user add myminio ${MINIO_NEW_USER} ${MINIO_NEW_USER_PASSWORD};
mc admin policy attach myminio readwrite --user=${MINIO_NEW_USER};
"
custom-airbyte-connector:
environment:
- HOST=${HOST}
- AIRBYTE_HOST=http://${HOST}
build:
context: ./custom-airbyte-connector
dockerfile: Dockerfile
image: airbyte/source-roman-coins-api:latest
pull_policy: build

airbyte-configurator:
depends_on:
- api
Expand All @@ -80,14 +86,63 @@ services:
- MINIO_BUCKET_NAME=${MINIO_BUCKET_NAME}
- MINIO_NEW_USER=${MINIO_NEW_USER}
- MINIO_NEW_USER_PASSWORD=${MINIO_NEW_USER_PASSWORD}
- AIRBYTE_HOST=${HOST}
- AIRBYTE_HOST=http://${HOST}
- AIRBYTE_USERNAME=${AIRBYTE_USERNAME}
- AIRBYTE_PASSWORD=${AIRBYTE_PASSWORD}
- CONNECTOR_START_DATE=${CONNECTOR_START_DATE}
build:
context: ./airbyte-api-minio-connection
dockerfile: Dockerfile
pull_policy: build

dagster-webserver:
depends_on:
- db
- airbyte-configurator
environment:
- POSTGRES_USER=${POSTGRES_USER}
- POSTGRES_PASSWORD=${POSTGRES_PASSWORD}
- POSTGRES_DB=${POSTGRES_DB}
- DB_HOST=db
- AIRBYTE_USERNAME=${AIRBYTE_USERNAME}
- AIRBYTE_PASSWORD=${AIRBYTE_PASSWORD}
- HOST=${HOST}
build:
context: ./orchestration
dockerfile: Dockerfile
entrypoint:
- dagster-webserver
- -h
- "0.0.0.0"
- -p
- "3000"
- -w
- workspace.yaml
expose:
- 3000
ports:
- 3000:3000

dagster-daemon:
depends_on:
- db
- airbyte-configurator
environment:
- POSTGRES_USER=${POSTGRES_USER}
- POSTGRES_PASSWORD=${POSTGRES_PASSWORD}
- POSTGRES_DB=${POSTGRES_DB}
- DB_HOST=db
- AIRBYTE_USERNAME=${AIRBYTE_USERNAME}
- AIRBYTE_PASSWORD=${AIRBYTE_PASSWORD}
- HOST=${HOST}
build:
context: ./orchestration
dockerfile: Dockerfile
entrypoint:
- dagster-daemon
- run
restart: on-failure

volumes:
db-data:
web-scraper-flag:
Expand Down
16 changes: 16 additions & 0 deletions orchestration/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Image for the Dagster webserver and daemon services (see compose.yaml);
# both services share this image and select their entrypoint at run time.
FROM python:3.10-slim

# DAGSTER_HOME holds instance config; /opt/dagster/app holds the code location.
RUN mkdir -p /opt/dagster/dagster_home /opt/dagster/app

# NOTE(review): packages are unpinned, so builds are not reproducible —
# consider pinning versions or installing from a requirements file.
RUN pip install dagster dagster-webserver dagster-postgres dagster-airbyte

# Copy your code and workspace to /opt/dagster/app
COPY . /opt/dagster/app/

ENV DAGSTER_HOME=/opt/dagster/dagster_home/
ENV PYTHONPATH="/opt/dagster/app:${PYTHONPATH}"

# Copy dagster instance YAML to $DAGSTER_HOME
COPY dagster.yaml workspace.yaml /opt/dagster/dagster_home/

WORKDIR /opt/dagster/app
21 changes: 21 additions & 0 deletions orchestration/dagster.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Dagster instance configuration (placed in $DAGSTER_HOME by the Dockerfile).

# Run/event/schedule storage backed by Postgres; all connection details come
# from the environment variables injected by compose.yaml.
storage:
  postgres:
    postgres_db:
      username:
        env: POSTGRES_USER
      password:
        env: POSTGRES_PASSWORD
      hostname:
        env: DB_HOST
      db_name:
        env: POSTGRES_DB
      port: 5432

# Local filesystem storage for compute logs and other run artifacts.
# NOTE(review): this path lives inside the container and is not on a volume,
# so artifacts do not survive container recreation — confirm that is intended.
local_artifact_storage:
  module: dagster.core.storage.root
  class: LocalArtifactStorage
  config:
    base_dir: "/opt/dagster/local/"

# Opt out of Dagster usage telemetry.
telemetry:
  enabled: false
16 changes: 16 additions & 0 deletions orchestration/orchestration/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
"""Dagster code location: Airbyte-mirrored assets, sync job, and schedule."""
from dagster import Definitions
from dagster_airbyte import load_assets_from_airbyte_instance

from .jobs import airbyte_sync_job
from .resources import airbyte_instance
from .schedules import airbyte_sync_schedule

# Asset graph discovered from the live Airbyte instance's connections.
airbyte_assets = load_assets_from_airbyte_instance(airbyte_instance)

all_jobs = [airbyte_sync_job]
all_schedules = [airbyte_sync_schedule]

# Single Definitions object is what Dagster loads for this module
# (see workspace.yaml).
defs = Definitions(
    assets=[airbyte_assets],
    jobs=all_jobs,
    schedules=all_schedules,
)
6 changes: 6 additions & 0 deletions orchestration/orchestration/jobs/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
"""Job definitions for the orchestration code location."""
from dagster import AssetSelection, define_asset_job

# Materializes every asset in the code location — i.e. all assets loaded
# from the Airbyte instance in the package __init__.
airbyte_sync_job = define_asset_job(
    name='airbyte_sync_job',
    selection=AssetSelection.all()
)
9 changes: 9 additions & 0 deletions orchestration/orchestration/resources/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
"""Shared Dagster resources."""
from dagster import EnvVar
from dagster_airbyte import AirbyteResource

# Handle to the Airbyte server; host and credentials come from the
# environment injected by compose.yaml (HOST, AIRBYTE_USERNAME/PASSWORD).
# NOTE(review): port 8000 is presumably the Airbyte API/proxy port —
# confirm against the Airbyte deployment.
airbyte_instance = AirbyteResource(
    host=EnvVar("HOST"),
    port="8000",
    username=EnvVar("AIRBYTE_USERNAME"),
    password=EnvVar("AIRBYTE_PASSWORD"),
)
8 changes: 8 additions & 0 deletions orchestration/orchestration/schedules/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
"""Schedules for the orchestration code location."""
from dagster import ScheduleDefinition, DefaultScheduleStatus
from ..jobs import airbyte_sync_job

# Run the Airbyte sync job every 30 minutes; DefaultScheduleStatus.RUNNING
# makes the schedule active on load, without manual activation in the UI.
airbyte_sync_schedule = ScheduleDefinition(
    job=airbyte_sync_job,
    cron_schedule="*/30 * * * *",
    default_status=DefaultScheduleStatus.RUNNING
)
6 changes: 6 additions & 0 deletions orchestration/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
[build-system]
requires = ["setuptools"]
build-backend = "setuptools.build_meta"

# Tells Dagster tooling which module holds this project's Definitions.
[tool.dagster]
module_name = "orchestration"
2 changes: 2 additions & 0 deletions orchestration/setup.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[metadata]
name = orchestration
12 changes: 12 additions & 0 deletions orchestration/setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
"""Packaging metadata for the Dagster orchestration project."""
from setuptools import find_packages, setup

setup(
    name="orchestration",
    # Exclude the (conventional) tests package from the installed distribution.
    packages=find_packages(exclude=["orchestration_tests"]),
    install_requires=[
        "dagster",
        "dagster-cloud",
        "dagster-airbyte"
    ],
    # Dev extras: local webserver UI and test runner.
    extras_require={"dev": ["dagster-webserver", "pytest"]},
)
2 changes: 2 additions & 0 deletions orchestration/workspace.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Dagster workspace: load the code location from the local "orchestration"
# package (its Definitions object; see pyproject.toml [tool.dagster]).
load_from:
  - python_module: "orchestration"

0 comments on commit 7b416b2

Please sign in to comment.