Skip to content

Commit

Permalink
Encapsulate data source/query runner configuration in an object.
Browse files Browse the repository at this point in the history
This is a step towards adding more complex logic in configuration
handling, like encryption of secrets.
  • Loading branch information
arikfr committed Feb 23, 2016
1 parent f1e90fd commit ed99b84
Show file tree
Hide file tree
Showing 13 changed files with 292 additions and 102 deletions.
20 changes: 19 additions & 1 deletion migrations/0003_update_data_source_config.py
@@ -1,13 +1,31 @@
import json
import jsonschema
from jsonschema import ValidationError

from redash import query_runner
from redash.models import DataSource


def validate_configuration(query_runner_type, configuration_json):
    """Check a data source configuration against its query runner's schema.

    `configuration_json` may be a JSON string or an already parsed object.
    Returns False when the query runner type is not registered, when the
    string is not valid JSON, or when schema validation fails; True otherwise.
    """
    runner_cls = query_runner.query_runners.get(query_runner_type, None)
    if runner_cls is None:
        return False

    try:
        # Strings are parsed first; anything else is validated as given.
        parsed = (json.loads(configuration_json)
                  if isinstance(configuration_json, basestring)
                  else configuration_json)
        jsonschema.validate(parsed, runner_cls.configuration_schema())
    except (ValidationError, ValueError):
        return False
    else:
        return True

def update(data_source):
print "[%s] Old options: %s" % (data_source.name, data_source.options)

if query_runner.validate_configuration(data_source.type, data_source.options):
if validate_configuration(data_source.type, data_source.options):
print "[%s] configuration already valid. skipping." % data_source.name
return

Expand Down
21 changes: 10 additions & 11 deletions redash/cli/data_sources.py
Expand Up @@ -2,7 +2,8 @@
import click
from flask.ext.script import Manager
from redash import models
from redash.query_runner import query_runners, validate_configuration
from redash.query_runner import query_runners, get_configuration_schema_for_type
from redash.utils.configuration import ConfigurationContainer

manager = Manager(help="Data sources management commands.")

Expand All @@ -23,12 +24,6 @@ def validate_data_source_type(type):
exit()


def validate_data_source_options(type, options):
    """Print an error and exit when `options` is not valid for data source `type`."""
    if validate_configuration(type, options):
        return

    print("Error: invalid configuration.")
    exit()


@manager.command
def new(name=None, type=None, options=None):
"""Create new data source"""
Expand Down Expand Up @@ -76,9 +71,10 @@ def new(name=None, type=None, options=None):
if value != default_value:
options_obj[k] = value

options = json.dumps(options_obj)

validate_data_source_options(type, options)
options = ConfigurationContainer(options_obj, schema)
if not options.is_valid():
print "Error: invalid configuration."
exit()

print "Creating {} data source ({}) with options:\n{}".format(type, name, options)

Expand Down Expand Up @@ -120,7 +116,10 @@ def edit(name, new_name=None, options=None, type=None):
data_source = models.DataSource.get(models.DataSource.name==name)

if options is not None:
validate_data_source_options(data_source.type, options)
schema = get_configuration_schema_for_type(data_source.type)
options = json.loads(options)
data_source.options.set_schema(schema)
data_source.options.update(options)

update_attr(data_source, "name", new_name)
update_attr(data_source, "type", type)
Expand Down
27 changes: 18 additions & 9 deletions redash/handlers/data_sources.py
@@ -1,13 +1,12 @@
import json

from flask import make_response, request
from flask.ext.restful import abort
from funcy import project

from redash import models
from redash.wsgi import api
from redash.utils.configuration import ConfigurationContainer, ValidationError
from redash.permissions import require_admin
from redash.query_runner import query_runners, validate_configuration
from redash.query_runner import query_runners, get_configuration_schema_for_type
from redash.handlers.base import BaseResource, get_object_or_404


Expand All @@ -30,14 +29,18 @@ def post(self, data_source_id):
data_source = models.DataSource.get_by_id_and_org(data_source_id, self.current_org)
req = request.get_json(True)

data_source.replace_secret_placeholders(req['options'])
schema = get_configuration_schema_for_type(req['type'])
if schema is None:
abort(400)

if not validate_configuration(req['type'], req['options']):
try:
data_source.options.set_schema(schema)
data_source.options.update(req['options'])
except ValidationError:
abort(400)

data_source.type = req['type']
data_source.name = req['name']
data_source.options = json.dumps(req['options'])

data_source.save()

return data_source.to_dict(all=True)
Expand Down Expand Up @@ -76,12 +79,18 @@ def post(self):
if f not in req:
abort(400)

if not validate_configuration(req['type'], req['options']):
schema = get_configuration_schema_for_type(req['type'])
if schema is None:
abort(400)

config = ConfigurationContainer(req['options'], schema)
if not config.is_valid():
abort(400)

datasource = models.DataSource.create_with_group(org=self.current_org,
name=req['name'],
type=req['type'], options=json.dumps(req['options']))
type=req['type'],
options=config)

return datasource.to_dict(all=True)

Expand Down
37 changes: 14 additions & 23 deletions redash/models.py
Expand Up @@ -10,14 +10,16 @@

import peewee
from passlib.apps import custom_app_context as pwd_context
from playhouse.postgres_ext import ArrayField, DateTimeTZField, PostgresqlExtDatabase
from playhouse.postgres_ext import ArrayField, DateTimeTZField
from flask.ext.login import UserMixin, AnonymousUserMixin
from permissions import has_access, view_only

from redash import utils, settings, redis_connection
from redash.query_runner import get_query_runner
from redash.metrics.database import MeteredPostgresqlExtDatabase, MeteredModel
from utils import generate_token
from redash.utils import generate_token
from redash.utils.configuration import ConfigurationContainer



class Database(object):
Expand Down Expand Up @@ -314,14 +316,20 @@ def verify_password(self, password):
return self.password_hash and pwd_context.verify(password, self.password_hash)


class DataSource(BelongsToOrgMixin, BaseModel):
SECRET_PLACEHOLDER = '--------'
class ConfigurationField(peewee.TextField):
    """Text field whose Python-side value is a ConfigurationContainer."""

    def db_value(self, value):
        """Return the JSON serialization of the given container."""
        serialized = value.to_json()
        return serialized

    def python_value(self, value):
        """Build a ConfigurationContainer from the given JSON text."""
        container = ConfigurationContainer.from_json(value)
        return container


class DataSource(BelongsToOrgMixin, BaseModel):
id = peewee.PrimaryKeyField()
org = peewee.ForeignKeyField(Organization, related_name="data_sources")
name = peewee.CharField()
type = peewee.CharField()
options = peewee.TextField()
options = ConfigurationField()
queue_name = peewee.CharField(default="queries")
scheduled_queue_name = peewee.CharField(default="scheduled_queries")
created_at = DateTimeTZField(default=datetime.datetime.now)
Expand All @@ -342,7 +350,7 @@ def to_dict(self, all=False, with_permissions=False):
}

if all:
d['options'] = self.configuration
d['options'] = self.options.to_dict(mask_secrets=True)
d['queue_name'] = self.queue_name
d['scheduled_queue_name'] = self.scheduled_queue_name
d['groups'] = self.groups
Expand All @@ -361,23 +369,6 @@ def create_with_group(cls, *args, **kwargs):
DataSourceGroup.create(data_source=data_source, group=data_source.org.default_group)
return data_source

@property
def configuration(self):
    """Return the parsed options with non-empty secret values masked.

    Properties listed under the query runner schema's 'secret' key are
    replaced by SECRET_PLACEHOLDER when they are present and truthy.
    """
    masked = json.loads(self.options)
    secret_props = self.query_runner.configuration_schema().get('secret', [])
    for name in secret_props:
        if name in masked and masked[name]:
            masked[name] = self.SECRET_PLACEHOLDER

    return masked

def replace_secret_placeholders(self, configuration):
    """Swap placeholder secrets in `configuration` for the stored values.

    `configuration` is modified in place: every secret property whose value
    equals SECRET_PLACEHOLDER is set to the value currently stored in
    `self.options`.
    """
    stored = json.loads(self.options)
    secret_props = self.query_runner.configuration_schema().get('secret', [])
    for name in secret_props:
        if name not in configuration:
            continue
        if configuration[name] == self.SECRET_PLACEHOLDER:
            configuration[name] = stored[name]

def get_schema(self, refresh=False):
key = "data_source:schema:{}".format(self.id)

Expand Down
27 changes: 8 additions & 19 deletions redash/query_runner/__init__.py
@@ -1,14 +1,11 @@
import logging
import json

import jsonschema
from jsonschema import ValidationError
from redash import settings

logger = logging.getLogger(__name__)

__all__ = [
'ValidationError',
'BaseQueryRunner',
'InterruptException',
'BaseSQLQueryRunner',
Expand Down Expand Up @@ -41,12 +38,13 @@
TYPE_DATE
])


class InterruptException(Exception):
    """Exception type exported by the query runner package.

    NOTE(review): presumably raised when a running query is interrupted --
    confirm against the code that raises it.
    """


class BaseQueryRunner(object):
def __init__(self, configuration):
jsonschema.validate(configuration, self.configuration_schema())
self.syntax = 'sql'
self.configuration = configuration

Expand Down Expand Up @@ -142,29 +140,20 @@ def register(query_runner_class):
logger.warning("%s query runner enabled but not supported, not registering. Either disable or install missing dependencies.", query_runner_class.name())


def get_query_runner(query_runner_type, configuration_json):
def get_query_runner(query_runner_type, configuration):
query_runner_class = query_runners.get(query_runner_type, None)
if query_runner_class is None:
return None

return query_runner_class(json.loads(configuration_json))
return query_runner_class(configuration)


def validate_configuration(query_runner_type, configuration_json):
def get_configuration_schema_for_type(query_runner_type):
query_runner_class = query_runners.get(query_runner_type, None)
if query_runner_class is None:
return False

try:
if isinstance(configuration_json, basestring):
configuration = json.loads(configuration_json)
else:
configuration = configuration_json
jsonschema.validate(configuration, query_runner_class.configuration_schema())
except (ValidationError, ValueError):
return False

return True
return None

return query_runner_class.configuration_schema()


def import_query_runners(query_runner_imports):
Expand Down
4 changes: 1 addition & 3 deletions redash/query_runner/python.py
Expand Up @@ -144,9 +144,7 @@ def execute_query(self, data_source_name_or_id, query):
except models.DataSource.DoesNotExist:
raise Exception("Wrong data source name/id: %s." % data_source_name_or_id)

query_runner = get_query_runner(data_source.type, data_source.options)

data, error = query_runner.run_query(query)
data, error = data_source.query_runner.run_query(query)
if error is not None:
raise Exception(error)

Expand Down
4 changes: 2 additions & 2 deletions redash/tasks.py
Expand Up @@ -14,7 +14,7 @@
from redash import redis_connection, models, statsd_client, settings, utils, mail
from redash.utils import gen_query_hash
from redash.worker import celery
from redash.query_runner import get_query_runner, InterruptException
from redash.query_runner import InterruptException
from version_check import run_version_check

logger = get_task_logger(__name__)
Expand Down Expand Up @@ -270,7 +270,7 @@ def execute_query(self, query, data_source_id, metadata):
logger.debug("Executing query:\n%s", query)

query_hash = gen_query_hash(query)
query_runner = get_query_runner(data_source.type, data_source.options)
query_runner = data_source.query_runner

logger.info("task=execute_query state=before query_hash=%s type=%s ds_id=%d task_id=%s queue=%s query_id=%s username=%s",
query_hash, data_source.type, data_source.id, self.request.id, self.request.delivery_info['routing_key'],
Expand Down
File renamed without changes.
77 changes: 77 additions & 0 deletions redash/utils/configuration.py
@@ -0,0 +1,77 @@
import json
import jsonschema
from jsonschema import ValidationError

SECRET_PLACEHOLDER = '--------'


class ConfigurationContainer(object):
    """Hold a configuration mapping together with the JSON schema describing it.

    The container validates the configuration with ``jsonschema``, serializes
    it to and from JSON, and masks the properties listed under the schema's
    ``secret`` key when the configuration is exported for display.
    """

    def __init__(self, config, schema=None):
        """Store `config` and, optionally, the `schema` it must conform to."""
        self._config = config
        self.set_schema(schema)

    def set_schema(self, schema):
        """Attach (or replace) the schema used for validation and masking."""
        self._schema = schema

    @property
    def schema(self):
        """The attached schema; raises RuntimeError when none has been set."""
        if self._schema is None:
            raise RuntimeError("Schema missing.")

        return self._schema

    def _secret_keys(self):
        """Return the property names the schema marks as secret."""
        # Not every schema declares secrets: treat a missing 'secret' entry
        # as "no secret properties" rather than failing with KeyError.
        return self.schema.get('secret', [])

    def is_valid(self):
        """Return True when validate() passes, False on a validation error."""
        try:
            self.validate()
        except (ValidationError, ValueError):
            return False

        return True

    def validate(self):
        """Validate the stored configuration against the current schema.

        Raises whatever ``jsonschema.validate`` raises for an invalid
        configuration (e.g. ValidationError).
        """
        jsonschema.validate(self._config, self._schema)

    def to_json(self):
        """Return the stored configuration serialized as a JSON string."""
        return json.dumps(self._config)

    def iteritems(self):
        """Iterate over the (key, value) pairs of the stored configuration."""
        return self._config.iteritems()

    def to_dict(self, mask_secrets=False):
        """Return the configuration as a dict.

        With ``mask_secrets=False`` the stored mapping itself is returned (not
        a copy). With ``mask_secrets=True`` a shallow copy is returned in
        which every present secret property is replaced by SECRET_PLACEHOLDER;
        this requires a schema and raises RuntimeError without one.
        """
        if mask_secrets is False:
            return self._config

        secret_keys = self._secret_keys()
        config = self._config.copy()
        for key in config:
            if key in secret_keys:
                config[key] = SECRET_PLACEHOLDER

        return config

    def update(self, new_config):
        """Replace the stored configuration with `new_config`.

        `new_config` is validated against the schema first. A secret property
        whose new value equals SECRET_PLACEHOLDER keeps its currently stored
        value, so a masked configuration can be sent back unchanged.

        Raises:
            RuntimeError: if no schema has been set.
            ValidationError: if `new_config` does not match the schema.
            KeyError: if a placeholder is sent for a secret that has no
                stored value.
        """
        jsonschema.validate(new_config, self.schema)

        secret_keys = self._secret_keys()
        config = {}
        for k in new_config:
            v = new_config[k]
            if k in secret_keys and v == SECRET_PLACEHOLDER:
                config[k] = self[k]
            else:
                config[k] = v

        self._config = config

    def get(self, *args, **kwargs):
        """Delegate to the stored configuration's ``get``."""
        return self._config.get(*args, **kwargs)

    def __getitem__(self, item):
        if item in self._config:
            return self._config[item]

        raise KeyError(item)

    def __contains__(self, item):
        return item in self._config

    @classmethod
    def from_json(cls, config_in_json):
        """Build a container (without a schema) from a JSON string."""
        return cls(json.loads(config_in_json))

0 comments on commit ed99b84

Please sign in to comment.