Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

加入线程池 #224

Merged
merged 2 commits into from
Aug 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ services:
- taos-network

taos:
image: tdengine/tdengine:2.4.0.12
image: tdengine/tdengine:2.6.0.12
restart: always
container_name: taos
hostname: taos
Expand Down
2 changes: 1 addition & 1 deletion src/Example/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ RUN echo "deb https://mirrors.tuna.tsinghua.edu.cn/debian/ bullseye main contrib
echo "deb https://mirrors.tuna.tsinghua.edu.cn/debian/ bullseye-backports main contrib non-free" >> /etc/apt/sources.list && \
apt-get -y -q update && apt-get install -y -q apt-utils libgdiplus libc6-dev lsof net-tools wget curl iputils-ping inetutils-tools && \
apt-get autoremove -y && apt-get clean && apt-get autoclean && rm /var/cache/apt/* -rf && ln -sf /usr/share/zoneinfo/Asia/Shanghai /etc/localtime
RUN curl -o TDengine-client.tar.gz "https://www.taosdata.com/assets-download/TDengine-client-2.4.0.12-Linux-x64.tar.gz" && \
RUN curl -o TDengine-client.tar.gz "https://www.taosdata.com/assets-download/TDengine-client-2.6.0.12-Linux-x64.tar.gz" && \
tar -xvf TDengine-client.tar.gz && rm TDengine-client.tar.gz -f && cd $(ls TDengine-client* -d) && \
./install_client.sh && \
rm $(pwd) -rf
Expand Down
46 changes: 39 additions & 7 deletions src/Example/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System.Data.Common;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace TaosADODemo
{
Expand All @@ -21,7 +22,7 @@ static void Main(string[] args)
string database = "db_" + DateTime.Now.ToString("yyyyMMddHHmmss");
var builder = new TaosConnectionStringBuilder()
{
DataSource = "airleaderserver",
DataSource = "taos",
DataBase = database,
Username = "root",
Password = "taosdata",
Expand Down Expand Up @@ -61,7 +62,7 @@ static void Main(string[] args)
Console.WriteLine("");
ConsoleTableBuilder.From(reader.ToDataTable()).WithFormat(ConsoleTableBuilderFormat.MarkDown).ExportAndWriteLine();


connection.ChangeDatabase(database);
Console.WriteLine("");
connection.CreateCommand($"CREATE TABLE datas ('reportTime' timestamp, type int, 'bufferedEnd' bool, address nchar(64), parameter nchar(64), value nchar(64)) TAGS ('boxCode' nchar(64), 'machineId' int);").ExecuteNonQuery();
connection.CreateCommand($"INSERT INTO data_history_67 USING datas TAGS (mongo, 67) values ( 1608173534840 2 false 'Channel1.窑.烟囱温度' '烟囱温度' '122.00' );").ExecuteNonQuery();
Expand All @@ -84,12 +85,27 @@ static void Main(string[] args)
connection.CreateCommand("CREATE TABLE IF NOT EXISTS telemetrydata (ts timestamp,value_type tinyint, value_boolean bool, value_string binary(10240), value_long bigint,value_datetime timestamp,value_double double) TAGS (deviceid binary(32),keyname binary(64));").ExecuteNonQuery();
var devid1 = $"{Guid.NewGuid():N}";
var devid2 = $"{Guid.NewGuid():N}";
UploadTelemetryData(connection, devid1, "1#air-compressor-two-level-discharge-temperature", 2000);
UploadTelemetryData(connection, devid2, "1#air-compressor-load-rate", 2000);
var devid3 = $"{Guid.NewGuid():N}";
var devid4 = $"{Guid.NewGuid():N}";
DateTime dt = DateTime.Now;
UploadTelemetryData(connection, devid1, "1#air-compressor-two-level-discharge-temperature", 5000);
UploadTelemetryData(connection, devid2, "1#air-compressor-load-rate", 5000);
Console.WriteLine("");
Console.WriteLine("");
Console.WriteLine("");
DateTime dt2 = DateTime.Now;
UploadTelemetryDataPool(connection, devid3, "1#air-compressor-two-level-discharge-temperature1", 5000);
UploadTelemetryDataPool(connection, devid4, "1#air-compressor-load-rate1", 5000);
var t1 = DateTime.Now.Subtract(dt).TotalSeconds;
var t2 = DateTime.Now.Subtract(dt2).TotalSeconds;
Console.WriteLine("Done");
Thread.Sleep(TimeSpan.FromSeconds(1));
Console.WriteLine($"UploadTelemetryData 耗时:{t1}");
Console.WriteLine($"UploadTelemetryDataPool 耗时:{t2}");
Thread.Sleep(TimeSpan.FromSeconds(2));
var reader2 = connection.CreateCommand("select last_row(*) from telemetrydata group by deviceid,keyname ;").ExecuteReader();
ConsoleTableBuilder.From(reader2.ToDataTable()).WithFormat(ConsoleTableBuilderFormat.Default).ExportAndWriteLine();
var reader3 = connection.CreateCommand("select * from telemetrydata").ExecuteReader();

List<string> list = new List<string>();
while (reader3.Read())
{
Expand Down Expand Up @@ -120,7 +136,7 @@ static void Main(string[] args)
string[] jsonStr = {
"{"
+"\"metric\": \"stb0_0\","
+"\"timestamp\": 1626006833,"
+$"\"timestamp\": {DateTimeOffset.Now.ToUnixTimeSeconds()},"
+"\"value\": 10,"
+"\"tags\": {"
+" \"t1\": true,"
Expand Down Expand Up @@ -169,7 +185,7 @@ static JObject AddTag(JObject tags, string name, object value, string type)
payload.Add("metric", "stb3_0");

var timestamp = new JObject();
timestamp.Add("value", 1626006833);
timestamp.Add("value", DateTimeOffset.Now.ToUnixTimeSeconds() );
timestamp.Add("type", "s");
payload.Add("timestamp", timestamp);

Expand Down Expand Up @@ -245,5 +261,21 @@ static void UploadTelemetryData( TaosConnection connection, string devid, strin
connection.CreateCommand($"INSERT INTO device_{devid} USING telemetrydata TAGS(\"{devid}\",\"{keyname}\") values (now,2,true,'{i}',{i},now,{i});").ExecuteNonQuery();
}
}

static void UploadTelemetryDataPool(TaosConnection connection, string devid, string keyname, int count)
{
Parallel.For(0, count,new ParallelOptions() { MaxDegreeOfParallelism=connection.PoolSize }, i =>
{
try
{
connection.CreateCommand($"INSERT INTO device_{devid} USING telemetrydata TAGS(\"{devid}\",\"{keyname}\") values (now,2,true,'{i}',{i},now,{i});").ExecuteNonQuery();
Console.WriteLine($"线程:{Thread.CurrentThread.ManagedThreadId} 第{i}条数据, OK");
}
catch (Exception ex)
{
Console.WriteLine($"线程:{Thread.CurrentThread.ManagedThreadId} 第{i}条数据, {ex.Message}");
}
});
}
}
}
80 changes: 80 additions & 0 deletions src/IoTSharp.Data.Taos/ConcurrentTaosQueue.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace IoTSharp.Data.Taos
{
public class ConcurrentTaosQueue
{
public ConcurrentQueue<IntPtr> TaosQueue { get; }

public ConcurrentTaosQueue(List<IntPtr> clients)
{
TaosQueue = new ConcurrentQueue<IntPtr>(clients);
}

public ConcurrentTaosQueue()
{
TaosQueue = new ConcurrentQueue<IntPtr>();
}

public void Return(IntPtr client)
{
Monitor.Enter(TaosQueue);
TaosQueue.Enqueue(client);
Debug.WriteLine($"线程{Thread.CurrentThread.ManagedThreadId} 归还 {client}");
Monitor.Pulse(TaosQueue);
Monitor.Exit(TaosQueue);
Thread.Sleep(0);
}
int _ref = 0;
public void AddRef()
{
lock (this)
{
_ref++;
}
}
public int GetRef()
{
return _ref;
}
public void RemoveRef()
{
lock (this)
{
_ref--;
}
}
public int Timeout { get; set; }
public IntPtr Take()
{
IntPtr client = IntPtr.Zero;
Monitor.Enter(TaosQueue);
if (TaosQueue.IsEmpty)
{
Debug.WriteLine($"线程{Thread.CurrentThread.ManagedThreadId} 连接池已空,请等待 超时时长:{Timeout}");
Monitor.Wait(TaosQueue, TimeSpan.FromSeconds(Timeout));
}
if (!TaosQueue.TryDequeue(out client))
{
Debug.WriteLine($"线程{Thread.CurrentThread.ManagedThreadId} 从连接池获取连接失败,等待并重试");
}
else
{
Debug.WriteLine($"线程{Thread.CurrentThread.ManagedThreadId} 拿走 {client}");
}
Monitor.Exit(TaosQueue);
if (client == IntPtr.Zero)
{
throw new TimeoutException($"Connection pool is empty and wait time out({Timeout}s)!");
}
return client;
}
}
}
2 changes: 1 addition & 1 deletion src/IoTSharp.Data.Taos/Driver/TDengineDriver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,8 @@ static public string Error(IntPtr res)
static extern public int ErrorNo(IntPtr res);

[DllImport("taos", EntryPoint = "taos_query", CallingConvention = CallingConvention.Cdecl)]
// static extern public IntPtr Query(IntPtr conn, string sqlstr);
static extern private IntPtr Query(IntPtr conn, IntPtr byteArr);

[DllImport("taos", EntryPoint = "taos_stop_query", CallingConvention = CallingConvention.Cdecl)]
public static extern void StopQuery(IntPtr taos);

Expand Down
77 changes: 47 additions & 30 deletions src/IoTSharp.Data.Taos/TaosCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class TaosCommand : DbCommand
private readonly DateTime _dt1970;
private TaosConnection _connection;
private string _commandText;
private IntPtr _taos => _connection._taos;

/// <summary>
/// Initializes a new instance of the <see cref="TaosCommand" /> class.
/// </summary>
Expand Down Expand Up @@ -116,10 +116,9 @@ public new virtual TaosConnection Connection
{
throw new InvalidOperationException($"SetRequiresNoOpenReader{nameof(Connection)}");
}

if (value != _connection)
{

_connection?.RemoveCommand(this);
_connection = value;
value?.AddCommand(this);
Expand All @@ -137,6 +136,7 @@ protected override DbConnection DbConnection
set => Connection = (TaosConnection)value;
}


/// <summary>
/// Gets or sets the transaction within which the command executes.
/// </summary>
Expand Down Expand Up @@ -289,6 +289,14 @@ internal long GetDateTimeFrom(DateTime dt, IntPtr _taos)
/// <exception cref="TaosException">A Taos error occurs during execution.</exception>
public new virtual TaosDataReader ExecuteReader(CommandBehavior behavior)
{
var _taos = _connection.TakeClient();
var dr= ExecuteReader(behavior, _taos);
dr.OnDispose += (object sender, EventArgs e)=>_connection.ReturnClient(_taos);
return dr;
}

private TaosDataReader ExecuteReader(CommandBehavior behavior,IntPtr _taos)
{
if ((behavior & ~(CommandBehavior.Default | CommandBehavior.SequentialAccess | CommandBehavior.SingleResult
| CommandBehavior.SingleRow | CommandBehavior.CloseConnection)) != 0)
{
Expand Down Expand Up @@ -317,12 +325,12 @@ public new virtual TaosDataReader ExecuteReader(CommandBehavior behavior)
{
throw new InvalidOperationException($"CallRequiresSetCommandText{nameof(ExecuteReader)}");
}

var unprepared = false;
TaosDataReader dataReader = null;
var closeConnection = (behavior & CommandBehavior.CloseConnection) != 0;
try
{

#if DEBUG
Debug.WriteLine($"_commandText:{_commandText}");
#endif
Expand All @@ -336,7 +344,7 @@ public new virtual TaosDataReader ExecuteReader(CommandBehavior behavior)
if (stmt != IntPtr.Zero)
{
var pms = _parameters.Value;
binds = BindParamters(pms);
binds = BindParamters(pms,_taos);
int res = TDengine.StmtPrepare(stmt, _commandText);
if (res == 0)
{
Expand Down Expand Up @@ -415,31 +423,33 @@ public new virtual TaosDataReader ExecuteReader(CommandBehavior behavior)
}
}
#endif
dataReader = new TaosDataReader(this, metas, closeConnection, code.Result, _affectRows, metas?.Length ?? 0, binds);
}
else if (isok && TDengine.ErrorNo(code.Result) != 0)
{
TaosException.ThrowExceptionForRC(_commandText, new TaosErrorResult() { Code = TDengine.ErrorNo(code.Result), Error = TDengine.Error(code.Result) });
}
else if (isok && code.Result == IntPtr.Zero)
{
TaosException.ThrowExceptionForRC(_commandText, new TaosErrorResult() { Code = TDengine.ErrorNo(_taos), Error = TDengine.Error(_taos) });
}
else if (code.Status == TaskStatus.Running || !isok)
{
TaosException.ThrowExceptionForRC(-10006, "Execute sql command timeout", null);
}
else if (code.IsCanceled)
{
TaosException.ThrowExceptionForRC(-10003, "Command is Canceled", null);
dataReader = new TaosDataReader(this, metas, closeConnection, _taos, code.Result, _affectRows, metas?.Length ?? 0, binds);
}
else if (code.IsFaulted)
else if (isok && TDengine.ErrorNo(code.Result) != 0)
{
TaosException.ThrowExceptionForRC(-10004, code.Exception.Message, code.Exception?.InnerException);
var ter = new TaosErrorResult() { Code = TDengine.ErrorNo(code.Result), Error = TDengine.Error(code.Result) };
_connection.ReturnClient(_taos);
TaosException.ThrowExceptionForRC(_commandText, ter);
}
else
{
TaosException.ThrowExceptionForRC(_commandText, new TaosErrorResult() { Code = -10007, Error = $"Unknow Exception" });
_connection.ReturnClient(_taos);
if (code.Status == TaskStatus.Running || !isok)
{
TaosException.ThrowExceptionForRC(-10006, "Execute sql command timeout", null);
}
else if (code.IsCanceled)
{
TaosException.ThrowExceptionForRC(-10003, "Command is Canceled", null);
}
else if (code.IsFaulted)
{
TaosException.ThrowExceptionForRC(-10004, code.Exception.Message, code.Exception?.InnerException);
}
else
{
TaosException.ThrowExceptionForRC(_commandText, new TaosErrorResult() { Code = -10007, Error = $"Unknow Exception" });
}
}
}
catch when (unprepared)
Expand All @@ -449,7 +459,7 @@ public new virtual TaosDataReader ExecuteReader(CommandBehavior behavior)
return dataReader;
}

private TAOS_BIND[] BindParamters(TaosParameterCollection pms)
private TAOS_BIND[] BindParamters(TaosParameterCollection pms,IntPtr _taos)
{
TAOS_BIND[] binds = new TAOS_BIND[pms.Count];
for (int i = 0; i < pms.Count; i++)
Expand Down Expand Up @@ -603,10 +613,14 @@ public override int ExecuteNonQuery()
{
throw new InvalidOperationException($"CallRequiresSetCommandText{nameof(ExecuteNonQuery)}");
}
using (var reader = ExecuteReader())
var _taos= _connection.TakeClient();
int result = -1;
using (var reader = ExecuteReader( CommandBehavior.Default,_taos))
{
return reader.RecordsAffected;
result= reader.RecordsAffected;
}
_connection.ReturnClient(_taos);
return result;
}

/// <summary>
Expand All @@ -624,13 +638,16 @@ public override object ExecuteScalar()
{
throw new InvalidOperationException($"CallRequiresSetCommandText{nameof(ExecuteScalar)}");
}

var _taos = _connection.TakeClient();
object result =null;
using (var reader = ExecuteReader())
{
return reader.Read()
result= reader.Read()
? reader.GetValue(0)
: null;
}
_connection.ReturnClient(_taos);
return result;
}

/// <summary>
Expand Down
Loading