Run autopep8 on everything; flake8 fixes everywhere; create "make test" command; add docker-compose.yaml
jayrbolton committed Oct 25, 2019
1 parent 29a4cec commit bb65ab9
Showing 34 changed files with 821 additions and 619 deletions.
4 changes: 4 additions & 0 deletions Makefile
@@ -0,0 +1,4 @@
.PHONY: test

test:
pipenv run sh scripts/run_tests.sh
16 changes: 6 additions & 10 deletions README.md
@@ -13,25 +13,21 @@ Build status (master):

## Setup

For Ubuntu (and possibly other distributions) ensure that the appropriate `python-dev` package
is installed, e.g. `sudo apt install python3.7-dev`.
Install Python 3.7, preferably using pyenv: https://github.com/pyenv/pyenv

With [pipenv](https://github.com/pypa/pipenv) installed, run:
Then install [pipenv](https://github.com/pypa/pipenv) and run:

```sh
pipenv install
```

Alternatively, you can use [pyenv](https://github.com/pyenv/pyenv) to manage your python
installations.

## Running tests

To run tests, arangodb must be running locally on the default port with default root credentials.
Then from the repository root:
```
$ pipenv shell
$ export PYTHONPATH=$(pwd):$(pwd)/src/; pytest

```sh
make test
```

## Standard loader usage
@@ -211,4 +207,4 @@ one-off scripts there.
* Merge edges are never expired.
* Currently only handles merge edges where
* The merged node was present in the prior load and
* The target node is present in the current load.
* The target node is present in the current load.
12 changes: 12 additions & 0 deletions docker-compose.yaml
@@ -0,0 +1,12 @@
version: '3'

# This docker-compose is for developer convenience, not for running in production.

services:

# Arangodb server in cluster mode
arangodb:
image: arangodb:3.5
ports:
- "127.0.0.1:8529:8529"
command: sh -c "arangodb --starter.local"
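
The compose file pairs with the new `make test` target: the tests expect ArangoDB on the default port with default root credentials (see the README above), which is what the `arangodb` service exposes on 127.0.0.1:8529. The snippet below is a minimal readiness probe a developer might run before `make test`; it is not part of this commit, and it assumes the starter accepts root with an empty password (or no authentication at all), so adjust it if your local setup differs.

```python
# Hypothetical readiness probe for the docker-compose ArangoDB service.
# Assumes 127.0.0.1:8529 and root with an empty password; not repository code.
import time

import requests


def wait_for_arangodb(url='http://127.0.0.1:8529/_api/version', timeout=60):
    """Poll the ArangoDB HTTP API until the server responds or time runs out."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        try:
            resp = requests.get(url, auth=('root', ''), timeout=2)
            if resp.ok:
                return resp.json().get('version')
        except requests.ConnectionError:
            pass  # container still starting
        time.sleep(1)
    raise RuntimeError(f'ArangoDB did not become ready at {url}')


if __name__ == '__main__':
    print('ArangoDB version:', wait_for_arangodb())
```

Once the probe succeeds, `make test` (which wraps `pipenv run sh scripts/run_tests.sh`, per the Makefile above) should be able to reach the database.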
83 changes: 56 additions & 27 deletions relation_engine/batchload/delta_load.py
@@ -15,14 +15,17 @@

# TODO TEST
# TODO DOCS document reserved fields that will be overwritten if supplied
# TODO CODE add notification callback so that the caller can implement % complete or logs or whatever based on what's happening in the delta load algorithm. Remove _VERBOSE prints at that point
# TODO CODE add notification callback so that the caller can implement %
# complete or logs or whatever based on what's happening in the delta load
# algorithm. Remove _VERBOSE prints at that point

# TODO CODE consider threading / multiprocessing here. Virtually all the time is db access

_VERBOSE = False
_ID = 'id'
_KEY = '_key'


def load_graph_delta(
load_namespace,
vertex_source,
@@ -67,30 +70,34 @@ def load_graph_delta(
db = database
if merge_source and not db.get_merge_collection():
raise ValueError('A merge source is specified but the database ' +
'has no merge collection')
'has no merge collection')
db.register_load_start(
load_namespace, load_version, timestamp, release_timestamp, _get_current_timestamp())

_process_verts(db, vertex_source, timestamp, release_timestamp, load_version, batch_size)
if merge_source:
_process_merges(db, merge_source, timestamp, release_timestamp, load_version, batch_size)

if _VERBOSE: print(f'expiring vertices: {_time.time()}')

if _VERBOSE:
print(f'expiring vertices: {_time.time()}')
db.expire_extant_vertices_without_last_version(
timestamp - 1, release_timestamp - 1, load_version)

_process_edges(db, edge_source, timestamp, release_timestamp, load_version, batch_size)

if _VERBOSE: print(f'expiring edges: {_time.time()}')

if _VERBOSE:
print(f'expiring edges: {_time.time()}')
for col in db.get_edge_collections():
db.expire_extant_edges_without_last_version(
timestamp - 1, release_timestamp -1, load_version, edge_collection=col)
timestamp - 1, release_timestamp - 1, load_version, edge_collection=col)

db.register_load_complete(load_namespace, load_version, _get_current_timestamp())


def _get_current_timestamp():
return int(_dt.datetime.now(tz=_dt.timezone.utc).timestamp() * 1000)


def _process_verts(db, vertex_source, timestamp, release_timestamp, load_version, batch_size):
"""
For each vertex we're importing, either replace and expire an existing vertex, create a
@@ -99,12 +106,15 @@ def _process_verts(db, vertex_source, timestamp, release_timestamp, load_version
count = 1
for vertgen in _chunkiter(vertex_source, batch_size):
vertices = list(vertgen)
if _VERBOSE: print(f'vertex batch {count}: {_time.time()}')
if _VERBOSE:
print(f'vertex batch {count}: {_time.time()}')
count += 1
keys = [v[_ID] for v in vertices]
if _VERBOSE: print(f' looking up {len(keys)} vertices: {_time.time()}')
if _VERBOSE:
print(f' looking up {len(keys)} vertices: {_time.time()}')
dbverts = db.get_vertices(keys, timestamp)
if _VERBOSE: print(f' got {len(dbverts)} vertices: {_time.time()}')
if _VERBOSE:
print(f' got {len(dbverts)} vertices: {_time.time()}')
bulk = db.get_batch_updater()
for v in vertices:
dbv = dbverts.get(v[_ID])
@@ -116,9 +126,11 @@ def _process_verts(db, vertex_source, timestamp, release_timestamp, load_version
else:
# mark node as seen in this version
bulk.set_last_version_on_vertex(dbv[_KEY], load_version)
if _VERBOSE: print(f' updating {bulk.count()} vertices: {_time.time()}')
if _VERBOSE:
print(f' updating {bulk.count()} vertices: {_time.time()}')
bulk.update()


def _process_merges(db, merge_source, timestamp, release_timestamp, load_version, batch_size):
"""
For each merge edge, if both vertices exist in the current graph (it is expected that vertices
@@ -129,12 +141,15 @@ def _process_merges(db, merge_source, timestamp, release_timestamp, load_version
count = 1
for mergen in _chunkiter(merge_source, batch_size):
merges = list(mergen)
if _VERBOSE: print(f'merge batch {count}: {_time.time()}')
if _VERBOSE:
print(f'merge batch {count}: {_time.time()}')
count += 1
keys = list({m['from'] for m in merges} | {m['to'] for m in merges})
if _VERBOSE: print(f' looking up {len(keys)} vertices: {_time.time()}')
if _VERBOSE:
print(f' looking up {len(keys)} vertices: {_time.time()}')
dbverts = db.get_vertices(keys, timestamp)
if _VERBOSE: print(f' got {len(dbverts)} vertices: {_time.time()}')
if _VERBOSE:
print(f' got {len(dbverts)} vertices: {_time.time()}')
bulk = db.get_batch_updater(db.get_merge_collection())
vertbulk = db.get_batch_updater()
for m in merges:
@@ -147,12 +162,16 @@ def _process_merges(db, merge_source, timestamp, release_timestamp, load_version
vertbulk.expire_vertex(dbmerged[_KEY], timestamp - 1, release_timestamp - 1)
bulk.create_edge(
m[_ID], dbmerged, dbtarget, load_version, timestamp, release_timestamp, m)
if _VERBOSE: print(f' updating {bulk.count()} edges: {_time.time()}')
if _VERBOSE:
print(f' updating {bulk.count()} edges: {_time.time()}')
bulk.update()
if _VERBOSE: print(f' updating {vertbulk.count()} vertices: {_time.time()}')
if _VERBOSE:
print(f' updating {vertbulk.count()} vertices: {_time.time()}')
vertbulk.update()

# assumes verts have been processed


def _process_edges(db, edge_source, timestamp, release_timestamp, load_version, batch_size):
"""
For each edge we're importing, either replace and expire an existing edge, create a
@@ -161,7 +180,8 @@ def _process_edges(db, edge_source, timestamp, release_timestamp, load_version,
count = 1
for edgegen in _chunkiter(edge_source, batch_size):
edges = list(edgegen)
if _VERBOSE: print(f'edge batch {count}: {_time.time()}')
if _VERBOSE:
print(f'edge batch {count}: {_time.time()}')
count += 1
keys = _defaultdict(list)
bulkset = {}
@@ -178,16 +198,20 @@ def _process_edges(db, edge_source, timestamp, release_timestamp, load_version,
bulkset[col] = db.get_batch_updater(col)
dbedges = {}
for col, keys in keys.items():
if _VERBOSE: print(f' looking up {len(keys)} edges in {col}: {_time.time()}')
if _VERBOSE:
print(f' looking up {len(keys)} edges in {col}: {_time.time()}')
dbedges[col] = db.get_edges(keys, timestamp, edge_collection=col)
if _VERBOSE: print(f' got {len(dbedges[col])} edges: {_time.time()}')

if _VERBOSE:
print(f' got {len(dbedges[col])} edges: {_time.time()}')

# Could cache these, may be fetching the same vertex over and over, but no guarantees
# the same vertexes are repeated in a reasonable amount of time
# Batching the fetch is probably enough
if _VERBOSE: print(f' looking up {len(vertkeys)} vertices: {_time.time()}')
if _VERBOSE:
print(f' looking up {len(vertkeys)} vertices: {_time.time()}')
dbverts = db.get_vertices(list(vertkeys), timestamp)
if _VERBOSE: print(f' got {len(dbverts)} vertices: {_time.time()}')
if _VERBOSE:
print(f' got {len(dbverts)} vertices: {_time.time()}')
keys = None
vertkeys = None

@@ -201,7 +225,7 @@ def _process_edges(db, edge_source, timestamp, release_timestamp, load_version,
to = dbverts[e['to']]
if dbe:
if (not _special_equal(e, dbe) or
# these two conditions check whether the nodes the edge is attached to
# these two conditions check whether the nodes the edge is attached to
# have been updated this load
# This is an abstraction leak, bleah
dbe['_from'] != from_['_id'] or
@@ -218,6 +242,7 @@ def _process_edges(db, edge_source, timestamp, release_timestamp, load_version,
print(f' updating {b.count()} edges in {b.get_collection()}: {_time.time()}')
b.update()


# TODO CODE these fields are shared between here and the database. Should probably put them somewhere in common.
# same with the id and _key fields in the code above
# arango db api is leaking a bit here, but the chance we're going to rewrite this for something
@@ -226,6 +251,7 @@ def _process_edges(db, edge_source, timestamp, release_timestamp, load_version,
'release_created', 'release_expired',
'first_version', 'last_version']


def _special_equal(doc1, doc2):
"""
Checks if two dicts are equal other than special fields.
@@ -236,8 +262,9 @@ def _special_equal(doc1, doc2):
for f in _SPECIAL_EQUAL_IGNORED_FIELDS:
d1c.pop(f, None)
d2c.pop(f, None)

return d1c == d2c

return d1c == d2c


def _chunkiter(iterable, size):
"""
@@ -248,6 +275,8 @@ def _process_edges(db, edge_source, timestamp, release_timestamp, load_version,
yield _itertools.chain([first], _itertools.islice(iterator, size - 1))

# TODO CODE fields here shared with the DB. Put them somewhere in common.


def roll_back_last_load(database, load_namespace):
"""
Removes the most recent data load to a namespace and reverts it to the prior state.
@@ -270,7 +299,7 @@ def roll_back_last_load(database, load_namespace):
collections = loads[0]['edge_collections'] + [loads[0]['vertex_collection']]
if loads[0]['merge_collection']:
collections.append(loads[0]['merge_collection'])

db = database.get_instance(
loads[0]['vertex_collection'],
edge_collections=loads[0]['edge_collections'],
@@ -285,5 +314,5 @@ def roll_back_last_load(database, load_namespace):
db.delete_created_documents(c, timestamp)
db.undo_expire_documents(c, timestamp - 1)
db.reset_last_version(c, current_ver, prior_ver)

db.delete_registered_load(load_namespace, current_ver)
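
Much of the reformatting above touches two small helpers that carry this file's batching and comparison logic: `_chunkiter` groups an arbitrarily large iterable into fixed-size chunks without materialising it, and `_special_equal` compares documents while ignoring the time-travel bookkeeping fields. The sketch below restates that pattern as standalone code for illustration; the field list is an assumption based on the fragment visible above (the repository's `_SPECIAL_EQUAL_IGNORED_FIELDS` may differ), and the example data is made up.

```python
# Illustrative restatement of the batching / comparison helpers in
# delta_load.py; IGNORED_FIELDS is an assumed approximation of the
# repository's _SPECIAL_EQUAL_IGNORED_FIELDS constant.
import itertools

IGNORED_FIELDS = ['_key', 'created', 'expired', 'release_created',
                  'release_expired', 'first_version', 'last_version']


def chunkiter(iterable, size):
    """Yield iterators over successive chunks of at most `size` items."""
    iterator = iter(iterable)
    for first in iterator:
        yield itertools.chain([first], itertools.islice(iterator, size - 1))


def special_equal(doc1, doc2):
    """True if two documents match once bookkeeping fields are ignored."""
    d1c, d2c = dict(doc1), dict(doc2)
    for f in IGNORED_FIELDS:
        d1c.pop(f, None)
        d2c.pop(f, None)
    return d1c == d2c


vertices = ({'id': f'v{i}', 'name': f'node {i}'} for i in range(10))
for batch in chunkiter(vertices, 4):          # prints batches of 4, 4, and 2
    print([v['id'] for v in batch])
print(special_equal({'id': 'v1', '_key': 'v1_2', 'created': 5},
                    {'id': 'v1', '_key': 'v1_3', 'created': 9}))  # True
```

Note that, as in the original, each yielded chunk must be consumed before the next one is requested, since all chunks share the same underlying iterator.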
30 changes: 18 additions & 12 deletions relation_engine/batchload/load_utils.py
@@ -1,4 +1,4 @@
"""
"""
Utilities for loading graph data into the relation engine.
"""

@@ -8,6 +8,8 @@
import unicodedata

# assumes there's at least one non-whitespace char in string


def canonicalize(string, ignore_tokens):
"""
Canonicalizes a string by:
@@ -36,11 +38,13 @@ def canonicalize(string, ignore_tokens):
ret.append(t)
return ret


# TODO CODE this should probably be a parameter? YAGNI for now
# see https://www.arangodb.com/2018/07/time-traveling-with-graph-databases/
# in unix epoch ms this is 2255/6/5
_MAX_ADB_INTEGER = 2**53 - 1


def process_nodes(nodeprov, load_version, timestamp, nodes_out):
"""
Process graph nodes from a provider into a JSON load file for a batch time travelling load.
@@ -63,15 +67,16 @@ def process_nodes(nodeprov, load_version, timestamp, nodes_out):
'last_version': load_version,
'created': timestamp,
'expired': _MAX_ADB_INTEGER
})
})
nodes_out.write(json.dumps(n) + '\n')


def process_edge(edge, load_version, timestamp):
"""
Note that this function modifies the edge argument in place.
Process a graph edge for a batch time travelling load.
Adds appropriate fields to the edge.
Adds appropriate fields to the edge.
This function is only suitable for the initial load in the time travelling database.
Further loads must use a delta load algorithm.
@@ -89,16 +94,17 @@ def process_edge(edge, load_version, timestamp):
Returns - the updated edge as a dict.
"""
edge.update({
'_key': edge['id'] + '_' + load_version,
'_from': edge['from'] + '_' + load_version,
'_to': edge['to'] + '_' + load_version,
'first_version': load_version,
'last_version': load_version,
'created': timestamp,
'expired': _MAX_ADB_INTEGER,
})
'_key': edge['id'] + '_' + load_version,
'_from': edge['from'] + '_' + load_version,
'_to': edge['to'] + '_' + load_version,
'first_version': load_version,
'last_version': load_version,
'created': timestamp,
'expired': _MAX_ADB_INTEGER,
})
return edge


def process_edges(edgeprov, load_version, timestamp, edges_out):
"""
Process graph edges from a provider into a JSON load file for a batch time travelling load.
@@ -119,4 +125,4 @@ def process_edges(edgeprov, load_version, timestamp, edges_out):
"""
for e in edgeprov:
e = process_edge(e, load_version, timestamp)
edges_out.write(json.dumps(e) + '\n')
edges_out.write(json.dumps(e) + '\n')
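
To make the bookkeeping concrete, the snippet below mirrors the `process_edge` logic shown in this diff and prints the result for a made-up edge: the `_key`, `_from`, and `_to` values get the load version suffixed, the document is stamped as created at the load timestamp, and `expired` is set to the maximum ArangoDB-safe integer (`2**53 - 1`) so the edge stays live until a later delta load expires it. This is a standalone illustration with invented identifiers, not a call into the repository.

```python
# Standalone illustration of the fields process_edge adds during an
# initial time-travelling load; the edge, version, and timestamp are made up.
import json

_MAX_ADB_INTEGER = 2**53 - 1  # mirrors the constant in load_utils.py


def process_edge(edge, load_version, timestamp):
    """Add initial-load time-travel fields to an edge dict (modifies it in place)."""
    edge.update({
        '_key': edge['id'] + '_' + load_version,
        '_from': edge['from'] + '_' + load_version,
        '_to': edge['to'] + '_' + load_version,
        'first_version': load_version,
        'last_version': load_version,
        'created': timestamp,
        'expired': _MAX_ADB_INTEGER,
    })
    return edge


edge = {'id': 'e1', 'from': 'gene_a', 'to': 'gene_b', 'score': 0.9}
print(json.dumps(process_edge(edge, 'v1', 1571961600000), indent=2))
# '_key' becomes 'e1_v1', '_from' 'gene_a_v1', '_to' 'gene_b_v1',
# and 'expired' is 9007199254740991.
```

The visible tail of `process_nodes` above shows the same `last_version`/`created`/`expired` stamping applied to vertices.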
(Diffs for the remaining changed files are not shown here.)