In [1]:
import org.apache.spark.sql.SparkSession
val spark = SparkSession
   .builder()
   .master("local[*]")
   .appName("SparkSessionExample")
   .getOrCreate()
val sc = spark.sparkContext

Intitializing Scala interpreter ...

Spark Web UI available at http://172.27.25.211:4041
SparkContext available as 'sc' (version = 3.2.1, master = local[*], app id = local-1647892187113)
SparkSession available as 'spark'


import org.apache.spark.sql.SparkSession
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@20212f29


In [3]:
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
val squared = distData.map(x => x * x)
squared.collect()

data: Array[Int] = Array(1, 2, 3, 4, 5)
distData: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26
squared: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:27


In [5]:
// find the number of occurrences of words in a text
val data = sc.parallelize(Array("hello", "world", "hello", "15619", "is", "awesome")).map { word => (word, 1)}

res1: Array[Int] = Array(1, 4, 9, 16, 25)


In [10]:
// groupByKey works very much like MapReduce.
val count = data.groupByKey().map(x => (x._1, x._2.sum))
count.collect().foreach(println)
// or 
val count = data.reduceByKey(_ + _)
count.collect().foreach(println)
// or aggregateByKey
val count = data.aggregateByKey(0)((x, y) => x + 1, (a, b) => a + b)
count.collect().foreach(println)
// or combineByKey
val count = data.combineByKey((z: Int) => z, (x: Int, y: Int) => x + 1, (a: Int, b: Int) => a + b)
count.collect().foreach(println)

(is,1)
(15619,1)
(hello,2)
(world,1)
(awesome,1)


count: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[5] at map at <console>:26


### Wordcount with Spark

In [None]:
// Install sample data
!wget https://www.gutenberg.org/files/1112/1112.txt

In [17]:
val file = sc.textFile("input/1112.txt") // change the path if working locally
val words = file.flatMap(line => line.split(" "))
val pairs = words.map(word => (word, 1))
val counts = pairs.reduceByKey(_+_)
counts.saveAsTextFile("output")

file: org.apache.spark.rdd.RDD[String] = 1112.txt MapPartitionsRDD[15] at textFile at <console>:25
words: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[16] at flatMap at <console>:26
pairs: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[17] at map at <console>:27
counts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[18] at reduceByKey at <console>:28


In [18]:
// Map and Reduce function
val data = Array(1, 2, 3, 4, 5)
val rdd = sc.parallelize(data)
rdd.map(x => List(x, x + 1)).collect()
// Array(List(1, 2), List(2, 3), List(3, 4), List(4, 5), List(5, 6))
rdd.flatMap(x => List(x, x + 1)).collect()

data: Array[Int] = Array(1, 2, 3, 4, 5)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[20] at parallelize at <console>:27
res8: Array[Int] = Array(1, 2, 2, 3, 3, 4, 4, 5, 5, 6)


In [29]:
// loading Json
val df = spark.read.json("input/people.json")
df.select("name").show()

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+



df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]


In [33]:
// loading csv
// It can also read a file directly as CSV, with and without providing a schema:
import org.apache.spark.sql.types.{StructType, IntegerType, StringType}
import org.apache.spark.sql.functions.col

val schema = new StructType()
  .add("name", StringType)
  .add("age", IntegerType)
val df = spark.read.schema(schema).csv("input/people.csv")
df.select("name").where(col("age") > 20).show()

+----+
|name|
+----+
|Andy|
+----+



import org.apache.spark.sql.types.{StructType, IntegerType, StringType}
import org.apache.spark.sql.functions.col
schema: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(age,IntegerType,true))
df: org.apache.spark.sql.DataFrame = [name: string, age: int]


In [34]:
// By providing a case class, a DataFrame can be converted to a Dataset
import spark.implicits._
case class Person(name: String, age: Long)
val ds = df.as[Person]

import spark.implicits._
defined class Person
ds: org.apache.spark.sql.Dataset[Person] = [name: string, age: int]


<b> Operating on Data

In [35]:
import org.apache.spark.sql.functions.col
df.select("name").where(col("age") > 20).show()

+----+
|name|
+----+
|Andy|
+----+



import org.apache.spark.sql.functions.col


In [None]:
// save as parquet file
val df = spark.read.json("people.json")
df.write.parquet("people")
val df = spark.read.parquet("people") # to read it

### Spark SQL: Find maximum age

In [42]:
// download Dataset
!mkdir input && cd input
!wget https://clouddeveloper.blob.core.windows.net/assets/iterative-processing/primer/spark-primer/datasets/grades.csv
!wget https://clouddeveloper.blob.core.windows.net/assets/iterative-processing/primer/spark-primer/datasets/students.csv

--2022-03-22 00:08:58--  https://clouddeveloper.blob.core.windows.net/assets/iterative-processing/primer/spark-primer/datasets/grades.csv


Resolving clouddeveloper.blob.core.windows.net (clouddeveloper.blob.core.windows.net)... 52.239.169.4


Connecting to clouddeveloper.blob.core.windows.net (clouddeveloper.blob.core.windows.net)|52.239.169.4|:443... connected.


HTTP request sent, awaiting response... 200 OK


Length: 209084545 (199M) [text/csv]


Saving to: ‘grades.csv’







grades.csv            0%[                    ]       0  --.-KB/s               

grades.csv            0%[                    ] 679.64K  3.28MB/s               

grades.csv            1%[                    ]   2.31M  5.75MB/s               

grades.csv            1%[                    ]   2.38M  3.26MB/s               

grades.csv            1%[                    ]   2.83M  3.03MB/s               

grades.csv            1%[                    ]   3.87M  3.40MB/s               

grades.csv            2%[   

In [43]:
// find maximum age
import org.apache.spark.sql.types.{StructType, IntegerType, StringType}
import org.apache.spark.sql.functions.max
val schema = new StructType()
  .add("id", IntegerType)
  .add("name", StringType)
  .add("age", IntegerType)
val students = spark.read.schema(schema).csv("input/students.csv")
val schemaGrades = new StructType()
  .add("id", IntegerType)
  .add("semester", StringType)
  .add("course", StringType)
  .add("grade", IntegerType)
val grades = spark.read.schema(schemaGrades).csv("input/grades.csv")
val maxDF = grades.join(students, Seq("id")).where("course == 15619").select(max("age"))
// Forces the execution - remember transformations are lazy
maxDF.show()


+--------+
|max(age)|
+--------+
|      27|
+--------+



import org.apache.spark.sql.types.{StructType, IntegerType, StringType}
import org.apache.spark.sql.functions.max
schema: org.apache.spark.sql.types.StructType = StructType(StructField(id,IntegerType,true), StructField(name,StringType,true), StructField(age,IntegerType,true))
students: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]
schemaGrades: org.apache.spark.sql.types.StructType = StructType(StructField(id,IntegerType,true), StructField(semester,StringType,true), StructField(course,StringType,true), StructField(grade,IntegerType,true))
grades: org.apache.spark.sql.DataFrame = [id: int, semester: string ... 2 more fields]
maxDF: org.apache.spark.sql.DataFrame = [max(age): int]


In [45]:
// On RDDs Spark
val studentsRDD = spark.sparkContext.textFile("input/students.csv").map{ line => 
    val Array(id, _, age) = line.split(",") 
    (id.toInt, age.toInt)
}
val gradesRDD = spark.sparkContext.textFile("input/grades.csv").map{ line => 
    val Array(id, _, course, _) = line.split(",") 
    (id.toInt, course.toInt)
}
gradesRDD.filter(row => row._2 == 15619)
         .join(studentsRDD)
         .map(row => row._2._2)
         .max()

studentsRDD: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[80] at map at <console>:37
gradesRDD: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[83] at map at <console>:41
res19: Int = 27


In [48]:
// explain 
maxDF.explain

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[max(age#79)])
   +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#318]
      +- HashAggregate(keys=[], functions=[partial_max(age#79)])
         +- Project [age#79]
            +- SortMergeJoin [id#83], [id#77], Inner
               :- Sort [id#83 ASC NULLS FIRST], false, 0
               :  +- Exchange hashpartitioning(id#83, 200), ENSURE_REQUIREMENTS, [id=#310]
               :     +- Project [id#83]
               :        +- Filter ((isnotnull(course#85) AND (cast(course#85 as int) = 15619)) AND isnotnull(id#83))
               :           +- FileScan csv [id#83,course#85] Batched: false, DataFilters: [isnotnull(course#85), (cast(course#85 as int) = 15619), isnotnull(id#83)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/mnt/c/Users/Mo/CloudComputing/p4-Iterative Processing with Spark..., PartitionFilters: [], PushedFilters: [IsNotNull(course), IsNotNull(id)], ReadSchema: st