Each new SparkSession you create in a different notebook starts a new SparkContext, which in turn starts its own Spark UI on the next free port (4040 → 4041 → 4042, etc.).

When you call `SparkSession.builder`..., Spark checks whether a SparkContext (`sc`) already exists in the current process.

- If no existing SparkContext is found, Spark creates a new one, and its UI tries the default port (4040) first.
- If another SparkContext (for example, one started by another notebook) is already using 4040, the new UI moves on to 4041, 4042, and so on.
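The port increment is plain "try to bind, and on failure try the next port" logic. Spark's UI starts at `spark.ui.port` (default 4040) and retries successive ports up to `spark.port.maxRetries` times (default 16). The sketch below reproduces that loop with ordinary sockets; it is a simplified illustration, not Spark's code, and the function name `bind_ui_port` is hypothetical. It binds only to `127.0.0.1`.

```python
import socket


def bind_ui_port(base_port=4040, max_retries=16, host="127.0.0.1"):
    """Hypothetical sketch of UI port selection: try base_port, base_port+1, ...

    Returns (listening_socket, port) for the first port that could be bound.
    """
    for offset in range(max_retries + 1):
        port = base_port + offset
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.bind((host, port))
            sock.listen()
        except (OSError, OverflowError):
            # Port already taken by another process (or out of range): try the next one
            sock.close()
            continue
        return sock, port
    raise RuntimeError(f"no free port in {base_port}..{base_port + max_retries}")
```

Calling `bind_ui_port()` twice while keeping the first socket open returns two different ports, because the second call finds the first port already occupied. That is the same reason a second notebook ends up on 4041.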

In Jupyter Notebook, each notebook runs in its own kernel, i.e. a separate Python process, so notebooks cannot share a SparkContext.
If you run `SparkSession.builder.getOrCreate()` in multiple notebooks:
- The first notebook gets port 4040.
- The second notebook gets port 4041.
- The third notebook gets port 4042, and so on.
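The two rules above combine as follows: `getOrCreate()` reuses a context only within the same process, while UI ports are shared by every process on the machine. The toy model below makes that split explicit. `FakeKernel` and `FakeContext` are hypothetical stand-ins, not PySpark classes. The reuse branch corresponds to the `Using an existing Spark session` warning that appears when the builder cell is re-run in the same notebook.

```python
from dataclasses import dataclass
from typing import Optional, Set


@dataclass
class FakeContext:
    """Hypothetical stand-in for a SparkContext: remembers its app name and UI port."""
    app_name: str
    ui_port: int


@dataclass
class FakeKernel:
    """One notebook kernel = one Python process with at most one active context."""
    ports_in_use: Set[int]                # shared by all kernels (the machine's ports)
    active: Optional[FakeContext] = None  # private to this kernel

    def get_or_create(self, app_name: str, base_port: int = 4040,
                      max_retries: int = 16) -> FakeContext:
        if self.active is not None:
            # Same process: return the existing context; the new app_name is ignored
            return self.active
        for port in range(base_port, base_port + max_retries + 1):
            if port not in self.ports_in_use:
                self.ports_in_use.add(port)
                self.active = FakeContext(app_name, port)
                return self.active
        raise RuntimeError("no free UI port")


if __name__ == "__main__":
    machine_ports: Set[int] = set()
    notebooks = [FakeKernel(machine_ports) for _ in range(3)]
    print([nb.get_or_create("test1").ui_port for nb in notebooks])  # [4040, 4041, 4042]
```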

So the Spark UI for this notebook is at `http://localhost:4042/jobs/`.

In [15]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test1') \
    .getOrCreate()

25/03/08 15:16:35 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


# GroupBy

In [16]:
df_green = spark.read.parquet('data/green/*/*')

In [17]:
# register the DataFrame as a temporary view so spark.sql() can query it as 'green'
df_green.createOrReplaceTempView('green')

In [28]:
df_green_revenue = spark.sql("""
SELECT 
    date_trunc('hour', lpep_pickup_datetime) AS hour, 
    PULocationID AS zone,

    SUM(total_amount) AS amount,
    COUNT(1) AS number_records
FROM
    green
WHERE
    lpep_pickup_datetime >= '2020-01-01 00:00:00'
GROUP BY
    1, 2
ORDER BY
    1, 2
""")

In [29]:
df_green_revenue.show()

+-------------------+----+------------------+--------------+
|               hour|zone|            amount|number_records|
+-------------------+----+------------------+--------------+
|2020-01-01 00:00:00|   7| 769.7299999999997|            45|
|2020-01-01 00:00:00|  17|            195.03|             9|
|2020-01-01 00:00:00|  18|               7.8|             1|
|2020-01-01 00:00:00|  22|              15.8|             1|
|2020-01-01 00:00:00|  24|              87.6|             3|
|2020-01-01 00:00:00|  25|             531.0|            26|
|2020-01-01 00:00:00|  29|              61.3|             1|
|2020-01-01 00:00:00|  32| 68.94999999999999|             2|
|2020-01-01 00:00:00|  33|317.27000000000004|            11|
|2020-01-01 00:00:00|  35|            129.96|             5|
|2020-01-01 00:00:00|  36|295.34000000000003|            11|
|2020-01-01 00:00:00|  37|            175.67|             6|
|2020-01-01 00:00:00|  38| 98.78999999999999|             2|
...
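To make the query's semantics concrete, here is a pure-Python sketch of the same aggregation. `GROUP BY 1, 2` and `ORDER BY 1, 2` refer to the first and second selected columns (`hour`, `zone`), and `date_trunc('hour', ...)` zeroes out minutes and seconds. The function name and the sample rows in the test are made up for illustration. Summing floating-point `total_amount` values is also why the output shows amounts such as `769.7299999999997` rather than a rounded figure.

```python
from collections import defaultdict
from datetime import datetime


def hourly_revenue(rows, start=datetime(2020, 1, 1)):
    """Hypothetical pure-Python equivalent of the revenue query.

    rows: iterable of (pickup_datetime, zone, total_amount) tuples.
    Returns a list of (hour, zone, amount, number_records), ordered by (hour, zone).
    """
    totals = defaultdict(lambda: [0.0, 0])
    for pickup, zone, amount in rows:
        if pickup < start:  # WHERE pickup_datetime >= '2020-01-01 00:00:00'
            continue
        hour = pickup.replace(minute=0, second=0, microsecond=0)  # date_trunc('hour', ...)
        acc = totals[(hour, zone)]  # GROUP BY 1, 2 -> (hour, zone)
        acc[0] += amount            # SUM(total_amount)
        acc[1] += 1                 # COUNT(1)
    # ORDER BY 1, 2
    return [(hour, zone, amt, n) for (hour, zone), (amt, n) in sorted(totals.items())]
```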

In [20]:
df_green_revenue \
    .repartition(20) \
    .write.parquet('data/report/revenue/green', mode='overwrite')

# Materialize the results: write them to disk so that later steps can load
# them instead of recomputing (for example, when they are needed elsewhere,
# such as a dashboard for green data only)

25/03/08 15:16:48 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
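`repartition(20)` without column arguments redistributes the rows into 20 partitions, and each non-empty partition is written as its own part file, so the output directory typically holds about 20 Parquet files. Spark deals rows out round-robin. The sketch below is simplified: it treats the input as a single partition that starts at bucket 0, whereas Spark starts each input partition at its own pseudo-random offset. It also shows why a global `ORDER BY` generally does not survive a repartition: each bucket receives a non-contiguous slice of the sorted input. The function name is hypothetical.

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def round_robin_partition(rows: Sequence[T], num_partitions: int) -> List[List[T]]:
    """Hypothetical sketch: deal rows into num_partitions buckets, row i -> bucket i % n."""
    buckets: List[List[T]] = [[] for _ in range(num_partitions)]
    for i, row in enumerate(rows):
        buckets[i % num_partitions].append(row)
    return buckets


if __name__ == "__main__":
    parts = round_robin_partition(list(range(45)), 20)  # 45 already-sorted rows
    print(len(parts), parts[0])  # 20 [0, 20, 40]
```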


In [21]:
df_yellow = spark.read.parquet('data/yellow/*/*')
# register the DataFrame as a temporary view so spark.sql() can query it as 'yellow'
df_yellow.createOrReplaceTempView('yellow')

In [22]:
df_yellow_revenue = spark.sql("""
SELECT 
    date_trunc('hour', tpep_pickup_datetime) AS hour, 
    PULocationID AS zone,

    SUM(total_amount) AS amount,
    COUNT(1) AS number_records
FROM
    yellow
WHERE
    tpep_pickup_datetime >= '2020-01-01 00:00:00'
GROUP BY
    1, 2
""")

In [27]:
df_yellow_revenue \
    .repartition(20) \
    .write.parquet('data/report/revenue/yellow', mode='overwrite')
# materialize the results


# Joins

## Join two large tables

In [30]:
# use materialized (saved) results:

df_green_revenue = spark.read.parquet('data/report/revenue/green')
df_yellow_revenue = spark.read.parquet('data/report/revenue/yellow')

In [31]:
df_green_revenue_tmp = df_green_revenue \
    .withColumnRenamed('amount', 'green_amount') \
    .withColumnRenamed('number_records', 'green_number_records')

df_yellow_revenue_tmp = df_yellow_revenue \
    .withColumnRenamed('amount', 'yellow_amount') \
    .withColumnRenamed('number_records', 'yellow_number_records')

In [34]:
df_join = df_green_revenue_tmp.join(df_yellow_revenue_tmp, on=['hour', 'zone'], how='outer')

# how='outer' is a full outer join: it keeps every (hour, zone) pair found in
# either table and fills the other table's columns with NULL where there's no match

In [35]:
df_join.show()

+-------------------+----+------------------+--------------------+------------------+---------------------+
|               hour|zone|      green_amount|green_number_records|     yellow_amount|yellow_number_records|
+-------------------+----+------------------+--------------------+------------------+---------------------+
|2020-01-01 00:00:00|  25|             531.0|                  26| 324.3500000000001|                   16|
|2020-01-01 00:00:00|  35|            129.96|                   5|              NULL|                 NULL|
|2020-01-01 00:00:00|  38| 98.78999999999999|                   2|              NULL|                 NULL|
|2020-01-01 00:00:00|  50|              NULL|                NULL|  4177.48000000001|                  183|
|2020-01-01 00:00:00|  51|              17.8|                   2|              31.0|                    1|
|2020-01-01 00:00:00|  52|             83.33|                   4|              49.8|                    2|
...
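The full outer join above can be modelled in a few lines of Python. Each side maps an `(hour, zone)` key to its metrics, and the result contains the union of keys with `None` (Spark's `NULL`) wherever one side has no row. This is also why the columns were renamed first. Without the `green_`/`yellow_` prefixes, both sides would contribute columns named `amount` and `number_records`, and the joined DataFrame would have ambiguous names. The function name is hypothetical; the sample values in the test are copied from the output above.

```python
def full_outer_join(green, yellow):
    """Hypothetical sketch of a full outer join on (hour, zone).

    green / yellow: dicts mapping (hour, zone) -> (amount, number_records).
    Returns rows (hour, zone, green_amount, green_number_records,
                  yellow_amount, yellow_number_records), sorted by key.
    """
    missing = (None, None)  # what Spark displays as NULL
    rows = []
    for key in sorted(green.keys() | yellow.keys()):  # every key from either side
        rows.append((*key, *green.get(key, missing), *yellow.get(key, missing)))
    return rows
```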

In [33]:
df_join.write.parquet('data/report/revenue/total', mode='overwrite')


## Join one large table and one small table

In [38]:
df_join = spark.read.parquet('data/report/revenue/total')

In [39]:
df_join

DataFrame[hour: timestamp, zone: bigint, green_amount: double, green_number_records: bigint, yellow_amount: double, yellow_number_records: bigint]

In [40]:
df_zones = spark.read.parquet('zones/')

In [42]:
df_result = df_join.join(df_zones, df_join.zone == df_zones.LocationID) 
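# The key columns have different names (zone vs LocationID), so the on=[...] list form
# used above (on=['hour', 'zone']) can't be used here; pass a join condition instead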

In [44]:
df_result.drop('LocationID', 'zone').write.parquet('tmp/revenue-zones')
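When one side is small, like the zones table, Spark can use a broadcast hash join. It ships the small table to every executor, builds a hash map from it, and streams the large table through that map, so the large side does not need to be shuffled. Spark chooses this automatically when the small side's estimated size is below `spark.sql.autoBroadcastJoinThreshold` (10 MB by default), and you can request it explicitly with `pyspark.sql.functions.broadcast(df_zones)`. The pure-Python sketch below models only the build-and-probe idea. It assumes `LocationID` is unique in the zones table. The function name, the `Borough` column, and the test rows are hypothetical. It also shows why `drop('LocationID', 'zone')` is needed: a join on an expression keeps both key columns. The join is inner, the default, so rows whose zone has no match are dropped.

```python
def broadcast_hash_join(big_rows, small_rows, left_key="zone", right_key="LocationID"):
    """Hypothetical sketch of an inner broadcast hash join on rows stored as dicts."""
    # 1. Build side: hash the small table once (assumes right_key values are unique)
    lookup = {row[right_key]: row for row in small_rows}
    joined = []
    # 2. Probe side: stream the large table and look each key up in the map
    for row in big_rows:
        match = lookup.get(row[left_key])
        if match is None:
            continue  # inner join: rows without a matching zone are dropped
        merged = {**row, **match}
        # Both key columns survive an expression join, so remove them explicitly
        del merged[left_key], merged[right_key]
        joined.append(merged)
    return joined
```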