Skip to content
Permalink
Branch: master
Find file Copy path
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
executable file 245 lines (186 sloc) 8.9 KB
#!/usr/bin/env python
# Copyright (c) 2019 Oracle and/or its affiliates. All rights reserved.
# Licensed under the Universal Permissive License v 1.0 as shown at http://oss.oracle.com/licenses/upl.
#
# Since: April, 2019
# Author: Stephen Balousek <stephen.balousek@oracle.com>
# Description: Examine or change a configuration value for an Oracle GoldenGate service.
#
# DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
#
import sys
import json
import argparse
import urllib3
from getpass import getpass
# Text shown by argparse: the one-line description at the top of --help and
# the worked examples printed verbatim (RawDescriptionHelpFormatter) below it.
# The prose of each example must match the command line shown above it.
app_description = 'Examine or change a configuration value for an Oracle GoldenGate service'
app_example_usage = '''Examples:
NOTE: String values for '<json-value>' must be quoted using JSON
notation. See example 3 for details.
1. oggServiceConfig http://localhost:11000 Local distsrvr \\
--user oggadmin --password oggadmin-A1
Display the configuration data for Distribution Server in the
deployment called 'Local'. The Service Manager administrative
user name is 'oggadmin' and the password is 'oggadmin-A1' for the
Service Manager listening on port 11000.
2. oggServiceConfig http://localhost:11000 Local adminsrvr \\
--user oggadmin --password oggadmin-A1 \\
--path /authorizationDetails/common/allow \\
--value '["Digest","x-Cert"]'
Set the authentication modes used by Administration Server in the
deployment called 'Local' to Digest and x-Cert and then restart
the Administration Server.
NOTE: Digest Authentication is available starting
with Oracle GoldenGate version 19.1.
3. oggServiceConfig http://localhost:11000 Local adminsrvr \\
--user oggadmin --password oggadmin-A1 \\
--path /securityDetails/network/inbound/protocolVersion \\
--value '"1_2"'
Set the TLS version used by Administration Server in the
deployment called 'Local' to TLS 1.2 and then restart the
Administration Server.
4. oggServiceConfig http://localhost:11000 Local adminsrvr \\
--user oggadmin --password oggadmin-A1 \\
--path /securityDetails/network/inbound/cipherSuites \\
--value '[ "TLS_RSA_WITH_AES_128_GCM_SHA256",
"TLS_RSA_WITH_AES_128_CBC_SHA256",
"TLS_RSA_WITH_AES_256_GCM_SHA384",
"TLS_RSA_WITH_AES_256_CBC_SHA256",
"TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256" ]'
Set the TLS ciphers used by Administration Server in the
deployment called 'Local' to secure values and then restart the
Administration Server.
'''
# Headers sent with every REST call to Service Manager. The request-body media
# type header is spelled 'Content-Type' (with a hyphen); 'ContentType' is not
# an HTTP header and would be ignored by the server.
rest_call_headers = {
    'Content-Type': 'application/json',
    'Accept': 'application/json',
}
def value_as_json(value):
    """Render a value as JSON text for display.

    Values that are not plain dicts and whose compact JSON form is shorter
    than 60 characters are returned on a single line. Everything else (all
    dicts, and any long value) is returned pretty-printed with sorted keys,
    prefixed with a newline so it starts on its own line.
    """
    # Exact type test on purpose: only a plain dict forces the multi-line form.
    if type(value) is not dict:
        compact = json.dumps(value)
        if len(compact) < 60:
            return compact
    pretty = json.dumps(value, indent=4, sort_keys=True)
    return '\n' + pretty
def error_exit(message, value=None):
    """Report an error and terminate the application with exit status 1.

    Prints a blank line followed by 'Error: <message>'. When 'value' is not
    None it is also printed, formatted by value_as_json(). This function
    never returns; it always raises SystemExit.
    """
    # The leading newline produces the same blank separator line as a
    # separate empty print would.
    print('\nError: ' + message)
    if value is not None:
        details = value_as_json(value)
        print(details)
    sys.exit(1)
def get_requests_library():
    """Import and return the 'requests' module used for REST API calls.

    The import is done lazily so that a missing module produces a friendly
    installation hint (via error_exit, which terminates the application)
    instead of an ImportError traceback at start-up.
    """
    install_hint = '''The 'requests' module is not installed. You can install the 'requests' module with this command:
$ pip install requests --user
'''
    try:
        import requests
    except ImportError:
        error_exit(install_hint)
    else:
        return requests
def rest_failure(response):
    """Report a failed REST API call and exit the application.

    'response' is the object returned by the 'requests' call. The HTTP status
    code is always reported. The response body is shown as JSON when it can
    be decoded; otherwise (for example an HTML error page from a proxy) the
    raw body text is shown instead, so the status code is never hidden behind
    a JSON decoding traceback.
    """
    try:
        details = response.json()
    except ValueError:
        # The JSON decode errors raised by requests derive from ValueError.
        details = response.text or None
    error_exit('Unexpected Rest API response code %d' % response.status_code, details)
def get_service_auth(args):
    """Work out which authentication object to use for Service Manager.

    Probes the '/services/v2/config/summary' endpoint without credentials.
    If that is not rejected with 401, no authentication is needed and None
    is returned. Otherwise the user name and password are taken from 'args'
    (prompting for whichever is missing and storing the answer back on
    'args'), Digest authentication is tried, and if that is rejected with
    401 as well a Basic authentication object is returned instead.

    NOTE: the Basic fallback is returned without being probed.
    """
    requests = get_requests_library()
    probe_url = args.serviceManager + '/services/v2/config/summary'
    probe = requests.get(probe_url, headers=rest_call_headers, auth=None, verify=False)
    # Python 2 spells the line-reading prompt 'raw_input'.
    prompt_for_user = input if sys.version_info[0] >= 3 else raw_input

    if probe.status_code != 401:
        return None

    if args.user is None:
        args.user = prompt_for_user("User for '" + args.serviceManager + "': ")
    if args.password is None:
        args.password = getpass("Password for '" + args.user + "': ")

    credentials = requests.auth.HTTPDigestAuth(args.user, args.password)
    probe = requests.get(probe_url, headers=rest_call_headers, auth=credentials, verify=False)
    if probe.status_code == 401:
        credentials = requests.auth.HTTPBasicAuth(args.user, args.password)
    return credentials
def get_service_config(args):
    """Fetch the configuration of one service from Service Manager.

    Issues a GET for the service named by args.deployment / args.service and
    returns the 'config' object found under 'response' in the JSON reply.
    Any status other than 200, or a reply without that property, is reported
    through rest_failure / error_exit, which terminate the application.
    """
    requests = get_requests_library()
    endpoint = ''.join([args.serviceManager,
                        '/services/v2/deployments/', args.deployment,
                        '/services/', args.service])
    credentials = get_service_auth(args)
    reply = requests.get(endpoint, headers=rest_call_headers, auth=credentials, verify=False)
    if reply.status_code != 200:
        rest_failure(reply)

    payload = reply.json()
    if 'response' in payload:
        inner = payload['response']
        if 'config' in inner:
            return inner['config']
    error_exit('No "config" property in service', payload)
def set_service_config(args, config):
    """Store a new configuration for a service and request its restart.

    Sends a PATCH for the service named by args.deployment / args.service
    whose body carries 'config' and a 'status' of 'restart'. Any reply other
    than 200 is reported through rest_failure, which exits the application.
    """
    requests = get_requests_library()
    endpoint = ''.join([args.serviceManager,
                        '/services/v2/deployments/', args.deployment,
                        '/services/', args.service])
    credentials = get_service_auth(args)
    payload = dict(config=config, status='restart')
    reply = requests.patch(endpoint, json=payload, auth=credentials,
                           headers=rest_call_headers, verify=False)
    if reply.status_code == 200:
        return
    rest_failure(reply)
def get_config_value(config, path):
    """Look up a property in a JSON value using a slash-separated path.

    'path' is either a string such as '/a/b' (empty segments are ignored, so
    '/' and '' both address the top-level value) or an already split list of
    keys. Returns the value found at that path, or None when any key along
    the path is missing or an intermediate value is not an object.

    Falsy values (0, False, '', [], {}) are returned as they are; they are
    not confused with "not defined".
    """
    if isinstance(path, str):
        path = [segment for segment in path.split('/') if segment]
    node = config
    for key in path:
        # Only JSON objects can be descended into by key.
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    return node
def set_config_value(config, path, value):
    """Set or remove a property in a JSON value using a slash-separated path.

    'path' is either a string such as '/a/b' (empty segments are ignored) or
    an already split list of keys. 'config' is modified in place and is also
    returned.

    A 'value' of None removes the addressed property. Removing a property
    that does not exist leaves 'config' unchanged; in particular no empty
    intermediate objects are created. For any other 'value', missing
    intermediate objects are created as needed.

    An empty path (the top-level item) cannot be assigned; this is reported
    through error_exit, which terminates the application.
    """
    if isinstance(path, str):
        path = [segment for segment in path.split('/') if segment]
    if not path:
        error_exit('Cannot assign value to top-level item')
        return value

    node = config
    for key in path[:-1]:
        if key not in node:
            if value is None:
                # Nothing to delete below a missing parent.
                return config
            node[key] = {}
        node = node[key]

    leaf = path[-1]
    if value is None:
        node.pop(leaf, None)
    else:
        node[leaf] = value
    return config
def parse_command_line():
    """Parse the process command line and return the argparse namespace.

    Positional arguments: serviceManager, deployment, service.
    Options: --user/-u, --password, --path/-p (defaults to '/'), --value/-v.
    The examples in app_example_usage are appended verbatim to --help.
    """
    cli = argparse.ArgumentParser(
        description=app_description,
        epilog=app_example_usage,
        formatter_class=argparse.RawDescriptionHelpFormatter)

    positionals = (
        ('serviceManager', 'The URL for the Service Manager'),
        ('deployment', 'The name of the deployment for the service'),
        ('service', 'The name of the service'),
    )
    for dest, help_text in positionals:
        cli.add_argument(dest, help=help_text)

    cli.add_argument('--user', '-u',
                     help='The name of the user when connecting to Service Manager')
    cli.add_argument('--password',
                     help='The password for the user account')
    cli.add_argument('--path', '-p', default='/',
                     help="The path of the configuration item (default '/')")
    cli.add_argument('--value', '-v',
                     help='The new JSON value for the service configuration item')
    return cli.parse_args()
def main():
    """Application entry point.

    Displays the current value of the configuration item addressed by
    --path. When --value is given it is parsed as JSON; if it differs from
    the current value the service configuration is updated and the service
    is restarted. Returns 0 on success; failures exit through error_exit.
    """
    urllib3.disable_warnings()
    args = parse_command_line()
    item = '"' + args.path + '"'
    target = '"' + args.deployment + '/' + args.service + '"'

    service_config = get_service_config(args)
    current_value = get_config_value(service_config, args.path)
    if current_value is not None:
        shown = value_as_json(current_value)
    else:
        shown = '<not defined>'
    # The sentence always ends with a period, whether or not a value exists.
    print('Current value of ' + item + ' for ' + target + ' is ' + shown + '.')

    if args.value:
        try:
            new_value = json.loads(args.value)
        except ValueError as exc:
            # Report malformed JSON as a normal error rather than a traceback.
            error_exit('The value given for --value is not valid JSON: ' + str(exc))
        if new_value == current_value:
            print('No changes required.')
        else:
            print('Setting new value and restarting service.')
            set_config_value(service_config, args.path, new_value)
            set_service_config(args, service_config)
            print('New value of ' + item + ' for ' + target
                  + ' is ' + value_as_json(new_value) + '.')
    return 0
# Run the application; Ctrl-C is reported as an ordinary error instead of a
# KeyboardInterrupt traceback.
try:
    exit_status = main()
    sys.exit(exit_status)
except KeyboardInterrupt:
    error_exit('*** Aborted ***')
You can’t perform that action at this time.