# 2 Your first data program in PySpark


Data-driven applications, no matter how complex, all boil down to what we can think of as three meta steps, which are easy to distinguish in a program:

- We start by **loading** or reading the data we wish to work with.
- We **transform** the data, either via a few simple instructions or a very complex machine learning model.
- We then **export** (or sink) the resulting data, either into a file or by summarizing our findings into a visualization.

The next two chapters will introduce a basic workflow with PySpark via the creation of a simple **ETL** (extract, transform, and load, which is a more business-speak way of saying ingest, transform, and export). You will find these three simple steps repeated in every program we build in this book, from a simple summary to the most complex ML model.

## The SparkSession entry point

PySpark uses a builder pattern through the `SparkSession.builder` object. For those familiar with object-oriented programming, a builder pattern provides a set of methods to create a highly configurable object without having multiple constructors. In this chapter, we will only look at the happiest case, but the `SparkSession` builder pattern will become increasingly useful in parts 2 and 3 as we look into cluster configuration and adding dependencies to our jobs.

In [14]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName(
    "Analyzing the vocabulary of Pride and Prejudice."
).getOrCreate()


By using the `getOrCreate()` method, your program will work in both interactive and batch mode by avoiding the creation of a new SparkSession if one already exists. Note that if a session already exists, you won’t be able to change certain configuration settings (mostly related to JVM options). If you need to change the configuration of your `SparkSession`, kill everything and start from scratch to avoid any confusion.
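
To make the builder pattern and the get-or-create behavior concrete, here is a minimal plain-Python sketch. `ToySession` and `ToySessionBuilder` are hypothetical names invented for this illustration; this is not how PySpark implements `SparkSession`, and the toy ignores every setting passed after the first session exists, which is stricter than the real thing.

```python
class ToySession:
    """Stand-in for a session object; not PySpark's SparkSession."""

    _active = None  # the single live session, if any

    def __init__(self, options):
        self.options = dict(options)


class ToySessionBuilder:
    """Collects settings through chained calls, then builds the object."""

    def __init__(self):
        self._options = {}

    def app_name(self, name):
        self._options["app.name"] = name
        return self  # returning self is what makes chaining possible

    def config(self, key, value):
        self._options[key] = value
        return self

    def get_or_create(self):
        # Reuse the live session when there is one; build it otherwise.
        if ToySession._active is None:
            ToySession._active = ToySession(self._options)
        return ToySession._active


first = ToySessionBuilder().app_name("vocabulary").get_or_create()
second = ToySessionBuilder().app_name("something else").get_or_create()

print(first is second)             # True: the second call reused the session
print(second.options["app.name"])  # vocabulary: the new name was not applied
```

One builder with chainable methods replaces a pile of constructors, and the reuse in `get_or_create()` is why the same script can run in a shell that already has a session.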

`SparkSession` is a superset of `SparkContext`. It wraps the `SparkContext` and provides functionality for interacting with the **Spark SQL API**, which includes the data frame structure we’ll use in most of our programs. Just to prove our point, see how easy it is to get to the `SparkContext` from our `SparkSession` object: just read the `sparkContext` attribute of `spark`.

In [15]:
spark.sparkContext

The `SparkSession` object is a more recent addition to the PySpark API, making its way in version 2.0. This is due to the API evolving in a way that makes more room for the faster, more versatile data frame as the main data structure over the lower-level RDD. Before that time, you had to use another object (called the SQLContext) to use the data frame. It’s much easier to have everything under a single umbrella.

In [16]:
spark?

Type:        SparkSession
String form: <pyspark.sql.session.SparkSession object at 0x000001EED8CF6970>
File:        c:\users\micha\spark\spark-3.2.1-bin-hadoop3.2\python\pyspark\sql\session.py
Docstring:
The entry point to programming Spark with the Dataset and DataFrame API.

A SparkSession can be used create :class:`DataFrame`, register :class:`DataFrame` as
tables, execute SQL over tables, cache tables, and read parquet files.
To create a :class:`SparkSession`, use the following builder pattern:

.. autoattribute:: builder
   :annotation:

Examples
--------
>>> spark = SparkSession.builder \
...     .master("local") \
...     .appName("Word Count") \
...     .config("spark.some.config.option", "some-value") \
...     .getOrCreate()

>>> from datetime import datetime
>>> from pyspark.sql import Row
>>> spark = SparkSession(sc)
>>> allTypes = sc.parallelize([Row(i=1, s="string", d=1.0, l=1,
...     b=True, list=[1, 2, 3], dict={"s": 0}, ro

## Configuring how chatty Spark is: The log level

This section covers the log level, probably the most overlooked (and annoying) element of a PySpark program. Monitoring your PySpark jobs is an important part of developing a robust program. PySpark provides many levels of logging, from nothing at all to a full description of everything happening on the cluster. The `pyspark` shell defaults to `WARN`, which can be a little chatty when we’re learning. More importantly, a non-interactive PySpark program (which is how you’ll run your scripts for the most part) defaults to the oversharing `INFO` level. Fortunately, you can change the setting for your session by using the code in the next listing.

In [17]:
spark.sparkContext.setLogLevel("ERROR")

Table 2.1 lists the available keywords you can pass to setLogLevel (as strings). Each subsequent keyword contains all the previous ones, with the obvious exception of OFF, which doesn’t show anything.
![](https://drek4537l1klr.cloudfront.net/rioux/HighResolutionFigures/table_2-1.png)
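
The idea that each keyword contains all the previous ones can be sketched in plain Python. The list below uses the log4j-style keywords that `setLogLevel` accepts, ordered from quietest to chattiest; `visible_at` is a hypothetical helper written for this illustration, and Table 2.1 remains the reference for what each level means.

```python
# Keywords ordered from quietest to chattiest.
LEVELS = ["OFF", "FATAL", "ERROR", "WARN", "INFO", "DEBUG", "TRACE", "ALL"]


def visible_at(threshold):
    """Return the message levels that would be printed at `threshold`."""
    if threshold == "OFF":
        return []
    if threshold == "ALL":
        threshold = "TRACE"  # ALL shows everything up to the chattiest level
    cutoff = LEVELS.index(threshold)
    # Skip "OFF" (position 0): it is a setting, not a kind of message.
    return LEVELS[1 : cutoff + 1]


print(visible_at("ERROR"))  # ['FATAL', 'ERROR']
print(visible_at("WARN"))   # ['FATAL', 'ERROR', 'WARN']
print(visible_at("OFF"))    # []
```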

## Mapping our program

This section maps the blueprint of our simple program. Taking the time to design our data analysis beforehand pays dividends since we can construct our code knowing what’s coming. This will eventually speed up our coding and improve the reliability and modularity of our code. Think of it like reading the recipe when cooking: you never want to realize you’re missing a cup of flour when mixing the dough!

In this chapter’s introduction, we introduced our problem statement: “What are the most popular words used in the English language?” Before we can even hammer out code in the REPL, we have to start by mapping the major steps our program will need to perform:

- Read—Read the input data (we’re assuming a plain text file).
- Token—Tokenize each word.
- Clean—Remove any punctuation and/or tokens that aren’t words. Lowercase each word.
- Count—Count the frequency of each word present in the text.
- Answer—Return the top 10 (or 20, 50, 100).

![](https://drek4537l1klr.cloudfront.net/rioux/Figures/02-01.png)
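
Before reaching for PySpark, it can help to see the same five steps on a tiny in-memory text. The sketch below uses only the Python standard library and the opening sentence of the novel as a stand-in for the file; `first_letters` is a hypothetical helper, and none of this is the PySpark code we are about to write.

```python
import re
from collections import Counter

text = (
    "It is a truth universally acknowledged, that a single man in possession\n"
    "of a good fortune, must be in want of a wife."
)


def first_letters(token):
    """Lowercase a token and keep its first run of letters, if any."""
    match = re.search("[a-z]+", token.lower())
    return match.group(0) if match else ""


# Read: one record per line of text.
lines = text.splitlines()
# Token: split every line on spaces.
tokens = [token for line in lines for token in line.split(" ")]
# Clean: normalize each token and drop the empty results.
words = [word for word in map(first_letters, tokens) if word != ""]
# Count: tally each distinct word.
counts = Counter(words)
# Answer: show the most frequent word.
print(counts.most_common(1))  # [('a', 4)]
```

Each comment lines up with one step of the plan; the rest of the chapter replaces these list comprehensions with data frame transformations that scale past a single machine.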

## Reading data into a data frame with spark.read

In [18]:
spark.read

<pyspark.sql.readwriter.DataFrameReader at 0x1eedbd08d90>

In [19]:
[m for m in dir(spark.read) if not m.startswith("_")]

['csv',
 'format',
 'jdbc',
 'json',
 'load',
 'option',
 'options',
 'orc',
 'parquet',
 'schema',
 'table',
 'text']

PySpark can accommodate the different ways you can process data. Under the hood, `spark.read.csv()` will map to `spark.read.format('csv').load()`, and you may encounter this form in the wild. I usually prefer using the direct csv method as it provides a handy reminder of the different parameters the reader can take.

orc and parquet are also data formats that are especially well suited for big data processing. ORC (which stands for “optimized row columnar”) and Parquet are competing data formats that pretty much serve the same purpose. Both are open sourced and now part of the Apache project, just like Spark.

PySpark defaults to using Parquet when reading and writing files, and we’ll use this format to store our results throughout the book. I’ll provide a longer discussion about the usage, advantages, and trade-offs of using Parquet or ORC as a data format.

Let’s read our data file in listing 2.5. I am assuming you launched PySpark at the root of this book’s repository. Depending on your case, you might need to change the path where the file is located. The code is all available on the book’s companion repository on GitHub (http://mng.bz/6ZOR).

In [20]:
book = spark.read.text("../../data/gutenberg_books/1342-0.txt")
 
book

DataFrame[value: string]

![](https://drek4537l1klr.cloudfront.net/rioux/Figures/02-03.png)

When working with a larger data frame (think hundreds or even thousands of columns), you may want to see the schema displayed more clearly. PySpark provides `printSchema()` to display the schema in a tree form. I use this method probably more than any other one as it gives you direct information on the structure of the data frame. Since `printSchema()` directly prints to the REPL with no other option, should you want to filter the schema, you can use the `dtypes` attribute of the data frame, which gives you a list of `(column_name, column_type)` tuples. You can also access the schema programmatically (as a data structure) using the `schema` attribute (see chapter 6 for more information).

In [21]:
book.printSchema()

root
 |-- value: string (nullable = true)



In [22]:
book.dtypes

[('value', 'string')]

In [23]:
book.schema

StructType(List(StructField(value,StringType,true)))
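
Because `dtypes` is a plain list of tuples, filtering a schema needs nothing more than a list comprehension. Here is a minimal sketch; the `dtypes` list is hypothetical, typed by hand to imitate what a wider data frame might return, so the snippet runs without Spark.

```python
# Hypothetical result of `some_df.dtypes` for a wider data frame.
dtypes = [
    ("title", "string"),
    ("author", "string"),
    ("year", "int"),
    ("downloads", "bigint"),
]

# Keep only the names of the string columns.
string_columns = [name for name, dtype in dtypes if dtype == "string"]
# Keep the (name, type) pairs of everything else.
other_columns = [(name, dtype) for name, dtype in dtypes if dtype != "string"]

print(string_columns)  # ['title', 'author']
print(other_columns)   # [('year', 'int'), ('downloads', 'bigint')]
```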

## From structure to content: Exploring our data frame with `show()`

Enter the `show()` method, which displays a few rows of the data back to you, nothing more, nothing less. With `printSchema()`, this method will become one of your best friends when performing data exploration and validation. By default, it will show 20 rows and truncate long values. The code in listing 2.8 shows the default behavior of the method applied to our book data frame. For text data, the length limitation is limiting (pun intended). Fortunately, `show()` provides some options to display just what you need.

In [24]:
book.show()

+--------------------+
|               value|
+--------------------+
|The Project Guten...|
|                    |
|This eBook is for...|
|almost no restric...|
|re-use it under t...|
|with this eBook o...|
|                    |
|                    |
|Title: Pride and ...|
|                    |
| Author: Jane Austen|
|                    |
|Posting Date: Aug...|
|Release Date: Jun...|
|Last Updated: Mar...|
|                    |
|   Language: English|
|                    |
|Character set enc...|
|                    |
+--------------------+
only showing top 20 rows



In [25]:
book.collect()

[Row(value='The Project Gutenberg EBook of Pride and Prejudice, by Jane Austen'),
 Row(value=''),
 Row(value='This eBook is for the use of anyone anywhere at no cost and with'),
 Row(value='almost no restrictions whatsoever.  You may copy it, give it away or'),
 Row(value='re-use it under the terms of the Project Gutenberg License included'),
 Row(value='with this eBook or online at www.gutenberg.org'),
 Row(value=''),
 Row(value=''),
 Row(value='Title: Pride and Prejudice'),
 Row(value=''),
 Row(value='Author: Jane Austen'),
 Row(value=''),
 Row(value='Posting Date: August 26, 2008 [EBook #1342]'),
 Row(value='Release Date: June, 1998'),
 Row(value='Last Updated: March 10, 2018'),
 Row(value=''),
 Row(value='Language: English'),
 Row(value=''),
 Row(value='Character set encoding: UTF-8'),
 Row(value=''),
 Row(value='*** START OF THIS PROJECT GUTENBERG EBOOK PRIDE AND PREJUDICE ***'),
 Row(value=''),
 Row(value=''),
 Row(value=''),
 Row(value=''),
 Row(value='Produced by Anonymous Volu

The `show()` method takes three optional parameters:

- `n` can be set to any positive integer and will display that number of rows.
- `truncate`, if set to `True`, will truncate the columns to display only 20 characters. Set to `False`, it will display the whole length; set to any positive integer, it will truncate to that number of characters.
- `vertical` takes a Boolean value and, when set to True, will display each record as a small table. If you need to check records in detail, this is a very useful option. 

In [26]:
book.show(10, truncate=50)

+--------------------------------------------------+
|                                             value|
+--------------------------------------------------+
|The Project Gutenberg EBook of Pride and Prejud...|
|                                                  |
|This eBook is for the use of anyone anywhere at...|
|almost no restrictions whatsoever.  You may cop...|
|re-use it under the terms of the Project Gutenb...|
|    with this eBook or online at www.gutenberg.org|
|                                                  |
|                                                  |
|                        Title: Pride and Prejudice|
|                                                  |
+--------------------------------------------------+
only showing top 10 rows
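
To see where the three dots come from, here is a small plain-Python sketch of the truncation rule. `truncate_cell` is a hypothetical helper that reproduces the two outputs shown above; the branch for widths under four characters is my assumption, not something this chapter demonstrates.

```python
def truncate_cell(value, truncate=True):
    """Approximate how show() shortens a single string cell."""
    if not truncate:  # False (or 0): leave the value untouched
        return value
    width = 20 if truncate is True else truncate
    if len(value) <= width:
        return value
    if width < 4:
        return value[:width]  # assumed: too narrow to fit an ellipsis
    # Reserve the last three characters of the width for the dots.
    return value[: width - 3] + "..."


line = "The Project Gutenberg EBook of Pride and Prejudice, by Jane Austen"

print(truncate_cell(line))      # The Project Guten...
print(truncate_cell(line, 50))  # The Project Gutenberg EBook of Pride and Prejud...
print(truncate_cell("Author: Jane Austen"))  # Author: Jane Austen
```

The dots count toward the width, which is why a 20-character limit leaves only 17 characters of actual text.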



Together, `show()` and `printSchema()` give you a complete overview of the structure and the content of the data frame. It’s no surprise that those will be the methods you will reach for most often when building a data analysis at the REPL.

We can now start the real work: performing transformations on the data frame to accomplish our goal. Let’s take some time to review the five steps we outlined at the beginning of the chapter:

- [DONE]Read—Read the input data (we’re assuming a plain text file).
- Token—Tokenize each word.
- Clean—Remove any punctuation and/or tokens that aren’t words. Lowercase each word.
- Count—Count the frequency of each word present in the text.
- Answer—Return the top 10 (or 20, 50, 100).


## Simple column transformations: Moving from a sentence to a list of words

When ingesting our selected text into a data frame, PySpark created one record for each line of text and provided a `value` column of type `String`. To tokenize each word, we need to split each string into a list of distinct words. This section covers simple transformations using `select()`. We will split our lines of text into words so we can count them.

Because PySpark’s code can be pretty self-explanatory, I start by providing the code in one fell swoop, and then we’ll break down each step one at a time. You can see it in all its glory in the next listing.


In [27]:
from pyspark.sql.functions import split

lines = book.select(split(book.value, " ").alias("line"))

lines.show(5, truncate=50)


+--------------------------------------------------+
|                                              line|
+--------------------------------------------------+
|[The, Project, Gutenberg, EBook, of, Pride, and...|
|                                                []|
|[This, eBook, is, for, the, use, of, anyone, an...|
|[almost, no, restrictions, whatsoever., , You, ...|
|[re-use, it, under, the, terms, of, the, Projec...|
+--------------------------------------------------+
only showing top 5 rows



## Selecting specific columns using `select()` 

This section will introduce the most basic functionality of `select()`, which is to select one or more columns from your data frame. It’s a conceptually very simple method but provides the foundation for many additional operations on your data.

In PySpark’s world, a data frame is made out of `Column` objects, and you perform transformations on them. The most basic transformation is the identity, where you return exactly what was provided to you. If you’ve used SQL in the past, you might think that this sounds like a `SELECT` statement, and you’d be right! You also get a free pass: the method name is also conveniently named `select()`.


In [28]:
book.select(book.value)

DataFrame[value: string]

PySpark provides more than one way to select columns. I display the four most common in the next listing.

In [29]:
from pyspark.sql.functions import col
 
book.select(book.value)
book.select(book["value"])
book.select(col("value"))
book.select("value")

DataFrame[value: string]

The third one uses the `col` function from the `pyspark.sql.functions` module. The main difference here is that you don’t specify that the column comes from the book data frame. This will become very useful when working with more complex data pipelines in part 2 of the book. I’ll use the `col` object as much as I can since I consider its usage more idiomatic and it’ll prepare us for more complex use cases, such as performing column transformations (see chapters 4 and 5).

Finally, the fourth one only uses the name of the column as a string. PySpark is smart enough to infer that we mean a column here. For simple select statements (and other methods that I’ll cover later), using the name of the column directly can be a viable option. That being said, it’s not as flexible as the other options, and the moment your code requires column transformations, like in section 2.4.2, you’ll have to use another option.

In [30]:
print(type(col("value")))
[m for m in dir(col("value")) if not m.startswith("_")]

<class 'pyspark.sql.column.Column'>


['alias',
 'asc',
 'asc_nulls_first',
 'asc_nulls_last',
 'astype',
 'between',
 'bitwiseAND',
 'bitwiseOR',
 'bitwiseXOR',
 'cast',
 'contains',
 'desc',
 'desc_nulls_first',
 'desc_nulls_last',
 'dropFields',
 'endswith',
 'eqNullSafe',
 'getField',
 'getItem',
 'isNotNull',
 'isNull',
 'isin',
 'like',
 'name',
 'otherwise',
 'over',
 'rlike',
 'startswith',
 'substr',
 'when',
 'withField']

In [31]:
# col() is more powerful than all the other methods if you want to work with the column!
book.select(col("value").like("")).show()

+-----------+
|value LIKE |
+-----------+
|      false|
|       true|
|      false|
|      false|
|      false|
|      false|
|       true|
|       true|
|      false|
|       true|
|      false|
|       true|
|      false|
|      false|
|      false|
|       true|
|      false|
|       true|
|      false|
|       true|
+-----------+
only showing top 20 rows



## Transforming columns: Splitting a string into a list of words

PySpark provides a `split()` function in the `pyspark.sql.functions` module for splitting a longer string into a list of shorter strings. The most popular use case for this function is to split a sentence into words. The `split()` function takes two or three parameters:

- A `column` object containing `strings`
- A **Java regular expression** delimiter to split the strings against
- An optional integer about how many times we apply the delimiter (not used here)

In [32]:
from pyspark.sql.functions import col, split
 
lines = book.select(split(col("value"), " "))
 
lines

DataFrame[split(value,  , -1): array<string>]

In [33]:
lines.printSchema()

root
 |-- split(value,  , -1): array (nullable = true)
 |    |-- element: string (containsNull = true)



In [34]:
lines.show(truncate=50)

+--------------------------------------------------+
|                               split(value,  , -1)|
+--------------------------------------------------+
|[The, Project, Gutenberg, EBook, of, Pride, and...|
|                                                []|
|[This, eBook, is, for, the, use, of, anyone, an...|
|[almost, no, restrictions, whatsoever., , You, ...|
|[re-use, it, under, the, terms, of, the, Projec...|
|[with, this, eBook, or, online, at, www.gutenbe...|
|                                                []|
|                                                []|
|                   [Title:, Pride, and, Prejudice]|
|                                                []|
|                           [Author:, Jane, Austen]|
|                                                []|
|[Posting, Date:, August, 26,, 2008, [EBook, #13...|
|                     [Release, Date:, June,, 1998]|
|                [Last, Updated:, March, 10,, 2018]|
|                                             

The `split` function transformed our string column into an array column, containing one or more string elements. This is what we were expecting: even before looking at the data, seeing that the structure behaves according to plan is a good way to sanity-check our code.

Looking at the rows we’ve printed, we can see that our values are now separated by a comma and wrapped in square brackets, which is how PySpark visually represents an array. The second record comes from an empty line, so we just see `[]`: an array whose only element is an empty string.
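
The stray empty elements are easier to understand with a plain-Python mirror of the split. The sketch below uses Python's `re.split` on three lines copied from the output above; Python and Java regular expressions differ in some details, but for a literal space they behave the same way.

```python
import re

lines = [
    "almost no restrictions whatsoever.  You may copy it, give it away or",
    "",
    "Title: Pride and Prejudice",
]

# Split on exactly one space, like the delimiter used in this section.
single_space = [re.split(" ", line) for line in lines]

print(single_space[0][3:6])  # ['whatsoever.', '', 'You']
print(single_space[1])       # ['']
print(single_space[2])       # ['Title:', 'Pride', 'and', 'Prejudice']

# Splitting on one or more spaces avoids the empty token between sentences.
one_or_more = re.split(" +", lines[0])
print(one_or_more[3:5])      # ['whatsoever.', 'You']
```

Two consecutive spaces produce an empty token between them, and an empty line produces a single empty token. We keep the single-space delimiter in this chapter and remove the empty tokens later.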


PySpark’s built-in functions for data manipulations are extremely useful, and you should spend a little bit of time going over the API documentation (http://spark.apache.org/docs/latest/api/python/) to see what’s available as core functionality. If you don’t find exactly what you’re after, chapter 6 covers how you can create your own function over `Column` objects, and gives a deeper look into PySpark’s complex data types like the array. **Built-in PySpark functions are as performant as plain Spark** (in Java and Scala), as they map directly to a JVM function. (See the following sidebar for more information.)

With our lines of text now tokenized into words, there is a little annoyance present: Spark gave a very unintuitive name `(split(value, , -1))` to our column. The next section addresses how we can rename transformed columns to our liking so we can explicitly control our columns’ naming schema. 

## Renaming columns: alias and withColumnRenamed

When performing a transformation on your columns, PySpark will give a default name to the resulting column. In our case, we were blessed by the `split(value, , -1)` name after splitting our value column, using a space as the delimiter. While accurate, it’s not programmer-friendly. This section provides a blueprint to rename columns, both newly created and existing, using `alias()` and `withColumnRenamed()`.

There is an implicit assumption that you’ll want to rename the resulting column yourself, using the `alias()` method. Its usage isn’t very complicated: when applied to a column, it takes a single parameter and returns the column it was applied to, with the new name. A simple demonstration is provided in the next listing.


In [35]:
book.select(split("value", " ")).show(5)  #  or printSchema as an alternative

+--------------------+
| split(value,  , -1)|
+--------------------+
|[The, Project, Gu...|
|                  []|
|[This, eBook, is,...|
|[almost, no, rest...|
|[re-use, it, unde...|
+--------------------+
only showing top 5 rows



In [36]:
book.select(split("value", " ").alias("line")).show(5)  #  or printSchema as an alternative

+--------------------+
|                line|
+--------------------+
|[The, Project, Gu...|
|                  []|
|[This, eBook, is,...|
|[almost, no, rest...|
|[re-use, it, unde...|
+--------------------+
only showing top 5 rows



When writing your code, choosing between those two options is pretty easy:

- When you’re using a method where you’re specifying which columns you want to appear, like the `select()` method, use `alias()`.
- If you just want to rename a column without changing the rest of the data frame, use `.withColumnRenamed`. Note that, should the column not exist, PySpark will treat this method as a no-op and not perform anything.


In [37]:
# This looks a lot cleaner
lines = book.select(split(book.value, " ").alias("line"))

# This is messier, and you have to remember the name PySpark assigns automatically
lines = book.select(split(book.value, " "))
lines = lines.withColumnRenamed("split(value,  , -1)", "line")

This section introduced a new set of PySpark fundamentals: we learned how to select not only plain columns but also column transformations. We also learned how to explicitly name the resulting columns, avoiding PySpark’s predictable but jarring naming convention. Now we can move forward with the remainder of the operations. If we look at our five steps, we’re halfway done with step 2. We have a list of words, but we need each token or word to be its own record:

- [DONE]Read—Read the input data (we’re assuming a plain text file).
- [IN PROGRESS]Token—Tokenize each word.
- Clean—Remove any punctuation and/or tokens that aren’t words. Lowercase each word.
- Count—Count the frequency of each word present in the text.
- Answer—Return the top 10 (or 20, 50, 100). 

## Reshaping your data: Exploding a list into rows

Enter the `explode()` function. When applied to a column containing a container-like data structure (such as an array), it’ll take each element and give it its own row. This is much easier explained visually rather than using words, and figure 2.4 explains the process.

![](https://drek4537l1klr.cloudfront.net/rioux/Figures/02-04.png)
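
If you prefer code to pictures, the reshaping can be sketched with a nested list comprehension in plain Python. The `rows` below are a hypothetical, hand-typed excerpt standing in for the `line` column; this mirrors the behavior of `explode()` rather than calling it.

```python
# Each inner list plays the role of one array cell in the `line` column.
rows = [
    ["The", "Project", "Gutenberg"],
    [""],  # an empty line of text, split into one empty token
    ["This", "eBook"],
    [],    # a truly empty array contributes no record at all
]

# One output record per element, in order.
exploded = [word for row in rows for word in row]

print(exploded)       # ['The', 'Project', 'Gutenberg', '', 'This', 'eBook']
print(len(exploded))  # 6
```

Four input records became six output records: the number of rows after an explode is the total number of elements across all the arrays.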

The code follows the same structure as `split()`, and you can see the results in the next listing. We now have a data frame containing, at most, one word per row. We are almost there!

In [38]:
from pyspark.sql.functions import explode, col
 
words = lines.select(explode(col("line")).alias("word"))
 
words.show(15)

+----------+
|      word|
+----------+
|       The|
|   Project|
| Gutenberg|
|     EBook|
|        of|
|     Pride|
|       and|
|Prejudice,|
|        by|
|      Jane|
|    Austen|
|          |
|      This|
|     eBook|
|        is|
+----------+
only showing top 15 rows



Looking back at our five steps, we can now conclude step 2, and our words are tokenized. Let’s attack the third one, where we’ll clean our words to simplify the counting:

- [DONE]Read—Read the input data (we’re assuming a plain text file).
- [DONE]Token—Tokenize each word.
- Clean—Remove any punctuation and/or tokens that aren’t words. Lowercase each word.
- Count—Count the frequency of each word present in the text.
- Answer—Return the top 10 (or 20, 50, 100). 

## Working with words: Changing case and removing punctuation

So far, with `split()` and `explode()` our pattern has been the following: find the relevant function in `pyspark.sql.functions`, apply it, profit! This section will use the same winning formula to normalize the case of our words and remove punctuation, so I’ll focus on the functions’ behavior rather than on how to apply them. This section takes care of lowering the case (using the `lower()` function) and removing punctuation through the usage of a regular expression.

Let’s get right to it. Listing 2.17 contains the source code to lower the case of all the words in the data frame. The code should look very familiar: we select a column transformed by `lower`, a PySpark function lowering the case of the data inside the column passed as a parameter. We then alias the resulting column to `word_lower` to avoid PySpark’s default nomenclature.

In [39]:
from pyspark.sql.functions import lower

words_lower = words.select(lower(col("word")).alias("word_lower"))
 
words_lower.show()

+----------+
|word_lower|
+----------+
|       the|
|   project|
| gutenberg|
|     ebook|
|        of|
|     pride|
|       and|
|prejudice,|
|        by|
|      jane|
|    austen|
|          |
|      this|
|     ebook|
|        is|
|       for|
|       the|
|       use|
|        of|
|    anyone|
+----------+
only showing top 20 rows



Next, we want to clean our words of any punctuation and other non-useful characters; in this case, we’ll keep only the letters using a regular expression (see the end of the section for a reference on regular expressions [or regex]). This can be a little trickier: we won’t improvise a full NLP (Natural Language Processing) library here, and instead rely on the functionality PySpark provides in its data manipulation toolbox. In the spirit of keeping this exercise simple, we’ll keep the first contiguous group of letters as the word, and remove the rest. It will effectively remove punctuation, quotation marks, and other symbols, at the expense of being less robust with more exotic word construction. The next listing shows the code in all its splendor.

In [40]:
from pyspark.sql.functions import regexp_extract

words_clean = words_lower.select(
    regexp_extract(col("word_lower"), "[a-z]+", 0).alias("word")
)

words_clean.show()


+---------+
|     word|
+---------+
|      the|
|  project|
|gutenberg|
|    ebook|
|       of|
|    pride|
|      and|
|prejudice|
|       by|
|     jane|
|   austen|
|         |
|     this|
|    ebook|
|       is|
|      for|
|      the|
|      use|
|       of|
|   anyone|
+---------+
only showing top 20 rows



Our data frame of words looks pretty regular by now, except for the empty cell between austen and this. In the next section, we cover the filtering operation by removing any empty records.
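
To get a feel for what "the first contiguous group of letters" keeps and discards, here is a plain-Python mirror of the extraction using the `re` module. `first_letter_run` is a hypothetical helper; like `regexp_extract`, it returns an empty string when nothing matches.

```python
import re


def first_letter_run(word):
    """Return the first contiguous run of lowercase letters, or ''."""
    match = re.search("[a-z]+", word)
    return match.group(0) if match else ""


samples = ["prejudice,", "austen", "", "re-use", "www.gutenberg.org", "#1342]"]
cleaned = {word: first_letter_run(word) for word in samples}

for word, result in cleaned.items():
    print(repr(word), "->", repr(result))
# 'prejudice,' -> 'prejudice'
# 'austen' -> 'austen'
# '' -> ''
# 're-use' -> 're'
# 'www.gutenberg.org' -> 'www'
# '#1342]' -> ''
```

The last three samples show the trade-off mentioned above: hyphenated words and URLs lose everything after the first run of letters, and tokens without letters become empty strings.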

## Exercise 2.1

Given the following `exo_2_1_df` data frame, how many records will the `solution_2_1_df` data frame contain? (Note: No need to write code to solve this problem.)


In [41]:
from pyspark.sql.functions import explode, col

exo_2_1_df = spark.createDataFrame([{"numbers": [1,2,3,4,5]}, {"numbers": [5,6,7,8,9,10]}])


exo_2_1_df.show()
 
 
solution_2_1_df = exo_2_1_df.select(explode(col("numbers")).alias("numbers"))

+-------------------+
|            numbers|
+-------------------+
|    [1, 2, 3, 4, 5]|
|[5, 6, 7, 8, 9, 10]|
+-------------------+



In [42]:
solution_2_1_df.collect()

[Row(numbers=1),
 Row(numbers=2),
 Row(numbers=3),
 Row(numbers=4),
 Row(numbers=5),
 Row(numbers=5),
 Row(numbers=6),
 Row(numbers=7),
 Row(numbers=8),
 Row(numbers=9),
 Row(numbers=10)]

In [43]:
solution_2_1_df.count()

11

## Filtering rows

Conceptually, we should be able to provide a test to perform on each record. If it returns true, we keep the record. False? You’re out! PySpark provides not one, but two identical methods to perform this task. You can use either `.filter()` or its alias `.where()`. This duplication is to ease the transition for users coming from other data-processing engines or libraries; some use one, some the other. PySpark provides both, so no arguments are possible! I prefer `filter()` because several other data frame methods already start with `w` (`withColumn()` in chapter 4 or `withColumnRenamed()` in chapter 3), so typing `w` brings up more candidates to sift through. If we look at the next listing, we can see that columns can be compared to values using the usual Python comparison operators. In this case, we’re using “not equal,” or `!=`.

In [44]:
words_nonull = words_clean.filter(col("word") != "")
 
words_nonull.show()
 

+---------+
|     word|
+---------+
|      the|
|  project|
|gutenberg|
|    ebook|
|       of|
|    pride|
|      and|
|prejudice|
|       by|
|     jane|
|   austen|
|     this|
|    ebook|
|       is|
|      for|
|      the|
|      use|
|       of|
|   anyone|
| anywhere|
+---------+
only showing top 20 rows



If you want to negate a whole expression in a `filter()` method, PySpark provides the `~` operator. We could theoretically use `filter(~(col("word") == ""))`. Look at the exercises at the end of the chapter to see it in an application. You can also use SQL-style expressions; check out chapter 7 for an alternative syntax.
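
The two spellings of the test keep the same records, which a plain-Python sketch makes easy to check. The `words` list is a hypothetical hand-typed sample. Plain Python negates with `not`; PySpark uses `~` because Python does not let a class change what `not` returns, so `not` cannot produce a new `Column`.

```python
words = ["the", "project", "", "this", ""]

# Keep the records that pass a "not equal" test...
kept = [word for word in words if word != ""]
# ...or, equivalently, negate an "equal" test.
kept_negated = [word for word in words if not (word == "")]

print(kept)                  # ['the', 'project', 'this']
print(kept == kept_negated)  # True
```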

This seems like a good time to take a break and reflect on what we have accomplished so far. If we look at our five steps, we’re 60% of the way there. Our cleaning step took care of nonletter characters and filtered the empty records. We’re ready for counting and displaying the results of our analysis:

- [DONE]Read—Read the input data (we’re assuming a plain text file).
- [DONE]Token—Tokenize each word.
- [DONE]Clean—Remove any punctuation and/or tokens that aren’t words. Lowercase each word.
- Count—Count the frequency of each word present in the text.
- Answer—Return the top 10 (or 20, 50, 100).

We can now rest. The next chapter will cover the end of our program. We will also be looking at bringing our code into one single file, moving away from the REPL into batch mode. We’ll explore options to simplify and increase the readability of our program and then finish by scaling it to a larger corpus of texts.

### Summary

- Almost all PySpark programs will revolve around three major steps: reading, transforming, and exporting data.
- PySpark provides a REPL (read, evaluate, print, loop) via the `pyspark` shell where you can experiment interactively with data.
- PySpark data frames are a collection of columns. You operate on the structure using chained transformations. PySpark will optimize the transformations and perform the work only when you submit an action, such as `show()`. This is one of the pillars of PySpark’s performance.
- PySpark’s repertoire of functions that operate on columns is located in `pyspark.sql.functions`.
- You can select columns or transformed columns via the `select()` method.
- You can filter records using the `where()` or `filter()` methods and by providing a test that will return `True` or `False`; only the records returning `True` will be kept.
- PySpark can have columns of nested values, like arrays of elements. In order to extract the elements into distinct records, you need to use the `explode()` function.


### Additional exercises

For all exercises, assume the following:

```python
from pyspark.sql import SparkSession
 
spark = SparkSession.builder.getOrCreate()
```

## Exercise 2.2

Given the following data frame, programmatically count the number of columns that aren’t strings (answer = only one column isn’t a string).

`createDataFrame()` allows you to create a data frame from a variety of sources, such as a pandas data frame or (in this case) a list of lists.




In [45]:
exo2_2_df = spark.createDataFrame(
    [["test", "more test", 10_000_000_000]], ["one", "two", "three"]
)
 
exo2_2_df.printSchema()

root
 |-- one: string (nullable = true)
 |-- two: string (nullable = true)
 |-- three: long (nullable = true)



In [46]:
exo2_2_df.dtypes

[('one', 'string'), ('two', 'string'), ('three', 'bigint')]

In [59]:
for field in exo2_2_df.schema.fields:
    print(field.dataType)

StringType
StringType
LongType


## Exercise 2.3

Rewrite the following code snippet, removing the `withColumnRenamed` method. Which version is clearer and easier to read?

In [62]:
from pyspark.sql.functions import col, length
 
# The `length` function returns the number of characters in a string column.
 
exo2_3_df = (
    spark.read.text("../../data/gutenberg_books/1342-0.txt")
    .select(length(col("value")))
    .withColumnRenamed("length(value)", "number_of_char")
)

exo2_3_df.show()

+--------------+
|number_of_char|
+--------------+
|            66|
|             0|
|            64|
|            68|
|            67|
|            46|
|             0|
|             0|
|            26|
|             0|
|            19|
|             0|
|            43|
|            24|
|            28|
|             0|
|            17|
|             0|
|            29|
|             0|
+--------------+
only showing top 20 rows



In [65]:
from pyspark.sql.functions import col, length
 
# The `length` function returns the number of characters in a string column.
 
exo2_3_df = (
    spark.read.text("../../data/gutenberg_books/1342-0.txt")
    .select(length(col("value")).alias("number_of_char"))
)

exo2_3_df.show()

+--------------+
|number_of_char|
+--------------+
|            66|
|             0|
|            64|
|            68|
|            67|
|            46|
|             0|
|             0|
|            26|
|             0|
|            19|
|             0|
|            43|
|            24|
|            28|
|             0|
|            17|
|             0|
|            29|
|             0|
+--------------+
only showing top 20 rows



## Exercise 2.4

Assume a data frame `exo2_4_df`. The following code block gives an error. What is the problem, and how can you solve it?

In [66]:
from pyspark.sql.functions import col, greatest
 
exo2_4_df = spark.createDataFrame(
    [["key", 10_000, 20_000]], ["key", "value1", "value2"]
)
 
exo2_4_df.printSchema()
# root
#  |-- key: string (nullable = true)
#  |-- value1: long (nullable = true)
#  |-- value2: long (nullable = true)
 
# `greatest` returns the greatest value among the given columns,
# skipping null values.
 
# The following statement will return an error
from pyspark.sql.utils import AnalysisException
 
try:
    exo2_4_mod = exo2_4_df.select(
        greatest(col("value1"), col("value2")).alias("maximum_value")
    ).select("key", "max_value")
except AnalysisException as err:
    print(err)

root
 |-- key: string (nullable = true)
 |-- value1: long (nullable = true)
 |-- value2: long (nullable = true)

cannot resolve 'key' given input columns: [maximum_value];
'Project ['key, 'max_value]
+- Project [greatest(value1#188L, value2#189L) AS maximum_value#193L]
   +- LogicalRDD [key#187, value1#188L, value2#189L], false



In [67]:
from pyspark.sql.functions import col, greatest
 
exo2_4_df = spark.createDataFrame(
    [["key", 10_000, 20_000]], ["key", "value1", "value2"]
)
 
exo2_4_df.printSchema()
# root
#  |-- key: string (nullable = true)
#  |-- value1: long (nullable = true)
#  |-- value2: long (nullable = true)
 
# `greatest` returns the greatest value among the given columns,
# skipping null values.
 
# Fixing the column name alone is not enough: the statement still fails
# because the first select() keeps only `maximum_value` and drops `key`.
from pyspark.sql.utils import AnalysisException
 
try:
    exo2_4_mod = exo2_4_df.select(
        greatest(col("value1"), col("value2")).alias("maximum_value")
    ).select("key", "maximum_value")
except AnalysisException as err:
    print(err)

root
 |-- key: string (nullable = true)
 |-- value1: long (nullable = true)
 |-- value2: long (nullable = true)

cannot resolve 'key' given input columns: [maximum_value];
'Project ['key, maximum_value#201L]
+- Project [greatest(value1#196L, value2#197L) AS maximum_value#201L]
   +- LogicalRDD [key#195, value1#196L, value2#197L], false



## Exercise 2.5

Let’s take our `words_nonull` data frame, available in the next listing. You can use the code from the repository (code/Ch02/end_of_chapter.py) in your REPL to get the data frame loaded.

In [72]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split, explode, lower, regexp_extract

spark = SparkSession.builder.getOrCreate()
 
book = spark.read.text("../../data/gutenberg_books/1342-0.txt")
 
lines = book.select(split(book.value, " ").alias("line"))
 
words = lines.select(explode(col("line")).alias("word"))
 
words_lower = words.select(lower(col("word")).alias("word_lower"))
 
words_clean = words_lower.select(
    regexp_extract(col("word_lower"), "[a-z]*", 0).alias("word")
)
 
words_nonull = words_clean.where(col("word") != "")

words_nonull.show()

+---------+
|     word|
+---------+
|      the|
|  project|
|gutenberg|
|    ebook|
|       of|
|    pride|
|      and|
|prejudice|
|       by|
|     jane|
|   austen|
|     this|
|    ebook|
|       is|
|      for|
|      the|
|      use|
|       of|
|   anyone|
| anywhere|
+---------+
only showing top 20 rows



a) Remove all of the occurrences of the word "is".

In [70]:
words_without_is = words_nonull.filter(col("word") != "is")

In [71]:
words_without_is.show()

+---------+
|     word|
+---------+
|      the|
|  project|
|gutenberg|
|    ebook|
|       of|
|    pride|
|      and|
|prejudice|
|       by|
|     jane|
|   austen|
|     this|
|    ebook|
|      for|
|      the|
|      use|
|       of|
|   anyone|
| anywhere|
|       at|
+---------+
only showing top 20 rows



b) (Challenge) Using the `length` function, keep only the words with more than three characters.

In [74]:
from pyspark.sql.functions import length, col

words_longer_3_chars = words_nonull.filter(length(col("word")) > 3)

words_longer_3_chars.show()

+------------+
|        word|
+------------+
|     project|
|   gutenberg|
|       ebook|
|       pride|
|   prejudice|
|        jane|
|      austen|
|        this|
|       ebook|
|      anyone|
|    anywhere|
|        cost|
|        with|
|      almost|
|restrictions|
|  whatsoever|
|        copy|
|        give|
|        away|
|       under|
+------------+
only showing top 20 rows



## Exercise 2.6

The `where` clause takes a Boolean expression over one or many columns to filter the data frame. Beyond the usual Boolean operators (`>`, `<`, `==`, `<=`, `>=`, `!=`), PySpark provides other functions that return Boolean columns in the `pyspark.sql.functions` module.

A good example is the `isin()` method (applied on a Column object, like `col(...).isin(...)`), which takes a list of values as a parameter and returns a Boolean column that is true for the records where the value in the column equals a member of the list.

Let’s say you want to remove the words "is", "not", "the" and "if" from your list of words, using a single `where()` method on the `words_nonull` data frame. Write the code to do so.


In [76]:
unwanted_words = ["is", "not", "the", "if"]

words_filtered = words_nonull.where(~col("word").isin(unwanted_words))

words_filtered.show()

+---------+
|     word|
+---------+
|  project|
|gutenberg|
|    ebook|
|       of|
|    pride|
|      and|
|prejudice|
|       by|
|     jane|
|   austen|
|     this|
|    ebook|
|      for|
|      use|
|       of|
|   anyone|
| anywhere|
|       at|
|       no|
|     cost|
+---------+
only showing top 20 rows



## Exercise 2.7

One of your friends comes to you with the following code. They have no idea why it doesn’t work. Can you diagnose the problem in the try block, explain why it is an error, and provide a fix?

In [77]:
from pyspark.sql.functions import col, split
 
try:
    book = spark.read.text("../../data/gutenberg_books/1342-0.txt")
    book = book.printSchema()
    lines = book.select(split(book.value, " ").alias("line"))
    words = lines.select(explode(col("line")).alias("word"))
except AnalysisException as err:
    print(err)

root
 |-- value: string (nullable = true)



AttributeError: 'NoneType' object has no attribute 'select'

In [78]:
from pyspark.sql.functions import col, explode, split
from pyspark.sql.utils import AnalysisException
 
try:
    book = spark.read.text("../../data/gutenberg_books/1342-0.txt")
    book.printSchema()
    lines = book.select(split(book.value, " ").alias("line"))
    words = lines.select(explode(col("line")).alias("word"))
except AnalysisException as err:
    print(err)

root
 |-- value: string (nullable = true)



In [79]:
words.show()

+----------+
|      word|
+----------+
|       The|
|   Project|
| Gutenberg|
|     EBook|
|        of|
|     Pride|
|       and|
|Prejudice,|
|        by|
|      Jane|
|    Austen|
|          |
|      This|
|     eBook|
|        is|
|       for|
|       the|
|       use|
|        of|
|    anyone|
+----------+
only showing top 20 rows

