diff --git a/.env b/.env index f843028..1de5616 100644 --- a/.env +++ b/.env @@ -1,5 +1,5 @@ -HOST=http://host.docker.internal +HOST=host.docker.internal # Database name and credentials POSTGRES_USER=postgres diff --git a/airbyte-api-minio-connection/airbyte_connection_config.py b/airbyte-api-minio-connection/airbyte_connection_config.py index 48769cd..932b32e 100644 --- a/airbyte-api-minio-connection/airbyte_connection_config.py +++ b/airbyte-api-minio-connection/airbyte_connection_config.py @@ -256,7 +256,7 @@ def cancel_job(self, jobId:str): destination_config = { "destinationType": "s3", "s3_bucket_region": "us-west-2", - "format": {"format_type": "CSV", "flattening": "No flattening", + "format": {"format_type": "CSV", "flattening": "Root level flattening", "compression": { "compression_type": "No Compression" }}, "access_key_id": os.getenv("MINIO_NEW_USER", "roman-coins-user"), "secret_access_key": os.getenv("MINIO_NEW_USER_PASSWORD", "nonprodpasswd"), @@ -279,11 +279,35 @@ def cancel_job(self, jobId:str): destination_config=destination_config) print(f"AIRBYTE STATUS: {airbyte.status()}") - print('ADDING CUSTOM SOURCE...') - print(airbyte.add_custom_source()) - print('CREATING SOURCE...') - print(airbyte.create_source(start_date=os.getenv("CONNECTOR_START_DATE", "2024-01-01"))) - print('CREATING DESTINATION...') - print(airbyte.create_destination()) - print('CREATING CONNECTION...') - print(airbyte.create_connection()) + + # Add custom source connector to airbyte instance if it doesn't already exist + custom_source_exists = any(source['name'] == 'Roman Coins API' for source in (airbyte.instanceSources() or [])) + if not custom_source_exists: + print('ADDING CUSTOM SOURCE...') + print(airbyte.add_custom_source()) + else: + print('CUSTOM SOURCE CONNECTOR ALREADY EXISTS.') + + # Create airbyte source if it doesn't already exist + active_source_exists = any(source['name'] == 'Roman Coins API' for source in (airbyte.activeSources() or [])) + if not active_source_exists: + print('CREATING SOURCE...') + print(airbyte.create_source(start_date=os.getenv("CONNECTOR_START_DATE", "2024-01-01"))) + else: + print('SOURCE ALREADY EXISTS.') + + # Create airbyte destination if it doesn't already exist + active_destination_exists = any(source['name'] == 'MinIO' for source in (airbyte.activeDestinations() or [])) + if not active_destination_exists: + print('CREATING DESTINATION...') + print(airbyte.create_destination()) + else: + print('DESTINATION ALREADY EXISTS.') + + # Create airbyte connection if it doesn't already exist + active_connection_exists = any(source['name'] == 'Roman Coins API - MinIO' for source in (airbyte.activeConnections() or [])) + if not active_connection_exists: + print('CREATING CONNECTION...') + print(airbyte.create_connection()) + else: + print('CONNECTION ALREADY EXISTS.') diff --git a/compose.yaml b/compose.yaml index 311995d..3e4fc9d 100644 --- a/compose.yaml +++ b/compose.yaml @@ -12,6 +12,7 @@ services: condition: service_started db: condition: service_healthy + web_scraper: build: ./web_scraping environment: @@ -23,8 +24,9 @@ services: depends_on: db: condition: service_healthy + db: - image: postgres:latest + image: postgres restart: unless-stopped volumes: - db-data:/var/lib/postgresql/data @@ -39,8 +41,9 @@ services: interval: 10s timeout: 5s retries: 5 + minio: - image: minio/minio:latest + image: minio/minio volumes: - minio-data:/data environment: @@ -50,8 +53,9 @@ services: ports: - 9000:9000 - 9090:9090 + minio-setup: - image: minio/mc:latest + image: minio/mc depends_on: - minio entrypoint: > @@ -64,14 +68,16 @@ services: mc admin user add myminio ${MINIO_NEW_USER} ${MINIO_NEW_USER_PASSWORD}; mc admin policy attach myminio readwrite --user=${MINIO_NEW_USER}; " + custom-airbyte-connector: environment: - - HOST=${HOST} + - AIRBYTE_HOST=http://${HOST} build: context: ./custom-airbyte-connector dockerfile: Dockerfile image: airbyte/source-roman-coins-api:latest pull_policy: build + airbyte-configurator: depends_on: - api @@ -80,7 +86,7 @@ services: - MINIO_BUCKET_NAME=${MINIO_BUCKET_NAME} - MINIO_NEW_USER=${MINIO_NEW_USER} - MINIO_NEW_USER_PASSWORD=${MINIO_NEW_USER_PASSWORD} - - AIRBYTE_HOST=${HOST} + - AIRBYTE_HOST=http://${HOST} - AIRBYTE_USERNAME=${AIRBYTE_USERNAME} - AIRBYTE_PASSWORD=${AIRBYTE_PASSWORD} - CONNECTOR_START_DATE=${CONNECTOR_START_DATE} @@ -88,6 +94,55 @@ services: context: ./airbyte-api-minio-connection dockerfile: Dockerfile pull_policy: build + + dagster-webserver: + depends_on: + - db + - airbyte-configurator + environment: + - POSTGRES_USER=${POSTGRES_USER} + - POSTGRES_PASSWORD=${POSTGRES_PASSWORD} + - POSTGRES_DB=${POSTGRES_DB} + - DB_HOST=db + - AIRBYTE_USERNAME=${AIRBYTE_USERNAME} + - AIRBYTE_PASSWORD=${AIRBYTE_PASSWORD} + - HOST=${HOST} + build: + context: ./orchestration + dockerfile: Dockerfile + entrypoint: + - dagster-webserver + - -h + - "0.0.0.0" + - -p + - "3000" + - -w + - workspace.yaml + expose: + - 3000 + ports: + - 3000:3000 + + dagster-daemon: + depends_on: + - db + - airbyte-configurator + environment: + - POSTGRES_USER=${POSTGRES_USER} + - POSTGRES_PASSWORD=${POSTGRES_PASSWORD} + - POSTGRES_DB=${POSTGRES_DB} + - DB_HOST=db + - AIRBYTE_USERNAME=${AIRBYTE_USERNAME} + - AIRBYTE_PASSWORD=${AIRBYTE_PASSWORD} + - HOST=${HOST} + build: + context: ./orchestration + dockerfile: Dockerfile + entrypoint: + - dagster-daemon + - run + restart: on-failure + volumes: db-data: web-scraper-flag: diff --git a/orchestration/Dockerfile b/orchestration/Dockerfile new file mode 100644 index 0000000..9fa38fe --- /dev/null +++ b/orchestration/Dockerfile @@ -0,0 +1,16 @@ +FROM python:3.10-slim + +RUN mkdir -p /opt/dagster/dagster_home /opt/dagster/app + +RUN pip install dagster dagster-webserver dagster-postgres dagster-airbyte + +# Copy your code and workspace to /opt/dagster/app +COPY . /opt/dagster/app/ + +ENV DAGSTER_HOME=/opt/dagster/dagster_home/ +ENV PYTHONPATH="/opt/dagster/app:${PYTHONPATH}" + +# Copy dagster instance YAML to $DAGSTER_HOME +COPY dagster.yaml workspace.yaml /opt/dagster/dagster_home/ + +WORKDIR /opt/dagster/app \ No newline at end of file diff --git a/orchestration/dagster.yaml b/orchestration/dagster.yaml new file mode 100644 index 0000000..37f27df --- /dev/null +++ b/orchestration/dagster.yaml @@ -0,0 +1,21 @@ +storage: + postgres: + postgres_db: + username: + env: POSTGRES_USER + password: + env: POSTGRES_PASSWORD + hostname: + env: DB_HOST + db_name: + env: POSTGRES_DB + port: 5432 + +local_artifact_storage: + module: dagster.core.storage.root + class: LocalArtifactStorage + config: + base_dir: "/opt/dagster/local/" + +telemetry: + enabled: false diff --git a/orchestration/orchestration/__init__.py b/orchestration/orchestration/__init__.py new file mode 100644 index 0000000..a51aa7b --- /dev/null +++ b/orchestration/orchestration/__init__.py @@ -0,0 +1,16 @@ +from dagster import Definitions +from dagster_airbyte import load_assets_from_airbyte_instance +from .jobs import airbyte_sync_job +from .schedules import airbyte_sync_schedule +from .resources import airbyte_instance + +airbyte_assets = load_assets_from_airbyte_instance(airbyte_instance) + +all_jobs = [airbyte_sync_job] +all_schedules = [airbyte_sync_schedule] + +defs = Definitions( + assets=[airbyte_assets], + jobs=all_jobs, + schedules=all_schedules +) diff --git a/orchestration/orchestration/jobs/__init__.py b/orchestration/orchestration/jobs/__init__.py new file mode 100644 index 0000000..c431265 --- /dev/null +++ b/orchestration/orchestration/jobs/__init__.py @@ -0,0 +1,6 @@ +from dagster import AssetSelection, define_asset_job + +airbyte_sync_job = define_asset_job( + name='airbyte_sync_job', + selection=AssetSelection.all() +) \ No newline at end of file diff --git a/orchestration/orchestration/resources/__init__.py b/orchestration/orchestration/resources/__init__.py new file mode 100644 index 0000000..127af13 --- /dev/null +++ b/orchestration/orchestration/resources/__init__.py @@ -0,0 +1,9 @@ +from dagster import EnvVar +from dagster_airbyte import AirbyteResource + +airbyte_instance = AirbyteResource( + host=EnvVar("HOST"), + port="8000", + username=EnvVar("AIRBYTE_USERNAME"), + password=EnvVar("AIRBYTE_PASSWORD"), +) diff --git a/orchestration/orchestration/schedules/__init__.py b/orchestration/orchestration/schedules/__init__.py new file mode 100644 index 0000000..4ca520f --- /dev/null +++ b/orchestration/orchestration/schedules/__init__.py @@ -0,0 +1,8 @@ +from dagster import ScheduleDefinition, DefaultScheduleStatus +from ..jobs import airbyte_sync_job + +airbyte_sync_schedule = ScheduleDefinition( + job=airbyte_sync_job, + cron_schedule="*/30 * * * *", + default_status=DefaultScheduleStatus.RUNNING +) \ No newline at end of file diff --git a/orchestration/pyproject.toml b/orchestration/pyproject.toml new file mode 100644 index 0000000..3a32f36 --- /dev/null +++ b/orchestration/pyproject.toml @@ -0,0 +1,6 @@ +[build-system] +requires = ["setuptools"] +build-backend = "setuptools.build_meta" + +[tool.dagster] +module_name = "orchestration" diff --git a/orchestration/setup.cfg b/orchestration/setup.cfg new file mode 100644 index 0000000..73d406b --- /dev/null +++ b/orchestration/setup.cfg @@ -0,0 +1,2 @@ +[metadata] +name = orchestration diff --git a/orchestration/setup.py b/orchestration/setup.py new file mode 100644 index 0000000..8d233c9 --- /dev/null +++ b/orchestration/setup.py @@ -0,0 +1,12 @@ +from setuptools import find_packages, setup + +setup( + name="orchestration", + packages=find_packages(exclude=["orchestration_tests"]), + install_requires=[ + "dagster", + "dagster-cloud", + "dagster-airbyte" + ], + extras_require={"dev": ["dagster-webserver", "pytest"]}, +) diff --git a/orchestration/workspace.yaml b/orchestration/workspace.yaml new file mode 100644 index 0000000..db161bd --- /dev/null +++ b/orchestration/workspace.yaml @@ -0,0 +1,2 @@ +load_from: + - python_module: "orchestration" \ No newline at end of file