# **OCI Data Science & OCI Data Flow**

## Use the notebook to:
- Create a Spark session in OCI Data Flow and work with it interactively
- Test the .py script in the interactive session
- Create a new OCI Data Flow application and start a run that executes the .py script

-----

In [None]:
## Install the PySpark conda environment

In [1]:
import ads
import os
import json
import oci

# Get the compartment of this notebook session and set the logs bucket
compartment_id = os.environ.get("NB_SESSION_COMPARTMENT_OCID")
logs_bucket_uri = "oci://West_BP@frqap2zhtzbe/data_flow_logs"

# Authenticate the notebook with resource principals
ads.set_auth("resource_principal")

In [2]:
%load_ext dataflow.magics

///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

# **1. Test Spark code in an Interactive Data Flow Session**

## **Helper function**

In [3]:
def prepare_command(command: dict) -> str:
    """Convert a command dictionary to a single-quoted JSON string for the Data Flow magics."""
    return f"'{json.dumps(command)}'"
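The helper only serializes the dictionary and wraps it in single quotes so the magic receives it as one argument. The sketch below is a slightly defensive variant, shown with a trimmed-down payload; the single-quote check is an addition of this sketch (based on the assumption that a quote inside a value would end the wrapped argument early), not something the dataflow magics require.

```python
import json


def prepare_command(command: dict) -> str:
    """Serialize a command dict to JSON and wrap it in single quotes."""
    payload = json.dumps(command)
    # Assumption: the magic reads the argument as one single-quoted token,
    # so a single quote inside a value would break it. Fail loudly instead.
    if "'" in payload:
        raise ValueError("command values must not contain single quotes")
    return f"'{payload}'"


example = prepare_command({"displayName": "DF_session", "numExecutors": 1})
print(example)  # '{"displayName": "DF_session", "numExecutors": 1}'
```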

## **Create an interactive OCI Data Flow Session**

In [5]:
command = prepare_command(
    {
        "compartmentId": compartment_id,
        "displayName": "DF_sessionxx",
        "language": "PYTHON",
        "sparkVersion": "3.2.1",
        "numExecutors": 1,
        "driverShape": "VM.Standard.E4.Flex",
        "executorShape": "VM.Standard.E4.Flex",
        "driverShapeConfig": {"ocpus": 2, "memoryInGBs": 32},
        "executorShapeConfig": {"ocpus": 2, "memoryInGBs": 32},
        "type": "SESSION",
        "logsBucketUri": logs_bucket_uri,
        "configuration": {
            "fs.oci.client.hostname": "https://objectstorage.eu-frankfurt-1.oraclecloud.com"
        },
    }
)

%create_session -l python -c $command

Setting up the Cluster..



Cluster is ready..
Starting Spark application..


| Session ID | Kind | State | Current session |
| --- | --- | --- | --- |
| ocid1.dataflowapplication.oc1.eu-frankfurt-1.antheljrngencdya44cf5hkxcvfuarsi2a74fjvgqy6clw372oo4hxi7rnnq | pyspark | IN_PROGRESS | Dataflow Run |



SparkSession available as 'spark'.
SparkContext available as 'sc'.
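The session payload above hard-codes the Frankfurt Object Storage endpoint in `fs.oci.client.hostname`. A small sketch that derives it from a region name instead, using the same `https://objectstorage.<region>.oraclecloud.com` pattern that `script_v2.py` uses for local runs. `build_session_command` is a hypothetical helper; its defaults are simply copied from the cell above, and the compartment OCID in the usage line is a placeholder.

```python
def build_session_command(compartment_id, logs_bucket_uri, region,
                          display_name="DF_session", spark_version="3.2.1",
                          shape="VM.Standard.E4.Flex", ocpus=2, memory_in_gbs=32,
                          num_executors=1):
    """Build a create_session payload with the same keys as the cell above."""
    shape_config = {"ocpus": ocpus, "memoryInGBs": memory_in_gbs}
    return {
        "compartmentId": compartment_id,
        "displayName": display_name,
        "language": "PYTHON",
        "sparkVersion": spark_version,
        "numExecutors": num_executors,
        "driverShape": shape,
        "executorShape": shape,
        # Separate copies so editing one shape config never changes the other
        "driverShapeConfig": dict(shape_config),
        "executorShapeConfig": dict(shape_config),
        "type": "SESSION",
        "logsBucketUri": logs_bucket_uri,
        "configuration": {
            # Object Storage endpoint derived from the region name
            "fs.oci.client.hostname": f"https://objectstorage.{region}.oraclecloud.com"
        },
    }


cmd = build_session_command(
    "ocid1.compartment.oc1..example",  # placeholder OCID
    "oci://West_BP@frqap2zhtzbe/data_flow_logs",
    "eu-frankfurt-1",
)
```

The resulting dictionary can go through `prepare_command` and then `%create_session -l python -c $command`, exactly as in the cell above.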


## **Check status**

In [14]:
%status

| Session | State | Max Duration In Minutes | Total Execution Time In Minutes | Remaining Duration In Minutes | Current Session |
| --- | --- | --- | --- | --- | --- |
| ocid1.dataflowapplication.oc1.eu-frankfurt-1.antheljrngencdya44cf5hkxcvfuarsi2a74fjvgqy6clw372oo4hxi7rnnq | IN_PROGRESS | 1440 | 26 | 1414 | Dataflow Run |


## **Run and test Spark code in the active session from the notebook**

In [13]:
%%spark
print(sc.version)


3.2.1

In [16]:
%%spark

# Explicit imports: a wildcard import of pyspark.sql.functions shadows
# Python built-ins such as round and sum.
from pyspark.sql.functions import avg, round as spark_round

print("Start")

input_location = "oci://West_BP@frqap2zhtzbe/retail_wow/fake_order_lines_data.csv"
output_location = "oci://West_BP@frqap2zhtzbe/retail_wow/output_in_interactive_session"

# Read the CSV file; the first row holds the column names.
df = spark.read.option("delimiter", ",").option("header", True).csv(input_location)

# Group by customer ID and average several order metrics.
grouped_df = df.groupBy("customer_id").agg(
    avg("quantity").alias("avg_quantity_ordered"),
    avg("sales_price").alias("avg_sales_price"),
    avg("discount").alias("avg_discount"),
    avg("gross_unit_price").alias("avg_gross_unit_price"),
    avg("shipping_cost").alias("avg_shipping_costs"),
)

# Round every aggregated column to 2 decimals.
for column in grouped_df.columns:
    if column != "customer_id":
        grouped_df = grouped_df.withColumn(column, spark_round(grouped_df[column], 2))

# repartition(1) writes a single part file inside the output folder.
print("Saving output csv in bucket as one file")
grouped_df.repartition(1).write.mode("overwrite").option("header", True).csv(output_location)

grouped_df.show()


Start
Saving output csv in bucket as one file
+-----------+--------------------+---------------+------------+--------------------+------------------+
|customer_id|avg_quantity_ordered|avg_sales_price|avg_discount|avg_gross_unit_price|avg_shipping_costs|
+-----------+--------------------+---------------+------------+--------------------+------------------+
|      41374|                14.0|        1043.94|       78.58|               80.18|             20.88|
|      29912|                 3.0|         218.61|        8.88|               72.88|              4.69|
|      28117|                10.5|         334.27|       31.33|               32.65|               7.3|
|      37246|               15.67|         958.32|      103.93|                62.3|              19.4|
|      28316|                10.0|         305.07|        10.8|               21.88|              6.21|
|       5925|                 6.0|         238.25|       28.61|               44.47|              2.39|
|      11888|     
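To sanity-check the Spark output on a small extract, a plain-Python reference of the same group-by-and-average logic can help. This is a sketch under stated assumptions: every metric column is present and numeric in every row (Spark's `avg` skips nulls, this does not), and values are rounded HALF_UP to mirror Spark's `round`. `average_by_customer` is a hypothetical helper and the sample rows are made up.

```python
import csv
import io
from collections import defaultdict
from decimal import ROUND_HALF_UP, Decimal

# Source column -> output column, as in the Spark job
METRICS = {
    "quantity": "avg_quantity_ordered",
    "sales_price": "avg_sales_price",
    "discount": "avg_discount",
    "gross_unit_price": "avg_gross_unit_price",
    "shipping_cost": "avg_shipping_costs",
}
CENT = Decimal("0.01")


def average_by_customer(csv_text):
    """Average each metric per customer_id, rounded HALF_UP to 2 decimals."""
    sums = defaultdict(lambda: defaultdict(Decimal))  # customer -> metric -> total
    counts = defaultdict(int)                          # customer -> row count
    for row in csv.DictReader(io.StringIO(csv_text)):
        customer = row["customer_id"]
        counts[customer] += 1
        for source in METRICS:
            sums[customer][source] += Decimal(row[source])
    return {
        customer: {
            target: float((sums[customer][source] / n).quantize(CENT, rounding=ROUND_HALF_UP))
            for source, target in METRICS.items()
        }
        for customer, n in counts.items()
    }


# Made-up rows, only to exercise the function
SAMPLE_CSV = """customer_id,quantity,sales_price,discount,gross_unit_price,shipping_cost
1,2,10.00,1.00,5.00,1.005
1,3,20.00,2.00,7.00,2.000
2,4,8.50,0.00,2.125,1.00
"""

averages = average_by_customer(SAMPLE_CSV)
print(averages["2"])
```

`Decimal` is used because Python's built-in `round` rounds half to even on binary floats, which can disagree with Spark's HALF_UP on ties such as 2.125.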

///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

# **2. Create a .py script and an OCI Data Flow application, then run the application**

## **Create script_v2.py**

In [6]:
%%writefile ./script_v2.py
import argparse
import os

from pyspark import SparkConf
from pyspark.sql import SparkSession
# Explicit imports: a wildcard import shadows Python's round and sum.
from pyspark.sql.functions import avg, round as spark_round


def main():
    print("Start")

    # Input and output locations are passed in as command-line arguments.
    parser = argparse.ArgumentParser()
    parser.add_argument("--input_path", required=True)
    parser.add_argument("--output_path", required=True)
    args = parser.parse_args()

    # Set up Spark (works both in OCI Data Flow and locally).
    # SQLContext is deprecated in Spark 3.x; read through the session instead.
    spark_session = get_dataflow_spark_session()

    # Read the CSV file; the first row holds the column names.
    df = spark_session.read.option("delimiter", ",").option("header", True).csv(args.input_path)

    # Group by customer ID and average several order metrics.
    grouped_df = df.groupBy("customer_id").agg(
        avg("quantity").alias("avg_quantity_ordered"),
        avg("sales_price").alias("avg_sales_price"),
        avg("discount").alias("avg_discount"),
        avg("gross_unit_price").alias("avg_gross_unit_price"),
        avg("shipping_cost").alias("avg_shipping_costs"),
    )

    # Round every aggregated column to 2 decimals.
    for column in grouped_df.columns:
        if column != "customer_id":
            grouped_df = grouped_df.withColumn(column, spark_round(grouped_df[column], 2))

    # repartition(1) writes a single part file inside the output folder.
    print("Saving output csv in bucket as one file")
    grouped_df.repartition(1).write.mode("overwrite").option("header", True).csv(args.output_path)

    # grouped_df.show()

    
    
def get_dataflow_spark_session(
    app_name="DataFlow", file_location=None, profile_name=None, spark_config=None
):
    """
    Get a Spark session in a way that supports running locally or in Data Flow.
    """
    if in_dataflow():
        spark_builder = SparkSession.builder.appName(app_name)
    else:
        # Import OCI.
        try:
            import oci
        except ImportError:
            raise Exception(
                "You need to install the OCI python library to test locally"
            )

        # Use defaults for anything unset.
        if file_location is None:
            file_location = oci.config.DEFAULT_LOCATION
        if profile_name is None:
            profile_name = oci.config.DEFAULT_PROFILE

        # Load the config file.
        try:
            oci_config = oci.config.from_file(
                file_location=file_location, profile_name=profile_name
            )
        except Exception as e:
            print("You need to set up your OCI config properly to run locally")
            raise e

        # Pass the API-key credentials to the Object Storage HDFS connector.
        conf = SparkConf()
        conf.set("fs.oci.client.auth.tenantId", oci_config["tenancy"])
        conf.set("fs.oci.client.auth.userId", oci_config["user"])
        conf.set("fs.oci.client.auth.fingerprint", oci_config["fingerprint"])
        conf.set("fs.oci.client.auth.pemfilepath", oci_config["key_file"])
        conf.set(
            "fs.oci.client.hostname",
            "https://objectstorage.{0}.oraclecloud.com".format(oci_config["region"]),
        )
        spark_builder = SparkSession.builder.appName(app_name).config(conf=conf)

    # Add in extra configuration (None means no extra settings; avoids a
    # mutable default argument).
    for key, val in (spark_config or {}).items():
        spark_builder.config(key, val)

    # Create the Spark session.
    session = spark_builder.getOrCreate()
    return session


def in_dataflow():
    """
    Determine if we are running in OCI Data Flow by checking the environment.
    """
    return os.environ.get("HOME") == "/home/dataflow"


if __name__ == "__main__":
    main()


Overwriting ./script_v2.py
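Data Flow hands the application's `arguments` list to the script as command-line arguments, so the argparse setup in `main()` can be checked in isolation without Spark. A minimal sketch: `parse_job_args` is a hypothetical wrapper around the same two options, fed with the argument list used when creating the application below.

```python
import argparse


def parse_job_args(argv):
    """Parse the same two options that main() in script_v2.py expects."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--input_path", required=True)
    parser.add_argument("--output_path", required=True)
    return parser.parse_args(argv)


# Same list as the `arguments` field of CreateApplicationDetails below
application_arguments = [
    "--input_path", "oci://West_BP@frqap2zhtzbe/retail_wow/fake_order_lines_data.csv",
    "--output_path", "oci://West_BP@frqap2zhtzbe/retail_wow/output",
]
args = parse_job_args(application_arguments)
print(args.input_path)
print(args.output_path)
```

A missing option makes argparse exit with status 2, which in Data Flow would show up as a failed run rather than a Spark error.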


## **Store .py file in the bucket**

In [29]:
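# Upload script_v2.py to the object path used as file_uri in the application below (uncomment to run)
#!oci os object put --namespace frqap2zhtzbe --bucket-name West_BP --name retail_wow/script_v2.py --file script_v2.py --force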
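The application below reads the script from `oci://West_BP@frqap2zhtzbe/retail_wow/script_v2.py`, so the upload has to target exactly that object name. A minimal sketch that uploads from Python with the OCI SDK and derives bucket, namespace and object name from that same URI, so the two cannot drift apart. `parse_oci_uri` and `upload_script` are hypothetical helpers, and the sketch assumes an API-key config like the `oci.config.from_file()` call used later in this notebook.

```python
def parse_oci_uri(uri):
    """Split 'oci://<bucket>@<namespace>/<object>' into (bucket, namespace, object_name)."""
    prefix = "oci://"
    if not uri.startswith(prefix):
        raise ValueError(f"not an oci:// URI: {uri}")
    authority, _, object_name = uri[len(prefix):].partition("/")
    bucket, sep, namespace = authority.partition("@")
    if not sep or not bucket or not namespace or not object_name:
        raise ValueError(f"expected oci://<bucket>@<namespace>/<object>: {uri}")
    return bucket, namespace, object_name


def upload_script(local_path, target_uri, config):
    """Upload a local file to the object named by target_uri (needs the OCI SDK)."""
    import oci  # imported lazily so parse_oci_uri works without the SDK

    bucket, namespace, object_name = parse_oci_uri(target_uri)
    client = oci.object_storage.ObjectStorageClient(config)
    with open(local_path, "rb") as body:
        return client.put_object(namespace, bucket, object_name, body)


SCRIPT_URI = "oci://West_BP@frqap2zhtzbe/retail_wow/script_v2.py"
print(parse_oci_uri(SCRIPT_URI))
# In the notebook: upload_script("script_v2.py", SCRIPT_URI, oci.config.from_file())
```

Reusing `SCRIPT_URI` as `file_uri` when creating the application keeps the upload target and the application definition in one place.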

## **Create an application**

In [17]:
import oci

# Set the display name
display_name = "retail_demo_v2"

# Compartment ID and logs bucket URI
comp_id = os.environ.get("NB_SESSION_COMPARTMENT_OCID")
logs_bucket_uri = "oci://West_BP@frqap2zhtzbe/data_flow_logs"

# Authenticate with the API key in the default OCI config file (~/.oci/config)
config = oci.config.from_file()

# Create the Data Flow client
data_flow_client = oci.data_flow.DataFlowClient(config)


create_application_response = data_flow_client.create_application(
    create_application_details=oci.data_flow.models.CreateApplicationDetails(
        compartment_id=comp_id,
        display_name=display_name,
        driver_shape="VM.Standard.E4.Flex",
        executor_shape="VM.Standard.E4.Flex",
        
        language="PYTHON",
        num_executors=2,
        spark_version="3.2.1",        
        
        arguments=["--input_path", "oci://West_BP@frqap2zhtzbe/retail_wow/fake_order_lines_data.csv", 
                   "--output_path", "oci://West_BP@frqap2zhtzbe/retail_wow/output"],

        description=display_name,
        driver_shape_config=oci.data_flow.models.ShapeConfig(
            ocpus=2,
            memory_in_gbs=16),
        executor_shape_config=oci.data_flow.models.ShapeConfig(
            ocpus=2,
            memory_in_gbs=16),
        
        file_uri="oci://West_BP@frqap2zhtzbe/retail_wow/script_v2.py",
        
        logs_bucket_uri=logs_bucket_uri,

        type="BATCH"))

# Get the data from response
print(create_application_response.data)

{
  "application_log_config": {
    "log_group_id": null,
    "log_id": null
  },
  "archive_uri": "",
  "arguments": [
    "--input_path",
    "oci://West_BP@frqap2zhtzbe/retail_wow/fake_order_lines_data.csv",
    "--output_path",
    "oci://West_BP@frqap2zhtzbe/retail_wow/output"
  ],
  "class_name": "",
  "compartment_id": "ocid1.compartment.oc1..aaaaaaaae3n6r6hrjipbap2hojicrsvkzatrtlwvsyrpyjd7wjnw4za3m75q",
  "configuration": null,
  "defined_tags": {
    "default-tags": {
      "CreatedBy": "oracleidentitycloudservice/bob.peulen@oracle.com"
    }
  },
  "description": "retail_demo_v2",
  "display_name": "retail_demo_v2",
  "driver_shape": "VM.Standard.E4.Flex",
  "driver_shape_config": {
    "memory_in_gbs": 16.0,
    "ocpus": 2.0
  },
  "execute": null,
  "executor_shape": "VM.Standard.E4.Flex",
  "executor_shape_config": {
    "memory_in_gbs": 16.0,
    "ocpus": 2.0
  },
  "file_uri": "oci://West_BP@frqap2zhtzbe/retail_wow/script_v2.py",
  "freeform_tags": {},
  "id": "ocid1.dat

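Re-running the application cell creates another application with the same display name each time. A sketch of looking up an existing ACTIVE application first, assuming the SDK's `list_applications` accepts a `display_name` filter and looking only at the first page of results. `find_application_id` is hypothetical, and the stub client exists only so the sketch runs without OCI access.

```python
from types import SimpleNamespace


def find_application_id(client, compartment_id, display_name):
    """Return the OCID of an ACTIVE application with this display name, or None.

    Only the first page of list_applications results is inspected.
    """
    response = client.list_applications(
        compartment_id=compartment_id, display_name=display_name
    )
    for summary in response.data:
        if summary.lifecycle_state == "ACTIVE":
            return summary.id
    return None


class _StubClient:
    """Stand-in for DataFlowClient so the sketch runs without OCI access."""

    def __init__(self, summaries):
        self.summaries = summaries

    def list_applications(self, compartment_id, display_name):
        # Mimic the server-side display_name filter.
        matches = [s for s in self.summaries if s.display_name == display_name]
        return SimpleNamespace(data=matches)


stub = _StubClient([
    SimpleNamespace(id="app-old", display_name="retail_demo_v2", lifecycle_state="DELETED"),
    SimpleNamespace(id="app-live", display_name="retail_demo_v2", lifecycle_state="ACTIVE"),
])
existing_id = find_application_id(stub, "compartment-ocid", "retail_demo_v2")
missing_id = find_application_id(stub, "compartment-ocid", "other_app")
```

In the notebook, `find_application_id(data_flow_client, comp_id, display_name)` would supply `application_id`, and `create_application` would only need to run when it returns `None`.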
## **Start a Run**

In [18]:
#get application id
application_id = create_application_response.data.id

data_flow_client = oci.data_flow.DataFlowClient(config)

create_run_response = data_flow_client.create_run(
    create_run_details=oci.data_flow.models.CreateRunDetails(
        compartment_id=comp_id,
        application_id=application_id,
        display_name="run_job"))

print(create_run_response.data)

{
  "application_id": "ocid1.dataflowapplication.oc1.eu-frankfurt-1.antheljrngencdyarwfdwanonky3i2k2xu65a47sv4in4l5lc4getdze6bvq",
  "application_log_config": {
    "log_group_id": null,
    "log_id": null
  },
  "archive_uri": "",
  "arguments": [
    "--input_path",
    "oci://West_BP@frqap2zhtzbe/retail_wow/fake_order_lines_data.csv",
    "--output_path",
    "oci://West_BP@frqap2zhtzbe/retail_wow/output"
  ],
  "class_name": "",
  "compartment_id": "ocid1.compartment.oc1..aaaaaaaae3n6r6hrjipbap2hojicrsvkzatrtlwvsyrpyjd7wjnw4za3m75q",
  "configuration": null,
  "data_read_in_bytes": 0,
  "data_written_in_bytes": 0,
  "defined_tags": {
    "default-tags": {
      "CreatedBy": "oracleidentitycloudservice/bob.peulen@oracle.com"
    }
  },
  "display_name": "run_job",
  "driver_shape": "VM.Standard.E4.Flex",
  "driver_shape_config": {
    "memory_in_gbs": 16.0,
    "ocpus": 2.0
  },
  "execute": "",
  "executor_shape": "VM.Standard.E4.Flex",
  "executor_shape_config": {
    "memory_in_g

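`create_run` returns once the run is accepted; it does not wait for the job to finish. A minimal polling sketch, assuming SUCCEEDED, FAILED, CANCELED and STOPPED are the terminal run states. `wait_for_run` is hypothetical and takes any zero-argument callable that returns the current state, so it can be exercised with fake states; the OCI SDK's `oci.wait_until` is a ready-made alternative.

```python
import time

# Assumed terminal lifecycle states of a Data Flow run
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "CANCELED", "STOPPED"}


def wait_for_run(get_state, poll_seconds=30, timeout_seconds=3600, sleep=time.sleep):
    """Poll get_state() until it returns a terminal state or the timeout passes."""
    waited = 0
    while True:
        state = get_state()
        if state in TERMINAL_STATES:
            return state
        if waited >= timeout_seconds:
            raise TimeoutError(f"run still {state} after {waited} seconds")
        sleep(poll_seconds)
        waited += poll_seconds


# Fake state sequence so the sketch runs without OCI; no real sleeping
states = iter(["ACCEPTED", "IN_PROGRESS", "SUCCEEDED"])
final_state = wait_for_run(lambda: next(states), poll_seconds=0, sleep=lambda s: None)
print(final_state)
```

In the notebook the state getter would be `lambda: data_flow_client.get_run(create_run_response.data.id).data.lifecycle_state`.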
## **Extra**

In [5]:
## Activate (reattach to) the session created above

command = prepare_command(
    {
        "compartmentId": os.environ.get("NB_SESSION_COMPARTMENT_OCID"),
        "displayName": "DF_session",
        "applicationId": "ocid1.dataflowapplication.oc1.eu-frankfurt-1.antheljrngencdya44cf5hkxcvfuarsi2a74fjvgqy6clw372oo4hxi7rnnq",
    }
)

%activate_session -l python -c $command

Setting up the Cluster..



Cluster is ready..
Starting Spark application..


| Session ID | Kind | State | Current session |
| --- | --- | --- | --- |
| ocid1.dataflowapplication.oc1.eu-frankfurt-1.antheljrngencdya44cf5hkxcvfuarsi2a74fjvgqy6clw372oo4hxi7rnnq | pyspark | IN_PROGRESS | Dataflow Run |



SparkSession available as 'spark'.
SparkContext available as 'sc'.


In [None]:
%stop_session