Commit 0ea07ae: code

Roberto Congiu committed Apr 4, 2015 (1 parent: d419479)
Showing 4 changed files with 68 additions and 0 deletions.
12 changes: 12 additions & 0 deletions build.sbt
@@ -0,0 +1,12 @@
name := "nested"

version := "1.0"

libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "1.2.0"
,"org.apache.spark" %% "spark-sql" % "1.2.0"
,"org.apache.spark" %% "spark-hive" % "1.2.0"
,"com.databricks" %% "spark-csv" % "0.1.1"
)
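
A side note on the build: scalaVersion is not pinned here, while submit.sh below expects a Scala 2.10 artifact (target/scala-2.10/...). Adding the line below would make that explicit; 2.10.4 is just an assumed 2.10.x release, any of them should do.

scalaVersion := "2.10.4"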


3 changes: 3 additions & 0 deletions data/data.csv
@@ -0,0 +1,3 @@
propertyId,propertyName,roomname1,roomsize1,roomname2,roomsize2
"bhaa123","My house","kitchen",134,"bedroom",345
"pasa372","Other house","living room",433,"bedroom",332
49 changes: 49 additions & 0 deletions src/main/scala/main.scala
@@ -0,0 +1,49 @@
import com.databricks.spark.csv._
import org.apache.spark.sql.Row
import org.apache.spark.{SparkConf,SparkContext}

object nesting extends Serializable {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf())

    val hc = new org.apache.spark.sql.hive.HiveContext(sc)

    // Load the CSV with spark-csv and register it as a table.
    // Adjust the path to wherever data/data.csv lives on your system.
    val data = hc.csvFile("file:///data/data.csv")
    hc.registerRDDAsTable(data, "data")

    hc.sql("""CREATE TABLE IF NOT EXISTS nested (
      propertyId string,
      propertyName string,
      rooms array<struct<roomname:string,roomsize:int>>
    ) STORED AS PARQUET
    """)

    val nestedRDD = data.map(buildRecord(_))

    // After the map, Spark no longer knows the schema of the resulting
    // RDD, so we copy it from the Hive table with applySchema; the
    // LIMIT 0 query returns no rows, only the schema.
    val nested = hc.sql("select * from nested limit 0")
    val schemaNestedRDD = hc.applySchema(nestedRDD, nested.schema)

    hc.registerRDDAsTable(schemaNestedRDD, "schemanestedrdd")

    hc.sql("insert overwrite table nested select * from schemanestedrdd")
  }

  // Turns a flat CSV row into a nested one:
  // (id, name, room1, size1, room2, size2) ->
  // (id, name, [ (room1, size1), (room2, size2) ])
  def buildRecord(r: Row): Row = {
    var res = Seq[Any]()
    res = res ++ r.slice(0, 2)   // keep the first two columns (id, name) as-is
    var ary = Seq[Any]()         // will hold the array elements
    for (i <- 0 to 1) {          // two (roomname, roomsize) column pairs
      // 0-based indexes: reads columns (2,3), then (4,5); size becomes an Int
      ary = ary :+ Row(r.getString(2 + 2 * i), r.getString(2 + 1 + 2 * i).toInt)
    }
    res = res :+ ary             // append the array as the third column
    Row.fromSeq(res)
  }
}
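
Once the job has populated the table, the nested data can be read back through the same HiveContext. A minimal sketch, assuming the table and columns created above; LATERAL VIEW explode is standard HiveQL for flattening an array of structs:

hc.sql("""SELECT propertyName, r.roomname, r.roomsize
          FROM nested
          LATERAL VIEW explode(rooms) t AS r""").collect().foreach(println)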

4 changes: 4 additions & 0 deletions submit.sh
@@ -0,0 +1,4 @@
spark-submit --class nesting \
--master local[2] \
--jars /Users/rcongiu/work/scala-hive/spark-csv-assembly-0.1.1.jar \
target/scala-2.10/nested_2.10-1.0.jar
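
The application jar on the last line is the one sbt produces for this project; assuming sbt 0.13 and the Scala 2.10 build noted earlier, it is built with:

sbt package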
