# Basic Structured Operations

First, let's create a Spark session:

In [1]:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
    .appName("essdg")
    .getOrCreate()

Intitializing Scala interpreter ...

Spark Web UI available at http://555b3d8130b5:4041
SparkContext available as 'sc' (version = 3.0.0, master = local[*], app id = local-1598468016299)
SparkSession available as 'spark'


import org.apache.spark.sql.SparkSession
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@9def068


Next, we need to locate the dataset's CSV file...

In [4]:
!ls ../../../src/201508_station_data.csv

../../../src/201508_station_data.csv



in order to load/read it:

In [6]:
val df = spark.read
    .format("csv")
    .option("inferSchema", "True")
    .option("header", "True")
    .load("../../../src/201508_station_data.csv")

df.show(5)

+----------+--------------------+---------+-----------+---------+--------+------------+
|station_id|                name|      lat|       long|dockcount|landmark|installation|
+----------+--------------------+---------+-----------+---------+--------+------------+
|         2|San Jose Diridon ...|37.329732|-121.901782|       27|San Jose|    8/6/2013|
|         3|San Jose Civic Ce...|37.330698|-121.888979|       15|San Jose|    8/5/2013|
|         4|Santa Clara at Al...|37.333988|-121.894902|       11|San Jose|    8/6/2013|
|         5|    Adobe on Almaden|37.331415|  -121.8932|       19|San Jose|    8/5/2013|
|         6|    San Pedro Square|37.336721|-121.894074|       15|San Jose|    8/7/2013|
+----------+--------------------+---------+-----------+---------+--------+------------+
only showing top 5 rows



df: org.apache.spark.sql.DataFrame = [station_id: int, name: string ... 5 more fields]
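inferSchema makes Spark scan the data to guess column types. As a minimal sketch of the alternative, a schema can be passed to the reader explicitly. The names stationSchema, csvLines and stations are only illustrative, the schema covers a hypothetical three-column subset of the file, the CSV lines are held in memory instead of being read from disk, and a local master is assumed.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Assumption: a local session is fine; inside the notebook getOrCreate() reuses the existing one.
val spark = SparkSession.builder.master("local[*]").appName("explicit-schema").getOrCreate()
import spark.implicits._

// Hypothetical three-column subset of the station file's columns.
val stationSchema = StructType(Array(
  StructField("station_id", IntegerType, nullable = true),
  StructField("name", StringType, nullable = true),
  StructField("dockcount", IntegerType, nullable = true)
))

// Two CSV lines held in memory, standing in for the file on disk.
val csvLines = Seq("station_id,name,dockcount", "5,Adobe on Almaden,19").toDS()

// With .schema(...) the reader uses the declared types instead of inferring them.
val stations = spark.read
  .option("header", "true")
  .schema(stationSchema)
  .csv(csvLines)

stations.printSchema()
```

For a file on disk, the same .schema(...) call would go before .load(path) in place of the inferSchema option.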


List all the columns of our dataset (as a Scala array):

In [5]:
df.columns

res1: Array[String] = Array(station_id, name, lat, long, dockcount, landmark, installation)


We can also get the number of columns:

In [6]:
df.columns.size

res2: Int = 7


along with the number of rows:

In [7]:
df.count()

res3: Long = 70


We can retrieve the first column name this way:

In [8]:
df.columns(0)

res4: String = station_id


Now, let's add 5 to every station_id value:

In [9]:
df.selectExpr("station_id + 5").show(5)

+----------------+
|(station_id + 5)|
+----------------+
|               7|
|               8|
|               9|
|              10|
|              11|
+----------------+
only showing top 5 rows



The same thing can be done with select and expr:

In [10]:
import org.apache.spark.sql.functions.expr

df.select(expr("station_id + 5")).show(5)

+----------------+
|(station_id + 5)|
+----------------+
|               7|
|               8|
|               9|
|              10|
|              11|
+----------------+
only showing top 5 rows



Creating a row (record):

In [11]:
import org.apache.spark.sql.Row

val myRow = Row("Hello", null, 1, false)
myRow(0)

import org.apache.spark.sql.Row
myRow: org.apache.spark.sql.Row = [Hello,null,1,false]
res7: Any = Hello


Then display the other elements of this row:

In [12]:
println(myRow(1))
println(myRow(2))
println(myRow(3))

null
1
false


In [13]:
myRow.getString(0)

res9: String = Hello


In [14]:
myRow.getInt(2)

res10: Int = 1


In [15]:
myRow.getBoolean(3)

res11: Boolean = false
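The typed getters above assume the field is not null. A small sketch of checking for null first, using a row of the same shape as myRow (the names sampleRow, secondIsNull, third and maybeSecond are only illustrative):

```scala
import org.apache.spark.sql.Row

// Same values as myRow above.
val sampleRow = Row("Hello", null, 1, false)

// isNullAt checks a position without casting the value.
val secondIsNull = sampleRow.isNullAt(1)

// getAs[T] casts the value at a position to the requested type.
val third = sampleRow.getAs[Int](2)

// Wrapping the result in Option turns a possibly-null field into Some/None.
val maybeSecond = Option(sampleRow.getAs[String](1))

println(s"$secondIsNull $third $maybeSecond")
```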


Create a DataFrame from rows with a manual schema:

In [23]:
import org.apache.spark.sql.types._

val myManualSchema = StructType(Array(
    StructField("someString", StringType, true),
    StructField("names", StringType, true),
    StructField("idNb", LongType, false)
))

val myRows = Seq(Row("Hello", null, 1L), 
                 Row("What?", "test", 10L))

val myRDD = spark.sparkContext.parallelize(myRows)
val myDF = spark.createDataFrame(myRDD, myManualSchema)

myDF.show()

+----------+-----+----+
|someString|names|idNb|
+----------+-----+----+
|     Hello| null|   1|
|     What?| test|  10|
+----------+-----+----+



import org.apache.spark.sql.types._
myManualSchema: org.apache.spark.sql.types.StructType = StructType(StructField(someString,StringType,true), StructField(names,StringType,true), StructField(idNb,LongType,false))
myRows: Seq[org.apache.spark.sql.Row] = List([Hello,null,1], [What?,test,10])
myRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = ParallelCollectionRDD[41] at parallelize at <console>:50
myDF: org.apache.spark.sql.DataFrame = [someString: string, names: string ... 1 more field]


Or you can simply convert a Seq (roughly the equivalent of a Python list) to a DataFrame:

In [24]:
val myDF = Seq(("Hello", 2, 1L)).toDF("col1", "col2", "col3")
myDF.show()

+-----+----+----+
| col1|col2|col3|
+-----+----+----+
|Hello|   2|   1|
+-----+----+----+



myDF: org.apache.spark.sql.DataFrame = [col1: string, col2: int ... 1 more field]


Rename a column:

In [25]:
df.withColumnRenamed("lat", "LATTITUDE!").show(2)

+----------+--------------------+----------+-----------+---------+--------+------------+
|station_id|                name|LATTITUDE!|       long|dockcount|landmark|installation|
+----------+--------------------+----------+-----------+---------+--------+------------+
|         2|San Jose Diridon ...| 37.329732|-121.901782|       27|San Jose|    8/6/2013|
|         3|San Jose Civic Ce...| 37.330698|-121.888979|       15|San Jose|    8/5/2013|
+----------+--------------------+----------+-----------+---------+--------+------------+
only showing top 2 rows



In [26]:
df.selectExpr("name as NAME").show(2)
df.select(col("name").as("NAME")).show(2)

+--------------------+
|                NAME|
+--------------------+
|San Jose Diridon ...|
|San Jose Civic Ce...|
+--------------------+
only showing top 2 rows



The same thing, the SQL way:

In [27]:
import org.apache.spark.sql.functions._
df.createOrReplaceTempView("dfTable")

spark.sql("""
SELECT long as LONGITUDE
FROM dfTable
LIMIT 4
""").show()

+-----------+
|  LONGITUDE|
+-----------+
|-121.901782|
|-121.888979|
|-121.894902|
|  -121.8932|
+-----------+



import org.apache.spark.sql.functions._


In [28]:
//https://mungingdata.com/apache-spark/aggregations/

Duplicate landmark only if dockcount > 20:

In [29]:
// when(...) without otherwise(...) yields null for the rows that do not match
val df_dup = df.withColumn("landmark_bis", when(col("dockcount") > 20, col("landmark")))
df_dup.show(4)

+----------+--------------------+---------+-----------+---------+--------+------------+------------+
|station_id|                name|      lat|       long|dockcount|landmark|installation|landmark_bis|
+----------+--------------------+---------+-----------+---------+--------+------------+------------+
|         2|San Jose Diridon ...|37.329732|-121.901782|       27|San Jose|    8/6/2013|    San Jose|
|         3|San Jose Civic Ce...|37.330698|-121.888979|       15|San Jose|    8/5/2013|        null|
|         4|Santa Clara at Al...|37.333988|-121.894902|       11|San Jose|    8/6/2013|        null|
|         5|    Adobe on Almaden|37.331415|  -121.8932|       19|San Jose|    8/5/2013|        null|
+----------+--------------------+---------+-----------+---------+--------+------------+------------+
only showing top 4 rows



df_dup: org.apache.spark.sql.DataFrame = [station_id: int, name: string ... 6 more fields]


Add another condition:

In [30]:
val df_ter = df.withColumn("landmark_ter",
    when(col("dockcount") > 20, col("landmark"))
        .when(col("dockcount") < 15, "test")
        .otherwise("bbb"))
df_ter.show(4)

+----------+--------------------+---------+-----------+---------+--------+------------+------------+
|station_id|                name|      lat|       long|dockcount|landmark|installation|landmark_ter|
+----------+--------------------+---------+-----------+---------+--------+------------+------------+
|         2|San Jose Diridon ...|37.329732|-121.901782|       27|San Jose|    8/6/2013|    San Jose|
|         3|San Jose Civic Ce...|37.330698|-121.888979|       15|San Jose|    8/5/2013|         bbb|
|         4|Santa Clara at Al...|37.333988|-121.894902|       11|San Jose|    8/6/2013|        test|
|         5|    Adobe on Almaden|37.331415|  -121.8932|       19|San Jose|    8/5/2013|         bbb|
+----------+--------------------+---------+-----------+---------+--------+------------+------------+
only showing top 4 rows



df_ter: org.apache.spark.sql.DataFrame = [station_id: int, name: string ... 6 more fields]
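The chained when/otherwise calls behave like a SQL CASE expression. As a rough sketch, the same column can be written with expr. The DataFrame docks is a hypothetical stand-in built from three (dockcount, landmark) pairs shown in the output above, and a local master is assumed.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr

// Assumption: a local session is fine; inside the notebook getOrCreate() reuses the existing one.
val spark = SparkSession.builder.master("local[*]").appName("case-when").getOrCreate()
import spark.implicits._

// Three (dockcount, landmark) pairs taken from the output above.
val docks = Seq((27, "San Jose"), (15, "San Jose"), (11, "San Jose"))
  .toDF("dockcount", "landmark")

// Branches are evaluated in order; ELSE plays the role of otherwise(...).
val labelled = docks.withColumn("landmark_ter", expr(
  "CASE WHEN dockcount > 20 THEN landmark WHEN dockcount < 15 THEN 'test' ELSE 'bbb' END"))

labelled.show()
```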


Create a new column indicating whether two columns are equal (the result is null wherever one side is null):

In [31]:
df_dup.selectExpr("*", "landmark_bis == landmark as EL").show(4)

+----------+--------------------+---------+-----------+---------+--------+------------+------------+----+
|station_id|                name|      lat|       long|dockcount|landmark|installation|landmark_bis|  EL|
+----------+--------------------+---------+-----------+---------+--------+------------+------------+----+
|         2|San Jose Diridon ...|37.329732|-121.901782|       27|San Jose|    8/6/2013|    San Jose|true|
|         3|San Jose Civic Ce...|37.330698|-121.888979|       15|San Jose|    8/5/2013|        null|null|
|         4|Santa Clara at Al...|37.333988|-121.894902|       11|San Jose|    8/6/2013|        null|null|
|         5|    Adobe on Almaden|37.331415|  -121.8932|       19|San Jose|    8/5/2013|        null|null|
+----------+--------------------+---------+-----------+---------+--------+------------+------------+----+
only showing top 4 rows
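The null values in EL come from SQL's three-valued logic: comparing anything with null gives null. When a strict true/false answer is wanted, the null-safe equality operator <=> is one option. The sketch below uses a hypothetical two-row DataFrame named pairs in place of df_dup and assumes a local master.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Assumption: a local session is fine; inside the notebook getOrCreate() reuses the existing one.
val spark = SparkSession.builder.master("local[*]").appName("null-safe-eq").getOrCreate()
import spark.implicits._

// Tiny stand-in for df_dup: the second row has a null in landmark_bis.
val pairs = Seq(
  ("San Jose", Some("San Jose")),
  ("San Jose", None: Option[String])
).toDF("landmark", "landmark_bis")

val compared = pairs
  // === returns null when either side is null
  .withColumn("eq", col("landmark_bis") === col("landmark"))
  // <=> treats null as an ordinary value, so the result is always true or false
  .withColumn("eq_null_safe", col("landmark_bis") <=> col("landmark"))

compared.show()
```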



The same thing, the SQL way:

In [32]:
df_dup.createOrReplaceTempView("dfTable")
spark.sql("""
SELECT *, landmark_bis == landmark AS EL
FROM dfTable
LIMIT 4
""").show()

+----------+--------------------+---------+-----------+---------+--------+------------+------------+----+
|station_id|                name|      lat|       long|dockcount|landmark|installation|landmark_bis|  EL|
+----------+--------------------+---------+-----------+---------+--------+------------+------------+----+
|         2|San Jose Diridon ...|37.329732|-121.901782|       27|San Jose|    8/6/2013|    San Jose|true|
|         3|San Jose Civic Ce...|37.330698|-121.888979|       15|San Jose|    8/5/2013|        null|null|
|         4|Santa Clara at Al...|37.333988|-121.894902|       11|San Jose|    8/6/2013|        null|null|
|         5|    Adobe on Almaden|37.331415|  -121.8932|       19|San Jose|    8/5/2013|        null|null|
+----------+--------------------+---------+-----------+---------+--------+------------+------------+----+



In [33]:
//https://hackersandslackers.com/transforming-pyspark-dataframes/

Average of a column's values:

In [34]:
df.select(avg("station_id")).show()
df.selectExpr("avg(station_id)").show()
df.select(mean("station_id")).show()

+---------------+
|avg(station_id)|
+---------------+
|           43.0|
+---------------+



Number of distinct rows:

In [7]:
df.agg(countDistinct("*")).show()

+-----------------------+
|count(unresolvedstar())|
+-----------------------+
|                     70|
+-----------------------+



In [10]:
df.select("*").distinct().count()
df.distinct().count()

res6: Long = 70


Count the distinct values of one column:

In [37]:
df_ter.select("landmark").distinct().count()

res30: Long = 5


In [38]:
df_ter.select(countDistinct("landmark")).show()
df.agg(countDistinct("landmark")).show()

+------------------------+
|count(DISTINCT landmark)|
+------------------------+
|                       5|
+------------------------+



Count the distinct combinations of two columns:

In [39]:
df_ter.select(countDistinct("landmark", "landmark_ter")).show()

+--------------------------------------+
|count(DISTINCT landmark, landmark_ter)|
+--------------------------------------+
|                                    13|
+--------------------------------------+



In [40]:
df_ter.select("landmark", "landmark_ter").distinct().count()

res33: Long = 13


Converting to Spark literal types:

In [41]:
df.select(col("landmark"), lit(1).as("one")).show(3)

+--------+---+
|landmark|one|
+--------+---+
|San Jose|  1|
|San Jose|  1|
|San Jose|  1|
+--------+---+
only showing top 3 rows



Adding a column with a literal value:

In [42]:
df.withColumn("one", lit(1)).show(3)

+----------+--------------------+---------+-----------+---------+--------+------------+---+
|station_id|                name|      lat|       long|dockcount|landmark|installation|one|
+----------+--------------------+---------+-----------+---------+--------+------------+---+
|         2|San Jose Diridon ...|37.329732|-121.901782|       27|San Jose|    8/6/2013|  1|
|         3|San Jose Civic Ce...|37.330698|-121.888979|       15|San Jose|    8/5/2013|  1|
|         4|Santa Clara at Al...|37.333988|-121.894902|       11|San Jose|    8/6/2013|  1|
+----------+--------------------+---------+-----------+---------+--------+------------+---+
only showing top 3 rows



Adding a column computed from a condition:

In [43]:
df.withColumn("greater", expr("dockcount > 18")).show(5)

+----------+--------------------+---------+-----------+---------+--------+------------+-------+
|station_id|                name|      lat|       long|dockcount|landmark|installation|greater|
+----------+--------------------+---------+-----------+---------+--------+------------+-------+
|         2|San Jose Diridon ...|37.329732|-121.901782|       27|San Jose|    8/6/2013|   true|
|         3|San Jose Civic Ce...|37.330698|-121.888979|       15|San Jose|    8/5/2013|  false|
|         4|Santa Clara at Al...|37.333988|-121.894902|       11|San Jose|    8/6/2013|  false|
|         5|    Adobe on Almaden|37.331415|  -121.8932|       19|San Jose|    8/5/2013|   true|
|         6|    San Pedro Square|37.336721|-121.894074|       15|San Jose|    8/7/2013|  false|
+----------+--------------------+---------+-----------+---------+--------+------------+-------+
only showing top 5 rows



In [44]:
df_ter.withColumn("equal", expr("landmark == landmark_ter")).show(5)

+----------+--------------------+---------+-----------+---------+--------+------------+------------+-----+
|station_id|                name|      lat|       long|dockcount|landmark|installation|landmark_ter|equal|
+----------+--------------------+---------+-----------+---------+--------+------------+------------+-----+
|         2|San Jose Diridon ...|37.329732|-121.901782|       27|San Jose|    8/6/2013|    San Jose| true|
|         3|San Jose Civic Ce...|37.330698|-121.888979|       15|San Jose|    8/5/2013|         bbb|false|
|         4|Santa Clara at Al...|37.333988|-121.894902|       11|San Jose|    8/6/2013|        test|false|
|         5|    Adobe on Almaden|37.331415|  -121.8932|       19|San Jose|    8/5/2013|         bbb|false|
|         6|    San Pedro Square|37.336721|-121.894074|       15|San Jose|    8/7/2013|         bbb|false|
+----------+--------------------+---------+-----------+---------+--------+------------+------------+-----+
only showing top 5 rows



Duplicate a column:

In [45]:
df.withColumn("newName", col("landmark")).show(2) // or expr

+----------+--------------------+---------+-----------+---------+--------+------------+--------+
|station_id|                name|      lat|       long|dockcount|landmark|installation| newName|
+----------+--------------------+---------+-----------+---------+--------+------------+--------+
|         2|San Jose Diridon ...|37.329732|-121.901782|       27|San Jose|    8/6/2013|San Jose|
|         3|San Jose Civic Ce...|37.330698|-121.888979|       15|San Jose|    8/5/2013|San Jose|
+----------+--------------------+---------+-----------+---------+--------+------------+--------+
only showing top 2 rows



Drop columns

In [46]:
df.drop("name", "lat").show(2)

+----------+-----------+---------+--------+------------+
|station_id|       long|dockcount|landmark|installation|
+----------+-----------+---------+--------+------------+
|         2|-121.901782|       27|San Jose|    8/6/2013|
|         3|-121.888979|       15|San Jose|    8/5/2013|
+----------+-----------+---------+--------+------------+
only showing top 2 rows



Changing column type

In [47]:
df.withColumn("newDockCount", col("dockcount").cast("long")).printSchema

root
 |-- station_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- dockcount: integer (nullable = true)
 |-- landmark: string (nullable = true)
 |-- installation: string (nullable = true)
 |-- newDockCount: long (nullable = true)



Filtering rows (numerical condition)

In [48]:
df.filter("dockcount > 18").show(5)

+----------+--------------------+---------+-----------+---------+--------+------------+
|station_id|                name|      lat|       long|dockcount|landmark|installation|
+----------+--------------------+---------+-----------+---------+--------+------------+
|         2|San Jose Diridon ...|37.329732|-121.901782|       27|San Jose|    8/6/2013|
|         5|    Adobe on Almaden|37.331415|  -121.8932|       19|San Jose|    8/5/2013|
|        11|         MLK Library|37.335885| -121.88566|       19|San Jose|    8/6/2013|
|        12|SJSU 4th at San C...|37.332808|-121.883891|       19|San Jose|    8/7/2013|
|        14|Arena Green / SAP...|37.332692|-121.900084|       19|San Jose|    8/5/2013|
+----------+--------------------+---------+-----------+---------+--------+------------+
only showing top 5 rows



In [49]:
df.where("dockcount = 19").show(3)

+----------+--------------------+---------+-----------+---------+--------+------------+
|station_id|                name|      lat|       long|dockcount|landmark|installation|
+----------+--------------------+---------+-----------+---------+--------+------------+
|         5|    Adobe on Almaden|37.331415|  -121.8932|       19|San Jose|    8/5/2013|
|        11|         MLK Library|37.335885| -121.88566|       19|San Jose|    8/6/2013|
|        12|SJSU 4th at San C...|37.332808|-121.883891|       19|San Jose|    8/7/2013|
+----------+--------------------+---------+-----------+---------+--------+------------+
only showing top 3 rows



The same thing, the SQL way (dfTable now points at df_dup, hence the extra landmark_bis column):

In [50]:
spark.sql("""
SELECT *
FROM dfTable
WHERE dockcount = 19
""").show(3)

+----------+--------------------+---------+-----------+---------+--------+------------+------------+
|station_id|                name|      lat|       long|dockcount|landmark|installation|landmark_bis|
+----------+--------------------+---------+-----------+---------+--------+------------+------------+
|         5|    Adobe on Almaden|37.331415|  -121.8932|       19|San Jose|    8/5/2013|        null|
|        11|         MLK Library|37.335885| -121.88566|       19|San Jose|    8/6/2013|        null|
|        12|SJSU 4th at San C...|37.332808|-121.883891|       19|San Jose|    8/7/2013|        null|
+----------+--------------------+---------+-----------+---------+--------+------------+------------+
only showing top 3 rows



Filtering rows: two chained conditions (one numeric, one on a string):

In [51]:
df.where("dockcount <> 15").where("landmark <> 'Redwood City'").show()

+----------+--------------------+---------+-----------+---------+-------------+------------+
|station_id|                name|      lat|       long|dockcount|     landmark|installation|
+----------+--------------------+---------+-----------+---------+-------------+------------+
|         2|San Jose Diridon ...|37.329732|-121.901782|       27|     San Jose|    8/6/2013|
|         4|Santa Clara at Al...|37.333988|-121.894902|       11|     San Jose|    8/6/2013|
|         5|    Adobe on Almaden|37.331415|  -121.8932|       19|     San Jose|    8/5/2013|
|        11|         MLK Library|37.335885| -121.88566|       19|     San Jose|    8/6/2013|
|        12|SJSU 4th at San C...|37.332808|-121.883891|       19|     San Jose|    8/7/2013|
|        14|Arena Green / SAP...|37.332692|-121.900084|       19|     San Jose|    8/5/2013|
|        28|Mountain View Cal...|37.394358|-122.076713|       23|Mountain View|   8/15/2013|
|        29|San Antonio Caltr...| 37.40694|-122.106758|       23|Mount

The same thing, the SQL way:

In [52]:
spark.sql("""
SELECT *
FROM dfTable
WHERE dockcount <> 15 AND landmark <> 'Redwood City'
""").show()

+----------+--------------------+---------+-----------+---------+-------------+------------+-------------+
|station_id|                name|      lat|       long|dockcount|     landmark|installation| landmark_bis|
+----------+--------------------+---------+-----------+---------+-------------+------------+-------------+
|         2|San Jose Diridon ...|37.329732|-121.901782|       27|     San Jose|    8/6/2013|     San Jose|
|         4|Santa Clara at Al...|37.333988|-121.894902|       11|     San Jose|    8/6/2013|         null|
|         5|    Adobe on Almaden|37.331415|  -121.8932|       19|     San Jose|    8/5/2013|         null|
|        11|         MLK Library|37.335885| -121.88566|       19|     San Jose|    8/6/2013|         null|
|        12|SJSU 4th at San C...|37.332808|-121.883891|       19|     San Jose|    8/7/2013|         null|
|        14|Arena Green / SAP...|37.332692|-121.900084|       19|     San Jose|    8/5/2013|         null|
|        28|Mountain View Cal...|37.3

The same AND filter, written with Column expressions:

In [53]:
df.where(col("dockcount") =!= 15 && col("landmark") =!= "Redwood City").show()

+----------+--------------------+---------+-----------+---------+-------------+------------+
|station_id|                name|      lat|       long|dockcount|     landmark|installation|
+----------+--------------------+---------+-----------+---------+-------------+------------+
|         2|San Jose Diridon ...|37.329732|-121.901782|       27|     San Jose|    8/6/2013|
|         4|Santa Clara at Al...|37.333988|-121.894902|       11|     San Jose|    8/6/2013|
|         5|    Adobe on Almaden|37.331415|  -121.8932|       19|     San Jose|    8/5/2013|
|        11|         MLK Library|37.335885| -121.88566|       19|     San Jose|    8/6/2013|
|        12|SJSU 4th at San C...|37.332808|-121.883891|       19|     San Jose|    8/7/2013|
|        14|Arena Green / SAP...|37.332692|-121.900084|       19|     San Jose|    8/5/2013|
|        28|Mountain View Cal...|37.394358|-122.076713|       23|Mountain View|   8/15/2013|
|        29|San Antonio Caltr...| 37.40694|-122.106758|       23|Mount

And with OR instead:

In [54]:
df.where(col("dockcount") =!= 15 || col("landmark") =!= "Redwood City").show()

+----------+--------------------+---------+-----------+---------+-------------+------------+
|station_id|                name|      lat|       long|dockcount|     landmark|installation|
+----------+--------------------+---------+-----------+---------+-------------+------------+
|         2|San Jose Diridon ...|37.329732|-121.901782|       27|     San Jose|    8/6/2013|
|         3|San Jose Civic Ce...|37.330698|-121.888979|       15|     San Jose|    8/5/2013|
|         4|Santa Clara at Al...|37.333988|-121.894902|       11|     San Jose|    8/6/2013|
|         5|    Adobe on Almaden|37.331415|  -121.8932|       19|     San Jose|    8/5/2013|
|         6|    San Pedro Square|37.336721|-121.894074|       15|     San Jose|    8/7/2013|
|         7|Paseo de San Antonio|37.333798|-121.886943|       15|     San Jose|    8/7/2013|
|         8| San Salvador at 1st|37.330165|-121.885831|       15|     San Jose|    8/5/2013|
|         9|           Japantown|37.348742|-121.894715|       15|     

Sample

In [55]:
var seed = 5
val withReplacement = false
val fraction = 0.5

val df_sample = df.sample(withReplacement, fraction, seed)

seed: Int = 5
withReplacement: Boolean = false
fraction: Double = 0.5
df_sample: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [station_id: int, name: string ... 5 more fields]


In [56]:
df_sample.show()

+----------+--------------------+---------+-----------+---------+-------------+------------+
|station_id|                name|      lat|       long|dockcount|     landmark|installation|
+----------+--------------------+---------+-----------+---------+-------------+------------+
|         2|San Jose Diridon ...|37.329732|-121.901782|       27|     San Jose|    8/6/2013|
|         3|San Jose Civic Ce...|37.330698|-121.888979|       15|     San Jose|    8/5/2013|
|         6|    San Pedro Square|37.336721|-121.894074|       15|     San Jose|    8/7/2013|
|         7|Paseo de San Antonio|37.333798|-121.886943|       15|     San Jose|    8/7/2013|
|         8| San Salvador at 1st|37.330165|-121.885831|       15|     San Jose|    8/5/2013|
|        10|  San Jose City Hall|37.337391|-121.886995|       15|     San Jose|    8/6/2013|
|        11|         MLK Library|37.335885| -121.88566|       19|     San Jose|    8/6/2013|
|        13|       St James Park|37.339301|-121.889937|       15|     

Random splits

In [57]:
seed = 42
val testAndTrain = df.randomSplit(Array(1, 4), seed)
println(testAndTrain(0).count())

13


seed: Int = 42
testAndTrain: Array[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]] = Array([station_id: int, name: string ... 5 more fields], [station_id: int, name: string ... 5 more fields])


In [58]:
println(testAndTrain(1).count())

57


In [59]:
df.count() * 0.25

res51: Double = 17.5
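Note that randomSplit normalises its weights so that they sum to 1. With Array(1, 4) the first split therefore targets 1/5 of the rows, not 1/4, which is why 13 and 57 are closer to 14 and 56 than to 17.5. A quick check of the arithmetic in plain Scala (the names weights, totalRows, fractions and expectedSizes are only illustrative):

```scala
// Weights passed to randomSplit above, and the row count reported by df.count().
val weights = Array(1.0, 4.0)
val totalRows = 70L

// Normalise the weights so they sum to 1, then scale by the row count.
val fractions = weights.map(_ / weights.sum)
val expectedSizes = fractions.map(_ * totalRows)

println(fractions.mkString(", "))
println(expectedSizes.mkString(", "))
```

The split sizes are only expected values: each row is assigned at random, so the actual counts vary with the seed.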


In [60]:
// https://docs.databricks.com/spark/latest/dataframes-datasets/introduction-to-dataframes-scala.html

Sorting rows 

In [61]:
df.sort(desc("dockcount")).show(2)

+----------+-----------------+---------+-----------+---------+-------------+------------+
|station_id|             name|      lat|       long|dockcount|     landmark|installation|
+----------+-----------------+---------+-----------+---------+-------------+------------+
|        61|  2nd at Townsend|37.780526|-122.390288|       27|San Francisco|   8/22/2013|
|        77|Market at Sansome|37.789625|-122.400811|       27|San Francisco|   8/25/2013|
+----------+-----------------+---------+-----------+---------+-------------+------------+
only showing top 2 rows



In [62]:
df.orderBy(desc("dockcount")).show(3)

+----------+--------------------+---------+-----------+---------+-------------+------------+
|station_id|                name|      lat|       long|dockcount|     landmark|installation|
+----------+--------------------+---------+-----------+---------+-------------+------------+
|         2|San Jose Diridon ...|37.329732|-121.901782|       27|     San Jose|    8/6/2013|
|        61|     2nd at Townsend|37.780526|-122.390288|       27|San Francisco|   8/22/2013|
|        67|      Market at 10th|37.776619|-122.417385|       27|San Francisco|   8/23/2013|
+----------+--------------------+---------+-----------+---------+-------------+------------+
only showing top 3 rows



In [63]:
df.orderBy(desc("dockcount"), desc("station_id")).show(3)

+----------+-----------------+---------+-----------+---------+-------------+------------+
|station_id|             name|      lat|       long|dockcount|     landmark|installation|
+----------+-----------------+---------+-----------+---------+-------------+------------+
|        77|Market at Sansome|37.789625|-122.400811|       27|San Francisco|   8/25/2013|
|        67|   Market at 10th|37.776619|-122.417385|       27|San Francisco|   8/23/2013|
|        61|  2nd at Townsend|37.780526|-122.390288|       27|San Francisco|   8/22/2013|
+----------+-----------------+---------+-----------+---------+-------------+------------+
only showing top 3 rows



In [64]:
// Caution: inside expr, "desc" is parsed as a column alias, so this sorts in ascending order
df.orderBy(expr("dockcount desc")).show(3)

+----------+--------------------+---------+-----------+---------+-------------+------------+
|station_id|                name|      lat|       long|dockcount|     landmark|installation|
+----------+--------------------+---------+-----------+---------+-------------+------------+
|         4|Santa Clara at Al...|37.333988|-121.894902|       11|     San Jose|    8/6/2013|
|        32|Castro Street and...|37.385956|-122.083678|       11|Mountain View|  12/31/2013|
|        35|University and Em...|37.444521|-122.163093|       11|    Palo Alto|   8/15/2013|
+----------+--------------------+---------+-----------+---------+-------------+------------+
only showing top 3 rows
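The output above is in ascending order (dockcount 11 first): the word desc inside the expression string appears to be read as an alias for the column, not as a sort direction. A sketch of forms that do sort in descending order, on a hypothetical three-row DataFrame named docks and assuming a local master:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, desc, expr}

// Assumption: a local session is fine; inside the notebook getOrCreate() reuses the existing one.
val spark = SparkSession.builder.master("local[*]").appName("sort-desc").getOrCreate()
import spark.implicits._

// Illustrative data only.
val docks = Seq(("a", 11), ("b", 27), ("c", 19)).toDF("name", "dockcount")

// Calling .desc on the Column produces a real descending sort order.
val sortedByExpr = docks.orderBy(expr("dockcount").desc)
val sortedByCol = docks.orderBy(col("dockcount").desc)

// desc("...") from functions is equivalent.
val sortedByFn = docks.orderBy(desc("dockcount"))

sortedByExpr.show()
```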



Check the number of partitions of the underlying RDD, then repartition it:

In [65]:
df.rdd.getNumPartitions

res57: Int = 1


In [66]:
val rdd_repart = df.rdd.repartition(4)

rdd_repart: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[244] at repartition at <console>:40


In [67]:
rdd_repart.getNumPartitions

res58: Int = 4
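Going through df.rdd drops down to an RDD[Row]. As a sketch of an alternative, repartition can also be called on the DataFrame itself, which keeps the schema attached. The DataFrame ids below is a hypothetical stand-in for df, and a local master is assumed.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session is fine; inside the notebook getOrCreate() reuses the existing one.
val spark = SparkSession.builder.master("local[*]").appName("repartition").getOrCreate()

// 70 rows, matching the row count of df above; the values themselves are illustrative.
val ids = spark.range(70).toDF("station_id")

// repartition(n) on a DataFrame returns a DataFrame with n partitions.
val repartitioned = ids.repartition(4)

println(repartitioned.rdd.getNumPartitions)
```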


In [68]:
rdd_repart

res59: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[244] at repartition at <console>:40


In [69]:
// An RDD[Row] carries no schema, so pass the original one back in explicitly
val df_repart = spark.createDataFrame(rdd_repart, df.schema)