### Use ReadBigQuery_FinalNotebook.ipynb to read a GCP BigQuery table from OCI Data Flow and write it to OCI Object Storage or ADW.

In [15]:
import ads
# Select the authentication mode that ADS uses for the calls made from this notebook.
ads.set_auth("resource_principal") # Supported values: resource_principal, api_key

In [None]:
%load_ext dataflow.magics

In [None]:
import datetime

# Record when session creation was requested so it can be compared with the
# "Session is Created" timestamp printed after %create_session returns.
session_requested_at = datetime.datetime.now()
print("Start creating Session : ",session_requested_at)

### Create an OCI Data Flow session using the Livy service from an OCI Data Science notebook.

In [None]:

import json

# Both the driver and the executor use the same flexible-shape sizing.
flex_shape_sizing = {"ocpus":1,"memoryInGBs":16}

# Spark properties for the session: the conda pack, the GCP credentials file
# shipped alongside the job, and the BigQuery connector jar.
spark_configuration = {
    "spark.archives":"oci://demo-ds-conda-env@osnamespace/conda_environments/cpu/PySpark 3.2 and Data Flow/1.0/pyspark32_p38_cpu_v1#conda",
    "spark.files":"oci://demo-ds-conda-env@osnamespace/gcp_utility/BigQuery/ocigcp_user_creds.json",
    "spark.oracle.datasource.enabled":"true",
    "spark.hadoop.google.cloud.auth.service.account.enable":"true",
    "spark.jars":"oci://demo-ds-conda-env@osnamespace/gcp_utility/BigQuery/bigquery_spark-bigquery-with-dependencies_2.12-0.23.2.jar",
}

# Session definition; the key order is kept stable because the dict is
# serialized as-is into the JSON payload below.
session_config = {
    "compartmentId": "ocid1.compartment.oc1..xxxxxxx",
    "displayName": "Demo_BigQuery_v1",
    "sparkVersion": "3.2.1",
    "driverShape": "VM.Standard.E3.Flex",
    "executorShape": "VM.Standard.E3.Flex",
    "driverShapeConfig": dict(flex_shape_sizing),
    "executorShapeConfig": dict(flex_shape_sizing),
    "numExecutors": 1,
    "configuration": spark_configuration,
}

# The payload is wrapped in single quotes so that it reaches the
# %create_session magic as one argument.
command = "'" + json.dumps(session_config) + "'"
print("command",command)

#enableHiveSupport()

%create_session -l python -c $command

In [None]:

# Timestamp taken once the session-creation cell has finished.
session_created_at = datetime.datetime.now()
print("Session is Created : ",session_created_at)

In [None]:
%help

In [None]:
%%spark
#Import required libraries.

import json
import os
import sys
import datetime
import oci
import google.cloud.bigquery as bigquery
import google.cloud
import pyspark.sql 
from pyspark.sql.functions import countDistinct


### Read GCP BigQuery Table

In [None]:
%%spark
# Read from BigQuery : "bitcoin_blockchain.transactions". i.e. At Source "BigQuery"
#Number of rows : 340,311,544
#Total logical bytes : 587.14 GB

# 'project' is the project that owns the table; 'parentProject' is passed
# separately and 'credentialsFile' points at the service-account JSON.
# NOTE(review): the credentials path presumably matches where the file listed
# under "spark.files" in the session configuration is placed - confirm.
df_bitcoin_blockchain = (
    spark.read.format('bigquery')
    .option('project','bigquery-public-data')
    .option('parentProject','core-invention-366213')
    .option("credentialsFile","/opt/spark/work-dir/ocigcp_user_creds.json")
    .option('table','bitcoin_blockchain.transactions')
    .load()
)

# Count the DataFrame that was just loaded. The previous code referenced an
# undefined name `df`, which raised NameError on a fresh session.
print("Total Records Count bitcoin_blockchain.transactions : ",df_bitcoin_blockchain.count())

In [None]:
%%spark

# Load a second BigQuery table (Retail_Channel.RetailPOS_15min) and preview it.
retail_pos_reader = spark.read.format('bigquery')
retail_pos_reader = retail_pos_reader.option('project','core-invention-366213')
retail_pos_reader = retail_pos_reader.option('parentProject','core-invention-366213')
retail_pos_reader = retail_pos_reader.option("credentialsFile","/opt/spark/work-dir/ocigcp_user_creds.json")
retail_pos_reader = retail_pos_reader.option('table','Retail_Channel.RetailPOS_15min')
df_RetailPOS_15min = retail_pos_reader.load()
df_RetailPOS_15min.show()

### Load Data into Object Storage 

In [None]:
%%spark

#Write in Object Storage
print("Writing Started at : ",datetime.datetime.now())
# The save mode must be set with .mode(); .option("mode", ...) is only passed
# through as a data-source option and leaves the writer on its default
# error-if-exists behaviour, so re-running the cell failed once the path existed.
# NOTE(review): the target folder is named "bitcoin_blockchain_transactions"
# but the DataFrame written is df_RetailPOS_15min - confirm which is intended.
(
    df_RetailPOS_15min.write
    .format("json")
    .mode("overwrite")
    .save("oci://ds-Raw@osnamespace/TargetData/bitcoin_blockchain_transactions")
)
print("Writing Completed at : ",datetime.datetime.now())

### Load Data into ADW using Wallet Password

In [None]:
%%spark
print("Set Parameters for ADW connectivity.")
#ADB_ID = "ocid1.autonomousdatabase.oc1.phx.xxxxx"
USERNAME = "admin"
PASSWORD = "xxxxx"
connectionId= "demolakehouseadw_medium"
walletUri = "oci://demo-ds-conda-env@osnamespace/oci_utility/Wallet_DemoLakeHouseADW.zip"

#properties = {"adbId": ADB_ID,"user" : USERNAME,"password": PASSWORD}
properties = {"connectionId": connectionId,"user" : USERNAME,"password": PASSWORD,"walletUri": walletUri}
print("properties:",properties)

In [None]:
%%spark

# Destination table in ADW.
TARGET_TABLE = "ADMIN.RETAILPOS_15MINUTES"
print("TARGET_TABLE : ",TARGET_TABLE)

# Append the DataFrame to the target table through the "oracle" data source,
# using the connection settings held in `properties`.
print("Write to ADW : ")
adw_writer = (
    df_RetailPOS_15min.write
    .format("oracle")
    .mode("append")
    .option("dbtable",TARGET_TABLE)
    .options(**properties)
)
adw_writer.save()
print("Writing completed to ADW.....")

### Load Data into ADW using Secret Vault for Wallet

In [None]:
%%spark

def get_authenticated_client(token_path, client, file_location=None, profile_name=None):
    """
    Build an authenticated OCI client of the requested class.

    token_path    -- file holding the delegation token (read only in Data Flow).
    client        -- OCI client class to instantiate,
                     e.g. oci.object_storage.ObjectStorageClient.
    file_location -- OCI config file used outside Data Flow (SDK default if None).
    profile_name  -- config profile used outside Data Flow (SDK default if None).

    Returns the instantiated client.
    """
    import oci

    if in_dataflow():
        # Inside Data Flow: sign requests with the delegation token read from disk.
        with open(token_path) as token_file:
            delegation_token = token_file.read()
        token_signer = oci.auth.signers.InstancePrincipalsDelegationTokenSigner(
            delegation_token=delegation_token
        )
        return client(config={}, signer=token_signer)

    # Outside Data Flow: fall back to the API key described in the OCI config file.
    config_file = oci.config.DEFAULT_LOCATION if file_location is None else file_location
    config_profile = oci.config.DEFAULT_PROFILE if profile_name is None else profile_name
    api_key_config = oci.config.from_file(file_location=config_file, profile_name=config_profile)
    return client(api_key_config)


def get_password_from_secrets(token_path, password_ocid):
    """
    Fetch a secret from the OCI Secrets service and return it as text.

    token_path    -- delegation token path forwarded to get_authenticated_client.
    password_ocid -- OCID of the secret to read.

    Returns the decoded secret content as a str.
    Raises UnicodeDecodeError if the decoded bytes are not valid UTF-8.
    """
    import base64
    import oci

    secrets_client = get_authenticated_client(token_path, oci.secrets.SecretsClient)
    response = secrets_client.get_secret_bundle(password_ocid)

    # The bundle content is read as a base64 string (hence the decode below).
    base64_secret_content = response.data.secret_bundle_content.content
    secret_bytes = base64.b64decode(base64_secret_content.encode("ascii"))

    # Decode as UTF-8 rather than ASCII: identical for ASCII-only secrets, and
    # it no longer fails on passwords that contain non-ASCII characters.
    return secret_bytes.decode("utf-8")


def get_delegation_token_path(spark):
    """
    Look up the delegation token path from the Spark configuration.

    Returns None when not running in Data Flow. Raises Exception when running
    in Data Flow and the configuration key is missing or empty.
    """
    if in_dataflow():
        token_key = "spark.hadoop.fs.oci.client.auth.delegationTokenPath"
        configured_path = spark.sparkContext.getConf().get(token_key)
        if configured_path:
            return configured_path
        raise Exception(f"{token_key} is not set")
    return None

def get_temporary_directory():
    """
    Return the scratch directory: the fixed Spark work dir when running in
    Data Flow, otherwise the temp directory reported by `tempfile`.
    """
    if not in_dataflow():
        import tempfile
        return tempfile.gettempdir()
    return "/opt/spark/work-dir/"

def in_dataflow():
    """
    Report whether the code is running in OCI Data Flow.

    The check is purely environmental: True when the HOME environment
    variable equals "/home/dataflow", False otherwise (including when unset).
    """
    return os.environ.get("HOME") == "/home/dataflow"


def download_wallet(spark, wallet_path):
    """
    Fetch an ADW/ATP wallet archive from Object Storage, unpack it into the
    temporary directory and register the wallet files with the Spark context.

    spark       -- active Spark session (used for its configuration and context).
    wallet_path -- wallet location in the form oci://<bucket>@<namespace>/<object>.

    Returns the directory the wallet was extracted into.
    """
    import oci
    import zipfile
    from urllib.parse import urlparse

    # Object Storage client authenticated for the current environment.
    delegation_token_path = get_delegation_token_path(spark)
    storage_client = get_authenticated_client(
        delegation_token_path, oci.object_storage.ObjectStorageClient
    )

    # Break the wallet URI into bucket, namespace and object name.
    wallet_uri = urlparse(wallet_path)
    bucket_name, namespace = wallet_uri.netloc.split("@")
    object_name = wallet_uri.path[1:]  # drop the leading "/"

    # Stream the archive to disk in 1 MiB chunks.
    wallet_object = storage_client.get_object(namespace, bucket_name, object_name)
    work_dir = get_temporary_directory()
    archive_path = os.path.join(work_dir, "wallet.zip")
    chunk_size = 1024 * 1024
    with open(archive_path, "wb") as archive_file:
        for piece in wallet_object.data.raw.stream(chunk_size, decode_content=False):
            archive_file.write(piece)

    # Unpack the archive next to where it was downloaded.
    with zipfile.ZipFile(archive_path, "r") as archive:
        archive.extractall(work_dir)

    # Register each expected wallet file with the Spark context.
    wallet_files = "cwallet.sso ewallet.p12 keystore.jks ojdbc.properties sqlnet.ora tnsnames.ora truststore.jks".split()
    context = spark.sparkContext
    for wallet_file in wallet_files:
        context.addFile(os.path.join(work_dir, wallet_file))

    return work_dir

In [None]:
%%spark
# --- Database connection ---
USER = "admin"
TNSNAME = "demolakehouseadw_medium"
TARGET_TABLE = "ADMIN.RETAILPOS_15MINUTES"

# --- Where the credentials come from ---
# OCID of the vault secret that holds the database password.
PASSWORD_SECRET_OCID = "ocid1.vaultsecret.oc1.phx.xxxxxxx"
# Object Storage location of the wallet archive.
WALLET_PATH = "oci://demo-ds-conda-env@osnamespace/oci_utility/Wallet_DemoLakeHouseADW.zip"

In [None]:
%%spark
# Fetch the wallet, unpack it and hand its files to Spark; the returned
# directory is then used as TNS_ADMIN in the JDBC URL.
print("Downloading wallet")
wallet_path = download_wallet(spark, WALLET_PATH)
adw_url_template = "jdbc:oracle:thin:@{}?TNS_ADMIN={}"
adw_url = adw_url_template.format(TNSNAME, wallet_path)

In [None]:
%%spark
# Retrieve the database password from the OCI Secrets service.
print("Getting wallet password")
token_path = get_delegation_token_path(spark)
password = get_password_from_secrets(token_path, PASSWORD_SECRET_OCID)
print("Done getting wallet password")

# Assemble the JDBC connection properties used by the write cell that follows.
print("Saving processed data to " + adw_url)
properties = {}
properties["driver"] = "oracle.jdbc.driver.OracleDriver"
# NOTE(review): this sets tns_admin to the TNS alias, while the JDBC URL
# carries the wallet directory as TNS_ADMIN - confirm this key is intended.
properties["oracle.net.tns_admin"] = TNSNAME
properties["password"] = password
properties["user"] = USER

In [None]:
%%spark

# Destination table in ADW.
TARGET_TABLE = "ADMIN.RETAILPOS_15MINUTES"
print("TARGET_TABLE : ",TARGET_TABLE)

# Append the DataFrame to the target table over JDBC, using the wallet-based
# URL and the connection properties prepared in the earlier cells.
print("Write to ADW : ")
retail_pos_writer = df_RetailPOS_15min.write
retail_pos_writer.jdbc(
    url=adw_url,
    table=TARGET_TABLE,
    mode="Append",
    properties=properties,
)
print("Writing done to ADW : ")


### Stop OCI Data Flow Session

In [None]:
%stop_session