---

<div style="float: right;margin-right: 1em; margin-bottom: 3em;">
<a href="https://pybcn.org">
   <img src="Images/pybcn.png" width=50>
</a>
    
<a href="https://engineering.affectv.com">
    <img src="Images/affectv.png" width=100 style="margin-top: 1em;">
</a>
</div>

# The Magic of **Py**Spark

### _An introduction to PySpark_

&nbsp;

---

You only need to do the following if you are running in Binder (or have not downloaded the soccer dataset). The file is around 5 MB.

In [1]:
import urllib.request
url = "https://projects.fivethirtyeight.com/soccer-api/club/spi_matches.csv"
data = urllib.request.urlopen(url).read()

with open("spi_matches.csv", "wb") as f:
    f.write(data)

# Creating a `SparkSession` and loading some data

The first step is to create a `SparkSession` object we can interact with. It is customary to call it `spark`

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

Since the dataset is in the local filesystem, **we need to load it**. Spark not only comes with many different types of loaders, it also handles parallelisation: if our dataset were stored in a distributed store (like HDFS or S3), each worker would fetch only a piece of the data.

You can pass additional `option`s to readers. In this case I know this CSV file comes with headers, so I want the reader to handle the first row differently. We could also pass a different delimiter (like tabs or pipes) or say what to do with quotes. Check the documentation of each reader to know what options are available, there are many!

*NOTE*: when loading distributed CSVs, each CSV file should come with the header. Also note that CSV is a horrible format to use!

*TIP*: [other loaders](https://spark.apache.org/docs/latest/sql-data-sources.html) (or, datasources as they are known in Spark) have pretty straightforward names, like [`.parquet()`](https://spark.apache.org/docs/latest/sql-data-sources-parquet.html) and [`.json()`](https://spark.apache.org/docs/latest/sql-data-sources-json.html) (the latter expects one-json-per-line)

In [3]:
dataframe = spark.read.option("header", True).csv("spi_matches.csv")

We don't know much about this file aside from it being a CSV. The first thing we can do is check what the columns are named. The advantage of reading with the `header` option is that every column comes tagged with a name; a disadvantage is the types: everything will show as `string`

In [4]:
dataframe.printSchema()

root
 |-- date: string (nullable = true)
 |-- league_id: string (nullable = true)
 |-- league: string (nullable = true)
 |-- team1: string (nullable = true)
 |-- team2: string (nullable = true)
 |-- spi1: string (nullable = true)
 |-- spi2: string (nullable = true)
 |-- prob1: string (nullable = true)
 |-- prob2: string (nullable = true)
 |-- probtie: string (nullable = true)
 |-- proj_score1: string (nullable = true)
 |-- proj_score2: string (nullable = true)
 |-- importance1: string (nullable = true)
 |-- importance2: string (nullable = true)
 |-- score1: string (nullable = true)
 |-- score2: string (nullable = true)
 |-- xg1: string (nullable = true)
 |-- xg2: string (nullable = true)
 |-- nsxg1: string (nullable = true)
 |-- nsxg2: string (nullable = true)
 |-- adj_score1: string (nullable = true)
 |-- adj_score2: string (nullable = true)



Now, we may want to see what the data looks like. In general you would use the [`show`](https://spark.apache.org/docs/2.4.4/api/python/pyspark.sql.html#pyspark.sql.DataFrame.show) method on a dataframe, which plays a role similar to `head` in Pandas. We can also use [`describe`](https://spark.apache.org/docs/2.4.4/api/python/pyspark.sql.html#pyspark.sql.DataFrame.describe) to compute some statistics per column

In [5]:
dataframe.show(5)

+----------+---------+--------------------+--------------+--------------------+-----+-----+------+------+-------+-----------+-----------+-----------+-----------+------+------+----+----+-----+-----+----------+----------+
|      date|league_id|              league|         team1|               team2| spi1| spi2| prob1| prob2|probtie|proj_score1|proj_score2|importance1|importance2|score1|score2| xg1| xg2|nsxg1|nsxg2|adj_score1|adj_score2|
+----------+---------+--------------------+--------------+--------------------+-----+-----+------+------+-------+-----------+-----------+-----------+-----------+------+------+----+----+-----+-----+----------+----------+
|2016-08-12|     1843|      French Ligue 1|        Bastia| Paris Saint-Germain|51.16|85.68|0.0463| 0.838| 0.1157|       0.91|       2.36|       32.4|       67.7|     0|     1|0.97|0.63| 0.43| 0.45|       0.0|      1.05|
|2016-08-12|     1843|      French Ligue 1|     AS Monaco|            Guingamp|68.85|56.48|0.5714|0.1669| 0.2617|       

In [6]:
dataframe.describe().show(5)

+-------+----------+------------------+--------------------+--------------------+--------------------+------------------+------------------+-------------------+-------------------+-------------------+-------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|summary|      date|         league_id|              league|               team1|               team2|              spi1|              spi2|              prob1|              prob2|            probtie|        proj_score1|       proj_score2|       importance1|       importance2|            score1|            score2|               xg1|               xg2|             nsxg1|             nsxg2|        adj_score1|        adj_score2|
+-------+----------+------------------+--------------------+--------------------+--------------------+------------------+------------------+

You may have noticed that the second operation (`describe().show()`) was slower. Why was that? Spark operations are split into _actions_ and _transformations_.
- *Transformations*: "explain" or "define" an operation. When you do `desc = df.describe()`, you are just stating _desc should contain the describe operation on df_. That's just a description, and takes essentially zero time: you are not requesting to _see it_ or _store it_ or...
- *Actions*: Actions "require" something to be done. They show, store, or in some sense, operate (in functional programming you'd call them _effectful_).
---
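
The transformation/action split can be mimicked in plain Python with a generator. This is only an analogy, not how Spark is implemented, and `lazy_square` and `calls` are made-up names for illustration: defining the generator does no work, and the work only happens when something consumes it.

```python
calls = []  # records every time real work is done


def lazy_square(values):
    """A 'transformation': describes the work, but does none of it yet."""
    for value in values:
        calls.append(value)  # side effect so we can see when work happens
        yield value * value


squared = lazy_square([1, 2, 3])  # nothing has run yet
work_before_action = list(calls)

result = list(squared)  # the 'action': forces the computation
work_after_action = list(calls)

print(work_before_action, work_after_action, result)
```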

And, yes, the output looks a bit horrible `¯\_(ツ)_/¯`. It is clear though that some of these columns are actually float/double, others are ints and others are indeed strings. And the first looks pretty much like a date. The first step would be to change the types to the correct ones.

There are several ways to do this; the easiest one is to just select what we want and cast.

## Converting column datatypes

In [7]:
dataframe_with_1_type = dataframe.select(dataframe["league_id"].cast("int"))
dataframe_with_1_other_type = dataframe.select(dataframe["xg1"].cast("float"))

In Spark, there are several ways of selecting a column:
- by name, like `select("columnName")` or `select(col("columnName"))`
- by "named reference", like `select(dataframe.columnName)` or `select(dataframe["columnName"])`
- _if you were using Scala you'd have two more_

The first is more generic: it is referencing _a_ column with that name, but _not necessarily in a specific dataframe_. This distinction can be important in some machine learning algorithms, where additional metadata is added _to a column IN a dataframe_. 

In [8]:
dataframe_with_1_type.printSchema()
dataframe_with_1_type.show(5)

root
 |-- league_id: integer (nullable = true)

+---------+
|league_id|
+---------+
|     1843|
|     1843|
|     2411|
|     2411|
|     2411|
+---------+
only showing top 5 rows



In [9]:
dataframe_with_1_other_type.printSchema()
dataframe_with_1_other_type.show(5)

root
 |-- xg1: float (nullable = true)

+----+
| xg1|
+----+
|0.97|
|2.45|
|0.85|
|1.11|
|0.73|
+----+
only showing top 5 rows



What would happen if we messed up with the types…?

In [10]:
dataframe_with_wrong_type = dataframe.select(dataframe["xg1"].cast("int"))
dataframe_with_wrong_type.printSchema()
dataframe_with_wrong_type.show(5)

root
 |-- xg1: integer (nullable = true)

+---+
|xg1|
+---+
|  0|
|  2|
|  0|
|  1|
|  0|
+---+
only showing top 5 rows



In general, when a conversion exists between the two types, casting will either truncate or convert. So, casting values that _should be a float_ to `int` just truncates them (0.97 becomes 0, 2.45 becomes 2); it does not round. When no sensible conversion exists, though, the result is _really bad_.

In [11]:
dataframe_with_very_wrong_type = dataframe.select(dataframe["league"].cast("int"))
dataframe_with_very_wrong_type.printSchema()
dataframe_with_very_wrong_type.show(5)

root
 |-- league: integer (nullable = true)

+------+
|league|
+------+
|  null|
|  null|
|  null|
|  null|
|  null|
+------+
only showing top 5 rows
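
The two behaviours seen above (truncation when the string is numeric, `null` when it is not) can be imitated in plain Python. `cast_to_int` is a hypothetical helper written only to mirror the outputs shown in this notebook. It is not Spark's implementation and may differ from it on edge cases.

```python
from typing import Optional


def cast_to_int(value: Optional[str]) -> Optional[int]:
    """Hypothetical helper imitating the string -> int casts shown above."""
    if value is None:
        return None
    try:
        return int(float(value))  # int() truncates toward zero, it does not round
    except (ValueError, OverflowError):
        return None  # no sensible conversion: the result is null, not an error


print([cast_to_int(v) for v in ["0.97", "2.45", "1843", "French Ligue 1"]])
```

The important point is the silent `None`: a wrong cast does not raise, so the mistake only shows up later as missing data.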



Note that using strings is error-prone: what if you typed `cast("fioat")`? It is better to use the built-in type objects.

In [12]:
from pyspark.sql.types import FloatType, IntegerType, StringType, DateType

dataframe_with_1_type_2 = dataframe.select(dataframe["league_id"].cast(IntegerType()))
dataframe_with_1_type_2.printSchema()
dataframe_with_1_type_2.show(5)

root
 |-- league_id: integer (nullable = true)

+---------+
|league_id|
+---------+
|     1843|
|     1843|
|     2411|
|     2411|
|     2411|
+---------+
only showing top 5 rows



Now, we don't really need all columns from this dataframe, but only the following:

- date
- league
- team1
- team2 
- score1
- score2

We can select on a list, as easily as `select(col_1, col_2)`, and we'd also like to convert to the correct types on the go. Shouldn't be hard in Python

In [13]:
conv = {
    "date": DateType(),
    "league": StringType(),
    "team1": StringType(),
    "team2": StringType(),
    "score1": IntegerType(),
    "score2": IntegerType(),
}

selection = [dataframe[k].cast(v) for k, v in conv.items()]

df = dataframe.select(*selection)  # reminder: *list unpacks a list

In [14]:
df.printSchema()
df.show(5)
df.describe().show()

root
 |-- date: date (nullable = true)
 |-- league: string (nullable = true)
 |-- team1: string (nullable = true)
 |-- team2: string (nullable = true)
 |-- score1: integer (nullable = true)
 |-- score2: integer (nullable = true)

+----------+--------------------+--------------+--------------------+------+------+
|      date|              league|         team1|               team2|score1|score2|
+----------+--------------------+--------------+--------------------+------+------+
|2016-08-12|      French Ligue 1|        Bastia| Paris Saint-Germain|     0|     1|
|2016-08-12|      French Ligue 1|     AS Monaco|            Guingamp|     2|     2|
|2016-08-13|Barclays Premier ...|     Hull City|      Leicester City|     2|     1|
|2016-08-13|Barclays Premier ...|Crystal Palace|West Bromwich Albion|     0|     1|
|2016-08-13|Barclays Premier ...|       Everton|   Tottenham Hotspur|     1|     1|
+----------+--------------------+--------------+--------------------+------+------+
only showing t

*Note*: we could have used the `inferSchema` setting in the CSV loader, though. But:
- We wouldn't have an excuse to explain how to manually type columns
- It makes _two_ passes on the data, which is usually one more than needed
- It may still mess up if there is some string lurking
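
The last two points can be pictured with a toy schema inference in plain Python. This is a sketch under simplifying assumptions (only `int` versus `string`, a hypothetical `infer_column_types` helper, made-up sample rows), not what the CSV loader actually runs: the types are only known after every row has been scanned, and a single stray value changes the type of the whole column.

```python
import csv
import io


def infer_column_types(csv_text):
    """Toy schema inference: a full pass over the data before any typed read."""
    reader = csv.DictReader(io.StringIO(csv_text))
    inferred = {}
    for row in reader:
        for name, value in row.items():
            try:
                int(value)
                seen = "int"
            except ValueError:
                seen = "string"
            # a single non-numeric value downgrades the whole column to string
            if "string" in (seen, inferred.get(name)):
                inferred[name] = "string"
            else:
                inferred[name] = "int"
    return inferred


sample = "team,score\nBastia,0\nGuingamp,2\nHull City,n/a\n"
print(infer_column_types(sample))
```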

In [15]:
dataframe_inferred = spark.read.option("header", True).option("inferSchema", True).csv("spi_matches.csv")
dataframe_inferred.printSchema()

root
 |-- date: timestamp (nullable = true)
 |-- league_id: integer (nullable = true)
 |-- league: string (nullable = true)
 |-- team1: string (nullable = true)
 |-- team2: string (nullable = true)
 |-- spi1: double (nullable = true)
 |-- spi2: double (nullable = true)
 |-- prob1: double (nullable = true)
 |-- prob2: double (nullable = true)
 |-- probtie: double (nullable = true)
 |-- proj_score1: double (nullable = true)
 |-- proj_score2: double (nullable = true)
 |-- importance1: double (nullable = true)
 |-- importance2: double (nullable = true)
 |-- score1: integer (nullable = true)
 |-- score2: integer (nullable = true)
 |-- xg1: double (nullable = true)
 |-- xg2: double (nullable = true)
 |-- nsxg1: double (nullable = true)
 |-- nsxg2: double (nullable = true)
 |-- adj_score1: double (nullable = true)
 |-- adj_score2: double (nullable = true)



Now, we may want to filter a specific league, like the Premier League, and count how many matches there are.

# Operating on data (filtering, transforming, ordering...), SparkSQL

You should eventually become somewhat familiar with what is available in `pyspark.sql.functions`. Even when working in Python, [I check the Scala reference](https://spark.apache.org/docs/2.4.4/api/scala/index.html#org.apache.spark.sql.functions$) instead of the Python one (maybe because I usually work in Scala, though)

In [16]:
from pyspark.sql.functions import col

premier = df.where(col("league").contains("Barclays"))
premier.count()

1520

That's a lot of matches, I'd like to figure out how many years of data we have

In [17]:
from pyspark.sql.functions import date_trunc

premier.select(date_trunc("year", col("date")).alias("year")).distinct().orderBy(col("date")).show()

+-------------------+
|               year|
+-------------------+
|2016-01-01 00:00:00|
|2017-01-01 00:00:00|
|2018-01-01 00:00:00|
|2019-01-01 00:00:00|
|2020-01-01 00:00:00|
+-------------------+



I have introduced many things here! For one, a function that works on a column, plus `distinct` and ordering. They should be pretty clear, though: this looks mostly like SQL, but written as functions. Huh, wait... this reminds me...

In [18]:
premier.createOrReplaceTempView("premier")

spark.sql("SELECT DISTINCT date_trunc('year', date) AS year FROM premier ORDER BY year").show()

+-------------------+
|               year|
+-------------------+
|2016-01-01 00:00:00|
|2017-01-01 00:00:00|
|2018-01-01 00:00:00|
|2019-01-01 00:00:00|
|2020-01-01 00:00:00|
+-------------------+



As you can see, Spark offers **SparkSQL**, allowing you to write your queries in pretty much standard SQL (there are some minor differences) instead of as methods on a dataframe. First you register your tables, then you operate on them

*NOTE*: Using methods on a dataframe is a better approach in general, since modularity, testing and especially _parsing_ are important. See next cell

In [19]:
# try to run this

spark.sql("SELCT DISTINCT date_trunc('year', date) AS year FROM premier ORDER BY year").show()

ParseException: "\nmismatched input 'SELCT' expecting {'(', 'SELECT', 'FROM', 'ADD', 'DESC', 'WITH', 'VALUES', 'CREATE', 'TABLE', 'INSERT', 'DELETE', 'DESCRIBE', 'EXPLAIN', 'SHOW', 'USE', 'DROP', 'ALTER', 'MAP', 'SET', 'RESET', 'START', 'COMMIT', 'ROLLBACK', 'REDUCE', 'REFRESH', 'CLEAR', 'CACHE', 'UNCACHE', 'DFS', 'TRUNCATE', 'ANALYZE', 'LIST', 'REVOKE', 'GRANT', 'LOCK', 'UNLOCK', 'MSCK', 'EXPORT', 'IMPORT', 'LOAD'}(line 1, pos 0)\n\n== SQL ==\nSELCT DISTINCT date_trunc('year', date) AS year FROM premier ORDER BY year\n^^^\n"

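It will fail very badly, with a stack trace coming from the JVM: a `ParseException` telling you the parser found `SELCT` where it expected a keyword such as `SELECT`. The typo is only discovered when the query string is actually parsed. If you use the methods on a dataframe and write `df.selct` instead of `df.select`, the mistake is in Python code, so pylint/mypy/pyflakes or a similar checker has a chance of catching it before anything runs.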
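
The same contrast can be reproduced without Spark. The sketch below uses the standard library's `sqlite3` purely as a stand-in for "any API that takes SQL as a string" (it says nothing about Spark's parser): the SQL typo is invisible to Python until the query runs, while the misspelt attribute is ordinary Python that tooling can inspect.

```python
import sqlite3

connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE premier (date TEXT)")

try:
    # The typo lives inside a string, so Python tooling cannot see it
    connection.execute("SELCT DISTINCT date FROM premier")
    sql_error = None
except sqlite3.OperationalError as error:
    sql_error = str(error)  # only discovered when the query is actually run

try:
    connection.exectue  # misspelt attribute, the kind of typo a linter can flag
    attribute_error = None
except AttributeError as error:
    attribute_error = str(error)

connection.close()
print(sql_error)
print(attribute_error)
```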

If we want to do anything with the data, we don't want matches in the future. Let's check if we can filter them out somehow without needing to know when we downloaded this.

In [21]:
premier.where("date_trunc('year', date) = '2020'").show(5)

+----------+--------------------+--------------------+-----------------+------+------+
|      date|              league|               team1|            team2|score1|score2|
+----------+--------------------+--------------------+-----------------+------+------+
|2020-01-01|Barclays Premier ...|Brighton and Hove...|          Chelsea|  null|  null|
|2020-01-01|Barclays Premier ...|             Burnley|      Aston Villa|  null|  null|
|2020-01-01|Barclays Premier ...|             Watford|    Wolverhampton|  null|  null|
|2020-01-01|Barclays Premier ...|           Newcastle|   Leicester City|  null|  null|
|2020-01-01|Barclays Premier ...|         Southampton|Tottenham Hotspur|  null|  null|
+----------+--------------------+--------------------+-----------------+------+------+
only showing top 5 rows



As you can see, you can use SQL-like expressions in `where` method calls, too. Like above, this should be discouraged, but when exploring data it is faster to type, so it's not that bad. But make sure not to use it in production code!

Also, _clearly_ we can remove future matches by searching for null scores

In [22]:
current_premier = premier.where("score1 is not null")

In [23]:
current_premier.select(date_trunc("year", col("date")).alias("year")).distinct().orderBy(col("date")).show()

+-------------------+
|               year|
+-------------------+
|2016-01-01 00:00:00|
|2017-01-01 00:00:00|
|2018-01-01 00:00:00|
|2019-01-01 00:00:00|
+-------------------+



_Clearly_ is, in general, not good enough. 

In [24]:
premier.where("score1 is null")\
       .select(date_trunc("year", col("date")).alias("year")).distinct().orderBy(col("date")).show()

+-------------------+
|               year|
+-------------------+
|2019-01-01 00:00:00|
|2020-01-01 00:00:00|
+-------------------+



Did we find some weird anomaly in the data? What does it look like? Here we again use a SQL-style `where`, which is faster to write when exploring data

In [25]:
premier.where("date_trunc('year', date) = '2019' and score1 is null").show(5)

+----------+--------------------+--------------------+-----------------+------+------+
|      date|              league|               team1|            team2|score1|score2|
+----------+--------------------+--------------------+-----------------+------+------+
|2019-11-23|Barclays Premier ...|     West Ham United|Tottenham Hotspur|  null|  null|
|2019-11-23|Barclays Premier ...|             Everton|     Norwich City|  null|  null|
|2019-11-23|Barclays Premier ...|             Arsenal|      Southampton|  null|  null|
|2019-11-23|Barclays Premier ...|             Watford|          Burnley|  null|  null|
|2019-11-23|Barclays Premier ...|Brighton and Hove...|   Leicester City|  null|  null|
+----------+--------------------+--------------------+-----------------+------+------+
only showing top 5 rows



Oh, of course: `date_trunc` just keeps the year and sets 1st of January. The rest of the current year should have no data, obviously. Let's confirm this:

In [26]:
premier.where("date_trunc('year', date) = '2019' and score1 is null").groupBy("date").count().orderBy("date").show()

+----------+-----+
|      date|count|
+----------+-----+
|2019-11-23|    8|
|2019-11-24|    1|
|2019-11-25|    1|
|2019-11-30|    6|
|2019-12-01|    4|
|2019-12-03|    2|
|2019-12-04|    6|
|2019-12-05|    2|
|2019-12-07|    5|
|2019-12-08|    4|
|2019-12-09|    1|
|2019-12-14|    6|
|2019-12-15|    3|
|2019-12-16|    1|
|2019-12-21|    8|
|2019-12-22|    2|
|2019-12-26|    9|
|2019-12-27|    1|
|2019-12-28|    7|
|2019-12-29|    3|
+----------+-----+



Makes sense, actually. If this was part of a data pipeline, this last check should be part of the code, with some business-level validation. For this to make sense, you need to know your data, your data's SLAs, etc: in this case the validation could have been "no nulls for dates before today".
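
As a rough idea of what such a check could look like, here is the "no nulls for dates before today" rule in plain Python. The helper name `unscored_past_matches` and the three rows are invented for illustration. In a real pipeline the same rule would be expressed on the dataframe and wired into whatever alerting you use.

```python
from datetime import date


def unscored_past_matches(rows, today):
    """Return matches dated before `today` that still have a missing score."""
    return [
        row for row in rows
        if row["date"] < today and (row["score1"] is None or row["score2"] is None)
    ]


# Invented toy rows, only to exercise the rule
toy_matches = [
    {"date": date(2019, 11, 9), "score1": 2, "score2": 1},
    {"date": date(2019, 11, 23), "score1": None, "score2": None},  # still in the future
    {"date": date(2019, 11, 2), "score1": None, "score2": None},   # suspicious
]
problems = unscored_past_matches(toy_matches, today=date(2019, 11, 20))
print(problems)
```

An empty `problems` list means the data passes. Anything else is worth failing the pipeline for, or at least logging.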

To be fair, though, this has been a pain of copy pasting, and copy pasting is in general... bad. How would we do this in a function?

In [27]:
def years_in_data(dataframe):
    return dataframe.select(date_trunc("year", col("date")).alias("year")).distinct().orderBy(col("date"))

years_in_data(premier).show()

+-------------------+
|               year|
+-------------------+
|2016-01-01 00:00:00|
|2017-01-01 00:00:00|
|2018-01-01 00:00:00|
|2019-01-01 00:00:00|
|2020-01-01 00:00:00|
+-------------------+



Easy peasy, dataframes are in the end just Python objects we can pass around, manipulate, etc. 

# Groups, Aggregates, using `explain`

Let's do some basic analysis: which team has won the most matches in this period?

In [28]:
from pyspark.sql.functions import when

either_winner = current_premier.select("team1", 
                                       "team2", 
                                       when(col("score1") > col("score2"), True).otherwise(False).alias("win1"),
                                       when(col("score2") > col("score1"), True).otherwise(False).alias("win2"),)

In [29]:
either_winner.printSchema()
either_winner.show(5)

root
 |-- team1: string (nullable = true)
 |-- team2: string (nullable = true)
 |-- win1: boolean (nullable = false)
 |-- win2: boolean (nullable = false)

+--------------+--------------------+-----+-----+
|         team1|               team2| win1| win2|
+--------------+--------------------+-----+-----+
|     Hull City|      Leicester City| true|false|
|Crystal Palace|West Bromwich Albion|false| true|
|       Everton|   Tottenham Hotspur|false|false|
|       Burnley|        Swansea City|false| true|
|   Southampton|             Watford|false|false|
+--------------+--------------------+-----+-----+
only showing top 5 rows



In [30]:
winners = either_winner.where(col("win1") | col("win2"))\
                       .withColumn("winner", 
                                   when(col("win1"), col("team1")).otherwise(col("team2")))\
                       .drop("team1", "team2", "win1", "win2")

In [31]:
winners.printSchema()
winners.show(5)

root
 |-- winner: string (nullable = true)

+--------------------+
|              winner|
+--------------------+
|           Hull City|
|West Bromwich Albion|
|        Swansea City|
|     Manchester City|
|   Manchester United|
+--------------------+
only showing top 5 rows



This was a lot to digest. The process has been:
- Create a boolean column for home win
- Create a boolean column for away win
- Filter out draws
- Pick the winner according to who was the home or away team

*Note*: This could have been done with a single `when` query, or in many other ways, but I wanted to show a few more operations. Also, putting everything in a single `when` would have been unreadable.
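
To sanity-check the logic, the same four steps can be written in plain Python over the first three Premier League rows displayed earlier. `pick_winner`, `toy_results` and `toy_winners` are names made up for this sketch.

```python
from collections import Counter


def pick_winner(team1, team2, score1, score2):
    """Return the winning team's name, or None for a draw."""
    if score1 > score2:
        return team1
    if score2 > score1:
        return team2
    return None


# The first three Premier League rows displayed earlier
toy_results = [
    ("Hull City", "Leicester City", 2, 1),
    ("Crystal Palace", "West Bromwich Albion", 0, 1),
    ("Everton", "Tottenham Hotspur", 1, 1),
]
# Draws come back as None and are filtered out
toy_winners = [w for w in (pick_winner(*r) for r in toy_results) if w is not None]
win_counts = Counter(toy_winners)
print(toy_winners, win_counts.most_common())
```

The two winners match the first two rows of the `winner` column above, and the drawn Everton match is dropped.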

In [32]:
from pyspark.sql.functions import desc

best = winners.groupBy("winner").count().orderBy(desc("count"))

You may be thinking
> I have heard grouping is bad in Spark, because it is very slow.

That's not wrong, but it is not 100% right either. `groupByKey` (on RDDs) is the bad one, because it actually builds the grouped data, which is in general very bad. `groupBy` only generates a logical ("transformation style") grouping. Once you add an aggregation function (like we did with `count`; the generic form is `.agg(agg_function(column))`), what happens when an action appears is similar to a `reduceByKey`. In other words, the groups are never created:

---

#### Differences between `groupByKey` vs `groupBy` (or `reduceByKey`)

Imagine we have this dataset: `(a, 1), (b, 2), (a, 12), (a, 13)` and we `groupByKey` on the first column. This first step would generate a dataset similar to `a: [(a, 1), (a, 12), (a, 13)], b: [(b, 2)]` and would require all data for key `a` to fit in one machine. If we then count, it will count the array length per key: `(a, 3), (b, 1)`.

`reduceByKey` would be an equivalent way of cheaply aggregating in an RDD setting (typically after mapping each element to `(key, 1)`). We'd need to offer the `reduce` function, a function that combines the values seen for each key. In this case, it would just be adding 1 per element.

But, since you should probably be using dataframes only, you'd want to use `groupBy`. When you run it, nothing happens (it is a transformation), but when we run `groupBy().count()` (followed by an action, since counting groups is a transformation as well) the full group will never, ever, appear in memory: each time an element is "checked", the counter for the key will be incremented and we will directly get `(a, 3), (b, 1)`.
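
The difference is easy to see in plain Python with the same four pairs. This is a single-machine sketch of the idea only, and the variable names are made up.

```python
from collections import defaultdict

pairs = [("a", 1), ("b", 2), ("a", 12), ("a", 13)]

# groupByKey style: materialise every group, then measure it
groups = defaultdict(list)
for key, value in pairs:
    groups[key].append((key, value))  # the whole group is held in memory
counts_from_groups = {key: len(members) for key, members in groups.items()}

# groupBy().count() / reduceByKey style: keep only one running counter per key
running_counts = defaultdict(int)
for key, _ in pairs:
    running_counts[key] += 1  # the group itself is never built

print(dict(groups))
print(counts_from_groups, dict(running_counts))
```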

---

In [33]:
best.show()

+--------------------+-----+
|              winner|count|
+--------------------+-----+
|     Manchester City|   95|
|           Liverpool|   84|
|             Chelsea|   80|
|   Tottenham Hotspur|   75|
|             Arsenal|   67|
|   Manchester United|   66|
|             Everton|   49|
|      Leicester City|   47|
|      Crystal Palace|   41|
|     West Ham United|   40|
|             Burnley|   40|
|     AFC Bournemouth|   40|
|             Watford|   37|
|         Southampton|   30|
|           Newcastle|   28|
|Brighton and Hove...|   22|
|        Swansea City|   20|
|       Wolverhampton|   19|
|          Stoke City|   18|
|West Bromwich Albion|   18|
+--------------------+-----+
only showing top 20 rows



This `show` took longer than the others, what is going on? To figure it out we can use `explain` (and some other methods)

In [34]:
best.explain()

== Physical Plan ==
*(3) Sort [count#2219L DESC NULLS LAST], true, 0
+- Exchange rangepartitioning(count#2219L DESC NULLS LAST, 200)
   +- *(2) HashAggregate(keys=[winner#2201], functions=[count(1)])
      +- Exchange hashpartitioning(winner#2201, 200)
         +- *(1) HashAggregate(keys=[winner#2201], functions=[partial_count(1)])
            +- *(1) Project [CASE WHEN CASE WHEN (cast(score1#24 as int) > cast(score2#25 as int)) THEN true ELSE false END THEN team1#13 ELSE team2#14 END AS winner#2201]
               +- *(1) Filter ((((isnotnull(score1#24) && isnotnull(league#12)) && Contains(league#12, Barclays)) && isnotnull(cast(score1#24 as int))) && (CASE WHEN (cast(score1#24 as int) > cast(score2#25 as int)) THEN true ELSE false END || CASE WHEN (cast(score2#25 as int) > cast(score1#24 as int)) THEN true ELSE false END))
                  +- *(1) FileScan csv [league#12,team1#13,team2#14,score1#24,score2#25] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/home/jovyan

In this plan we can find all the steps we have applied on top of the CSV: selection (that's `Project`), casting... There is some magic going on here, with filters and operations being pushed down as close to the data as possible. If we hadn't been `show`ing so often, the process would have been:
- Read the data row-by-row (or batch by batch), ignore columns we have removed and apply the required `NotNull` filters before casting
- Apply `when` conditions
- Aggregate and count
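
The two `HashAggregate` lines in the plan (`partial_count` first, then `count`, with an `Exchange hashpartitioning` in between) describe a two-stage count. A plain-Python picture of that idea, with a made-up split of winners into two partitions, could look like this:

```python
from collections import Counter

# Two hypothetical partitions of the winner column (made-up split)
partitions = [
    ["Hull City", "Swansea City", "Hull City"],
    ["Swansea City", "Manchester City"],
]

# Stage 1: each partition counts locally (partial_count in the plan)
partial_counts = [Counter(partition) for partition in partitions]

# Stage 2: after the exchange, partial counts for the same key are summed
final_counts = Counter()
for partial in partial_counts:
    final_counts.update(partial)  # Counter.update adds counts together

print(final_counts.most_common())
```

Only the small per-partition counters need to travel between stages, never the rows themselves.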

You can get additional information with the `extended` flag:


In [35]:
best.explain(extended=True)

== Parsed Logical Plan ==
'Sort ['count DESC NULLS LAST], true
+- Aggregate [winner#2201], [winner#2201, count(1) AS count#2219L]
   +- Project [winner#2201]
      +- Project [team1#1572, team2#1573, win1#2177, win2#2178, CASE WHEN win1#2177 THEN team1#1572 ELSE team2#1573 END AS winner#2201]
         +- Filter (win1#2177 || win2#2178)
            +- Project [team1#1572, team2#1573, CASE WHEN (score1#1574 > score2#1575) THEN true ELSE false END AS win1#2177, CASE WHEN (score2#1575 > score1#1574) THEN true ELSE false END AS win2#2178]
               +- Filter isnotnull(score1#1574)
                  +- Filter Contains(league#1571, Barclays)
                     +- Project [cast(date#10 as date) AS date#1570, cast(league#12 as string) AS league#1571, cast(team1#13 as string) AS team1#1572, cast(team2#14 as string) AS team2#1573, cast(score1#24 as int) AS score1#1574, cast(score2#25 as int) AS score2#1575]
                        +- Relation[date#10,league_id#11,league#12,team1#13,team2#1

---

In Scala you can do some more fancy stuff with the plans, but usually you shouldn't anyway

---

Now, imagine that we want to report on results (what we had in `premier`) together with the total number of wins for the team. And, that we don't want to use a `Window` function. I.e. we have an excuse to learn about `join` in Spark!

In [36]:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) #  This will be explained later

joined = premier.alias("premier").join(best.alias("best1"), col("best1.winner") == col("premier.team1"))\
                                 .withColumnRenamed("count", "team1_wins")\
                                 .drop(col("best1.winner"))\
                                 .join(best.alias("best2"), col("best2.winner") == col("premier.team2"))\
                                 .withColumnRenamed("count", "team2_wins")\
                                 .drop(col("best2.winner")) 

Writing joins without resorting to SQL needs some practice and some tips:
- I recommend adding aliases to the tables, since it makes joins clearer, especially if there are columns _with the same name_ in both tables but you only join on a few
- Please, please try to remove/rename columns with the same names _before_ the join, it will make your life easier
- Note that names of the form `table.column` are only relevant for some methods (the ones taking columns as inputs instead of strings). This can be a source of confusion (especially since `withColumnRenamed` doesn't fail if the column does not exist)

In [37]:
joined.printSchema()
joined.show()

root
 |-- date: date (nullable = true)
 |-- league: string (nullable = true)
 |-- team1: string (nullable = true)
 |-- team2: string (nullable = true)
 |-- score1: integer (nullable = true)
 |-- score2: integer (nullable = true)
 |-- team1_wins: long (nullable = false)
 |-- team2_wins: long (nullable = false)

+----------+--------------------+-----------------+------------+------+------+----------+----------+
|      date|              league|            team1|       team2|score1|score2|team1_wins|team2_wins|
+----------+--------------------+-----------------+------------+------+------+----------+----------+
|2017-05-13|Barclays Premier ...|       Sunderland|Swansea City|     0|     2|         6|        20|
|2017-04-30|Barclays Premier ...|Manchester United|Swansea City|     1|     1|        66|        20|
|2018-03-31|Barclays Premier ...|Manchester United|Swansea City|     2|     0|        66|        20|
|2016-10-15|Barclays Premier ...|          Arsenal|Swansea City|     3|     2|    

In [38]:
joined.explain()

== Physical Plan ==
*(13) Project [date#1570, league#12, team1#13, team2#14, score1#1574, score2#1575, team1_wins#2260L, count#2219L AS team2_wins#2303L]
+- *(13) SortMergeJoin [team2#14], [winner#2201], Inner
   :- *(8) Sort [team2#14 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(team2#14, 200)
   :     +- *(7) Project [date#1570, league#12, team1#13, team2#14, score1#1574, score2#1575, count#2219L AS team1_wins#2260L]
   :        +- *(7) SortMergeJoin [team1#13], [winner#2201], Inner
   :           :- *(2) Sort [team1#13 ASC NULLS FIRST], false, 0
   :           :  +- Exchange hashpartitioning(team1#13, 200)
   :           :     +- *(1) Project [cast(date#10 as date) AS date#1570, league#12, team1#13, team2#14, cast(score1#24 as int) AS score1#1574, cast(score2#25 as int) AS score2#1575]
   :           :        +- *(1) Filter (((isnotnull(league#12) && Contains(league#12, Barclays)) && isnotnull(team1#13)) && isnotnull(team2#14))
   :           :           +- *(1) Fil

---
[`SortMergeJoin`](https://en.wikipedia.org/wiki/Sort-merge_join) is [one of the methods](https://github.com/apache/spark/tree/master/sql/core/src/main/scala/org/apache/spark/sql/execution/joins) for joining two tables, when they are sorted. In this case, the Spark query planner has decided sorting these tables for the join is the best way of doing it.

*TRIVIA*: The "gold standard" for distributed joins is the (distributed) [hash join](https://en.wikipedia.org/wiki/Hash_join). Hash join is used locally also when data is too large for memory.
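
For intuition, here is a minimal single-machine sort-merge join in plain Python. It is a sketch under a simplifying assumption (keys are unique on the right side, as they are in `best`), with toy rows taken from the tables displayed above, and it leaves out everything that makes the distributed version hard.

```python
def sort_merge_join(left, right):
    """Inner join of (key, value) pairs; assumes keys are unique on the right."""
    left = sorted(left, key=lambda pair: pair[0])    # both inputs are ordered by key first
    right = sorted(right, key=lambda pair: pair[0])
    joined, j = [], 0
    for key, left_value in left:
        # advance the right cursor until it catches up with the left key
        while j < len(right) and right[j][0] < key:
            j += 1
        if j < len(right) and right[j][0] == key:
            joined.append((key, left_value, right[j][1]))
    return joined


match_dates = [("Swansea City", "2017-05-13"), ("Arsenal", "2016-10-15")]
win_totals = [("Arsenal", 67), ("Swansea City", 20)]
print(sort_merge_join(match_dates, win_totals))
```

Once both sides are sorted, a single forward pass over each is enough, which is why the plan shows a `Sort` on each branch before the `SortMergeJoin`.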

In this case, Spark would also have been smart enough to use a *broadcast join*, since the tables are small (this is why I disabled it before: I wanted to show how to force it). Sometimes it can be necessary to hint to Spark that one of the tables is significantly smaller, so that broadcasting is cheaper. You'd do it as follows:

In [39]:
from pyspark.sql.functions import broadcast

joined_2 = premier.alias("premier").join(broadcast(best.alias("best1")), col("best1.winner") == col("premier.team1"))\
                                 .withColumnRenamed("count", "team1_wins")\
                                 .drop("winner")\
                                 .join(broadcast(best.alias("best2")), col("best2.winner") == col("premier.team2"))\
                                 .withColumnRenamed("count", "team2_wins")\
                                 .drop("winner") 

In [40]:
joined_2.explain()

== Physical Plan ==
*(7) Project [date#1570, league#12, team1#13, team2#14, score1#1574, score2#1575, team1_wins#2381L, count#2219L AS team2_wins#2424L]
+- *(7) BroadcastHashJoin [team2#14], [winner#2201], Inner, BuildRight
   :- *(7) Project [date#1570, league#12, team1#13, team2#14, score1#1574, score2#1575, count#2219L AS team1_wins#2381L]
   :  +- *(7) BroadcastHashJoin [team1#13], [winner#2201], Inner, BuildRight
   :     :- *(7) Project [cast(date#10 as date) AS date#1570, league#12, team1#13, team2#14, cast(score1#24 as int) AS score1#1574, cast(score2#25 as int) AS score2#1575]
   :     :  +- *(7) Filter (((isnotnull(league#12) && Contains(league#12, Barclays)) && isnotnull(team1#13)) && isnotnull(team2#14))
   :     :     +- *(7) FileScan csv [date#10,league#12,team1#13,team2#14,score1#24,score2#25] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/home/jovyan/work/spi_matches.csv], PartitionFilters: [], PushedFilters: [IsNotNull(league), StringContains(league,Bar

This hinted Spark to use a `BroadcastHashJoin`, which is in general more efficient when one of the tables is small: less shuffling means faster operations.
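Why does broadcasting avoid the shuffle? A rough pure-Python sketch of the idea, under the assumption that the large table is already split into partitions (plain lists here) and the small table fits in memory. The name `broadcast_hash_join` and the toy rows are hypothetical, this is not Spark code.

```python
def broadcast_hash_join(large_partitions, small_table, large_key, small_key):
    """Inner join where every partition probes a full copy of the small table."""
    # Build side: a hash map over the small table ("BuildRight" in the plan).
    # Spark would ship a copy of it to every executor.
    lookup = {}
    for row in small_table:
        lookup.setdefault(row[small_key], []).append(row)

    joined_partitions = []
    for partition in large_partitions:
        # Probe side: rows never leave their partition, so nothing is shuffled.
        out = []
        for row in partition:
            for match in lookup.get(row[large_key], []):
                out.append({**row, **match})
        joined_partitions.append(out)
    return joined_partitions


# Toy data, not taken from the real dataset
partitions = [
    [{"team1": "Arsenal", "team2": "Swansea City"}],
    [{"team1": "Hull City", "team2": "Leicester City"},
     {"team1": "Everton", "team2": "Arsenal"}],
]
best = [{"winner": "Arsenal", "count": 3}, {"winner": "Everton", "count": 1}]
result = broadcast_hash_join(partitions, best, "team1", "winner")
```

The large side keeps its original partitioning, which matches the plan above: there is no `Exchange` node for the match data.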

# UDFs (User-Defined functions)

So far, we have operated mostly in the JVM. When using the DataFrame API, only `show` and `count` have sent anything significant to the Python side of operations; everything else has actually been running in the Spark core. That's actually pretty good! The serialization cost of sending data back and forth between Python and the JVM is large.

UDFs (functions defined in Python that operate on Spark data) can, thus, be _extremely_ slow, especially when compared with similar functions available in `pyspark.sql.functions`. There are some improvements in Spark 2.4 and greater, which we'll see now.

In [41]:
from pyspark.sql.types import BooleanType

def home_win(home_score, away_score):
    return home_score > away_score
spark.udf.register("home_win_python", home_win, BooleanType()) #  It's better if we can have the type!

<function __main__.home_win(home_score, away_score)>

In [42]:
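from pyspark.sql.functions import expr

# home_win(col(...), col(...)) would only build the native expression score1 > score2
# (no UDF involved), so we call the UDF through its registered name instead
udf_example = current_premier.select("team1", "team2", "score1", "score2")\
                             .withColumn("home_win", expr("home_win_python(score1, score2)"))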

udf_example.printSchema()
udf_example.show(5)

root
 |-- team1: string (nullable = true)
 |-- team2: string (nullable = true)
 |-- score1: integer (nullable = true)
 |-- score2: integer (nullable = true)
 |-- home_win: boolean (nullable = true)

+--------------+--------------------+------+------+--------+
|         team1|               team2|score1|score2|home_win|
+--------------+--------------------+------+------+--------+
|     Hull City|      Leicester City|     2|     1|    true|
|Crystal Palace|West Bromwich Albion|     0|     1|   false|
|       Everton|   Tottenham Hotspur|     1|     1|   false|
|       Burnley|        Swansea City|     0|     1|   false|
|   Southampton|             Watford|     1|     1|   false|
+--------------+--------------------+------+------+--------+
only showing top 5 rows



---

You can also use a decorator to define UDFs.

In [43]:
from pyspark.sql.functions import udf

@udf(BooleanType())
def away_win(home_score, away_score):
    return home_score < away_score

In [44]:
udf_example_2 = current_premier.select("team1", "team2", "score1", "score2")\
                               .withColumn("away_win", away_win(col("score1"), col("score2")))

udf_example_2.printSchema()
udf_example_2.show(5)

root
 |-- team1: string (nullable = true)
 |-- team2: string (nullable = true)
 |-- score1: integer (nullable = true)
 |-- score2: integer (nullable = true)
 |-- away_win: boolean (nullable = true)

+--------------+--------------------+------+------+--------+
|         team1|               team2|score1|score2|away_win|
+--------------+--------------------+------+------+--------+
|     Hull City|      Leicester City|     2|     1|   false|
|Crystal Palace|West Bromwich Albion|     0|     1|    true|
|       Everton|   Tottenham Hotspur|     1|     1|   false|
|       Burnley|        Swansea City|     0|     1|    true|
|   Southampton|             Watford|     1|     1|   false|
+--------------+--------------------+------+------+--------+
only showing top 5 rows



A problem, though, is that these UDFs require moving data to and from the JVM, serializing and deserializing along the way, which is slow. A faster way is to use Pandas vectorized UDFs, especially if you plan on doing any Pandas or NumPy computation.


## Pandas vectorized UDFs

In [45]:
import pandas as pd

from pyspark.sql.functions import pandas_udf

@pandas_udf(returnType=BooleanType())
def tie(home_score, away_score):
    return home_score == away_score

In [46]:
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

udf_example_3 = current_premier.select("team1", "team2", "score1", "score2")\
                               .withColumn("tie", tie(col("score1"), col("score2")))
udf_example_3.printSchema()
udf_example_3.toPandas().head()

root
 |-- team1: string (nullable = true)
 |-- team2: string (nullable = true)
 |-- score1: integer (nullable = true)
 |-- score2: integer (nullable = true)
 |-- tie: boolean (nullable = true)





| | team1 | team2 | score1 | score2 | tie |
|---|---|---|---|---|---|
| 0 | Hull City | Leicester City | 2 | 1 | False |
| 1 | Crystal Palace | West Bromwich Albion | 0 | 1 | False |
| 2 | Everton | Tottenham Hotspur | 1 | 1 | True |
| 3 | Burnley | Swansea City | 0 | 1 | False |
| 4 | Southampton | Watford | 1 | 1 | True |


In [47]:
%%time
udf_example_3 = current_premier.select("team1", "team2", "score1", "score2")\
                               .withColumn("tie", tie(col("score1"), col("score2")))
udf_example_3.show(5)

+--------------+--------------------+------+------+-----+
|         team1|               team2|score1|score2|  tie|
+--------------+--------------------+------+------+-----+
|     Hull City|      Leicester City|     2|     1|false|
|Crystal Palace|West Bromwich Albion|     0|     1|false|
|       Everton|   Tottenham Hotspur|     1|     1| true|
|       Burnley|        Swansea City|     0|     1|false|
|   Southampton|             Watford|     1|     1| true|
+--------------+--------------------+------+------+-----+
only showing top 5 rows

CPU times: user 10 ms, sys: 0 ns, total: 10 ms
Wall time: 1.59 s


In [48]:
%%time
udf_example_2 = current_premier.select("team1", "team2", "score1", "score2")\
                               .withColumn("away_win", away_win(col("score1"), col("score2")))
udf_example_2.show(5)

+--------------+--------------------+------+------+--------+
|         team1|               team2|score1|score2|away_win|
+--------------+--------------------+------+------+--------+
|     Hull City|      Leicester City|     2|     1|   false|
|Crystal Palace|West Bromwich Albion|     0|     1|    true|
|       Everton|   Tottenham Hotspur|     1|     1|   false|
|       Burnley|        Swansea City|     0|     1|    true|
|   Southampton|             Watford|     1|     1|   false|
+--------------+--------------------+------+------+--------+
only showing top 5 rows

CPU times: user 10 ms, sys: 0 ns, total: 10 ms
Wall time: 469 ms


This didn't look faster to me: the Pandas UDF took 1.59 s of wall time, against 469 ms for the plain UDF.

In [49]:
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

In [50]:
%%time
udf_example_3 = current_premier.select("team1", "team2", "score1", "score2")\
                               .withColumn("tie", tie(col("score1"), col("score2")))
udf_example_3.toPandas().head()

CPU times: user 0 ns, sys: 20 ms, total: 20 ms
Wall time: 1.63 s


| | team1 | team2 | score1 | score2 | tie |
|---|---|---|---|---|---|
| 0 | Hull City | Leicester City | 2 | 1 | False |
| 1 | Crystal Palace | West Bromwich Albion | 0 | 1 | False |
| 2 | Everton | Tottenham Hotspur | 1 | 1 | True |
| 3 | Burnley | Swansea City | 0 | 1 | False |
| 4 | Southampton | Watford | 1 | 1 | True |


In [51]:
%%time
udf_example_2 = current_premier.select("team1", "team2", "score1", "score2")\
                               .withColumn("away_win", away_win(col("score1"), col("score2")))
udf_example_2.toPandas().head()

CPU times: user 0 ns, sys: 30 ms, total: 30 ms
Wall time: 594 ms


| | team1 | team2 | score1 | score2 | away_win |
|---|---|---|---|---|---|
| 0 | Hull City | Leicester City | 2 | 1 | False |
| 1 | Crystal Palace | West Bromwich Albion | 0 | 1 | True |
| 2 | Everton | Tottenham Hotspur | 1 | 1 | False |
| 3 | Burnley | Swansea City | 0 | 1 | True |
| 4 | Southampton | Watford | 1 | 1 | False |


It is still not faster (here it is actually slower), but this is due to how small this dataset is. Let's create some random data and compare the effect of the Arrow optimization.

In [52]:
from pyspark.sql.functions import rand

df = spark.range(1 << 20).toDF("id").withColumn("x", rand()) 
# this is a large-ish random dataset, generated fully in the JVM. At least, larger than the match data!
df.count()

1048576

In [53]:
%%time
spark.conf.set("spark.sql.execution.arrow.enabled", "false")
pandas_df = df.toPandas()

CPU times: user 7.65 s, sys: 1.31 s, total: 8.96 s
Wall time: 11.9 s


On my run, this took `13.5s` wall time (the output shown above, `11.9 s`, comes from a different run: timings vary).

In [54]:
%%time
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_df = df.toPandas()

CPU times: user 50 ms, sys: 40 ms, total: 90 ms
Wall time: 1.09 s


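And this took `686ms` on my run, for a _20x speedup_ (the output shown above, `1.09 s`, comes from a different run, but the difference is still an order of magnitude).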

We'd see similar speedups using `pandas_udf`s if our match dataset were larger.

In [55]:
from pyspark.sql.functions import randn, floor, abs

many_matches = spark.range(1 << 24).toDF("id")\
                    .withColumn("score1", floor(abs(randn())*2.5))\
                    .withColumn("score2", floor(abs(randn())*1.5)) # We might as well make it more natural

many_matches.show(5) #  Although it would look nicer with a toPandas().head(), this might make your session explode
many_matches.count()

+---+------+------+
| id|score1|score2|
+---+------+------+
|  0|     0|     1|
|  1|     2|     0|
|  2|     1|     0|
|  3|     0|     0|
|  4|     2|     0|
+---+------+------+
only showing top 5 rows



16777216

In [56]:
from pyspark.sql.functions import desc, col

In [57]:
spark.conf.set("spark.sql.execution.arrow.enabled", "false")

In [58]:
%%time
udf_example_no_pandas = many_matches.select("id", "score1", "score2")\
                                    .withColumn("away_win", away_win(col("score1"), col("score2")))
udf_example_no_pandas.orderBy(desc("id")).show() #  The orderBy is to prevent Spark from being too smart and optimising
udf_example_no_pandas.count()

+--------+------+------+--------+
|      id|score1|score2|away_win|
+--------+------+------+--------+
|16777215|     0|     1|    true|
|16777214|     0|     1|    true|
|16777213|     1|     2|    true|
|16777212|     5|     1|   false|
|16777211|     1|     0|   false|
|16777210|     0|     0|   false|
|16777209|     2|     1|   false|
|16777208|     4|     1|   false|
|16777207|     4|     0|   false|
|16777206|     1|     0|   false|
|16777205|     0|     2|    true|
|16777204|     3|     0|   false|
|16777203|     0|     0|   false|
|16777202|     0|     0|   false|
|16777201|     1|     2|    true|
|16777200|     0|     1|    true|
|16777199|     5|     0|   false|
|16777198|     0|     2|    true|
|16777197|     1|     0|   false|
|16777196|     0|     1|    true|
+--------+------+------+--------+
only showing top 20 rows

CPU times: user 10 ms, sys: 10 ms, total: 20 ms
Wall time: 55.1 s


16777216

In [59]:
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "100000")

@pandas_udf(returnType=BooleanType())
def away_win_pd(home_score, away_score):
    return home_score < away_score

In [60]:
%%time
udf_example_pandas = many_matches.select("id", "score1", "score2")\
                                 .withColumn("away_win", away_win_pd(col("score1"), col("score2")))
udf_example_pandas.orderBy(desc("id")).show()
udf_example_pandas.count()

+--------+------+------+--------+
|      id|score1|score2|away_win|
+--------+------+------+--------+
|16777215|     0|     1|    true|
|16777214|     0|     1|    true|
|16777213|     1|     2|    true|
|16777212|     5|     1|   false|
|16777211|     1|     0|   false|
|16777210|     0|     0|   false|
|16777209|     2|     1|   false|
|16777208|     4|     1|   false|
|16777207|     4|     0|   false|
|16777206|     1|     0|   false|
|16777205|     0|     2|    true|
|16777204|     3|     0|   false|
|16777203|     0|     0|   false|
|16777202|     0|     0|   false|
|16777201|     1|     2|    true|
|16777200|     0|     1|    true|
|16777199|     5|     0|   false|
|16777198|     0|     2|    true|
|16777197|     1|     0|   false|
|16777196|     0|     1|    true|
+--------+------+------+--------+
only showing top 20 rows

CPU times: user 10 ms, sys: 10 ms, total: 20 ms
Wall time: 11.5 s


16777216

Without Arrow and Pandas, this took around 35-40 seconds on my runs, versus 13-15 seconds using Arrow (the outputs shown above, 55.1 s and 11.5 s, come from a different run).

In [61]:
udf_example_no_pandas.explain()

== Physical Plan ==
*(2) Project [id#2637L, score1#2641L, score2#2644L, pythonUDF0#2730 AS away_win#2670]
+- BatchEvalPython [away_win(score1#2641L, score2#2644L)], [id#2637L, score1#2641L, score2#2644L, pythonUDF0#2730]
   +- *(1) Project [id#2637L, score1#2641L, FLOOR((abs(randn(-8244966035173394950)) * 1.5)) AS score2#2644L]
      +- *(1) Project [id#2637L, FLOOR((abs(randn(-5461079002447256900)) * 2.5)) AS score1#2641L]
         +- *(1) Range (0, 16777216, step=1, splits=2)


In [62]:
udf_example_pandas.explain()

== Physical Plan ==
*(2) Project [id#2637L, score1#2641L, score2#2644L, pythonUDF0#2731 AS away_win#2702]
+- ArrowEvalPython [away_win_pd(score1#2641L, score2#2644L)], [id#2637L, score1#2641L, score2#2644L, pythonUDF0#2731]
   +- *(1) Project [id#2637L, score1#2641L, FLOOR((abs(randn(-8244966035173394950)) * 1.5)) AS score2#2644L]
      +- *(1) Project [id#2637L, FLOOR((abs(randn(-5461079002447256900)) * 2.5)) AS score1#2641L]
         +- *(1) Range (0, 16777216, step=1, splits=2)


As you can see, the only practical difference between the two plans is the `ArrowEvalPython` operator in place of `BatchEvalPython`.
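That one operator changes how often Python gets invoked. As far as I understand it, a plain UDF is called once per row, while a scalar Pandas UDF is called once per Arrow batch, and `spark.sql.execution.arrow.maxRecordsPerBatch` caps the size of those batches within each partition. A back-of-the-envelope sketch, assuming the rows are spread evenly over the two partitions (`splits=2` in the `Range` node):

```python
import math

total_rows = 1 << 24             # rows in many_matches
num_partitions = 2               # "splits=2" in the Range node of the plans
max_records_per_batch = 100000   # value set for spark.sql.execution.arrow.maxRecordsPerBatch

# Assumption: rows are evenly distributed across partitions
rows_per_partition = total_rows // num_partitions
batches_per_partition = math.ceil(rows_per_partition / max_records_per_batch)

plain_udf_calls = total_rows                               # one Python call per row
pandas_udf_calls = batches_per_partition * num_partitions  # one Python call per batch

print(plain_udf_calls, pandas_udf_calls)
```

Under these assumptions the Pandas UDF runs a few hundred times instead of millions of times, with each call working on whole `pandas.Series` objects.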

# The history server

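If you are running locally (or in a dedicated cluster) you can visit master:4040, the web UI of a running application, or master:18080, the default port of the [Spark History Server](https://spark.apache.org/docs/latest/monitoring.html), which serves the same kind of information for finished applications (your deployment may be configured with different ports). There you can explore different tabs of information.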

<img src="Images/JobsView.png">

<img src="Images/StagesView.png">

Here you can see two queries of the SQL tab expanded (do they look familiar?)

<img src="Images/QueryPlan1.png" width=300 style="float:left;"> <img src="Images/QueryPlan2.png" width=300>

# Deployment and other tips

## Launching jobs

For both Scala Spark and PySpark, deployment of batch and streaming jobs is done via the [spark-submit script](https://spark.apache.org/docs/latest/submitting-applications.html). You will need a cluster somewhere. If you want to play locally with a cluster in Docker, you may be able to tweak [this project](https://github.com/rberenguel/spark_hadoop_kudu) I have in docker-compose, which builds a stack of HDFS, Spark and Kudu locally.
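For a feeling of what a `spark-submit` invocation looks like, here is a small sketch that assembles one as a list of arguments (handy if you launch jobs from Python with `subprocess`). The helper `spark_submit_command`, the script path and the master URL are all hypothetical; `--master`, `--deploy-mode` and `--conf` are the standard `spark-submit` options.

```python
import shlex


def spark_submit_command(script, master, deploy_mode="client", conf=None, app_args=()):
    """Assemble a spark-submit invocation as a list of arguments."""
    cmd = ["spark-submit", "--master", master, "--deploy-mode", deploy_mode]
    # Each Spark property is passed as its own --conf key=value pair
    for key, value in sorted((conf or {}).items()):
        cmd += ["--conf", f"{key}={value}"]
    cmd.append(script)     # the PySpark script comes after all spark-submit options
    cmd += list(app_args)  # anything after the script is passed to the script itself
    return cmd


cmd = spark_submit_command(
    "jobs/premier_stats.py",       # hypothetical job script
    master="spark://master:7077",  # hypothetical standalone master URL
    conf={"spark.sql.shuffle.partitions": 200},
    app_args=["--input", "spi_matches.csv"],
)
print(" ".join(shlex.quote(part) for part in cmd))
```

Keeping the command as a list avoids quoting problems when it is handed to `subprocess.run(cmd)`.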

## Testing

Always write tests. Use pytest, or your preferred framework, together with a library like [spark-testing-base](https://github.com/holdenk/spark-testing-base/blob/master/python/sparktestingbase/test/helloworld_test.py).

spark-testing-base basically handles creating and removing a Spark session when needed. Otherwise, you can create your own fixtures for pytest (I vastly prefer `pytest` as a testing framework, in case it wasn't clear), like the ones below, taken from [here](https://dev.to/diogoaurelio/getting-started-with-spark-part-4----unit-testing-26a5). If you want to see a `unittest` approach, see [here](https://blog.cambridgespark.com/unit-testing-with-pyspark-fb31671b1ad8) and [here](https://medium.com/albert-franzi/how-to-unittest-pyspark-udfs-e34044e6ab68).

```python
import pytest
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
import logging


def quiet_py4j():
    """ turn down spark logging for the test context """
    logger = logging.getLogger('py4j')
    logger.setLevel(logging.WARN)


@pytest.fixture(scope="session")
def spark_context(request):
    """
        fixture for creating a spark context
    Args:
        request: pytest.FixtureRequest object
    """
    conf = SparkConf() \
        .setMaster("local[2]") \
        .setAppName("pytest-pyspark-local-testing")
    sc = SparkContext(conf=conf)
    request.addfinalizer(lambda: sc.stop())
    quiet_py4j()
    return sc


@pytest.fixture(scope="session")
def spark_session(request):
    """
        fixture for creating a spark session
    Args:
        request: pytest.FixtureRequest object
    """
    spark_conf = SparkConf() \
        .setMaster("local[2]") \
        .setAppName("pytest-pyspark2.+-local-testing")
    spark = SparkSession.builder.config(conf=spark_conf).getOrCreate()
    request.addfinalizer(lambda: spark.stop())
    quiet_py4j()
    return spark
```
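Fixtures aside, the body of a UDF is plain Python, so part of the testing can happen without starting Spark at all. A minimal sketch, assuming we keep the comparison in a hypothetical `away_win_logic` function and wrap it with `udf` separately. The `None` branch is my addition: the score columns are nullable, and Spark hands nulls to Python UDFs as `None`, which would make a bare `<` raise a `TypeError`.

```python
def away_win_logic(home_score, away_score):
    """Plain-Python body for a UDF; returns None when a score is missing."""
    if home_score is None or away_score is None:
        return None  # a null in, a null out
    return home_score < away_score


def test_away_win_logic():
    assert away_win_logic(0, 1) is True
    assert away_win_logic(2, 1) is False
    assert away_win_logic(1, 1) is False
    assert away_win_logic(None, 1) is None


# pytest would collect the test by name; it is called here so the sketch runs standalone
test_away_win_logic()

# In the job code the same function would be wrapped once, for example:
# away_win = udf(away_win_logic, BooleanType())
```

Tests like this run in milliseconds, so the slower Spark-backed fixtures can be reserved for checking schemas and joins.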

Use a linting tool as well.

## Code structure and naming

Spark code is still Python code, mostly, so just use your usual Python common sense. Methods in PySpark sometimes follow Scala naming conventions instead (`camelCase`). If, for some reason, a method in your code looks better to you in `camelCase`, nobody will scream at you except `pylint`.

## Debugging failing stuff

- The history server is your friend. There, you can investigate out-of-memory issues, badly skewed data and many other things.
- Add logging where possible: logging is also your friend.
- Most errors in Spark applications are either executor out of memory, driver out of memory or off-heap out of memory. Look up how to increase the corresponding memory settings and tune your jobs. Once tuned to run, investigate _why_ you need so much.
- The JVM stacktraces are not your friend, and you will need to learn to understand them. To train yourself, break Spark jobs frequently in testing and locally.
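As a starting point for the memory bullet above, these are the configuration properties I would look at first, grouped by the failure they usually address. The property names come from the Spark configuration documentation (check them against your Spark version); the sizes are placeholders, not recommendations, and the helper `as_conf_flags` is hypothetical.

```python
# Memory-related Spark properties, grouped by the kind of failure.
# The sizes are placeholders: tune them for your own jobs.
memory_settings = {
    "executor out of memory": {"spark.executor.memory": "4g"},
    "driver out of memory": {"spark.driver.memory": "2g"},
    "off-heap out of memory": {"spark.executor.memoryOverhead": "1g"},
}


def as_conf_flags(settings):
    """Flatten the settings into --conf key=value pairs for spark-submit."""
    flags = []
    for group in settings.values():
        for key, value in group.items():
            flags += ["--conf", f"{key}={value}"]
    return flags


flags = as_conf_flags(memory_settings)
print(" ".join(flags))
```

Raising these values gets a job running, which is only the first half of the advice: the history server is where you find out why the memory was needed.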

### Enjoy!