-
Notifications
You must be signed in to change notification settings - Fork 413
/
on_button_click.py
155 lines (144 loc) · 5.54 KB
/
on_button_click.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
import hashlib
import time
from github.Repository import Repository
from loguru import logger
from sweepai.config.client import (
RESET_FILE,
REVERT_CHANGED_FILES_TITLE,
RULES_LABEL,
RULES_TITLE,
get_blocked_dirs,
)
from sweepai.config.server import MONGODB_URI
from sweepai.core.post_merge import PostMerge
from sweepai.handlers.on_merge import comparison_to_diff
from sweepai.handlers.stack_pr import stack_pr
from sweepai.utils.buttons import ButtonList, check_button_title_match
from sweepai.utils.chat_logger import ChatLogger
from sweepai.utils.event_logger import posthog
from sweepai.utils.github_utils import get_github_client
from sweepai.utils.str_utils import BOT_SUFFIX
from sweepai.web.events import IssueCommentRequest
def handle_button_click(request_dict):
request = IssueCommentRequest(**request_dict)
user_token, gh_client = get_github_client(request_dict["installation"]["id"])
button_list = ButtonList.deserialize(request_dict["comment"]["body"])
selected_buttons = [button.label for button in button_list.get_clicked_buttons()]
repo = gh_client.get_repo(
request_dict["repository"]["full_name"]
) # do this after checking ref
comment_id = request.comment.id
pr = repo.get_pull(request_dict["issue"]["number"])
comment = pr.get_issue_comment(comment_id)
if check_button_title_match(
REVERT_CHANGED_FILES_TITLE, request.comment.body, request.changes
):
revert_files = []
for button_text in selected_buttons:
revert_files.append(button_text.split(f"{RESET_FILE} ")[-1].strip())
handle_revert(
file_paths=revert_files,
pr_number=request_dict["issue"]["number"],
repo=repo,
)
comment.edit(
ButtonList(
buttons=[
button
for button in button_list.buttons
if button.label not in selected_buttons
],
title=REVERT_CHANGED_FILES_TITLE,
).serialize()
)
if check_button_title_match(RULES_TITLE, request.comment.body, request.changes):
rules = []
for button_text in selected_buttons:
rules.append(button_text.split(f"{RULES_LABEL} ")[-1].strip())
handle_rules(
request_dict=request_dict,
rules=rules,
user_token=user_token,
repo=repo,
gh_client=gh_client,
)
comment.edit(
ButtonList(
buttons=[
button
for button in button_list.buttons
if button.label not in selected_buttons
],
title=RULES_TITLE,
).serialize()
+ BOT_SUFFIX
)
if not rules:
try:
comment.delete()
except Exception as e:
logger.error(f"Error deleting comment: {e}")
def handle_revert(file_paths, pr_number, repo: Repository):
pr = repo.get_pull(pr_number)
branch_name = pr.head.ref if pr_number else pr.pr_head
def get_contents_with_fallback(
repo: Repository, file_path: str, branch: str = None
):
try:
if branch:
return repo.get_contents(file_path, ref=branch)
return repo.get_contents(file_path)
except Exception:
return None
old_file_contents = [
get_contents_with_fallback(repo, file_path) for file_path in file_paths
]
for file_path, old_file_content in zip(file_paths, old_file_contents):
try:
current_content = repo.get_contents(file_path, ref=branch_name)
if old_file_content:
repo.update_file(
file_path,
f"Revert {file_path}",
old_file_content.decoded_content,
sha=current_content.sha,
branch=branch_name,
)
else:
repo.delete_file(
file_path,
f"Delete {file_path}",
sha=current_content.sha,
branch=branch_name,
)
except Exception:
pass # file may not exist and this is expected
def handle_rules(request_dict, rules, user_token, repo: Repository, gh_client):
pr = repo.get_pull(request_dict["issue"]["number"])
chat_logger = (
ChatLogger(
{"username": request_dict["sender"]["login"]},
)
if MONGODB_URI
else None
)
blocked_dirs = get_blocked_dirs(repo)
comparison = repo.compare(pr.base.sha, pr.head.sha) # head is the most recent
commits_diff = comparison_to_diff(comparison, blocked_dirs)
for rule in rules:
changes_required, issue_title, issue_description = PostMerge(
chat_logger=chat_logger
).check_for_issues(rule=rule, diff=commits_diff)
tracking_id = hashlib.sha256(str(time.time()).encode()).hexdigest()[:10]
if changes_required:
stack_pr(
request=issue_description
+ "\n\nThis issue was created to address the following rule:\n"
+ rule,
pr_number=request_dict["issue"]["number"],
username=request_dict["sender"]["login"],
repo_full_name=request_dict["repository"]["full_name"],
installation_id=request_dict["installation"]["id"],
tracking_id=tracking_id,
)
posthog.capture(request_dict["sender"]["login"], "rule_pr_created")