In [6]:
import org.apache.spark.SparkContext
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.streaming.Trigger
import scala.collection.JavaConverters._
// val spark: SparkContext = new SparkContext()

// VALUES are immutable constants. You can't change them once defined.
val hello: String = "Hola!"                     //> hello  : String = Hola!
println(hello)                                  //> Hola!


Hola!


hello = Hola!


Hola!

In [9]:
import org.apache.spark.sql.SparkSession

val spark: SparkSession = SparkSession.builder()
  .master("local[1]")
  .appName("SparkByExamples.com")
  .getOrCreate()

val data = Seq(("James ","","Smith","36636","M",3000),
             ("Michael ","Rose","","40288","M",4000),
             ("Robert ","","Williams","42114","M",4000),
             ("Maria ","Anne","Jones","39192","F",4000),
             ("Jen","Mary","Brown","","F",-1))

val columns = Seq("firstname","middlename","lastname","dob","gender","salary")

import spark.sqlContext.implicits._
val df = data.toDF(columns:_*)
df.show()
df.printSchema()
df.write
  .parquet("/tmp/output/people.parquet")


+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|  dob|gender|salary|
+---------+----------+--------+-----+------+------+
|   James |          |   Smith|36636|     M|  3000|
| Michael |      Rose|        |40288|     M|  4000|
|  Robert |          |Williams|42114|     M|  4000|
|   Maria |      Anne|   Jones|39192|     F|  4000|
|      Jen|      Mary|   Brown|     |     F|    -1|
+---------+----------+--------+-----+------+------+

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = false)



org.apache.spark.sql.AnalysisException: path file:/tmp/output/people.parquet already exists.;

In [10]:
val parqDF = spark.read.parquet("/tmp/output/people.parquet")

parqDF.createOrReplaceTempView("ParquetTable")
spark.sql("select * from ParquetTable where salary >= 4000").explain()


== Physical Plan ==
*(1) Project [firstname#214, middlename#215, lastname#216, dob#217, gender#218, salary#219]
+- *(1) Filter (isnotnull(salary#219) && (salary#219 >= 4000))
   +- *(1) FileScan parquet [firstname#214,middlename#215,lastname#216,dob#217,gender#218,salary#219] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/tmp/output/people.parquet], PartitionFilters: [], PushedFilters: [IsNotNull(salary), GreaterThanOrEqual(salary,4000)], ReadSchema: struct<firstname:string,middlename:string,lastname:string,dob:string,gender:string,salary:int>


parqDF = [firstname: string, middlename: string ... 4 more fields]


lastException: Throwable = null


[firstname: string, middlename: string ... 4 more fields]

In [11]:
val parkSQL = spark.sql("select * from ParquetTable where salary >= 4000 ")
parkSQL.show()
parkSQL.printSchema()
df.write
  .partitionBy("gender","salary")
  .parquet("/tmp/output/people2.parquet")


+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|  dob|gender|salary|
+---------+----------+--------+-----+------+------+
|   Maria |      Anne|   Jones|39192|     F|  4000|
|  Robert |          |Williams|42114|     M|  4000|
| Michael |      Rose|        |40288|     M|  4000|
+---------+----------+--------+-----+------+------+

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)



org.apache.spark.sql.AnalysisException: path file:/tmp/output/people2.parquet already exists.;

In [12]:
val parqDF2 = spark.read.parquet("/tmp/output/people2.parquet")
parqDF2.createOrReplaceTempView("ParquetTable2")
val df3 = spark.sql("select * from ParquetTable2  where gender='M' and salary >= 4000")
df3.explain()
df3.printSchema()
df3.show()


== Physical Plan ==
*(1) FileScan parquet [firstname#264,middlename#265,lastname#266,dob#267,gender#268,salary#269] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/tmp/output/people2.parquet], PartitionCount: 1, PartitionFilters: [isnotnull(gender#268), isnotnull(salary#269), (gender#268 = M), (salary#269 >= 4000)], PushedFilters: [], ReadSchema: struct<firstname:string,middlename:string,lastname:string,dob:string>
root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)

+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|  dob|gender|salary|
+---------+----------+--------+-----+------+------+
|  Robert |          |Williams|42114|     M|  4000|
| Michael |      Rose|        |40288|     M|  4000|
+---------+----------+--------+-----+------+------+



parqDF2 = [firstname: string, middlename: string ... 4 more fields]
df3 = [firstname: string, middlename: string ... 4 more fields]


lastException: Throwable = null


[firstname: string, middlename: string ... 4 more fields]

In [14]:
val parqDF3 = spark.read
  .parquet("/tmp/output/people2.parquet/gender=M")
parqDF3.show()

+---------+----------+--------+-----+------+
|firstname|middlename|lastname|  dob|salary|
+---------+----------+--------+-----+------+
|  Robert |          |Williams|42114|  4000|
| Michael |      Rose|        |40288|  4000|
|   James |          |   Smith|36636|  3000|
+---------+----------+--------+-----+------+



parqDF3 = [firstname: string, middlename: string ... 3 more fields]


[firstname: string, middlename: string ... 3 more fields]