# Setup

The cluster is set up as usual using Google Cloud's Dataproc.

Once the cluster is launched, the R and Python environments are updated via conda:

```shell
conda upgrade --all
```

`SparkR` should already be installed, and we can install RStudio's `sparklyr` with:

```r
install.packages("sparklyr")
```

For good measure, I've also installed Quarto:

```shell
wget https://github.com/quarto-dev/quarto-cli/releases/download/v1.1.251/quarto-1.1.251-linux-amd64.deb
dpkg -i quarto-1.1.251-linux-amd64.deb
```

## Data

NYC taxi data, as found via https://github.com/toddwschneider/nyc-taxi-data. We download the monthly yellow taxi trip records for January 2021 through June 2022:

```shell
mkdir nyc_taxi
cd nyc_taxi

wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-01.parquet
wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-02.parquet
wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-03.parquet
wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-04.parquet
wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-05.parquet
wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-06.parquet
wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-07.parquet
wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-08.parquet
wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-09.parquet
wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-10.parquet
wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-11.parquet
wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-12.parquet
wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2022-01.parquet
wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2022-02.parquet
wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2022-03.parquet
wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2022-04.parquet
wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2022-05.parquet
wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2022-06.parquet
```
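Rather than typing out 18 `wget` lines, the URLs can be generated from the sequence of months. A minimal base-R sketch: `taxi_urls()` and `download_taxi()` are hypothetical helpers, and the sketch assumes the CloudFront URL pattern used above stays the same for every month.

```r
# Hypothetical helper: build the monthly yellow-taxi Parquet URLs,
# following the URL pattern of the wget commands above
taxi_urls <- function(from = "2021-01-01", to = "2022-06-01") {
  months <- seq(as.Date(from), as.Date(to), by = "month")
  sprintf(
    "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_%s.parquet",
    format(months, "%Y-%m")
  )
}

urls <- taxi_urls()
length(urls)  # 18 files, January 2021 through June 2022

# Hypothetical helper: download each file into `dest` (needs network access)
download_taxi <- function(urls, dest = "nyc_taxi") {
  dir.create(dest, showWarnings = FALSE)
  # The files are tens of MB each, so raise R's default 60 s download
  # timeout for the duration of this call only
  old <- options(timeout = max(600, getOption("timeout")))
  on.exit(options(old))
  for (u in urls) {
    download.file(u, destfile = file.path(dest, basename(u)), mode = "wb")
  }
  invisible(file.path(dest, basename(urls)))
}
```

Calling `download_taxi(urls)` from the directory you want the data in reproduces the `mkdir` plus `wget` steps above.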

Copy the files into HDFS:

```shell
hdfs dfs -mkdir nyc_taxi
hdfs dfs -put *.parquet nyc_taxi/
```

```shell
hdfs dfs -ls nyc_taxi/
## Found 18 items
## -rw-r--r--   2 root hadoop   21686067 2022-09-20 15:43 nyc_taxi/yellow_tripdata_2021-01.parquet
## -rw-r--r--   2 root hadoop   21777258 2022-09-20 15:43 nyc_taxi/yellow_tripdata_2021-02.parquet
## -rw-r--r--   2 root hadoop   30007852 2022-09-20 15:43 nyc_taxi/yellow_tripdata_2021-03.parquet
## -rw-r--r--   2 root hadoop   34018560 2022-09-20 15:43 nyc_taxi/yellow_tripdata_2021-04.parquet
## -rw-r--r--   2 root hadoop   38743682 2022-09-20 15:43 nyc_taxi/yellow_tripdata_2021-05.parquet
## -rw-r--r--   2 root hadoop   44071592 2022-09-20 15:43 nyc_taxi/yellow_tripdata_2021-06.parquet
## -rw-r--r--   2 root hadoop   43697690 2022-09-20 15:43 nyc_taxi/yellow_tripdata_2021-07.parquet
## -rw-r--r--   2 root hadoop   43425907 2022-09-20 15:43 nyc_taxi/yellow_tripdata_2021-08.parquet
## -rw-r--r--   2 root hadoop   46125883 2022-09-20 15:43 nyc_taxi/yellow_tripdata_2021-09.parquet
## -rw-r--r--   2 root hadoop   53286464 2022-09-20 15:43 nyc_taxi/yellow_tripdata_2021-10.parquet
## -rw-r--r--   2 root hadoop   53100722 2022-09-20 15:43 nyc_taxi/yellow_tripdata_2021-11.parquet
## -rw-r--r--   2 root hadoop   49639052 2022-09-20 15:43 nyc_taxi/yellow_tripdata_2021-12.parquet
## -rw-r--r--   2 root hadoop   38139949 2022-09-20 15:41 nyc_taxi/yellow_tripdata_2022-01.parquet
## -rw-r--r--   2 root hadoop   45616512 2022-09-20 15:41 nyc_taxi/yellow_tripdata_2022-02.parquet
## -rw-r--r--   2 root hadoop   55682369 2022-09-20 15:41 nyc_taxi/yellow_tripdata_2022-03.parquet
## -rw-r--r--   2 root hadoop   55222692 2022-09-20 15:41 nyc_taxi/yellow_tripdata_2022-04.parquet
## -rw-r--r--   2 root hadoop   55558821 2022-09-20 15:41 nyc_taxi/yellow_tripdata_2022-05.parquet
## -rw-r--r--   2 root hadoop   55365184 2022-09-20 15:41 nyc_taxi/yellow_tripdata_2022-06.parquet
```

Since relative paths were used, the data lives in the current user's HDFS home directory (the user happens to be root here). To track it down, you can do something like the following:

```shell
hdfs dfs -find / -name "nyc_taxi"
## /user/root/nyc_taxi
```
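Relative HDFS paths resolve against the user's home directory, which by default is `/user/<username>`. If you would rather not hard-code `/user/root` in the read calls below, the full URI can be built in R. A sketch: `hdfs_home_path()` is a hypothetical helper, and it assumes the default home-directory prefix and that `$USER` matches your HDFS user name.

```r
# Hypothetical helper: turn an HDFS path into a full hdfs:// URI,
# resolving relative paths against /user/<user> (the default home dir)
hdfs_home_path <- function(path, user = Sys.getenv("USER")) {
  if (startsWith(path, "/")) {
    return(paste0("hdfs://", path))  # already absolute
  }
  paste0("hdfs:///user/", user, "/", path)
}

hdfs_home_path("nyc_taxi/*.parquet", user = "root")
# returns "hdfs:///user/root/nyc_taxi/*.parquet"
```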

## SparkR


```r
library(SparkR)
```

Before we can use Spark, we need to create a Spark session that connects to the existing cluster. Here we use `master = "yarn"` since the cluster uses YARN for job management.


```r
SparkR::sparkR.session(master = "yarn")
```

Data can be read locally or from HDFS. The package provides various `read.*()` functions; since our data is Parquet we use `read.parquet()`, and the `*.parquet` glob reads all 18 monthly files into a single SparkDataFrame.


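```r
# Printing the SparkDataFrame shows only column names and types,
# since nothing has been computed yet
(d = SparkR::read.parquet("hdfs:///user/root/nyc_taxi/*.parquet"))
```

```r
head(d)  # first 6 rows, returned as a local R data.frame
```

```r
dim(d)  # counting the rows runs a Spark job
```

```r
SparkR::schema(d)
```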

### Basic data processing

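SparkR's basic interface is essentially a poor imitation of dplyr and its basic verbs: most things work the way you would expect, the main exception being that non-standard evaluation (NSE) does not work the same way. Columns have to be referenced explicitly (e.g. `d$fare_amount`) or passed by name as strings (e.g. `"VendorID"`) rather than as bare column names.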
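To make the NSE point concrete: SparkR's verbs take ordinary `Column` objects as arguments, so those arguments are evaluated with R's standard rules before the verb sees them. A minimal base-R sketch of the difference, using two hypothetical helpers (`se_mean()`, `nse_mean()`) on a small made-up data frame:

```r
# Hypothetical helpers illustrating standard vs non-standard evaluation

# Standard evaluation (SparkR style): `x` is evaluated in the caller first
se_mean <- function(x) mean(x)

# Non-standard evaluation (dplyr style): capture the unevaluated
# expression and evaluate it inside the data frame
nse_mean <- function(data, expr) mean(eval(substitute(expr), data, parent.frame()))

trips <- data.frame(fare_amount = c(10, 20, 30))  # made-up values

nse_mean(trips, fare_amount)  # bare column name works: 20
se_mean(trips$fare_amount)    # column referenced explicitly: 20

# Under standard evaluation a bare name is looked up as an ordinary
# variable, which fails because no `fare_amount` object exists
tryCatch(se_mean(fare_amount), error = function(e) conditionMessage(e))
```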


```r
library(magrittr)  # for the %>% pipe
```

```r
# Without collect() this only builds the query; the result is still a SparkDataFrame
d %>%
    summarize(
        avg_n = mean(d$passenger_count),
        avg_fare = mean(d$fare_amount),
        avg_tip = mean(d$tip_amount / d$fare_amount)
    )
```

```r
# collect() runs the job and returns a local data.frame
d %>%
    summarize(
        avg_n = mean(d$passenger_count),
        avg_fare = mean(d$fare_amount),
        avg_tip = mean(d$tip_amount / d$fare_amount)
    ) %>%
    collect()
```

```r
# The same summaries, computed separately for each VendorID
d %>%
    group_by("VendorID") %>%
    summarize(
        avg_n = mean(d$passenger_count),
        avg_fare = mean(d$fare_amount),
        avg_tip = mean(d$tip_amount / d$fare_amount)
    ) %>%
    collect()
```
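For intuition about what the grouped query computes, here is the same set of per-vendor summaries on a tiny, made-up local data frame in base R (toy values, not taxi data). Note that `avg_tip` as written is the mean of per-trip tip ratios, which is generally not the same as total tips divided by total fares.

```r
# Toy data, made up for illustration (not from the NYC taxi files)
toy <- data.frame(
  VendorID        = c(1, 1, 2, 2),
  passenger_count = c(1, 3, 2, 2),
  fare_amount     = c(10, 20, 8, 12),
  tip_amount      = c(2, 2, 0, 3)
)

# Split by vendor, then compute the same three summaries per group
by_vendor <- split(toy, toy$VendorID)
res <- data.frame(
  VendorID = as.numeric(names(by_vendor)),
  avg_n    = sapply(by_vendor, function(g) mean(g$passenger_count)),
  avg_fare = sapply(by_vendor, function(g) mean(g$fare_amount)),
  avg_tip  = sapply(by_vendor, function(g) mean(g$tip_amount / g$fare_amount)),
  row.names = NULL
)
res

# Mean of ratios vs ratio of totals for vendor 1 (0.15 vs about 0.133)
v1 <- by_vendor[["1"]]
c(mean_of_ratios  = mean(v1$tip_amount / v1$fare_amount),
  ratio_of_totals = sum(v1$tip_amount) / sum(v1$fare_amount))
```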

### More advanced data processing

Spark is not R, so some of the common operations or functions that we might want or expect are not going to be there. In those cases we need to find the equivalent Spark function.

A good place to look is the [SparkR function reference](https://spark.apache.org/docs/latest/api/R/reference/index.html).


```r
# Subtract the two timestamp columns directly
d %>%
    mutate(duration = d$tpep_dropoff_datetime - d$tpep_pickup_datetime) %>%
    select("duration", "tpep_dropoff_datetime", "tpep_pickup_datetime") %>%
    head()
```

```r
# Inspect the first duration value to see what type came back
d %>%
    mutate(duration = d$tpep_dropoff_datetime - d$tpep_pickup_datetime) %>%
    select("duration", "tpep_dropoff_datetime", "tpep_pickup_datetime") %>%
    head() %>%
    {.[["duration"]][[1]]}
```

```r
# datediff() counts whole days between the two dates
d %>%
    mutate(duration = datediff(d$tpep_dropoff_datetime, d$tpep_pickup_datetime)) %>%
    select("duration", "tpep_dropoff_datetime", "tpep_pickup_datetime") %>%
    head()
```
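`datediff()` works on calendar dates: it returns the number of whole days between the two dates and ignores the time of day, so almost every taxi trip comes out as 0 (or 1 if it crosses midnight). The same behavior can be reproduced in base R by converting to `Date` first; a sketch with made-up timestamps:

```r
# Made-up pickup/dropoff times for a 12.5-minute trip (not taxi data)
pickup  <- as.POSIXct("2021-01-01 00:30:00", tz = "UTC")
dropoff <- as.POSIXct("2021-01-01 00:42:30", tz = "UTC")

# Like datediff(), count whole calendar days between the dates
days <- as.numeric(as.Date(dropoff) - as.Date(pickup))
days
# → 0

# A short trip crossing midnight counts as one day
late_pickup  <- as.POSIXct("2021-01-01 23:55:00", tz = "UTC")
late_dropoff <- as.POSIXct("2021-01-02 00:05:00", tz = "UTC")
late_days <- as.numeric(as.Date(late_dropoff) - as.Date(late_pickup))
late_days
# → 1
```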

```r
# Try converting the timestamps to integers before subtracting
d %>%
    mutate(duration = as.integer(d$tpep_dropoff_datetime) - as.integer(d$tpep_pickup_datetime)) %>%
    select("duration", "tpep_dropoff_datetime", "tpep_pickup_datetime") %>%
    head()
```

```r
# cast() to double gives seconds since the epoch; dividing by 60 gives minutes
d %>%
    mutate(duration = (cast(d$tpep_dropoff_datetime, "double") - cast(d$tpep_pickup_datetime, "double")) / 60) %>%
    select("duration", "tpep_dropoff_datetime", "tpep_pickup_datetime") %>%
    head()
```
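The `cast(..., "double")` version works because casting a Spark timestamp to double gives the number of seconds since the Unix epoch, so the difference divided by 60 is the trip duration in minutes. Base R's `POSIXct` uses the same representation, so the arithmetic can be sanity-checked locally; a sketch with made-up timestamps:

```r
# Made-up pickup/dropoff times for a 12.5-minute trip (not taxi data)
pickup  <- as.POSIXct("2021-01-01 00:30:00", tz = "UTC")
dropoff <- as.POSIXct("2021-01-01 00:42:30", tz = "UTC")

# Converting to double gives seconds since 1970-01-01 00:00:00 UTC
as.double(pickup)
# → 1609461000

# Difference in seconds, divided by 60, is the duration in minutes
duration_min <- (as.double(dropoff) - as.double(pickup)) / 60
duration_min
# → 12.5
```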

## sparklyr

This is RStudio's effort in this space. It leverages the existing work on dplyr and dbplyr to interface with Spark and HDFS, so it performs the same trick as dbplyr does for databases: your dplyr code is translated into SQL, and Spark is driven through its SQL interface.

As with SparkR (and other R database connections), we need to create a session (connection) first:


```r
library(sparklyr)
library(dplyr)
```

```r
sc = sparklyr::spark_connect(master = "yarn")
```

```r
# Register the Parquet files as a Spark table named "taxi"
d = sparklyr::spark_read_parquet(sc, "taxi", "hdfs:///user/root/nyc_taxi/*.parquet")
```

```r
d
```

```r
dim(d)
```

```r
# n() is translated to SQL COUNT(*)
d %>%
    summarize(n())
```

```r
d %>%
    group_by(VendorID) %>%
    summarize(
        avg_passengers = mean(passenger_count),
        avg_tip = mean(tip_amount/fare_amount)
    )
```

```r
# Subtract the two timestamp columns directly
d %>%
    mutate(
        duration = tpep_dropoff_datetime - tpep_pickup_datetime
    ) %>%
    select(duration, tpep_dropoff_datetime, tpep_pickup_datetime)
```

```r
# Try converting the difference to a double
d %>%
    mutate(
        duration = as.double(tpep_dropoff_datetime - tpep_pickup_datetime)
    ) %>%
    select(duration, tpep_dropoff_datetime, tpep_pickup_datetime)
```

```r
# as.double() on each timestamp becomes a SQL cast to double (seconds since
# the epoch), so the difference divided by 60 gives minutes
d %>%
    mutate(
        duration = (as.double(tpep_dropoff_datetime) - as.double(tpep_pickup_datetime))/60
    ) %>%
    select(duration, tpep_dropoff_datetime, tpep_pickup_datetime)
```