##  Section 17 Apache Spark using Python - Joining Data sets

In [1]:
# 200 Prepare Datasets for Joining Spark Data Frames

In [2]:
from pyspark.sql import SparkSession
import getpass

username = getpass.getuser()

spark = SparkSession. \
        builder. \
        config('spark.ui.port','0'). \
        config('spark.sql.warehouse.dir',f'/user/{username}/warehouse'). \
        config('spark.shuffle.io.connectionTimeout','6000'). \
        config('spark.driver.memory','6g'). \
        config('spark.executor.memory','6g'). \
        config('spark.dynamicAllocation.minExecutors', '4'). \
        enableHiveSupport(). \
        appName(f'{username} | Section 17 Joining Data Sets'). \
        master('yarn'). \
        getOrCreate()

In [3]:
spark.sparkContext.getConf().getAll()

[('spark.dynamicAllocation.minExecutors', '4'),
 ('spark.eventLog.enabled', 'true'),
 ('spark.sql.repl.eagerEval.enabled', 'true'),
 ('spark.eventLog.dir', 'hdfs:///spark-logs'),
 ('spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_URI_BASES',
  'http://m02.itversity.com:19088/proxy/application_1707552082651_2938'),
 ('spark.ui.proxyBase', '/proxy/application_1707552082651_2938'),
 ('spark.app.startTime', '1707865966456'),
 ('spark.dynamicAllocation.maxExecutors', '10'),
 ('spark.shuffle.io.connectionTimeout', '6000'),
 ('spark.driver.port', '39103'),
 ('spark.yarn.historyServer.address', 'm02.itversity.com:18080'),
 ('spark.driver.memory', '6g'),
 ('spark.yarn.jars', ''),
 ('spark.driver.appUIAddress', 'http://g02.itversity.com:45177'),
 ('spark.history.provider',
  'org.apache.spark.deploy.history.FsHistoryProvider'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.history.fs.logDirectory', 'hdfs:///spark-logs'),
 ('spark.submit.deployMode', 'clien

In [4]:
!hdfs dfs -ls /public/

Found 34 items
drwxr-xr-x   - hdfs supergroup          0 2021-01-28 09:08 /public/Black_Friday
drwxr-xr-x   - hdfs supergroup          0 2021-01-28 08:08 /public/Tableau
drwxr-xr-x   - hdfs supergroup          0 2021-01-28 08:38 /public/Tableau_stocks
drwxr-xr-x   - hdfs supergroup          0 2021-01-28 09:11 /public/TetrasoftBigDataHackathon
drwxr-xr-x   - hdfs supergroup          0 2021-01-28 09:09 /public/addresses
drwxr-xr-x   - hdfs supergroup          0 2021-01-28 08:04 /public/airlines_all
drwxr-xr-x   - hdfs supergroup          0 2021-01-28 08:02 /public/black_friday
drwxr-xr-x   - hdfs supergroup          0 2021-01-28 09:04 /public/cards
drwxr-xr-x   - hdfs supergroup          0 2021-01-28 09:13 /public/citibike
drwxr-xr-x   - hdfs supergroup          0 2021-01-28 08:13 /public/connect-distributed-config
drwxr-xr-x   - hdfs supergroup          0 2021-01-28 07:49 /public/covid19
drwxr-xr-x   - hdfs supergroup          0 2021-01-28 08:08 /public/crime
drwxr-xr-x   - hdfs supergr

In [5]:
!hdfs dfs -ls /public/airlines_all/

Found 3 items
drwxr-xr-x   - hdfs supergroup          0 2021-01-28 11:33 /public/airlines_all/airlines
drwxr-xr-x   - hdfs supergroup          0 2021-01-28 11:29 /public/airlines_all/airlines-part
drwxr-xr-x   - hdfs supergroup          0 2021-01-28 10:48 /public/airlines_all/airport-codes


In [6]:
!hdfs dfs -ls /public/airlines_all/airport-codes

Found 1 items
-rw-r--r--   2 hdfs supergroup      11411 2021-01-28 10:48 /public/airlines_all/airport-codes/airport-codes-na.txt


In [7]:
!hdfs dfs -cat  /public/airlines_all/airport-codes/airport-codes-na.txt | tail 

Yuma	AZ	USA	YUM	Canada	YZFLa	YWKCanada	YQYada	YZP

In [8]:
airportCodesPath = "/public/airlines_all/airport-codes"

In [9]:
airportCodes = spark. \
    read. \
    option("sep","\t"). \
    option("header",True). \
    option("inferSchema",True). \
    csv(airportCodesPath)

In [10]:
airportCodes.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- IATA: string (nullable = true)



In [11]:
airportCodes.count()

526

In [12]:
airportCodes.show(5)

+----------+-----+-------+----+
|      City|State|Country|IATA|
+----------+-----+-------+----+
|Abbotsford|   BC| Canada| YXX|
|  Aberdeen|   SD|    USA| ABR|
|   Abilene|   TX|    USA| ABI|
|     Akron|   OH|    USA| CAK|
|   Alamosa|   CO|    USA| ALS|
+----------+-----+-------+----+
only showing top 5 rows



In [13]:
!hdfs dfs -ls /public/airlines_all/airlines-part

Found 257 items
-rw-r--r--   2 hdfs supergroup          0 2021-01-28 09:28 /public/airlines_all/airlines-part/_SUCCESS
drwxr-xr-x   - hdfs supergroup          0 2021-01-28 11:32 /public/airlines_all/airlines-part/flightmonth=198710
drwxr-xr-x   - hdfs supergroup          0 2021-01-28 09:40 /public/airlines_all/airlines-part/flightmonth=198711
drwxr-xr-x   - hdfs supergroup          0 2021-01-28 09:41 /public/airlines_all/airlines-part/flightmonth=198712
drwxr-xr-x   - hdfs supergroup          0 2021-01-28 09:37 /public/airlines_all/airlines-part/flightmonth=198801
drwxr-xr-x   - hdfs supergroup          0 2021-01-28 09:44 /public/airlines_all/airlines-part/flightmonth=198802
drwxr-xr-x   - hdfs supergroup          0 2021-01-28 09:30 /public/airlines_all/airlines-part/flightmonth=198803
drwxr-xr-x   - hdfs supergroup          0 2021-01-28 11:02 /public/airlines_all/airlines-part/flightmonth=198804
drwxr-xr-x   - hdfs supergroup          0 2021-01-28 09:30 /public/airlines_all/airlines-p

In [14]:
!hdfs dfs -ls /public/airlines_all/airlines-part/flightmonth=200801

Found 1 items
-rw-r--r--   2 hdfs supergroup   14654075 2021-01-28 11:28 /public/airlines_all/airlines-part/flightmonth=200801/part-00252-5cde1303-4ebf-4a12-8fad-f5d9f9c9124a.c000.snappy.parquet


In [15]:
!hdfs dfs -find /public/retail_db_json/

/public/retail_db_json
/public/retail_db_json/categories
/public/retail_db_json/categories/_SUCCESS
/public/retail_db_json/categories/part-r-00000-ce1d8208-178d-48d3-bfb2-1a97d9c05094
/public/retail_db_json/customers
/public/retail_db_json/customers/_SUCCESS
/public/retail_db_json/customers/part-r-00000-70554560-527b-44f6-9e80-4e2031af5994
/public/retail_db_json/departments
/public/retail_db_json/departments/_SUCCESS
/public/retail_db_json/departments/part-r-00000-3db7cfae-3ad2-4fc7-88ff-afe0ec709f49
/public/retail_db_json/order_details
/public/retail_db_json/order_details/order_details.json
/public/retail_db_json/order_items
/public/retail_db_json/order_items/_SUCCESS
/public/retail_db_json/order_items/part-r-00000-6b83977e-3f20-404b-9b5f-29376ab1419e
/public/retail_db_json/orders
/public/retail_db_json/orders/_SUCCESS
/public/retail_db_json/orders/part-r-00000-990f5773-9005-49ba-b670-631286032674
/public/retail_db_json/products
/public/retail_db_json/products/_SUCCESS
/public/retail_

In [16]:
orders = spark.read.json("/public/retail_db_json/orders")

In [17]:
orders.printSchema()

root
 |-- order_customer_id: long (nullable = true)
 |-- order_date: string (nullable = true)
 |-- order_id: long (nullable = true)
 |-- order_status: string (nullable = true)



In [18]:
orders.show(5)

+-----------------+--------------------+--------+---------------+
|order_customer_id|          order_date|order_id|   order_status|
+-----------------+--------------------+--------+---------------+
|            11599|2013-07-25 00:00:...|       1|         CLOSED|
|              256|2013-07-25 00:00:...|       2|PENDING_PAYMENT|
|            12111|2013-07-25 00:00:...|       3|       COMPLETE|
|             8827|2013-07-25 00:00:...|       4|         CLOSED|
|            11318|2013-07-25 00:00:...|       5|       COMPLETE|
+-----------------+--------------------+--------+---------------+
only showing top 5 rows



In [19]:
airportCodes.show(5)

+----------+-----+-------+----+
|      City|State|Country|IATA|
+----------+-----+-------+----+
|Abbotsford|   BC| Canada| YXX|
|  Aberdeen|   SD|    USA| ABR|
|   Abilene|   TX|    USA| ABI|
|     Akron|   OH|    USA| CAK|
|   Alamosa|   CO|    USA| ALS|
+----------+-----+-------+----+
only showing top 5 rows



In [20]:
spark.conf.set("spark.sql.shuffle.partitions","2")

In [21]:
spark.sparkContext.getConf().getAll()

[('spark.dynamicAllocation.minExecutors', '4'),
 ('spark.eventLog.enabled', 'true'),
 ('spark.sql.repl.eagerEval.enabled', 'true'),
 ('spark.eventLog.dir', 'hdfs:///spark-logs'),
 ('spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_URI_BASES',
  'http://m02.itversity.com:19088/proxy/application_1707552082651_2938'),
 ('spark.ui.proxyBase', '/proxy/application_1707552082651_2938'),
 ('spark.app.startTime', '1707865966456'),
 ('spark.dynamicAllocation.maxExecutors', '10'),
 ('spark.shuffle.io.connectionTimeout', '6000'),
 ('spark.driver.port', '39103'),
 ('spark.yarn.historyServer.address', 'm02.itversity.com:18080'),
 ('spark.driver.memory', '6g'),
 ('spark.yarn.jars', ''),
 ('spark.driver.appUIAddress', 'http://g02.itversity.com:45177'),
 ('spark.history.provider',
  'org.apache.spark.deploy.history.FsHistoryProvider'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.history.fs.logDirectory', 'hdfs:///spark-logs'),
 ('spark.submit.deployMode', 'clien

In [22]:
airtraffic = spark. \
    read. \
    parquet("/public/airlines_all/airlines-part/flightmonth=200801")

In [23]:
airtraffic.count()

605659

In [24]:
airtraffic.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: string (nullable = true)
 |-- CRSElapsedTime: integer (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: string (nullable = true)
 |-- TaxiIn: string (nullable = true)
 |-- TaxiOut: string (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: integer (nullable = true)
 |-- Car

In [25]:
airportCodes.count()

526

In [26]:
airportCodes.show(5)

+----------+-----+-------+----+
|      City|State|Country|IATA|
+----------+-----+-------+----+
|Abbotsford|   BC| Canada| YXX|
|  Aberdeen|   SD|    USA| ABR|
|   Abilene|   TX|    USA| ABI|
|     Akron|   OH|    USA| CAK|
|   Alamosa|   CO|    USA| ALS|
+----------+-----+-------+----+
only showing top 5 rows



In [27]:
airportCodes.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- IATA: string (nullable = true)



In [28]:
airportCodes. \
    select("IATA"). \
    distinct(). \
    count()

524

In [29]:
from pyspark.sql.functions import count, lit,col

airportCodes. \
    groupBy('IATA'). \
    agg(count(lit(1)).alias("iata_count")). \
    orderBy(col('Iata_count').desc()). \
    show()
    

+----+----------+
|IATA|iata_count|
+----+----------+
| Big|         3|
| BHM|         1|
| ABR|         1|
| ABI|         1|
| ALB|         1|
| AEX|         1|
| ABE|         1|
| APN|         1|
| YAA|         1|
| ATW|         1|
| YEK|         1|
| ASE|         1|
| ACY|         1|
| AGS|         1|
| AUS|         1|
| YBG|         1|
| BRW|         1|
| BKW|         1|
| BLI|         1|
| BTT|         1|
+----+----------+
only showing top 20 rows



In [30]:
airportCodes. \
    groupBy('IATA'). \
    agg(count(lit(1)).alias("iata_count")). \
    filter(col('iata_count')>1). \
    show()

+----+----------+
|IATA|iata_count|
+----+----------+
| Big|         3|
+----+----------+



In [31]:
airportCodes. \
    filter(col("IATA")=='Big'). \
    show()

+-----------+------+-------+----+
|       City| State|Country|IATA|
+-----------+------+-------+----+
|       Hilo|    HI|    USA| Big|
|Kailua-Kona|Hawaii|    USA| Big|
|    Kamuela|Hawaii|    USA| Big|
+-----------+------+-------+----+



In [32]:
airportCodes. \
    filter("IATA = 'Big'"). \
    show()

+-----------+------+-------+----+
|       City| State|Country|IATA|
+-----------+------+-------+----+
|       Hilo|    HI|    USA| Big|
|Kailua-Kona|Hawaii|    USA| Big|
|    Kamuela|Hawaii|    USA| Big|
+-----------+------+-------+----+



In [33]:
airportCodes. \
    filter("!(State='Hawaii' AND IATA='Big')"). \
    show()

+-----------+-----+-------+----+
|       City|State|Country|IATA|
+-----------+-----+-------+----+
| Abbotsford|   BC| Canada| YXX|
|   Aberdeen|   SD|    USA| ABR|
|    Abilene|   TX|    USA| ABI|
|      Akron|   OH|    USA| CAK|
|    Alamosa|   CO|    USA| ALS|
|     Albany|   GA|    USA| ABY|
|     Albany|   NY|    USA| ALB|
|Albuquerque|   NM|    USA| ABQ|
| Alexandria|   LA|    USA| AEX|
|  Allentown|   PA|    USA| ABE|
|   Alliance|   NE|    USA| AIA|
|     Alpena|   MI|    USA| APN|
|    Altoona|   PA|    USA| AOO|
|   Amarillo|   TX|    USA| AMA|
|Anahim Lake|   BC| Canada| YAA|
|  Anchorage|   AK|    USA| ANC|
|   Appleton|   WI|    USA| ATW|
|     Arviat|  NWT| Canada| YEK|
|  Asheville|   NC|    USA| AVL|
|      Aspen|   CO|    USA| ASE|
+-----------+-----+-------+----+
only showing top 20 rows



In [34]:
airportCodes. \
    filter("!(State='Hawaii' AND IATA='Big')"). \
    count()

524

In [35]:
airportCodesPath

'/public/airlines_all/airport-codes'

In [36]:
spark.\
    read. \
    option("sep","\t"). \
    option("header",True). \
    option("inferSchema",True). \
    csv(airportCodesPath). \
    filter("!(State='Hawaii' AND IATA='Big') AND Country='USA'"). \
    count()

443

In [37]:
airportCodes = spark.\
        read. \
        option("sep","\t"). \
        option("header",True). \
        option("inferSchema",True). \
        csv(airportCodesPath). \
        filter("!(State='Hawaii' AND IATA='Big') AND Country='USA'")

In [38]:
airportCodes.count()

443

In [39]:
airportCountByState = airportCodes. \
    groupBy("Country","State"). \
    agg(count(lit(1)).alias("IATACount")). \
    orderBy(col("IATACount").desc())

In [40]:
airportCountByState.show()

+-------+-----+---------+
|Country|State|IATACount|
+-------+-----+---------+
|    USA|   CA|       29|
|    USA|   TX|       26|
|    USA|   AK|       25|
|    USA|   MI|       18|
|    USA|   FL|       18|
|    USA|   NY|       18|
|    USA|   MT|       14|
|    USA|   PA|       13|
|    USA|   CO|       12|
|    USA|   IL|       12|
|    USA|   WY|       10|
|    USA|   NC|       10|
|    USA|   GA|        9|
|    USA|   NM|        9|
|    USA|   WA|        9|
|    USA|   WI|        9|
|    USA|   HI|        9|
|    USA|   NE|        9|
|    USA|   KS|        9|
|    USA|   IA|        8|
+-------+-----+---------+
only showing top 20 rows



In [41]:
airportCountByState.count()

51

In [42]:
 orders = spark.read.json("/public/retail_db_json/orders")

In [43]:
order_items = spark.read.json("/public/retail_db_json/order_items")

In [44]:
orders.printSchema()

root
 |-- order_customer_id: long (nullable = true)
 |-- order_date: string (nullable = true)
 |-- order_id: long (nullable = true)
 |-- order_status: string (nullable = true)



In [45]:
order_items.printSchema()

root
 |-- order_item_id: long (nullable = true)
 |-- order_item_order_id: long (nullable = true)
 |-- order_item_product_id: long (nullable = true)
 |-- order_item_product_price: double (nullable = true)
 |-- order_item_quantity: long (nullable = true)
 |-- order_item_subtotal: double (nullable = true)



In [46]:
from pyspark.sql.functions import col

orders.\
    select(col("order_customer_id")). \
    distinct(). \
    count()

12405

In [47]:
orders.count()

68883

In [48]:
orders. \
    groupBy(col("order_customer_id")). \
    agg(count(lit(1)).alias("order_count")). \
    orderBy(col("order_count").desc()). \
    show()

+-----------------+-----------+
|order_customer_id|order_count|
+-----------------+-----------+
|              569|         16|
|             5897|         16|
|            12431|         16|
|             6316|         16|
|             5283|         15|
|             5654|         15|
|            12284|         15|
|              221|         15|
|             4320|         15|
|             5624|         15|
|             4517|         14|
|             3710|         14|
|             4116|         14|
|             1011|         14|
|             1940|         14|
|             6248|         14|
|              791|         14|
|            10591|         14|
|             4249|         14|
|            11689|         14|
+-----------------+-----------+
only showing top 20 rows



In [49]:
help(orders.join)

Help on method join in module pyspark.sql.dataframe:

join(other, on=None, how=None) method of pyspark.sql.dataframe.DataFrame instance
    Joins with another :class:`DataFrame`, using the given join expression.
    
    .. versionadded:: 1.3.0
    
    Parameters
    ----------
    other : :class:`DataFrame`
        Right side of the join
    on : str, list or :class:`Column`, optional
        a string for the join column name, a list of column names,
        a join expression (Column), or a list of Columns.
        If `on` is a string or a list of strings indicating the name of the join column(s),
        the column(s) must exist on both sides, and this performs an equi-join.
    how : str, optional
        default ``inner``. Must be one of: ``inner``, ``cross``, ``outer``,
        ``full``, ``fullouter``, ``full_outer``, ``left``, ``leftouter``, ``left_outer``,
        ``right``, ``rightouter``, ``right_outer``, ``semi``, ``leftsemi``, ``left_semi``,
        ``anti``, ``leftanti`` a

In [50]:
orders_join = orders.join(
    order_items,
    orders.order_id == order_items.order_item_order_id
)

In [51]:
orders_join = orders.join(
    order_items,
    on=orders.order_id == order_items.order_item_order_id
)

In [52]:
orders_join = orders.join(
    order_items,
    on = col("order_id") == col("order_item_order_id")
)

In [53]:
orders_join = orders.join(
    order_items,
    on=orders['order_id']==order_items["order_item_order_id"]
)

In [54]:
orders_join = orders.join(
    order_items,
    on=orders['order_id']==order_items["order_item_order_id"],
    how='inner'
)

In [55]:
orders_join.printSchema()

root
 |-- order_customer_id: long (nullable = true)
 |-- order_date: string (nullable = true)
 |-- order_id: long (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_item_id: long (nullable = true)
 |-- order_item_order_id: long (nullable = true)
 |-- order_item_product_id: long (nullable = true)
 |-- order_item_product_price: double (nullable = true)
 |-- order_item_quantity: long (nullable = true)
 |-- order_item_subtotal: double (nullable = true)



In [56]:
orders_join.show(5)

+-----------------+--------------------+--------+---------------+-------------+-------------------+---------------------+------------------------+-------------------+-------------------+
|order_customer_id|          order_date|order_id|   order_status|order_item_id|order_item_order_id|order_item_product_id|order_item_product_price|order_item_quantity|order_item_subtotal|
+-----------------+--------------------+--------+---------------+-------------+-------------------+---------------------+------------------------+-------------------+-------------------+
|            11599|2013-07-25 00:00:...|       1|         CLOSED|            1|                  1|                  957|                  299.98|                  1|             299.98|
|              256|2013-07-25 00:00:...|       2|PENDING_PAYMENT|            2|                  2|                 1073|                  199.99|                  1|             199.99|
|              256|2013-07-25 00:00:...|       2|PENDING_PAYMENT|

In [57]:
orders.count()

68883

In [58]:
order_items.count()

172198

In [59]:
orders_join.count()

172198

In [60]:
orders. \
    join(order_items,
        on = orders['order_id'] == order_items['order_item_order_id'],
        how='inner'
    ). \
    select(orders['*'],order_items['order_item_subtotal']). \
    show()


+-----------------+--------------------+--------+---------------+-------------------+
|order_customer_id|          order_date|order_id|   order_status|order_item_subtotal|
+-----------------+--------------------+--------+---------------+-------------------+
|            11599|2013-07-25 00:00:...|       1|         CLOSED|             299.98|
|              256|2013-07-25 00:00:...|       2|PENDING_PAYMENT|             199.99|
|              256|2013-07-25 00:00:...|       2|PENDING_PAYMENT|              250.0|
|              256|2013-07-25 00:00:...|       2|PENDING_PAYMENT|             129.99|
|             8827|2013-07-25 00:00:...|       4|         CLOSED|              49.98|
|             8827|2013-07-25 00:00:...|       4|         CLOSED|             299.95|
|             8827|2013-07-25 00:00:...|       4|         CLOSED|              150.0|
|             8827|2013-07-25 00:00:...|       4|         CLOSED|             199.92|
|            11318|2013-07-25 00:00:...|       5|     

In [61]:
orders. \
    join(
        order_items,
        on = orders.order_id == order_items.order_item_order_id,
        how ='inner'
    ). \
    select( orders.order_id, orders.order_date, orders.order_status, order_items.order_item_subtotal). \
    show()

+--------+--------------------+---------------+-------------------+
|order_id|          order_date|   order_status|order_item_subtotal|
+--------+--------------------+---------------+-------------------+
|       1|2013-07-25 00:00:...|         CLOSED|             299.98|
|       2|2013-07-25 00:00:...|PENDING_PAYMENT|             199.99|
|       2|2013-07-25 00:00:...|PENDING_PAYMENT|              250.0|
|       2|2013-07-25 00:00:...|PENDING_PAYMENT|             129.99|
|       4|2013-07-25 00:00:...|         CLOSED|              49.98|
|       4|2013-07-25 00:00:...|         CLOSED|             299.95|
|       4|2013-07-25 00:00:...|         CLOSED|              150.0|
|       4|2013-07-25 00:00:...|         CLOSED|             199.92|
|       5|2013-07-25 00:00:...|       COMPLETE|             299.98|
|       5|2013-07-25 00:00:...|       COMPLETE|             299.95|
|       5|2013-07-25 00:00:...|       COMPLETE|              99.96|
|       5|2013-07-25 00:00:...|       COMPLETE| 

In [62]:
orders. \
    join(
        order_items,
        on = orders["order_id"] == order_items["order_item_order_id"],
        how ='inner'
    ). \
    select( orders["order_id"], orders["order_date"], orders["order_status"], order_items["order_item_subtotal"]). \
    show()

+--------+--------------------+---------------+-------------------+
|order_id|          order_date|   order_status|order_item_subtotal|
+--------+--------------------+---------------+-------------------+
|       1|2013-07-25 00:00:...|         CLOSED|             299.98|
|       2|2013-07-25 00:00:...|PENDING_PAYMENT|             199.99|
|       2|2013-07-25 00:00:...|PENDING_PAYMENT|              250.0|
|       2|2013-07-25 00:00:...|PENDING_PAYMENT|             129.99|
|       4|2013-07-25 00:00:...|         CLOSED|              49.98|
|       4|2013-07-25 00:00:...|         CLOSED|             299.95|
|       4|2013-07-25 00:00:...|         CLOSED|              150.0|
|       4|2013-07-25 00:00:...|         CLOSED|             199.92|
|       5|2013-07-25 00:00:...|       COMPLETE|             299.98|
|       5|2013-07-25 00:00:...|       COMPLETE|             299.95|
|       5|2013-07-25 00:00:...|       COMPLETE|              99.96|
|       5|2013-07-25 00:00:...|       COMPLETE| 

In [63]:
orders = spark.read.json('/public/retail_db_json/orders')

In [64]:
customers = spark.read.json('/public/retail_db_json/customers')

In [65]:
orders.printSchema()

root
 |-- order_customer_id: long (nullable = true)
 |-- order_date: string (nullable = true)
 |-- order_id: long (nullable = true)
 |-- order_status: string (nullable = true)



In [66]:
customers.printSchema()

root
 |-- customer_city: string (nullable = true)
 |-- customer_email: string (nullable = true)
 |-- customer_fname: string (nullable = true)
 |-- customer_id: long (nullable = true)
 |-- customer_lname: string (nullable = true)
 |-- customer_password: string (nullable = true)
 |-- customer_state: string (nullable = true)
 |-- customer_street: string (nullable = true)
 |-- customer_zipcode: string (nullable = true)



In [67]:
help(customers.join)

Help on method join in module pyspark.sql.dataframe:

join(other, on=None, how=None) method of pyspark.sql.dataframe.DataFrame instance
    Joins with another :class:`DataFrame`, using the given join expression.
    
    .. versionadded:: 1.3.0
    
    Parameters
    ----------
    other : :class:`DataFrame`
        Right side of the join
    on : str, list or :class:`Column`, optional
        a string for the join column name, a list of column names,
        a join expression (Column), or a list of Columns.
        If `on` is a string or a list of strings indicating the name of the join column(s),
        the column(s) must exist on both sides, and this performs an equi-join.
    how : str, optional
        default ``inner``. Must be one of: ``inner``, ``cross``, ``outer``,
        ``full``, ``fullouter``, ``full_outer``, ``left``, ``leftouter``, ``left_outer``,
        ``right``, ``rightouter``, ``right_outer``, ``semi``, ``leftsemi``, ``left_semi``,
        ``anti``, ``leftanti`` a

In [68]:
customers.join(
    orders,
    on = customers.customer_id == orders.order_customer_id,
    how = 'left_outer'
    ). \
    select(customers.customer_id, orders.order_customer_id,orders.order_date, orders.order_status). \
    show(5)

+-----------+-----------------+--------------------+---------------+
|customer_id|order_customer_id|          order_date|   order_status|
+-----------+-----------------+--------------------+---------------+
|          1|                1|2013-12-13 00:00:...|       COMPLETE|
|          2|                2|2013-11-30 00:00:...|       COMPLETE|
|          2|                2|2013-08-02 00:00:...|        ON_HOLD|
|          2|                2|2014-02-18 00:00:...|       COMPLETE|
|          2|                2|2013-10-29 00:00:...|PENDING_PAYMENT|
+-----------+-----------------+--------------------+---------------+
only showing top 5 rows



In [69]:
from pyspark.sql.functions import concat,lit

customers.join(
    orders,
    on = customers.customer_id == orders.order_customer_id,
    how = 'left_outer'
    ). \
    select(customers.customer_id, orders.order_customer_id,orders.order_date, orders.order_status). \
    withColumn('customer_fullname',concat(customers.customer_fname,lit(' '), customers.customer_lname)) .\
    show(5)

AnalysisException: Resolved attribute(s) customer_fname#901,customer_lname#903 missing from customer_id#902L,order_customer_id#884L,order_date#885,order_status#887 in operator !Project [customer_id#902L, order_customer_id#884L, order_date#885, order_status#887, concat(customer_fname#901,  , customer_lname#903) AS customer_fullname#998].;
!Project [customer_id#902L, order_customer_id#884L, order_date#885, order_status#887, concat(customer_fname#901,  , customer_lname#903) AS customer_fullname#998]
+- Project [customer_id#902L, order_customer_id#884L, order_date#885, order_status#887]
   +- Join LeftOuter, (customer_id#902L = order_customer_id#884L)
      :- Relation[customer_city#899,customer_email#900,customer_fname#901,customer_id#902L,customer_lname#903,customer_password#904,customer_state#905,customer_street#906,customer_zipcode#907] json
      +- Relation[order_customer_id#884L,order_date#885,order_id#886L,order_status#887] json


In [70]:
from pyspark.sql.functions import concat,lit

customers.join(
    orders,
    on = customers.customer_id == orders.order_customer_id,
    how = 'left_outer'
    ). \
    select(customers.customer_id,
           concat(customers.customer_fname,lit(' '), customers.customer_lname).alias('customer_fullname'),
           orders.order_customer_id,orders.order_date, orders.order_status). \
    show(5)

+-----------+-----------------+-----------------+--------------------+---------------+
|customer_id|customer_fullname|order_customer_id|          order_date|   order_status|
+-----------+-----------------+-----------------+--------------------+---------------+
|          1|Richard Hernandez|                1|2013-12-13 00:00:...|       COMPLETE|
|          2|     Mary Barrett|                2|2013-11-30 00:00:...|       COMPLETE|
|          2|     Mary Barrett|                2|2013-08-02 00:00:...|        ON_HOLD|
|          2|     Mary Barrett|                2|2014-02-18 00:00:...|       COMPLETE|
|          2|     Mary Barrett|                2|2013-10-29 00:00:...|PENDING_PAYMENT|
+-----------+-----------------+-----------------+--------------------+---------------+
only showing top 5 rows



In [71]:
help(customers)

Help on DataFrame in module pyspark.sql.dataframe object:

class DataFrame(pyspark.sql.pandas.map_ops.PandasMapOpsMixin, pyspark.sql.pandas.conversion.PandasConversionMixin)
 |  A distributed collection of data grouped into named columns.
 |  
 |  A :class:`DataFrame` is equivalent to a relational table in Spark SQL,
 |  and can be created using various functions in :class:`SparkSession`::
 |  
 |      people = spark.read.parquet("...")
 |  
 |  Once created, it can be manipulated using the various domain-specific-language
 |  (DSL) functions defined in: :class:`DataFrame`, :class:`Column`.
 |  
 |  To select a column from the :class:`DataFrame`, use the apply method::
 |  
 |      ageCol = people.age
 |  
 |  A more concrete example::
 |  
 |      # To create DataFrame using SparkSession
 |      people = spark.read.parquet("...")
 |      department = spark.read.parquet("...")
 |  
 |      people.filter(people.age > 30).join(department, people.deptId == department.id) \
 |        .grou

In [72]:
customer_order_details = customers.join(
    orders,
    on=customers['customer_id'] == orders['order_customer_id'],
    how='inner'
)

In [73]:
orders.count()

68883

In [74]:
customer_order_details.count()

68883

In [75]:
customer_order_details_left = customers.join(
    orders,
    on=customers['customer_id'] == orders['order_customer_id'],
    how='left'
)

In [76]:
customer_order_details_left.count()

68913

In [77]:
customer_order_details_left = customers.join(
    orders,
    on= customers['customer_id']==orders['order_customer_id'],
    how='left_outer'
)

In [78]:
customer_order_details_left.count()

68913

In [79]:
customer_order_details_right = orders.join(
    customers,
    on = orders['order_customer_id'] == customers['customer_id'],
    how='right'
)

In [80]:
customer_order_details_right.count()

68913

In [81]:
customer_order_details_right = orders.join(
    customers,
    on = orders['order_customer_id'] == customers['customer_id'],
    how='right_outer'
)

In [82]:
customer_order_details_right.count()

68913

In [83]:
customer_order_details_left. \
    select(customers.customer_id, customers.customer_email, orders["*"]). \
    show(5)

+-----------+--------------+-----------------+--------------------+--------+---------------+
|customer_id|customer_email|order_customer_id|          order_date|order_id|   order_status|
+-----------+--------------+-----------------+--------------------+--------+---------------+
|          1|     XXXXXXXXX|                1|2013-12-13 00:00:...|   22945|       COMPLETE|
|          2|     XXXXXXXXX|                2|2013-11-30 00:00:...|   67863|       COMPLETE|
|          2|     XXXXXXXXX|                2|2013-08-02 00:00:...|   57963|        ON_HOLD|
|          2|     XXXXXXXXX|                2|2014-02-18 00:00:...|   33865|       COMPLETE|
|          2|     XXXXXXXXX|                2|2013-10-29 00:00:...|   15192|PENDING_PAYMENT|
+-----------+--------------+-----------------+--------------------+--------+---------------+
only showing top 5 rows



In [84]:
orders.count()

68883

In [85]:
customers.count()

12435

In [86]:
customer_order_details.count()

68883

In [87]:
customer_order_details_left.count()

68913

In [88]:
customer_order_details_left. \
    filter(orders.order_id.isNull()). \
    select(customers.customer_id, customers.customer_email, orders['*']). \
    count()

30

In [89]:
customer_order_details_left. \
    filter(orders.order_id.isNull()). \
    select(customers.customer_id, customers.customer_email, orders['*']). \
    show(30)

+-----------+--------------+-----------------+----------+--------+------------+
|customer_id|customer_email|order_customer_id|order_date|order_id|order_status|
+-----------+--------------+-----------------+----------+--------+------------+
|        219|     XXXXXXXXX|             null|      null|    null|        null|
|        339|     XXXXXXXXX|             null|      null|    null|        null|
|        469|     XXXXXXXXX|             null|      null|    null|        null|
|       1187|     XXXXXXXXX|             null|      null|    null|        null|
|       1481|     XXXXXXXXX|             null|      null|    null|        null|
|       1808|     XXXXXXXXX|             null|      null|    null|        null|
|       2073|     XXXXXXXXX|             null|      null|    null|        null|
|       2096|     XXXXXXXXX|             null|      null|    null|        null|
|       2450|     XXXXXXXXX|             null|      null|    null|        null|
|       4555|     XXXXXXXXX|            

In [90]:
customers.alias('c'). \
    join(
        orders.alias('o'),
        on= customers.customer_id == orders.order_customer_id,
        how = 'left'
    ). \
    filter('o.order_id IS NULL'). \
    selectExpr('c.customer_id', 'c.customer_email', 'o.*'). \
    show()

+-----------+--------------+-----------------+----------+--------+------------+
|customer_id|customer_email|order_customer_id|order_date|order_id|order_status|
+-----------+--------------+-----------------+----------+--------+------------+
|        219|     XXXXXXXXX|             null|      null|    null|        null|
|        339|     XXXXXXXXX|             null|      null|    null|        null|
|        469|     XXXXXXXXX|             null|      null|    null|        null|
|       1187|     XXXXXXXXX|             null|      null|    null|        null|
|       1481|     XXXXXXXXX|             null|      null|    null|        null|
|       1808|     XXXXXXXXX|             null|      null|    null|        null|
|       2073|     XXXXXXXXX|             null|      null|    null|        null|
|       2096|     XXXXXXXXX|             null|      null|    null|        null|
|       2450|     XXXXXXXXX|             null|      null|    null|        null|
|       4555|     XXXXXXXXX|            

In [91]:
customers.alias('c'). \
    join(
        orders.alias('o'),
        on= customers.customer_id == orders.order_customer_id,
        how = 'left'
    ). \
    filter('o.order_id IS NULL'). \
    selectExpr('c.customer_id', 'c.customer_email', 'o.*'). \
    count()

30

In [92]:
orders = spark.read.json("/public/retail_db_json/orders")

In [93]:
order_items = spark.read.json("/public/retail_db_json/order_items")

In [94]:
orders_filtered = orders.filter("order_date LIKE '2013%'")

In [95]:
customer_order_details_left = customers.alias('c'). \
    join(
        orders_filtered.alias('o'),
        on = customers['customer_id'] == orders_filtered['order_customer_id'],
        how='left_outer'
    )

In [96]:
orders_filtered.count()

30662

In [97]:
customer_order_details_left.count()

31746

In [98]:
customer_order_details_left.printSchema()

root
 |-- customer_city: string (nullable = true)
 |-- customer_email: string (nullable = true)
 |-- customer_fname: string (nullable = true)
 |-- customer_id: long (nullable = true)
 |-- customer_lname: string (nullable = true)
 |-- customer_password: string (nullable = true)
 |-- customer_state: string (nullable = true)
 |-- customer_street: string (nullable = true)
 |-- customer_zipcode: string (nullable = true)
 |-- order_customer_id: long (nullable = true)
 |-- order_date: string (nullable = true)
 |-- order_id: long (nullable = true)
 |-- order_status: string (nullable = true)



In [99]:
customer_order_details_left. \
    groupBy("customer_id"). \
    agg( count(lit(1)).alias('order_count')). \
    orderBy(col('order_count').asc()). \
    show(5)

+-----------+-----------+
|customer_id|order_count|
+-----------+-----------+
|         38|          1|
|         83|          1|
|         46|          1|
|          5|          1|
|         55|          1|
+-----------+-----------+
only showing top 5 rows



In [100]:
customer_order_details_left. \
    groupBy("customer_id"). \
    agg( count(lit(1)).alias('order_count')). \
    orderBy(col('order_count').asc()). \
    count()

12435

In [101]:
from pyspark.sql.functions import sum,avg,min,max,when

customer_order_details_left. \
    groupBy("customer_id"). \
    agg( 
        count(lit(1)).alias('order_count'),
        sum((when(col('order_id').isNotNull(),1).otherwise(0))).alias('order_count2')
    ). \
    orderBy(col('order_count').asc()). \
    show()

+-----------+-----------+------------+
|customer_id|order_count|order_count2|
+-----------+-----------+------------+
|         83|          1|           1|
|          5|          1|           1|
|         26|          1|           1|
|         28|          1|           0|
|         29|          1|           1|
|         30|          1|           1|
|         38|          1|           1|
|         46|          1|           1|
|         55|          1|           0|
|         57|          1|           1|
|         81|          1|           1|
|         84|          1|           1|
|         91|          1|           0|
|        104|          1|           1|
|        115|          1|           1|
|        128|          1|           1|
|        130|          1|           1|
|        136|          1|           1|
|        140|          1|           1|
|        151|          1|           1|
+-----------+-----------+------------+
only showing top 20 rows



In [102]:
customer_order_details_left. \
    filter(orders.order_id.isNull()). \
    groupBy("customer_id"). \
    agg( 
        count(lit(1)).alias('order_count'),
        sum((when(col('order_id').isNotNull(),1).otherwise(0))).alias('order_count2')
    ). \
    orderBy(col('order_count').asc()). \
    show()

+-----------+-----------+------------+
|customer_id|order_count|order_count2|
+-----------+-----------+------------+
|         28|          1|           0|
|         55|          1|           0|
|         91|          1|           0|
|        186|          1|           0|
|        200|          1|           0|
|        219|          1|           0|
|        301|          1|           0|
|        360|          1|           0|
|        399|          1|           0|
|        456|          1|           0|
|        476|          1|           0|
|        519|          1|           0|
|        540|          1|           0|
|        574|          1|           0|
|        588|          1|           0|
|        631|          1|           0|
|        652|          1|           0|
|        657|          1|           0|
|        701|          1|           0|
|        769|          1|           0|
+-----------+-----------+------------+
only showing top 20 rows



In [103]:
customer_order_details_left. \
    filter(orders.order_id.isNull()). \
    groupBy("customer_id"). \
    agg( 
        count(lit(1)).alias('order_count'),
        sum((when(col('order_id').isNotNull(),1).otherwise(0))).alias('order_count2')
    ). \
    orderBy(col('order_count').asc()). \
    count()

1084

In [104]:
31746-30662

1084

## Air Traffic tables join

In [105]:
from pyspark.sql import SparkSession
import getpass

username = getpass.getuser()

spark = SparkSession. \
        builder. \
        config('spark.ui.port','0'). \
        config('spark.sql.warehouse.dir',f'/user/{username}/warehouse'). \
        config('spark.shuffle.io.connectionTimeout','6000'). \
        config('spark.driver.memory','6g'). \
        config('spark.executor.memory','6g'). \
        config('spark.dynamicAllocation.minExecutors', '4'). \
        enableHiveSupport(). \
        appName(f'{username} | Section 17 Joining Data Sets'). \
        master('yarn'). \
        getOrCreate()

In [106]:
spark.conf.set("spark.sql.shuffle.partitions", "2")

In [107]:
airtrafficPath = "/public/airlines_all/airlines-part/flightmonth=200801"

In [108]:
airtraffic = spark. \
    read. \
    parquet(airtrafficPath)

In [109]:
airtraffic. \
    select(
        "Year", "Month", "DayOfMonth",
        "Origin", "Dest", "CRSDepTime"
    ). \
    show()

+----+-----+----------+------+----+----------+
|Year|Month|DayOfMonth|Origin|Dest|CRSDepTime|
+----+-----+----------+------+----+----------+
|2008|    1|        16|   BGR| CVG|      1735|
|2008|    1|        17|   SYR| CVG|      1701|
|2008|    1|        17|   SAV| BOS|      1225|
|2008|    1|        17|   CVG| GRR|      1530|
|2008|    1|        17|   STL| CVG|      1205|
|2008|    1|        18|   STL| JFK|      1150|
|2008|    1|        18|   MCI| CVG|      1009|
|2008|    1|        19|   TUL| CVG|       835|
|2008|    1|        20|   JFK| PHL|      1935|
|2008|    1|        20|   RDU| CVG|       830|
|2008|    1|        21|   CVG| DTW|      1640|
|2008|    1|        21|   MSY| LGA|      1204|
|2008|    1|        21|   JFK| PHL|      1935|
|2008|    1|        21|   DCA| JFK|      1830|
|2008|    1|        21|   HSV| DCA|       700|
|2008|    1|        22|   ORD| CVG|      1910|
|2008|    1|        22|   CVG| JFK|      1320|
|2008|    1|        23|   LGA| SAV|       908|
|2008|    1| 

In [111]:
airtraffic.count()

605659

In [110]:
airportCodesPath = "/public/airlines_all/airport-codes"

In [112]:
def getValidAirportCodes(airportCodesPath):
    airportCodes = spark. \
        read. \
        option("sep","\t"). \
        option("header",True). \
        option("inferSchema", True). \
        csv(airportCodesPath). \
        filter("!(State='Hawaii' AND IATA='Big') AND Country='USA'")
    return airportCodes


In [113]:
airportCodes = getValidAirportCodes(airportCodesPath)

In [114]:
airportCodes.count()

443

In [116]:
airtraffic. \
    join(
        airportCodes,
        on= airtraffic['Origin']==airportCodes['IATA']
    ). \
    select(airtraffic['Year'], airtraffic['Month'], airtraffic['DayOfMonth'],airportCodes['*'],airtraffic['CRSDepTime']). \
    show()

+----+-----+----------+-------------+-----+-------+----+----------+
|Year|Month|DayOfMonth|         City|State|Country|IATA|CRSDepTime|
+----+-----+----------+-------------+-----+-------+----+----------+
|2008|    1|        16|       Bangor|   ME|    USA| BGR|      1735|
|2008|    1|        17|     Syracuse|   NY|    USA| SYR|      1701|
|2008|    1|        17|     Savannah|   GA|    USA| SAV|      1225|
|2008|    1|        17|   Cincinnati|   OH|    USA| CVG|      1530|
|2008|    1|        17|    St. Louis|   MO|    USA| STL|      1205|
|2008|    1|        18|    St. Louis|   MO|    USA| STL|      1150|
|2008|    1|        18|  Kansas City|   MO|    USA| MCI|      1009|
|2008|    1|        19|        Tulsa|   OK|    USA| TUL|       835|
|2008|    1|        20|     New York|   NY|    USA| JFK|      1935|
|2008|    1|        20|      Raleigh|   NC|    USA| RDU|       830|
|2008|    1|        21|   Cincinnati|   OH|    USA| CVG|      1640|
|2008|    1|        21|  New Orleans|   LA|    U

In [117]:
airtraffic. \
    join(
        airportCodes,
        on= airtraffic['Origin']==airportCodes['IATA']
    ). \
    select(airtraffic['Year'], airtraffic['Month'], airtraffic['DayOfMonth'],airportCodes['*'],airtraffic['CRSDepTime']). \
    count()

600074

In [119]:
airtraffic. \
    join(
        airportCodes,
        on = airtraffic['Origin']==airportCodes['IATA']
    ). \
    groupBy(airtraffic['Origin']). \
    agg(count(lit(1)).alias('FlightCount')). \
    select(airtraffic['Origin'], col('FlightCount')). \
    show()

+------+-----------+
|Origin|FlightCount|
+------+-----------+
|   BGR|        208|
|   SYR|       1048|
|   CVG|       8659|
|   STL|       5329|
|   JFK|      10023|
|   MSY|       3453|
|   DCA|       7304|
|   HSV|        901|
|   ORD|      29936|
|   CLT|      10752|
|   GSP|        995|
|   BOS|       9717|
|   COS|       1445|
|   BNA|       4935|
|   ATL|      33897|
|   SJC|       4976|
|   GJT|        372|
|   AZO|        359|
|   ELP|       1818|
|   PDX|       4898|
+------+-----------+
only showing top 20 rows



In [121]:
airtraffic. \
    join(
        airportCodes,
        on = airtraffic['Origin']==airportCodes['IATA']
    ). \
    groupBy(airtraffic['Origin']). \
    agg(count(lit(1)).alias('FlightCount')). \
    select(airtraffic['Origin'], col('FlightCount')). \
    orderBy(col('FlightCount').desc()). \
    show()

+------+-----------+
|Origin|FlightCount|
+------+-----------+
|   ATL|      33897|
|   ORD|      29936|
|   DFW|      23861|
|   DEN|      19477|
|   LAX|      18945|
|   PHX|      17695|
|   IAH|      15531|
|   LAS|      15292|
|   DTW|      14357|
|   EWR|      12467|
|   SLC|      12401|
|   MSP|      11800|
|   SFO|      11573|
|   MCO|      11070|
|   CLT|      10752|
|   LGA|      10300|
|   JFK|      10023|
|   BOS|       9717|
|   BWI|       8883|
|   CVG|       8659|
+------+-----------+
only showing top 20 rows



In [123]:
## 207 Get Flight Count Per US State using Spark Data Frame APIs

In [1]:
from pyspark.sql import SparkSession
import getpass

username = getpass.getuser()

spark = SparkSession. \
        builder. \
        config('spark.ui.port','0'). \
        config('spark.sql.warehouse.dir',f'/user/{username}/warehouse'). \
        config('spark.shuffle.io.connectionTimeout','6000'). \
        config('spark.driver.memory','6g'). \
        config('spark.executor.memory','6g'). \
        config('spark.dynamicAllocation.minExecutors', '4'). \
        enableHiveSupport(). \
        appName(f'{username} | Section 17 Joining Data Sets'). \
        master('yarn'). \
        getOrCreate()

In [2]:
spark.conf.set("spark.sql.shuffle.partitions", "2")

In [3]:
airtrafficPath = "/public/airlines_all/airlines-part/flightmonth=200801"

In [4]:
airtraffic = spark.read.parquet(airtrafficPath)

In [5]:
airtraffic. \
    select(
        "Year", "Month", "DayOfMonth",
        "Origin","Dest", "CRSDepTime"
    ). \
    show(5)

+----+-----+----------+------+----+----------+
|Year|Month|DayOfMonth|Origin|Dest|CRSDepTime|
+----+-----+----------+------+----+----------+
|2008|    1|        16|   BGR| CVG|      1735|
|2008|    1|        17|   SYR| CVG|      1701|
|2008|    1|        17|   SAV| BOS|      1225|
|2008|    1|        17|   CVG| GRR|      1530|
|2008|    1|        17|   STL| CVG|      1205|
+----+-----+----------+------+----+----------+
only showing top 5 rows



In [6]:
airtraffic.count()

605659

In [7]:
airportCodesPath = "/public/airlines_all/airport-codes"

In [8]:
def getValidAirportCodes(airportCodesPath):
    airportCodes = spark. \
        read. \
        option("sep","\t"). \
        option("header",True). \
        option("inferSchema", True). \
        csv(airportCodesPath). \
        filter("!(State='Hawaii' AND IATA='Big') AND Country='USA'")
    return airportCodes


In [9]:
airportCodes = getValidAirportCodes(airportCodesPath)

In [10]:
airportCodes.count()

443

In [11]:
from pyspark.sql.functions import col,lit,count

In [12]:
airtraffic. \
    join(
        airportCodes,
        on= airtraffic['Origin']==airportCodes['IATA'],
        how='inner'
    ). \
    groupBy( airportCodes['State']). \
    agg(count(lit(1)).alias('FlightCount')). \
    show()

+-----+-----------+
|State|FlightCount|
+-----+-----------+
|   NY|      28414|
|   MO|      11808|
|   NC|      17942|
|   IL|      39812|
|   SC|       3525|
|   TN|      13549|
|   VA|       4093|
|   MI|      17824|
|   ID|       2497|
|   OR|       6221|
|   SD|        844|
|   AZ|      20768|
|   NE|       2547|
|   NM|       3509|
|   MN|      12357|
|   MD|       8883|
|   IA|       2315|
|   MS|       2005|
|   NJ|      12498|
|   CT|       2729|
+-----+-----------+
only showing top 20 rows



In [14]:
airtraffic. \
    join(
        airportCodes,
        on= airtraffic['Origin']==airportCodes['IATA'],
        how='inner'
    ). \
    groupBy( airportCodes['State']). \
    agg(count(lit(1)).alias('FlightCount')). \
    orderBy(col('FlightCount').desc()). \
    show()

+-----+-----------+
|State|FlightCount|
+-----+-----------+
|   CA|      72853|
|   TX|      63930|
|   FL|      41042|
|   IL|      39812|
|   GA|      35527|
|   NY|      28414|
|   CO|      23288|
|   AZ|      20768|
|   OH|      19209|
|   NC|      17942|
|   MI|      17824|
|   NV|      17763|
| null|      14090|
|   TN|      13549|
|   PA|      13491|
|   UT|      12709|
|   NJ|      12498|
|   MN|      12357|
|   MO|      11808|
|   WA|      10210|
+-----+-----------+
only showing top 20 rows



In [15]:
## 208 Solution Get Dormant US Airports using Spark DF APIs

In [21]:
from pyspark.sql import SparkSession
import getpass

username = getpass.getuser()

spark = SparkSession. \
        builder. \
        config('spark.ui.port','0'). \
        config('spark.sql.warehouse.dir',f'/user/{username}/warehouse'). \
        config('spark.shuffle.io.connectionTimeout','6000'). \
        config('spark.driver.memory','6g'). \
        config('spark.executor.memory','6g'). \
        config('spark.dynamicAllocation.minExecutors', '4'). \
        enableHiveSupport(). \
        appName(f'{username} | Section 17 Joining Data Sets'). \
        master('yarn'). \
        getOrCreate()

In [22]:
spark.conf.set("spark.sql.shuffle.partitions", "2")

In [23]:
airtrafficPath = "/public/airlines_all/airlines-part/flightmonth=200801"

In [24]:
airtraffic = spark.read.parquet(airtrafficPath)

In [25]:
airtraffic. \
    select(
        "Year", "Month", "DayOfMonth",
        "Origin","Dest", "CRSDepTime"
    ). \
    show(5)

+----+-----+----------+------+----+----------+
|Year|Month|DayOfMonth|Origin|Dest|CRSDepTime|
+----+-----+----------+------+----+----------+
|2008|    1|        16|   BGR| CVG|      1735|
|2008|    1|        17|   SYR| CVG|      1701|
|2008|    1|        17|   SAV| BOS|      1225|
|2008|    1|        17|   CVG| GRR|      1530|
|2008|    1|        17|   STL| CVG|      1205|
+----+-----+----------+------+----+----------+
only showing top 5 rows



In [26]:
airportCodesPath = "/public/airlines_all/airport-codes"

In [27]:
def getValidAirportCodes(airportCodesPath):
    airportCodes = spark. \
        read. \
        option("sep","\t"). \
        option("header",True). \
        option("inferSchema", True). \
        csv(airportCodesPath). \
        filter("!(State='Hawaii' AND IATA='Big') AND Country='USA'")
    return airportCodes


In [28]:
airportCodes = getValidAirportCodes(airportCodesPath)

In [29]:
airportCodes.count()

443

In [30]:
from pyspark.sql.functions import col

In [33]:
airportCodes. \
    join(
        airtraffic,
        on= airportCodes['IATA']==airtraffic['Origin'],
        how = 'left'
    ). \
    filter("Origin IS NULL"). \
    select(airportCodes['*'], col("Origin")). \
    show()
    

+--------------+-----+-------+----+------+
|          City|State|Country|IATA|Origin|
+--------------+-----+-------+----+------+
|      Aberdeen|   SD|    USA| ABR|  null|
|       Alamosa|   CO|    USA| ALS|  null|
|      Alliance|   NE|    USA| AIA|  null|
|        Alpena|   MI|    USA| APN|  null|
|       Altoona|   PA|    USA| AOO|  null|
|        Athens|   GA|    USA| AHN|  null|
|       Augusta|   ME|    USA| AUG|  null|
|    Bar Harbor|   ME|    USA| BHB|  null|
|       Beckley|   WV|    USA| BKW|  null|
|       Bedford|   MA|    USA| BED|  null|
|       Bemidji|   MN|    USA| BJI|  null|
|       Bettles|   AK|    USA| BTT|  null|
|   Bloomington|   IN|    USA| BMG|  null|
|     Bluefield|   WV|    USA| BLF|  null|
|     Brookings|   SD|    USA| BKX|  null|
|    Burlington|   IA|    USA| BRL|  null|
|    Burlington|   MA|    USA| BBF|  null|
|Cape Girardeau|   MO|    USA| CGI|  null|
|      Carlsbad|   NM|    USA| CNM|  null|
|      Cheyenne|   WY|    USA| CYS|  null|
+----------

In [34]:
airportCodes. \
    join(
        airtraffic,
        on= airportCodes['IATA']==airtraffic['Origin'],
        how = 'left'
    ). \
    filter(airtraffic['Origin'].isNull()). \
    select(airportCodes['*'], col("Origin")). \
    show()

+--------------+-----+-------+----+------+
|          City|State|Country|IATA|Origin|
+--------------+-----+-------+----+------+
|      Aberdeen|   SD|    USA| ABR|  null|
|       Alamosa|   CO|    USA| ALS|  null|
|      Alliance|   NE|    USA| AIA|  null|
|        Alpena|   MI|    USA| APN|  null|
|       Altoona|   PA|    USA| AOO|  null|
|        Athens|   GA|    USA| AHN|  null|
|       Augusta|   ME|    USA| AUG|  null|
|    Bar Harbor|   ME|    USA| BHB|  null|
|       Beckley|   WV|    USA| BKW|  null|
|       Bedford|   MA|    USA| BED|  null|
|       Bemidji|   MN|    USA| BJI|  null|
|       Bettles|   AK|    USA| BTT|  null|
|   Bloomington|   IN|    USA| BMG|  null|
|     Bluefield|   WV|    USA| BLF|  null|
|     Brookings|   SD|    USA| BKX|  null|
|    Burlington|   IA|    USA| BRL|  null|
|    Burlington|   MA|    USA| BBF|  null|
|Cape Girardeau|   MO|    USA| CGI|  null|
|      Carlsbad|   NM|    USA| CNM|  null|
|      Cheyenne|   WY|    USA| CYS|  null|
+----------

In [36]:
airportCodes. \
    join(
        airtraffic,
        on= airportCodes['IATA']==airtraffic['Origin'],
        how = 'left'
    ). \
    filter(airtraffic['Origin'].isNull()). \
    count()

173

In [37]:
## 209 Solution - Get Origins without master data using Spark DF APIs

In [1]:
from pyspark.sql import SparkSession
import getpass

username = getpass.getuser()

spark = SparkSession. \
        builder. \
        config('spark.ui.port','0'). \
        config('spark.sql.warehouse.dir',f'/user/{username}/warehouse'). \
        config('spark.shuffle.io.connectionTimeout','6000'). \
        config('spark.driver.memory','6g'). \
        config('spark.executor.memory','6g'). \
        config('spark.dynamicAllocation.minExecutors', '4'). \
        enableHiveSupport(). \
        appName(f'{username} | Section 17 Joining Data Sets'). \
        master('yarn'). \
        getOrCreate()

In [2]:
spark.conf.set("spark.sql.shuffle.partitions", "2")

In [3]:
airtrafficPath = "/public/airlines_all/airlines-part/flightmonth=200801"

In [4]:
airtraffic = spark.read.parquet(airtrafficPath)

In [5]:
airtraffic. \
    select(
        "Year", "Month", "DayOfMonth",
        "Origin","Dest", "CRSDepTime"
    ). \
    show(5)

+----+-----+----------+------+----+----------+
|Year|Month|DayOfMonth|Origin|Dest|CRSDepTime|
+----+-----+----------+------+----+----------+
|2008|    1|        16|   BGR| CVG|      1735|
|2008|    1|        17|   SYR| CVG|      1701|
|2008|    1|        17|   SAV| BOS|      1225|
|2008|    1|        17|   CVG| GRR|      1530|
|2008|    1|        17|   STL| CVG|      1205|
+----+-----+----------+------+----+----------+
only showing top 5 rows



In [6]:
airportCodesPath = "/public/airlines_all/airport-codes"

In [7]:
def getValidAirportCodes(airportCodesPath):
    airportCodes = spark. \
        read. \
        option("sep","\t"). \
        option("header",True). \
        option("inferSchema", True). \
        csv(airportCodesPath). \
        filter("!(State='Hawaii' AND IATA='Big') AND Country='USA'")
    return airportCodes


In [8]:
airportCodes = getValidAirportCodes(airportCodesPath)

In [9]:
airportCodes.count()

443

In [10]:
airtraffic. \
    join(
        airportCodes,
        on= airtraffic['Origin']==airportCodes['IATA'],
        how = 'left'
    ).\
    filter("IATA IS NULL"). \
    select(airtraffic['Origin'],airportCodes['*']). \
    show(5)

+------+----+-----+-------+----+
|Origin|City|State|Country|IATA|
+------+----+-----+-------+----+
|   HDN|null| null|   null|null|
|   HDN|null| null|   null|null|
|   SJU|null| null|   null|null|
|   SJU|null| null|   null|null|
|   ITO|null| null|   null|null|
+------+----+-----+-------+----+
only showing top 5 rows



In [11]:
airtraffic. \
    join(
        airportCodes,
        on= airtraffic['Origin']==airportCodes['IATA'],
        how = 'left'
    ).\
    filter("IATA IS NULL"). \
    select(airtraffic['Origin'],airportCodes['*']). \
    distinct(). \
    show(5)

+------+----+-----+-------+----+
|Origin|City|State|Country|IATA|
+------+----+-----+-------+----+
|   HDN|null| null|   null|null|
|   SJU|null| null|   null|null|
|   ITO|null| null|   null|null|
|   STT|null| null|   null|null|
|   CEC|null| null|   null|null|
+------+----+-----+-------+----+
only showing top 5 rows



In [12]:
airtraffic. \
    join(
        airportCodes,
        on= airtraffic['Origin']==airportCodes['IATA'],
        how = 'left'
    ).\
    filter(airportCodes['IATA'].isNull()). \
    select(airtraffic['Origin'],airportCodes['*']). \
    distinct(). \
    show(5)

+------+----+-----+-------+----+
|Origin|City|State|Country|IATA|
+------+----+-----+-------+----+
|   HDN|null| null|   null|null|
|   SJU|null| null|   null|null|
|   ITO|null| null|   null|null|
|   STT|null| null|   null|null|
|   CEC|null| null|   null|null|
+------+----+-----+-------+----+
only showing top 5 rows



In [13]:
airtraffic. \
    join(
        airportCodes,
        on= airtraffic['Origin']==airportCodes['IATA'],
        how = 'left'
    ).\
    filter(airportCodes['IATA'].isNull()). \
    select(airtraffic['Origin'],airportCodes['*']). \
    distinct(). \
    count()

16

In [14]:
airtraffic. \
    select("Origin"). \
    distinct(). \
    join(
        airportCodes,
        on= airtraffic['Origin']==airportCodes['IATA'],
        how = 'left'
    ). \
    filter("IATA IS NULL"). \
    show()

+------+----+-----+-------+----+
|Origin|City|State|Country|IATA|
+------+----+-----+-------+----+
|   HDN|null| null|   null|null|
|   SJU|null| null|   null|null|
|   ITO|null| null|   null|null|
|   STT|null| null|   null|null|
|   CEC|null| null|   null|null|
|   CDC|null| null|   null|null|
|   PSG|null| null|   null|null|
|   ADK|null| null|   null|null|
|   KOA|null| null|   null|null|
|   OTZ|null| null|   null|null|
|   BQN|null| null|   null|null|
|   STX|null| null|   null|null|
|   PMD|null| null|   null|null|
|   PSE|null| null|   null|null|
|   SCC|null| null|   null|null|
|   SLE|null| null|   null|null|
+------+----+-----+-------+----+



In [16]:
airtraffic. \
    select("Origin"). \
    distinct(). \
    join(
        airportCodes,
        on= airtraffic['Origin']==airportCodes['IATA'],
        how = 'left'
    ). \
    filter("IATA IS NULL"). \
    count()

16

In [17]:
## 210 Soulution Problem 6 -Get count of Flights without master data using Spark DF APIs

In [18]:
from pyspark.sql import SparkSession
import getpass

username = getpass.getuser()

spark = SparkSession. \
        builder. \
        config('spark.ui.port','0'). \
        config('spark.sql.warehouse.dir',f'/user/{username}/warehouse'). \
        config('spark.shuffle.io.connectionTimeout','6000'). \
        config('spark.driver.memory','6g'). \
        config('spark.executor.memory','6g'). \
        config('spark.dynamicAllocation.minExecutors', '4'). \
        enableHiveSupport(). \
        appName(f'{username} | Section 17 Joining Data Sets'). \
        master('yarn'). \
        getOrCreate()

In [19]:
spark.conf.set("spark.sql.shuffle.partitions", "2")

In [20]:
airtrafficPath = "/public/airlines_all/airlines-part/flightmonth=200801"

In [21]:
airtraffic = spark.read.parquet(airtrafficPath)

In [22]:
airtraffic. \
    select(
        "Year", "Month", "DayOfMonth",
        "Origin","Dest", "CRSDepTime"
    ). \
    show(5)

+----+-----+----------+------+----+----------+
|Year|Month|DayOfMonth|Origin|Dest|CRSDepTime|
+----+-----+----------+------+----+----------+
|2008|    1|        16|   BGR| CVG|      1735|
|2008|    1|        17|   SYR| CVG|      1701|
|2008|    1|        17|   SAV| BOS|      1225|
|2008|    1|        17|   CVG| GRR|      1530|
|2008|    1|        17|   STL| CVG|      1205|
+----+-----+----------+------+----+----------+
only showing top 5 rows



In [23]:
airportCodesPath = "/public/airlines_all/airport-codes"

In [27]:
def getValidAirportCodes(airportCodesPath):
    airportCodes = spark. \
        read. \
        option("sep","\t"). \
        option("header",True). \
        option("inferSchema", True). \
        csv(airportCodesPath). \
        filter("!(State='Hawaii' AND IATA='Big')")
    return airportCodes


In [28]:
airportCodes = getValidAirportCodes(airportCodesPath)

In [29]:
airportCodes.count()

524

In [32]:
airtraffic. \
    join(
        airportCodes,
        on= airtraffic['Origin']==airportCodes['IATA'],
        how='left'
    ). \
    filter("IATA IS NULL"). \
    select(airtraffic['Year'], airtraffic['Month'],airtraffic['DayOfMonth'],
           airtraffic['Origin'], airtraffic['Dest'], airtraffic['CRSDepTime'],
           airportCodes['*']
    ). \
    show()

+----+-----+----------+------+----+----------+----+-----+-------+----+
|Year|Month|DayOfMonth|Origin|Dest|CRSDepTime|City|State|Country|IATA|
+----+-----+----------+------+----+----------+----+-----+-------+----+
|2008|    1|         8|   HDN| DEN|      1403|null| null|   null|null|
|2008|    1|        26|   HDN| DEN|      1533|null| null|   null|null|
|2008|    1|        26|   SJU| CLT|      1520|null| null|   null|null|
|2008|    1|        19|   SJU| ATL|       945|null| null|   null|null|
|2008|    1|        31|   ITO| HNL|      1010|null| null|   null|null|
|2008|    1|        26|   ITO| HNL|      1725|null| null|   null|null|
|2008|    1|        24|   KOA| LAX|      2355|null| null|   null|null|
|2008|    1|        22|   SJU| EWR|      1845|null| null|   null|null|
|2008|    1|        21|   SJU| MIA|      1815|null| null|   null|null|
|2008|    1|        26|   STT| MIA|       845|null| null|   null|null|
|2008|    1|        11|   SJU| MIA|      1235|null| null|   null|null|
|2008|

In [33]:
from pyspark.sql.functions import count, lit, col

In [34]:
airtraffic. \
    join(
        airportCodes,
        on= airtraffic['Origin']==airportCodes['IATA'],
        how='left'
    ). \
    filter("IATA IS NULL"). \
    groupBy(airtraffic['Origin']). \
    agg(count(lit(1)).alias('FlightCount')). \
    show()

+------+-----------+
|Origin|FlightCount|
+------+-----------+
|   HDN|        429|
|   SJU|       1997|
|   ITO|        786|
|   STT|        311|
|   CEC|         88|
|   CDC|         48|
|   PSG|         62|
|   ADK|          9|
|   KOA|       1316|
|   OTZ|         92|
|   BQN|        124|
|   STX|         40|
|   PMD|         57|
|   PSE|        110|
|   SCC|         62|
|   SLE|         54|
+------+-----------+



In [36]:
airtraffic. \
    join(
        airportCodes,
        on= airtraffic['Origin']==airportCodes['IATA'],
        how='left'
    ). \
    filter("IATA IS NULL"). \
    groupBy(airtraffic['Origin']). \
    agg(count(lit(1)).alias('FlightCount')). \
    orderBy(col('FlightCount').desc()). \
    show()

+------+-----------+
|Origin|FlightCount|
+------+-----------+
|   SJU|       1997|
|   KOA|       1316|
|   ITO|        786|
|   HDN|        429|
|   STT|        311|
|   BQN|        124|
|   PSE|        110|
|   OTZ|         92|
|   CEC|         88|
|   SCC|         62|
|   PSG|         62|
|   PMD|         57|
|   SLE|         54|
|   CDC|         48|
|   STX|         40|
|   ADK|          9|
+------+-----------+



In [37]:
## 211 Solution Problem 5 Get count of Flights per Airport without master data

In [38]:
from pyspark.sql import SparkSession
import getpass

username = getpass.getuser()

spark = SparkSession. \
        builder. \
        config('spark.ui.port','0'). \
        config('spark.sql.warehouse.dir',f'/user/{username}/warehouse'). \
        config('spark.shuffle.io.connectionTimeout','6000'). \
        config('spark.driver.memory','6g'). \
        config('spark.executor.memory','6g'). \
        config('spark.dynamicAllocation.minExecutors', '4'). \
        enableHiveSupport(). \
        appName(f'{username} | Section 17 Joining Data Sets'). \
        master('yarn'). \
        getOrCreate()

In [39]:
spark.conf.set("spark.sql.shuffle.partitions", "2")

In [40]:
airtrafficPath = "/public/airlines_all/airlines-part/flightmonth=200801"

In [41]:
airtraffic = spark.read.parquet(airtrafficPath)

In [42]:
airtraffic. \
    select(
        "Year", "Month", "DayOfMonth",
        "Origin","Dest", "CRSDepTime"
    ). \
    show(5)

+----+-----+----------+------+----+----------+
|Year|Month|DayOfMonth|Origin|Dest|CRSDepTime|
+----+-----+----------+------+----+----------+
|2008|    1|        16|   BGR| CVG|      1735|
|2008|    1|        17|   SYR| CVG|      1701|
|2008|    1|        17|   SAV| BOS|      1225|
|2008|    1|        17|   CVG| GRR|      1530|
|2008|    1|        17|   STL| CVG|      1205|
+----+-----+----------+------+----+----------+
only showing top 5 rows



In [43]:
airportCodesPath = "/public/airlines_all/airport-codes"

In [44]:
def getValidAirportCodes(airportCodesPath):
    airportCodes = spark. \
        read. \
        option("sep","\t"). \
        option("header",True). \
        option("inferSchema", True). \
        csv(airportCodesPath). \
        filter("!(State='Hawaii' AND IATA='Big')")
    return airportCodes


In [45]:
airportCodes = getValidAirportCodes(airportCodesPath)

In [46]:
airportCodes.count()

524

In [47]:
airtraffic. \
    join(
        airportCodes,
        on= airtraffic['Origin']==airportCodes['IATA'],
        how='left'
    ). \
    filter("IATA IS NULL"). \
    select(airtraffic['Year'], airtraffic['Month'],airtraffic['DayOfMonth'],
           airtraffic['Origin'], airtraffic['Dest'], airtraffic['CRSDepTime'],
           airportCodes['*']
    ). \
    show()

+----+-----+----------+------+----+----------+----+-----+-------+----+
|Year|Month|DayOfMonth|Origin|Dest|CRSDepTime|City|State|Country|IATA|
+----+-----+----------+------+----+----------+----+-----+-------+----+
|2008|    1|         8|   HDN| DEN|      1403|null| null|   null|null|
|2008|    1|        26|   HDN| DEN|      1533|null| null|   null|null|
|2008|    1|        26|   SJU| CLT|      1520|null| null|   null|null|
|2008|    1|        19|   SJU| ATL|       945|null| null|   null|null|
|2008|    1|        31|   ITO| HNL|      1010|null| null|   null|null|
|2008|    1|        26|   ITO| HNL|      1725|null| null|   null|null|
|2008|    1|        24|   KOA| LAX|      2355|null| null|   null|null|
|2008|    1|        22|   SJU| EWR|      1845|null| null|   null|null|
|2008|    1|        21|   SJU| MIA|      1815|null| null|   null|null|
|2008|    1|        26|   STT| MIA|       845|null| null|   null|null|
|2008|    1|        11|   SJU| MIA|      1235|null| null|   null|null|
|2008|

In [50]:
airtraffic. \
    join(
        airportCodes,
        on= airtraffic['Origin']==airportCodes['IATA'],
        how='left'
    ). \
    filter("IATA IS NULL"). \
    count()

5585

In [51]:
## 212 Solution Problem 7 - Get Daily Revenue using Spark DF APIs

In [52]:
from pyspark.sql import SparkSession
import getpass

username = getpass.getuser()

spark = SparkSession. \
        builder. \
        config('spark.ui.port','0'). \
        config('spark.sql.warehouse.dir',f'/user/{username}/warehouse'). \
        config('spark.shuffle.io.connectionTimeout','6000'). \
        config('spark.driver.memory','6g'). \
        config('spark.executor.memory','6g'). \
        config('spark.dynamicAllocation.minExecutors', '4'). \
        enableHiveSupport(). \
        appName(f'{username} | Section 17 Joining Data Sets'). \
        master('yarn'). \
        getOrCreate()

In [53]:
spark.conf.set("spark.sql.shuffle.partitions", "2")

In [54]:
orders = spark.read.json("/public/retail_db_json/orders")

In [55]:
order_items = spark.read.json("/public/retail_db_json/order_items")

In [56]:
orders.printSchema()

root
 |-- order_customer_id: long (nullable = true)
 |-- order_date: string (nullable = true)
 |-- order_id: long (nullable = true)
 |-- order_status: string (nullable = true)



In [57]:
order_items.printSchema()

root
 |-- order_item_id: long (nullable = true)
 |-- order_item_order_id: long (nullable = true)
 |-- order_item_product_id: long (nullable = true)
 |-- order_item_product_price: double (nullable = true)
 |-- order_item_quantity: long (nullable = true)
 |-- order_item_subtotal: double (nullable = true)



In [58]:
orders.count()

68883

In [59]:
orders_filtered = orders.filter("order_status IN ('COMPLETE','CLOSED')")

In [62]:
orders_filtered.count()

30455

In [63]:
order_items.count()

172198

In [72]:
from pyspark.sql.functions import sum, round

In [76]:
orders_filtered. \
    join(
        order_items,
        on= orders_filtered.order_id==order_items.order_item_order_id,
        how ='inner'
    ). \
    groupBy(orders_filtered["order_date"]). \
    agg(
        round(sum(order_items["order_item_subtotal"]),2).alias("revenue")
    ). \
    orderBy(orders_filtered["order_date"]). \
    show()

+--------------------+--------+
|          order_date| revenue|
+--------------------+--------+
|2013-07-25 00:00:...|31547.23|
|2013-07-26 00:00:...|54713.23|
|2013-07-27 00:00:...|48411.48|
|2013-07-28 00:00:...|35672.03|
|2013-07-29 00:00:...| 54579.7|
|2013-07-30 00:00:...|49329.29|
|2013-07-31 00:00:...|59212.49|
|2013-08-01 00:00:...|49160.08|
|2013-08-02 00:00:...|50688.58|
|2013-08-03 00:00:...|43416.74|
|2013-08-04 00:00:...|35093.01|
|2013-08-05 00:00:...|34025.27|
|2013-08-06 00:00:...|57843.89|
|2013-08-07 00:00:...|45525.59|
|2013-08-08 00:00:...|33549.47|
|2013-08-09 00:00:...|29225.16|
|2013-08-10 00:00:...|46435.04|
|2013-08-11 00:00:...| 31155.5|
|2013-08-12 00:00:...|59014.74|
|2013-08-13 00:00:...|17956.88|
+--------------------+--------+
only showing top 20 rows



In [77]:
orders_filtered. \
    join(
        order_items,
        on= orders_filtered.order_id==order_items.order_item_order_id,
        how ='inner'
    ). \
    groupBy(orders_filtered["order_date"]). \
    agg(
        round(sum(order_items["order_item_subtotal"]),2).alias("revenue")
    ). \
    orderBy(orders_filtered["order_date"]). \
    count()

364

In [78]:
## 213 - Solution Problem 8 - Get Daily Revenue Rolled up till Yearly using Spari DF APIs

In [1]:
from pyspark.sql import SparkSession
import getpass

username = getpass.getuser()

spark = SparkSession. \
        builder. \
        config('spark.ui.port','0'). \
        config('spark.sql.warehouse.dir',f'/user/{username}/warehouse'). \
        config('spark.shuffle.io.connectionTimeout','6000'). \
        config('spark.driver.memory','6g'). \
        config('spark.executor.memory','6g'). \
        config('spark.dynamicAllocation.minExecutors', '4'). \
        enableHiveSupport(). \
        appName(f'{username} | Section 17 Joining Data Sets'). \
        master('yarn'). \
        getOrCreate()

In [2]:
spark.conf.set("spark.sql.shuffle.partitions", "2")

In [3]:
orders = spark.read.json("/public/retail_db_json/orders")

In [4]:
order_items = spark.read.json("/public/retail_db_json/order_items")

In [5]:
orders.printSchema()

root
 |-- order_customer_id: long (nullable = true)
 |-- order_date: string (nullable = true)
 |-- order_id: long (nullable = true)
 |-- order_status: string (nullable = true)



In [6]:
order_items.printSchema()

root
 |-- order_item_id: long (nullable = true)
 |-- order_item_order_id: long (nullable = true)
 |-- order_item_product_id: long (nullable = true)
 |-- order_item_product_price: double (nullable = true)
 |-- order_item_quantity: long (nullable = true)
 |-- order_item_subtotal: double (nullable = true)



In [8]:
orders.count()

68883

In [9]:
orders_filtered = orders.filter("order_status IN ('COMPLETE','CLOSED')")

In [10]:
orders_filtered.count()

30455

In [11]:
order_items.count()

172198

In [12]:
from pyspark.sql.functions import sum, round

In [13]:
orders_join = orders_filtered. \
    join(order_items, orders_filtered['order_id']==order_items['order_item_order_id'],'inner')

In [14]:
from pyspark.sql.functions import col, countDistinct, sum, round, date_format, year

In [15]:
revenue = orders_join. \
    rollup(
        year('order_date').alias('order_year'),
        date_format(col('order_date'),'yyyyMM').alias('order_month'),
        'order_date'
    ). \
    agg(
        round(sum(order_items['order_item_subtotal']),2).alias('revenue')
    ). \
    show()

+----------+-----------+--------------------+--------+
|order_year|order_month|          order_date| revenue|
+----------+-----------+--------------------+--------+
|      2013|     201307|2013-07-27 00:00:...|48411.48|
|      2013|     201307|2013-07-28 00:00:...|35672.03|
|      2013|     201307|2013-07-29 00:00:...| 54579.7|
|      2013|     201307|2013-07-30 00:00:...|49329.29|
|      2013|     201308|2013-08-01 00:00:...|49160.08|
|      2013|     201308|2013-08-02 00:00:...|50688.58|
|      2013|     201308|2013-08-04 00:00:...|35093.01|
|      2013|     201308|2013-08-06 00:00:...|57843.89|
|      2013|     201308|2013-08-09 00:00:...|29225.16|
|      2013|     201308|2013-08-11 00:00:...| 31155.5|
|      2013|     201308|2013-08-15 00:00:...|49566.68|
|      2013|     201308|2013-08-17 00:00:...|63226.83|
|      2013|     201308|2013-08-22 00:00:...|38190.02|
|      2013|     201308|2013-08-24 00:00:...|52650.15|
|      2013|     201308|2013-08-26 00:00:...| 38548.4|
|      201

In [18]:
revenue = orders_join. \
    rollup(
        year('order_date').alias('order_year'),
        date_format(col('order_date'),'yyyyMM').alias('order_month'),
        'order_date'
    ). \
    agg(
        countDistinct('order_id').alias('order_count'),
        round(sum(order_items['order_item_subtotal']),2).alias('revenue')
    ). \
    orderBy(col('order_year').desc(), col('order_month').desc(),col('order_date').desc()). \
    show()

+----------+-----------+--------------------+-----------+--------+
|order_year|order_month|          order_date|order_count| revenue|
+----------+-----------+--------------------+-----------+--------+
|      2014|     201407|2014-07-24 00:00:...|         81|50885.19|
|      2014|     201407|2014-07-23 00:00:...|         58|38795.23|
|      2014|     201407|2014-07-22 00:00:...|         60|36717.24|
|      2014|     201407|2014-07-21 00:00:...|         83| 51427.7|
|      2014|     201407|2014-07-20 00:00:...|        105|60047.45|
|      2014|     201407|2014-07-19 00:00:...|         71|38420.99|
|      2014|     201407|2014-07-18 00:00:...|         72| 43856.6|
|      2014|     201407|2014-07-17 00:00:...|         64|36384.77|
|      2014|     201407|2014-07-16 00:00:...|         67|43011.92|
|      2014|     201407|2014-07-15 00:00:...|        102|53480.23|
|      2014|     201407|2014-07-14 00:00:...|         58|29937.52|
|      2014|     201407|2014-07-13 00:00:...|         74|40410

In [19]:
revenue = orders_join. \
    rollup(
        year('order_date').alias('order_year'),
        date_format(col('order_date'),'yyyyMM').alias('order_month'),
        'order_date'
    ). \
    agg(
        countDistinct('order_id').alias('order_count'),
        round(sum(order_items['order_item_subtotal']),2).alias('revenue')
    ). \
    orderBy('order_year','order_month','order_date'). \
    show()

+----------+-----------+--------------------+-----------+-------------+
|order_year|order_month|          order_date|order_count|      revenue|
+----------+-----------+--------------------+-----------+-------------+
|      null|       null|                null|      25266|1.501298248E7|
|      2013|       null|                null|      11266|    6686892.0|
|      2013|     201307|                null|        564|    333465.45|
|      2013|     201307|2013-07-25 00:00:...|         51|     31547.23|
|      2013|     201307|2013-07-26 00:00:...|         99|     54713.23|
|      2013|     201307|2013-07-27 00:00:...|         80|     48411.48|
|      2013|     201307|2013-07-28 00:00:...|         67|     35672.03|
|      2013|     201307|2013-07-29 00:00:...|         90|      54579.7|
|      2013|     201307|2013-07-30 00:00:...|         90|     49329.29|
|      2013|     201307|2013-07-31 00:00:...|         87|     59212.49|
|      2013|     201308|                null|       2073|    122

In [21]:
revenue = orders_join. \
    rollup(
        year('order_date').alias('order_year'),
        date_format(col('order_date'),'yyyyMM').alias('order_month'),
        'order_date'
    ). \
    agg(
        countDistinct('order_id').alias('order_count'),
        round(sum(order_items['order_item_subtotal']),2).alias('revenue')
    ). \
    orderBy(
        col('order_year').asc_nulls_last(),
        col('order_month').asc_nulls_last(),
        col('order_date').asc_nulls_last()). \
    show()

+----------+-----------+--------------------+-----------+---------+
|order_year|order_month|          order_date|order_count|  revenue|
+----------+-----------+--------------------+-----------+---------+
|      2013|     201307|2013-07-25 00:00:...|         51| 31547.23|
|      2013|     201307|2013-07-26 00:00:...|         99| 54713.23|
|      2013|     201307|2013-07-27 00:00:...|         80| 48411.48|
|      2013|     201307|2013-07-28 00:00:...|         67| 35672.03|
|      2013|     201307|2013-07-29 00:00:...|         90|  54579.7|
|      2013|     201307|2013-07-30 00:00:...|         90| 49329.29|
|      2013|     201307|2013-07-31 00:00:...|         87| 59212.49|
|      2013|     201307|                null|        564|333465.45|
|      2013|     201308|2013-08-01 00:00:...|         82| 49160.08|
|      2013|     201308|2013-08-02 00:00:...|         90| 50688.58|
|      2013|     201308|2013-08-03 00:00:...|         72| 43416.74|
|      2013|     201308|2013-08-04 00:00:...|   