Permalink
Switch branches/tags
Nothing to show
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
executable file 618 lines (469 sloc) 18.5 KB
#!/usr/bin/python
import sys
import ConfigParser
import re
import urllib2
import tempfile
import os
import sys
import json
import functools
import collections
from datetime import datetime
from subprocess import call, check_output, STDOUT
# Program launched to compose ticket notes: the value of $EDITOR, or vim
# when that environment variable is not set.
EDITOR = os.environ.get('EDITOR', 'vim')
def urlopen(url, data=None, auth=None, headers=None):
    """Requests-like wrapper to GET/POST an URL.

    Args:
        url: Address to request.
        data: Optional request body, handed to ``opener.open`` unchanged.
        auth: Optional ``(user, password)`` pair; when given, HTTP basic
            auth is set up for ``url`` using the module-level
            ``_auth_realm``.
        headers: Optional mapping of extra request headers.  The mapping is
            copied, so the caller's dict is never modified.

    Returns:
        Whatever ``opener.open`` returns for the request.
    """
    # Copy before adding the default User-Agent so that a dict owned by the
    # caller is not mutated as a side effect of making a request.
    request_headers = dict(headers or {})
    request_headers.setdefault('User-Agent', _user_agent)

    opener = urllib2.build_opener()
    if auth:
        user, password = auth
        handler = urllib2.HTTPBasicAuthHandler()
        handler.add_password(realm=_auth_realm, uri=url, user=user, passwd=password)
        opener.add_handler(handler)

    request = urllib2.Request(url)
    for key, value in request_headers.items():
        request.add_header(key, value)
    return opener.open(request, data=data)


# Both names refer to the same function; callers pick the one that reads best.
get = post = urlopen
def error(line):
    """Write ``line`` followed by a newline to stderr and flush it."""
    stream = sys.stderr
    stream.writelines((line, '\n'))
    stream.flush()
class CommandNotFound(Exception):
    """Raised when the command line matches no registered command."""
class Commander(object):
    """Registry mapping tuples of command words to handler functions."""

    def __init__(self):
        # Keys are tuples of command words, e.g. ('status', 'update');
        # values are the functions registered for them.
        self.function_map = {}

    def command(self, *args):
        """Return a decorator that registers a function under ``args``.

        The decorated function is returned unchanged, so several
        ``command`` decorators can be stacked to register aliases.
        """
        key = args

        def wrapper(func):
            self.function_map[key] = func
            return func
        return wrapper

    def help(self):
        """Write the usage line and every registered command to stderr."""
        error('usage: cb [command]')
        error('The available commands are:')
        for key in sorted(self.function_map):
            func = self.function_map[key]
            error(' %s %s' % (' '.join(key).ljust(20), func.__doc__))

    def run_command(self, command_args):
        """Dispatch an ``sys.argv``-style list to the matching handler.

        ``command_args[0]`` is the program name and is ignored.  The longest
        leading run of the remaining words that matches a registered command
        selects the handler; the words after it are passed to the handler as
        positional arguments.  The single word ``help`` prints the command
        list instead.

        Raises:
            CommandNotFound: if no leading run of words is registered.
        """
        if tuple(command_args[1:]) == ('help',):
            # Use this instance rather than a module-level singleton so the
            # class works for any Commander object.
            self.help()
            return

        # Try the longest candidate first and drop one trailing word at a
        # time; the dropped words become the handler's arguments.
        for command_end in range(len(command_args), 1, -1):
            func = self.function_map.get(tuple(command_args[1:command_end]))
            if func is not None:
                func(*command_args[command_end:])
                return
        raise CommandNotFound
# Module-wide command registry; the @commander.command(...) decorators in
# this file add their functions to it.
commander = Commander()
def api_get(url):
    """GET ``API_URL + url + '.json'`` with credentials and decode the JSON."""
    endpoint = '%s%s.json' % (API_URL, url)
    response = authed_get(endpoint)
    return json.loads(response.read())
def api_post(url, data):
    """POST ``data`` to ``API_URL + url`` as XML and return the response body."""
    endpoint = '%s%s' % (API_URL, url)
    xml_headers = {'content-type': 'application/xml'}
    response = authed_post(endpoint, data=data, headers=xml_headers)
    return response.read()
# Registered as the two words ('ticket', 'number') so that `cb ticket number`
# matches after the shell splits the arguments, in line with the
# ('ticket', 'update') and ('ticket', 'set') commands.  The original
# single-string key is kept so `cb "ticket number"` continues to work.
# The docstring is printed verbatim by the help command.
@commander.command('ticket', 'number')
@commander.command('ticket number')
def ticket_number():
    '''Shows the ticket cb is working against.'''
    number = get_ticket_number()
    # get_ticket_number() has already reported the failure on stderr, so
    # avoid printing a literal "None" on stdout as well.
    if number:
        print(number)
def get_ticket_number(mute_error=False):
    """Return the ticket number for the current checkout, or ``None``.

    A number at the start of the current git branch name (optionally
    preceded by ``ticket-``) takes precedence.  Otherwise the ``number``
    option of the ``ticket`` section of the loaded config is used.

    Args:
        mute_error: When true, do not report on stderr that no ticket
            number could be determined.

    Returns:
        The ticket number as a string, or ``None`` when neither source
        provides one.
    """
    branch_name = check_output(
        ['git', 'rev-parse', '--abbrev-ref', 'HEAD'], stderr=STDOUT,
    ).strip()

    branch_match = re.match(r'^(?:ticket-)?([0-9]+)', branch_name)
    if branch_match:
        return branch_match.group(1)

    try:
        return config.get('ticket', 'number')
    # Must be a tuple: the Python 2 form `except A, B:` catches only A and
    # binds the exception instance to B.
    except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
        pass

    if not mute_error:
        error('Could not retrieve ticket number from config or branch name "%s".' % branch_name)
    return None
def get_ticket_note_xml(content=None, status_id=None, assignee_id=None):
    """Build the ``<ticket-note>`` XML document for a ticket update.

    Args:
        content: Text of the note; ``None`` or empty yields an empty
            ``<content>`` element.  The text is XML-escaped.
        status_id: Id of the new status; omitted from the changes when falsy.
        assignee_id: Id of the new assignee; omitted only when ``None``, so
            an empty string still produces an (empty) ``<assignee-id>``
            element.

    Returns:
        The XML document as a string.
    """
    # Imported locally so this function does not depend on a new
    # module-level import.
    from xml.sax.saxutils import escape

    # Notes are free text typed by the user; without escaping, a "<" or "&"
    # in the message would produce malformed XML.
    content = escape(content) if content else ''
    changes = []
    if status_id:
        changes.append('<status-id>%s</status-id>' % status_id)
    if assignee_id is not None:
        changes.append('<assignee-id>%s</assignee-id>' % assignee_id)
    return '<ticket-note><content>%s</content><changes>%s</changes></ticket-note>' % (
        content, ''.join(changes),
    )
def post_ticket_note(ticket_number, content=None, status_id=None, assignee_id=None):
    """Build a ticket note from the given fields and POST it to the ticket."""
    endpoint = 'tickets/%s/notes' % ticket_number
    api_post(endpoint, get_ticket_note_xml(content, status_id, assignee_id))
# The docstring below is printed verbatim by the help command.
@commander.command('status')
def status(ticket_number=None):
    '''Gets the status of the current ticket.'''
    ticket_number = ticket_number or get_ticket_number()
    if not ticket_number:
        # Nothing to look up.
        return
    ticket = api_get('tickets/%s' % ticket_number)['ticket']
    print(ticket['status']['name'])
# new_status, when given, is the 1-based menu number of the status (as
# printed by the statuses listing), not the API's status id.
# The docstring below is printed verbatim by the help command.
@commander.command('status', 'update')
def status_update(new_status=None):
    '''Sets the status of the ticket.'''
    ticket_no = get_ticket_number()
    if not ticket_no:
        # get_ticket_number() has already reported the problem.
        return

    statuses = api_get('tickets/statuses')
    # Menu number -> API status id.
    ids = {i: status['ticketing_status']['id'] for i, status in enumerate(statuses, 1)}

    if not new_status:
        print('Please select an option:')
        for i, status in enumerate(statuses, 1):
            print('%d: %s' % (i, status['ticketing_status']['name']))
        new_status = raw_input('Please select from the above: ')

    try:
        choice = int(new_status)
    except ValueError:
        choice = None

    # Membership in `ids` accepts every listed status, including the last
    # one, and rejects non-numeric input without touching an unbound name.
    if choice not in ids:
        print('Unknown value')
        return
    post_ticket_note(ticket_no, status_id=ids[choice])
# The docstring below is printed verbatim by the help command.
@commander.command('statuses')
def statuses():
    '''Gets the available statuses for a ticket.'''
    available = api_get('tickets/statuses')
    for position, entry in enumerate(available, 1):
        print('%d: %s' % (position, entry['ticketing_status']['name']))
def get_user_details():
    """Return the users listed by the ``assignments`` endpoint, keyed by id.

    Each value is a dict holding that user's ``first_name``, ``last_name``
    and ``username``.
    """
    wanted = ('first_name', 'last_name', 'username')
    details = {}
    for assignment in api_get('assignments'):
        person = assignment['user']
        details[person['id']] = {field: person[field] for field in wanted}
    return details
# The docstring below is printed verbatim by the help command.
@commander.command('assign')
@commander.command('assigned')
def assigned():
    '''Gets the currently assigned user.'''
    ticket_no = get_ticket_number()
    if not ticket_no:
        # Without a ticket the request would go to "tickets/None".
        return

    user_details = get_user_details()
    content = api_get('tickets/%s' % ticket_no)
    # An assignee id that is not among the known users (including no
    # assignee at all) is reported as unassigned.
    assignee = user_details.get(content['ticket']['assignee_id'])
    if assignee is None:
        print('Not assigned')
    else:
        print('%s %s' % (assignee['first_name'], assignee['last_name']))
# The docstring below is printed verbatim by the help command.
@commander.command('assign', 'update')
@commander.command('assigned', 'update')
def assigned_update():
    '''Sets the assigned user.'''
    ticket_no = get_ticket_number()
    if not ticket_no:
        # get_ticket_number() has already reported the problem.
        return

    # Menu number -> user id.
    user_id_map = {}
    print('Please select an option:')
    for i, (user_id, details) in enumerate(get_user_details().items(), 1):
        print('%d: %s %s' % (i, details['first_name'], details['last_name']))
        user_id_map[i] = user_id
    print('0: Unassign')

    try:
        choice = int(raw_input('Please select from the above: '))
    except ValueError:
        choice = None

    if choice == 0:
        # An empty assignee id is how the note expresses "unassign".
        assignee_id = ''
    elif choice in user_id_map:
        assignee_id = str(user_id_map[choice])
    else:
        # Reject bad input with a message instead of a traceback.
        print('Unknown value')
        return
    post_ticket_note(ticket_no, assignee_id=assignee_id)
# The docstring below is printed verbatim by the help command.
@commander.command('comments')
@commander.command('log')
@commander.command('notes')
def comments(ticket_number=None):
    '''Gets a log of the comments.'''
    ticket_no = ticket_number or get_ticket_number()
    if not ticket_no:
        # Without a ticket the request would go to "tickets/None/notes".
        return

    notes = api_get('tickets/%s/notes' % ticket_no)
    user_details = get_user_details()
    for entry in notes:
        note = entry['ticket_note']
        created_at = datetime.strptime(note['created_at'], '%Y-%m-%dT%H:%M:%SZ')
        body = note['content']
        # 'updates' arrives as a JSON-encoded string mapping a field name
        # to a [old, new] pair.
        updates = json.loads(note['updates'])

        # The author may be missing from the assignments list; show a
        # placeholder rather than aborting the whole log with a KeyError.
        author = user_details.get(note['user_id'])
        if author:
            print('Author: %s %s' % (author['first_name'], author['last_name']))
        else:
            print('Author: Unknown user (%s)' % note['user_id'])
        print('Date: %s\n' % created_at.strftime('%c'))

        if body:
            print(body)
            print('')
        if updates:
            for field, change in updates.items():
                label = field.replace('_id', '').title()
                print('%s changed from "%s" to "%s"' % (label, change[0], change[1]))
            print('')
# message may be given on the command line; when it is missing, EDITOR is
# opened on a temporary file to compose it.
# The docstring below is printed verbatim by the help command.
@commander.command('comment', 'update')
@commander.command('comments', 'update')
@commander.command('notes', 'update')
def comments_update(message=None):
    '''Creates a comment.'''
    ticket_no = get_ticket_number()
    if not ticket_no:
        # Don't open the editor for a note that has nowhere to go.
        return

    if not message:
        with tempfile.NamedTemporaryFile() as comment_file:
            call([EDITOR, comment_file.name])
            # The file is re-opened by name rather than read through the
            # original handle (presumably because an editor may replace the
            # file on save -- TODO confirm).  The context manager closes
            # this second handle, which was previously leaked.
            with open(comment_file.name) as edited:
                message = edited.read()

    # A message consisting only of whitespace counts as empty.
    if message and message.strip():
        post_ticket_note(ticket_no, content=message)
    else:
        error('Aborting due to empty update message')
# The docstring below is printed verbatim by the help command.
@commander.command('description')
@commander.command('desc')
@commander.command('summary')
def summary(ticket_number=None):
    '''Shows the description of the ticket.'''
    ticket_number = ticket_number or get_ticket_number()
    if not ticket_number:
        # Same guard as the status command: without a ticket the request
        # would go to "tickets/None".
        return
    content = api_get('tickets/%s' % ticket_number)
    print(content['ticket']['summary'])
# Opens the ticket page when a ticket number is given or can be discovered,
# otherwise the project page.
# The docstring below is printed verbatim by the help command.
@commander.command('open')
def open_browser(ticket_number=None):
    '''Opens the current ticket in a browser window.'''
    url = 'http://%s/projects/%s/' % (CODEBASE_URL, PROJECT_NAME)
    if not ticket_number:
        # Failing to find a ticket is fine here, so keep it quiet.
        ticket_number = get_ticket_number(mute_error=True)
    if ticket_number:
        url = '%stickets/%s' % (url, ticket_number)
    # Pass the URL as a separate argv entry instead of formatting it into a
    # shell string: ticket_number can come straight from the command line.
    call(['open', url])
def _ticket_update_confirm(question):
    """Ask ``question`` until the reply is y, n or empty; True only for y."""
    while True:
        reply = raw_input(question).lower()
        if reply in ('y', 'n', ''):
            return reply == 'y'


def _ticket_update_choose(valid_choices):
    """Prompt until the reply is an integer contained in ``valid_choices``."""
    while True:
        try:
            choice = int(raw_input('Please select from the above: '))
        except ValueError:
            continue
        if choice in valid_choices:
            return choice


def _ticket_update_compose():
    """Open EDITOR on a temporary file and return the text that was saved."""
    with tempfile.NamedTemporaryFile() as comment_file:
        call([EDITOR, comment_file.name])
        # Re-opened by name rather than read through the original handle;
        # the context manager makes sure this second handle is closed.
        with open(comment_file.name) as edited:
            return edited.read()


# Interactive wizard: asks in turn whether to add a note, change the status
# and change the assignee, then posts everything as a single ticket note.
# The docstring below is printed verbatim by the help command.
@commander.command('ticket', 'update')
def ticket_update(ticket_no=None):
    '''Updates ticket data.'''
    ticket_no = ticket_no or get_ticket_number()
    if not ticket_no:
        # get_ticket_number() has already reported the problem.
        return

    ticket_args = {}

    if _ticket_update_confirm('Do you want to update the notes? [y/N] '):
        ticket_args['content'] = _ticket_update_compose()

    if _ticket_update_confirm('Do you want to update the status? [y/N] '):
        statuses = api_get('tickets/statuses')
        # Menu number -> API status id.
        status_ids = {i: entry['ticketing_status']['id'] for i, entry in enumerate(statuses, 1)}
        print('Please select an option:')
        for i, entry in enumerate(statuses, 1):
            print('%d: %s' % (i, entry['ticketing_status']['name']))
        ticket_args['status_id'] = status_ids[_ticket_update_choose(status_ids)]

    if _ticket_update_confirm('Do you want to update the assigned user? [y/N] '):
        # Menu number -> user id.
        user_id_map = {}
        print('Please select an option:')
        for i, (user_id, details) in enumerate(get_user_details().items(), 1):
            print('%d: %s %s' % (i, details['first_name'], details['last_name']))
            user_id_map[i] = user_id
        print('0: Unassign')
        choice = _ticket_update_choose([0] + list(user_id_map))
        # An empty assignee id is how the note expresses "unassign".
        ticket_args['assignee_id'] = '' if choice == 0 else str(user_id_map[choice])

    if not ticket_args:
        # Every question was declined; don't post an empty note.
        error('Nothing to update')
        return
    post_ticket_note(ticket_no, **ticket_args)
def mark_as(ticket_number, status):
    """Check out a ticket's branch, then set its status and assign it.

    Branches are taken from ``git branch -a`` and filtered with
    ``match_branch_name``.  When several match, the user is asked to pick
    one.  After a successful checkout the ticket gets the status named
    ``status`` and is assigned to the user from ``AUTH_USERNAME``.

    Args:
        ticket_number: Ticket number to look for in branch names.
        status: Name of the status to set, e.g. ``'In Review'``.
    """
    listing = check_output('git branch -a', stderr=STDOUT, shell=True)
    # Sorted so the numbered menu is stable between runs.
    branches = sorted(set(
        clean_branch_name(line)
        for line in listing.split('\n')
        if match_branch_name(ticket_number, line)
    ))

    if not branches:
        error('There are no branches that match %s' % ticket_number)
        return

    if len(branches) == 1:
        branch = branches[0]
    else:
        # Need to get the user to pick the branch
        print('%d branches match that ticket number' % len(branches))
        print('Which branch did you mean?')
        for i, name in enumerate(branches, 1):
            print('%d. %s' % (i, name))
        branch = ''
        while not branch:
            try:
                index = int(raw_input('Please choose: '))
            except ValueError:
                continue
            # Out-of-range numbers re-prompt instead of raising.
            if 1 <= index <= len(branches):
                branch = branches[index - 1]

    # Don't touch the ticket when the checkout itself failed.
    if call(['git', 'checkout', branch]) != 0:
        error('Could not check out %s' % branch)
        return

    # AUTH_USERNAME is split on "/" and its second part is matched against
    # the usernames returned by get_user_details().
    user_ids = {
        details['username']: user_id
        for user_id, details in get_user_details().items()
    }
    user_id = user_ids[AUTH_USERNAME.split('/')[1]]

    statuses = api_get('tickets/statuses')
    status_ids = {
        entry['ticketing_status']['name']: entry['ticketing_status']['id']
        for entry in statuses
    }
    post_ticket_note(ticket_number, status_id=status_ids[status], assignee_id=user_id)
# Thin command wrapper around mark_as() for the "In Review" status.
# The docstring below is printed verbatim by the help command.
@commander.command('review')
def mark_reviewing(ticket_number):
    '''Checks out and marks a ticket as in review for the current user.'''
    mark_as(ticket_number, status='In Review')
# Not registered as a command: the `workon` function defined later in this
# file registers the same ('workon',) key, and that later registration
# always replaced this one.  Dropping the dead decorator leaves the command
# table unchanged and makes it clear which function actually runs.
def mark_in_progress(ticket_number):
    '''Checks out and marks a ticket as in progress for the current user.'''
    mark_as(ticket_number, 'In Progress')
# Stores the number in the [ticket] section of the loaded config and
# rewrites .cbconfig in the current directory.
# The docstring below is printed verbatim by the help command.
@commander.command('ticket', 'set')
def set_ticket(ticket_number):
    '''Sets a ticket for when auto-discovery is not possible'''
    if not config.has_section('ticket'):
        config.add_section('ticket')
    config.set('ticket', 'number', ticket_number)
    with open('.cbconfig', 'wb') as handle:
        config.write(handle)
def clean_branch_name(branch_name):
    """Reduce one line of ``git branch`` output to a bare branch name.

    Removes the ``* `` current-branch marker and surrounding whitespace,
    then keeps only the text after the last ``/``.
    """
    trimmed = branch_name.replace('* ', '').strip()
    _, _, last_component = trimmed.rpartition('/')
    return last_component
def match_branch_name(ticket_number, branch_name):
    """Tell whether ``branch_name`` refers to ticket ``ticket_number``.

    The number must appear in the branch name without a digit directly
    before or after it, so ticket ``1`` does not match ``ticket-12``.

    Args:
        ticket_number: Ticket number (string) to look for.
        branch_name: Branch name, or a raw line of ``git branch`` output.

    Returns:
        ``True`` when the branch name contains the ticket number as a whole
        number, ``False`` otherwise.
    """
    pattern = r'(?<![0-9])%s(?![0-9])' % re.escape(str(ticket_number))
    return re.search(pattern, branch_name) is not None
def _workon_mark_in_progress(ticket_number):
    """Set the ticket to "In Progress" and assign it to the configured user."""
    # AUTH_USERNAME is split on "/" and its second part is matched against
    # the usernames returned by get_user_details().
    user_ids = {
        details['username']: user_id
        for user_id, details in get_user_details().items()
    }
    user_id = user_ids[AUTH_USERNAME.split('/')[1]]

    statuses = api_get('tickets/statuses')
    status_ids = {
        entry['ticketing_status']['name']: entry['ticketing_status']['id']
        for entry in statuses
    }
    post_ticket_note(
        ticket_number, status_id=status_ids['In Progress'], assignee_id=user_id,
    )


# When no branch matches the ticket, the user may still mark it in progress;
# the number is then stored with set_ticket() because it cannot be derived
# from a branch name.
# The docstring below is printed verbatim by the help command.
@commander.command('workon')
def workon(ticket_number):
    '''Checks out and marks a ticket as "In Progress" for the current user'''
    listing = check_output('git branch -a', stderr=STDOUT, shell=True)
    # Sorted so the numbered menu is stable between runs.
    branches = sorted(set(
        clean_branch_name(line)
        for line in listing.split('\n')
        if match_branch_name(ticket_number, line)
    ))

    if not branches:
        error('There are no branches that match %s' % ticket_number)
        while True:
            reply = raw_input('Do you want to set this ticket in progress anyway? [y/N]: ')
            # Compare case-insensitively so that "Y" is accepted as well;
            # an empty reply means the default, "no".
            reply = reply.lower()
            if reply in ('', 'n'):
                return
            if reply == 'y':
                break
        _workon_mark_in_progress(ticket_number)
        set_ticket(ticket_number)
        return

    if len(branches) == 1:
        branch = branches[0]
    else:
        # Need to get the user to pick the branch
        print('%d branches match that ticket number' % len(branches))
        print('Which branch did you mean?')
        for i, name in enumerate(branches, 1):
            print('%d. %s' % (i, name))
        branch = ''
        while not branch:
            try:
                index = int(raw_input('Please choose: '))
            except ValueError:
                continue
            # Out-of-range numbers re-prompt instead of raising.
            if 1 <= index <= len(branches):
                branch = branches[index - 1]

    # Don't touch the ticket when the checkout itself failed.
    if call(['git', 'checkout', branch]) != 0:
        error('Could not check out %s' % branch)
        return
    _workon_mark_in_progress(ticket_number)
if __name__ == "__main__":
    # Settings live in .cbconfig in the current working directory.
    config = ConfigParser.SafeConfigParser()
    if not os.path.exists('.cbconfig'):
        # First run in this directory: ask for the settings and save them.
        print('Please enter the following details:')
        config.add_section('main')
        url = raw_input('Your codebase URL e.g. foo.codebasehq.com: ')
        config.set('main', 'url', url)
        config.add_section('auth')
        username = raw_input('Your codebase API username: ')
        config.set('auth', 'username', username)
        token = raw_input('Your codebase API key: ')
        config.set('auth', 'token', token)
        config.add_section('project')
        project = raw_input('The codebase project for this directory: ')
        config.set('project', 'name', project)
        with open('.cbconfig', 'wb') as config_file:
            config.write(config_file)
    else:
        config.read('.cbconfig')

    # Module-level names read by the functions defined above.
    _user_agent = 'codebase-cli/dev'
    _auth_realm = 'Application'
    AUTH_USERNAME = config.get('auth', 'username')
    AUTH_TOKEN = config.get('auth', 'token')
    AUTH_CREDENTIALS = (AUTH_USERNAME, AUTH_TOKEN,)
    PROJECT_NAME = config.get('project', 'name')
    API_URL = 'https://api3.codebasehq.com/%s/' % PROJECT_NAME
    CODEBASE_URL = config.get('main', 'url')
    authed_get = functools.partial(get, auth=AUTH_CREDENTIALS)
    authed_post = functools.partial(post, auth=AUTH_CREDENTIALS)

    # Only the dispatch can raise CommandNotFound, so only it is guarded.
    try:
        commander.run_command(sys.argv)
    except CommandNotFound:
        error('Command not found')
        # Report the failure to the calling shell as well.
        sys.exit(1)