# Packages needed

Let's install some extra packages we'll need.

In [None]:
! pip install pyspark
! pip install requests

# Databricks File Areas

In Databricks, if you click "Workspace" on the menu to the left, you'll see a file-explorer-style interface.

There are two main areas:

- "Home" is your personal area (this is the same as "Workspace/Users/<your_user_name>")
- "Workspace/Shared" (where this notebook is) is the shared area that everyone can access.

# Cloning notebooks in Databricks

If you're in Databricks and want to copy a notebook to your personal file area, click "File" from the menu bar at the top, then "Clone...", then "Browse". Then navigate to your user area and clone it there.

# Import PySpark functions

PySpark comes with a library of functions we'll need to use in our code, so we'll import these first:

In [None]:
from pyspark.sql import functions as F

In Spark, there is a special object called the SparkSession. This is the entry point to Spark's functionality. When you use Databricks, it creates this for you automatically. But outside Databricks, we have to do it ourselves:

In [None]:
from pyspark import sql

spark = (sql.SparkSession
    .builder
    .appName("pyspark_intro")
    .getOrCreate()
)

# Download Artificial HES data

We will use the Artificial NHS Hospital Episode Statistics Accident and Emergency (HES AE) data for these examples, starting with the file for the 2021/22 financial year. The cell below downloads this data from the public website and unzips the CSVs inside it.

In [None]:
# These libraries will help us download the file
import zipfile
import io
from pathlib import Path
import requests

zip_file_url = "https://files.digital.nhs.uk/assets/Services/Artificial%20data/Artificial%20HES%20final/artificial_hes_ae_202302_v1_sample.zip"
path_to_downloaded_data = "data_in/artificial_hes_ae_202302_v1_sample.zip/artificial_hes_ae_202302_v1_sample/artificial_hes_ae_2122.csv"

filename = Path(zip_file_url).name
output_path = f"data_in/{filename}"

response = requests.get(zip_file_url, stream=True, timeout=3600)
response.raise_for_status()  # stop here if the download failed
downloaded_zip = zipfile.ZipFile(io.BytesIO(response.content))
downloaded_zip.extractall(output_path)
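
The CSV path used above has a folder whose name ends in `.zip`. That is because the zip's file name is reused as the output folder name. Here is a minimal sketch of the same pattern, using a tiny zip built in memory instead of the real download. The URL, folder name and file name are made up for illustration.

```python
import io
import tempfile
import zipfile
from pathlib import Path

# Build a tiny zip in memory: one top-level folder containing one CSV (made-up names)
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as archive:
    archive.writestr("sample_folder/sample_2122.csv", "EPIKEY,ARRIVALDATE\n1,2021-06-01\n")

# Same pattern as the cell above: the last part of the URL becomes the output folder name
demo_zip_url = "https://example.com/files/sample_folder.zip"  # hypothetical URL
demo_filename = Path(demo_zip_url).name

with tempfile.TemporaryDirectory() as tmp_dir:
    demo_output_path = Path(tmp_dir) / "data_in" / demo_filename
    zipfile.ZipFile(io.BytesIO(buffer.getvalue())).extractall(demo_output_path)
    # List the extracted CSVs relative to the temporary folder
    extracted_files = sorted(
        p.relative_to(tmp_dir).as_posix() for p in demo_output_path.rglob("*.csv")
    )

print(extracted_files)
```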

Now we can load the data from the CSV into a Spark DataFrame:

In [None]:
df_hes = (spark.read
    .option('header', 'true')
    .csv(path_to_downloaded_data)
)

The `spark` variable here is our SparkSession. We used `spark.read.csv` to load data stored in the CSVs we downloaded.

In UDAL, data is stored in parquet files, so we'd use `spark.read.parquet()` instead. The path would be the location of the data you want to read in the Azure Blob storage (it would start with `abfss://`).

`spark.read.csv()` returns the data as a DataFrame - basically a table.

# Displaying data

In Databricks you can use the `display()` function and pass the DataFrame to it. This gives a nice tabular output where you can look at data.

It can take a while with a lot of data or intense queries!

Outside of Databricks we use `df.show()` like this:

In [None]:
df_hes.show()

# PySpark DataFrame methods

Most functions you use to manipulate data in PySpark belong to the DataFrame itself. They are functions attached to the DataFrame class. When a function belongs to a class it's known as a method.

So every time you create a DataFrame, it has access to all these DataFrame methods. These are what we use to manipulate the data.

There's a PySpark equivalent to all the usual suspects in SQL: SELECT, WHERE (filter), GROUP BY, COUNT, etc.

To use a DataFrame method, you write the name of your DataFrame, then a dot, then the name of the method you want to call. E.g.:

`df.select()`
`df.count()`

You chain these methods together to build your query.

Here are a few common examples.

# COUNT(*)

To count all rows, use the `.count()` method.

In [None]:
df_hes.count()

When used by itself, `.count()` just returns the count as an integer without modifying the DataFrame. This is useful sometimes, as we can capture the count in a variable if we need to:

In [None]:
hes_count = df_hes.count()

print(hes_count)

# TOP

The equivalent of the T-SQL TOP is `.limit()`

The example below creates a new DataFrame with a different name from the original `df_hes`. So we create a new DataFrame called `df_hes_top_1000`, and the old `df_hes` still exists unmodified.

If we wanted to overwrite the original one, we'd just use `df_hes = df_hes.limit(1000)`.

In [None]:
df_hes_top_1000 = (df_hes
    .limit(1000)            
)

df_hes_top_1000.count()

# SELECT

Use the `.select()` method and pass the names of the columns you want to select.

In [None]:
df_hes_filtered = (df_hes
    .select(
      "EPIKEY",
      "CCG_GP_PRACTICE",
      "ARRIVALDATE"
    )             
)

df_hes_filtered.show()

# ORDER BY

`.orderBy()` is pretty straightforward:

In [None]:
df_hes_filtered = (df_hes
    .select(
      "EPIKEY",
      "CCG_GP_PRACTICE",
      "ARRIVALDATE"
    )
    .orderBy("ARRIVALDATE")
)

df_hes_filtered.show()

It's ascending by default. For descending you could use:

In [None]:
df_hes_filtered = (df_hes
    .select(
      "EPIKEY",
      "CCG_GP_PRACTICE",
      "ARRIVALDATE"
    )
    .orderBy( F.desc("ARRIVALDATE") )
)

df_hes_filtered.show()

The `F` in `F.desc()` indicates that it's a function from the pyspark.sql.functions library that we imported at the top. So it's not a DataFrame method.

Note that in Spark, row order is not guaranteed unless you use `.orderBy()`. Without it, the order just depends on which workers on the cluster return their results first, and that can differ for any number of reasons: machines overheating, the network being clogged, cats eating cables in the data centre, and so on.

# WHERE

For WHERE clauses you can use `.where()` or `.filter()` - they are the same.

In [None]:
df_hes_filtered = (df_hes
    .select(
      "EPIKEY",
      "CCG_GP_PRACTICE",
      "ARRIVALDATE"
    )
    .where( F.col("ARRIVALDATE") > "2021-06-01")
    .orderBy("ARRIVALDATE")
)

df_hes_filtered.show()
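
A note on the date comparison above. We loaded the CSV without asking Spark to work out column types, so this assumes `ARRIVALDATE` is a string in `yyyy-MM-dd` form and that the filter compares text with text. That happens to work because dates written year-first sort in the same order as text. The plain Python sketch below (made-up values, no Spark needed) shows why the format matters:

```python
# Year-first strings sort in the same order as the dates they represent
iso_dates = ["2021-06-15", "2021-05-31", "2021-12-01"]
after_cutoff_iso = [d for d in iso_dates if d > "2021-06-01"]
print(after_cutoff_iso)

# Day-first strings do not: the day is compared before the month and year,
# so 31 May is wrongly treated as being after 1 June
uk_dates = ["15/06/2021", "31/05/2021", "01/12/2021"]
after_cutoff_uk = [d for d in uk_dates if d > "01/06/2021"]
print(after_cutoff_uk)
```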

## WHERE col IN () - use `.isin()`

In [None]:
df_hes_filtered = (df_hes
    .select(
      "EPIKEY",
      "CCG_GP_PRACTICE",
      "ARRIVALDATE"
    )
    .where(F.col("CCG_GP_PRACTICE").isin(["72Q", "91Q"]))
)

df_hes_filtered.show()

## WHERE col NOT IN () - use ~

To do the equivalent of the SQL WHERE NOT IN, just use `.isin()` as before, but put a `~` at the start of the expression. This means NOT.

In [None]:
df_hes_filtered = (df_hes
    .select(
      "EPIKEY",
      "CCG_GP_PRACTICE",
      "ARRIVALDATE"
    )
    .where(~F.col("CCG_GP_PRACTICE").isin(["72Q", "91Q"]))
)

df_hes_filtered.show()

# Multiple where clauses

For AND, you can use `&`, or simply use two `.where()` method calls:

In [None]:
df_hes_filtered = (df_hes
    .select(
      "EPIKEY",
      "CCG_GP_PRACTICE",
      "ARRIVALDATE"
    )
    .where(F.col("ARRIVALDATE") > "2021-06-01")
    .where(F.col("ARRIVALDATE").isNotNull())
    .orderBy("ARRIVALDATE")
)

df_hes_filtered.show()

In [None]:
df_hes_filtered = (df_hes
    .select(
      "EPIKEY",
      "CCG_GP_PRACTICE",
      "ARRIVALDATE"
    )
    .where(
        (F.col("ARRIVALDATE") > "2021-06-01")
        &
        (F.col("ARRIVALDATE").isNotNull()) # IS NOT NULL
    )
    .orderBy("ARRIVALDATE")
)

df_hes_filtered.show()

For OR, use the pipe `|`:

In [None]:
df_hes_filtered = (df_hes
    .select(
      "EPIKEY",
      "CCG_GP_PRACTICE",
      "ARRIVALDATE"
    )
    .where(
        (F.col("ARRIVALDATE") > "2021-06-01")
        |
        (F.col("ARRIVALDATE").isNull()) # IS NULL
    )
    .orderBy("ARRIVALDATE")
)

df_hes_filtered.show()
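
The round brackets around each condition in the `&` and `|` examples matter. In Python, `&` and `|` are evaluated before comparisons like `>`, so without brackets the expression is grouped differently from how it reads. With PySpark columns that typically leads to an error or an unintended filter. The plain Python sketch below shows the grouping rule with ordinary numbers:

```python
# Without brackets, Python groups this as 2 > (1 & 0), which is 2 > 0
without_brackets = 2 > 1 & 0

# With brackets, the comparison happens first: (2 > 1) is True, and True & 0 is 0
with_brackets = (2 > 1) & 0

print(without_brackets)
print(with_brackets)
```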


# GROUP BY / COUNT(*)

Note that the `.select()` is not necessary here: after `.groupBy()` and `.count()`, the result only contains the grouping column and the count, whatever was selected before.

In [None]:
df_hes_filtered = (df_hes
    .select(
      "EPIKEY",
      "CCG_GP_PRACTICE",
      "ARRIVALDATE"
    )
    .where(
        (F.col("ARRIVALDATE") > "2021-06-01")
        |
        (F.col("ARRIVALDATE").isNull()) # IS NULL
    )
    .groupBy(
      "CCG_GP_PRACTICE"
    )
    .count()
    .orderBy(F.desc('count'))
)

df_hes_filtered.show()

## Other aggregations - use `.agg()`

In [None]:
df_hes_filtered = (df_hes
    .select(
      "EPIKEY",
      "CCG_GP_PRACTICE",
      "ARRIVALDATE"
    )
    .where(
        (F.col("ARRIVALDATE") > "2021-06-01")
        |
        (F.col("ARRIVALDATE").isNull()) # IS NULL
    )
    .groupBy("CCG_GP_PRACTICE")
    .agg(
        F.countDistinct("EPIKEY"),
        F.min("ARRIVALDATE"),
        F.max("ARRIVALDATE"),
        F.sum("EPIKEY")
    )
)

df_hes_filtered.show()

## Renaming columns - `.alias()` in chains

The default column names Spark has given to the aggregate columns are not very pretty. But we can chain `.alias()` onto the aggregate function to rename them.

### In `.groupBy()`

In [None]:
df_hes_filtered = (df_hes
    .select(
      "EPIKEY",
      "CCG_GP_PRACTICE",
      "ARRIVALDATE"
    )
    .where(
        (F.col("ARRIVALDATE") > "2021-06-01")
        |
        (F.col("ARRIVALDATE").isNull()) # IS NULL
    )
    .groupBy(
      "CCG_GP_PRACTICE"
    )
    .agg(
        F.count("EPIKEY").alias("EPIKEY_count"),
        F.countDistinct("EPIKEY").alias("EPIKEY_distinct"),
        F.min("ARRIVALDATE").alias("Min_ARRIVALDATE"),
        F.max("ARRIVALDATE").alias("Max_ARRIVALDATE"),
        F.sum("EPIKEY").alias("EPIKEY_sum"),
    )
)

df_hes_filtered.show()

### In `.select()`

We can also use `.alias()` in `.select()`, but to do this we have to use `F.col()` to name the column and chain `.alias()` onto that.

In [None]:
df_hes_filtered = (df_hes
    .select(
      F.col("EPIKEY").alias("episode_key"),
      F.col("CCG_GP_PRACTICE").alias('icb_code_of_patients_gp'),
    )
)

df_hes_filtered.show()

# Add a new column - `.withColumn()`

We can use `.withColumn()` to add new columns to a DataFrame. It takes two parameters - the name of the new column, and the value you want the column to have. 

To create a column with the same value for every row, use `F.lit()`:

In [None]:
df_hes_filtered = (df_hes
    .select(
      F.col("EPIKEY").alias("episode_key"),
      F.col("CCG_GP_PRACTICE").alias('icb_code_of_patients_gp'),
    )
    .withColumn("some_flag_column", F.lit(True))
)

df_hes_filtered.show()

You can do this within `.select()` too. This is slightly more efficient, so it is good practice where possible, though in practice it won't make much difference unless you're adding a large number of columns.

In [None]:
df_hes_filtered = (df_hes
    .select(
      F.col("EPIKEY").alias("episode_key"),
      F.col("CCG_GP_PRACTICE").alias('icb_code_of_patients_gp'),
      F.lit(True).alias("some_flag_column")
    )
)

df_hes_filtered.show()

### For a CASE WHEN use `F.when().otherwise()`

In [None]:
df_hes_filtered = (df_hes
    .select(
      "EPIKEY",
      "CCG_GP_PRACTICE",
      "ARRIVALDATE",
    )
    .withColumn("a_new_col", 
      F.when(F.col("ARRIVALDATE").isNull(), F.lit("No date"))
       .when(F.col("ARRIVALDATE") > "2021-06-30", F.lit("After June 30th 2021"))
       .otherwise(F.lit("On or before June 30th 2021"))
    )
)

df_hes_filtered.show()

# Creating DataFrames manually

Sometimes you might need to create your own DataFrame manually. Maybe to make some reference data that you want to join.

To do that, you need to use the `SparkSession`. The `SparkSession` has a method called `.createDataFrame()`. This method takes two parameters:

## The data

The data takes the form of a list (square brackets) of tuples (round brackets). Each tuple represents a row, and each item in the tuple is that row's value for one column.

## The schema

It is possible to specify the column types in the schema, and in some cases you might need to do that. But you can also just give a list of column names, and let Spark figure it out. For this simple example we'll just do that.

In [None]:
data = [
    ("72Q", "NHS SOUTH EAST LONDON ICB"),
    ("91Q", "NHS KENT AND MEDWAY ICB")
]

schema = ["CCG_GP_PRACTICE", "icb_name"]

df_icb_names = spark.createDataFrame(data, schema)

df_icb_names.show()

# Joins

Just like in SQL, we need three things to do a join:

- The name of the **other** DataFrame we're joining to
- The column/s we want to join **on**
- **How** we want to join (left, right, inner, etc.)

We just use the `.join()` method and pass these as parameters.

In [None]:
df_hes_filtered = (df_hes
    .select(
      "EPIKEY",
      "CCG_GP_PRACTICE",
    )
    .where(F.col("CCG_GP_PRACTICE").isin(["72Q", "91Q"]))
    .limit(1000)
    .join(other=df_icb_names, on="CCG_GP_PRACTICE", how="left")
)

df_hes_filtered.show()

**Important:** Notice how the `.select()` method does not contain icb_name, but it appears in the results anyway.

This is where PySpark differs from SQL a bit. In PySpark, the order of the method calls matters. The join came after the select, so we can't select a column in the other DataFrame, because Spark doesn't know about it yet. 

So how does icb_name end up in the output?

It's because when you join, all of the other DataFrame's columns are included by default!

If you don't need all the columns, then you can use `.select()` on the other DataFrame upstream when you define it, to ensure you only select the columns you need. Another way would be to put the `.select()` after the `.join()`


With `.join()`, you don't need to specify the names of the `other`, `on`, and `how` parameters. As long as you put them in the right order it will still work, and it's conventional to not include the names:

In [None]:
df_hes_filtered = (df_hes
    .select(
      "EPIKEY",
      "CCG_GP_PRACTICE",
    )
    .where(F.col("CCG_GP_PRACTICE").isin(["72Q", "91Q"]))
    .join(df_icb_names, "CCG_GP_PRACTICE", "left")
)

df_hes_filtered.show()

For an inner join, just change "left" to "inner".

In [None]:
df_hes_filtered = (df_hes
    .select(
      "EPIKEY",
      "CCG_GP_PRACTICE",
    )
    .join(df_icb_names, "CCG_GP_PRACTICE", "inner")
)

df_hes_filtered.show()

# UNION - `.union()`

For a union, we can use the `.union()` method. Notice how the HES financial years are all in separate CSV files.

Let's say we want to do some analysis on two financial years at once.

First, we'd load both of the CSVs we need into DataFrames:

In [None]:
path_to_18_19_data = "data_in/artificial_hes_ae_202302_v1_sample.zip/artificial_hes_ae_202302_v1_sample/artificial_hes_ae_1819.csv"
path_to_19_20_data = "data_in/artificial_hes_ae_202302_v1_sample.zip/artificial_hes_ae_202302_v1_sample/artificial_hes_ae_1920.csv"

df_hes_18_19 = (spark.read
    .option('header', 'true')
    .csv(path_to_18_19_data)
)

df_hes_19_20 = (spark.read
    .option('header', 'true')
    .csv(path_to_19_20_data)
)

Great, now that we have our two dfs, let's union them:

In [None]:
df_hes_all = (df_hes_18_19
    .union(df_hes_19_20)              
)

df_hes_all.select("FYEAR").distinct().show()

To do a union, the DataFrames need to have the same number of columns and compatible column types. For example, if we try to union the HES data with the small DataFrame of ICB names we created above, it won't work:

In [None]:
df_hes_all = (df_hes_18_19
    .union(df_icb_names)              
)

# Practice Exercises


If you want to get some practice in, try some of these:

- Try making your own queries using the methods above, e.g:
  - Could you get the number of episodes for the CCG_GP_PRACTICE 029, grouped by ARRIVALDATE, and sorted by the number of episodes (EPIKEY)?
  - Which CCG_GP_PRACTICE has the most episodes in the data, in the first quarter of the financial year according to arrival date?
  - Are there any duplicate episode keys (EPIKEY) in the data? (hint: try `.distinct()`...)
- Could you recreate the following SQL query in PySpark?

```SQL
select
  AEARRIVALMODE AE_arrival_mode,
  count(EPIKEY) episode_count
from
  hes.ae_2122
where
  SEX = 1
group by
  AEARRIVALMODE
order by
  count(epikey) desc
```