Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions sdk/batch/speechmatics/batch/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from ._exceptions import TimeoutError
from ._exceptions import TransportError
from ._models import ConnectionConfig
from ._models import FetchData
from ._models import FormatType
from ._models import JobConfig
from ._models import JobDetails
Expand Down Expand Up @@ -50,4 +51,5 @@
"JobStatus",
"JobType",
"FormatType",
"FetchData",
]
83 changes: 56 additions & 27 deletions sdk/batch/speechmatics/batch/_async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:

async def submit_job(
self,
audio_file: Union[str, BinaryIO],
audio_file: Union[str, BinaryIO, None],
*,
config: Optional[JobConfig] = None,
transcription_config: Optional[TranscriptionConfig] = None,
Expand All @@ -148,7 +148,8 @@ async def submit_job(
asynchronously on the server.

Args:
audio_file: Path to audio file or file-like object containing audio data.
audio_file: Path to audio file or file-like object containing audio data, or None if using fetch_data.
NOTE: You must explicitly pass audio_file=None if providing a fetch_data in the config
config: Complete job configuration. If not provided, uses transcription_config
to build a basic job configuration.
transcription_config: Transcription-specific configuration. Used if config
Expand Down Expand Up @@ -181,36 +182,29 @@ async def submit_job(
transcription_config = transcription_config or TranscriptionConfig()
config = JobConfig(type=JobType.TRANSCRIPTION, transcription_config=transcription_config)

# Prepare file data using async context manager
# Check for fetch_data configuration
config_dict = config.to_dict()
has_fetch_data = "fetch_data" in config_dict

# Validate input combination
if audio_file is not None and has_fetch_data:
raise ValueError("Cannot specify both audio_file and fetch_data")
if audio_file is None and not has_fetch_data:
raise ValueError("Must provide either audio_file or fetch_data in config")

try:
async with prepare_audio_file(audio_file) as (filename, file_data):
# Prepare multipart form data
multipart_data = {
"config": config.to_dict(),
"data_file": (filename, file_data, "audio/wav"),
}

response = await self._transport.post("/jobs", multipart_data=multipart_data)

# Extract job info from response
job_id = response.get("id")
if not job_id:
raise BatchError("No job ID returned from server")

self._logger.debug("Job submitted successfully (job_id=%s, filename=%s)", job_id, filename)

return JobDetails(
id=job_id,
status=JobStatus.RUNNING, # Assume running initially
created_at=response.get("created_at", ""),
data_name=filename,
config=config,
)
# Prepare multipart data based on strategy
if has_fetch_data:
multipart_data, filename = await self._prepare_fetch_data_submission(config_dict)
else:
assert audio_file is not None # for type checker; validated above
multipart_data, filename = await self._prepare_file_submission(audio_file, config_dict)

return await self._submit_and_create_job_details(multipart_data, filename, config)
except Exception as e:
if isinstance(e, (AuthenticationError, BatchError)):
raise
raise BatchError(f"Failed to submit job: {e}") from e
raise BatchError(f"Job submission failed: {e}") from e

async def get_job_info(self, job_id: str) -> JobDetails:
"""
Expand Down Expand Up @@ -514,3 +508,38 @@ async def close(self) -> None:
await self._transport.close()
except Exception:
pass # Best effort cleanup

# ------------------------------------------------------------------
# Internal helpers for job submission strategies
# ------------------------------------------------------------------
async def _prepare_fetch_data_submission(self, config_dict: dict) -> tuple[dict, str]:
"""Prepare multipart data for fetch_data submission."""
filename = config_dict["fetch_data"]["url"]
multipart_data = {"config": config_dict}
return multipart_data, filename

async def _prepare_file_submission(self, audio_file: Union[str, BinaryIO], config_dict: dict) -> tuple[dict, str]:
"""Prepare multipart data for file upload submission."""
async with prepare_audio_file(audio_file) as (filename, file_data):
multipart_data = {
"config": config_dict,
"data_file": (filename, file_data, "audio/wav"),
}
return multipart_data, filename

async def _submit_and_create_job_details(
self, multipart_data: dict, filename: str, config: JobConfig
) -> JobDetails:
"""Submit job and create JobDetails response."""
response = await self._transport.post("/jobs", multipart_data=multipart_data)
job_id = response.get("id")
if not job_id:
raise BatchError("No job ID returned from server")
self._logger.debug("Job submitted successfully (job_id=%s, filename=%s)", job_id, filename)
return JobDetails(
id=job_id,
status=JobStatus.RUNNING,
created_at=response.get("created_at", ""),
data_name=filename,
config=config,
)
23 changes: 23 additions & 0 deletions sdk/batch/speechmatics/batch/_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,25 @@ def to_dict(self) -> dict[str, Any]:
return asdict(self)


@dataclass
class FetchData:
"""Batch: Optional configuration for fetching file for transcription."""

url: str
"""URL to fetch"""

auth_headers: Optional[list[str]] = None
"""
A list of additional headers to be added to the input fetch request
when using http or https. This is intended to support authentication or
authorization, for example by supplying an OAuth2 bearer token
"""

def to_dict(self) -> dict[str, Any]:
"""Convert to dictionary, excluding None values."""
return {k: v for k, v in asdict(self).items() if v is not None}


@dataclass
class NotificationConfig:
"""Configuration for job completion notifications."""
Expand Down Expand Up @@ -229,6 +248,7 @@ class JobConfig:

Attributes:
type: Type of job (transcription or alignment).
fetch_data: Configuration for fetching an audio file for transcription.
transcription_config: Configuration for transcription behavior.
alignment_config: Configuration for alignment jobs.
notification_config: Webhook notification configuration.
Expand All @@ -243,6 +263,7 @@ class JobConfig:
"""

type: JobType
fetch_data: Optional[FetchData] = None
transcription_config: Optional[TranscriptionConfig] = None
alignment_config: Optional[AlignmentConfig] = None
notification_config: Optional[NotificationConfig] = None
Expand All @@ -259,6 +280,8 @@ def to_dict(self) -> dict[str, Any]:
"""Convert job config to dictionary for API submission."""
config: dict[str, Any] = {"type": self.type.value}

if self.fetch_data:
config["fetch_data"] = self.fetch_data.to_dict()
if self.transcription_config:
config["transcription_config"] = self.transcription_config.to_dict()
if self.alignment_config:
Expand Down
4 changes: 2 additions & 2 deletions sdk/batch/speechmatics/batch/_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,8 @@ async def _request(
if json_data:
kwargs["json"] = json_data
elif multipart_data:
# Create proper multipart/form-data
form_data = aiohttp.FormData()
# Force multipart encoding even when no files are present (for fetch_data support)
form_data = aiohttp.FormData(default_to_multipart=True)
for key, value in multipart_data.items():
if isinstance(value, tuple) and len(value) == 3:
# File data: (filename, file_data, content_type)
Expand Down