Skip to content

Commit

Permalink
Merge pull request apache#115 from airbnb/csv_improvements
Browse files Browse the repository at this point in the history
Cherrypick csv upload changes (apache#5268)
  • Loading branch information
Grace Guo committed Sep 19, 2018
2 parents 901ea8a + 866e644 commit fc07ddb
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 3 deletions.
4 changes: 4 additions & 0 deletions superset/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,10 @@ class CeleryConfig(object):
# contain all the external tables
CSV_TO_HIVE_UPLOAD_DIRECTORY = 'EXTERNAL_HIVE_TABLES/'

# The Hive namespace in which tables created from uploaded CSVs are stored.
# When set, uploads may not specify a namespace or schema of their own.
UPLOADED_CSV_HIVE_NAMESPACE = None

# A dictionary of items that gets merged into the Jinja context for
# SQL Lab. The existing context gets updated with this dictionary,
# meaning values for existing keys get overwritten by the content of this
Expand Down
23 changes: 20 additions & 3 deletions superset/db_engine_specs.py
Original file line number Diff line number Diff line change
Expand Up @@ -910,9 +910,26 @@ def get_column_names(filepath):
return next(unicodecsv.reader(f, encoding='utf-8-sig'))

table_name = form.name.data
filename = form.csv_file.data.filename
schema_name = form.schema.data

# Resolve the namespace-qualified name of the table to create.
#
# If UPLOADED_CSV_HIVE_NAMESPACE is configured, every uploaded table is forced
# into that namespace and the user may not supply one, either as a dotted
# table name or through the schema field. Otherwise the user may supply a
# namespace in at most one of those two places.
upload_namespace = config.get('UPLOADED_CSV_HIVE_NAMESPACE')
if upload_namespace:
    if '.' in table_name or schema_name:
        # Report the namespace that is actually enforced; the previous code
        # looked up 'HIVE_NAMESPACE', which is not the key tested above.
        raise Exception(
            "You can't specify a namespace. "
            'All tables will be uploaded to the `{}` namespace'.format(
                upload_namespace))
    table_name = '{}.{}'.format(upload_namespace, table_name)
else:
    if '.' in table_name and schema_name:
        raise Exception(
            "You can't specify a namespace both in the name of the table "
            'and in the schema field. Please remove one')
    if schema_name:
        table_name = '{}.{}'.format(schema_name, table_name)

bucket_path = app.config['CSV_TO_HIVE_UPLOAD_S3_BUCKET']
filename = form.csv_file.data.filename
bucket_path = config['CSV_TO_HIVE_UPLOAD_S3_BUCKET']

if not bucket_path:
logging.info('No upload bucket specified')
Expand All @@ -933,7 +950,7 @@ def get_column_names(filepath):
s3.upload_file(
upload_path, bucket_path,
os.path.join(upload_prefix, table_name, filename))
sql = """CREATE EXTERNAL TABLE {table_name} ( {schema_definition} )
sql = """CREATE TABLE {table_name} ( {schema_definition} )
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS
TEXTFILE LOCATION '{location}'
tblproperties ('skip.header.line.count'='1')""".format(**locals())
Expand Down

0 comments on commit fc07ddb

Please sign in to comment.