# Applied Data Science (MAST30034) Tutorial 1

- Basic intro to Apache Spark (30-60 minutes)
- Project 1 Tips (yes, it's already out and we **strongly recommend you start today**) (remainder of time)
_________________

## Starting a Spark Session
To begin with Spark, we need to create a `SparkSession` object using its builder.
- `appName`: Name of the Spark app.
- `config`: Configurations to initialise with. In this example we set `'spark.sql.repl.eagerEval.enabled'`, which enables a nicer HTML display (similar to `pandas`) for DataFrame outputs in the notebook.
- `.getOrCreate()`: Returns the existing Spark session if there is one, otherwise creates a new one.

In [2]:
from pyspark.sql import SparkSession

# Create a spark session (which will run spark jobs)
spark = (
    SparkSession.builder.appName("MAST30034 Tutorial 1")
    .config("spark.sql.repl.eagerEval.enabled", True) 
    .config("spark.sql.parquet.cacheMetadata", "true")
    .config("spark.sql.session.timeZone", "Etc/UTC")
    .getOrCreate()
)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/08/17 18:19:58 WARN Utils: Your hostname, Jams-Laptoop, resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/08/17 18:19:58 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/08/17 18:19:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


A general note: Spark DataFrames are **immutable**. Every transformation returns a new DataFrame rather than modifying the existing one. We'll discuss it further down the track, but for now, just remember this!

Documentation is also going to be your saving grace. If you have tried your best **and have read the documentation and researched on Stack Overflow** but still can't get it working, then try ChatGPT. If that still doesn't help, then you can ask your tutor for help :P

## Reading in the Parquet
As of 2022, TLC made the **great decision** to drop `csv` and adopt the `parquet` format instead. So, what's a `parquet`?

Related materials:
1. [What if you could get the simplicity, convenience, interoperability, and storage niceties of an old-fashioned CSV with the speed of a NoSQL database and the storage requirements of a gzipped file? Enter Parquet.](https://databricks.com/session/spark-parquet-in-depth)
2. [The Parquet Format and Performance Optimization Opportunities](https://databricks.com/session_eu19/the-parquet-format-and-performance-optimization-opportunities)

CSV:
- `csv` is a tabular, plain-text format that is read in line by line, with values separated by a `,` delimiter.
- That is, the data is stored by rows.
- They consume a lot of disk space and are one of the **most inefficient** ways of storing data.
- However, they are widely used and easy to use for smaller datasets.

Parquet:
- `parquet`, on the other hand, is stored by columns and (ELI5) is very efficient at storing typed data.
- For example, a single row in a `csv` can contain several different data types.
- `parquet` has a single data type per column, allowing compression algorithms to be applied effectively, which reduces disk space and improves read efficiency.
- For a row-based alternative to `csv`, you can take a look at `avro`.
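
To make the "types per row vs. types per column" point concrete, here is a rough sketch using only the Python standard library. It is not how `parquet` is implemented; it only shows that a `csv` hands back untyped text row by row, whereas a column-oriented layout can keep one typed container per column. The toy values are made up.

```python
import csv
import io
from array import array

# toy rows in the style of the taxi data (values made up for illustration)
csv_text = (
    "passenger_count,trip_distance,store_and_fwd_flag\n"
    "1,1.72,N\n"
    "2,10.82,N\n"
)

# row-based: every field comes back as a string, so types must be cast by the reader
rows = list(csv.DictReader(io.StringIO(csv_text)))
print(rows[0])

# column-based: one homogeneous, typed container per column
passenger_count = array("q", (int(row["passenger_count"]) for row in rows))  # 64-bit ints
trip_distance = array("d", (float(row["trip_distance"]) for row in rows))    # 64-bit floats
print(passenger_count, trip_distance)
```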

![Divisions of storage format](../../media/storageformat.png)

Cost Analysis from Amazon Web Services (AWS): ![image.png](https://miro.medium.com/max/1400/1*vdasMxTjInhBXIRA8K1XYQ.png)

Spark Docs
- https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.parquet.html?highlight=read%20parquet
- https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.show.html?highlight=show#pyspark.sql.DataFrame.show

In [3]:
# sdf = spark df = spark data frame
sdf = spark.read.parquet('../../data/tlc_data/2024-01.parquet')
sdf.show(1, vertical=True, truncate=100)

                                                                                

-RECORD 0------------------------------------
 VendorID              | 2                   
 tpep_pickup_datetime  | 2024-01-01 00:57:55 
 tpep_dropoff_datetime | 2024-01-01 01:17:43 
 passenger_count       | 1                   
 trip_distance         | 1.72                
 RatecodeID            | 1                   
 store_and_fwd_flag    | N                   
 PULocationID          | 186                 
 DOLocationID          | 79                  
 payment_type          | 2                   
 fare_amount           | 17.7                
 extra                 | 1.0                 
 mta_tax               | 0.5                 
 tip_amount            | 0.0                 
 tolls_amount          | 0.0                 
 improvement_surcharge | 1.0                 
 total_amount          | 22.7                
 congestion_surcharge  | 2.5                 
 Airport_fee           | 0.0                 
only showing top 1 row


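The plain-text output of `.show()` is quite ugly at times, so if you miss `pandas` and want the "nice" display you can set `spark.sql.repl.eagerEval.enabled` to `True` in the config (as we did when creating the session). To see the nice format, make a DataFrame the last expression in a cell, for example with `.limit()`.

`pyspark`'s `.show()`, `.head()`, `.limit()`, etc. are all alternatives to `pandas`'s `.head()` (`.tail` exists in both `pandas` and `pyspark`). Note that they return different things: `.show()` only prints, `.head(n)` returns a list of rows, and `.limit(n)` returns a new Spark DataFrame.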

In [4]:
sdf.limit(5)

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee
2,2024-01-01 00:57:55,2024-01-01 01:17:43,1,1.72,1,N,186,79,2,17.7,1.0,0.5,0.0,0.0,1.0,22.7,2.5,0.0
1,2024-01-01 00:03:00,2024-01-01 00:09:36,1,1.8,1,N,140,236,1,10.0,3.5,0.5,3.75,0.0,1.0,18.75,2.5,0.0
1,2024-01-01 00:17:06,2024-01-01 00:35:01,1,4.7,1,N,236,79,1,23.3,3.5,0.5,3.0,0.0,1.0,31.3,2.5,0.0
1,2024-01-01 00:36:38,2024-01-01 00:44:56,1,1.4,1,N,79,211,1,10.0,3.5,0.5,2.0,0.0,1.0,17.0,2.5,0.0
1,2024-01-01 00:46:51,2024-01-01 00:52:57,1,0.8,1,N,211,148,1,7.9,3.5,0.5,3.2,0.0,1.0,16.1,2.5,0.0


Spark can also read in an entire directory of files at once! We won't be using it for the tutorial, but if you wish to use it for your project, feel free to do so!

In [5]:
# here, we give it the directory, rather than a specific parquet
sdf_all = spark.read.parquet('../../data/tlc_data/')

To count the number of records, use the `.count()` method. The equivalent in `pandas` would be `len(df)` or `df.shape[0]`.

In [6]:
sdf.count(), sdf_all.count()

(2964624, 9554778)

To view the data types of our `sdf`, we can use:
- `.printSchema()` to print it nicely.
- `.schema` for the actual schema object

The `pandas` alternative is `df.dtypes`

In [7]:
sdf.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)



In [8]:
sdf.schema

StructType([StructField('VendorID', IntegerType(), True), StructField('tpep_pickup_datetime', TimestampNTZType(), True), StructField('tpep_dropoff_datetime', TimestampNTZType(), True), StructField('passenger_count', LongType(), True), StructField('trip_distance', DoubleType(), True), StructField('RatecodeID', LongType(), True), StructField('store_and_fwd_flag', StringType(), True), StructField('PULocationID', IntegerType(), True), StructField('DOLocationID', IntegerType(), True), StructField('payment_type', LongType(), True), StructField('fare_amount', DoubleType(), True), StructField('extra', DoubleType(), True), StructField('mta_tax', DoubleType(), True), StructField('tip_amount', DoubleType(), True), StructField('tolls_amount', DoubleType(), True), StructField('improvement_surcharge', DoubleType(), True), StructField('total_amount', DoubleType(), True), StructField('congestion_surcharge', DoubleType(), True), StructField('Airport_fee', DoubleType(), True)])

See here for the available data types: https://spark.apache.org/docs/latest/sql-ref-datatypes.html

## Basic Operations
### Selection
To show a specific column, we will use `sdf.select(col).limit(5)`. 
- The equivalent in `pandas` is `df[col].head()`.

To _access_ a specific column, use the `sdf[col]` syntax (equivalent to `df[col]`). Avoid using `sdf.col` or `df.col` as it is **not** robust (it cannot handle column names with spaces or names that clash with existing methods) or future-proof.

For multiple columns, pass them as separate arguments or as a list.

Please note, `.select()` only chooses columns; it is good for viewing bits and pieces of the data, not for filtering rows.

In [9]:
sdf.select('passenger_count').limit(5)

passenger_count
1
1
1
1
1


_Students to write code to select the first 10 records for `passenger_count` and `trip_distance`_

In [10]:
# write code here to select the first 10 records for `passenger_count` and `trip_distance`
sdf.select('passenger_count', 'trip_distance').limit(10)

passenger_count,trip_distance
1,1.72
1,1.8
1,4.7
1,1.4
1,0.8
1,4.7
2,10.82
0,3.0
1,5.44
1,0.04


### Filtering
For filtering data, we use `sdf.filter(condition)` or `sdf.where(condition)` (they are aliases of each other).
- The equivalent in `pandas` is `df.loc[condition].head()`.
- When using multiple conditions, wrap each condition in parentheses and combine them with `&` (AND) / `|` (OR) / `~` (NOT). Python's `and` / `or` / `not` do not work on columns.

To do so, we will use `pyspark.sql.functions.col` to specify the column we are working with.

In [11]:
from pyspark.sql import functions as F

In [12]:
F.col("passenger_count")

Column<'passenger_count'>

As you can see, this is just a `Column` expression that refers to a column by name. It holds no data and doesn't do much on its own. We'll come back to this in the next tutorial. For now, take our word.

In [13]:
sdf.filter(F.col('passenger_count') == 5).limit(5)

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee
2,2024-01-01 00:14:52,2024-01-01 00:17:38,5,0.55,1,N,239,143,1,5.1,1.0,0.5,1.52,0.0,1.0,11.62,2.5,0.0
2,2024-01-01 00:22:06,2024-01-01 00:51:28,5,3.1,1,N,143,170,1,26.8,1.0,0.5,6.36,0.0,1.0,38.16,2.5,0.0
2,2024-01-01 00:11:22,2024-01-01 00:20:14,5,1.63,1,N,246,246,1,11.4,1.0,0.5,4.92,0.0,1.0,21.32,2.5,0.0
2,2024-01-01 00:17:21,2024-01-01 00:23:22,5,1.31,1,N,229,140,2,7.9,1.0,0.5,0.0,0.0,1.0,12.9,2.5,0.0
2,2024-01-01 00:17:17,2024-01-01 00:23:11,5,0.94,1,N,144,231,1,7.9,1.0,0.5,0.0,0.0,1.0,12.9,2.5,0.0


_Students to write code to retrieve all non-zero passenger counts and all non-zero trip distances using `.filter()`_

In [14]:
# write code here to retrieve all non-zero passenger counts and all non-zero trip distances using filter()
sdf.filter((F.col('passenger_count') > 0) & (F.col('trip_distance') > 0)).select('passenger_count', 'trip_distance').limit(10)

passenger_count,trip_distance
1,1.72
1,1.8
1,4.7
1,1.4
1,0.8
1,4.7
2,10.82
1,5.44
1,0.04
2,0.75


### GroupBy (Aggregation)
To group the data and aggregate it (e.g. with a mean), we can use `sdf.groupby(col).mean(aggregated columns).limit(5)`.
- The equivalent in `pandas` is `df.groupby(col)[aggregated columns].mean().head()`.

In [15]:
sdf.groupby('passenger_count').mean('trip_distance').limit(5)

passenger_count,avg(trip_distance)
0,2.7438750993167025
7,2.29375
6,2.951688811345235
5,3.0734722139318347
1,3.1375658449911463


We can also apply multiple different aggregations and change their output names using `.agg()` and `.alias()`! To see the list of all SQL functions, visit https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html

We'll also use `.orderBy()` to display the results nicely.

In [16]:
aggregated_results = sdf \
                    .groupBy("passenger_count") \
                    .agg(
                        F.mean("total_amount").alias("avg_trip_amount_usd"),
                        F.max("trip_distance").alias("max_trip_distance_miles")
                    ) \
                    .orderBy("passenger_count")

aggregated_results.show()

+---------------+-------------------+-----------------------+
|passenger_count|avg_trip_amount_usd|max_trip_distance_miles|
+---------------+-------------------+-----------------------+
|           NULL| 25.811736633327225|               312722.3|
|              0| 25.327816939456696|                   60.2|
|              1|  26.20523044549061|               15400.32|
|              2|   29.5206599309276|                  277.4|
|              3| 29.138309044289365|                  83.92|
|              4|    30.877266710278|                 120.78|
|              5|  26.26912911120415|                 120.86|
|              6| 25.801183286359887|                  44.52|
|              7|             57.735|                  11.96|
|              8|  95.66803921568629|                  17.74|
|              9|              18.45|                    1.8|
+---------------+-------------------+-----------------------+
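
For comparison, a rough `pandas` counterpart of the aggregation above, sketched on made-up toy data (the `toy_df` values are assumptions; the output names mirror the Spark example):

```python
import pandas as pd

# toy data, made up for illustration
toy_df = pd.DataFrame({
    "passenger_count": [1, 1, 2],
    "total_amount": [10.0, 20.0, 30.0],
    "trip_distance": [1.0, 4.0, 2.5],
})

# named aggregation: output_name=(input_column, aggregation)
toy_aggregated = (
    toy_df
    .groupby("passenger_count")
    .agg(
        avg_trip_amount_usd=("total_amount", "mean"),
        max_trip_distance_miles=("trip_distance", "max"),
    )
    .sort_index()
    .reset_index()
)
print(toy_aggregated)
```

One difference worth knowing: Spark keeps a `NULL` group (as in the table above), whereas `pandas` drops missing group keys by default unless you pass `dropna=False` to `groupby`.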



## Saving Data
By default, Spark saves data in the `parquet` format (highly recommended). If you wish to take a smaller sample and save it as a `csv` to load into `pandas`, that is also fine.

In [17]:
aggregated_results.write.mode('overwrite').parquet('../../data/tute_data/aggregated_results')

Your directory may look a bit funky like this:

![image.png](../../media/aggregated_results_dir.png)

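Don't worry: Spark writes each partition of the data as its own part file, alongside some marker files, inside a directory with the name you gave. Just leave it as is (we don't have time to cover everything about Spark unfortunately) and read in the directory as is.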

In [18]:
temp_results = spark.read.parquet('../../data/tute_data/aggregated_results')
temp_results.show()

+---------------+-------------------+-----------------------+
|passenger_count|avg_trip_amount_usd|max_trip_distance_miles|
+---------------+-------------------+-----------------------+
|           NULL| 25.811736633327225|               312722.3|
|              0| 25.327816939456696|                   60.2|
|              1|  26.20523044549061|               15400.32|
|              2|   29.5206599309276|                  277.4|
|              3| 29.138309044289365|                  83.92|
|              4|    30.877266710278|                 120.78|
|              5|  26.26912911120415|                 120.86|
|              6| 25.801183286359887|                  44.52|
|              7|             57.735|                  11.96|
|              8|  95.66803921568629|                  17.74|
|              9|              18.45|                    1.8|
+---------------+-------------------+-----------------------+



---

# Summary (and Break)
Cool, we've covered the very basics of Spark and will now cover the basics of plotting.

Rest assured, we will cover more intricate transformations in the next tutorial (which you are welcome to read ahead of time, of course).

---

## Sampling Data for Plotting

Whilst Spark is amazing at handling big data sets, it isn't a great idea to plot all of it. We suggest taking a maximum of 5% of records for the tutorial. 

You can up it to your requirements, but we recommend sticking to less than 1 million records per month for visualization purposes.

**Project 1 Checklist:**
- You have justified your sample size (e.g. due to runtime, distribution of data, etc.)
- You have justified your sampling method (e.g. random, stratified, etc.)
- You have detailed in your report that you have sampled for visualization purposes BUT your analysis still uses the full distribution of data
- You mention any issues that can potentially be caused by sampling (e.g. a biased visualisation when using random sampling)

Remember, it is your responsibility as the student (future Data Scientist) to convince the tutor (your stakeholders) that your justifications and assumptions are correct!

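To sample your data and convert it into a `pandas` dataframe, use `.sample()` followed by `.toPandas()`. You can then save the sample so it can be read back in later without rerunning Spark. Note that `SAMPLE_SIZE` below is a fraction of the rows (5%), not a number of rows. We will also fix the random seed to be `0` just for consistency.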

In [19]:
SAMPLE_SIZE = 0.05

In [20]:
import pandas as pd
df = sdf.sample(SAMPLE_SIZE, seed=0).toPandas()
df.to_csv('../../data/tute_data/sample_data.csv', index=False)

In [21]:
df.to_parquet('../../data/tute_data/sample_data.parquet')

Just spend a moment and look at the disk space the `csv` takes for the 5% sample (16.1 MB). Compare that to the `parquet` of the same sample, which isn't even 4 MB, let alone the full month of data in `parquet` format taking only 48 MB of disk space.

Let that sink in and give our thanks to the devs who made Parquet and Spark.

In [22]:
%%time
df_csv = pd.read_csv('../../data/tute_data/sample_data.csv')

CPU times: user 215 ms, sys: 42.1 ms, total: 257 ms
Wall time: 240 ms


In [None]:
%%time
df_parquet = pd.read_parquet('../../data/tute_data/sample_data.parquet')

CPU times: user 51.2 ms, sys: 275 ms, total: 326 ms
Wall time: 82.4 ms



We recommend you save every dataframe or aggregation as `parquet` so you don't keep running your notebook from top to bottom waiting 20 years for a result, or have so many variables and dataframes defined that you run out of memory for small transformations.

We strongly suggest you have a `code` folder in your Project 1 directory with the following structure:
- `preprocessing_notebook_part_1.ipynb`: outputs a structured parquet format and saves it.
- `preprocessing_notebook_part_2.ipynb`: reads in the output from above and does some aggregations and sampling before saving it.
- `data_analysis_xyz.ipynb`: conducts analysis on a single sample or aggregation from the output above.
- `data_analysis_abc.ipynb`: conducts analysis on another single sample or aggregation from the output above.
- `...`

This is a very basic version of what is called a "data pipeline" (or ETL pipeline, etc.).

_________________


# Project 1 Tips and Questions

### IMPORTANT PLEASE READ THIS
First and foremost, you want to be familiar with the homepage https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page

Read through the relevant data dictionaries:
- **MUST READ:** https://www1.nyc.gov/assets/tlc/downloads/pdf/trip_record_user_guide.pdf
- https://www1.nyc.gov/assets/tlc/downloads/pdf/data_dictionary_trip_records_yellow.pdf
- https://www1.nyc.gov/assets/tlc/downloads/pdf/data_dictionary_trip_records_green.pdf
- https://www1.nyc.gov/assets/tlc/downloads/pdf/data_dictionary_trip_records_fhv.pdf
- https://www1.nyc.gov/assets/tlc/downloads/pdf/data_dictionary_trip_records_hvfhs.pdf

Why? Your tutors can be treated as "experts" in this field. To prepare you for the Industry Project, we need to assess students on adhering to requirements and business rules. 

The tutor team knows this dataset inside out. If you are incorrectly filtering records without sufficient justification, you will be losing marks as per requirements.

### An Incorrect Example
- Scenario: Student does analysis on `tip_amount` and finds several `NULL` values and either drops them or includes them in the analysis. Later on, they use a regression model to predict this value.

- Result: According to the data dictionary, `tip_amount` is automatically populated for credit card tips (`payment_type` is `1`). Cash tips are not included. This means that the student's analysis included all payment types despite this field clearly specifying the rule.

- Penalty: The student will lose marks on the analysis section. The modelling section will be marked _assuming_ they got this filtering method correct. However, if another issue pops up due to this, there will be another penalty applied. Please get this right!

- Solution: Student should filter for only `payment_type=1` and now, the student can (hopefully) conduct correct analysis on `tip_amount`.

Several students over the past few years have lost many marks for simple rules like this (especially `tip_amount`).

### Readable Code
- We will be assessing the quality of your code and how you present it in your notebooks. 
- This is because there is no point writing code that cannot be easily interpreted. At the end of the day, employers and clients are not only paying for your analysis, but also the corresponding code. 
- If your code is confusing or difficult to read, there is little chance your client will come back to you.

**Variable Names:**  
As long as you are consistent, then it is fine. For example, commit to either using:
- Snake Case: words are separated by underscores such as `variable_name`
- Camel Case: words are separated by capitals such as `variableName`

Your variables should be contextual and describe the code. That is, try to name your variables to be understandable **without comments**.
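
A tiny before-and-after sketch of contextual naming (the fare values are made up for illustration):

```python
# harder to follow: the names say nothing about the contents
x = [17.7, 10.0, 23.3]
y = sum(x) / len(x)

# easier to follow: snake_case names that describe the data and its units
fare_amounts_usd = [17.7, 10.0, 23.3]
mean_fare_amount_usd = sum(fare_amounts_usd) / len(fare_amounts_usd)

print(y, mean_fare_amount_usd)
```

Both versions compute the same thing, but only the second can be read without a comment.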

**Comments and Docstrings (w.r.t. Jupyter Notebook Cells):**  
Cells in Jupyter Notebook should aim to do one "block of logic" at a time (e.g. importing libraries, defining functions, filtering rows, etc.).
- If it takes a reader more than a few seconds to understand your cell, you need comments.
- Your functions need to have docstrings describing what they do. If you have forgotten how to write them, search online or revisit your COMP10001 Grok course.
- Use markdown cells for longer comments or explaining logic, inline comments in code for short descriptions of hard-to-understand code.

We won't ask you to run `flake8` or `pylint` on your notebooks. We just ask for good comments in the code and markdown cells, reasonable variable names, and clean directories.

Here is an example of a good docstring and comments for a function.

```python
def some_function(some_val: str) -> str:
    """
    This function takes in some string value
    and outputs some string value via some transformation.
    """
    # normalise the casing so later comparisons are case-insensitive
    new_val = some_val.casefold()
    return new_val
```