/
dataframe_writer.py
328 lines (286 loc) · 14.4 KB
/
dataframe_writer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
#
# Copyright (c) 2012-2023 Snowflake Computing Inc. All rights reserved.
#
from typing import Dict, List, Literal, Optional, Union, overload
import snowflake.snowpark # for forward references of type hints
from snowflake.snowpark._internal.analyzer.snowflake_plan_node import (
CopyIntoLocationNode,
SaveMode,
SnowflakeCreateTable,
)
from snowflake.snowpark._internal.telemetry import (
add_api_call,
dfw_collect_api_telemetry,
)
from snowflake.snowpark._internal.type_utils import ColumnOrSqlExpr
from snowflake.snowpark._internal.utils import (
SUPPORTED_TABLE_TYPES,
normalize_remote_file_or_dir,
str_to_enum,
validate_object_name,
warning,
)
from snowflake.snowpark.async_job import AsyncJob, _AsyncResultType
from snowflake.snowpark.column import Column
from snowflake.snowpark.functions import sql_expr
from snowflake.snowpark.row import Row
# Python 3.8 needs to use typing.Iterable because collections.abc.Iterable is not subscriptable there.
# Python 3.9+ can use either; typing.Iterable is deprecated (though not yet removed) in favor of
# collections.abc.Iterable, so fall back to the collections.abc version if the typing alias disappears.
try:
from typing import Iterable
except ImportError:
from collections.abc import Iterable
class DataFrameWriter:
    """Provides methods for writing data from a :class:`DataFrame` to supported output destinations.

    To use this object:

    1. Create an instance of a :class:`DataFrameWriter` by accessing the :attr:`DataFrame.write` property.
    2. (Optional) Specify the save mode by calling :meth:`mode`, which returns the same
       :class:`DataFrameWriter` that is configured to save data using the specified mode.
       The default mode is "errorifexists".
    3. Call :meth:`save_as_table` or :meth:`copy_into_location` to save the data to the
       specified destination.
    """

    def __init__(self, dataframe: "snowflake.snowpark.dataframe.DataFrame") -> None:
        # The DataFrame whose data this writer persists.
        self._dataframe = dataframe
        # Writer-level default save mode; changed via mode() or overridden per call
        # with the ``mode`` argument of save_as_table.
        self._save_mode = SaveMode.ERROR_IF_EXISTS

    def mode(self, save_mode: str) -> "DataFrameWriter":
        """Set the save mode of this :class:`DataFrameWriter`.

        Args:
            save_mode: One of the following strings.

                "append": Append data of this DataFrame to existing data.

                "overwrite": Overwrite existing data.

                "errorifexists": Throw an exception if data already exists.

                "ignore": Ignore this operation if data already exists.

                Default value is "errorifexists".

        Returns:
            The :class:`DataFrameWriter` itself.
        """
        # str_to_enum raises a descriptive error for strings that are not a SaveMode.
        self._save_mode = str_to_enum(save_mode.lower(), SaveMode, "`save_mode`")
        return self

    @overload
    def save_as_table(
        self,
        table_name: Union[str, Iterable[str]],
        *,
        mode: Optional[str] = None,
        column_order: str = "index",
        create_temp_table: bool = False,
        table_type: Literal["", "temp", "temporary", "transient"] = "",
        statement_params: Optional[Dict[str, str]] = None,
        block: bool = True,
    ) -> None:
        ...  # pragma: no cover

    @overload
    def save_as_table(
        self,
        table_name: Union[str, Iterable[str]],
        *,
        mode: Optional[str] = None,
        column_order: str = "index",
        create_temp_table: bool = False,
        table_type: Literal["", "temp", "temporary", "transient"] = "",
        statement_params: Optional[Dict[str, str]] = None,
        block: bool = False,
    ) -> AsyncJob:
        ...  # pragma: no cover

    @dfw_collect_api_telemetry
    def save_as_table(
        self,
        table_name: Union[str, Iterable[str]],
        *,
        mode: Optional[str] = None,
        column_order: str = "index",
        create_temp_table: bool = False,
        table_type: Literal["", "temp", "temporary", "transient"] = "",
        statement_params: Optional[Dict[str, str]] = None,
        block: bool = True,
    ) -> Optional[AsyncJob]:
        """Writes the data to the specified table in a Snowflake database.

        Args:
            table_name: A string or list of strings that specify the table name or fully-qualified object identifier
                (database name, schema name, and table name).
            mode: One of the following values. When it's ``None`` or not provided,
                the save mode set by :meth:`mode` is used.

                "append": Append data of this DataFrame to existing data.

                "overwrite": Overwrite existing data.

                "errorifexists": Throw an exception if data already exists.

                "ignore": Ignore this operation if data already exists.

            column_order: When ``mode`` is "append", data will be inserted into the target table by matching column sequence or column name. Default is "index". When ``mode`` is not "append", the ``column_order`` makes no difference.

                "index": Data will be inserted into the target table by column sequence.
                "name": Data will be inserted into the target table by matching column names. If the target table has more columns than the source DataFrame, use this one.

            create_temp_table: (Deprecated) The to-be-created table will be temporary if this is set to ``True``.
            table_type: The table type of table to be created. The supported values are: ``temp``, ``temporary``,
                and ``transient``. An empty string means to create a permanent table. Learn more about table
                types `here <https://docs.snowflake.com/en/user-guide/tables-temp-transient.html>`_.
            statement_params: Dictionary of statement level parameters to be set while executing this action.
            block: (Experimental) A bool value indicating whether this function will wait until the result is available.
                When it is ``False``, this function executes the underlying queries of the dataframe
                asynchronously and returns an :class:`AsyncJob`.

        Returns:
            ``None`` when ``block=True``; an :class:`AsyncJob` when ``block=False``.

        Examples::

            >>> df = session.create_dataframe([[1,2],[3,4]], schema=["a", "b"])
            >>> df.write.mode("overwrite").save_as_table("my_table", table_type="temporary")
            >>> session.table("my_table").collect()
            [Row(A=1, B=2), Row(A=3, B=4)]
            >>> df.write.save_as_table("my_table", mode="append", table_type="temporary")
            >>> session.table("my_table").collect()
            [Row(A=1, B=2), Row(A=3, B=4), Row(A=1, B=2), Row(A=3, B=4)]
            >>> df.write.mode("overwrite").save_as_table("my_transient_table", table_type="transient")
            >>> session.table("my_transient_table").collect()
            [Row(A=1, B=2), Row(A=3, B=4)]
        """
        if not block:
            warning(
                "save_as_table.block",
                "block argument is experimental. Do not use it in production.",
            )
        # An explicit ``mode`` argument takes precedence over the writer-level mode
        # set via :meth:`mode`.
        save_mode = (
            str_to_enum(mode.lower(), SaveMode, "'mode'") if mode else self._save_mode
        )
        # Accept either a single identifier string or an iterable of identifier parts.
        full_table_name = (
            table_name if isinstance(table_name, str) else ".".join(table_name)
        )
        validate_object_name(full_table_name)
        if column_order is None or column_order.lower() not in ("name", "index"):
            raise ValueError("'column_order' must be either 'name' or 'index'")
        # Column names are only needed for by-name matching; ``None`` means
        # insert by column position.
        column_names = (
            self._dataframe.columns if column_order.lower() == "name" else None
        )
        if create_temp_table:
            warning(
                "save_as_table.create_temp_table",
                "create_temp_table is deprecated. We still respect this parameter when it is True but "
                'please consider using `table_type="temporary"` instead.',
            )
            # The deprecated flag wins: this silently overrides any explicitly
            # passed ``table_type``.
            table_type = "temporary"
        if table_type and table_type.lower() not in SUPPORTED_TABLE_TYPES:
            raise ValueError(
                f"Unsupported table type. Expected table types: {SUPPORTED_TABLE_TYPES}"
            )
        # Build the logical create-table plan, resolve it with the session's
        # analyzer, and execute it (synchronously or as an async job).
        create_table_logic_plan = SnowflakeCreateTable(
            full_table_name,
            column_names,
            save_mode,
            self._dataframe._plan,
            table_type,
        )
        session = self._dataframe._session
        snowflake_plan = session._analyzer.resolve(create_table_logic_plan)
        result = session._conn.execute(
            snowflake_plan,
            _statement_params=statement_params,
            block=block,
            data_type=_AsyncResultType.NO_RESULT,
        )
        # Blocking calls produce no useful result; async calls return the AsyncJob handle.
        return result if not block else None

    @overload
    def copy_into_location(
        self,
        location: str,
        *,
        partition_by: Optional[ColumnOrSqlExpr] = None,
        file_format_name: Optional[str] = None,
        file_format_type: Optional[str] = None,
        format_type_options: Optional[Dict[str, str]] = None,
        header: bool = False,
        statement_params: Optional[Dict[str, str]] = None,
        block: bool = True,
        **copy_options: Optional[str],
    ) -> List[Row]:
        ...  # pragma: no cover

    @overload
    def copy_into_location(
        self,
        location: str,
        *,
        partition_by: Optional[ColumnOrSqlExpr] = None,
        file_format_name: Optional[str] = None,
        file_format_type: Optional[str] = None,
        format_type_options: Optional[Dict[str, str]] = None,
        header: bool = False,
        statement_params: Optional[Dict[str, str]] = None,
        block: bool = False,
        **copy_options: Optional[str],
    ) -> AsyncJob:
        ...  # pragma: no cover

    def copy_into_location(
        self,
        location: str,
        *,
        partition_by: Optional[ColumnOrSqlExpr] = None,
        file_format_name: Optional[str] = None,
        file_format_type: Optional[str] = None,
        format_type_options: Optional[Dict[str, str]] = None,
        header: bool = False,
        statement_params: Optional[Dict[str, str]] = None,
        block: bool = True,
        **copy_options: Optional[str],
    ) -> Union[List[Row], AsyncJob]:
        """Executes a `COPY INTO <location> <https://docs.snowflake.com/en/sql-reference/sql/copy-into-location.html>`__ to unload data from a ``DataFrame`` into one or more files in a stage or external stage.

        Args:
            location: The destination stage location.
            partition_by: Specifies an expression used to partition the unloaded table rows into separate files. It can be a :class:`Column`, a column name, or a SQL expression.
            file_format_name: Specifies an existing named file format to use for unloading data from the table. The named file format determines the format type (CSV, JSON, PARQUET), as well as any other format options, for the data files.
            file_format_type: Specifies the type of files unloaded from the table. If a format type is specified, additional format-specific options can be specified in ``format_type_options``.
            format_type_options: Depending on the ``file_format_type`` specified, you can include more format specific options. Use the options documented in the `Format Type Options <https://docs.snowflake.com/en/sql-reference/sql/copy-into-location.html#format-type-options-formattypeoptions>`__.
            header: Specifies whether to include the table column headings in the output files.
            statement_params: Dictionary of statement level parameters to be set while executing this action.
            copy_options: The kwargs that are used to specify the copy options. Use the options documented in the `Copy Options <https://docs.snowflake.com/en/sql-reference/sql/copy-into-location.html#copy-options-copyoptions>`__.
            block: (Experimental) A bool value indicating whether this function will wait until the result is available.
                When it is ``False``, this function executes the underlying queries of the dataframe
                asynchronously and returns an :class:`AsyncJob`.

        Returns:
            A list of :class:`Row` objects containing unloading results when ``block=True``,
            or an :class:`AsyncJob` when ``block=False``.

        Example::

            >>> # save this dataframe to a parquet file on the session stage
            >>> df = session.create_dataframe([["John", "Berry"], ["Rick", "Berry"], ["Anthony", "Davis"]], schema = ["FIRST_NAME", "LAST_NAME"])
            >>> remote_file_path = f"{session.get_session_stage()}/names.parquet"
            >>> copy_result = df.write.copy_into_location(remote_file_path, file_format_type="parquet", header=True, overwrite=True, single=True)
            >>> copy_result[0].rows_unloaded
            3
            >>> # the following code snippet just verifies the file content and is actually irrelevant to Snowpark
            >>> # download this file and read it using pyarrow
            >>> import os
            >>> import tempfile
            >>> import pyarrow.parquet as pq
            >>> with tempfile.TemporaryDirectory() as tmpdirname:
            ...     _ = session.file.get(remote_file_path, tmpdirname)
            ...     pq.read_table(os.path.join(tmpdirname, "names.parquet"))
            pyarrow.Table
            FIRST_NAME: string not null
            LAST_NAME: string not null
            ----
            FIRST_NAME: [["John","Rick","Anthony"]]
            LAST_NAME: [["Berry","Berry","Davis"]]
        """
        if not block:
            warning(
                "copy_into_location.block",
                "block argument is experimental. Do not use it in production.",
            )
        # Normalize the destination into stage-path form (see normalize_remote_file_or_dir).
        stage_location = normalize_remote_file_or_dir(location)
        # Convert partition_by into an internal Expression; anything other than a
        # string, a Column, or None is rejected.
        if isinstance(partition_by, str):
            partition_by = sql_expr(partition_by)._expression
        elif isinstance(partition_by, Column):
            partition_by = partition_by._expression
        elif partition_by is not None:
            raise TypeError(  # pragma: no cover
                f"'partition_by' is expected to be a column name, a Column object, or a sql expression. Got type {type(partition_by)}"
            )
        # Wrap the current plan in a COPY INTO <location> node and collect it.
        df = self._dataframe._with_plan(
            CopyIntoLocationNode(
                self._dataframe._plan,
                stage_location,
                partition_by=partition_by,
                file_format_name=file_format_name,
                file_format_type=file_format_type,
                format_type_options=format_type_options,
                copy_options=copy_options,
                header=header,
            )
        )
        # Record this API call for client telemetry.
        add_api_call(df, "DataFrameWriter.copy_into_location")
        return df._internal_collect_with_tag(
            statement_params=statement_params, block=block
        )

    # camelCase alias of save_as_table (presumably for Spark-style API compatibility — confirm).
    saveAsTable = save_as_table