## PySpark on Databricks Serverless

**Key Point**: SparkSession is **already available** as `spark` - no need to create it!

This notebook demonstrates PySpark usage on Serverless compute, from basic to advanced examples.

In [0]:
# SparkSession is already available as 'spark'
print(f"Spark Version: {spark.version}")
print("\nSparkSession is ready to use!")

Spark Version: 4.0.0

SparkSession is ready to use!


In [0]:
# Example 1: Create DataFrame from Python list
data = [
    (1, "Alice", 28, "Engineering", 95000),
    (2, "Bob", 35, "Sales", 75000),
    (3, "Charlie", 42, "Engineering", 110000),
    (4, "Diana", 31, "Marketing", 80000),
    (5, "Eve", 29, "Engineering", 98000),
    (6, "Frank", 45, "Sales", 85000),
    (7, "Grace", 33, "Marketing", 82000),
    (8, "Henry", 38, "Engineering", 105000)
]

columns = ["id", "name", "age", "department", "salary"]

df = spark.createDataFrame(data, columns)

print("DataFrame created successfully!")
df.printSchema()
display(df)

DataFrame created successfully!
root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: long (nullable = true)



id,name,age,department,salary
1,Alice,28,Engineering,95000
2,Bob,35,Sales,75000
3,Charlie,42,Engineering,110000
4,Diana,31,Marketing,80000
5,Eve,29,Engineering,98000
6,Frank,45,Sales,85000
7,Grace,33,Marketing,82000
8,Henry,38,Engineering,105000


In [0]:
from pyspark.sql import functions as F

# Filter, select, and add computed columns
df_transformed = df.filter(F.col("age") > 30) \
    .select(
        "name",
        "department",
        "age",
        "salary",
        (F.col("salary") * 0.1).alias("bonus"),
        F.when(F.col("salary") > 90000, "Senior")
         .otherwise("Junior").alias("level")
    ) \
    .orderBy(F.col("salary").desc())

print(f"Filtered records: {df_transformed.count()}")
display(df_transformed)

Filtered records: 6


name,department,age,salary,bonus,level
Charlie,Engineering,42,110000,11000.0,Senior
Henry,Engineering,38,105000,10500.0,Senior
Frank,Sales,45,85000,8500.0,Junior
Grace,Marketing,33,82000,8200.0,Junior
Diana,Marketing,31,80000,8000.0,Junior
Bob,Sales,35,75000,7500.0,Junior


In [0]:
# Group by department and calculate statistics
dept_stats = df.groupBy("department").agg(
    F.count("*").alias("employee_count"),
    F.avg("salary").alias("avg_salary"),
    F.min("salary").alias("min_salary"),
    F.max("salary").alias("max_salary"),
    F.avg("age").alias("avg_age")
).orderBy(F.col("avg_salary").desc())

print("Department Statistics:")
display(dept_stats)

Department Statistics:


department,employee_count,avg_salary,min_salary,max_salary,avg_age
Engineering,4,102000.0,95000,110000,34.25
Marketing,2,81000.0,80000,82000,32.0
Sales,2,80000.0,75000,85000,40.0
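Conceptually, `groupBy(...).agg(...)` produces one output row per distinct key, and each aggregate reduces that group's values to a single number. The plain-Python sketch below models those semantics on the same eight rows, so you can check the table by hand. It is an illustration only. Spark itself computes partial aggregates per partition and merges them after a shuffle.

```python
from collections import defaultdict

# (id, name, age, department, salary), same rows as the DataFrame above
employee_rows = [
    (1, "Alice", 28, "Engineering", 95000),
    (2, "Bob", 35, "Sales", 75000),
    (3, "Charlie", 42, "Engineering", 110000),
    (4, "Diana", 31, "Marketing", 80000),
    (5, "Eve", 29, "Engineering", 98000),
    (6, "Frank", 45, "Sales", 85000),
    (7, "Grace", 33, "Marketing", 82000),
    (8, "Henry", 38, "Engineering", 105000),
]

# Step 1: bucket rows by the grouping key (department)
groups = defaultdict(list)
for _id, _name, age, department, salary in employee_rows:
    groups[department].append((age, salary))

# Step 2: reduce each bucket to one row of aggregates
stats_by_dept = {}
for department, members in groups.items():
    ages = [age for age, _ in members]
    salaries = [salary for _, salary in members]
    stats_by_dept[department] = {
        "employee_count": len(members),
        "avg_salary": sum(salaries) / len(salaries),
        "min_salary": min(salaries),
        "max_salary": max(salaries),
        "avg_age": sum(ages) / len(ages),
    }

# Step 3: order by avg_salary descending, as orderBy(...desc()) does
for department, stats in sorted(stats_by_dept.items(), key=lambda kv: kv[1]["avg_salary"], reverse=True):
    print(department, stats)
```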


In [0]:
# Create a second DataFrame for joining
projects_data = [
    (1, "Project Alpha", 50),
    (2, "Project Beta", 30),
    (3, "Project Gamma", 70),
    (5, "Project Delta", 40),
    (8, "Project Epsilon", 60)
]

projects_df = spark.createDataFrame(projects_data, ["id", "project_name", "completion_percentage"])

# Perform inner join
joined_df = df.join(projects_df, on="id", how="inner") \
    .select(
        "name",
        "department",
        "project_name",
        "completion_percentage",
        "salary"
    )

print("Employees with assigned projects:")
display(joined_df)

Employees with assigned projects:


name,department,project_name,completion_percentage,salary
Alice,Engineering,Project Alpha,50,95000
Bob,Sales,Project Beta,30,75000
Charlie,Engineering,Project Gamma,70,110000
Eve,Engineering,Project Delta,40,98000
Henry,Engineering,Project Epsilon,60,105000
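An inner join keeps only employees whose `id` also appears in `projects_df`, which is why Diana, Frank and Grace are missing above. PySpark's `join` also accepts other `how` values:

- `how="left"` keeps every employee, with nulls in the project columns where there is no match.
- `how="left_anti"` keeps only the employees that have no match.

The plain-Python sketch below models these three behaviours on the same keys. It assumes each `id` appears at most once per side, as in this data. With duplicate keys, Spark emits one row per matching pair.

```python
# Same keys as the DataFrames above: id -> employee name, id -> project name
employees = {1: "Alice", 2: "Bob", 3: "Charlie", 4: "Diana", 5: "Eve", 6: "Frank", 7: "Grace", 8: "Henry"}
projects = {1: "Project Alpha", 2: "Project Beta", 3: "Project Gamma", 5: "Project Delta", 8: "Project Epsilon"}

# how="inner": only ids present on both sides
inner = [(name, projects[i]) for i, name in employees.items() if i in projects]

# how="left": every employee; None stands in for Spark's null
left = [(name, projects.get(i)) for i, name in employees.items()]

# how="left_anti": employees without a matching project (left-side columns only)
left_anti = [name for i, name in employees.items() if i not in projects]

print("inner:", inner)
print("left:", left)
print("left_anti:", left_anti)
```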


In [0]:
from pyspark.sql.window import Window

# Rank employees by salary within each department
window_spec = Window.partitionBy("department").orderBy(F.col("salary").desc())

df_ranked = df.withColumn("rank_in_dept", F.rank().over(window_spec)) \
    .withColumn("dense_rank_in_dept", F.dense_rank().over(window_spec)) \
    .withColumn("row_number_in_dept", F.row_number().over(window_spec))

print("Employee rankings within departments:")
display(df_ranked.orderBy("department", "rank_in_dept"))

Employee rankings within departments:


id,name,age,department,salary,rank_in_dept,dense_rank_in_dept,row_number_in_dept
3,Charlie,42,Engineering,110000,1,1,1
8,Henry,38,Engineering,105000,2,2,2
5,Eve,29,Engineering,98000,3,3,3
1,Alice,28,Engineering,95000,4,4,4
7,Grace,33,Marketing,82000,1,1,1
4,Diana,31,Marketing,80000,2,2,2
6,Frank,45,Sales,85000,1,1,1
2,Bob,35,Sales,75000,2,2,2
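No two employees in the same department share a salary here, so the three ranking columns are identical. They only diverge on ties:

- `rank` leaves gaps after a tie.
- `dense_rank` does not leave gaps.
- `row_number` always assigns distinct numbers.

The plain-Python model below implements the three functions over one already-sorted partition and adds a hypothetical tie.

```python
def rankings(values):
    """Model rank, dense_rank and row_number for one window partition.

    `values` must already be sorted in window order (here: salary descending).
    Returns a list of (value, rank, dense_rank, row_number) tuples.
    """
    result = []
    rank = dense = 0
    previous = object()  # sentinel that never equals a real value
    for row_number, value in enumerate(values, start=1):
        if value != previous:
            rank = row_number  # rank jumps to the current position (leaves gaps)
            dense += 1         # dense_rank advances by exactly one (no gaps)
            previous = value
        result.append((value, rank, dense, row_number))
    return result


# Hypothetical salaries with a tie at 105000
for row in rankings([110000, 105000, 105000, 98000]):
    print(row)
```

Among tied rows, Spark does not guarantee which one gets the lower `row_number`. Adding a tiebreaker to the window's `orderBy`, such as the `id` column, makes it deterministic.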


In [0]:
from pyspark.sql.types import StringType

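# Define a plain Python function with the categorization logic
def categorize_salary(salary):
    if salary is None:  # Spark passes null values to the UDF as None
        return None
    if salary >= 100000:
        return "High"
    elif salary >= 80000:
        return "Medium"
    else:
        return "Low"

# Wrap the function as a UDF that returns a string column
categorize_udf = F.udf(categorize_salary, StringType())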

# Apply UDF
df_with_category = df.withColumn("salary_category", categorize_udf(F.col("salary")))

print("DataFrame with salary categories:")
display(df_with_category.select("name", "salary", "salary_category").orderBy("salary", ascending=False))

DataFrame with salary categories:


name,salary,salary_category
Charlie,110000,High
Henry,105000,High
Eve,98000,Medium
Alice,95000,Medium
Frank,85000,Medium
Grace,82000,Medium
Diana,80000,Medium
Bob,75000,Low


In [0]:
# Pivot example: average salary and headcount by department and age group
df_with_range = df.withColumn(
    "age_group",
    F.when(F.col("age") < 30, "20s")
     .when(F.col("age") < 40, "30s")
     .otherwise("40s+")
)

pivot_df = df_with_range.groupBy("department").pivot("age_group").agg(
    F.avg("salary").alias("avg_salary"),
    F.count(F.lit(1)).alias("count")
)

print("Pivoted salary data by age group:")
display(pivot_df)

Pivoted salary data by age group:


department,20s_avg_salary,20s_count,30s_avg_salary,30s_count,40s+_avg_salary,40s+_count
Engineering,96500.0,2,105000.0,1,110000.0,1
Sales,,,75000.0,1,85000.0,1
Marketing,,,81000.0,2,,
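`pivot` turns each distinct `age_group` value into one column per aggregate, named `<value>_<alias>`. Wherever a department has no employees in that group, the cell is null. The plain-Python sketch below models that reshaping; it is not Spark's implementation. Separately, the PySpark documentation notes that passing the values explicitly, for example `pivot("age_group", ["20s", "30s", "40s+"])`, is more efficient. With explicit values, Spark does not need to compute the distinct values first.

```python
from collections import defaultdict

# (department, age, salary) from the employee data above
salary_rows = [
    ("Engineering", 28, 95000), ("Sales", 35, 75000), ("Engineering", 42, 110000),
    ("Marketing", 31, 80000), ("Engineering", 29, 98000), ("Sales", 45, 85000),
    ("Marketing", 33, 82000), ("Engineering", 38, 105000),
]


def age_group(age):
    # Mirrors the F.when chain: <30 -> "20s", <40 -> "30s", otherwise "40s+"
    if age < 30:
        return "20s"
    if age < 40:
        return "30s"
    return "40s+"


# Long form: (department, age_group) -> list of salaries
cells = defaultdict(list)
for department, age, salary in salary_rows:
    cells[(department, age_group(age))].append(salary)

# Wide form: one dict per department, columns named "<group>_<alias>"
pivot_values = sorted({group for _, group in cells})
wide = {}
for department in sorted({d for d, _ in cells}):
    row = {}
    for group in pivot_values:
        salaries = cells.get((department, group))  # .get does not add keys
        # Missing combination -> None, like Spark's null
        row[f"{group}_avg_salary"] = sum(salaries) / len(salaries) if salaries else None
        row[f"{group}_count"] = len(salaries) if salaries else None
    wide[department] = row

for department, row in wide.items():
    print(department, row)
```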


## Key Takeaways for PySpark on Serverless

1. **No SparkSession creation needed** - use the pre-configured `spark` object
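2. **DataFrame and SQL APIs work as usual** - DataFrames, `spark.sql`, window functions and UDFs run as shown above. Serverless compute is built on Spark Connect, so RDD APIs and `spark.sparkContext` are not available. Check the Databricks serverless limitations documentation before relying on other APIs such as MLlib.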
3. **Serverless auto-scales** - no cluster management required
4. **Use `display()`** - for rich DataFrame visualization in notebooks

### For Flyte Connector:
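```python
# In your Flyte tasks, use the pre-configured `spark` session directly.
# `data` is a list of tuples and `columns` a list of column names, as in the examples above.
from pyspark.sql import functions as F

df = spark.createDataFrame(data, columns)
result = df.filter(F.col("age") > 30).groupBy("department").agg(F.avg("salary").alias("avg_salary"))
```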

No special initialization required!