### list the files in dbfs

In [0]:
%fs ls /exam_prep/retail_db_json

path,name,size,modificationTime
dbfs:/exam_prep/retail_db_json/categories/,categories/,0,1720271868000
dbfs:/exam_prep/retail_db_json/create_db_tables_pg.sql,create_db_tables_pg.sql,1748,1720271860000
dbfs:/exam_prep/retail_db_json/customers/,customers/,0,1720271858000
dbfs:/exam_prep/retail_db_json/departments/,departments/,0,1720271860000
dbfs:/exam_prep/retail_db_json/order_items/,order_items/,0,1720271860000
dbfs:/exam_prep/retail_db_json/orders/,orders/,0,1720271866000
dbfs:/exam_prep/retail_db_json/products/,products/,0,1720271859000


### Load some files from DBFS as DataFrames

In [0]:
# Read the order_items JSON dataset from DBFS; no schema is supplied, so Spark infers it.
file_path_order_items = '/exam_prep/retail_db_json/order_items'
order_items = spark.read.format('json').load(file_path_order_items)

# Preview the first five rows.
order_items.show(n=5)

+-------------+-------------------+---------------------+------------------------+-------------------+-------------------+
|order_item_id|order_item_order_id|order_item_product_id|order_item_product_price|order_item_quantity|order_item_subtotal|
+-------------+-------------------+---------------------+------------------------+-------------------+-------------------+
|            1|                  1|                  957|                  299.98|                  1|             299.98|
|            2|                  2|                 1073|                  199.99|                  1|             199.99|
|            3|                  2|                  502|                    50.0|                  5|              250.0|
|            4|                  2|                  403|                  129.99|                  1|             129.99|
|            5|                  4|                  897|                   24.99|                  2|              49.98|
+-------------+-

In [0]:
# Read the orders JSON dataset from DBFS; no schema is supplied, so Spark infers it.
file_path_orders = '/exam_prep/retail_db_json/orders'
orders = spark.read.format('json').load(file_path_orders)

# Preview five rows without truncating long cell values (e.g. the full timestamp string).
orders.show(n=5, truncate=False)

+-----------------+---------------------+--------+---------------+
|order_customer_id|order_date           |order_id|order_status   |
+-----------------+---------------------+--------+---------------+
|11599            |2013-07-25 00:00:00.0|1       |CLOSED         |
|256              |2013-07-25 00:00:00.0|2       |PENDING_PAYMENT|
|12111            |2013-07-25 00:00:00.0|3       |COMPLETE       |
|8827             |2013-07-25 00:00:00.0|4       |CLOSED         |
|11318            |2013-07-25 00:00:00.0|5       |COMPLETE       |
+-----------------+---------------------+--------+---------------+
only showing top 5 rows



### import some aggregation functions

In [0]:
from pyspark.sql.functions import count, sum, min, max, avg

In [0]:
# Number of orders per status; count("*") counts every row in each group.
(
    orders
    .groupBy("order_status")
    .agg(count("*").alias("row_count"))
    .show()
)

+---------------+---------+
|   order_status|row_count|
+---------------+---------+
|PENDING_PAYMENT|    15030|
|       COMPLETE|    22899|
|        ON_HOLD|     3798|
| PAYMENT_REVIEW|      729|
|     PROCESSING|     8275|
|         CLOSED|     7556|
|SUSPECTED_FRAUD|     1558|
|        PENDING|     7610|
|       CANCELED|     1428|
+---------------+---------+



### Filter order items where order_item_order_id = 4

In [0]:
# Show every line item that belongs to order 4 (where() is an alias of filter()).
order_items.where(order_items["order_item_order_id"] == 4).show()

+-------------+-------------------+---------------------+------------------------+-------------------+-------------------+
|order_item_id|order_item_order_id|order_item_product_id|order_item_product_price|order_item_quantity|order_item_subtotal|
+-------------+-------------------+---------------------+------------------------+-------------------+-------------------+
|            5|                  4|                  897|                   24.99|                  2|              49.98|
|            6|                  4|                  365|                   59.99|                  5|             299.95|
|            7|                  4|                  502|                    50.0|                  3|              150.0|
|            8|                  4|                 1014|                   49.98|                  4|             199.92|
+-------------+-------------------+---------------------+------------------------+-------------------+-------------------+



### Get total aggregates of columns without grouping

In [0]:
# Collapse all line items of order 4 into a single summary row.
# Aggregate functions used directly in select() (no groupBy) aggregate over the whole
# filtered DataFrame. Note: sum/count here are the pyspark.sql.functions versions
# imported earlier in the notebook, not the Python builtins.
(
    order_items
    .where(order_items["order_item_order_id"] == 4)
    .select(
        sum("order_item_subtotal").alias("sum_total"),
        count("order_item_id").alias("total_items"),
        sum("order_item_quantity").alias("total_quantity"),
    )
    .show()
)

+---------+-----------+--------------+
|sum_total|total_items|total_quantity|
+---------+-----------+--------------+
|   699.85|          4|            14|
+---------+-----------+--------------+



### getting count of rows in a dataframe

In [0]:
# DataFrame.count() is an action: it runs a job and returns the row count as a plain number.
orders.count()

68883

In [0]:
# Same row count, but computed as an aggregate expression so the result stays a DataFrame.
orders.agg(count("*")).show()

+--------+
|count(1)|
+--------+
|   68883|
+--------+



In [0]:
# Re-display the first five order_items rows as a reminder of the columns before grouping.
order_items.show(n=5)

+-------------+-------------------+---------------------+------------------------+-------------------+-------------------+
|order_item_id|order_item_order_id|order_item_product_id|order_item_product_price|order_item_quantity|order_item_subtotal|
+-------------+-------------------+---------------------+------------------------+-------------------+-------------------+
|            1|                  1|                  957|                  299.98|                  1|             299.98|
|            2|                  2|                 1073|                  199.99|                  1|             199.99|
|            3|                  2|                  502|                    50.0|                  5|              250.0|
|            4|                  2|                  403|                  129.99|                  1|             129.99|
|            5|                  4|                  897|                   24.99|                  2|              49.98|
+-------------+-

In [0]:
from pyspark.sql.functions import round

In [0]:
# Per-order totals and averages using explicit Column expressions with aliases.
# sum/avg/round are the pyspark.sql.functions versions imported earlier in the notebook.
(
    order_items
    .groupBy("order_item_order_id")
    .agg(
        sum("order_item_quantity").alias("total_quantity"),
        round(avg("order_item_quantity"), 2).alias("avg_quantity"),
        round(avg("order_item_product_price"), 2).alias("avg_price"),
    )
    .show(5)
)

+-------------------+--------------+------------+---------+
|order_item_order_id|total_quantity|avg_quantity|avg_price|
+-------------------+--------------+------------+---------+
|                 29|             9|         1.8|   181.99|
|                474|            13|         2.6|    74.99|
|                964|            11|        2.75|   124.99|
|               1677|            14|         2.8|    55.59|
|               1806|             8|        2.67|   169.99|
+-------------------+--------------+------------+---------+
only showing top 5 rows



In [0]:
# Same per-order aggregation, this time using the dict form of agg()
# ({column name: aggregate function name}).
#
# The dict form names its output columns "<func>(<column>)" and does NOT promise to
# return them in the dict's insertion order, so a positional rename with toDF(...)
# can silently attach the wrong label to a column. Rename each generated column by
# name instead, then pin the display order with an explicit select().
(
    order_items
    .groupBy("order_item_order_id")
    .agg({"order_item_quantity": "sum", "order_item_product_price": "avg"})
    .withColumnRenamed("order_item_order_id", "order_id")
    .withColumnRenamed("avg(order_item_product_price)", "avg_product_price")
    .withColumnRenamed("sum(order_item_quantity)", "total_quantity")
    # Round only after renaming so the short column name can be reused.
    .withColumn("avg_product_price", round("avg_product_price", 2))
    # If a generated name ever differs, this select fails loudly instead of mislabeling.
    .select("order_id", "avg_product_price", "total_quantity")
    .show(5)
)

+--------+-----------------+--------------+
|order_id|avg_product_price|total_quantity|
+--------+-----------------+--------------+
|      29|           181.99|             9|
|     474|            74.99|            13|
|     964|           124.99|            11|
|    1677|            55.59|            14|
|    1806|           169.99|             8|
+--------+-----------------+--------------+
only showing top 5 rows

