* Spark is a unified computing engine and a set of libraries for parallel data processing on computer clusters.
     - Unified: Spark supports everything from simple data loading and SQL queries to machine learning and streaming computation on the same computing engine with a consistent API.
     - Computing engine: Spark loads data from a source, performs computation on it, and writes the result to cloud storage, Hadoop, Cassandra, or a message bus such as Kafka. Spark itself does not store data long term. Hadoop, by contrast, closely couples its storage system (HDFS) with its computing system (MapReduce).
     - Libraries: Provide a unified API for common data analysis tasks. The standard libraries ship with the engine: Spark SQL (SQL and structured data), MLlib (machine learning), Spark Streaming (stream processing), and GraphX (graph analytics).

* Spark can use data stored in a variety of systems such as Cassandra, AWS S3, and HDFS.
* MapReduce is typically tied to files stored in HDFS; Spark is not. MapReduce writes data to disk after each map and reduce operation. Spark keeps most of the data in memory between transformations and writes to disk only when there is not enough RAM.

* Spark manages and coordinates the execution of tasks on a cluster of computers.
* Spark can run on its own standalone cluster manager, on YARN, or on Mesos. We submit an application to the cluster manager, which grants resources to our application.

* A Spark application consists of a driver process and a set of executor processes.
* The driver runs the `main()` function and sits on a node in the cluster. It is responsible for:
    - Maintaining information about the Spark application.
    - Responding to the user's program or input.
    - Analyzing, distributing, and scheduling work across the executors.
* Each executor is responsible for:
    - Carrying out the work that the driver assigns to it.
    - Reporting the state of the computation back to the driver.
![](images/architect.PNG)
* If Spark runs in local mode, the driver and the executors are just processes that live on the same machine.
* The cluster manager keeps track of the resources available.
* The driver process is responsible for executing the driver program's commands across the executors to complete a given task. Executors always run Spark code, while the driver can be driven from a number of different languages through Spark's language APIs.
* Code written in a given language is translated into Spark code and run on the cluster of machines.
![](images/session.PNG)
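
The division of labour between driver and executors can be sketched in plain Python. This is only an analogy, not Spark's implementation: worker threads stand in for executor processes, and the names `driver` and `executor_task` are made up for illustration.

```python
from concurrent.futures import ThreadPoolExecutor


def executor_task(partition):
    """Work an executor performs on its share of the data before reporting back."""
    return sum(x * x for x in partition)


def driver(data, num_executors=4):
    """Driver: split the work, hand it to executors, combine the reported results."""
    # Analyze and distribute: one slice of the data per executor.
    partitions = [data[i::num_executors] for i in range(num_executors)]
    # Schedule: each worker thread stands in for one executor process.
    with ThreadPoolExecutor(max_workers=num_executors) as pool:
        partial_results = list(pool.map(executor_task, partitions))
    # The executors have reported back; the driver assembles the final answer.
    return sum(partial_results)


total = driver(list(range(10)))
print(total)
```

The driver never touches individual rows itself; it only decides how the data is split and what each worker should do with its slice.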

* When we start Spark in interactive mode, a SparkSession that manages the Spark application is created for us implicitly. In a standalone application we have to create the SparkSession ourselves. The SparkSession controls the Spark application, and there is a one-to-one correspondence between a SparkSession and a Spark application.

In [111]:
import findspark

In [112]:
findspark.init('/home/purvil/spark-2.4.3-bin-hadoop2.7')

In [113]:
from pyspark.sql import SparkSession

----


In [114]:
spark = SparkSession.builder.appName('Basics').getOrCreate()

In [115]:
myRange = spark.range(1000).toDF("number")
myRange

DataFrame[number: bigint]

* The numbers above form a distributed collection: when run on a cluster, each part of the range lives on a different executor.

* Spark's core abstractions are Datasets, DataFrames, SQL tables, and Resilient Distributed Datasets (RDDs).
    - All of them represent distributed collections of data.

### Partitions
* To allow executors to work in parallel, Spark breaks the data up into chunks called partitions. A partition is a collection of rows that sits on one physical machine in the cluster.
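
As a rough mental model, partitioning can be pictured as cutting a list of rows into chunks. The helper `split_into_partitions` below is hypothetical plain Python; how Spark actually sizes and places partitions depends on the data source and configuration.

```python
def split_into_partitions(rows, num_partitions):
    """Split rows into num_partitions contiguous chunks of near-equal size."""
    size, extra = divmod(len(rows), num_partitions)
    partitions, start = [], 0
    for i in range(num_partitions):
        # The first `extra` partitions take one additional row each.
        end = start + size + (1 if i < extra else 0)
        partitions.append(rows[start:end])
        start = end
    return partitions


partitions = split_into_partitions(list(range(1000)), 4)
sizes = [len(p) for p in partitions]
print(sizes)
```

In PySpark, the number of partitions behind a DataFrame can be inspected with `df.rdd.getNumPartitions()`.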

### Transformation
* The core data structures are immutable: they cannot be changed after they are created. To "change" one, we instruct Spark how we want to modify it; these instructions are called transformations.

In [116]:
divisBy2 = myRange.where("number %2 = 0")

In [117]:
divisBy2

DataFrame[number: bigint]

* We express business logic using transformations.
* Types of transformations
    - Narrow dependencies: each input partition contributes to only one output partition.
    - Narrow transformations are pipelined: multiple filters, for example, are all performed in memory.
    ![](images/narrow.PNG)
    - Wide dependencies (shuffle): each input partition contributes to many output partitions, so Spark exchanges partitions across the cluster.
    - During a shuffle, Spark writes results to disk.
    ![](images/wide.PNG)
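
The difference between the two kinds of dependency can be modelled with lists of lists. This is a plain-Python sketch with hypothetical helper names, meant only to show which input partitions feed which output partitions.

```python
def narrow_filter(partitions, predicate):
    """Narrow: each input partition produces exactly one output partition."""
    return [[x for x in part if predicate(x)] for part in partitions]


def wide_shuffle(partitions, key, num_output):
    """Wide: rows from every input partition may land in any output partition."""
    output = [[] for _ in range(num_output)]
    for part in partitions:
        for x in part:
            output[key(x) % num_output].append(x)
    return output


parts = [[1, 2, 3, 4], [5, 6, 7, 8]]
evens = narrow_filter(parts, lambda x: x % 2 == 0)
by_mod3 = wide_shuffle(parts, key=lambda x: x, num_output=3)
print(evens)    # [[2, 4], [6, 8]]
print(by_mod3)  # [[3, 6], [1, 4, 7], [2, 5, 8]]
```

In the filter, the first output partition depends only on the first input partition. In the shuffle, every output partition contains rows from both inputs, which is why data has to move between machines.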

### Lazy evaluation
* Spark waits until the last moment to execute the graph of computation instructions.
* Instead of modifying data immediately, we build up a plan of the transformations we want to apply. By waiting until the end, Spark can compile this plan from raw DataFrame transformations into a physical plan that runs efficiently across the cluster.
    - Ex. Predicate pushdown on DataFrames.
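
A toy version of lazy evaluation, assuming nothing about Spark's internals: the hypothetical `LazyCollection` class only records transformations, and nothing runs until an action such as `count()` or `collect()` is called.

```python
class LazyCollection:
    """Records transformations and runs them only when an action is called."""

    def __init__(self, source, plan=()):
        self._source = source
        self._plan = plan  # immutable tuple of (kind, function) steps

    def where(self, predicate):
        # Transformation: return a new object with one more step in the plan.
        return LazyCollection(self._source, self._plan + (("filter", predicate),))

    def select(self, func):
        # Transformation: also just extends the plan.
        return LazyCollection(self._source, self._plan + (("map", func),))

    def _run(self):
        rows = iter(self._source)
        for kind, func in self._plan:
            rows = filter(func, rows) if kind == "filter" else map(func, rows)
        return rows

    def count(self):
        # Action: triggers execution of the whole plan.
        return sum(1 for _ in self._run())

    def collect(self):
        # Action: materialises the result as a Python list.
        return list(self._run())


calls = []

def is_even(n):
    calls.append(n)  # record every time the predicate actually runs
    return n % 2 == 0

evens = LazyCollection(range(1000)).where(is_even)
print(len(calls))     # 0: the predicate has not run yet
print(evens.count())  # 500: the action triggers the computation
```

Because each transformation returns a new object instead of changing the old one, the original collection stays untouched, which mirrors the immutability of Spark's data structures.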

### Action
* Transformations let us build up a logical transformation plan. To trigger the computation, we run an action.
* `count()` is one kind of action.
* There are actions to view data in the console, actions to collect data into native objects in the respective language, and actions to write to output data sources.

In [118]:
divisBy2.count()

500

* Example: flight data

In [131]:
! head spark_data/flight-data/csv/2015-summary.csv

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Romania,15
United States,Croatia,1
United States,Ireland,344
Egypt,United States,15
United States,India,62
United States,Singapore,1
United States,Grenada,62
Costa Rica,United States,588
Senegal,United States,40


In [132]:
flighData2015 = spark.read.csv('spark_data/flight-data/csv/2015-summary.csv', header=True, inferSchema=True)

In [121]:
flighData2015

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: int]

* The number of rows is not known at this point, because reading data is a transformation and is lazily evaluated. Spark inspects the data only as far as needed to infer the schema.
![](images/reading_csv.png)
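
The idea behind schema inference can be illustrated with the standard `csv` module on the first rows of the file shown earlier. This is a deliberately simplified sketch: the helpers `infer_type` and `infer_schema` are hypothetical, only `int` and `string` are distinguished, and Spark's own inference rules and sampling behaviour are more elaborate.

```python
import csv
import io

SAMPLE = """DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Romania,15
United States,Croatia,1
United States,Ireland,344
"""


def infer_type(values):
    """Return 'int' if every value parses as an integer, else 'string'."""
    try:
        for v in values:
            int(v)
        return "int"
    except ValueError:
        return "string"


def infer_schema(text, sample_rows=2):
    """Guess a column type for each header field from the first few rows."""
    reader = csv.reader(io.StringIO(text))
    header = next(reader)
    # Read at most `sample_rows` data rows; the rest of the input is untouched.
    sample = [row for _, row in zip(range(sample_rows), reader)]
    columns = list(zip(*sample))
    return {name: infer_type(col) for name, col in zip(header, columns)}


schema = infer_schema(SAMPLE)
print(schema)
```

The guessed types line up with the `DataFrame[...]` summary printed above: two string columns and an integer `count`.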

In [122]:
flighData2015.take(3)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344)]

In [123]:
flighData2015.show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
+-----------------+-------------------+-----+
only showing top 5 rows



* `sort` does not modify the DataFrame; it is a transformation that returns a new DataFrame.

* To see the plan Spark will use to execute a transformation, call `explain()`:

In [124]:
flighData2015.sort("count").explain()

== Physical Plan ==
*(2) Sort [count#915 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(count#915 ASC NULLS FIRST, 5)
   +- *(1) FileScan csv [DEST_COUNTRY_NAME#913,ORIGIN_COUNTRY_NAME#914,count#915] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/home/purvil/spark_data/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>


* Read the plan from bottom to top: the bottom is the source of data and the top is the end result.

* Sort is a wide transformation because rows need to be compared with one another.
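
The `Exchange rangepartitioning` step followed by `Sort` in the plan can be pictured as below. This is a plain-Python sketch with a hypothetical helper; the range boundaries are hand-picked here, whereas a real engine has to choose them from the data.

```python
import bisect


def range_partition_sort(partitions, boundaries):
    """Sort globally: shuffle rows into ordered ranges, then sort each range locally."""
    output = [[] for _ in range(len(boundaries) + 1)]
    # Exchange step: every input partition may send rows to every output range.
    for part in partitions:
        for value in part:
            output[bisect.bisect_right(boundaries, value)].append(value)
    # Sort step: each output partition is sorted independently of the others.
    return [sorted(part) for part in output]


parts = [[15, 1, 344], [15, 62, 1], [62, 588, 40]]
sorted_parts = range_partition_sort(parts, boundaries=[10, 100])
flattened = [v for part in sorted_parts for v in part]
print(sorted_parts)
```

Because the output ranges are themselves ordered, reading the sorted partitions one after another yields a globally sorted result without any single machine having to hold all rows.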

In [125]:
flighData2015.sort("count").take(2)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1),
 Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1)]

* Because Spark has the logical plan, it can recompute any partition by performing all of the operations it had before on the same input data.
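
Recomputing a lost partition from its plan can be sketched as replaying a list of steps on the original input. The names below are hypothetical, and the sketch covers only narrow steps, where one output partition depends on a single input partition.

```python
def build_partition(source_partition, lineage):
    """Recreate an output partition by replaying every step on the source partition."""
    rows = list(source_partition)
    for step in lineage:
        rows = step(rows)
    return rows


source = {0: [1, 2, 3, 4], 1: [5, 6, 7, 8]}
lineage = [
    lambda rows: [r for r in rows if r % 2 == 0],  # filter: keep even numbers
    lambda rows: [r * 10 for r in rows],           # map: multiply by ten
]

computed = {pid: build_partition(rows, lineage) for pid, rows in source.items()}
lost = computed.pop(1)                              # simulate losing partition 1
computed[1] = build_partition(source[1], lineage)   # replay the plan to rebuild it
print(computed[1] == lost)
```

Since the inputs are immutable and the steps are deterministic, replaying the plan gives back exactly the partition that was lost.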

* To set the number of shuffle partitions:

In [126]:
spark.conf.set("spark.sql.shuffle.partitions", "5")

![](images/sort.png)

* To register a DataFrame as a table or view:

In [127]:
flighData2015.createOrReplaceTempView("flight")

* Now we can query the `flight` view using SQL.

In [128]:
sqlWay =  spark.sql("SELECT DEST_COUNTRY_NAME, count(1) FROM flight GROUP BY DEST_COUNTRY_NAME")

In [98]:
dfWay = flighData2015.groupBy("DEST_COUNTRY_NAME").count()

In [99]:
sqlWay.explain()

== Physical Plan ==
*(2) HashAggregate(keys=[DEST_COUNTRY_NAME#761], functions=[count(1)])
+- Exchange hashpartitioning(DEST_COUNTRY_NAME#761, 5)
   +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#761], functions=[partial_count(1)])
      +- *(1) FileScan csv [DEST_COUNTRY_NAME#761] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/home/purvil/spark_data/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>


In [100]:
dfWay.explain()

== Physical Plan ==
*(2) HashAggregate(keys=[DEST_COUNTRY_NAME#761], functions=[count(1)])
+- Exchange hashpartitioning(DEST_COUNTRY_NAME#761, 5)
   +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#761], functions=[partial_count(1)])
      +- *(1) FileScan csv [DEST_COUNTRY_NAME#761] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/home/purvil/spark_data/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>


* Both ways compile to the same execution plan, so they have the same performance.
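
The plan above has two `HashAggregate` steps around an `Exchange`: a `partial_count` inside each partition, a shuffle by key, then a final `count`. A plain-Python sketch of that two-phase shape, using the destinations from the first nine rows of the CSV split arbitrarily into two partitions (the helper names are hypothetical):

```python
from collections import Counter


def partial_count(partition):
    """Stage 1: count destinations within a single partition."""
    return Counter(row["DEST_COUNTRY_NAME"] for row in partition)


def exchange(partials, num_partitions):
    """Shuffle: route each (key, partial count) to the partition that owns the key."""
    buckets = [[] for _ in range(num_partitions)]
    for partial in partials:
        for key, n in partial.items():
            buckets[hash(key) % num_partitions].append((key, n))
    return buckets


def final_count(bucket):
    """Stage 2: add up the partial counts for the keys in one partition."""
    totals = Counter()
    for key, n in bucket:
        totals[key] += n
    return totals


dests_p0 = ["United States", "United States", "United States", "Egypt", "United States"]
dests_p1 = ["United States", "United States", "Costa Rica", "Senegal"]
partitions = [[{"DEST_COUNTRY_NAME": d} for d in dests] for dests in (dests_p0, dests_p1)]

partials = [partial_count(p) for p in partitions]
result = {}
for bucket in exchange(partials, num_partitions=5):
    result.update(final_count(bucket))
print(result)
```

Counting inside each partition first means only one small record per key and partition crosses the network, instead of every row.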

* `max`

In [101]:
spark.sql("SELECT max(count) FROM flight").take(1)

[Row(max(count)=370002)]

In [108]:
from pyspark.sql.functions import max

In [104]:
flighData2015.select(max("count")).take(1)

[Row(max(count)=370002)]

* Top 5 destinations

In [106]:
spark.sql("""
    SELECT DEST_COUNTRY_NAME, sum(count) AS destination_total
    FROM flight
    GROUP BY DEST_COUNTRY_NAME
    ORDER BY sum(count) DESC
    LIMIT 5
""").show()

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [109]:
from pyspark.sql.functions import desc

In [110]:
flighData2015.groupBy("DEST_COUNTRY_NAME").sum("count").withColumnRenamed("sum(count)", "destination_total")\
.sort(desc("destination_total")).limit(5).show()

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



* The query above is a directed acyclic graph (DAG) of transformations, each resulting in a new immutable DataFrame.
![](images/dag.png)

* The output of the `groupBy` transformation is a `RelationalGroupedDataset` (exposed in PySpark as `GroupedData`).

### Spark toolset
![](images/toolset.png)

### spark-submit : Running production application
* `spark-submit` is a built-in command-line tool that sends application code to a cluster and launches it there.

```
./bin/spark-submit --master local ./examples/src/main/python/pi.py
```
* `--master local` sets the mode of running to local.

### Type safe structured API (Datasets)
* Datasets are for writing statically typed code in Java and Scala; they are not available in Python.
* A Dataset lets us assign a Java or Scala class to the records within a DataFrame and manipulate it as a collection of typed objects.
* `Dataset<Person>` will contain objects of class `Person`.

In [5]:
df = spark.read.json('/home/purvil/Downloads/Python-and-Spark-for-Big-Data-master/Spark_DataFrames/people.json')

In [6]:
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [7]:
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [8]:
df.columns

['age', 'name']

In [10]:
df.describe()

DataFrame[summary: string, age: string, name: string]

In [11]:
df.describe().show()

+-------+------------------+-------+
|summary|               age|   name|
+-------+------------------+-------+
|  count|                 2|      3|
|   mean|              24.5|   null|
| stddev|7.7781745930520225|   null|
|    min|                19|   Andy|
|    max|                30|Michael|
+-------+------------------+-------+



In [16]:
from pyspark.sql.types import StructField,StringType,IntegerType, StructType

In [17]:
data_schema = [StructField('age', IntegerType(), True), StructField('name', StringType(), True)] # True means can be null

In [18]:
final_struct = StructType(fields = data_schema)

In [19]:
df = spark.read.json('/home/purvil/Downloads/Python-and-Spark-for-Big-Data-master/Spark_DataFrames/people.json', schema=final_struct)

In [20]:
df['age']

Column<b'age'>

In [21]:
type(df['age'])

pyspark.sql.column.Column

In [22]:
df.select('age')

DataFrame[age: int]

In [24]:
type(df.select('age'))

pyspark.sql.dataframe.DataFrame

In [23]:
df.select('age').show()

+----+
| age|
+----+
|null|
|  30|
|  19|
+----+



In [25]:
df.head(2)

[Row(age=None, name='Michael'), Row(age=30, name='Andy')]

In [26]:
df.select(['age', 'name']).show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [29]:
df.withColumn('newage', df['age']) # Create new col with name newage and data as in df['age']

DataFrame[age: int, name: string, newage: int]

In [33]:
df.withColumn('double_age', df['age'] * 2).show()

+----+-------+----------+
| age|   name|double_age|
+----+-------+----------+
|null|Michael|      null|
|  30|   Andy|        60|
|  19| Justin|        38|
+----+-------+----------+



In [35]:
df.withColumnRenamed('age', 'new_age').show() # Rename column name from age to new_age

+-------+-------+
|new_age|   name|
+-------+-------+
|   null|Michael|
|     30|   Andy|
|     19| Justin|
+-------+-------+



In [36]:
df.createOrReplaceTempView('people')

In [37]:
results = spark.sql("SELECT * FROM people")

In [38]:
results.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [39]:
spark.sql('SELECT * FROM people WHERE age = 30').show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+



------------

In [40]:
spark = SparkSession.builder.appName('ops').getOrCreate()

In [42]:
df = spark.read.csv('/home/purvil/Downloads/Python-and-Spark-for-Big-Data-master/Spark_DataFrames/appl_stock.csv', 
                    inferSchema=True, header=True)

In [43]:
df.show()

+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|               Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04 00:00:00|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05 00:00:00|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06 00:00:00|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07 00:00:00|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08 00:00:00|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|    

In [45]:
df.filter("Close < 500").show()

+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|               Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04 00:00:00|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05 00:00:00|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06 00:00:00|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07 00:00:00|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08 00:00:00|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|    

In [47]:
df.filter("Close < 500").select(['Open', 'Close']).show()

+------------------+------------------+
|              Open|             Close|
+------------------+------------------+
|        213.429998|        214.009998|
|        214.599998|        214.379993|
|        214.379993|        210.969995|
|            211.75|            210.58|
|        210.299994|211.98000499999998|
|212.79999700000002|210.11000299999998|
|209.18999499999998|        207.720001|
|        207.870005|        210.650002|
|210.11000299999998|            209.43|
|210.92999500000002|            205.93|
|        208.330002|        215.039995|
|        214.910006|            211.73|
|        212.079994|        208.069996|
|206.78000600000001|            197.75|
|202.51000200000001|        203.070002|
|205.95000100000001|        205.940001|
|        206.849995|        207.880005|
|        204.930004|        199.289995|
|        201.079996|        192.060003|
|192.36999699999998|        194.729998|
+------------------+------------------+
only showing top 20 rows



In [51]:
df.filter(df['Close'] < 500).select('Volume').show()

+---------+
|   Volume|
+---------+
|123432400|
|150476200|
|138040000|
|119282800|
|111902700|
|115557400|
|148614900|
|151473000|
|108223500|
|148516900|
|182501900|
|153038200|
|152038600|
|220441900|
|266424900|
|466777500|
|430642100|
|293375600|
|311488100|
|187469100|
+---------+
only showing top 20 rows



In [54]:
df.filter((df['Close'] < 200) & (df['Open'] > 200)).show()

+-------------------+------------------+----------+----------+----------+---------+------------------+
|               Date|              Open|      High|       Low|     Close|   Volume|         Adj Close|
+-------------------+------------------+----------+----------+----------+---------+------------------+
|2010-01-22 00:00:00|206.78000600000001|207.499996|    197.16|    197.75|220441900|         25.620401|
|2010-01-28 00:00:00|        204.930004|205.500004|198.699995|199.289995|293375600|25.819922000000002|
|2010-01-29 00:00:00|        201.079996|202.199995|190.250002|192.060003|311488100|         24.883208|
+-------------------+------------------+----------+----------+----------+---------+------------------+



* To use the result for other tasks in Python, bring it back to the driver with `collect()`.

In [56]:
result = df.filter((df['Close'] < 200)&(df['Open'] > 200)).collect()

In [57]:
result

[Row(Date=datetime.datetime(2010, 1, 22, 0, 0), Open=206.78000600000001, High=207.499996, Low=197.16, Close=197.75, Volume=220441900, Adj Close=25.620401),
 Row(Date=datetime.datetime(2010, 1, 28, 0, 0), Open=204.930004, High=205.500004, Low=198.699995, Close=199.289995, Volume=293375600, Adj Close=25.819922000000002),
 Row(Date=datetime.datetime(2010, 1, 29, 0, 0), Open=201.079996, High=202.199995, Low=190.250002, Close=192.060003, Volume=311488100, Adj Close=24.883208)]

In [58]:
row = result[0]

In [59]:
row

Row(Date=datetime.datetime(2010, 1, 22, 0, 0), Open=206.78000600000001, High=207.499996, Low=197.16, Close=197.75, Volume=220441900, Adj Close=25.620401)

In [60]:
row.asDict()

{'Date': datetime.datetime(2010, 1, 22, 0, 0),
 'Open': 206.78000600000001,
 'High': 207.499996,
 'Low': 197.16,
 'Close': 197.75,
 'Volume': 220441900,
 'Adj Close': 25.620401}