diff --git a/orchestration/Dockerfile b/orchestration/Dockerfile new file mode 100644 index 0000000..9fa38fe --- /dev/null +++ b/orchestration/Dockerfile @@ -0,0 +1,16 @@ +FROM python:3.10-slim + +RUN mkdir -p /opt/dagster/dagster_home /opt/dagster/app + +RUN pip install dagster dagster-webserver dagster-postgres dagster-airbyte + +# Copy your code and workspace to /opt/dagster/app +COPY . /opt/dagster/app/ + +ENV DAGSTER_HOME=/opt/dagster/dagster_home/ +ENV PYTHONPATH="/opt/dagster/app:${PYTHONPATH}" + +# Copy dagster instance YAML to $DAGSTER_HOME +COPY dagster.yaml workspace.yaml /opt/dagster/dagster_home/ + +WORKDIR /opt/dagster/app \ No newline at end of file diff --git a/orchestration/dagster.yaml b/orchestration/dagster.yaml new file mode 100644 index 0000000..37f27df --- /dev/null +++ b/orchestration/dagster.yaml @@ -0,0 +1,21 @@ +storage: + postgres: + postgres_db: + username: + env: POSTGRES_USER + password: + env: POSTGRES_PASSWORD + hostname: + env: DB_HOST + db_name: + env: POSTGRES_DB + port: 5432 + +local_artifact_storage: + module: dagster.core.storage.root + class: LocalArtifactStorage + config: + base_dir: "/opt/dagster/local/" + +telemetry: + enabled: false diff --git a/orchestration/orchestration/__init__.py b/orchestration/orchestration/__init__.py new file mode 100644 index 0000000..a51aa7b --- /dev/null +++ b/orchestration/orchestration/__init__.py @@ -0,0 +1,16 @@ +from dagster import Definitions +from dagster_airbyte import load_assets_from_airbyte_instance +from .jobs import airbyte_sync_job +from .schedules import airbyte_sync_schedule +from .resources import airbyte_instance + +airbyte_assets = load_assets_from_airbyte_instance(airbyte_instance) + +all_jobs = [airbyte_sync_job] +all_schedules = [airbyte_sync_schedule] + +defs = Definitions( + assets=[airbyte_assets], + jobs=all_jobs, + schedules=all_schedules +) diff --git a/orchestration/orchestration/jobs/__init__.py b/orchestration/orchestration/jobs/__init__.py new file mode 100644 index 0000000..c431265 --- /dev/null +++ b/orchestration/orchestration/jobs/__init__.py @@ -0,0 +1,6 @@ +from dagster import AssetSelection, define_asset_job + +airbyte_sync_job = define_asset_job( + name='airbyte_sync_job', + selection=AssetSelection.all() +) \ No newline at end of file diff --git a/orchestration/orchestration/resources/__init__.py b/orchestration/orchestration/resources/__init__.py new file mode 100644 index 0000000..127af13 --- /dev/null +++ b/orchestration/orchestration/resources/__init__.py @@ -0,0 +1,9 @@ +from dagster import EnvVar +from dagster_airbyte import AirbyteResource + +airbyte_instance = AirbyteResource( + host=EnvVar("HOST"), + port="8000", + username=EnvVar("AIRBYTE_USERNAME"), + password=EnvVar("AIRBYTE_PASSWORD"), +) diff --git a/orchestration/orchestration/schedules/__init__.py b/orchestration/orchestration/schedules/__init__.py new file mode 100644 index 0000000..4ca520f --- /dev/null +++ b/orchestration/orchestration/schedules/__init__.py @@ -0,0 +1,8 @@ +from dagster import ScheduleDefinition, DefaultScheduleStatus +from ..jobs import airbyte_sync_job + +airbyte_sync_schedule = ScheduleDefinition( + job=airbyte_sync_job, + cron_schedule="*/30 * * * *", + default_status=DefaultScheduleStatus.RUNNING +) \ No newline at end of file diff --git a/orchestration/pyproject.toml b/orchestration/pyproject.toml new file mode 100644 index 0000000..3a32f36 --- /dev/null +++ b/orchestration/pyproject.toml @@ -0,0 +1,6 @@ +[build-system] +requires = ["setuptools"] +build-backend = "setuptools.build_meta" + +[tool.dagster] +module_name = "orchestration" diff --git a/orchestration/setup.cfg b/orchestration/setup.cfg new file mode 100644 index 0000000..73d406b --- /dev/null +++ b/orchestration/setup.cfg @@ -0,0 +1,2 @@ +[metadata] +name = orchestration diff --git a/orchestration/setup.py b/orchestration/setup.py new file mode 100644 index 0000000..8d233c9 --- /dev/null +++ b/orchestration/setup.py @@ -0,0 +1,12 @@ +from setuptools import find_packages, setup + +setup( + name="orchestration", + packages=find_packages(exclude=["orchestration_tests"]), + install_requires=[ + "dagster", + "dagster-cloud", + "dagster-airbyte" + ], + extras_require={"dev": ["dagster-webserver", "pytest"]}, +) diff --git a/orchestration/workspace.yaml b/orchestration/workspace.yaml new file mode 100644 index 0000000..db161bd --- /dev/null +++ b/orchestration/workspace.yaml @@ -0,0 +1,2 @@ +load_from: + - python_module: "orchestration" \ No newline at end of file