
# Creating Spark Session and Spark Context Object

In [19]:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
// Build the SparkSession used by every cell below: master is set to "local",
// Hive support is enabled, and getOrCreate() returns an existing session if one is already active.
val spark:SparkSession = SparkSession.builder().master("local").enableHiveSupport().getOrCreate()

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@13c601e5


# Creating a Dummy DF

In [20]:
import spark.implicits._
// Build a three-row DataFrame from an in-memory Seq of (String, String, Double) tuples;
// toDF assigns the column names. The unit of `population` is not stated in this file.
val df = Seq(("Boston", "USA", 0.67),("Dubai", "UAE", 3.1),("Cordoba", "Argentina", 1.39)).
toDF("city","country","population")

// Print the DataFrame contents to the cell output.
df.show()

+-------+---------+----------+
|   city|  country|population|
+-------+---------+----------+
| Boston|      USA|      0.67|
|  Dubai|      UAE|       3.1|
|Cordoba|Argentina|      1.39|
+-------+---------+----------+



import spark.implicits._
df: org.apache.spark.sql.DataFrame = [city: string, country: string ... 1 more field]


# Adding a new column to an existing DF

In [15]:
// Append a boolean column that is true where population > 1 and print the result.
// withColumn returns a new DataFrame; `df` itself is not reassigned here.
// NOTE(review): the column name says "billion" but the unit of `population` is not stated — confirm.
df.withColumn("is_population_in_billion",$"population" > 1).show()

+-------+---------+----------+------------------------+
|   city|  country|population|is_population_in_billion|
+-------+---------+----------+------------------------+
| Boston|      USA|      0.67|                   false|
|  Dubai|      UAE|       3.1|                    true|
|Cordoba|Argentina|      1.39|                    true|
+-------+---------+----------+------------------------+



# Filtering Rows

In [16]:
// Keep only the rows whose population is strictly less than 1 and print them.
df.filter($"population" < 1).show()

+------+-------+----------+
|  city|country|population|
+------+-------+----------+
|Boston|    USA|      0.67|
+------+-------+----------+



# Reading a local CSV File 

In [None]:
// Hard-coded, machine-specific path to the CSV file; adjust before running elsewhere.
val catsFilePath = "/Users/shivam/Downloads/server/files/cats.csv"
// Read the CSV with the "header" option set to true; no schema or inferSchema option is set here.
val catsDF = spark.read.option("header",true).csv(catsFilePath)
// Print the loaded rows (this cell has no recorded output in the file).
catsDF.show()

# Column Functions

In [2]:
// Redefine `df` as a two-column (superhero, city) DataFrame for the column-function examples.
val df = Seq(("thor", "new york"),("aquaman", "atlantis"),("wolverine", "new york")).toDF("superhero", "city")
// Add a boolean column that is true where `city` starts with "new"; show(false) prints without truncating values.
// NOTE(review): the column name is spelled "Strating" (probably meant "Starting"); kept because the recorded output uses it.
df.withColumn("cityStratingWithNew",$"city".startsWith("new")).show(false)

+---------+--------+-------------------+
|superhero|city    |cityStratingWithNew|
+---------+--------+-------------------+
|thor     |new york|true               |
|aquaman  |atlantis|false              |
|wolverine|new york|true               |
+---------+--------+-------------------+



df: org.apache.spark.sql.DataFrame = [superhero: string, city: string]


In [5]:
// Redefine `df` as (num, word); the third row has a null `word`, which the null-handling cells below rely on.
val df = Seq((10, "cat"),(4, "dog"),(7, null)).toDF("num", "word")
// Add `new_num` = num + 5 using a literal column.
// NOTE(review): `lit` comes from org.apache.spark.sql.functions, which this file only imports in a later cell —
// confirm it is in scope when the notebook is run top to bottom.
df.withColumn("new_num",$"num" + lit(5)).show()

+---+----+-------+
|num|word|new_num|
+---+----+-------+
| 10| cat|     15|
|  4| dog|      9|
|  7|null|     12|
+---+----+-------+



df: org.apache.spark.sql.DataFrame = [num: int, word: string]


# Checking for Null Values (isNull / isNotNull)

In [10]:
// Add a boolean column `isWordNull` that is true for rows where `word` is null.
val newDF = df.withColumn("isWordNull",$"word".isNull)

newDF: org.apache.spark.sql.DataFrame = [num: int, word: string ... 1 more field]


# Fill Null Values

In [19]:
// Replace nulls with the string "NA" and print; in the recorded output only the string column `word`
// is affected, and `isWordNull` keeps the value computed before the fill.
newDF.na.fill("NA").show()

+---+----+----------+
|num|word|isWordNull|
+---+----+----------+
| 10| cat|     false|
|  4| dog|     false|
|  7|  NA|      true|
+---+----+----------+



# When and Otherwise

In [22]:
// Redefine `df` as two string columns; the last row has a null `word2`.
val df = Seq(("bat", "bat"),("snake", "rat"),("cup", "phone"),("key", null)).toDF("word1", "word2")

// Derive `status` from a chain of conditions evaluated in order:
//   1. word1 equals word2          -> "same_words"
//   2. word1 is longer than word2  -> "word1 is bigger"
//   3. anything else               -> "Let's Default"
// The row with a null word2 matches neither condition and falls through to the default (see recorded output).
df.withColumn("status",when($"word1" === $"word2","same_words").when
             (length($"word1") > length($"word2"),"word1 is bigger").otherwise("Let's Default")).show()

+-----+-----+---------------+
|word1|word2|         status|
+-----+-----+---------------+
|  bat|  bat|     same_words|
|snake|  rat|word1 is bigger|
|  cup|phone|  Let's Default|
|  key| null|  Let's Default|
+-----+-----+---------------+



df: org.apache.spark.sql.DataFrame = [word1: string, word2: string]


# Spark SQL Built-in Functions

In [26]:
import org.apache.spark.sql.functions._
// Single-column DataFrame holding the integers 1 to 5.
val numbers = Seq((1),(2),(3),(4),(5)).toDF("numbers")
// Apply the built-in `factorial` SQL function to each value and print the result.
numbers.withColumn("factorial",factorial($"numbers")).show() 

+-------+---------+
|numbers|factorial|
+-------+---------+
|      1|        1|
|      2|        2|
|      3|        6|
|      4|       24|
|      5|      120|
+-------+---------+



import org.apache.spark.sql.functions._
numbers: org.apache.spark.sql.DataFrame = [numbers: int]


# When and otherwise Revisited

In [39]:
// Redefine `df` as a single integer column `age`.
val df = Seq(10, 15, 25).toDF("age")
// Bucket each age: below 15 -> "child"; above 14 and below 18 -> "semiTeenager"; otherwise -> "adult".
// Conditions are evaluated in order, so the first matching branch wins.
df.withColumn("status",when(($"age" < 15),"child").when(($"age" < 18 && $"age" > 14),"semiTeenager")
             .otherwise("adult")).show()

+---+------------+
|age|      status|
+---+------------+
| 10|       child|
| 15|semiTeenager|
| 25|       adult|
+---+------------+



df: org.apache.spark.sql.DataFrame = [age: int]


# Create a reusable Column function for the age status logic above

In [40]:
import org.apache.spark.sql.Column
/**
 * Builds a column expression that classifies an age into a status label.
 *
 * This is a plain function over Spark `Column`s (not a registered UDF): it only
 * composes built-in `when`/`otherwise` expressions.
 *
 * The classification is applied to the column passed in as `age`, so the helper
 * works for any column, not only one literally named "age".
 *
 * @param age column holding the age to classify
 * @return a column evaluating to "child" when age < 15, "semiTeenager" when
 *         14 < age < 18, and "adult" in every other case (any row matching
 *         neither condition falls through to `otherwise`)
 */
def ageStatus(age:Column):Column ={
    // Branches are evaluated in order; the first matching condition wins.
    when(age < 15,"child")
        .when(age < 18 && age > 14,"semiTeenager")
        .otherwise("adult")
}

import org.apache.spark.sql.Column
ageStatus: (age: org.apache.spark.sql.Column)org.apache.spark.sql.Column


In [41]:
// Reuse the ageStatus helper to derive the `status` column from `age` and print the result.
df.withColumn("status",ageStatus($"age")).show()

+---+------------+
|age|      status|
+---+------------+
| 10|       child|
| 15|semiTeenager|
| 25|       adult|
+---+------------+



# UDF that removes all whitespace and lowercases all characters

In [2]:
/**
 * Lower-cases a string and strips every whitespace character from it.
 *
 * This version performs no null check: passing `null` throws a
 * NullPointerException, which is the failure this example demonstrates.
 *
 * @param s the input string (must not be null)
 * @return `s` lower-cased with all whitespace removed
 */
def lowerRemoveAllSpaces(s:String) : String = {
    val lowered = s.toLowerCase()
    val withoutWhitespace = lowered.replaceAll("\\s","")
    withoutWhitespace
}

// Wrap the plain Scala function as a Spark UDF that takes a String and returns a String.
val lowerRemoveAllSpacesUDF = udf[String,String](lowerRemoveAllSpaces)

// Sample data: mixed-case strings with extra whitespace, plus one null row.
val anotherDF = Seq(("    thIS Is tOO. "),(" hElLO"),(null)).toDF("word")
// Apply the UDF to every row; lowerRemoveAllSpaces has no null check, and this cell's recorded result is the exception shown below.
anotherDF.select(lowerRemoveAllSpacesUDF($"word").as("cleanWord")).show()

org.apache.spark.SparkException:  Failed to execute user defined function($read$$Lambda$2059/45495881: (string) => string)

The above code fails with a NullPointerException because the UDF calls `toLowerCase` on the null row, so we have to write more robust code.

In [10]:
/**
 * Null-safe variant of the clean-up function: lower-cases a string and strips
 * every whitespace character from it.
 *
 * @param s the input string; may be null
 * @return `Some(cleaned)` for a non-null input, `None` when `s` is null
 */
def betterLowerRemoveAllSpaces(s:String) : Option[String] = {
    // Option(s) turns null into None, so the transformation only runs on real values.
    // This replaces the earlier `getOrElse(return None)`, which relied on a non-local return.
    Option(s).map(_.toLowerCase().replaceAll("\\s",""))
}

// Wrap the null-safe function as a UDF whose declared return type is Option[String].
val betterLowerRemoveAllSpacesUDF = udf[Option[String],String](betterLowerRemoveAllSpaces)

// Sample data: mixed-case strings with extra whitespace, plus one null row.
val anotherDF = Seq(("    thIS Is tOO "),(" hElLO"),(null)).toDF("word")
// Apply the UDF; the null row now yields null in the recorded output instead of raising an exception.
anotherDF.select(betterLowerRemoveAllSpacesUDF($"word").as("cleanWord")).show()

+---------+
|cleanWord|
+---------+
|thisistoo|
|    hello|
|     null|
+---------+



betterLowerRemoveAllSpaces: (s: String)Option[String]
betterLowerRemoveAllSpacesUDF: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$2623/1139397239@6e16b21,StringType,List(Some(class[value[0]: string])),None,true,true)
anotherDF: org.apache.spark.sql.DataFrame = [word: string]


This is better code, but a UDF is still a black box to Spark's optimizer. Next we write the same function using Spark's native column functions, which lets Spark optimize the query.

In [12]:
import org.apache.spark.sql.Column

/**
 * Column-level version of the clean-up function, built only from Spark's
 * built-in `regexp_replace` and `lower` functions instead of a UDF.
 *
 * The first parameter list is empty; callers invoke it as
 * `bestLowerRemoveAllSpaces()(someColumn)`.
 *
 * @param col the string column to clean
 * @return a column with all whitespace removed and the text lower-cased
 */
def bestLowerRemoveAllSpaces()(col : Column ): Column = {
    // Strip whitespace first, then lower-case the remaining text.
    val withoutWhitespace = regexp_replace(col,"\\s","")
    lower(withoutWhitespace)
}

// Apply the native-function version to `word`; the empty first parameter list must still be written as ().
anotherDF.select(bestLowerRemoveAllSpaces()($"word").as("cleanWord")).show()

+---------+
|cleanWord|
+---------+
|thisistoo|
|    hello|
|     null|
+---------+



import org.apache.spark.sql.Column
bestLowerRemoveAllSpaces: ()(col: org.apache.spark.sql.Column)org.apache.spark.sql.Column
