In [1]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp, col, month, dayofmonth, desc

In [2]:
# Create (or reuse) a Spark session with Hive catalog support
spark = (
    SparkSession.builder
    .appName("SparkSQLExampleAPP")
    .config('spark.sql.catalogImplementation', 'hive')
    .enableHiveSupport()
    .getOrCreate()
)

# Only log errors so Spark's warnings do not clutter the cell outputs
spark.sparkContext.setLogLevel("ERROR")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/10/01 19:28:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# DDL-style schema (column name + type) that is passed to the CSV reader
schema = (
    "`date` string, "
    "`delay` INT, "
    "`distance` INT, "
    "`origin` STRING, "
    "`destination` STRING"
)


In [4]:
# Load the flight-delay CSV with the explicit schema; the first row is a header
df = spark.read.csv(
    'departuredelays.csv',
    header=True,
    schema=schema,
)

In [5]:
# Print the column names, types and nullability of the loaded DataFrame
df.printSchema()

root
 |-- date: string (nullable = true)
 |-- delay: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- destination: string (nullable = true)



In [6]:
# Display the first 5 rows of the loaded DataFrame
df.show(5)

                                                                                

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
|01020605|   -4|     602|   ABE|        ATL|
|01031245|   -4|     602|   ABE|        ATL|
+--------+-----+--------+------+-----------+
only showing top 5 rows



In [7]:
# Parse the 'MMddHHmm' string in `date` into a timestamp column named `flight_date`.
# NOTE(review): the pattern carries no year component, and the output below shows
# every value landing in 1970 — confirm the real year of the data before relying
# on this column for anything beyond month/day/time.
df = df.withColumn('flight_date', to_timestamp(col('date'), 'MMddHHmm'))

In [8]:
# Display the first 5 rows again to inspect the added `flight_date` column
df.show(5)

+--------+-----+--------+------+-----------+-------------------+
|    date|delay|distance|origin|destination|        flight_date|
+--------+-----+--------+------+-----------+-------------------+
|01011245|    6|     602|   ABE|        ATL|1970-01-01 12:45:00|
|01020600|   -8|     369|   ABE|        DTW|1970-01-02 06:00:00|
|01021245|   -2|     602|   ABE|        ATL|1970-01-02 12:45:00|
|01020605|   -4|     602|   ABE|        ATL|1970-01-02 06:05:00|
|01031245|   -4|     602|   ABE|        ATL|1970-01-03 12:45:00|
+--------+-----+--------+------+-----------+-------------------+
only showing top 5 rows



In [9]:
# Register df as a temporary view so it can be queried by name through spark.sql.
# The view only exists for the duration of the Spark session.
df.createOrReplaceTempView('us_delay_flights_tbl')

In [10]:
# Preview the first five rows of the temp view through Spark SQL
preview_query = """
select *
from us_delay_flights_tbl
limit 5
"""
spark.sql(preview_query).show()

+--------+-----+--------+------+-----------+-------------------+
|    date|delay|distance|origin|destination|        flight_date|
+--------+-----+--------+------+-----------+-------------------+
|01011245|    6|     602|   ABE|        ATL|1970-01-01 12:45:00|
|01020600|   -8|     369|   ABE|        DTW|1970-01-02 06:00:00|
|01021245|   -2|     602|   ABE|        ATL|1970-01-02 12:45:00|
|01020605|   -4|     602|   ABE|        ATL|1970-01-02 06:05:00|
|01031245|   -4|     602|   ABE|        ATL|1970-01-03 12:45:00|
+--------+-----+--------+------+-----------+-------------------+



In [11]:
# Flights with distance above 1000, longest first (SQL version)
long_distance_query = """select date, delay, distance, origin, destination 
          from us_delay_flights_tbl
          where distance > 1000
          order by distance desc
          """
spark.sql(long_distance_query).show()

[Stage 3:>                                                          (0 + 8) / 8]

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011625|   -4|    4330|   HNL|        JFK|
|03011625|   -1|    4330|   HNL|        JFK|
|02011625|   -1|    4330|   HNL|        JFK|
|03291530|   37|    4330|   HNL|        JFK|
|01211625|  115|    4330|   HNL|        JFK|
|03021625|   14|    4330|   HNL|        JFK|
|02271625|   -7|    4330|   HNL|        JFK|
|03051625|   -6|    4330|   HNL|        JFK|
|01021625|  110|    4330|   HNL|        JFK|
|03061625|   -2|    4330|   HNL|        JFK|
|02021625|   -5|    4330|   HNL|        JFK|
|03071625|   -1|    4330|   HNL|        JFK|
|01031625|   -1|    4330|   HNL|        JFK|
|03081530|    4|    4330|   HNL|        JFK|
|02051625|   -8|    4330|   HNL|        JFK|
|03091530|   -7|    4330|   HNL|        JFK|
|01041625|   -7|    4330|   HNL|        JFK|
|03121530|   -3|    4330|   HNL|        JFK|
|02061625|   -9|    4330|   HNL|        JFK|
|03131530|

                                                                                

# SFO -> ORD flights with delay >= 120, largest delay first.
# Reads from the `us_delay_flights_tbl` temp view: the query previously named a
# `fire_calls` table, which this notebook never creates and whose name does not
# match the flight columns selected here.
spark.sql("""select date, delay, distance, origin, destination
          from us_delay_flights_tbl
          where origin = 'SFO' and destination = 'ORD'
                and delay >= 120
          order by delay desc
          """).show()

In [12]:
# Label every flight with a delay bucket and list the largest delays first.
# Buckets are contiguous so no positive delay falls through to 'Early':
#   > 360       -> 'Very Long Delays'
#   120 .. 360  -> 'Long Delays'   (was `>= 120 and <= 120`, which matched only 120
#                                   and sent 121..360 to the 'Early' branch)
#   60 .. 119   -> 'Short Delays'
#   1 .. 59     -> 'Tolerable Delays'
#   0           -> 'No Delays'
#   otherwise   -> 'Early' (negative delays; NULL delays also end up here)
spark.sql("""
select delay, origin, destination,
          case
            when delay > 360 then 'Very Long Delays'
            when delay >= 120 and delay <= 360 then 'Long Delays'
            when delay >= 60 and delay < 120 then 'Short Delays'
            when delay > 0 and delay < 60 then 'Tolerable Delays'
            when delay = 0 then 'No Delays'
            else 'Early'
            end as flight_delays
          from us_delay_flights_tbl
          order by delay desc
""").show(10)

[Stage 4:>                                                          (0 + 8) / 8]

+-----+------+-----------+----------------+
|delay|origin|destination|   flight_delays|
+-----+------+-----------+----------------+
| 1642|   TPA|        DFW|Very Long Delays|
| 1638|   SFO|        ORD|Very Long Delays|
| 1636|   FLL|        DFW|Very Long Delays|
| 1592|   RSW|        ORD|Very Long Delays|
| 1560|   BNA|        DFW|Very Long Delays|
| 1553|   PDX|        DFW|Very Long Delays|
| 1543|   CLE|        DFW|Very Long Delays|
| 1511|   MCO|        ORD|Very Long Delays|
| 1500|   EGE|        JFK|Very Long Delays|
| 1496|   ONT|        DFW|Very Long Delays|
+-----+------+-----------+----------------+
only showing top 10 rows



                                                                                

In [13]:
# DataFrame-API version of the "distance > 1000" query, built from Column expressions
long_distance_flights = (
    df.select('distance', 'origin', 'destination')
    .where(col('distance') > 1000)
    .orderBy(desc('distance'))
)
long_distance_flights.show(10)

+--------+------+-----------+
|distance|origin|destination|
+--------+------+-----------+
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
+--------+------+-----------+
only showing top 10 rows



                                                                                

In [14]:
# Same query once more, using a SQL-string predicate and a column name for ordering
(
    df.select('distance', 'origin', 'destination')
    .where('distance > 1000')
    .orderBy('distance', ascending=False)
    .show(10)
)

+--------+------+-----------+
|distance|origin|destination|
+--------+------+-----------+
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
+--------+------+-----------+
only showing top 10 rows



In [15]:
# Create the database only when it is missing, so re-running the notebook against
# an existing metastore does not fail because the database already exists.
# NOTE(review): no `USE learn_spark_db` follows, and the listTables() output further
# down shows the tables in the `default` namespace — confirm that is intended.
spark.sql('create database if not exists learn_spark_db')

DataFrame[]

In [16]:
# Create the managed table only when it is missing, so this cell can be re-run
# (Restart & Run All) without failing because the table already exists.
spark.sql("""
CREATE TABLE IF NOT EXISTS managed_us_flights_tbl (
    date STRING,
    delay INT,
    distance INT,
    origin STRING,
    destination STRING
)
""")

DataFrame[]

In [19]:
# Show the contents of the managed table
managed_table_query = """
select *
from managed_us_flights_tbl
"""
spark.sql(managed_table_query).show()

+----+-----+--------+------+-----------+
|date|delay|distance|origin|destination|
+----+-----+--------+------+-----------+
+----+-----+--------+------+-----------+



In [20]:
# List the databases registered in the session catalog
spark.catalog.listDatabases()

[Database(name='default', catalog='spark_catalog', description='Default Hive database', locationUri='file:/Users/yasony/Library/Mobile%2520Documents/com~apple~CloudDocs/GitHub/ApachSpark/spark-warehouse'),
 Database(name='learn_spark_db', catalog='spark_catalog', description='', locationUri='file:/Users/yasony/Library/Mobile%2520Documents/com~apple~CloudDocs/GitHub/ApachSpark/spark-warehouse/learn_spark_db.db')]

In [24]:
# List the tables and temporary views visible from the current database
spark.catalog.listTables()

[Table(name='managed_us_delay_flights_tbl', catalog='spark_catalog', namespace=['default'], description=None, tableType='MANAGED', isTemporary=False),
 Table(name='managed_us_flights_tbl', catalog='spark_catalog', namespace=['default'], description=None, tableType='MANAGED', isTemporary=False),
 Table(name='us_delay_flights_tbl', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

In [None]:
# List the columns (name, data type, nullability) of the us_delay_flights_tbl temp view
spark.catalog.listColumns('us_delay_flights_tbl')

[Column(name='date', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
 Column(name='delay', description=None, dataType='int', nullable=True, isPartition=False, isBucket=False),
 Column(name='distance', description=None, dataType='int', nullable=True, isPartition=False, isBucket=False),
 Column(name='origin', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
 Column(name='destination', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
 Column(name='flight_date', description=None, dataType='timestamp', nullable=True, isPartition=False, isBucket=False)]

25/10/01 21:15:18 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$driverEndpoint(BlockManagerMasterEndpoint.scala:123)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.isExecutorAlive$lzycompute$1(BlockManagerMasterEndpoint.scala:688)
	at org.apache.spark.storage.BlockManagerMasterE