Skip to content

Commit

Permalink
Add a type of DownloadFile that can be split
Browse files Browse the repository at this point in the history
This allows Kolibri to create content nodes from arbitrarily large
source files for certain types of content.
  • Loading branch information
dylanmccall committed Jun 29, 2022
1 parent a74e193 commit e3cfde3
Show file tree
Hide file tree
Showing 3 changed files with 189 additions and 24 deletions.
127 changes: 106 additions & 21 deletions ricecooker/classes/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from ..exceptions import UnknownFileTypeError
from ricecooker.utils.encodings import get_base64_encoding
from ricecooker.utils.encodings import write_base64_to_file
from ricecooker.utils.file_slice import FileSlice
from ricecooker.utils.images import create_image_from_epub
from ricecooker.utils.images import create_image_from_pdf_page
from ricecooker.utils.images import create_image_from_zip
Expand Down Expand Up @@ -114,13 +115,19 @@ def generate_key(action, path_or_id, settings=None, default=" (default)"):
def get_cache_filename(key):
cache_file = FILECACHE.get(key)
if cache_file:
cache_file = cache_file.decode("utf-8")
cache_file = cache_file.decode("utf-8").split(",")
# if the file was somehow deleted, make sure we don't return it.
if not os.path.exists(config.get_storage_path(cache_file)):
if not all(map(cache_file_exists, cache_file)):
cache_file = None
if cache_file and len(cache_file) == 1:
cache_file = cache_file[0]
return cache_file


def cache_file_exists(cache_file):
return os.path.exists(config.get_storage_path(cache_file))


def cache_is_outdated(path, cache_file):
outdated = True
if not cache_file:
Expand All @@ -137,15 +144,18 @@ def cache_is_outdated(path, cache_file):
return outdated


def download(path, default_ext=None):
def download(path, default_ext=None, slice_size=None):
"""
Download `path` and save to storage based on file extension derived from `path`.
:param path: An URL or a local path
:param default_ext: fallback ext for file when path does not end with .ext
:return: filename derived from hash of file contents {md5hash(file)}.ext
:rtype: sting (path of the form `{md5hash(file at path)}.ext`
"""
key = "DOWNLOAD:{}".format(path)
if slice_size is not None:
key = "DOWNLOAD:{}:{}".format(path, slice_size)
else:
key = "DOWNLOAD:{}".format(path)

cache_file = get_cache_filename(key)
if not config.UPDATE and not cache_is_outdated(path, cache_file):
Expand All @@ -161,8 +171,12 @@ def download(path, default_ext=None):
# Get extension of file or use `default_ext` if none found
if not ext:
ext = extract_path_ext(path, default_ext=default_ext)
filename = copy_file_to_storage(tempf.name, ext=ext)
FILECACHE.set(key, bytes(filename, "utf-8"))
filename = copy_file_to_storage(tempf.name, ext=ext, slice_size=slice_size)
if isinstance(filename, list):
cache_value = ",".join(filename)
else:
cache_value = filename
FILECACHE.set(key, bytes(cache_value, "utf-8"))
config.LOGGER.info("\t--- Downloaded {}".format(filename))
os.unlink(tempf.name)

Expand Down Expand Up @@ -242,29 +256,54 @@ def write_path_to_filename(path, write_to_file):


def get_hash(filepath):
file_hash = hashlib.md5()
with open(filepath, "rb") as fobj:
for chunk in iter(lambda: fobj.read(2097152), b""):
file_hash.update(chunk)
return get_hash_from_fd(fobj)


def get_hash_from_fd(fobj):
file_hash = hashlib.md5()
for chunk in iter(lambda: fobj.read(2097152), b""):
file_hash.update(chunk)
return file_hash.hexdigest()


def copy_file_to_storage(srcfilename, ext=None):
def copy_file_to_storage(src_file_name, ext=None, slice_size=None):
"""
Copy `srcfilename` (filepath) to destination.
Copy `src_file_name` (filepath) to destination.
If `slice_size` is set, the file will be broken into slices if it exceeds
that size in bytes.
:rtype: None
"""
if ext is None:
ext = extract_path_ext(srcfilename)
ext = extract_path_ext(src_file_name)

hash = get_hash(srcfilename)
filename = "{}.{}".format(hash, ext)
try:
shutil.copy(srcfilename, config.get_storage_path(filename))
except shutil.SameFileError:
pass
filenames = []

return filename
with open(src_file_name, "rb") as src_fd:
slices = list(FileSlice.from_file(src_fd, slice_size))

for slice in slices:
slice_hash = get_hash_from_fd(slice)
slice.seek(0)

out_file_name = "{}.{}".format(slice_hash, ext)
storage_path = config.get_storage_path(out_file_name)

try:
is_same_file = os.path.samefile(storage_path, src_fd.name)
except FileNotFoundError:
is_same_file = False

if not is_same_file:
with open(storage_path, "wb") as out_fd:
shutil.copyfileobj(slice, out_fd)

filenames.append(out_file_name)

if slice_size is None:
return filenames[0]
else:
return filenames


def compress_video_file(filename, ffmpeg_settings):
Expand Down Expand Up @@ -490,22 +529,68 @@ def validate(self):

def process_file(self):
try:
self.filename, self.ext = download(self.path, default_ext=self.default_ext)
self.filename, self.ext = self._download()
# don't validate for single-digit extension, or no extension
if not self.ext:
self.ext = extract_path_ext(self.path)
return self.filename
# Catch errors related to reading file path and handle silently
except HTTP_CAUGHT_EXCEPTIONS as err:
self.error = str(err)
config.LOGGER.debug("Failed to download, error is: {}".format(err))
config.FAILED_FILES.append(self)
return None

return self.filename

def _download(self):
return download(self.path, default_ext=self.default_ext)

def __str__(self):
return self.path


class SplittableDownloadFile(DownloadFile):
"""
A type of DownloadFile that will be split into pieces if the source file
exceeds `slice_size`. This is separate from DownloadFile because not all
content types support file splitting.
"""

# 2 GB in bytes
slice_size = 2000000000

def process_file(self):
filenames = super(SplittableDownloadFile, self).process_file()

# TODO: When we call node.add_file, we are assuming files will be
# added in sequence and that order will be maintained. Should we
# add a mechanism where it adds split file order to extra_fields,
# similar to SlideshowNode?

if isinstance(filenames, list):
self.filename = filenames[0]
for extra_filename in filenames[1:]:
extra_file = self.create_split(extra_filename)
self.node.add_file(extra_file)

def create_split(self, filename):
download_file = SplittableDownloadFile(
self.path,
preset=self.get_preset(),
language=self.language,
default_ext=self.default_ext,
source_url=self.source_url,
)
download_file.filename = filename
download_file.ext = self.ext
return download_file

def _download(self):
return download(
self.path, default_ext=self.default_ext, slice_size=self.slice_size
)


IMAGE_EXTENSIONS = {
file_formats.PNG,
file_formats.JPG,
Expand Down
12 changes: 9 additions & 3 deletions ricecooker/classes/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,15 @@ def process_files(self):
- (optionally) generate thumbnail file from the node's content
Returns: content-hash based filenames of all the files for this node
"""
filenames = []
for file in self.files:
filenames.append(file.process_file())

# Items may be added to self.files during file.process_file(), so
# we will work with a copy and generate our list of filenames
# separately.

for file in list(self.files):
file.process_file()

filenames = [file.filename for file in self.files]

# Auto-generation of thumbnails happens here if derive_thumbnail or config.THUMBNAILS is set
if not self.has_thumbnail() and (config.THUMBNAILS or self.derive_thumbnail):
Expand Down
74 changes: 74 additions & 0 deletions ricecooker/utils/file_slice.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
class FileSlice(object):
"""
File-like object that represents a slice of a file, starting from its
current offset until `count`. Reads are always relative to the slice's
start and end point.
"""

def __init__(self, file, count=None):
self.file = file
self.start = file.tell()

file.seek(0, 2)
self.file_size = file.tell()

if count is None:
count = self.file_size

count = min(self.file_size - self.start, count)
self.end = self.start + count

# Seek to the end of the file so the next FileSlice object will be
# created from that point.
file.seek(self.end)

self.__last_offset = self.start

@classmethod
def from_file(cls, file, chunk_size):
slice = cls(file, chunk_size)
yield slice

while slice.end < slice.file_size:
slice = cls(file, chunk_size)
yield slice

@property
def size(self):
return self.end - self.start

def seek(self, offset, whence=0):
if whence == 0:
offset = self.start + offset
elif whence == 1:
offset = self.tell() + offset
elif whence == 2:
offset = self.end - offset
self.file.seek(offset)
self.__store_offset()

def __reset_offset(self):
if self.file.tell() != self.__last_offset:
self.file.seek(self.__last_offset)

def __store_offset(self):
self.__last_offset = self.file.tell()

def tell(self):
self.__reset_offset()
return self.file.tell() - self.start

def read(self, count=None):
self.__reset_offset()

if count is None:
count = self.size

remaining = max(0, self.size - self.tell())

buffer = self.file.read(min(count, remaining))
self.__store_offset()
return buffer

def write(self, string):
raise NotImplementedError()

0 comments on commit e3cfde3

Please sign in to comment.