### Loading and saving data from and to RDD , Dataframe

In [1]:
import org.apache.spark.sql.types._

In [19]:
//Spark 2 offer Sparksession called spark, with options to load many formatted files into a Dataframe
val loadedDF = spark.read.format("csv").option("header", "true").load("rows.csv?accessType=DOWNLOAD")

In [13]:
loadedDF.columns
//loadedDF.printSchema

Array(JURISDICTION NAME, COUNT PARTICIPANTS, COUNT FEMALE, PERCENT FEMALE, COUNT MALE, PERCENT MALE, COUNT GENDER UNKNOWN, PERCENT GENDER UNKNOWN, COUNT GENDER TOTAL, PERCENT GENDER TOTAL, COUNT PACIFIC ISLANDER, PERCENT PACIFIC ISLANDER, COUNT HISPANIC LATINO, PERCENT HISPANIC LATINO, COUNT AMERICAN INDIAN, PERCENT AMERICAN INDIAN, COUNT ASIAN NON HISPANIC, PERCENT ASIAN NON HISPANIC, COUNT WHITE NON HISPANIC, PERCENT WHITE NON HISPANIC, COUNT BLACK NON HISPANIC, PERCENT BLACK NON HISPANIC, COUNT OTHER ETHNICITY, PERCENT OTHER ETHNICITY, COUNT ETHNICITY UNKNOWN, PERCENT ETHNICITY UNKNOWN, COUNT ETHNICITY TOTAL, PERCENT ETHNICITY TOTAL, COUNT PERMANENT RESIDENT ALIEN, PERCENT PERMANENT RESIDENT ALIEN, COUNT US CITIZEN, PERCENT US CITIZEN, COUNT OTHER CITIZEN STATU...

In [20]:
//Create old fashion Rdd manually
val rdd = sc.parallelize(List(1,2,3))
rdd.collect().foreach(println)

1
2
3


In [17]:
//Or create rdd by loading in a file
val rdd = sc.textFile("file:////home/jovyan/rows.csv?accessType=DOWNLOAD",6)
rdd.take(5).foreach(println)
rdd.repartition(1) //make it one



JURISDICTION NAME,COUNT PARTICIPANTS,COUNT FEMALE,PERCENT FEMALE,COUNT MALE,PERCENT MALE,COUNT GENDER UNKNOWN,PERCENT GENDER UNKNOWN,COUNT GENDER TOTAL,PERCENT GENDER TOTAL,COUNT PACIFIC ISLANDER,PERCENT PACIFIC ISLANDER,COUNT HISPANIC LATINO,PERCENT HISPANIC LATINO,COUNT AMERICAN INDIAN,PERCENT AMERICAN INDIAN,COUNT ASIAN NON HISPANIC,PERCENT ASIAN NON HISPANIC,COUNT WHITE NON HISPANIC,PERCENT WHITE NON HISPANIC,COUNT BLACK NON HISPANIC,PERCENT BLACK NON HISPANIC,COUNT OTHER ETHNICITY,PERCENT OTHER ETHNICITY,COUNT ETHNICITY UNKNOWN,PERCENT ETHNICITY UNKNOWN,COUNT ETHNICITY TOTAL,PERCENT ETHNICITY TOTAL,COUNT PERMANENT RESIDENT ALIEN,PERCENT PERMANENT RESIDENT ALIEN,COUNT US CITIZEN,PERCENT US CITIZEN,COUNT OTHER CITIZEN STATUS,PERCENT OTHER CITIZEN STATUS,COUNT CITIZEN STATUS UNKNOWN,PERCENT CITIZEN STATUS UNKNOWN,COUNT CITIZEN STATUS TOTAL,PERCENT CITIZEN STATUS TOTAL,COUNT RECEIVES PUBLIC ASSISTANCE,PERCENT RECEIVES PUBLIC ASSISTANCE,COUNT NRECEIVES PUBLIC ASSISTANCE,PERCENT NRECEIV

MapPartitionsRDD[43] at repartition at <console>:22

In [20]:
//Next write as parquet, json etc, can only be done using Dataframe not RDD
loadedDF.write.json("file:////home/jovyan/file.json")

### Filtering
In case you load json with multiple depth level, use this to filter the level
Use DF.select("somelevel.*")

In [27]:
//select some columns
val rows = loadedDF.select("JURISDICTION NAME", "COUNT PARTICIPANTS", "COUNT FEMALE", "COUNT MALE")
rows.show(5)

+-----------------+------------------+------------+----------+
|JURISDICTION NAME|COUNT PARTICIPANTS|COUNT FEMALE|COUNT MALE|
+-----------------+------------------+------------+----------+
|            10001|                44|          22|        22|
|            10002|                35|          19|        16|
|            10003|                 1|           1|         0|
|            10004|                 0|           0|         0|
|            10005|                 2|           2|         0|
+-----------------+------------------+------------+----------+
only showing top 5 rows



In [30]:
//Filter row values
rows.filter(rows("COUNT PARTICIPANTS") > 10).show(5)

+-----------------+------------------+------------+----------+
|JURISDICTION NAME|COUNT PARTICIPANTS|COUNT FEMALE|COUNT MALE|
+-----------------+------------------+------------+----------+
|            10001|                44|          22|        22|
|            10002|                35|          19|        16|
|            10016|                17|          12|         5|
|            10025|                27|          17|        10|
|            10029|                20|          13|         7|
+-----------------+------------------+------------+----------+
only showing top 5 rows



### Grouping

In [34]:
rows.groupBy("COUNT MALE").count.show(5)

+----------+-----+
|COUNT MALE|count|
+----------+-----+
|         7|    6|
|        54|    1|
|        15|    1|
|        11|    1|
|         3|    9|
+----------+-----+
only showing top 5 rows



### SQL

In [46]:
loadedDF.createOrReplaceTempView("mytable")
spark.sql("select `COUNT MALE` from mytable where `COUNT MALE` > 5").show(5)

+----------+
|COUNT MALE|
+----------+
|        22|
|        16|
|         7|
|        10|
|         7|
+----------+
only showing top 5 rows



### To create schema on read:
```
val schema = StructType(
     StructField("_id",StringType,true)::
     StructField("_rev",StringType,true)::
     StructField("count",LongType,true)::
     StructField("flowrate",LongType,true)::
     StructField("fluidlevel",StringType,true)::
     StructField("frequency",LongType,true)::
     StructField("hardness",LongType,true)::
     StructField("speed",LongType,true)::
     StructField("temperature",LongType,true)::
     StructField("ts",LongType,true)::
     StructField("voltage",LongType,true)::
 Nil)
 
 spark.createDataFrame(rdd,schema)
 ```