diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index b0eb5c3..be44cee 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -20,6 +20,9 @@ jobs:

       - name: Install Task
         uses: arduino/setup-task@v2
+        with:
+          repo-token: ${{ secrets.GITHUB_TOKEN }}
+          version: 3.x

       - name: Run pre-commit
         run: task pre-commit
@@ -39,6 +42,9 @@ jobs:

       - name: Install Task
         uses: arduino/setup-task@v2
+        with:
+          repo-token: ${{ secrets.GITHUB_TOKEN }}
+          version: 3.x

       - name: Set up Python ${{ matrix.python-version }}
         run: uv python install ${{ matrix.python-version }}
@@ -68,66 +74,11 @@ jobs:
       - name: Install uv
         uses: astral-sh/setup-uv@v2

-      - name: Set up Python
-        run: uv python install 3.12
-
-      - name: Install dependencies
-        run: uv sync
-
-      - name: Setup test configuration
-        run: |
-          # Ensure config.yaml exists and is properly configured for CI
-          if [ ! -f config.yaml ]; then
-            echo "Creating default config.yaml for CI"
-            cat > config.yaml << EOF
-          servers:
-            default:
-              default: true
-              url: "http://localhost:18080"
-          EOF
-          fi
-
-      - name: Verify test data
-        run: |
-          echo "Verifying test data structure..."
-          ls -la examples/basic/
-          ls -la examples/basic/events/
-          cat examples/basic/history-server.conf
-
-      - name: Start Spark History Server
-        run: |
-          echo "Starting Spark History Server with Docker..."
-          docker run -d \
-            --name spark-history-server \
-            -v $(pwd)/examples/basic:/mnt/data \
-            -p 18080:18080 \
-            docker.io/apache/spark:3.5.5 \
-            /opt/java/openjdk/bin/java \
-            -cp '/opt/spark/conf:/opt/spark/jars/*' \
-            -Xmx1g \
-            org.apache.spark.deploy.history.HistoryServer \
-            --properties-file /mnt/data/history-server.conf
-
-      - name: Wait for Spark History Server
-        run: |
-          timeout 60 bash -c 'until curl -f http://localhost:18080; do sleep 2; done'
-
-      - name: Test MCP Server startup
-        run: |
-          # Test import structure
-          uv run python -c "import app; print('✓ App imports successfully')"
-          uv run python -c "import main; print('✓ Main imports successfully')"
-
-          # Test MCP server can start (brief startup test)
-          timeout 10 uv run python main.py &
-          SERVER_PID=$!
-          sleep 5
-          kill $SERVER_PID 2>/dev/null || true
-          echo "✓ MCP Server startup test completed"
-
-      - name: Cleanup
-        if: always()
-        run: |
-          echo "Cleaning up Docker containers..."
-          docker stop spark-history-server 2>/dev/null || true
-          docker rm spark-history-server 2>/dev/null || true
+      - name: Install Task
+        uses: arduino/setup-task@v2
+        with:
+          repo-token: ${{ secrets.GITHUB_TOKEN }}
+          version: 3.x
+
+      - name: e2e
+        run: task test-e2e
diff --git a/Taskfile.yml b/Taskfile.yml
index 33ee616..2aefd0f 100644
--- a/Taskfile.yml
+++ b/Taskfile.yml
@@ -59,9 +59,34 @@ tasks:
   test:
     desc: Run tests with pytest
     cmds:
-      - uv run pytest --cov=. -cov-report=xml --cov-report=term-missing
+      - uv run pytest --cov=. --cov-report=xml --cov-report=term-missing .
       - echo "✅ Tests completed!"
+  test-e2e:
+    desc: Run end-to-end tests with Spark and MCP servers
+    cmds:
+      - echo "🧪 Starting end-to-end tests..."
+      - task: start-spark-bg
+      - task: start-mcp-bg
+      - |
+        echo "Waiting for services to be available..."
+        # Wait for Spark History Server on port 18080
+        while ! curl -s http://localhost:18080 > /dev/null; do
+          echo "Waiting for Spark History Server..."
+          sleep 1
+        done
+        echo "✅ Spark History Server is available"
+
+        # Wait for MCP Server on port 18888
+        while ! curl -s http://localhost:18888 > /dev/null; do
+          echo "Waiting for MCP Server..."
+          sleep 1
+        done
+        echo "✅ MCP Server is available"
+      - |
+        echo "Running e2e tests..."
+ uv run pytest tests/e2e.py -v + - task: stop-all test-verbose: desc: Run tests with verbose output cmds: diff --git a/pyproject.toml b/pyproject.toml index 235a852..5e6eed4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -64,6 +64,7 @@ python_classes = ["Test*"] python_functions = ["test_*"] addopts = "--cov=. --cov-report=term-missing" pythonpath = ["."] +asyncio_mode = "auto" [dependency-groups] dev = [ @@ -73,4 +74,5 @@ dev = [ "ruff>=0.1.0", "mypy>=1.7.0", "pre-commit>=3.0.0", + "pytest-asyncio>=1.0.0", ] diff --git a/spark_types.py b/spark_types.py index bca836e..9062fe4 100644 --- a/spark_types.py +++ b/spark_types.py @@ -89,7 +89,7 @@ class ThreadState(str, Enum): class ExecutorMetrics(BaseModel): - metrics: Dict[str, int] = Field(None, alias="metrics") + metrics: Optional[Dict[str, int]] = Field(None, alias="metrics") model_config = ConfigDict(populate_by_name=True) @@ -97,23 +97,23 @@ class ExecutorMetrics(BaseModel): class ApplicationInfo(BaseModel): id: str name: str - cores_granted: int = Field(None, alias="coresGranted") - max_cores: int = Field(None, alias="maxCores") - cores_per_executor: int = Field(None, alias="coresPerExecutor") - memory_per_executor_mb: int = Field(None, alias="memoryPerExecutorMB") + cores_granted: Optional[int] = Field(None, alias="coresGranted") + max_cores: Optional[int] = Field(None, alias="maxCores") + cores_per_executor: Optional[int] = Field(None, alias="coresPerExecutor") + memory_per_executor_mb: Optional[int] = Field(None, alias="memoryPerExecutorMB") attempts: Sequence["ApplicationAttemptInfo"] model_config = ConfigDict(arbitrary_types_allowed=True, populate_by_name=True) class ApplicationAttemptInfo(BaseModel): - attempt_id: str = Field(None, alias="attemptId") - start_time: datetime = Field(None, alias="startTime") - end_time: datetime = Field(None, alias="endTime") - last_updated: datetime = Field(None, alias="lastUpdated") + attempt_id: Optional[str] = Field(None, alias="attemptId") + start_time: Optional[datetime] = Field(None, alias="startTime") + end_time: Optional[datetime] = Field(None, alias="endTime") + last_updated: Optional[datetime] = Field(None, alias="lastUpdated") duration: int - spark_user: str = Field(None, alias="sparkUser") - app_spark_version: str = Field(None, alias="appSparkVersion") + spark_user: Optional[str] = Field(None, alias="sparkUser") + app_spark_version: Optional[str] = Field(None, alias="appSparkVersion") completed: bool = False model_config = ConfigDict(arbitrary_types_allowed=True, populate_by_name=True) @@ -136,83 +136,87 @@ def parse_datetime(cls, value): class ResourceProfileInfo(BaseModel): id: int - executor_resources: Dict[str, Any] = Field( + executor_resources: Optional[Dict[str, Any]] = Field( None, alias="executorResources" ) # Will be typed properly once those classes are defined - task_resources: Dict[str, Any] = Field(None, alias="taskResources") + task_resources: Optional[Dict[str, Any]] = Field(None, alias="taskResources") model_config = ConfigDict(arbitrary_types_allowed=True, populate_by_name=True) class ExecutorStageSummary(BaseModel): - task_time: int = Field(None, alias="taskTime") - failed_tasks: int = Field(None, alias="failedTasks") - succeeded_tasks: int = Field(None, alias="succeededTasks") - killed_tasks: int = Field(None, alias="killedTasks") - input_bytes: int = Field(None, alias="inputBytes") - input_records: int = Field(None, alias="inputRecords") - output_bytes: int = Field(None, alias="outputBytes") - output_records: int = Field(None, alias="outputRecords") - 
shuffle_read: int = Field(None, alias="shuffleRead") - shuffle_read_records: int = Field(None, alias="shuffleReadRecords") - shuffle_write: int = Field(None, alias="shuffleWrite") - shuffle_write_records: int = Field(None, alias="shuffleWriteRecords") - memory_bytes_spilled: int = Field(None, alias="memoryBytesSpilled") - disk_bytes_spilled: int = Field(None, alias="diskBytesSpilled") - is_blacklisted_for_stage: bool = Field( + task_time: Optional[int] = Field(None, alias="taskTime") + failed_tasks: Optional[int] = Field(None, alias="failedTasks") + succeeded_tasks: Optional[int] = Field(None, alias="succeededTasks") + killed_tasks: Optional[int] = Field(None, alias="killedTasks") + input_bytes: Optional[int] = Field(None, alias="inputBytes") + input_records: Optional[int] = Field(None, alias="inputRecords") + output_bytes: Optional[int] = Field(None, alias="outputBytes") + output_records: Optional[int] = Field(None, alias="outputRecords") + shuffle_read: Optional[int] = Field(None, alias="shuffleRead") + shuffle_read_records: Optional[int] = Field(None, alias="shuffleReadRecords") + shuffle_write: Optional[int] = Field(None, alias="shuffleWrite") + shuffle_write_records: Optional[int] = Field(None, alias="shuffleWriteRecords") + memory_bytes_spilled: Optional[int] = Field(None, alias="memoryBytesSpilled") + disk_bytes_spilled: Optional[int] = Field(None, alias="diskBytesSpilled") + is_blacklisted_for_stage: Optional[bool] = Field( None, alias="isBlacklistedForStage" ) # deprecated - peak_memory_metrics: ExecutorMetrics = Field(None, alias="peakMemoryMetrics") - is_excluded_for_stage: bool = Field(None, alias="isExcludedForStage") + peak_memory_metrics: Optional[ExecutorMetrics] = Field( + None, alias="peakMemoryMetrics" + ) + is_excluded_for_stage: Optional[bool] = Field(None, alias="isExcludedForStage") model_config = ConfigDict(arbitrary_types_allowed=True, populate_by_name=True) class SpeculationStageSummary(BaseModel): - num_tasks: int = Field(None, alias="numTasks") - num_active_tasks: int = Field(None, alias="numActiveTasks") - num_completed_tasks: int = Field(None, alias="numCompletedTasks") - num_failed_tasks: int = Field(None, alias="numFailedTasks") - num_killed_tasks: int = Field(None, alias="numKilledTasks") + num_tasks: Optional[int] = Field(None, alias="numTasks") + num_active_tasks: Optional[int] = Field(None, alias="numActiveTasks") + num_completed_tasks: Optional[int] = Field(None, alias="numCompletedTasks") + num_failed_tasks: Optional[int] = Field(None, alias="numFailedTasks") + num_killed_tasks: Optional[int] = Field(None, alias="numKilledTasks") model_config = ConfigDict(populate_by_name=True) class ExecutorSummary(BaseModel): id: str - host_port: str = Field(None, alias="hostPort") - is_active: bool = Field(None, alias="isActive") - rdd_blocks: int = Field(None, alias="rddBlocks") - memory_used: int = Field(None, alias="memoryUsed") - disk_used: int = Field(None, alias="diskUsed") - total_cores: int = Field(None, alias="totalCores") - max_tasks: int = Field(None, alias="maxTasks") - active_tasks: int = Field(None, alias="activeTasks") - failed_tasks: int = Field(None, alias="failedTasks") - completed_tasks: int = Field(None, alias="completedTasks") - total_tasks: int = Field(None, alias="totalTasks") - total_duration: int = Field(None, alias="totalDuration") - total_gc_time: int = Field(None, alias="totalGCTime") - total_input_bytes: int = Field(None, alias="totalInputBytes") - total_shuffle_read: int = Field(None, alias="totalShuffleRead") - total_shuffle_write: int 
= Field(None, alias="totalShuffleWrite") - is_blacklisted: bool = Field(None, alias="isBlacklisted") # deprecated - max_memory: int = Field(None, alias="maxMemory") - add_time: datetime = Field(None, alias="addTime") - remove_time: datetime = Field(None, alias="removeTime") - remove_reason: str = Field(None, alias="removeReason") - executor_logs: Dict[str, str] = Field(None, alias="executorLogs") - memory_metrics: "MemoryMetrics" = Field(None, alias="memoryMetrics") + host_port: Optional[str] = Field(None, alias="hostPort") + is_active: Optional[bool] = Field(None, alias="isActive") + rdd_blocks: Optional[int] = Field(None, alias="rddBlocks") + memory_used: Optional[int] = Field(None, alias="memoryUsed") + disk_used: Optional[int] = Field(None, alias="diskUsed") + total_cores: Optional[int] = Field(None, alias="totalCores") + max_tasks: Optional[int] = Field(None, alias="maxTasks") + active_tasks: Optional[int] = Field(None, alias="activeTasks") + failed_tasks: Optional[int] = Field(None, alias="failedTasks") + completed_tasks: Optional[int] = Field(None, alias="completedTasks") + total_tasks: Optional[int] = Field(None, alias="totalTasks") + total_duration: Optional[int] = Field(None, alias="totalDuration") + total_gc_time: Optional[int] = Field(None, alias="totalGCTime") + total_input_bytes: Optional[int] = Field(None, alias="totalInputBytes") + total_shuffle_read: Optional[int] = Field(None, alias="totalShuffleRead") + total_shuffle_write: Optional[int] = Field(None, alias="totalShuffleWrite") + is_blacklisted: Optional[bool] = Field(None, alias="isBlacklisted") # deprecated + max_memory: Optional[int] = Field(None, alias="maxMemory") + add_time: Optional[datetime] = Field(None, alias="addTime") + remove_time: Optional[datetime] = Field(None, alias="removeTime") + remove_reason: Optional[str] = Field(None, alias="removeReason") + executor_logs: Optional[Dict[str, str]] = Field(None, alias="executorLogs") + memory_metrics: Optional["MemoryMetrics"] = Field(None, alias="memoryMetrics") blacklisted_in_stages: Set[int] = Field( set(), alias="blacklistedInStages" ) # deprecated - peak_memory_metrics: ExecutorMetrics = Field(None, alias="peakMemoryMetrics") + peak_memory_metrics: Optional[ExecutorMetrics] = Field( + None, alias="peakMemoryMetrics" + ) attributes: Dict[str, str] resources: Dict[ str, Any ] # Will be typed properly once ResourceInformation is defined - resource_profile_id: int = Field(None, alias="resourceProfileId") - is_excluded: bool = Field(None, alias="isExcluded") + resource_profile_id: Optional[int] = Field(None, alias="resourceProfileId") + is_excluded: Optional[bool] = Field(None, alias="isExcluded") excluded_in_stages: Set[int] = Field(set(), alias="excludedInStages") model_config = ConfigDict(arbitrary_types_allowed=True, populate_by_name=True) @@ -236,35 +240,43 @@ def parse_datetime(cls, value): class MemoryMetrics(BaseModel): - used_on_heap_storage_memory: int = Field(None, alias="usedOnHeapStorageMemory") - used_off_heap_storage_memory: int = Field(None, alias="usedOffHeapStorageMemory") - total_on_heap_storage_memory: int = Field(None, alias="totalOnHeapStorageMemory") - total_off_heap_storage_memory: int = Field(None, alias="totalOffHeapStorageMemory") + used_on_heap_storage_memory: Optional[int] = Field( + None, alias="usedOnHeapStorageMemory" + ) + used_off_heap_storage_memory: Optional[int] = Field( + None, alias="usedOffHeapStorageMemory" + ) + total_on_heap_storage_memory: Optional[int] = Field( + None, alias="totalOnHeapStorageMemory" + ) + 
total_off_heap_storage_memory: Optional[int] = Field( + None, alias="totalOffHeapStorageMemory" + ) model_config = ConfigDict(populate_by_name=True) class JobData(BaseModel): - job_id: int = Field(None, alias="jobId") + job_id: Optional[int] = Field(None, alias="jobId") name: str - description: str = None - submission_time: datetime = Field(None, alias="submissionTime") - completion_time: datetime = Field(None, alias="completionTime") - stage_ids: Sequence[int] = Field(None, alias="stageIds") - job_group: str = Field(None, alias="jobGroup") + description: Optional[str] = None + submission_time: Optional[datetime] = Field(None, alias="submissionTime") + completion_time: Optional[datetime] = Field(None, alias="completionTime") + stage_ids: Optional[Sequence[int]] = Field(None, alias="stageIds") + job_group: Optional[str] = Field(None, alias="jobGroup") job_tags: Sequence[str] = Field([], alias="jobTags") status: str # JobExecutionStatus as string - num_tasks: int = Field(None, alias="numTasks") - num_active_tasks: int = Field(None, alias="numActiveTasks") - num_completed_tasks: int = Field(None, alias="numCompletedTasks") - num_skipped_tasks: int = Field(None, alias="numSkippedTasks") - num_failed_tasks: int = Field(None, alias="numFailedTasks") - num_killed_tasks: int = Field(None, alias="numKilledTasks") - num_completed_indices: int = Field(None, alias="numCompletedIndices") - num_active_stages: int = Field(None, alias="numActiveStages") - num_completed_stages: int = Field(None, alias="numCompletedStages") - num_skipped_stages: int = Field(None, alias="numSkippedStages") - num_failed_stages: int = Field(None, alias="numFailedStages") + num_tasks: Optional[int] = Field(None, alias="numTasks") + num_active_tasks: Optional[int] = Field(None, alias="numActiveTasks") + num_completed_tasks: Optional[int] = Field(None, alias="numCompletedTasks") + num_skipped_tasks: Optional[int] = Field(None, alias="numSkippedTasks") + num_failed_tasks: Optional[int] = Field(None, alias="numFailedTasks") + num_killed_tasks: Optional[int] = Field(None, alias="numKilledTasks") + num_completed_indices: Optional[int] = Field(None, alias="numCompletedIndices") + num_active_stages: Optional[int] = Field(None, alias="numActiveStages") + num_completed_stages: Optional[int] = Field(None, alias="numCompletedStages") + num_skipped_stages: Optional[int] = Field(None, alias="numSkippedStages") + num_failed_stages: Optional[int] = Field(None, alias="numFailedStages") killed_tasks_summary: Dict[str, int] = Field({}, alias="killedTasksSummary") model_config = ConfigDict( @@ -292,15 +304,15 @@ def parse_datetime(cls, value): class RDDStorageInfo(BaseModel): id: int name: str - num_partitions: int = Field(None, alias="numPartitions") - num_cached_partitions: int = Field(None, alias="numCachedPartitions") - storage_level: str = Field(None, alias="storageLevel") - memory_used: int = Field(None, alias="memoryUsed") - disk_used: int = Field(None, alias="diskUsed") - data_distribution: Sequence["RDDDataDistribution"] = Field( + num_partitions: Optional[int] = Field(None, alias="numPartitions") + num_cached_partitions: Optional[int] = Field(None, alias="numCachedPartitions") + storage_level: Optional[str] = Field(None, alias="storageLevel") + memory_used: Optional[int] = Field(None, alias="memoryUsed") + disk_used: Optional[int] = Field(None, alias="diskUsed") + data_distribution: Optional[Sequence["RDDDataDistribution"]] = Field( None, alias="dataDistribution" ) - partitions: Sequence["RDDPartitionInfo"] = None + partitions: 
Optional[Sequence["RDDPartitionInfo"]] = None model_config = ConfigDict( populate_by_name=True, @@ -311,22 +323,24 @@ class RDDStorageInfo(BaseModel): class RDDDataDistribution(BaseModel): address: str - memory_used: int = Field(None, alias="memoryUsed") - memory_remaining: int = Field(None, alias="memoryRemaining") - disk_used: int = Field(None, alias="diskUsed") - on_heap_memory_used: int = Field(None, alias="onHeapMemoryUsed") - off_heap_memory_used: int = Field(None, alias="offHeapMemoryUsed") - on_heap_memory_remaining: int = Field(None, alias="onHeapMemoryRemaining") - off_heap_memory_remaining: int = Field(None, alias="offHeapMemoryRemaining") + memory_used: Optional[int] = Field(None, alias="memoryUsed") + memory_remaining: Optional[int] = Field(None, alias="memoryRemaining") + disk_used: Optional[int] = Field(None, alias="diskUsed") + on_heap_memory_used: Optional[int] = Field(None, alias="onHeapMemoryUsed") + off_heap_memory_used: Optional[int] = Field(None, alias="offHeapMemoryUsed") + on_heap_memory_remaining: Optional[int] = Field(None, alias="onHeapMemoryRemaining") + off_heap_memory_remaining: Optional[int] = Field( + None, alias="offHeapMemoryRemaining" + ) model_config = ConfigDict(populate_by_name=True) class RDDPartitionInfo(BaseModel): - block_name: str = Field(None, alias="blockName") - storage_level: str = Field(None, alias="storageLevel") - memory_used: int = Field(None, alias="memoryUsed") - disk_used: int = Field(None, alias="diskUsed") + block_name: Optional[str] = Field(None, alias="blockName") + storage_level: Optional[str] = Field(None, alias="storageLevel") + memory_used: Optional[int] = Field(None, alias="memoryUsed") + disk_used: Optional[int] = Field(None, alias="diskUsed") executors: Sequence[str] model_config = ConfigDict(populate_by_name=True) @@ -334,101 +348,121 @@ class RDDPartitionInfo(BaseModel): class StageData(BaseModel): status: str # StageStatus as string - stage_id: int = Field(None, alias="stageId") - attempt_id: int = Field(None, alias="attemptId") - num_tasks: int = Field(None, alias="numTasks") - num_active_tasks: int = Field(None, alias="numActiveTasks") - num_complete_tasks: int = Field(None, alias="numCompleteTasks") - num_failed_tasks: int = Field(None, alias="numFailedTasks") - num_killed_tasks: int = Field(None, alias="numKilledTasks") - num_completed_indices: int = Field(None, alias="numCompletedIndices") - - submission_time: datetime = Field(None, alias="submissionTime") - first_task_launched_time: datetime = Field(None, alias="firstTaskLaunchedTime") - completion_time: datetime = Field(None, alias="completionTime") - failure_reason: str = Field(None, alias="failureReason") - - executor_deserialize_time: int = Field(None, alias="executorDeserializeTime") - executor_deserialize_cpu_time: int = Field(None, alias="executorDeserializeCpuTime") - executor_run_time: int = Field(None, alias="executorRunTime") - executor_cpu_time: int = Field(None, alias="executorCpuTime") - result_size: int = Field(None, alias="resultSize") - jvm_gc_time: int = Field(None, alias="jvmGcTime") - result_serialization_time: int = Field(None, alias="resultSerializationTime") - memory_bytes_spilled: int = Field(None, alias="memoryBytesSpilled") - disk_bytes_spilled: int = Field(None, alias="diskBytesSpilled") - peak_execution_memory: int = Field(None, alias="peakExecutionMemory") - input_bytes: int = Field(None, alias="inputBytes") - input_records: int = Field(None, alias="inputRecords") - output_bytes: int = Field(None, alias="outputBytes") - output_records: 
int = Field(None, alias="outputRecords") - shuffle_remote_blocks_fetched: int = Field(None, alias="shuffleRemoteBlocksFetched") - shuffle_local_blocks_fetched: int = Field(None, alias="shuffleLocalBlocksFetched") - shuffle_fetch_wait_time: int = Field(None, alias="shuffleFetchWaitTime") - shuffle_remote_bytes_read: int = Field(None, alias="shuffleRemoteBytesRead") - shuffle_remote_bytes_read_to_disk: int = Field( + stage_id: Optional[int] = Field(None, alias="stageId") + attempt_id: Optional[int] = Field(None, alias="attemptId") + num_tasks: Optional[int] = Field(None, alias="numTasks") + num_active_tasks: Optional[int] = Field(None, alias="numActiveTasks") + num_complete_tasks: Optional[int] = Field(None, alias="numCompleteTasks") + num_failed_tasks: Optional[int] = Field(None, alias="numFailedTasks") + num_killed_tasks: Optional[int] = Field(None, alias="numKilledTasks") + num_completed_indices: Optional[int] = Field(None, alias="numCompletedIndices") + + submission_time: Optional[datetime] = Field(None, alias="submissionTime") + first_task_launched_time: Optional[datetime] = Field( + None, alias="firstTaskLaunchedTime" + ) + completion_time: Optional[datetime] = Field(None, alias="completionTime") + failure_reason: Optional[str] = Field(None, alias="failureReason") + + executor_deserialize_time: Optional[int] = Field( + None, alias="executorDeserializeTime" + ) + executor_deserialize_cpu_time: Optional[int] = Field( + None, alias="executorDeserializeCpuTime" + ) + executor_run_time: Optional[int] = Field(None, alias="executorRunTime") + executor_cpu_time: Optional[int] = Field(None, alias="executorCpuTime") + result_size: Optional[int] = Field(None, alias="resultSize") + jvm_gc_time: Optional[int] = Field(None, alias="jvmGcTime") + result_serialization_time: Optional[int] = Field( + None, alias="resultSerializationTime" + ) + memory_bytes_spilled: Optional[int] = Field(None, alias="memoryBytesSpilled") + disk_bytes_spilled: Optional[int] = Field(None, alias="diskBytesSpilled") + peak_execution_memory: Optional[int] = Field(None, alias="peakExecutionMemory") + input_bytes: Optional[int] = Field(None, alias="inputBytes") + input_records: Optional[int] = Field(None, alias="inputRecords") + output_bytes: Optional[int] = Field(None, alias="outputBytes") + output_records: Optional[int] = Field(None, alias="outputRecords") + shuffle_remote_blocks_fetched: Optional[int] = Field( + None, alias="shuffleRemoteBlocksFetched" + ) + shuffle_local_blocks_fetched: Optional[int] = Field( + None, alias="shuffleLocalBlocksFetched" + ) + shuffle_fetch_wait_time: Optional[int] = Field(None, alias="shuffleFetchWaitTime") + shuffle_remote_bytes_read: Optional[int] = Field( + None, alias="shuffleRemoteBytesRead" + ) + shuffle_remote_bytes_read_to_disk: Optional[int] = Field( None, alias="shuffleRemoteBytesReadToDisk" ) - shuffle_local_bytes_read: int = Field(None, alias="shuffleLocalBytesRead") - shuffle_read_bytes: int = Field(None, alias="shuffleReadBytes") - shuffle_read_records: int = Field(None, alias="shuffleReadRecords") - shuffle_corrupt_merged_block_chunks: int = Field( + shuffle_local_bytes_read: Optional[int] = Field(None, alias="shuffleLocalBytesRead") + shuffle_read_bytes: Optional[int] = Field(None, alias="shuffleReadBytes") + shuffle_read_records: Optional[int] = Field(None, alias="shuffleReadRecords") + shuffle_corrupt_merged_block_chunks: Optional[int] = Field( 0, alias="shuffleCorruptMergedBlockChunks" ) - shuffle_merged_fetch_fallback_count: int = Field( + 
shuffle_merged_fetch_fallback_count: Optional[int] = Field( 0, alias="shuffleMergedFetchFallbackCount" ) - shuffle_merged_remote_blocks_fetched: int = Field( + shuffle_merged_remote_blocks_fetched: Optional[int] = Field( 0, alias="shuffleMergedRemoteBlocksFetched" ) - shuffle_merged_local_blocks_fetched: int = Field( + shuffle_merged_local_blocks_fetched: Optional[int] = Field( 0, alias="shuffleMergedLocalBlocksFetched" ) - shuffle_merged_remote_chunks_fetched: int = Field( + shuffle_merged_remote_chunks_fetched: Optional[int] = Field( 0, alias="shuffleMergedRemoteChunksFetched" ) - shuffle_merged_local_chunks_fetched: int = Field( + shuffle_merged_local_chunks_fetched: Optional[int] = Field( 0, alias="shuffleMergedLocalChunksFetched" ) - shuffle_merged_remote_bytes_read: int = Field( + shuffle_merged_remote_bytes_read: Optional[int] = Field( 0, alias="shuffleMergedRemoteBytesRead" ) - shuffle_merged_local_bytes_read: int = Field(0, alias="shuffleMergedLocalBytesRead") - shuffle_remote_reqs_duration: int = Field(0, alias="shuffleRemoteReqsDuration") - shuffle_merged_remote_reqs_duration: int = Field( + shuffle_merged_local_bytes_read: Optional[int] = Field( + 0, alias="shuffleMergedLocalBytesRead" + ) + shuffle_remote_reqs_duration: Optional[int] = Field( + 0, alias="shuffleRemoteReqsDuration" + ) + shuffle_merged_remote_reqs_duration: Optional[int] = Field( 0, alias="shuffleMergedRemoteReqsDuration" ) - shuffle_write_bytes: int = Field(None, alias="shuffleWriteBytes") - shuffle_write_time: int = Field(None, alias="shuffleWriteTime") - shuffle_write_records: int = Field(None, alias="shuffleWriteRecords") + shuffle_write_bytes: Optional[int] = Field(None, alias="shuffleWriteBytes") + shuffle_write_time: Optional[int] = Field(None, alias="shuffleWriteTime") + shuffle_write_records: Optional[int] = Field(None, alias="shuffleWriteRecords") name: str - description: str = None + description: Optional[str] = None details: str - scheduling_pool: str = Field(None, alias="schedulingPool") + scheduling_pool: Optional[str] = Field(None, alias="schedulingPool") - # rdd_ids: Sequence[int] = Field(None, alias="rddIds") - accumulator_updates: Sequence["AccumulableInfo"] = Field( + # rdd_ids: Optional[Sequence[int]] = Field(None, alias="rddIds") + accumulator_updates: Optional[Sequence["AccumulableInfo"]] = Field( None, alias="accumulatorUpdates" ) - tasks: Dict[str, "TaskData"] = None - executor_summary: Dict[str, ExecutorStageSummary] = Field( + tasks: Optional[Dict[str, "TaskData"]] = None + executor_summary: Optional[Dict[str, ExecutorStageSummary]] = Field( None, alias="executorSummary" ) - speculation_summary: SpeculationStageSummary = Field( + speculation_summary: Optional[SpeculationStageSummary] = Field( None, alias="speculationSummary" ) killed_tasks_summary: Dict[str, int] = Field({}, alias="killedTasksSummary") - resource_profile_id: int = Field(None, alias="resourceProfileId") - peak_executor_metrics: ExecutorMetrics = Field(None, alias="peakExecutorMetrics") - task_metrics_distributions: "TaskMetricDistributions" = Field( + resource_profile_id: Optional[int] = Field(None, alias="resourceProfileId") + peak_executor_metrics: Optional[ExecutorMetrics] = Field( + None, alias="peakExecutorMetrics" + ) + task_metrics_distributions: Optional["TaskMetricDistributions"] = Field( None, alias="taskMetricsDistributions" ) - executor_metrics_distributions: "ExecutorMetricsDistributions" = Field( + executor_metrics_distributions: Optional["ExecutorMetricsDistributions"] = Field( None, 
alias="executorMetricsDistributions" ) is_shuffle_push_enabled: bool = Field(False, alias="isShufflePushEnabled") - shuffle_mergers_count: int = Field(0, alias="shuffleMergersCount") + shuffle_mergers_count: Optional[int] = Field(0, alias="shuffleMergersCount") model_config = ConfigDict(populate_by_name=True, arbitrary_types_allowed=True) @@ -453,26 +487,26 @@ def parse_datetime(cls, value): class TaskData(BaseModel): - task_id: int = Field(None, alias="taskId") + task_id: Optional[int] = Field(None, alias="taskId") index: int attempt: int - partition_id: int = Field(None, alias="partitionId") - launch_time: datetime = Field(None, alias="launchTime") - result_fetch_start: datetime = Field(None, alias="resultFetchStart") - duration: int = None - executor_id: str = Field(None, alias="executorId") + partition_id: Optional[int] = Field(None, alias="partitionId") + launch_time: Optional[datetime] = Field(None, alias="launchTime") + result_fetch_start: Optional[datetime] = Field(None, alias="resultFetchStart") + duration: Optional[int] = None + executor_id: Optional[str] = Field(None, alias="executorId") host: str status: str - task_locality: str = Field(None, alias="taskLocality") + task_locality: Optional[str] = Field(None, alias="taskLocality") speculative: bool - accumulator_updates: Sequence["AccumulableInfo"] = Field( + accumulator_updates: Optional[Sequence["AccumulableInfo"]] = Field( None, alias="accumulatorUpdates" ) - error_message: str = Field(None, alias="errorMessage") - task_metrics: "TaskMetrics" = Field(None, alias="taskMetrics") + error_message: Optional[str] = Field(None, alias="errorMessage") + task_metrics: Optional["TaskMetrics"] = Field(None, alias="taskMetrics") executor_logs: Dict[str, str] = Field({}, alias="executorLogs") - scheduler_delay: int = Field(0, alias="schedulerDelay") - getting_result_time: int = Field(0, alias="gettingResultTime") + scheduler_delay: Optional[int] = Field(0, alias="schedulerDelay") + getting_result_time: Optional[int] = Field(0, alias="gettingResultTime") model_config = ConfigDict(populate_by_name=True, arbitrary_types_allowed=True) @@ -495,20 +529,28 @@ def parse_datetime(cls, value): class TaskMetrics(BaseModel): - executor_deserialize_time: int = Field(None, alias="executorDeserializeTime") - executor_deserialize_cpu_time: int = Field(None, alias="executorDeserializeCpuTime") - executor_run_time: int = Field(None, alias="executorRunTime") - executor_cpu_time: int = Field(None, alias="executorCpuTime") - result_size: int = Field(None, alias="resultSize") - jvm_gc_time: int = Field(None, alias="jvmGcTime") - result_serialization_time: int = Field(None, alias="resultSerializationTime") - memory_bytes_spilled: int = Field(None, alias="memoryBytesSpilled") - disk_bytes_spilled: int = Field(None, alias="diskBytesSpilled") - peak_execution_memory: int = Field(None, alias="peakExecutionMemory") - input_metrics: "InputMetrics" = Field(None, alias="inputMetrics") - output_metrics: "OutputMetrics" = Field(None, alias="outputMetrics") - shuffle_read_metrics: "ShuffleReadMetrics" = Field(None, alias="shuffleReadMetrics") - shuffle_write_metrics: "ShuffleWriteMetrics" = Field( + executor_deserialize_time: Optional[int] = Field( + None, alias="executorDeserializeTime" + ) + executor_deserialize_cpu_time: Optional[int] = Field( + None, alias="executorDeserializeCpuTime" + ) + executor_run_time: Optional[int] = Field(None, alias="executorRunTime") + executor_cpu_time: Optional[int] = Field(None, alias="executorCpuTime") + result_size: Optional[int] = 
Field(None, alias="resultSize") + jvm_gc_time: Optional[int] = Field(None, alias="jvmGcTime") + result_serialization_time: Optional[int] = Field( + None, alias="resultSerializationTime" + ) + memory_bytes_spilled: Optional[int] = Field(None, alias="memoryBytesSpilled") + disk_bytes_spilled: Optional[int] = Field(None, alias="diskBytesSpilled") + peak_execution_memory: Optional[int] = Field(None, alias="peakExecutionMemory") + input_metrics: Optional["InputMetrics"] = Field(None, alias="inputMetrics") + output_metrics: Optional["OutputMetrics"] = Field(None, alias="outputMetrics") + shuffle_read_metrics: Optional["ShuffleReadMetrics"] = Field( + None, alias="shuffleReadMetrics" + ) + shuffle_write_metrics: Optional["ShuffleWriteMetrics"] = Field( None, alias="shuffleWriteMetrics" ) @@ -516,43 +558,59 @@ class TaskMetrics(BaseModel): class InputMetrics(BaseModel): - bytes_read: int = Field(None, alias="bytesRead") - records_read: int = Field(None, alias="recordsRead") + bytes_read: Optional[int] = Field(None, alias="bytesRead") + records_read: Optional[int] = Field(None, alias="recordsRead") model_config = ConfigDict(populate_by_name=True) class OutputMetrics(BaseModel): - bytes_written: int = Field(None, alias="bytesWritten") - records_written: int = Field(None, alias="recordsWritten") + bytes_written: Optional[int] = Field(None, alias="bytesWritten") + records_written: Optional[int] = Field(None, alias="recordsWritten") model_config = ConfigDict(populate_by_name=True) class ShufflePushReadMetrics(BaseModel): - corrupt_merged_block_chunks: int = Field(None, alias="corruptMergedBlockChunks") - merged_fetch_fallback_count: int = Field(None, alias="mergedFetchFallbackCount") - remote_merged_blocks_fetched: int = Field(None, alias="remoteMergedBlocksFetched") - local_merged_blocks_fetched: int = Field(None, alias="localMergedBlocksFetched") - remote_merged_chunks_fetched: int = Field(None, alias="remoteMergedChunksFetched") - local_merged_chunks_fetched: int = Field(None, alias="localMergedChunksFetched") - remote_merged_bytes_read: int = Field(None, alias="remoteMergedBytesRead") - local_merged_bytes_read: int = Field(None, alias="localMergedBytesRead") - remote_merged_reqs_duration: int = Field(None, alias="remoteMergedReqsDuration") + corrupt_merged_block_chunks: Optional[int] = Field( + None, alias="corruptMergedBlockChunks" + ) + merged_fetch_fallback_count: Optional[int] = Field( + None, alias="mergedFetchFallbackCount" + ) + remote_merged_blocks_fetched: Optional[int] = Field( + None, alias="remoteMergedBlocksFetched" + ) + local_merged_blocks_fetched: Optional[int] = Field( + None, alias="localMergedBlocksFetched" + ) + remote_merged_chunks_fetched: Optional[int] = Field( + None, alias="remoteMergedChunksFetched" + ) + local_merged_chunks_fetched: Optional[int] = Field( + None, alias="localMergedChunksFetched" + ) + remote_merged_bytes_read: Optional[int] = Field(None, alias="remoteMergedBytesRead") + local_merged_bytes_read: Optional[int] = Field(None, alias="localMergedBytesRead") + remote_merged_reqs_duration: Optional[int] = Field( + None, alias="remoteMergedReqsDuration" + ) model_config = ConfigDict(populate_by_name=True) class ShuffleReadMetrics(BaseModel): - remote_blocks_fetched: int = Field(None, alias="remoteBlocksFetched") - local_blocks_fetched: int = Field(None, alias="localBlocksFetched") - fetch_wait_time: int = Field(None, alias="fetchWaitTime") - remote_bytes_read: int = Field(None, alias="remoteBytesRead") - remote_bytes_read_to_disk: int = Field(None, 
alias="remoteBytesReadToDisk") - local_bytes_read: int = Field(None, alias="localBytesRead") - records_read: int = Field(None, alias="recordsRead") - remote_reqs_duration: int = Field(None, alias="remoteReqsDuration") - shuffle_push_read_metrics: ShufflePushReadMetrics = Field( + remote_blocks_fetched: Optional[int] = Field(None, alias="remoteBlocksFetched") + local_blocks_fetched: Optional[int] = Field(None, alias="localBlocksFetched") + fetch_wait_time: Optional[int] = Field(None, alias="fetchWaitTime") + remote_bytes_read: Optional[int] = Field(None, alias="remoteBytesRead") + remote_bytes_read_to_disk: Optional[int] = Field( + None, alias="remoteBytesReadToDisk" + ) + local_bytes_read: Optional[int] = Field(None, alias="localBytesRead") + records_read: Optional[int] = Field(None, alias="recordsRead") + remote_reqs_duration: Optional[int] = Field(None, alias="remoteReqsDuration") + shuffle_push_read_metrics: Optional[ShufflePushReadMetrics] = Field( None, alias="shufflePushReadMetrics" ) @@ -560,42 +618,54 @@ class ShuffleReadMetrics(BaseModel): class ShuffleWriteMetrics(BaseModel): - bytes_written: int = Field(None, alias="bytesWritten") - write_time: int = Field(None, alias="writeTime") - records_written: int = Field(None, alias="recordsWritten") + bytes_written: Optional[int] = Field(None, alias="bytesWritten") + write_time: Optional[int] = Field(None, alias="writeTime") + records_written: Optional[int] = Field(None, alias="recordsWritten") model_config = ConfigDict(populate_by_name=True) class TaskMetricDistributions(BaseModel): - quantiles: Sequence[float] = Field(None, alias="quantiles") + quantiles: Optional[Sequence[float]] = Field(None, alias="quantiles") - duration: Sequence[float] = Field(None, alias="duration") - executor_deserialize_time: Sequence[float] = Field( + duration: Optional[Sequence[float]] = Field(None, alias="duration") + executor_deserialize_time: Optional[Sequence[float]] = Field( None, alias="executorDeserializeTime" ) - executor_deserialize_cpu_time: Sequence[float] = Field( + executor_deserialize_cpu_time: Optional[Sequence[float]] = Field( None, alias="executorDeserializeCpuTime" ) - executor_run_time: Sequence[float] = Field(None, alias="executorRunTime") - executor_cpu_time: Sequence[float] = Field(None, alias="executorCpuTime") - result_size: Sequence[float] = Field(None, alias="resultSize") - jvm_gc_time: Sequence[float] = Field(None, alias="jvmGcTime") - result_serialization_time: Sequence[float] = Field( + executor_run_time: Optional[Sequence[float]] = Field(None, alias="executorRunTime") + executor_cpu_time: Optional[Sequence[float]] = Field(None, alias="executorCpuTime") + result_size: Optional[Sequence[float]] = Field(None, alias="resultSize") + jvm_gc_time: Optional[Sequence[float]] = Field(None, alias="jvmGcTime") + result_serialization_time: Optional[Sequence[float]] = Field( None, alias="resultSerializationTime" ) - getting_result_time: Sequence[float] = Field(None, alias="gettingResultTime") - scheduler_delay: Sequence[float] = Field(None, alias="schedulerDelay") - peak_execution_memory: Sequence[float] = Field(None, alias="peakExecutionMemory") - memory_bytes_spilled: Sequence[float] = Field(None, alias="memoryBytesSpilled") - disk_bytes_spilled: Sequence[float] = Field(None, alias="diskBytesSpilled") + getting_result_time: Optional[Sequence[float]] = Field( + None, alias="gettingResultTime" + ) + scheduler_delay: Optional[Sequence[float]] = Field(None, alias="schedulerDelay") + peak_execution_memory: Optional[Sequence[float]] = Field( + 
None, alias="peakExecutionMemory" + ) + memory_bytes_spilled: Optional[Sequence[float]] = Field( + None, alias="memoryBytesSpilled" + ) + disk_bytes_spilled: Optional[Sequence[float]] = Field( + None, alias="diskBytesSpilled" + ) - input_metrics: "InputMetricDistributions" = Field(None, alias="inputMetrics") - output_metrics: "OutputMetricDistributions" = Field(None, alias="outputMetrics") - shuffle_read_metrics: "ShuffleReadMetricDistributions" = Field( + input_metrics: Optional["InputMetricDistributions"] = Field( + None, alias="inputMetrics" + ) + output_metrics: Optional["OutputMetricDistributions"] = Field( + None, alias="outputMetrics" + ) + shuffle_read_metrics: Optional["ShuffleReadMetricDistributions"] = Field( None, alias="shuffleReadMetrics" ) - shuffle_write_metrics: "ShuffleWriteMetricDistributions" = Field( + shuffle_write_metrics: Optional["ShuffleWriteMetricDistributions"] = Field( None, alias="shuffleWriteMetrics" ) @@ -603,43 +673,45 @@ class TaskMetricDistributions(BaseModel): class InputMetricDistributions(BaseModel): - bytes_read: Sequence[float] = Field(None, alias="bytesRead") - records_read: Sequence[float] = Field(None, alias="recordsRead") + bytes_read: Optional[Sequence[float]] = Field(None, alias="bytesRead") + records_read: Optional[Sequence[float]] = Field(None, alias="recordsRead") model_config = ConfigDict(populate_by_name=True) class OutputMetricDistributions(BaseModel): - bytes_written: Sequence[float] = Field(None, alias="bytesWritten") - records_written: Sequence[float] = Field(None, alias="recordsWritten") + bytes_written: Optional[Sequence[float]] = Field(None, alias="bytesWritten") + records_written: Optional[Sequence[float]] = Field(None, alias="recordsWritten") model_config = ConfigDict(populate_by_name=True) class ShufflePushReadMetricDistributions(BaseModel): - corrupt_merged_block_chunks: Sequence[float] = Field( + corrupt_merged_block_chunks: Optional[Sequence[float]] = Field( None, alias="corruptMergedBlockChunks" ) - merged_fetch_fallback_count: Sequence[float] = Field( + merged_fetch_fallback_count: Optional[Sequence[float]] = Field( None, alias="mergedFetchFallbackCount" ) - remote_merged_blocks_fetched: Sequence[float] = Field( + remote_merged_blocks_fetched: Optional[Sequence[float]] = Field( None, alias="remoteMergedBlocksFetched" ) - local_merged_blocks_fetched: Sequence[float] = Field( + local_merged_blocks_fetched: Optional[Sequence[float]] = Field( None, alias="localMergedBlocksFetched" ) - remote_merged_chunks_fetched: Sequence[float] = Field( + remote_merged_chunks_fetched: Optional[Sequence[float]] = Field( None, alias="remoteMergedChunksFetched" ) - local_merged_chunks_fetched: Sequence[float] = Field( + local_merged_chunks_fetched: Optional[Sequence[float]] = Field( None, alias="localMergedChunksFetched" ) - remote_merged_bytes_read: Sequence[float] = Field( + remote_merged_bytes_read: Optional[Sequence[float]] = Field( None, alias="remoteMergedBytesRead" ) - local_merged_bytes_read: Sequence[float] = Field(None, alias="localMergedBytesRead") - remote_merged_reqs_duration: Sequence[float] = Field( + local_merged_bytes_read: Optional[Sequence[float]] = Field( + None, alias="localMergedBytesRead" + ) + remote_merged_reqs_duration: Optional[Sequence[float]] = Field( None, alias="remoteMergedReqsDuration" ) @@ -649,21 +721,29 @@ class ShufflePushReadMetricDistributions(BaseModel): class ExecutorMetricsDistributions(BaseModel): quantiles: Sequence[float] - task_time: Sequence[float] = Field(None, alias="taskTime") - failed_tasks: 
Sequence[float] = Field(None, alias="failedTasks") - succeeded_tasks: Sequence[float] = Field(None, alias="succeededTasks") - killed_tasks: Sequence[float] = Field(None, alias="killedTasks") - input_bytes: Sequence[float] = Field(None, alias="inputBytes") - input_records: Sequence[float] = Field(None, alias="inputRecords") - output_bytes: Sequence[float] = Field(None, alias="outputBytes") - output_records: Sequence[float] = Field(None, alias="outputRecords") - shuffle_read: Sequence[float] = Field(None, alias="shuffleRead") - shuffle_read_records: Sequence[float] = Field(None, alias="shuffleReadRecords") - shuffle_write: Sequence[float] = Field(None, alias="shuffleWrite") - shuffle_write_records: Sequence[float] = Field(None, alias="shuffleWriteRecords") - memory_bytes_spilled: Sequence[float] = Field(None, alias="memoryBytesSpilled") - disk_bytes_spilled: Sequence[float] = Field(None, alias="diskBytesSpilled") - peak_memory_metrics: "ExecutorPeakMetricsDistributions" = Field( + task_time: Optional[Sequence[float]] = Field(None, alias="taskTime") + failed_tasks: Optional[Sequence[float]] = Field(None, alias="failedTasks") + succeeded_tasks: Optional[Sequence[float]] = Field(None, alias="succeededTasks") + killed_tasks: Optional[Sequence[float]] = Field(None, alias="killedTasks") + input_bytes: Optional[Sequence[float]] = Field(None, alias="inputBytes") + input_records: Optional[Sequence[float]] = Field(None, alias="inputRecords") + output_bytes: Optional[Sequence[float]] = Field(None, alias="outputBytes") + output_records: Optional[Sequence[float]] = Field(None, alias="outputRecords") + shuffle_read: Optional[Sequence[float]] = Field(None, alias="shuffleRead") + shuffle_read_records: Optional[Sequence[float]] = Field( + None, alias="shuffleReadRecords" + ) + shuffle_write: Optional[Sequence[float]] = Field(None, alias="shuffleWrite") + shuffle_write_records: Optional[Sequence[float]] = Field( + None, alias="shuffleWriteRecords" + ) + memory_bytes_spilled: Optional[Sequence[float]] = Field( + None, alias="memoryBytesSpilled" + ) + disk_bytes_spilled: Optional[Sequence[float]] = Field( + None, alias="diskBytesSpilled" + ) + peak_memory_metrics: Optional["ExecutorPeakMetricsDistributions"] = Field( None, alias="peakMemoryMetrics" ) @@ -672,34 +752,44 @@ class ExecutorMetricsDistributions(BaseModel): class ExecutorPeakMetricsDistributions(BaseModel): quantiles: Sequence[float] - executor_metrics: Sequence[ExecutorMetrics] = Field(None, alias="executorMetrics") + executor_metrics: Optional[Sequence[ExecutorMetrics]] = Field( + None, alias="executorMetrics" + ) model_config = ConfigDict(populate_by_name=True) class ShuffleReadMetricDistributions(BaseModel): - read_bytes: Sequence[float] = Field(None, alias="readBytes") - read_records: Sequence[float] = Field(None, alias="readRecords") - remote_blocks_fetched: Sequence[float] = Field(None, alias="remoteBlocksFetched") - local_blocks_fetched: Sequence[float] = Field(None, alias="localBlocksFetched") - fetch_wait_time: Sequence[float] = Field(None, alias="fetchWaitTime") - remote_bytes_read: Sequence[float] = Field(None, alias="remoteBytesRead") - remote_bytes_read_to_disk: Sequence[float] = Field( + read_bytes: Optional[Sequence[float]] = Field(None, alias="readBytes") + read_records: Optional[Sequence[float]] = Field(None, alias="readRecords") + remote_blocks_fetched: Optional[Sequence[float]] = Field( + None, alias="remoteBlocksFetched" + ) + local_blocks_fetched: Optional[Sequence[float]] = Field( + None, alias="localBlocksFetched" + ) + 
fetch_wait_time: Optional[Sequence[float]] = Field(None, alias="fetchWaitTime") + remote_bytes_read: Optional[Sequence[float]] = Field(None, alias="remoteBytesRead") + remote_bytes_read_to_disk: Optional[Sequence[float]] = Field( None, alias="remoteBytesReadToDisk" ) - total_blocks_fetched: Sequence[float] = Field(None, alias="totalBlocksFetched") - remote_reqs_duration: Sequence[float] = Field(None, alias="remoteReqsDuration") - shuffle_push_read_metrics_dist: ShufflePushReadMetricDistributions = Field( - None, alias="shufflePushReadMetricsDist" + total_blocks_fetched: Optional[Sequence[float]] = Field( + None, alias="totalBlocksFetched" + ) + remote_reqs_duration: Optional[Sequence[float]] = Field( + None, alias="remoteReqsDuration" + ) + shuffle_push_read_metrics_dist: Optional[ShufflePushReadMetricDistributions] = ( + Field(None, alias="shufflePushReadMetricsDist") ) model_config = ConfigDict(populate_by_name=True) class ShuffleWriteMetricDistributions(BaseModel): - write_bytes: Sequence[float] = Field(None, alias="writeBytes") - write_records: Sequence[float] = Field(None, alias="writeRecords") - write_time: Sequence[float] = Field(None, alias="writeTime") + write_bytes: Optional[Sequence[float]] = Field(None, alias="writeBytes") + write_records: Optional[Sequence[float]] = Field(None, alias="writeRecords") + write_time: Optional[Sequence[float]] = Field(None, alias="writeTime") model_config = ConfigDict(populate_by_name=True) @@ -721,14 +811,22 @@ class VersionInfo(BaseModel): class ApplicationEnvironmentInfo(BaseModel): runtime: "RuntimeInfo" - spark_properties: Sequence[tuple[str, str]] = Field(None, alias="sparkProperties") - hadoop_properties: Sequence[tuple[str, str]] = Field(None, alias="hadoopProperties") - system_properties: Sequence[tuple[str, str]] = Field(None, alias="systemProperties") - metrics_properties: Sequence[tuple[str, str]] = Field( + spark_properties: Optional[Sequence[tuple[str, str]]] = Field( + None, alias="sparkProperties" + ) + hadoop_properties: Optional[Sequence[tuple[str, str]]] = Field( + None, alias="hadoopProperties" + ) + system_properties: Optional[Sequence[tuple[str, str]]] = Field( + None, alias="systemProperties" + ) + metrics_properties: Optional[Sequence[tuple[str, str]]] = Field( None, alias="metricsProperties" ) - classpath_entries: Sequence[tuple[str, str]] = Field(None, alias="classpathEntries") - resource_profiles: Sequence[ResourceProfileInfo] = Field( + classpath_entries: Optional[Sequence[tuple[str, str]]] = Field( + None, alias="classpathEntries" + ) + resource_profiles: Optional[Sequence[ResourceProfileInfo]] = Field( None, alias="resourceProfiles" ) @@ -736,9 +834,9 @@ class ApplicationEnvironmentInfo(BaseModel): class RuntimeInfo(BaseModel): - java_version: str = Field(None, alias="javaVersion") - java_home: str = Field(None, alias="javaHome") - scala_version: str = Field(None, alias="scalaVersion") + java_version: Optional[str] = Field(None, alias="javaVersion") + java_home: Optional[str] = Field(None, alias="javaHome") + scala_version: Optional[str] = Field(None, alias="scalaVersion") model_config = ConfigDict(populate_by_name=True) @@ -759,20 +857,22 @@ def mkstring(self, start: str, sep: str, end: str) -> str: class ThreadStackTrace(BaseModel): - thread_id: int = Field(None, alias="threadId") - thread_name: str = Field(None, alias="threadName") - thread_state: str = Field(None, alias="threadState") # ThreadState as string - stack_trace: StackTrace = Field(None, alias="stackTrace") - blocked_by_thread_id: int = Field(None, 
alias="blockedByThreadId") - blocked_by_lock: str = Field(None, alias="blockedByLock") + thread_id: Optional[int] = Field(None, alias="threadId") + thread_name: Optional[str] = Field(None, alias="threadName") + thread_state: Optional[str] = Field( + None, alias="threadState" + ) # ThreadState as string + stack_trace: Optional[StackTrace] = Field(None, alias="stackTrace") + blocked_by_thread_id: Optional[int] = Field(None, alias="blockedByThreadId") + blocked_by_lock: Optional[str] = Field(None, alias="blockedByLock") holding_locks: Sequence[str] = Field([], alias="holdingLocks") # deprecated synchronizers: Sequence[str] monitors: Sequence[str] - lock_name: str = Field(None, alias="lockName") - lock_owner_name: str = Field(None, alias="lockOwnerName") + lock_name: Optional[str] = Field(None, alias="lockName") + lock_owner_name: Optional[str] = Field(None, alias="lockOwnerName") suspended: bool - in_native: bool = Field(None, alias="inNative") - is_daemon: bool = Field(None, alias="isDaemon") + in_native: Optional[bool] = Field(None, alias="inNative") + is_daemon: Optional[bool] = Field(None, alias="isDaemon") priority: int model_config = ConfigDict(populate_by_name=True, arbitrary_types_allowed=True) @@ -780,12 +880,12 @@ class ThreadStackTrace(BaseModel): class ProcessSummary(BaseModel): id: str - host_port: str = Field(None, alias="hostPort") - is_active: bool = Field(None, alias="isActive") - total_cores: int = Field(None, alias="totalCores") - add_time: datetime = Field(None, alias="addTime") - remove_time: datetime = Field(None, alias="removeTime") - process_logs: Dict[str, str] = Field(None, alias="processLogs") + host_port: Optional[str] = Field(None, alias="hostPort") + is_active: Optional[bool] = Field(None, alias="isActive") + total_cores: Optional[int] = Field(None, alias="totalCores") + add_time: Optional[datetime] = Field(None, alias="addTime") + remove_time: Optional[datetime] = Field(None, alias="removeTime") + process_logs: Optional[Dict[str, str]] = Field(None, alias="processLogs") model_config = ConfigDict(populate_by_name=True, arbitrary_types_allowed=True) @@ -849,10 +949,10 @@ class ExecutionData(BaseModel): id: int status: str # SQLExecutionStatus as string - description: str = Field(None, alias="planDescription") + description: Optional[str] = Field(None, alias="planDescription") plan_description: str = Field(..., alias="planDescription") submission_time: datetime = Field(..., alias="submissionTime") - duration: int = Field(None, alias="durationMilliSeconds") + duration: Optional[int] = Field(None, alias="durationMilliSeconds") running_job_ids: Sequence[int] = Field([], alias="runningJobIds") success_job_ids: Sequence[int] = Field([], alias="successJobIds") failed_job_ids: Sequence[int] = Field([], alias="failedJobIds") diff --git a/start_local_spark_history.sh b/start_local_spark_history.sh index 911ac72..c59a898 100755 --- a/start_local_spark_history.sh +++ b/start_local_spark_history.sh @@ -12,6 +12,7 @@ USAGE: OPTIONS: -h, --help Show this help message --dry-run Validate prerequisites without starting the server + --interactive Run Docker container in interactive mode DESCRIPTION: This script starts a local Spark History Server using Docker for testing @@ -37,6 +38,7 @@ EOF # Parse command line arguments DRY_RUN=false +INTERACTIVE=false for arg in "$@"; do case $arg in -h|--help) @@ -47,6 +49,10 @@ for arg in "$@"; do DRY_RUN=true shift ;; + --interactive) + INTERACTIVE=true + shift + ;; *) echo "Unknown option: $arg" echo "Use --help for usage information." 
@@ -141,17 +147,35 @@ fi

 # Start Spark History Server with proper container name and error handling
 echo "🐳 Starting Docker container..."
-docker run -it \
-  --name spark-history-server \
-  --rm \
-  -v "$(pwd)/examples/basic:/mnt/data" \
-  -p 18080:18080 \
-  docker.io/apache/spark:3.5.5 \
-  /opt/java/openjdk/bin/java \
-  -cp '/opt/spark/conf:/opt/spark/jars/*' \
-  -Xmx1g \
-  org.apache.spark.deploy.history.HistoryServer \
-  --properties-file /mnt/data/history-server.conf
+if [ "$INTERACTIVE" = true ]; then
+  docker run -it \
+    --name spark-history-server \
+    --rm \
+    -v "$(pwd)/examples/basic:/mnt/data" \
+    -p 18080:18080 \
+    docker.io/apache/spark:3.5.5 \
+    /opt/java/openjdk/bin/java \
+    -cp '/opt/spark/conf:/opt/spark/jars/*' \
+    -Xmx1g \
+    org.apache.spark.deploy.history.HistoryServer \
+    --properties-file /mnt/data/history-server.conf
+else
+  docker run -d \
+    --name spark-history-server \
+    --rm \
+    -v "$(pwd)/examples/basic:/mnt/data" \
+    -p 18080:18080 \
+    docker.io/apache/spark:3.5.5 \
+    /opt/java/openjdk/bin/java \
+    -cp '/opt/spark/conf:/opt/spark/jars/*' \
+    -Xmx1g \
+    org.apache.spark.deploy.history.HistoryServer \
+    --properties-file /mnt/data/history-server.conf
+
+  echo "Spark History Server started in detached mode"
+  echo "To view logs: docker logs -f spark-history-server"
+  echo "To stop: docker stop spark-history-server"
+fi

 echo ""
 echo "🛑 Spark History Server stopped."
diff --git a/tests/e2e.py b/tests/e2e.py
new file mode 100644
index 0000000..53e3529
--- /dev/null
+++ b/tests/e2e.py
@@ -0,0 +1,122 @@
+import json
+from contextlib import AsyncExitStack, asynccontextmanager
+from types import TracebackType
+
+import pytest
+from mcp import ClientSession
+from mcp.client.streamable_http import streamablehttp_client
+from mcp.types import TextContent
+
+from spark_types import ApplicationInfo, JobData
+
+mcp_endpoint = "http://localhost:18888/mcp/"
+test_spark_id = "spark-cc4d115f011443d787f03a71a476a745"
+
+
+class McpClient:
+    def __init__(self):
+        self._client_session = None
+        self._exit_stack = None
+
+    @asynccontextmanager
+    async def initialize(self):
+        self._exit_stack = AsyncExitStack()
+        async with AsyncExitStack() as stack:
+            read, write, _ = await stack.enter_async_context(
+                streamablehttp_client(mcp_endpoint)
+            )
+            mcp_client = await stack.enter_async_context(ClientSession(read, write))
+            await mcp_client.initialize()
+            self._client_session = mcp_client
+            yield mcp_client
+
+    @classmethod
+    @asynccontextmanager
+    async def get_mcp_client(cls):
+        client = cls()
+        async with client.initialize() as session:
+            yield session
+
+    async def call_tool(self, name, arguments):
+        return await self._client_session.call_tool(name=name, arguments=arguments)
+
+    async def list_tools(self):
+        return await self._client_session.list_tools()
+
+    async def __aexit__(
+        self,
+        exc_type: type[BaseException] | None,
+        exc_val: BaseException | None,
+        exc_tb: TracebackType | None,
+    ) -> None:
+        await self._exit_stack.aclose()
+
+    async def __aenter__(self):
+        self._exit_stack = AsyncExitStack()
+        read, write, _ = await self._exit_stack.enter_async_context(
+            streamablehttp_client(mcp_endpoint)
+        )
+        self._client_session = await self._exit_stack.enter_async_context(
+            ClientSession(read, write)
+        )
+        await self._client_session.initialize()
+        return self
+
+
+@pytest.mark.asyncio
+async def test_tools_not_empty():
+    async with McpClient() as client:
+        tool_result = await client.list_tools()
+        assert tool_result, "Tools list should not be empty"
+        assert len(tool_result.tools) > 0, "Tools list should contain at least one tool"
+
+
+@pytest.mark.asyncio
+async def test_get_application():
+    async with McpClient() as client:
+        app_result = await client.call_tool(
+            "get_application", {"spark_id": test_spark_id}
+        )
+        assert not app_result.isError
+        assert isinstance(app_result.content[0], TextContent), (
+            "get_application should return a TextContent object"
+        )
+
+        app_data = json.loads(app_result.content[0].text)
+        app_info = ApplicationInfo.model_validate(app_data)
+
+        # Validate specific fields
+        assert app_info.id == test_spark_id
+        assert app_info.name == "NewYorkTaxiData_2025_06_27_03_56_52"
+
+
+@pytest.mark.asyncio
+async def test_get_jobs_no_filter():
+    async with McpClient() as client:
+        # Test without a status filter
+        jobs_result = await client.call_tool("get_jobs", {"spark_id": test_spark_id})
+        assert not jobs_result.isError
+        assert len(jobs_result.content) == 6
+        for content in jobs_result.content:
+            assert isinstance(content, TextContent), (
+                "get_jobs should return a TextContent object"
+            )
+            job = JobData.model_validate_json(content.text)
+            assert job.status == "SUCCEEDED", "All jobs should have SUCCEEDED status"
+
+
+@pytest.mark.asyncio
+async def test_get_jobs_with_status_filter():
+    async with McpClient() as client:
+        # Test with status filter
+        jobs_result = await client.call_tool(
+            "get_jobs", {"spark_id": test_spark_id, "status": ["SUCCEEDED"]}
+        )
+        assert not jobs_result.isError
+        assert len(jobs_result.content) > 0
+        for content in jobs_result.content:
+            assert isinstance(content, TextContent), (
+                "get_jobs should return a TextContent object"
+            )
+            job = JobData.model_validate_json(content.text)
+            assert job.status == "SUCCEEDED", "All jobs should have SUCCEEDED status"
diff --git a/uv.lock b/uv.lock
index 4f404d3..c17634d 100644
--- a/uv.lock
+++ b/uv.lock
@@ -495,6 +495,18 @@ wheels = [
     { url = "https://files.pythonhosted.org/packages/29/16/c8a903f4c4dffe7a12843191437d7cd8e32751d5de349d45d3fe69544e87/pytest-8.4.1-py3-none-any.whl", hash = "sha256:539c70ba6fcead8e78eebbf1115e8b589e7565830d7d006a8723f19ac8a0afb7", size = 365474, upload-time = "2025-06-18T05:48:03.955Z" },
 ]

+[[package]]
+name = "pytest-asyncio"
+version = "1.0.0"
+source = { registry = "https://pypi.org/simple" }
+dependencies = [
+    { name = "pytest" },
+]
+sdist = { url = "https://files.pythonhosted.org/packages/d0/d4/14f53324cb1a6381bef29d698987625d80052bb33932d8e7cbf9b337b17c/pytest_asyncio-1.0.0.tar.gz", hash = "sha256:d15463d13f4456e1ead2594520216b225a16f781e144f8fdf6c5bb4667c48b3f", size = 46960, upload-time = "2025-05-26T04:54:40.484Z" }
+wheels = [
+    { url = "https://files.pythonhosted.org/packages/30/05/ce271016e351fddc8399e546f6e23761967ee09c8c568bbfbecb0c150171/pytest_asyncio-1.0.0-py3-none-any.whl", hash = "sha256:4f024da9f1ef945e680dc68610b52550e36590a67fd31bb3b4943979a1f90ef3", size = 15976, upload-time = "2025-05-26T04:54:39.035Z" },
+]
+
 [[package]]
 name = "pytest-cov"
 version = "6.2.1"
@@ -641,6 +653,7 @@ dev = [
     { name = "mypy" },
     { name = "pre-commit" },
     { name = "pytest" },
+    { name = "pytest-asyncio" },
     { name = "pytest-cov" },
     { name = "ruff" },
 ]
@@ -659,6 +672,7 @@ dev = [
     { name = "mypy", specifier = ">=1.7.0" },
     { name = "pre-commit", specifier = ">=3.0.0" },
     { name = "pytest", specifier = ">=8.4.1" },
+    { name = "pytest-asyncio", specifier = ">=1.0.0" },
     { name = "pytest-cov", specifier = ">=4.0.0" },
     { name = "ruff", specifier = ">=0.1.0" },
 ]