# Accessing Data From S3

This example shows how to configure a JupyterLab Docker image to access data from AWS S3.

## Build a Docker Image with AWS Related JARs

First, we need to build a Docker image that includes the JAR files needed for accessing S3, which the base image is missing. You could instead add the JARs with a volume mount and include code in your notebook that updates `PYSPARK_SUBMIT_ARGS` to point at their location inside the container. I found baking the JARs into the Docker image a little easier than having to run a code cell to update `PYSPARK_SUBMIT_ARGS`.
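The volume-mount alternative could use a notebook cell like the following minimal sketch. Two names here are hypothetical: the mount point `/home/jovyan/jars` and the helper `build_submit_args`. The cell has to run before the first `SparkSession` is created, because PySpark reads `PYSPARK_SUBMIT_ARGS` only when it launches the JVM. The value must also end with `pyspark-shell`.

```python
import os
from pathlib import PurePosixPath

# Hypothetical mount point, e.g. docker run -v "$PWD/jars:/home/jovyan/jars" ...
JAR_DIR = PurePosixPath("/home/jovyan/jars")

JAR_NAMES = [
    "aws-java-sdk-bundle-1.11.950.jar",
    "hadoop-aws-3.2.0.jar",
    "jets3t-0.9.4.jar",
]


def build_submit_args(jar_dir: PurePosixPath, jar_names: list) -> str:
    """Return a PYSPARK_SUBMIT_ARGS value that puts the given JARs on the classpath."""
    jars = ",".join(str(jar_dir / name) for name in jar_names)
    # PySpark expects the argument string to end with "pyspark-shell"
    return f"--jars {jars} pyspark-shell"


# Must run before SparkSession.builder...getOrCreate() starts the JVM
os.environ["PYSPARK_SUBMIT_ARGS"] = build_submit_args(JAR_DIR, JAR_NAMES)
```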

This example uses Spark 3.0.1 with Hadoop 3.2, and adds the following JARs:

* aws-java-sdk-bundle-1.11.950.jar
* hadoop-aws-3.2.0.jar
* jets3t-0.9.4.jar

Here is an example Dockerfile to use:

```dockerfile
FROM jupyter/pyspark-notebook:8ea7abc5b7bc

USER root

# Optional: also resolve these artifacts from Maven when a Spark session starts.
# The JARs downloaded below are already on Spark's classpath without this line.
ENV PYSPARK_SUBMIT_ARGS='--packages com.amazonaws:aws-java-sdk-bundle:1.11.950,org.apache.hadoop:hadoop-aws:3.2.0,net.java.dev.jets3t:jets3t:0.9.4 pyspark-shell'

# Download the missing JARs into Spark's jars directory
# (-f makes the build fail on an HTTP error instead of saving an error page)

# Get the AWS SDK bundle JAR
RUN cd /usr/local/spark/jars && curl -fsSLO https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.950/aws-java-sdk-bundle-1.11.950.jar

# Get the Hadoop-AWS JAR
RUN cd /usr/local/spark/jars && curl -fsSLO https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.2.0/hadoop-aws-3.2.0.jar

# Get the JetS3t JAR
RUN cd /usr/local/spark/jars && curl -fsSLO https://repo1.maven.org/maven2/net/java/dev/jets3t/jets3t/0.9.4/jets3t-0.9.4.jar

USER $NB_UID
```
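The download URLs in the Dockerfile follow Maven Central's standard repository layout. The group ID's dots become path separators, followed by the artifact ID, the version, and `artifactId-version.jar`. If you change versions, a small helper like this one can generate the URLs. It is a sketch; `maven_jar_url` is not part of the original setup.

```python
MAVEN_CENTRAL = "https://repo1.maven.org/maven2"


def maven_jar_url(group_id: str, artifact_id: str, version: str) -> str:
    """Build the Maven Central download URL for a JAR from its coordinates."""
    group_path = group_id.replace(".", "/")
    return f"{MAVEN_CENTRAL}/{group_path}/{artifact_id}/{version}/{artifact_id}-{version}.jar"


# The same coordinates used in the Dockerfile
for coords in [
    ("com.amazonaws", "aws-java-sdk-bundle", "1.11.950"),
    ("org.apache.hadoop", "hadoop-aws", "3.2.0"),
    ("net.java.dev.jets3t", "jets3t", "0.9.4"),
]:
    print(maven_jar_url(*coords))
```

Keep the `hadoop-aws` version equal to the Hadoop version that your Spark build ships with. Mixing Hadoop versions on the classpath is a common source of `NoSuchMethodError` failures.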


## Run the Docker Container and Pass in AWS Credentials

This example assumes that you have the AWS CLI installed, appropriate credentials saved in `$HOME/.aws/credentials`, and `jq` installed.

Fetch temporary credentials from AWS STS, then run the Docker container with the access key, secret key, and session token passed in as environment variables:

```bash
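# Request temporary credentials using the "default" profile from ~/.aws/credentials
creds_json=$(aws --profile default --region us-west-2 sts get-session-token)

# Replace jupyter-docker:yourtag with the name:tag of the image built above.
# --LabApp.token '' turns off JupyterLab's token authentication.
docker run -d --name jupyter --rm -p 8888:8888 \
  -e AWS_ACCESS_KEY_ID="$(echo "$creds_json" | jq -r .Credentials.AccessKeyId)" \
  -e AWS_SECRET_ACCESS_KEY="$(echo "$creds_json" | jq -r .Credentials.SecretAccessKey)" \
  -e AWS_SESSION_TOKEN="$(echo "$creds_json" | jq -r .Credentials.SessionToken)" \
  jupyter-docker:yourtag jupyter lab --LabApp.token ''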
```
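The three `jq -r` calls pull fields out of the `Credentials` object that `aws sts get-session-token` returns. The sketch below does the same extraction in Python to make the mapping from STS fields to environment variables explicit. The sample values are placeholders, not real credentials, and `sts_json_to_env` is a hypothetical helper.

```python
import json

# STS field name -> environment variable the container expects
STS_TO_ENV = {
    "AccessKeyId": "AWS_ACCESS_KEY_ID",
    "SecretAccessKey": "AWS_SECRET_ACCESS_KEY",
    "SessionToken": "AWS_SESSION_TOKEN",
}


def sts_json_to_env(creds_json: str) -> dict:
    """Map the JSON output of `aws sts get-session-token` to AWS_* variables."""
    creds = json.loads(creds_json)["Credentials"]
    return {env_name: creds[field] for field, env_name in STS_TO_ENV.items()}


# Placeholder response with the same shape as the real command's output
sample = json.dumps({
    "Credentials": {
        "AccessKeyId": "EXAMPLEKEYID",
        "SecretAccessKey": "example-secret",
        "SessionToken": "example-token",
        "Expiration": "2021-01-28T00:00:00Z",
    }
})
print(sts_json_to_env(sample))
```

Session tokens from `get-session-token` expire (after 12 hours by default). Once they lapse, restart the container with fresh credentials.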


## Configure Spark

```python
from pyspark.sql import SparkSession
```

```python
import logging
logging.getLogger().setLevel(logging.DEBUG)
```

### Set the SparkSession Thread Count and Memory

If you run JupyterLab in the cloud and can afford enough instances that cost isn't a concern, you can skip this section. If you run JupyterLab on a single machine (for example, a laptop with limited resources) and want to process more data than the machine can hold, be thoughtful about how you initialize the SparkSession. If that describes your setup, here is what I considered when configuring the SparkSession.

I have 8 cores and 16 GB of memory on my laptop, and I configured Docker to use 4 cores and up to 3 GB of memory.


Things to consider if the Spark cluster is on a constrained system:

* How much memory do you have available for your Spark job?
    > If you don't have much memory available, then consider reading the [Spark Memory Tuning Guide](https://spark.apache.org/docs/latest/tuning.html#memory-tuning). There are great suggestions for everything from changing the default serializer to being aware of the impacts of using broadcast variables.
* How much data do you plan to process?
    > Also be aware of the format your source data is in. [Here is a nice article comparing CSV, JSON, and Parquet](https://www.linkedin.com/pulse/spark-file-format-showdown-csv-vs-json-parquet-garren-staubli/). If your data is in JSON but you want to process it as Parquet, consider a separate job that converts the data to Parquet before your processing jobs use it.
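A conversion job along those lines can be very small. The sketch below makes three assumptions: an existing `SparkSession` (such as the one created in the next cell), input in Spark's default JSON Lines layout (one object per line), and hypothetical bucket paths. `convert_json_to_parquet` is an illustrative helper name, and `mode("overwrite")` replaces any earlier output at the destination.

```python
def convert_json_to_parquet(spark, json_path: str, parquet_path: str) -> None:
    """Read JSON Lines data and rewrite it as Parquet."""
    # Spark infers the schema from the JSON on read
    df = spark.read.json(json_path)
    # Parquet is columnar and stores the schema, so later jobs skip JSON parsing
    # and can read only the columns they need
    df.write.mode("overwrite").parquet(parquet_path)


# Hypothetical paths; replace them with your own bucket and prefixes:
# convert_json_to_parquet(
#     spark,
#     "s3a://your-bucket/covid-19-time-series/json/",
#     "s3a://your-bucket/covid-19-time-series/parquet/covid-19.parquet",
# )
```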

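```python
# 2 GB heap, below the 3 GB the Docker container is allowed to use
MAX_MEMORY = "2g"

spark = (
    SparkSession.builder
    # Run Spark in-process with 4 worker threads (the 4 cores given to Docker)
    .master("local[4]")
    .appName("Covid19TimeSeries")
    # In local mode the executor runs inside the driver JVM,
    # so spark.driver.memory is the setting that limits the heap
    .config("spark.executor.memory", MAX_MEMORY)
    .config("spark.driver.memory", MAX_MEMORY)
    # S3A connector settings
    .config("fs.s3a.path.style.access", True)
    .config("fs.s3a.aws.credentials.provider",
            "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider")
    .config("fs.s3a.endpoint", "s3.us-west-2.amazonaws.com")
    .config("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    # Enable Signature Version 4 request signing in the AWS SDK
    .config("com.amazonaws.services.s3.enableV4", True)
    .config("spark.driver.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true")
    .getOrCreate()
)
```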
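Spark also documents a `spark.hadoop.` prefix: any property that starts with it is copied, minus the prefix, into the Hadoop configuration that the S3A connector reads. One alternative layout, not what the cell above uses, keeps the S3A settings in a single dictionary and applies them in a loop. `S3A_CONF` and `apply_conf` are hypothetical names, not PySpark APIs.

```python
# S3A settings from the cell above, written with the spark.hadoop. prefix
S3A_CONF = {
    "spark.hadoop.fs.s3a.path.style.access": "true",
    "spark.hadoop.fs.s3a.aws.credentials.provider":
        "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider",
    "spark.hadoop.fs.s3a.endpoint": "s3.us-west-2.amazonaws.com",
    "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
}


def apply_conf(builder, conf: dict):
    """Call builder.config(key, value) for every entry and return the builder."""
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder


# Usage in the notebook:
# spark = apply_conf(
#     SparkSession.builder.master("local[4]").appName("Covid19TimeSeries"),
#     S3A_CONF,
# ).getOrCreate()
```

Settings such as `spark.driver.memory` only take effect when the JVM starts. After changing them, restart the kernel rather than calling `getOrCreate()` on a running session.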

## Read Data From S3

At this point you should be able to read data in from S3.

```python
from pyspark.sql.functions import col

s3path = "s3a://dev-leewallen-spark/covid-19-time-series/parquet/covid-19.parquet"
parquetDF = spark.read.parquet(s3path)
```

```python
parquetDF.show(5)
```

```
+---------+--------------+----------+------+--------------+---------+
|Confirmed|Country/Region|      Date|Deaths|Province/State|Recovered|
+---------+--------------+----------+------+--------------+---------+
|        0|   Afghanistan|2020-01-22|     0|          null|        0|
|        0|   Afghanistan|2020-01-23|     0|          null|        0|
|        0|   Afghanistan|2020-01-24|     0|          null|        0|
|        0|   Afghanistan|2020-01-25|     0|          null|        0|
|        0|   Afghanistan|2020-01-26|     0|          null|        0|
+---------+--------------+----------+------+--------------+---------+
only showing top 5 rows
```

```python
usConfirmed = parquetDF.filter(col("`Country/Region`") == "US")
```

```python
import pandas as pd
import matplotlib.pyplot as plt
import requests

# Silence urllib3 warnings (such as InsecureRequestWarning) raised through requests
requests.packages.urllib3.disable_warnings()
```

```python
usConfirmed.show(5)
```

```
+---------+--------------+----------+------+--------------+---------+----------+
|Confirmed|Country/Region|      Date|Deaths|Province/State|Recovered|    DateTS|
+---------+--------------+----------+------+--------------+---------+----------+
|        1|            US|2020-01-22|     0|          null|        0|2020-01-22|
|        1|            US|2020-01-23|     0|          null|        0|2020-01-23|
|        2|            US|2020-01-24|     0|          null|        0|2020-01-24|
|        2|            US|2020-01-25|     0|          null|        0|2020-01-25|
|        5|            US|2020-01-26|     0|          null|        0|2020-01-26|
+---------+--------------+----------+------+--------------+---------+----------+
only showing top 5 rows
```

```python
usConfirmed.printSchema()
```

```
root
 |-- Confirmed: long (nullable = true)
 |-- Country/Region: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Deaths: long (nullable = true)
 |-- Province/State: string (nullable = true)
 |-- Recovered: long (nullable = true)
 |-- DateTS: date (nullable = true)
```

```python
from pyspark.sql.types import DateType

# Cast the yyyy-MM-dd Date strings to a DateType column
usConfirmed = usConfirmed.withColumn("DateTS", usConfirmed["Date"].cast(DateType()))
```

```python
usConfirmed.show()
usConfirmed.printSchema()
```

```
+---------+--------------+----------+------+--------------+---------+----------+
|Confirmed|Country/Region|      Date|Deaths|Province/State|Recovered|    DateTS|
+---------+--------------+----------+------+--------------+---------+----------+
|        1|            US|2020-01-22|     0|          null|        0|2020-01-22|
|        1|            US|2020-01-23|     0|          null|        0|2020-01-23|
|        2|            US|2020-01-24|     0|          null|        0|2020-01-24|
|        2|            US|2020-01-25|     0|          null|        0|2020-01-25|
|        5|            US|2020-01-26|     0|          null|        0|2020-01-26|
|        5|            US|2020-01-27|     0|          null|        0|2020-01-27|
|        5|            US|2020-01-28|     0|          null|        0|2020-01-28|
|        6|            US|2020-01-29|     0|          null|        0|2020-01-29|
|        6|            US|2020-01-30|     0|          null|        0|2020-01-30|
|        8|            US|20
```

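```python
usPandas = usConfirmed.toPandas()
usPandas
```

|     | Confirmed | Country/Region | Date       | Deaths | Province/State | Recovered | DateTS     |
|-----|-----------|----------------|------------|--------|----------------|-----------|------------|
| 0   | 1         | US             | 2020-01-22 | 0      |                | 0         | 2020-01-22 |
| 1   | 1         | US             | 2020-01-23 | 0      |                | 0         | 2020-01-23 |
| 2   | 2         | US             | 2020-01-24 | 0      |                | 0         | 2020-01-24 |
| 3   | 2         | US             | 2020-01-25 | 0      |                | 0         | 2020-01-25 |
| 4   | 5         | US             | 2020-01-26 | 0      |                | 0         | 2020-01-26 |
| ... | ...       | ...            | ...        | ...    | ...            | ...       | ...        |
| 368 | 25147891  | US             | 2021-01-24 | 419251 |                | 0         | 2021-01-24 |
| 369 | 25298986  | US             | 2021-01-25 | 421168 |                | 0         | 2021-01-25 |
| 370 | 25445583  | US             | 2021-01-26 | 425252 |                | 0         | 2021-01-26 |
| 371 | 25598061  | US             | 2021-01-27 | 429195 |                | 0         | 2021-01-27 |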


### Pandas Plot Related Settings

```python
# Register pandas' date/time converters with matplotlib and close any open figures
pd.options.plotting.matplotlib.register_converters = True
plt.close("all")
```

### Make the Plot Interactive

The `%matplotlib widget` magic (provided by the ipympl package) makes the plot resizable and adds a toolbar that you can use to navigate and save the plot.

```python
%matplotlib widget
```

```python
usPandas.plot.bar(stacked=True)
```


```python
usPandas.plot()
```

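Plotting every numeric column against the row index makes the bar chart crowded. A more targeted variant indexes by the `DateTS` column and draws only the cumulative `Confirmed` and `Deaths` counts. It is a sketch using a hypothetical helper, `plot_us_totals`, that you would call as `plot_us_totals(usPandas)` after the `%matplotlib widget` cell.

```python
import pandas as pd


def plot_us_totals(pdf: pd.DataFrame):
    """Line-plot cumulative confirmed cases and deaths by date; returns the Axes."""
    # toPandas() returns DateTS as Python date objects; convert them to datetimes
    by_date = pdf.assign(DateTS=pd.to_datetime(pdf["DateTS"])).set_index("DateTS")
    ax = by_date[["Confirmed", "Deaths"]].plot(title="US cumulative totals")
    ax.set_xlabel("Date")
    ax.set_ylabel("Count")
    return ax
```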