AJDA-594 (input-mapping) job-based workspace loading for async input mapping#414
AJDA-594 (input-mapping) job-based workspace loading for async input mapping#414
Conversation
…pace Table Strategies
b677284 to
e8643cb
Compare
| if (!$plan->preserve) { | ||
| $this->logger->info('Cleaning workspace before loading tables.'); | ||
| $cleanJobId = $workspaces->queueWorkspaceCloneInto($workspaceId, [ | ||
| 'input' => [], // workspace will be only cleaned | ||
| 'preserve' => 0, | ||
| ]); | ||
|
|
||
| // Wait for clean job to complete before proceeding | ||
| $this->clientWrapper->getBranchClient()->handleAsyncTasks([$cleanJobId]); | ||
|
|
||
| // Don't add CLEAN job to queue since it's already completed | ||
| } |
There was a problem hiding this comment.
Tohle je odlisnost od puvodni implementace.
Tam to bylo synchronne za sebou. Clone a pak Copy. Pokud nemel byt preserve, tak prvni job i promazal workspace.
Tady ale chceme aby to bylo async. A zafrontovaly se oba joby.
Takze cleanup se musi vyresit predtim. Proto tenhle job. Zaroven se ceka na nej, nez se dokonci
There was a problem hiding this comment.
No to je prekerní tohle, vůbec se mě to neliíbí, protože to je zase hroznej overhead s jobama, ale jako lepší řešní nemám (pochopitelně bez toho, aby to podporovalo connection a týhle věci jsme se úplně zbavili).
Napadá mě jedině ten job založit a podívat se jestli se switchnul na processing a pak teprv založit ten druhej, čimž by se zařídilo, že jdou po sobě, jenže by to současně znamenalo, že když se zafrontují, tak se to celý zasekne a vytimoutuje, což je naprd.
Jenže stejně tak se to celý zafrontuje a vytimoutuje i když se bude čekat na ten cleanup job.
There was a problem hiding this comment.
Pro betu mi to přijde legitimní a směroplatný takhle
…bstractWorkspaceStrategy
…tWorkspaceStrategy
…WorkspaceStrategy
|
Pokracovani by mohlo byt nejak takhle #417 |
| $this->logger->info(sprintf('Table "%s" will be cloned.', $table->getSource())); | ||
| if ($loadType === WorkspaceLoadType::CLONE) { | ||
| return [ | ||
| 'table' => $table, |
There was a problem hiding this comment.
jako moc nechápu, proč se tady neposílají ty options taky, oni jsou samozřejmě všechny zbytečný (kromě overwrite - to se validuje v canClone, kterej se tam asi moc nepoužívá), ale ani tak by jako nemělo ničemu vadit to tam poslat
There was a problem hiding this comment.
a ano, vidím, že to tak bylo už předtím :)
There was a problem hiding this comment.
Do tohodle bych nehrabal. Ted ten endpoint v KBC ty ostatni parametry ignoruje. Ale az se to mergne do jednoho nedpointu, tak by to zaclo asi hazet chyby.
There was a problem hiding this comment.
vtip je v tom, že ty ostatní parametry se tam nepošlou, ale klidně to tam nech, bylo to tam, může to tam zůstat
| 'overwrite' => $table->getOverwrite(), | ||
| 'dropTimestampColumn' => !$table->keepInternalTimestampColumn(), | ||
| ]; | ||
| // ?????? sourceBranchId is alreeady set above and from same method, why check again? |
There was a problem hiding this comment.
No jo, to jsem psal já a vůbec nevím proč to tam je, podle mě je to úplná pitomost. Na svou obhajobu můžu říct jen to, že to tam bylo kvůli SOXu a řekl bych, že to vzniklo tak, že tam byl nějakej bug na connection což zmiňuje ten komentář a pak to někdo fixnul a dal jsem to nahoru a zapomněl už oddělat tu podmínku
dal bych to pryč
| $loadOptions = $table->getStorageApiLoadOptions($this->tablesState); | ||
| $loadType = $this->decideTableLoadMethod($table, $loadOptions); | ||
|
|
||
| $instructionLoadOptions = $loadType === WorkspaceLoadType::CLONE ? null : $loadOptions; |
There was a problem hiding this comment.
tady mi přijde, že by mit taky nemuselo být to větvení
|
|
||
| public function hasCloneInstructions(): bool | ||
| { | ||
| return !empty($this->getCloneInstructions()); |
There was a problem hiding this comment.
ještě, že je to pole malý a můžem si dovolit takovýhle plejtvání
|
|
||
| namespace Keboola\InputMapping\Table\Strategy; | ||
|
|
||
| enum WorkspaceJobType: string |
There was a problem hiding this comment.
nemůžem použít WorkspaceLoadType ?
There was a problem hiding this comment.
To by vlastně šlo 👍
| $copyTableOptions, | ||
| ['overwrite' => true], | ||
| ), | ||
| new WorkspaceTableLoadInstruction( |
There was a problem hiding this comment.
jako tohle je celkem unreal kombinace
There was a problem hiding this comment.
Jako muzu to rozdelit do vice testu, ale ty jsi prvni kdo je spis pro jejic mergovani do jednoho. Tak jsem ti chtel udelat radost :)
There was a problem hiding this comment.
nejde o oddělení testů, ale o to, že ty takovýhle instrukce se v reálu neobjeví
| self::assertInstanceOf(WorkspaceLoadQueue::class, $result); | ||
| self::assertCount(1, $result->jobs); | ||
| self::assertTrue($this->testHandler->hasInfoThatContains('Cleaning workspace before loading tables.')); | ||
| self::assertTrue($this->testHandler->hasInfoThatContains('Cloning 1 tables to workspace.')); |
There was a problem hiding this comment.
jako procházet tyhle vygenerovaný testy je fakt na sebevraždu, půlka z nich se překryvá s něčím jiným
| ); | ||
|
|
||
| $reflection = new ReflectionClass($local); | ||
| $method = $reflection->getMethod('getDataFilePath'); |
There was a problem hiding this comment.
toto taky duplikuje getManifestPathDataProvider, jako kdyby se to vytáhlo, tak aby to šlo otestovat, tak by k tomu nemuselo bejt 1500 řádků testů
|
|
||
| $plan = new WorkspaceLoadPlan([$copyInstruction, $viewInstruction], false); | ||
|
|
||
| self::assertFalse($plan->hasCloneInstructions()); |
There was a problem hiding this comment.
jako zase, tohle se dá přidat k předchozímu testu a dál je to zbytečný, stačí jich tady polovina
|
|
||
| $workspaceStrategy = $this->createMock(AbstractWorkspaceStrategy::class); | ||
| $workspaceStrategy | ||
| ->expects(self::once()) |
odinuv
left a comment
There was a problem hiding this comment.
jako může se to mergnout - ve bude to jistý zlepšení, i když to pořád neřeší ten asynchronní load - buede potřeba řešit tohle https://keboolaglobal.slack.com/archives/C0553EM2RGX/p1757331962719179
a bylo by super probrat ty testy a nechat tam jen ty užitečný, jako víc řádků testů neznamená, že je to líp otestovaný, sice jsou to mocky většinou a moc to nezpomalí to, ale udržovat se budou muset
…rnCallback in AbstractWorkspaceStrategyTest
There was a problem hiding this comment.
Pull Request Overview
The PR introduces a job-based table loading architecture for workspace operations, enabling asynchronous input mapping in the future. The changes lay the foundation by introducing structured job management with unique IDs, load queues for batch processing, and orchestration plans with clone/copy strategies.
- New job-based architecture with WorkspaceLoadJob, WorkspaceLoadQueue, and WorkspaceLoadPlan classes
- Enhanced AbstractWorkspaceStrategy with separate preparation and execution phases
- New public API methods: prepareAndExecuteTableLoads(), prepareTableLoadsToWorkspace(), and executeTableLoadsToWorkspace()
Reviewed Changes
Copilot reviewed 24 out of 24 changed files in this pull request and generated no comments.
Show a summary per file
| File | Description |
|---|---|
| WorkspaceLoadPlanTest.php | Tests for workspace load plan functionality including instruction filtering |
| WorkspaceLoadJobTest.php | Tests for workspace load job validation and type constraints |
| TestWorkspaceStrategy.php | Test helper class extending AbstractWorkspaceStrategy |
| SnowflakeTest.php | Added test for getWorkspaceType() method |
| LocalTest.php | Tests for data file path building logic |
| BigQueryTest.php | Added test for getWorkspaceType() method |
| AbstractWorkspaceStrategyTest.php | Comprehensive tests for workspace strategy preparation and execution |
| AbstractStrategyTest.php | Tests for manifest path building logic |
| AbstractFileStrategyTest.php | Tests for file strategy validation and getters |
| ReaderTest.php | Tests for new prepareAndExecuteTableLoads() method |
| WorkspaceTableLoadInstruction.php | Value object for table load instructions |
| WorkspaceLoadType.php | Enum defining clone, copy, and view load types |
| WorkspaceLoadQueue.php | Container for workspace load jobs |
| WorkspaceLoadPlan.php | Orchestration plan with instruction filtering |
| WorkspaceLoadJob.php | Job representation with validation |
| Snowflake.php | Made getWorkspaceType() public |
| S3.php | Refactored to use getManifestPath() helper |
| Local.php | Refactored to use getDataFilePath() and getManifestPath() helpers |
| BigQuery.php | Made getWorkspaceType() public |
| AbstractWorkspaceStrategy.php | Major refactoring with new job-based methods and extracted logic |
| AbstractStrategy.php | Added getManifestPath() helper and abstract methods |
| AbstractFileStrategy.php | Added getter methods for metadata storage and destination |
| ABS.php | Refactored to use getManifestPath() helper |
| Reader.php | Added prepareAndExecuteTableLoads() method and refactored table processing |
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
…ion() methods from Table Strategy classes
Release NotesThe changes maintain full backward compatibility:
New feature:
Change TypeMinor Impact Analysis
Customer Communication PlansNot needed Justification
Deployment PlanRelease lib as minor version. Primary bump its version in AI service. Rollback PlanFix version of input mapping in AI service to previous version. Post-Release Support PlanNo special post-release support needed |
Fixes https://keboola.atlassian.net/browse/AJDA-594
Job-based table loading architecture for workspace operations, laying the foundation for asynchronous input mapping.
Public API Enhancements
prepareAndExecuteTableLoads()- New method enabling separation of preparation and execution phasesprepareTableLoadsToWorkspace()- Validates and plans table loads to workspace before executionexecuteTableLoadsToWorkspace()- Submits prepared workspace load jobs to Storage API for asynchronous executionCode Organization
The changes maintain full backward compatibility