From 99e9e7b475a19a206af909a73326b6eed23c0cdd Mon Sep 17 00:00:00 2001
From: Sylwester Dec
Date: Mon, 10 Mar 2025 20:06:14 +0100
Subject: [PATCH 1/3] Create readme.md

---
 .../Streaming_from_ObjectStorage/readme.md | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)
 create mode 100644 data-platform/open-source-data-platforms/oci-data-flow/code-examples/Streaming_from_ObjectStorage/readme.md

diff --git a/data-platform/open-source-data-platforms/oci-data-flow/code-examples/Streaming_from_ObjectStorage/readme.md b/data-platform/open-source-data-platforms/oci-data-flow/code-examples/Streaming_from_ObjectStorage/readme.md
new file mode 100644
index 000000000..7fff8eaa3
--- /dev/null
+++ b/data-platform/open-source-data-platforms/oci-data-flow/code-examples/Streaming_from_ObjectStorage/readme.md
@@ -0,0 +1,17 @@
+OCI Data Flow Reading files from Object Storage in Streaming mode
+
+When to use this asset?
+
+When you need to continuously monitor an Object Storage (S3) location and incrementally process new incoming data
+
+How to use this asset?
+
+Review the code in the notebook and add the code to your personal OCI Data Flow application.
+
+License
+
+Copyright (c) 2024 Oracle and/or its affiliates.
+
+Licensed under the Universal Permissive License (UPL), Version 1.0.
+
+See LICENSE for more details.

From 99626c7ccf24b1e9388cbf86b5751f75cc9dd175 Mon Sep 17 00:00:00 2001
From: Sylwester Dec
Date: Mon, 10 Mar 2025 20:34:20 +0100
Subject: [PATCH 2/3] Update readme.md

---
 .../Streaming_from_ObjectStorage/readme.md | 49 +++++++++++++++++--
 1 file changed, 45 insertions(+), 4 deletions(-)

diff --git a/data-platform/open-source-data-platforms/oci-data-flow/code-examples/Streaming_from_ObjectStorage/readme.md b/data-platform/open-source-data-platforms/oci-data-flow/code-examples/Streaming_from_ObjectStorage/readme.md
index 7fff8eaa3..17aa7e339 100644
--- a/data-platform/open-source-data-platforms/oci-data-flow/code-examples/Streaming_from_ObjectStorage/readme.md
+++ b/data-platform/open-source-data-platforms/oci-data-flow/code-examples/Streaming_from_ObjectStorage/readme.md
@@ -1,11 +1,52 @@
-OCI Data Flow Reading files from Object Storage in Streaming mode
+# OCI Data Flow Reading files from Object Storage in Streaming mode
 
-When to use this asset?
+Sometimes you would like to continuously monitor an Object Storage (S3-compatible) location and incrementally process new incoming data.<br>
+With Spark we can create a StreamingQuery with an Object Storage source and process data from files in streaming mode, without a dedicated streaming platform.
+All we need is spark.readStream pointed at a location - Object Storage or any S3-compatible store.
+It looks like this:
 
-When you need to continuously monitor an Object Storage (S3) location and incrementally process new incoming data
-
+## START
+
+### Define source and target locations
+namespace = 'objstr_namespace'<br>
+source_bucket = 'src_bucket'
+inc_folder = 'inc_folder'
+target_bucket = 'trg_bucket'
+target_folder = 'trg_folder'
+checkpoint_bucket = 'check_bucket'
+checkpoint_folder = 'check_folder'
+input_path = 'oci://'+source_bucket+'@'+namespace+'/'+inc_folder
+archivelocation = 'oci://archivebucket@'+namespace+'/arch_folder'<br>
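
For orientation, the `oci://<bucket>@<namespace>/<path>` URIs composed above look roughly like this with the placeholder bucket and namespace names defined in this block (illustrative values only):

```python
# Illustrative only: the composed URIs with the placeholder values from above.
# input_path      -> 'oci://src_bucket@objstr_namespace/inc_folder'
# archivelocation -> 'oci://archivebucket@objstr_namespace/arch_folder'
print(input_path)
print(archivelocation)
```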
+
+### Infer schema from sample file
+example_file = 'oci://'+source_bucket+'@'+namespace+'/'+inc_folder+'/example_file.parquet'<br>
+example = spark.read.option("basePath", input_path).option("inferSchema", "true").parquet(example_file)
+schema = example.schema
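
If a representative sample file is not available, the schema can also be declared explicitly instead of being inferred; a minimal sketch, where the column names and types are purely hypothetical:

```python
# Alternative to inferring the schema from a sample file: declare it explicitly.
# Column names and types here are placeholders for illustration only.
from pyspark.sql.types import StructType, StructField, StringType, LongType, TimestampType

schema = StructType([
    StructField("event_id", StringType(), True),
    StructField("event_ts", TimestampType(), True),
    StructField("payload", StringType(), True),
    StructField("size_bytes", LongType(), True),
])
```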
+
+### Read files in streaming mode - streaming query
+kafka = spark.readStream.schema(schema).parquet(input_path)<br>
+stream_path = 'oci://'+target_bucket+'@'+namespace+'/'+target_folder
+
+wr = kafka.writeStream.queryName('StreamObjects').format("parquet").option("path", stream_path).option("checkpointLocation", 'oci://'+checkpoint_bucket+'@'+namespace+'/'+checkpoint_folder).option("cleanSource", "archive").option("sourceArchiveDir", archivelocation).start()
+
+### Stop streaming query
+wr.awaitTermination(60)<br>
+wr.stop()
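
Before stopping, the running query can also be checked through the standard StreamingQuery handle returned by start(); a small sketch:

```python
# Inspect the running streaming query before stopping it.
print(wr.isActive)      # True while the query is still running
print(wr.status)        # current status (e.g. waiting for data, processing a batch)
print(wr.lastProgress)  # metrics of the most recent micro-batch, or None before the first one
```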
+### Check streamed data
+nd = spark.read.option("inferSchema", "true").parquet(stream_path+'/*.parquet')<br>
+nd.count()
+
+## END of code
+
+## Additional comments:
+You may want to provide:
+option("checkpointLocation") - to persist metadata about processed files
+option("cleanSource") - to archive or delete source files after processing; possible values are archive and delete, and the default is off
+option("sourceArchiveDir") - the archive directory when cleanSource is set to archive
+
+
 How to use this asset?
 
 Review the code in the notebook and add the code to your personal OCI Data Flow application.
 
 License

From 4412f05bb17b8dc2a021fa465b1086b036cd1860 Mon Sep 17 00:00:00 2001
From: Sylwester Dec
Date: Mon, 10 Mar 2025 20:36:29 +0100
Subject: [PATCH 3/3] Update readme.md

---
 .../Streaming_from_ObjectStorage/readme.md | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/data-platform/open-source-data-platforms/oci-data-flow/code-examples/Streaming_from_ObjectStorage/readme.md b/data-platform/open-source-data-platforms/oci-data-flow/code-examples/Streaming_from_ObjectStorage/readme.md
index 17aa7e339..7163aa3c1 100644
--- a/data-platform/open-source-data-platforms/oci-data-flow/code-examples/Streaming_from_ObjectStorage/readme.md
+++ b/data-platform/open-source-data-platforms/oci-data-flow/code-examples/Streaming_from_ObjectStorage/readme.md
@@ -35,15 +35,15 @@ wr.stop()<br>
 ### Check streamed data
 nd = spark.read.option("inferSchema", "true").parquet(stream_path+'/*.parquet')<br>
-nd.count()
+nd.count()<br>
 ## END of code
 
 ## Additional comments:
-You may want to provide:
+You may want to provide:<br>
-option("checkpointLocation") - to persist metadata about processed files
+option("checkpointLocation") - to persist metadata about processed files<br>
-option("cleanSource") - to archive or delete source files after processing; possible values are archive and delete, and the default is off
+option("cleanSource") - to archive or delete source files after processing; possible values are archive and delete, and the default is off<br>
-option("sourceArchiveDir") - the archive directory when cleanSource is set to archive
+option("sourceArchiveDir") - the archive directory when cleanSource is set to archive<br>
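
Put together, these three options sit on the writer exactly as in the query built between START and END above; the sketch below only restates that call with one option per line and a comment each, it adds no new functionality:

```python
# Same streaming write as in the code block above, spread over several lines for readability.
wr = (kafka.writeStream
      .queryName('StreamObjects')
      .format("parquet")
      .option("path", stream_path)                  # target folder for the streamed parquet output
      .option("checkpointLocation", 'oci://'+checkpoint_bucket+'@'+namespace+'/'+checkpoint_folder)  # persists metadata about processed files
      .option("cleanSource", "archive")             # archive processed source files ("delete" removes them instead)
      .option("sourceArchiveDir", archivelocation)  # archive directory used when cleanSource is "archive"
      .start())
```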
How to use this asset?