# Delta Lake Viewer

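This notebook demonstrates how to read, query, and time-travel Delta Lake tables stored in MinIO (S3-compatible object storage). It expects MinIO to be reachable at `http://minio:9000` and a Delta table to already exist at `s3a://delta-lake/tables/customers`.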

## 1. Setup Spark Session with Delta Lake

In [None]:
import os

from pyspark.sql import SparkSession

# Connection settings for the MinIO (S3-compatible) endpoint. Override them through
# environment variables rather than editing credentials into the notebook; the defaults
# are the values this notebook originally hard-coded for the local stack.
MINIO_ENDPOINT = os.environ.get("MINIO_ENDPOINT", "http://minio:9000")
MINIO_ACCESS_KEY = os.environ.get("MINIO_ACCESS_KEY", "minioadmin")
MINIO_SECRET_KEY = os.environ.get("MINIO_SECRET_KEY", "minioadmin")

# Maven coordinates resolved when the session's JVM starts.
# Using Spark 4.0.1 + Delta Lake 4.0.0 (matches job execution environment); the Delta and
# hadoop-aws versions must match the Spark/Hadoop build that is actually running.
# NOTE(review): hadoop-aws 3.4.x uses AWS SDK v2 (pulled in transitively), so the v1
# aws-java-sdk-bundle is probably unnecessary — confirm and drop it to speed up startup.
SPARK_PACKAGES = ",".join([
    "io.delta:delta-spark_2.13:4.0.0",
    "org.apache.hadoop:hadoop-aws:3.4.1",
    "com.amazonaws:aws-java-sdk-bundle:1.12.681",
])

spark = (
    SparkSession.builder
    .appName("DeltaLakeViewer")
    .config("spark.jars.packages", SPARK_PACKAGES)
    # Enable Delta SQL commands and make the default catalog Delta-aware.
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    # Route s3a:// paths to MinIO: path-style URLs, no SSL, static access key/secret.
    .config("spark.hadoop.fs.s3a.endpoint", MINIO_ENDPOINT)
    .config("spark.hadoop.fs.s3a.access.key", MINIO_ACCESS_KEY)
    .config("spark.hadoop.fs.s3a.secret.key", MINIO_SECRET_KEY)
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
    .getOrCreate()
)

print("Spark session created successfully!")
print(f"Spark version: {spark.version}")

# getOrCreate() returns an already-running session if one exists (e.g. from an earlier run
# of this cell). Only runtime SQL settings are applied to it; static settings such as
# spark.jars.packages are silently ignored. That later fails as
# "[DATA_SOURCE_NOT_FOUND] Failed to find the data source: delta", so warn up front.
loaded_packages = spark.sparkContext.getConf().get("spark.jars.packages", "")
if "io.delta:delta-spark" not in loaded_packages:
    print(
        "WARNING: the Delta Lake package is not on this session's classpath "
        f"(spark.jars.packages={loaded_packages!r}). "
        "Restart the kernel and rerun this cell before reading Delta tables."
    )

## 2. Read Delta Lake Table

In [3]:
# Location of the customers table inside the MinIO bucket; s3a:// is served by MinIO
# through the fs.s3a.* settings of the Spark session.
delta_table_path = "s3a://delta-lake/tables/customers"

# Load the latest snapshot of the Delta table as a DataFrame.
df = spark.read.load(delta_table_path, format="delta")

# Inspect the column names and types first ...
print("Table Schema:")
df.printSchema()

# ... then the rows themselves (show() prints the first 20 rows by default).
print("\nTable Data:")
df.show(truncate=False)

Py4JJavaError: An error occurred while calling o42.load.
: org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find the data source: delta. Please find packages at `https://spark.apache.org/third-party-projects.html`.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.dataSourceNotFoundError(QueryExecutionErrors.scala:724)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:647)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:697)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:208)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:186)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.ClassNotFoundException: delta.DefaultSource
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:592)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:633)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:633)
	at scala.util.Failure.orElse(Try.scala:224)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:633)
	... 15 more


## 3. Basic Queries

In [None]:
# Number of rows in the current table snapshot.
total_count = df.count()
print(f"Total records: {total_count}")

# Per-column summary statistics (count, mean, stddev, min, max).
(
    df
    .describe()
    .show()
)

In [None]:
from pyspark.sql.functions import col

# Case-sensitive substring match on `name`; rows whose name is null are excluded.
name_matches_john = col("name").contains("John")
filtered_df = df.where(name_matches_john)
filtered_df.show(truncate=False)

## 4. Delta Lake Time Travel

In [None]:
# View the Delta Lake transaction history (one entry per commit).
# `DESCRIBE HISTORY` goes through SQL, so it needs only the Delta JVM package and the
# session extension configured above. The previous `DeltaTable.forPath(...)` call needed
# the `delta` Python module, which is never imported here, and raised NameError.
history_df = spark.sql(f"DESCRIBE HISTORY delta.`{delta_table_path}`")

print("Delta Table History:")
history_df.select("version", "timestamp", "operation", "operationMetrics").show(truncate=False)

In [None]:
# Time travel: the `versionAsOf` reader option pins the read to one commit of the table.
# Here that is version 0, the table's first commit.
version_0_df = (
    spark.read
    .format("delta")
    .options(versionAsOf=0)
    .load(delta_table_path)
)
print("Data at version 0:")
version_0_df.show(truncate=False)

## 5. SQL Queries

In [None]:
# Make the DataFrame queryable from Spark SQL under the name `customers`.
df.createOrReplaceTempView("customers")

# Newest customers first, with the creation timestamp reduced to a calendar date.
signup_query = """
    SELECT 
        name,
        email,
        DATE(created_at) as signup_date
    FROM customers
    ORDER BY created_at DESC
"""
result = spark.sql(signup_query)

result.show(truncate=False)

## 6. Data Visualization (Optional)

In [None]:
# Convert to Pandas for visualization
import pandas as pd
import matplotlib.pyplot as plt

# toPandas() collects every row onto the driver/kernel. Cap the row count so a large
# table cannot exhaust memory; raise this limit if you need more rows locally.
MAX_PANDAS_ROWS = 10_000

pandas_df = df.limit(MAX_PANDAS_ROWS).toPandas()
display(pandas_df.head())

# Plot count by operation type. `operation` is a column of the Delta transaction history,
# not of the table rows, so read it from DESCRIBE HISTORY. The history has one row per
# retained commit and is typically small.
history_pdf = (
    spark.sql(f"DESCRIBE HISTORY delta.`{delta_table_path}`")
    .select("operation")
    .toPandas()
)
operation_counts = history_pdf["operation"].value_counts()

fig, ax = plt.subplots()
operation_counts.plot(kind="bar", ax=ax, title="Operations Count")
ax.set_xlabel("Delta operation")
ax.set_ylabel("Number of commits")
plt.show()

## 7. Cleanup

In [None]:
# Stop Spark session (optional): uncomment to release the session's resources when done.
# Any later cell that uses `spark` needs section 1 rerun to create a new session.
# spark.stop()