Skip to content

Commit

Permalink
Merge pull request #2 from xiongrenyi/dev
Browse files Browse the repository at this point in the history
merge DataFrame.Collect with RDD.Collect both of which map to the sam…
  • Loading branch information
xiongrenyi committed Nov 19, 2015
2 parents 0dc8f75 + a20f3b4 commit c38e064
Show file tree
Hide file tree
Showing 10 changed files with 77 additions and 135 deletions.
2 changes: 0 additions & 2 deletions csharp/Adapter/Microsoft.Spark.CSharp/Adapter.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,6 @@
<Compile Include="Core\StatusTracker.cs" />
<Compile Include="Core\StorageLevel.cs" />
<Compile Include="Interop\SparkCLREnvironment.cs" />
<Compile Include="Interop\Ipc\SparkCLRSocket.cs" />
<Compile Include="Interop\Ipc\ISparkCLRSocket.cs" />
<Compile Include="Interop\Ipc\IJvmBridge.cs" />
<Compile Include="Interop\Ipc\JvmBridge.cs" />
<Compile Include="Interop\Ipc\JvmObjectReference.cs" />
Expand Down
29 changes: 17 additions & 12 deletions csharp/Adapter/Microsoft.Spark.CSharp/Core/RDD.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
using System.Net.Sockets;
using Microsoft.Spark.CSharp.Proxy;
using Microsoft.Spark.CSharp.Interop.Ipc;
using Razorvine.Pickle;

namespace Microsoft.Spark.CSharp.Core
{
Expand Down Expand Up @@ -544,15 +545,14 @@ public void ForeachPartition(Action<IEnumerable<T>> f)
/// Return a list that contains all of the elements in this RDD.
/// </summary>
/// <returns></returns>
public T[] Collect() //TODO - hardcoded for now - need to fix
public T[] Collect()
{
int port = RddProxy.CollectAndServe();
return Collect(port);
return Collect(port).Cast<T>().ToArray();
}

internal T[] Collect(int port)
internal IEnumerable<dynamic> Collect(int port)
{
List<object> items = new List<object>();
IFormatter formatter = new BinaryFormatter();
Socket sock = new Socket(SocketType.Stream, ProtocolType.Tcp);
sock.Connect("127.0.0.1", port);
Expand All @@ -565,24 +565,30 @@ internal T[] Collect(int port)
if (serializedMode == SerializedMode.Byte)
{
MemoryStream ms = new MemoryStream(buffer);
items.Add(formatter.Deserialize(ms));
yield return formatter.Deserialize(ms);
}
else if (serializedMode == SerializedMode.String)
{
items.Add(Encoding.UTF8.GetString(buffer));
yield return Encoding.UTF8.GetString(buffer);
}
else if (serializedMode == SerializedMode.Pair)
{
MemoryStream ms = new MemoryStream(buffer);
MemoryStream ms2 = new MemoryStream(SerDe.ReadBytes(s));

ConstructorInfo ci = typeof(T).GetConstructors()[0];
items.Add(ci.Invoke(new object[] { formatter.Deserialize(ms), formatter.Deserialize(ms2) }));
yield return ci.Invoke(new object[] { formatter.Deserialize(ms), formatter.Deserialize(ms2) });
}
else if (serializedMode == SerializedMode.Row)
{
Unpickler unpickler = new Unpickler();
foreach (var item in (unpickler.loads(buffer) as object[]))
{
yield return item;
}
}
}
}

return items.Cast<T>().ToArray();
}

/// <summary>
Expand Down Expand Up @@ -812,7 +818,7 @@ public T[] Take(int num)
IEnumerable<int> partitions = Enumerable.Range(partsScanned, Math.Min(numPartsToTry, totalParts - partsScanned));
var mappedRDD = MapPartitions<T>(new TakeHelper<T>(left).Execute);
int port = sparkContext.SparkContextProxy.RunJob(mappedRDD.RddProxy, partitions, true);
T[] res = Collect(port);
IEnumerable<T> res = Collect(port).Cast<T>();

items.AddRange(res);
partsScanned += numPartsToTry;
Expand Down Expand Up @@ -1046,8 +1052,7 @@ public IEnumerable<T> ToLocalIterator()
{
var mappedRDD = MapPartitions<T>(iter => iter);
int port = sparkContext.SparkContextProxy.RunJob(mappedRDD.RddProxy, Enumerable.Range(partition, 1), true);
T[] rows = Collect(port);
foreach (T row in rows)
foreach (T row in Collect(port))
yield return row;
}
}
Expand Down
30 changes: 20 additions & 10 deletions csharp/Adapter/Microsoft.Spark.CSharp/Interop/Ipc/SerDe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@

namespace Microsoft.Spark.CSharp.Interop.Ipc
{
/// <summary>
/// see PythonRDD.scala with which Work.cs communicates
/// </summary>
public enum SpecialLengths : int
{
END_OF_DATA_SECTION = -1,
Expand Down Expand Up @@ -125,19 +128,26 @@ public static string ReadString(Stream s)

public static byte[] ReadBytes(Stream s, int length)
{
byte[] buffer = new byte[length];
int bytesRead;
int totalBytesRead = 0;
do
if (length < 0)
{
bytesRead = s.Read(buffer, totalBytesRead, length - totalBytesRead);
totalBytesRead += bytesRead;
throw new ArgumentOutOfRangeException("length", length, "length can't be negative.");
}
while (totalBytesRead < length && bytesRead > 0);

if (totalBytesRead < length && totalBytesRead > 0)
throw new ArgumentException(string.Format("Incomplete bytes read: {0}, expected: {1}", totalBytesRead, length));

byte[] buffer = new byte[length];
if (length > 0)
{
int bytesRead;
int totalBytesRead = 0;
do
{
bytesRead = s.Read(buffer, totalBytesRead, length - totalBytesRead);
totalBytesRead += bytesRead;
}
while (totalBytesRead < length && bytesRead > 0);

if (totalBytesRead < length && totalBytesRead > 0)
throw new ArgumentException(string.Format("Incomplete bytes read: {0}, expected: {1}", totalBytesRead, length));
}
return buffer;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ internal interface IDataFrameProxy
{
void RegisterTempTable(string tableName);
long Count();
int CollectAndServe();
IRDDProxy JavaToCSharp();
string GetQueryExecution();
string GetExecutedPlan();
string GetShowString(int numberOfRows, bool truncate);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,11 @@ public long Count()
jvmDataFrameReference, "count").ToString());
}

/// <summary>
/// Call CollectAndServe() in Java side, it will collect an RDD as an iterator, then serve it via socket
/// </summary>
/// <returns>the port number of a local socket which serves the data collected</returns>
public int CollectAndServe()
public IRDDProxy JavaToCSharp()
{
var javaRDDReference = new JvmObjectReference(
(string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmDataFrameReference, "javaToPython"));
var rddReference = new JvmObjectReference(
(string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(javaRDDReference, "rdd"));
return int.Parse(SparkCLRIpcProxy.JvmBridge.CallStaticJavaMethod(
"org.apache.spark.api.python.PythonRDD",
"collectAndServe",
new object[] { rddReference }).ToString());
return new RDDIpcProxy(javaRDDReference);
}

public string GetQueryExecution()
Expand Down
39 changes: 7 additions & 32 deletions csharp/Adapter/Microsoft.Spark.CSharp/Sql/DataFrame.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,45 +104,20 @@ public void ShowSchema()
/// Returns all of Rows in this DataFrame
/// </summary>
public IEnumerable<Row> Collect()
{
int port = dataFrameProxy.CollectAndServe();
RowSchema rs = GetRowSchema();
List<Row> items = Collect(port, rs);
return items.AsEnumerable<Row>();
}

private RowSchema GetRowSchema()
{
if (rowSchema == null)
{
string json = Schema.ToJson();
rowSchema = RowSchema.ParseRowSchemaFromJson(json);
rowSchema = RowSchema.ParseRowSchemaFromJson(Schema.ToJson());
}
return rowSchema;
}

private List<Row> Collect(int port, RowSchema dataType)
{
List<Row> items = new List<Row>();
IFormatter formatter = new BinaryFormatter();
Unpickler unpickler = new Unpickler();
Socket sock = new Socket(SocketType.Stream, ProtocolType.Tcp);
sock.Connect("127.0.0.1", port);

using (NetworkStream s = new NetworkStream(sock))
IRDDProxy rddProxy = dataFrameProxy.JavaToCSharp();
RDD<Row> rdd = new RDD<Row>(rddProxy, sparkContext, SerializedMode.Row);

int port = rddProxy.CollectAndServe();
foreach (var item in rdd.Collect(port))
{
byte[] buffer;
while ((buffer = SerDe.ReadBytes(s)) != null && buffer.Length > 0)
{
foreach (var item in (unpickler.loads(buffer) as object[]))
{
RowImpl row = new RowImpl(item, dataType);
items.Add(row);
}
}
yield return new RowImpl(item, rowSchema);
}

return items;
}

/// <summary>
Expand Down
4 changes: 2 additions & 2 deletions csharp/AdapterTest/Mocks/MockDStreamProxy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public IDStreamProxy AsJavaDStream()
public void CallForeachRDD(byte[] func, string deserializer)
{
Action<double, RDD<dynamic>> f = (Action<double, RDD<dynamic>>)formatter.Deserialize(new MemoryStream(func));
f(DateTime.UtcNow.Ticks, new RDD<dynamic>(new MockRddProxy(null), new SparkContext("", "")));
f(DateTime.UtcNow.Ticks, new RDD<dynamic>(rddProxy, new SparkContext("", "")));
}

public void Print(int num = 10)
Expand All @@ -63,7 +63,7 @@ public void Checkpoint(long intervalMs)

public IRDDProxy[] Slice(long fromUnixTime, long toUnixTime)
{
return new IRDDProxy[] {(new MockDStreamProxy()) as IRDDProxy};
return new IRDDProxy[] { rddProxy };
}
}
}
49 changes: 2 additions & 47 deletions csharp/AdapterTest/Mocks/MockDataFrameProxy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ internal class MockDataFrameProxy : IDataFrameProxy
private List<object> mockRows;
private int mockPort;
private IStructTypeProxy mockSchema;
private Task mockSocketServerTask;

public ISqlContextProxy SqlContextProxy
{
Expand Down Expand Up @@ -55,51 +54,6 @@ public long Count()
throw new NotImplementedException();
}


public int CollectAndServe()
{
// start a new task to mock the Socket server side
mockSocketServerTask = Task.Run(() =>
{
// listen to localPort, and create socket
TcpListener listener = new TcpListener(IPAddress.Any, mockPort);
listener.Start();
Socket socket = listener.AcceptSocket();
Stream ns = new NetworkStream(socket);
Pickler picker = new Pickler();
BinaryWriter bw = new BinaryWriter(ns);
// write picked data via socket
foreach (var row in mockRows)
{
byte[] pickledRowData = picker.dumps(new object[] { row });
int pickledRowLength = pickledRowData.Count();
// write the length in BigEndian format
byte[] lengthBuffer = new byte[4];
lengthBuffer[0] = (byte)(pickledRowLength >> 24);
lengthBuffer[1] = (byte)(pickledRowLength >> 16);
lengthBuffer[2] = (byte)(pickledRowLength >> 8);
lengthBuffer[3] = (byte)(pickledRowLength);
bw.Write(lengthBuffer);
bw.Write(pickledRowData);
}
// close the stream and socket
ns.Close();
socket.Close();
}
);

// sleep some time to let the server side start first
Thread.Sleep(100);

return mockPort;
}

public string GetQueryExecution()
{
throw new NotImplementedException();
Expand Down Expand Up @@ -202,7 +156,8 @@ public void Unpersist(bool blocking)

public IRDDProxy JavaToCSharp()
{
throw new NotImplementedException();
Pickler pickler = new Pickler();
return new MockRddProxy(mockRows.Select(r => pickler.dumps(new object[] { r })), true);
}

public IDataFrameProxy Limit(int num)
Expand Down
6 changes: 4 additions & 2 deletions csharp/AdapterTest/Mocks/MockRddProxy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ namespace AdapterTest.Mocks
internal class MockRddProxy : IRDDProxy
{
internal IEnumerable<dynamic> result;
internal bool pickle;

internal object[] mockRddReference;

Expand All @@ -25,9 +26,10 @@ public MockRddProxy(object[] parameterCollection)
mockRddReference = parameterCollection;
}

public MockRddProxy(IEnumerable<dynamic> result)
public MockRddProxy(IEnumerable<dynamic> result, bool pickle = false)
{
this.result = result;
this.pickle = pickle;
}

public IRDDProxy Distinct<T>()
Expand Down Expand Up @@ -80,7 +82,7 @@ public IRDDProxy Union(IRDDProxy javaRddReferenceOther)

public int CollectAndServe()
{
return MockSparkContextProxy.RunJob();
return MockSparkContextProxy.RunJob(this);
}

public int PartitionLength()
Expand Down

0 comments on commit c38e064

Please sign in to comment.