[Datastore] Rely on buffer instead of mmap [1.6.x] (#5445)
tomerm-iguazio committed Apr 18, 2024
1 parent aba8e7c commit 412dc91
Showing 3 changed files with 31 additions and 70 deletions.
18 changes: 0 additions & 18 deletions mlrun/datastore/helpers.py

This file was deleted.
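
Judging from the import removed in v3io.py below, the deleted module held only size constants; a hedged reconstruction (the file's contents are not shown, so this is an assumption):

    # Presumed content of the deleted mlrun/datastore/helpers.py, inferred
    # from the removed `from mlrun.datastore.helpers import ONE_GB, ONE_MB`.
    ONE_MB = 1024 * 1024
    ONE_GB = 1024 * 1024 * 1024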

77 changes: 27 additions & 50 deletions mlrun/datastore/v3io.py
@@ -12,8 +12,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import mmap
-import os
 import time
 from datetime import datetime

@@ -22,7 +20,6 @@
 from v3io.dataplane.response import HttpResponseError
 
 import mlrun
-from mlrun.datastore.helpers import ONE_GB, ONE_MB
 
 from ..platforms.iguazio import parse_path, split_path
 from .base import (
@@ -32,6 +29,7 @@
 )
 
 V3IO_LOCAL_ROOT = "v3io"
+V3IO_DEFAULT_UPLOAD_CHUNK_SIZE = 1024 * 1024 * 100
 
 
 class V3ioStore(DataStore):
@@ -94,46 +92,28 @@ def get_storage_options(self):
         )
         return self._sanitize_storage_options(res)
 
-    def _upload(self, key: str, src_path: str, max_chunk_size: int = ONE_GB):
+    def _upload(
+        self,
+        key: str,
+        src_path: str,
+        max_chunk_size: int = V3IO_DEFAULT_UPLOAD_CHUNK_SIZE,
+    ):
         """helper function for upload method, allows for controlling max_chunk_size in testing"""
         container, path = split_path(self._join(key))
-        file_size = os.path.getsize(src_path)  # in bytes
-        if file_size <= ONE_MB:
-            with open(src_path, "rb") as source_file:
-                data = source_file.read()
-            self._do_object_request(
-                self.object.put,
-                container=container,
-                path=path,
-                body=data,
-                append=False,
-            )
-            return
-        # chunk must be a multiple of the ALLOCATIONGRANULARITY
-        # https://docs.python.org/3/library/mmap.html
-        if residue := max_chunk_size % mmap.ALLOCATIONGRANULARITY:
-            # round down to the nearest multiple of ALLOCATIONGRANULARITY
-            max_chunk_size -= residue
-
         with open(src_path, "rb") as file_obj:
-            file_offset = 0
-            while file_offset < file_size:
-                chunk_size = min(file_size - file_offset, max_chunk_size)
-                with mmap.mmap(
-                    file_obj.fileno(),
-                    length=chunk_size,
-                    access=mmap.ACCESS_READ,
-                    offset=file_offset,
-                ) as mmap_obj:
-                    append = file_offset != 0
-                    self._do_object_request(
-                        self.object.put,
-                        container=container,
-                        path=path,
-                        body=mmap_obj,
-                        append=append,
-                    )
-                file_offset += chunk_size
+            append = False
+            while True:
+                data = memoryview(file_obj.read(max_chunk_size))
+                if not data:
+                    break
+                self._do_object_request(
+                    self.object.put,
+                    container=container,
+                    path=path,
+                    body=data,
+                    append=append,
+                )
+                append = True
 
     def upload(self, key, src_path):
         return self._upload(key, src_path)
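
The hunk above drops both the 1 MB small-file fast path and the mmap windowing (which had to round chunks down to a multiple of mmap.ALLOCATIONGRANULARITY) in favor of plain buffered reads, and lowers the default chunk size from ONE_GB to 100 MiB. A minimal standalone sketch of the new pattern, where put_object is a hypothetical stand-in for self._do_object_request(self.object.put, ...):

    # Sketch of the buffered chunked-upload loop introduced above;
    # `put_object` is an assumed stand-in, not the actual MLRun API.
    def upload_in_chunks(src_path, put_object, max_chunk_size=1024 * 1024 * 100):
        with open(src_path, "rb") as file_obj:
            append = False
            while True:
                # read() returns at most max_chunk_size bytes; b"" signals EOF
                data = memoryview(file_obj.read(max_chunk_size))
                if not data:
                    break
                # the first chunk creates the object, later chunks append
                put_object(body=data, append=append)
                append = True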
@@ -148,19 +128,16 @@ def get(self, key, size=None, offset=0):
             num_bytes=size,
         ).body
 
-    def _put(self, key, data, append=False, max_chunk_size: int = ONE_GB):
+    def _put(
+        self,
+        key,
+        data,
+        append=False,
+        max_chunk_size: int = V3IO_DEFAULT_UPLOAD_CHUNK_SIZE,
+    ):
         """helper function for put method, allows for controlling max_chunk_size in testing"""
         container, path = split_path(self._join(key))
-        buffer_size = len(data)  # in bytes
-        if buffer_size <= ONE_MB:
-            self._do_object_request(
-                self.object.put,
-                container=container,
-                path=path,
-                body=data,
-                append=append,
-            )
-            return
         buffer_offset = 0
         try:
             data = memoryview(data)
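
The tail of _put is collapsed in this view. Judging from the buffer_offset/memoryview prologue that does survive, it presumably walks the view in max_chunk_size slices; a hedged sketch of that pattern (the loop details are assumptions, not the actual MLRun code):

    # Hypothetical sketch of chunked put via memoryview slicing.
    def put_in_chunks(data, put_object, max_chunk_size, append=False):
        view = memoryview(data)  # zero-copy view over the payload
        buffer_offset = 0
        while buffer_offset < len(view):
            # slicing a memoryview copies no bytes, unlike slicing bytes
            chunk = view[buffer_offset : buffer_offset + max_chunk_size]
            put_object(body=chunk, append=append)
            buffer_offset += max_chunk_size
            append = True  # later chunks extend the object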
6 changes: 4 additions & 2 deletions tests/system/datastore/test_v3io.py
@@ -89,13 +89,15 @@ def _setup_df_dir(self, first_file_path, second_file_path, file_extension):
     @pytest.mark.skip(
         reason="Skipping this test as it hangs when running against the CI system. ML-5598"
     )
-    def test_v3io_large_object_upload(self, tmp_path):
+    @pytest.mark.parametrize(
+        "file_size", [4 * 1024 * 1024, 20 * 1024 * 1024]
+    )  # 4MB and 20MB
+    def test_v3io_large_object_upload(self, tmp_path, file_size):
         tempfile_1_path = os.path.join(tmp_path, "tempfile_1")
         tempfile_2_path = os.path.join(tmp_path, "tempfile_2")
         cmp_command = ["cmp", tempfile_1_path, tempfile_2_path]
 
         with open(tempfile_1_path, "wb") as f:
-            file_size = 20 * 1024 * 1024  # 20MB
             f.truncate(file_size)
             r = random.Random(123)
             for i in range(min(100, file_size)):
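
The generation loop is truncated above; a hedged reconstruction of the setup it implies (the seek/write body is an assumption inferred from the sparse truncate and the seeded Random):

    # Hypothetical reconstruction of the test's file generation.
    import random

    def make_test_file(path, file_size, seed=123):
        with open(path, "wb") as f:
            f.truncate(file_size)  # sparse file of exactly file_size bytes
            r = random.Random(seed)  # fixed seed keeps the data reproducible
            for _ in range(min(100, file_size)):
                f.seek(r.randrange(file_size))  # assumed: scatter single bytes
                f.write(bytes([r.randrange(256)]))

Both parametrized sizes (4 MB and 20 MB) sit below the 100 MiB default chunk, so the test presumably pairs them with a reduced max_chunk_size, which the _upload helper exposes for exactly this purpose, to exercise both the single-put and the multi-chunk append paths.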
