feat(service): Google Sheets workflow — 3 parallel loops, generic read activity#253
Merged
feat(service): Google Sheets workflow — 3 parallel loops, generic read activity#253
Conversation
The backfill logic was consolidated into pipelineWorkflow's state machine in #251. The standalone backfill workflow is no longer referenced or needed. Committed-By-Agent: claude
…read activity - Split readLoop into liveEventLoop + reconcileLoop + writeLoop - Single GoogleSheetWorkflowState object replaces scattered booleans - derivePipelineStatus() + setState() pattern matches pipeline-workflow.ts - EOF-based completion detection replaces deepEqual - Rename readGoogleSheetsIntoQueue → readIntoQueue (now generic) - Move row key/number augmentation from read to write activity - Treat SourceState as atomic in workflow; activities own merge logic - Rename RowIndex → RowIndexMap, state → sourceState for clarity Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> Committed-By-Agent: claude
…signatures - Postgres test: use pipelineId arg (not full config), add pipelineStore, fix signal names (source_input, desired_status) - Google Sheets test: use googleSheetPipelineWorkflow, nested destination envelope format, pre-populate in-memory pipeline store - CLI: default --data-dir to ~/.stripe-sync for all commands Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> Committed-By-Agent: claude
…sage utility - Rename RowIndex → RowIndexMap, rowAssignments → rowIndexMap for clarity - Extract immutable mergeStateMessage() utility in activities/_shared.ts - Remove redundant state variable aliases in write activity - Use assignment pattern (state = mergeStateMessage(state, msg)) consistently Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> Committed-By-Agent: claude
Follow snake_case API convention for the google sheets destination type identifier (registry key, type discriminator, nested config key). Package/directory names unchanged. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Committed-By-Agent: claude
When DATA_DIR is set, all commands (serve/worker/webhook) use it as the default data directory without requiring an explicit --data-dir flag. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Committed-By-Agent: claude
3 tasks
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Committed-By-Agent: claude
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Committed-By-Agent: claude
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Committed-By-Agent: claude
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
googleSheetPipelineWorkflowto use 3 parallel loops (liveEventLoop,reconcileLoop,writeLoop) matching thepipelineWorkflowpatternGoogleSheetWorkflowStateobject replaces scattered booleans (setupDone,readComplete,pendingWrites)derivePipelineStatus()+setState()pattern for consistent status reportingdeepEqualreadGoogleSheetsIntoQueue→readIntoQueue(now generic — reads from source into Kafka)mergeStateMessage()utility for consistent state merge across activitiesRowIndex→RowIndexMap,rowAssignments→rowIndexMapfor claritygoogle-sheets→google_sheets(snake_case on the wire)Test plan
pnpm buildpassespnpm test— all 23 service tests pass🤖 Generated with Claude Code