**Introduction to Python for Data Science and Data Engineering**

Spark Components

  ![](https://jmp.sh/s/YzccOZReG7jdljk57jox)

  Spark Core (RDD API)-> Dataframe API -> Spark SQL/ Spark R API/ MLlib/ Structured Streaming API

# Spark Runtime Architecture
Driver / Cluster Manager / Workers / Executors
###  Driver    
- Responsible for planning and co-ordinating execution.
- Creates the SparkSession, the entry point to all spark applications.
- analyzes spark application and constructs DAG
- Schedules and distributes tasks to executors for execution
- monitors the progress of tasks and handles failures
- returns results to the client
###  Cluster Manager/Master
- Manages cluster resources and allocates them to driver
###  Workers
 - Nodes in the cluster that host ececutors.
###  Executors
- Processes on worker nodes that execute tasks assigned by the driver.
- Run on worker nodes in a spark cluster and host Tasks.
- Store intermediate and final resluts in memory or on disk.
- Interact with the driver for task co-ordination and data transfer


# The Spark DAG
 - Spark jobs are broken down in stages i.e group of tasks that can be run in parallel.
 - Computations flow in one direction through the stages
 - Stages never loop back, ensuring the job terminates
 - Stages are organized into a dependency graph for execution flow.

The Spark UI
 Visualising Spark applications

 Spark provides web user interfaces for monitoring and management including

###  Application UI
  - Per application SparkSession
  - Track Application progress and task execution
  - DAG visualization and stage details
  - Resource usage and performance metrics.

### Master UI
  - Per cluster
  - Worker node status and health and cluster-wide resource allocation
  - Shows all running applications and available resources.

# Spark Clusters in databricks

- **All purpose clusters** - interactive clusters that support notebooks , jobs, dashboards with auto termination
- **Job Cluster** - Clusters that stat when a job runs and terminate automatically upon completion, optimized for non interactive workloads.
- **SQL Warehouses** - Optimized clusters for SQL query performance with instant startup and auto-scaling to balance cost and performance.

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("MySparkSession") \
    .getOrCreate()

In [3]:
# install ngrok reverse proxy python wrapper required in google colab to check the Spark UI
!pip install pyngrok
!pip install findspark

Collecting pyngrok
  Downloading pyngrok-7.3.0-py3-none-any.whl.metadata (8.1 kB)
Downloading pyngrok-7.3.0-py3-none-any.whl (25 kB)
Installing collected packages: pyngrok
Successfully installed pyngrok-7.3.0
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [4]:
#required in google colab to check the Spark UI
import findspark
findspark.init()
from pyngrok import ngrok
from pyngrok import conf
import os

In [5]:
#required in google colab to check the Spark UI
ngrok.set_auth_token("Get your token from https://dashboard.ngrok.com/get-started/your-authtoken")



In [6]:
#required in google colab to check the Spark UI
#conf.get_default().ngrok_skip_browser_warning = True

spark_ui_url = spark.sparkContext.uiWebUrl
print(spark_ui_url)
if spark_ui_url:
  # Extract the port from the URL
  spark_ui_port = int(spark_ui_url.split(':')[-1])
  ngrok_tunnel = ngrok.connect(spark_ui_port)
  print(f"Spark UI URL: {ngrok_tunnel.public_url}")
else:
  print("Spark UI is not available.")



http://a4a09b43a1c4:4040
Spark UI URL: https://20599a4ee1f6.ngrok-free.app


# DataFrames

* Dataframes are distributed collection of records aall with the same pre-defined structure   
* Built on Sparks core concepts but with structure , optimization and familar SQL like operations for data manipulation.
* DataFrames tack their schema and provide native support for many common SQL functions and relational operators like JOINs.
* DataFrames are evaluated as DAGs using lazy evaluation . Prepare the DAG as execute when data is requested.

* Can be created from JSON,CSV, Parquet, ORC,Text or Binary Files
* Delta Lake or other Table storage format directories.

## DataFrame API Optimization
- Adaptive Query Execution
- In-memory Columnar Storage
- Built in Statistics
- Catalyst Optimizer adn Photon (DataBricks)  

## DataFrame/ Query Planning
- When a DF is evaluated, the driver creates an optimzed execution plan throught a series of transformation
 Unresolved logical plan -> Analyzed Logical Plan -> Optimized logical Plan -> Physical Plan



#Columnar Storage


*   Organizes data by column enabling efficient scanning and analysis
*   Efficient for analytical workloads
*   Implemented in dataframe internal storage and in physical file encoding formats such as Parquet and ORC.



# DataFrameReader and DataFrameWriter

df = spark.read.format("format").option().load()

df = spark.read.csv("filelocation")
df = spark.read.parquet("filelocation")

-------------

df.write.format("format").mode("mode").save()

df.write.csv("filelocation")


In [None]:
#Infer the schema of the dataframe Using a DDL string

housing_ddl_schema = '''
longitude DOUBLE,
latitude DOUBLE,
housing_median_age DOUBLE,
total_rooms DOUBLE,
total_bedrooms DOUBLE,
population DOUBLE,
households DOUBLE,
median_income DOUBLE,
median_house_value DOUBLE
'''

housing_ddl_df = spark.read.format("csv") \
.option("header","true")\
.option("inferSchema","false")\
.schema(housing_ddl_schema)\
.load("/content/sample_data/california_housing_test.csv")

In [None]:
#Record count

housing_ddl_df.count()

3000

In [None]:
#explicitly define the schema
from pyspark.sql.types import *

housing_schema = StructType([
    StructField("longitude",DoubleType()),
    StructField("latitude",DoubleType()),
    StructField("housing_median_age",DoubleType()),
    StructField("total_rooms",DoubleType()),
    StructField("total_bedrooms",DoubleType()),
    StructField("population",DoubleType()),
    StructField("households",DoubleType()),
    StructField("median_income",DoubleType()),
    StructField("median_house_value",DoubleType())
    ])

housing_data_df = spark.read.format("csv") \
.option("header","true")\
.option("inferSchema","false")\
.schema(housing_schema)\
.load("/content/sample_data/california_housing_test.csv")

In [None]:
#Record count
housing_data_df.count()

3000

In [None]:
#Reading data from a csv file
#This will create a spark job to load data as it infers the Schema

housing_df = spark.read.format("csv") \
.option("header","true")\
.option("inferSchema","true")\
.load("/content/sample_data/california_housing_test.csv")



In [None]:
#display the fields available
housing_df.printSchema()

#display sample data
housing_df.show()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -122.05|   37.37|              27.0|     3885.0|         661.0|    1537.0|     606.0|       6.6085|          344700.0|
|   -118.3|   34.26|              43.0|     1510.0|         310.0|     809.0|     277.0|        3.599|     

In [None]:
#writing data onto a file csv

result.write.format("csv").mode("overwrite").save("primes")


In [None]:
housing_df.write.format("parquet") \
.mode("overwrite") \
.save("housing_parquet")

In [None]:
#Writing the dataframe to a new table
housing_df.write.saveAsTable("california_housing")

# Dataframe Schema

- Every DF has a define schema i.e structure and data types of all columns
- Can be inferred from data or explicitly specified
- self describing format like parquet include schema information
- df.printSchema() --> to print out the dataFrame schema
- DDL schema
    - ddl_schema = "name STRING NOT NULL, age INT, city STRING"
    - df = spark.read.csv("Filelocation",schema = ddl_schema)
    - df.printSchema()
- DataFrame Data Types (Primitive and Complex datatypes)
    - TINYINT/SMALLINT/INT/BIGINT
    - FLOAT/DOUBLE
    - STRING
    - BINARY
    - TIMESTAMP/DATE
    - ARRAY
    - MAP
    - STRUCT


In [None]:
df.printSchema()

#Transformations and Actions

DF are immutable - once created their data cannot be modified


*   **Transformations** create new DF from existing ones
    - select/filter/withColumn/groupBy/agg
*   **Actions** like showing or saving output trigger actual computation and produce final results.
    - count/show/take/first/write
    - Multiple transformations can be called, the job is only created when an action is requested - Lazy evaluation



#SparkSQL
SQL interface for Spark DataFrames

**DataFrame Registration**
    - Temporary views: createOrReplaceTempView()
    - Global Temp views: createGlobalTempView()

**SQL Query Execution**
    - spark.sql() for SQL statements


In [None]:
#Display all available tables
spark.sql("SHOW TABLES").show()

+---------+--------------+-----------+
|namespace|     tableName|isTemporary|
+---------+--------------+-----------+
|         |streaming_data|       true|
+---------+--------------+-----------+



In [None]:
# Display data in table
spark.sql("select * from california_housing").show()



+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -122.05|   37.37|              27.0|     3885.0|         661.0|    1537.0|     606.0|       6.6085|          344700.0|
|   -118.3|   34.26|              43.0|     1510.0|         310.0|     809.0|     277.0|        3.599|          176500.0|
|  -117.81|   33.78|              27.0|     3589.0|         507.0|    1484.0|     495.0|       5.7934|          270500.0|
|  -118.36|   33.82|              28.0|       67.0|          15.0|      49.0|      11.0|       6.1359|          330000.0|
|  -119.67|   36.33|              19.0|     1241.0|         244.0|     850.0|     237.0|       2.9375|           81700.0|
|  -119.56|   36.51|    

#Distributed System Programming


*   Distributed computing fundamentals
    -   Shared nothing architecture
        - Independence
        - Scalability
        - Fault tolerance
        - Resource partitioning
    -   Partitioning
        - Data distribution
            -   Data divided into mutually exclusive in-memory partitions
            -   partitioning can be based upon input and manipulated
            -   size and number of partitions affect parallelism        
    -   Parallel Processing
        -   Each partition processed independently
        -   multiple partitions can run in parallel
        -   one partition = one task in spark
*   Data Movement and Shuffling
    -   Data movement between nodes ops like groupby and Join
        Redistribute data across partitions required for operations like groupBy it occurs when wide transformations, key based operations, data repartitioning
*   MapReduce
    -   MapReduce Algorithm
        - Map/Shuffle/Reduce
        - GroupBy: Map (extract keys)->Shuffle (by key)-> Reduce(aggregate)
        - join   : Map (prepare keys)->Shuffle (co-locate)-> Reduce(combine)
        - filter : Map (evaluate condition) -> No shuffle or reduce needed
    -   Its application in RDD and DataFrame APIs

# Basic ETL with dataframe API

- DF operations are distributed automatically across all partitions
- All DF transformations have equivalent SQL operations.
- Methods return new DF rather than modifying the existing one.
- Each operation builds logical plan until an action triggers execution(Lazy loading)

```
## Basic transformation methods and SQL equivalent
- select()          - SELECT
- filter(),where()  - WHERE
- groupBy()         - GROUP BY
- orderBy(),sort()  - ORDER BY
- join()            - JOIN
```

## Referencing dataframe columns

```
df.select("column-Name")
df.select(df.column_name)
df.select(df["column_Name"])
df.select(col("column-Name").alias("CustomerName"))
```

## Common column object methods

```
alias()
cast()/astype()
isNull()/isNotNull()
contains()
asc()/desc()
```




In [None]:
from pyspark.sql.functions import sum, when, col

invalidData = housing_df.select(
    sum(when(col("housing_median_age").isNull(),1).otherwise(0)).alias("Null_Count")
)

invalidData.show()

+----------+
|Null_Count|
+----------+
|         0|
+----------+



In [None]:
#Creating a temporary view

housing_df.selectExpr("longitude","latitude","housing_median_age","total_rooms").createOrReplaceTempView("HousingLimitedData")

In [None]:
#List available views and table
spark.sql("SHOW TABLES").show()

+---------+--------------+-----------+
|namespace|     tableName|isTemporary|
+---------+--------------+-----------+
|         |streaming_data|       true|
+---------+--------------+-----------+



In [None]:
#Spark SQL command
invalidRecords = spark.sql('''
select COUNT_IF(housing_median_age IS NULL) AS NULL_AGE_COUNT
,COUNT_IF(total_rooms IS NULL) AS NULL_TOTAL_ROOMS_COUNT
 from HousingLimitedData
 ''')

invalidRecordsData = invalidRecords.show()

+--------------+----------------------+
|NULL_AGE_COUNT|NULL_TOTAL_ROOMS_COUNT|
+--------------+----------------------+
|             0|                     0|
+--------------+----------------------+



In [None]:
#Explain plan for SQL

sql_plan = invalidRecords.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[count(if (isnotnull(housing_median_age#19)) null else isnull(housing_median_age#19)), count(if (isnotnull(total_rooms#20)) null else isnull(total_rooms#20))])
   +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=184]
      +- HashAggregate(keys=[], functions=[partial_count(if (isnotnull(housing_median_age#19)) null else isnull(housing_median_age#19)), partial_count(if (isnotnull(total_rooms#20)) null else isnull(total_rooms#20))])
         +- FileScan csv [housing_median_age#19,total_rooms#20] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/content/sample_data/california_housing_test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<housing_median_age:double,total_rooms:double>




In [None]:
#Explain plan for DF

df_explainPlan = invalidData.explain()

type(df_explainPlan)

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[sum(CASE WHEN isnull(housing_median_age#19) THEN 1 ELSE 0 END)])
   +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=197]
      +- HashAggregate(keys=[], functions=[partial_sum(CASE WHEN isnull(housing_median_age#19) THEN 1 ELSE 0 END)])
         +- FileScan csv [housing_median_age#19] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/content/sample_data/california_housing_test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<housing_median_age:double>




NoneType

#User Defined Functions
- UDFs allow developer to create resuable custom functions
- has performance impact as they cannot be optimized by Catalyst optimizer and have serialization overhead.
- Always used builtin functions if need use the Pandas functions.
- Pandas UDFs allow you to write python functions that operate on batches of rows instead of single rows, leveraging Apache arrow for more efficient Python-JVM serialization.


In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType
from pyspark.sql.functions import col

import math

# user defined function
def primeNumbercheck(myNum):
	if myNum==2:
		return True
	elif myNum%2==0 or myNum==1:
		return False
	x=int(math.sqrt(myNum))
	if x%2==0:
		strtno=x+1
	else:
		strtno=x
	for i in range(strtno,1,-2):
		if myNum%i==0:
			return False
	return True


In [None]:
#register the function primecheck as a
#User Defined Function (UDF) for use with Spark DataFrames
from pyspark.sql.functions import udf

@udf(returnType=BooleanType())
def primecheck(myNum):
    return primeNumbercheck(myNum)

In [None]:
#create dataframe with sample data 1000 records
df = spark.range(0,100)

print(type(df))

#Add a new column isPrime with boolean value as output
df = df.withColumn("isPrime", primecheck(df["id"]))

<class 'pyspark.sql.dataframe.DataFrame'>


In [None]:
#capture the result of only prime records
result = df.filter(df.isPrime==True)

In [None]:
# number of primes cache the result
# visible on spark UI /storage/
result.cache()

DataFrame[id: bigint, isPrime: boolean]

In [None]:
result.collect()
result.count()

[Row(id=2, isPrime=True),
 Row(id=3, isPrime=True),
 Row(id=5, isPrime=True),
 Row(id=7, isPrime=True),
 Row(id=11, isPrime=True),
 Row(id=13, isPrime=True),
 Row(id=17, isPrime=True),
 Row(id=19, isPrime=True),
 Row(id=23, isPrime=True),
 Row(id=29, isPrime=True),
 Row(id=31, isPrime=True),
 Row(id=37, isPrime=True),
 Row(id=41, isPrime=True),
 Row(id=43, isPrime=True),
 Row(id=47, isPrime=True),
 Row(id=53, isPrime=True),
 Row(id=59, isPrime=True),
 Row(id=61, isPrime=True),
 Row(id=67, isPrime=True),
 Row(id=71, isPrime=True),
 Row(id=73, isPrime=True),
 Row(id=79, isPrime=True),
 Row(id=83, isPrime=True),
 Row(id=89, isPrime=True),
 Row(id=97, isPrime=True)]

In [None]:
# Free up executor memeory by unpersisting cached objects
result.unpersist()

DataFrame[id: bigint, isPrime: boolean]

In [None]:
#Pandas UDF function

from pyspark.sql.functions import pandas_udf
#population|households
@pandas_udf("double")
def get_avg_house_members(populationSeries,householdsSeries):
  return populationSeries/householdsSeries


housing_df_enriched= housing_df.withColumn("avg_house_members",get_avg_house_members(col("population"),col("households")))

In [None]:
housing_df_enriched.show()

#Spark Streaming background

Spark Streaming introduced in 2013 as an extension to core Spark.
- built on the RDD API
- Using the DStream (Discretized Streams) model.
- Processing data in small time-based (RDD) batches.

Structured Streaming was introduced in 2016
- Built on Dataframe/dataset APIs
- Introduced event-time processing
- Simplified API with SQL-like operations
- Better handling of late and out-of-order data

## Microbatching
- Microbatching processes a stream as a series of small batches
- Data is collected into time-based chunks
- Each chunk processed as mini-batch job typical interval of 100 ms to few seconds



#DataStreamReader and DataStreamWriter

###Triggers
- Default Trigger - Processes data as soon as the previous micro-batch completes
- Fixed Interval - Process data at specified time intervals, useful for controlling resource usage.
- Available Now - Process available data then stops

###Output Modes for structured streaming write output results

- append - default mode only adds new records to the sink.
- update - modifies existing records and adds new ones
- complete - writes entire result table to sink each time


In [11]:
import os
print(os.environ['SPARK_HOME'])

#list the files available in the jars folder
!ls /usr/local/lib/python3.11/dist-packages/pyspark/jars


In [17]:
#OCI bucket connectivity

#installation of required jars to connect to OCI
#https://github.com/oracle/oci-hdfs-connector/releases

#download the jars from below link refer the latest version from above
!wget https://github.com/oracle/oci-hdfs-connector/releases/download/v3.4.1.0.0.3/oci-hdfs.zip

#the file is downloaded into /content folder

!unzip oci-hdfs.zip

#move the files to jars folder
!cp /content/third-party/lib/* /usr/local/lib/python3.11/dist-packages/pyspark/jars

!cp /content/lib/* /usr/local/lib/python3.11/dist-packages/pyspark/jars

#check if the two files are available in the folder
!ls /usr/local/lib/python3.11/dist-packages/pyspark/jars |grep -i 'hdfs\|jsr'

jackson-datatype-jsr310-2.15.2.jar
jsr305-3.0.0.jar
jsr305-3.0.2.jar
oci-hdfs-full-3.4.1.0.0.3.jar


In [42]:
!ls /usr/local/lib/python3.11/dist-packages/pyspark/jars |grep -i 'hdfs\|jsr'

!rm /usr/local/lib/python3.11/dist-packages/pyspark/jars/jsr305-3.0.0.jar

!ls /usr/local/lib/python3.11/dist-packages/pyspark/jars |grep -i 'hdfs\|jsr'

jackson-datatype-jsr310-2.15.2.jar
jsr305-3.0.0.jar
jsr305-3.0.2.jar
oci-hdfs-full-3.4.1.0.0.3.jar
jackson-datatype-jsr310-2.15.2.jar
jsr305-3.0.2.jar
oci-hdfs-full-3.4.1.0.0.3.jar


In [19]:
#keysetup

!mkdir -p /content/.oci
!openssl genrsa -out /content/.oci/oci_api_key.pem 2048
!chmod go-rwx /content/.oci/oci_api_key.pem
!openssl rsa -pubout -in /content/.oci/oci_api_key.pem -out /content/.oci/oci_api_key_public.pem



writing RSA key


In [20]:
!cat /content/.oci/oci_api_key_public.pem

-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEApTp69nC5Kjs5ktZGHUIm
f60oHIXOr5a7cHTpHsWyTDoEmU/q3t0Wk632pwNB7JO3mp64+U2isuGxvRJjEROo
EBgfK57nOcEe27PkHBM303Q5eLlLfPuvXwYQWJDjnxM0AOT7NkdGBIlZ82Au/fia
zvlgKnE+b8fUn8Y38HomOj2PuPtYFD/RXWOJwDJeXcMO8t9dZjotJFC+SmhKR5eu
j9legAxwlZ3fTUtnT04Q1eTTZSL9NwGboVOjIv/SljekF7LN4+Ky/uXEkZ5mqBV6
kXr4Tgo2K7I0bZXPkMTsMkxNFIza3mUEplKFqY/j/rrvaFcp1YU+22CYkW5BfuYR
EQIDAQAB
-----END PUBLIC KEY-----


In [22]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
#define the schema structure

#spotify_track_uri,ts,platform,ms_played,track_name,artist_name,album_name,reason_start,reason_end,shuffle,skipped

schema = StructType([
    StructField("spotify_track_uri",StringType()),
    StructField("ts",TimestampType()),
    StructField("platform",StringType()),
    StructField("ms_played",LongType()),
    StructField("track_name",StringType()),
    StructField("artist_name",StringType()),
    StructField("album_name",StringType()),
    StructField("reason_start",StringType()),
    StructField("reason_end",StringType()),
    StructField("shuffle",BooleanType()),
    StructField("skipped",BooleanType())
])


In [43]:
try:
    spark.stop()
except NameError:
    pass

In [44]:
import os
import findspark

# Define the Spark version you are using
spark_version = "spark-3.5.0"
os.environ["SPARK_HOME"] = f"/usr/local/{spark_version}"

# Path to the OCI HDFS connector JARs
# Use the paths you confirmed with your ls command
oci_jar_path = "/usr/local/lib/python3.11/dist-packages/pyspark/jars/oci-hdfs-full-3.4.1.0.0.3.jar"
jsr305_jar_path = "/usr/local/lib/python3.11/dist-packages/pyspark/jars/jsr305-3.0.2.jar"
jackson_jsr_jar_path = "/usr/local/lib/python3.11/dist-packages/pyspark/jars/jackson-datatype-jsr310-2.15.2.jar"

# Combine the paths into a comma-separated string for the --jars argument
jars_string = f"{oci_jar_path},{jsr305_jar_path},{jackson_jsr_jar_path}"

# Now, set the PYSPARK_SUBMIT_ARGS environment variable
os.environ["PYSPARK_SUBMIT_ARGS"] = f"--jars {jars_string} pyspark-shell"

# Initialize findspark
findspark.init()


In [47]:
from google.colab import userdata


try:
    spark.stop()
except NameError:
    pass


print("Environment variables for Spark and JARs have been set.")
# Build the SparkSession with OCI configurations
spark = SparkSession.builder \
    .appName("OCIStructuredStreaming") \
    .config("spark.driver.extraClassPath", classpath_jars_string) \
    .config("spark.executor.extraClassPath", classpath_jars_string) \
    .config("spark.hadoop.fs.oci.client.hostname", userdata.get('OCI_HOST')) \
    .config("spark.hadoop.fs.oci.client.auth.tenantId", userdata.get('OCI_TENANTID')) \
    .config("spark.hadoop.fs.oci.client.auth.userId", userdata.get('OCI_ID')) \
    .config("spark.hadoop.fs.oci.client.auth.fingerprint", userdata.get('OCI_FINGERPRINT')) \
    .config("spark.hadoop.fs.oci.client.auth.pemfilepath", "/content/.oci/oci_api_key.pem") \
    .config("spark.hadoop.fs.oci.impl", "com.oracle.bmc.hdfs.OCIHDFSFileSystem") \
    .getOrCreate()

spark.sparkContext.setLogLevel("DEBUG")

print("SparkSession created with OCI configurations.")

# # Define the input path for your OCI bucket
# # The OCI path format is oci://<bucket-name>@<namespace>/<prefix>


# # Use spark.readStream to read the data
# # You need to specify the format of your files (e.g., "json", "csv", "parquet")
# # You also need to provide a checkpoint location, which is crucial for
# # Structured Streaming to maintain state and provide fault tolerance.
# stream_df = spark.readStream \
#     .format("csv") \
#     .schema(schema) \
#     .option("maxFilesPerTrigger", 1) \
#     .load(input_path)

Environment variables for Spark and JARs have been set.
SparkSession created with OCI configurations.


In [None]:
input_path = "oci://ocipysparkdatasource@bm1mycqpwdo2/pysparksourcedata"
stream_df = spark.read.text(input_path).limit(1)
stream_df.show()

In [None]:
#accessing variables from config files if needed
!pip install python-dotenv

import dotenv
import os

dotenv.load_dotenv("/content/config.env")

my_url = os.getenv("MY_OCI_BUCKET_URL")

#print(f"The URL read from config.env is: {my_url}")


In [1]:
#OCI accesssetup
!echo SPARK_HOME

SPARK_HOME


In [None]:
# stream_df = spark.readStream \
# .format("csv") \
# .schema(schema) \
# .option("maxFilesPerTrigger",1) \
# .option("path",my_url) \
# .load()

In [None]:
print("isStreaming: ",stream_df.isStreaming)

isStreaming:  True


In [None]:
display(stream_df)

DataFrame[spotify_track_uri: string, ts: timestamp, platform: string, ms_played: bigint, track_name: string, artist_name: string, album_name: string, reason_start: string, reason_end: string, shuffle: boolean, skipped: boolean]

In [None]:
#stream_df.show()

# Correct way to display streaming data
query = stream_df.writeStream \
    .format("memory") \
    .queryName("streaming_data") \
    .outputMode("append") \
    .start()



In [None]:
# To see the data, you can query the in-memory table
spark.sql("SELECT * FROM streaming_data").show()

+--------------------+-------------------+----------+---------+--------------------+--------------------+--------------------+------------+----------+-------+-------+
|   spotify_track_uri|                 ts|  platform|ms_played|          track_name|         artist_name|          album_name|reason_start|reason_end|shuffle|skipped|
+--------------------+-------------------+----------+---------+--------------------+--------------------+--------------------+------------+----------+-------+-------+
|   spotify_track_uri|               NULL|  platform|     NULL|          track_name|         artist_name|          album_name|reason_start|reason_end|   NULL|   NULL|
|2J3n32GeLmMjwuAzy...|2013-07-08 02:44:34|web player|     3185| Say It, Just Say It|        The Mowgli's|Waiting For The Dawn|    autoplay|  clickrow|  false|  false|
|1oHxIPqJyvAYHy0PV...|2013-07-08 02:45:37|web player|    61865|Drinking from the...|       Calvin Harris|           18 Months|    clickrow|  clickrow|  false|  false

In [None]:
#simple filter

filtered_stream = stream_df.filter(stream_df.shuffle==False)


In [None]:
#filtered count

query = filtered_stream.writeStream \
    .format("memory") \
    .queryName("filteredStream") \
    .outputMode("append") \
    .start()

In [None]:
spark.sql("select count(1) from filteredStream").show()

+--------+
|count(1)|
+--------+
|   38141|
+--------+



In [None]:
display(filtered_stream)

DataFrame[spotify_track_uri: string, ts: timestamp, platform: string, ms_played: bigint, track_name: string, artist_name: string, album_name: string, reason_start: string, reason_end: string, shuffle: boolean, skipped: boolean]