# Data Pipeline in Data Flow: Copy Data From ADW to Bucket

Data Flow integration with Data Science uses [OCI](https://docs.oracle.com/en-us/iaas/data-flow/using/data-flow-studio.htm) conda environments to manage Python dependencies.

> Reference: [Spark Oracle Datasource](https://docs.oracle.com/es-ww/iaas/data-flow/using/spark_oracle_datasource.htm)

[![Notebook Examples](https://img.shields.io/badge/docs-notebook--examples-blue)](https://github.com/oracle-samples/oci-data-science-ai-samples/tree/master/notebook_examples)
[![Conda Environments](https://img.shields.io/badge/docs-conda--environments-blue)](https://docs.oracle.com/en-us/iaas/data-science/using/conda_understand_environments.htm)
[![Source Code](https://img.shields.io/badge/source-accelerated--datascience-blue)](https://github.com/oracle/accelerated-data-science)

#### [Step 1] Libraries, Authentication, and Access

```python
import json

import ads
import oci

# Supported values: resource_principal, api_key
ads.set_auth("resource_principal")
signer = oci.auth.signers.get_resource_principals_signer()

# Minimal client config derived from the resource principal (region and tenancy)
config = {'region': signer.region, 'tenancy': signer.tenancy_id}
```

#### [Step 2] [Python]

##### [Step 2.1] [Python] Parameters

```python
# Enter Compartment ID
compartment_id   = "ocid1.compartment.oc1..************************************************************"
# Enter Display Name
display_name     = "data-science-data-flow-adb-export-data-spark"

# Enter your Namespace
namespace        = "************"
# Enter your Object storage: Bucket Name
bucket_name      = "DLK1LAG"
# Enter Folder Name
folder_name      = "CUSTOMER_DEMO"
```
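
Several values later in the notebook (the session's `logsBucketUri`, the wallet location and the Parquet output path) follow the `oci://<bucket>@<namespace>/<path>` pattern. A small helper can keep that format in one place. `oci_uri` is a hypothetical name, not part of OCI or ADS, and the `%%spark` cells would need their own copy because they run in the remote session.

```python
def oci_uri(bucket_name: str, namespace: str, path: str = "") -> str:
    """Return an Object Storage URI of the form oci://<bucket>@<namespace>/<path>."""
    if not bucket_name or not namespace:
        raise ValueError("bucket_name and namespace must be non-empty")
    # Drop leading slashes so "folder/" and "/folder/" produce the same URI
    return f"oci://{bucket_name}@{namespace}/{path.lstrip('/')}"


print(oci_uri("DLK1LAG", "mynamespace", "dataflow-logs"))  # oci://DLK1LAG@mynamespace/dataflow-logs
```

For example, `oci_uri(bucket_name, namespace, "Wallet_ADWDEMO.zip")` yields the same string as the `walletUri` f-string in Step 3.1.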

##### [Step 2.2] [Python] Delete Object (Folder) in Bucket

```python
def fn_delete_object_bucket(namespace, bucket_name, folder_name):
    # Create the Object Storage client with the resource principal signer from Step 1
    object_storage_client = oci.object_storage.ObjectStorageClient(config, signer=signer)

    try:
        # Delete the object named "<folder_name>/" (the folder marker). In a standard bucket,
        # objects stored under that prefix are not removed by this call.
        object_storage_client.delete_object(namespace, bucket_name, f"{folder_name}/")
        print(f"Folder '{folder_name}/' deleted successfully from OCI Object Storage bucket '{bucket_name}'.")

    # Capture specific OCI errors
    except oci.exceptions.ServiceError as e:
        if e.status == 404 and e.code == 'ObjectNotFound':
            print(f"Error: The folder '{folder_name}/' does not exist in bucket '{bucket_name}'. Nothing to delete.")
        else:
            print("OCI Service Error:", str(e))

    # Catch other errors
    except Exception as e:
        print("An error occurred:", str(e))
```
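
`delete_object` removes exactly one object, so in a standard (flat-namespace) bucket the function above deletes only an object literally named `CUSTOMER_DEMO/`; files written under that prefix stay in place (Step 3.4 writes with `mode("overwrite")`, which replaces them anyway). A minimal sketch of deleting everything under the prefix, assuming a client with the `ObjectStorageClient` methods `list_objects` (with the `prefix` and `start` arguments, paginated through `next_start_with`) and `delete_object`. The helper name `delete_objects_with_prefix` is hypothetical; the client is passed in so the logic can be exercised without OCI credentials.

```python
def delete_objects_with_prefix(client, namespace, bucket_name, prefix):
    """Delete every object whose name starts with `prefix` and return the deleted names.

    `client` is assumed to expose ObjectStorageClient-style list_objects/delete_object.
    """
    if not prefix:
        raise ValueError("Refusing to delete with an empty prefix (it would match the whole bucket)")
    if not prefix.endswith("/"):
        prefix += "/"  # match CUSTOMER_DEMO/... but not CUSTOMER_DEMO_OLD/...

    deleted = []
    start = None
    while True:
        # Each list_objects call returns one page; next_start_with is the name to resume from
        kwargs = {"prefix": prefix}
        if start is not None:
            kwargs["start"] = start
        response = client.list_objects(namespace, bucket_name, **kwargs)

        for obj in response.data.objects:
            client.delete_object(namespace, bucket_name, obj.name)
            deleted.append(obj.name)

        start = response.data.next_start_with
        if not start:
            break
    return deleted
```

In the notebook it could be called as `delete_objects_with_prefix(oci.object_storage.ObjectStorageClient(config, signer=signer), namespace, bucket_name, folder_name)`.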

##### [Step 2.3] [Python] Start or Reuse a Data Flow Session

```python
def fn_get_data_flow_application_id(display_name):
    # Create a client for the Data Flow service
    data_flow_client = oci.data_flow.DataFlowClient(config, signer=signer)

    # List runs in the compartment, newest first
    list_runs_response = data_flow_client.list_runs(compartment_id=compartment_id, sort_order="DESC")

    # Keep only runs that are IN_PROGRESS and match display_name
    in_progress_runs = [
        run for run in list_runs_response.data
        if run.lifecycle_state == 'IN_PROGRESS' and run.display_name == display_name
    ]

    # Return the application OCID of the first match, or None if no session is running
    return in_progress_runs[0].application_id if in_progress_runs else None
```
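
The function above relies on `list_runs` returning the newest run first and inspects only the first page of results. A hedged alternative for the selection step, written as a pure function over run summaries so it can be tested offline: it picks the most recently created matching run by its `time_created` attribute, regardless of list order. `pick_active_application_id` is a hypothetical helper name; the full run list could come, for example, from `oci.pagination.list_call_get_all_results(data_flow_client.list_runs, compartment_id=compartment_id).data`.

```python
from typing import Iterable, Optional


def pick_active_application_id(runs: Iterable, display_name: str) -> Optional[str]:
    """Return the application_id of the newest IN_PROGRESS run named display_name, or None."""
    candidates = [
        run for run in runs
        if run.lifecycle_state == "IN_PROGRESS" and run.display_name == display_name
    ]
    if not candidates:
        return None
    # Choose by creation time instead of trusting the order of the listing
    newest = max(candidates, key=lambda run: run.time_created)
    return newest.application_id
```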

```python
# Load Magics Extension for Data Flow
%load_ext dataflow.magics
```

```python
# Application ID of an IN_PROGRESS session with this display name, if any
application_id = fn_get_data_flow_application_id(display_name)

if application_id:
    # Reattach to the running session instead of creating a new one
    get_ipython().run_line_magic('use_session', f"-s {application_id}")
else:
    command = {
        "compartmentId": compartment_id,
        "displayName": display_name,
        "language": "PYTHON",
        "sparkVersion": "3.2.1",
        "driverShape": "VM.Standard.E3.Flex",
        "executorShape": "VM.Standard.E3.Flex",
        "driverShapeConfig": {"ocpus": 1, "memoryInGBs": 16},
        "executorShapeConfig": {"ocpus": 1, "memoryInGBs": 16},
        "numExecutors": 6,
        "type": "SESSION",
        "logsBucketUri": f"oci://{bucket_name}@{namespace}/dataflow-logs",
        "configuration": {
            # Enables the Oracle datasource (format("oracle")) used in Step 3
            "spark.oracle.datasource.enabled": "true"
        }
    }
    # Convert the configuration to JSON and pass it as a string to the magic command
    json_command = json.dumps(command)
    # Create the session with the line magic
    get_ipython().run_line_magic('create_session', f"-l python -c '{json_command}'")
```

```text
Setting up the Cluster..
Cluster is ready..
Starting Spark application..
```

| Session ID | Kind | State | Current session |
|---|---|---|---|
| ocid1.dataflowapplication.oc1.iad.anuwcljrfioir7iacyfk7xyqaduam7v23ep2yimyibeqp7lgrt2ybm5onu6a | pyspark | IN_PROGRESS | Dataflow Run |

```text
SparkSession available as 'spark'.
SparkContext available as 'sc'.
```
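
The session request asks for one driver plus `numExecutors` executors, each with its own shape configuration, so the resources it reserves can be worked out before creating it. A small sketch, assuming the driver and every executor receive exactly the `ocpus` and `memoryInGBs` given in their `*ShapeConfig` entries: with one driver and six executors at 1 OCPU and 16 GB each, that is 1 + 6 × 1 = 7 OCPUs and 16 + 6 × 16 = 112 GB. `session_footprint` is a hypothetical helper.

```python
def session_footprint(session_config: dict) -> tuple:
    """Return (total OCPUs, total memory in GB) for one driver plus numExecutors executors."""
    driver = session_config["driverShapeConfig"]
    executor = session_config["executorShapeConfig"]
    n_executors = session_config["numExecutors"]
    total_ocpus = driver["ocpus"] + n_executors * executor["ocpus"]
    total_memory_gb = driver["memoryInGBs"] + n_executors * executor["memoryInGBs"]
    return total_ocpus, total_memory_gb


example = {
    "driverShapeConfig": {"ocpus": 1, "memoryInGBs": 16},
    "executorShapeConfig": {"ocpus": 1, "memoryInGBs": 16},
    "numExecutors": 6,
}
print(session_footprint(example))  # (7, 112)
```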


#### [Step 3] [Data Flow]

##### [Step 3.1] [Data Flow] Parameters

```python
%%spark

# %%spark cells run in the remote Data Flow session, so local Python variables
# are not visible here and must be defined again
# Enter your Namespace
namespace        = "************"
# Enter your Object storage: Bucket Name
bucket_name      = "DLK1LAG"

# Autonomous DB
walletUri        = f"oci://{bucket_name}@{namespace}/Wallet_ADWDEMO.zip"
connectionId     = "adwdemo_high"
user             = "admin"
password         = "************"

# Schema.Table
schema_name      = "ADMIN"
table_name       = "CUSTOMER_DEMO"
```


##### [Step 3.2] [Data Flow] Get Partition Names from Autonomous DB

```python
%%spark

def fn_get_partitions_adb(schema_name, table_name, walletUri, connectionId, user, password):
    # Query that returns the partition names of the table
    query = f"(SELECT PARTITION_NAME FROM DBA_TAB_PARTITIONS WHERE table_owner = '{schema_name}' AND table_name = '{table_name}')"

    # Run the query through the Oracle datasource and load the partition names into a DataFrame
    df_partitions = spark.read \
        .format("oracle") \
        .option("walletUri", walletUri) \
        .option("connectionId", connectionId) \
        .option("dbtable", query) \
        .option("user", user) \
        .option("password", password) \
        .load()

    # Convert the DataFrame of partition names into a Python list
    return [row['PARTITION_NAME'] for row in df_partitions.collect()]
```
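
Both Spark functions interpolate `schema_name` and `table_name` directly into SQL text. A minimal guard, assuming the objects use ordinary unquoted identifiers (a letter first, then letters, digits, `_`, `$` or `#`, at most 128 characters): validate the names and upper-case them, since Oracle stores unquoted names in upper case in the data dictionary. The helper names are hypothetical. For a user without DBA privileges, `ALL_TAB_PARTITIONS` exposes the same `TABLE_OWNER`, `TABLE_NAME` and `PARTITION_NAME` columns for the tables that user can access.

```python
import re

# Ordinary (unquoted) Oracle identifier: letter, then letters, digits, _, $ or #; 128 chars max
_IDENTIFIER = re.compile(r"[A-Za-z][A-Za-z0-9_$#]{0,127}")


def checked_identifier(name: str) -> str:
    """Validate an unquoted identifier and return it upper-cased, as stored in the dictionary."""
    if not _IDENTIFIER.fullmatch(name):
        raise ValueError(f"Not an ordinary Oracle identifier: {name!r}")
    return name.upper()


def partition_names_query(schema_name: str, table_name: str) -> str:
    """Subquery (usable as the 'dbtable' option) listing the table's partition names."""
    owner = checked_identifier(schema_name)
    table = checked_identifier(table_name)
    return (
        "(SELECT PARTITION_NAME FROM DBA_TAB_PARTITIONS "
        f"WHERE TABLE_OWNER = '{owner}' AND TABLE_NAME = '{table}')"
    )
```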


##### [Step 3.3] [Data Flow] Read All Partitions of a Table from Autonomous DB

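```python
%%spark

from functools import reduce
from pyspark.sql import DataFrame

def fn_get_partitions_table_adb(schema_name, table_name, walletUri, connectionId, user, password, partitions=None):
    # Fetch the partition names (Step 3.2) if they were not passed in
    if partitions is None:
        partitions = fn_get_partitions_adb(schema_name, table_name, walletUri, connectionId, user, password)
    if not partitions:
        raise ValueError(f"No partitions found for {schema_name}.{table_name}")

    # List to store one DataFrame per partition
    dfs = []

    for partition in partitions:
        # Read a single partition of the table
        query = f"(SELECT * FROM {schema_name}.{table_name} PARTITION({partition}))"

        df_partition = spark.read \
            .format("oracle") \
            .option("walletUri", walletUri) \
            .option("connectionId", connectionId) \
            .option("dbtable", query) \
            .option("user", user) \
            .option("password", password) \
            .load()

        # Add the DataFrame to the list
        dfs.append(df_partition)

    # Combine all DataFrames into one (union by column position)
    df = reduce(DataFrame.unionAll, dfs)

    # Cache the combined DataFrame (the default storage level spills to disk if memory is short)
    df.persist()

    # persist() is lazy; an action such as count() forces the data to be read and cached
    df.count()
    return df
```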
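
The function reads each partition with its own query, which produces one Spark read per partition. If that is more (or fewer) reads than the cluster handles well, one option, not part of the original notebook, is to group several partitions into a single subquery with `UNION ALL`: fewer reads means fewer, larger Spark partitions. `batched_partition_queries` is a hypothetical helper; with `batch_size=1` it yields the same subqueries the loop above builds.

```python
def batched_partition_queries(schema_name, table_name, partitions, batch_size=1):
    """Return 'dbtable' subqueries, each covering up to batch_size partitions via UNION ALL."""
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    queries = []
    for i in range(0, len(partitions), batch_size):
        batch = partitions[i:i + batch_size]
        selects = [f"SELECT * FROM {schema_name}.{table_name} PARTITION({p})" for p in batch]
        queries.append("(" + " UNION ALL ".join(selects) + ")")
    return queries
```

Each returned string would take the place of `query` in the `.option("dbtable", query)` call.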


##### [Step 3.4] [Data Flow] Send Data to Bucket

```python
%%spark

def fn_export_dataframe_to_bucket(namespace, bucket_name, table_name, df):
    # Number of output partitions (one Parquet file per non-empty partition); tune and observe performance
    df = df.repartition(100)

    # Write the DataFrame to the bucket as Snappy-compressed Parquet, replacing any previous export
    df.write \
        .option("compression", "snappy") \
        .mode("overwrite") \
        .parquet(f"oci://{bucket_name}@{namespace}/{table_name}/")
```
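
`repartition(100)` fixes the number of output files regardless of data volume. A common alternative is to derive the count from an estimated data size and a target file size. The sketch below is that heuristic, not the notebook's method; how the size is estimated (for example from the footprint of an earlier export in the bucket) and the 256 MiB target are assumptions to tune. `output_partitions` is a hypothetical helper.

```python
import math


def output_partitions(estimated_bytes: int,
                      target_file_bytes: int = 256 * 1024 * 1024,
                      minimum: int = 1) -> int:
    """Number of partitions so that each output file is roughly target_file_bytes."""
    if estimated_bytes < 0 or target_file_bytes <= 0 or minimum < 1:
        raise ValueError("sizes must be non-negative and target/minimum positive")
    return max(minimum, math.ceil(estimated_bytes / target_file_bytes))


# 10 GiB at ~256 MiB per file -> 40 partitions
print(output_partitions(10 * 1024**3))
```

In the Spark cell this would replace the constant, e.g. `df.repartition(output_partitions(estimated_bytes))`.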


#### [Step 4] Data Processing

##### [Step 4.1] [Data Processing] [Python]

```python
fn_delete_object_bucket(namespace, bucket_name, folder_name)
```

```text
Folder 'CUSTOMER_DEMO/' deleted successfully from OCI Object Storage bucket 'DLK1LAG'.
```


##### [Step 4.2] [Data Processing] [Spark]

```python
%%spark

par = fn_get_partitions_adb(schema_name, table_name, walletUri, connectionId, user, password)
par
```


```text
['P_202202', 'P_202310', 'P_202205', 'P_202106', 'P_202402', 'P_202405', 'P_202104', 'P_202210', 'P_202112', 'P_202108', 'P_202408', 'P_202005', 'P_202306', 'P_202002', 'P_202312', 'P_202406', 'P_202010', 'P_202412', 'P_202201', 'P_202309', 'P_202209', 'P_202109', 'P_202001', 'P_202304', 'P_202111', 'P_202401', 'P_202211', 'P_202411', 'P_202409', 'P_202007', 'P_202012', 'P_202206', 'P_202203', 'P_202404', 'P_202003', 'P_202303', 'P_202103', 'P_202305', 'P_202403', 'P_202110', 'P_202207', 'P_202004', 'P_202407', 'P_202311', 'P_202410', 'P_202011', 'P_202307', 'P_202009', 'P_202302', 'P_202102', 'P_202101', 'P_202308', 'P_202006', 'P_202208', 'P_202212', 'P_202301', 'P_202105', 'P_202204', 'P_202008', 'P_202107', 'P_202501', 'P_202502', 'P_202503', 'P_202504', 'P_202505', 'P_202506', 'P_202507', 'P_202508', 'P_202509', 'P_202510', 'P_202511', 'P_202512']
```
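
The partition query has no `ORDER BY`, so the names come back unordered, and they follow a `P_YYYYMM` pattern. Assuming that convention holds, a small filter can sort them chronologically and restrict an export to a month range (for example, to refresh only recent data); the resulting list could then be read instead of every partition. `select_monthly_partitions` is a hypothetical helper.

```python
import re

_MONTHLY = re.compile(r"P_(\d{6})")


def select_monthly_partitions(names, start=None, end=None):
    """Return P_YYYYMM names within [start, end] ('YYYYMM' strings, inclusive), sorted by month."""
    selected = []
    for name in names:
        match = _MONTHLY.fullmatch(name)
        if not match:
            continue  # skip partitions that do not follow the P_YYYYMM convention
        month = match.group(1)
        if (start is None or month >= start) and (end is None or month <= end):
            selected.append(name)
    # Fixed-width YYYYMM makes lexicographic order equal to chronological order
    return sorted(selected)
```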

```python
%%spark

df = fn_get_partitions_table_adb(schema_name, table_name, walletUri, connectionId, user, password)
```


```python
%%spark

fn_export_dataframe_to_bucket(namespace, bucket_name, table_name, df)
```


```python
%%spark

# Show the first 5 rows of the DataFrame
df.show(5)
```


```text
+-------------------+--------------------+--------------------+-------------+--------------------+-------------+--------------------+--------------------+--------------------+------------+--------------------+------------+------------+----------------------+
|        CUSTOMER_ID|   REGISTRATION_DATE|   MODIFICATION_DATE|CUSTOMER_CODE|       DOCUMENT_CODE|DOCUMENT_TYPE|          FIRST_NAME|           LAST_NAME|          BIRTH_DATE|      GENDER|               EMAIL|      STATUS|PHONE_NUMBER|LAST_MODIFICATION_DATE|
+-------------------+--------------------+--------------------+-------------+--------------------+-------------+--------------------+--------------------+--------------------+------------+--------------------+------------+------------+----------------------+
|25040773.0000000000|2024-04-15 19:42:...|2024-04-15 19:42:...|   LSZYCPUUSQ|PZQJC35Z0P40LM10F...| 4.0000000000|ktmEgQnehmcXrEFDM...|syLCTDKhntDOyKzip...|2016-02-01 09:46:...|1.0000000000|lBmHRhDpLG@exampl...|1.0000000000|
```

```python
%%spark

# Release the cached DataFrame after use
df.unpersist()
```


```text
DataFrame[CUSTOMER_ID: decimal(38,10), REGISTRATION_DATE: timestamp, MODIFICATION_DATE: timestamp, CUSTOMER_CODE: string, DOCUMENT_CODE: string, DOCUMENT_TYPE: decimal(38,10), FIRST_NAME: string, LAST_NAME: string, BIRTH_DATE: timestamp, GENDER: decimal(38,10), EMAIL: string, STATUS: decimal(38,10), PHONE_NUMBER: string, LAST_MODIFICATION_DATE: timestamp]
```
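
The schema shows the numeric columns (CUSTOMER_ID, DOCUMENT_TYPE, GENDER, STATUS) arriving as `decimal(38,10)`, while the sample row holds whole-number values. If those columns really contain integers (verify first: casting to an integer type drops any fractional part), casting them before the export gives narrower Parquet columns. A sketch that builds expressions for `DataFrame.selectExpr`; the helper name `cast_expressions` and the chosen target types are assumptions.

```python
def cast_expressions(columns, target_types):
    """Build selectExpr() strings: cast the columns listed in target_types, keep the rest as-is."""
    unknown = set(target_types) - set(columns)
    if unknown:
        raise ValueError(f"Unknown columns: {sorted(unknown)}")
    return [
        f"CAST({col} AS {target_types[col]}) AS {col}" if col in target_types else col
        for col in columns
    ]
```

In a `%%spark` cell this could be applied as `df.selectExpr(*cast_expressions(df.columns, {"CUSTOMER_ID": "BIGINT", "GENDER": "INT"}))` before calling the export function.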

```python
# Stop Spark session
%stop_session
```