Skip to content

Commit

Permalink
Merge pull request #108 from SunZhaonan/master
Browse files Browse the repository at this point in the history
Innodb engine DDL. Add config for timeout and load sample.
  • Loading branch information
SunZhaonan committed Apr 6, 2016
2 parents 821e404 + b202832 commit a7187a4
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 4 deletions.
4 changes: 2 additions & 2 deletions data-model/DDL/ETL_DDL/dataset_metadata.sql
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ CREATE TABLE `dict_dataset` (
UNIQUE KEY `uq_dataset_urn` (`urn`),
FULLTEXT KEY `fti_datasets_all` (`name`, `schema`, `properties`, `urn`)
)
ENGINE = MyISAM
ENGINE = InnoDB
AUTO_INCREMENT = 0
DEFAULT CHARSET = latin1;

Expand Down Expand Up @@ -196,7 +196,7 @@ CREATE TABLE `dict_field_detail` (
KEY `idx_dict_field__datasetid_fieldname` (`dataset_id`, `field_name`) USING BTREE,
KEY `idx_dict_field__fieldslayoutid` (`fields_layout_id`) USING BTREE
)
ENGINE = MyISAM
ENGINE = InnoDB
AUTO_INCREMENT = 0
DEFAULT CHARSET = utf8
COMMENT = 'Fields/Columns';
Expand Down
5 changes: 5 additions & 0 deletions metadata-etl/src/main/resources/jython/HdfsLoad.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,11 @@ def load_sample(self):
l.db_id = args[Constant.DB_ID_KEY]
l.wh_etl_exec_id = args[Constant.WH_EXEC_ID_KEY]
l.conn_mysql = zxJDBC.connect(JDBC_URL, username, password, JDBC_DRIVER)

if Constant.INNODB_LOCK_WAIT_TIMEOUT in args:
lock_wait_time = args[Constant.INNODB_LOCK_WAIT_TIMEOUT]
l.conn_mysql.cursor().execute("SET innodb_lock_wait_timeout = %s;" % lock_wait_time)

try:
l.load_metadata()
l.load_field()
Expand Down
5 changes: 5 additions & 0 deletions metadata-etl/src/main/resources/jython/HiveLoad.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,11 @@ def load_field(self):
l.db_id = args[Constant.DB_ID_KEY]
l.wh_etl_exec_id = args[Constant.WH_EXEC_ID_KEY]
l.conn_mysql = zxJDBC.connect(JDBC_URL, username, password, JDBC_DRIVER)

if Constant.INNODB_LOCK_WAIT_TIMEOUT in args:
lock_wait_time = args[Constant.INNODB_LOCK_WAIT_TIMEOUT]
l.conn_mysql.cursor().execute("SET innodb_lock_wait_timeout = %s;" % lock_wait_time)

try:
l.load_metadata()
l.load_field()
Expand Down
5 changes: 4 additions & 1 deletion metadata-etl/src/main/resources/jython/TeradataExtract.py
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,9 @@ def run(self, database_name, table_name, schema_output_file, sample_output_file,

e = TeradataExtract()
e.conn_td = zxJDBC.connect(JDBC_URL, username, password, JDBC_DRIVER)
do_sample = True
if Constant.TD_LOAD_SAMPLE in args:
do_sample = bool(args[Constant.TD_LOAD_SAMPLE])
try:
e.conn_td.cursor().execute(
"SET QUERY_BAND = 'script=%s; pid=%d; ' FOR SESSION;" % ('TeradataExtract.py', os.getpid()))
Expand All @@ -557,7 +560,7 @@ def run(self, database_name, table_name, schema_output_file, sample_output_file,
index_type = {'P': 'Primary Index', 'K': 'Primary Key', 'S': 'Secondary Index', 'Q': 'Partitioned Primary Index',
'J': 'Join Index', 'U': 'Unique Index'}

e.run(None, None, args[Constant.TD_SCHEMA_OUTPUT_KEY], args[Constant.TD_SAMPLE_OUTPUT_KEY], sample=False)
e.run(None, None, args[Constant.TD_SCHEMA_OUTPUT_KEY], args[Constant.TD_SAMPLE_OUTPUT_KEY], sample=do_sample)
finally:
e.conn_td.close()

12 changes: 11 additions & 1 deletion metadata-etl/src/main/resources/jython/TeradataLoad.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,12 +321,22 @@ def load_sample(self):
l.input_file = args[Constant.TD_METADATA_KEY]
l.input_field_file = args[Constant.TD_FIELD_METADATA_KEY]
l.input_sampledata_file = args[Constant.TD_SAMPLE_OUTPUT_KEY]

do_sample = True # default load sample
if Constant.TD_LOAD_SAMPLE in args:
do_sample = bool(args[Constant.TD_LOAD_SAMPLE])
l.db_id = args[Constant.DB_ID_KEY]
l.wh_etl_exec_id = args[Constant.WH_EXEC_ID_KEY]
l.conn_mysql = zxJDBC.connect(JDBC_URL, username, password, JDBC_DRIVER)

if Constant.INNODB_LOCK_WAIT_TIMEOUT in args:
lock_wait_time = args[Constant.INNODB_LOCK_WAIT_TIMEOUT]
l.conn_mysql.cursor().execute("SET innodb_lock_wait_timeout = %s;" % lock_wait_time)

try:
l.load_metadata()
l.load_field()
l.load_sample()
if do_sample:
l.load_sample()
finally:
l.conn_mysql.close()
5 changes: 5 additions & 0 deletions wherehows-common/src/main/java/wherehows/common/Constant.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ public class Constant {
/** The property_name field in wh_etl_job_property table. Oozie execution info ETL lookback time */
public static final String OZ_EXEC_ETL_LOOKBACK_MINS_KEY = "oz.exec_etl.lookback_period.in.minutes";

/** Optional. The property_name field in wh_etl_job_property table. Set innodb_lock_wait_timeout for mysql */
public static final String INNODB_LOCK_WAIT_TIMEOUT = "innodb_lock_wait_timeout";

// Teradata
/** The property_name field in wh_etl_job_property table. Teradata connection info */
public static final String TD_DB_URL_KEY = "teradata.db.jdbc.url";
Expand All @@ -85,6 +88,8 @@ public class Constant {
public static final String TD_TARGET_DATABASES_KEY = "teradata.databases";
/** The property_name field in wh_etl_job_property table. Used for connecting */
public static final String TD_DEFAULT_DATABASE_KEY = "teradata.default_database";
/** Optional. The property_name field in wh_etl_job_property table. Decide whether load sample data or not */
public static final String TD_LOAD_SAMPLE = "teradata.load_sample";

// Hdfs
/** The property_name field in wh_etl_job_property table. The hfds remote user that run the hadoop job on gateway */
Expand Down

0 comments on commit a7187a4

Please sign in to comment.