In [None]:
#Copyright (c) 2025, Oracle and/or its affiliates.
#Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
pip install ocifs

In [None]:
import oci
import os
import io
import zipfile
import time
from pyspark import SparkConf
from pyspark.sql import SparkSession,SQLContext
from pyspark.sql.functions import *

In [None]:
# Substrings that identify the extract objects this notebook works on.
# Every entry has the form "fscmtopmodelam_finextractam_<code>biccextractam";
# only the short module code differs, so the list is generated from the codes.
prefixes = [
    f"fscmtopmodelam_finextractam_{module_code}biccextractam"
    for module_code in (
        "ap",
        "ar",
        "ce",
        "exm",
        "fa",
        "fun",
        "gl",
        "iex",
        "xla",
        "xle",
        "zx",
    )
]

In [None]:
import base64
import oci
# OCI Configuration
# Name of the profile section to load from the OCI config file.
CONFIG_PROFILE = "DEFAULT"
# Module-level OCI config loaded from ~/.oci/config; get_text_secret() passes
# it to the Vault SecretsClient.
config = oci.config.from_file('~/.oci/config', CONFIG_PROFILE)

def get_text_secret(secret_ocid):
    """
    Fetch a secret from OCI Vault and return its plain-text value.

    Parameters
    ----------
    secret_ocid : str
        OCID of the secret to read.

    Returns
    -------
    str
        The secret content, base64-decoded and interpreted as UTF-8.

    Raises
    ------
    Exception
        Any error raised while fetching or decoding the secret is logged to
        stdout and re-raised unchanged.
    """
    vault_client = oci.secrets.SecretsClient(config)
    try:
        # The bundle content is base64 text; decode it to get the real secret.
        secret_content = vault_client.get_secret_bundle(secret_ocid).data.secret_bundle_content.content
        decoded_secret_content = base64.b64decode(secret_content).decode("utf-8")
    except Exception as ex:
        print("ERROR: failed to retrieve the secret content", ex, flush=True)
        raise
    # Return the decoded value; the original returned the still-encoded
    # payload, which is not usable as a password.
    return decoded_secret_content

In [None]:
# The Snowflake password is read from OCI Vault instead of being hard-coded.
# The variable must be spelled `password` because sfOptions reads that name;
# the earlier misspelling made this cell fail with a NameError.
password = get_text_secret("<oci vault secret>")

# Connection options handed to the Snowflake Spark connector on every write.
# Replace the "<>" placeholders with the values for your Snowflake account.
sfOptions = {
    "sfUrl": "<sf account>.snowflakecomputing.com:443",
    "sfAccount": "<>",
    "sfUser": "<>",
    "sfPassword": password,
    "sfRole": "<>",
    "sfWarehouse": "<>",
    "sfDatabase": "<<",
    "sfSchema": "<>",
    "createTableOptions": "(overwrite=true)"
}

In [None]:
def in_dataflow():
    """
    Report whether this code is executing inside OCI Data Flow.

    The check is purely environmental: it returns True only when the HOME
    environment variable equals "/home/dataflow", and False otherwise
    (including when HOME is not set).
    """
    home_directory = os.environ.get("HOME")
    return home_directory == "/home/dataflow"

In [None]:
def get_dataflow_spark_session(
    app_name="DataFlow", file_location=None, profile_name=None, spark_config=None
):
    """
    Get a Spark session in a way that supports running locally or in Data Flow.

    Parameters
    ----------
    app_name : str
        Application name used when running inside Data Flow.
    file_location : str, optional
        Path of the OCI config file for local runs; defaults to
        ``oci.config.DEFAULT_LOCATION``.
    profile_name : str, optional
        Profile inside the OCI config file for local runs; defaults to
        ``oci.config.DEFAULT_PROFILE``.
    spark_config : dict, optional
        Extra ``key -> value`` Spark settings applied on top of the base
        configuration. ``None`` (the default) means no extra settings.

    Returns
    -------
    SparkSession
        The session returned by ``getOrCreate()``.

    Raises
    ------
    Exception
        If the OCI SDK is not installed for a local run, or if the OCI config
        file cannot be loaded (the loader's own exception is re-raised).
    """
    if in_dataflow():
        spark_builder = SparkSession.builder.appName(app_name)
    else:
        # Import OCI lazily: it is only needed for local runs.
        try:
            import oci
        except ImportError as exc:
            raise Exception(
                "You need to install the OCI python library to test locally"
            ) from exc
        # Use defaults for anything unset.
        if file_location is None:
            file_location = oci.config.DEFAULT_LOCATION
        if profile_name is None:
            profile_name = oci.config.DEFAULT_PROFILE
        # Load the config file.
        try:
            oci_config = oci.config.from_file(
                file_location=file_location, profile_name=profile_name
            )
        except Exception:
            print("You need to set up your OCI config properly to run locally")
            raise
        # Hand the API-key credentials to the OCI filesystem connector.
        conf = SparkConf()
        conf.set("fs.oci.client.auth.tenantId", oci_config["tenancy"])
        conf.set("fs.oci.client.auth.userId", oci_config["user"])
        conf.set("fs.oci.client.auth.fingerprint", oci_config["fingerprint"])
        conf.set("fs.oci.client.auth.pemfilepath", oci_config["key_file"])
        conf.set(
            "fs.oci.client.hostname",
            "https://objectstorage.{0}.oraclecloud.com".format(oci_config["region"]),
        )
        # NOTE(review): the local branch uses a fixed application name and fixed
        # jar paths, so `app_name` is ignored here - confirm this is intended.
        spark_builder = (
            SparkSession.builder
            .appName("AppBicc")
            .config("spark.log.level", "ERROR")
            .config("spark.jars", "/home/datascience/snowflake-jdbc-3.15.1.jar,/home/datascience/spark-snowflake_2.12-2.15.0-spark_3.2.jar")
            .config(conf=conf)
        )
    # Add in extra configuration. Keep the builder returned by config() rather
    # than relying on it mutating the original object in place.
    for key, val in (spark_config or {}).items():
        spark_builder = spark_builder.config(key, val)

    # Create the Spark session.
    return spark_builder.getOrCreate()

In [None]:
# Create (or reuse, via getOrCreate) the Spark session with the default
# arguments; the cells below pass this `spark` object around.
spark = get_dataflow_spark_session()

In [None]:
def checkLisOfObjects(path_file):
    """
    Tell whether an object name contains any of the configured prefixes.

    Parameters
    ----------
    path_file : str
        Object name to test.

    Returns
    -------
    bool
        True when at least one entry of the module-level ``prefixes`` list
        occurs anywhere inside ``path_file``; False otherwise. (The original
        fell off the end and returned ``None`` for a non-match.)
    """
    return any(substring in path_file for substring in prefixes)

In [None]:
def process_filename(filename):
    """
    Derive the target CSV object name from an extract file name.

    The result is the text that follows the fourth underscore, truncated at
    its first hyphen, with ".csv" appended. A name containing fewer than four
    underscores yields the literal string "Invalid filename".
    """
    # maxsplit=4 keeps everything after the fourth underscore in one piece.
    pieces = filename.split('_', 4)
    if len(pieces) >= 5:
        # Keep only what precedes the first hyphen of that remainder.
        stem = pieces[4].partition('-')[0]
        return stem + ".csv"
    return "Invalid filename"

In [None]:
def get_delegation_token_path(spark):
    """
    Look up the delegation token path from the Spark configuration.

    Returns None when not running in Data Flow. In Data Flow, returns the
    value of "spark.hadoop.fs.oci.client.auth.delegationTokenPath" from the
    Spark context configuration, and raises an Exception if that setting is
    missing or empty.
    """
    token_key = "spark.hadoop.fs.oci.client.auth.delegationTokenPath"
    if in_dataflow():
        token_path = spark.sparkContext.getConf().get(token_key)
        if token_path:
            return token_path
        raise Exception(f"{token_key} is not set")
    # Local runs authenticate with an API key, so there is no token file.
    return None

In [None]:
def get_authenticated_client(token_path, client, file_location=None, profile_name=None):
    """
    Build an authenticated OCI client of the requested class.

    Outside Data Flow the client is built from the OCI config file
    (``file_location`` / ``profile_name``, falling back to the SDK defaults).
    Inside Data Flow the delegation token stored at ``token_path`` is read and
    wrapped in an instance-principals delegation-token signer.

    Any failure is reported on stdout and the exception is re-raised.

    Example: get_authenticated_client(token_path, oci.object_storage.ObjectStorageClient)
    """
    try:
        import oci
        if in_dataflow():
            # Data Flow run: authenticate with the delegation token file.
            with open(token_path) as token_file:
                delegation_token = token_file.read()
            authenticated_client = client(
                config={},
                signer=oci.auth.signers.InstancePrincipalsDelegationTokenSigner(
                    delegation_token=delegation_token
                ),
            )
        else:
            # Local run: authenticate with the API key from the config file.
            location = oci.config.DEFAULT_LOCATION if file_location is None else file_location
            profile = oci.config.DEFAULT_PROFILE if profile_name is None else profile_name
            local_config = oci.config.from_file(file_location=location, profile_name=profile)
            authenticated_client = client(local_config)
    except Exception as e:
        print("Error in get_authenticated_client function")
        raise e
    return authenticated_client

In [None]:
def unzipToBucket(file_contents, destination_bucket_name, object_storage, namespace):
    """
    Extract every file of an in-memory zip archive into an Object Storage bucket.

    Parameters
    ----------
    file_contents : bytes
        Raw bytes of the zip archive.
    destination_bucket_name : str
        Bucket that receives the extracted files.
    object_storage : object
        Client exposing ``put_object(namespace, bucket, name, body)``.
    namespace : str
        Object Storage namespace of the destination bucket.

    Each member is uploaded under the name produced by ``process_filename``.
    Directory entries are skipped: they carry no data and would otherwise be
    uploaded as empty objects.
    """
    with zipfile.ZipFile(io.BytesIO(file_contents)) as zip_ref:
        for file_info in zip_ref.infolist():
            if file_info.is_dir():
                continue
            extracted_file_content = zip_ref.read(file_info.filename)
            # Upload extracted file to destination bucket
            object_storage.put_object(
                namespace,
                destination_bucket_name,
                process_filename(file_info.filename),
                io.BytesIO(extracted_file_content),
            )

In [None]:
import gzip
import io

def unzipGzToBucket(file_contents, destination_bucket_name, object_storage, namespace, original_filename):
    """
    Decompress an in-memory gzip payload and upload it to an Object Storage bucket.

    The upload name is ``process_filename`` applied to ``original_filename``
    with a trailing ".gz" removed (names without that suffix are used as-is).
    """
    # Work out the name of the decompressed file first.
    if original_filename.endswith('.gz'):
        decompressed_name = original_filename[:-3]
    else:
        decompressed_name = original_filename

    with gzip.GzipFile(fileobj=io.BytesIO(file_contents)) as gz_stream:
        payload = gz_stream.read()
        # Push the decompressed bytes to the destination bucket.
        object_storage.put_object(
            namespace,
            destination_bucket_name,
            process_filename(decompressed_name),
            io.BytesIO(payload),
        )

In [None]:
# Read the compressed (zip or gzip) extract files whose names match `prefixes`
# from the source bucket and upload their decompressed content to the
# destination bucket.
source_bucket = '<OCI Bucket>'
source_namespace = '<OCI Namespace>'
destination_bucket = "BucketToDelete"
# Use the configured namespace for the upload as well, instead of a namespace
# literal baked into the loop.
destination_namespace = source_namespace

try:
    token_path = get_delegation_token_path(spark)
    object_storage = get_authenticated_client(token_path, oci.object_storage.ObjectStorageClient)
except Exception as e:
    print("Error in authentication")
    raise e

start_time = time.time()
failed_objects = []
next_starts_with = None
# list_objects returns one page per call; keep requesting pages until the
# service stops returning a continuation token, otherwise anything beyond the
# first page is silently ignored.
while True:
    response = object_storage.list_objects(
        source_namespace,
        source_bucket,
        start=next_starts_with,
        fields='size,timeCreated,timeModified,storageTier',
        retry_strategy=oci.retry.DEFAULT_RETRY_STRATEGY,
    )
    for object_file in response.data.objects:
        path_file = str(object_file.name)
        if not checkLisOfObjects(path_file):
            continue
        # Handle failures per object so one bad archive does not stop the rest.
        try:
            get_object_response = object_storage.get_object(source_namespace, source_bucket, path_file)
            file_contents = get_object_response.data.content
            if path_file.lower().endswith(".zip"):
                unzipToBucket(file_contents, destination_bucket, object_storage, destination_namespace)
            else:
                unzipGzToBucket(file_contents, destination_bucket, object_storage, destination_namespace, path_file)
        except Exception as e:
            failed_objects.append(path_file)
            print(f"Error: {e} (object: {path_file})")
    next_starts_with = response.data.next_start_with
    if not next_starts_with:
        break

elapsed_time = time.time() - start_time
print("Elapsed time: ", elapsed_time)
if failed_objects:
    print(f"{len(failed_objects)} object(s) could not be processed: {failed_objects}")

In [None]:
def _load_csv_to_snowflake(path_file, spark, delimiter=None):
    """Read one CSV object from the bucket and overwrite the Snowflake table named after it."""
    reader = spark.read.format("csv")
    if delimiter is not None:
        reader = reader.option("delimiter", delimiter)
    df = reader.option("header", "true").load("oci://<target bucket>@<OCI Namespace>/" + path_file)
    # The table name is the object name without its extension. splitext gives
    # the same result as the old [0:-4] slice for ".csv" names and does not
    # mangle names with a different (or no) extension.
    table_name = os.path.splitext(path_file)[0]
    df.write \
      .format("net.snowflake.spark.snowflake") \
      .options(**sfOptions) \
      .option("dbtable", table_name) \
      .mode("overwrite") \
      .save()


def loadFiles(path_file, spark):
    """
    Load one CSV object from Object Storage into Snowflake.

    Parameters
    ----------
    path_file : str
        Object name of the CSV file inside the target bucket.
    spark : SparkSession
        Session used to read the file and write the table.

    The file is first read with the CSV reader's default delimiter. If the
    read or the write fails for any reason, the error is printed and the load
    is retried once with the "|||" delimiter; an error in that second attempt
    propagates to the caller.
    """
    try:
        _load_csv_to_snowflake(path_file, spark)
    except Exception as e:
        print("Error Message!!!!!!!", e)
        _load_csv_to_snowflake(path_file, spark, delimiter="|||")

In [None]:
def run_tasks(function, q, spark):
    """
    Worker loop: drain a queue, calling ``function(value, spark)`` per item.

    Parameters
    ----------
    function : callable
        Called as ``function(value, spark)`` for every queued value.
    q : queue.Queue
        Queue of pending values, shared between worker threads.
    spark : object
        Passed through unchanged as the second argument of ``function``.

    The loop ends when the queue is empty. A failing task is reported on
    stdout and does not stop the worker; ``task_done()`` is always called so
    that ``q.join()`` in the coordinating thread cannot hang.
    """
    from queue import Empty

    while True:
        # get_nowait avoids the empty()/get() race: another worker may take
        # the last item between the two calls and leave this one blocked.
        try:
            value = q.get_nowait()
        except Empty:
            return
        try:
            function(value, spark)
        except Exception as e:
            print(f"Error while processing {value!r}: {e}")
        finally:
            q.task_done()

In [None]:
from threading import Thread
from queue import Queue

# Queue of CSV object names to load, consumed by the worker threads below.
q = Queue()

# Number of concurrent loader threads.
worker_count = 4

source_bucket = '<>'
source_namespace = '<OCI Namespace>'
try:
    token_path = get_delegation_token_path(spark)
    object_storage = get_authenticated_client(token_path, oci.object_storage.ObjectStorageClient)
except Exception as e:
    print("Error in authentication")
    raise e

# In this case, I'm using a Bucket that only contains the csv files we want to load, so no filtering.
# list_objects returns one page per call; follow the continuation token so
# that objects beyond the first page are loaded too.
file_paths = []
next_starts_with = None
while True:
    response = object_storage.list_objects(
        source_namespace,
        source_bucket,
        start=next_starts_with,
        fields='size,timeCreated,timeModified,storageTier',
        retry_strategy=oci.retry.DEFAULT_RETRY_STRATEGY,
    )
    file_paths.extend(listed_object.name for listed_object in response.data.objects)
    next_starts_with = response.data.next_start_with
    if not next_starts_with:
        break

start_time = time.time()
for file_path in file_paths:
    q.put(file_path)

# Daemon threads so a stuck worker cannot keep the kernel alive on shutdown.
for i in range(worker_count):
    t = Thread(target=run_tasks, args=(loadFiles, q, spark))
    t.daemon = True
    t.start()

# Block until every queued file has been acknowledged with task_done().
q.join()
end_time = time.time()
elapsed_time = end_time - start_time
print("Elapsed time: ", elapsed_time)

In [None]:
# Delete process: empty the CSV bucket completely and remove the processed
# extract archives (names matching `prefixes`) from the zip bucket.
csv_bucket = "<source bucket with csv files>"
zip_bucket = "source bucket with zip files"
buckets = [csv_bucket, zip_bucket]
source_namespace = '<OCI Namespace>'
token_path = get_delegation_token_path(spark)
object_storage = get_authenticated_client(token_path, oci.object_storage.ObjectStorageClient)
for bucket in buckets:
    # Start every bucket from its first page; reusing the previous bucket's
    # continuation token would skip objects in this one.
    next_starts_with = None
    while True:
        response = object_storage.list_objects(
            source_namespace,
            bucket,
            start=next_starts_with,
            fields='size,timeCreated,timeModified,storageTier',
            retry_strategy=oci.retry.DEFAULT_RETRY_STRATEGY,
        )
        try:
            for object_file in response.data.objects:
                path_file = str(object_file.name)
                # Everything in the CSV bucket goes; elsewhere only objects
                # whose name matches one of the extract prefixes.
                if bucket == csv_bucket or checkLisOfObjects(path_file):
                    object_storage.delete_object(source_namespace, bucket, path_file)
        except Exception as e:
            print(f"Error: {e}")
            # Give up on this bucket and move on to the next one.
            break
        # Follow the continuation token so later pages are deleted as well.
        next_starts_with = response.data.next_start_with
        if not next_starts_with:
            success = '{"status":"200"}'
            break