Switch branches/tags
Nothing to show
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
executable file 517 lines (461 sloc) 18.3 KB
#!/usr/bin/python -tt
#-*- coding: utf-8 -*-
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""
This program performs a number of checks to make an educated guess as to
whether someone can be considered 'active' or not within the Fedora
project.
"""
import datetime
import fedora_cert
import getpass
import json
import logging
import re
import time
import urllib
# Module-level logger; the helper functions in this file send their debug
# messages through it.
log = logging.getLogger("active-user")
# For each koji history table, the columns that together identify one row.
# _get_koji_history() builds a tuple from these columns to recognise a
# revoke and a create that touch the same row within a single event.
_table_keys = {
    'user_perms': ['user_id', 'perm_id'],
    'user_groups': ['user_id', 'group_id'],
    'tag_inheritance': ['tag_id', 'parent_id'],
    'tag_config': ['tag_id'],
    'build_target_config': ['build_target_id'],
    'external_repo_config': ['external_repo_id'],
    'tag_external_repos': ['tag_id', 'external_repo_id'],
    'tag_listing': ['build_id', 'tag_id'],
    'tag_packages': ['package_id', 'tag_id'],
    'group_config': ['group_id', 'tag_id'],
    'group_req_listing': ['group_id', 'tag_id', 'req_id'],
    'group_package_listing': ['group_id', 'tag_id', 'package'],
}
# Mailing lists iterated by _get_last_email_list().
# NOTE(review): the list entries and the closing bracket are missing from
# this copy of the file -- restore them from the upstream source.
_mailing_lists = [
def _get_bodhi_history(username):
    """ Print the last action performed on bodhi by the given FAS user.

    Prints the submission date and the title of the update with the most
    recent ``date_submitted`` value, or a "no activity" line when bodhi
    returns no update for this user.

    :arg username, the fas username whose action is searched.
    """
    from fedora.client.bodhi import BodhiClient
    # NOTE(review): the base URL is an empty string in this copy of the
    # file; confirm the bodhi endpoint before relying on this call.
    bodhiclient = BodhiClient("")

    log.debug('Querying Bodhi for user: {0}'.format(username))
    json_obj = bodhiclient.send_request(
        "updates/?user=%s" % username, verb='GET')

    def dategetter(field):
        """ Return a key function parsing ``item[field]`` as a datetime. """
        def getter(item):
            return datetime.datetime.strptime(item[field],
                                              "%Y-%m-%d %H:%M:%S")
        return getter

    print('Last package update on bodhi:')
    if json_obj['updates']:
        # Only the newest update is needed, so pick it directly rather than
        # sorting the whole list.
        latest = max(json_obj['updates'], key=dategetter("date_submitted"))
        print(' {0} on package {1}'.format(
            latest["date_submitted"], latest["title"]))
    else:
        print(' No activity found on bodhi')
# NOTE(review): in this copy of the file the function below has lost its
# indentation, the closing docstring delimiter and several continuation
# lines, so it does not parse as written. The comments flag each statement
# that is visibly incomplete; restore the missing parts from upstream.
def _get_bugzilla_history(email, all_comments=False):
""" Query the bugzilla for all bugs to which the provided email
is either assigned or cc'ed. Then for each bug found, print the
latest comment from this user (if any).
:arg email, the email address used in the bugzilla and for which we
are searching for activities.
:arg all_comments, boolean to display all the comments made by this
person on the bugzilla.
from bugzilla.rhbugzilla import RHBugzilla
# NOTE(review): the bugzilla URL is an empty string here -- confirm the
# endpoint.
bzclient = RHBugzilla(url='')
log.debug('Querying bugzilla for email: {0}'.format(email))
print("Bugzilla activity")
# Search open bugs (ASSIGNED / NEW / NEEDINFO) where the address matches,
# as a substring, either the assignee or the cc list.
bugbz = bzclient.query({
'emailtype1': 'substring',
'emailcc1': True,
'emailassigned_to1': True,
'query_format': 'advanced',
'order': 'Last Change',
'bug_status': ['ASSIGNED', 'NEW', 'NEEDINFO'],
'email1': email
# NOTE(review): the dict literal and the query( call opened above are never
# closed in this copy.
print(' {0} bugs assigned, cc or on which {1} commented'.format(
len(bugbz), email))
# Retrieve the information about this user
user = bzclient.getuser(email)
print('Last comment on the most recent ticket on bugzilla:')
ids = [bug.bug_id for bug in bugbz]
for bug in bzclient.getbugs(ids):
# Keep only the comments whose creator is the user looked up above.
# NOTE(review): the second argument of filter( (the comments to scan) is
# missing. Also, the truth test and the [-1] index below need a list, which
# filter() only returns on Python 2.
user_coms = filter(lambda com: com["creator_id"] == user.userid,
if user_coms:
last_com = user_coms[-1]
# NOTE(review): the format argument of strptime( is missing.
converted = datetime.datetime.strptime(last_com['time'].value,
# NOTE(review): the template has three fields but only bug.bug_id is
# visible; the other two arguments are missing.
print(u' #{0} {1} {2}'.format(bug.bug_id,
# NOTE(review): the body of this branch is missing; presumably it stops
# after the first bug with a comment unless all_comments is set -- confirm.
if not all_comments:
# NOTE(review): in this copy of the file the function below has lost its
# indentation, its docstring delimiters and some statements, so it does not
# parse as written. The comments describe what the visible lines do and
# flag each visible gap; restore the missing parts from upstream.
def _get_koji_history(username):
Print the last operation made by this user in koji.
This is partly stolen from the koji client written by:
Dennis Gregorovic <>
Mike McLean <>
Cristian Balint <>
:arg username, the fas username whose history is investigated.
import koji
# NOTE(review): the ClientSession( call is cut off after its first
# argument, which is an empty string here.
kojiclient = koji.ClientSession('',
log.debug('Search last history element in koji for {0}'.format(username))
# The result is iterated below as a mapping of table name to a list of
# row dicts.
histdata = kojiclient.queryHistory(user=username)
timeline = []
def distinguish_match(x, name):
"""determine if create or revoke event matched"""
# Truthy only when every key of x starting with '_' + name holds a truthy
# value; True when no key has that prefix.
name = '_' + name
ret = True
for key in x:
if key.startswith(name):
ret = ret and x[key]
return ret
# Build timeline entries of the form (event id, table, flag, row), where
# flag is 0 for a revoke and 1 for a create. Revoke entries carry a copy of
# the row.
for table in histdata:
hist = histdata[table]
for x in hist:
if x['revoke_event'] is not None:
if distinguish_match(x, 'revoked'):
timeline.append((x['revoke_event'], table, 0, x.copy()))
if distinguish_match(x, 'created'):
timeline.append((x['create_event'], table, 1, x))
# NOTE(review): no visible line orders `timeline`, yet the grouping below
# resets its index whenever the event id changes; check whether a sort was
# lost here.
#group edits together
new_timeline = []
last_event = None
edit_index = {}
for entry in timeline:
event_id, table, create, x = entry
if event_id != last_event:
edit_index = {}
last_event = event_id
# Identify the row by the columns listed for its table in _table_keys.
key = tuple([x[k] for k in _table_keys[table]])
prev = edit_index.get((table, event_id), {}).get(key)
if prev:
# Attach this entry to the row dict of the earlier entry for the same row.
prev[-1].setdefault('.related', []).append(entry)
# NOTE(review): whether the next line sat in an `else:` branch cannot be
# seen here. No visible line appends to new_timeline, so the final loop
# would have nothing to iterate.
edit_index.setdefault((table, event_id), {})[key] = entry
print('Last action on koji:')
# Only the last grouped entry is visited.
# NOTE(review): the loop body is missing.
for entry in new_timeline[-1:]:
# NOTE(review): in this copy of the file the function below has lost its
# indentation, the closing docstring delimiter and parts of two statements,
# so it does not parse as written. The comments flag each visible gap.
def _get_last_email_list(email):
""" Using gname, let's find the last email sent by this email.
:arg email, the email address to search on the mailing lists.
log.debug('Searching mailing lists for email {0}'.format(email))
print('Last email on mailing list:')
for mailinglist in _mailing_lists:
log.debug('Searching list {0}'.format(mailinglist))
# NOTE(review): the URL template is an empty string here. Applying % with
# two values to a template without placeholders raises TypeError.
# urllib.quote / urllib.urlopen are the Python 2 names.
url = '' \
% (mailinglist, urllib.quote(email))
stream = urllib.urlopen(url)
# NOTE(review): the right-hand side of this assignment is missing;
# presumably the content read from `stream` -- confirm.
page =
# The leading greedy `.*` makes the group capture the last YYYY-MM-DD
# pattern on the line.
regex = re.compile(r'.*(\d\d\d\d-\d\d-\d\d).*')
for line in page.split('\n'):
if 'GMT' in line:
# regex.match() returns None when the line holds no date, in which case
# g.groups() below raises AttributeError.
g = regex.match(line)
print(' %s %s' % (g.groups()[0].split(' ')[0], mailinglist))
# NOTE(review): the condition guarding this "no activity" line is not
# visible in this copy.
print(" No activity found on %s" % mailinglist)
# NOTE(review): in this copy of the file the function below has lost its
# indentation, the closing docstring delimiter and parts of several
# statements, so it does not parse as written. The comments flag each
# visible gap.
def _get_fedmsg_history(username):
""" Using datagrepper, returns the last 10 actions of the user
according to his/her username over the last year.
:arg username, the fas username whose action is searched.
# NOTE(review): the argument list of this log.debug( / format( call is
# cut off.
log.debug('Searching datagrepper for the action of {0}'.format(
print('Last actions performed according to fedmsg:')
# NOTE(review): the first part of the URL is an empty string here. What
# remains has no placeholder, so the % below raises TypeError.
url = ''\
'rows_per_page=10' % (username)
stream = urllib.urlopen(url)
# NOTE(review): the right-hand side of this assignment is missing;
# presumably the content read from `stream` -- confirm.
page =
jsonobj = json.loads(page)
for entry in jsonobj['raw_messages']:
# NOTE(review): the two values for this template are missing; only the
# tail of a strftime() call survives on the next line.
# The trailing comma after print(...) suppresses the newline only with the
# Python 2 print statement.
print(' - %s on %s' % (
).strftime('%Y-%m-%d %H:%M:%S'))),
# For meeting messages, say whether the user chaired or attended.
if 'meetbot' in entry['topic']:
if username in entry['msg']['chairs']:
print("(%s was a chair)" % username),
elif username in entry['msg']['attendees']:
print("(%s participated)" % username),
# NOTE(review): the `else:` that presumably guards the raise below is not
# visible in this copy.
# datagrepper returned this message for our user, but the user
# doesn't appear in the message. How?
raise ValueError("This shouldn't happen.")
def _get_last_website_login(username):
    """ Retrieve from FAS the last time this user has been seen.

    The FAS query is authenticated: the login name is read from the local
    Fedora certificate when possible, otherwise it is asked for on the
    terminal; the password is always prompted for.

    :arg username, the fas username from who we would like to see the
        last connection in FAS.
    """
    from fedora.client import AccountSystem
    fasclient = AccountSystem()

    log.debug('Querying FAS for user: {0}'.format(username))
    try:
        fasusername = fedora_cert.read_user_cert()
    except Exception:
        # Best effort only: any failure reading the certificate falls back
        # to asking for the login name.
        log.debug('Could not read Fedora cert, using login name')
        fasusername = raw_input('FAS username: ')
    password = getpass.getpass('FAS password for %s: ' % fasusername)
    fasclient.username = fasusername
    fasclient.password = password

    person = fasclient.person_by_username(username)
    print('Last login in FAS:')
    # Keep only the text before the first space of 'last_seen'.
    print(' %s %s' % (username, person['last_seen'].split(' ')[0]))
# Message templates for each koji history table, ordered as
# (row edited, row created, row revoked). The dotted names are literal keys
# of the history row dict, e.g. row['tag.name'].
_HISTLINE_FORMATS = {
    'tag_listing': (
        "%(name)s-%(version)s-%(release)s re-tagged into %(tag.name)s",
        "%(name)s-%(version)s-%(release)s tagged into %(tag.name)s",
        "%(name)s-%(version)s-%(release)s untagged from %(tag.name)s",
    ),
    'user_perms': (
        "permission %(permission.name)s re-granted to %(user.name)s",
        "permission %(permission.name)s granted to %(user.name)s",
        "permission %(permission.name)s revoked for %(user.name)s",
    ),
    'user_groups': (
        "user %(user.name)s re-added to group %(group.name)s",
        "user %(user.name)s added to group %(group.name)s",
        "user %(user.name)s removed from group %(group.name)s",
    ),
    'tag_packages': (
        "package list entry for %(package.name)s in %(tag.name)s "
        "updated",
        "package list entry created: %(package.name)s in "
        "%(tag.name)s",
        "package list entry revoked: %(package.name)s in "
        "%(tag.name)s",
    ),
    'tag_inheritance': (
        "inheritance line %(tag.name)s->%(parent.name)s updated",
        "inheritance line %(tag.name)s->%(parent.name)s added",
        "inheritance line %(tag.name)s->%(parent.name)s removed",
    ),
    'tag_config': (
        "tag configuration for %(tag.name)s altered",
        "new tag: %(tag.name)s",
        "tag deleted: %(tag.name)s",
    ),
    'build_target_config': (
        "build target configuration for %(build_target.name)s "
        "updated",
        "new build target: %(build_target.name)s",
        "build target deleted: %(build_target.name)s",
    ),
    'external_repo_config': (
        "external repo configuration for %(external_repo.name)s "
        "altered",
        "new external repo: %(external_repo.name)s",
        "external repo deleted: %(external_repo.name)s",
    ),
    'tag_external_repos': (
        "external repo entry for %(external_repo.name)s in tag "
        "%(tag.name)s updated",
        "external repo entry for %(external_repo.name)s added to "
        "tag %(tag.name)s",
        "external repo entry for %(external_repo.name)s removed "
        "from tag %(tag.name)s",
    ),
    'group_config': (
        "group %(group.name)s configuration for tag %(tag.name)s "
        "updated",
        "group %(group.name)s added to tag %(tag.name)s",
        "group %(group.name)s removed from tag %(tag.name)s",
    ),
    'group_req_listing': (
        "group dependency %(group.name)s->%(req.name)s updated in "
        "tag %(tag.name)s",
        "group dependency %(group.name)s->%(req.name)s added in "
        "tag %(tag.name)s",
        "group dependency %(group.name)s->%(req.name)s dropped from"
        " tag %(tag.name)s",
    ),
    'group_package_listing': (
        "package entry %(package)s in group %(group.name)s, tag "
        "%(tag.name)s updated",
        "package %(package)s added to group %(group.name)s in tag "
        "%(tag.name)s",
        "package %(package)s removed from group %(group.name)s in "
        "tag %(tag.name)s",
    ),
}


def _print_histline(entry, **kwargs):
    """ Print one human-readable line for a koji history entry.

    This is mainly stolen from the koji client written by:
       Dennis Gregorovic
       Mike McLean
       Cristian Balint

    :arg entry, a tuple ``(event_id, table, create, row)`` where ``create``
        is truthy for a creation and falsy for a revocation, and ``row`` is
        the history row dict. A ``'.related'`` key in ``row`` holds the
        entries grouped with this one as an edit; it is removed from
        ``row`` by this call.
    :kwarg kwargs, passed through unchanged to the recursive calls.

    Raises KeyError when ``row`` lacks a key used by the selected template.
    """
    event_id, table, create, x = entry
    who = None
    edit = x.get('.related')
    if edit:
        del x['.related']
        bad_edit = None
        if len(edit) != 1:
            bad_edit = '{0} elements'.format(len(edit) + 1)
        other = edit[0]
        # check edit for sanity: an edit is one revoke followed by one
        # create within the same event
        if create or not other[2]:
            bad_edit = "out of order"
        if event_id != other[0]:
            bad_edit = "non-matching"
        if bad_edit:
            print('Warning: unusual edit at event {0} in table {1} ({2})'
                  .format(event_id, table, bad_edit))
            # we'll simply treat them as separate events: this entry first,
            # then each related entry on its own line
            _print_histline(entry, **kwargs)
            for data in edit:
                _print_histline(data, **kwargs)
            return

    if create:
        ts = x['create_ts']
        if 'creator_name' in x:
            who = "by %(creator_name)s"
    else:
        ts = x['revoke_ts']
        if 'revoker_name' in x:
            who = "by %(revoker_name)s"

    templates = _HISTLINE_FORMATS.get(table)
    if templates is None:
        # Unknown table: fall back to a generic message.
        templates = (
            "%s entry updated" % table,
            "%s entry created" % table,
            "%s entry revoked" % table,
        )
    if edit:
        fmt = templates[0]
    elif create:
        fmt = templates[1]
    else:
        fmt = templates[2]

    time_str = time.strftime("%a, %d %b %Y", time.localtime(ts))
    parts = [time_str, fmt % x]
    if who:
        parts.append(who % x)
    if create and x['active']:
        parts.append("[still active]")
    print(' ' + ' '.join(parts))
# NOTE(review): in this copy of the file the function below has lost its
# indentation, the body of every branch and its `try:` line, so it does not
# parse as written. The comments flag each visible gap.
def main():
""" The main function."""
parser = setup_parser()
args = parser.parse_args()
global log
# NOTE(review): the bodies of the --debug / --verbose branches are missing;
# only a commented-out line survives.
if args.debug:
#pkgdbclient.debug = True
elif args.verbose:
# Each check below is skipped when its --no* flag is set.
# NOTE(review): the statement each condition guards is missing.
if args.username and not args.nofas:
if args.username and not args.nokoji:
if args.username and not args.nobodhi:
if args.username and not args.nofedmsg:
# NOTE(review): the first operand of the next two conditions is missing;
# presumably the --email value -- confirm.
if and not args.nolists:
if and not args.nobz:
# NOTE(review): the `try:` matching this handler is not visible, and the
# body of the `if args.debug:` below is missing.
except Exception as err:
if args.debug:
# Return value: 1 from the exception handler, 0 otherwise (assuming the
# lost indentation placed `return 1` inside the handler).
return 1
return 0
def setup_parser():
    """
    Set the command line arguments.

    :returns: an ``argparse.ArgumentParser`` exposing ``--user`` (stored as
        ``username``) and ``--email``, one ``--no*`` switch per data source
        (FAS, koji, mailing lists, bodhi, bugzilla, fedmsg), plus
        ``--all-comments``, ``--verbose`` and ``--debug``. Every switch
        defaults to False; ``username`` and ``email`` default to None.
    """
    import argparse
    # NOTE(review): the keyword arguments originally passed to
    # ArgumentParser are not visible in this copy of the file; restore them
    # (e.g. a custom prog name) if upstream had any.
    parser = argparse.ArgumentParser(
        description="Make an educated guess as to whether someone is "
                    "still active within the Fedora project")
    # General connection options
    parser.add_argument('--user', dest="username",
                        help="FAS username")
    parser.add_argument('--email', dest="email",
                        help="FAS or Bugzilla email looked for")
    parser.add_argument('--nofas', action='store_true',
                        help="Do not check FAS")
    parser.add_argument('--nokoji', action='store_true',
                        help="Do not check koji")
    parser.add_argument('--nolists', action='store_true',
                        help="Do not check mailing lists")
    parser.add_argument('--nobodhi', action='store_true',
                        help="Do not check bodhi")
    parser.add_argument('--nobz', action='store_true',
                        help="Do not check bugzilla")
    parser.add_argument('--nofedmsg', action='store_true',
                        help="Do not check fedmsg")
    parser.add_argument('--all-comments', action='store_true',
                        help="Prints the date of all the comments made by this"
                        " person on bugzilla")
    parser.add_argument('--verbose', action='store_true',
                        help="Gives more info about what's going on")
    parser.add_argument('--debug', action='store_true',
                        help="Outputs bunches of debugging info")
    return parser
if __name__ == '__main__':
import sys