Skip to content

Commit

Permalink
fix: random port assignment (#4139)
Browse files Browse the repository at this point in the history
* fix: assign only unique ports

* fix: check for none ports

* fix: use port as int

* fix: change debug out

* fix: add more debug out

* fix: protect partiald port finding

* fix: track partiald ports

* fix: move partial ports up

* fix: lock as cls var

* fix: more debug stuff

* fix: more log output

* fix: remove get

* fix: try again on docker fail

* Revert "fix: try again on docker fail"

This reverts commit c2947ee.

* fix: add more debug

* fix: try connect with socket

* style: fix overload and cli autocomplete

* fix: set min port env in ci

* fix: set min port in jinad

* fix: port helper test

* fix: keep track of port in

* fix: clean up

* fix: remove connect check

* fix: remove psutil

* style: fix overload and cli autocomplete

* fix: separate jinad port range

* fix: use asyncio to run jinad pea

* fix: kill jinad process with fire

* fix: remove codecov

* fix: docker compose tests

* Revert "fix: remove codecov"

This reverts commit 31d0d41.

* fix: upgrade codecov action

* fix: clean up

* fix: remove codecov

* fix: readd code cov

* fix: increase timeout for k8s test

* fix: wrong cov tag

* Revert "fix: wrong cov tag"

This reverts commit 00ce072.

* Revert "fix: increase timeout for k8s test"

This reverts commit 9b0e313.

* fix: reset ci file

* fix: readd port config

* fix: use run_async helper again

* fix: don't touch import

Co-authored-by: Jina Dev Bot <dev-bot@jina.ai>
  • Loading branch information
jacobowitz and jina-bot committed Jan 31, 2022
1 parent 6b16872 commit f4f8f31
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 41 deletions.
12 changes: 6 additions & 6 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ jobs:
with:
files: "coverage.xml"
- name: Upload coverage from test to Codecov
uses: codecov/codecov-action@v1
uses: codecov/codecov-action@v2
if: steps.check_files.outputs.files_exists == 'true' && ${{ matrix.python-version }} == '3.7'
with:
file: coverage.xml
Expand Down Expand Up @@ -235,7 +235,7 @@ jobs:
with:
files: "coverage.xml"
- name: Upload coverage from test to Codecov
uses: codecov/codecov-action@v1
uses: codecov/codecov-action@v2
if: steps.check_files.outputs.files_exists == 'true' && ${{ matrix.python-version }} == '3.7'
with:
file: coverage.xml
Expand All @@ -260,6 +260,7 @@ jobs:
runs-on: ubuntu-latest
env:
JINA_DAEMON_DOCKERFILE: DEVEL
JINA_RANDOM_PORT_MIN: 16384
strategy:
fail-fast: false
matrix:
Expand Down Expand Up @@ -287,7 +288,7 @@ jobs:
docker build -f Dockerfiles/debianx.Dockerfile --build-arg PIP_TAG=daemon -t jinaai/jina:test-daemon .
declare -a distributed_tests=("tests/distributed/test_topologies/" "tests/distributed/test_topologies_docker/" "tests/distributed/test_workspaces/" "tests/distributed/test_dir_structures/" "tests/distributed/test_scale_remote/" "tests/distributed/test_scale_remote_executors/" "tests/distributed/test_env_vars/" "tests/distributed/test_remote_pods/" "tests/distributed/test_remote_flows/" "tests/distributed/test_remote_peas/")
if [[ " ${distributed_tests[*]} " =~ "${{ matrix.test-path }}" ]]; then
docker run --add-host=host.docker.internal:host-gateway --name jinad --env JINA_DAEMON_BUILD=DEVEL -v /var/run/docker.sock:/var/run/docker.sock -v /tmp/jinad:/tmp/jinad -p 8000:8000 -d jinaai/jina:test-daemon
docker run --add-host=host.docker.internal:host-gateway --name jinad --env JINA_DAEMON_BUILD=DEVEL --env JINA_RANDOM_PORT_MIN=12500 --env JINA_RANDOM_PORT_MAX=16383 -v /var/run/docker.sock:/var/run/docker.sock -v /tmp/jinad:/tmp/jinad -p 8000:8000 -d jinaai/jina:test-daemon
fi
pytest --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml --timeout=600 -v -s ${{ matrix.test-path }}
docker rm -f jinad || true
Expand All @@ -308,15 +309,15 @@ jobs:
echo "flag it as jina for codeoverage"
echo "::set-output name=codecov_flag::jina"
fi
timeout-minutes: 30
- name: Check codecov file
id: check_files
uses: andstor/file-existence-action@v1
with:
files: "coverage.xml"
- name: Upload coverage from test to Codecov
uses: codecov/codecov-action@v1
uses: codecov/codecov-action@v2
if: steps.check_files.outputs.files_exists == 'true' && ${{ matrix.python-version }} == '3.7'
with:
file: coverage.xml
Expand All @@ -337,4 +338,3 @@ jobs:
- name: Success
if: ${{ success() }}
run: echo "All Done"

17 changes: 16 additions & 1 deletion daemon/dockerize.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import docker

from jina import __docker_host__
from jina import __docker_host__, helper
from jina.helper import colored
from jina.logging.logger import JinaLogger

Expand Down Expand Up @@ -470,3 +470,18 @@ def rm_container(cls, id: str):
def _validate(cls):
# TODO
pass

@classmethod
def exposed_ports(cls):
    """
    Collect the host ports published by the containers the docker client lists.

    Port entries without any host binding (empty or ``None``) are skipped.

    :return: A set with the host port (as ``int``) of every port mapping found
    """
    return {
        int(binding['HostPort'])
        for container in cls.client.containers.list()
        for bindings in container.ports.values()
        if bindings
        for binding in bindings
    }
2 changes: 2 additions & 0 deletions daemon/stores/base.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import multiprocessing
import os
import pickle
import shutil
Expand All @@ -21,6 +22,7 @@ class BaseStore(MutableMapping):

_kind = ''
_status_model = StoreStatus
_lock = multiprocessing.Lock()

def __init__(self):
self._logger = JinaLogger(self.__class__.__name__, **vars(jinad_args))
Expand Down
21 changes: 20 additions & 1 deletion daemon/stores/containers.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class ContainerStore(BaseStore, ABC):

_kind = 'container'
_status_model = ContainerStoreStatus
_exposed_ports = set()

@abstractmethod
async def add_in_partial(self, uri, envs, *args, **kwargs):
Expand Down Expand Up @@ -146,10 +147,13 @@ async def add(
f'{workspace_id} is not ACTIVE yet. Please retry once it becomes ACTIVE'
)

partiald_port = random_port()
dockerports = (
ports.docker_ports if isinstance(ports, PortMappings) else ports
)
with self._lock:
for port in dockerports.values():
self._exposed_ports.add(port)
partiald_port = self._find_partiald_port()
dockerports.update({f'{partiald_port}/tcp': partiald_port})
uri = self._uri(partiald_port)
entrypoint = self._entrypoint(partiald_port, workspace_id)
Expand Down Expand Up @@ -223,6 +227,21 @@ async def add(
workspace_store[workspace_id].metadata.managed_objects.add(id)
return id

def _find_partiald_port(self):
    """
    Pick a port for partiald that is not already in use.

    A candidate from ``random_port()`` is rejected while it is either exposed
    by a docker container (snapshot taken once via ``Dockerizer.exposed_ports()``)
    or already recorded in ``self._exposed_ports``. The accepted port is
    recorded in ``self._exposed_ports`` before it is returned.

    :return: the selected port
    :raises OSError: if the candidate still collides after ``2 ** 16`` redraws
    """
    taken_by_docker = Dockerizer.exposed_ports()
    redraws = 0
    while True:
        candidate = random_port()
        collides = (
            candidate in taken_by_docker or candidate in self._exposed_ports
        )
        if not collides:
            self._exposed_ports.add(candidate)
            return candidate
        # give up once the redraw budget is exhausted
        if redraws >= 2 ** 16:
            raise OSError('No available ports to new container')
        redraws += 1

@BaseStore.dump
async def delete(self, id: DaemonID, **kwargs) -> None:
"""Delete a container from the store
Expand Down
79 changes: 47 additions & 32 deletions jina/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,54 +404,69 @@ def random_name() -> str:


# Ports that have already been handed out in this process.
assigned_ports = set()
# Shuffled pool of candidate ports that are still available.
unassigned_ports = []
DEFAULT_MIN_PORT = 49153
MAX_PORT = 65535


def reset_ports():
    """
    Forget all port bookkeeping and rebuild the shuffled pool of candidates.

    The pool spans ``JINA_RANDOM_PORT_MIN`` .. ``JINA_RANDOM_PORT_MAX``
    (inclusive) when those environment variables are set, and falls back to
    ``DEFAULT_MIN_PORT`` .. ``MAX_PORT`` otherwise.
    """
    unassigned_ports.clear()
    assigned_ports.clear()

    # if we are running out of ports, lower default minimum port
    # NOTE(review): `assigned_ports` was cleared just above, so this headroom
    # is always MAX_PORT - DEFAULT_MIN_PORT and the '16384' fallback looks
    # unreachable -- confirm whether the check was meant to run before clearing.
    headroom = MAX_PORT - DEFAULT_MIN_PORT - len(assigned_ports)
    fallback_min = '16384' if headroom < 100 else str(DEFAULT_MIN_PORT)
    lower = int(os.environ.get('JINA_RANDOM_PORT_MIN', fallback_min))
    upper = int(os.environ.get('JINA_RANDOM_PORT_MAX', str(MAX_PORT)))

    unassigned_ports.extend(set(range(lower, upper + 1)) - set(assigned_ports))
    random.shuffle(unassigned_ports)


def random_port() -> Optional[int]:
"""
Get a random available port number from '49153' to '65535'.
Get a random available port number.
:return: A random port.
"""

import threading
import multiprocessing
from contextlib import closing
import socket
def _random_port():
import socket

def _get_port(port=0):
with multiprocessing.Lock():
with threading.Lock():
if port not in assigned_ports:
with closing(
socket.socket(socket.AF_INET, socket.SOCK_STREAM)
) as s:
try:
s.bind(('', port))
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
return s.getsockname()[1]
except OSError:
pass
else:
def _check_bind(port):
with socket.socket() as s:
try:
s.bind(('', port))
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
return port
except OSError:
return None

_port = None
if 'JINA_RANDOM_PORT_MIN' in os.environ or 'JINA_RANDOM_PORT_MAX' in os.environ:
min_port = int(os.environ.get('JINA_RANDOM_PORT_MIN', '49153'))
max_port = int(os.environ.get('JINA_RANDOM_PORT_MAX', '65535'))
all_ports = list(range(min_port, max_port + 1))
random.shuffle(all_ports)
for _port in all_ports:
if _get_port(_port) is not None:
_port = None
if len(unassigned_ports) == 0:
reset_ports()
for idx, _port in enumerate(unassigned_ports):
if _check_bind(_port) is not None:
break
else:
raise OSError(
f'can not find an available port between [{min_port}, {max_port}].'
f'can not find an available port in {len(unassigned_ports)} unassigned ports, assigned already {len(assigned_ports)} ports'
)
else:
_port = _get_port()
int_port = int(_port)
unassigned_ports.pop(idx)
assigned_ports.add(int_port)
return int_port

assigned_ports.add(int(_port))
return int(_port)
try:
return _random_port()
except OSError:
assigned_ports.clear()
unassigned_ports.clear()
return _random_port()


def random_identity(use_uuid1: bool = False) -> str:
Expand Down
5 changes: 4 additions & 1 deletion tests/unit/test_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
find_request_binding,
dunder_get,
get_ci_vendor,
reset_ports,
)
from jina.hubble.helper import _get_hubble_base_url
from jina.jaml.helper import complete_path
Expand Down Expand Up @@ -257,18 +258,20 @@ def config():


def test_random_port(config):
reset_ports()
assert os.environ['JINA_RANDOM_PORT_MIN']
port = random_port()
assert 49153 <= port <= 65535


def test_random_port_unique(config):
reset_ports()
assert os.environ['JINA_RANDOM_PORT_MIN']
generated_ports = set()
for i in range(1000):
port = random_port()
assert port not in generated_ports
assert 49153 <= port <= 65535
assert int(os.environ['JINA_RANDOM_PORT_MIN']) <= port <= 65535
generated_ports.add(port)


Expand Down

0 comments on commit f4f8f31

Please sign in to comment.