34 changes: 19 additions & 15 deletions dvc/remote/azure.py
@@ -6,6 +6,8 @@
 import logging
 from datetime import datetime, timedelta
 
+from funcy import cached_property
+
 from dvc.scheme import Schemes
 
 try:
@@ -71,29 +73,27 @@ def __init__(self, repo, config):
         if not self.connection_string:
             raise ValueError("azure storage connection string missing")
 
-        self.__blob_service = None
         self.path_info = (
             self.path_cls(url)
             if path
             else self.path_cls.from_parts(scheme=self.scheme, netloc=bucket)
         )
 
-    @property
+    @cached_property
     def blob_service(self):
-        if self.__blob_service is None:
-            logger.debug("URL {}".format(self.path_info))
-            logger.debug("Connection string {}".format(self.connection_string))
-            self.__blob_service = BlockBlobService(
-                connection_string=self.connection_string
+        logger.debug("URL {}".format(self.path_info))
+        logger.debug("Connection string {}".format(self.connection_string))
+        blob_service = BlockBlobService(
+            connection_string=self.connection_string
+        )
+        logger.debug("Container name {}".format(self.path_info.bucket))
+        try:  # verify that container exists
+            blob_service.list_blobs(
+                self.path_info.bucket, delimiter="/", num_results=1
             )
-            logger.debug("Container name {}".format(self.path_info.bucket))
-            try:  # verify that container exists
-                self.__blob_service.list_blobs(
-                    self.path_info.bucket, delimiter="/", num_results=1
-                )
-            except AzureMissingResourceHttpError:
-                self.__blob_service.create_container(self.path_info.bucket)
-        return self.__blob_service
+        except AzureMissingResourceHttpError:
+            blob_service.create_container(self.path_info.bucket)
+        return blob_service
 
     def remove(self, path_info):
         if path_info.scheme != self.scheme:
@@ -137,6 +137,10 @@ def _download(
             from_info.bucket, from_info.path, to_file, progress_callback=cb
         )
 
+    def exists(self, path_info):
+        paths = self._list_paths(path_info.bucket, path_info.path)
+        return any(path_info.path == path for path in paths)
+
     def open(self, path_info, mode="r", encoding=None):
         get_url = lambda: self._generate_download_url(path_info)  # noqa: E731
         return open_url(get_url, mode=mode, encoding=encoding)
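Side note on the azure.py change above: funcy's @cached_property runs the getter once on first access and stores the result on the instance, which is what lets the hand-rolled `if self.__blob_service is None` memoization be deleted. A minimal sketch of the pattern under that assumption (the class and names below are illustrative, not from DVC):

```python
from funcy import cached_property

class Client:
    @cached_property
    def connection(self):
        # Runs only on the first attribute access; funcy then caches the
        # return value on the instance, so later accesses skip this body.
        print("connecting...")
        return {"connected": True}

c = Client()
c.connection  # prints "connecting..." and builds the value
c.connection  # returned from the cache; nothing is printed
```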
37 changes: 15 additions & 22 deletions dvc/remote/base.py
@@ -20,14 +20,7 @@
     DvcIgnoreInCollectedDirError,
 )
 from dvc.progress import progress, ProgressCallback
-from dvc.utils import (
-    LARGE_DIR_SIZE,
-    tmp_fname,
-    to_chunks,
-    move,
-    relpath,
-    makedirs,
-)
+from dvc.utils import LARGE_DIR_SIZE, tmp_fname, move, relpath, makedirs
 from dvc.state import StateNoop
 from dvc.path_info import PathInfo, URLInfo
 
@@ -642,22 +635,22 @@ def cache_exists(self, checksums, jobs=None):
         Returns:
             A list with checksums that were found in the remote
         """
-        progress_callback = ProgressCallback(len(checksums))
-
-        def exists_with_progress(chunks):
-            return self.batch_exists(chunks, callback=progress_callback)
+        if not self.no_traverse:
+            return list(set(checksums) & set(self.all()))
 
-        if self.no_traverse and hasattr(self, "batch_exists"):
-            with ThreadPoolExecutor(max_workers=jobs or self.JOBS) as executor:
-                path_infos = [self.checksum_to_path_info(x) for x in checksums]
-                chunks = to_chunks(path_infos, num_chunks=self.JOBS)
-                results = executor.map(exists_with_progress, chunks)
-                in_remote = itertools.chain.from_iterable(results)
-                ret = list(itertools.compress(checksums, in_remote))
-                progress_callback.finish("")
-                return ret
+        progress_callback = ProgressCallback(len(checksums))
 
-        return list(set(checksums) & set(self.all()))
+        def exists_with_progress(path_info):
+            ret = self.exists(path_info)
+            progress_callback.update(str(path_info))
+            return ret
+
+        with ThreadPoolExecutor(max_workers=jobs or self.JOBS) as executor:
+            path_infos = [self.checksum_to_path_info(x) for x in checksums]
+            in_remote = executor.map(exists_with_progress, path_infos)
+            ret = list(itertools.compress(checksums, in_remote))
+            progress_callback.finish("")
+            return ret
 
     def already_cached(self, path_info):
         current = self.get_checksum(path_info)

Contributor review comment on this hunk: "We've lost the progress bar :)"
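The rewritten cache_exists above leans on two standard-library guarantees: executor.map yields results in the same order as its inputs, and itertools.compress picks the inputs whose matching flag is truthy; the final revision also threads progress_callback.update through exists_with_progress, which appears to address the review comment above. A self-contained sketch of the order-preserving filter with toy data (no DVC imports; `present` stands in for the remote):

```python
import itertools
from concurrent.futures import ThreadPoolExecutor

checksums = ["aaa", "bbb", "ccc"]
present = {"aaa", "ccc"}  # pretend only these exist on the remote

def exists(checksum):
    return checksum in present

with ThreadPoolExecutor(max_workers=4) as executor:
    # map() preserves input order even though the calls run concurrently,
    # so each flag lines up with its checksum for compress().
    in_remote = executor.map(exists, checksums)
    found = list(itertools.compress(checksums, in_remote))

print(found)  # ['aaa', 'ccc']
```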
12 changes: 0 additions & 12 deletions dvc/remote/gs.py
@@ -1,7 +1,6 @@
 from __future__ import unicode_literals
 
 import logging
-import itertools
 from funcy import cached_property
 
 try:
@@ -89,17 +88,6 @@ def exists(self, path_info):
         paths = set(self._list_paths(path_info.bucket, path_info.path))
         return any(path_info.path == path for path in paths)
 
-    def batch_exists(self, path_infos, callback):
-        paths = []
-
-        for path_info in path_infos:
-            paths.append(self._list_paths(path_info.bucket, path_info.path))
-            callback.update(str(path_info))
-
-        paths = set(itertools.chain.from_iterable(paths))
-
-        return [path_info.path in paths for path_info in path_infos]
-
     def _upload(self, from_file, to_info, **_kwargs):
         bucket = self.gs.bucket(to_info.bucket)
         blob = bucket.blob(to_info.path)
9 changes: 0 additions & 9 deletions dvc/remote/http.py
@@ -65,15 +65,6 @@ def _download(self, from_info, to_file, name=None, no_progress_bar=False):
     def exists(self, path_info):
         return bool(self._request("HEAD", path_info.url))
 
-    def batch_exists(self, path_infos, callback):
-        results = []
-
-        for path_info in path_infos:
-            results.append(self.exists(path_info))
-            callback.update(str(path_info))
-
-        return results
-
     def _content_length(self, url):
         return self._request("HEAD", url).headers.get("Content-Length")
 
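The HTTP remote's exists() reduces to a single HEAD request. A standalone equivalent built on the requests library (an assumption here; DVC's own _request wrapper adds retry and error handling not shown):

```python
import requests

def exists(url):
    # HEAD fetches headers only; a 2xx status means the resource is there.
    response = requests.head(url, allow_redirects=True)
    return response.ok
```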
4 changes: 4 additions & 0 deletions dvc/remote/oss.py
@@ -119,3 +119,7 @@ def _download(
         self.oss_service.get_object_to_file(
             from_info.path, to_file, progress_callback=cb
         )
+
+    def exists(self, path_info):
+        paths = self._list_paths(path_info.path)
+        return any(path_info.path == path for path in paths)
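The exists() methods added here for Azure and OSS, like the existing ones in gs.py and s3.py, share one shape: list keys under the path treated as a prefix, then require an exact match, since object stores have no real directories. A minimal sketch of that shape with a stubbed _list_paths (names illustrative, not DVC's API):

```python
def _list_paths(prefix):
    # Stand-in for a bucket listing call; a real remote would page
    # through every key that starts with `prefix`.
    keys = ["data/foo", "data/foobar", "data/foo/part"]
    return (key for key in keys if key.startswith(prefix))

def exists(path):
    # Prefix listing can return near-misses such as "data/foobar",
    # so an exact comparison is still required.
    return any(path == key for key in _list_paths(path))

print(exists("data/foo"))  # True
print(exists("data/fo"))   # False: prefix matches, exact path does not
```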
11 changes: 0 additions & 11 deletions dvc/remote/s3.py
@@ -3,7 +3,6 @@
 import os
 import threading
 import logging
-import itertools
 from funcy import cached_property
 
 try:
@@ -216,16 +215,6 @@ def exists(self, path_info):
         paths = self._list_paths(path_info.bucket, path_info.path)
         return any(path_info.path == path for path in paths)
 
-    def batch_exists(self, path_infos, callback):
-        paths = []
-
-        for path_info in path_infos:
-            paths.append(self._list_paths(path_info.bucket, path_info.path))
-            callback.update(str(path_info))
-
-        paths = set(itertools.chain.from_iterable(paths))
-        return [path_info.path in paths for path_info in path_infos]
-
     def _upload(self, from_file, to_info, name=None, no_progress_bar=False):
         total = os.path.getsize(from_file)
         cb = None if no_progress_bar else Callback(name, total)
89 changes: 57 additions & 32 deletions dvc/remote/ssh/__init__.py
@@ -1,12 +1,15 @@
 from __future__ import unicode_literals
 
+import errno
+import itertools
 import os
 import getpass
 import logging
-import itertools
-import errno
-from concurrent.futures import ThreadPoolExecutor
 import threading
+from concurrent.futures import ThreadPoolExecutor
 
+from dvc.progress import ProgressCallback
+from dvc.utils import to_chunks
+
 try:
     import paramiko
@@ -15,7 +18,6 @@
 
 import dvc.prompt as prompt
 from dvc.config import Config
-from dvc.utils import to_chunks
 from dvc.utils.compat import urlparse, StringIO
 from dvc.remote.base import RemoteBASE
 from dvc.scheme import Schemes
@@ -148,34 +150,6 @@ def exists(self, path_info):
         with self.ssh(path_info) as ssh:
             return ssh.exists(path_info.path)
 
-    def batch_exists(self, path_infos, callback):
-        def _exists(chunk_and_channel):
-            chunk, channel = chunk_and_channel
-            ret = []
-            for path in chunk:
-                try:
-                    channel.stat(path)
-                    ret.append(True)
-                except IOError as exc:
-                    if exc.errno != errno.ENOENT:
-                        raise
-                    ret.append(False)
-                callback.update(path)
-            return ret
-
-        with self.ssh(path_infos[0]) as ssh:
-            channels = ssh.open_max_sftp_channels()
-            max_workers = len(channels)
-
-            with ThreadPoolExecutor(max_workers=max_workers) as executor:
-                paths = [path_info.path for path_info in path_infos]
-                chunks = to_chunks(paths, num_chunks=max_workers)
-                chunks_and_channels = zip(chunks, channels)
-                outcome = executor.map(_exists, chunks_and_channels)
-                results = list(itertools.chain.from_iterable(outcome))
-
-        return results
-
     def get_file_checksum(self, path_info):
         if path_info.scheme != self.scheme:
             raise NotImplementedError
@@ -240,3 +214,54 @@ def walk(self, path_info):
     def makedirs(self, path_info):
         with self.ssh(path_info) as ssh:
             ssh.makedirs(path_info.path)
+
+    def batch_exists(self, path_infos, callback):
+        def _exists(chunk_and_channel):
+            chunk, channel = chunk_and_channel
+            ret = []
+            for path in chunk:
+                try:
+                    channel.stat(path)
+                    ret.append(True)
+                except IOError as exc:
+                    if exc.errno != errno.ENOENT:
+                        raise
+                    ret.append(False)
+                callback.update(path)
+            return ret
+
+        with self.ssh(path_infos[0]) as ssh:
+            channels = ssh.open_max_sftp_channels()
+            max_workers = len(channels)
+
+            with ThreadPoolExecutor(max_workers=max_workers) as executor:
+                paths = [path_info.path for path_info in path_infos]
+                chunks = to_chunks(paths, num_chunks=max_workers)
+                chunks_and_channels = zip(chunks, channels)
+                outcome = executor.map(_exists, chunks_and_channels)
+                results = list(itertools.chain.from_iterable(outcome))
+
+        return results
+
+    def cache_exists(self, checksums, jobs=None):
+        """This is the older implementation, used in remote/base.py before
+        this change. We reuse it in RemoteSSH because SSH's batch_exists
+        proved to be faster than the current approach in remote/base
+        (which relies on exists(path_info)).
+        """
+        progress_callback = ProgressCallback(len(checksums))
+
+        def exists_with_progress(chunks):
+            return self.batch_exists(chunks, callback=progress_callback)
+
+        if self.no_traverse:
+            with ThreadPoolExecutor(max_workers=jobs or self.JOBS) as executor:
+                path_infos = [self.checksum_to_path_info(x) for x in checksums]
+                chunks = to_chunks(path_infos, num_chunks=self.JOBS)
+                results = executor.map(exists_with_progress, chunks)
+                in_remote = itertools.chain.from_iterable(results)
+                ret = list(itertools.compress(checksums, in_remote))
+                progress_callback.finish("")
+                return ret
+
+        return list(set(checksums) & set(self.all()))
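The retained batch_exists fans work out as one SFTP channel per worker thread, each stat-ing its own chunk of paths, so no channel is ever shared between threads. A standalone sketch of that fan-out with plain lists standing in for channels (to_chunks below is a simplified stub of dvc.utils.to_chunks):

```python
import itertools
from concurrent.futures import ThreadPoolExecutor

def to_chunks(items, num_chunks):
    # Simplified stub: split items into num_chunks roughly equal slices.
    size = max(1, -(-len(items) // num_chunks))  # ceiling division
    return [items[i:i + size] for i in range(0, len(items), size)]

paths = ["a", "bb", "ccc", "dddd", "eeeee"]
channels = ["chan-0", "chan-1"]  # stand-ins for open SFTP channels

def _exists(chunk_and_channel):
    chunk, channel = chunk_and_channel
    # One thread owns one channel for its whole chunk, so channels are
    # never used concurrently; a real _exists would call channel.stat().
    return [len(path) > 0 for path in chunk]

with ThreadPoolExecutor(max_workers=len(channels)) as executor:
    chunks = to_chunks(paths, num_chunks=len(channels))
    outcome = executor.map(_exists, zip(chunks, channels))
    results = list(itertools.chain.from_iterable(outcome))

print(results)  # [True, True, True, True, True]
```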