Merge pull request #1 from ustudio/initial-implementation

Initial implementation

spiralman committed Apr 12, 2019
2 parents b612096 + 88088ea commit 63b5c1d
Showing 21 changed files with 1,049 additions and 2 deletions.
101 changes: 101 additions & 0 deletions .circleci/config.yml
@@ -0,0 +1,101 @@
version: 2
jobs:
  test:
    docker:
      - image: circleci/python:3.7
      - image: circleci/postgres:9.6-alpine
      - image: circleci/mongo:3.6-ram

    working_directory: ~/repo

    environment:
      MONGODB_URI: mongodb://localhost/testing
      SQL_URI: postgresql://postgres@localhost/testing

    steps:
      - checkout

      - restore_cache:
          keys:
            - v1-dependencies-{{ checksum "requirements.txt" }}
            - v1-dependencies-

      - run:
          name: install dependencies
          command: |
            python3 -m venv ~/venv
            . ~/venv/bin/activate
            pip install -r requirements.txt
            mkdir -p test-reports

      - save_cache:
          paths:
            - ~/venv
          key: v1-dependencies-{{ checksum "requirements.txt" }}

      - run:
          name: Wait for SQL
          command: dockerize -wait tcp://localhost:5432 -timeout 1m

      - run:
          name: Run Tests
          command: |
            . ~/venv/bin/activate
            nosetests --verbose --with-xunit --xunit-file=test-reports/nosetests.xml

      - run:
          name: Run Linter
          command: |
            . ~/venv/bin/activate
            flake8

      - store_artifacts:
          path: test-reports

      - store_test_results:
          path: test-reports

  publish:
    docker:
      - image: circleci/python:3.7
    working_directory: ~/repo
    steps:
      - checkout

      - restore_cache:
          keys:
            - v1-publish-dependencies-

      - run:
          name: install dependencies
          command: |
            python3 -m venv ~/venv
            . ~/venv/bin/activate
            pip install twine

      - save_cache:
          paths:
            - ~/venv
          key: v1-publish-dependencies-

      - run:
          name: Publish to PyPI
          command: |
            . ~/venv/bin/activate
            ./publish_to_pypi.sh

workflows:
  version: 2
  test-and-build:
    jobs:
      - test:
          filters:
            tags:
              only: /.*/
      - publish:
          requires:
            - test
          filters:
            tags:
              only: /^v[0-9]+(\.[0-9]+)*.*/
            branches:
              ignore: /.*/
          context: org-global
21 changes: 21 additions & 0 deletions LICENSE
@@ -0,0 +1,21 @@
MIT License

Copyright (c) 2019 uStudio

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
174 changes: 172 additions & 2 deletions README.md
@@ -1,2 +1,172 @@
# ustack-etl #

Framework for writing ETL processes, specifically from MongoDB to SQL.

## Usage ##

ETL processes are defined by one or more `CollectionETL` sub-classes,
which are responsible for inserting records into the SQL database for
a given document in a MongoDB collection.

### Defining the ETL process for a Collection ###

The ETL process for a MongoDB collection is defined in a sub-class of
`ustack_etl.etl.CollectionETL`. The sub-class must define:

* `mongo_collection_name`, which is the name of the MongoDB collection
to be extracted
* `sql_metadata`, which is a SQLAlchemy `MetaData` instance,
containing all the SQL tables that will be written to by the
sub-class
* `process_document`, which transforms the given document and loads it
into SQL using the provided SQL connection.

#### Example: ####

```python
import sqlalchemy

from ustack_etl.etl import CollectionETL


metadata = sqlalchemy.MetaData()
my_table = sqlalchemy.Table("my_table", metadata)


class MyCollectionETL(CollectionETL):
    mongo_collection_name = "my_collection"
    sql_metadata = metadata

    def process_document(self, sql_conn, my_document):
        sql_conn.execute(my_table.insert().values(
            # ...
        ))
```

### Passing State Between ETL Processes ###

Sometimes, it is necessary for the ETL process for one collection to
build up a cache of known state, in order for another collection's ETL
process to write de-normalized data to SQL (perhaps for easier or
faster querying in your analytics or BI tool).

The `CollectionETL` which produces this cached state declares it with
an additional member variable, named `produces`: a tuple of the state
name and a zero-argument callable used to construct the empty state
variable.

Any `CollectionETL` sub-classes which need to access this state can
include a member named `consumes`, which is set to the string name of
the state, as defined by the producer.

The ETL class will ensure that the collections are ETLed in the
correct order, so that the cache will have been generated before the
consumers need to use it.

**Note** that, as of now, only one level of dependencies is
supported. If a `CollectionETL` sub-class declares that it both
`produces` and `consumes` state, an error will be raised.

#### Example: ####

```python
class MyProducer(CollectionETL):
    # Other attributes...
    produces = ("state_name", dict)

    def process_document(self, sql_conn, my_document):
        self.state_name["some_key"] = my_document["some_value"]
        # ...


class MyConsumer(CollectionETL):
    # Other attributes...
    consumes = "state_name"

    def process_document(self, sql_conn, my_document):
        cached_value = self.state_name["some_key"]
        # ...
```

### Initiating the ETL Process ###

The ETL process is initiated by constructing an instance of
`ustack_etl.etl.ETL`, adding the `CollectionETL` classes to it, and
invoking `perform_etl`.

#### Example ####

```python
from ustack_etl.etl import ETL

etl = ETL(mongo_db, sql_engine)

etl.add_collection(MyCollection)
etl.add_collection(OtherCollection)

etl.perform_etl()
```

**Note** that the `mongo_db` and `sql_engine` instances must have a
default database set on them.
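For example, one way to construct such instances (a sketch only,
assuming `pymongo` and SQLAlchemy and reusing the URIs from the test
configuration above):

```python
import pymongo
import sqlalchemy

# Both URIs name a database ("testing"), so each connection has a
# default database set, as required by the ETL class.
mongo_db = pymongo.MongoClient(
    "mongodb://localhost/testing").get_default_database()
sql_engine = sqlalchemy.create_engine(
    "postgresql://postgres@localhost/testing")
```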

### Testing CollectionETL Sub-classes ###

Some conveniences are provided in the `ustack_etl.testing` module to
make it easier to test `CollectionETL` sub-classes.

Your `TestCase` can inherit from `ETLTestCase` and define an
`add_collections` method, which will be invoked during `setUp` to add
the `CollectionETL` sub-classes under test.

Within the tests, invoke `self.perform_etl()` to initiate the ETL
process, and use `self.mongo_db` and `self.sql_engine` to access the
databases. The tests will connect to the databases defined in the
`MONGODB_URI` and `SQL_URI` environment variables.

#### Example ####

```python
from ustack_etl.testing import ETLTestCase


class TestMyCollection(ETLTestCase):
    def add_collections(self, etl):
        etl.add_collection(MyCollection)

    def test_something(self):
        self.mongo_db.mycollection.insert_many([
            # ...
        ])

        self.perform_etl()

        self.sql_engine.execute("select * from mytable")

        # Assert invariants...
```

#### Testing Producers and Consumers ####

There are two classes, `MockProducer` and `MockConsumer`, which can be
used when testing `CollectionETL` sub-classes that produce or consume
state.

Use `MockConsumer` to test your producer by inspecting the state that
it received.

Use `MockProducer` to test your consumers by injecting the state that
they require.
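
As a sketch of the underlying idea, using only the documented
`produces`/`consumes` protocol (the actual `MockProducer` signature is
not shown here, so this hand-rolled stand-in is an assumption about
usage, not the library's API):

```python
import sqlalchemy

from ustack_etl.etl import CollectionETL


class StubProducer(CollectionETL):
    # Hypothetical stand-in for MockProducer: it injects known state
    # for a consumer under test, instead of deriving the state from
    # real documents.
    mongo_collection_name = "my_collection"  # illustrative name
    sql_metadata = sqlalchemy.MetaData()
    produces = ("state_name", dict)

    def process_document(self, sql_conn, my_document):
        # Seed the shared state with a known value.
        self.state_name["some_key"] = "known-value"
```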

## Developing ##

The unit tests assume that you are running local SQL and MongoDB
instances, and will connect to them using the `MONGODB_URI` and
`SQL_URI` environment variables.

If you are running a local Kubernetes cluster using `minikube`, you
can run `./start-databases.sh` to create the appropriate databases,
then run `./nosetests` to run the unit tests against the databases
running inside minikube.

If you wish to run your own instances of MongoDB and SQL, you can set
the environment variables appropriately.
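
For example (a sketch; these URIs match the CI configuration above, so
adjust them for your own instances):

```sh
export MONGODB_URI=mongodb://localhost/testing
export SQL_URI=postgresql://postgres@localhost/testing
```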
16 changes: 16 additions & 0 deletions dev-deployment/platform-etl-mongodb-service.yaml
@@ -0,0 +1,16 @@
kind: Service
apiVersion: v1
metadata:
  name: ustack-etl-mongodb
  labels:
    app: ustack-etl-testing
    role: mongo-db
  namespace: ustack-etl-testing
spec:
  ports:
    - port: 27017
      targetPort: mongo
  selector:
    app: ustack-etl-testing
    role: mongo-db
  type: NodePort
29 changes: 29 additions & 0 deletions dev-deployment/platform-etl-mongodb.yaml
@@ -0,0 +1,29 @@
kind: Deployment
apiVersion: extensions/v1beta1
metadata:
  name: ustack-etl-mongodb
  labels:
    app: ustack-etl-testing
    role: mongo-db
  namespace: ustack-etl-testing
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: ustack-etl-testing
        role: mongo-db
    spec:
      containers:
        - name: mongo
          image: mongo:3.4
          imagePullPolicy: IfNotPresent
          ports:
            - name: mongo
              containerPort: 27017
          volumeMounts:
            - name: mongo-persistent-storage
              mountPath: /data/db
      volumes:
        - name: mongo-persistent-storage
          emptyDir: {}
16 changes: 16 additions & 0 deletions dev-deployment/platform-etl-postgres-service.yaml
@@ -0,0 +1,16 @@
kind: Service
apiVersion: v1
metadata:
  name: ustack-etl-postgres
  labels:
    app: ustack-etl-testing
    role: postgres-db
  namespace: ustack-etl-testing
spec:
  ports:
    - port: 5432
      targetPort: postgres
  selector:
    app: ustack-etl-testing
    role: postgres-db
  type: NodePort
32 changes: 32 additions & 0 deletions dev-deployment/platform-etl-postgres.yaml
@@ -0,0 +1,32 @@
kind: Deployment
apiVersion: extensions/v1beta1
metadata:
  name: ustack-etl-postgres
  labels:
    app: ustack-etl-testing
    role: postgres-db
  namespace: ustack-etl-testing
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: ustack-etl-testing
        role: postgres-db
    spec:
      containers:
        - name: postgres
          image: postgres:9.6-alpine
          imagePullPolicy: IfNotPresent
          env:
            - name: POSTGRES_DB
              value: ustack-etl-testing
          ports:
            - name: postgres
              containerPort: 5432
          volumeMounts:
            - name: postgres-persistent-storage
              mountPath: /var/lib/postgresql/data
      volumes:
        - name: postgres-persistent-storage
          emptyDir: {}
