In [None]:
import pyspark
from pyspark.sql import SparkSession

# The SparkSession is the single entry point for DataFrame, SQL and streaming work.
# getOrCreate() reuses an already-running session, so re-running this cell is safe.
spark = (
    SparkSession.builder
    .appName("MySparkApp")
    .getOrCreate()
)

# Use only a handful of shuffle partitions: the datasets here are small and run
# locally, so many tiny partitions would just add scheduling overhead.
spark.conf.set("spark.sql.shuffle.partitions", "5")

# A Spark Overview


- Running apps with **spark-submit**
- Datasets: type-safe APIs
- Structured Streaming
- Machine learning and advanced analytics
- RDDs: Spark's low-level APIs
- SparkR
- Third-party package ecosystem


## spark-submit

A built-in command-line tool that sends your application code to a cluster and launches it there.

The application runs there until it completes its task or fails with an error.

**spark-submit main.py**

| Aspect              | `spark-submit main.py` | `python main.py`      |
| ------------------- | ---------------------- | --------------------- |
| Starts Spark JVM    | ✅ Yes                 | ❌ Indirect / fragile |
| Cluster execution   | ✅ Yes                 | ❌ No                 |
| Config control      | ✅ Full                | ❌ Limited            |
| Executor management | ✅ Yes                 | ❌ No                 |
| Dependency shipping | ✅ Yes                 | ❌ No                 |
| Production-ready    | ✅ Yes                 | ❌ No                 |
| Notebooks           | ❌                     | ✅                    |


## Datasets

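The type-safe Dataset API is only available in Java and Scala. Python and R use DataFrames, which in Scala are simply `Dataset[Row]`.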


## Structured Streaming

A stream processing engine built on top of Spark SQL. It allows you to process real-time data streams using the same DataFrame/Dataset API that Spark uses for batch processing.

Streams are treated as tables that grow continuously.
Each record in the stream is like a new row in a continuously appending table.

Structured Streaming processes data in micro-batches or in continuous mode:

- Micro-batch mode (default): Spark processes incoming data in small batches at intervals.
- Continuous mode: Spark processes data as it arrives (very low latency, experimental).


In [None]:
# Folder holding one CSV file per day of retail transactions.
RETAIL_DATA_DIR = "../data/retail-data/by-day"

# Batch (static) read of every CSV in the folder. Passing a folder reads all of
# its files; a glob such as `2011*.csv` restricts the read to matching files.
staticDataFrame = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load(RETAIL_DATA_DIR)
)
# Register the full dataset once so it can be queried with spark.sql(...).
staticDataFrame.createOrReplaceTempView("retail_data")
# Keep the inferred schema; the streaming reader later in the notebook needs it explicitly.
staticSchema = staticDataFrame.schema

# Per-year subsets. The shorthand csv() reader and the format/option/load chain are equivalent.
staticDF2011 = spark.read.csv(f"{RETAIL_DATA_DIR}/2011*.csv", header=True, inferSchema=True)
staticDF2010 = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load(f"{RETAIL_DATA_DIR}/2010*.csv")
)


In [None]:
from pyspark.sql.functions import window, column, desc, col

# Total spend per customer per tumbling 1-day window, using the DataFrame API.
(
    staticDataFrame
    .selectExpr(
        "CustomerId",
        "(UnitPrice * Quantity) as total_cost",
        "InvoiceDate")
    .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day"))
    .sum("total_cost")
    .show(5)
)

# The same aggregation in SQL. Re-register the full dataset so this cell gives
# the same answer regardless of which cells ran before it.
staticDataFrame.createOrReplaceTempView("retail_data")
spark.sql("""
SELECT CustomerId,
       window(InvoiceDate, '1 day') as day_window,
       SUM(UnitPrice * Quantity) as total_cost
FROM retail_data
GROUP BY CustomerId, window(InvoiceDate, '1 day')
""").show(5)

# Row count per year. Each subset gets its own view name so the shared
# `retail_data` view keeps pointing at the full dataset instead of being
# silently replaced by whichever year was registered last.
for year, yearly_df in (("2011", staticDF2011), ("2010", staticDF2010)):
    view_name = f"retail_data_{year}"
    yearly_df.createOrReplaceTempView(view_name)
    spark.sql(f"""
SELECT COUNT(*) as Data_{year}
FROM {view_name}
""").show(1)

spark.read.csv:

- will only read the files at the path once
- can infer schema (inferSchema=True) or you can specify it
- returns a DataFrame that is finite (all data is already there)

spark.readStream:

- continuously reads new files as they appear in a folder
- by default the schema must be defined explicitly: schema inference is disabled for file-based streaming sources unless `spark.sql.streaming.schemaInference` is set to `true`
- returns a DataFrame representing an unbounded table


In [None]:
# By default, file-based streaming sources need an explicit schema, so reuse the
# one the batch read inferred.
staticSchema = staticDataFrame.schema

# maxFilesPerTrigger=1 makes each micro-batch pick up a single new file, which
# lets us watch the result table grow; production jobs usually leave it unset.
streamingDataFrame = (
    spark.readStream
    .format("csv")
    .option("header", "true")
    .option("maxFilesPerTrigger", 1)
    .schema(staticSchema)
    .load("../data/retail-data/by-day/*.csv")
)

In [None]:
# Sanity check: a DataFrame built with spark.readStream is an unbounded (streaming) DataFrame.
streamingDataFrame.isStreaming # returns True

In [None]:
# Streaming counterpart of the static aggregation: total spend per customer per
# 1-day window. This only defines the computation; results are produced once a
# writeStream query is started on it.
purchaseByCustomerPerDay = (
    streamingDataFrame
    .selectExpr(
        "CustomerId",
        "(UnitPrice * Quantity) as total_cost",
        "InvoiceDate")
    .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day"))
    .sum("total_cost")
)
# The window is one day, not one hour. The old name is kept as an alias so
# existing references to it keep working.
purchaseByCustomerPerHour = purchaseByCustomerPerDay

In [None]:
# Run the aggregation continuously into an in-memory table named
# `customer_purchases`. "complete" output mode rewrites the whole result table
# after every micro-batch as new files are picked up.
#
# Starting a second query with the same name while the first is still active
# raises an error, so stop any previous run of this cell first. This makes the
# cell safe to re-run.
for previous_query in spark.streams.active:
    if previous_query.name == "customer_purchases":
        previous_query.stop()

customerPurchasesQuery = (
    purchaseByCustomerPerHour.writeStream
    .format("memory")
    .queryName("customer_purchases")
    .outputMode("complete")
    .start()
)
# Keep the handle (for .status, .stop(), ...) and display it as before.
customerPurchasesQuery

In [None]:
# Show every streaming query still running in this session: name, id and current status.
active_queries = spark.streams.active
for running in active_queries:
    print(running.name, running.id, running.status)


In [None]:
# Peek at the in-memory sink: the biggest spenders so far, then the row count.
# The stream runs in the background, so the results depend on how many
# micro-batches have completed when this cell runs.
top_spenders_sql = """
SELECT *
FROM customer_purchases
ORDER BY `sum(total_cost)` DESC
"""
spark.sql(top_spenders_sql).show(5)

row_count_sql = """
SELECT COUNT(*) AS num_rows
FROM customer_purchases
"""
spark.sql(row_count_sql).show(5)

In [None]:
# Shut down every streaming query that is still running in this session.
running_queries = list(spark.streams.active)
for running_query in running_queries:
    running_query.stop()

## Machine Learning and Advanced Analytics


In [None]:
# Stop the Spark session to free its resources - good practice on low-RAM machines.
# Any further Spark work needs a new session (re-run the session-setup cell).
spark.stop()