Luft is a replacement for Airflow's standard operators, with DAGs declared in YAML files. It is basically a client that helps you with everyday BI tasks.
Airflow comes with batteries included - a couple of operators that make your BI work less painful. But after years of using it we stumbled upon some issues with the standard operators:
- Operators are closely tied to Airflow. For example, if a data scientist wants to ad-hoc download one table from a MySQL database and save it to BigQuery, they have to create a new DAG, an operator and JDBC credentials, and run the whole Airflow ecosystem on localhost, which is usually overkill.
- Standard loading operators (e.g. MySqlToGoogleCloudStorageOperator) do not work in Kubernetes.
- Standard loading operators are slow due to their Python implementation and are not usable for big loads.
- It is really hard to debug and test operators.
- Airflow doesn't include standard principles for solving DWH problems.
- The schema of the data is usually not versioned by standard loading operators.
- Airflow may be replaced by alternatives (e.g. Prefect, Dagster) in the near future.
Luft solves most of these problems.
Luft is meant to run inside a Docker container (but of course it can run without it). It is just a simple Python library that wraps multiple services. For example, for parallel and fast bulk loading of data from any JDBC source into BigQuery it uses Embulk, for executing BigQuery commands it uses the standard Python BigQuery library, etc.
Every piece of work is done by a task, which is defined in a YAML file (examples are in examples/tasks).
For example, loading table Test from a MySQL database into S3 is one task, loading data from GA into S3 is another, and a historization script in BQ is yet another.
Mandatory parameters of every task are listed below (a minimal example file is sketched after this list):
- name: name of the task. In the case of tables it is usually the table name.
- source_system: name of the source system. For JDBC databases it is usually the database name. Used for better organization, especially on blob storage, e.g. jobs, prace, pzr. If not specified, the source_system is taken from the folder hierarchy: if you look into example/tasks/world, then world will be the source system unless you specify it in your YAML file.
- source_subsystem: name of the source subsystem. For JDBC databases it is usually the schema name. Used for better organization, especially on blob storage, e.g. public, b2b. If not specified, the source_subsystem is taken from the folder hierarchy: if you look into example/tasks/world/public, then public will be the source subsystem unless you specify it in your YAML file.
- task_type: type of task, e.g. embulk-jdbc-load, mongo-load, etc. Luft automatically decides which task type to use based on your CLI command, so you do not have to specify it manually. It can be useful when you want to enforce a certain task type regardless of the CLI command (e.g. you want to run a BigQuery task even if all other tasks in the folder load data from MySQL to S3).
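As an illustration only - the path and values below are made up, not taken from the example project - a minimal task file could look like this:

```yaml
# example/tasks/world/public/city.yml (hypothetical path and values)
name: city                    # table name
source_system: world          # optional here, could be inferred from the "world" folder
source_subsystem: public      # optional here, could be inferred from the "public" folder
task_type: embulk-jdbc-load   # usually inferred from the CLI command
```

Because the file sits under a world/public folder, source_system and source_subsystem could be omitted entirely.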
Tasks are organized into task lists - an array of work to be done for a certain period of time. E.g. you want to download tables T1, T2 and T3 from a MySQL database into S3 for the period from 2018-01-01 to 2019-05-02 (and you have a where condition on some date column).
Luft currently supports the following task types:
embulk-jdbc-load: runs Embulk and loads data from a JDBC database into S3 or GCS. Data is stored as CSV; other output formats will be added later.
luft jdbc load
- -y, --yml-path (mandatory): folder or a single YAML file inside the default tasks folder (see luft.cfg).
- -s, --start-date: start date in format YYYY-MM-DD for executing the task in a loop. If not specified, yesterday's date is used.
- -e, --end-date: end date in format YYYY-MM-DD for executing the task in a loop. This day is not included. If not specified, today's date is used.
- -sys, --source-system: override the source_system parameter. See the description in the Task section. Has to be the same as the name in jdbc.cfg to get the right credentials for the JDBC database.
- -sub, --source-subsystem: override the source_subsystem parameter. See the description in the Task section.
- -b, --blacklist: names of tables/objects to be ignored during processing. E.g. --yml-path gis and -b TEST will process all objects in the gis folder except the object TEST.
- -w, --whitelist: names of tables/objects to be processed. E.g. --yml-path gis and -w TEST will process only the object TEST.
Requirements:
- Embulk in your Docker image (see Dockerfile) or on your localhost.
- Appropriate Embulk plugins:
  - output: embulk-output-gcs or embulk-output-s3
  - input: any embulk-input-jdbc plugin you need
- Luft installed :).
- A jdbc.cfg file with the right configuration.
This file contains the basic JDBC configuration for all of your databases. Every database has to have a [DATABASE_NAME] header, which has to be the same as source_system. Supported parameters are listed below; an example file is sketched after the list.
- type - type of database according to embulk-input-jdbc
- uri - uri of database
- port - database port
- database - database name
- user - username of the user you want to connect to the database as
- password - you can specify the password directly, though this is not recommended because your password can be stolen. It is good for DEV but not for PROD.
- password_env - name of the environment variable used for storing the password. If you use this variant you can pass the password in the docker run command, e.g. if password_env is set to MY_DB_PASS then docker run -e MY_DB_PASS=Password123 luft jdbc load -y <path_to_yml> should work.
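A sketch of a jdbc.cfg entry for a hypothetical PostgreSQL database called world (host, port, user and the environment variable name are placeholders; see example/config/jdbc.cfg for the authoritative format):

```
[world]
type = postgresql
uri = localhost
port = 5432
database = world
user = postgres
password_env = WORLD_DB_PASS
```

With password_env set this way, the secret is passed at runtime via docker run -e WORLD_DB_PASS=... instead of being baked into the image.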
Inside the YAML file, the following parameters are supported (a complete task file is sketched after this list):
- name - table name.
- source_system - usually the name of the database - used for organizational purposes and the blob storage path. Has to be the same as the name in jdbc.cfg to get the right credentials for the JDBC database.
- source_subsystem - usually the name of the schema - used for organizational purposes and the blob storage path.
- task_type - embulk-jdbc-load by default, but it can be overridden. When overridden, it becomes a different kind of task :).
- thread_name - applicable only when used with Airflow. The thread name is automatically generated based on the number of threads. If you need this task to run in a completely different thread, you can specify a custom thread name. E.g. I have tasks T1, T2, T3, T4 and T5 in my task list and the thread count set to 3. By default (if no task has thread_name specified) it will look like this in Airflow:
|T1| -> |T4|
|T2| -> |T5|
|T3|
When I specify any thread_name in task T4:
|T1| -> |T5|
|T2|
|T3|
|T4|
- color - applicable only when used with Airflow. Hex color of the task in Airflow. If not specified, #A3E9DA is used.
- path_prefix - path prefix (path) on blob storage. You can use the following templated fields:
- {env} - your environment (DEV/PROD...)
- {source_system} - name of source system (whatever you like) - in case of jdbc it is usually friendly name of db
- {source_subsystem} - name of source subsystem (whatever you like) - in case of jdbc it is schema name
- {name} - name of table
- {date_valid} - date valid of the export
- {time_valid} - time valid of the export
- embulk_template - path to your custom Embulk template. Otherwise the default from luft.cfg will be used.
- fetch_rows - number of rows to fetch at a time. Default 10000.
- source_table - in case you need a different name on blob storage. E.g. the table name is Test1 but you want to rename it to Test in your DWH and on blob storage. In this case you write Test into the name parameter in the YAML file and Test1 into the source_table parameter.
- where_clause - where condition for your SQL command. You can use the {date_valid} parameter inside this condition to insert the actual date valid. E.g. where_clause: date_of_change >= '{date_valid}'. If you execute luft jdbc load -y <path_to_task> -s 2019-01-01 -e 2019-05-01, then for every date between 2019-01-01 and 2019-05-01 it will generate the corresponding condition, e.g. WHERE date_of_change >= '2019-01-01' for the first run.
- columns - list of columns to download. Column parameters:
- name - column name.
- type - column type.
- mandatory - whether the column is mandatory. Default false.
- pk - whether the column is a primary key. Default false.
- escape - escape the column name with a backtick (`). Some databases require it.
- value - fixed column value. You should never delete any of your columns from the YAML file. Instead you should set value: 'Null'.
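Putting these parameters together, a sketch of an embulk-jdbc-load task file might look like the following; the table, columns and types are illustrative assumptions, and the files in example/tasks are authoritative for the exact layout:

```yaml
name: city                          # name used on blob storage and in the DWH
source_system: world                # must match a section in jdbc.cfg
source_subsystem: public
source_table: city_v1               # real table name in the source database
where_clause: date_of_change >= '{date_valid}'
fetch_rows: 10000
columns:
  - name: id
    type: BIGINT
    pk: true
    mandatory: true
  - name: name
    type: VARCHAR(255)
  - name: date_of_change
    type: TIMESTAMP
```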
bq-load: loads data into BigQuery from Google Cloud Storage and historizes it. Currently only CSV is supported.
luft bq load
- -y, --yml-path (mandatory): folder or a single YAML file inside the default tasks folder (see luft.cfg).
- -s, --start-date: start date in format YYYY-MM-DD for executing the task in a loop. If not specified, yesterday's date is used.
- -e, --end-date: end date in format YYYY-MM-DD for executing the task in a loop. This day is not included. If not specified, today's date is used.
- -sys, --source-system: override the source_system parameter. See the description in the Task section. Has to be the same as the name in jdbc.cfg to get the right credentials for the JDBC database.
- -sub, --source-subsystem: override the source_subsystem parameter. See the description in the Task section.
- -b, --blacklist: names of tables/objects to be ignored during processing. E.g. --yml-path gis and -b TEST will process all objects in the gis folder except the object TEST.
- -w, --whitelist: names of tables/objects to be processed. E.g. --yml-path gis and -w TEST will process only the object TEST.
Requirements:
- Luft installed :) with BigQuery support - pip install luft[bq].
- Credentials file (usually service_account.json) mapped into Docker and configured in luft.cfg.
Inside the YAML file, the following parameters are supported (a task file is sketched after this list):
- name - any name you want. Used mainly as the task name in the Airflow UI.
- source_system - only for organizational purposes; it has no special role here.
- source_subsystem - only for organizational purposes; it has no special role here.
- task_type - bq-load by default, but it can be overridden. When overridden, it becomes a different kind of task :).
- thread_name - applicable only when used with Airflow. The thread name is automatically generated based on the number of threads. If you need this task to run in a completely different thread, you can specify a custom thread name. E.g. I have tasks T1, T2, T3, T4 and T5 in my task list and the thread count set to 3. By default (if no task has thread_name specified) it will look like this in Airflow:
|T1| -> |T4|
|T2| -> |T5|
|T3|
When I specify any thread_name in task T4:
|T1| -> |T5|
|T2|
|T3|
|T4|
- color - applicable only when used with Airflow. Hex color of the task in Airflow. If not specified, #03A0F3 is used.
- project_id - BigQuery project id. Default from luft.cfg.
- location - BigQuery location. Default from luft.cfg.
- columns - list of columns to load. Column parameters:
- name - column name.
- type - column type.
- mandatory - whether the column is mandatory. Default false.
- pk - whether the column is a primary key. Default false.
- escape - escape the column name with a backtick (`). Some databases require it.
- value - fixed column value. You should never delete any of your columns from the YAML file. Instead you should set value: 'Null'.
- dataset_id - Google BigQuery dataset name. If not specified, the source_system name is used. It will be created if it does not exist.
- path_prefix - path prefix (path) on blob storage. You can use the following templated fields:
- {env} - your environment (DEV/PROD...)
- {source_system} - name of source system (whatever you like) - in case of jdbc it is usually friendly name of db
- {source_subsystem} - name of source subsystem (whatever you like) - in case of jdbc it is schema name
- {name} - name of table
- {date_valid} - date valid of the export
- {time_valid} - time valid of the export
- skip_leading_rows - whether the first row of the CSV should be considered a header and not loaded. Default True.
- allow_quoted_newlines - whether quoted data sections containing newline characters are allowed in the CSV file. Default True.
- field_delimiter - how the fields are delimited. Default '\t' (tab).
- disable_check - by default, a check on the number of rows loaded into the stage schema is enabled; if no data is loaded, an error is raised. If you need to disable this check, set this flag to True. Default False.
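A sketch of a bq-load task file under the same caveat - the dataset, columns and BigQuery types are assumptions:

```yaml
name: city
source_system: world
source_subsystem: public
dataset_id: world_stage           # created automatically if it does not exist
skip_leading_rows: true
allow_quoted_newlines: true
field_delimiter: "\t"
columns:
  - name: id
    type: INT64
    pk: true
    mandatory: true
  - name: name
    type: STRING
```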
bq-exec: runs BigQuery SQL commands from files.
luft bq exec
- -y, --yml-path (mandatory): folder or a single YAML file inside the default tasks folder (see luft.cfg).
- -s, --start-date: start date in format YYYY-MM-DD for executing the task in a loop. If not specified, yesterday's date is used.
- -e, --end-date: end date in format YYYY-MM-DD for executing the task in a loop. This day is not included. If not specified, today's date is used.
- -sys, --source-system: override the source_system parameter. See the description in the Task section. Has to be the same as the name in jdbc.cfg to get the right credentials for the JDBC database.
- -sub, --source-subsystem: override the source_subsystem parameter. See the description in the Task section.
- -b, --blacklist: names of tables/objects to be ignored during processing. E.g. --yml-path gis and -b TEST will process all objects in the gis folder except the object TEST.
- -w, --whitelist: names of tables/objects to be processed. E.g. --yml-path gis and -w TEST will process only the object TEST.
Requirements:
- Luft installed :) with BigQuery support - pip install luft[bq].
- Credentials file (usually service_account.json) mapped into Docker and configured in luft.cfg.
Inside the YAML file, the following parameters are supported (a task file is sketched after this list):
- name - any name you want. Used mainly as the task name in the Airflow UI.
- source_system - only for organizational purposes; it has no special role here.
- source_subsystem - only for organizational purposes; it has no special role here.
- task_type - bq-exec by default, but it can be overridden. When overridden, it becomes a different kind of task :).
- thread_name - applicable only when used with Airflow. The thread name is automatically generated based on the number of threads. If you need this task to run in a completely different thread, you can specify a custom thread name. E.g. I have tasks T1, T2, T3, T4 and T5 in my task list and the thread count set to 3. By default (if no task has thread_name specified) it will look like this in Airflow:
|T1| -> |T4|
|T2| -> |T5|
|T3|
When I specify any thread_name in task T4:
|T1| -> |T5|
|T2|
|T3|
|T4|
- color - applicable only when used with Airflow. Hex color of the task in Airflow. If not specified, #73DBF5 is used.
- sql_folder - path to the folder where your SQL files are located.
- sql_files - list of SQL files to be executed.
- project_id - BigQuery project id. Default from luft.cfg.
- location - BigQuery location. Default from luft.cfg.
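A sketch of a bq-exec task file; the folder and file names are illustrative:

```yaml
name: historize_city
source_system: world
source_subsystem: public
sql_folder: sql/world             # hypothetical folder containing the SQL files
sql_files:
  - create_history_table.sql
  - merge_into_history.sql
```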
Inside the SQL you can use shortcuts for some useful variables:
- ENV: Environment, e.g. PROD.
- TASK_TYPE: Task type, e.g. bq-exec.
- NAME: Name from the YAML file.
- SOURCE_SYSTEM: Source system.
- SOURCE_SUBSYSTEM: Source subsystem.
- DATE_VALID: Date valid of current run.
- TIME_VALID: Time valid.
- TASK_ID: Id of task.
- THREAD_NAME: Thread name of task.
- YAML_FILE: Yaml file location.
- BQ_PROJECT_ID: BigQuery project id.
- BQ_LOCATION: BigQuery location.
Example:
-- Example of templating
SELECT '{{ BQ_LOCATION }}';
SELECT '{{ BQ_PROJECT_ID }}';
SELECT '{{ DATE_VALID }}';
SELECT '{{ SOURCE_SYSTEM }}';
SELECT '{{ ENV }}';
Qlik Cloud upload: exports an application from Qlik Sense Enterprise, uploads it to Qlik Sense Cloud and publishes it into a certain stream.
luft qlik-cloud upload
- -y, --yml-path (mandatory): folder or a single YAML file inside the default tasks folder (see luft.cfg).
- -s, --start-date: start date in format YYYY-MM-DD for executing the task in a loop. If not specified, yesterday's date is used.
- -e, --end-date: end date in format YYYY-MM-DD for executing the task in a loop. This day is not included. If not specified, today's date is used.
- -sys, --source-system: override the source_system parameter. See the description in the Task section. Has to be the same as the name in jdbc.cfg to get the right credentials for the JDBC database.
- -sub, --source-subsystem: override the source_subsystem parameter. See the description in the Task section.
- -b, --blacklist: names of tables/objects to be ignored during processing. E.g. --yml-path gis and -b TEST will process all objects in the gis folder except the object TEST.
- -w, --whitelist: names of tables/objects to be processed. E.g. --yml-path gis and -w TEST will process only the object TEST.
Requirements:
- Luft installed :) with Qlik Sense Cloud support - pip install luft[qlik-cloud].
- google-chrome and chromedriver installed in your Docker image or on localhost - see Python Selenium Installation.
- Credentials files (client_key.pem, client.pem and root.pem) mapped into Docker and configured in luft.cfg in the [qlik_enterprise] section.
- All other configs set in luft.cfg in the [qlik_enterprise] and [qlik_cloud] sections.
Inside the YAML file, the following parameters are supported (a task file is sketched after this list):
- name - any name you want. Used mainly as the task name in the Airflow UI.
- group_id - Qlik Cloud group id.
- source_system - only for organizational purposes; it has no special role here.
- source_subsystem - only for organizational purposes; it has no special role here.
- task_type - determined automatically from the CLI command, but it can be overridden. When overridden, it becomes a different kind of task :).
- thread_name - applicable only when used with Airflow. The thread name is automatically generated based on the number of threads. If you need this task to run in a completely different thread, you can specify a custom thread name. E.g. I have tasks T1, T2, T3, T4 and T5 in my task list and the thread count set to 3. By default (if no task has thread_name specified) it will look like this in Airflow:
|T1| -> |T4|
|T2| -> |T5|
|T3|
When I specify any thread_name in task T4:
|T1| -> |T5|
|T2|
|T3|
|T4|
- color - applicable only when used with Airflow. Hex color of the task in Airflow. If not specified, #009845 is used.
- apps - list of applications to load from QSE into a certain account on Qlik Sense Cloud. Each item has the following fields:
  - name: name to show in Airflow.
  - filename: name of the file on the filesystem.
  - qse_id: Qlik Sense Enterprise application id.
  - qsc_stream: Qlik Sense Cloud stream name.
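A sketch of a Qlik Cloud upload task file; the group id, application id and stream name are placeholders:

```yaml
name: upload_sales_apps
group_id: 1a2b3c4d                                # placeholder Qlik Sense Cloud group id
apps:
  - name: Sales Overview
    filename: sales_overview.qvf
    qse_id: 11111111-2222-3333-4444-555555555555  # placeholder QSE application id
    qsc_stream: Sales
```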
Qlik metric load: loads data from a Qlik metric, converts it to JSON and uploads it to blob storage.
luft qlik-metric load
- -y, --yml-path (mandatory): folder or a single YAML file inside the default tasks folder (see luft.cfg).
- -s, --start-date: start date in format YYYY-MM-DD for executing the task in a loop. If not specified, yesterday's date is used.
- -e, --end-date: end date in format YYYY-MM-DD for executing the task in a loop. This day is not included. If not specified, today's date is used.
- -sys, --source-system: override the source_system parameter. See the description in the Task section. Has to be the same as the name in jdbc.cfg to get the right credentials for the JDBC database.
- -sub, --source-subsystem: override the source_subsystem parameter. See the description in the Task section.
- -b, --blacklist: names of tables/objects to be ignored during processing. E.g. --yml-path gis and -b TEST will process all objects in the gis folder except the object TEST.
- -w, --whitelist: names of tables/objects to be processed. E.g. --yml-path gis and -w TEST will process only the object TEST.
Requirements:
- Luft installed :) with Qlik metric support - pip install luft[qlik-metric].
- Credentials files (client_key.pem, client.pem and root.pem) mapped into Docker and configured in luft.cfg in the [qlik_enterprise] section.
- All other configs set in luft.cfg in the [qlik_enterprise] section.
Inside the YAML file, the following parameters are supported (a task file is sketched after this list):
- name - any name you want. Used mainly as the task name in the Airflow UI.
- source_system - only for organizational purposes; it has no special role here.
- source_subsystem - only for organizational purposes; it has no special role here.
- task_type - determined automatically from the CLI command, but it can be overridden. When overridden, it becomes a different kind of task :).
- thread_name - applicable only when used with Airflow. The thread name is automatically generated based on the number of threads. If you need this task to run in a completely different thread, you can specify a custom thread name. E.g. I have tasks T1, T2, T3, T4 and T5 in my task list and the thread count set to 3. By default (if no task has thread_name specified) it will look like this in Airflow:
|T1| -> |T4|
|T2| -> |T5|
|T3|
When I specify any thread_name in task T4:
|T1| -> |T5|
|T2|
|T3|
|T4|
- color - applicable only when used with Airflow. Hex color of the task in Airflow. If not specified, #009845 is used.
- app_id - Qlik application id.
- dimensions - list of field names.
- measures - list of Master Measure names.
- selections - list of selection dictionaries used to filter the data.
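A sketch of a Qlik metric task file; the application id, field and measure names are placeholders, and the exact keys of a selection dictionary are an assumption - check the repository examples for the real structure:

```yaml
name: sales_kpis
app_id: 11111111-2222-3333-4444-555555555555   # placeholder Qlik application id
dimensions:
  - Region
  - Month
measures:
  - Total Sales
selections:                                    # assumed shape of a selection dictionary
  - field: Year
    values: ['2019']
```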
First you need to create the config file luft.cfg according to the example in example/config/luft.cfg and place it in the root folder. If you want to use BigQuery and Google Cloud Storage you of course need credentials for them - see GC authentication. In the case of AWS S3 you need an AWS Access Key ID and an AWS Secret Access Key.
Credentials (GCS, AWS, BigQuery) can be specified in three ways:
WARNING: this option is recommended only for local development, because if you publish the image to a public repository, everybody will know your secrets.
EMBULK_COMMAND=embulk
LUFT_CONFIG=example/config/luft.cfg
JDBC_CONFIG=example/config/jdbc.cfg
TASKS_FOLDER=example/tasks
BLOB_STORAGE=gcs
GCS_BUCKET=
GCS_AUTH_METHOD=json_key
GCS_JSON_KEYFILE=
BQ_PROJECT_ID=
BQ_CREDENTIALS_FILE=
BQ_LOCATION=US
AWS_BUCKET=
AWS_ENDPOINT=
AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=
And then run your Docker command with this environment file:
docker run -it --rm --env-file .env luft
This variant is preferred.
docker run -it --rm -e BLOB_STORAGE=gcs luft
For example purposes, just copy jdbc.cfg from example/config/ into the root folder, or set JDBC_CONFIG in your .env file or via the -e parameter.
Just run:
docker build -t luft .
docker run -d -p 5432:5432 aa8y/postgres-dataset:world
Store the example data from the postgres database in S3 or GCS:
docker run --rm luft jdbc load -y world
Optionally, if you have configured BigQuery in your luft.cfg, you can run:
docker run --rm luft bq exec -y bq