Skip to content

Commit e5fc6ee

Browse files
authored
fix(embed): daemon process XPC connection crash on macos (#215)
* fix(embed): daemon process XPC connection crash on macos * other fix
1 parent bb0e031 commit e5fc6ee

File tree

4 files changed

+475
-5
lines changed

4 files changed

+475
-5
lines changed

hindsight-api/hindsight_api/engine/cross_encoder.py

Lines changed: 93 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -163,11 +163,101 @@ async def initialize(self) -> None:
163163
else:
164164
logger.info("Reranker: local provider initialized (using existing executor)")
165165

166+
def _is_xpc_error(self, error: Exception) -> bool:
167+
"""
168+
Check if an error is an XPC connection error (macOS daemon issue).
169+
170+
On macOS, long-running daemons can lose XPC connections to system services
171+
when the process is idle for extended periods.
172+
"""
173+
error_str = str(error).lower()
174+
return "xpc_error_connection_invalid" in error_str or "xpc error" in error_str
175+
176+
def _reinitialize_model_sync(self) -> None:
    """
    Discard the current cross-encoder model and load a fresh one, synchronously.

    This is used to recover from XPC errors on macOS where the
    PyTorch/MPS backend loses its connection to system services.

    On return ``self._model`` is a newly constructed ``CrossEncoder``. If
    loading fails, ``self._model`` is left as ``None`` and the exception
    propagates to the caller.

    Raises:
        ImportError: If sentence-transformers is not installed.
    """
    import gc

    import torch

    logger.warning("Reinitializing reranker model %s due to backend error", self.model_name)

    # Drop our reference to the existing model and collect garbage so the
    # resources it holds can be freed before the replacement is loaded.
    self._model = None
    gc.collect()

    # Probe the hardware once; the result drives both cache clearing and
    # device selection below.
    cuda_available = torch.cuda.is_available()
    mps_available = hasattr(torch.backends, "mps") and torch.backends.mps.is_available()

    if cuda_available:
        torch.cuda.empty_cache()
    elif mps_available:
        try:
            torch.mps.empty_cache()
        except AttributeError:
            pass  # Method might not exist in all PyTorch versions

    try:
        from sentence_transformers import CrossEncoder
    except ImportError as exc:
        # Chain the original error so the real import failure stays visible.
        raise ImportError(
            "sentence-transformers is required for LocalSTCrossEncoder. "
            "Install it with: pip install sentence-transformers"
        ) from exc

    # None lets sentence-transformers auto-detect GPU/MPS; otherwise pin to CPU.
    device = None if (cuda_available or mps_available) else "cpu"

    self._model = CrossEncoder(
        self.model_name,
        device=device,
        model_kwargs={"low_cpu_mem_usage": False},
    )

    logger.info("Reranker: local provider reinitialized successfully")
228+
229+
def _predict_with_recovery(self, pairs: list[tuple[str, str]]) -> list[float]:
230+
"""
231+
Predict with automatic recovery from XPC errors.
232+
233+
This runs synchronously in the thread pool.
234+
"""
235+
max_retries = 1
236+
for attempt in range(max_retries + 1):
237+
try:
238+
scores = self._model.predict(pairs, show_progress_bar=False)
239+
return scores.tolist() if hasattr(scores, "tolist") else list(scores)
240+
except Exception as e:
241+
# Check if this is an XPC error (macOS daemon issue)
242+
if self._is_xpc_error(e) and attempt < max_retries:
243+
logger.warning(f"XPC error detected in reranker (attempt {attempt + 1}): {e}")
244+
try:
245+
self._reinitialize_model_sync()
246+
logger.info("Reranker reinitialized successfully, retrying prediction")
247+
continue
248+
except Exception as reinit_error:
249+
logger.error(f"Failed to reinitialize reranker: {reinit_error}")
250+
raise Exception(f"Failed to recover from XPC error: {str(e)}")
251+
else:
252+
# Not an XPC error or out of retries
253+
raise
254+
166255
async def predict(self, pairs: list[tuple[str, str]]) -> list[float]:
167256
"""
168257
Score query-document pairs for relevance.
169258
170259
Uses a dedicated thread pool with limited workers to prevent CPU thrashing.
260+
Automatically recovers from XPC errors on macOS by reinitializing the model.
171261
172262
Args:
173263
pairs: List of (query, document) tuples to score
@@ -180,11 +270,11 @@ async def predict(self, pairs: list[tuple[str, str]]) -> list[float]:
180270

181271
# Use dedicated executor - limited workers naturally limits concurrency
182272
loop = asyncio.get_event_loop()
183-
scores = await loop.run_in_executor(
273+
return await loop.run_in_executor(
184274
LocalSTCrossEncoder._executor,
185-
lambda: self._model.predict(pairs, show_progress_bar=False),
275+
self._predict_with_recovery,
276+
pairs,
186277
)
187-
return scores.tolist() if hasattr(scores, "tolist") else list(scores)
188278

189279

190280
class RemoteTEICrossEncoder(CrossEncoderModel):

hindsight-api/hindsight_api/engine/embeddings.py

Lines changed: 86 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -151,10 +151,75 @@ async def initialize(self) -> None:
151151
self._dimension = self._model.get_sentence_embedding_dimension()
152152
logger.info(f"Embeddings: local provider initialized (dim: {self._dimension})")
153153

154+
def _is_xpc_error(self, error: Exception) -> bool:
155+
"""
156+
Check if an error is an XPC connection error (macOS daemon issue).
157+
158+
On macOS, long-running daemons can lose XPC connections to system services
159+
when the process is idle for extended periods.
160+
"""
161+
error_str = str(error).lower()
162+
return "xpc_error_connection_invalid" in error_str or "xpc error" in error_str
163+
164+
def _reinitialize_model_sync(self) -> None:
    """
    Discard the current embedding model and load a fresh one, synchronously.

    This is used to recover from XPC errors on macOS where the
    PyTorch/MPS backend loses its connection to system services.

    On return ``self._model`` is a newly constructed ``SentenceTransformer``.
    If loading fails, ``self._model`` is left as ``None`` and the exception
    propagates to the caller.

    Raises:
        ImportError: If sentence-transformers is not installed.
    """
    import gc

    import torch

    logger.warning("Reinitializing embedding model %s due to backend error", self.model_name)

    # Drop our reference to the existing model and collect garbage so the
    # resources it holds can be freed before the replacement is loaded.
    self._model = None
    gc.collect()

    # Probe the hardware once; the result drives both cache clearing and
    # device selection below.
    cuda_available = torch.cuda.is_available()
    mps_available = hasattr(torch.backends, "mps") and torch.backends.mps.is_available()

    if cuda_available:
        torch.cuda.empty_cache()
    elif mps_available:
        try:
            torch.mps.empty_cache()
        except AttributeError:
            pass  # Method might not exist in all PyTorch versions

    try:
        from sentence_transformers import SentenceTransformer
    except ImportError as exc:
        # Chain the original error so the real import failure stays visible.
        raise ImportError(
            "sentence-transformers is required for LocalSTEmbeddings. "
            "Install it with: pip install sentence-transformers"
        ) from exc

    # None lets sentence-transformers auto-detect GPU/MPS; otherwise pin to CPU.
    device = None if (cuda_available or mps_available) else "cpu"

    self._model = SentenceTransformer(
        self.model_name,
        device=device,
        model_kwargs={"low_cpu_mem_usage": False},
    )

    logger.info("Embeddings: local provider reinitialized successfully")
216+
154217
def encode(self, texts: list[str]) -> list[list[float]]:
155218
"""
156219
Generate embeddings for a list of texts.
157220
221+
Automatically recovers from XPC errors on macOS by reinitializing the model.
222+
158223
Args:
159224
texts: List of text strings to encode
160225
@@ -163,8 +228,27 @@ def encode(self, texts: list[str]) -> list[list[float]]:
163228
"""
164229
if self._model is None:
165230
raise RuntimeError("Embeddings not initialized. Call initialize() first.")
166-
embeddings = self._model.encode(texts, convert_to_numpy=True, show_progress_bar=False)
167-
return [emb.tolist() for emb in embeddings]
231+
232+
# Try encoding with automatic recovery from XPC errors
233+
max_retries = 1
234+
for attempt in range(max_retries + 1):
235+
try:
236+
embeddings = self._model.encode(texts, convert_to_numpy=True, show_progress_bar=False)
237+
return [emb.tolist() for emb in embeddings]
238+
except Exception as e:
239+
# Check if this is an XPC error (macOS daemon issue)
240+
if self._is_xpc_error(e) and attempt < max_retries:
241+
logger.warning(f"XPC error detected in embedding generation (attempt {attempt + 1}): {e}")
242+
try:
243+
self._reinitialize_model_sync()
244+
logger.info("Model reinitialized successfully, retrying embedding generation")
245+
continue
246+
except Exception as reinit_error:
247+
logger.error(f"Failed to reinitialize model: {reinit_error}")
248+
raise Exception(f"Failed to recover from XPC error: {str(e)}")
249+
else:
250+
# Not an XPC error or out of retries
251+
raise
168252

169253

170254
class RemoteTEIEmbeddings(Embeddings):
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
"""
2+
Tests for XPC error recovery in LocalSTCrossEncoder.
3+
4+
This tests the automatic reinitialization of the cross-encoder model when
5+
XPC connection errors occur on macOS (common in long-running daemon processes).
6+
"""
7+
8+
import asyncio
9+
from unittest.mock import MagicMock, patch
10+
11+
import pytest
12+
13+
from hindsight_api.engine.cross_encoder import LocalSTCrossEncoder
14+
15+
16+
class TestCrossEncoderXPCErrorRecovery:
    """Tests for XPC error detection and recovery in LocalSTCrossEncoder."""

    # Message text used to simulate the macOS backend failure.
    XPC_MESSAGE = "Compiler encountered XPC_ERROR_CONNECTION_INVALID (is the OS shutting down?)"

    @pytest.fixture
    def cross_encoder(self):
        """Create a LocalSTCrossEncoder instance."""
        return LocalSTCrossEncoder(model_name="cross-encoder/ms-marco-TinyBERT-L-2-v2")

    def test_is_xpc_error_detection(self, cross_encoder):
        """Test that XPC errors are correctly detected."""
        # Test various XPC error message formats
        assert cross_encoder._is_xpc_error(Exception(self.XPC_MESSAGE))
        assert cross_encoder._is_xpc_error(Exception("XPC error occurred"))

        # Test that non-XPC errors are not detected
        assert not cross_encoder._is_xpc_error(Exception("Some other error"))

    @pytest.mark.asyncio
    async def test_predict_with_xpc_recovery(self, cross_encoder):
        """Test that predict() recovers from XPC errors by reinitializing."""
        await cross_encoder.initialize()

        stale_model = cross_encoder._model
        original_reinit = cross_encoder._reinitialize_model_sync

        # The patch targets the model object that exists now. Reinitialization
        # swaps in a new, unpatched model, so the retry hits the real predict().
        with patch.object(
            cross_encoder, "_reinitialize_model_sync", side_effect=original_reinit
        ) as reinit, patch.object(
            stale_model, "predict", side_effect=RuntimeError(self.XPC_MESSAGE)
        ) as stale_predict:
            result = await cross_encoder.predict([("query", "document")])

        assert result is not None
        assert len(result) == 1
        assert isinstance(result[0], float)
        reinit.assert_called_once()
        # The failing model is tried exactly once before being replaced.
        assert stale_predict.call_count == 1
        assert cross_encoder._model is not stale_model

    @pytest.mark.asyncio
    async def test_predict_fails_on_non_xpc_error(self, cross_encoder):
        """Test that predict() does not retry for non-XPC errors."""
        await cross_encoder.initialize()

        with patch.object(cross_encoder, "_reinitialize_model_sync") as reinit, patch.object(
            cross_encoder._model, "predict", side_effect=RuntimeError("Some other error")
        ) as failing_predict:
            with pytest.raises(RuntimeError) as exc_info:
                await cross_encoder.predict([("query", "document")])

        assert "Some other error" in str(exc_info.value)
        # No retry and no recovery attempt for unrelated failures.
        assert failing_predict.call_count == 1
        reinit.assert_not_called()

    @pytest.mark.asyncio
    async def test_reinitialize_clears_model(self, cross_encoder):
        """Test that _reinitialize_model_sync properly clears and reinits the model."""
        await cross_encoder.initialize()

        original_model = cross_encoder._model
        assert original_model is not None

        cross_encoder._reinitialize_model_sync()

        # Model should be reinitialized (new instance)
        assert cross_encoder._model is not None
        assert cross_encoder._model is not original_model

        # Should still work
        result = await cross_encoder.predict([("test query", "test document")])
        assert len(result) == 1
        assert isinstance(result[0], float)

    @pytest.mark.asyncio
    async def test_xpc_recovery_exhausts_retries(self, cross_encoder):
        """Test that XPC recovery gives up after max retries."""
        await cross_encoder.initialize()

        reinit_count = 0
        original_reinit = cross_encoder._reinitialize_model_sync

        def track_and_fail_reinit():
            nonlocal reinit_count
            reinit_count += 1
            # Really reinitialize, then make the replacement model fail too.
            original_reinit()
            cross_encoder._model.predict = MagicMock(side_effect=RuntimeError(self.XPC_MESSAGE))

        # The initial model always raises an XPC error.
        cross_encoder._model.predict = MagicMock(side_effect=RuntimeError(self.XPC_MESSAGE))

        with patch.object(cross_encoder, "_reinitialize_model_sync", side_effect=track_and_fail_reinit):
            # Should try once, reinitialize, try again, and fail with the
            # second model's XPC error since no retries remain.
            with pytest.raises(RuntimeError, match="XPC_ERROR_CONNECTION_INVALID"):
                await cross_encoder.predict([("query", "document")])

        assert reinit_count == 1  # Should have tried to reinitialize once
        assert cross_encoder._model.predict.call_count == 1

0 commit comments

Comments
 (0)