In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Obtain the notebook's SparkSession (reusing an existing one if present),
# registered under the application name "ch4".
spark = (
    SparkSession.builder
    .appName("ch4")
    .getOrCreate()
)

# Set the Spark context's log level to WARN.
spark.sparkContext.setLogLevel("WARN")

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/01/08 18:35:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Location of the departure-delays CSV, relative to this notebook.
file = "../../LearningSparkV2/databricks-datasets/learning-spark-v2/flights/departuredelays.csv"

# Explicit DDL schema, one "`name` TYPE" entry per column, joined into the
# single comma-separated string that DataFrameReader.schema() accepts.
schema = ",".join(
    [
        "`date` STRING",
        "`delay` INT",
        "`distance` INT",
        "`origin` STRING",
        "`destination` STRING",
    ]
)

# Read the CSV with the explicit schema; the first row of the file is a header.
df = (
    spark.read.format("csv")
    .schema(schema)
    .option("header", "true")
    .load(file)
)

# Expose the DataFrame to SQL queries under the name `us_delay_flights_tbl`.
df.createOrReplaceTempView("us_delay_flights_tbl")

df.printSchema()


root
 |-- date: string (nullable = true)
 |-- delay: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- destination: string (nullable = true)



In [3]:
# SQL variant: flights longer than 1000 (distance column), longest first,
# queried through the `us_delay_flights_tbl` temp view.
qry = """
select distance, origin, destination
from us_delay_flights_tbl
where distance > 1000
order by distance desc
"""

# Show the first 10 rows without truncating cell contents.
spark.sql(qry).show(n=10, truncate=False)

[Stage 0:>                                                          (0 + 8) / 8]

+--------+------+-----------+
|distance|origin|destination|
+--------+------+-----------+
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
+--------+------+-----------+
only showing top 10 rows



                                                                                

In [4]:
# DataFrame-API variant of the same query: keep three columns, filter on
# distance > 1000, sort by distance descending, and show the first 10 rows.
(
    df.select("distance", "origin", "destination")
    .where(F.col("distance") > 1000)
    .orderBy("distance", ascending=False)
    .show(10, truncate=False)
)

+--------+------+-----------+
|distance|origin|destination|
+--------+------+-----------+
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
+--------+------+-----------+
only showing top 10 rows



In [5]:
# Average delay per calendar day, worst days first.
#
# `date` is a string whose first two characters are the month and whose next
# two are the day (see the sample rows, e.g. "01011250").
# Spark SQL's substr() is 1-based, so the month starts at position 1 and the
# day at position 3. A start position of 0 happens to be treated like 1, but
# relying on that makes the two calls look inconsistent (0 vs 3).
qry = """
select * from (
    select
        substr(date, 1, 2) as month,
        substr(date, 3, 2) as day,
        avg(delay) as avg_delay
    from us_delay_flights_tbl
    group by month, day
) s order by avg_delay desc
"""

spark.sql(qry).show(20, truncate=False)

[Stage 2:>                                                          (0 + 8) / 8]

+-----+---+------------------+
|month|day|avg_delay         |
+-----+---+------------------+
|01   |04 |37.52043085476025 |
|01   |05 |37.04616558565433 |
|01   |03 |34.90262760495319 |
|01   |06 |33.18830573715403 |
|01   |02 |31.266602094557783|
|02   |21 |27.702819560319103|
|01   |07 |25.374344333045613|
|03   |29 |21.36448664352016 |
|02   |06 |21.299418971832765|
|01   |08 |20.21732283464567 |
|02   |14 |19.978383629850022|
|01   |01 |19.966420849144413|
|01   |30 |19.453478096199976|
|01   |11 |17.438488430164806|
|02   |03 |16.91670367315866 |
|01   |10 |16.431458976135005|
|03   |01 |16.28876178698408 |
|03   |02 |15.88726473061251 |
|03   |17 |15.799988157972646|
|02   |05 |15.572676518883416|
+-----+---+------------------+
only showing top 20 rows



                                                                                

In [6]:
# DataFrame-API variant of the per-day delay summary: average, minimum and
# maximum delay for each (month, day), ordered by average delay descending.
#
# Notes:
# * substring() positions are 1-based: month = chars 1-2, day = chars 3-4.
# * There is deliberately no trailing comma after .show(...). A comma there
#   turns the parenthesised expression into a 1-tuple, and since show()
#   returns None the cell would echo `(None,)` as its result.
(
    df.select(
        F.substring("date", 1, 2).alias("month"),
        F.substring("date", 3, 2).alias("day"),
        "delay",
    )
    .groupBy("month", "day")
    .agg(
        F.avg("delay").alias("avg_delay"),
        F.min("delay").alias("min_delay"),
        F.max("delay").alias("max_delay"),
    )
    .orderBy("avg_delay", ascending=False)
    .show(20, truncate=False)
)

[Stage 5:>                                                          (0 + 8) / 8]

+-----+---+------------------+---------+---------+
|month|day|avg_delay         |min_delay|max_delay|
+-----+---+------------------+---------+---------+
|01   |04 |37.52043085476025 |-27      |1159     |
|01   |05 |37.04616558565433 |-25      |1461     |
|01   |03 |34.90262760495319 |-85      |1167     |
|01   |06 |33.18830573715403 |-44      |1054     |
|01   |02 |31.266602094557783|-42      |1321     |
|02   |21 |27.702819560319103|-46      |1511     |
|01   |07 |25.374344333045613|-42      |1149     |
|03   |29 |21.36448664352016 |-29      |908      |
|02   |06 |21.299418971832765|-39      |822      |
|01   |08 |20.21732283464567 |-35      |1024     |
|02   |14 |19.978383629850022|-41      |967      |
|01   |01 |19.966420849144413|-28      |1191     |
|01   |30 |19.453478096199976|-40      |1500     |
|01   |11 |17.438488430164806|-42      |1404     |
|02   |03 |16.91670367315866 |-39      |1417     |
|01   |10 |16.431458976135005|-32      |1180     |
|03   |01 |16.28876178698408 |-

                                                                                

(None,)

In [7]:
# Create the working database and make it the current one.
# `if not exists` keeps the cell re-runnable: a plain `create database`
# raises once the database is already present in the catalog.
spark.sql("create database if not exists learn_spark_db")
spark.sql("use learn_spark_db")

DataFrame[]

In [12]:
# Persist the flights DataFrame as the table `managed_us_delay_flights`.
# The writer's default save mode raises if the table already exists, which
# makes this cell fail on every re-run; "overwrite" replaces the table
# contents instead so the notebook survives Restart & Run All.
df.write.mode("overwrite").saveAsTable("managed_us_delay_flights")

23/01/08 19:18:34 WARN MemoryManager: Total allocation exceeds 95.00% (971,243,508 bytes) of heap memory
Scaling row group sizes to 90.45% for 8 writers
                                                                                

In [14]:
# Select the flights whose origin is SFO from the managed table and register
# the result as a global temporary view.
df_sfo = spark.sql("select * from managed_us_delay_flights where origin = 'SFO'")
df_sfo.createOrReplaceGlobalTempView("us_origin_airport_sfo_global_tmp_view")

# Read the view back through the `global_temp` qualifier and show 10 rows.
(
    spark.read
    .table("global_temp.us_origin_airport_sfo_global_tmp_view")
    .show(10, truncate=False)
)

# SQL equivalent of the view creation above. It is only stored in `qry` for
# reference; this cell never passes it to spark.sql().
qry = """
create or replace global temp view us_origin_airport_sfo_global_tmp_view as
    select * from managed_us_delay_flights where origin = 'SFO';
"""

# A temporary view is tied to a single SparkSession;
# a global temporary view is visible to all sessions.

+--------+-----+--------+------+-----------+
|date    |delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011250|55   |2247    |SFO   |JFK        |
|01012230|0    |2247    |SFO   |JFK        |
|01010705|-7   |2247    |SFO   |JFK        |
|01010620|-3   |2246    |SFO   |MIA        |
|01010915|-3   |293     |SFO   |LAX        |
|01011005|-8   |1273    |SFO   |DFW        |
|01011800|0    |1604    |SFO   |ORD        |
|01011740|-7   |293     |SFO   |LAX        |
|01012015|-7   |293     |SFO   |LAX        |
|01012110|-1   |2246    |SFO   |MIA        |
+--------+-----+--------+------+-----------+
only showing top 10 rows

