In [1]:
%load_ext autoreload

%autoreload 2

In [2]:
# ! pip install loguru


In [3]:
from elasticsearch import Elasticsearch
from elasticsearch import helpers
from collections import deque
from loguru import logger
import django_project.config as config
import datetime
import time
import gzip
import pandas as pd
import vaex

# VER43_R has different structure
# new
# `SENTENCE_ID` int(10) unsigned NOT NULL AUTO_INCREMENT,
# `PMID` varchar(20) NOT NULL DEFAULT '',
# `TYPE` varchar(2) NOT NULL DEFAULT '',
# `NUMBER` int(10) unsigned NOT NULL DEFAULT '0',
# `SENT_START_INDEX` int(10) unsigned NOT NULL DEFAULT '0',
# `SENTENCE` varchar(999) DEFAULT NULL,
# `SENT_END_INDEX` int(10) unsigned NOT NULL DEFAULT '0',
# `SECTION_HEADER` varchar(100) DEFAULT NULL,
# `NORMALIZED_SECTION_HEADER` varchar(50) DEFAULT NULL,

# old
# `SENTENCE_ID` int(10) unsigned NOT NULL AUTO_INCREMENT,
# `PMID` varchar(20) NOT NULL DEFAULT '',
# `TYPE` varchar(2) NOT NULL DEFAULT '',
# `NUMBER` int(10) unsigned NOT NULL DEFAULT '0',
# `SENT_START_INDEX` int(10) unsigned NOT NULL DEFAULT '0',
# `SENT_END_INDEX` int(10) unsigned NOT NULL DEFAULT '0',
# `SECTION_HEADER` varchar(100) DEFAULT NULL,
# `NORMALIZED_SECTION_HEADER` varchar(50) DEFAULT NULL,
# `SENTENCE` varchar(999) CHARACTER SET utf8 NOT NULL DEFAULT '',

es = Elasticsearch(
    [{"host": config.elastic_host, "port": config.elastic_port}],
)

timeout = 300



In [4]:
def get_date():
    d = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    return d


def create_index(index_name, shards=3):
    print("Creating index", index_name)
    if es.indices.exists(index_name, request_timeout=timeout):
        print("Index name already exists, please choose another")
    else:
        print("Creating index " + index_name)
        request_body = {
            "settings": {
                "number_of_shards": shards,
                "number_of_replicas": 1,
                # "index.codec": "best_compression",
                "refresh_interval": -1,
                "index.max_result_window": 1000,
            },
            "mappings": {
                "_doc": {
                    "properties": {
                        "SENTENCE_ID": {"type": "keyword"},
                        "PMID": {"type": "keyword"},
                        "TYPE": {"type": "keyword"},
                        "NUMBER": {"type": "keyword"},
                        "SENT_START_INDEX": {"type": "integer"},
                        "SENT_END_INDEX": {"type": "integer"},
                        # "SECTION_HEADER": {"type": "keyword"},
                        # "NORMALIZED_SECTION_HEADER": {"type": "keyword"},
                        "SENTENCE": {"type": "text"},
                    }
                }
            },
        }
        es.indices.create(index=index_name, body=request_body, request_timeout=timeout)


def read_pmids():
    df = pd.read_csv("data/pmids.txt", header=None, dtype=str, names=["pmid"])
    return df["pmid"].tolist()



In [68]:
sentence_data =config.semmed_sentence_data
index_name =config.semmed_sentence_index

In [12]:
chunkSize = 100000
    # get list of pmids from predicates index
pmids = set(read_pmids())
logger.info(f"pmids: {len(pmids)}")
logger.info(f"Reading {sentence_data}")
df = vaex.open(sentence_data)
col_names = [
"SENTENCE_ID",
"PMID",
"TYPE",
"NUMBER",
"SENT_START_INDEX",
"SENTENCE",
"SENT_END_INDEX",
"SECTION_HEADER",
"NORMALIZED_SECTION_HEADER",
]
# df.columns = col_names
#    df.drop(columns=["SECTION_HEADER", "NORMALIZED_SECTION_HEADER"], inplace=True)
    # df.dropna(inplace=True)
#    df.fillna("NA", inplace=True)
df['PMIDvirt'] = df.PMID

2022-02-08 20:51:38.530 | INFO     | __main__:<module>:4 - pmids: 3922961
2022-02-08 20:51:38.531 | INFO     | __main__:<module>:5 - Reading /home/ubuntu/data/semmed_raw/semmedVER43_2021_R_SENTENCE_2.csv.gz.hdf5


In [13]:
df

#,SENTENCE_ID,PMID,TYPE,NUMBER,SENT_START_INDEX,SENTENCE,SENT_END_INDEX,SECTION_HEADER,NORMALIZED_SECTION_HEADER,PMIDvirt
0,6,16530473,ti,1,21,'Fluoride-selective colorimetric sensor based on...,119,--,--,16530473
1,7,16530473,ab,1,125,"'A structurally simple colorimetric sensor, N-4-...",302,--,--,16530473
2,8,16530473,ab,2,302,"'In acetonitrile, the addition of F(-) changed 1...",385,--,--,16530473
3,9,16530473,ab,3,385,'In the presence of other anions such as CH(3)CO...,578,--,--,16530473
4,10,16530473,ab,4,578,'The association constants of anionic complexes ...,757,--,--,16530473
...,...,...,...,...,...,...,...,...,...,...
227668762,376523184,34837689,ab,3,508,'The patient had a heterozygous LAMA3 mut...,616,--,--,34837689
227668763,376523185,34837689,ab,4,616,'Our results propose that these mutations produc...,754,--,--,34837689
227668764,376523186,34837689,ab,5,754,"'Interestingly, this is the first report indicat...",907,--,--,34837689
227668765,376523187,34837689,ab,6,907,"'Also, this is the first digenic inheritance rec...",989,--,--,34837689


In [21]:
%%time
for i,row in df[:1000].iterrows():
    pass

CPU times: user 8.55 s, sys: 211 ms, total: 8.76 s
Wall time: 8.56 s


In [None]:
# data_dict = {
#                 "SENTENCE_ID": row["SENTENCE_ID"],
#                 "PMID": row["PMID"],
#                 "TYPE": row["TYPE"],
#                 "NUMBER": row["NUMBER"],
#                 "SENT_START_INDEX": int(row["SENT_START_INDEX"]),
#                 "SENTENCE": row["SENTENCE"],
#                 "SENT_END_INDEX": int(row["SENT_END_INDEX"]),
#                 # "SECTION_HEADER": row['SECTION_HEADER'],
#                 # "NORMALIZED_SECTION_HEADER": l[8],
#             }

In [74]:
headers = ["SENTENCE_ID","PMID","TYPE","NUMBER","SENT_START_INDEX","SENTENCE","SENT_END_INDEX"]

In [75]:
dfr = vaex.open(sentence_data)[:100000][headers]

In [76]:
dfr

#,SENTENCE_ID,PMID,TYPE,NUMBER,SENT_START_INDEX,SENTENCE,SENT_END_INDEX
0,6,16530473,ti,1,21,'Fluoride-selective colorimetric sensor based on...,119
1,7,16530473,ab,1,125,"'A structurally simple colorimetric sensor, N-4-...",302
2,8,16530473,ab,2,302,"'In acetonitrile, the addition of F(-) changed 1...",385
3,9,16530473,ab,3,385,'In the presence of other anions such as CH(3)CO...,578
4,10,16530473,ab,4,578,'The association constants of anionic complexes ...,757
...,...,...,...,...,...,...,...
99995,112487,16550014,ab,2,265,'Positive sentinel lymph node (SLN) biopsy neces...,372
99996,112488,16550014,ab,3,372,'Failure to visualize a sentinel lymph node in r...,567
99997,112489,16550014,ab,4,567,'The reason for failure to visualize the sentine...,756
99998,112490,16550014,ab,5,756,'Alternative pathways for the drainage of lymph ...,852


In [77]:
pmm = list(pmids)

In [78]:
ab = dfr.PMID.isin(pmm)

In [79]:
dfr = dfr[ab]

In [98]:
dfr.shape[0]

20037

In [86]:
recs = dfr.to_records(chunk_size = 100)

In [87]:
for record in recs:
    pass

In [89]:
type(record[2][0]['SENT_START_INDEX'])

str

In [102]:
df.shape


(227668767, 10)

In [105]:
38129436/100000

381.29436

In [104]:
227668767-38129436

189539331

In [90]:
bulk_data=[]

In [91]:
for data_dict in record[2]:
    # logger.info(row)
    data_dict["SENT_START_INDEX"] = int(data_dict["SENT_START_INDEX"])
    data_dict["SENT_END_INDEX"] = int(data_dict["SENT_END_INDEX"])
    op_dict = {
        "_index": index_name,
        # "_id": l[0],
        # "_op_type": "create",
        "_type": "_doc",
        "_source": data_dict,
    }
    # check for bad entries
    nan_check = True
    for i in data_dict:
        if not data_dict[i]:
            nan_check = False
    if nan_check == True:
        bulk_data.append(op_dict)


In [95]:
record[0:2]

(20000, 20037)

In [92]:
type(record[2][0]['SENT_START_INDEX'])

int

In [73]:
data_dict

{'SENTENCE_ID': 4564564564,
 'PMID': '16550001',
 'TYPE': 'ab',
 'NUMBER': '10',
 'SENT_START_INDEX': 1314,
 'SENTENCE': 'Overexpression of the 67-kDa laminin receptor led to an increased binding of the cells to PCK3145.'}

In [70]:
record[2][0]['SENTENCE_ID'] = 4564564564

In [5]:
predicate_data = config.semmed_predicate_data
concept_data=config.semmed_concept_data
index_name =config.semmed_predicate_index

In [6]:
concept_data

'/home/ubuntu/data/semmed_raw/semmedVER43_2021_R_GENERIC_CONCEPT.csv.gz'

In [7]:
df = vaex.from_csv(predicate_data, encoding="ISO-8859-1", convert=True, dtype='object',chunk_size=1_000_000)

In [8]:
predicate_data

'/home/ubuntu/data/semmed_raw/semmedVER43_2021_R_PREDICATION_2.csv.gz'

In [9]:
df

#,PREDICATION_ID,SENTENCE_ID,PMID,PREDICATE,SUBJECT_CUI,SUBJECT_NAME,SUBJECT_SEMTYPE,SUBJECT_NOVELTY,OBJECT_CUI,OBJECT_NAME,OBJECT_SEMTYPE,OBJECT_NOVELTY,x,y,z
0,10592604,16,16530475,PROCESS_OF,C0003725,Arboviruses,virs,1,C0999630,Lepus capensis,mamm,1,\N,\N,\N
1,10592697,17,16530475,ISA,C0039258,Tahyna virus,virs,1,C0446169,California Group Viruses,virs,1,\N,\N,\N
2,10592728,17,16530475,ISA,C0318627,Eyach virus,virs,1,C0206590,Coltivirus,virs,1,\N,\N,\N
3,10592759,17,16530475,ISA,C0446169,California Group Viruses,virs,1,C0003725,Arboviruses,virs,1,\N,\N,\N
4,10592832,18,16530475,PROCESS_OF,C0012634,Disease,dsyn,0,C0020114,Human,humn,0,\N,\N,\N
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
112796181,194462837,369458366,34146390,CAUSES,C3642279|84557,'Microtubule-Associated Proteins 1A/1B Light Cha...,gngm,1,C0004391,Autophagy,celf,1,\N,\N,\N
112796182,194462838,369458367,34146390,CAUSES,C0036117,Salmonella infections,dsyn,1,C0009319,Colitis,dsyn,1,\N,\N,\N
112796183,194462839,369458367,34146390,LOCATION_OF,C0015733,Feces,bdsu,1,C0061202,genistein,bacs,1,\N,\N,\N
112796184,194462840,369458368,34146390,DISRUPTS,C0009319,Colitis,dsyn,1,C0506994,Goblet Cells,cell,1,\N,\N,\N


In [38]:
index_name = config.semmed_predicate_index
batch_size = 100000
url = (
    "http://"
    + config.elastic_host
    + ":"
    + config.elastic_port
    + "/"
    + index_name
    + "/_search/"
)
headers = {"Content-Type": "application/json"}


In [39]:
index_name

'predicate_index'

In [40]:
semmed_type = "semmeddb_triple"
if semmed_type == "semmeddb_triple":
    source = "SUB_PRED_OBJ"

In [41]:
payload = {
        "aggs": {
            "my_buckets": {
                "composite": {
                    "size": batch_size,
                    "sources": [{semmed_type: {"terms": {"field": source}}}],
                }
            }
        }
    }

In [42]:
payload

{'aggs': {'my_buckets': {'composite': {'size': 100000,
    'sources': [{'semmeddb_triple': {'terms': {'field': 'SUB_PRED_OBJ'}}}]}}}}

In [43]:
url

'http://localhost:9200/predicate_index/_search/'

In [44]:
import requests

In [45]:
r = requests.post(url, json=payload, headers=headers)

In [46]:
res = r.json()

In [48]:
res

{'error': {'root_cause': [{'type': 'too_many_buckets_exception',
    'reason': 'Trying to create too many buckets. Must be less than or equal to: [10000] but was [100000]. This limit can be set by changing the [search.max_buckets] cluster level setting.',
    'max_buckets': 10000}],
  'type': 'search_phase_execution_exception',
  'reason': 'all shards failed',
  'phase': 'query',
  'grouped': True,
  'failed_shards': [{'shard': 0,
    'index': 'predicate_index',
    'node': '1RpYqzjcSweLVR_lSkBKCg',
    'reason': {'type': 'too_many_buckets_exception',
     'reason': 'Trying to create too many buckets. Must be less than or equal to: [10000] but was [100000]. This limit can be set by changing the [search.max_buckets] cluster level setting.',
     'max_buckets': 10000}}]},
 'status': 503}

In [47]:
predicate_data="data/freqs/predicate_index/semmeddb_triple_freqs.txt"

KeyError: 'aggregations'