forked from polyneme/openalex-terminusdb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
insert.py
104 lines (93 loc) · 3.93 KB
/
insert.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
#!/usr/bin/env python3
import argparse
import subprocess
import threading
import os
import json
import sys
import time
from multiprocessing import Pool
TERMINUSDB_COMMAND = 'terminusdb'
def prefix_number(number):
    """Left-pad *number* with zeros to a minimum width of three characters.

    e.g. 7 -> "007", 42 -> "042", 1234 -> "1234".
    """
    if number < 10:
        pad = "00"
    elif number < 100:
        pad = "0"
    else:
        pad = ""
    return pad + str(number)
def init_db(schema, threads):
    """Create one TerminusDB database per worker plus the final merge target.

    Creates ``admin/openalex_000`` ... ``admin/openalex_NNN`` (one per worker)
    and ``admin/openalex`` (the database the per-worker results are later
    merged into).

    :param schema: schema file path (unused here; kept for interface
        compatibility with existing callers).
    :param threads: number of per-worker databases to create.
    """
    for index in range(threads):
        number = prefix_number(index)
        # Best-effort: the database may already exist from a previous run,
        # or the CLI itself may be unavailable.  A bare ``except`` here would
        # also swallow KeyboardInterrupt, so only OS-level failures are
        # ignored (``subprocess.run`` without check=True does not raise on a
        # non-zero exit status anyway).
        try:
            subprocess.run(
                f'{TERMINUSDB_COMMAND} db create admin/openalex_{number}',
                shell=True,
            )
        except OSError:
            pass
    subprocess.run(f"{TERMINUSDB_COMMAND} db create admin/openalex", shell=True)
def add_types(filename):
    """Tag every JSON line in *filename* with ``'@type': 'Author'``.

    Reads *filename* as JSON-lines and writes the augmented records to
    ``converted.json`` in the current working directory, one JSON object
    per line.

    :param filename: path to a JSON-lines input file.
    :raises json.JSONDecodeError: if any input line is not valid JSON.
    """
    with open(filename, 'r') as source, open('converted.json', 'w') as sink:
        # Stream line by line instead of loading the whole file into memory
        # with readlines() — the OpenAlex dumps are large.
        for line in source:
            record = json.loads(line)
            record['@type'] = 'Author'
            json.dump(record, sink)
            sink.write("\n")
def split_json(threads, filename):
    """Split *filename* into *threads* roughly line-equal chunk files.

    Uses GNU ``split -n l/N`` so JSON-lines records are never broken
    mid-line, producing files named ``openalex_split000``,
    ``openalex_split001``, ... in the current working directory.

    :param threads: number of output chunks.
    :param filename: path of the JSON-lines file to split.
    """
    # Bug fix: the input path in the command was a broken placeholder and
    # the `filename` parameter was never used.
    subprocess.run(
        f"split -n l/{threads} -d -a 3 {filename} openalex_split",
        shell=True,
    )
def apply_triples(threads):
    """Load each worker's dumped triples file into the merged database.

    For every worker index, loads ``openalex_NNN.triples`` into
    ``admin/openalex``'s main branch instance graph, then removes the
    triples file from inside the terminusdb docker container.

    :param threads: number of worker databases whose triples to load.
    """
    print("APPLYING TRIPLES")  # fixed typo ("TRIPPLES")
    for index in range(threads):
        number = prefix_number(index)
        db_name = f"openalex_{number}"
        # NOTE: removed dead locals `db` and `start` — neither was read.
        subprocess.run(
            f'{TERMINUSDB_COMMAND} triples load '
            f'admin/openalex/local/branch/main/instance {db_name}.triples',
            shell=True,
        )
        # Clean up the dump inside the container once it has been loaded.
        subprocess.run(
            f"sudo docker exec -i terminusdb /bin/bash -c 'rm -rf {db_name}.triples'",
            shell=True,
        )
def ingest_json(args):
    """Worker entry point: insert one JSON-lines chunk into its own database.

    Inserts the schema (full replace) and then the documents from the chunk
    file into ``admin/openalex_NNN``, logging all CLI output to
    ``log/openalex_NNN.log`` along with the elapsed wall-clock time.

    :param args: 3-tuple of (chunk filename, zero-padded worker number,
        schema file path) — packed in a single tuple so this function can be
        used directly with ``multiprocessing.Pool.map``.
    """
    start = time.time()
    filename, number, schema = args
    db_name = f"openalex_{number}"
    db = f'admin/{db_name}'
    # Ensure the log directory exists so a fresh checkout does not crash.
    os.makedirs("log", exist_ok=True)
    with open(f"log/{db_name}.log", 'w') as log:
        subprocess.run(
            f"{TERMINUSDB_COMMAND} doc insert {db} -g schema --full-replace < {schema}",
            shell=True, stdout=log, stderr=log,
        )
        # Bug fix: feed the chunk file from `args` to the insert — the input
        # redirection was a broken placeholder and `filename` was never used.
        subprocess.run(
            f'{TERMINUSDB_COMMAND} doc insert {db} -s < {filename}',
            shell=True, stdout=log, stderr=log,
        )
        elapsed = time.time() - start
        log.write(f"\n\nEND TIME: {elapsed}\n")
def main():
    """Split the input file, ingest chunks in parallel, then merge the DBs.

    CLI arguments: input JSON-lines file, schema file, number of chunks
    (``split``), and number of parallel worker processes (``threads``).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("file", type=os.path.abspath)
    parser.add_argument("schema", type=os.path.abspath)
    parser.add_argument("split", type=int)
    parser.add_argument("threads", type=int)
    args = parser.parse_args()
    init_db(args.schema, args.split)
    split_json(args.split, args.file)
    # One (chunk file, worker number, schema) tuple per worker database; the
    # chunk names match the `split` output prefix used in split_json().
    args_process = [
        ('openalex_split' + prefix_number(x), prefix_number(x), args.schema)
        for x in range(args.split)
    ]
    with Pool(args.threads) as pool:
        pool.map(ingest_json, args_process)
    print("FINISHED")
    # Feed the list of worker databases to `terminusdb concat` on stdin,
    # one "admin/openalex_NNN" per line.
    dbs = "".join(f"admin/openalex_{x:03d}\n" for x in range(args.split))
    print("Merging database")
    concat_result = subprocess.run(
        f'{TERMINUSDB_COMMAND} concat admin/openalex -j',
        text=True, shell=True, input=dbs, capture_output=True,
    )
    print(concat_result)
    # `concat -j` is expected to print the new commit id on the first line of
    # stdout; reset the merged database to that commit.
    reset_id = concat_result.stdout.split('\n')[0]
    print(reset_id)
    subprocess.run(
        f'{TERMINUSDB_COMMAND} reset admin/openalex {reset_id}',
        text=True, shell=True,
    )
    print("Loading schema")
    # TODO: We have to squash all the different data products into one
# Script entry point: run the full split / ingest / merge pipeline.
if __name__ == '__main__':
    main()