-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathcommit_update.py
147 lines (130 loc) · 5.36 KB
/
commit_update.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import datetime as dt
import logging
from shared.celery_config import commit_update_task_name
from shared.torngit.exceptions import TorngitClientError, TorngitRepoNotFoundError
from app import celery_app
from database.models import Branch, Commit, Pull
from helpers.exceptions import RepositoryWithoutValidBotError
from helpers.github_installation import get_installation_name_for_owner_for_task
from services.repository import (
get_repo_provider_service,
possibly_update_commit_from_provider_info,
)
from tasks.base import BaseCodecovTask
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
class CommitUpdateTask(BaseCodecovTask, name=commit_update_task_name):
    """Task that refreshes a commit and keeps its pull/branch rows in sync.

    For a given ``(repoid, commitid)`` the task asks
    ``possibly_update_commit_from_provider_info`` to update the commit via the
    repository's provider service, then upserts the ``Pull`` and ``Branch``
    rows the commit points at so that their ``head`` follows the newest
    non-deleted commit.
    """

    def run_impl(
        self,
        db_session,
        repoid: int,
        commitid: str,
        **kwargs,
    ):
        """Update one commit from the provider and sync related rows.

        Args:
            db_session: Database session used for every query and flush.
            repoid: Id of the repository that owns the commit.
            commitid: Id (sha) of the commit to update.
            **kwargs: Accepted for interface compatibility; not used here.

        Returns:
            ``{"was_updated": <value>}`` where the value is whatever
            ``possibly_update_commit_from_provider_info`` returned, or
            ``False`` when one of the handled provider errors occurred before
            it returned.

        Raises:
            AssertionError: If no commit matches ``repoid``/``commitid``.
        """
        commit = (
            db_session.query(Commit)
            .filter(Commit.repoid == repoid, Commit.commitid == commitid)
            .first()
        )
        # Kept as an assert on purpose: AssertionError is the existing contract
        # for a missing commit.
        assert commit, "Commit not found in database."
        repository = commit.repository
        was_updated = False
        try:
            installation_name_to_use = get_installation_name_for_owner_for_task(
                self.name, repository.owner
            )
            repository_service = get_repo_provider_service(
                repository, installation_name_to_use=installation_name_to_use
            )
            was_updated = possibly_update_commit_from_provider_info(
                commit, repository_service
            )
            # The timestamp may still be an ISO-8601 string at this point;
            # normalise it to a naive datetime so the head comparisons below
            # compare datetimes with datetimes.
            if isinstance(commit.timestamp, str):
                commit.timestamp = dt.datetime.fromisoformat(commit.timestamp).replace(
                    tzinfo=None
                )
            if commit.pullid is not None:
                self._upsert_pull_for_commit(db_session, repoid, commit)
            if commit.branch is not None:
                self._upsert_branch_for_commit(db_session, repoid, commit)
        except RepositoryWithoutValidBotError:
            log.warning(
                "Unable to reach git provider because repo doesn't have a valid bot",
                extra=dict(repoid=repoid, commit=commitid),
            )
        except TorngitRepoNotFoundError:
            log.warning(
                "Unable to reach git provider because this specific bot/integration can't see that repository",
                extra=dict(repoid=repoid, commit=commitid),
            )
        except TorngitClientError:
            log.warning(
                "Unable to reach git provider because there was a 4xx error",
                extra=dict(repoid=repoid, commit=commitid),
                exc_info=True,
            )
        if was_updated:
            log.info(
                "Commit updated successfully",
                extra=dict(commitid=commitid, repoid=repoid),
            )
        return {"was_updated": was_updated}

    def _head_is_stale(self, db_session, repoid, current_head, commit):
        """Return whether ``commit`` should replace ``current_head`` as head.

        The head is replaced when the commit it currently names cannot be
        found, is flagged as deleted, or has an older timestamp than ``commit``.
        """
        previous_head = (
            db_session.query(Commit)
            .filter(Commit.repoid == repoid, Commit.commitid == current_head)
            .first()
        )
        return bool(
            previous_head is None
            or previous_head.deleted
            or previous_head.timestamp < commit.timestamp
        )

    def _upsert_pull_for_commit(self, db_session, repoid, commit):
        """Create the pull referenced by ``commit`` or advance its head."""
        pull = (
            db_session.query(Pull)
            .filter(Pull.repoid == repoid, Pull.pullid == commit.pullid)
            .first()
        )
        if pull is None:
            pull = Pull(
                repoid=repoid,
                pullid=commit.pullid,
                author_id=commit.author_id,
                head=commit.commitid,
            )
            db_session.add(pull)
        elif self._head_is_stale(db_session, repoid, pull.head, commit):
            pull.head = commit.commitid
        db_session.flush()

    def _upsert_branch_for_commit(self, db_session, repoid, commit):
        """Create the branch referenced by ``commit`` or update authors/head."""
        branch = (
            db_session.query(Branch)
            .filter(Branch.repoid == repoid, Branch.branch == commit.branch)
            .first()
        )
        if branch is None:
            branch = Branch(
                repoid=repoid,
                branch=commit.branch,
                head=commit.commitid,
                # Never store a None author; the update path below skips None
                # authors as well, so both paths now agree.
                authors=[] if commit.author_id is None else [commit.author_id],
            )
            db_session.add(branch)
        else:
            if commit.author_id is not None:
                if branch.authors is None:
                    branch.authors = [commit.author_id]
                elif commit.author_id not in branch.authors:
                    # Assign a new list instead of appending in place: the ORM
                    # only notices in-place list mutation when the column uses
                    # a mutation-tracking type, while reassignment is always
                    # detected.
                    branch.authors = list(branch.authors) + [commit.author_id]
            if self._head_is_stale(db_session, repoid, branch.head, commit):
                branch.head = commit.commitid
        db_session.flush()
# Register an instance of the task with the Celery app, then look the
# registered task up by name in the app's task registry.
RegisteredCommitUpdateTask = celery_app.register_task(CommitUpdateTask())
commit_update_task = celery_app.tasks[RegisteredCommitUpdateTask.name]