diff --git a/TeslaSQL/Agents/Slave.cs b/TeslaSQL/Agents/Slave.cs index fae09fd..d2ca3fc 100644 --- a/TeslaSQL/Agents/Slave.cs +++ b/TeslaSQL/Agents/Slave.cs @@ -139,9 +139,12 @@ public Slave(IDataUtils sourceDataUtils, IDataUtils destDataUtils, Logger logger IList batches = new List(); DataRow lastBatch = sourceDataUtils.GetLastCTBatch(Config.RelayDB, AgentType.Slave, Config.Slave); + //apparently we shouldn't hit the code block below except for unit tests? + //in the db init we're supposed to write the first row, so it (in theory) shouldn't return null if (lastBatch == null) { ctb = new ChangeTrackingBatch(1, 0, 0, 0); batches.Add(ctb); + logger.Log("Couldn't find any records for the last change tracking batch! Returning the default new CTB.", LogLevel.Warn); return batches; } @@ -179,7 +182,7 @@ public Slave(IDataUtils sourceDataUtils, IDataUtils destDataUtils, Logger logger /// /// Runs a single change tracking batch /// - /// Change tracking batch object to work on + /// Change tracking batch object to work on private void RunSingleBatch(ChangeTrackingBatch ctb) { Stopwatch sw; logger.Log("Applying schema changes ", LogLevel.Info); diff --git a/TeslaSQL/Config.cs b/TeslaSQL/Config.cs index 0e14306..22724f7 100644 --- a/TeslaSQL/Config.cs +++ b/TeslaSQL/Config.cs @@ -555,7 +555,7 @@ public class RefreshView { public class TColumn : IEquatable { public readonly string name; public bool isPk; - public readonly DataType dataType; + public DataType dataType {get;set;} public readonly bool isNullable; public TColumn(string name, bool isPk, DataType dataType, bool isNullable) { @@ -571,8 +571,16 @@ public class TColumn : IEquatable { /// /// Returns a string representation of the column for use in CREATE TABLE statements /// - public string ToExpression() { - return string.Format("[{0}] {1} {2}", name, dataType.ToString(), isNullable ? "NULL" : "NOT NULL"); + public string ToExpression(SqlFlavor flavor = SqlFlavor.MSSQL) { + switch (flavor) + { + case SqlFlavor.MSSQL: + return string.Format("[{0}] {1} {2}", name, dataType.ToString(), isNullable ? "NULL" : "NOT NULL"); + case SqlFlavor.MySQL: + return string.Format("{0} {1} {2}", name, dataType.ToString(), isNullable ? "NULL" : "NOT NULL"); + default: + throw new NotImplementedException("No defined ToExpression for sql flavor: " + flavor.ToString()); + } } public bool Equals(TColumn other) { diff --git a/TeslaSQL/DataCopy/DataCopyFactory.cs b/TeslaSQL/DataCopy/DataCopyFactory.cs index a18e90b..2e81fd2 100644 --- a/TeslaSQL/DataCopy/DataCopyFactory.cs +++ b/TeslaSQL/DataCopy/DataCopyFactory.cs @@ -14,11 +14,24 @@ public static class DataCopyFactory { } switch (sourceSqlFlavor) { case SqlFlavor.MSSQL: - if (destSqlFlavor == SqlFlavor.MSSQL) { + if (destSqlFlavor == SqlFlavor.MSSQL) + { return new MSSQLToMSSQLDataCopy((MSSQLDataUtils)sourceDataUtils, (MSSQLDataUtils)destDataUtils, logger); - } else if (destSqlFlavor == SqlFlavor.Netezza) { + } + else if (destSqlFlavor == SqlFlavor.Netezza) + { return new MSSQLToNetezzaDataCopy((MSSQLDataUtils)sourceDataUtils, (NetezzaDataUtils)destDataUtils, logger, Config.Slave, Config.NetezzaUser, Config.NetezzaPrivateKeyPath); } + else if (destSqlFlavor == SqlFlavor.MySQL) + { + return new MSSQLToMySQLDataCopy((MSSQLDataUtils)sourceDataUtils, (MySQLDataUtils)destDataUtils, logger); + } + break; + case SqlFlavor.MySQL: + if (destSqlFlavor == SqlFlavor.MSSQL) + { + return new MySQLToMSSQLDataCopy((MySQLDataUtils)sourceDataUtils, (MSSQLDataUtils)destDataUtils, logger); + } break; case SqlFlavor.MySQL: if (destSqlFlavor == SqlFlavor.MSSQL) diff --git a/TeslaSQL/DataCopy/MSSQLToMySQLDataCopy.cs b/TeslaSQL/DataCopy/MSSQLToMySQLDataCopy.cs new file mode 100644 index 0000000..5a336cd --- /dev/null +++ b/TeslaSQL/DataCopy/MSSQLToMySQLDataCopy.cs @@ -0,0 +1,207 @@ +using System; +using System.Collections.Generic; +using System.Collections.Specialized; +using System.Linq; +using System.Text; +using System.Data.SqlClient; +using TeslaSQL.DataUtils; +using MySql.Data.MySqlClient; +using System.Diagnostics; +using System.Text.RegularExpressions; +using System.IO; + +namespace TeslaSQL.DataCopy { + public class MSSQLToMySQLDataCopy : IDataCopy { + + private MSSQLDataUtils sourceDataUtils; + private MySQLDataUtils destDataUtils; + private Logger logger; + + public MSSQLToMySQLDataCopy(MSSQLDataUtils sourceDataUtils, MySQLDataUtils destDataUtils, Logger logger) { + this.sourceDataUtils = sourceDataUtils; + this.destDataUtils = destDataUtils; + this.logger = logger; + } + + private static void CreateDirectoryIfNotExists(string directory) + { + DirectoryInfo dir = new DirectoryInfo(directory); + if (!dir.Exists) + { + dir.Create(); + } + } + + public void CopyTable(string sourceDB, string sourceTableName, string schema, string destDB, int timeout, string destTableName = null, string originalTableName = null) { + //by default the dest table will have the same name as the source table + destTableName = (destTableName == null) ? sourceTableName : destTableName; + originalTableName = originalTableName ?? sourceTableName; + + //drop table at destination and create from source schema + CopyTableDefinition(sourceDB, sourceTableName, schema, destDB, destTableName, originalTableName); + + var cols = GetColumns(sourceDB, sourceTableName, schema, originalTableName); + var bcpSelect = string.Format("SELECT {0} FROM {1}..{2};", + string.Join(",", cols.Select(col => col.ColExpression())), + sourceDB, sourceTableName); + if (bcpSelect.Length > 3800) + { + //BCP commands fail if their text length is over 4000 characters, and we need some padding + //drop view CTVWtablename if exists + //create view CTVWtablename AS $bcpSelect + string viewName = "CTVW" + sourceTableName; + sourceDataUtils.RecreateView(sourceDB, viewName, bcpSelect); + bcpSelect = string.Format("SELECT * FROM {0}..{1}", sourceDB, viewName); + } + string directory = Config.BcpPath.TrimEnd('\\') + @"\" + sourceDB.ToLower(); + CreateDirectoryIfNotExists(directory); + string password = new cTripleDes().Decrypt(Config.RelayPassword); + var bcpArgs = string.Format(@"""{0}"" queryout {1}\{2}.txt -c -S{3} -U {4} -P {5} -t""|"" -r""&_+-!/=/=""", + bcpSelect, + directory, + destTableName, + Config.RelayServer, + Config.RelayUser, + password + ); + logger.Log("BCP command: bcp " + bcpArgs.Replace(password, "********"), LogLevel.Trace); + var outputBuilder = new StringBuilder(); + var errorBuilder = new StringBuilder(); + var bcp = new Process(); + bcp.StartInfo.FileName = "bcp"; + bcp.StartInfo.Arguments = bcpArgs; + bcp.StartInfo.UseShellExecute = false; + bcp.StartInfo.RedirectStandardError = true; + bcp.StartInfo.RedirectStandardOutput = true; + bcp.OutputDataReceived += delegate(object sender, DataReceivedEventArgs e) + { + lock (outputBuilder) + { + outputBuilder.AppendLine(e.Data); + } + }; + bcp.ErrorDataReceived += delegate(object sender, DataReceivedEventArgs e) + { + lock (errorBuilder) + { + errorBuilder.AppendLine(e.Data); + } + }; + bcp.Start(); + bcp.BeginOutputReadLine(); + bcp.BeginErrorReadLine(); + bool status = bcp.WaitForExit(Config.DataCopyTimeout * 1000); + if (!status) + { + bcp.Kill(); + throw new Exception("BCP timed out for table " + sourceTableName); + } + if (bcp.ExitCode != 0) + { + string err = outputBuilder + "\r\n" + errorBuilder; + logger.Log(err, LogLevel.Critical); + throw new Exception("BCP error: " + err); + } + logger.Log("BCP successful for " + sourceTableName, LogLevel.Trace); + string filename = sourceDB.ToLower() + "/" + destTableName + ".txt"; + destDataUtils.BulkCopy(filename, destDB, destTableName, 60 * 10, cols); + + } + + + /// + /// Copies the schema of a table from one server to another, dropping it first if it exists at the destination. + /// + /// Source database name + /// Table name + /// Table's schema + /// Destination database name + public void CopyTableDefinition(string sourceDB, string sourceTableName, string schema, string destDB, string destTableName, string originalTableName = null) { + //script out the table at the source + string createScript = sourceDataUtils.ScriptTable(sourceDB, sourceTableName, schema, originalTableName, SqlFlavor.MySQL); + createScript = createScript.Replace(sourceTableName, destTableName); + MySqlCommand cmd = new MySqlCommand(createScript); + + //drop it if it exists at the destination + destDataUtils.DropTableIfExists(destDB, destTableName, schema); + + //create it at the destination + destDataUtils.MySqlNonQuery(destDB, cmd); + } + + + public struct Col + { + public string name; + public string typeName; + public DataType dataType; + + public Col(string name, string typeName, DataType dataType) + { + this.name = name; + this.typeName = typeName; + this.dataType = dataType; + } + /// + /// Return an expression for use in BCP command + /// + public string ColExpression() + { + return name; + } + public override string ToString() + { + return name + " " + typeName; + } + } + + private List GetColumns(string sourceDB, string sourceTableName, string schema, string originalTableName) + { + //get actual field list on the source table + var includeColumns = new List() { "SYS_CHANGE_VERSION", "SYS_CHANGE_OPERATION" }; + var columns = sourceDataUtils.GetFieldList(sourceDB, sourceTableName, schema, originalTableName, includeColumns); + //get the table config object + var table = Config.TableByName(originalTableName); + + var cols = new List(); + foreach (TColumn col in columns) + { + string typeName = col.dataType.BaseType; + + ColumnModifier mod = null; + //see if there are any column modifiers which override our length defaults + ColumnModifier[] modifiers = table.ColumnModifiers; + if (modifiers != null) + { + IEnumerable mods = modifiers.Where(c => ((c.columnName == col.name) && (c.type == "ShortenField"))); + mod = mods.FirstOrDefault(); + } + + string modDataType = DataType.MapDataType(SqlFlavor.MSSQL, SqlFlavor.MySQL, typeName); + if (typeName != modDataType) + { + if (mod != null && Regex.IsMatch(modDataType, @".*\(\d+\)$")) + { + modDataType = Regex.Replace(modDataType, @"\d+", mod.length.ToString()); + } + cols.Add(new Col(col.name, modDataType, col.dataType)); + continue; + } + + if (col.dataType.UsesMaxLength()) + { + if (mod != null) + { + typeName += "(" + mod.length + ")"; + } + } + else if (col.dataType.UsesPrecisionScale()) + { + typeName += "(" + col.dataType.NumericPrecision + "," + col.dataType.NumericScale + ")"; + } + cols.Add(new Col(col.name, typeName, col.dataType)); + } + return cols; + } + } +} diff --git a/TeslaSQL/DataCopy/MySQLToMSSQLDataCopy.cs b/TeslaSQL/DataCopy/MySQLToMSSQLDataCopy.cs index d54755d..1ac2a74 100644 --- a/TeslaSQL/DataCopy/MySQLToMSSQLDataCopy.cs +++ b/TeslaSQL/DataCopy/MySQLToMSSQLDataCopy.cs @@ -97,6 +97,11 @@ public void CopyTableDefinition(string sourceDB, string sourceTableName, string case "nvarchar": case "char": case "nchar": +<<<<<<< HEAD +======= + case "text": + case "ntext": +>>>>>>> working script.Append('('); script.Append((column.dataType.CharacterMaximumLength != null && column.dataType.CharacterMaximumLength < 8000) ? column.dataType.CharacterMaximumLength.ToString() : "MAX"); script.Append(')'); diff --git a/TeslaSQL/DataType.cs b/TeslaSQL/DataType.cs index 8363e85..0972d73 100644 --- a/TeslaSQL/DataType.cs +++ b/TeslaSQL/DataType.cs @@ -16,6 +16,7 @@ public class DataType { public long? CharacterMaximumLength { get; private set; } public int? NumericPrecision { get; private set; } public int? NumericScale { get; private set; } + public bool? Signed { get; private set; } public static void LoadDataMappingsFromFile(string filePath) { string s; @@ -58,11 +59,12 @@ public class DataType { } - public DataType(string baseType, long? characterMaximumLength = null, int? numericPrecision = null, int? numericScale = null) { + public DataType(string baseType, long? characterMaximumLength = null, int? numericPrecision = null, int? numericScale = null, bool? signed = null) { this.BaseType = baseType; this.CharacterMaximumLength = characterMaximumLength; this.NumericPrecision = numericPrecision; this.NumericScale = numericScale; + this.Signed = signed; } /// @@ -94,7 +96,7 @@ public class DataType { /// String expression representing the data type public override string ToString() { var typesUsingMaxLen = new string[6] { "varchar", "nvarchar", "char", "nchar", "varbinary", "binary" }; - var typesUsingScale = new string[2] { "numeric", "decimal" }; + var typesUsingScale = new string[4] { "numeric", "decimal", "real", "float" }; string suffix = ""; if (typesUsingMaxLen.Contains(BaseType) && CharacterMaximumLength != null) { @@ -129,7 +131,10 @@ public class DataType { /// public bool UsesPrecisionScale() { return BaseType.ToLower().Contains("decimal") - || BaseType.ToLower().Contains("numeric"); + || BaseType.ToLower().Contains("numeric") + || BaseType.ToLower().Contains("money") + || BaseType.ToLower().Contains("real") + || BaseType.ToLower().Contains("float"); } /// @@ -139,6 +144,19 @@ public class DataType { /// A TeslaSQL.DataType object public static DataType ParseDataType(DataRow row) { string dataType = row.Field("DATA_TYPE"); + bool? signed; + if (dataType.ToUpper().Contains("INT")) + { + signed = !dataType.ToUpper().Contains("UNSIGNED"); + } + else if (dataType.ToUpper().Contains("BIT")) + { + signed = false; + } + else + { + signed = null; + } Nullable characterMaximumLength; if (row["CHARACTER_MAXIMUM_LENGTH"].GetType() == typeof(System.DBNull)) { @@ -156,7 +174,7 @@ public class DataType { var numericPrecision = precision; Nullable numericScale = row["NUMERIC_SCALE"].GetType() == typeof(UInt64) ? Convert.ToByte(row.Field("NUMERIC_SCALE")) : row.Field("NUMERIC_SCALE"); return new DataType( - dataType, characterMaximumLength, numericPrecision, numericScale + dataType, characterMaximumLength, numericPrecision, numericScale, signed ); } } diff --git a/TeslaSQL/DataUtils/MSSQLDataUtils.cs b/TeslaSQL/DataUtils/MSSQLDataUtils.cs index e64f205..e93ddc4 100644 --- a/TeslaSQL/DataUtils/MSSQLDataUtils.cs +++ b/TeslaSQL/DataUtils/MSSQLDataUtils.cs @@ -924,16 +924,36 @@ THEN DELETE /// Table name /// Table's schema\ /// Original table to pull TableConf for + /// SQL type of the table the script will be run on /// The CREATE TABLE script as a string - public string ScriptTable(string dbName, string table, string schema, string originalTableName) { + public string ScriptTable(string dbName, string table, string schema, string originalTableName, SqlFlavor flavor = SqlFlavor.MSSQL) { //get actual field list on the source table var includeColumns = new List() { "SYS_CHANGE_VERSION", "SYS_CHANGE_OPERATION" }; List columns = GetFieldList(dbName, table, schema, originalTableName, includeColumns); - - return string.Format( - @"CREATE TABLE [{0}].[{1}] ( - {2} - );", schema, table, string.Join(",", columns.Select(c => c.ToExpression()))); + switch (flavor) + { + case SqlFlavor.MSSQL: + return string.Format( + @"CREATE TABLE [{0}].[{1}] ( + {2} + );", schema, table, string.Join(",", columns.Select(c => c.ToExpression()))); + case SqlFlavor.MySQL: + foreach (TColumn column in columns) + { + column.dataType = new DataType(DataType.MapDataType(SqlFlavor.MSSQL, SqlFlavor.MySQL, column.dataType.BaseType), column.dataType.CharacterMaximumLength, column.dataType.NumericPrecision, column.dataType.NumericScale); + if (column.dataType.UsesMaxLength() && column.dataType.CharacterMaximumLength == -1) + { + column.dataType = new DataType("longtext"); + } + } + var ctid = table.Split('_').Last(); + TableConf fakeTableConf = new TableConf(); + fakeTableConf.Name = originalTableName; + string pks = String.Join(",",GetPrimaryKeysFromInfoTable(fakeTableConf, Convert.ToInt64(ctid), dbName)); + return string.Format("CREATE TABLE {0}.{1}({2}, PRIMARY KEY ({3}));", dbName, table, string.Join(",", columns.Select(c => c.ToExpression(SqlFlavor.MySQL))), pks); + default: + throw new NotImplementedException("No scripting rules defined for " + flavor.ToString()); + } } public void Consolidate(string ctTableName, string consolidatedTableName, string dbName, string schemaName) { diff --git a/TeslaSQL/DataUtils/MySQLDataUtils.cs b/TeslaSQL/DataUtils/MySQLDataUtils.cs index 76b5f50..3b369cb 100644 --- a/TeslaSQL/DataUtils/MySQLDataUtils.cs +++ b/TeslaSQL/DataUtils/MySQLDataUtils.cs @@ -27,12 +27,20 @@ public MySQLDataUtils(Logger logger, TServer server) if (agentType.Equals(AgentType.Slave)) { cmd = new MySqlCommand("SELECT CTID, syncStartVersion, syncStopVersion, syncBitWise, syncStartTime" + +<<<<<<< HEAD " FROM tblCTSlaveVersion WITH(NOLOCK) WHERE slaveIdentifier = @slave ORDER BY cttimestamp DESC LIMIT 0,1"); +======= + " FROM tblCTSlaveVersion WHERE slaveIdentifier = @slave ORDER BY cttimestamp DESC LIMIT 0,1;"); +>>>>>>> working cmd.Parameters.Add("@slave", MySqlDbType.VarChar, 100).Value = slaveIdentifier; } else { +<<<<<<< HEAD cmd = new MySqlCommand("SELECT CTID, syncStartVersion, syncStopVersion, syncBitWise, syncStartTime FROM tblCTVersion ORDER BY CTID DESC LIMIT 0,1"); +======= + cmd = new MySqlCommand("SELECT CTID, syncStartVersion, syncStopVersion, syncBitWise, syncStartTime FROM tblCTVersion ORDER BY CTID DESC LIMIT 0,1;"); +>>>>>>> working } DataTable result = MySqlQuery(dbName, cmd); @@ -42,8 +50,13 @@ public MySQLDataUtils(Logger logger, TServer server) public DataTable GetPendingCTVersions(string dbName, Int64 CTID, int syncBitWise) { string query = ("SELECT CTID, syncStartVersion, syncStopVersion, syncStartTime, syncBitWise" + +<<<<<<< HEAD " FROM tblCTVersion WITH(NOLOCK) WHERE CTID > @ctid AND syncBitWise & @syncbitwise > 0" + " ORDER BY CTID ASC"); +======= + " FROM tblCTVersion WHERE CTID > @ctid AND syncBitWise & @syncbitwise > 0" + + " ORDER BY CTID ASC;"); +>>>>>>> working MySqlCommand cmd = new MySqlCommand(query); cmd.Parameters.Add("@ctid", MySqlDbType.Timestamp).Value = new DateTime(CTID).ToUniversalTime(); cmd.Parameters.Add("@syncbitwise", MySqlDbType.Int32).Value = syncBitWise; @@ -54,8 +67,21 @@ public DataTable GetPendingCTVersions(string dbName, Int64 CTID, int syncBitWise public DataTable GetPendingCTSlaveVersions(string dbName, string slaveIdentifier, int bitwise) { +<<<<<<< HEAD //not yet (slave) throw new NotImplementedException(); +======= + string query = @"SELECT * FROM tblCTSlaveVersion + WHERE slaveIdentifier = @slaveidentifier AND CTID > + ( + SELECT MAX(ctid) FROM tblCTSlaveVersion WHERE slaveIdentifier = @slaveidentifier AND syncBitWise = @bitwise + ) ORDER BY CTID;"; + MySqlCommand cmd = new MySqlCommand(query); + cmd.Parameters.Add("@bitwise", MySqlDbType.Int32).Value = bitwise; + cmd.Parameters.Add("@slaveidentifier", MySqlDbType.VarChar, 500).Value = slaveIdentifier; + logger.Log("Running query: " + cmd.CommandText + "... slaveidentifiers is " + slaveIdentifier, LogLevel.Debug); + return MySqlQuery(dbName, cmd); +>>>>>>> working } public DateTime GetLastStartTime(string dbName, Int64 CTID, int syncBitWise, AgentType type, string slaveIdentifier = null) @@ -292,8 +318,24 @@ public int SelectIntoCTTable(string sourceCTDB, TableConf table, string sourceDB public void CreateSlaveCTVersion(string dbName, ChangeTrackingBatch ctb, string slaveIdentifier) { +<<<<<<< HEAD //not yet (slave) throw new NotImplementedException(); +======= + string query = "INSERT INTO tblCTSlaveVersion (CTID, slaveIdentifier, syncStartVersion, syncStopVersion, syncStartTime, syncBitWise)"; + query += " VALUES (@ctid, @slaveidentifier, @startversion, @stopversion, @starttime, @syncbitwise);"; + + MySqlCommand cmd = new MySqlCommand(query); + + cmd.Parameters.Add("@ctid", MySqlDbType.Int64).Value = ctb.CTID; + cmd.Parameters.Add("@slaveidentifier", MySqlDbType.VarChar, 100).Value = slaveIdentifier; + cmd.Parameters.Add("@startversion", MySqlDbType.Int64).Value = ctb.SyncStartVersion; + cmd.Parameters.Add("@stopversion", MySqlDbType.Int64).Value = ctb.SyncStopVersion; + cmd.Parameters.Add("@starttime", MySqlDbType.DateTime).Value = ctb.SyncStartTime; + cmd.Parameters.Add("@syncbitwise", MySqlDbType.Int32).Value = ctb.SyncBitWise; + + MySqlNonQuery(dbName, cmd, 30); +>>>>>>> working } public void CreateSchemaChangeTable(string dbName, Int64 CTID) @@ -419,6 +461,7 @@ public DataTable GetDDLEvents(string dbName, DateTime afterDate) String currentSchemaTableNameShort = table.Name + "_schema_" + (this.CTID).ToString(); currentSchemaTableName = CTdbName + "." + table.Name + "_schema_" + this.CTID.ToString(); compareSchemaTableName = CTdbName + "." + table.Name + "_schema_" + (this.CTID - 1).ToString(); +<<<<<<< HEAD //I'm not sure under what circumstances the table would already exist, but sometimes it does and //so we're checking before we do it. if (!CheckTableExists(CTdbName, currentSchemaTableNameShort)) @@ -432,6 +475,16 @@ public DataTable GetDDLEvents(string dbName, DateTime afterDate) query.AppendLine(";"); MySqlNonQuery(dbName, new MySqlCommand(query.ToString())); } +======= + //make a snapshot of the current schema to work off of + query.Clear(); + query.Append("CREATE TABLE "); + query.Append(currentSchemaTableName); + query.Append(" LIKE "); + query.Append(table.Name); + query.AppendLine(";"); + MySqlNonQuery(dbName, new MySqlCommand(query.ToString())); +>>>>>>> working //if this is the first time running, just return an empty event set if (!CheckTableExists(CTdbName, compareSchemaTableNameShort)) { @@ -719,22 +772,35 @@ public void WriteBitWise(string dbName, Int64 CTID, int value, AgentType agentTy public int ReadBitWise(string dbName, Int64 CTID, AgentType agentType) { +<<<<<<< HEAD //does this get called? throw new NotImplementedException(); +======= +>>>>>>> working string query; MySqlCommand cmd; if (agentType.Equals(AgentType.Slave)) { +<<<<<<< HEAD query = "SELECT syncBitWise from tblCTSlaveVersion WITH(NOLOCK)"; query += " WHERE slaveIdentifier = @slaveidentifier AND CTID = @ctid"; +======= + query = "SELECT syncBitWise from tblCTSlaveVersion"; + query += " WHERE slaveIdentifier = @slaveidentifier AND CTID = @ctid;"; +>>>>>>> working cmd = new MySqlCommand(query); cmd.Parameters.Add("@slaveidentifier", MySqlDbType.VarChar, 100).Value = Config.Slave; cmd.Parameters.Add("@ctid", MySqlDbType.Timestamp).Value = new DateTime(CTID).ToUniversalTime(); } else { +<<<<<<< HEAD query = "SELECT syncBitWise from tblCTVersion WITH(NOLOCK)"; query += " WHERE CTID = @ctid"; +======= + query = "SELECT syncBitWise from tblCTVersion"; + query += " WHERE CTID = @ctid;"; +>>>>>>> working cmd = new MySqlCommand(query); cmd.Parameters.Add("@ctid", MySqlDbType.Timestamp).Value = new DateTime(CTID).ToUniversalTime(); } @@ -831,7 +897,11 @@ public void AddColumn(TableConf t, string dbName, string columnName, string data //add column if it doesn't exist if (!CheckColumnExists(dbName, t.SchemaName, t.Name, columnName)) { +<<<<<<< HEAD cmd = new MySqlCommand("ALTER TABLE " + t.FullName + " ADD " + columnName + " " + dataType); +======= + cmd = new MySqlCommand("ALTER TABLE " + Config.SlaveDB + "." + t.Name + " ADD " + columnName + " " + dataType); +>>>>>>> working logger.Log("Altering table with command: " + cmd.CommandText, LogLevel.Debug); MySqlNonQuery(dbName, cmd); } @@ -877,8 +947,47 @@ public void PublishTableInfo(string dbName, TableConf table, long CTID, long exp public RowCounts ApplyTableChanges(TableConf table, TableConf archiveTable, string dbName, Int64 CTID, string CTDBName, bool isConsolidated) { +<<<<<<< HEAD //We're not writing to MySQL slaves at this time throw new NotImplementedException(); +======= + var rowcounts = new DataTable(); + List commands = new List(); + commands.Add(BuildCopyCommand(table, dbName, CTDBName, CTID)); + if (archiveTable != null) + { + commands.Add(BuildCopyCommand(archiveTable, dbName, CTDBName, CTID)); + } + + foreach (MySqlCommand command in commands) + { + rowcounts = MySqlQuery(dbName, command, null, "allow user variables=true;"); + logger.Log(Convert.ToInt32(rowcounts.Rows[0]["a"]) + " rows deleted, " + Convert.ToInt32(rowcounts.Rows[0]["a"]) + " rows inserted into table " + table.Name, LogLevel.Info); + } + + return new RowCounts(Convert.ToInt32(rowcounts.Rows[0]["a"]), Convert.ToInt32(rowcounts.Rows[0]["b"])); + } + + private MySqlCommand BuildCopyCommand(TableConf table, string dbName, string CTDBName, Int64 CTID) + { + var sql = string.Format(@"BEGIN; + DELETE P FROM {0} AS P + WHERE EXISTS ( + SELECT 1 FROM {1}.{2} AS CT WHERE {3} AND CT.sys_change_operation = 'D'); + SELECT row_count() into @a; + REPLACE {0} + SELECT {4} FROM {1}.{2} AS CT WHERE CT.sys_change_operation IN ('I', 'U'); + SELECT row_count() into @b; + COMMIT; + SELECT @a as a, @b as b;", + table.Name, + CTDBName, + table.ToCTName(CTID), + table.PkList, + table.SimpleColumnList); + + return new MySqlCommand(sql.ToString()); +>>>>>>> working } public void Consolidate(string ctTableName, string consolidatedTableName, string dbName, string schemaName) @@ -894,8 +1003,45 @@ public void Consolidate(string ctTableName, string consolidatedTableName, string public void CopyIntoHistoryTable(ChangeTable t, string slaveCTDB, bool isConsolidated) { +<<<<<<< HEAD //not yet throw new NotImplementedException(); +======= + string sql; + string sourceTable; + List fields; + if (false && isConsolidated && Config.Slave == Config.RelayServer && slaveCTDB == Config.RelayDB) //false added until mysql relay for compiler optimization + { + //we don't have a mysql relay so I'm not touching this + sourceTable = "[" + t.schemaName + "].[" + t.consolidatedName + "]"; + fields = GetFieldList(slaveCTDB, t.consolidatedName, t.schemaName); + } + else + { + sourceTable = t.ctName; + fields = GetFieldList(slaveCTDB, t.ctName, t.schemaName); + } + + string insertColumns = "CTHistID, " + string.Join(",", fields.Select(col => col.name)); + string selectColumns = "CAST(" + t.CTID + " AS BIGINT) AS CTHistID, " + string.Join(",", fields.Select(col => col.name)); + + if (!CheckTableExists(slaveCTDB, t.historyName, t.schemaName)) + { + logger.Log("table " + t.historyName + " does not exist, creating it", LogLevel.Trace); + sql = string.Format("CREATE TABLE {0} LIKE {1};", t.historyName, sourceTable); + logger.Log(sql, LogLevel.Debug); + MySqlNonQuery(slaveCTDB, new MySqlCommand(sql)); + sql = string.Format("ALTER TABLE {0} ADD CTHistID BIGINT FIRST;", t.historyName, sourceTable); + logger.Log(sql, LogLevel.Debug); + MySqlNonQuery(slaveCTDB, new MySqlCommand(sql)); + } + + logger.Log("selecting into " + t.historyName, LogLevel.Trace); + sql = string.Format("INSERT INTO {0} ({1}) SELECT {2} FROM {3};", t.historyName, insertColumns, selectColumns, sourceTable); + logger.Log(sql, LogLevel.Debug); + var cmd = new MySqlCommand(sql); + MySqlNonQuery(slaveCTDB, cmd); +>>>>>>> working } public ChangeTrackingBatch GetCTBatch(string dbName, Int64 ctid) @@ -931,13 +1077,21 @@ public void CreateShardCTVersion(string dbName, Int64 CTID, Int64 startVersion) public IEnumerable GetPrimaryKeysFromInfoTable(TableConf table, long CTID, string database) { +<<<<<<< HEAD //not yet +======= + //not yet +>>>>>>> working throw new NotImplementedException(); } public int GetExpectedRowCounts(string ctDbName, long ctid) { +<<<<<<< HEAD //not yet +======= + //not yet (relay) +>>>>>>> working throw new NotImplementedException(); } @@ -972,7 +1126,11 @@ public IEnumerable GetOldCTIDsMaster(string dbName, DateTime chopDate) public IEnumerable GetOldCTIDsRelay(string dbName, DateTime chopDate) { +<<<<<<< HEAD //not yet +======= + //not yet (relay) +>>>>>>> working throw new NotImplementedException(); } @@ -1006,7 +1164,11 @@ public bool IsBeingInitialized(string sourceCTDB, TableConf table) public Int64? GetInitializeStartVersion(string sourceCTDB, TableConf table) { +<<<<<<< HEAD string sql = @"SELECT nextSynchVersion FROM tblCTInitialize WHERE tableName = @tableName"; +======= + string sql = @"SELECT nextSynchVersion FROM tblCTInitialize WHERE tableName = @tableName;"; +>>>>>>> working var cmd = new MySqlCommand(sql); cmd.Parameters.Add("@tableName", MySqlDbType.VarChar, 500).Value = table.Name; DataTable res = MySqlQuery(sourceCTDB, cmd); @@ -1034,7 +1196,11 @@ public void CleanUpInitializeTable(string dbName, DateTime syncStartTime) cmd.Parameters.Add("@syncStartTime", MySqlDbType.Timestamp).Value = syncStartTime.ToUniversalTime(); MySqlNonQuery(dbName, cmd); +<<<<<<< HEAD sql = @"UPDATE " + CTIDtoTimestampTable + " SET status=1;"; +======= + sql = "UPDATE " + CTIDtoTimestampTable + " SET status=1;"; +>>>>>>> working MySqlNonQuery(dbName.Substring(3), new MySqlCommand(sql)); } @@ -1160,7 +1326,11 @@ public MySqlDataReader ExecuteReader(string dbName, MySqlCommand cmd, int timeou return reader; } +<<<<<<< HEAD private DataTable MySqlQuery(string dbName, MySqlCommand cmd, int? timeout = null) +======= + private DataTable MySqlQuery(string dbName, MySqlCommand cmd, int? timeout = null, string addToConnString = "") +>>>>>>> working { int commandTimeout = timeout ?? Config.QueryTimeout; foreach (IDataParameter p in cmd.Parameters) @@ -1169,7 +1339,11 @@ private DataTable MySqlQuery(string dbName, MySqlCommand cmd, int? timeout = nul p.Value = DBNull.Value; } //build connection string based on server/db info passed in +<<<<<<< HEAD string connStr = buildConnString(dbName); +======= + string connStr = buildConnString(dbName, addToConnString); +>>>>>>> working //using block to avoid resource leaks using (MySqlConnection conn = new MySqlConnection(connStr)) @@ -1206,7 +1380,11 @@ private string ParseCommand(MySqlCommand cmd) return query; } +<<<<<<< HEAD private string buildConnString(string database) +======= + private string buildConnString(string database, string addToConnectionString = "") +>>>>>>> working { string sqlhost = ""; string sqluser = ""; @@ -1231,7 +1409,11 @@ private string buildConnString(string database) break; } +<<<<<<< HEAD return "server=" + sqlhost + "; database=" + database + ";user=" + sqluser + ";password=" + sqlpass + ";Convert Zero Datetime=True"; +======= + return "server=" + sqlhost + "; database=" + database + ";user=" + sqluser + ";password=" + sqlpass + ";Convert Zero Datetime=True;" + addToConnectionString; +>>>>>>> working } private T MySqlQueryToScalar(string dbName, MySqlCommand cmd, int? timeout = null) @@ -1360,5 +1542,61 @@ public bool CleanupTriggerTable(string dbName, string tableName, DateTime chopDa } +<<<<<<< HEAD +======= + /// + /// Writes data from the given stream reader to a destination database + /// + /// DataReader object to stream input from + /// Database name + /// Schema of the table to write to + /// Table name to write to + /// Timeout + public void BulkCopy(string file, string dbName, string table, int timeout, List columns) + { + //TODO: Manually build the string + using (MySqlConnection conn = new MySqlConnection(buildConnString(dbName, "allow user variables=true;"))) + { + var query = new StringBuilder(); + query.Append("LOAD DATA INFILE '"); + query.Append("/wayfair/mnt/change_tracking" + file); + query.Append("' INTO TABLE "); + query.Append(dbName); + query.Append("."); + query.Append(table); + query.Append(" FIELDS TERMINATED BY '|'"); + query.Append(" LINES TERMINATED BY '&_+-!/=/='"); + query.Append(" (" + String.Join(",", columns.Select(x => "@" + x.ColExpression())) + ") SET "); + var setStatements = new List(); + foreach (DataCopy.MSSQLToMySQLDataCopy.Col col in columns) + { + string colName = col.ColExpression(); + if (col.dataType.UsesPrecisionScale()) + { + setStatements.Add(colName + " = CAST(IF(LENGTH(@" + colName + ")=0, NULL, IF(LENGTH(@" + colName + ")=10 AND @" + colName + "=' ', '',@" + colName + ")) AS DECIMAL)"); + } + else if (col.dataType.IsStringType()) + { + setStatements.Add(colName + " = IF(LENGTH(@" + colName + ")=0, NULL, IF(LENGTH(@" + colName + ")=10 AND @" + colName + "=' ', '',CONVERT(CONVERT(@" + colName + " USING LATIN1) USING UTF8)))"); + } + else if (col.dataType.BaseType.ToUpper().Contains("INT") || col.dataType.BaseType.ToUpper().Contains("BIT")) + { + string signed = (bool)col.dataType.Signed ? "SIGNED" : "UNSIGNED"; + setStatements.Add(colName + " = CAST(IF(LENGTH(@" + colName + ")=0, NULL, IF(LENGTH(@" + colName + ")=10 AND @" + colName + "=' ', '',@" + colName + ")) AS "+ signed + ")"); + } + else + { + setStatements.Add(colName + " = IF(LENGTH(@" + colName + ")=0, NULL, IF(LENGTH(@" + colName + ")=10 AND @" + colName + "=' ', '',@" + colName + "))"); + } + } + query.Append(string.Join(",", setStatements.Select(x => x))); + query.Append(";"); + conn.Open(); + MySqlCommand comm = new MySqlCommand(query.ToString(), conn); + logger.Log("Executing bulk copy query: " + query.ToString(), LogLevel.Info); + comm.ExecuteNonQuery(); + } + } +>>>>>>> working } } diff --git a/TeslaSQL/TeslaSQL.csproj b/TeslaSQL/TeslaSQL.csproj index 285679a..e2a8c25 100644 --- a/TeslaSQL/TeslaSQL.csproj +++ b/TeslaSQL/TeslaSQL.csproj @@ -94,6 +94,7 @@ +