Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 32 additions & 35 deletions events/code_review_events/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,53 +87,49 @@ def get_repositories(self, repositories, cache_root):
)
return repositories

async def run(self):
async def process_build(self, build):
"""
Code review workflow to load all necessary information from Phabricator builds
received from the webserver
"""
while True:
assert build is not None, "Invalid payload"
assert isinstance(build, PhabricatorBuild)

# Receive build from webserver
build = await self.bus.receive(QUEUE_WEB_BUILDS)
assert build is not None, "Invalid payload"
assert isinstance(build, PhabricatorBuild)
# Update its state
self.update_state(build)

# Update its state
self.update_state(build)
if build.state == PhabricatorBuildState.Public:

if build.state == PhabricatorBuildState.Public:
# Check if the author is not blacklisted
if self.is_blacklisted(build.revision):
return

# Check if the author is not blacklisted
if self.is_blacklisted(build.revision):
continue

# When the build is public, load needed details
try:
self.load_patches_stack(build)
logger.info("Loaded stack of patches", build=str(build))
# When the build is public, load needed details
try:
self.load_patches_stack(build)
logger.info("Loaded stack of patches", build=str(build))

self.load_reviewers(build)
logger.info("Loaded reviewers", build=str(build))
except Exception as e:
logger.warning(
"Failed to load build details", build=str(build), error=str(e)
)
continue
self.load_reviewers(build)
logger.info("Loaded reviewers", build=str(build))
except Exception as e:
logger.warning(
"Failed to load build details", build=str(build), error=str(e)
)
return

# Then send the build toward next stage
logger.info("Send build to Mercurial", build=str(build))
await self.bus.send(QUEUE_MERCURIAL, build)
# Then send the build toward next stage
logger.info("Send build to Mercurial", build=str(build))
await self.bus.send(QUEUE_MERCURIAL, build)

# Report public bug as 'working' (in progress)
await self.bus.send(QUEUE_PHABRICATOR_RESULTS, ("work", build, {}))
# Report public bug as 'working' (in progress)
await self.bus.send(QUEUE_PHABRICATOR_RESULTS, ("work", build, {}))

# Send to bugbug workflow
await self.bus.send(QUEUE_BUGBUG, build)
# Send to bugbug workflow
await self.bus.send(QUEUE_BUGBUG, build)

elif build.state == PhabricatorBuildState.Queued:
# Requeue when nothing changed for now
await self.bus.send(QUEUE_WEB_BUILDS, build)
elif build.state == PhabricatorBuildState.Queued:
# Requeue when nothing changed for now
await self.bus.send(QUEUE_WEB_BUILDS, build)

def is_blacklisted(self, revision: dict):
"""Check if the revision author is in blacklisted"""
Expand Down Expand Up @@ -396,7 +392,8 @@ def run(self):
# Code review main workflow
if self.workflow:
consumers += [
self.workflow.run(),
# Process Phabricator build received from webserver
self.bus.run(self.workflow.process_build, QUEUE_WEB_BUILDS),
# Publish results on Phabricator
self.bus.run(self.workflow.publish_results, QUEUE_PHABRICATOR_RESULTS),
# Parse and redirect pulse messages
Expand Down