Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

fixing schema changes for history tables

  • Loading branch information...
commit 3a341172178a39b1b92518bc9d81c96eac07cfeb 1 parent 0aaec25
@ssandler ssandler authored
View
9 TeslaSQL/Agents/Slave.cs
@@ -609,20 +609,19 @@ public Slave(IDataUtils sourceDataUtils, IDataUtils destDataUtils, Logger logger
switch (schemaChange.EventType) {
case SchemaChangeType.Rename:
logger.Log("Renaming column " + schemaChange.ColumnName + " to " + schemaChange.NewColumnName, LogLevel.Info);
- destDataUtils.RenameColumn(table, destDB, schemaChange.SchemaName, schemaChange.TableName,
- schemaChange.ColumnName, schemaChange.NewColumnName);
+ destDataUtils.RenameColumn(table, destDB, schemaChange.ColumnName, schemaChange.NewColumnName, Config.SlaveCTDB);
break;
case SchemaChangeType.Modify:
logger.Log("Changing data type on column " + schemaChange.ColumnName, LogLevel.Info);
- destDataUtils.ModifyColumn(table, destDB, schemaChange.SchemaName, schemaChange.TableName, schemaChange.ColumnName, schemaChange.DataType.ToString());
+ destDataUtils.ModifyColumn(table, destDB, schemaChange.ColumnName, schemaChange.DataType.ToString(), Config.SlaveCTDB);
break;
case SchemaChangeType.Add:
logger.Log("Adding column " + schemaChange.ColumnName, LogLevel.Info);
- destDataUtils.AddColumn(table, destDB, schemaChange.SchemaName, schemaChange.TableName, schemaChange.ColumnName, schemaChange.DataType.ToString());
+ destDataUtils.AddColumn(table, destDB, schemaChange.ColumnName, schemaChange.DataType.ToString(), Config.SlaveCTDB);
break;
case SchemaChangeType.Drop:
logger.Log("Dropping column " + schemaChange.ColumnName, LogLevel.Info);
- destDataUtils.DropColumn(table, destDB, schemaChange.SchemaName, schemaChange.TableName, schemaChange.ColumnName);
+ destDataUtils.DropColumn(table, destDB, schemaChange.ColumnName, Config.SlaveCTDB);
break;
}
}
View
31 TeslaSQL/DataUtils/IDataUtils.cs
@@ -225,45 +225,22 @@ public interface IDataUtils {
/// <summary>
/// Renames a column in a table, and the associated history table if recording history is configured
/// <summary>
- /// <param name="t">TableConf object for the table</param>
- /// <param name="dbName">Database name the table lives in</param>
- /// <param name="schema">Schema the table is part of</param>
- /// <param name="table">Table name</param>
- /// <param name="columnName">Old column name</param>
- /// <param name="newColumnName">New column name</param>
- void RenameColumn(TableConf t, string dbName, string schema, string table, string columnName, string newColumnName);
+ void RenameColumn(TableConf t, string dbName, string columnName, string newColumnName, string historyDB);
/// <summary>
/// Changes a column's data type
/// </summary>
- /// <param name="t">TableConf object for the table</param>
- /// <param name="dbName">Database name the table lives in</param>
- /// <param name="schema">Schema the table is part of</param>
- /// <param name="table">Table name</param>
- /// <param name="columnName">Column name to modify</param>
- /// <param name="dataType">String representation of the column's new data type</param>
- void ModifyColumn(TableConf t, string dbName, string schema, string table, string columnName, string dataType);
+ void ModifyColumn(TableConf t, string dbName, string columnName, string dataType, string historyDB);
/// <summary>
/// Adds a column to a table
/// </summary>
- /// <param name="t">TableConf object for the table</param>
- /// <param name="dbName">Database name the table lives in</param>
- /// <param name="schema">Schema the table is part of</param>
- /// <param name="table">Table name</param>
- /// <param name="columnName">Column name to add</param>
- /// <param name="dataType">String representation of the column's data type</param>
- void AddColumn(TableConf t, string dbName, string schema, string table, string columnName, string dataType);
+ void AddColumn(TableConf t, string dbName, string columnName, string dataType, string historyDB);
/// <summary>
/// Drops a column from a table
/// </summary>
- /// <param name="t">TableConf object for the table</param>
- /// <param name="dbName">Database name the table lives in</param>
- /// <param name="schema">Schema the table is part of</param>
- /// <param name="table">Table name</param>
- /// <param name="columnName">Column name to drop</param>
- void DropColumn(TableConf t, string dbName, string schema, string table, string columnName);
+ void DropColumn(TableConf t, string dbName, string columnName, string historyDB);
/// <summary>
/// Creates and does not populate TableInfo table for this CT batch in the appropriate DB
View
61 TeslaSQL/DataUtils/MSSQLDataUtils.cs
@@ -689,72 +689,71 @@ IF EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = '{0}' AND
return result.Rows.Count > 0 && result.Rows[0].Field<int>("ColumnExists") == 1;
}
- public void RenameColumn(TableConf t, string dbName, string schema, string table,
- string columnName, string newColumnName) {
+ public void RenameColumn(TableConf t, string dbName, string columnName, string newColumnName, string historyDB) {
SqlCommand cmd;
//rename the column if it exists
- if (CheckColumnExists(dbName, schema, table, columnName)) {
+ if (CheckColumnExists(dbName, t.SchemaName, t.Name, columnName)) {
cmd = new SqlCommand("EXEC sp_rename @objname, @newname, 'COLUMN'");
- cmd.Parameters.Add("@objname", SqlDbType.VarChar, 500).Value = schema + "." + table + "." + columnName;
+ cmd.Parameters.Add("@objname", SqlDbType.VarChar, 500).Value = t.FullName + "." + columnName;
cmd.Parameters.Add("@newname", SqlDbType.VarChar, 500).Value = newColumnName;
logger.Log("Altering table with command: " + cmd.CommandText, LogLevel.Debug);
SqlNonQuery(dbName, cmd);
}
//check for history table, if it is configured and contains the column we need to modify that too
- if (t.RecordHistoryTable && CheckTableExists(dbName, schema, table + "_History") && CheckColumnExists(dbName, schema, table + "_History", columnName)) {
+ if (t.RecordHistoryTable && CheckTableExists(historyDB, t.HistoryName, t.SchemaName) && CheckColumnExists(historyDB, t.SchemaName, t.HistoryName, columnName)) {
cmd = new SqlCommand("EXEC sp_rename @objname, @newname, 'COLUMN'");
- cmd.Parameters.Add("@objname", SqlDbType.VarChar, 500).Value = schema + "." + table + "_History." + columnName;
+ cmd.Parameters.Add("@objname", SqlDbType.VarChar, 500).Value = t.SchemaName + "." + t.HistoryName + "." + columnName;
cmd.Parameters.Add("@newname", SqlDbType.VarChar, 500).Value = newColumnName;
logger.Log("Altering history table column with command: " + cmd.CommandText, LogLevel.Debug);
- SqlNonQuery(dbName, cmd);
+ SqlNonQuery(historyDB, cmd);
}
}
- public void ModifyColumn(TableConf t, string dbName, string schema, string table, string columnName, string dataType) {
+ public void ModifyColumn(TableConf t, string dbName, string columnName, string dataType, string historyDB) {
SqlCommand cmd;
//Modify the column if it exists
- if (CheckColumnExists(dbName, schema, table, columnName)) {
- cmd = new SqlCommand("ALTER TABLE " + schema + "." + table + " ALTER COLUMN " + columnName + " " + dataType);
+ if (CheckColumnExists(dbName, t.SchemaName, t.Name, columnName)) {
+ cmd = new SqlCommand("ALTER TABLE " + t.FullName + " ALTER COLUMN " + columnName + " " + dataType);
logger.Log("Altering table column with command: " + cmd.CommandText, LogLevel.Debug);
SqlNonQuery(dbName, cmd);
}
//modify on history table if that exists too
- if (t.RecordHistoryTable && CheckTableExists(dbName, schema, table + "_History") && CheckColumnExists(dbName, schema, table + "_History", columnName)) {
- cmd = new SqlCommand("ALTER TABLE " + schema + "." + table + "_History ALTER COLUMN " + columnName + " " + dataType);
+ if (t.RecordHistoryTable && CheckTableExists(historyDB, t.HistoryName, t.SchemaName) && CheckColumnExists(historyDB, t.SchemaName, t.HistoryName, columnName)) {
+ cmd = new SqlCommand("ALTER TABLE " + t.SchemaName + "." + t.HistoryName + " ALTER COLUMN " + columnName + " " + dataType);
logger.Log("Altering history table column with command: " + cmd.CommandText, LogLevel.Debug);
- SqlNonQuery(dbName, cmd);
+ SqlNonQuery(historyDB, cmd);
}
}
- public void AddColumn(TableConf t, string dbName, string schema, string table, string columnName, string dataType) {
+ public void AddColumn(TableConf t, string dbName, string columnName, string dataType, string historyDB) {
SqlCommand cmd;
//add column if it doesn't exist
- if (!CheckColumnExists(dbName, schema, table, columnName)) {
- cmd = new SqlCommand("ALTER TABLE " + schema + "." + table + " ADD " + columnName + " " + dataType);
+ if (!CheckColumnExists(dbName, t.SchemaName, t.Name, columnName)) {
+ cmd = new SqlCommand("ALTER TABLE " + t.FullName + " ADD " + columnName + " " + dataType);
logger.Log("Altering table with command: " + cmd.CommandText, LogLevel.Debug);
SqlNonQuery(dbName, cmd);
}
//add column to history table if the table exists and the column doesn't
- if (t.RecordHistoryTable && CheckTableExists(dbName, schema, table + "_History") && !CheckColumnExists(dbName, schema, table + "_History", columnName)) {
- cmd = new SqlCommand("ALTER TABLE " + schema + "." + table + "_History ADD " + columnName + " " + dataType);
+ if (t.RecordHistoryTable && CheckTableExists(historyDB, t.HistoryName, t.SchemaName) && !CheckColumnExists(historyDB, t.SchemaName, t.HistoryName, columnName)) {
+ cmd = new SqlCommand("ALTER TABLE " + t.SchemaName + "." + t.HistoryName + " ADD " + columnName + " " + dataType);
logger.Log("Altering history table column with command: " + cmd.CommandText, LogLevel.Debug);
- SqlNonQuery(dbName, cmd);
+ SqlNonQuery(historyDB, cmd);
}
}
- public void DropColumn(TableConf t, string dbName, string schema, string table, string columnName) {
+ public void DropColumn(TableConf t, string dbName, string columnName, string historyDB) {
SqlCommand cmd;
//drop column if it exists
- if (CheckColumnExists(dbName, schema, table, columnName)) {
- cmd = new SqlCommand("ALTER TABLE " + schema + "." + table + " DROP COLUMN " + columnName);
+ if (CheckColumnExists(dbName, t.SchemaName, t.Name, columnName)) {
+ cmd = new SqlCommand("ALTER TABLE " + t.FullName + " DROP COLUMN " + columnName);
logger.Log("Altering table with command: " + cmd.CommandText, LogLevel.Debug);
SqlNonQuery(dbName, cmd);
}
//if history table exists and column exists, drop it there too
- if (t.RecordHistoryTable && CheckTableExists(dbName, schema, table + "_History") && CheckColumnExists(dbName, schema, table + "_History", columnName)) {
- cmd = new SqlCommand("ALTER TABLE " + schema + "." + table + "_History DROP COLUMN " + columnName);
+ if (t.RecordHistoryTable && CheckTableExists(historyDB, t.HistoryName, t.SchemaName) && CheckColumnExists(historyDB, t.SchemaName, t.HistoryName, columnName)) {
+ cmd = new SqlCommand("ALTER TABLE " + t.SchemaName + "." + t.HistoryName + " DROP COLUMN " + columnName);
logger.Log("Altering history table column with command: " + cmd.CommandText, LogLevel.Debug);
- SqlNonQuery(dbName, cmd);
+ SqlNonQuery(historyDB, cmd);
}
}
@@ -982,19 +981,25 @@ THEN DELETE
public void CopyIntoHistoryTable(ChangeTable t, string slaveCTDB, bool isConsolidated) {
string sql;
string sourceTable;
-
+ List<TColumn> fields;
if (isConsolidated && Config.Slave == Config.RelayServer && slaveCTDB == Config.RelayDB) {
sourceTable = "[" + t.schemaName + "].[" + t.consolidatedName + "]";
+ fields = GetFieldList(slaveCTDB, t.consolidatedName, t.schemaName);
} else {
sourceTable = t.ctName;
+ fields = GetFieldList(slaveCTDB, t.ctName, t.schemaName);
}
+
+ string insertColumns = "CTHistID, " + string.Join(",", fields.Select(col => col.name));
+ string selectColumns = "CAST(" + t.CTID + " AS BIGINT) AS CTHistID, " + string.Join(",", fields.Select(col => col.name));
+
if (CheckTableExists(slaveCTDB, t.historyName, t.schemaName)) {
logger.Log("table " + t.historyName + " already exists; selecting into it", LogLevel.Trace);
- sql = string.Format("INSERT INTO {0} SELECT {1} AS CTHistID, * FROM {2}", t.historyName, t.CTID, sourceTable);
+ sql = string.Format("INSERT INTO {0} ({1}) SELECT {2} FROM {3}", t.historyName, insertColumns, selectColumns, sourceTable);
logger.Log(sql, LogLevel.Debug);
} else {
logger.Log("table " + t.historyName + " does not exist, inserting into it", LogLevel.Trace);
- sql = string.Format("SELECT {0} AS CTHistID, * INTO {1} FROM {2}", t.CTID, t.historyName, sourceTable);
+ sql = string.Format("SELECT {0} INTO {1} FROM {2}", selectColumns, t.historyName, sourceTable);
logger.Log(sql, LogLevel.Debug);
}
var cmd = new SqlCommand(sql);
View
55 TeslaSQL/DataUtils/NetezzaDataUtils.cs
@@ -133,6 +133,7 @@ public class NetezzaDataUtils : IDataUtils {
var res = SqlQuery(dbName, cmd);
return res.Rows.Count > 0;
}
+
public RowCounts ApplyTableChanges(TableConf table, TableConf archiveTable, string dbName, long CTID, string CTDBName, bool isConsolidated) {
var cmds = new List<InsertDelete>();
cmds.Add(BuildApplyCommand(table, dbName, CTDBName, CTID));
@@ -187,62 +188,70 @@ class InsertDelete {
public void CopyIntoHistoryTable(ChangeTable t, string dbName, bool isConsolidated) {
string sql;
+ var fields = GetFieldList(dbName, t.ctName, t.schemaName);
+ string insertColumns = "CTHistID, " + string.Join(",", fields.Select(col => col.name));
+ string selectColumns = "CAST(" + t.CTID + " AS BIGINT) AS CTHistID, " + string.Join(",", fields.Select(col => col.name));
+
if (CheckTableExists(dbName, t.historyName, t.schemaName)) {
logger.Log("table " + t.historyName + " already exists; selecting into it", LogLevel.Trace);
- sql = string.Format("INSERT INTO {0} SELECT {1} AS CTHistID, * FROM {2}", t.historyName, t.CTID, t.ctName);
+ sql = string.Format("INSERT INTO {0} ({1}) SELECT {2} FROM {3}", t.historyName, insertColumns, selectColumns, t.ctName);
logger.Log(sql, LogLevel.Debug);
} else {
logger.Log("table " + t.historyName + " does not exist, inserting into it", LogLevel.Trace);
- sql = string.Format("CREATE TABLE {0} AS SELECT {1} AS CTHistID, * FROM {2}", t.historyName, t.CTID, t.ctName);
+ sql = string.Format("CREATE TABLE {0} AS SELECT {1} FROM {2}", t.historyName, selectColumns, t.ctName);
logger.Log(sql, LogLevel.Debug);
}
var cmd = new OleDbCommand(sql);
SqlNonQuery(dbName, cmd);
}
- public void RenameColumn(TableConf t, string dbName, string schema, string table, string columnName, string newColumnName) {
- logger.Log("Please check pending schema changes to be applied on Netezza for " + dbName + "." + schema + "." + table + " on " + Config.Slave, LogLevel.Error);
+
+ public void RenameColumn(TableConf t, string dbName, string columnName, string newColumnName, string historyDB) {
+ logger.Log("Unable to apply rename of column " + columnName + " to " + newColumnName + " on "
+ + dbName + "." + t.FullName + " for slave " + Config.Slave, LogLevel.Error);
}
- public void ModifyColumn(TableConf t, string dbName, string schema, string table, string columnName, string dataType) {
- logger.Log("Please check pending schema changes to be applied on Netezza for " + dbName + "." + schema + "." + table + " on " + Config.Slave, LogLevel.Error);
+
+ public void ModifyColumn(TableConf t, string dbName, string columnName, string dataType, string historyDB) {
+ logger.Log("Unable to apply modify of column " + columnName + " to type " + dataType + " on "
+ + dbName + "." + t.FullName + " for slave " + Config.Slave, LogLevel.Error);
}
- public void AddColumn(TableConf t, string dbName, string schema, string table, string columnName, string dataType) {
+ public void AddColumn(TableConf t, string dbName, string columnName, string dataType, string historyDB) {
columnName = MapReservedWord(columnName);
- if (!CheckColumnExists(dbName, schema, table, columnName)) {
+ if (!CheckColumnExists(dbName, t.SchemaName, t.Name, columnName)) {
//The "max" string doesn't exist on netezza, we can just replace it with the NetezzaStringLength after mapping it.
//In practice this only impacts varchar and nvarchar, since other data types would be mapped to something else by the MapDataType
//function (i.e. varbinary). This is the only place we do this special string-based handling because we wanted to keep Netezza specific logic
//out of the DataType class. Outside of this case, the "max" is handled appropriately in the netezza data copy class.
dataType = DataType.MapDataType(Config.RelayType, SqlFlavor.Netezza, dataType).Replace("max", Config.NetezzaStringLength.ToString());
- string sql = string.Format("ALTER TABLE {0} ADD {1} {2}; GROOM TABLE {0} VERSIONS;", table, columnName, dataType);
+ string sql = string.Format("ALTER TABLE {0} ADD {1} {2}; GROOM TABLE {0} VERSIONS;", t.Name, columnName, dataType);
var cmd = new OleDbCommand(sql);
SqlNonQuery(dbName, cmd);
- RefreshViews(dbName, table);
+ RefreshViews(dbName, t.Name);
}
- if (t.RecordHistoryTable && CheckTableExists(dbName, schema, table + "_History") && !CheckColumnExists(dbName, schema, table + "_History", columnName)) {
- string sql = string.Format("ALTER TABLE {0} ADD {1} {2}; GROOM TABLE {0} VERSIONS;", table + "_History", columnName, dataType);
+ if (t.RecordHistoryTable && CheckTableExists(historyDB, t.HistoryName, t.SchemaName) && !CheckColumnExists(historyDB, t.SchemaName, t.HistoryName, columnName)) {
+ string sql = string.Format("ALTER TABLE {0} ADD {1} {2}; GROOM TABLE {0} VERSIONS;", t.HistoryName, columnName, dataType);
var cmd = new OleDbCommand(sql);
logger.Log("Altering history table column with command: " + cmd.CommandText, LogLevel.Debug);
- SqlNonQuery(dbName, cmd);
- RefreshViews(dbName, table + "_History");
+ SqlNonQuery(historyDB, cmd);
+ RefreshViews(historyDB, t.HistoryName);
}
}
- public void DropColumn(TableConf t, string dbName, string schema, string table, string columnName) {
+ public void DropColumn(TableConf t, string dbName, string columnName, string historyDB) {
columnName = MapReservedWord(columnName);
- if (CheckColumnExists(dbName, schema, table, columnName)) {
- string sql = string.Format("ALTER TABLE {0} DROP COLUMN {1} RESTRICT; GROOM TABLE {0} VERSIONS;", table, columnName);
+ if (CheckColumnExists(dbName, t.SchemaName, t.Name, columnName)) {
+ string sql = string.Format("ALTER TABLE {0} DROP COLUMN {1} RESTRICT; GROOM TABLE {0} VERSIONS;", t.Name, columnName);
var cmd = new OleDbCommand(sql);
- SqlNonQuery(dbName, cmd);
- RefreshViews(dbName, table);
+ SqlNonQuery(historyDB, cmd);
+ RefreshViews(historyDB, t.Name);
}
- if (t.RecordHistoryTable && CheckTableExists(dbName, schema, table + "_History") && CheckColumnExists(dbName, schema, table + "_History", columnName)) {
- string sql = string.Format("ALTER TABLE {0} DROP COLUMN {1} RESTRICT; GROOM TABLE {0} VERSIONS;", columnName);
+ if (t.RecordHistoryTable && CheckTableExists(dbName, t.HistoryName, t.SchemaName) && CheckColumnExists(dbName, t.SchemaName, t.HistoryName, columnName)) {
+ string sql = string.Format("ALTER TABLE {0} DROP COLUMN {1} RESTRICT; GROOM TABLE {0} VERSIONS;", t.HistoryName, columnName);
var cmd = new OleDbCommand(sql);
logger.Log("Altering history table column with command: " + cmd.CommandText, LogLevel.Debug);
- SqlNonQuery(dbName, cmd);
- RefreshViews(dbName, table + "_History");
+ SqlNonQuery(historyDB, cmd);
+ RefreshViews(historyDB, t.HistoryName);
}
}
View
19 TeslaSQL/DataUtils/TestDataUtils.cs
@@ -394,21 +394,20 @@ public class TestDataUtils : IDataUtils {
return;
}
- public void RenameColumn(TableConf t, string dbName, string schema, string table,
- string columnName, string newColumnName) {
- DataTable dt = testData.Tables[schema + "." + table, GetTableSpace(dbName)];
+ public void RenameColumn(TableConf t, string dbName, string columnName, string newColumnName, string historyDB) {
+ DataTable dt = testData.Tables[t.FullName, GetTableSpace(dbName)];
dt.Columns[columnName].ColumnName = newColumnName;
}
- public void ModifyColumn(TableConf t, string dbName, string schema, string table, string columnName, string dataType) {
+ public void ModifyColumn(TableConf t, string dbName, string columnName, string dataType, string historyDB) {
//can't change the datatype of a column in a datatable but since this is just for unit testing, we can just drop and recreate it
//instead since there is no data to worry about losing
- DropColumn(t, dbName, schema, table, columnName);
- AddColumn(t, dbName, schema, table, columnName, dataType);
+ DropColumn(t, dbName, columnName, historyDB);
+ AddColumn(t, dbName, columnName, dataType, historyDB);
}
- public void AddColumn(TableConf t, string dbName, string schema, string table, string columnName, string dataType) {
- DataTable dt = testData.Tables[schema + "." + table, GetTableSpace(dbName)];
+ public void AddColumn(TableConf t, string dbName, string columnName, string dataType, string historyDB) {
+ DataTable dt = testData.Tables[t.FullName, GetTableSpace(dbName)];
Type type;
//since this is just for unit testing we only need to support a subset of data types
switch (dataType) {
@@ -427,8 +426,8 @@ public class TestDataUtils : IDataUtils {
dt.Columns.Add(columnName, type);
}
- public void DropColumn(TableConf t, string dbName, string schema, string table, string columnName) {
- DataTable dt = testData.Tables[schema + "." + table, GetTableSpace(dbName)];
+ public void DropColumn(TableConf t, string dbName, string columnName, string historyDB) {
+ DataTable dt = testData.Tables[t.FullName, GetTableSpace(dbName)];
dt.Columns.Remove(columnName);
}
View
7 TeslaSQL/TableConf.cs
@@ -142,5 +142,12 @@ public class TableConf {
public override string ToString() {
return FullName;
}
+
+ [XmlIgnore]
+ public string HistoryName {
+ get {
+ return "tblCT" + Name + "_History";
+ }
+ }
}
}
Please sign in to comment.
Something went wrong with that request. Please try again.