versatile-data-kit: Airflow Documentation (#857)
* versatile-data-kit: Airflow example

This change introduces an example where the user deploys
three jobs: two of these ingest data from predefined
sources into a target database and the third aggregates
parts of the data. The example also guides the user to
instantiate an Airflow DAG which runs these jobs in a
dependent manner, meaning that the two ingest jobs must
pass successfully before the third job is run.

Signed-off-by: Gabriel Georgiev <gageorgiev@vmware.com>
gageorgiev committed Jun 29, 2022
1 parent 2961d04 commit cf94bac
Showing 22 changed files with 648 additions and 0 deletions.
467 changes: 467 additions & 0 deletions examples/airflow-example/README.md

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions examples/airflow-example/airflow-transform-job/10_transform.py
@@ -0,0 +1,10 @@
# Copyright 2021 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
from vdk.api.job_input import IJobInput


def run(job_input: IJobInput):
    job1_data = job_input.execute_query("SELECT * FROM memory.default.test_airflow_one")
    job2_data = job_input.execute_query("SELECT * FROM memory.default.test_airflow_two")

    print(f"Job 1 Data ===> {job1_data} \n\n\n Job 2 Data ===> {job2_data}")
@@ -0,0 +1 @@
drop table if exists memory.default.test_airflow_one
@@ -0,0 +1 @@
drop table if exists memory.default.test_airflow_two
14 changes: 14 additions & 0 deletions examples/airflow-example/airflow-transform-job/config.ini
@@ -0,0 +1,14 @@
; Supported format: https://docs.python.org/3/library/configparser.html#supported-ini-file-structure

; This is the only file required to deploy a Data Job.
; Read more to understand what each option means:

; Information about the owner of the Data Job
[owner]

; Team is a way to group Data Jobs that belong to the same team.
team = my-team

; Configuration related to running data jobs
[job]
db_default_type = TRINO
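Since config.ini follows the standard INI structure linked above, it can be read with Python's built-in `configparser`; a minimal standalone sketch (plain Python, independent of VDK) parsing the same keys:

```python
import configparser

# Parse the same sections and keys the Data Job's config.ini defines.
cfg = configparser.ConfigParser()
cfg.read_string("""
[owner]
team = my-team

[job]
db_default_type = TRINO
""")

print(cfg["owner"]["team"])           # -> my-team
print(cfg["job"]["db_default_type"])  # -> TRINO
```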
@@ -0,0 +1 @@
vdk-trino
@@ -0,0 +1 @@
drop table if exists memory.default.test_airflow_one
42 changes: 42 additions & 0 deletions examples/airflow-example/airflow-trino-job1/10_insert_data.py
@@ -0,0 +1,42 @@
# Copyright 2021 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import json
import pathlib

from vdk.api.job_input import IJobInput


def run(job_input: IJobInput):
    data_job_dir = pathlib.Path(job_input.get_job_directory())
    data_file = data_job_dir / "data.json"

    if data_file.exists():
        with open(data_file) as f:
            data = json.load(f)

        rows = [tuple(i.values()) for i in data]
        insert_query = """
        INSERT INTO memory.default.test_airflow_one VALUES
        """ + ", ".join(
            str(i) for i in rows
        )

        job_input.execute_query(
            """
            CREATE TABLE IF NOT EXISTS memory.default.test_airflow_one
            (
                id varchar,
                first_name varchar,
                last_name varchar,
                city varchar,
                country varchar,
                phone varchar
            )
            """
        )

        job_input.execute_query(insert_query)

        print("Success! The data was sent to Trino.")
    else:
        print("No data file available! Exiting job execution.")
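The INSERT statement above is assembled by rendering each row tuple with `str()`, relying on dicts preserving insertion order so the values line up with the table columns; a standalone sketch of that step (plain Python, the sample records below are illustrative, not from the real data file):

```python
# Each JSON record's values become a tuple, and str(tuple) happens to
# render as a SQL VALUES entry, e.g. ('18', 'Michelle', 'Brooks').
data = [
    {"id": "18", "FirstName": "Michelle", "LastName": "Brooks"},
    {"id": "19", "FirstName": "Tim", "LastName": "Goyer"},
]
rows = [tuple(i.values()) for i in data]
values_clause = ", ".join(str(i) for i in rows)
print(values_clause)
# -> ('18', 'Michelle', 'Brooks'), ('19', 'Tim', 'Goyer')
```

Note this only works for simple, trusted data: string-building a query offers no escaping or parameterization.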
14 changes: 14 additions & 0 deletions examples/airflow-example/airflow-trino-job1/config.ini
@@ -0,0 +1,14 @@
; Supported format: https://docs.python.org/3/library/configparser.html#supported-ini-file-structure

; This is the only file required to deploy a Data Job.
; Read more to understand what each option means:

; Information about the owner of the Data Job
[owner]

; Team is a way to group Data Jobs that belong to the same team.
team = my-team

; Configuration related to running data jobs
[job]
db_default_type = TRINO
1 change: 1 addition & 0 deletions examples/airflow-example/airflow-trino-job1/data.json
@@ -0,0 +1 @@
[{"id":"18","FirstName":"Michelle","LastName":"Brooks","City":"New York","Country":"USA","Phone":"+1 (212) 221-3546"},{"id":"19","FirstName":"Tim","LastName":"Goyer","City":"Cupertino","Country":"USA","Phone":"+1 (408) 996-1010"},{"id":"20","FirstName":"Dan","LastName":"Miller","City":"Mountain View","Country":"USA","Phone":"+ 1(650) 644 - 3358"},{"id":"21","FirstName":"Kathy","LastName":"Chase","City":"Reno","Country":"USA","Phone":"+1 (775) 223-7665"},{"id":"22","FirstName":"Heather","LastName":"Leacock","City":"Orlando","Country":"USA","Phone":"+1 (407) 999-7788"},{"id":"23","FirstName":"John","LastName":"Gordon","City":"Boston","Country":"USA","Phone":"+1 (617) 522-1333"},{"id":"24","FirstName":"Frank","LastName":"Ralston","City":"Chicago","Country":"USA","Phone":"+1 (312) 332-3232"},{"id":"25","FirstName":"Victor","LastName":"Stevens","City":"Madison","Country":"USA","Phone":"+1 (608) 257-0597"},{"id":"26","FirstName":"Richard","LastName":"Cunningham","City":"Fort Worth","Country":"USA","Phone":"+1 (817) 924-7272"},{"id":"27","FirstName":"Patrick","LastName":"Gray","City":"Tucson","Country":"USA","Phone":"+1 (520) 622-4200"},{"id":"28","FirstName":"Julia","LastName":"Barnett","City":"Salt Lake City","Country":"USA","Phone":"+1 (801) 531-7272"},{"id":"29","FirstName":"Robert","LastName":"Brown","City":"Toronto","Country":"Canada","Phone":"+1 (416) 363-8888"},{"id":"30","FirstName":"Edward","LastName":"Francis","City":"Ottawa","Country":"Canada","Phone":"+1 (613) 234-3322"}]
@@ -0,0 +1 @@
vdk-trino
@@ -0,0 +1 @@
drop table if exists memory.default.test_airflow_two
42 changes: 42 additions & 0 deletions examples/airflow-example/airflow-trino-job2/10_insert_data.py
@@ -0,0 +1,42 @@
# Copyright 2021 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import json
import pathlib

from vdk.api.job_input import IJobInput


def run(job_input: IJobInput):
    data_job_dir = pathlib.Path(job_input.get_job_directory())
    data_file = data_job_dir / "data.json"

    if data_file.exists():
        with open(data_file) as f:
            data = json.load(f)

        rows = [tuple(i.values()) for i in data]
        insert_query = """
        INSERT INTO memory.default.test_airflow_two VALUES
        """ + ", ".join(
            str(i) for i in rows
        )

        job_input.execute_query(
            """
            CREATE TABLE IF NOT EXISTS memory.default.test_airflow_two
            (
                id integer,
                first_name varchar,
                last_name varchar,
                city varchar,
                country varchar,
                phone varchar
            )
            """
        )

        job_input.execute_query(insert_query)

        print("Success! The data was sent to Trino.")
    else:
        print("No data file available! Exiting job execution.")
14 changes: 14 additions & 0 deletions examples/airflow-example/airflow-trino-job2/config.ini
@@ -0,0 +1,14 @@
; Supported format: https://docs.python.org/3/library/configparser.html#supported-ini-file-structure

; This is the only file required to deploy a Data Job.
; Read more to understand what each option means:

; Information about the owner of the Data Job
[owner]

; Team is a way to group Data Jobs that belong to the same team.
team = my-team

; Configuration related to running data jobs
[job]
db_default_type = TRINO
1 change: 1 addition & 0 deletions examples/airflow-example/airflow-trino-job2/data.json
@@ -0,0 +1 @@
[{"id": 31, "FirstName": "Martha", "LastName": "Silk", "City": "Halifax", "Country": "Canada", "Phone": "+1 (902) 450-0450"}, {"id": 32, "FirstName": "Aaron", "LastName": "Mitchell", "City": "Winnipeg", "Country": "Canada", "Phone": "+1 (204) 452-6452"}, {"id": 33, "FirstName": "Ellie", "LastName": "Sullivan", "City": "Yellowknife", "Country": "Canada", "Phone": "+1 (867) 920-2233"}, {"id": 34, "FirstName": "Jo\u00e3o", "LastName": "Fernandes", "City": "Lisbon", "Country": "Portugal", "Phone": "+351 (213) 466-111"}, {"id": 35, "FirstName": "Madalena", "LastName": "Sampaio", "City": "Porto", "Country": "Portugal", "Phone": "+351 (225) 022-448"}, {"id": 36, "FirstName": "Hannah", "LastName": "Schneider", "City": "Berlin", "Country": "Germany", "Phone": "+49 030 26550280"}, {"id": 37, "FirstName": "Fynn", "LastName": "Zimmermann", "City": "Frankfurt", "Country": "Germany", "Phone": "+49 069 40598889"}, {"id": 38, "FirstName": "Niklas", "LastName": "Schr\u00f6der", "City": "Berlin", "Country": "Germany", "Phone": "+49 030 2141444"}, {"id": 39, "FirstName": "Camille", "LastName": "Bernard", "City": "Paris", "Country": "France", "Phone": "+33 01 49 70 65 65"}, {"id": 40, "FirstName": "Dominique", "LastName": "Lefebvre", "City": "Paris", "Country": "France", "Phone": "+33 01 47 42 71 71"}, {"id": 41, "FirstName": "Marc", "LastName": "Dubois", "City": "Lyon", "Country": "France", "Phone": "+33 04 78 30 30 30"}, {"id": 42, "FirstName": "Wyatt", "LastName": "Girard", "City": "Bordeaux", "Country": "France", "Phone": "+33 05 56 96 96 96"}, {"id": 43, "FirstName": "Isabelle", "LastName": "Mercier", "City": "Dijon", "Country": "France", "Phone": "+33 03 80 73 66 99"}, {"id": 44, "FirstName": "Terhi", "LastName": "H\u00e4m\u00e4l\u00e4inen", "City": "Helsinki", "Country": "Finland", "Phone": "+358 09 870 2000"}, {"id": 45, "FirstName": "Ladislav", "LastName": "Kov\u00e1cs", "City": "Budapest", "Country": "Hungary", "Phone": "+123 123 456"}, {"id": 46, "FirstName": "Hugh", 
"LastName": "OReilly", "City": "Dublin", "Country": "Ireland", "Phone": "+353 01 6792424"}]
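Names such as "Jo\u00e3o" are stored with `\uXXXX` escapes in this data file; `json.load`/`json.loads` decodes them back to Unicode text, as a quick sketch shows (the record below is a trimmed copy of one entry from the file):

```python
import json

# In the raw file the accented character is a \u escape sequence.
raw = '{"id": 34, "FirstName": "Jo\\u00e3o", "LastName": "Fernandes", "City": "Lisbon"}'
record = json.loads(raw)
print(record["FirstName"])  # -> João
```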
@@ -0,0 +1 @@
vdk-trino
36 changes: 36 additions & 0 deletions examples/airflow-example/dags/airflow_example_dag.py
@@ -0,0 +1,36 @@
# Copyright 2021 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
from datetime import datetime

from airflow import DAG
from vdk_provider.operators.vdk import VDKOperator

with DAG(
    "airflow_example_vdk",
    schedule_interval=None,
    start_date=datetime(2022, 1, 1),
    catchup=False,
    tags=["example", "vdk"],
) as dag:
    trino_job1 = VDKOperator(
        conn_id="vdk-default",
        job_name="airflow-trino-job1",
        team_name="taurus",
        task_id="trino-job1",
    )

    trino_job2 = VDKOperator(
        conn_id="vdk-default",
        job_name="airflow-trino-job2",
        team_name="taurus",
        task_id="trino-job2",
    )

    transform_job = VDKOperator(
        conn_id="vdk-default",
        job_name="airflow-transform-job",
        team_name="taurus",
        task_id="transform-job",
    )

    [trino_job1, trino_job2] >> transform_job
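The final line wires the dependency: both Trino jobs must succeed before the transform job runs. Airflow supports `[a, b] >> c` through operator overloading on its task classes; a toy illustration of that mechanism (not Airflow code — the `Task` class below is made up for demonstration):

```python
class Task:
    """Minimal stand-in showing how `[a, b] >> c` can record dependencies."""

    def __init__(self, name):
        self.name = name
        self.upstream = []

    def __rshift__(self, other):
        # a >> b: register a as upstream of b.
        other.upstream.append(self)
        return other

    def __rrshift__(self, others):
        # [a, b] >> c: plain lists have no __rshift__, so Python falls
        # back to c.__rrshift__([a, b]).
        for task in others:
            task >> self
        return self


job1, job2, transform = Task("trino-job1"), Task("trino-job2"), Task("transform-job")
[job1, job2] >> transform
print([t.name for t in transform.upstream])  # -> ['trino-job1', 'trino-job2']
```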
Binary file added examples/airflow-example/images/connections.png
Binary file added examples/airflow-example/images/dags.png
Binary file added examples/airflow-example/images/new_conn.png
Binary file added examples/airflow-example/images/trigger_dag.png
Binary file added examples/airflow-example/images/vdk_conn.png
