***

# <font color="red">Introduction to the Oracle Cloud Infrastructure Data Flow integration with Data Science</font>
<p style="margin-left:10%; margin-right:10%;">by the <font color="teal">Oracle Cloud Infrastructure Data Science Service.</font></p>

---
# Overview:

Oracle Cloud Infrastructure (OCI) Data Flow is a fully managed Apache Spark service that performs processing tasks on extremely large datasets—without infrastructure to deploy or manage. With Data Flow, you can configure Data Science notebooks to run applications interactively against Data Flow.

Apache Spark is a distributed compute system designed to process data at scale. It supports large-scale SQL, batch, and stream processing, and machine learning tasks. Spark SQL provides database-like support for querying structured data through an ANSI-standard SQL implementation.

Apache Livy is a REST interface to Spark. It lets you submit fault-tolerant Spark jobs from the notebook and retrieve their output either synchronously or asynchronously.
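For context, the sketch below shows the generic open-source Apache Livy REST calls that Spark magics build on: create a session, submit a statement, then poll until the statement's output is ready. It illustrates the public Livy API only and is not how Data Flow or its magics are necessarily implemented. `LIVY_URL` is a hypothetical local Livy server; Data Flow uses its own endpoint and OCI authentication, which the magics handle for you. The polling function takes a hypothetical `fetch` callable so that it can be exercised without a server.

```python
import json
import time
import urllib.request
from typing import Callable, Optional

# Hypothetical local Livy server; 8998 is Livy's default port. Data Flow uses its own endpoint and auth.
LIVY_URL = "http://localhost:8998"


def build_request(method: str, path: str, payload: Optional[dict] = None) -> urllib.request.Request:
    """Builds (but does not send) a JSON request against the Livy REST API."""
    data = json.dumps(payload).encode("utf-8") if payload is not None else None
    return urllib.request.Request(
        LIVY_URL + path, data=data, method=method, headers={"Content-Type": "application/json"}
    )


def create_session_request(kind: str = "pyspark") -> urllib.request.Request:
    # POST /sessions starts an interactive session of the given kind.
    return build_request("POST", "/sessions", {"kind": kind})


def submit_statement_request(session_id: int, code: str) -> urllib.request.Request:
    # POST /sessions/{id}/statements queues code; the JSON response includes a statement id.
    return build_request("POST", f"/sessions/{session_id}/statements", {"code": code})


def wait_for_statement(fetch: Callable[[], dict], interval: float = 1.0, timeout: float = 60.0) -> dict:
    """Polls until a statement finishes.

    `fetch` is assumed to perform GET /sessions/{id}/statements/{statement_id}
    and return the decoded JSON body.
    """
    deadline = time.monotonic() + timeout
    while True:
        statement = fetch()
        # Livy statement states: waiting, running, available, error, cancelling, cancelled.
        if statement["state"] in ("available", "error", "cancelled"):
            return statement
        if time.monotonic() > deadline:
            raise TimeoutError("Livy statement did not finish before the timeout")
        time.sleep(interval)


req = submit_statement_request(0, "print(sc.version)")
print(req.get_method(), req.full_url)  # POST http://localhost:8998/sessions/0/statements

# Simulated responses stand in for a real server.
responses = iter([{"state": "running"}, {"state": "available", "output": {"status": "ok"}}])
print(wait_for_statement(lambda: next(responses), interval=0.0)["state"])  # available
```

Submitting a statement returns immediately, so a client can either block in a loop like `wait_for_statement` (synchronous) or check back later (asynchronous).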

SparkMagic enables interactive communication with Spark through Livy, using the `%%spark` magic directive in a JupyterLab code cell.

Data Flow Sessions support auto-scaling of the Data Flow cluster.

This notebook demonstrates how to run interactive Spark workloads on a long-lasting [Oracle Cloud Infrastructure Data Flow](https://docs.oracle.com/en-us/iaas/data-flow/using/home.htm) cluster. **Data Flow Spark Magic** lets you work interactively with remote Spark clusters from Jupyter notebooks through Livy, a Spark REST server. It includes a set of magic commands for running Spark code interactively.


Developed on the conda environment [PySpark 3.2 and Data Flow](https://docs.oracle.com/iaas/data-science/using/conda-pyspark-fam.htm) for CPU on Python 3.8.

---

## Contents:

- <a href='#pre-requisites'>1. Pre-requisites</a>
    - <a href='#prerequisites_helpers'>1.1 Helpers</a>
    - <a href='#prerequisites_authentication'>1.2 Authentication</a>
    - <a href='#prerequisites_variables'>1.3 Variables</a>    
- <a href='#dataflow_magic'>2. Data Flow Spark Magic</a>
    - <a href='#load_extension'>2.1. Load Extension</a>
    - <a href='#create_session'>2.2. Create Session</a>
        - <a href='#create_session_dynamic_allocation'>2.2.1. Example command for Spark dynamic allocation</a>
    - <a href='#update_session'>2.3. Update Session</a>
    - <a href='#stop_session'>2.4. Stop Session</a>
- <a href='#basic_examples'>3. Basic Spark Usage Examples</a>
    - <a href='#examples_pyspark'>3.1. PySpark</a>
    - <a href='#examples_sparksql'>3.2. Spark SQL</a>
- <a href='#cleanup'>4. Clean Up</a> 
- <a href='#ref'>5. References</a>   

---

<a id='pre-requisites'></a>
# 1. Pre-requisites 

Data Flow Sessions are accessible through the following conda environment: 

* **PySpark 3.2 and Data Flow 2.0 (pyspark32_p38_cpu_v2)**

You can customize `pyspark32_p38_cpu_v2`, publish it, and use it as a runtime environment for a Data Flow Session.

<a id="prerequisites_helpers"></a>
## 1.1 Helpers
This section provides a helper function, used throughout the notebook, that prepares arguments for the magic commands. It is particularly useful when you want to pass Python variables as arguments to the Spark magic commands.

In [1]:
import json


def prepare_command(command: dict) -> str:
    """Converts dictionary command to the string formatted commands."""
    return f"'{json.dumps(command)}'"
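To see what the helper produces, the sketch below repeats `prepare_command` and checks that its output is the JSON payload wrapped in single quotes, the form the `-c` and `-i` arguments take in the `%help` examples. Because of that wrapping, a value that itself contains a single quote would likely end the quoted argument early. `safe_prepare_command` is a hypothetical stricter variant added for illustration; it is not part of the Data Flow magics.

```python
import json


def prepare_command(command: dict) -> str:
    """Converts dictionary command to the string formatted commands."""
    return f"'{json.dumps(command)}'"


def safe_prepare_command(command: dict) -> str:
    """Hypothetical stricter variant: rejects payloads whose JSON contains a single quote."""
    payload = json.dumps(command)
    if "'" in payload:
        # A single quote inside a value would terminate the quoted magic argument early.
        raise ValueError("Values must not contain single quotes")
    return f"'{payload}'"


args = prepare_command({"displayName": "SessionApp", "numExecutors": 1})
print(args)  # '{"displayName": "SessionApp", "numExecutors": 1}'

# Removing the wrapping quotes gives back valid JSON with the original values.
print(json.loads(args[1:-1]))  # {'displayName': 'SessionApp', 'numExecutors': 1}
```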

<a id="prerequisites_authentication"></a>
## 1.2. Authentication
The [Oracle Accelerated Data Science SDK (ADS)](https://docs.oracle.com/iaas/tools/ads-sdk/latest/index.html) controls the authentication mechanism with the Data Flow Session Spark cluster.<br> 
To set up authentication, use `ads.set_auth("resource_principal")` or `ads.set_auth("api_key")`.

In [2]:
import ads

ads.set_auth("resource_principal")  # Supported values: resource_principal, api_key

<a id="prerequisites_variables"></a>
## 1.3. Variables
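To run this notebook, you must provide some information about your tenancy configuration. The next cell reads the OCID of the compartment in which the Data Flow session is created from the `NB_SESSION_COMPARTMENT_OCID` environment variable, which is set in Data Science notebook sessions.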

In [3]:
import os

compartment_id = os.environ.get("NB_SESSION_COMPARTMENT_OCID")
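`os.environ.get` returns `None` when the variable is not set, for example when the notebook runs outside a Data Science notebook session, and the error would then only surface when the session is created. A minimal guard, assuming you prefer to fail early, could look like this; `get_compartment_id` is a hypothetical helper name.

```python
import os


def get_compartment_id(env_var: str = "NB_SESSION_COMPARTMENT_OCID") -> str:
    """Returns the compartment OCID from `env_var`, failing early if it is missing or empty."""
    value = os.environ.get(env_var)
    if not value:
        raise RuntimeError(
            f"{env_var} is not set; assign compartment_id to your compartment OCID manually."
        )
    return value
```

In the notebook, `compartment_id = get_compartment_id()` would replace the plain `os.environ.get` call.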

<a id="dataflow_magic"></a>
# 2. Data Flow Spark Magic
Data Flow Spark Magic lets you work interactively with Data Flow Spark clusters (sessions) in Jupyter notebooks through the Livy REST API. It provides a set of Jupyter magic commands that turn Jupyter into an integrated Spark development environment for remote clusters. 

**Data Flow Magic allows you to:**

* Run Spark code against a remote Data Flow Spark cluster
* Create a Data Flow Spark Session with a SparkContext on a remote Data Flow Spark cluster
* Capture the output of Spark queries as a local Pandas DataFrame to interact easily with other Python libraries (e.g. matplotlib)

<a id="load_extension"></a>
### 2.1. Load Spark Magic Commands and Get Help
Data Flow Spark Magic is a JupyterLab extension that you need to activate in your notebook using the `%load_ext dataflow.magics` magic command.<br>
After the extension is activated, the `%help` command can be used to get the list of supported commands.

In [4]:
%load_ext dataflow.magics

The `%help` command gives you a list of all the available commands, along with a list of their arguments and example calls. 

In [5]:
%help

| Magic | Example | Explanation |
|---|---|---|
| create_session | `%create_session -l python -c '{"compartmentId":"Data Flow Run resource compartment OCID","displayName":"SessionApp","sparkVersion":"3.2.1","driverShape":"VM.Standard2.1","executorShape":"VM.Standard2.1","numExecutors":1,"archiveUri":"Object Storage URL for Data Flow zip archive.","metastoreId":"optional metastore OCID","configuration":{ "spark.archives":"oci://bucket@namespace/path/to/conda/pack", #optional property to use Dataflow 'Run' resource to access OCI resources.  "dataflow.auth":"resource_principal" }}'` | Creates new session by providing session details.<br>Example command for Flex shapes:<br>`%create_session -l python -c '{"compartmentId":"Data Flow Run resource compartment OCID","displayName":"SessionApp","sparkVersion":"3.2.1","driverShape":"VM.Standard.E4.Flex","executorShape":"VM.Standard.E4.Flex","numExecutors":1,"driverShapeConfig":{"ocpus":1,"memoryInGBs":16},"executorShapeConfig":{"ocpus":1,"memoryInGBs":16}}'`<br>Example command for Spark dynamic allocation:<br>`%create_session -l python -c '{"compartmentId":"Data Flow Run resource compartment OCID","displayName":"SessionApp","sparkVersion":"3.2.1","driverShape":"VM.Standard2.1","executorShape":"VM.Standard2.1","numExecutors":1,"configuration":{ "spark.dynamicAllocation.enabled":"true", "spark.dynamicAllocation.shuffleTracking.enabled":"true", "spark.dynamicAllocation.minExecutors":"1", "spark.dynamicAllocation.maxExecutors":"4", "spark.dynamicAllocation.executorIdleTimeout":"60", "spark.dynamicAllocation.schedulerBacklogTimeout":"60", "spark.dataflow.dynamicAllocation.quotaPolicy":"min" }}'` |
| activate_session | `%activate_session -l python -c '{"compartmentId":"Data Flow Run resource compartment OCID","displayName":"SessionApp","applicationId":"Existing sessionId to activate."}'` | Activate session by providing existing sessionId. |
| use_session | `%use_session -s {sessionId}` | To use already existing active session. |
| status | `%status` | Outputs current session status. |
| update_session | `%update_session -i '{"maxDurationInMinutes": 4896,"idleTimeoutInMinutes": 4888}'` | Updates current active session[not session config] for max duration or idle time out. |
| stop_session | `%stop_session` | Stops current active session. One active session should be associated with current notebook to stop. |
| config | `%config` | Outputs current session configuration. |
| configure_session | `%configure_session -i '{"driverShape": "VM.Standard2.1", "executorShape": "VM.Standard2.1", "numExecutors": 1}'` | Configures the session creation parameters. The force flag -f is mandatory for immediate effect of the config change, in that case session will be dropped and recreated. |
| spark | `%%spark -o df`<br>`df = spark.read.parquet('...` | Executes spark commands.<br>Parameters:<br>-o VAR_NAME: The Spark dataframe of name VAR_NAME will be available in the %%local Python context as a Pandas dataframe with the same name.<br>-m METHOD: Sample method, either take or sample.<br>-n MAXROWS: The maximum number of rows of a dataframe that will be pulled from Livy to Jupyter. If this number is negative, then the number of rows will be unlimited.<br>-r FRACTION: Fraction used for sampling. |


To see the docstring of any magic command and find out which arguments it takes, add `?` before (or after) the command.

In [6]:
?%create_session

Docstring:
::

  %create_session [-l LANGUAGE] [-c CONFIG] [-e ENDPOINT]

optional arguments:
  -l LANGUAGE, --language LANGUAGE
                        Language to use.
  -c CONFIG, --config CONFIG
                        Region to use.
  -e ENDPOINT, --endpoint ENDPOINT
                        Endpoint to use.
File:      ~/conda/pyspark32_p38_cpu_v2/lib/python3.8/site-packages/dataflow/livyclientlib/exceptions.py


<a id="create_session"></a>
### 2.2. Create Session
To create a new Data Flow cluster session, use the `%create_session` magic command and pass the session configuration as a JSON string with the `-c` option.

<a id="create_session_dynamic_allocation"></a>
#### 2.2.1. Example command for Spark dynamic allocation (aka auto-scaling)
To help you save resources and reduce management time, Data Flow supports Spark [dynamic allocation](https://docs.oracle.com/iaas/data-flow/using/dynamic-alloc-about.htm#dynamic-alloc-about).

Resource planning for data processing is a complex task. Resource usage is a function of data volume, and day-to-day data volumes can vary, so the computational resources required change too.

You can define a Data Flow cluster based on a range of executors instead of a fixed number. Spark provides a mechanism to dynamically adjust the resources an application occupies based on the workload: the application can release resources it no longer uses and request them again later when there is demand.
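As a rough mental model of what the `minExecutors`/`maxExecutors` range in the next cell does, the sketch below estimates the executor count Spark works toward from the number of outstanding tasks and clamps it to that range. It is a deliberate simplification, not Spark's actual allocation logic: real dynamic allocation also ramps requests up gradually, waits for `schedulerBacklogTimeout` before adding executors and `executorIdleTimeout` before removing idle ones, and Data Flow applies its own quota policy. The function name and the `tasks_per_executor` value are illustrative assumptions.

```python
import math


def target_executors(outstanding_tasks: int, tasks_per_executor: int,
                     min_executors: int, max_executors: int) -> int:
    """Simplified estimate of the executor count dynamic allocation works toward.

    Assumes each executor runs `tasks_per_executor` tasks at once and ignores
    ramp-up, timeouts, and the Data Flow quota policy.
    """
    needed = math.ceil(outstanding_tasks / tasks_per_executor)
    return max(min_executors, min(needed, max_executors))


# minExecutors=1 and maxExecutors=4 as in the session below; 2 concurrent tasks per executor is hypothetical.
for tasks in (0, 3, 6, 100):
    print(tasks, "->", target_executors(tasks, tasks_per_executor=2, min_executors=1, max_executors=4))
# prints 0 -> 1, 3 -> 2, 6 -> 3, 100 -> 4 (one per line)
```

Under this model, a burst of work scales the session up to at most 4 executors, and an idle session shrinks back to 1.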

In [7]:
command = prepare_command(
    {
        "compartmentId": compartment_id,
        "displayName": "TestDataFLowSessionDynamicAllocation",
        "language": "PYTHON",
        "sparkVersion": "3.2.1",
        "numExecutors": 2,
        "driverShape": "VM.Standard.E4.Flex",
        "executorShape": "VM.Standard.E4.Flex",
        "driverShapeConfig": {"ocpus": 2, "memoryInGBs": 32},
        "executorShapeConfig": {"ocpus": 2, "memoryInGBs": 32},
        "configuration": {
            # Object Storage endpoint for the session's region (us-ashburn-1 here); adjust for your region.
            "fs.oci.client.hostname": "https://objectstorage.us-ashburn-1.oraclecloud.com",
            "spark.dynamicAllocation.enabled": "true",
            "spark.dynamicAllocation.shuffleTracking.enabled": "true",
            "spark.dynamicAllocation.minExecutors": "1",
            "spark.dynamicAllocation.maxExecutors": "4",
            "spark.dynamicAllocation.executorIdleTimeout": "60",
            "spark.dynamicAllocation.schedulerBacklogTimeout": "60",
            "spark.dataflow.dynamicAllocation.quotaPolicy": "min",
        },
    }
)

%create_session -l python -c $command

Setting up the Cluster..


Cluster setup is still in progress.
Cluster is ready..
Starting Spark application..


| Session ID | Kind | State | Current session |
|---|---|---|---|
| ocid1.dataflowapplication.oc1.iad.anuwcljsnf25m3qa5pivohxxhjhehupxfabvsfmkekct7ovtccfanjd4dfiq | pyspark | IN_PROGRESS | Dataflow Run |


SparkSession available as 'spark'.
SparkContext available as 'sc'.


Use the `%status` magic command to check the status of the current session.

In [8]:
%status

| Session | State | Max Duration In Minutes | Total Execution Time In Minutes | Remaining Duration In Minutes | Current Session |
|---|---|---|---|---|---|
| ocid1.dataflowapplication.oc1.iad.anuwcljsnf25m3qa5pivohxxhjhehupxfabvsfmkekct7ovtccfanjd4dfiq | IN_PROGRESS | 1440 | 18 | 1422 | Dataflow Run |


Use the `%config` magic command to see the configuration of the current session.

In [9]:
%config

{
    "applicationLogConfig": {
        "logGroupId": null,
        "logId": null
    },
    "archiveUri": "",
    "arguments": null,
    "className": "",
    "compartmentId": "ocid1.compartment.oc1..aaaaaaaa3vneejwlpdacc55zo33foeu2twbykcalr6rhvjxysoyvoflrubda",
    "configuration": {
        "fs.oci.client.hostname": "https://objectstorage.us-ashburn-1.oraclecloud.com",
        "spark.dataflow.dynamicAllocation.quotaPolicy": "min",
        "spark.dynamicAllocation.enabled": "true",
        "spark.dynamicAllocation.executorIdleTimeout": "60",
        "spark.dynamicAllocation.maxExecutors": "4",
        "spark.dynamicAllocation.minExecutors": "1",
        "spark.dynamicAllocation.schedulerBacklogTimeout": "60",
        "spark.dynamicAllocation.shuffleTracking.enabled": "true"
    },
    "definedTags": {
        "Owner": {
            "CreatedOn": "2023-05-11T00:27:03.507Z",
            "Creator": "ocid1.datasciencenotebooksession.oc1.iad.amaaaaaanf25m3qaf74n5jqyszzlhxypcjputesanccwjixps

<a id="update_session"></a>
### 2.3. Update Session
You can change the maximum duration and idle timeout of a running session with the `%update_session` command; it does not change the session configuration itself. Data Flow Sessions can last up to 7 days, or 10080 minutes (**maxDurationInMinutes**), and have a default idle timeout of 480 minutes, or 8 hours (**idleTimeoutInMinutes**). The following cell sets the maximum duration to 1440 minutes (24 hours) and the idle timeout to 420 minutes (7 hours):
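If you prefer to drive `%update_session` from Python variables, a helper like `prepare_command` from section 1.1 can build the `-i` argument. The sketch below also checks the maximum duration against the 10080-minute limit quoted above before building the string. `build_update_args` is a hypothetical helper, and the positivity checks are illustrative assumptions rather than documented Data Flow rules; `prepare_command` is repeated so the cell runs on its own.

```python
import json

MAX_SESSION_MINUTES = 7 * 24 * 60  # 10080 minutes, the 7-day limit quoted above


def prepare_command(command: dict) -> str:
    """Converts dictionary command to the string formatted commands."""
    return f"'{json.dumps(command)}'"


def build_update_args(max_duration_minutes: int, idle_timeout_minutes: int) -> str:
    """Hypothetical helper: validates the durations and formats the %update_session -i argument."""
    if not 0 < max_duration_minutes <= MAX_SESSION_MINUTES:
        raise ValueError(f"maxDurationInMinutes must be between 1 and {MAX_SESSION_MINUTES}")
    if idle_timeout_minutes <= 0:
        raise ValueError("idleTimeoutInMinutes must be positive")
    return prepare_command(
        {"maxDurationInMinutes": max_duration_minutes, "idleTimeoutInMinutes": idle_timeout_minutes}
    )


update_args = build_update_args(24 * 60, 7 * 60)
print(update_args)  # '{"maxDurationInMinutes": 1440, "idleTimeoutInMinutes": 420}'
# In the notebook the string would then be passed as: %update_session -i $update_args
```

This mirrors how `$command` is passed to `%create_session` in section 2.2.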

In [10]:
%update_session -i '{"maxDurationInMinutes": 1440, "idleTimeoutInMinutes": 420}'

| Max Duration In Minutes | Idle Timeout In Minutes |
|---|---|
| 1440 | 420 |


In [11]:
%status

| Session | State | Max Duration In Minutes | Total Execution Time In Minutes | Remaining Duration In Minutes | Current Session |
|---|---|---|---|---|---|
| ocid1.dataflowapplication.oc1.iad.anuwcljsnf25m3qa5pivohxxhjhehupxfabvsfmkekct7ovtccfanjd4dfiq | IN_PROGRESS | 1440 | 40 | 1400 | Dataflow Run |


<a id="stop_session"></a>
### 2.4. Stop Session
To stop the current session, use the `%stop_session` magic command. It takes no arguments. The current active cluster is stopped, and all data in its memory is lost. 

<a id="basic_examples"></a>
# 3. Basic Spark Usage Examples

A SparkSession (`spark`), a SparkContext (`sc`), and a SQL context (`sqlContext`) are automatically created in the session cluster. The `%%spark` magic command runs Spark code in the cluster. You can use it to access information about the Spark application, define a dataframe in which to store results, modify the configuration, and so on.

The `%%spark` magic command comes with a number of parameters that allow you to interact with the Data Flow Spark cluster. **Any cell that starts with the `%%spark` command is executed in the remote Spark cluster.**

<a id="examples_pyspark"></a>
### 3.1. PySpark
The `sc` variable represents the Spark context and is available in cells that start with the `%%spark` magic command. The next two cells are toy examples of how to use `sc` in a Data Flow Spark Magic cell. The first prints the Spark version. The second calls the [`.parallelize()`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkContext.parallelize.html) method, which creates an [RDD](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.html), `numbers`, from a list of numbers, and prints information about it. The [`.toDebugString()`](https://spark.apache.org/docs/3.1.3/api/python/reference/api/pyspark.RDD.toDebugString.html) method returns a description of the RDD as a bytes object, which is why the output starts with `b'`.
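The `(4)` at the start of the `toDebugString()` output is the number of partitions in the RDD. As a sketch of how a local list is divided into partitions, the function below reproduces the contiguous slicing positions used by Spark's `ParallelCollectionRDD` (partition `i` gets the elements from `i * n // k` up to `(i + 1) * n // k`). It is a pure-Python illustration with a hypothetical function name: PySpark additionally serializes the list in batches before slicing, and when no partition count is given, `parallelize()` uses the cluster's default parallelism.

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def slice_like_parallelize(data: Sequence[T], num_slices: int) -> List[List[T]]:
    """Splits `data` into `num_slices` contiguous partitions using Spark's slicing positions."""
    if num_slices < 1:
        raise ValueError("num_slices must be at least 1")
    n = len(data)
    return [list(data[i * n // num_slices:(i + 1) * n // num_slices]) for i in range(num_slices)]


print(slice_like_parallelize([4, 3, 2, 1], 4))  # [[4], [3], [2], [1]]
print(slice_like_parallelize(list(range(10)), 3))  # [[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]
```

Because the slices keep the original order, `first()` returns `4`, the first element of the first partition.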

In [12]:
%%spark
print(sc.version)

3.2.1

In [13]:
%%spark
numbers = sc.parallelize([4, 3, 2, 1])
print(f"First element of numbers is {numbers.first()}")
print(f"The RDD, numbers, has the following description\n{numbers.toDebugString()}")

First element of numbers is 4
The RDD, numbers, has the following description
b'(4) ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:274 []'

<a id="examples_sparksql"></a>
### 3.2. Spark SQL
The `-c sql` option lets you run Spark SQL commands in a cell. This section uses the [NYC Taxi and Limousine Commission (TLC) Data](https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page) dataset, which is around **35 GB**. 

The next cell reads the dataset into a Spark dataframe, and then saves it as a view used to demonstrate Spark SQL.
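The path in the next cell uses a glob character class to pick the year directories. A bracket expression matches exactly one character, so `201[1,2,3,4,5,6,7,8]` (where the commas are just extra characters in the class) and `201[1-8]` both select 2011 through 2018. The quick local check below uses Python's `fnmatch`, whose bracket syntax is close to, but not identical to, the Hadoop glob syntax Spark applies to `oci://` paths; the list of year names is hypothetical test input.

```python
from fnmatch import fnmatchcase

# Hypothetical year directory names to test the patterns against.
years = [str(y) for y in range(2009, 2021)]

comma_class = [y for y in years if fnmatchcase(y, "201[1,2,3,4,5,6,7,8]")]
range_class = [y for y in years if fnmatchcase(y, "201[1-8]")]

print(comma_class)  # ['2011', '2012', '2013', '2014', '2015', '2016', '2017', '2018']
assert comma_class == range_class
```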

In [14]:
%%spark
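# Parquet files carry their own schema, so CSV-style options such as header/inferSchema are not needed.
# The character class 201[1-8] selects the year directories 2011 through 2018.
df_nyc_tlc = spark.read.parquet("oci://hosted-ds-datasets@bigdatadatasciencelarge/nyc_tlc/201[1-8]/**/data.parquet")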
df_nyc_tlc.show()

df_nyc_tlc.createOrReplaceTempView("nyc_tlc")

+---------+-------------------+-------------------+---------------+-------------+----------------+---------------+------------+------------------+-----------------+----------------+------------+-----------+-----+-------+----------+------------+------------+
|vendor_id|          pickup_at|         dropoff_at|passenger_count|trip_distance|pickup_longitude|pickup_latitude|rate_code_id|store_and_fwd_flag|dropoff_longitude|dropoff_latitude|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|total_amount|
+---------+-------------------+-------------------+---------------+-------------+----------------+---------------+------------+------------------+-----------------+----------------+------------+-----------+-----+-------+----------+------------+------------+
|      CMT|2011-01-29 02:38:35|2011-01-29 02:47:07|              1|          1.2|       -74.00526|      40.729084|           1|                 N|        -73.98869|       40.727127|         CSH|        6.1|  0.5|    0.5|      

The following cell uses the `-c sql` option to tell Data Flow Spark Magic that the contents of the cell are Spark SQL. The `-o <variable>` option stores the results of the Spark SQL query in the named variable. In this case, `df_nyc_tlc` becomes a Pandas dataframe in the local notebook context, separate from the Spark dataframe of the same name in the remote session.
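The `%help` table lists three more `%%spark` options that control how many rows come back with `-o`: `-m` (`take` or `sample`), `-n` (maximum rows, negative for unlimited), and `-r` (sampling fraction). The plain-Python model below only makes those semantics concrete on a local list and is not the Data Flow magics' implementation. It assumes that `take` returns the first rows, and that `sample` keeps each row with probability `fraction` (Bernoulli sampling, as Spark's `sample` without replacement does) before the row cap is applied; `pull_rows` is a hypothetical name.

```python
import random
from typing import List, Optional, Sequence, TypeVar

T = TypeVar("T")


def pull_rows(rows: Sequence[T], method: str, maxrows: int,
              fraction: float = 1.0, seed: Optional[int] = None) -> List[T]:
    """Illustrative model of -m (method), -n (maxrows) and -r (fraction) on a local list."""
    if method == "sample":
        rng = random.Random(seed)
        # Keep each row independently with probability `fraction` (Bernoulli sampling).
        selected = [row for row in rows if rng.random() < fraction]
    elif method == "take":
        selected = list(rows)
    else:
        raise ValueError("method must be 'take' or 'sample'")
    # A negative maxrows means no row limit; otherwise keep at most maxrows rows.
    return selected if maxrows < 0 else selected[:maxrows]


data = list(range(1000))
print(pull_rows(data, "take", 5))  # [0, 1, 2, 3, 4]
print(len(pull_rows(data, "take", -1)))  # 1000
sampled = pull_rows(data, "sample", 50, fraction=0.2, seed=0)
print(len(sampled) <= 50)  # True
```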

In [15]:
%%spark -c sql -o df_nyc_tlc
SELECT vendor_id, passenger_count, trip_distance, payment_type FROM nyc_tlc LIMIT 1000;

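|  | vendor_id | passenger_count | trip_distance | payment_type |
|---|---|---|---|---|
| 0 | CMT | 1 | 1.2 | CSH |
| 1 | CMT | 1 | 0.4 | CSH |
| 2 | CMT | 3 | 1.2 | CSH |
| 3 | CMT | 3 | 0.8 | CSH |
| 4 | CMT | 1 | 5.3 | CSH |
| ... | ... | ... | ... | ... |
| 995 | CMT | 1 | 0.5 | CRD |
| 996 | CMT | 2 | 7.2 | CRD |
| 997 | CMT | 1 | 4.7 | CRD |
| 998 | CMT | 1 | 0.3 | CRD |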


In [16]:
type(df_nyc_tlc)

pandas.core.frame.DataFrame

In [17]:
df_nyc_tlc.head()

|  | vendor_id | passenger_count | trip_distance | payment_type |
|---|---|---|---|---|
| 0 | CMT | 1 | 1.2 | CSH |
| 1 | CMT | 1 | 0.4 | CSH |
| 2 | CMT | 3 | 1.2 | CSH |
| 3 | CMT | 3 | 0.8 | CSH |
| 4 | CMT | 1 | 5.3 | CSH |


In [18]:
%%spark -c sql -o df_tables
SHOW TABLES

|  | namespace | tableName | isTemporary |
|---|---|---|---|
| 0 | NaT | nyc_tlc | True |


Similarly, you can use `sqlContext` to query the `nyc_tlc` view:

In [19]:
%%spark
df_nyc_tlc = sqlContext.sql("SELECT * FROM nyc_tlc LIMIT 1000")
df_nyc_tlc.show()

+---------+-------------------+-------------------+---------------+-------------+----------------+---------------+------------+------------------+-----------------+----------------+------------+-----------+-----+-------+----------+------------+------------+
|vendor_id|          pickup_at|         dropoff_at|passenger_count|trip_distance|pickup_longitude|pickup_latitude|rate_code_id|store_and_fwd_flag|dropoff_longitude|dropoff_latitude|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|total_amount|
+---------+-------------------+-------------------+---------------+-------------+----------------+---------------+------------+------------------+-----------------+----------------+------------+-----------+-----+-------+----------+------------+------------+
|      CMT|2011-01-29 02:38:35|2011-01-29 02:47:07|              1|          1.2|       -74.00526|      40.729084|           1|                 N|        -73.98869|       40.727127|         CSH|        6.1|  0.5|    0.5|      

<a id='cleanup'></a>
# 4. Clean Up
Use the `%stop_session` magic command to stop your active Data Flow session.

In [None]:
%stop_session

<a id='ref'></a>
# 5. References

- [ADS Library Documentation](https://accelerated-data-science.readthedocs.io/en/latest/index.html)
- [Data Science YouTube Videos](https://www.youtube.com/playlist?list=PLKCk3OyNwIzv6CWMhvqSB_8MLJIZdO80L)
- [OCI Data Science Documentation](https://docs.cloud.oracle.com/en-us/iaas/data-science/using/data-science.htm)
- [Oracle Data & AI Blog](https://blogs.oracle.com/datascience/)
- [Data Flow Policies](https://docs.oracle.com/iaas/data-flow/using/policies.htm)
- [Getting Started with Data Flow](https://docs.oracle.com/iaas/data-flow/using/dfs_getting_started.htm)
- [About Data Science Policies](https://docs.oracle.com/iaas/data-science/using/policies.htm)
- [Data Catalog Metastore](https://docs.oracle.com/en-us/iaas/data-catalog/using/metastore.htm)