# Spark 3.0 Tutorial
- Author: Akira Takihara Wang (https://github.com/akiratwang)
- Tutorial Up-to-Date as of: April 2021  
- Usage: For MAST30034 students only  

Tutorial Operating System(s):
- Windows 10 and WSL2
- Linux

# Working with Larger Datasets with a Scalable Solution!
- Consider the full 2015 Taxi Dataset (~2GB per month, ~24GB for the year).
- Datasets with more than 20k rows would be hard for Excel, but fine for Pandas.
- A file with more than 100 million rows (a few GB) is large for Pandas.
- Although `pandas` would be sufficient for each month, how about a whole year?

That's right, use Spark 3.0!
![image.png](https://spark.apache.org/images/spark-logo-trademark.png)


## Pre-Requisites for this Tutorial
1. You must already have Spark installed.
2. You need the dataset downloaded.

The code below downloads all 2015 data directly from the Amazon S3 Bucket. This is approximately 21.3GB in size, so make sure you have ample storage space. You will only need to run this once.
```python
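from os import makedirs
from os.path import getsize
from urllib.request import urlretrieve

output_dir = "../data/large"
makedirs(output_dir, exist_ok=True)  # create the target folder if it does not exist yet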
fname_template = "yellow_tripdata_2015"

for m in range(1, 13):
    month = str(m).zfill(2)
    out = f'{fname_template}-{month}.csv'
    url = f"https://s3.amazonaws.com/nyc-tlc/trip+data/{out}"
    urlretrieve(url, f"{output_dir}/{out}")

    print(f"Done downloading {out} to {output_dir} with size {getsize(f'{output_dir}/{out}') / 1073741824:.2f}GB")
```
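Since the download only needs to run once, it can help to check which files are already on disk before fetching anything. The following is a minimal sketch under the same file naming as above; `monthly_files` and `pending_downloads` are hypothetical helper names and are not part of the original tutorial.

```python
from pathlib import Path


def monthly_files(template="yellow_tripdata_2015"):
    """Return the twelve monthly CSV names, e.g. yellow_tripdata_2015-01.csv."""
    return [f"{template}-{str(m).zfill(2)}.csv" for m in range(1, 13)]


def pending_downloads(output_dir, names):
    """Return only the file names that do not exist yet in output_dir."""
    out = Path(output_dir)
    return [name for name in names if not (out / name).exists()]


names = monthly_files()
todo = pending_downloads("../data/large", names)
print(f"{len(todo)} of {len(names)} files still need downloading")
```

Looping over `todo` instead of `range(1, 13)` would then skip any month that has already been downloaded.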

## Optional Installation
- Requires NodeJS and nbextensions installed:
```bash
# install NodeJS and npm
sudo apt install nodejs npm
# install Jupyter Extensions
pip3 install jupyter_contrib_nbextensions
jupyter contrib nbextension install --user
jupyter nbextension enable varInspector/main
```
- Follow instructions to install `SparkMonitor` (https://github.com/swan-cern/jupyter-extensions)
```bash
pip3 install sparkmonitor
jupyter nbextension install sparkmonitor --py --user
jupyter nbextension enable  sparkmonitor --py --user
jupyter serverextension enable --py --user sparkmonitor
jupyter lab build
ipython profile create
echo "c.InteractiveShellApp.extensions.append('sparkmonitor.kernelextension')" >>  $(ipython profile locate default)/ipython_kernel_config.py
```

![image.gif](https://user-images.githubusercontent.com/6822941/29753710-ff8849b6-8b94-11e7-8f9c-bdc59bf72143.gif)

Only run this cell below if you have installed `SparkMonitor`. Otherwise, it will result in an error.

In [1]:
from pyspark import SparkContext

# Start the spark context
sc = SparkContext.getOrCreate(conf=swan_spark_conf) 

NameError: name 'swan_spark_conf' is not defined

# Starting a Spark Session
Start your Spark Session using `SparkSession.builder.getOrCreate()`. The session object is the entry point for interacting with Spark functionality.

In [4]:
from pyspark.sql import SparkSession
import warnings
warnings.filterwarnings("ignore")

# create a spark session (which will run spark jobs)
spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

## Spark DataFrames
To create a Spark DataFrame from a Pandas DataFrame, simply pass it to `spark.createDataFrame()`.
- It's a common convention to name a Pandas DataFrame `df` and a Spark DataFrame `sdf`.
- And yes, Spark DataFrames *do* look ugly...

In [5]:
import pandas as pd

df = pd.read_csv('../data/sample.csv')

In [6]:
sdf = spark.createDataFrame(df)
sdf.show(5)

Py4JJavaError: An error occurred while calling o64.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1) (DESKTOP-FE732SE.modem executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
Caused by: java.net.SocketTimeoutException: Accept timed out


- If you want it to look nice, you can enable eager evaluation so that the notebook renders a DataFrame as a table (the first 20 rows by default).
- Use `sdf.limit()` as the alternative to `df.head()` from Pandas.

In [None]:
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)

In [None]:
sdf.limit(5)

To convert a Spark DataFrame back into a Pandas dataframe:

In [None]:
sdf.toPandas()

- Note that this approach is still roundabout, as the data must first be read into Pandas before it reaches Spark.
- Likewise, using the `feather` dataset format requires you to read it into Pandas and then into Spark.

## Overcoming Dataset Formats
- Directly use Apache Arrow (the framework that `feather` is built on), installed with `pip3 install pyarrow`.
- Set `spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', True)`

In [None]:
# Benchmark Normal
spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', False)
%time sdf.toPandas()

# Benchmark with Apache Arrow
spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', True)
%time sdf.toPandas()

As you should see from the timings, the conversion with Apache Arrow enabled is considerably faster!

## Reading in directly to Spark
Use `spark.read`, where you can pass either:
- a single file;
- a list of file paths;
- or a folder containing the files.

Below, we read all the 2015 CSVs, a dataset of 20GB+!

In [None]:
spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', True)

sdf = spark.read.csv('../data/large', header=True)

In [None]:
f"{sdf.count():,} rows!"

Damn, ain't it great that you can read in all the CSVs without having to append or merge them **AND** with no `MemoryError`???
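If only some of the months are needed, `spark.read.csv` also accepts a list of paths. Here is a minimal sketch of building such a list with the standard library; `select_months` is a hypothetical helper, and the demo uses empty placeholder files in a temporary folder rather than the real data.

```python
import tempfile
from pathlib import Path


def select_months(data_dir, months, template="yellow_tripdata_2015-{:02d}.csv"):
    """Return paths of the requested monthly files that exist in data_dir."""
    wanted = [Path(data_dir) / template.format(m) for m in months]
    return [str(p) for p in wanted if p.exists()]


with tempfile.TemporaryDirectory() as tmp:
    # create empty placeholder files for January to April
    for m in (1, 2, 3, 4):
        (Path(tmp) / f"yellow_tripdata_2015-{m:02d}.csv").touch()

    first_quarter = select_months(tmp, months=range(1, 4))
    names = [Path(p).name for p in first_quarter]
    print(names)
    # with a live session: sdf_q1 = spark.read.csv(first_quarter, header=True)
```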

## Schema
- It is best practice to define a standard **schema** for your dataset.
- It's very similar to creating a table in SQL (in fact, it's based on this), where you must specify each column's data type before adding any values.
- View all data types here: https://spark.apache.org/docs/latest/sql-ref-datatypes.html


Note: `RatecodeID` and `RateCodeID` are the same column, but the name is spelled inconsistently across months. We will rename it to a single spelling.

In [None]:
import pyspark.sql.functions as F

from pyspark.sql.types import *
from pyspark.sql.functions import col

In [None]:
ints = ('VendorID', 'passenger_count', 'RateCodeID', 'RatecodeID','payment_type')
doubles = ('trip_distance', 'pickup_longitude', 'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude',
           'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'total_amount')
strings = ('store_and_fwd_flag',)
dtimes = ('tpep_pickup_datetime', 'tpep_dropoff_datetime', )

dtypes = {column: IntegerType() for column in ints}
dtypes.update({column: DoubleType() for column in doubles})
dtypes.update({column: StringType() for column in strings})
dtypes.update({column: TimestampType() for column in dtimes})

In [None]:
schema = StructType()

for column in sdf.columns:
    schema.add(column, # column name
               dtypes[column], # data type
               True # is nullable?
              )

In [None]:
sdf_with_schema = spark.read.csv('../data/large', header=True, schema=schema) \
    .withColumnRenamed("RatecodeID","RateCodeID") # rename the wrong column

sdf_with_schema.printSchema()

In [None]:
sdf.printSchema()

Although most PySpark operations will automatically handle incorrect data types, it is not recommended to rely on this from a data integrity standpoint. Schemas will be used by Business Analysts when describing or explaining the whole data pipeline. If the schema is incorrect or subject to change, then many things can fall apart further down the pipeline!
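As an alternative to building a `StructType`, `spark.read.csv(..., schema=...)` also accepts a DDL-formatted string such as `"col0 INT, col1 DOUBLE"`. Below is a minimal sketch that builds such a string in plain Python; `to_ddl` is a hypothetical helper, only a handful of the columns are shown, and unknown columns are assumed to default to `STRING`.

```python
ints = ("VendorID", "passenger_count", "RateCodeID", "payment_type")
doubles = ("trip_distance", "fare_amount", "total_amount")
timestamps = ("tpep_pickup_datetime", "tpep_dropoff_datetime")

# map each column name to a Spark SQL type name
ddl_types = {column: "INT" for column in ints}
ddl_types.update({column: "DOUBLE" for column in doubles})
ddl_types.update({column: "TIMESTAMP" for column in timestamps})


def to_ddl(columns, types, default="STRING"):
    """Join 'name TYPE' pairs in header order; unknown columns fall back to default."""
    return ", ".join(f"{column} {types.get(column, default)}" for column in columns)


header = ["VendorID", "tpep_pickup_datetime", "trip_distance", "store_and_fwd_flag"]
ddl = to_ddl(header, ddl_types)
print(ddl)
# with a live session: spark.read.csv('../data/large', header=True, schema=ddl)
```

Keeping the type mapping in one dictionary makes it easier to spot a column whose type silently fell back to the default.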

In [None]:
sdf_with_schema.limit(5)

## Transformations and Lazy Evaluation (IMPORTANT)
- Transformations turn a Spark DataFrame into a new DataFrame *without* altering the original data; Spark DataFrames are **immutable**.
- In other words, operations return transformed results rather than mutating the original.
- It's common to see `sdf = sdf.some_transformation()` if you are looking to overwrite it.
- Finally, transformations in Spark are evaluated lazily! That is, the data doesn't "move" until an action such as `show()`, `count()`, or `collect()` is called.
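Lazy evaluation can be illustrated without Spark at all. The sketch below is only an analogy using Python generators, not how Spark is implemented: defining the pipeline reads nothing, and the rows are only produced once the result is materialised.

```python
log = []  # records every time a row is actually read


def read_rows():
    """Pretend data source that logs each row it produces."""
    for i in range(5):
        log.append(f"read {i}")
        yield {"id": i, "total_amount": 10.0 * i}


# "transformations": these lines only describe the work to be done
pipeline = (row for row in read_rows() if row["total_amount"] > 15)
pipeline = (row["total_amount"] for row in pipeline)
reads_before_action = len(log)

# "action": materialising the result finally pulls the rows through
result = list(pipeline)
reads_after_action = len(log)

print(reads_before_action, reads_after_action, result)
```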

Take for example the code block below. Even when renaming columns, we need to overwrite the original variable.

In [None]:
sdf_with_schema = sdf_with_schema.withColumnRenamed('tpep_pickup_datetime', 'pickup_time') \
    .withColumnRenamed('tpep_dropoff_datetime', 'dropoff_time')

In [None]:
sdf_with_schema.limit(5)

## DataType Conversions
Suppose `tpep_pickup_datetime` is stored as a `StringType()` (as it is in `sdf`, which was read without a schema) and we wish to convert it to a `TimestampType()`:
```python
# Method 1 using withColumn()
sdf.withColumn("tpep_pickup_datetime", col("tpep_pickup_datetime").cast(TimestampType()))

# Method 2 using select()
sdf.select(col("tpep_pickup_datetime").cast("timestamp"))

# Method 3 using selectExpr() - similar to SQL syntax
sdf.selectExpr("cast(tpep_pickup_datetime as timestamp)")
```

For an actual example, let's take a look at the `store_and_fwd_flag` which should be boolean. Currently, we have `N` and `Y` which we can resolve by assigning the column to a boolean condition.

In [None]:
sdf_with_schema_temp = sdf_with_schema.withColumn("store_and_fwd_flag_bool", 
                                             (sdf_with_schema["store_and_fwd_flag"] == 'Y') \
                                             .cast("boolean"))

In [None]:
sdf_with_schema_temp.select('store_and_fwd_flag', 'store_and_fwd_flag_bool').limit(5)

As you can see, the string `store_and_fwd_flag` is now available as a boolean under `store_and_fwd_flag_bool`. Let's apply the change to the `sdf_with_schema` DataFrame.

In [None]:
sdf_with_schema = sdf_with_schema.withColumn("store_and_fwd_flag", 
                                             (sdf_with_schema["store_and_fwd_flag"] == 'Y') \
                                             .cast("boolean"))

## Retrieving and Filtering Data 
Collecting:
- The `collect()` method is an action that _collects_ all the rows onto the driver for you (recall that Spark has lazy evaluation, so this method is the evaluation step).
- If you use `collect()` on the full dataset or a large partition, you will still run into an `OutOfMemoryError` as everything must be brought into memory.
- If you just want the size of the result, you can use `count()`.

Filtering:
- Similar to the syntax of `df.loc[]` from Pandas.
- Use bitwise `&` or `|` to filter on several conditions, wrapping each condition in parentheses.
- If you want to use Pandas' `.isin()` method, it's the same in Spark (with bitwise not `~` for "not in").
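Each condition needs its own parentheses because `&` and `|` bind more tightly than `==` in Python. The plain-Python sketch below uses integers instead of Spark columns to show the difference; the variable names are illustrative only.

```python
passenger_count, payment_type = 1, 2

# Intended condition: passenger_count is 1 AND payment_type is 2.
with_parens = (passenger_count == 1) & (payment_type == 2)

# Without parentheses Python evaluates `1 & payment_type` first, so this is
# really the chained comparison: passenger_count == (1 & payment_type) == 2.
without_parens = passenger_count == 1 & payment_type == 2

print(with_parens, without_parens)  # True False
```

With Spark `Column` objects the unparenthesised form typically raises an error rather than silently returning the wrong rows, but the cause is the same operator precedence.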

In [None]:
small_sdf = sdf_with_schema.limit(1000)

In [None]:
rows = small_sdf.select('total_amount').limit(5).collect()
rows

In [None]:
# you can index your rows like normal lists
rows[0][0]

In [None]:
small_sdf.filter(small_sdf.store_and_fwd_flag == True)

You can also filter DataFrame rows using `startswith()`, `endswith()`, and `contains()` 

In [None]:
# all trips whose pickups are -74.X, 40.Y
small_sdf.filter((small_sdf.pickup_longitude.startswith('-74.')) 
                 & (small_sdf.pickup_latitude.startswith('40.')))

Even better, you can use the SQL `LIKE` syntax!
- `like()` for the SQL `LIKE`;
- and `rlike()` for regex matching.

In [None]:
# using SQL LIKE
small_sdf.filter(small_sdf.pickup_time.like('%19:%%:%%'))

In [None]:
# using regex
small_sdf.filter(small_sdf.pickup_time.rlike(r'.+\s(19):\d{2}:\d{2}'))
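The regular expression can be tried out on a few strings before running it against the data. This is a sketch using Python's `re` module on made-up timestamps; `rlike()` uses Java regular expressions, and the assumption here is that this simple pattern behaves the same way in both engines when searched for anywhere in the string.

```python
import re

# same pattern as passed to rlike(): whitespace, then hour 19, then mm:ss
pattern = re.compile(r".+\s(19):\d{2}:\d{2}")

timestamps = [
    "2015-01-15 19:05:33",  # picked up in the 19:00 hour
    "2015-01-15 10:19:05",  # minute 19, not hour 19
    "2015-01-19 08:00:00",  # day 19, not hour 19
]

matches = [ts for ts in timestamps if pattern.search(ts)]
print(matches)  # ['2015-01-15 19:05:33']
```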

## Unique Values, Duplicates, and Missing Values
- You can easily grab unique values using `sdf.distinct()` and drop duplicates using `sdf.dropDuplicates()`.
- For missing values, use `.fillna()`, the same as in Pandas.

In [None]:
small_sdf.select('passenger_count').distinct()

Because the dataset has no nulls present, the code snippet below inserts a `null` value into `total_amount`, then finds the rows where `total_amount` is `null`.
- We should get 1 row back, as there are no `nulls` except the one we inserted.
- As you can see, granular changes with Spark require a roundabout approach!

In [None]:
# create a single row and convert to sdf
temp = small_sdf.limit(1).toPandas()
temp.rename({'pickup_time': 'tpep_pickup_datetime', 'dropoff_time': 'tpep_dropoff_datetime'}, axis=1, inplace=True)
temp['total_amount'] = None
r = spark.createDataFrame(temp, schema)
r = r.withColumn("store_and_fwd_flag",(r["store_and_fwd_flag"] == 'true').cast("boolean"))

# take the union of the two (that is, add the row to small_sdf)
small_sdf = small_sdf.union(r)

# sample 5 random rows from 10% of all the data
small_sdf.sample(0.1).limit(5)

In [None]:
# now, find all rows where total_amount is null
small_sdf.where(col("total_amount").isNull())

## Sorting Values
- As you should know from 2nd year Algorithms, sorting algorithms, whilst efficient, are still costly!
- Even for a fast `O(n log n)` sorting algorithm, our dataset will require roughly `146,000,000 x log2(146,000,000) = 146,000,000 x 27.12`, or about 3.96 billion, operations!
- So, be careful when you run a sort - avoid it if you can.
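The estimate above can be reproduced with a couple of lines of Python. This is only a back-of-the-envelope figure that ignores constant factors and the cost of moving data between partitions.

```python
import math

n_rows = 146_000_000
log_n = math.log2(n_rows)    # roughly 27.12
comparisons = n_rows * log_n  # roughly 4 billion

print(f"log2(n)     = {log_n:.2f}")
print(f"n * log2(n) = {comparisons:,.0f}")
```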

Here are a few ways to sort by columns:
```python
# sort by total amount from largest and passenger count from smallest
small_sdf.sort(small_sdf.total_amount.desc(), small_sdf.passenger_count.asc())
small_sdf.sort(col("total_amount").desc(), col("passenger_count").asc())
small_sdf.orderBy(col("total_amount").desc(), col("passenger_count").asc())
```

In [None]:
small_sdf.sort(col("total_amount").desc(), col("passenger_count").asc())

## Aggregations
- Aggregations are always useful, whether for summarising data for analysis or for building fact tables.
- Like Pandas, Spark also covers `count()`, `mean()`, `max()`, `agg()`, etc.

In [None]:
from pyspark.sql.functions import mean

small_sdf.groupBy("passenger_count") \
    .agg(mean("total_amount").alias("Average Trip Amount USD$"),
         mean("trip_distance").alias("Average Distance in Miles")) 

In [None]:
# and yes, it does work on the full dataset (albeit it does take time...)
results = sdf_with_schema.groupBy("passenger_count") \
    .agg(mean("total_amount").alias('avg_trip_amount')) \
    .orderBy("passenger_count")

results
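To make clear what `groupBy(...).agg(mean(...))` computes, here is the same calculation written out in plain Python on three made-up rows. It is a sketch of the logic only; Spark distributes this work across partitions rather than looping over rows like this.

```python
from collections import defaultdict

rows = [
    {"passenger_count": 1, "total_amount": 10.0},
    {"passenger_count": 1, "total_amount": 20.0},
    {"passenger_count": 2, "total_amount": 30.0},
]

# group key -> [running sum, row count]
totals = defaultdict(lambda: [0.0, 0])
for row in rows:
    acc = totals[row["passenger_count"]]
    acc[0] += row["total_amount"]
    acc[1] += 1

# mean per group, ordered by the group key (like orderBy("passenger_count"))
averages = {key: s / n for key, (s, n) in sorted(totals.items())}
print(averages)  # {1: 15.0, 2: 30.0}
```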

## Writing to Disk
- Conventionally, the "go-to" dataset format was a `csv`. For good reasons, we have explored alternatives such as data serialization with Python's `pickle` or Apache Arrow's `feather`.
- Spark's default file format is **Apache Parquet**, a columnar storage format.
- You can write a specific format (if supported) using `sdf.write.format("parquet").save(path)`.

### Parquet:
- Parquet output is stored as a directory containing the data files (compressed by default) and some metadata files.
- By default, writing to a path that already exists raises an error. Either remove the existing output first (as below) or set the write mode with `.mode("overwrite")`.

In [None]:
# Check to see if the fpath already exists. If so, remove it.
from shutil import rmtree
from os import path

fpath = '../data/aggregated_results.parquet/'
if path.exists(fpath):
    rmtree(fpath)

In [None]:
results.write.format('parquet').save('../data/aggregated_results.parquet')

Reading in `parquet` files is similar to `csv`:

In [None]:
spark.read.parquet(fpath)

## Union and Merging
- The `union()` method merges two Spark DataFrames and returns *a new* DataFrame with all rows from the two DataFrames *including duplicates*.
- It behaves like SQL `UNION ALL` (not `UNION`) and, as a result, may include duplicate rows. Columns are matched by position, not by name.
- If you want no duplicates, you can do `union().distinct()` (distinct was mentioned previously).

The example below takes the union of two identical DataFrames consisting of 5 rows.

In [None]:
sdf1 = spark.createDataFrame(df.iloc[:5])
sdf2 = spark.createDataFrame(df.iloc[:5])

sdf1.union(sdf2)

In [None]:
sdf1.union(sdf2).distinct()
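For comparison with SQL: Spark's `union()` keeps duplicates, which corresponds to `UNION ALL`, whereas plain SQL `UNION` removes them, like `union().distinct()`. The sketch below uses Python's built-in `sqlite3` purely as a stand-in SQL engine to show the difference; the table names `a` and `b` are made up.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE a (x INTEGER)")
conn.execute("CREATE TABLE b (x INTEGER)")
conn.executemany("INSERT INTO a VALUES (?)", [(1,), (2,)])
conn.executemany("INSERT INTO b VALUES (?)", [(2,), (3,)])

# UNION ALL keeps the duplicate 2, like sdf1.union(sdf2)
union_all = conn.execute("SELECT x FROM a UNION ALL SELECT x FROM b").fetchall()

# UNION removes it, like sdf1.union(sdf2).distinct()
union = conn.execute("SELECT x FROM a UNION SELECT x FROM b ORDER BY x").fetchall()
conn.close()

print(len(union_all), len(union))  # 4 3
```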

# User Defined Functions (UDF) and Pandas UDFs
So far, all the functions and methods have been about simple aggregations or filtering rows. However, preprocessing and data cleansing usually require more powerful tools such as `regex`.

Unlike Pandas' `apply()` method (and also `rdd.map()`), we need to do a "bit" more work to generate UDFs.

1. Create a function with a `@udf()` decorator.
2. Specify an output data type (e.g. `StringType()`) in the format `@udf("string")` or `@udf(StringType())`.
3. Apply onto column(s) of choice (remembering that Spark is immutable).

Alternatively, if we want to use Pandas framework:
1. Create a function with a `@pandas_udf()` decorator and format as required.
2. Apply onto column(s) of choice.

In the following example, we will create an array (returned from Python as a tuple) consisting of the pickup lat/lon to 4 decimal places.

In [None]:
# using UDF
@F.udf(ArrayType(DoubleType(), True))
def create_coords(lat, lon):
    return round(lat, 4), round(lon, 4)

In [None]:
small_sdf.withColumn("pickup_coords", create_coords(col("pickup_latitude"), col("pickup_longitude"))) \
    .limit(10)
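A UDF wraps an ordinary Python function, so its logic can be checked on a few values before it is applied to a DataFrame. The sketch below is a plain-Python variant that also returns `None` when either coordinate is missing; `create_coords_safe` is a hypothetical name, and the null handling is an assumption about what is wanted rather than part of the original example.

```python
def create_coords_safe(lat, lon):
    """Round a lat/lon pair to 4 decimal places, passing missing values through."""
    if lat is None or lon is None:
        return None
    return [round(lat, 4), round(lon, 4)]


print(create_coords_safe(40.7501234, -73.9912345))  # [40.7501, -73.9912]
print(create_coords_safe(None, -73.9912345))        # None

# to use it in Spark, it could be wrapped just like create_coords:
# create_coords_udf = F.udf(create_coords_safe, ArrayType(DoubleType(), True))
```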

And here's an example of mapping values from our data dictionary using a Pandas UDF:
- Type definition Syntax: https://www.python.org/dev/peps/pep-0484/#type-definition-syntax
- Function Decorators: https://johnpaton.net/posts/clean-spark-udfs/

The Pandas UDF is also quite new so there isn't much *help* other than the documentation: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.pandas_udf.html?highlight=pandas%20udf

Syntax:
```python
@pandas_udf(THE DATATYPE OF THE OUTPUT)
def FUNCTION_NAME(ARGUMENTS: INPUT DATA FORMAT) -> OUTPUT DATA FORMAT:
    ...
    return ...

sdf.withColumn(COLUMN OUT, FUNCTION_NAME(col(COLUMN IN)))
```

In [None]:
from pyspark.sql.functions import pandas_udf

In [None]:
vendors = {1: 'Creative Mobile Technologies, LLC', 2: 'VeriFone Inc.'}

@pandas_udf("string")
def vendorMap(vid_col: pd.Series) -> pd.Series:
    return vid_col.map(vendors)

In [None]:
small_sdf.withColumn("VendorName", vendorMap(col("VendorID"))) \
    .limit(10)

And that's the basics of PySpark! If you would like to further increase your scope, here are some pathways:
- Data Science: Continue with Spark's MLlib to perform machine learning.
- Data Engineering: Learn Spark SQL and Spark Connectors (e.g. connecting to data sources such as S3 buckets).