Skip to content

Commit

Permalink
Merge branch 'lmdb' of https://github.com/reimandlab/ActiveDriverDB i…
Browse files Browse the repository at this point in the history
…nto lmdb
  • Loading branch information
krassowski committed Jun 9, 2019
2 parents b2324d9 + cfef83c commit bdb3e47
Show file tree
Hide file tree
Showing 9 changed files with 85 additions and 69 deletions.
3 changes: 3 additions & 0 deletions website/app.py
@@ -1,4 +1,5 @@
import os

from flask import Flask
from flask_apscheduler import APScheduler
from flask_assets import Environment
Expand Down Expand Up @@ -185,3 +186,5 @@ def create_app(config_filename='config.py', config_override={}):
jinja_filters['pluralize'] = pluralize

return app


5 changes: 5 additions & 0 deletions website/database/lightning.py
Expand Up @@ -30,5 +30,10 @@ def __len__(self):
with self.env.begin() as transaction:
return transaction.stat()['entries']

def __contains__(self, item):
indicator = object()
with self.env.begin() as transaction:
return transaction.get(item, default=indicator) is not indicator

    def close(self):
        # Release the underlying LMDB environment; the object must not be
        # used for further reads/writes afterwards.
        self.env.close()
2 changes: 1 addition & 1 deletion website/example_config.py
Expand Up @@ -32,7 +32,7 @@
# -Hash-key databases settings
HDB_DNA_TO_PROTEIN_PATH = 'databases/dna_to_protein/'
HDB_GENE_TO_ISOFORM_PATH = 'databases/gene_to_isoform/'
HDB_READONLY = True
HDB_READONLY = False

# -Application settings
# counting everything in the database in order to prepare statistics might be
Expand Down
91 changes: 34 additions & 57 deletions website/hash_set_db.py
@@ -1,4 +1,3 @@
import shutil
from pathlib import Path

import gc
Expand Down Expand Up @@ -63,9 +62,7 @@ def _create_path(self, name) -> Path:
will be created (if it does not exist).
"""
self.name = name
name = Path(name)
base_dir = Path(__file__).parent.resolve()
db_dir = base_dir / name
db_dir = path_relative_to_app(name)
db_dir.mkdir(parents=True, exist_ok=True)
return db_dir

Expand Down Expand Up @@ -133,12 +130,7 @@ def values(self):
def update(self, key, value):
key = bytes(key, 'utf-8')
try:
items = set(
filter(
bool,
self.db.get(key).split(b'|')
)
)
items = self._to_set(self.db.get(key))
except (KeyError, AttributeError):
items = set()

Expand All @@ -149,16 +141,20 @@ def update(self, key, value):

def _get(self, key):
try:
items = set(
filter(
bool,
self.db.get(key).split(b'|')
)
)
items = self._to_set(self.db.get(key))
except (KeyError, AttributeError):
items = set()
return items

@staticmethod
def _to_set(value: bytes):
return set(
filter(
bool,
value.split(b'|')
)
)

def add(self, key, value):
key = bytes(key, 'utf-8')
items = self._get(key)
Expand All @@ -183,7 +179,8 @@ def __len__(self):
@require_open
def drop(self, not_exists_ok=True):
try:
shutil.rmtree(self.path, ignore_errors=True)
for f in self.path.glob('*'):
f.unlink()
except FileNotFoundError:
if not_exists_ok:
pass
Expand All @@ -208,75 +205,55 @@ class HashSetWithCache(HashSet):

def __init__(self, name=None, integer_values=False):
self.in_cached_session = False
self.keys_on_disk = None
self.cache = {}
self.i = None
super().__init__(name=name, integer_values=integer_values)

def _cached_get_with_old_values(self, key):
cache = self.cache
if key in cache:
return cache[key]
else:
items = self._get(key)
cache[key] = items
return items

def _cached_get_ignore_old_values(self, key):
if key in self.keys_on_disk:
return self._get(key)
return self.cache[key]

_cached_get = _cached_get_with_old_values

def cached_add(self, key: str, value: str):
items = self._cached_get(bytes(key, 'utf-8'))
items.add(bytes(value, 'utf-8'))
self.cache[bytes(key, 'utf-8')].add(bytes(value, 'utf-8'))

def cached_add_integer(self, key: str, value: int):
items = self._cached_get(bytes(key, 'utf-8'))
items.add(value)
self.cache[bytes(key, 'utf-8')].add(b'%d' % value)

def flush_cache(self):
assert self.in_cached_session

with self.db.env.begin(write=True) as transaction:
put = transaction.put
if self.integer_values:
for key, items in self.cache.items():
value = '|'.join(map(str, items))
put(key, bytes(value, 'utf-8'))
else:
for key, items in self.cache.items():
put(key, b'|'.join(items))
get = transaction.get
to_set = self._to_set

for key, items in self.cache.items():
old_values = get(key) # will return None if the key does not exist in the db
if old_values:
items.update(to_set(old_values))

for key, items in self.cache.items():
put(key, b'|'.join(items))

self.keys_on_disk.update(self.cache.keys())
self.cache = defaultdict(set)

self.i += 1
if self.i % 1000 == 999:
if self.i % 100 == 99:
gc.collect()

@contextmanager
def cached_session(self, overwrite_db_values=False):
def cached_session(self):
self.i = 0
old_cache = self.cache
old_cached_get = self._cached_get
self.in_cached_session = True
self.keys_on_disk = set()
self.cache = defaultdict(set)

self._cached_get = (
self._cached_get_ignore_old_values
if overwrite_db_values else
self._cached_get_with_old_values
)

yield

print('Flushing changes')

self.flush_cache()
self.cache = old_cache
self._cached_get = old_cached_get
self.in_cached_session = False


def path_relative_to_app(path):
    """Resolve *path* against the directory containing this module.

    Accepts a string or Path and returns an absolute Path anchored at the
    application directory (the directory of this source file).
    """
    app_dir = Path(__file__).parent.resolve()
    return app_dir / Path(path)
21 changes: 14 additions & 7 deletions website/imports/mappings.py
Expand Up @@ -34,7 +34,8 @@ def import_genome_proteome_mappings(

bdb.open(path, size=1e11)

with bdb.cached_session(overwrite_db_values=True):
with bdb.cached_session():
add = bdb.cached_add
for line in read_from_gz_files(mappings_dir, mappings_file_pattern, after_batch=bdb.flush_cache):
try:
chrom, pos, ref, alt, prot = line.rstrip().split('\t')
Expand All @@ -57,7 +58,12 @@ def import_genome_proteome_mappings(
except ValueError as e:
print(e, line)
continue
assert refseq.startswith('NM_')

try:
assert refseq.startswith('NM_')
except AssertionError as e:
print(e, line)
continue
# refseq = int(refseq[3:])
# name and refseq are redundant with respect one to another

Expand Down Expand Up @@ -117,7 +123,7 @@ def import_genome_proteome_mappings(
is_ptm_related
)

bdb.cached_add(snv, item)
add(snv, item)

return broken_seq

Expand All @@ -140,14 +146,15 @@ def import_aminoacid_mutation_refseq_mappings(
if bdb_dir:
path = bdb_dir + '/' + basename(path)

bdb_refseq.open(path, size=1e11)
bdb_refseq.open(path, size=1e9)

genes = {
protein: protein.gene.name
protein: protein.gene_name
for protein in proteins.values()
}

with bdb_refseq.cached_session(overwrite_db_values=True):
with bdb_refseq.cached_session():
add = bdb_refseq.cached_add_integer
for line in read_from_gz_files(mappings_dir, mappings_file_pattern, after_batch=bdb_refseq.flush_cache):
try:
chrom, pos, ref, alt, prot = line.rstrip().split('\t')
Expand Down Expand Up @@ -199,7 +206,7 @@ def import_aminoacid_mutation_refseq_mappings(
if broken_sequence_tuple:
continue

bdb_refseq.cached_add_integer(
add(
genes[protein] + ' ' + aa_ref + str(aa_pos) + aa_alt,
protein.id
)
Expand Down
7 changes: 5 additions & 2 deletions website/imports/protein_data.py
Expand Up @@ -25,16 +25,19 @@
from models import GeneListEntry


def get_proteins(cached_proteins={}, reload_cache=False, options=None):
    """Fetch all proteins from database as refseq => protein object mapping.

    By default proteins will be cached at first call and until cached_proteins
    is set explicitly to a (new, empty) dict() in subsequent calls, the
    cached results from the first time will be returned.

    NOTE: the mutable default argument is intentional here — it is the
    shared cache storage that persists across calls.

    Args:
        cached_proteins: the cache dict (deliberately shared between calls).
        reload_cache: when True, clear the cache so the database is re-queried.
        options: optional query options applied via ``query.options(options)``
            before iteration — NOTE(review): presumably SQLAlchemy loader
            options such as ``load_only``; confirm with callers.
    """
    if reload_cache:
        cached_proteins.clear()
    query = Protein.query
    if options:
        # Apply caller-supplied loader options (e.g. to restrict loaded columns)
        query = query.options(options)
    if not cached_proteins:
        for protein in query:
            cached_proteins[protein.refseq] = protein
    return cached_proteins

Expand Down
18 changes: 17 additions & 1 deletion website/manage.py
Expand Up @@ -211,11 +211,27 @@ class Mappings(CommandTarget):
@command
def load(self, args):
print(f'Importing {args.restrict_to or "all"} mappings')
proteins = get_proteins()

if args.restrict_to != 'aminoacid_refseq':
from models import Gene, Protein
from database import db
from collections import namedtuple

protein = namedtuple('Protein', ('id', 'refseq', 'sequence', 'gene_name'))

proteins = {
data[1]: protein(*data)
for data in (
db.session.query(Protein.id, Protein.refseq, Protein.sequence, Gene.name)
.select_from(Protein).join(Protein.gene)
)
}
import_genome_proteome_mappings(proteins, bdb_dir=args.path)

if args.restrict_to != 'genome_proteome':
from sqlalchemy.orm import load_only
proteins = get_proteins(options=load_only('id', 'refseq', 'sequence'))

import_aminoacid_mutation_refseq_mappings(proteins, bdb_dir=args.path)

@load.argument
Expand Down
4 changes: 3 additions & 1 deletion website/tests/database_testing.py
Expand Up @@ -5,6 +5,7 @@
from flask_testing import TestCase

from app import create_app, scheduler
from hash_set_db import path_relative_to_app
from database import db
from database import bdb
from database import bdb_refseq
Expand All @@ -16,7 +17,7 @@


def test_hash_set_path(prefix):
parent = hash_sets_path.resolve()
parent = path_relative_to_app(hash_sets_path)
parent.mkdir(parents=True, exist_ok=True)
directory = TemporaryDirectory(dir=parent, prefix=prefix)
# keep the object in memory to prevent deletion
Expand All @@ -35,6 +36,7 @@ class DatabaseTest(TestCase):

HDB_DNA_TO_PROTEIN_PATH = test_hash_set_path('dna_to_protein')
HDB_GENE_TO_ISOFORM_PATH = test_hash_set_path('gene_to_isoform')
HDB_READONLY = False
SQL_LEVENSTHEIN = False
USE_LEVENSTHEIN_MYSQL_UDF = False
CONTACT_LIST = ['dummy.maintainer@domain.org']
Expand Down
3 changes: 3 additions & 0 deletions website/tests/test_lmdb_interface.py
Expand Up @@ -19,3 +19,6 @@ def test_basic_interface(tmpdir):
items_gathered.add((key, value))

assert items_expected == items_gathered

assert b'x' in db
assert b'z' not in db

0 comments on commit bdb3e47

Please sign in to comment.