## Importing all samples into elastic search index

In [1]:
import sys
sys.path.append('..')

In [25]:
from elasticsearch import Elasticsearch
from elasticsearch.client import IndicesClient
from elasticsearch_dsl import Mapping, String, Integer, Nested, Date, Object, Float
from elasticsearch.helpers import bulk
import pymongo
from time import sleep
from lib.utils import iter_bucket
import json

In [3]:
es = Elasticsearch()
ies = IndicesClient(es)

In [4]:
index_name = 'samples_dev'

## Dropping and creating index

In [47]:
if ies.exists(index_name):
    ies.delete(index_name)
ies.create(index_name)
sleep(1)

## Creating mapping and settings

In [48]:
ies.close(index_name)
m = Mapping(index_name)

m.field('accession', String(index='not_analyzed'))
m.field('organism', String(index='not_analyzed'))
m.field('platform', String(index='not_analyzed'))
m.field('series', String(index='not_analyzed'))
m.field('channel', Integer(index='not_analyzed'))
m.field('sample_type', String(index='not_analyzed'))
m.field('data_source', String(index='not_analyzed'))

m.field('title', String())
m.field('description', String())
m.field('source_name', String())
m.field('text', String())

m.field('characteristics', Object(enabled=False))
m.field('characteristics_raw', String())

meta = Object()
meta.field('geo_id', Integer(index='not_analyzed'))
m.field('meta', meta)

annotation = Object()
annotation.field('age', Float())
annotation.field('tissue_id', String(index='not_analyzed'))
annotation.field('disease_id', String(index='not_analyzed'))
annotation.field('preprocessed', Integer(index='not_analyzed'))
annotation.field('annotated', Integer(index='not_analyzed'))

m.field('annotation', annotation)

m.field('library_source', String(index='no'))
m.field('protocols', Object(enabled=False))
m.field('extracted_molecule', String(index='no'))
m.field('status', String(index='no'))
m.field('label', String(index='no'))

m.field('instrument_model', String(index='no'))

supplementary_files = Nested()
supplementary_files.field("type", String(index='not_analyzed'))
supplementary_files.field("name", String(index='not_analyzed'))

m.field('supplementary_files', supplementary_files)
m.field('relations', Object(enabled=False))
m.field('$oid', String(index='no'))
m.field('submission_date', Date())
m.field('scrap_date', Date())
m.field('last_update_date', Date())


m.save(index_name, using=es)

# ies.put_settings(index=index_name, body={
#     "analysis":{
#       "analyzer":{
#         "default":{
#           "type":"custom",
#           "tokenizer":"standard",
#           "filter":[ "standard", "lowercase", "stop", "kstem" ]
#         }
#       }
#     }
# })
sleep(1)
ies.open(index_name)

{'acknowledged': True}

In [31]:
def fields(fields_list):
    return dict((f, 1) for f in fields_list)

def del_id(item):
    if '_id' in item:
        del item['_id']
    return item
    

def make_raw_chars(item):
    if 'characteristics_raw' in item:
        return item
    if not item.get('characteristics'):
        return item

    ch_raw = []
    for k, v in item['characteristics'].items():
        if isinstance(v, list):
            v = ','.join(v)
        v = v.strip(" ")
        if not v:
            continue
            
        ch_raw.append('{}: {}'.format(k, v))
                      
    item['characteristics_raw'] = ch_raw
    
    del item['characteristics']
    return item


## Inserting data

### Using only fields in mapping

In [17]:
import gzip
def read_dump(dump_file):
    with gzip.open(dump_file) as f:
        for l in f:
            l = l.decode('utf-8').rstrip('\n')
            yield json.loads(l)

In [None]:
b = 0
fields_list = list(m.to_dict()[index_name]['properties'].keys())
fs = fields(fields_list)
fs['_id'] = 0

### Creating dump

In [None]:
# !rm /data/rawdata/snapshots/mongo/scraper_test_dev.samples..json.gz

In [45]:
!mongoexport -d  scraper_test_dev -c samples | gzip > \
 /data/rawdata/snapshots/mongo/scraper_test_dev.samples.elastic_export.$(date +%Y%m%d.%H%M%S).json.gz

2015-10-29T15:07:35.396+0000	connected to: localhost
2015-10-29T15:17:19.382+0000	exported 2439255 records


### Importing from dump

In [15]:
def convert_date(item):
    def convert(field):
        if field in item:
            d = item[field]['$date']
            item[field] = d
    """
    "submission_date": {
    "$date": "2000-09-28T00:00:00.000Z"
  },
  "scrap_date": {
    "$date": "2015-09-22T12:03:46.782Z"
  }
  "last_update_date": {
    "$date": "2008-11-19T00:00:00.000Z"
  },
    """
    convert('submission_date')
    convert('scrap_date')
    convert('last_update_date')
    return item

In [49]:
b = 0
samples_iter = read_dump('/data/rawdata/snapshots/mongo/scraper_test_dev.samples.elastic_export.20151029.150735.json.gz')
for bucket in iter_bucket(map(convert_date, samples_iter)):
    actions = [dict(
                _index=index_name,
                _type=index_name,
#                 _id=s['accession'],
                _source=s if s['data_source'] == 'geo' else make_raw_chars(s)
                ) for s in bucket]
    
    b += len(actions)
    bulk(es, actions)

In [50]:
b

2439255

### Importing from array-express data source

In [38]:
b = 0
fs['characteristics'] = 1
for bucket in iter_bucket(db.samples.find({'data_source': 'array-express'}, fs)):
    actions = [dict(
                _index=index_name,
                _type=index_name,
                _source=make_raw_chars(s)
                ) for s in bucket]
#     print(actions)
    b += len(actions)
    bulk(es, actions)
print(b)

NameError: name 'fs' is not defined

In [64]:
b

695951