Add more tutorial text and update example readme (#16)
* Add first part of tutorial

* Change example/README.md to reflect current logic

* Fix linting issues
villebro committed Nov 6, 2019
1 parent 667158d commit 7d14237
Showing 5 changed files with 130 additions and 34 deletions.
4 changes: 2 additions & 2 deletions docs/classes.rst
@@ -4,10 +4,10 @@ Classes
.. autoclass:: sqltask.base.engine.EngineContext
   :members:

-.. autoclass:: sqltask.base.table.TableContext
+.. autoclass:: sqltask.base.table.BaseTableContext
   :members:

-.. autoclass:: sqltask.base.table.OutputRow
+.. autoclass:: sqltask.base.table.BaseOutputRow
   :members:

.. autoclass:: sqltask.base.table.DqTableContext
103 changes: 96 additions & 7 deletions docs/tutorial.rst
@@ -1,11 +1,46 @@
Tutorial - Creating a simple ETL task
=====================================

-When creating a task, you should extend the SqlTask class, creating a
-constructor that takes the parameters required for running a single idempotent
-task. For a regular batch task this is usully the reporting date. It is also
-perfectly fine to have no parameters; this is generally the case when populating
-static dimension tables.
+This tutorial shows how to construct a typical ETL task. A fully functional
+example can be found in the main repo of SqlTask:
+`<https://github.com/villebro/sqltask/tree/master/example>`_

+Introduction
+------------
+
+When creating a task, you will start by extending the `sqltask.SqlTask` class.
+A task constitutes the sequential execution of the following stages, defined
+as methods (a skeleton sketch follows the list):
+
+1. `__init__(**kwargs)`: define the target table(s), row source(s) and lookup
+   source(s). `kwargs` denotes the batch parameters based on which a single
+   snapshot is run, e.g. `report_date=date(2019, 12, 31)`.
+2. `transform()`: all transformation operations, i.e. reading inputs row by
+   row and mapping values (transformed or not) to the output columns. During
+   transformation, data can be read from multiple sources and mapped to
+   multiple output tables, depending on the transformation logic. Row-level
+   data quality issues can be logged to the output table when using the
+   `DqTableContext` target table class.
+3. `validate()`: post-transformation validation step, where the final output
+   rows can be validated prior to insertion. In contrast to the data quality
+   logging in the transform phase, validation should be done on an aggregate
+   level, e.g. checking that row counts and null counts are within acceptable
+   limits.
+4. `delete_rows()`: if no exception has been raised before this step, the
+   rows corresponding to the batch parameters are deleted from the target
+   table. If the task has a single batch parameter `report_date`, this step
+   in practice issues a `DELETE FROM tbl WHERE report_date = 'yyyy-mm-dd'`
+   statement.
+5. `insert_rows()`: this step inserts any rows that have been appended to the
+   output tables, using whichever insertion mode has been specified. Generic
+   SqlAlchemy drivers fall back to single-row or multi-row inserts where
+   supported, but engines with dedicated upload support will perform
+   file-based uploading.
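
To make the lifecycle concrete, here is a minimal skeleton of the five stages
(an editorial sketch, not part of this commit; the class name `MySnapshotTask`
and the placeholder bodies are illustrative):

.. code-block:: python

    from datetime import date

    from sqltask import SqlTask


    class MySnapshotTask(SqlTask):
        def __init__(self, report_date: date):
            # Stage 1: declare the target table(s), row source(s) and
            # lookup source(s) here, keyed on the batch parameters.
            super().__init__(report_date=report_date)

        def transform(self) -> None:
            # Stage 2: read input rows and map values to the output columns.
            ...

        def validate(self) -> None:
            # Stage 3: aggregate-level checks, e.g. verifying that row counts
            # and null counts are within acceptable limits.
            ...

    # Stages 4 and 5, delete_rows() and insert_rows(), are run by the
    # framework after a successful transform/validate and normally need no
    # overriding.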

+Base task
+---------
+
+For DAGs consisting of multiple tasks, it is usually a good idea to create a
+base task on which all tasks in the DAG are based, fixing the batch parameters
+in the constructor as follows:

.. code-block:: python
@@ -14,8 +49,62 @@ static dimension tables.
    from sqltask import SqlTask

-    class MyEtlTask(SqlTask):
+    class MyBaseTask(SqlTask):
        def __init__(self, report_date: date):
            super().__init__(report_date=report_date)
+This way there is less ambiguity about which parameters the DAG tasks are
+based on. For a regular batch task this is usually the date of the snapshot
+in question. It is also perfectly fine to have no parameters or multiple
+parameters. Typical scenarios:

+- No parameters: initialization of static dimension tables
+- Single parameter: calculation of a single snapshot, typically the snapshot
+  date
+- Multiple parameters: if data is further partitioned, it might be feasible
+  to split the calculation into smaller batches, e.g. per region or per hour
+  (see the sketch below)
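
To illustrate the multi-parameter case mentioned above, a task partitioned by
both snapshot date and region might fix its batch parameters as follows (a
sketch; the `region` parameter is hypothetical, not taken from this commit):

.. code-block:: python

    class MyRegionalTask(SqlTask):
        def __init__(self, report_date: date, region: str):
            # Both kwargs become batch parameters: delete_rows() and
            # insert_rows() will only touch this date/region slice.
            super().__init__(report_date=report_date, region=region)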

+In this example, the unit of work for the task is the creation of a single
+snapshot for a given `report_date`.

+Creating an actual task
+-----------------------
+
+In the following example, we construct a task that outputs data into a single
+target table, reads data from a SQL query and uses a CSV table as a lookup
+table. The class is based on `MyBaseTask` defined above. We will do the
+following:
+
+- Define a target table `my_table` based on `DqTableContext` into which data
+  is inserted.
+- Define a `SqlRowSource` instance that reads data from a SQL query.
+- Define a `CsvLookupSource` instance that is used as a lookup table.
+
+We have chosen `DqTableContext` as our target table class, as it can be used
+for logging data quality issues. Had our primary row data been in CSV format,
+we could have used a `CsvRowSource` instance as the primary data source.
+Similarly, we could use a `SqlLookupSource` to construct the lookup table
+from a SQL query.

+.. code-block:: python
+
+    class MyTask(MyBaseTask):
+        def __init__(self, report_date: date):
+            super().__init__(report_date)
+            # Define the metadata for the main fact table
+            self.add_table(DqTableContext(
+                name="my_table",
+                engine_context=self.ENGINE_TARGET,
+                columns=[
+                    Column("report_date", Date, comment="Date of snapshot", primary_key=True),
+                    Column("etl_timestamp", DateTime, comment="Timestamp when the row was created", nullable=False),
+                    Column("customer_name", String(10), comment="Unique customer identifier (name)", primary_key=True),
+                    Column("birthdate", Date, comment="Birthdate of customer if defined and in the past", nullable=True),
+                    Column("age", Integer, comment="Age of customer in years if birthdate defined", nullable=True),
+                    Column("blood_group", String(3), comment="Blood group of the customer", nullable=True),
+                ],
+                comment="The customer table",
+                timestamp_column_name="etl_timestamp",
+                batch_params={"report_date": report_date},
+                dq_info_column_names=["etl_timestamp"],
+            ))
+
+TBC
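
The tutorial in this commit ends here (`TBC`). To round out the two remaining
bullet points above, the following is a hedged sketch of how the row source
and lookup source might be registered in the constructor; the class name and
the constructor arguments shown (`sql`, `csv_path`, `keys`) as well as the
`ENGINE_SOURCE` attribute are assumptions for illustration, not confirmed by
this diff:

.. code-block:: python

    class MyTaskWithSources(MyBaseTask):
        def __init__(self, report_date: date):
            super().__init__(report_date)
            # Primary input (assumed signature): rows from a SQL query.
            self.row_source = SqlRowSource(
                name="customers",
                sql="select customer_name, birthdate, blood_group from customers",
                engine_context=self.ENGINE_SOURCE,
            )
            # Lookup table (assumed signature): valid blood groups from a CSV.
            self.blood_group_lookup = CsvLookupSource(
                name="valid_blood_groups",
                csv_path="valid_blood_groups.csv",
                keys=["blood_group"],
            )
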
48 changes: 27 additions & 21 deletions example/README.md
@@ -5,36 +5,37 @@
A data analyst/engineer has the task of populating a customer table.
The final customer table is expected to look like this:

-| report_date|etl_timestamp|customer_id|birthdate|age|sector_code |
+| report_date|etl_timestamp|customer_id|birthdate|age|blood_group |
| ---- | ---| --- | --- | --- | --- |
||


The task will consist of joining two source queries:
* customers
-* sector_code
+* customer_blood_groups

+Blood groups are to be validated via a lookup table of valid blood groups:
+* valid_blood_groups.csv

-sink table is
+The sink table is
* fact_customer

As part of the ETL process a data quality table is created:
* customer_dq


## Setting up sqlalchemy engines

In the example database urls are retrieved from environment variables,
-defaulting to `sqlite` (can be replaced by any SqlAlchemy connection
-string):
+defaulting to `sqlite`. This can be replaced by any SqlAlchemy connection
+that supports ANSI SQL syntax:

-### bash
+### bash (Mac, Linux)

```bash
export SQLTASK_SOURCE="sqlite:///source.db"
export SQLTASK_TARGET="sqlite:///target.db"
```

-### powershell
+### powershell (Windows)

```powershell
$env:SQLTASK_TARGET = "sqlite:///target.db"
Expand All @@ -58,12 +59,13 @@ echo "select * from fact_customer;" | sqlite3 --column --header target.db
```


-|report_date|etl_timestamp|customer_id|birthdate|age|sector_code|
+|report_date|etl_timestamp|customer_id|birthdate|age|blood_group|
| --- | --- | --- | --- | --- | --- |
-|2019-06-30|2019-10-09 13:28:06.081927|1234567|1980-01-01|39|111211|
-|2019-06-30|2019-10-09 13:28:06.090929|2345678|||143|
-|2019-06-30|2019-10-09 13:28:06.090929|2245678||||
-|2019-06-30|2019-10-09 13:28:06.090929|3456789||||
+|2019-06-30|2019-11-06 05:59:52.380735|Terminator||||
+|2019-06-30|2019-11-06 05:59:52.380654|Sarah Connor|1956-09-26|62|A+|
+|2019-06-30|2019-11-06 05:59:52.380425|Peter Impossible||||
+|2019-06-30|2019-11-06 05:59:52.380202|Mary Null||||
+|2019-06-30|2019-11-06 05:59:52.378324|John Connor|||A-|

### Table fact_customer_dq

@@ -76,10 +78,14 @@ echo "select * from fact_customer_dq;" | sqlite3 --column --header target.db

|report_date|customer_id|rowid|source|priority|category|column_name|message|
| --- | --- | --- | --- | --- | --- | --- | --- |
-|2019-06-30|2245678|4a167eb2-34d5-4473-af18-e4238dedf2e3|source|high|incorrect|birthdate|Cannot parse birthdate: 1980-13-01|
-|2019-06-30|2245678|654326b4-3442-431e-85c7-525d1ffea97a|transform|medium|missing|age|Age is undefined due to undefined birthdate|
-|2019-06-30|2245678|6ed6d51c-2d70-4f12-91d8-6d86fd6d14b3|source|low|missing|sector_code|Sector code undefined in lookup table|
-|2019-06-30|2345678|e307dfd0-7b01-4fcc-b09e-7f52aaf4820a|source|high|incorrect|birthdate|Birthdate in future: 2080-01-01|
-|2019-06-30|2345678|b16c9337-abdd-4dce-b03b-bbddc6dca875|transform|medium|missing|age|Age is undefined due to undefined birthdate|
-|2019-06-30|3456789|ff04ebef-76f1-48e9-a314-2eb05e9f9c41|source|medium|missing|birthdate|Missing birthdate|
-|2019-06-30|3456789|ab197fa2-9c69-4bca-acd4-ebafc5eacbda|transform|medium|missing|age|Age is undefined due to undefined birthdate|
+|2019-06-30|Terminator|2019-11-06 05:59:52.380884|source|high|incorrect|blood_group|Invalid blood group: Liquid Metal|
+|2019-06-30|Terminator|2019-11-06 05:59:52.380825|transform|medium|missing|age|Age is undefined due to undefined birthdate|
+|2019-06-30|Terminator|2019-11-06 05:59:52.380767|source|high|incorrect|birthdate|Birthdate in future: 2095-01-01|
+|2019-06-30|Peter Impossible|2019-11-06 05:59:52.380575|source|high|incorrect|blood_group|Invalid blood group: X+|
+|2019-06-30|Peter Impossible|2019-11-06 05:59:52.380516|transform|medium|missing|age|Age is undefined due to undefined birthdate|
+|2019-06-30|Peter Impossible|2019-11-06 05:59:52.380459|source|high|incorrect|birthdate|Cannot parse birthdate: 1980-13-01|
+|2019-06-30|Mary Null|2019-11-06 05:59:52.380341|source|medium|missing|blood_group|Blood group undefined in customer's blood group table|
+|2019-06-30|Mary Null|2019-11-06 05:59:52.380280|transform|medium|missing|age|Age is undefined due to undefined birthdate|
+|2019-06-30|Mary Null|2019-11-06 05:59:52.380219|source|medium|missing|birthdate|Missing birthdate|
+|2019-06-30|John Connor|2019-11-06 05:59:52.378454|transform|medium|missing|age|Age is undefined due to undefined birthdate|
+|2019-06-30|John Connor|2019-11-06 05:59:52.378385|source|high|incorrect|birthdate|Birthdate in future: 2080-01-01|
5 changes: 3 additions & 2 deletions sqltask/base/lookup_source.py
@@ -1,5 +1,5 @@
import logging
-from typing import Any, Dict, Optional, Sequence, Tuple
+from typing import Any, cast, Dict, Optional, Sequence, Tuple

from sqltask.base.row_source import BaseRowSource

@@ -58,6 +58,7 @@ def get(self, *unnamed_keys, **named_keys) -> Dict[str, Any]:
        if self._store is None:
            self._init_store()

+        store = cast(Dict[Tuple, Any], self._store)
        if len(unnamed_keys) + len(named_keys) != len(self.keys):
            raise Exception(f"Incorrect key count: expected {len(self.keys)} keys, "
                            f"got {len(unnamed_keys) + len(named_keys)}")
@@ -66,4 +67,4 @@ def get(self, *unnamed_keys, **named_keys) -> Dict[str, Any]:
            if key not in named_keys:
                raise Exception(f"Key not in lookup: {key}")
            keys.append(named_keys[key])
-        return self._store.get(tuple(keys), {})
+        return store.get(tuple(keys), {})
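
The change above is a typing fix: `self._store` is declared `Optional`, so mypy
cannot infer that `_init_store()` has populated it before use; the `cast`
narrows the type before the final `.get()` call. A minimal illustration of the
pattern (an editorial sketch, not library code):

```python
from typing import Any, Dict, Optional, Tuple, cast


class Lookup:
    def __init__(self) -> None:
        # Populated lazily, hence Optional.
        self._store: Optional[Dict[Tuple, Any]] = None

    def get(self, key: Tuple) -> Any:
        if self._store is None:
            self._store = {}  # stand-in for _init_store()
        # cast() tells mypy the store is now a concrete dict.
        store = cast(Dict[Tuple, Any], self._store)
        return store.get(key, {})
```
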
4 changes: 2 additions & 2 deletions sqltask/engine_specs/base.py
@@ -1,6 +1,6 @@
import logging
from enum import Enum
-from typing import Optional, Set
+from typing import Any, Dict, List, Optional, Set

from sqlalchemy.engine.url import URL
from sqlalchemy.schema import Column
@@ -60,7 +60,7 @@ def _insert_rows_sql_insert(cls, table_context: "BaseTableContext") -> None:
            raise Exception(f"SQL INSERT not supported by `{cls.__name__}`")
        with table_context.engine_context.engine.begin() as conn:
            while table_context.output_rows:
-                insert_chunk = []
+                insert_chunk: List[Dict[str, Any]] = []
                while table_context.output_rows and \
                        len(insert_chunk) < cls.insert_chunksize:
                    insert_chunk.append(table_context.output_rows.pop())
