In [1]:
import elasticsearch

In [2]:
from elasticsearch import helpers  # bulk データ投入用

In [3]:
import io
import requests
import pandas as pd

In [4]:
URL = ""  # Google spread sheetのURL
def load_data_from_google_drive(URL, index_col=None):
    '''サンプルデータ読み込み'''

    URL = URL.replace("open", "uc")
    r = requests.get(URL)
    df = pd.read_csv(io.BytesIO(r.content), index_col=index_col)
    return df

__データ確認__

In [None]:
df = load_data_from_google_drive(URL, index_col=0)
df.head()

In [26]:
df.shape

(19234, 52)

# ESに接続

In [5]:
es = elasticsearch.Elasticsearch("elasticsearch:9200")  
# es = elasticsearch.Elasticsearch("localhost:9200")  # local (コンテナ外)  から接続する場合はこちらを利用

# インデックスの操作（生成、確認、削除）

`expression`という名称でindexを作成  
下記のデータ形式を想定
```
    {'sample_id': 'DRR130762',
     'gene_id': 'Mp1g00010',
     'tpm': 79.6023443575001}
```

In [6]:
# スキーマ定義
mapping = {
    "mappings" : {
            "properties" : {
                "gene_id": {"type":"text"},
                "sample_id": {"type":"text"},
                "tpm": {"type":"float"}
            }
    }
}

In [7]:
# index生成 (既に存在していたらエラー)
es.indices.create(index='expression', body=mapping)

{'acknowledged': True, 'shards_acknowledged': True, 'index': 'expression'}

In [8]:
# indexの存在を確認
es.indices.exists(index="expression")

True

In [9]:
# indexを削除
# es.indices.delete(index="expression")

#  データ操作 (追加・更新・検索・削除）

In [10]:
# テストデータ登録 （更新）
# 同じIDに対して実行した場合、updateになる
 
data = {"gene_id": "test_gene", "sample_id": "test_sample", "tpm": 133.4}
_id = 10
es.index(index='expression', doc_type='_doc', id=_id, body=data)
data = {"gene_id": "test_gene2", "sample_id": "test_sample", "tpm": 200}
_id = 20
es.index(index='expression', id=_id, body=data)  # doc_typeは指定しなくてもよい (_docになる）
data = {"gene_id": "test_gene", "sample_id2": "test_sample", "tpm": 333}
# _id = 30
es.index(index='expression', body=data)  # IDも指定しなくてもよい（自動で生成）
data = {"gene_id": "test_gene2", "sample_id2": "test_sample", "tpm": 4444}
_id = 40
es.index(index='expression', doc_type='_doc', id=_id, body=data)

{'_index': 'expression',
 '_type': '_doc',
 '_id': '40',
 '_version': 1,
 'result': 'created',
 '_shards': {'total': 2, 'successful': 1, 'failed': 0},
 '_seq_no': 3,
 '_primary_term': 1}

In [11]:
# テストデータの検索 
result = es.search(
        index='expression',
        body={'query': {'match': {'sample_id': 'test_sample'}}})
hits = result['hits']

print('ヒット数 : %s' % hits['total'])
print('最初のID : %s' % hits['hits'][0]['_id'])
print('最初のhit : %s' % hits['hits'][0])
print('2件目のhit : %s' % hits['hits'][1])


ヒット数 : {'value': 2, 'relation': 'eq'}
最初のID : 10
最初のhit : {'_index': 'expression', '_type': '_doc', '_id': '10', '_score': 0.18232156, '_source': {'gene_id': 'test_gene', 'sample_id': 'test_sample', 'tpm': 133.4}}
2件目のhit : {'_index': 'expression', '_type': '_doc', '_id': '20', '_score': 0.18232156, '_source': {'gene_id': 'test_gene2', 'sample_id': 'test_sample', 'tpm': 200}}


In [12]:
# テストデータの検索2  gene_idで検索
result = es.search(
        index='expression',
        body={'query': {'match': {'gene_id': 'test_gene'}}})
hits = result['hits']

print('ヒット数 : %s' % hits['total'])
print('最初のID : %s' % hits['hits'][0]['_id'])
print('最初のhit : %s' % hits['hits'][0])
print('2件目のhit : %s' % hits['hits'][1])  # IDは自動生成されている

ヒット数 : {'value': 2, 'relation': 'eq'}
最初のID : 10
最初のhit : {'_index': 'expression', '_type': '_doc', '_id': '10', '_score': 0.6931471, '_source': {'gene_id': 'test_gene', 'sample_id': 'test_sample', 'tpm': 133.4}}
2件目のhit : {'_index': 'expression', '_type': '_doc', '_id': 'Xm5rjXEBSNVM_oz7SBJ-', '_score': 0.6931471, '_source': {'gene_id': 'test_gene', 'sample_id2': 'test_sample', 'tpm': 333}}


In [13]:
# ID指定で取得
es.get("expression", id=20)  #→ GET /expression/_doc/20  と同等

{'_index': 'expression',
 '_type': '_doc',
 '_id': '20',
 '_version': 1,
 '_seq_no': 1,
 '_primary_term': 1,
 'found': True,
 '_source': {'gene_id': 'test_gene2', 'sample_id': 'test_sample', 'tpm': 200}}

In [14]:
# ID指定で削除
es.delete(index='expression', id='20')

{'_index': 'expression',
 '_type': '_doc',
 '_id': '20',
 '_version': 2,
 'result': 'deleted',
 '_shards': {'total': 2, 'successful': 1, 'failed': 0},
 '_seq_no': 4,
 '_primary_term': 1}

In [15]:
#  クエリで検索して削除
es.delete_by_query(
        index='expression', 
        body={'query': {'match': {'gene_id': 'test_gene'}}})

{'took': 18,
 'timed_out': False,
 'total': 2,
 'deleted': 2,
 'batches': 1,
 'version_conflicts': 0,
 'noops': 0,
 'retries': {'bulk': 0, 'search': 0},
 'throttled_millis': 0,
 'requests_per_second': -1.0,
 'throttled_until_millis': 0,
 'failures': []}

In [16]:
# 全件検索 (ただし、defaultでは10000件までしか取得してくれない。elasticsearchの設定を変えるか後述のscrollを使用する必要がある)
result = es.search(
        index='expression',
        body={'query': {'match_all': {}}})
result

{'took': 2,
 'timed_out': False,
 '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0},
 'hits': {'total': {'value': 1, 'relation': 'eq'},
  'max_score': 1.0,
  'hits': [{'_index': 'expression',
    '_type': '_doc',
    '_id': '40',
    '_score': 1.0,
    '_source': {'gene_id': 'test_gene2',
     'sample_id2': 'test_sample',
     'tpm': 4444}}]}}

In [17]:
# 全件削除 (indexを削除して再生成したほうが早いかも)
es.delete_by_query(
        index='expression',
        body={"query": {"match_all": {}}})

{'took': 29,
 'timed_out': False,
 'total': 1,
 'deleted': 1,
 'batches': 1,
 'version_conflicts': 0,
 'noops': 0,
 'retries': {'bulk': 0, 'search': 0},
 'throttled_millis': 0,
 'requests_per_second': -1.0,
 'throttled_until_millis': 0,
 'failures': []}

In [18]:
# 削除されたことを確認
result = es.search(
        index='expression',
        body={'query': {'match_all': {}}})
result["hits"]["total"]

{'value': 0, 'relation': 'eq'}

# バルクデータの投入

In [19]:
def create_data(URL, index_name="expression"):
    '''CSVデータから１行ずつデータをjsonで返すジェネレータ関数'''
    df = load_data_from_google_drive(URL, index_col=0)
    D = df.stack().to_dict()
    for (gene_id, sample_id), tpm in D.items():
        source = {"sample_id": sample_id, "gene_id": gene_id, "tpm": tpm}
        yield {
            "_index": index_name,
            "_id": f"{sample_id}_{gene_id}",   # 例) 'DRR130762_Mp1g00010'
            "_source": source
        }

 1000168 件のデータ投入

In [20]:
%%time
# chuck_size=1000だとwall time 2min15s, 10000だと2m1s
elasticsearch.helpers.bulk(
        es,
        actions=create_data(URL),  # ジェネレータ関数を渡しているがiterableであれば良いのでリストでも動くと思われる
        chunk_size=1000)

CPU times: user 34.5 s, sys: 910 ms, total: 35.4 s
Wall time: 1min 54s


(1000168, [])

# データ全件取得 (scrollを使用)

In [21]:
def get_all_docs(index, query=None, scroll="2m", size=10000, request_timeout=150):
    if query is None:
        query = {'match_all': {}}
    query_body = {'query': query}
    data = es.search(index="expression", body=query_body, scroll=scroll, size=size, request_timeout=request_timeout)
    s_id = data['_scroll_id']
    s_size = len(data['hits']['hits'])
    result = data['hits']['hits']
    while (s_size > 0):
        data = es.scroll(scroll_id=s_id, scroll=scroll,request_timeout=request_timeout)
        s_id = data['_scroll_id']
        s_size = len(data['hits']['hits'])
        result.extend(data['hits']["hits"])
    return result

In [22]:
%%time
result = get_all_docs(index="expression")

CPU times: user 4.49 s, sys: 437 ms, total: 4.93 s
Wall time: 23.5 s


In [23]:
len(result)

1000168

__データを発現量テーブルの形に整形__

In [27]:
tmp_result = [x["_source"] for x in result]
df_result = pd.DataFrame(tmp_result)
df_result = df_result.set_index(["gene_id", "sample_id"]).unstack()

In [None]:
df_result.head()

In [29]:
df_result.shape

(19234, 52)

In [30]:
df_result.size

1000168

__特定の`gene_id`のみを選択してデータ取得__

In [31]:
%%time
query = {
    "match": {"gene_id": "Mp1g00030 Mp2g00130 Mp3g00330"}
    }
result = get_all_docs(index="expression", query=query, size=10000)

CPU times: user 266 ms, sys: 3.82 ms, total: 269 ms
Wall time: 290 ms


In [32]:
tmp_result = [x["_source"] for x in result]
df_result = pd.DataFrame(tmp_result)
df_result = df_result.set_index(["gene_id", "sample_id"]).unstack()

In [None]:
df_result.head()

__特定の`sample_id`のみを選択してデータ取得__

In [34]:
%%time
query = {
    "match": {"sample_id": "DRR130765 ERR364416 SRR971245 SRR7977555 SRR7977550 DRR130763 DRR130768"}
    }
result = get_all_docs(index="expression", query=query, size=10000)

CPU times: user 466 ms, sys: 14 ms, total: 480 ms
Wall time: 2.39 s


In [35]:
tmp_result = [x["_source"] for x in result]
df_result = pd.DataFrame(tmp_result)
df_result = df_result.set_index(["gene_id", "sample_id"]).unstack()

In [None]:
df_result.head()

In [37]:
df_result.shape

(19234, 7)

In [38]:
df_result.columns

MultiIndex([('tpm',  'DRR130763'),
            ('tpm',  'DRR130765'),
            ('tpm',  'DRR130768'),
            ('tpm',  'ERR364416'),
            ('tpm', 'SRR7977550'),
            ('tpm', 'SRR7977555'),
            ('tpm',  'SRR971245')],
           names=[None, 'sample_id'])

In [None]:
df_result[[('tpm',  'SRR971245'), ('tpm',  'SRR7977550')]]