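1. `select`, `filter` / `where` -> narrow transformations (each output partition depends on one input partition; no shuffle)
2. `groupBy`, `orderBy` / `sort` -> wide transformations (data is shuffled across partitions)
3. `join` (broadcast join: the small side is copied to every executor) -> narrow transformation (the large side is not shuffled)
4. `join` (shuffle / reduce-side join, e.g. sort-merge) -> wide transformation (both sides are shuffled)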

In [0]:
# Sample user records, one dict per row (the row-as-dict shape used to build
# small DataFrames by hand). Bound to a name so later cells can reuse it; the
# trailing bare expression keeps the cell's displayed output unchanged.
users = [
    {'user_id': 1, 'user_fname': 'Michael', 'user_lname': 'Scott'},
    {'user_id': 2, 'user_fname': 'Dwight', 'user_lname': 'Schrute'},
    {'user_id': 3, 'user_fname': 'Jim', 'user_lname': 'Halpert'},
]
users

[{'user_id': 1, 'user_fname': 'Michael', 'user_lname': 'Scott'},
 {'user_id': 2, 'user_fname': 'Dwight', 'user_lname': 'Schrute'},
 {'user_id': 3, 'user_fname': 'Jim', 'user_lname': 'Halpert'}]

In [0]:
# Load the orders CSV. No header option is passed, so every line is parsed as
# data and the column names/types come from the DDL schema string.
orders_df = spark.read.csv(
    path='dbfs:/public/retail_db/orders',
    schema='order_id INT, order_date DATE, order_customer_id INT, order_status STRING',
)

In [0]:
# Note: row-level transformations (e.g. keeping the last 4 digits of a value,
# other string manipulation) correspond to SELECT-style projections.
display(orders_df)

order_id,order_date,order_customer_id,order_status
1,2013-07-25,11599,CLOSED
2,2013-07-25,256,PENDING_PAYMENT
3,2013-07-25,12111,COMPLETE
4,2013-07-25,8827,CLOSED
5,2013-07-25,11318,COMPLETE
6,2013-07-25,7130,COMPLETE
7,2013-07-25,4530,COMPLETE
8,2013-07-25,2911,PROCESSING
9,2013-07-25,5657,PENDING_PAYMENT
10,2013-07-25,5648,PENDING_PAYMENT


In [0]:
from pyspark.sql.functions import date_format, cast, count, col

In [0]:
# Show the schema after deriving order_month (yyyyMM as an int).
# printSchema() prints the schema tree itself and returns None, so it is called
# directly rather than passing its None result to display().
(
    orders_df
    .withColumn('order_month', date_format('order_date', 'yyyyMM').cast('int'))
    .printSchema()
)

root
 |-- order_id: integer (nullable = true)
 |-- order_date: date (nullable = true)
 |-- order_customer_id: integer (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_month: integer (nullable = true)



In [0]:
# Every distinct order_status value present in orders_df.
display(
    orders_df
    .select('order_status')
    .dropDuplicates()
)

order_status
PENDING_PAYMENT
COMPLETE
ON_HOLD
PAYMENT_REVIEW
PROCESSING
CLOSED
SUSPECTED_FRAUD
PENDING
CANCELED


In [0]:
# Number of orders whose status is COMPLETE.
display(
    orders_df
    .filter(col('order_status') == 'COMPLETE')
    .count()
)

22899

In [0]:
# Number of orders placed in January 2014 whose status is COMPLETE or CLOSED.
# date_format() returns a string, so it is compared with the string '201401'
# rather than the integer 201401, avoiding an implicit string-to-int cast.
display(
    orders_df
    .filter(
        "date_format(order_date, 'yyyyMM') = '201401' "
        "AND order_status IN ('COMPLETE', 'CLOSED')"
    )
    .count()
)

2544

In [0]:
# Orders per status, most frequent status first.
display(
    orders_df
    .groupBy('order_status')
    .agg(count('order_id').alias('order_count'))
    .orderBy('order_count', ascending=False)
)

order_status,order_count
COMPLETE,22899
PENDING_PAYMENT,15030
PROCESSING,8275
PENDING,7610
CLOSED,7556
ON_HOLD,3798
SUSPECTED_FRAUD,1558
CANCELED,1428
PAYMENT_REVIEW,729


In [0]:
%fs ls dbfs:/public/retail_db/order_items

path,name,size,modificationTime
dbfs:/public/retail_db/order_items/part-00000,part-00000,5408880,1713343652000


In [0]:
# Load order_items. No header option is passed, so columns come from the DDL
# schema below.
# The monetary columns use DOUBLE instead of 32-bit FLOAT: FLOAT keeps only ~7
# significant digits, so values such as 299.98 are stored inexactly and the
# error grows when subtotals are summed per order. (A DECIMAL type would be
# exact, but needs the source data's scale to be confirmed first.)
order_items_df = spark.read.csv(
    'dbfs:/public/retail_db/order_items',
    schema='''
        order_item_id INT,
        order_item_order_id INT,
        order_item_product_id INT,
        order_item_quantity INT,
        order_item_subtotal DOUBLE,
        order_item_product_price DOUBLE
    '''
)

In [0]:
# Preview the order_items rows as parsed with the schema above.
display(order_items_df)

order_item_id,order_item_order_id,order_item_product_id,order_item_quantity,order_item_subtotal,order_item_product_price
1,1,957,1,299.98,299.98
2,2,1073,1,199.99,199.99
3,2,502,5,250.0,50.0
4,2,403,1,129.99,129.99
5,4,897,2,49.98,24.99
6,4,365,5,299.95,59.99
7,4,502,3,150.0,50.0
8,4,1014,4,199.92,49.98
9,5,957,1,299.98,299.98
10,5,365,5,299.95,59.99


In [0]:
from pyspark.sql.functions import sum, round

In [0]:
# Revenue per order: sum of the line subtotals, rounded to 2 decimals,
# listed in order-id order.
# NOTE: `sum` and `round` are the pyspark.sql.functions versions imported in an
# earlier cell; they shadow the Python builtins of the same name.
display(
    order_items_df
    .groupBy('order_item_order_id')
    .agg(round(sum('order_item_subtotal'), 2).alias('order_revenue'))
    .orderBy('order_item_order_id')
)

order_item_order_id,order_revenue
1,299.98
2,579.98
4,699.85
5,1129.86
7,579.92
8,729.84
9,599.96
10,651.92
11,919.79
12,1299.87


In [0]:
# Orders per month (order_month is yyyyMM as an int), busiest month first.
display(
    orders_df
    .withColumn('order_month', date_format('order_date', 'yyyyMM').cast('int'))
    .groupBy('order_month')
    .agg(count('order_id').alias('order_count'))
    .orderBy(col('order_count').desc())
)

order_month,order_count
201311,6381
201401,5908
201312,5892
201309,5841
201403,5778
201308,5680
201404,5657
201402,5635
201405,5467
201310,5335
