/
table.py
664 lines (580 loc) · 28.7 KB
/
table.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
#!/usr/bin/env python3
#
# Copyright (c) 2012-2023 Snowflake Computing Inc. All rights reserved.
#
from typing import Dict, List, NamedTuple, Optional, Union, overload
import snowflake.snowpark
from snowflake.snowpark._internal.analyzer.binary_plan_node import create_join_type
from snowflake.snowpark._internal.analyzer.select_statement import (
SelectableEntity,
SelectStatement,
)
from snowflake.snowpark._internal.analyzer.snowflake_plan_node import UnresolvedRelation
from snowflake.snowpark._internal.analyzer.table_merge_expression import (
DeleteMergeExpression,
InsertMergeExpression,
TableDelete,
TableMerge,
TableUpdate,
UpdateMergeExpression,
)
from snowflake.snowpark._internal.error_message import SnowparkClientExceptionMessages
from snowflake.snowpark._internal.telemetry import add_api_call, set_api_call_source
from snowflake.snowpark._internal.type_utils import ColumnOrLiteral
from snowflake.snowpark.column import Column
from snowflake.snowpark.dataframe import DataFrame, _disambiguate
from snowflake.snowpark.row import Row
# ``Iterable`` is imported with a fallback:
# - Python 3.8 must use ``typing.Iterable`` because ``collections.abc.Iterable``
#   is not subscriptable before 3.9.
# - Python 3.9+ can use either; ``typing.Iterable`` is deprecated since 3.9 in
#   favor of the ``collections.abc`` version, so fall back to the latter if the
#   ``typing`` alias is ever removed in a future Python release.
try:
    from typing import Iterable
except ImportError:  # only reachable on a Python that drops the typing alias
    from collections.abc import Iterable
class UpdateResult(NamedTuple):
    """Result of updating rows in a :class:`Table`.

    Returned by :meth:`Table.update`; wraps the single status row that
    Snowflake produces for an UPDATE statement.
    """

    rows_updated: int  #: The number of rows modified.
    multi_joined_rows_updated: int  #: The number of multi-joined rows modified.
class DeleteResult(NamedTuple):
    """Result of deleting rows in a :class:`Table`.

    Returned by :meth:`Table.delete`; wraps the single status row that
    Snowflake produces for a DELETE statement.
    """

    rows_deleted: int  #: The number of rows deleted.
class MergeResult(NamedTuple):
    """Result of merging a :class:`DataFrame` into a :class:`Table`.

    Returned by :meth:`Table.merge`; each counter is zero when the merge
    statement contained no clause of the corresponding kind.
    """

    rows_inserted: int  #: The number of rows inserted.
    rows_updated: int  #: The number of rows updated.
    rows_deleted: int  #: The number of rows deleted.
class WhenMatchedClause:
    """
    A matched clause for the :meth:`Table.merge` action. It matches all
    remaining rows in the target :class:`Table` that satisfy ``join_expr``
    while also satisfying ``condition``, if it is provided. You can use
    :func:`functions.when_matched` to instantiate this class.

    Args:
        condition: An optional :class:`Column` object representing the
            specified condition.
    """

    def __init__(self, condition: Optional[Column] = None) -> None:
        # Expression form of the optional condition; None means "no extra filter".
        self._condition_expr = condition._expression if condition is not None else None
        # Set exactly once by update() or delete(); holds the merge expression.
        self._clause = None

    def _check_action_not_set(self) -> None:
        """Raise if an update/delete action was already attached to this clause.

        Both :meth:`update` and :meth:`delete` must reject a second action on
        the same clause object; the duplicated check is centralized here.
        """
        if self._clause:
            raise SnowparkClientExceptionMessages.MERGE_TABLE_ACTION_ALREADY_SPECIFIED(
                "update"
                if isinstance(self._clause, UpdateMergeExpression)
                else "delete",
                "WhenMatchedClause",
            )

    def update(self, assignments: Dict[str, ColumnOrLiteral]) -> "WhenMatchedClause":
        """
        Defines an update action for the matched clause and
        returns an updated :class:`WhenMatchedClause` with the new
        update action added.

        Args:
            assignments: A list of values or a ``dict`` that associates
                the names of columns with the values that should be updated.
                The value of ``assignments`` can either be a literal value or
                a :class:`Column` object.

        Example::

            >>> # Adds a matched clause where a row in source is matched
            >>> # if its key is equal to the key of any row in target.
            >>> # For all such rows, update its value to the value of the
            >>> # corresponding row in source.
            >>> from snowflake.snowpark.functions import when_matched, lit
            >>> target_df = session.create_dataframe([(10, "old"), (10, "too_old"), (11, "old")], schema=["key", "value"])
            >>> target_df.write.save_as_table("my_table", mode="overwrite", table_type="temporary")
            >>> target = session.table("my_table")
            >>> source = session.create_dataframe([(10, "new")], schema=["key", "value"])
            >>> target.merge(source, (target["key"] == source["key"]) & (target["value"] == lit("too_old")), [when_matched().update({"value": source["value"]})])
            MergeResult(rows_inserted=0, rows_updated=1, rows_deleted=0)
            >>> target.collect() # the value in the table is updated
            [Row(KEY=10, VALUE='old'), Row(KEY=10, VALUE='new'), Row(KEY=11, VALUE='old')]

        Note:
            An exception will be raised if this method or :meth:`WhenMatchedClause.delete`
            is called more than once on the same :class:`WhenMatchedClause` object.
        """
        self._check_action_not_set()
        self._clause = UpdateMergeExpression(
            self._condition_expr,
            {Column(k)._expression: Column._to_expr(v) for k, v in assignments.items()},
        )
        return self

    def delete(self):
        """
        Defines a delete action for the matched clause and
        returns an updated :class:`WhenMatchedClause` with the new
        delete action added.

        Example::

            >>> # Adds a matched clause where a row in source is matched
            >>> # if its key is equal to the key of any row in target.
            >>> # For all such rows, delete them.
            >>> from snowflake.snowpark.functions import when_matched
            >>> target_df = session.create_dataframe([(10, "old"), (10, "too_old"), (11, "old")], schema=["key", "value"])
            >>> target_df.write.save_as_table("my_table", mode="overwrite", table_type="temporary")
            >>> target = session.table("my_table")
            >>> source = session.create_dataframe([(10, "new")], schema=["key", "value"])
            >>> target.merge(source, target["key"] == source["key"], [when_matched().delete()])
            MergeResult(rows_inserted=0, rows_updated=0, rows_deleted=2)
            >>> target.collect() # the rows are deleted
            [Row(KEY=11, VALUE='old')]

        Note:
            An exception will be raised if this method or :meth:`WhenMatchedClause.update`
            is called more than once on the same :class:`WhenMatchedClause` object.
        """
        self._check_action_not_set()
        self._clause = DeleteMergeExpression(self._condition_expr)
        return self
class WhenNotMatchedClause:
    """
    A not-matched clause for the :meth:`Table.merge` action. It matches all
    remaining rows in the target :class:`Table` that do not satisfy ``join_expr``
    but satisfy ``condition``, if it is provided. You can use
    :func:`functions.when_not_matched` to instantiate this class.

    Args:
        condition: An optional :class:`Column` object representing the
            specified condition.
    """

    def __init__(self, condition: Optional[Column] = None) -> None:
        # Expression form of the optional condition; None means "no extra filter".
        self._condition_expr = condition._expression if condition is not None else None
        # Set exactly once by insert(); holds the insert merge expression.
        self._clause = None

    def insert(
        self, assignments: Union[Iterable[ColumnOrLiteral], Dict[str, ColumnOrLiteral]]
    ) -> "WhenNotMatchedClause":
        """
        Defines an insert action for the not-matched clause and
        returns an updated :class:`WhenNotMatchedClause` with the new
        insert action added.

        Args:
            assignments: A list of values or a ``dict`` that associates
                the names of columns with the values that should be inserted.
                The value of ``assignments`` can either be a literal value or
                a :class:`Column` object.

        Examples::

            >>> # Adds a not-matched clause where a row in source is not matched
            >>> # if its key does not equal the key of any row in target.
            >>> # For all such rows, insert a row into target whose key and value
            >>> # are assigned to the key and value of the not matched row.
            >>> from snowflake.snowpark.functions import when_not_matched
            >>> target_df = session.create_dataframe([(10, "old"), (10, "too_old"), (11, "old")], schema=["key", "value"])
            >>> target_df.write.save_as_table("my_table", mode="overwrite", table_type="temporary")
            >>> target = session.table("my_table")
            >>> source = session.create_dataframe([(12, "new")], schema=["key", "value"])
            >>> target.merge(source, target["key"] == source["key"], [when_not_matched().insert([source["key"], source["value"]])])
            MergeResult(rows_inserted=1, rows_updated=0, rows_deleted=0)
            >>> target.collect() # the rows are inserted
            [Row(KEY=12, VALUE='new'), Row(KEY=10, VALUE='old'), Row(KEY=10, VALUE='too_old'), Row(KEY=11, VALUE='old')]

            >>> # For all such rows, insert a row into target whose key is
            >>> # assigned to the key of the not matched row.
            >>> target_df.write.save_as_table("my_table", mode="overwrite", table_type="temporary")
            >>> target.merge(source, target["key"] == source["key"], [when_not_matched().insert({"key": source["key"]})])
            MergeResult(rows_inserted=1, rows_updated=0, rows_deleted=0)
            >>> target.collect() # the rows are inserted
            [Row(KEY=12, VALUE=None), Row(KEY=10, VALUE='old'), Row(KEY=10, VALUE='too_old'), Row(KEY=11, VALUE='old')]

        Note:
            An exception will be raised if this method is called more than once
            on the same :class:`WhenNotMatchedClause` object.
        """
        if self._clause:
            raise SnowparkClientExceptionMessages.MERGE_TABLE_ACTION_ALREADY_SPECIFIED(
                "insert", "WhenNotMatchedClause"
            )
        if isinstance(assignments, dict):
            # Named-column form: INSERT (cols...) VALUES (vals...).
            keys = [Column(k)._expression for k in assignments.keys()]
            values = [Column._to_expr(v) for v in assignments.values()]
        else:
            # Positional form: INSERT VALUES (vals...) with no column list.
            keys = []
            values = [Column._to_expr(v) for v in assignments]
        self._clause = InsertMergeExpression(self._condition_expr, keys, values)
        return self
def _get_update_result(rows: List[Row]) -> UpdateResult:
    """Build an :class:`UpdateResult` from the single status row of an UPDATE."""
    status_row = rows[0]
    return UpdateResult(
        rows_updated=int(status_row[0]),
        multi_joined_rows_updated=int(status_row[1]),
    )
def _get_delete_result(rows: List[Row]) -> DeleteResult:
    """Build a :class:`DeleteResult` from the single status row of a DELETE."""
    status_row = rows[0]
    return DeleteResult(rows_deleted=int(status_row[0]))
def _get_merge_result(
    rows: List[Row], inserted: bool, updated: bool, deleted: bool
) -> MergeResult:
    """Build a :class:`MergeResult` from the single status row of a MERGE.

    Snowflake only returns a count column for each clause kind actually
    present in the statement, in insert/update/delete order; the boolean
    flags say which columns exist in ``rows[0]``.
    """
    status_row = rows[0]
    counts = {"inserted": 0, "updated": 0, "deleted": 0}
    column = 0
    for name, present in (
        ("inserted", inserted),
        ("updated", updated),
        ("deleted", deleted),
    ):
        if present:
            counts[name] = int(status_row[column])
            column += 1
    return MergeResult(counts["inserted"], counts["updated"], counts["deleted"])
class Table(DataFrame):
    """
    Represents a lazily-evaluated Table. It extends :class:`DataFrame` so all
    :class:`DataFrame` operations can be applied to it.

    You can create a :class:`Table` object by calling :meth:`Session.table`
    with the name of the table in Snowflake. See examples in :meth:`Session.table`.
    """

    def __init__(
        self,
        table_name: str,
        session: Optional["snowflake.snowpark.session.Session"] = None,
    ) -> None:
        super().__init__(
            session, session._analyzer.resolve(UnresolvedRelation(table_name))
        )
        # Re-assignment of an attribute set by DataFrame so the `#:` comment
        # is picked up by the API documentation generator for Table.
        self.is_cached: bool = self.is_cached  #: Whether the table is cached.
        self.table_name: str = table_name  #: The table name
        if self._session.sql_simplifier_enabled:
            self._select_statement = SelectStatement(
                from_=SelectableEntity(table_name, analyzer=session._analyzer),
                analyzer=session._analyzer,
            )
        # By default, set the initial API call to 'Table.__init__' since
        # people could instantiate a table directly. This value is overwritten
        # when created from a Session object.
        set_api_call_source(self, "Table.__init__")

    def __copy__(self) -> "Table":
        # A copy is a fresh Table over the same name/session (plan re-resolved).
        return Table(self.table_name, self._session)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Using a Table as a context manager drops the underlying table on exit.
        self.drop_table()

    def sample(
        self,
        frac: Optional[float] = None,
        n: Optional[int] = None,
        *,
        seed: Optional[float] = None,
        sampling_method: Optional[str] = None,
    ) -> "DataFrame":
        """Samples rows based on either the number of rows to be returned or a percentage of rows to be returned.

        Sampling with a seed is not supported on views or subqueries. This method works on tables so it supports ``seed``.
        This is the main difference between :meth:`DataFrame.sample` and this method.

        Args:
            frac: The percentage of rows to be sampled.
            n: The fixed number of rows to sample in the range of 0 to 1,000,000 (inclusive). Either ``frac`` or ``n`` should be provided.
            seed: Specifies a seed value to make the sampling deterministic. Can be any integer between 0 and 2147483647 inclusive.
                Default value is ``None``.
            sampling_method: Specifies the sampling method to use:
                - "BERNOULLI" (or "ROW"): Includes each row with a probability of p/100. Similar to flipping a weighted coin for each row.
                - "SYSTEM" (or "BLOCK"): Includes each block of rows with a probability of p/100. Similar to flipping a weighted coin for each block of rows. This method does not support fixed-size sampling.
                Default is ``None``. Then the Snowflake database will use "ROW" by default.

        Note:
            - SYSTEM | BLOCK sampling is often faster than BERNOULLI | ROW sampling.
            - Sampling without a seed is often faster than sampling with a seed.
            - Fixed-size sampling can be slower than equivalent fraction-based sampling because fixed-size sampling prevents some query optimization.
            - Fixed-size sampling doesn't work with SYSTEM | BLOCK sampling.
        """
        # Without a sampling method or seed, the generic DataFrame sampling
        # (which works on subqueries) is sufficient.
        if sampling_method is None and seed is None:
            return super().sample(frac=frac, n=n)
        DataFrame._validate_sample_input(frac, n)
        if sampling_method and sampling_method.upper() not in (
            "BERNOULLI",
            "ROW",
            "SYSTEM",
            "BLOCK",
        ):
            raise ValueError(
                f"'sampling_method' value {sampling_method} must be None or one of 'BERNOULLI', 'ROW', 'SYSTEM', or 'BLOCK'."
            )

        # The analyzer will generate a sql with subquery. So we build the sql directly without using the analyzer.
        sampling_method_text = sampling_method or ""
        frac_or_rowcount_text = str(frac * 100.0) if frac is not None else f"{n} ROWS"
        seed_text = f" SEED ({seed})" if seed is not None else ""
        sql_text = f"SELECT * FROM {self.table_name} SAMPLE {sampling_method_text} ({frac_or_rowcount_text}) {seed_text}"
        return self._session.sql(sql_text)

    @overload
    def update(
        self,
        assignments: Dict[str, ColumnOrLiteral],
        condition: Optional[Column] = None,
        source: Optional[DataFrame] = None,
        *,
        statement_params: Optional[Dict[str, str]] = None,
        block: bool = True,
    ) -> UpdateResult:
        ...  # pragma: no cover

    @overload
    def update(
        self,
        assignments: Dict[str, ColumnOrLiteral],
        condition: Optional[Column] = None,
        source: Optional[DataFrame] = None,
        *,
        statement_params: Optional[Dict[str, str]] = None,
        block: bool = False,
    ) -> "snowflake.snowpark.AsyncJob":
        ...  # pragma: no cover

    def update(
        self,
        assignments: Dict[str, ColumnOrLiteral],
        condition: Optional[Column] = None,
        source: Optional[DataFrame] = None,
        *,
        statement_params: Optional[Dict[str, str]] = None,
        block: bool = True,
    ) -> Union[UpdateResult, "snowflake.snowpark.AsyncJob"]:
        """
        Updates rows in the Table with specified ``assignments`` and returns a
        :class:`UpdateResult`, representing the number of rows modified and the
        number of multi-joined rows modified.

        Args:
            assignments: A ``dict`` that associates the names of columns with the
                values that should be updated. The value of ``assignments`` can
                either be a literal value or a :class:`Column` object.
            condition: An optional :class:`Column` object representing the
                specified condition. It must be provided if ``source`` is provided.
            source: An optional :class:`DataFrame` that is included in ``condition``.
                It can also be another :class:`Table`.
            statement_params: Dictionary of statement level parameters to be set while executing this action.
            block: A bool value indicating whether this function will wait until the result is available.
                When it is ``False``, this function executes the underlying queries of the dataframe
                asynchronously and returns an :class:`AsyncJob`.

        Examples::

            >>> target_df = session.create_dataframe([(1, 1),(1, 2),(2, 1),(2, 2),(3, 1),(3, 2)], schema=["a", "b"])
            >>> target_df.write.save_as_table("my_table", mode="overwrite", table_type="temporary")
            >>> t = session.table("my_table")

            >>> # update all rows in column "b" to 0 and all rows in column "a"
            >>> # to the summation of column "a" and column "b"
            >>> t.update({"b": 0, "a": t.a + t.b})
            UpdateResult(rows_updated=6, multi_joined_rows_updated=0)
            >>> t.collect()
            [Row(A=2, B=0), Row(A=3, B=0), Row(A=3, B=0), Row(A=4, B=0), Row(A=4, B=0), Row(A=5, B=0)]

            >>> # update all rows in column "b" to 0 where column "a" has value 1
            >>> target_df.write.save_as_table("my_table", mode="overwrite", table_type="temporary")
            >>> t.update({"b": 0}, t["a"] == 1)
            UpdateResult(rows_updated=2, multi_joined_rows_updated=0)
            >>> t.collect()
            [Row(A=1, B=0), Row(A=1, B=0), Row(A=2, B=1), Row(A=2, B=2), Row(A=3, B=1), Row(A=3, B=2)]

            >>> # update all rows in column "b" to 0 where column "a" in this
            >>> # table is equal to column "a" in another dataframe
            >>> target_df.write.save_as_table("my_table", mode="overwrite", table_type="temporary")
            >>> source_df = session.create_dataframe([1, 2, 3, 4], schema=["a"])
            >>> t.update({"b": 0}, t["a"] == source_df.a, source_df)
            UpdateResult(rows_updated=6, multi_joined_rows_updated=0)
            >>> t.collect()
            [Row(A=1, B=0), Row(A=1, B=0), Row(A=2, B=0), Row(A=2, B=0), Row(A=3, B=0), Row(A=3, B=0)]
        """
        if source:
            assert (
                condition is not None
            ), "condition should also be provided if source is provided"
        new_df = self._with_plan(
            TableUpdate(
                self.table_name,
                {
                    Column(k)._expression: Column._to_expr(v)
                    for k, v in assignments.items()
                },
                condition._expression if condition is not None else None,
                _disambiguate(self, source, create_join_type("left"), [])[1]._plan
                if source
                else None,
            )
        )
        add_api_call(new_df, "Table.update")
        result = new_df._internal_collect_with_tag(
            statement_params=statement_params,
            block=block,
            data_type=snowflake.snowpark.async_job._AsyncResultType.UPDATE,
        )
        return _get_update_result(result) if block else result

    @overload
    def delete(
        self,
        condition: Optional[Column] = None,
        source: Optional[DataFrame] = None,
        *,
        statement_params: Optional[Dict[str, str]] = None,
        block: bool = True,
    ) -> DeleteResult:
        ...  # pragma: no cover

    @overload
    def delete(
        self,
        condition: Optional[Column] = None,
        source: Optional[DataFrame] = None,
        *,
        statement_params: Optional[Dict[str, str]] = None,
        block: bool = False,
    ) -> "snowflake.snowpark.AsyncJob":
        ...  # pragma: no cover

    def delete(
        self,
        condition: Optional[Column] = None,
        source: Optional[DataFrame] = None,
        *,
        statement_params: Optional[Dict[str, str]] = None,
        block: bool = True,
    ) -> Union[DeleteResult, "snowflake.snowpark.AsyncJob"]:
        """
        Deletes rows in a Table and returns a :class:`DeleteResult`,
        representing the number of rows deleted.

        Args:
            condition: An optional :class:`Column` object representing the
                specified condition. It must be provided if ``source`` is provided.
            source: An optional :class:`DataFrame` that is included in ``condition``.
                It can also be another :class:`Table`.
            statement_params: Dictionary of statement level parameters to be set while executing this action.
            block: A bool value indicating whether this function will wait until the result is available.
                When it is ``False``, this function executes the underlying queries of the dataframe
                asynchronously and returns an :class:`AsyncJob`.

        Examples::

            >>> target_df = session.create_dataframe([(1, 1),(1, 2),(2, 1),(2, 2),(3, 1),(3, 2)], schema=["a", "b"])
            >>> target_df.write.save_as_table("my_table", mode="overwrite", table_type="temporary")
            >>> t = session.table("my_table")

            >>> # delete all rows in a table
            >>> t.delete()
            DeleteResult(rows_deleted=6)
            >>> t.collect()
            []

            >>> # delete all rows where column "a" has value 1
            >>> target_df.write.save_as_table("my_table", mode="overwrite", table_type="temporary")
            >>> t.delete(t["a"] == 1)
            DeleteResult(rows_deleted=2)
            >>> t.collect()
            [Row(A=2, B=1), Row(A=2, B=2), Row(A=3, B=1), Row(A=3, B=2)]

            >>> # delete all rows in this table where column "a" in this
            >>> # table is equal to column "a" in another dataframe
            >>> target_df.write.save_as_table("my_table", mode="overwrite", table_type="temporary")
            >>> source_df = session.create_dataframe([2, 3, 4, 5], schema=["a"])
            >>> t.delete(t["a"] == source_df.a, source_df)
            DeleteResult(rows_deleted=4)
            >>> t.collect()
            [Row(A=1, B=1), Row(A=1, B=2)]
        """
        if source:
            assert (
                condition is not None
            ), "condition should also be provided if source is provided"
        new_df = self._with_plan(
            TableDelete(
                self.table_name,
                condition._expression if condition is not None else None,
                _disambiguate(self, source, create_join_type("left"), [])[1]._plan
                if source
                else None,
            )
        )
        add_api_call(new_df, "Table.delete")
        result = new_df._internal_collect_with_tag(
            statement_params=statement_params,
            block=block,
            data_type=snowflake.snowpark.async_job._AsyncResultType.DELETE,
        )
        return _get_delete_result(result) if block else result

    @overload
    def merge(
        self,
        source: DataFrame,
        join_expr: Column,
        clauses: Iterable[Union[WhenMatchedClause, WhenNotMatchedClause]],
        *,
        statement_params: Optional[Dict[str, str]] = None,
        block: bool = True,
    ) -> MergeResult:
        ...  # pragma: no cover

    @overload
    def merge(
        self,
        source: DataFrame,
        join_expr: Column,
        clauses: Iterable[Union[WhenMatchedClause, WhenNotMatchedClause]],
        *,
        statement_params: Optional[Dict[str, str]] = None,
        block: bool = False,
    ) -> "snowflake.snowpark.AsyncJob":
        ...  # pragma: no cover

    def merge(
        self,
        source: DataFrame,
        join_expr: Column,
        clauses: Iterable[Union[WhenMatchedClause, WhenNotMatchedClause]],
        *,
        statement_params: Optional[Dict[str, str]] = None,
        block: bool = True,
    ) -> Union[MergeResult, "snowflake.snowpark.AsyncJob"]:
        """
        Merges this :class:`Table` with :class:`DataFrame` source on the specified
        join expression and a list of matched or not-matched clauses, and returns
        a :class:`MergeResult`, representing the number of rows inserted,
        updated and deleted by this merge action.
        See `MERGE <https://docs.snowflake.com/en/sql-reference/sql/merge.html#merge>`_
        for details.

        Args:
            source: A :class:`DataFrame` to join with this :class:`Table`.
                It can also be another :class:`Table`.
            join_expr: A :class:`Column` object representing the expression on which
                to join this :class:`Table` and ``source``.
            clauses: A list of matched or not-matched clauses specifying the actions
                to perform when the values from this :class:`Table` and ``source``
                match or not match on ``join_expr``. These actions can only be instances
                of :class:`WhenMatchedClause` and :class:`WhenNotMatchedClause`, and will
                be performed sequentially in this list.
            statement_params: Dictionary of statement level parameters to be set while executing this action.
            block: A bool value indicating whether this function will wait until the result is available.
                When it is ``False``, this function executes the underlying queries of the dataframe
                asynchronously and returns an :class:`AsyncJob`.

        Example::

            >>> from snowflake.snowpark.functions import when_matched, when_not_matched
            >>> target_df = session.create_dataframe([(10, "old"), (10, "too_old"), (11, "old")], schema=["key", "value"])
            >>> target_df.write.save_as_table("my_table", mode="overwrite", table_type="temporary")
            >>> target = session.table("my_table")
            >>> source = session.create_dataframe([(10, "new"), (12, "new"), (13, "old")], schema=["key", "value"])
            >>> target.merge(source, (target["key"] == source["key"]) & (target["value"] == "too_old"),
            ...              [when_matched().update({"value": source["value"]}), when_not_matched().insert({"key": source["key"]})])
            MergeResult(rows_inserted=2, rows_updated=1, rows_deleted=0)
            >>> target.collect()
            [Row(KEY=12, VALUE=None), Row(KEY=13, VALUE=None), Row(KEY=10, VALUE='old'), Row(KEY=10, VALUE='new'), Row(KEY=11, VALUE='old')]
        """
        # Track which clause kinds are present so the result row's count
        # columns (which only exist for present kinds) can be decoded later.
        inserted, updated, deleted = False, False, False
        merge_exprs = []
        for c in clauses:
            if isinstance(c, WhenMatchedClause):
                if isinstance(c._clause, UpdateMergeExpression):
                    updated = True
                else:
                    deleted = True
            elif isinstance(c, WhenNotMatchedClause):
                inserted = True
            else:
                raise TypeError(
                    "clauses only accepts WhenMatchedClause or WhenNotMatchedClause instances"
                )
            merge_exprs.append(c._clause)
        new_df = self._with_plan(
            TableMerge(
                self.table_name,
                _disambiguate(self, source, create_join_type("left"), [])[1]._plan,
                join_expr._expression,
                merge_exprs,
            )
        )
        # Bug fix: was "Table.update" (copy-paste from update()), which
        # mislabeled merge operations in API telemetry.
        add_api_call(new_df, "Table.merge")
        result = new_df._internal_collect_with_tag(
            statement_params=statement_params,
            block=block,
            data_type=snowflake.snowpark.async_job._AsyncResultType.MERGE,
        )
        if not block:
            # Stash the clause-kind flags on the AsyncJob so the result row
            # can be decoded once the query completes.
            result._inserted = inserted
            result._updated = updated
            result._deleted = deleted
        return (
            _get_merge_result(
                result,
                inserted=inserted,
                updated=updated,
                deleted=deleted,
            )
            if block
            else result
        )

    def drop_table(self) -> None:
        """Drops the table from the Snowflake database.

        Note that subsequent operations such as :meth:`DataFrame.select`, :meth:`DataFrame.collect` on this ``Table`` instance and the derived DataFrame will raise errors because the underlying
        table in the Snowflake database no longer exists.
        """
        self._session.sql(
            f"drop table {self.table_name}"
        )._internal_collect_with_tag_no_telemetry()