# Lance-Spark Getting Started

This notebook demonstrates how to use Lance with Apache Spark for reading and writing Lance datasets.

## 1. Initialize Spark Session

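The Docker container already provides a Spark session configured with the Lance catalog. The cell below reuses that session, supplies the settings for the `lance_dir` (directory namespace) and `lance_glue` (AWS Glue namespace) catalogs, and selects `lance_dir` as the default catalog.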

In [1]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import pandas as pd

# Credentials are read from the environment when set. Each fallback is the
# demo value this notebook has always used, so it runs unchanged by default.
storage_access_key_id = os.environ.get("LANCE_STORAGE_ACCESS_KEY_ID", "admin")
storage_secret_access_key = os.environ.get("LANCE_STORAGE_SECRET_ACCESS_KEY", "password")
glue_access_key_id = os.environ.get("LANCE_GLUE_ACCESS_KEY_ID", "xyz")
glue_secret_access_key = os.environ.get("LANCE_GLUE_SECRET_ACCESS_KEY", "abc")

# getOrCreate() returns the session that is already running if there is one;
# otherwise it starts a new session with the settings below.
spark = (
    SparkSession.builder
        # Directory namespace
        .config("spark.sql.catalog.lance_dir", "com.lancedb.lance.spark.LanceNamespaceSparkCatalog")
        .config("spark.sql.catalog.lance_dir.impl", "dir")
        .config("spark.sql.catalog.lance_dir.root", "s3://lance-warehouse/dir_ns")
        .config("spark.sql.catalog.lance_dir.storage.endpoint", "http://minio:9000")
        .config("spark.sql.catalog.lance_dir.storage.aws_allow_http", "true")
        .config("spark.sql.catalog.lance_dir.storage.access_key_id", storage_access_key_id)
        .config("spark.sql.catalog.lance_dir.storage.secret_access_key", storage_secret_access_key)
        .config("spark.sql.catalog.lance_dir.storage.region", "us-east-1")
        # Glue namespace
        .config("spark.sql.catalog.lance_glue", "com.lancedb.lance.spark.LanceNamespaceSparkCatalog")
        .config("spark.sql.catalog.lance_glue.impl", "glue")
        .config("spark.sql.catalog.lance_glue.root", "s3://lance-warehouse/glue_ns")
        .config("spark.sql.catalog.lance_glue.access_key_id", glue_access_key_id)
        .config("spark.sql.catalog.lance_glue.secret_access_key", glue_secret_access_key)
        .config("spark.sql.catalog.lance_glue.region", "us-east-1")
        .config("spark.sql.catalog.lance_glue.storage.endpoint", "http://minio:9000")
        .config("spark.sql.catalog.lance_glue.storage.aws_allow_http", "true")
        .config("spark.sql.catalog.lance_glue.storage.access_key_id", storage_access_key_id)
        .config("spark.sql.catalog.lance_glue.storage.secret_access_key", storage_secret_access_key)
        .getOrCreate()
)

# Make lance_dir the default catalog and switch to its "default" namespace
spark.sql("set spark.sql.defaultCatalog=lance_dir")
spark.sql("use default")

# Uncomment to use Glue catalog
# spark.sql("set spark.sql.defaultCatalog=lance_glue")
# spark.sql("use default")


# Verify Lance catalog is configured
spark.sql("SHOW CATALOGS").show()

25/08/07 06:10:42 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
25/08/07 06:10:42 WARN CheckAllocator: More than one DefaultAllocationManager on classpath. Choosing first found


+-------------+
|      catalog|
+-------------+
|    lance_dir|
|spark_catalog|
+-------------+



## 2. Create Sample Data

In [2]:
# Column names, in the same order as the fields of each row tuple below.
columns = ["id", "name", "age", "department", "salary"]

# Five sample employee records.
data = [
    (1, "Alice", 25, "Engineering", 75000),
    (2, "Bob", 30, "Marketing", 65000),
    (3, "Charlie", 35, "Sales", 70000),
    (4, "Diana", 28, "Engineering", 80000),
    (5, "Eve", 32, "HR", 60000),
]

# Build the DataFrame from the rows and display it.
df = spark.createDataFrame(data, schema=columns)
df.show()

+---+-------+---+-----------+------+
| id|   name|age| department|salary|
+---+-------+---+-----------+------+
|  1|  Alice| 25|Engineering| 75000|
|  2|    Bob| 30|  Marketing| 65000|
|  3|Charlie| 35|      Sales| 70000|
|  4|  Diana| 28|Engineering| 80000|
|  5|    Eve| 32|         HR| 60000|
+---+-------+---+-----------+------+



## 3. Create Table

In [3]:
# Remove any table left behind by an earlier, interrupted run so that this
# cell (and a full Restart & Run All) always starts from an empty table.
spark.sql("DROP TABLE IF EXISTS employees")
# The trailing ";" keeps the empty result DataFrame from being echoed.
spark.sql("CREATE TABLE employees (id INT, name STRING, age INT, department STRING, salary INT)");

## 4. Show Tables

In [4]:
# List the tables in the current namespace.
tables = spark.sql("SHOW TABLES")
tables.show()

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|  default|employees|      false|
+---------+---------+-----------+



## 5. Describe Table

In [5]:
# Show the columns of the employees table plus its extended metadata.
table_description = spark.sql("DESCRIBE TABLE EXTENDED employees")
table_description.show()

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|                  id|      int|   NULL|
|                name|   string|   NULL|
|                 age|      int|   NULL|
|          department|   string|   NULL|
|              salary|      int|   NULL|
|                    |         |       |
|  # Metadata Columns|         |       |
|              _rowid|   bigint|       |
|            _rowaddr|   bigint|       |
|                    |         |       |
|# Detailed Table ...|         |       |
|                Name|employees|       |
|                Type|  MANAGED|       |
|    Table Properties|       []|       |
+--------------------+---------+-------+



## 6. Write Data

In [6]:
# Append the sample rows to the employees table.
employees_writer = df.writeTo("employees")
employees_writer.append()

## 7. Simple Query

In [7]:
# Read the table back as a DataFrame and display its rows.
employees = spark.table("employees")
employees.show()

+---+-------+---+-----------+------+
| id|   name|age| department|salary|
+---+-------+---+-----------+------+
|  1|  Alice| 25|Engineering| 75000|
|  2|    Bob| 30|  Marketing| 65000|
|  3|Charlie| 35|      Sales| 70000|
|  4|  Diana| 28|Engineering| 80000|
|  5|    Eve| 32|         HR| 60000|
+---+-------+---+-----------+------+



## 8. Complex Query

In [8]:
# Per-department employee count and average salary, highest average first.
department_summary_sql = """
    SELECT department, 
           COUNT(*) as employee_count,
           AVG(salary) as avg_salary
    FROM employees
    GROUP BY department
    ORDER BY avg_salary DESC
"""
result = spark.sql(department_summary_sql)

result.show()

+-----------+--------------+----------+
| department|employee_count|avg_salary|
+-----------+--------------+----------+
|Engineering|             2|   77500.0|
|      Sales|             1|   70000.0|
|  Marketing|             1|   65000.0|
|         HR|             1|   60000.0|
+-----------+--------------+----------+



## 9. Append More Data

In [9]:
# Two more employee records, using the same column list as the first batch.
new_data = [
    (6, "Frank", 29, "Engineering", 77000),
    (7, "Grace", 31, "Marketing", 68000),
]
new_df = spark.createDataFrame(new_data, schema=columns)

# Append to the existing table rather than replacing its contents.
employees_appender = new_df.writeTo("employees")
employees_appender.append()

# Display the two rows with the highest ids.
latest_rows = spark.sql("SELECT * FROM employees ORDER BY id DESC LIMIT 2")
latest_rows.show()

+---+-----+---+-----------+------+
| id| name|age| department|salary|
+---+-----+---+-----------+------+
|  7|Grace| 31|  Marketing| 68000|
|  6|Frank| 29|Engineering| 77000|
+---+-----+---+-----------+------+



## 10. Cleanup

In [10]:
# Drop the demo table. IF EXISTS makes this cell safe to run more than once.
# The trailing ";" keeps the empty result DataFrame from being echoed.
spark.sql("DROP TABLE IF EXISTS employees");

DataFrame[]