Skip to content

Commit cd104bf

Browse files
committed
[Spark 1.4.1] - Refactor DataFrame schema and create data type classes, add CreateDataFrame to SqlContext
1 parent fac6f8f commit cd104bf

File tree

19 files changed

+745
-497
lines changed

19 files changed

+745
-497
lines changed

csharp/Adapter/Microsoft.Spark.CSharp/Adapter.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@
109109
<Compile Include="Sql\Row.cs" />
110110
<Compile Include="Sql\Functions.cs" />
111111
<Compile Include="Sql\SqlContext.cs" />
112-
<Compile Include="Sql\Struct.cs" />
112+
<Compile Include="Sql\Types.cs" />
113113
<Compile Include="Sql\UserDefinedFunction.cs" />
114114
<Compile Include="Streaming\DStream.cs" />
115115
<Compile Include="Streaming\PairDStreamFunctions.cs" />

csharp/Adapter/Microsoft.Spark.CSharp/Interop/Ipc/SerDe.cs

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,13 @@
22
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
33

44
using System;
5+
using System.Collections.Generic;
6+
using System.Linq;
57
using System.Text;
68
using System.IO;
9+
using Newtonsoft.Json;
10+
using Newtonsoft.Json.Linq;
11+
using Newtonsoft.Json.Serialization;
712

813
namespace Microsoft.Spark.CSharp.Interop.Ipc
914
{
@@ -232,4 +237,66 @@ public static void Write(Stream s, string value)
232237
Write(s, buffer);
233238
}
234239
}
240+
241+
/// <summary>
242+
/// Json.NET Serialization/Deserialization helper class.
243+
/// </summary>
244+
public static class JsonSerDe
245+
{
246+
/*
247+
* Note: Scala side uses JSortedObject when parse Json, so the properties in JObject need to be sorted.
248+
*/
249+
250+
/// <summary>
251+
/// Extend method to sort items in a JSON object by keys.
252+
/// </summary>
253+
/// <param name="jObject"></param>
254+
/// <returns></returns>
255+
public static JObject SortProperties(this JObject jObject)
256+
{
257+
JObject sortedJObject = new JObject();
258+
foreach (var property in jObject.Properties().OrderBy(p => p.Name))
259+
{
260+
if (property.Value is JObject)
261+
{
262+
var propJObject = property.Value as JObject;
263+
sortedJObject.Add(property.Name, propJObject.SortProperties());
264+
}
265+
else if (property.Value is JArray)
266+
{
267+
var propJArray = property.Value as JArray;
268+
sortedJObject.Add(property.Name, propJArray.SortProperties());
269+
}
270+
else
271+
{
272+
sortedJObject.Add(property.Name, property.Value);
273+
}
274+
}
275+
return sortedJObject;
276+
}
277+
278+
/// <summary>
279+
/// Extend method to sort items in a JSON array by keys.
280+
/// </summary>
281+
public static JArray SortProperties(this JArray jArray)
282+
{
283+
JArray sortedJArray = new JArray();
284+
if(jArray.Count == 0) return jArray;
285+
286+
foreach (var item in jArray)
287+
{
288+
if (item is JObject)
289+
{
290+
var sortedItem = ((JObject)item).SortProperties();
291+
sortedJArray.Add(sortedItem);
292+
}
293+
else if (item is JArray)
294+
{
295+
var sortedItem = ((JArray)item).SortProperties();
296+
sortedJArray.Add(sortedItem);
297+
}
298+
}
299+
return sortedJArray;
300+
}
301+
}
235302
}

csharp/Adapter/Microsoft.Spark.CSharp/Proxy/ISparkCLRProxy.cs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@ interface ISparkCLRProxy
1616
ISparkContextProxy SparkContextProxy { get; }
1717
ISparkConfProxy CreateSparkConf(bool loadDefaults = true);
1818
ISparkContextProxy CreateSparkContext(ISparkConfProxy conf);
19-
IStructFieldProxy CreateStructField(string name, string dataType, bool isNullable);
20-
IStructTypeProxy CreateStructType(List<StructField> fields);
2119
IDStreamProxy CreateCSharpDStream(IDStreamProxy jdstream, byte[] func, string deserializer);
2220
IDStreamProxy CreateCSharpTransformed2DStream(IDStreamProxy jdstream, IDStreamProxy jother, byte[] func, string deserializer, string deserializerOther);
2321
IDStreamProxy CreateCSharpReducedWindowedDStream(IDStreamProxy jdstream, byte[] func, byte[] invFunc, int windowSeconds, int slideSeconds, string deserializer);

csharp/Adapter/Microsoft.Spark.CSharp/Proxy/ISqlContextProxy.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ namespace Microsoft.Spark.CSharp.Proxy
1313
{
1414
internal interface ISqlContextProxy
1515
{
16+
IDataFrameProxy CreateDataFrame(IRDDProxy rddProxy, IStructTypeProxy structTypeProxy);
1617
IDataFrameProxy ReadDataFrame(string path, StructType schema, Dictionary<string, string> options);
1718
IDataFrameProxy JsonFile(string path);
1819
IDataFrameProxy TextFile(string path, StructType schema, string delimiter);

csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/SparkCLRIpcProxy.cs

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -61,33 +61,6 @@ public ISparkContextProxy CreateSparkContext(ISparkConfProxy conf)
6161
return sparkContextProxy;
6262
}
6363

64-
public IStructFieldProxy CreateStructField(string name, string dataType, bool isNullable)
65-
{
66-
return new StructFieldIpcProxy(
67-
new JvmObjectReference(
68-
JvmBridge.CallStaticJavaMethod(
69-
"org.apache.spark.sql.api.csharp.SQLUtils", "createStructField",
70-
new object[] { name, dataType, isNullable }).ToString()
71-
)
72-
);
73-
}
74-
75-
public IStructTypeProxy CreateStructType(List<StructField> fields)
76-
{
77-
var fieldsReference = fields.Select(s => (s.StructFieldProxy as StructFieldIpcProxy).JvmStructFieldReference).ToList().Cast<JvmObjectReference>();
78-
79-
var seq =
80-
new JvmObjectReference(
81-
JvmBridge.CallStaticJavaMethod("org.apache.spark.sql.api.csharp.SQLUtils",
82-
"toSeq", new object[] { fieldsReference }).ToString());
83-
84-
return new StructTypeIpcProxy(
85-
new JvmObjectReference(
86-
JvmBridge.CallStaticJavaMethod("org.apache.spark.sql.api.csharp.SQLUtils", "createStructType", new object[] { seq }).ToString()
87-
)
88-
);
89-
}
90-
9164
public IDStreamProxy CreateCSharpDStream(IDStreamProxy jdstream, byte[] func, string deserializer)
9265
{
9366
var jvmDStreamReference = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.spark.streaming.api.csharp.CSharpDStream",

csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/SparkContextIpcProxy.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,6 @@ public IRDDProxy EmptyRDD()
121121
return new RDDIpcProxy(jvmRddReference);
122122
}
123123

124-
//TODO - this implementation is slow. Replace with call to createRDDFromArray() in CSharpRDD
125124
public IRDDProxy Parallelize(IEnumerable<byte[]> values, int numSlices)
126125
{
127126
var jvmRddReference = new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallStaticJavaMethod("org.apache.spark.api.csharp.CSharpRDD", "createRDDFromArray", new object[] { jvmSparkContextReference, values, numSlices }));

csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/SqlContextIpcProxy.cs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,17 @@ public SqlContextIpcProxy(JvmObjectReference jvmSqlContextReference)
2121
this.jvmSqlContextReference = jvmSqlContextReference;
2222
}
2323

24+
public IDataFrameProxy CreateDataFrame(IRDDProxy rddProxy, IStructTypeProxy structTypeProxy)
25+
{
26+
var rdd = new JvmObjectReference(SparkCLRIpcProxy.JvmBridge.CallStaticJavaMethod("org.apache.spark.sql.api.csharp.SQLUtils", "byteArrayRDDToAnyArrayRDD",
27+
new object[] { (rddProxy as RDDIpcProxy).JvmRddReference }).ToString());
28+
29+
return new DataFrameIpcProxy(
30+
new JvmObjectReference(
31+
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmSqlContextReference, "applySchemaToPythonRDD",
32+
new object[] { rdd, (structTypeProxy as StructTypeIpcProxy).JvmStructTypeReference }).ToString()), this);
33+
}
34+
2435
public IDataFrameProxy ReadDataFrame(string path, StructType schema, Dictionary<string, string> options)
2536
{
2637
//TODO parameter Dictionary<string, string> options is not used right now - it is meant to be passed on to data sources
@@ -44,7 +55,7 @@ public IDataFrameProxy TextFile(string path, StructType schema, string delimiter
4455
new JvmObjectReference(
4556
SparkCLRIpcProxy.JvmBridge.CallStaticJavaMethod(
4657
"org.apache.spark.sql.api.csharp.SQLUtils", "loadTextFile",
47-
new object[] {jvmSqlContextReference, path, delimiter, (schema.StructTypeProxy as StructTypeIpcProxy).JvmStructTypeReference}).ToString()
58+
new object[] { jvmSqlContextReference, path, delimiter, schema.Json}).ToString()
4859
), this
4960
);
5061
}

csharp/Adapter/Microsoft.Spark.CSharp/Sql/DataFrame.cs

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,8 @@ public class DataFrame
2424
private readonly IDataFrameProxy dataFrameProxy;
2525
[NonSerialized]
2626
private readonly SparkContext sparkContext;
27-
[NonSerialized]
27+
2828
private StructType schema;
29-
private RowSchema rowSchema;
3029
[NonSerialized]
3130
private RDD<Row> rdd;
3231

@@ -39,14 +38,14 @@ public RDD<Row> Rdd
3938
{
4039
get
4140
{
41+
if (schema == null)
42+
{
43+
schema = new StructType(dataFrameProxy.GetSchema());
44+
}
4245
if (rdd == null)
4346
{
44-
if (rowSchema == null)
45-
{
46-
rowSchema = RowSchema.ParseRowSchemaFromJson(Schema.ToJson());
47-
}
4847
IRDDProxy rddProxy = dataFrameProxy.JavaToCSharp();
49-
rdd = new RDD<Object[]>(rddProxy, sparkContext, SerializedMode.Row).Map(item => (Row)new RowImpl(item, rowSchema));
48+
rdd = new RDD<Object[]>(rddProxy, sparkContext, SerializedMode.Row).Map(item => (Row)new RowImpl(item, schema));
5049
}
5150
return rdd;
5251
}
@@ -130,7 +129,7 @@ public void Show(int numberOfRows = 20, bool truncate = true)
130129
/// </summary>
131130
public void ShowSchema()
132131
{
133-
List<string> nameTypeList = Schema.Fields.Select(structField => string.Format("{0}:{1}", structField.Name, structField.DataType.SimpleString())).ToList();
132+
var nameTypeList = Schema.Fields.Select(structField => structField.SimpleString);
134133
Console.WriteLine(string.Join(", ", nameTypeList));
135134
}
136135

@@ -139,18 +138,13 @@ public void ShowSchema()
139138
/// </summary>
140139
public IEnumerable<Row> Collect()
141140
{
142-
if (rowSchema == null)
143-
{
144-
rowSchema = RowSchema.ParseRowSchemaFromJson(Schema.ToJson());
145-
}
146-
147141
IRDDProxy rddProxy = dataFrameProxy.JavaToCSharp();
148142
RDD<Row> rdd = new RDD<Row>(rddProxy, sparkContext, SerializedMode.Row);
149143

150144
int port = rddProxy.CollectAndServe();
151145
foreach (var item in rdd.Collect(port))
152146
{
153-
yield return new RowImpl(item, rowSchema);
147+
yield return new RowImpl(item, Schema);
154148
}
155149
}
156150

0 commit comments

Comments
 (0)