---

# <font color="red">Introduction to Data Flow</font>
<p style="margin-left:10%; margin-right:10%;">modified to use <font color="teal">MovieLens Dataset and Parquet Files</font></p>

---

# Overview:

This notebook demonstrates operations that can be performed using the Accelerated Data Science (ADS) Data Flow module. The demonstrated operations are:

* How to prepare and create an application.
* How to prepare and create a run.
* How to list existing Data Flow applications.
* How to retrieve and display the logs.

The purpose of the `dataflow` module is to provide an efficient and convenient way for users to launch an Apache Spark application and run Apache Spark jobs.

***
   
## Contents:
* <a href='#instance'>Creating a Data Flow application</a>
    * <a href='#instance'>Create a Data Flow instance</a>
       * <a href='#template'>Leveraging PySpark and Apache Spark SQL templates</a>
       * <a href='#appscript'>Application script</a>
    * <a href='#app'>Preparing the application</a>
    * <a href='#regapp'>Registering the application</a>
* <a href='#run'>Running a Data Flow application</a>
    * <a href='#run'>Preparing the run</a>
* <a href='#logs'>Working with logs</a>
* <a href='#sync'>Editing and synchronizing a PySpark script</a>
* <a href='#params'>Passing Arguments to the script</a>
* <a href='#list'>Listing, filtering, and sorting existing Data Flow applications and runs</a>
* <a href='#load'>Loading an existing Data Flow application</a>
* <a href='#reference'>References</a>

---

**Important:**

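**Delete** the `results.csv` output from the WORKSHOP bucket before rerunning the demo. The script writes to that path, and Spark's default save mode fails if the path already exists.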

---
The notebook is compatible with the following [Data Science conda environments](https://docs.oracle.com/en-us/iaas/data-science/using/conda_environ_list.htm):

* [PySpark 2.4 and Data Flow](https://docs.oracle.com/en-us/iaas/data-science/using/conda-pyspark-fam.htm) for CPU on Python 3.7 (version 3.0)


In [1]:
import ads
import io
import os
import tempfile
import uuid

from ads.common import auth as authutil
from ads.dataflow.dataflow import DataFlow
from os import path

The following cell can be commented out if API keys are used.

In [2]:
ads.set_auth(auth="resource_principal")

# Creating a Data Flow application

<a id='instance'></a>
## Create a Data Flow instance

A `DataFlow` object is used to interact with the Data Flow service. The optional `dataflow_base_folder` parameter defines the path where the Data Flow artifacts are stored. It defaults to the `~/dataflow` folder. A compartment can be specified with the optional `compartment_id` parameter. The default behavior is to use the compartment of the notebook session.

Optional parameters such as `os_auth` can be used to specify the preferred authentication method for accessing OCI Object Storage.

In [3]:
dataflow_base_folder = tempfile.mkdtemp()

# modified to use resource principal (RP) authentication
data_flow = DataFlow(dataflow_base_folder=dataflow_base_folder, os_auth=authutil.resource_principal(), df_auth=authutil.resource_principal())

print("Data flow directory: {}".format(dataflow_base_folder))

Data flow directory: /tmp/tmpq12o3t1y


<a id='template'></a>
### Leveraging PySpark and Apache Spark SQL templates

The PySpark and Apache Spark SQL templates help you get started with Data Flow. Use `data_flow.template()` to generate a template file.

The supported templates are:
1. `standard_pyspark`: a template for standard PySpark jobs.
2. `sparksql`: a template for Apache Spark SQL jobs.

For example, to create an Apache Spark SQL template use:
```python
script = data_flow.template(job_type='sparksql')
```
This creates a Python file in the `dataflow_base_folder`. The `template()` method returns the path to the file.

<a id='appscript'></a>
### Application script

In addition to the template scripts, custom scripts are supported. The following writes a Python script that loads the MovieLens movies and ratings Parquet files from Object Storage, joins them with a Spark SQL query, and saves the result in CSV format. In this example, the data is read from the `WORKSHOP` bucket.

In [4]:
pyspark_file_path = path.join(dataflow_base_folder, "example-{}.py".format(str(uuid.uuid4())[-6:]))
script = '''
from pyspark.sql import SparkSession

def main():
    
    # Create a Spark session
    spark = SparkSession \\
        .builder \\
        .appName("Python Spark SQL MovieLens example-parquet") \\
        .getOrCreate()
    
    # Load movies file from WORKSHOP bucket
    df_movies = spark.read.parquet("oci://WORKSHOP@frqap2zhtzbe/movies.parquet")
    
    # Create a temp view and do some SQL operations
    df_movies.createOrReplaceTempView("MOVIES")
    
    # Load ratings file from WORKSHOP bucket
    df_ratings = spark.read.parquet("oci://WORKSHOP@frqap2zhtzbe/ratings.parquet")
    
    # Create a temp view and do some SQL operations
    df_ratings.createOrReplaceTempView("RATINGS")
    
    #
    # the Query
    #
    query_result_df = spark.sql("""
    SELECT MOVIES.title, ROUND(AVG(RATINGS.rating), 1) as avg_rating, count(*) as num_ratings 
    FROM RATINGS, MOVIES 
    WHERE MOVIES.movieId = RATINGS.movieId 
    and MOVIES.genres = "Adventure" 
    GROUP BY MOVIES.title
    HAVING count(*) > 10
    ORDER BY avg_rating DESC LIMIT 10
    """)
    
    # Convert the filtered Apache Spark DataFrame into JSON format
    # Note: we are writing to the spark stdout log so that we can retrieve the log later at the end of the notebook.
    print('\\n'.join(query_result_df.toJSON().collect()))
    
    # save result df to WORKSHOP bucket
    query_result_df.write.format("csv").save("oci://WORKSHOP@frqap2zhtzbe/results.csv")
    
if __name__ == '__main__':
    main()
'''

with open(pyspark_file_path, 'w') as f:
    print(script.strip(), file=f)
    
print("Script path: {}".format(pyspark_file_path))

Script path: /tmp/tmpq12o3t1y/example-a1f030.py
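To see what the query in the script computes without launching a Spark job, the same join, grouping, and filtering can be tried locally on a few rows. The sketch below swaps Spark SQL for Python's built-in `sqlite3` module and uses made-up rows (the titles and ratings are illustrative, not MovieLens data). The `HAVING` threshold is lowered from 10 to 1 so that the toy data produces output.

```python
import sqlite3

# In-memory database standing in for the MOVIES and RATINGS temp views.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE MOVIES (movieId INTEGER, title TEXT, genres TEXT)")
conn.execute("CREATE TABLE RATINGS (movieId INTEGER, rating REAL)")

# Hypothetical sample rows, for illustration only.
conn.executemany(
    "INSERT INTO MOVIES VALUES (?, ?, ?)",
    [
        (1, "Movie A", "Adventure"),
        (2, "Movie B", "Adventure"),
        (3, "Movie C", "Comedy"),
        (4, "Movie D", "Adventure"),
    ],
)
conn.executemany(
    "INSERT INTO RATINGS VALUES (?, ?)",
    [(1, 4.0), (1, 5.0), (2, 3.0), (2, 3.5), (2, 4.0), (3, 5.0), (3, 5.0), (4, 2.0)],
)

# Same shape as the query in the script, with the threshold lowered to 1.
query = """
SELECT MOVIES.title, ROUND(AVG(RATINGS.rating), 1) AS avg_rating, COUNT(*) AS num_ratings
FROM RATINGS JOIN MOVIES ON MOVIES.movieId = RATINGS.movieId
WHERE MOVIES.genres = 'Adventure'
GROUP BY MOVIES.title
HAVING COUNT(*) > 1
ORDER BY avg_rating DESC
LIMIT 10
"""
rows = conn.execute(query).fetchall()
for title, avg_rating, num_ratings in rows:
    print(title, avg_rating, num_ratings)
conn.close()
```

Note that the equality test on `genres` keeps only rows whose genre string is exactly `Adventure`, and that movies with too few ratings are dropped by the `HAVING` clause before the ordering is applied.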


<a id='app'></a>
## Preparing the application

The application creation process contains preparation and creation stages. In the preparation stage, a configuration object is created with a call to the `prepare_app()` method. The following parameters are required:

* `display_name`: The application name.
* `script_bucket`: The bucket used to read and write the `pyspark` script in Object Storage.
* `pyspark_file_path`: The path to the `pyspark` script.

There are also a number of common optional parameters:
* `logs_bucket`: Bucket for the run logs. Default: `dataflow-logs`
* `compartment_id`: Compartment used to run the job. Default: Compartment of the notebook session.
* `driver_shape`: CPU shape for the driver VM. Default: VM.Standard2.4
* `executor_shape`: CPU shape for the executor VMs. Default: VM.Standard2.4 
* `num_executors`: Number of executor machines. Default: 1

To use a private bucket as the `logs_bucket`, ensure that a Data Flow Service policy has been added. See the [prerequisite step](#prereq) and the [policy setup page](https://docs.cloud.oracle.com/en-us/iaas/data-flow/using/dfs_getting_started.htm#policy_set_up) for more details.

**Update the `script_bucket` and `logs_bucket` variables to match your tenancy's configuration.**

In [5]:
script_bucket = "dataflow-test"                     
logs_bucket = "dataflow-log"               
display_name = "DataFlowAppMovieLens"      

# (LS) changed to Spark version 3.0.2
app_config = data_flow.prepare_app(display_name=display_name,
                                   script_bucket=script_bucket,
                                   pyspark_file_path=pyspark_file_path,
                                   logs_bucket=logs_bucket,
                                   spark_version="3.0.2")

<a id='regapp'></a>
## Registering the application

A Data Flow application must be registered within the Oracle Cloud Infrastructure using the `create_app()` method. This method accepts the `app_config` dictionary and creates a `DataFlowApp` object.

In [6]:
app = data_flow.create_app(app_config)


The `config` attribute in a `DataFlowApp` object returns a dictionary of configuration information about the Data Flow application.

In [7]:
app.config

{'compartment_id': 'ocid1.compartment.oc1..aaaaaaaag2cpni5qj6li5ny6ehuahhepbpveopobooayqfeudqygdtfe6h3a',
 'language': 'PYTHON',
 'pyspark_file_path': '/tmp/tmpq12o3t1y/example-a1f030.py',
 'script_bucket': 'dataflow-test',
 'archive_path': None,
 'archive_bucket': None,
 'logs_bucket': 'dataflow-log',
 'display_name': 'DataFlowAppMovieLens',
 'driver_shape': 'VM.Standard2.4',
 'executor_shape': 'VM.Standard2.4',
 'num_executors': 1,
 'spark_version': '3.0.2'}

The `oci_link` attribute returns a link to the Oracle Cloud Infrastructure Console Application Details page: 

In [8]:
app.oci_link

'https://console.eu-milan-1.oraclecloud.com/data-flow/apps/details/ocid1.dataflowapplication.oc1.eu-milan-1.anwgsljrngencdyajej25cklyuvtxrjaw2pep3mziqib34z4c27hmobzfx7a'

<a id='run'></a>
# Running a Data Flow application

<a id='preprun'></a>
## Preparing a Data Flow run

To run a Data Flow application, a run configuration is created using the `prepare_run()` method. The application is then executed with the `run()` method.

The `prepare_run()` method has the following common parameters:
* `run_display_name`: Name of the run.
* `compartment_id`: Compartment used to run the job. Default: Compartment of the notebook session.
* `logs_bucket`: (optional) Bucket for the run logs. Default: Inherited from the application.

In [27]:
run_display_name = "Data_Flow_MovieLens_run3"
run_config = app.prepare_run(run_display_name=run_display_name)

## Running a Data Flow Application

Execute a Data Flow application with the `run()` method. This returns a `DataFlowRun` object.

The `run()` method accepts the `run_config` dictionary. When the optional `save_log_to_local` parameter is set to `True`, it pulls a copy of the logs into a subfolder of the `dataflow_base_folder`. The subfolder name is based on the application display name with a random extension, and it contains another folder whose name is based on the run display name with a random extension.

The run configuration is stored in the run subfolder in the file `run_metadata.json`. This subfolder also has a copy of the executed script.

In [28]:
run = app.run(run_config, save_log_to_local=False, wait=False)
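When logs are saved locally, the run configuration can be read back from `run_metadata.json`. The sketch below builds a stand-in folder tree in a temporary directory (the folder names and the metadata contents are hypothetical), then locates and loads every `run_metadata.json` under the base folder.

```python
import json
import tempfile
from pathlib import Path

def find_run_metadata(base_folder):
    """Return {run folder name: parsed run_metadata.json} for all runs under base_folder."""
    found = {}
    for metadata_file in sorted(Path(base_folder).rglob("run_metadata.json")):
        with open(metadata_file) as f:
            found[metadata_file.parent.name] = json.load(f)
    return found

# Stand-in for dataflow_base_folder; the folder names below are made up.
base = Path(tempfile.mkdtemp())
run_dir = base / "DataFlowAppMovieLens-abc123" / "Data_Flow_MovieLens_run3-def456"
run_dir.mkdir(parents=True)
(run_dir / "run_metadata.json").write_text(
    json.dumps({"run_display_name": "Data_Flow_MovieLens_run3", "num_executors": 1})
)

metadata = find_run_metadata(base)
print(metadata)
```

In this notebook the run is started with `save_log_to_local=False`, so no such folder is created; the sketch applies only when that parameter is `True`.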

The `status` attribute of a `DataFlowRun` object provides the execution status.

In [30]:
run.status

'IN_PROGRESS'

A dictionary of a run's configuration is accessible from the `config` attribute. This is the same information that is stored in the `run_metadata.json` file.

In [24]:
run.config

{'compartment_id': 'ocid1.compartment.oc1..aaaaaaaag2cpni5qj6li5ny6ehuahhepbpveopobooayqfeudqygdtfe6h3a',
 'script_bucket': 'dataflow-test',
 'pyspark_file_path': '/tmp/tmpq12o3t1y/example-a1f030.py',
 'archive_path': None,
 'archive_bucket': None,
 'run_display_name': 'Data_Flow_MovieLens_run2',
 'logs_bucket': 'dataflow-log',
 'logs_bucket_uri': 'oci://dataflow-log@frqap2zhtzbe',
 'driver_shape': 'VM.Standard2.4',
 'executor_shape': 'VM.Standard2.4',
 'num_executors': 1}

The `oci_link` attribute gives a link to the Oracle Cloud Infrastructure Run Details page.

In [None]:
run.oci_link

The optional `wait` parameter can be set to `False` to make the run asynchronous. The `status` attribute can then be used to monitor the run as it is accepted, in progress, and finally complete.

In [None]:
run_observer = app.run(run_config, wait=False)
run_observer.status

In [None]:
# Uncomment the following line to block until the run completes
#run = run_observer.wait()
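Monitoring an asynchronous run amounts to polling its status until it reaches a terminal state. The following is a minimal sketch of such a loop, not part of ADS: `wait_for_completion` is a hypothetical helper, the set of terminal states is an assumption, and a fake status sequence stands in for a real run so the example can be executed anywhere.

```python
import time

# Assumed terminal lifecycle states for a Data Flow run.
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "CANCELED"}

def wait_for_completion(get_status, poll_seconds=30.0, timeout_seconds=3600.0):
    """Poll get_status() until it returns a terminal state or the timeout expires."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = get_status()
        if status in TERMINAL_STATES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError("run still in state {} after timeout".format(status))
        time.sleep(poll_seconds)

# Demonstration with a fake status sequence instead of a real run.
fake_statuses = iter(["ACCEPTED", "IN_PROGRESS", "IN_PROGRESS", "SUCCEEDED"])
final_status = wait_for_completion(lambda: next(fake_statuses), poll_seconds=0.0)
print(final_status)
```

With a real run, `get_status` could be `lambda: run_observer.status`, assuming the attribute reflects the current state each time it is read.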

In [None]:
run.save_log_to_local

<a id='logs'></a>
# Working with logs

The Data Flow logs are stored in Object Storage. If the parameter `save_log_to_local` is set to `True`, then the logs are pulled onto the local drive. The `fetch_log()` method returns a `DataFlowLog` object. Pass in `"stdout"` or `"stderr"` to get the standard output or standard error log, respectively. Using the `save()` method on the `DataFlowLog` object causes the logs to be stored on the local drive.

The following example pulls the standard out and error logs onto the local storage:

In [None]:
run.fetch_log("stdout").save()
run.fetch_log("stderr").save()

The `log_stdout` and `log_stderr` attributes return `DataFlowLog` objects for the standard output and standard error logs. The `head()` and `tail()` methods print the beginning or end of the log files. By default, 10 lines are printed; both methods accept a parameter that specifies the number of lines.

In [None]:
run.log_stdout.head(5)

The `DataFlowLog` objects that are returned by `log_stdout` and `log_stderr` also have the `oci_path` and `local_path` attributes. These return the bucket and local file path of the logs.

In [None]:
run.log_stdout.oci_path

In [None]:
run.log_stdout.local_path
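Because the application script prints each result row to standard output as one JSON object per line, the rows can be recovered from the stdout log text. The sketch below is one way to do that; `parse_json_rows` is a hypothetical helper, and the sample log content is invented for illustration.

```python
import json

def parse_json_rows(log_text):
    """Return the JSON objects found in log_text, one per line; skip other lines."""
    rows = []
    for line in log_text.splitlines():
        line = line.strip()
        if not line.startswith("{"):
            continue
        try:
            rows.append(json.loads(line))
        except json.JSONDecodeError:
            # Not a complete JSON object; ignore it.
            continue
    return rows

# Hypothetical stdout content, for illustration only.
sample_log = """
some other log line
{"title":"Movie A","avg_rating":4.5,"num_ratings":12}
{"title":"Movie B","avg_rating":4.1,"num_ratings":30}
"""
rows = parse_json_rows(sample_log)
print(rows[0]["title"], len(rows))
```

For a log that has been saved locally, the text could be read from the file at `run.log_stdout.local_path`, assuming that path points to a readable text file.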

<a id='sync'></a>
# Editing and synchronizing a PySpark script

A Data Flow application can be run multiple times by calling `run()`. A common use case is when the data has changed and an updated analysis is wanted.

Another common use case is when there have been changes to the PySpark script. The Data Flow application keeps a copy of the script. If local changes are made to the script, the default behavior is to synchronize the local script with the Data Flow application. Setting the `sync` parameter in the `run()` method to `False` prevents the local copy of the script from being uploaded to the Data Flow application, so the existing script in the application is executed.

<a id='params'></a>
# Passing Arguments to the script

To pass command line arguments to the Data Flow application, set the value of the `arguments` parameter in the `prepare_app()` method. The `arguments` parameter takes a list of command line arguments to be passed to the PySpark script. For example:
```python
arguments = ['-f', 'foobar', '-d', '--file', 'file.txt']
```
In this example, the arguments are hardcoded. Data Flow supports a mechanism to parameterize arguments. The `script_parameters` option accepts a dictionary that is used to update values in the `arguments` parameter. The placeholders must be in the format `'${key}'`, and they are replaced by the value associated with the key. The following demonstrates this process:

```python
arguments = ['${foo}', '-d', '--file', '${filename}']
script_parameters={'foo': '-bar', 'filename': 'file.txt'}
```

The command line argument seen by the PySpark script is:
```bash
-bar -d --file file.txt
```
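The substitution described above can be reproduced locally with Python's `string.Template`, which uses the same `${key}` placeholder syntax. This is a sketch of the behavior as described, not the ADS implementation; `resolve_arguments` is a hypothetical helper.

```python
from string import Template

def resolve_arguments(arguments, script_parameters):
    """Replace ${key} placeholders in each argument with values from script_parameters."""
    # safe_substitute leaves unknown placeholders untouched instead of raising.
    return [Template(arg).safe_substitute(script_parameters) for arg in arguments]

arguments = ['${foo}', '-d', '--file', '${filename}']
script_parameters = {'foo': '-bar', 'filename': 'file.txt'}

resolved = resolve_arguments(arguments, script_parameters)
print(' '.join(resolved))
```

Arguments without a placeholder, such as `-d`, pass through unchanged.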

An example workflow would look like:
```python
app_config = data_flow.prepare_app(
    display_name, script_bucket, pyspark_file_path, 
    arguments = ['${foo}', 'bar', '-d', '--file', '${filename}'], 
    script_parameters={'foo': '-bar', 'filename': 'file.txt'})
app = data_flow.create_app(app_config)
run_config = app.prepare_run("Argument_run")
run = app.run(run_config)
```

In this example, the script parameters are associated with the application configuration. They can be overridden in the `prepare_run()` method by passing a keyword argument with the same name as the script parameter to be updated. In this example, the value of `foo` is `-bar`. To replace it with `-babar`, use the following call:
```python
run_config = app.prepare_run("Override_Argument_run", foo='-babar')
```
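Inside the PySpark script, the resolved arguments arrive as ordinary command-line arguments. A minimal sketch of reading them with `argparse` follows, using the hardcoded argument list from the first example in this section. The meaning given to each flag (`-f`, `-d`, `--file`) is hypothetical, since the notebook does not define one.

```python
import argparse

def parse_script_arguments(argv):
    """Parse the example flags; a real script would pass sys.argv[1:]."""
    parser = argparse.ArgumentParser(description="Example PySpark script arguments")
    parser.add_argument("-f", dest="label", help="hypothetical free-form label")
    parser.add_argument("-d", dest="debug", action="store_true", help="hypothetical debug switch")
    parser.add_argument("--file", dest="filename", help="hypothetical input file name")
    return parser.parse_args(argv)

args = parse_script_arguments(['-f', 'foobar', '-d', '--file', 'file.txt'])
print(args.label, args.debug, args.filename)
```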

<a id='list'></a>
# Listing, filtering, and sorting existing Data Flow applications and runs

From `ADS` you can list applications and runs. They are returned as a list of dictionaries, with a function to display the data in a Pandas dataframe. The default sort order is the most recent application or run first.

## Listing Applications

The method `list_apps()` returns a `SummaryList` object with the list of Data Flow applications, which can be sliced:
```python
data_flow.list_apps()[0:2]
```
Or it can be converted to a dataframe with the `to_dataframe()` method.

In [None]:
data_flow.list_apps().to_dataframe().head(5)

## Runs
### Listing runs

The `list_runs()` method on a `DataFlowApp` object returns a `SummaryList` object with the list of runs for that application, which can be sliced:
```python
app.list_runs()[0:2]
```
Or it can be converted to a dataframe with the `to_dataframe()` method.

In [None]:
app.list_runs().to_dataframe().head(5)
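Since the listings behave like lists of dictionaries, filtering and sorting can also be done with plain Python. The sketch below works on hand-written records; the keys (`display_name`, `lifecycle_state`, `time_created`) and all values are assumptions for illustration and may not match the fields that ADS returns.

```python
# Hypothetical run records, newest first; keys and values are made up.
runs = [
    {"display_name": "Data_Flow_MovieLens_run3", "lifecycle_state": "SUCCEEDED",
     "time_created": "2021-06-03T10:15:00"},
    {"display_name": "Data_Flow_MovieLens_run2", "lifecycle_state": "FAILED",
     "time_created": "2021-06-02T09:30:00"},
    {"display_name": "Data_Flow_MovieLens_run1", "lifecycle_state": "SUCCEEDED",
     "time_created": "2021-06-01T08:00:00"},
]

# Keep only the runs that finished successfully.
succeeded = [r for r in runs if r["lifecycle_state"] == "SUCCEEDED"]

# Sort oldest first (ISO 8601 timestamps sort correctly as strings).
oldest_first = sorted(succeeded, key=lambda r: r["time_created"])

names = [r["display_name"] for r in oldest_first]
print(names)
```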

### Getting information on a run

The `get_run()` method returns information about a run. It requires the OCID for a run. This can be the shortened id or the complete OCID. In this example, the full OCID is used:

In [None]:
app.get_run(app.list_runs()[0].id)

<a id='load'></a>
# Loading an existing Data Flow application

`ADS` uses a Data Flow application's OCID to load an existing application. These Data Flow applications must be Python applications. The OCID must be provided with the `app_id` parameter. The OCID can be obtained from the Oracle Cloud Infrastructure Console or by listing existing applications using the `list_apps()` method. 

Optionally, the `target_folder` parameter defines the directory to which application artifacts are copied. If `target_folder` is not provided, the application artifacts are stored by default in the `dataflow_base_folder` defined by the Data Flow object.

Once an application has been loaded, the `DataFlowApp` object can be used to update the application script, and run a Data Flow job using the previously defined methods.

In [None]:
loaded_app = data_flow.load_app(app_id=data_flow.list_apps()[0].id) 

<a id="reference"></a>
# References

- [ADS Library Documentation](https://docs.cloud.oracle.com/en-us/iaas/tools/ads-sdk/latest/index.html)
- [Data Science YouTube Videos](https://www.youtube.com/playlist?list=PLKCk3OyNwIzv6CWMhvqSB_8MLJIZdO80L)
- [OCI Data Science Documentation](https://docs.cloud.oracle.com/en-us/iaas/data-science/using/data-science.htm)
- [Oracle Data & AI Blog](https://blogs.oracle.com/datascience/)