Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

revert testing to using socket #21242

Merged
merged 3 commits into from
May 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 45 additions & 38 deletions pythonFiles/tests/pytestadapter/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,27 +10,14 @@
import socket
import subprocess
import sys
import threading
import uuid
from typing import Any, Dict, List, Optional, Union

TEST_DATA_PATH = pathlib.Path(__file__).parent / ".data"
from typing_extensions import TypedDict


@contextlib.contextmanager
def test_output_file(root: pathlib.Path, ext: str = ".txt"):
"""Creates a temporary python file with a random name."""
basename = (
"".join(random.choice("abcdefghijklmnopqrstuvwxyz") for _ in range(9)) + ext
)
fullpath = root / basename
try:
fullpath.write_text("", encoding="utf-8")
yield fullpath
finally:
os.unlink(str(fullpath))


def create_server(
host: str = "127.0.0.1",
port: int = 0,
Expand Down Expand Up @@ -116,31 +103,51 @@ def runner(args: List[str]) -> Optional[Dict[str, Any]]:
"-p",
"vscode_pytest",
] + args
listener: socket.socket = create_server()
_, port = listener.getsockname()
listener.listen()

env = os.environ.copy()
env.update(
{
"TEST_UUID": str(uuid.uuid4()),
"TEST_PORT": str(port),
"PYTHONPATH": os.fspath(pathlib.Path(__file__).parent.parent.parent),
}
)

result: list = []
t1: threading.Thread = threading.Thread(
target=_listen_on_socket, args=(listener, result)
)
t1.start()

t2 = threading.Thread(
target=lambda proc_args, proc_env, proc_cwd: subprocess.run(
proc_args, env=proc_env, cwd=proc_cwd
),
args=(process_args, env, TEST_DATA_PATH),
)
t2.start()

t1.join()
t2.join()

return process_rpc_json(result[0]) if result else None


with test_output_file(TEST_DATA_PATH) as output_path:
env = os.environ.copy()
env.update(
{
"TEST_UUID": str(uuid.uuid4()),
"TEST_PORT": str(12345), # port is not used for tests
"PYTHONPATH": os.fspath(pathlib.Path(__file__).parent.parent.parent),
"TEST_OUTPUT_FILE": os.fspath(output_path),
}
)

result = subprocess.run(
process_args,
env=env,
cwd=os.fspath(TEST_DATA_PATH),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
if result.returncode != 0:
print("Subprocess Run failed with:")
print(result.stdout.decode(encoding="utf-8"))
print(result.stderr.decode(encoding="utf-8"))

return process_rpc_json(output_path.read_text(encoding="utf-8"))
def _listen_on_socket(listener: socket.socket, result: List[str]):
"""Listen on the socket for the JSON data from the server.
Created as a seperate function for clarity in threading.
"""
sock, (other_host, other_port) = listener.accept()
all_data: list = []
while True:
data: bytes = sock.recv(1024 * 1024)
if not data:
break
all_data.append(data.decode("utf-8"))
result.append("".join(all_data))


def find_test_line_number(test_name: str, test_file_path) -> str:
Expand Down
40 changes: 14 additions & 26 deletions pythonFiles/vscode_pytest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,19 +437,13 @@ def execution_post(
Request-uuid: {testuuid}

{data}"""
test_output_file: Optional[str] = os.getenv("TEST_OUTPUT_FILE", None)
if test_output_file == "stdout":
print(request)
elif test_output_file:
pathlib.Path(test_output_file).write_text(request, encoding="utf-8")
else:
try:
with socket_manager.SocketManager(addr) as s:
if s.socket is not None:
s.socket.sendall(request.encode("utf-8"))
except Exception as e:
print(f"Plugin error connection error[vscode-pytest]: {e}")
print(f"[vscode-pytest] data: {request}")
try:
with socket_manager.SocketManager(addr) as s:
if s.socket is not None:
s.socket.sendall(request.encode("utf-8"))
except Exception as e:
print(f"Plugin error connection error[vscode-pytest]: {e}")
print(f"[vscode-pytest] data: {request}")


def post_response(cwd: str, session_node: TestNode) -> None:
Expand Down Expand Up @@ -477,16 +471,10 @@ def post_response(cwd: str, session_node: TestNode) -> None:
Request-uuid: {testuuid}

{data}"""
test_output_file: Optional[str] = os.getenv("TEST_OUTPUT_FILE", None)
if test_output_file == "stdout":
print(request)
elif test_output_file:
pathlib.Path(test_output_file).write_text(request, encoding="utf-8")
else:
try:
with socket_manager.SocketManager(addr) as s:
if s.socket is not None:
s.socket.sendall(request.encode("utf-8"))
except Exception as e:
print(f"Plugin error connection error[vscode-pytest]: {e}")
print(f"[vscode-pytest] data: {request}")
try:
with socket_manager.SocketManager(addr) as s:
if s.socket is not None:
s.socket.sendall(request.encode("utf-8"))
except Exception as e:
print(f"Plugin error connection error[vscode-pytest]: {e}")
print(f"[vscode-pytest] data: {request}")
Loading