# [Learning Spark Second Edition](https://github.com/databricks/LearningSparkV2)


_all-spark-notebook_ 

### [Chapter Five](https://learning.oreilly.com/library/view/learning-spark-2nd/9781492050032/ch05.html)
> User Defined Functions (UDF)

In [12]:
from pyspark.sql.types import LongType
from pyspark.sql import SparkSession


# Create cubed function
def cubed(s):
    return s * s * s


# Create a SparkSession
spark = (SparkSession.builder.appName("SparkSQLExampleApp").getOrCreate())

# Register UDF
spark.udf.register("cubed", cubed, LongType())

# Generate temporary view
spark.range(1, 9).createOrReplaceTempView("udf_test")

In [13]:
#Query the cubed UDF
spark.sql("SELECT id, cubed(id) AS id_cubed FROM udf_test").show()

+---+--------+
| id|id_cubed|
+---+--------+
|  1|       1|
|  2|       8|
|  3|      27|
|  4|      64|
|  5|     125|
|  6|     216|
|  7|     343|
|  8|     512|
+---+--------+



### [Chapter Five](https://learning.oreilly.com/library/view/learning-spark-2nd/9781492050032/ch05.html)
> Pandas UDF

In [3]:
# pandas
import pandas as pd

# Import various pyspark SQL functions including pandas_udf
from pyspark.sql.types import LongType

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, pandas_udf
# Create a SparkSession
spark = (SparkSession.builder.appName("SparkSQLExampleApp").getOrCreate())


# Declare the cubed function
def cubed(a: pd.Series) -> pd.Series:
    return a * a * a


# Create the pandas UDF for the cubed function
cubed_udf = pandas_udf(cubed, returnType=LongType())

# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.range(1, 4)

#  Execute function as a Spark vectorized UDF
df.select("id", cubed(col("id"))).show()

+---+----------------+
| id|((id * id) * id)|
+---+----------------+
|  1|               1|
|  2|               8|
|  3|              27|
+---+----------------+



### [Chapter Five](https://learning.oreilly.com/library/view/learning-spark-2nd/9781492050032/ch05.html)
> Higher order functions: TRANSFORM, FILTER, EXISTS

In [12]:
from pyspark.sql.types import *
from pyspark.sql import SparkSession

# Create a SparkSession
spark = (SparkSession.builder.appName("SparkSQLExampleApp").getOrCreate())

#create dataframe
schema = StructType([StructField("celsius", ArrayType(IntegerType()))])

t_list = [[35, 36, 32, 30, 40, 42, 38]], [[31, 32, 34, 55, 56]]
t_c = spark.createDataFrame(t_list, schema)
t_c.createOrReplaceTempView("tC")

# Show the DataFrame
t_c.show()

+--------------------+
|             celsius|
+--------------------+
|[35, 36, 32, 30, ...|
|[31, 32, 34, 55, 56]|
+--------------------+



In [13]:
# Calculate Fahrenheit from Celsius for an array of temperatures
spark.sql(
    """
SELECT celsius, 
 transform(celsius, t -> ((t * 9) div 5) + 32) as fahrenheit 
  FROM tC
"""
).show()

+--------------------+--------------------+
|             celsius|          fahrenheit|
+--------------------+--------------------+
|[35, 36, 32, 30, ...|[95, 96, 89, 86, ...|
|[31, 32, 34, 55, 56]|[87, 89, 93, 131,...|
+--------------------+--------------------+



In [14]:
#Filter temperatures > 38C for array of temperatures
spark.sql(
    """
SELECT celsius, 
 filter(celsius, t -> t > 38) as high 
  FROM tC
"""
).show()

+--------------------+--------+
|             celsius|    high|
+--------------------+--------+
|[35, 36, 32, 30, ...|[40, 42]|
|[31, 32, 34, 55, 56]|[55, 56]|
+--------------------+--------+



In [15]:
# Is there a temperature of 38C in the array of temperatures
spark.sql(
    """
SELECT celsius, 
       exists(celsius, t -> t = 38) as threshold
  FROM tC
"""
).show()

+--------------------+---------+
|             celsius|threshold|
+--------------------+---------+
|[35, 36, 32, 30, ...|     true|
|[31, 32, 34, 55, 56]|    false|
+--------------------+---------+



### [Chapter Five](https://learning.oreilly.com/library/view/learning-spark-2nd/9781492050032/ch05.html)
> Common DataFrames and Spark SQL Operations: Union and Join 

In [2]:
import os
from pyspark.sql import SparkSession

PARENT_DIR = os.popen('dirname $PWD').read().strip()

# Create a SparkSession
spark = (SparkSession.builder.appName("SparkSQLExampleApp").getOrCreate())

#create dataframe

# Set file paths
from pyspark.sql.functions import expr

tripdelaysFilePath = os.path.join(
    PARENT_DIR,
    "databricks-datasets/learning-spark-v2/flights/departuredelays.csv"
)
airportsnaFilePath = os.path.join(
    PARENT_DIR,
    "databricks-datasets/learning-spark-v2/flights/airport-codes-na.txt"
)

# Obtain airports data set
airportsna = (
    spark.read.format("csv").options(
        header="true", inferSchema="true", sep="\t"
    ).load(airportsnaFilePath)
)

airportsna.createOrReplaceTempView("airports_na")

# Obtain departure delays data set
departureDelays = (
    spark.read.format("csv").options(header="true").load(tripdelaysFilePath)
)

departureDelays = (
    departureDelays.withColumn("delay",
                               expr("CAST(delay as INT) as delay")).withColumn(
                                   "distance",
                                   expr("CAST(distance as INT) as distance")
                               )
)

departureDelays.createOrReplaceTempView("departureDelays")

# Create temporary small table
foo = (
    departureDelays.filter(
        expr(
            """origin == 'SEA' and destination == 'SFO' and 
    date like '01010%' and delay > 0"""
        )
    )
)
foo.createOrReplaceTempView("foo")

In [3]:
spark.sql("SELECT * FROM airports_na LIMIT 10").show()

+-----------+-----+-------+----+
|       City|State|Country|IATA|
+-----------+-----+-------+----+
| Abbotsford|   BC| Canada| YXX|
|   Aberdeen|   SD|    USA| ABR|
|    Abilene|   TX|    USA| ABI|
|      Akron|   OH|    USA| CAK|
|    Alamosa|   CO|    USA| ALS|
|     Albany|   GA|    USA| ABY|
|     Albany|   NY|    USA| ALB|
|Albuquerque|   NM|    USA| ABQ|
| Alexandria|   LA|    USA| AEX|
|  Allentown|   PA|    USA| ABE|
+-----------+-----+-------+----+



In [4]:
spark.sql("SELECT * FROM departureDelays LIMIT 10").show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
|01020605|   -4|     602|   ABE|        ATL|
|01031245|   -4|     602|   ABE|        ATL|
|01030605|    0|     602|   ABE|        ATL|
|01041243|   10|     602|   ABE|        ATL|
|01040605|   28|     602|   ABE|        ATL|
|01051245|   88|     602|   ABE|        ATL|
|01050605|    9|     602|   ABE|        ATL|
+--------+-----+--------+------+-----------+



In [5]:
spark.sql("SELECT * FROM foo").show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
+--------+-----+--------+------+-----------+



In [6]:
# Union two tables
bar = departureDelays.union(foo)
bar.createOrReplaceTempView("bar")

# Show the union (filtering for SEA and SFO in a specific time range)
bar.filter(expr("""origin == 'SEA' AND destination == 'SFO'
AND date LIKE '01010%' AND delay > 0""")).show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
+--------+-----+--------+------+-----------+



In [7]:
#Union in SQL
spark.sql("""
SELECT * 
  FROM bar 
 WHERE origin = 'SEA' 
   AND destination = 'SFO' 
   AND date LIKE '01010%' 
   AND delay > 0
""").show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
+--------+-----+--------+------+-----------+



In [8]:
# Join in  SQL
spark.sql("""
SELECT a.City, a.State, f.date, f.delay, f.distance, f.destination 
  FROM foo f
  JOIN airports_na a
    ON a.IATA = f.origin
""").show()

+-------+-----+--------+-----+--------+-----------+
|   City|State|    date|delay|distance|destination|
+-------+-----+--------+-----+--------+-----------+
|Seattle|   WA|01010710|   31|     590|        SFO|
|Seattle|   WA|01010955|  104|     590|        SFO|
|Seattle|   WA|01010730|    5|     590|        SFO|
+-------+-----+--------+-----+--------+-----------+



### [Chapter Five](https://learning.oreilly.com/library/view/learning-spark-2nd/9781492050032/ch05.html)
> Common DataFrames and Spark SQL Operations: Windowing

In [6]:
import os

from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import expr

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL Hive integration example") \
    .getOrCreate()

PARENT_DIR = os.popen('dirname $PWD').read().strip()


# Set file paths

tripdelaysFilePath = os.path.join(
    PARENT_DIR,
    "databricks-datasets/learning-spark-v2/flights/departuredelays.csv"
)
airportsnaFilePath = os.path.join(
    PARENT_DIR,
    "databricks-datasets/learning-spark-v2/flights/airport-codes-na.txt"
)

# Obtain airports data set
airportsna = (
    spark.read.format("csv").options(
        header="true", inferSchema="true", sep="\t"
    ).load(airportsnaFilePath)
)

airportsna.createOrReplaceTempView("airports_na")

# Obtain departure delays data set
departureDelays = (
    spark.read.format("csv").options(header="true").load(tripdelaysFilePath)
)

departureDelays = (
    departureDelays.withColumn("delay",
                               expr("CAST(delay as INT) as delay")).withColumn(
                                   "distance",
                                   expr("CAST(distance as INT) as distance")
                               )
)


departureDelays.createOrReplaceTempView("departureDelays")



# Create temporary small table
foo = (
    departureDelays.filter(
        expr(
            """origin == 'SEA' and destination == 'SFO' and 
    date like '01010%' and delay > 0"""
        )
    )
)
foo.createOrReplaceTempView("foo")


In [7]:
# Windowing : dense_rank

spark.sql('DROP VIEW IF EXISTS departureDelaysWindow')


spark.sql("""
CREATE TEMP VIEW departureDelaysWindow  AS  
SELECT origin, destination, SUM(delay) AS TotalDelays 
  FROM departureDelays 
 WHERE origin IN ('SEA', 'SFO', 'JFK') 
   AND destination IN ('SEA', 'SFO', 'JFK', 'DEN', 'ORD', 'LAX', 'ATL') 
 GROUP BY origin, destination

 """)


spark.sql("""
SELECT * FROM departureDelaysWindow  
 """).show(10)

+------+-----------+-----------+
|origin|destination|TotalDelays|
+------+-----------+-----------+
|   JFK|        ORD|       5608|
|   SEA|        LAX|       9359|
|   JFK|        SFO|      35619|
|   SFO|        ORD|      27412|
|   JFK|        DEN|       4315|
|   SFO|        DEN|      18688|
|   SFO|        SEA|      17080|
|   SEA|        SFO|      22293|
|   JFK|        ATL|      12141|
|   SFO|        ATL|       5091|
+------+-----------+-----------+
only showing top 10 rows



In [8]:
#
spark.sql("""SELECT origin, destination, SUM(TotalDelays) AS TotalDelays
 FROM departureDelaysWindow
WHERE origin = 'JFK'
GROUP BY origin, destination
ORDER BY SUM(TotalDelays) DESC
LIMIT 3
""").show()

+------+-----------+-----------+
|origin|destination|TotalDelays|
+------+-----------+-----------+
|   JFK|        LAX|      35755|
|   JFK|        SFO|      35619|
|   JFK|        ATL|      12141|
+------+-----------+-----------+



In [9]:
# dense_rank
 
spark.sql("""
SELECT origin, destination, TotalDelays, rank 
  FROM ( 
     SELECT origin, destination, TotalDelays, dense_rank() 
       OVER (PARTITION BY origin ORDER BY TotalDelays DESC) as rank 
       FROM departureDelaysWindow
  ) t 
 WHERE rank <= 4
""").show()

+------+-----------+-----------+----+
|origin|destination|TotalDelays|rank|
+------+-----------+-----------+----+
|   SEA|        SFO|      22293|   1|
|   SEA|        DEN|      13645|   2|
|   SEA|        ORD|      10041|   3|
|   SEA|        LAX|       9359|   4|
|   SFO|        LAX|      40798|   1|
|   SFO|        ORD|      27412|   2|
|   SFO|        JFK|      24100|   3|
|   SFO|        DEN|      18688|   4|
|   JFK|        LAX|      35755|   1|
|   JFK|        SFO|      35619|   2|
|   JFK|        ATL|      12141|   3|
|   JFK|        SEA|       7856|   4|
+------+-----------+-----------+----+



In [10]:
#add column

from pyspark.sql.functions import expr
foo2 = (foo.withColumn(
          "status", 
          expr("CASE WHEN delay <= 10 THEN 'On-time' ELSE 'Delayed' END")
        ))
foo2.show()

+--------+-----+--------+------+-----------+-------+
|    date|delay|distance|origin|destination| status|
+--------+-----+--------+------+-----------+-------+
|01010710|   31|     590|   SEA|        SFO|Delayed|
|01010955|  104|     590|   SEA|        SFO|Delayed|
|01010730|    5|     590|   SEA|        SFO|On-time|
+--------+-----+--------+------+-----------+-------+



In [11]:
# drop column
foo3 = foo2.drop("delay")
foo3.show()

+--------+--------+------+-----------+-------+
|    date|distance|origin|destination| status|
+--------+--------+------+-----------+-------+
|01010710|     590|   SEA|        SFO|Delayed|
|01010955|     590|   SEA|        SFO|Delayed|
|01010730|     590|   SEA|        SFO|On-time|
+--------+--------+------+-----------+-------+



In [12]:
#renaming columns
foo4 = foo3.withColumnRenamed("status", "flight_status")
foo4.show()


+--------+--------+------+-----------+-------------+
|    date|distance|origin|destination|flight_status|
+--------+--------+------+-----------+-------------+
|01010710|     590|   SEA|        SFO|      Delayed|
|01010955|     590|   SEA|        SFO|      Delayed|
|01010730|     590|   SEA|        SFO|      On-time|
+--------+--------+------+-----------+-------------+



### [Chapter Five](https://learning.oreilly.com/library/view/learning-spark-2nd/9781492050032/ch05.html)
> Common DataFrames and Spark SQL Operations: Pivoting

In [6]:
import os

from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import expr

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL Hive integration example") \
    .getOrCreate()

PARENT_DIR = os.popen('dirname $PWD').read().strip()


# Set file paths

tripdelaysFilePath = os.path.join(
    PARENT_DIR,
    "databricks-datasets/learning-spark-v2/flights/departuredelays.csv"
)
airportsnaFilePath = os.path.join(
    PARENT_DIR,
    "databricks-datasets/learning-spark-v2/flights/airport-codes-na.txt"
)

# Obtain airports data set
airportsna = (
    spark.read.format("csv").options(
        header="true", inferSchema="true", sep="\t"
    ).load(airportsnaFilePath)
)

airportsna.createOrReplaceTempView("airports_na")

# Obtain departure delays data set
departureDelays = (
    spark.read.format("csv").options(header="true").load(tripdelaysFilePath)
)

departureDelays = (
    departureDelays.withColumn("delay",
                               expr("CAST(delay as INT) as delay")).withColumn(
                                   "distance",
                                   expr("CAST(distance as INT) as distance")
                               )
)


departureDelays.createOrReplaceTempView("departureDelays")




In [13]:
spark.sql("""
SELECT destination, CAST(SUBSTRING(date, 0, 2) AS int) AS month, delay 
  FROM departureDelays 
 WHERE origin = 'SEA'
 """)

DataFrame[destination: string, month: int, delay: int]

In [15]:
spark.sql("""
SELECT * FROM (
SELECT destination, CAST(SUBSTRING(date, 0, 2) AS int) AS month, delay 
  FROM departureDelays WHERE origin = 'SEA' 
) 
PIVOT (
  CAST(AVG(delay) AS DECIMAL(4, 2)) AS AvgDelay, MAX(delay) AS MaxDelay
  FOR month IN (1 JAN, 2 FEB)
)
ORDER BY destination
""").show()

+-----------+------------+------------+------------+------------+
|destination|JAN_AvgDelay|JAN_MaxDelay|FEB_AvgDelay|FEB_MaxDelay|
+-----------+------------+------------+------------+------------+
|        ABQ|       19.86|         316|       11.42|          69|
|        ANC|        4.44|         149|        7.90|         141|
|        ATL|       11.98|         397|        7.73|         145|
|        AUS|        3.48|          50|       -0.21|          18|
|        BOS|        7.84|         110|       14.58|         152|
|        BUR|       -2.03|          56|       -1.89|          78|
|        CLE|       16.00|          27|        null|        null|
|        CLT|        2.53|          41|       12.96|         228|
|        COS|        5.32|          82|       12.18|         203|
|        CVG|       -0.50|           4|        null|        null|
|        DCA|       -1.15|          50|        0.07|          34|
|        DEN|       13.13|         425|       12.95|         625|
|        D