2 changes: 1 addition & 1 deletion dvc/remote/hdfs.py
@@ -165,7 +165,7 @@ def get_file_hash(self, path_info):
# NOTE: pyarrow doesn't support checksum, so we need to use hadoop
regex = r".*\t.*\t(?P<checksum>.*)"
stdout = self.hadoop_fs(
f"checksum {path_info.path}", user=path_info.user
f"checksum {path_info.url}", user=path_info.user
Contributor Author:

This worked only if your defaultFS matched the url you were connecting to.

)
return self._group(regex, stdout, "checksum")

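To illustrate the author's comment above, here is a minimal, hypothetical sketch of the difference between passing `path_info.path` and `path_info.url` to the `hadoop fs -checksum` call (the host and file names are made up; `URLInfo` is `dvc.path_info.URLInfo`):

from dvc.path_info import URLInfo

# Illustrative values only.
path_info = URLInfo("hdfs://user@namenode.example.com:8020/data/file.csv")

# Old behaviour: only the path is passed, so `hadoop fs` resolves it against
# fs.defaultFS from the local Hadoop config, which is correct only when
# defaultFS happens to point at the same cluster.
old_cmd = f"checksum {path_info.path}"  # "checksum /data/file.csv"

# New behaviour: the full URL pins the command to the intended cluster,
# regardless of what defaultFS is set to, e.g. roughly
# "checksum hdfs://user@namenode.example.com:8020/data/file.csv".
new_cmd = f"checksum {path_info.url}"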
4 changes: 0 additions & 4 deletions scripts/ci/before_install.sh
@@ -31,10 +31,6 @@ if [[ "$TRAVIS_BUILD_STAGE_NAME" == "test" ]]; then
ssh 0.0.0.0 ls &>/dev/null
fi

if [ "$TRAVIS_OS_NAME" == "linux" ]; then
bash "$scriptdir/install_hadoop.sh"
fi

if [[ "$TRAVIS_OS_NAME" == "osx" && "$TRAVIS_PULL_REQUEST" == "false" ]]; then
brew install openssl
$scriptdir/retry.sh brew cask install google-cloud-sdk
6 changes: 0 additions & 6 deletions scripts/ci/core-site.xml

This file was deleted.

6 changes: 0 additions & 6 deletions scripts/ci/hdfs-site.xml

This file was deleted.

48 changes: 0 additions & 48 deletions scripts/ci/install_hadoop.sh

This file was deleted.

7 changes: 0 additions & 7 deletions scripts/ci/remove_hadoop.sh

This file was deleted.

2 changes: 2 additions & 0 deletions setup.py
@@ -131,6 +131,8 @@ def run(self):
"pylint",
"pylint-pytest",
"pylint-plugin-utils",
"wget",
"filelock",
]

if (sys.version_info) >= (3, 6):
8 changes: 8 additions & 0 deletions tests/docker-compose.yml
@@ -9,3 +9,11 @@ services:
image: rkuprieiev/oss-emulator
ports:
- "8880"
hdfs:
image: rkuprieiev/docker-hdfs
ports:
- "8020"
# NOTE: having this port as dynamic one will require modifying
# `dfs.datanode.address` in `hdfs-site.xml` and probably something
# else, so using default one for now.
- "50010:50010"
31 changes: 30 additions & 1 deletion tests/remotes/__init__.py
@@ -3,7 +3,7 @@
import pytest

from .azure import Azure, azure, azure_server # noqa: F401
from .hdfs import HDFS, hdfs # noqa: F401
from .hdfs import HDFS, hadoop, hdfs, hdfs_server # noqa: F401
from .http import HTTP, http, http_server # noqa: F401
from .local import Local, local_cloud, local_remote # noqa: F401
from .oss import OSS, TEST_OSS_REPO_BUCKET, oss, oss_server # noqa: F401
@@ -45,6 +45,35 @@ def docker_compose():
pytest.skip("no docker-compose installed")


@pytest.fixture(scope="session")
def docker_compose_project_name():
return "pytest-dvc-test"


@pytest.fixture(scope="session")
def docker_services(
docker_compose_file, docker_compose_project_name, tmp_path_factory
):
# overriding `docker_services` fixture from `pytest_docker` plugin to
# only launch docker images once.

from filelock import FileLock
from pytest_docker.plugin import DockerComposeExecutor, Services

executor = DockerComposeExecutor(
docker_compose_file, docker_compose_project_name,
)

# making sure we don't accidentally launch docker-compose in parallel,
# as it might result in network conflicts. Inspired by:
# https://github.com/pytest-dev/pytest-xdist#making-session-scoped-fixtures-execute-only-once
lockfile = tmp_path_factory.getbasetemp().parent / "docker-compose.lock"
with FileLock(str(lockfile)):
executor.execute("up --build -d")

return Services(executor)


@pytest.fixture
def remote(tmp_dir, dvc, request):
cloud = request.param
118 changes: 76 additions & 42 deletions tests/remotes/hdfs.py
@@ -1,57 +1,22 @@
import getpass
import locale
import os
import platform
import uuid
from contextlib import contextmanager
from subprocess import CalledProcessError, Popen, check_output

import pytest

from dvc.path_info import URLInfo

from .base import Base
from .local import Local


class HDFS(Base, URLInfo):
@staticmethod
def should_test():
if platform.system() != "Linux":
return False

try:
# pylint: disable=unexpected-keyword-arg
# see: https://github.com/PyCQA/pylint/issues/3645
check_output(
["hadoop", "version"],
shell=True,
executable=os.getenv("SHELL"),
)
except (CalledProcessError, OSError):
return False

p = Popen(
"hadoop fs -ls hdfs://127.0.0.1/",
shell=True,
executable=os.getenv("SHELL"),
)
p.communicate()
if p.returncode != 0:
return False

return True

@staticmethod
def get_url():
return "hdfs://{}@127.0.0.1{}".format(
getpass.getuser(), Local.get_storagepath()
)

class HDFS(Base, URLInfo): # pylint: disable=abstract-method
@contextmanager
def _hdfs(self):
import pyarrow

conn = pyarrow.hdfs.connect()
conn = pyarrow.hdfs.connect(self.host, self.port)
try:
yield conn
finally:
@@ -103,8 +68,77 @@ def read_text(self, encoding=None, errors=None):
return self.read_bytes().decode(encoding)


@pytest.fixture(scope="session")
def hadoop():
import wget
import tarfile
from appdirs import user_cache_dir

if platform.system() != "Linux":
pytest.skip("only supported on Linux")

hadoop_name = "hadoop-2.7.2.tar.gz"
java_name = "openjdk-7u75-b13-linux-x64-18_dec_2014.tar.gz"

base_url = "https://s3-us-east-2.amazonaws.com/dvc-public/dvc-test/"
hadoop_url = base_url + hadoop_name
java_url = base_url + java_name

(cache_dir,) = (user_cache_dir("dvc-test", "iterative"),)
dname = os.path.join(cache_dir, "hdfs")

java_tar = os.path.join(dname, java_name)
hadoop_tar = os.path.join(dname, hadoop_name)

java_home = os.path.join(dname, "java-se-7u75-ri")
hadoop_home = os.path.join(dname, "hadoop-2.7.2")

def _get(url, tar, target):
if os.path.isdir(target):
return

if not os.path.exists(tar):
wget.download(url, out=tar)
tar = tarfile.open(tar)
tar.extractall(dname)
assert os.path.isdir(target)

os.makedirs(dname, exist_ok=True)
_get(hadoop_url, hadoop_tar, hadoop_home)
_get(java_url, java_tar, java_home)

os.environ["JAVA_HOME"] = java_home
os.environ["HADOOP_HOME"] = hadoop_home
os.environ["PATH"] += f":{hadoop_home}/bin:{hadoop_home}/sbin"


@pytest.fixture(scope="session")
def hdfs_server(hadoop, docker_compose, docker_services):
import pyarrow

port = docker_services.port_for("hdfs", 8020)

def _check():
try:
# NOTE: just connecting or even opening something is not enough,
# we need to make sure that we are able to write something.
conn = pyarrow.hdfs.connect("127.0.0.1", port)
try:
with conn.open(str(uuid.uuid4()), "wb") as fobj:
fobj.write(b"test")
finally:
conn.close()
return True
except (pyarrow.ArrowException, OSError):
return False

docker_services.wait_until_responsive(timeout=30.0, pause=5, check=_check)

return port


@pytest.fixture
def hdfs():
if not HDFS.should_test():
pytest.skip("no hadoop running")
yield HDFS(HDFS.get_url())
def hdfs(hdfs_server):
port = hdfs_server
url = f"hdfs://127.0.0.1:{port}/{uuid.uuid4()}"
yield HDFS(url)