<h1>Chapter 2</h1> 

1. Spark run on a number of separate machines __Cluster__
2. Spark application is comprised of a __driver__ process and a set of __executors__
3. The __driver__ process runs the ***main()*** function 
4. The driver is responsible for:
	- Maintaining information about the Spark application
	- Responding to user's program or input
	- Analyzing, distributing, and scheduling work across executors
5. Each executor is responsible ofr the following:
	- Execute code assigned by the __driver__
	- Reporting the state of the computation on the executor back to the __driver__ node

<p align="center">
	<img src="https://miro.medium.com/max/1058/1*jLVo8Bl4m9pXhADFxzbDVA.png"/>
</p>

In [1]:
import $ivy.`org.apache.spark::spark-sql:3.1.1`
import org.apache.spark.sql._
import org.apache.log4j.{Level, Logger}

[32mimport [39m[36m$ivy.$                                  
[39m
[32mimport [39m[36morg.apache.spark.sql._
[39m
[32mimport [39m[36morg.apache.log4j.{Level, Logger}[39m

In [2]:
Logger.getLogger("org").setLevel(Level.WARN)

In [3]:
val spark = {
  NotebookSparkSession.builder()
    .master("local[*]")
	.config("sspark.executor..instances","2")
	.config("spark.executor.memory","2G")
    .getOrCreate()
}

Loading spark-stubs
Creating SparkSession


Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
21/09/25 15:20:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


[36mspark[39m: [32mSparkSession[39m = org.apache.spark.sql.SparkSession@412f5119

![Spark Eco-System](https://miro.medium.com/max/1400/1*fQ5VWad1nu4yYt2MOuSBsQ.png)

In [4]:
val myRange = spark.range(100).toDF("number")
myRange.show(5)

+------+
|number|
+------+
|     0|
|     1|
|     2|
|     3|
|     4|
+------+
only showing top 5 rows



[36mmyRange[39m: [32mDataFrame[39m = [number: bigint]

In [5]:
val divisBy2 = myRange.where("number % 2 = 0")
divisBy2.show(5)

+------+
|number|
+------+
|     0|
|     2|
|     4|
|     6|
|     8|
+------+
only showing top 5 rows



[36mdivisBy2[39m: [32mDataset[39m[[32mRow[39m] = [number: bigint]

In [6]:
divisBy2.count()

[36mres5[39m: [32mLong[39m = [32m50L[39m

In [7]:
val flightData2015 = spark.read
						  .option("inferSchema","true")
						  .option("header","true")
						  .csv("/Users/mehdi/Documents/Spark-The-Definitive-Guide/data/flight-data/csv/2015-summary.csv")

[36mflightData2015[39m: [32mDataFrame[39m = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]

In [8]:
flightData2015.take(3)

[36mres7[39m: [32mArray[39m[[32mRow[39m] = [33mArray[39m(
  [United States,Romania,15],
  [United States,Croatia,1],
  [United States,Ireland,344]
)

In [10]:

flightData2015.createOrReplaceTempView("flight_data_2015")

In [11]:
val sqlWay = spark.sql("""
  SELECT DEST_COUNTRY_NAME, count(1)
  FROM flight_data_2015
  GROUP BY DEST_COUNTRY_NAME
  """)

[36msqlWay[39m: [32mDataFrame[39m = [DEST_COUNTRY_NAME: string, count(1): bigint]

In [12]:
val dataFrameWay = flightData2015
    .groupBy("DEST_COUNTRY_NAME")
    .count()

[36mdataFrameWay[39m: [32mDataFrame[39m = [DEST_COUNTRY_NAME: string, count: bigint]

In [13]:
sqlWay.explain

== Physical Plan ==
*(2) HashAggregate(keys=[DEST_COUNTRY_NAME#36], functions=[count(1)])
+- Exchange hashpartitioning(DEST_COUNTRY_NAME#36, 200), ENSURE_REQUIREMENTS, [id=#99]
   +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#36], functions=[partial_count(1)])
      +- FileScan csv [DEST_COUNTRY_NAME#36] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/Users/mehdi/Documents/Spark-The-Definitive-Guide/data/flight-data/csv/201..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>




In [14]:
dataFrameWay.explain

== Physical Plan ==
*(2) HashAggregate(keys=[DEST_COUNTRY_NAME#36], functions=[count(1)])
+- Exchange hashpartitioning(DEST_COUNTRY_NAME#36, 200), ENSURE_REQUIREMENTS, [id=#118]
   +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#36], functions=[partial_count(1)])
      +- FileScan csv [DEST_COUNTRY_NAME#36] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/Users/mehdi/Documents/Spark-The-Definitive-Guide/data/flight-data/csv/201..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>




In [15]:
spark.sql("SELECT max(count) from flight_data_2015").take(1)

[36mres14[39m: [32mArray[39m[[32mRow[39m] = [33mArray[39m([370002])

In [16]:
import org.apache.spark.sql.functions.max
flightData2015.select(max("count")).take(1)

[32mimport [39m[36morg.apache.spark.sql.functions.max
[39m
[36mres15_1[39m: [32mArray[39m[[32mRow[39m] = [33mArray[39m([370002])

In [17]:
val maxSql = spark.sql("""
  SELECT DEST_COUNTRY_NAME, sum(count) as destination_total
  FROM flight_data_2015
  GROUP BY DEST_COUNTRY_NAME
  ORDER BY sum(count) DESC
  LIMIT 5
  """)
  maxSql.show()

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



[36mmaxSql[39m: [32mDataFrame[39m = [DEST_COUNTRY_NAME: string, destination_total: bigint]

In [18]:
AmmoniteSparkSession.sync()

[36mres17[39m: [32mSparkSession[39m = org.apache.spark.sql.SparkSession@412f5119

In [19]:
val maxSql = spark.sql("""
  SELECT DEST_COUNTRY_NAME, sum(count) 
  FROM flight_data_2015
  WHERE DEST_COUNTRY_NAME IN("FRANCE",
  				          -- "ITALY",
  							 "UNITED STATES"
							)
  GROUP BY DEST_COUNTRY_NAME
  LIMIT 5
  """)
 maxSql.show()

+-----------------+----------+
|DEST_COUNTRY_NAME|sum(count)|
+-----------------+----------+
+-----------------+----------+



[36mmaxSql[39m: [32mDataFrame[39m = [DEST_COUNTRY_NAME: string, sum(count): bigint]