- Adapted from: https://docs.databricks.com/aws/en/dev-tools/databricks-connect/python/examples
- Key items:
    - Use U2M OAuth (previously set up by Databricks CLI and profile)
    - Use serverless compute during spark session init
    - Use unity catalog 3-level name space

## 1. Initialize Spark Session to Remote Serverless Compute

In [None]:
from databricks.connect import DatabricksSession


# init the spark session to connect to the workspace serverless compute:
spark = DatabricksSession.builder.profile("joshuale-common").serverless(True).getOrCreate()

In [None]:
# you can validate the session:
DatabricksSession.builder.validateSession(True).profile("joshuale-common").serverless(True).getOrCreate()

<pyspark.sql.connect.session.SparkSession at 0x76714c104650>

## 2. Reading an Existing Table

In [None]:
# Read a table from the Databricks catalog
# This operation will be executed on the serverless compute as dictated by the spark session.
df = spark.read.table("samples.nyctaxi.trips")
df.show(5)

+--------------------+---------------------+-------------+-----------+----------+-----------+
|tpep_pickup_datetime|tpep_dropoff_datetime|trip_distance|fare_amount|pickup_zip|dropoff_zip|
+--------------------+---------------------+-------------+-----------+----------+-----------+
| 2016-02-13 21:47:53|  2016-02-13 21:57:15|          1.4|        8.0|     10103|      10110|
| 2016-02-13 18:29:09|  2016-02-13 18:37:23|         1.31|        7.5|     10023|      10023|
| 2016-02-06 19:40:58|  2016-02-06 19:52:32|          1.8|        9.5|     10001|      10018|
| 2016-02-12 19:06:43|  2016-02-12 19:20:54|          2.3|       11.5|     10044|      10111|
| 2016-02-23 10:27:56|  2016-02-23 10:58:33|          2.6|       18.5|     10199|      10022|
+--------------------+---------------------+-------------+-----------+----------+-----------+
only showing top 5 rows


## 3. Create, Add Data to, Query from a Table

In [11]:
from pyspark.sql.types import StringType, IntegerType, DateType, StructType, StructField
from datetime import date

catalog_name = "workspace"
schema_name = "bronze"
table_name = "demo_temps_table"
# Create a Spark DataFrame consisting of high and low temperatures
# by airport code and date.
schema = StructType([
  StructField('AirportCode', StringType(), False),
  StructField('Date', DateType(), False),
  StructField('TempHighF', IntegerType(), False),
  StructField('TempLowF', IntegerType(), False)
])

data = [
  [ 'BLI', date(2021, 4, 3), 52, 43],
  [ 'BLI', date(2021, 4, 2), 50, 38],
  [ 'BLI', date(2021, 4, 1), 52, 41],
  [ 'PDX', date(2021, 4, 3), 64, 45],
  [ 'PDX', date(2021, 4, 2), 61, 41],
  [ 'PDX', date(2021, 4, 1), 66, 39],
  [ 'SEA', date(2021, 4, 3), 57, 43],
  [ 'SEA', date(2021, 4, 2), 54, 39],
  [ 'SEA', date(2021, 4, 1), 56, 41]
]

temps = spark.createDataFrame(data, schema)

In [7]:
df.show()

+--------------------+---------------------+-------------+-----------+----------+-----------+
|tpep_pickup_datetime|tpep_dropoff_datetime|trip_distance|fare_amount|pickup_zip|dropoff_zip|
+--------------------+---------------------+-------------+-----------+----------+-----------+
| 2016-02-13 21:47:53|  2016-02-13 21:57:15|          1.4|        8.0|     10103|      10110|
| 2016-02-13 18:29:09|  2016-02-13 18:37:23|         1.31|        7.5|     10023|      10023|
| 2016-02-06 19:40:58|  2016-02-06 19:52:32|          1.8|        9.5|     10001|      10018|
| 2016-02-12 19:06:43|  2016-02-12 19:20:54|          2.3|       11.5|     10044|      10111|
| 2016-02-23 10:27:56|  2016-02-23 10:58:33|          2.6|       18.5|     10199|      10022|
| 2016-02-13 00:41:43|  2016-02-13 00:46:52|          1.4|        6.5|     10023|      10069|
| 2016-02-18 23:49:53|  2016-02-19 00:12:53|         10.4|       31.0|     11371|      10003|
| 2016-02-18 20:21:45|  2016-02-18 20:38:23|        10.15|  

In [None]:
# Create a table on the Databricks cluster and then fill
# the table with the DataFrame's contents.
# If the table already exists from a previous run,
# delete it first.
spark.sql(f'USE {catalog_name}.{schema_name}')
spark.sql(f'DROP TABLE IF EXISTS {catalog_name}.{schema_name}.{table_name}')
temps.write.saveAsTable(f'{catalog_name}.{schema_name}.{table_name}')

In [12]:

# Query the table on the Databricks cluster, returning rows
# where the airport code is not BLI and the date is later
# than 2021-04-01. Group the results and order by high
# temperature in descending order.
df_temps = spark.sql(f"SELECT * FROM {catalog_name}.{schema_name}.{table_name} " \
  "WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " \
  "GROUP BY AirportCode, Date, TempHighF, TempLowF " \
  "ORDER BY TempHighF DESC")
df_temps.show()

# Results:
#
# +-----------+----------+---------+--------+
# |AirportCode|      Date|TempHighF|TempLowF|
# +-----------+----------+---------+--------+
# |        PDX|2021-04-03|       64|      45|
# |        PDX|2021-04-02|       61|      41|
# |        SEA|2021-04-03|       57|      43|
# |        SEA|2021-04-02|       54|      39|
# +-----------+----------+---------+--------+

+-----------+----------+---------+--------+
|AirportCode|      Date|TempHighF|TempLowF|
+-----------+----------+---------+--------+
|        PDX|2021-04-03|       64|      45|
|        PDX|2021-04-02|       61|      41|
|        SEA|2021-04-03|       57|      43|
|        SEA|2021-04-02|       54|      39|
+-----------+----------+---------+--------+



In [None]:
# Clean up by deleting the table from the Databricks cluster.
spark.sql(f'DROP TABLE IF EXISTS {catalog_name}.{schema_name}.{table_name}')