Skip to content

Commit

Permalink
调整线程池处理
Browse files Browse the repository at this point in the history
  • Loading branch information
maikebing committed Aug 11, 2022
1 parent fc491d7 commit 7961a24
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 35 deletions.
34 changes: 25 additions & 9 deletions src/Example/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,19 +85,27 @@ static void Main(string[] args)
connection.CreateCommand("CREATE TABLE IF NOT EXISTS telemetrydata (ts timestamp,value_type tinyint, value_boolean bool, value_string binary(10240), value_long bigint,value_datetime timestamp,value_double double) TAGS (deviceid binary(32),keyname binary(64));").ExecuteNonQuery();
var devid1 = $"{Guid.NewGuid():N}";
var devid2 = $"{Guid.NewGuid():N}";
var devid3 = $"{Guid.NewGuid():N}";
var devid4 = $"{Guid.NewGuid():N}";
DateTime dt = DateTime.Now;
UploadTelemetryData(connection, devid1, "1#air-compressor-two-level-discharge-temperature", 4000);
UploadTelemetryData(connection, devid2, "1#air-compressor-load-rate", 4000);

UploadTelemetryData(connection, devid1, "1#air-compressor-two-level-discharge-temperature", 5000);
UploadTelemetryData(connection, devid2, "1#air-compressor-load-rate", 5000);
Console.WriteLine("");
Console.WriteLine("");
Console.WriteLine("");
DateTime dt2 = DateTime.Now;
UploadTelemetryDataPool(connection, devid1, "1#air-compressor-two-level-discharge-temperature1", 4000);
UploadTelemetryDataPool(connection, devid2, "1#air-compressor-load-rate1", 4000);
Console.WriteLine($"UploadTelemetryData 耗时:{DateTime.Now.Subtract(dt).TotalSeconds}");
Console.WriteLine($"UploadTelemetryDataPool 耗时:{DateTime.Now.Subtract(dt2).TotalSeconds}");
UploadTelemetryDataPool(connection, devid3, "1#air-compressor-two-level-discharge-temperature1", 5000);
UploadTelemetryDataPool(connection, devid4, "1#air-compressor-load-rate1", 5000);
var t1 = DateTime.Now.Subtract(dt).TotalSeconds;
var t2 = DateTime.Now.Subtract(dt2).TotalSeconds;
Console.WriteLine("Done");
Thread.Sleep(TimeSpan.FromSeconds(1));
Console.WriteLine($"UploadTelemetryData 耗时:{t1}");
Console.WriteLine($"UploadTelemetryDataPool 耗时:{t2}");
Thread.Sleep(TimeSpan.FromSeconds(2));
var reader2 = connection.CreateCommand("select last_row(*) from telemetrydata group by deviceid,keyname ;").ExecuteReader();
ConsoleTableBuilder.From(reader2.ToDataTable()).WithFormat(ConsoleTableBuilderFormat.Default).ExportAndWriteLine();
var reader3 = connection.CreateCommand("select * from telemetrydata").ExecuteReader();

List<string> list = new List<string>();
while (reader3.Read())
{
Expand Down Expand Up @@ -258,7 +266,15 @@ static void UploadTelemetryDataPool(TaosConnection connection, string devid, str
{
Parallel.For(0, count,new ParallelOptions() { MaxDegreeOfParallelism=connection.PoolSize }, i =>
{
connection.CreateCommand($"INSERT INTO device_{devid} USING telemetrydata TAGS(\"{devid}\",\"{keyname}\") values (now,2,true,'{i}',{i},now,{i});").ExecuteNonQuery();
try
{
connection.CreateCommand($"INSERT INTO device_{devid} USING telemetrydata TAGS(\"{devid}\",\"{keyname}\") values (now,2,true,'{i}',{i},now,{i});").ExecuteNonQuery();
Console.WriteLine($"线程:{Thread.CurrentThread.ManagedThreadId}{i}条数据, OK");
}
catch (Exception ex)
{
Console.WriteLine($"线程:{Thread.CurrentThread.ManagedThreadId}{i}条数据, {ex.Message}");
}
});
}
}
Expand Down
24 changes: 19 additions & 5 deletions src/IoTSharp.Data.Taos/ConcurrentTaosQueue.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
Expand All @@ -26,10 +27,10 @@ public void Return(IntPtr client)
{
Monitor.Enter(TaosQueue);
TaosQueue.Enqueue(client);
System.Diagnostics.Debug.WriteLine($"TaosQueue Return:{client}");
Debug.WriteLine($"线程{Thread.CurrentThread.ManagedThreadId} 归还 {client}");
Monitor.Pulse(TaosQueue);
Monitor.Exit(TaosQueue);

Thread.Sleep(0);
}
int _ref = 0;
public void AddRef()
Expand All @@ -50,16 +51,29 @@ public void RemoveRef()
_ref--;
}
}
public int Timeout { get; set; }
public IntPtr Take()
{
IntPtr client = IntPtr.Zero;
Monitor.Enter(TaosQueue);
if (TaosQueue.IsEmpty)
{
Monitor.Wait(TaosQueue);
Debug.WriteLine($"线程{Thread.CurrentThread.ManagedThreadId} 连接池已空,请等待 超时时长:{Timeout}");
Monitor.Wait(TaosQueue, TimeSpan.FromSeconds(Timeout));
}
if (!TaosQueue.TryDequeue(out client))
{
Debug.WriteLine($"线程{Thread.CurrentThread.ManagedThreadId} 从连接池获取连接失败,等待并重试");
}
else
{
Debug.WriteLine($"线程{Thread.CurrentThread.ManagedThreadId} 拿走 {client}");
}
TaosQueue.TryDequeue(out var client);
System.Diagnostics.Debug.WriteLine($"TaosQueue Take:{client}");
Monitor.Exit(TaosQueue);
if (client == IntPtr.Zero)
{
throw new TimeoutException($"Connection pool is empty and wait time out({Timeout}s)!");
}
return client;
}
}
Expand Down
25 changes: 20 additions & 5 deletions src/IoTSharp.Data.Taos/TaosCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,14 @@ internal long GetDateTimeFrom(DateTime dt, IntPtr _taos)
/// <exception cref="TaosException">A Taos error occurs during execution.</exception>
public new virtual TaosDataReader ExecuteReader(CommandBehavior behavior)
{
var _taos = _connection.TakeClient();
var dr= ExecuteReader(behavior, _taos);
dr.OnDispose += (object sender, EventArgs e)=>_connection.ReturnClient(_taos);
return dr;
}

private TaosDataReader ExecuteReader(CommandBehavior behavior,IntPtr _taos)
{
if ((behavior & ~(CommandBehavior.Default | CommandBehavior.SequentialAccess | CommandBehavior.SingleResult
| CommandBehavior.SingleRow | CommandBehavior.CloseConnection)) != 0)
{
Expand Down Expand Up @@ -322,7 +330,7 @@ public new virtual TaosDataReader ExecuteReader(CommandBehavior behavior)
var closeConnection = (behavior & CommandBehavior.CloseConnection) != 0;
try
{
var _taos = _connection.TakeClient();

#if DEBUG
Debug.WriteLine($"_commandText:{_commandText}");
#endif
Expand Down Expand Up @@ -605,10 +613,14 @@ public override int ExecuteNonQuery()
{
throw new InvalidOperationException($"CallRequiresSetCommandText{nameof(ExecuteNonQuery)}");
}
using (var reader = ExecuteReader())
var _taos= _connection.TakeClient();
int result = -1;
using (var reader = ExecuteReader( CommandBehavior.Default,_taos))
{
return reader.RecordsAffected;
result= reader.RecordsAffected;
}
_connection.ReturnClient(_taos);
return result;
}

/// <summary>
Expand All @@ -626,13 +638,16 @@ public override object ExecuteScalar()
{
throw new InvalidOperationException($"CallRequiresSetCommandText{nameof(ExecuteScalar)}");
}

var _taos = _connection.TakeClient();
object result =null;
using (var reader = ExecuteReader())
{
return reader.Read()
result= reader.Read()
? reader.GetValue(0)
: null;
}
_connection.ReturnClient(_taos);
return result;
}

/// <summary>
Expand Down
8 changes: 5 additions & 3 deletions src/IoTSharp.Data.Taos/TaosConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ public override string ConnectionString

internal TaosConnectionStringBuilder ConnectionStringBuilder { get; set; }

public override int ConnectionTimeout => ConnectionStringBuilder.ConnectionTimeout;


/// <summary>
/// Gets the path to the database file. Will be absolute for open connections.
Expand Down Expand Up @@ -207,18 +209,18 @@ public override void Open()
}
if (!g_pool.ContainsKey(_connectionString))
{
g_pool.Add(_connectionString, new ConcurrentTaosQueue());
g_pool.Add(_connectionString, new ConcurrentTaosQueue() { Timeout= ConnectionTimeout});
}
_queue = g_pool[_connectionString];
_queue.AddRef();
if (ConnectionString == null)
{
throw new InvalidOperationException("Open Requires Set ConnectionString");
}
for (int i = 0; i < ConnectionStringBuilder.PoolSize; i++)
for (int i = 0; i < ConnectionStringBuilder.PoolSize+1; i++)
{
var c = TDengine.Connect(this.DataSource, ConnectionStringBuilder.Username, ConnectionStringBuilder.Password, "", (short)ConnectionStringBuilder.Port);
if (c != null && c!=IntPtr.Zero)
if (c!=IntPtr.Zero)
{
_queue.Return(c);
}
Expand Down
37 changes: 29 additions & 8 deletions src/IoTSharp.Data.Taos/TaosConnectionStringBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class TaosConnectionStringBuilder : DbConnectionStringBuilder
private const string DataBaseKeyword = "DataBase";
private const string PortKeyword = "Port";
private const string PoolSizeKeyword = "PoolSize";

private const string TimeOutKeyword = "TimeOut";
private enum Keywords
{
DataSource,
Expand All @@ -35,7 +35,9 @@ private enum Keywords
Password,
Port,
Charset,
PoolSize
PoolSize,
TimeOut


}

Expand All @@ -48,21 +50,23 @@ private enum Keywords
private string _charset = System.Text.Encoding.UTF8.EncodingName;
private string _password = string.Empty;
private int _port =6030;
private int _PoolSize=8;
private int _PoolSize=Environment.ProcessorCount;
private int _timeout = 30;

static TaosConnectionStringBuilder()
{
var validKeywords = new string[7];
var validKeywords = new string[8];
validKeywords[(int)Keywords.DataSource] = DataSourceKeyword;
validKeywords[(int)Keywords.DataBase] = DataBaseKeyword;
validKeywords[(int)Keywords.Username] = UserNameKeyword;
validKeywords[(int)Keywords.Password] = PasswordKeyword;
validKeywords[(int)Keywords.Charset] =CharsetKeyword;
validKeywords[(int)Keywords.Port] = PortKeyword;
validKeywords[(int)Keywords.PoolSize] = PoolSizeKeyword;
validKeywords[(int)Keywords.TimeOut] = TimeOutKeyword;
_validKeywords = validKeywords;

_keywords = new Dictionary<string, Keywords>(7, StringComparer.OrdinalIgnoreCase)
_keywords = new Dictionary<string, Keywords>(8, StringComparer.OrdinalIgnoreCase)
{
[DataSourceKeyword] = Keywords.DataSource,
[UserNameKeyword] = Keywords.Username,
Expand All @@ -71,7 +75,8 @@ static TaosConnectionStringBuilder()
[CharsetKeyword] = Keywords.Charset,
[DataSourceNoSpaceKeyword] = Keywords.DataSource,
[PortKeyword] = Keywords.Port,
[PoolSizeKeyword] = Keywords.PoolSize
[PoolSizeKeyword] = Keywords.PoolSize,
[TimeOutKeyword] = Keywords.TimeOut
};
}

Expand Down Expand Up @@ -100,6 +105,8 @@ public virtual string DataSource
get => _dataSource;
set => base[DataSourceKeyword] = _dataSource = value;
}


public virtual string Username
{
get => _userName;
Expand Down Expand Up @@ -164,7 +171,13 @@ public virtual string DataBase
}
internal bool ForceDatabaseName { get; set; } = false;



public int ConnectionTimeout
{
get => _timeout;
set => base[TimeOutKeyword] = _timeout = value;
}

/// <summary>
/// Gets or sets the value associated with the specified key.
Expand Down Expand Up @@ -206,6 +219,9 @@ public virtual string DataBase
case Keywords.PoolSize:
PoolSize = Convert.ToInt32(value, CultureInfo.InvariantCulture);
return;
case Keywords.TimeOut:
ConnectionTimeout = Convert.ToInt32(value, CultureInfo.InvariantCulture);
return;
default:
Debug.Assert(false, "Unexpected keyword: " + keyword);
return;
Expand Down Expand Up @@ -330,6 +346,8 @@ private object GetAt(Keywords index)
return Charset;
case Keywords.PoolSize:
return PoolSize;
case Keywords.TimeOut:
return ConnectionTimeout;
default:
Debug.Assert(false, "Unexpected keyword: " + index);
return null;
Expand Down Expand Up @@ -358,13 +376,16 @@ private void Reset(Keywords index)
_dataBase = string.Empty;
return;
case Keywords.Port:
_port=6060;
_port=6030;
return;
case Keywords.Charset:
_charset = System.Text.Encoding.UTF8.EncodingName;
return;
case Keywords.PoolSize:
_PoolSize = 8;
_PoolSize = Environment.ProcessorCount;
return;
case Keywords.TimeOut :
_timeout = 30;
return;
default:
Debug.Assert(false, "Unexpected keyword: " + index);
Expand Down
11 changes: 6 additions & 5 deletions src/IoTSharp.Data.Taos/TaosDataReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ public class TaosDataReader : DbDataReader
private readonly int _recordsAffected;
private bool _closed;
private bool _closeConnection;
private readonly IntPtr _taos;
private IntPtr _taosResult;
private int _fieldCount;
IntPtr rowdata=IntPtr.Zero;
Expand All @@ -33,11 +32,12 @@ public class TaosDataReader : DbDataReader
private DateTime _dt1970;
private TAOS_BIND[] _binds;



internal TaosDataReader(TaosCommand taosCommand, taosField[] metas, bool closeConnection, IntPtr taos, IntPtr res, int recordsAffected, int fieldcount, TAOS_BIND[] binds)
{
_command = taosCommand;
_closeConnection = closeConnection;
_taos = taos;
_fieldCount = fieldcount;
_hasRows = recordsAffected > 0;
_recordsAffected = recordsAffected;
Expand Down Expand Up @@ -145,6 +145,8 @@ public override bool NextResult()
public override void Close()
=> Dispose(true);


internal event EventHandler OnDispose;
/// <summary>
/// Releases any resources used by the data reader and closes it.
/// </summary>
Expand All @@ -160,8 +162,7 @@ protected override void Dispose(bool disposing)
_command.DataReader = null;

_closed = true;



TDengine.FreeResult(_taosResult);
if (_binds != null)
{
Expand All @@ -172,7 +173,7 @@ protected override void Dispose(bool disposing)
{
TDengine.FreeResult(rowdata);
}
_command.Connection.ReturnClient(_taos);
OnDispose?.Invoke(this, EventArgs.Empty);
}

/// <summary>
Expand Down

0 comments on commit 7961a24

Please sign in to comment.