Open Targets Library ETL Pipeline | Apache Beam
Open Targets Library - NLP Pipeline

NLP Analysis of MEDLINE/PubMed Running in Apache Beam

This pipeline is designed to run with Apache Beam using the Dataflow runner. It has not been tested with other Beam runners, but it should work on them with minimal modifications. See the Apache Beam SDK documentation for more information.

Steps to reproduce a full run

Use Python 2 with pip and virtualenv.

  • Mirror the MEDLINE FTP site to a Google Cloud Storage bucket (any other storage provider supported by the Beam Python SDK should also work), e.g. using rclone:

    • Download the pre-built rclone binaries rather than platform-packaged ones, as they tend to be more up to date
    • Run rclone config to set up one remote for the MEDLINE FTP (medline-ftp below) and one for your target GCP project (my-gcp-project-buckets below). The MEDLINE FTP remote must use username anonymous and password anonymous.
    • Generate a full mirror: rclone sync -v medline-ftp:pubmed/baseline my-gcp-project-buckets:my-medline-bucket/baseline
    • Update new files: rclone sync -v medline-ftp:pubmed/updatefiles my-gcp-project-buckets:my-medline-bucket/updatefiles
    • Note: add the --dry-run argument to test a sync without copying anything
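If you want to script the two sync commands above (for example, to rerun the update step on a schedule), here is a minimal sketch that builds the same rclone invocations as argument lists and runs them with subprocess. The helper names are hypothetical. The remote and bucket names are the placeholders from this README and are assumed to match what you set up with rclone config.

```python
import subprocess


def build_sync_commands(src_remote="medline-ftp",
                        dst_remote="my-gcp-project-buckets",
                        bucket="my-medline-bucket",
                        dry_run=False):
    """Hypothetical helper: the baseline and updatefiles `rclone sync` commands."""
    commands = []
    for folder in ("baseline", "updatefiles"):
        cmd = ["rclone", "sync", "-v",
               "%s:pubmed/%s" % (src_remote, folder),
               "%s:%s/%s" % (dst_remote, bucket, folder)]
        if dry_run:
            # --dry-run makes rclone report what it would copy without copying it
            cmd.append("--dry-run")
        commands.append(cmd)
    return commands


def run_sync(dry_run=True):
    """Run both syncs in order; check_call raises if rclone exits non-zero."""
    for cmd in build_sync_commands(dry_run=dry_run):
        subprocess.check_call(cmd)
```

Call run_sync(dry_run=True) first to review what would be transferred, then run_sync(dry_run=False) to perform the actual mirror.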
  • Download the pipeline

    git clone
    cd library-beam
  • Create a virtual environment to manage dependencies

    virtualenv venv --python=python2
    source venv/bin/activate
  • Pin the version of pip that is used. This is because apache-beam 2.2.0 calls pip install --download, which was superseded by pip download in pip 8.0.0 and removed in pip 10.0.0

    pip install 'pip==9.0.3'
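The reasoning behind the pip pin boils down to a single version comparison: pip 10.0.0 and later no longer accept pip install --download, which apache-beam 2.2.0 relies on. Here is a minimal sketch of that check. It is a hypothetical helper, not part of the pipeline, and it only handles plain numeric versions such as "9.0.3" (not pre-releases like "10.0.0b1").

```python
def parse_version(version):
    """Turn a plain numeric version such as '9.0.3' into (9, 0, 3)."""
    return tuple(int(part) for part in version.split("."))


def supports_install_download(pip_version):
    """True if this pip still accepts `pip install --download` (removed in 10.0.0)."""
    return parse_version(pip_version) < (10, 0, 0)
```

Inside the activated virtualenv, supports_install_download(pip.__version__) (after import pip) should return True once pip is pinned to 9.0.3.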
  • Pin the version of six that is used. See for details of why.

    pip install 'six==1.10.0'
  • Install the pipeline into the virtual environment

    python install
    pip install
  • Run the NLP analytical pipeline

    python -m main \
        --project your-project \
        --job_name medline-nlp \
        --runner DataflowRunner \
        --temp_location gs://my-tmp-bucket/temp \
        --setup_file ./ \
        --worker_machine_type n1-highmem-32 \
        --input_baseline gs://my-medline-bucket/baseline/pubmed18n*.xml.gz \
        --input_updates gs://my-medline-bucket/updatefiles/pubmed18n*.xml.gz \
        --output_enriched gs://my-medline-bucket/analyzed/pubmed18 \
        --max_num_workers 32 \
        --zone europe-west1-d

    This can be monitored in the Google Cloud Dataflow console. Note that the "wall time" displayed is not elapsed time in the usual sense: it is accumulated per thread and per worker, so it can far exceed the real duration of the job.
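The command above mixes pipeline-specific flags (--input_baseline, --input_updates, --output_enriched, ...) with generic Beam/Dataflow flags (--runner, --project, --zone, ...). A common Beam pattern is to separate them with argparse's parse_known_args and pass the leftovers to Beam's PipelineOptions. The sketch below assumes main does something similar. We have not checked its actual parser, so split_args and its option list are hypothetical.

```python
import argparse


def split_args(argv):
    """Hypothetical: separate this pipeline's own flags from Beam/Dataflow flags."""
    parser = argparse.ArgumentParser()
    # Option names taken from the commands in this README
    parser.add_argument("--input_baseline")
    parser.add_argument("--input_updates")
    parser.add_argument("--output_enriched")
    parser.add_argument("--output_splitted")
    parser.add_argument("--input_enriched")
    # Unrecognised flags (--runner, --project, --max_num_workers, ...) and their
    # values are returned untouched, ready to be handed to Beam's PipelineOptions.
    known, beam_args = parser.parse_known_args(argv)
    return known, beam_args


example = [
    "--project", "your-project",
    "--runner", "DataflowRunner",
    "--input_baseline", "gs://my-medline-bucket/baseline/pubmed18n*.xml.gz",
    "--input_updates", "gs://my-medline-bucket/updatefiles/pubmed18n*.xml.gz",
    "--max_num_workers", "32",
]
known, beam_args = split_args(example)
```

Here known.input_baseline holds the glob, and beam_args holds the runner, project and worker settings.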

  • Run the job to split the enriched JSONs into smaller pieces

    python -m main \
        --project open-targets \
        --job_name open-targets-medline-process-split \
        --runner DataflowRunner \
        --temp_location gs://my-tmp-bucket/temp \
        --setup_file ./ \
        --worker_machine_type n1-highmem-16 \
        --input_enriched gs://my-medline-bucket/analyzed/pubmed18*_enriched.json.gz \
        --output_splitted gs://my-medline-bucket/splitted/pubmed18 \
        --max_num_workers 32 \
        --zone europe-west1-d

    NOTE: you can chain the analytical and the split steps by adding the option --output_splitted gs://my-medline-bucket/splitted/pubmed18 to the analytical step
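The README does not describe how the split job divides the enriched files. As a rough local illustration of "splitting into smaller pieces", this sketch assumes the enriched files are gzipped, newline-delimited JSON (one document per line) and re-chunks one of them into gzipped pieces of at most max_lines documents. The function name, the piece-naming scheme and the line-based format are all assumptions. The real job runs on Dataflow and may split on different criteria.

```python
import gzip
import json


def split_jsonl_gz(src_path, dst_prefix, max_lines):
    """Hypothetical: split a gzipped JSON-lines file into pieces of <= max_lines lines.

    Pieces are written as <dst_prefix>-00000.json.gz, <dst_prefix>-00001.json.gz, ...
    and their paths are returned in order.
    """
    pieces = []
    out = None
    count = 0
    try:
        with gzip.open(src_path, "rb") as src:
            for raw in src:
                line = raw.strip()
                if not line:
                    continue  # skip blank lines
                json.loads(line.decode("utf-8"))  # fail early on malformed JSON
                if out is None or count == max_lines:
                    if out is not None:
                        out.close()  # current piece is full
                    path = "%s-%05d.json.gz" % (dst_prefix, len(pieces))
                    out = gzip.open(path, "wb")
                    pieces.append(path)
                    count = 0
                out.write(line + b"\n")
                count += 1
    finally:
        if out is not None:
            out.close()
    return pieces
```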

  • Run the jobs to load the JSONs into Elasticsearch

    python publication --es http://myesnode1:9200  --es http://myesnode2:9200
    python bioentity --es http://myesnode1:9200  --es http://myesnode2:9200
    python taggedtext --es http://myesnode1:9200  --es http://myesnode2:9200
    python concept --es http://myesnode1:9200  --es http://myesnode2:9200

    WARNING: the loading scripts currently take a long time, particularly the concept one (24h+). Run them inside screen, tmux or similar, so that they keep going after a disconnect and can be reattached later, e.g.:

    tmux new-session "time -p python publication --es http://be-es-debian-3n-node01:39200 --es http://be-es-debian-3n-node02:39200 --es http://be-es-debian-3n-node03:39200 "
    tmux new-session "time -p python bioentity --es http://be-es-debian-3n-node01:39200 --es http://be-es-debian-3n-node02:39200 --es http://be-es-debian-3n-node03:39200 "
    tmux new-session "time -p python taggedtext --es http://be-es-debian-3n-node01:39200 --es http://be-es-debian-3n-node02:39200 --es http://be-es-debian-3n-node03:39200 "
    tmux new-session "time -p python concept --es http://be-es-debian-3n-node01:39200 --es http://be-es-debian-3n-node02:39200 --es http://be-es-debian-3n-node03:39200 "
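The loaders' internals are not documented here. For orientation, this sketch shows one way to send JSON documents to Elasticsearch through its standard _bulk endpoint, spreading batches round-robin over several nodes in the spirit of the repeated --es options. We have not checked how the loaders actually use those options. The function names and the batch size are hypothetical, and depending on your Elasticsearch version the action line may also need a _type.

```python
import itertools
import json

try:  # Python 3
    from urllib.request import Request, urlopen
except ImportError:  # Python 2
    from urllib2 import Request, urlopen


def build_bulk_body(index, docs):
    """Render docs as a _bulk NDJSON payload: an action line, then the source line."""
    lines = []
    for doc in docs:
        lines.append(json.dumps({"index": {"_index": index}}))
        lines.append(json.dumps(doc))
    # The _bulk API requires the payload to end with a newline
    return "\n".join(lines) + "\n"


def bulk_load(nodes, index, docs, batch_size=500):
    """POST docs in batches, cycling round-robin over the given node URLs."""
    node_cycle = itertools.cycle(nodes)
    for start in range(0, len(docs), batch_size):
        body = build_bulk_body(index, docs[start:start + batch_size])
        request = Request(
            next(node_cycle).rstrip("/") + "/_bulk",
            data=body.encode("utf-8"),  # supplying data makes this a POST
            headers={"Content-Type": "application/x-ndjson"},
        )
        response = json.loads(urlopen(request).read().decode("utf-8"))
        if response.get("errors"):
            raise RuntimeError("some documents in the batch failed to index")
```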
  • OPTIONAL: If needed, create appropriate aliases in Elasticsearch

    curl -XPOST 'http://myesnode1:9200/_aliases' -H 'Content-Type: application/json' -d '
      {
          "actions": [
              {"add": {"index": "pubmed-18", "alias": "!publication-data"}}
          ]
      } '
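Hand-written JSON inside a curl command is easy to break (a missing bracket is enough). As an alternative, here is a minimal sketch that builds the same _aliases body with Python's json module and posts it using only the standard library. The function names are hypothetical. The node URL, index and alias are the placeholders from the curl command above.

```python
import json

try:  # Python 3
    from urllib.request import Request, urlopen
except ImportError:  # Python 2
    from urllib2 import Request, urlopen


def alias_actions(index, alias):
    """Body for POST /_aliases that adds `alias` to `index`."""
    return {"actions": [{"add": {"index": index, "alias": alias}}]}


def add_alias(es_node, index, alias):
    """POST the alias action; urlopen raises HTTPError on a non-2xx reply."""
    request = Request(
        es_node.rstrip("/") + "/_aliases",
        data=json.dumps(alias_actions(index, alias)).encode("utf-8"),
        headers={"Content-Type": "application/json"},
    )
    return json.loads(urlopen(request).read().decode("utf-8"))
```

For example, add_alias("http://myesnode1:9200", "pubmed-18", "!publication-data") sends the same request as the curl command.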
  • OPTIONAL: Increase the Elasticsearch limit for the adjacency matrix aggregation (used by the LINK tool)

    curl -XPUT 'http://myesnode1:9200/pubmed-18-concept/_settings' -H 'Content-Type: application/json' -d'
      {
          "index" : {
              "max_adjacency_matrix_filters" : 500