Commit 9f59b22

Added a batch_size parameter to executemany() to let it operate on data
in batches.

1 parent 168c39d commit 9f59b22

22 files changed: +697 −147 lines

doc/src/api_manual/async_cursor.rst
Lines changed: 4 additions & 0 deletions

@@ -77,6 +77,10 @@ AsyncCursor Methods

 .. automethod:: AsyncCursor.executemany

+    .. versionchanged:: 3.4.0
+
+        The ``batch_size`` parameter was added.
+
     .. versionchanged:: 3.3.0

         Added support for passing data frames in the ``parameters`` parameter.

doc/src/api_manual/cursor.rst
Lines changed: 4 additions & 0 deletions

@@ -83,6 +83,10 @@ Cursor Methods

 .. automethod:: Cursor.executemany

+    .. versionchanged:: 3.4.0
+
+        The ``batch_size`` parameter was added.
+
     .. versionchanged:: 3.3.0

         Added support for passing data frames in the ``parameters`` parameter.

doc/src/release_notes.rst
Lines changed: 4 additions & 0 deletions

@@ -47,6 +47,10 @@ Common Changes
 #) Added support for types ``date32`` and ``date64`` when ingesting data
    frames supporting the Arrow PyCapsule interface as requested
    (`issue 535 <https://github.com/oracle/python-oracledb/issues/535>`__).
+#) Added a ``batch_size`` parameter to :meth:`Cursor.executemany()` and
+   :meth:`AsyncCursor.executemany()` to let these methods operate on data in
+   batches.
+#) Data frames with multiple chunks are now supported.
 #) Added ``fetch_lobs`` and ``fetch_decimals`` parameters where applicable to
    the methods used for fetching rows or data frames from the database. Note
    that for the creation of pipeline operations, if these parameters are not
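
For context on the ``date32``/``date64`` item, a minimal ingestion sketch using pyarrow as the PyCapsule producer; the ``events`` table, the open ``cursor``/``connection``, and pyarrow itself are assumptions, not part of this commit:

    # Sketch only: assumes "create table events (id number, event_date date)"
    # and an open cursor/connection.
    import datetime

    import pyarrow as pa

    table = pa.table({
        "id": [1, 2],
        "event_date": pa.array(
            [datetime.date(2025, 1, 1), datetime.date(2025, 6, 30)],
            type=pa.date32(),  # Arrow 32-bit days-since-epoch dates
        ),
    })

    cursor.executemany("insert into events values (:1, :2)", table)
    connection.commit()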

doc/src/user_guide/batch_statement.rst
Lines changed: 35 additions & 8 deletions

@@ -70,12 +70,7 @@ Each tuple value maps to one of the bind variable placeholders.

 This code requires only one :ref:`round-trip <roundtrips>` from the client to
 the database instead of the five round-trips that would be required for
-repeated calls to :meth:`~Cursor.execute()`. For very large data sets, there
-may be an external buffer or network limits to how many rows can be processed,
-so repeated calls to ``executemany()`` may be required. The limits are based
-on both the number of rows being processed as well as the "size" of each row
-that is being processed. Repeated calls to :meth:`~Cursor.executemany()` are
-still better than repeated calls to :meth:`~Cursor.execute()`.
+repeated calls to :meth:`~Cursor.execute()`.

 To insert a single column, make sure the bind variables are correctly created
 as tuples, for example:

@@ -173,6 +168,38 @@ With named bind variables, use named parameters when calling
             values (:pid, :pdesc)""", data)


+Batching of Large Datasets
+--------------------------
+
+For very large data sets, there may be a buffer or network limit on how many
+rows can be processed. The limit is based on both the number of records as
+well as the size of each record that is being processed. In other cases, it may
+be faster to process smaller sets of records.
+
+To reduce the data sizes involved, you can either make repeated calls to
+:meth:`~Cursor.executemany()` as shown later in the CSV examples, or you can
+use the ``batch_size`` parameter to optimize transfer across the network to the
+database. For example:
+
+.. code-block:: python
+
+    data = [
+        (1, "Parent 1"),
+        (2, "Parent 2"),
+        . . .
+        (9_999_999, "Parent 9,999,999"),
+        (10_000_000, "Parent 10,000,000"),
+    ]
+
+    cursor.executemany("insert into ParentTable values (:1, :2)", data, batch_size=200_000)
+
+This will send the data to the database in batches of 200,000 records until all
+10,000,000 records have been inserted.
+
+If :attr:`Connection.autocommit` is ``True``, then a commit will take place per
+batch of records processed.
+
+
 .. _batchplsql:

 Batch Execution of PL/SQL

@@ -446,8 +473,8 @@ And the schema:

     create table test (id number, name varchar2(25));

-Data loading can be done in batches of records since the number of records may
-prevent all data being inserted at once:
+Data loading can be done in batches of records since Python memory limitations
+may prevent all the records being held in memory at once:

 .. code-block:: python
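The CSV technique the hunks above refer to is repeated ``executemany()`` calls over fixed-size slices. A minimal sketch against the ``test`` table from this file, assuming an open ``cursor``/``connection`` and a local ``data.csv`` of ``id,name`` rows (the file name and batch size are illustrative):

    # Sketch of the "repeated calls" alternative: flush every BATCH_SIZE
    # records; an open cursor/connection and a data.csv file are assumed.
    import csv

    BATCH_SIZE = 10_000
    sql = "insert into test (id, name) values (:1, :2)"

    with open("data.csv", "r") as f:
        batch = []
        for id_str, name in csv.reader(f):
            batch.append((int(id_str), name))
            if len(batch) == BATCH_SIZE:
                cursor.executemany(sql, batch)  # one round-trip per full batch
                batch = []
        if batch:
            cursor.executemany(sql, batch)  # final partial batch
    connection.commit()
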
src/oracledb/base_impl.pxd
Lines changed: 31 additions & 9 deletions

@@ -43,7 +43,8 @@ from .arrow_impl cimport (
     ArrowTimeUnit,
     ArrowType,
     ArrowArrayImpl,
-    ArrowSchemaImpl
+    ArrowSchemaImpl,
+    DataFrameImpl,
 )

 cdef enum:

@@ -263,6 +264,30 @@ cdef class DefaultsImpl:
 cdef DefaultsImpl C_DEFAULTS


+cdef class BatchLoadManager:
+    cdef:
+        readonly uint32_t num_rows
+        readonly uint64_t message_offset
+        uint64_t offset
+        BaseCursorImpl cursor_impl
+        uint32_t batch_size
+        uint32_t batch_num
+        object type_handler
+        object cursor
+        object conn
+
+    cdef int _calculate_num_rows_in_batch(self, uint64_t total_rows) except -1
+    cdef int _next_batch(self) except -1
+    cdef int _setup_cursor(self) except -1
+    @staticmethod
+    cdef BatchLoadManager create_for_executemany(
+        object cursor,
+        BaseCursorImpl cursor_impl,
+        object parameters,
+        uint32_t batch_size,
+    )
+
+
 cdef class Buffer:
     cdef:
         ssize_t _max_size, _size, _pos
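
The declarations above only fix the interface; as a rough mental model, the manager exposes a sliding window over the bind data. A pure-Python analogue of that windowing (an illustration of the pattern, not the Cython implementation; the ``message_offset`` bookkeeping for the wire protocol is omitted):

    class BatchWindow:
        """Illustrative stand-in for BatchLoadManager's windowing logic."""

        def __init__(self, data, batch_size):
            self.data = data
            self.batch_size = batch_size
            self.offset = 0
            self.num_rows = min(batch_size, len(data))

        def current_batch(self):
            return self.data[self.offset:self.offset + self.num_rows]

        def next_batch(self):
            # slide the window; num_rows drops to 0 when data is exhausted
            self.offset += self.num_rows
            self.num_rows = min(self.batch_size, len(self.data) - self.offset)

    w = BatchWindow(list(range(10)), batch_size=4)
    while w.num_rows > 0:
        print(w.current_batch())  # [0..3], then [4..7], then [8, 9]
        w.next_batch()
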
@@ -689,7 +714,6 @@ cdef class BaseCursorImpl:
                               object params, uint32_t num_rows,
                               uint32_t row_num,
                               bint defer_type_assignment) except -1
-    cdef int _check_binds(self, uint32_t num_execs) except -1
     cdef int _close(self, bint in_del) except -1
     cdef BaseVarImpl _create_fetch_var(self, object conn, object cursor,
                                        object type_handler, bint

@@ -706,10 +730,9 @@ cdef class BaseCursorImpl:
     cdef int _perform_binds(self, object conn, uint32_t num_execs) except -1
     cdef int _prepare(self, str statement, str tag,
                       bint cache_statement) except -1
-    cdef int _reset_bind_vars(self, uint32_t num_rows) except -1
+    cdef int _reset_bind_vars(self, uint64_t array_offset,
+                              uint32_t num_rows) except -1
     cdef int _verify_var(self, object var) except -1
-    cdef object bind_arrow_arrays(self, object cursor, list arrays)
-    cdef int bind_many(self, object cursor, list parameters) except -1
     cdef int bind_one(self, object cursor, object parameters) except -1
     cdef object _finish_building_arrow_arrays(self)
     cdef int _create_arrow_arrays(self) except -1

@@ -749,7 +772,8 @@ cdef class BaseVarImpl:
     cdef DbType _get_adjusted_type(self, uint8_t ora_type_num)
     cdef list _get_array_value(self)
     cdef object _get_scalar_value(self, uint32_t pos)
-    cdef int _on_reset_bind(self, uint32_t num_rows) except -1
+    cdef int _on_reset_bind(self, uint64_t array_offset,
+                            uint32_t num_rows) except -1
     cdef int _resize(self, uint32_t new_size) except -1
     cdef int _set_metadata_from_type(self, object typ) except -1
     cdef int _set_metadata_from_value(self, object value,

@@ -857,9 +881,6 @@ cdef class BindVar:
         ssize_t pos
         bint has_value

-    cdef int _create_var_from_arrow_array(self, object conn,
-                                          BaseCursorImpl cursor_impl,
-                                          ArrowArrayImpl array) except -1
     cdef int _create_var_from_type(self, object conn,
                                    BaseCursorImpl cursor_impl,
                                    object value) except -1

@@ -907,6 +928,7 @@ cdef class PipelineOpImpl:
         readonly uint8_t op_type
         readonly bint fetch_lobs
         readonly bint fetch_decimals
+        BatchLoadManager batch_load_manager
         uint32_t num_execs


src/oracledb/base_impl.pyx
Lines changed: 3 additions & 0 deletions

@@ -82,6 +82,8 @@ from .arrow_impl cimport (
 import array

 import base64
+import collections
+import copy
 import copy
 import datetime
 import decimal

@@ -169,6 +171,7 @@ include "impl/base/pool.pyx"
 include "impl/base/cursor.pyx"
 include "impl/base/var.pyx"
 include "impl/base/bind_var.pyx"
+include "impl/base/batch_load_manager.pyx"
 include "impl/base/dbobject.pyx"
 include "impl/base/lob.pyx"
 include "impl/base/soda.pyx"

src/oracledb/cursor.py
Lines changed: 37 additions & 8 deletions

@@ -851,9 +851,11 @@ def executemany(
         self,
         statement: Optional[str],
         parameters: Any,
+        *,
         batcherrors: bool = False,
         arraydmlrowcounts: bool = False,
         suspend_on_success: bool = False,
+        batch_size: int = 2**32 - 1,
     ) -> None:
         """
         Executes a SQL statement once using all bind value mappings or

@@ -900,21 +902,35 @@
         sessionless transaction will be suspended when ``executemany()``
         completes successfully. See :ref:`suspendtxns`.

+        The ``batch_size`` parameter is used to split large data sets into
+        smaller pieces for sending to the database. It is the number of records
+        in each batch. This parameter can be used to tune performance. When
+        ``Connection.autocommit`` is *True*, a commit will take place for each
+        batch.
+
         For maximum efficiency, it is best to use the :meth:`setinputsizes()`
         method to specify the bind value types and sizes. In particular, if the
         type is not explicitly specified, the value *None* is assumed to be a
         string of length 1 so any values that are later bound as numbers or
         dates will raise a TypeError exception.
         """
         self._verify_open()
-        num_execs = self._impl._prepare_for_executemany(
-            self, self._normalize_statement(statement), parameters
+        manager = self._impl._prepare_for_executemany(
+            self,
+            self._normalize_statement(statement),
+            parameters,
+            batch_size,
         )
         self._impl.suspend_on_success = suspend_on_success
-        if num_execs > 0:
+        while manager.num_rows > 0:
             self._impl.executemany(
-                self, num_execs, bool(batcherrors), bool(arraydmlrowcounts)
+                self,
+                manager.num_rows,
+                batcherrors,
+                arraydmlrowcounts,
+                manager.message_offset,
             )
+            manager.next_batch()

     def fetchall(self) -> list:
         """
@@ -1188,9 +1204,11 @@ async def executemany(
         self,
         statement: Optional[str],
         parameters: Any,
+        *,
         batcherrors: bool = False,
         arraydmlrowcounts: bool = False,
         suspend_on_success: bool = False,
+        batch_size: int = 2**32 - 1,
     ) -> None:
         """
         Executes a SQL statement once using all bind value mappings or

@@ -1236,21 +1254,32 @@
         sessionless transaction will be suspended when ``executemany()``
         completes successfully. See :ref:`suspendtxns`.

+        The ``batch_size`` parameter is used to split large data sets into
+        smaller pieces for sending to the database. It is the number of records
+        in each batch. This parameter can be used to tune performance. When
+        ``Connection.autocommit`` is *True*, a commit will take place for each
+        batch. Do not set ``batch_size`` when ``suspend_on_success`` is *True*.
+
         For maximum efficiency, it is best to use the :meth:`setinputsizes()`
         method to specify the parameter types and sizes ahead of time. In
         particular, the value *None* is assumed to be a string of length 1 so
         any values that are later bound as numbers or dates will raise a
         TypeError exception.
         """
         self._verify_open()
-        num_execs = self._impl._prepare_for_executemany(
-            self, self._normalize_statement(statement), parameters
+        manager = self._impl._prepare_for_executemany(
+            self, self._normalize_statement(statement), parameters, batch_size
         )
         self._impl.suspend_on_success = suspend_on_success
-        if num_execs > 0:
+        while manager.num_rows > 0:
             await self._impl.executemany(
-                self, num_execs, bool(batcherrors), bool(arraydmlrowcounts)
+                self,
+                manager.num_rows,
+                batcherrors,
+                arraydmlrowcounts,
+                manager.message_offset,
             )
+            manager.next_batch()

     async def fetchall(self) -> list:
         """

src/oracledb/impl/arrow/dataframe.pyx
Lines changed: 10 additions & 9 deletions

@@ -62,15 +62,16 @@ cdef class DataFrameImpl:
             df_impl.schema_impls.append(schema_impl)

         # populate list of arrays
-        _check_nanoarrow(arrow_stream.get_next(arrow_stream, &arrow_array))
-        for i in range(arrow_schema.n_children):
-            array_impl = ArrowArrayImpl.__new__(ArrowArrayImpl)
-            array_impl.populate_from_array(df_impl.schema_impls[i],
-                                           arrow_array.children[i])
-            df_impl.arrays.append(array_impl)
-        _check_nanoarrow(arrow_stream.get_next(arrow_stream, &arrow_array))
-        if arrow_array.release != NULL:
-            raise NotImplementedError("multiple chunks not supported")
+        while True:
+            _check_nanoarrow(arrow_stream.get_next(arrow_stream, &arrow_array))
+            if arrow_array.release == NULL:
+                break
+            for i in range(arrow_schema.n_children):
+                array_impl = ArrowArrayImpl.__new__(ArrowArrayImpl)
+                array_impl.populate_from_array(df_impl.schema_impls[i],
+                                               arrow_array.children[i])
+                df_impl.arrays.append(array_impl)
+
         ArrowArrayStreamRelease(arrow_stream)
         return df_impl
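
With the loop rewritten, a stream carrying several record batches is drained chunk by chunk instead of raising ``NotImplementedError``. A sketch using pyarrow as the producer (pyarrow, the target table, and the open cursor are assumptions; any object implementing the Arrow PyCapsule stream interface should behave the same):

    import pyarrow as pa

    # two record batches -> a Table whose columns each have two chunks
    batch1 = pa.RecordBatch.from_pydict({"id": [1, 2], "name": ["Parent 1", "Parent 2"]})
    batch2 = pa.RecordBatch.from_pydict({"id": [3, 4], "name": ["Parent 3", "Parent 4"]})
    table = pa.Table.from_batches([batch1, batch2])

    # previously this raised "multiple chunks not supported"
    cursor.executemany("insert into ParentTable values (:1, :2)", table)
    connection.commit()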
