centrifugation: split steps ingest, reingest and reprocess
darkk committed Dec 12, 2017
1 parent efb274c commit a86e4fe
Showing 6 changed files with 963 additions and 440 deletions.
27 changes: 22 additions & 5 deletions Readme.md
@@ -61,7 +61,9 @@ file truncations.
### Autoclaving

The purpose of the **autoclaving** stage is to perform normalization and
-sanitization of the report files.
+sanitization of the report files including
+[PII cleanup](https://github.com/TheTorProject/ooni-pipeline/issues/105)
+if a leak is discovered.
This means converting legacy YAML reports to JSON as well as converting all
measurements to a consistent JSON format (and performing some fixes to the data
format to avoid surprising consumers of the data).
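To make the idea concrete, here is a minimal sketch of such a conversion; it is illustrative only (the real autoclaving code is more involved), and it assumes the legacy layout of one YAML header document followed by one document per measurement:

```python
import json
import yaml  # PyYAML, assumed available

def yaml_report_to_json_lines(path):
    # A legacy report is a stream of YAML documents: header first,
    # then one document per measurement (assumed layout).
    with open(path) as fd:
        docs = iter(yaml.safe_load_all(fd))
        header = next(docs)
        for measurement in docs:
            entry = dict(header)       # report-level metadata
            entry.update(measurement)  # per-measurement payload
            yield json.dumps(entry, sort_keys=True)
```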
@@ -87,7 +89,7 @@ stores metadata into a PostgreSQL database for further processing.

Note: in the airflow DAG view this is actually called `meta_pg`.

-## Raw reports sensor
+### Raw reports sensor

This is not technically a task, but is rather a "sensor" in airflow lingo. What
this means is that a check is done at `chameleon` (the host that aggregates all
@@ -311,7 +313,7 @@ Reprocessing the whole dataset takes a couple of days; it's done
asynchronously, but the pipeline does not implement any priorities, so it may
block data ingestion for a while.

-## Extending or adding a new DAG
+### Extending or adding a new DAG

Some examples of tasks that require one or the other:

@@ -330,13 +332,28 @@ Add a `BashOperator`, get the `task_id`.
Then add a new switch case inside of
`ansible/roles/airflow/files/docker-trampoline`.
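A hedged sketch of the airflow side follows; the DAG id, `task_id` and the exact trampoline invocation are made up for illustration, only `BashOperator` and the trampoline path come from the text above:

```python
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG('ooni_pipeline',             # hypothetical DAG id
          start_date=datetime(2017, 1, 1),
          schedule_interval='@daily')

new_step = BashOperator(
    task_id='my_new_step',             # this task_id becomes the switch-case key
    bash_command='docker-trampoline my_new_step {{ ds }}',  # assumed invocation
    dag=dag)
```

The matching case in `docker-trampoline` then maps `my_new_step` onto the actual container command.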

### Partial re-processing

These are the usual reasons to run partial re-processing:

- PII leaks are identified and the corresponding public autoclaved files have to
  be updated (as well as the LZ4 framing in these files), so these files must be
  re-processed from scratch
- the processing schema changes for some "low-volume" test method like
  `vanilla_tor` (~99% of data volume is `web_connectivity` test data)
- new files are added to the bucket when the pipeline ticks more often than daily

These cases are handled with `UPDATE autoclaved SET code_ver = 0 WHERE …`;
`centrifugation.py` will then re-ingest the matching autoclaved files while preserving `msm_no`.
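For example, queueing every autoclaved file that contains `vanilla_tor` reports for re-ingestion could look like this (a sketch mirroring the migration below; the exact predicate depends on what needs re-processing):

```sql
UPDATE autoclaved SET code_ver = 0
WHERE EXISTS (
    SELECT 1 FROM report
    WHERE report.autoclaved_no = autoclaved.autoclaved_no
      AND report.test_name = 'vanilla_tor'
);
```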

## OONI Infrastructure specific

-To access the machine you need to setup a SSH tunnel. It runs at
-`datacollector.ooni.io`.
+To access the airflow web-interface you need to set up an SSH tunnel. It runs at
+`datacollector.infra.ooni.io`.

This is what you want inside your ssh config:
```
Host datacollector.infra.ooni.io
LocalForward localhost:8080 172.26.43.254:8080
```
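With that stanza in place, running `ssh datacollector.infra.ooni.io` forwards local port 8080 to the airflow web-interface, so it should be reachable at `http://localhost:8080`.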

30 changes: 30 additions & 0 deletions af/oometa/006-reingestion.install.sql
@@ -0,0 +1,30 @@
BEGIN;

select _v.register_patch( '006-reingestion', ARRAY[ '005-repeated-report' ], NULL );

ALTER SEQUENCE autoclaved_autoclaved_no_seq RENAME TO autoclaved_no_seq;

DROP TABLE report_meta;
DROP TABLE report_blob;
DROP TABLE measurement_meta;
DROP TABLE measurement_blob;
DROP TABLE measurement_exc;
DROP TABLE badmeta_tpl;
DROP TABLE dns_a_tpl;

-- `badblob` is a nice table, but we can't properly ingest it right now, and
-- it's something that does not actually exist after the `autoclaved` stage:
-- these bad blobs are lost during autoclaving; only canning preserves them.
DROP TABLE badblob;

ALTER TABLE badmeta DROP COLUMN textname;

-- re-ingest `vanilla_tor`
UPDATE autoclaved SET code_ver = 4
WHERE code_ver = 3 AND NOT EXISTS (
SELECT 1 FROM report
WHERE autoclaved_no = autoclaved.autoclaved_no
AND test_name = 'vanilla_tor'
);

COMMIT;
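A quick sanity check after applying the patch (just a sketch against the same table) is to count files per `code_ver` and confirm that some rows moved to version 4:

```sql
SELECT code_ver, COUNT(*) AS files
FROM autoclaved
GROUP BY code_ver
ORDER BY code_ver;
```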
13 changes: 13 additions & 0 deletions af/shovel/canning.py
@@ -92,6 +92,19 @@ def read(self, *args):
        self.__size += len(ret)
        self.__copy.write(ret)
        return ret
    def tell(self):
        return self.__src.tell()
    def seek(self, dest): # NB: no `whence` ~ whence=os.SEEK_SET
        skipwant = dest - self.tell()
        if skipwant < 0:
            raise RuntimeError('Unable to seek backwards while reading the stream')
        elif skipwant > 0:
            while skipwant:
                step = min(skipwant, 262144)
                skiplen = len(self.read(step))
                if skiplen != step:
                    raise RuntimeError('Unexpected end of file', step, skiplen)
                skipwant -= step
    @property
    def size(self):
        return self.__size
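To show the technique in isolation, here is a self-contained sketch of the same forward-only `seek()` idea; it is an illustration, not the actual `canning.py` class (whose `read()` also tees the data into a copy and tracks its size):

```python
import io

class ForwardOnlyReader(object):
    """Wrap a non-seekable stream: seek() only moves forward and is
    emulated by reading and discarding data in bounded chunks."""
    CHUNK = 262144  # same 256 KiB step as in the patch above

    def __init__(self, src):
        self.__src = src
        self.__pos = 0

    def read(self, size=-1):
        ret = self.__src.read(size)
        self.__pos += len(ret)
        return ret

    def tell(self):
        return self.__pos

    def seek(self, dest):  # no `whence`, behaves like os.SEEK_SET
        skipwant = dest - self.tell()
        if skipwant < 0:
            raise RuntimeError('Unable to seek backwards while reading the stream')
        while skipwant:
            step = min(skipwant, self.CHUNK)
            skiplen = len(self.read(step))
            if skiplen != step:
                raise RuntimeError('Unexpected end of file', step, skiplen)
            skipwant -= step

# usage sketch: fast-forward over an uninteresting prefix of a stream
r = ForwardOnlyReader(io.BytesIO(b'x' * 1000))
r.seek(500)            # reads and discards 500 bytes
assert r.tell() == 500
```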
