# Creating a Spark session

In [1]:
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as f
from pyspark.sql.types import *

spark = SparkSession.builder.appName("SparkIntro").getOrCreate()

# Window functions
Last week we saw how to do group aggregations, for instance:

In [27]:
df = spark.read.option('header', True).option('inferSchema', True).csv('data/titanic.csv')
df.groupBy('Pclass').agg(f.sum('Fare'), f.avg('Fare'), f.max('Fare')).show()

+------+------------------+------------------+---------+
|Pclass|         sum(Fare)|         avg(Fare)|max(Fare)|
+------+------------------+------------------+---------+
|     1|18177.412499999984| 84.15468749999992| 512.3292|
|     3| 6714.695100000002|13.675550101832997|    69.55|
|     2|3801.8416999999995| 20.66218315217391|     73.5|
+------+------------------+------------------+---------+



Suppose we want to compute something per group without reducing each group to a single row. For example, let's say we want to find the oldest person in each passenger class.

We could first compute the max age per class and join the resulting dataframe with the original as follows.

In [28]:
max_age_per_class = df.groupBy('Pclass')\
    .agg(f.max('Age').alias('MaxAge'))\
    .select('MaxAge', f.col('Pclass').alias('Pclass_'))

df.join(max_age_per_class, (f.col('Age') == f.col('MaxAge')) & (f.col('Pclass') == f.col('Pclass_')), 'inner')\
    .drop('Pclass_')\
    .show()

+-----------+--------+------+--------------------+----+----+-----+-----+----------+-----+-----+--------+------+
|PassengerId|Survived|Pclass|                Name| Sex| Age|SibSp|Parch|    Ticket| Fare|Cabin|Embarked|MaxAge|
+-----------+--------+------+--------------------+----+----+-----+-----+----------+-----+-----+--------+------+
|        631|    true|     1|Barkworth, Mr. Al...|male|80.0|    0|    0|     27042| 30.0|  A23|       S|  80.0|
|        673|   false|     2|Mitchell, Mr. Hen...|male|70.0|    0|    0|C.A. 24580| 10.5| null|       S|  70.0|
|        852|   false|     3| Svensson, Mr. Johan|male|74.0|    0|    0|    347060|7.775| null|       S|  74.0|
+-----------+--------+------+--------------------+----+----+-----+-----+----------+-----+-----+--------+------+



What if we want the three oldest passengers in each class? The method above won't work, because `max` only gives us the single oldest age per class.

The solution is to use a window function. A window function computes a value for every row from a group of related rows (the window) without collapsing the group into one row. With `partitionBy` you can basically treat each group as a separate dataframe.

In the following example we sort each group by age in descending order and add a new column with the "rank" of each passenger within the group. Here, the rank is the passenger's position in that sort order.

In [29]:
sorted_by_age_window = Window.partitionBy('Pclass').orderBy(f.desc('Age'))

ranked_df = df.withColumn('AgeClassRank', f.rank().over(sorted_by_age_window))
ranked_df.filter(f.col('AgeClassRank') <= 3).drop('AgeClassRank').orderBy('Pclass', f.desc('Age')).show()

+-----------+--------+------+--------------------+----+----+-----+-----+-----------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name| Sex| Age|SibSp|Parch|     Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+----+----+-----+-----+-----------+-------+-----+--------+
|        631|    true|     1|Barkworth, Mr. Al...|male|80.0|    0|    0|      27042|   30.0|  A23|       S|
|         97|   false|     1|Goldschmidt, Mr. ...|male|71.0|    0|    0|   PC 17754|34.6542|   A5|       C|
|        494|   false|     1|Artagaveytia, Mr....|male|71.0|    0|    0|   PC 17609|49.5042| null|       C|
|        673|   false|     2|Mitchell, Mr. Hen...|male|70.0|    0|    0| C.A. 24580|   10.5| null|       S|
|         34|   false|     2|Wheadon, Mr. Edwa...|male|66.0|    0|    0| C.A. 24579|   10.5| null|       S|
|        571|    true|     2|  Harris, Mr. George|male|62.0|    0|    0|S.W./PP 752|   10.5| null|       S|
|        852|   false|     3
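Note that `f.rank()` gives tied rows the same rank and then skips ahead, so `AgeClassRank <= 3` can return more than three rows per class when ages tie (in first class above, two passengers share age 71.0 and both get rank 2). As a minimal plain-Python model, assuming one partition that is already sorted by age in descending order, the numbering produced by Spark's `f.rank()`, `f.dense_rank()` and `f.row_number()` looks like this. The list of ages is hypothetical.

```python
def rank(values):
    """Ties share a rank; the next distinct value skips ahead (1, 2, 2, 4, ...)."""
    ranks = []
    for i, v in enumerate(values):
        if i > 0 and v == values[i - 1]:
            ranks.append(ranks[-1])  # tie: reuse the previous rank
        else:
            ranks.append(i + 1)      # new value: rank is the 1-based position
    return ranks


def dense_rank(values):
    """Ties share a rank; no gaps after a tie (1, 2, 2, 3, ...)."""
    ranks = []
    for i, v in enumerate(values):
        if i == 0:
            ranks.append(1)
        elif v == values[i - 1]:
            ranks.append(ranks[-1])      # tie: reuse the previous rank
        else:
            ranks.append(ranks[-1] + 1)  # new value: next consecutive rank
    return ranks


def row_number(values):
    """Every row gets its own number; ties are broken arbitrarily."""
    return list(range(1, len(values) + 1))


# Hypothetical ages of one class, sorted in descending order, with a three-way tie
ages = [80.0, 71.0, 71.0, 71.0, 65.0]
print(rank(ages))        # [1, 2, 2, 2, 5]
print(dense_rank(ages))  # [1, 2, 2, 2, 3]
print(row_number(ages))  # [1, 2, 3, 4, 5]
```

With these ages, `rank <= 3` keeps four rows, `dense_rank <= 3` keeps five, and only `row_number <= 3` keeps exactly three, which makes `f.row_number().over(sorted_by_age_window)` a common choice when you need a fixed number of rows per group.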

# Physical Plans
When using the DataFrame API, Spark's optimizer (Catalyst) will rewrite your program, for instance by reordering the operations that you wish to perform.

Let's take a look at an example from last week.

In [40]:
web_traffic_df = spark.read.option('delimiter', '\t').csv('data/traffic_2_sample')\
    .select(f.col('_c0').alias('ip'), f.col('_c1').alias('domain'))

count_df = web_traffic_df.filter(f.col('domain') == 'google.com').groupBy('domain').agg(f.countDistinct('ip'))

You can inspect the physical plan using `.explain()`. The physical plan is the actual plan for the work that Spark is going to do; `.explain(True)` additionally prints the logical plans it was derived from.

In [41]:
count_df.explain()

== Physical Plan ==
*(3) HashAggregate(keys=[domain#2922], functions=[count(distinct ip#2921)])
+- Exchange hashpartitioning(domain#2922, 200)
   +- *(2) HashAggregate(keys=[domain#2922], functions=[partial_count(distinct ip#2921)])
      +- *(2) HashAggregate(keys=[domain#2922, ip#2921], functions=[])
         +- Exchange hashpartitioning(domain#2922, ip#2921, 200)
            +- *(1) HashAggregate(keys=[domain#2922, ip#2921], functions=[])
               +- *(1) Project [_c0#2917 AS ip#2921, _c1#2918 AS domain#2922]
                  +- *(1) Filter (isnotnull(_c1#2918) && (_c1#2918 = google.com))
                     +- *(1) FileScan csv [_c0#2917,_c1#2918] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/home/jovyan/work/data/traffic_2_sample], PartitionFilters: [], PushedFilters: [IsNotNull(_c1), EqualTo(_c1,google.com)], ReadSchema: struct<_c0:string,_c1:string>


In the Spark UI ([http://127.0.0.1:4040/](http://127.0.0.1:4040/)), you can find a graph representation of the plan under the SQL tab.

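The above plan has three stages, separated by the two `Exchange` operators (shuffles). The `*(n)` prefix in front of an operator is the ID of the whole-stage code generation block it belongs to, which in this plan coincides with the stage. Reading the plan from the bottom up, Spark will: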

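- Read the CSV (`(1) FileScan csv`)
- Filter rows where domain is google.com (`(1) Filter (isnotnull(_c1#2918) && (_c1#2918 = google.com))`). Note that the filter now runs on the raw column `_c1`, before the renaming, even though we wrote `select` before `filter`
- Rename columns (`(1) Project`)
- Remove duplicate (domain, ip) pairs within each partition (`(1) HashAggregate(keys=[domain#2922, ip#2921], functions=[])`)
- Shuffle the data so that identical (domain, ip) pairs end up in the same partition (`Exchange hashpartitioning(domain#2922, ip#2921, 200)`)
- Remove the remaining duplicates (`(2) HashAggregate(keys=[domain#2922, ip#2921], functions=[])`)
- Count distinct IPs on partitions (`(2) HashAggregate(keys=[domain#2922], functions=[partial_count(distinct ip#2921)])`)
- Shuffle the partial counts so that all counts for a domain end up in the same partition (`Exchange hashpartitioning(domain#2922, 200)`)
- Sum the partial counts (`(3) HashAggregate(keys=[domain#2922], functions=[count(distinct ip#2921)])`)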
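The steps above can be mimicked in plain Python to see why counting distinct values needs two shuffles: the first brings identical (domain, ip) pairs together so they can be deduplicated, the second brings the partial counts of each domain together so they can be summed. This is a sketch under simplifying assumptions, not Spark's implementation: the sample rows and partition layout are hypothetical, Python's built-in `hash` stands in for Spark's hash function, and only 4 shuffle partitions are used instead of the 200 shown in the plan (Spark's default for `spark.sql.shuffle.partitions`).

```python
from collections import defaultdict

# Hypothetical: 4 shuffle partitions instead of Spark's default of 200
NUM_SHUFFLE_PARTITIONS = 4


def hash_partition(rows, key, n=NUM_SHUFFLE_PARTITIONS):
    """Mimic Exchange hashpartitioning: each row goes to partition hash(key) % n."""
    parts = [[] for _ in range(n)]
    for row in rows:
        parts[hash(key(row)) % n].append(row)
    return parts


def count_distinct_ips(partitions, domain):
    """Count distinct IPs for one domain; partitions hold (ip, domain) rows."""
    # Stage 1: filter, rename to (domain, ip) and drop duplicates per partition
    stage1 = [{(d, ip) for ip, d in part if d == domain} for part in partitions]

    # Exchange hashpartitioning(domain, ip): identical pairs meet in one partition
    pairs = [pair for part in stage1 for pair in part]
    shuffled = hash_partition(pairs, key=lambda pair: pair)

    # Stage 2: drop the remaining duplicates, then count per domain (partial_count)
    partial_counts = []
    for part in shuffled:
        counts = defaultdict(int)
        for d, _ip in set(part):
            counts[d] += 1
        partial_counts.extend(counts.items())

    # Exchange hashpartitioning(domain): all partial counts of a domain meet
    by_domain = hash_partition(partial_counts, key=lambda kv: kv[0])

    # Stage 3: sum the partial counts (the final count(distinct ip))
    totals = defaultdict(int)
    for part in by_domain:
        for d, c in part:
            totals[d] += c
    return dict(totals)


# Hypothetical input split over two partitions; 1.1.1.1 visits google.com in both
sample_partitions = [
    [("1.1.1.1", "google.com"), ("2.2.2.2", "google.com"), ("1.1.1.1", "dtu.dk")],
    [("1.1.1.1", "google.com"), ("3.3.3.3", "google.com")],
]
print(count_distinct_ips(sample_partitions, "google.com"))  # {'google.com': 3}
```

The duplicate visit from 1.1.1.1 survives the per-partition deduplication in stage 1 because it sits in two different partitions; only after the first shuffle do both copies land in the same partition, where stage 2 removes one of them.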

# Sketches in Spark
Sketches are compact data structures for estimating count statistics (such as the number of distinct elements or how often an item occurs) over a stream. A property common to many sketches is that they are composable, i.e., two sketches obtained from two different streams can be merged into one sketch that has the same properties (error rate) as if all data had been added to a single sketch.

If you only need an approximate result, sketches are also useful for distributed computing. Each partition of the data is treated as a separate stream, and at the end the per-partition sketches are merged. Only the small sketches have to be moved between machines, which avoids shuffling the raw data.
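To make composability concrete, here is a minimal Count-Min sketch in plain Python. It estimates how often items occur and is chosen only because its merge step is easy to read; Spark's distinct counting relies on a different sketch. Merging is element-wise addition of the counter tables, so sketching two splits separately and merging gives exactly the same table as sketching all the data at once. The width, depth, SHA-256-based hashing and sample data are arbitrary assumptions.

```python
import hashlib


class CountMinSketch:
    """Minimal Count-Min sketch: estimates how often each item occurs in a stream."""

    def __init__(self, width=64, depth=4):
        self.width = width
        self.depth = depth
        self.table = [[0] * width for _ in range(depth)]

    def _buckets(self, item):
        # One deterministic hash function per row: SHA-256 salted with the row index
        for row in range(self.depth):
            digest = hashlib.sha256(f"{row}:{item}".encode()).digest()
            yield row, int.from_bytes(digest[:8], "big") % self.width

    def add(self, item):
        for row, col in self._buckets(item):
            self.table[row][col] += 1

    def estimate(self, item):
        # Collisions can only inflate counters, so the smallest one is the best estimate
        return min(self.table[row][col] for row, col in self._buckets(item))

    def merge(self, other):
        """Compose two sketches of the same shape by adding their counters."""
        if (self.width, self.depth) != (other.width, other.depth):
            raise ValueError("can only merge sketches with the same width and depth")
        merged = CountMinSketch(self.width, self.depth)
        merged.table = [
            [a + b for a, b in zip(row_a, row_b)]
            for row_a, row_b in zip(self.table, other.table)
        ]
        return merged


# Two hypothetical splits of a stream of visited domains
split_a = ["google.com", "dtu.dk", "google.com"]
split_b = ["google.com", "python.org"]

sketch_a, sketch_b, whole = CountMinSketch(), CountMinSketch(), CountMinSketch()
for item in split_a:
    sketch_a.add(item)
for item in split_b:
    sketch_b.add(item)
for item in split_a + split_b:
    whole.add(item)

merged = sketch_a.merge(sketch_b)
print(merged.table == whole.table)         # True
print(merged.estimate("google.com") >= 3)  # True
```

Each split plays the role of one partition: in a distributed setting only `sketch_a.table` and `sketch_b.table` (a fixed `depth * width` counters each) would travel over the network, however large the splits are.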

Consider the following example of counting unique IPs in the web traffic data.

In [2]:
df = spark.read.option('delimiter', '\t').csv('data/traffic_2_sample')\
    .select(f.col('_c0').alias('ip'), f.col('_c1').alias('domain'))

distinct_ips = df.select(f.countDistinct('ip'))
distinct_ips.show()

+------------------+
|count(DISTINCT ip)|
+------------------+
|           9576468|
+------------------+



The above program triggers a shuffle (`Exchange hashpartitioning`), as can be seen from the physical plan: all IPs are redistributed so that duplicates end up in the same partition.

In [3]:
distinct_ips.explain()

== Physical Plan ==
*(3) HashAggregate(keys=[], functions=[count(distinct ip#14)])
+- Exchange SinglePartition
   +- *(2) HashAggregate(keys=[], functions=[partial_count(distinct ip#14)])
      +- *(2) HashAggregate(keys=[ip#14], functions=[])
         +- Exchange hashpartitioning(ip#14, 200)
            +- *(1) HashAggregate(keys=[ip#14], functions=[])
               +- *(1) Project [_c0#10 AS ip#14]
                  +- *(1) FileScan csv [_c0#10] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/home/jovyan/work/data/traffic_2_sample], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<_c0:string>


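In the following we use `approx_count_distinct()` (the current name of `approxCountDistinct()`, which has been deprecated since Spark 2.1) to count approximately instead. The physical plan no longer shuffles the raw IPs: the `Exchange hashpartitioning` is gone, and the only remaining exchange (`Exchange SinglePartition`) moves one small partial result per partition to a single partition, where they are merged.

In [6]:
approx_distinct_ips = df.select(f.approx_count_distinct('ip'))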
approx_distinct_ips.show()

+-------------------------+
|approx_count_distinct(ip)|
+-------------------------+
|                  9743877|
+-------------------------+



In [7]:
approx_distinct_ips.explain()

== Physical Plan ==
HashAggregate(keys=[], functions=[approx_count_distinct(ip#14, 0.05, 0, 0)])
+- Exchange SinglePartition
   +- HashAggregate(keys=[], functions=[partial_approx_count_distinct(ip#14, 0.05, 0, 0)])
      +- *(1) Project [_c0#10 AS ip#14]
         +- *(1) FileScan csv [_c0#10] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/home/jovyan/work/data/traffic_2_sample], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<_c0:string>


**Question: which sketch do you think Spark is using for counting distinct elements?**

# Collecting and writing
So far we have seen how to compute dataframes and print the output in the notebook. But what if we want to use the results in non-Spark programs or services?

We will use the Web Traffic data to demonstrate "collecting" and writing to disk.

In [9]:
df = spark.read.option('delimiter', '\t').csv('data/traffic_2_sample')\
    .select(f.col('_c0').alias('ip'), f.col('_c1').alias('domain'))

Let's say we have computed a dataframe of aggregates.

In [18]:
distinct_ips = df.groupBy('domain').agg(f.countDistinct('ip').alias('count'))

`collect()` converts the dataframe into a Python list of `Row` objects on the driver.

In [19]:
rows = distinct_ips.collect()
rows

[Row(domain='pandas.pydata.org', count=1266737),
 Row(domain='databricks.com', count=129125),
 Row(domain='datarobot.com', count=26387),
 Row(domain='github.com', count=129262),
 Row(domain='google.com', count=255543),
 Row(domain='scala-lang.org', count=1),
 Row(domain='wikipedia.org', count=5019691),
 Row(domain='python.org', count=2521367),
 Row(domain='dtu.dk', count=255711),
 Row(domain='spark.apache.org', count=51722)]

From a list of Row objects to a list of lists:

In [20]:
[[r['domain'], r['count']] for r in rows]

[['pandas.pydata.org', 1266737],
 ['databricks.com', 129125],
 ['datarobot.com', 26387],
 ['github.com', 129262],
 ['google.com', 255543],
 ['scala-lang.org', 1],
 ['wikipedia.org', 5019691],
 ['python.org', 2521367],
 ['dtu.dk', 255711],
 ['spark.apache.org', 51722]]

Careful when collecting: all data is sent to the driver, so for large dataframes it will be very slow or make the driver run out of memory. `collect()` is intended to be used AFTER we've used Spark to compute a small, aggregated dataframe.

Writing is as easy as reading:

In [22]:
df.write.csv('data/demo')

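Spark writes a directory (here `data/demo`) containing one file per partition, typically alongside a `_SUCCESS` marker file. By default the write fails if the directory already exists; use `.mode('overwrite')` to replace it. To get a single file, use `coalesce(1)` before writing, but be careful: all data is then sent to one worker.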

In [24]:
df.coalesce(1).write.csv('data/demo2')

# Broadcast joins
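When you want to join two dataframes, Spark will by default do a sort-merge join (as of Spark 2.2):

- Shuffle both dataframes so that rows with the same values in the join columns end up in the same partition
- Sort each partition according to the columns in the join condition
- Locally merge join the sorted partitions

Shuffling and sorting are very costly operations in distributed computing: the data is sent over the network, and sorting requires multiple passes over it.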
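The sort-merge strategy can be sketched in plain Python as follows. This is a simplified model, not Spark's implementation: `hash_partition` stands in for `Exchange hashpartitioning` using Python's `hash`, the partition count is 4 instead of 200, join keys are assumed to be non-null (Spark filters out nulls first, see the `isnotnull` filter in the plan below), and the rows reuse a few (PassengerId, Pclass, Age) values from the outputs earlier in this notebook.

```python
# Hypothetical number of shuffle partitions (Spark's default is 200)
NUM_PARTITIONS = 4


def hash_partition(rows, key, n=NUM_PARTITIONS):
    """Mimic Exchange hashpartitioning: each row goes to partition hash(key) % n."""
    parts = [[] for _ in range(n)]
    for row in rows:
        parts[hash(key(row)) % n].append(row)
    return parts


def sort_merge_join(left, right, left_key, right_key):
    """Inner join: shuffle both sides, sort each partition, then merge."""
    result = []
    # Both sides are partitioned the same way, so matching keys meet in partition i
    for lp, rp in zip(hash_partition(left, left_key), hash_partition(right, right_key)):
        lp = sorted(lp, key=left_key)
        rp = sorted(rp, key=right_key)
        i = j = 0
        while i < len(lp) and j < len(rp):
            lk, rk = left_key(lp[i]), right_key(rp[j])
            if lk < rk:
                i += 1
            elif lk > rk:
                j += 1
            else:
                # Find the run of right rows sharing this key...
                j_end = j
                while j_end < len(rp) and right_key(rp[j_end]) == lk:
                    j_end += 1
                # ...and pair it with every left row sharing the key
                while i < len(lp) and left_key(lp[i]) == lk:
                    result.extend((lp[i], r) for r in rp[j:j_end])
                    i += 1
                j = j_end
    return result


passengers = [(631, 1, 80.0), (97, 1, 71.0), (673, 2, 70.0), (852, 3, 74.0)]  # (PassengerId, Pclass, Age)
max_ages = [(80.0, 1), (70.0, 2), (74.0, 3)]  # (MaxAge, Pclass)

joined = sort_merge_join(
    passengers,
    max_ages,
    left_key=lambda p: (p[2], p[1]),   # (Age, Pclass)
    right_key=lambda m: (m[0], m[1]),  # (MaxAge, Pclass_)
)
print(sorted(joined))
# [((631, 1, 80.0), (80.0, 1)), ((673, 2, 70.0), (70.0, 2)), ((852, 3, 74.0), (74.0, 3))]
```

Both inputs are fully redistributed and sorted before a single row is matched, which is exactly the work a broadcast join tries to avoid.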

If one of the dataframes (here the one on the right-hand side of the join) is very small, you can tell Spark to do a broadcast join, in which it will do the following:

- Collect the small dataframe, build a hash table keyed on the values in the columns of the join condition, and send (broadcast) it to each worker
- Locally hash join the partitions of the large dataframe against that hash table

The broadcast avoids shuffling and sorting the large dataframe entirely.
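The broadcast strategy can be sketched in plain Python in the same spirit (a simplified model, not Spark's code): build one hash table from the small side, then let every partition of the large side probe it locally. The two-partition layout is hypothetical and the rows reuse values from the outputs earlier in this notebook.

```python
from collections import defaultdict


def broadcast_hash_join(big_partitions, small, big_key, small_key):
    """Inner join: build a hash table from the small side, probe it per partition."""
    # Build the hash table once from the small side; in Spark this table is
    # what gets broadcast to every worker
    table = defaultdict(list)
    for row in small:
        table[small_key(row)].append(row)

    result = []
    # Each partition of the big side is joined locally against its copy of the
    # table: the big side is never shuffled or sorted
    for part in big_partitions:
        for row in part:
            for match in table.get(big_key(row), []):
                result.append((row, match))
    return result


# (PassengerId, Pclass, Age) rows, split over two hypothetical partitions
passenger_partitions = [
    [(631, 1, 80.0), (97, 1, 71.0)],
    [(673, 2, 70.0), (852, 3, 74.0)],
]
# (MaxAge, Pclass) rows: the small side that gets broadcast
max_ages = [(80.0, 1), (70.0, 2), (74.0, 3)]

joined = broadcast_hash_join(
    passenger_partitions,
    max_ages,
    big_key=lambda p: (p[2], p[1]),    # (Age, Pclass)
    small_key=lambda m: (m[0], m[1]),  # (MaxAge, Pclass_)
)
print(joined)
# [((631, 1, 80.0), (80.0, 1)), ((673, 2, 70.0), (70.0, 2)), ((852, 3, 74.0), (74.0, 3))]
```

The trade-off is memory: every worker must hold the whole hash table, which is why this only pays off when one side is small.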

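Consider the plans created by the following two Spark programs. The first line disables automatic broadcast joins (a threshold of -1), so Spark falls back to a sort-merge join unless we explicitly ask for a broadcast.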

In [26]:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

df = spark.read.option('header', True).option('inferSchema', True).csv('data/titanic.csv')

max_fare_per_class = df.groupBy('Pclass')\
    .agg(f.max('Age').alias('MaxAge'))\
    .select('MaxAge', f.col('Pclass').alias('Pclass_'))

df.join(max_fare_per_class, (f.col('Age') == f.col('MaxAge')) & (f.col('Pclass') == f.col('Pclass_')), 'inner')\
    .drop('Pclass_')\
    .explain()

== Physical Plan ==
*(6) Project [PassengerId#2412, Survived#2413, Pclass#2414, Name#2415, Sex#2416, Age#2417, SibSp#2418, Parch#2419, Ticket#2420, Fare#2421, Cabin#2422, Embarked#2423, MaxAge#2449]
+- *(6) SortMergeJoin [Age#2417, Pclass#2414], [MaxAge#2449, Pclass_#2452], Inner
   :- *(2) Sort [Age#2417 ASC NULLS FIRST, Pclass#2414 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(Age#2417, Pclass#2414, 200)
   :     +- *(1) Project [PassengerId#2412, Survived#2413, Pclass#2414, Name#2415, Sex#2416, Age#2417, SibSp#2418, Parch#2419, Ticket#2420, Fare#2421, Cabin#2422, Embarked#2423]
   :        +- *(1) Filter (isnotnull(Age#2417) && isnotnull(Pclass#2414))
   :           +- *(1) FileScan csv [PassengerId#2412,Survived#2413,Pclass#2414,Name#2415,Sex#2416,Age#2417,SibSp#2418,Parch#2419,Ticket#2420,Fare#2421,Cabin#2422,Embarked#2423] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/home/jovyan/work/data/titanic.csv], PartitionFilters: [], PushedFilters: [IsNotN

In [27]:
df.join(f.broadcast(max_fare_per_class), (f.col('Age') == f.col('MaxAge')) & (f.col('Pclass') == f.col('Pclass_')), 'inner')\
    .drop('Pclass_')\
    .explain()

== Physical Plan ==
*(3) Project [PassengerId#2412, Survived#2413, Pclass#2414, Name#2415, Sex#2416, Age#2417, SibSp#2418, Parch#2419, Ticket#2420, Fare#2421, Cabin#2422, Embarked#2423, MaxAge#2449]
+- *(3) BroadcastHashJoin [Age#2417, Pclass#2414], [MaxAge#2449, Pclass_#2452], Inner, BuildRight
   :- *(3) Project [PassengerId#2412, Survived#2413, Pclass#2414, Name#2415, Sex#2416, Age#2417, SibSp#2418, Parch#2419, Ticket#2420, Fare#2421, Cabin#2422, Embarked#2423]
   :  +- *(3) Filter (isnotnull(Age#2417) && isnotnull(Pclass#2414))
   :     +- *(3) FileScan csv [PassengerId#2412,Survived#2413,Pclass#2414,Name#2415,Sex#2416,Age#2417,SibSp#2418,Parch#2419,Ticket#2420,Fare#2421,Cabin#2422,Embarked#2423] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/home/jovyan/work/data/titanic.csv], PartitionFilters: [], PushedFilters: [IsNotNull(Age), IsNotNull(Pclass)], ReadSchema: struct<PassengerId:int,Survived:boolean,Pclass:int,Name:string,Sex:string,Age:double,SibSp:int,Pa...
   +

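Spark will automatically use a broadcast join if the estimated size of one of the tables is below `spark.sql.autoBroadcastJoinThreshold` (10 MB by default). Since we set it to -1 above, automatic broadcasting stays disabled in this session until the setting is reset, e.g. with `spark.conf.unset("spark.sql.autoBroadcastJoinThreshold")`.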