-
Notifications
You must be signed in to change notification settings - Fork 0
/
es-exporter.py
executable file
·128 lines (105 loc) · 4.83 KB
/
es-exporter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
#!/usr/bin/env python3
import argparse
import asyncio
import json
import subprocess
import sys
from typing import AsyncIterator
from elastic_transport import NodeConfig
# noinspection PyProtectedMember
from elastic_transport._models import DEFAULT
from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_scan
async def process_query(query_file: str, es: AsyncElasticsearch, chunk_size: int, index: str) -> AsyncIterator[dict]:
    """Load a JSON query and stream every matching hit from Elasticsearch.

    Args:
        query_file: Path to a file containing the JSON query body, or ``'-'``
            to read the query from standard input.
        es: Client handed to ``async_scan`` to run the query.
        chunk_size: Passed to ``async_scan`` as ``size``.
        index: Passed to ``async_scan`` as ``index``.

    Yields:
        Each result produced by ``async_scan``, unchanged.

    Raises:
        OSError: If ``query_file`` cannot be opened.
        json.JSONDecodeError: If the query source is not valid JSON.
    """
    # Parse the query up front and release the file immediately: the scan
    # below can run for a long time and has no further use for the handle.
    if query_file == '-':
        query = json.load(sys.stdin)
    else:
        with open(query_file, 'r') as query_source:
            query = json.load(query_source)

    async for result in async_scan(
        es,
        query=query,
        scroll='20m',
        size=chunk_size,
        index=index,
    ):
        yield result
async def post_process_document(document: dict, command: str, encoding: str) -> str:
    """Pipe one document through a shell command and return what it prints.

    The document is serialised with ``json.dumps``, encoded with ``encoding``
    and written to the command's standard input. Whatever the command writes
    to standard output is decoded with the same encoding and stripped of
    leading and trailing whitespace.

    The command's exit status is not inspected, so a command that exits
    non-zero simply contributes whatever output it produced (possibly an
    empty string).

    Args:
        document: JSON-serialisable document to send to the command.
        command: Shell command line; it is executed through the shell as-is.
        encoding: Text encoding used for both the input and the output.

    Returns:
        The command's decoded, stripped standard output.
    """
    proc = await asyncio.create_subprocess_shell(
        command,
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
    )
    payload = json.dumps(document).encode(encoding)
    # communicate() feeds stdin, closes it, reads stdout to EOF and waits for
    # the process to exit, replacing the manual write/close/read/wait steps
    # (which never drained the stdin buffer).
    output_bytes, _ = await proc.communicate(input=payload)
    return output_bytes.decode(encoding).strip()
async def process_results(results: AsyncIterator[dict], post_process: str, encoding: str, output_file: str,
                          full: bool) -> None:
    """Write every result to the output, one line per document.

    Args:
        results: Asynchronous stream of result dictionaries.
        post_process: Optional shell command; when truthy each document is
            passed through ``post_process_document`` instead of being dumped
            as JSON.
        encoding: Encoding used for the output file and for post-processing.
        output_file: Destination path, or ``None`` to write to ``sys.stdout``.
        full: When true the whole result is emitted; otherwise only its
            ``'_source'`` entry.
    """
    writes_to_file = output_file is not None
    if writes_to_file:
        sink = open(output_file, 'w', encoding=encoding)
    else:
        sink = sys.stdout

    try:
        async for hit in results:
            if full:
                payload = hit
            else:
                payload = hit['_source']

            if not post_process:
                line = json.dumps(payload, ensure_ascii=False)
            else:
                line = await post_process_document(payload, post_process, encoding)

            # Empty output means there is nothing to emit for this document.
            if not line:
                continue
            print(line, file=sink)
    finally:
        # Only close what this function opened; sys.stdout is left alone.
        if writes_to_file:
            sink.close()
async def main(query_file: str, post_process: str, output_file: str, full: bool, encoding: str,
               scheme: str, host: str, port: int, username: str, password: str, ca_cert: str,
               chunk_size: int, index: str = None) -> None:
    """Run one export: validate options, query Elasticsearch, write the results.

    Args:
        query_file: Path to the JSON query file, or ``'-'`` for standard input.
        post_process: Optional shell command applied to each document.
        output_file: Destination path, or ``None`` for standard output.
        full: Emit whole results instead of only their ``'_source'``.
        encoding: Encoding for file output and post-processing.
        scheme: HTTP scheme used to reach the node.
        host: Elasticsearch host.
        port: Elasticsearch port.
        username: User name for basic authentication.
        password: Password for basic authentication, or ``None``.
        ca_cert: Path to a CA certificate file, or ``None``.
        chunk_size: Forwarded to ``process_query``.
        index: Forwarded to ``process_query``.

    Raises:
        ValueError: If a non-default username is given without a password, or
            if a CA certificate is given while the scheme is not ``https``.
    """
    # 'elastic' is the default user name, so only an explicitly chosen user
    # name is required to come with a password. (The original check tested
    # `password is not None`, which rejected exactly the valid combination.)
    if username != 'elastic' and password is None:
        raise ValueError("Username and password must be provided together.")
    if ca_cert is not None and scheme != 'https':
        raise ValueError("CA certificate can only be used with HTTPS.")

    es = AsyncElasticsearch(
        hosts=[NodeConfig(scheme=scheme, host=host, port=port)],
        basic_auth=(username, password) if username and password else None,
        ca_certs=ca_cert if ca_cert is not None else DEFAULT
    )
    try:
        results = process_query(query_file, es, chunk_size, index)
        await process_results(results, post_process, encoding, output_file, full)
    finally:
        # Close the client even when the export fails part-way through.
        await es.close()
# Command-line entry point: parse the options and run a single export.
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Query, process and save data from Elasticsearch')
    parser.add_argument('query_file', type=str, help='Path to the query file. Use - for stdin.')
    parser.add_argument('--post-process', default=None, type=str,
                        help='Specify a command for post-processing each document')
    parser.add_argument('-o', '--out', default=None, type=str, help='Specify the output file')
    parser.add_argument('--full', action='store_true', help='Include the full document')
    parser.add_argument('--file-encoding', default='utf-8', help='Specify the encoding for file output')
    parser.add_argument('--scheme', default='https', help='Elasticsearch HTTP scheme (default: https)')
    parser.add_argument('--host', default='localhost', help='Elasticsearch host (default: localhost)')
    # type=int so a value given on the command line arrives as an integer,
    # just like the default does.
    parser.add_argument('--port', default=9200, type=int, help='Elasticsearch port (default: 9200)')
    parser.add_argument('-u', '--username', default='elastic', help='Username for authentication (default: elastic)')
    parser.add_argument('-p', '--password', default=None, type=str, help='Password for authentication')
    parser.add_argument('--ca-cert', default=None, type=str, help='Path to the CA certificate file')
    parser.add_argument('--chunk-size', default=1000, type=int,
                        help='Number of documents to process at once (default: 1000)')
    parser.add_argument('-i', '--index', default=None, type=str, help='Specify the Elasticsearch index')
    args = parser.parse_args()

    # Keyword arguments keep the long parameter list from being misordered.
    asyncio.run(main(
        query_file=args.query_file,
        post_process=args.post_process,
        output_file=args.out,
        full=args.full,
        encoding=args.file_encoding,
        scheme=args.scheme,
        host=args.host,
        port=args.port,
        username=args.username,
        password=args.password,
        ca_cert=args.ca_cert,
        chunk_size=args.chunk_size,
        index=args.index,
    ))