# update_crossref_in_elastic.py -- forked from ourresearch/oadoi
# (GitHub page chrome and gutter line numbers from the scraped web view removed;
#  the original file is 189 lines / 5.03 KB.)
import os
import sickle
import boto
import datetime
import requests
from time import sleep
from time import time
from util import elapsed
import logging
import zlib
import re
import json
import sys
import random
import argparse
from elasticsearch import Elasticsearch, RequestsHttpConnection, compat, exceptions
from elasticsearch.helpers import parallel_bulk
from elasticsearch.helpers import bulk
from elasticsearch.helpers import scan
from multiprocessing import Process
from multiprocessing import Queue
from multiprocessing import Pool
from HTMLParser import HTMLParser
import oa_local
from oa_base import get_urls_from_our_base_doc
from publication import call_targets_in_parallel
from webpage import WebpageInUnknownRepo
from util import JSONSerializerPython2
# set up elasticsearch
INDEX_NAME = "crossref"
TYPE_NAME = "crosserf_api" #was typo on insert, so still running with it

# third-party libraries whose INFO/DEBUG chatter we silence down to WARNING
libraries_to_mum = [
    "requests.packages.urllib3",
    "requests_oauthlib",
    "stripe",
    "oauthlib",
    "boto",
    "newrelic",
    "RateLimiter",
    "elasticsearch",
    "urllib3"
]

for noisy_library in libraries_to_mum:
    noisy_logger = logging.getLogger(noisy_library)
    noisy_logger.setLevel(logging.WARNING)
    noisy_logger.propagate = True
class MissingTagException(Exception):
    """Signals that an expected tag was missing (presumably while parsing
    harvested records -- not raised anywhere in this module)."""
def set_up_elastic(url):
    """Return an Elasticsearch client pointed at *url*.

    When url is falsy, falls back to the CROSSREF_ES_URL environment
    variable. The client retries timed-out requests (up to 100 times)
    and serializes with the project's Python-2-aware JSON serializer.
    """
    target_url = url or os.getenv("CROSSREF_ES_URL")
    return Elasticsearch(
        target_url,
        serializer=JSONSerializerPython2(),
        retry_on_timeout=True,
        max_retries=100,
    )
def save_records_in_es(es, records_to_save, threads, chunk_size):
start_time = time()
# have to do call parallel_bulk in a for loop because is parallel_bulk is a generator so you have to call it to
# have it do the work. see https://discuss.elastic.co/t/helpers-parallel-bulk-in-python-not-working/39498
if threads > 1:
for success, info in parallel_bulk(es,
actions=records_to_save,
refresh=False,
request_timeout=60,
thread_count=threads,
chunk_size=chunk_size):
if not success:
print('A document failed:', info)
else:
for success_info in bulk(es, actions=records_to_save, refresh=False, request_timeout=60, chunk_size=chunk_size):
pass
print u"done sending {} records to elastic in {}s".format(len(records_to_save), elapsed(start_time, 4))
print u"most recent record: {}".format(records_to_save[0])
# remaning: 84,778,593 ata 9:20pm sunday
# 84,474,078 at 9:28pm
# 82,890,743 at 9:55pm

# Fetch up to 1000 docs that do not yet have a "random" field, starting at
# a random offset so concurrent workers are unlikely to grab the same page.
query = {
    "_source": ["title", "id"],
    "size": 1000,
    "from": int(random.random() * 8999),
    "query": {
        "bool": {
            "must_not": {"exists": {"field": "random"}}
        }
    }
}
class CrossrefResult(object):
    """One hit from the crossref index: its ES id plus the source doc."""

    def __init__(self, id, doc):
        self.id = id
        self.doc = doc

    def make_action_record(self):
        """Build a bulk-API update action stamping this doc with a random float."""
        return {
            "doc": {"random": random.random()},
            "_id": self.id,
            "_op_type": "update",
            "_type": TYPE_NAME,
            "_index": INDEX_NAME,
        }
def do_a_loop(first=None, last=None, url=None, threads=0, chunk_size=None):
es = set_up_elastic(url)
loop_start = time()
results = es.search(index=INDEX_NAME, body=query, request_timeout=10000)
# print u"search body:\n{}".format(query)
print u"took {}s to search ES. remaining: {:,}".format(
elapsed(loop_start, 2), results["hits"]["total"])
records_to_save = []
# decide if should stop looping after this
if not results['hits']['hits']:
sys.exit()
crossref_results = []
for crossref_hit in results['hits']['hits']:
crossref_hit_doc = crossref_hit["_source"]
crossref_results.append(CrossrefResult(crossref_hit["_id"], crossref_hit_doc))
for crossref_result in crossref_results:
records_to_save.append(crossref_result.make_action_record())
# print "records_to_save", records_to_save
print "starting saving"
save_records_in_es(es, records_to_save, threads, chunk_size)
print "** {}s to do {}\n".format(elapsed(loop_start, 2), len(crossref_results))
def update_everything():
has_more_records = True
while has_more_records:
pool_time = time()
my_process = Process(target=do_a_loop)
my_process.daemon = True
my_process.start()
my_process.join()
my_process.terminate()
print u"waited {}s for do_a_loop".format(elapsed(pool_time, 2))
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Run stuff.")
# just for updating lots
function = update_everything
parsed = parser.parse_args()
print u"calling {} with these args: {}".format(function.__name__, vars(parsed))
function(**vars(parsed))