Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
14 changed files
with
617 additions
and
5 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,137 @@ | ||
using System.Data; | ||
using System.IO; | ||
using Yuniql.Extensibility; | ||
using Oracle.ManagedDataAccess.Client; | ||
using Yuniql.Extensibility.BulkCsvParser; | ||
using System; | ||
using System.Diagnostics; | ||
|
||
//https://github.com/22222/CsvTextFieldParser | ||
namespace Yuniql.Oracle | ||
{ | ||
///<inheritdoc/> | ||
public class OracleBulkImportService : IBulkImportService | ||
{ | ||
private string _connectionString; | ||
private readonly ITraceService _traceService; | ||
|
||
///<inheritdoc/> | ||
public OracleBulkImportService(ITraceService traceService) | ||
{ | ||
this._traceService = traceService; | ||
} | ||
|
||
///<inheritdoc/> | ||
public void Initialize(string connectionString) | ||
{ | ||
this._connectionString = connectionString; | ||
} | ||
|
||
///<inheritdoc/> | ||
public void Run( | ||
IDbConnection connection, | ||
IDbTransaction transaction, | ||
string fileFullPath, | ||
string bulkSeparator = null, | ||
int? bulkBatchSize = null, | ||
int? commandTimeout = null) | ||
{ | ||
var connectionStringBuilder = new OracleConnectionStringBuilder(_connectionString); | ||
|
||
//get file name segments from potentially sequenceno.schemaname.tablename filename pattern | ||
var fileName = Path.GetFileNameWithoutExtension(fileFullPath); | ||
var fileNameSegments = fileName.SplitBulkFileName(defaultSchema: connectionStringBuilder.DataSource); | ||
var schemaName = fileNameSegments.Item2; | ||
var tableName = fileNameSegments.Item3; | ||
|
||
if(!string.Equals(connectionStringBuilder.DataSource, schemaName, System.StringComparison.InvariantCultureIgnoreCase)) | ||
{ | ||
throw new ApplicationException("Oracle does not support custom schema. Your bulk file name must resemble these patterns: 1.mytable.csv, 01.mytable.csv or mytable.csv"); | ||
} | ||
|
||
var stopwatch = new Stopwatch(); | ||
stopwatch.Start(); | ||
_traceService.Info($"OracleBulkImportService: Started copying data into destination table {schemaName}.{tableName}"); | ||
|
||
//read csv file and load into data table | ||
var dataTable = ParseCsvFile(connection, fileFullPath, tableName, bulkSeparator); | ||
|
||
//save the csv data into staging sql table | ||
BulkCopyWithDataTable(connection, transaction, bulkBatchSize, tableName, dataTable); | ||
|
||
stopwatch.Stop(); | ||
_traceService.Info($"OracleBulkImportService: Finished copying data into destination table {schemaName}.{tableName} in {stopwatch.ElapsedMilliseconds} ms"); | ||
} | ||
|
||
private DataTable ParseCsvFile( | ||
IDbConnection connection, | ||
string fileFullPath, | ||
string tableName, | ||
string bulkSeparator) | ||
{ | ||
if (string.IsNullOrEmpty(bulkSeparator)) | ||
bulkSeparator = ","; | ||
|
||
var csvDatatable = new DataTable(); | ||
string query = $"SELECT * FROM {tableName} LIMIT 0;"; | ||
using (var adapter = new OracleDataAdapter(query, connection as OracleConnection)) | ||
{ | ||
adapter.Fill(csvDatatable); | ||
}; | ||
|
||
using (var csvReader = new CsvTextFieldParser(fileFullPath)) | ||
{ | ||
csvReader.Separators = (new string[] { bulkSeparator }); | ||
csvReader.HasFieldsEnclosedInQuotes = true; | ||
|
||
//skipped the first row | ||
csvReader.ReadFields(); | ||
|
||
//process data rows | ||
while (!csvReader.EndOfData) | ||
{ | ||
string[] fieldData = csvReader.ReadFields(); | ||
for (int i = 0; i < fieldData.Length; i++) | ||
{ | ||
if (fieldData[i] == "" || fieldData[i] == "NULL") | ||
{ | ||
fieldData[i] = null; | ||
} | ||
} | ||
csvDatatable.Rows.Add(fieldData); | ||
} | ||
} | ||
return csvDatatable; | ||
} | ||
|
||
//https://dev.Oracle.com/doc/connector-net/en/connector-net-programming-bulk-loader.html | ||
//https://stackoverflow.com/questions/48018614/insert-datatable-into-a-Oracle-table-using-c-sharp | ||
|
||
//NOTE: This is not the most typesafe and performant way to do this and this is just to demonstrate | ||
//possibility to bulk import data in custom means during migration execution | ||
private void BulkCopyWithDataTable( | ||
IDbConnection connection, | ||
IDbTransaction transaction, | ||
int? bulkBatchSize, | ||
string tableName, | ||
DataTable dataTable) | ||
{ | ||
using (var cmd = new OracleCommand()) | ||
{ | ||
cmd.Connection = connection as OracleConnection; | ||
cmd.Transaction = transaction as OracleTransaction; | ||
cmd.CommandText = $"SELECT * FROM {tableName} WHERE 1 <> 1;"; | ||
|
||
using (var adapter = new OracleDataAdapter(cmd)) | ||
{ | ||
adapter.UpdateBatchSize = bulkBatchSize.HasValue ? bulkBatchSize.Value : DEFAULT_CONSTANTS.BULK_BATCH_SIZE; ; | ||
using (var cb = new OracleCommandBuilder(adapter)) | ||
{ | ||
cb.SetAllValues = true; | ||
adapter.Update(dataTable); | ||
} | ||
}; | ||
} | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,194 @@ | ||
using System.Collections.Generic; | ||
using System.Data; | ||
using Yuniql.Extensibility; | ||
using Oracle.ManagedDataAccess.Client; | ||
using System; | ||
using System.IO; | ||
using Yuniql.Extensibility.SqlBatchParser; | ||
using System.Linq; | ||
|
||
namespace Yuniql.Oracle | ||
{ | ||
///<inheritdoc/> | ||
public class OracleDataService : IDataService, IMixableTransaction | ||
{ | ||
private string _connectionString; | ||
private readonly ITraceService _traceService; | ||
|
||
///<inheritdoc/> | ||
public OracleDataService(ITraceService traceService) | ||
{ | ||
this._traceService = traceService; | ||
} | ||
|
||
///<inheritdoc/> | ||
public void Initialize(string connectionString) | ||
{ | ||
this._connectionString = connectionString; | ||
} | ||
|
||
///<inheritdoc/> | ||
public bool IsTransactionalDdlSupported => false; | ||
|
||
///<inheritdoc/> | ||
public bool IsSchemaSupported { get; } = true; | ||
|
||
///<inheritdoc/> | ||
public bool IsBatchSqlSupported { get; } = true; | ||
|
||
///<inheritdoc/> | ||
public bool IsUpsertSupported => false; | ||
|
||
///<inheritdoc/> | ||
public string TableName { get; set; } = "__yuniql_schema_version"; | ||
|
||
///<inheritdoc/> | ||
public string SchemaName { get; set; } | ||
|
||
///<inheritdoc/> | ||
public IDbConnection CreateConnection() | ||
{ | ||
return new OracleConnection(_connectionString); | ||
} | ||
|
||
///<inheritdoc/> | ||
public IDbConnection CreateMasterConnection() | ||
{ | ||
var masterConnectionStringBuilder = new OracleConnectionStringBuilder(_connectionString); | ||
masterConnectionStringBuilder.DataSource = "INFORMATION_SCHEMA"; | ||
|
||
return new OracleConnection(masterConnectionStringBuilder.ConnectionString); | ||
} | ||
|
||
///<inheritdoc/> | ||
public List<string> BreakStatements(string sqlStatementRaw) | ||
{ | ||
var sqlBatchParser = new SqlBatchParser(_traceService, new GoSqlBatchLineAnalyzer(), new CommentAnalyzer()); | ||
return sqlBatchParser.Parse(sqlStatementRaw).Select(s => s.BatchText).ToList(); | ||
} | ||
|
||
///<inheritdoc/> | ||
public ConnectionInfo GetConnectionInfo() | ||
{ | ||
var connectionStringBuilder = new OracleConnectionStringBuilder(_connectionString); | ||
return new ConnectionInfo { DataSource = connectionStringBuilder.DataSource, Database = connectionStringBuilder.DataSource }; | ||
} | ||
|
||
///<inheritdoc/> | ||
public string GetSqlForCheckIfDatabaseExists() | ||
=> @" | ||
SELECT 1 FROM SYS.ALL_TABLES WHERE OWNER = '${YUNIQL_DB_NAME}'; | ||
"; | ||
|
||
///<inheritdoc/> | ||
public string GetSqlForCreateDatabase() | ||
=> throw new NotSupportedException("Create database is not supported in Oracle."); | ||
|
||
///<inheritdoc/> | ||
public string GetSqlForCreateSchema() | ||
=> throw new NotSupportedException("Custom schema is not supported in Oracle."); | ||
|
||
///<inheritdoc/> | ||
public string GetSqlForCheckIfDatabaseConfigured() | ||
=> @" | ||
SELECT 1 FROM SYS.ALL_TABLES WHERE OWNER = '${YUNIQL_DB_NAME}' AND TABLE_NAME = '${YUNIQL_TABLE_NAME}' AND ROWNUM = 1; | ||
"; | ||
|
||
///<inheritdoc/> | ||
public string GetSqlForCheckIfDatabaseConfiguredv10() | ||
=> @" | ||
SELECT 1 FROM SYS.ALL_TABLES WHERE OWNER = '${YUNIQL_DB_NAME}' AND TABLE_NAME = '__yuniqldbversion' AND ROWNUM = 1; | ||
"; | ||
|
||
///<inheritdoc/> | ||
public string GetSqlForConfigureDatabase() | ||
=> @" | ||
CREATE TABLE ${YUNIQL_TABLE_NAME} ( | ||
sequence_id NUMBER NOT NULL, | ||
version VARCHAR2(190) NOT NULL, | ||
applied_on_utc TIMESTAMP NOT NULL, | ||
applied_by_user VARCHAR2(32) NOT NULL, | ||
applied_by_tool VARCHAR2(32) NOT NULL, | ||
applied_by_tool_version VARCHAR2(16) NOT NULL, | ||
status VARCHAR2(32) NOT NULL, | ||
duration_ms NUMBER NOT NULL, | ||
checksum VARCHAR2(64) NOT NULL, | ||
failed_script_path VARCHAR2(4000) NULL, | ||
failed_script_error VARCHAR2(4000) NULL, | ||
additional_artifacts VARCHAR2(4000) NULL, | ||
CONSTRAINT pk_${YUNIQL_TABLE_NAME} PRIMARY KEY (sequence_id), | ||
CONSTRAINT ix_${YUNIQL_TABLE_NAME} UNIQUE (version) | ||
); | ||
"; | ||
|
||
///<inheritdoc/> | ||
public string GetSqlForGetCurrentVersion() | ||
=> @" | ||
SELECT version FROM ${YUNIQL_TABLE_NAME} WHERE status = 'Successful' AND ROWNUM = 1 ORDER BY sequence_id DESC; | ||
"; | ||
|
||
///<inheritdoc/> | ||
public string GetSqlForGetAllVersions() | ||
=> @" | ||
SELECT sequence_id, version, applied_on_utc, applied_by_user, applied_by_tool, applied_by_tool_version, status, duration_ms, checksum, failed_script_path, failed_script_error, additional_artifacts | ||
FROM ${YUNIQL_TABLE_NAME} ORDER BY version ASC; | ||
"; | ||
|
||
///<inheritdoc/> | ||
public string GetSqlForInsertVersion() | ||
=> @" | ||
INSERT INTO ${YUNIQL_TABLE_NAME} (version, applied_on_utc, applied_by_user, applied_by_tool, applied_by_tool_version, status, duration_ms, checksum, failed_script_path, failed_script_error, additional_artifacts) | ||
VALUES ('${YUNIQL_VERSION}', SYSDATE, USER, '${YUNIQL_APPLIED_BY_TOOL}', '${YUNIQL_APPLIED_BY_TOOL_VERSION}','${YUNIQL_STATUS}', '${YUNIQL_DURATION_MS}', '${YUNIQL_CHECKSUM}', '${YUNIQL_FAILED_SCRIPT_PATH}', '${YUNIQL_FAILED_SCRIPT_ERROR}', '${YUNIQL_ADDITIONAL_ARTIFACTS}'); | ||
"; | ||
|
||
///<inheritdoc/> | ||
public string GetSqlForUpdateVersion() | ||
=> @" | ||
UPDATE ${YUNIQL_TABLE_NAME} | ||
SET | ||
applied_on_utc = SYSDATE, | ||
applied_by_user = USER, | ||
applied_by_tool = '${YUNIQL_APPLIED_BY_TOOL}', | ||
applied_by_tool_version = '${YUNIQL_APPLIED_BY_TOOL_VERSION}', | ||
status = '${YUNIQL_STATUS}', | ||
duration_ms = '${YUNIQL_DURATION_MS}', | ||
failed_script_path = '${YUNIQL_FAILED_SCRIPT_PATH}', | ||
failed_script_error = '${YUNIQL_FAILED_SCRIPT_ERROR}', | ||
additional_artifacts = '${YUNIQL_ADDITIONAL_ARTIFACTS}' | ||
WHERE | ||
version = '${YUNIQL_VERSION}'; | ||
"; | ||
|
||
///<inheritdoc/> | ||
public string GetSqlForUpsertVersion() | ||
=> throw new NotSupportedException("Not supported for the target platform"); | ||
|
||
///<inheritdoc/> | ||
public string GetSqlForCheckRequireMetaSchemaUpgrade(string currentSchemaVersion) | ||
//when table __yuniqldbversion exists, we need to upgrade from yuniql v1.0 to v1.1 version | ||
=> @" | ||
SELECT 'v1.1' FROM SYS.ALL_TABLES WHERE OWNER = '${YUNIQL_DB_NAME}' AND TABLE_NAME = '__yuniqldbversion' AND ROWNUM = 1; | ||
"; | ||
|
||
///<inheritdoc/> | ||
public string GetSqlForUpgradeMetaSchema(string requiredSchemaVersion) | ||
{ | ||
var assembly = typeof(OracleDataService).Assembly; | ||
var resource = assembly.GetManifestResourceStream($"{assembly.GetName().Name}.SchemaUpgrade_{requiredSchemaVersion.Replace(".", "_")}.sql"); | ||
using var reader = new StreamReader(resource); | ||
return reader.ReadToEnd(); | ||
} | ||
|
||
///<inheritdoc/> | ||
public bool TryParseErrorFromException(Exception exception, out string result) | ||
{ | ||
result = null; | ||
if (exception is OracleException sqlException) | ||
{ | ||
result = $"(0x{sqlException.ErrorCode:X}) Error {sqlException.Number}: {sqlException.Message}"; | ||
return true; | ||
} | ||
return false; | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
CREATE TABLE ${YUNIQL_DB_NAME}.${YUNIQL_TABLE_NAME}_v1_0 AS SELECT * FROM ${YUNIQL_DB_NAME}.__yuniqldbversion; | ||
|
||
DROP TABLE ${YUNIQL_DB_NAME}.__yuniqldbversion; | ||
|
||
CREATE TABLE ${YUNIQL_DB_NAME}.${YUNIQL_TABLE_NAME} ( | ||
sequence_id NUMBER NOT NULL, | ||
version VARCHAR2(190) NOT NULL, | ||
applied_on_utc TIMESTAMP NOT NULL, | ||
applied_by_user VARCHAR2(32) NOT NULL, | ||
applied_by_tool VARCHAR2(32) NOT NULL, | ||
applied_by_tool_version VARCHAR2(16) NOT NULL, | ||
status VARCHAR2(32) NOT NULL, | ||
duration_ms NUMBER NOT NULL, | ||
checksum VARCHAR2(64) NOT NULL, | ||
failed_script_path VARCHAR2(4000) NULL, | ||
failed_script_error VARCHAR2(4000) NULL, | ||
additional_artifacts VARCHAR2(4000) NULL, | ||
CONSTRAINT pk_${YUNIQL_TABLE_NAME} PRIMARY KEY (sequence_id), | ||
CONSTRAINT ix_${YUNIQL_TABLE_NAME} UNIQUE (version) | ||
); | ||
|
||
INSERT INTO ${YUNIQL_DB_NAME}.${YUNIQL_TABLE_NAME} (version, applied_on_utc, applied_by_user, applied_by_tool, applied_by_tool_version, status, duration_ms, checksum, failed_script_path, failed_script_error, additional_artifacts) | ||
SELECT version, applied_on_utc, applied_by_user, applied_by_tool, applied_by_tool_version, 'Successful', '0', '', NULL, NULL, NULL | ||
FROM ${YUNIQL_DB_NAME}.${YUNIQL_TABLE_NAME}_v1_0 | ||
ORDER BY version ASC; |
Oops, something went wrong.