-
-
Notifications
You must be signed in to change notification settings - Fork 171
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
2 changed files
with
345 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,184 @@ | ||
import json | ||
import os | ||
|
||
from constance import config | ||
from django.core.management.base import BaseCommand | ||
|
||
from allauth.socialaccount.models import SocialApp | ||
from kobo.apps.accounts.models import SocialAppCustomData | ||
|
||
|
||
class Command(BaseCommand): | ||
help = ( | ||
'Provision server settings including social apps and constance configs' | ||
) | ||
|
||
def add_arguments(self, parser): | ||
subparsers = parser.add_subparsers(dest='command') | ||
|
||
# Subcommand for managing social apps | ||
socialapp_parser = subparsers.add_parser( | ||
'socialapp', help='Insert a social app and related custom data' | ||
) | ||
|
||
socialapp_parser.add_argument( | ||
'provider', type=str, help='Either openid_connect or microsoft' | ||
) | ||
socialapp_parser.add_argument( | ||
'provider_id', type=str, help='Provider ID used in the login url' | ||
) | ||
socialapp_parser.add_argument( | ||
'name', | ||
type=str, | ||
help='Name of the organization displayed on the login page', | ||
) | ||
socialapp_parser.add_argument( | ||
'client_id', type=str, help='App ID, or consumer key' | ||
) | ||
socialapp_parser.add_argument( | ||
'secret', | ||
nargs='?', | ||
type=str, | ||
default='', | ||
help='API secret, client secret, or consumer secret', | ||
) | ||
socialapp_parser.add_argument( | ||
'key', | ||
nargs='?', | ||
type=str, | ||
default='', | ||
help='Key provided by client', | ||
) | ||
socialapp_parser.add_argument( | ||
'settings', | ||
type=str, | ||
help='Settings in json format enclosed with single quotes', | ||
) | ||
|
||
# Subcommand for managing constance configurations | ||
config_parser = subparsers.add_parser( | ||
'config', help='Update application configuration' | ||
) | ||
config_parser.add_argument( | ||
'config_kv_pairs', | ||
nargs='+', | ||
type=str, | ||
help='Constance configuration values as key=value pairs', | ||
) | ||
|
||
def handle(self, *args, **kwargs): | ||
command = kwargs.get('command') | ||
|
||
if command == 'socialapp': | ||
self.handle_socialapp(kwargs) | ||
elif command == 'config': | ||
self.handle_config(kwargs) | ||
else: | ||
self.stdout.write(self.style.ERROR('No valid subcommand provided')) | ||
|
||
def handle_socialapp(self, kwargs): | ||
provider = kwargs['provider'] | ||
provider_id = kwargs['provider_id'] | ||
name = kwargs['name'] | ||
client_id = kwargs['client_id'] | ||
secret = os.getenv('SOCIAL_APP_SECRET') or kwargs['secret'] | ||
key = kwargs['key'] | ||
settings_json = kwargs['settings'] | ||
|
||
try: | ||
settings = json.loads(settings_json) | ||
except TypeError: | ||
raise json.JSONDecodeError | ||
|
||
social_app_data = { | ||
'provider': provider, | ||
'provider_id': provider_id, | ||
'name': name, | ||
'client_id': client_id, | ||
'secret': secret, | ||
'key': key, | ||
'settings': settings, | ||
} | ||
|
||
social_app, created = SocialApp.objects.get_or_create( | ||
defaults=social_app_data | ||
) | ||
|
||
if not created: | ||
self.stdout.write( | ||
f'Social app for {social_app.name} already exists' | ||
) | ||
else: | ||
self.stdout.write( | ||
f'Successfully created social app for {social_app.name}' | ||
) | ||
|
||
social_app_custom_data_exists = SocialAppCustomData.objects.filter( | ||
social_app=social_app | ||
).exists() | ||
|
||
if not social_app_custom_data_exists: | ||
SocialAppCustomData.objects.create( | ||
social_app=social_app, | ||
is_public=False, | ||
) | ||
self.stdout.write( | ||
f'Successfully created custom data for {social_app.name}' | ||
) | ||
else: | ||
self.stdout.write( | ||
f'Custom data for {social_app.name} already exists' | ||
) | ||
|
||
def handle_config(self, kwargs): | ||
config_kv_pairs = kwargs['config_kv_pairs'] | ||
for pair in config_kv_pairs: | ||
key, value = pair.split('=') | ||
if hasattr(config, key): | ||
try: | ||
if value.lower() == 'true': | ||
value = True | ||
elif value.lower() == 'false': | ||
value = False | ||
else: | ||
# Specific handling for fields taking JSON arrays | ||
if key in [ | ||
'PROJECT_METADATA_FIELDS', | ||
'USER_METADATA_FIELDS', | ||
]: | ||
if value.startswith('[') and value.endswith(']'): | ||
try: | ||
value = json.loads(value) | ||
value = json.dumps(value) | ||
except json.JSONDecodeError as e: | ||
self.stdout.write( | ||
f'Invalid JSON array value for key {key}. {e}' | ||
) | ||
continue | ||
else: | ||
self.stdout.write( | ||
f'Invalid JSON array format for key {key}. Should start and end with "[" and "]".' | ||
) | ||
continue | ||
# Handling for other keys that should be JSON objects | ||
else: | ||
if value.startswith('{') and value.endswith('}'): | ||
try: | ||
value = json.loads(value) | ||
value = json.dumps(value) | ||
except json.JSONDecodeError as e: | ||
self.stdout.write( | ||
f'Invalid JSON object value for key {key}. {e}' | ||
) | ||
continue | ||
|
||
setattr(config, key, value) | ||
self.stdout.write( | ||
f'Successfully updated configuration for {key}' | ||
) | ||
except Exception as e: | ||
self.stdout.write( | ||
f'Error setting configuration for {key}: {e}' | ||
) | ||
else: | ||
self.stdout.write(f'Configuration key {key} does not exist') |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,161 @@ | ||
import json | ||
from unittest.mock import patch | ||
|
||
from django.core.management import call_command | ||
from django.test import TestCase | ||
|
||
from allauth.socialaccount.models import SocialApp | ||
from kobo.apps.accounts.models import SocialAppCustomData | ||
|
||
|
||
class ProvisionServerCommandTest(TestCase): | ||
|
||
@patch('os.getenv') | ||
def test_handle_successful_creation(self, mock_getenv): | ||
mock_getenv.return_value = None | ||
|
||
self.assertEqual(SocialApp.objects.count(), 0) | ||
self.assertEqual(SocialAppCustomData.objects.count(), 0) | ||
|
||
call_command( | ||
'provision_server', | ||
'socialapp', | ||
'openid_connect', | ||
'test_provider_id', | ||
'Test Organization', | ||
'test_client_id', | ||
'test_secret', | ||
'', | ||
'{"key": "value"}', | ||
) | ||
|
||
self.assertEqual(SocialApp.objects.count(), 1) | ||
self.assertEqual(SocialAppCustomData.objects.count(), 1) | ||
|
||
social_app = SocialApp.objects.first() | ||
self.assertEqual(social_app.provider, 'openid_connect') | ||
self.assertEqual(social_app.provider_id, 'test_provider_id') | ||
self.assertEqual(social_app.name, 'Test Organization') | ||
self.assertEqual(social_app.client_id, 'test_client_id') | ||
self.assertEqual(social_app.secret, 'test_secret') | ||
self.assertEqual(social_app.settings, {'key': 'value'}) | ||
|
||
@patch('os.getenv') | ||
def test_handle_existing_social_app(self, mock_getenv): | ||
mock_getenv.return_value = None | ||
|
||
SocialApp.objects.create( | ||
provider='openid_connect', | ||
provider_id='test_provider_id', | ||
name='Test Organization', | ||
client_id='test_client_id', | ||
secret='test_secret', | ||
key='', | ||
settings={'key': 'value'}, | ||
) | ||
|
||
self.assertEqual(SocialApp.objects.count(), 1) | ||
|
||
call_command( | ||
'provision_server', | ||
'socialapp', | ||
'openid_connect', | ||
'test_provider_id', | ||
'Test Organization', | ||
'test_client_id', | ||
'test_secret', | ||
'', | ||
'{"key": "value"}', | ||
) | ||
|
||
self.assertEqual(SocialApp.objects.count(), 1) | ||
self.assertEqual(SocialAppCustomData.objects.count(), 1) | ||
|
||
@patch('os.getenv') | ||
def test_handle_invalid_json(self, mock_getenv): | ||
mock_getenv.return_value = None | ||
|
||
with self.assertRaises(json.JSONDecodeError): | ||
call_command( | ||
'provision_server', | ||
'socialapp', | ||
'openid_connect', | ||
'test_provider_id', | ||
'Test Organization', | ||
'test_client_id', | ||
'test_secret', | ||
'', | ||
'{"invalid_json"}', | ||
) | ||
|
||
@patch('os.getenv') | ||
def test_handle_with_env_secret(self, mock_getenv): | ||
mock_getenv.return_value = 'env_secret' | ||
|
||
self.assertEqual(SocialApp.objects.count(), 0) | ||
|
||
call_command( | ||
'provision_server', | ||
'socialapp', | ||
'openid_connect', | ||
'test_provider_id', | ||
'Test Organization', | ||
'test_client_id', | ||
'test_secret', | ||
'', | ||
'{"key": "value"}', | ||
) | ||
|
||
self.assertEqual(SocialApp.objects.count(), 1) | ||
social_app = SocialApp.objects.first() | ||
self.assertEqual(social_app.secret, 'env_secret') | ||
|
||
@patch('kpi.management.commands.provision_server.config') | ||
def test_update_existing_config_key(self, mock_config): | ||
setattr(mock_config, 'TEST_CONFIG_KEY', 'old_value') | ||
call_command( | ||
'provision_server', | ||
'config', | ||
'TEST_CONFIG_KEY=new_value', | ||
) | ||
self.assertEqual(getattr(mock_config, 'TEST_CONFIG_KEY'), 'new_value') | ||
|
||
@patch('kpi.management.commands.provision_server.config') | ||
def test_update_non_existing_config_key(self, mock_config): | ||
delattr(mock_config, 'NON_EXISTENT_KEY') | ||
call_command( | ||
'provision_server', | ||
'config', | ||
'NON_EXISTENT_KEY=new_value', | ||
) | ||
|
||
self.assertFalse(hasattr(mock_config, 'NON_EXISTENT_KEY')) | ||
|
||
@patch('kpi.management.commands.provision_server.config') | ||
def test_update_config_key_with_different_data_types(self, mock_config): | ||
|
||
call_command( | ||
'provision_server', | ||
'config', | ||
'TEST_BOOL_KEY=True', | ||
) | ||
self.assertEqual(getattr(mock_config, 'TEST_BOOL_KEY'), True) | ||
|
||
call_command( | ||
'provision_server', | ||
'config', | ||
'TEST_JSON_KEY={"key": "value"}', | ||
) | ||
self.assertEqual( | ||
getattr(mock_config, 'TEST_JSON_KEY'), '{"key": "value"}' | ||
) | ||
|
||
call_command( | ||
'provision_server', | ||
'config', | ||
'PROJECT_METADATA_FIELDS=[{"key": "value"}]', | ||
) | ||
self.assertEqual( | ||
getattr(mock_config, 'PROJECT_METADATA_FIELDS'), | ||
'[{"key": "value"}]', | ||
) |