forked from apache/airflow
/
mysql_to_gcs.py
138 lines (123 loc) · 5.26 KB
/
mysql_to_gcs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""MySQL to GCS operator."""
import base64
from datetime import date, datetime, time, timedelta
from decimal import Decimal
from typing import Dict
from MySQLdb.constants import FIELD_TYPE
from airflow.providers.google.cloud.transfers.sql_to_gcs import BaseSQLToGCSOperator
from airflow.providers.mysql.hooks.mysql import MySqlHook
from airflow.utils.decorators import apply_defaults
class MySQLToGCSOperator(BaseSQLToGCSOperator):
    """Export the result of a MySQL query to Google Cloud Storage as JSON or CSV.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:MySQLToGCSOperator`

    :param mysql_conn_id: Reference to a specific MySQL hook.
    :type mysql_conn_id: str
    :param ensure_utc: When true, ``SET time_zone = '+00:00'`` is issued on the
        session before the query so TIMESTAMP columns are exported as UTC.
        When `False`, TIMESTAMP columns are exported using the MySQL
        server's default timezone.
    :type ensure_utc: bool
    """

    ui_color = '#a0e08c'

    # MySQLdb column type code -> BigQuery column type. Any type code that is
    # not listed here is exported as "STRING" (see ``field_to_bigquery``).
    type_map = {
        FIELD_TYPE.BIT: 'INTEGER',
        FIELD_TYPE.DATETIME: 'TIMESTAMP',
        FIELD_TYPE.DATE: 'TIMESTAMP',
        FIELD_TYPE.DECIMAL: 'FLOAT',
        FIELD_TYPE.NEWDECIMAL: 'FLOAT',
        FIELD_TYPE.DOUBLE: 'FLOAT',
        FIELD_TYPE.FLOAT: 'FLOAT',
        FIELD_TYPE.INT24: 'INTEGER',
        FIELD_TYPE.LONG: 'INTEGER',
        FIELD_TYPE.LONGLONG: 'INTEGER',
        FIELD_TYPE.SHORT: 'INTEGER',
        FIELD_TYPE.TIME: 'TIME',
        FIELD_TYPE.TIMESTAMP: 'TIMESTAMP',
        FIELD_TYPE.TINY: 'INTEGER',
        FIELD_TYPE.YEAR: 'INTEGER',
    }

    @apply_defaults
    def __init__(self, *, mysql_conn_id='mysql_default', ensure_utc=False, **kwargs):
        super().__init__(**kwargs)
        self.mysql_conn_id = mysql_conn_id
        self.ensure_utc = ensure_utc

    def query(self):
        """Run ``self.sql`` against MySQL and return the cursor holding the results.

        When ``ensure_utc`` is set, the session time zone is switched to UTC
        on the same cursor before the main statement is executed.
        """
        hook = MySqlHook(mysql_conn_id=self.mysql_conn_id)
        # Keep the connection bound to a local for as long as statements are
        # being executed on its cursor.
        connection = hook.get_conn()
        cursor = connection.cursor()

        statements = []
        if self.ensure_utc:
            # Ensure TIMESTAMP results are in UTC
            statements.append("SET time_zone = '+00:00'")
        statements.append(self.sql)

        for statement in statements:
            self.log.info('Executing: %s', statement)
            cursor.execute(statement)
        return cursor

    def field_to_bigquery(self, field) -> Dict[str, str]:
        """Build the BigQuery schema entry for one result column.

        :param field: column description sequence; index 0 is used as the
            column name, index 1 as the MySQLdb type code and index 6 as the
            "nullable" flag.
        :return: dict with the ``name``, ``type`` and ``mode`` of the column.
        """
        bigquery_type = self.type_map.get(field[1], "STRING")
        # Always allow TIMESTAMP to be nullable. MySQLdb returns None types
        # for required fields because some MySQL timestamps can't be
        # represented by Python's datetime (e.g. 0000-00-00 00:00:00).
        is_required = not field[6] and bigquery_type != "TIMESTAMP"
        return {
            'name': field[0],
            'type': bigquery_type,
            'mode': "REQUIRED" if is_required else "NULLABLE",
        }

    def convert_type(self, value, schema_type: str):
        """
        Convert a value returned by MySQLdb into one that is safe for
        JSON/Google Cloud Storage/BigQuery.

        * ``None`` is passed through untouched.
        * Datetimes become `str(value)` (`datetime.isoformat(' ')`) strings.
        * Timedeltas become `str((datetime.min + value).time())` strings.
        * Decimals become floats.
        * Dates become ISO formatted strings when schema_type is DATE, and
          `datetime.isoformat(' ')` strings (at midnight) otherwise.
        * Bytes become a big-endian integer when schema_type is INTEGER, and
          a base64 string otherwise. Imported BYTES data must be
          base64-encoded according to BigQuery documentation:
          https://cloud.google.com/bigquery/data-types
        * Anything else is returned unchanged.

        :param value: MySQLdb column value
        :type value: Any
        :param schema_type: BigQuery data type
        :type schema_type: str
        """
        if value is None:
            return None
        # datetime is a subclass of date, so it has to be tested first.
        if isinstance(value, datetime):
            return str(value)
        if isinstance(value, timedelta):
            return str((datetime.min + value).time())
        if isinstance(value, Decimal):
            return float(value)
        if isinstance(value, date):
            if schema_type == "DATE":
                return value.isoformat()
            return str(datetime.combine(value, time.min))
        if isinstance(value, bytes):
            if schema_type == "INTEGER":
                return int.from_bytes(value, "big")
            return base64.standard_b64encode(value).decode('ascii')
        return value