This repository has been archived by the owner on Dec 14, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 87
/
copy_from.py
73 lines (51 loc) · 2.17 KB
/
copy_from.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import tempfile
import psycopg2
from psycopg2.extras import DictCursor
from mediawords.db.exceptions.handler import McDatabaseHandlerException
from mediawords.util.log import create_logger
from mediawords.util.perl import decode_object_from_bytes_if_needed
log = create_logger(__name__)
class McCopyFromException(McDatabaseHandlerException):
    """Raised by CopyFrom when buffering a line or running COPY FROM fails."""
# FIXME writes everything to a temporary file first, does the actual copying in end()
class CopyFrom(object):
    """COPY FROM helper.

    Lines passed to put_line() are buffered in a temporary file; the COPY FROM
    statement itself is only executed when end() is called.
    """

    # Chunk size to COPY FROM (passed as the "size" argument of copy_expert())
    __COPY_CHUNK_SIZE = 100 * 1024

    # SQL to run
    __sql = None

    # Database cursor
    __cursor = None

    # Temporary file buffer
    __temp_file_buffer = None

    def __init__(self, cursor: DictCursor, sql: str):
        """Set up the helper.

        :param cursor: cursor on which end() will run the COPY FROM statement
        :param sql: COPY FROM statement to run in end()
        :raises McDatabaseHandlerException: if sql is None or empty
        """
        sql = decode_object_from_bytes_if_needed(sql)

        self.__start_copy_from(cursor=cursor, sql=sql)

    def __start_copy_from(self, cursor: DictCursor, sql: str) -> None:
        """Start COPY FROM.

        Validates the statement, remembers the cursor and the statement, and
        opens the temporary file that put_line() writes to.

        :raises McDatabaseHandlerException: if sql is None or empty
        """
        sql = decode_object_from_bytes_if_needed(sql)

        if sql is None:
            raise McDatabaseHandlerException("SQL is None.")

        # Comparing len(sql) against '' can never be true (int vs. str), so
        # test for emptiness directly to make this guard effective.
        if not sql:
            raise McDatabaseHandlerException("SQL is empty.")

        self.__sql = sql
        self.__cursor = cursor
        self.__temp_file_buffer = tempfile.TemporaryFile(mode='w+', encoding='utf-8')

    def put_line(self, line: str) -> None:
        """Write line.

        Trailing newlines are stripped from the line and exactly one newline
        is appended before it is written to the temporary file.

        :param line: line to buffer for the COPY FROM statement
        :raises McCopyFromException: if writing to the temporary file fails
        """
        line = decode_object_from_bytes_if_needed(line)
        line = line.rstrip('\n')

        try:
            self.__temp_file_buffer.write("%s\n" % line)
        except Exception as ex:
            # Chain the cause so the original traceback is not lost.
            raise McCopyFromException("Error write writing line '%s': %s" % (line, str(ex))) from ex

    def end(self) -> None:
        """Stop writing (and run the actual COPY FROM).

        Rewinds the temporary file, feeds it to the cursor's copy_expert() and
        closes the file. A psycopg2.Warning is logged and not re-raised.

        :raises McCopyFromException: if rewinding or the COPY FROM query fails
        """
        try:
            self.__temp_file_buffer.seek(0)
            self.__cursor.copy_expert(sql=self.__sql, file=self.__temp_file_buffer, size=self.__COPY_CHUNK_SIZE)
        except psycopg2.Warning as ex:
            log.warning('Warning while running COPY FROM query: %s' % str(ex))
        except Exception as ex:
            raise McCopyFromException('COPY FROM query failed: %s' % str(ex)) from ex
        finally:
            # Close on every path so a failed or warned COPY does not leak
            # the temporary file handle.
            if self.__temp_file_buffer is not None:
                self.__temp_file_buffer.close()