diff --git a/data-platform/open-source-data-platforms/oci-data-flow/code-examples/Streaming_from_ObjectStorage/readme.md b/data-platform/open-source-data-platforms/oci-data-flow/code-examples/Streaming_from_ObjectStorage/readme.md
new file mode 100644
index 000000000..7163aa3c1
--- /dev/null
+++ b/data-platform/open-source-data-platforms/oci-data-flow/code-examples/Streaming_from_ObjectStorage/readme.md
@@ -0,0 +1,58 @@
+# OCI Data Flow: Reading Files from Object Storage in Streaming Mode
+
+Sometimes you would like to continuously monitor an Object Storage (S3-compatible) location and incrementally process new incoming data.
+With Spark we can create a StreamingQuery that uses an Object Storage source and processes data from files in streaming mode, without a dedicated streaming platform.
+All we need is spark.readStream pointed at an Object Storage or S3-compatible location.
+It looks like this:
+
+## START
+
+### Define source and target locations
+namespace = 'objstr_namespace'
+source_bucket = 'src_bucket'
+inc_folder = 'inc_folder'
+target_bucket = 'trg_bucket'
+target_folder = 'trg_folder'
+checkpoint_bucket = 'check_bucket'
+checkpoint_folder = 'check_folder'
+input_path = 'oci://'+source_bucket+'@'+namespace+'/'+inc_folder
+archive_bucket = 'arch_bucket'
+archivelocation = 'oci://'+archive_bucket+'@'+namespace+'/arch_folder'
+
+### Infer schema from sample file
+example_file = 'oci://'+source_bucket+'@'+namespace+'/'+inc_folder+'/example_file.parquet'
+example = spark.read.option("basePath", input_path).option("inferSchema", "true").parquet(example_file)
+schema = example.schema
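+
+If you would rather not depend on a sample file, you can define the schema explicitly instead of inferring it. A minimal sketch - the column names and types below are placeholders (not part of this example's data), so replace them with the columns of your parquet files:
+
+```python
+from pyspark.sql.types import StructType, StructField, LongType, TimestampType, StringType
+
+# Placeholder schema - adjust the fields to match your data
+schema = StructType([
+    StructField("id", LongType(), True),
+    StructField("event_time", TimestampType(), True),
+    StructField("payload", StringType(), True),
+])
+```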
+
+### Read files in streaming mode - streaming query
+stream_df = spark.readStream.schema(schema).parquet(input_path)
+stream_path = 'oci://'+target_bucket+'@'+namespace+'/'+target_folder
+
+wr = stream_df.writeStream.queryName('StreamObjects').format("parquet").option("path", stream_path).option("checkpointLocation", 'oci://'+checkpoint_bucket+'@'+namespace+'/'+checkpoint_folder).option("cleanSource", "archive").option("sourceArchiveDir", archivelocation).start()
+
+### Stop streaming query
+wr.stop()
+ +### Check streamed data +nd = spark.read.option("inferSchema", "true").parquet(stream_path+'/*.parquet')
+
+### Check streamed data
+nd = spark.read.option("inferSchema", "true").parquet(stream_path+'/*.parquet')
+ +## END of code + +## Additional comments: +You may to provide :
+
+## END of code
+
+## Additional comments:
+You may want to provide:
+Option("cleanSource") — It can archive or delete the source file after processing. Values can be archive, delete and default is off.
+Option("sourceArchiveDir") — Archive directory if the cleanSource option is set to archive.
+ + +How to use this asset? +Review the code in the notebook and add the code to your personal OCI Data Flow application. + +License + +Copyright (c) 2024 Oracle and/or its affiliates. + +Licensed under the Universal Permissive License (UPL), Version 1.0. + +See LICENSE for more details.