Skip to content

Commit

Permalink
modified instrumentation interface: TracingSession is now only availa…
Browse files Browse the repository at this point in the history
…ble through dedicated ICluster::QueryTracingInfo

This solves performance problems and this let client to query when they require performance info
  • Loading branch information
pchalamet committed May 14, 2013
1 parent b4a3913 commit f3eb49d
Show file tree
Hide file tree
Showing 23 changed files with 522 additions and 323 deletions.
9 changes: 4 additions & 5 deletions CHANGES.txt
@@ -1,17 +1,16 @@
version 3.3.0
version 3.3.0
NOT RELEASED
- removed obsolete operations (fluent interface is the way to go)
- log connection building error, avoid GC TcpClient if building goes bad
- simplified command builder extensions - decoupled builder & mapper
- simplified command builder extensions - decoupled builder and mapper
- fixed 100% cpu when linux hosted server disconnect
- modified instrumentation interface: TracingSession is now no more available - use ICluster::QueryTracingSession
when required

version 3.2.1
NOT RELEASED
- added releaseNotes to nuget packages

version 3.2.1
- fixed issue 49 (https://github.com/pchalamet/cassandra-sharp/issues/49)

version 3.2.0
- use culture invariant for identifier conversion
- ICqlCommand.Execute & IPreparedQuery.Execute with all parameters are depreacated. Use fluent interface instead
Expand Down
Expand Up @@ -26,6 +26,6 @@ public interface IInstrumentation : IDisposable

void ClientTrace(InstrumentationToken token, EventType eventType);

void ServerTrace(InstrumentationToken token, TracingSession tracingSession);
void ServerTrace(InstrumentationToken token, Guid tracingId);
}
}
40 changes: 38 additions & 2 deletions CassandraSharp.Interfaces/Extensibility/InstrumentationToken.cs
Expand Up @@ -17,14 +17,14 @@ namespace CassandraSharp.Extensibility
{
using System;

public class InstrumentationToken
public class InstrumentationToken : IEquatable<InstrumentationToken>
{
private InstrumentationToken(RequestType type, ExecutionFlags executionFlags, string cql)
{
Id = Guid.NewGuid();
Type = type;
ExecutionFlags = executionFlags;
Cql = cql;
Cql = cql ?? string.Empty;
}

public string Cql { get; private set; }
Expand All @@ -35,6 +35,42 @@ private InstrumentationToken(RequestType type, ExecutionFlags executionFlags, st

public ExecutionFlags ExecutionFlags { get; private set; }

public bool Equals(InstrumentationToken other)
{
if (null == other)
{
return false;
}

bool bRes = Id == other.Id
&& Type == other.Type
&& ExecutionFlags == other.ExecutionFlags
&& Cql == other.Cql;
return bRes;
}

public override int GetHashCode()
{
const int prime = 31;
int hash = 0;
hash = prime * (hash + Id.GetHashCode());
hash += prime * (hash + Type.GetHashCode());
hash += prime * (hash + ExecutionFlags.GetHashCode());
hash += prime * (hash + Cql.GetHashCode());
return hash;
}

public override bool Equals(object obj)
{
InstrumentationToken other = obj as InstrumentationToken;
if (null != other)
{
return Equals(other);
}

return false;
}

internal static InstrumentationToken Create(RequestType requestType, ExecutionFlags executionFlags, string cql = null)
{
return new InstrumentationToken(requestType, executionFlags, cql);
Expand Down
2 changes: 2 additions & 0 deletions CassandraSharp.Interfaces/Extensibility/TracingEvent.cs
Expand Up @@ -28,6 +28,8 @@ public sealed class TracingEvent

public int SourceElapsed { get; internal set; }

public string Stage { get; internal set; }

public string Thread { get; internal set; }
}
}
75 changes: 75 additions & 0 deletions CassandraSharp.Interfaces/TracingExtensions.cs
@@ -0,0 +1,75 @@
// cassandra-sharp - high performance .NET driver for Apache Cassandra
// Copyright (c) 2011-2013 Pierre Chalamet
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

namespace CassandraSharp
{
using System;
using System.Linq;
using System.Threading.Tasks;
using CassandraSharp.CQLPoco;
using CassandraSharp.Extensibility;

public static class TracingExtensions
{
private static int CompareTracingEvent(TracingEvent x, TracingEvent y)
{
if (x.SourceElapsed < y.SourceElapsed)
{
return -1;
}

if (x.SourceElapsed > y.SourceElapsed)
{
return 1;
}

return 0;
}

public static TracingSession QueryTracingInfo(this ICluster @this, Guid tracingId)
{
var cmd = @this.CreatePocoCommand();

// query events and session
string queryEvents = "select * from system_traces.events where session_id = " + tracingId;
var obsEvents = cmd.Execute<TracingEvent>(queryEvents)
.WithConsistencyLevel(ConsistencyLevel.ONE)
.AsFuture();

string querySession = "select * from system_traces.sessions where session_id = " + tracingId;
var obsSession = cmd.Execute<TracingSession>(querySession)
.WithConsistencyLevel(ConsistencyLevel.ONE)
.AsFuture();

Task.WaitAll(obsEvents, obsSession);

// format the events
var tracingEvents = obsEvents.Result.ToList();
tracingEvents.Sort(CompareTracingEvent);
TracingEvent[] events = tracingEvents.ToArray();
foreach (var evt in events)
{
string[] tmp = evt.Thread.Split(':');
evt.Stage = tmp[0];
evt.Thread = tmp[1];
}

// build the result
TracingSession tracingSession = obsSession.Result.Single();
tracingSession.TracingEvents = events;
return tracingSession;
}
}
}
9 changes: 8 additions & 1 deletion CassandraSharp/CQLBinaryProtocol/Queries/CqlQuery.cs
Expand Up @@ -99,9 +99,16 @@ private static IEnumerable<object> ReadRows(IFrameReader frameReader, IColumnSpe
for (int rowIdx = 0; rowIdx < rowCount; ++rowIdx)
{
IInstanceBuilder instanceBuilder = mapperFactory.CreateBuilder();

byte[][] rawDatas = new byte[columnSpecs.Length][];
foreach (ColumnSpec colSpec in columnSpecs)
{
rawDatas[colSpec.Index] = stream.ReadByteArray();
}

foreach (ColumnSpec colSpec in columnSpecs)
{
byte[] rawData = stream.ReadByteArray();
byte[] rawData = rawDatas[colSpec.Index];

object data = null;
if (null != rawData)
Expand Down
3 changes: 2 additions & 1 deletion CassandraSharp/Instrumentation/NullInstrumentation.cs
Expand Up @@ -15,6 +15,7 @@

namespace CassandraSharp.Instrumentation
{
using System;
using System.Net;
using CassandraSharp.Extensibility;

Expand All @@ -32,7 +33,7 @@ public void ClientTrace(InstrumentationToken token, EventType eventType)
{
}

public void ServerTrace(InstrumentationToken token, TracingSession tracingSession)
public void ServerTrace(InstrumentationToken token, Guid tracingId)
{
}

Expand Down
87 changes: 0 additions & 87 deletions CassandraSharp/Instrumentation/TracingHelpers.cs

This file was deleted.

26 changes: 12 additions & 14 deletions CassandraSharp/Transport/LongRunningConnection.cs
Expand Up @@ -27,7 +27,6 @@ namespace CassandraSharp.Transport
using CassandraSharp.CQLBinaryProtocol.Queries;
using CassandraSharp.Config;
using CassandraSharp.Extensibility;
using CassandraSharp.Instrumentation;
using CassandraSharp.Utils;

internal sealed class LongRunningConnection : IConnection,
Expand Down Expand Up @@ -127,7 +126,7 @@ public LongRunningConnection(IPAddress address, TransportConfig config, ILogger

public void Dispose()
{
Close(false);
Close(false, null);
}

public static void SetTcpKeepAlive(Socket socket, int keepaliveTime, int keepaliveInterval)
Expand All @@ -146,7 +145,7 @@ public static void SetTcpKeepAlive(Socket socket, int keepaliveTime, int keepali
socket.IOControl(IOControlCode.KeepAliveValues, inOptionValues, null);
}

private void Close(bool notifyFailure)
private void Close(bool notifyFailure, Exception ex)
{
// already in close state ?
lock (_lock)
Expand All @@ -171,6 +170,11 @@ private void Close(bool notifyFailure)

if (notifyFailure && null != OnFailure)
{
if (null != ex)
{
_logger.Fatal("Failed with error : {0}", ex);
}

FailureEventArgs failureEventArgs = new FailureEventArgs(null);
OnFailure(this, failureEventArgs);
}
Expand All @@ -186,8 +190,7 @@ private void SendQueryWorker()
}
catch (Exception ex)
{
_logger.Fatal("Error while trying to send query : {0}", ex);
HandleError();
HandleError(ex);
}
}

Expand Down Expand Up @@ -265,8 +268,7 @@ private void ReadResponseWorker()
}
catch (Exception ex)
{
_logger.Fatal("Error while trying to receive response: {0}", ex);
HandleError();
HandleError(ex);
}
}

Expand Down Expand Up @@ -316,16 +318,15 @@ private void ReadResponse()
InstrumentationToken token = queryInfo.Token;
if (0 != (token.ExecutionFlags & ExecutionFlags.ServerTracing))
{
_logger.Debug("Requesting tracing info for query {0}", frameReader.TraceId);
TracingHelpers.AsyncQueryAndPushTracingSession(this, frameReader.TraceId, token, _instrumentation, _logger);
_instrumentation.ServerTrace(token, frameReader.TraceId);
}
}
}
}

private void HandleError()
private void HandleError(Exception ex)
{
Close(true);
Close(true, ex);
}

private void GetOptions()
Expand All @@ -337,8 +338,6 @@ private void GetOptions()
private void ReadifyConnection()
{
var obsReady = new ReadyQuery(this, _config.CqlVersion).AsFuture();
obsReady.Wait();

bool authenticate = obsReady.Result.Single();
if (authenticate)
{
Expand All @@ -354,7 +353,6 @@ private void Authenticate()
}

var obsAuth = new AuthenticateQuery(this, _config.User, _config.Password).AsFuture();
obsAuth.Wait();
if (! obsAuth.Result.Single())
{
throw new InvalidCredentialException();
Expand Down

0 comments on commit f3eb49d

Please sign in to comment.