-
Notifications
You must be signed in to change notification settings - Fork 211
/
Copy pathProgram.cs
106 lines (89 loc) · 4.28 KB
/
Program.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System;
using System.Collections.Generic;
using Microsoft.Spark.CSharp.Core;
using Microsoft.Spark.CSharp.Sql;
namespace Microsoft.Spark.CSharp.Examples
{
/// <summary>
/// This example shows how to use Cassandra in .NET Apache Spark DataFrame API
/// It covers load rows from Cassandra, filter it and save results to another table
/// </summary>
/// <summary>
/// This example shows how to use Cassandra with the .NET Apache Spark DataFrame API.
/// It covers loading rows from Cassandra, filtering them, and saving the results to another table.
/// </summary>
/// <remarks>
/// Usage (all arguments are optional and positional):
///   CassandraDataFrameExample [hostName [keySpace [tableToRead [tableToInsert]]]]
/// Any argument that is omitted falls back to its default:
/// "localhost", "ks", "users", "filteredusers".
/// </remarks>
public class CassandraDataFrameExample
{
    static void Main(string[] args)
    {
        // Each supplied argument overrides the corresponding default in order;
        // omitted trailing arguments keep their defaults.
        var cassandraHostName = args.Length > 0 ? args[0] : "localhost";
        var cassandraKeySpace = args.Length > 1 ? args[1] : "ks";
        var cassandraTableToRead = args.Length > 2 ? args[2] : "users";
        var cassandraTableToInsert = args.Length > 3 ? args[3] : "filteredusers";
        if (args.Length > 4)
        {
            Console.WriteLine("Ignoring {0} extra argument(s); expected at most 4", args.Length - 4);
        }

        /*
        ** CQL used to create the tables in Cassandra for this example **
        ** (assumes keyspace ks already exists and is the current keyspace, e.g. via USE ks;) **
        CREATE TABLE users (
        username VARCHAR,
        firstname VARCHAR,
        lastname VARCHAR,
        PRIMARY KEY (username)
        );
        INSERT INTO ks.users (username, firstname, lastname) VALUES ('JD123', 'John', 'Doe');
        INSERT INTO ks.users (username, firstname, lastname) VALUES ('BillJ', 'Bill', 'Jones');
        INSERT INTO ks.users (username, firstname, lastname) VALUES ('SL', 'Steve', 'Little');
        CREATE TABLE filteredusers (
        username VARCHAR,
        firstname VARCHAR,
        lastname VARCHAR,
        PRIMARY KEY (username)
        );
        */

        var sparkConf = new SparkConf().Set("spark.cassandra.connection.host", cassandraHostName);
        var sparkContext = new SparkContext(sparkConf);
        try
        {
            var sqlContext = new SqlContext(sparkContext);

            // Read from the Cassandra table through the Spark Cassandra connector data source.
            var usersDataFrame =
                sqlContext.Read()
                    .Format("org.apache.spark.sql.cassandra")
                    .Options(new Dictionary<string, string> { { "keyspace", cassandraKeySpace }, { "table", cassandraTableToRead } })
                    .Load();

            // Display rows in the console.
            usersDataFrame.Show();

            // Create a temp table backed by the same Cassandra table so it can be queried with SQL.
            var createTempTableStatement =
                string.Format(
                    "CREATE TEMPORARY TABLE userstemp USING org.apache.spark.sql.cassandra OPTIONS(table \"{0}\", keyspace \"{1}\")",
                    cassandraTableToRead,
                    cassandraKeySpace);
            sqlContext.Sql(createTempTableStatement);

            // Read from the temp table, filter it, and display the schema and rows.
            var filteredUsersDataFrame = sqlContext.Sql("SELECT * FROM userstemp").Filter("username = 'SL'");
            filteredUsersDataFrame.ShowSchema();
            filteredUsersDataFrame.Show();

            // Write the filtered rows to another table.
            filteredUsersDataFrame.Write()
                .Format("org.apache.spark.sql.cassandra")
                .Options(new Dictionary<string, string> { { "keyspace", cassandraKeySpace }, { "table", cassandraTableToInsert } })
                .Save();

            // Convert to an RDD, execute map & filter, and collect the result.
            var rddCollectedItems = usersDataFrame.ToRDD()
                .Map(
                    r =>
                        string.Format("{0},{1},{2}", r.GetAs<string>("username"),
                            r.GetAs<string>("firstname"),
                            r.GetAs<string>("lastname")))
                // Match only the username field (first CSV column), mirroring the
                // DataFrame filter username = 'SL'. Contains("SL") would also match
                // rows where "SL" appears in the first or last name.
                .Filter(s => s.StartsWith("SL,", StringComparison.Ordinal))
                .Collect();

            foreach (var rddCollectedItem in rddCollectedItems)
            {
                Console.WriteLine(rddCollectedItem);
            }

            Console.WriteLine("Completed running example");
        }
        finally
        {
            // Shut down the Spark context even if one of the steps above throws.
            sparkContext.Stop();
        }
    }
}
}