// S?Oquerywithoutpartitions.scala
// S?O query without partitions — 126 lines (89 loc) · 4.78 KB
import java.nio.file.{Paths, Files}
import java.nio.charset.StandardCharsets
import scala.io.Source.fromFile
import scala.io.Source.fromInputStream
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.Encoder
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.HashPartitioner
import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.types.{StructType,StructField,StringType,IntegerType};
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import scala.util.Try
import java.nio.file.{ Files, Paths }
import org.apache.spark.sql.hive.HiveContext;
import java.io.File
/** One RDF triple parsed from an N-Triples line.
  *
  * Each field is an `Option` because input lines are split on a single space
  * and malformed/short lines may not yield all three tokens (the loader wraps
  * each index access in `Try(...).toOption`).
  *
  * @param Subject   the triple's subject term, if present
  * @param Predicate the triple's predicate term, if present
  * @param Object    the triple's object term, if present
  */
final case class Auction(Subject: Option[String], Predicate: Option[String], Object: Option[String])
/** Full-materialization benchmark: loads nine archived N-Triples dataset
  * versions from S3 into one version-tagged Hive table, then times an S?O
  * query (fixed Subject and Object, unbound Predicate) against one version.
  */
object FullMaterialization {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("SimpleApp")
    val sc = new SparkContext(sparkConf)
    val warehouseLocation = new File("spark-warehouse").getAbsolutePath
    // Hive-enabled session; the saved table and the query below both go
    // through this session's catalog (the old `new SQLContext(sc)` created a
    // separate, non-Hive context and was a latent catalog-mismatch bug).
    val spark = SparkSession
      .builder()
      .appName("Spark Hive Example")
      .config("spark.sql.warehouse.dir", warehouseLocation)
      .enableHiveSupport()
      .getOrCreate()
    import spark.implicits._

    // (S3 path, version tag) for every archived dataset version.
    val versionedFiles = Seq(
      ("s3://dataversionsarchiving/data/1.nt",  "v1"),
      ("s3://dataversionsarchiving/data/5.nt",  "v5"),
      ("s3://dataversionsarchiving/data/10.nt", "v10"),
      ("s3://dataversionsarchiving/data/15.nt", "v15"),
      ("s3://dataversionsarchiving/data/20.nt", "v20"),
      ("s3://dataversionsarchiving/data/25.nt", "v25"),
      ("s3://dataversionsarchiving/data/30.nt", "v30"),
      ("s3://dataversionsarchiving/data/35.nt", "v35"),
      ("s3://dataversionsarchiving/data/40.nt", "v40")
    )

    // Parse each file into Auction rows (Try(...).toOption tolerates short
    // lines), tag every row with its version, and union all versions.
    val allVersions = versionedFiles
      .map { case (path, tag) =>
        sc.textFile(path)
          .map(_.split(" "))
          .map(p => Auction(Try(p(0)).toOption, Try(p(1)).toOption, Try(p(2)).toOption))
          .toDF()
          .withColumn("version", lit(tag))
      }
      .reduce(_ union _)

    allVersions.write.mode(SaveMode.Append).saveAsTable("TableWithoutPartitions")

    // Time only the S?O query + result materialization, not the load above.
    val now = System.nanoTime
    spark
      .sql("SELECT Subject,Predicate,Object from TableWithoutPartitions where version ='v5' and Subject='<acct:nicoletta@beta.teamlife.it#acct>' and Object='<acct:nicoletta@beta.teamlife.it>'")
      .write
      .saveAsTable("TableS?O")
    val micros = (System.nanoTime - now) / 1000
    println("%d microseconds".format(micros))
  }
}