-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add commands for managing SSH keys (#86)
* Add key model, add command, and ls command * Add show, get, and rm commands
- Loading branch information
Showing
7 changed files
with
226 additions
and
4 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -106,3 +106,6 @@ services.yml | |
results.yml | ||
project.yml | ||
.vagrant/ | ||
|
||
*.key | ||
*.pub |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,104 @@ | ||
import click | ||
from terminaltables import AsciiTable | ||
|
||
from flag_slurper import utils | ||
from flag_slurper.autolib.models import Key, Team | ||
from flag_slurper.conf import Project | ||
|
||
|
||
@click.group() | ||
@click.pass_context | ||
def keys(ctx): | ||
""" | ||
View and manage ssh private keys to be used with autopwn. | ||
""" | ||
p = Project.get_instance() | ||
if not p.enabled: | ||
utils.report_error('Key commands require an active project') | ||
exit(4) | ||
p.connect_database() | ||
ctx.obj = p | ||
|
||
|
||
@keys.command() | ||
@click.option('-t', '--team', type=click.INT, help='Filter by team number', default=None) | ||
@click.option('-u', '--username', type=click.STRING, help='Filter by username', default=None) | ||
def ls(team, username): | ||
""" | ||
List all keys used by autopwn. | ||
""" | ||
keys = Key.select() | ||
|
||
if team: | ||
keys = keys.join(Team).where(Key.team.number == team) | ||
|
||
if username: | ||
keys = keys.where(Key.username == username) | ||
|
||
if keys.count() == 0: | ||
utils.report_warning('No keys found') | ||
exit(1) | ||
|
||
data = [[k.id, k.username, k.team.number if k.team else ''] for k in keys] | ||
data.insert(0, ['ID', 'Username', 'Team']) | ||
table = AsciiTable(data) | ||
utils.conditional_page(table.table, len(data)) | ||
|
||
|
||
@keys.command() | ||
@click.argument('file', type=click.File('r')) | ||
@click.option('-t', '--team', type=click.INT) | ||
@click.option('-u', '--username', type=click.STRING, required=True) | ||
def add(file, team, username): | ||
""" | ||
Add an ssh key to use for autopwn. | ||
""" | ||
if team: | ||
team = Team.get_by_id(team) | ||
|
||
Key.create(team=team, username=username, contents=file.read()) | ||
utils.report_success(f'Key for user {username} created successfully') | ||
|
||
|
||
@keys.command() | ||
@click.argument('id', metavar='ID', type=click.INT) | ||
def show(id): | ||
""" | ||
Show the requested SSH key in view. | ||
""" | ||
key = Key.get_by_id(id) | ||
|
||
data = [ | ||
['ID', id], | ||
['Username', key.username], | ||
['Team Number', key.team.number if key.team else ''], | ||
['Team Name', key.team.name if key.team else ''], | ||
] | ||
table = AsciiTable(data) | ||
click.echo(table.table) | ||
click.pause() | ||
click.edit(key.contents, editor='view') | ||
|
||
|
||
@keys.command() | ||
@click.argument('id', metavar='ID', type=click.INT) | ||
@click.argument('file', metavar='FILE', type=click.File('wb')) | ||
def get(id, file): | ||
""" | ||
Retrieve and locally save a private key. | ||
FILE may be a local file path to save the key or - to | ||
write the keys' contents to stdout. | ||
""" | ||
key = Key.get_by_id(id) | ||
file.write(key.contents.encode('utf-8')) | ||
|
||
|
||
@keys.command() | ||
@click.argument('id') | ||
def rm(id): | ||
""" | ||
Remove an SSH key. | ||
""" | ||
Key.delete().where(Key.id == id).execute() | ||
click.secho('Key removed.', fg='red') |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,3 @@ | ||
import os | ||
|
||
from click.testing import CliRunner | ||
|
||
from flag_slurper.cli import cli | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,99 @@ | ||
import pytest | ||
|
||
from flag_slurper.autolib.models import Key | ||
from flag_slurper.conf import Project | ||
|
||
|
||
@pytest.fixture | ||
def shared_key(db): | ||
yield Key.create(id=1, username='cdc', contents='--- BEGIN OPENSSH PRIVATE KEY ---') | ||
|
||
|
||
@pytest.fixture | ||
def keys_project(create_project): | ||
tmpdir = create_project(""" | ||
_version: "1.0" | ||
project: Flag Slurper Test | ||
base: {dir}/flag-test | ||
flags: | ||
- service: WWW SSH | ||
type: blue | ||
location: /root | ||
name: team{{{{ num }}}}_www_root.flag | ||
search: yes | ||
""") | ||
project_path = str(tmpdir.join('project.yml')) | ||
p = Project.get_instance() | ||
p.load(project_path) | ||
return project_path | ||
|
||
|
||
def test_keys_no_project(np_runner): | ||
p = Project.get_instance() | ||
p.project_data = None | ||
result = np_runner('keys', 'ls') | ||
assert result.exit_code == 4 | ||
assert result.output == '[!] Key commands require an active project\n' | ||
|
||
|
||
def test_list_keys(runner, keys_project, shared_key): | ||
result = runner(keys_project, 'keys', 'ls') | ||
assert result.exit_code == 0 | ||
assert shared_key.username in result.output | ||
|
||
|
||
def test_list_filter_by_team(runner, keys_project, shared_key): | ||
result = runner(keys_project, 'keys', 'ls', '-t', '5') | ||
assert result.exit_code == 1 | ||
assert result.output == '[*] No keys found\n' | ||
|
||
|
||
def test_list_filter_by_username_not_found(runner, keys_project, shared_key): | ||
result = runner(keys_project, 'keys', 'ls', '-u', 'non-existant') | ||
assert result.exit_code == 1 | ||
assert result.output == '[*] No keys found\n' | ||
|
||
|
||
def test_list_filter_by_username(runner, keys_project, shared_key): | ||
result = runner(keys_project, 'keys', 'ls', '-u', shared_key.username) | ||
assert result.exit_code == 0 | ||
assert shared_key.username in result.output | ||
|
||
|
||
def test_show_key(runner, shared_key, keys_project, mocker): | ||
edit = mocker.patch('flag_slurper.keys.click.edit') | ||
result = runner(keys_project, 'keys', 'show', str(shared_key.id)) | ||
assert result.exit_code == 0 | ||
assert edit.called | ||
assert shared_key.username in result.output | ||
|
||
|
||
def test_get_key(runner, shared_key, keys_project, tmpdir): | ||
file = tmpdir.join('test.priv') | ||
result = runner(keys_project, 'keys', 'get', str(shared_key.id), str(file)) | ||
assert result.exit_code == 0 | ||
assert file.read() == shared_key.contents | ||
|
||
|
||
def test_add_key(runner, keys_project, db, tmpdir): | ||
keyfile = tmpdir.join('keyfile') | ||
keyfile.write('TEST KEY') | ||
result = runner(keys_project, 'keys', 'add', str(keyfile), '-u', 'cdc') | ||
assert result.exit_code == 0 | ||
assert '[+] Added key for cdc' | ||
|
||
|
||
def test_add_key_for_team(runner, team, keys_project, tmpdir): | ||
keyfile = tmpdir.join('keyfile') | ||
keyfile.write('TEST KEY') | ||
result = runner(keys_project, 'keys', 'add', str(keyfile), '-u', 'cdc', '-t', str(team.id)) | ||
assert result.exit_code == 0 | ||
assert '[+] Added key for cdc' | ||
assert Key.select().order_by(Key.id.desc()).get().team.number == 1 | ||
|
||
|
||
def test_rm_key(runner, keys_project, shared_key): | ||
result = runner(keys_project, 'keys', 'rm', str(shared_key.id)) | ||
assert result.exit_code == 0 | ||
assert result.output == 'Key removed.\n' | ||
assert Key.select().where(Key.id == shared_key.id).count() == 0 |