In [1]:
import os, sys
# Point both PySpark interpreter variables at the Python running this kernel,
# then echo each one back so the chosen interpreter is visible in the output.
kernel_python = sys.executable
for env_name in ("PYSPARK_PYTHON", "PYSPARK_DRIVER_PYTHON"):
    os.environ[env_name] = kernel_python
    print(f"{env_name} =", os.environ[env_name])


PYSPARK_PYTHON = c:\training\Databricks\AssetBundles\dab_project\.venv_pyspark\Scripts\python.exe
PYSPARK_DRIVER_PYTHON = c:\training\Databricks\AssetBundles\dab_project\.venv_pyspark\Scripts\python.exe


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp, when, avg, count, sum as Fsum

# Ride-duration bucket boundaries in minutes (upper bounds are exclusive).
SHORT_RIDE_MAX_MINS = 10
MEDIUM_RIDE_MAX_MINS = 30
SECONDS_PER_MINUTE = 60.0

# Local two-thread session used only to display results in this notebook.
spark = (
    SparkSession.builder
    .master("local[2]")
    .appName("local-display-only")
    .getOrCreate()
)

# Hand-written sample rides: (ride_id, bike_id, start_ts, end_ts, rider_type).
data = [
    (1, "bike-101", "2025-12-24 08:10:00", "2025-12-24 08:34:00", "member"),
    (2, "bike-102", "2025-12-24 09:00:00", "2025-12-24 09:07:00", "casual"),
    (3, "bike-101", "2025-12-24 10:15:00", "2025-12-24 10:55:00", "member"),
    (4, "bike-103", "2025-12-24 11:05:00", "2025-12-24 11:18:00", "casual"),
    (5, "bike-102", "2025-12-24 12:00:00", "2025-12-24 12:42:00", "member"),
]

df = spark.createDataFrame(data, ["ride_id", "bike_id", "start_ts", "end_ts", "rider_type"])

# Parse the timestamp strings, derive the ride length in minutes, then label each ride.
# The "long" branch is an explicit condition instead of otherwise(): a null duration
# (missing or unparsable timestamp) fails every comparison, so such a ride gets a null
# bucket rather than being silently counted as a long ride.
df2 = (
    df.withColumn("start_ts", to_timestamp("start_ts"))
      .withColumn("end_ts", to_timestamp("end_ts"))
      .withColumn(
          "duration_mins",
          # Difference of the two timestamps cast to long, converted to minutes.
          (col("end_ts").cast("long") - col("start_ts").cast("long")) / SECONDS_PER_MINUTE,
      )
      .withColumn(
          "bucket",
          when(col("duration_mins") < SHORT_RIDE_MAX_MINS, "short")
          .when(col("duration_mins") < MEDIUM_RIDE_MAX_MINS, "medium")
          .when(col("duration_mins") >= MEDIUM_RIDE_MAX_MINS, "long"),
      )
)

# Ride count, mean duration and total duration per (rider_type, bucket) pair.
agg = (
    df2.groupBy("rider_type", "bucket")
       .agg(
           count("*").alias("rides"),
           avg("duration_mins").alias("avg_duration"),
           Fsum("duration_mins").alias("total_duration")
       )
       .orderBy("rider_type", "bucket")
)

print("=== Sample rows ===")
df2.show(truncate=False)

print("=== Aggregation ===")
agg.show(truncate=False)

# If you're in a notebook, this is nicer:
# display(df2)
# display(agg)


=== Sample rows ===
+-------+--------+-------------------+-------------------+----------+-------------+------+
|ride_id|bike_id |start_ts           |end_ts             |rider_type|duration_mins|bucket|
+-------+--------+-------------------+-------------------+----------+-------------+------+
|1      |bike-101|2025-12-24 08:10:00|2025-12-24 08:34:00|member    |24.0         |medium|
|2      |bike-102|2025-12-24 09:00:00|2025-12-24 09:07:00|casual    |7.0          |short |
|3      |bike-101|2025-12-24 10:15:00|2025-12-24 10:55:00|member    |40.0         |long  |
|4      |bike-103|2025-12-24 11:05:00|2025-12-24 11:18:00|casual    |13.0         |medium|
|5      |bike-102|2025-12-24 12:00:00|2025-12-24 12:42:00|member    |42.0         |long  |
+-------+--------+-------------------+-------------------+----------+-------------+------+

=== Aggregation ===
+----------+------+-----+------------+--------------+
|rider_type|bucket|rides|avg_duration|total_duration|
+----------+------+-----+-------

Unnamed: 0,rider_type,bucket,rides,avg_duration,total_duration
0,casual,medium,1,13.0,13.0
1,casual,short,1,7.0,7.0
2,member,long,2,41.0,82.0
3,member,medium,1,24.0,24.0
