/
rdbms.py
96 lines (71 loc) · 3.27 KB
/
rdbms.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
'''
A common module for Postgres-like databases, such as PostgreSQL or Redshift.
'''
import abc
import logging
import luigi
# Module logger, bound to the fixed 'luigi-interface' logger name (not __name__).
logger = logging.getLogger(name='luigi-interface')
class CopyToTable(luigi.Task):
    """
    An abstract task for inserting a data set into an RDBMS.

    Usage:

        Subclass and override the required `host`, `database`, `user`,
        `password`, `table` and `columns` attributes. `output` and `copy`
        are declared abstract and must also be implemented by subclasses.
    """

    @abc.abstractproperty
    def host(self):
        return None

    @abc.abstractproperty
    def database(self):
        return None

    @abc.abstractproperty
    def user(self):
        return None

    @abc.abstractproperty
    def password(self):
        return None

    @abc.abstractproperty
    def table(self):
        return None

    # Specify the columns that are to be inserted (same as are returned by columns).
    # Overload this in subclasses with either the names of the columns to import:
    # e.g. ['id', 'username', 'inserted']
    # or tuples with column name, postgres column type strings:
    # e.g. [('id', 'SERIAL PRIMARY KEY'), ('username', 'VARCHAR(255)'), ('inserted', 'DATETIME')]
    # NOTE: this default list is a class attribute shared by every subclass that does
    # not override it; rebind `columns` in subclasses rather than mutating it in place.
    columns = []

    # options
    null_values = (None,)  # container of values that should be inserted as NULL values
    column_separator = "\t"  # how columns are separated in the file copied into postgres

    def create_table(self, connection):
        """
        Override to provide code for creating the target table.

        By default the table is created using the column types specified in
        `columns`, which must then be a sequence of (name, type) pairs.
        If overridden, use the provided connection object for setting up the
        table in order to create the table and insert data using the same
        transaction.

        :param connection: connection object; ``connection.cursor().execute``
            is used to run the generated ``CREATE TABLE`` statement.
        :raises NotImplementedError: if `columns` is empty or lists only column
            names (no types), so no column definitions can be derived.
        :raises ValueError: if an entry of `columns` is not a (name, type) pair.
        """
        first_column = self.columns[0] if self.columns else ()
        # Check the container type, not just len(): for a bare column name such as
        # 'id', len() measures the string, which would be mistaken for a pair.
        if not isinstance(first_column, (tuple, list)) or len(first_column) < 2:
            # only names of columns specified, no types
            raise NotImplementedError("create_table() not implemented for %r and columns types not specified" % self.table)

        coldefs = []
        for column in self.columns:
            if not isinstance(column, (tuple, list)) or len(column) != 2:
                raise ValueError(
                    "create_table() expects every entry of columns to be a "
                    "(name, type) pair, got %r" % (column,)
                )
            name, col_type = column
            coldefs.append('{name} {type}'.format(name=name, type=col_type))

        query = "CREATE TABLE {table} ({coldefs})".format(table=self.table, coldefs=','.join(coldefs))
        connection.cursor().execute(query)

    def update_id(self):
        """This update id will be a unique identifier for this insert on this table."""
        return self.task_id

    @abc.abstractmethod
    def output(self):
        """Return this task's output target; must be implemented by subclasses."""
        raise NotImplementedError("This method must be overridden")

    def init_copy(self, connection):
        """
        Override to perform custom queries.

        Any code here will be performed in the same transaction as the main copy,
        just prior to copying data. Example use cases include truncating the table
        or removing all data older than X in the database to keep a rolling window
        of data available in the table.

        :raises Exception: if the removed ``clear_table`` attribute is still set.
        """
        # TODO: remove this after sufficient time so most people using the
        # clear_table attribute will have noticed it doesn't work anymore
        if hasattr(self, "clear_table"):
            raise Exception("The clear_table attribute has been removed. Override init_copy instead!")

    @abc.abstractmethod
    def copy(self, cursor, file):
        """
        Copy the data set into the target table; must be implemented by subclasses.

        NOTE(review): presumably reads rows from ``file`` and writes them through
        ``cursor`` -- confirm against the concrete subclasses.
        """
        raise NotImplementedError("This method must be overridden")