In [0]:
# Load the retail orders dataset from DBFS as CSV. The schema is given as a DDL
# string, so Spark does not infer column types; the `header` option is left at
# its default.
orders_schema = 'order_id INT, order_date DATE, order_customer_id INT, order_status STRING'
orders_df = spark.read.schema(orders_schema).csv('dbfs:/public/retail_db/orders')

In [0]:
# Load the order line items from DBFS as CSV with an explicit DDL schema.
# The monetary columns (subtotal, product price) are DOUBLE rather than FLOAT:
# a 32-bit float keeps only ~7 significant digits, so every subtotal carries a
# representation error. When many of them are summed (as the revenue
# aggregations in this notebook do), that error can accumulate enough to shift
# the cents after round(..., 2).
order_items_df = spark. \
    read. \
    csv(
        'dbfs:/public/retail_db/order_items',
        schema='''
            order_item_id INT, order_item_order_id INT, order_item_product_id INT,
            order_item_quantity INT, order_item_subtotal DOUBLE, order_item_product_price DOUBLE
        '''
    )

In [0]:
from pyspark.sql.functions import sum, round, col

In [0]:
# Daily revenue from finished orders:
#   1. keep orders whose status is COMPLETE or CLOSED,
#   2. inner-join them to their line items on the order id,
#   3. total the line-item subtotals per order_date, rounded to 2 decimals.
# `sum` and `round` are the pyspark.sql.functions versions imported earlier
# (they shadow the Python builtins of the same name).
daily_revenue_df = (
    orders_df
    .where("order_status IN ('COMPLETE', 'CLOSED')")
    .join(order_items_df, on=orders_df['order_id'] == order_items_df['order_item_order_id'])
    .groupBy('order_date')
    .agg(round(sum('order_item_subtotal'), 2).alias('revenue'))
)

In [0]:
# Show daily revenue in chronological order.
display(daily_revenue_df.sort('order_date'))

order_date,revenue
2013-07-25,31547.23
2013-07-26,54713.23
2013-07-27,48411.48
2013-07-28,35672.03
2013-07-29,54579.7
2013-07-30,49329.29
2013-07-31,59212.49
2013-08-01,49160.08
2013-08-02,50688.58
2013-08-03,43416.74


In [0]:
# Revenue per (order_date, product) for COMPLETE/CLOSED orders. The grouping
# columns are referenced through their parent DataFrames (orders_df and
# order_items_df) so it is explicit where each one comes from after the join.
daily_product_revenue_df = (
    orders_df
    .where("order_status IN ('COMPLETE', 'CLOSED')")
    .join(order_items_df, on=orders_df['order_id'] == order_items_df['order_item_order_id'])
    .groupBy(orders_df['order_date'], order_items_df['order_item_product_id'])
    .agg(round(sum('order_item_subtotal'), 2).alias('revenue'))
)

In [0]:
# Show per-product revenue day by day, highest revenue first within each day.
display(daily_product_revenue_df.sort('order_date', col('revenue').desc()))

order_date,order_item_product_id,revenue
2013-07-25,1004,5599.72
2013-07-25,191,5099.49
2013-07-25,957,4499.7
2013-07-25,365,3359.44
2013-07-25,1073,2999.85
2013-07-25,1014,2798.88
2013-07-25,403,1949.85
2013-07-25,502,1650.0
2013-07-25,627,1079.73
2013-07-25,226,599.99


In [0]:
# Number of (order_date, product) revenue rows; echoed as the cell output.
product_day_row_count = daily_product_revenue_df.count()
product_day_row_count

Out[8]: 9120

In [0]:
# Global ranks - orderBy only
# Ranks within each partition or group - partitionBy and orderBy
# df.select(rank().over(Window.orderBy(col('revenue').desc())))
# df.select(rank().over(Window.partitionBy('order_date').orderBy(col('revenue').desc())))

In [0]:
# Products sold on 2014-01-01, highest revenue first.
display(
    daily_product_revenue_df
    .where("order_date = '2014-01-01'")
    .sort('order_date', col('revenue').desc())
)

order_date,order_item_product_id,revenue
2014-01-01,1004,5599.72
2014-01-01,191,4399.56
2014-01-01,365,3839.36
2014-01-01,502,3300.0
2014-01-01,957,3299.78
2014-01-01,1073,2199.89
2014-01-01,1014,1999.2
2014-01-01,403,1819.86
2014-01-01,627,759.81
2014-01-01,724,500.0


In [0]:
from pyspark.sql.functions import dense_rank, col

In [0]:
from pyspark.sql.window import Window

In [0]:
# Dense-rank the products of 2014-01-01 by revenue. The window has no
# partitionBy, so the whole (already filtered) DataFrame forms one ranking group.
display(
    daily_product_revenue_df
    .where("order_date = '2014-01-01'")
    .withColumn('drnk', dense_rank().over(Window.orderBy(col('revenue').desc())))
    .sort('order_date', col('revenue').desc())
)

order_date,order_item_product_id,revenue,drnk
2014-01-01,1004,5599.72,1
2014-01-01,191,4399.56,2
2014-01-01,365,3839.36,3
2014-01-01,502,3300.0,4
2014-01-01,957,3299.78,5
2014-01-01,1073,2199.89,6
2014-01-01,1014,1999.2,7
2014-01-01,403,1819.86,8
2014-01-01,627,759.81,9
2014-01-01,724,500.0,10


In [0]:
# Same ranking as above, keeping only products whose dense rank is 5 or better.
# Because dense_rank does not skip positions, ties can yield more than 5 rows.
display(
    daily_product_revenue_df
    .where("order_date = '2014-01-01'")
    .withColumn('drnk', dense_rank().over(Window.orderBy(col('revenue').desc())))
    .where("drnk <= 5")
    .sort('order_date', col('revenue').desc())
)

order_date,order_item_product_id,revenue,drnk
2014-01-01,1004,5599.72,1
2014-01-01,191,4399.56,2
2014-01-01,365,3839.36,3
2014-01-01,502,3300.0,4
2014-01-01,957,3299.78,5


In [0]:
from pyspark.sql.functions import date_format, sum, round

In [0]:
# Top 5 products by revenue for January 2014.
# - Re-aggregate the daily product revenue to month level ('yyyyMM').
# - date_format returns a string, so the month is compared to the string
#   '201401' instead of an int literal, which would rely on an implicit cast.
# - The window is global (no partitionBy); that is fine here because only one
#   month survives the filter.
# - An explicit final sort (product id breaks revenue ties) makes the displayed
#   order independent of how Spark lays out partitions.
display(
    daily_product_revenue_df. \
        filter("date_format(order_date, 'yyyyMM') = '201401'"). \
        groupBy(date_format('order_date', 'yyyyMM').alias('order_month'), 'order_item_product_id'). \
        agg(round(sum('revenue'), 2).alias('revenue')). \
        withColumn('drnk', dense_rank().over(Window.orderBy(col('revenue').desc()))). \
        filter('drnk <= 5'). \
        orderBy(col('revenue').desc(), 'order_item_product_id')
)

order_month,order_item_product_id,revenue,drnk
201401,1004,250787.46,1
201401,365,151474.75,2
201401,957,148190.12,3
201401,191,132286.77,4
201401,502,114800.0,5


In [0]:
# Window for per-day rankings: each order_date is its own group, and rows
# within a day are ordered by revenue, highest first.
spec = (
    Window
    .partitionBy('order_date')
    .orderBy(col('revenue').desc())
)

In [0]:
# Echo the WindowSpec object. Its repr shows only the type and a memory
# address (not the partition/order columns), so this is interactive inspection only.
spec

Out[22]: <pyspark.sql.window.WindowSpec at 0x7f5b3aafc3a0>

In [0]:
# Dense-rank products by revenue within each day of January 2014.
# `spec` must be the per-order_date revenue window defined earlier (the name is
# rebound to a student-score window later in the notebook, so run in order).
# date_format returns a string, hence the quoted '201401' rather than an int
# literal that would rely on implicit casting.
display(
    daily_product_revenue_df. \
        filter("date_format(order_date, 'yyyyMM') = '201401'"). \
        withColumn('drnk', dense_rank().over(spec)). \
        orderBy('order_date', col('revenue').desc())
)

order_date,order_item_product_id,revenue,drnk
2014-01-01,1004,5599.72,1
2014-01-01,191,4399.56,2
2014-01-01,365,3839.36,3
2014-01-01,502,3300.0,4
2014-01-01,957,3299.78,5
2014-01-01,1073,2199.89,6
2014-01-01,1014,1999.2,7
2014-01-01,403,1819.86,8
2014-01-01,627,759.81,9
2014-01-01,724,500.0,10


In [0]:
# Top 3 products (by dense rank of revenue) for each day of January 2014.
# `spec` must be the per-order_date revenue window defined earlier (the name is
# rebound to a student-score window later in the notebook, so run in order).
# date_format returns a string, hence the quoted '201401' rather than an int
# literal that would rely on implicit casting.
display(
    daily_product_revenue_df. \
        filter("date_format(order_date, 'yyyyMM') = '201401'"). \
        withColumn('drnk', dense_rank().over(spec)). \
        filter('drnk <= 3'). \
        orderBy('order_date', col('revenue').desc())
)

order_date,order_item_product_id,revenue,drnk
2014-01-01,1004,5599.72,1
2014-01-01,191,4399.56,2
2014-01-01,365,3839.36,3
2014-01-02,1004,4799.76,1
2014-01-02,957,2999.8,2
2014-01-02,365,2939.51,3
2014-01-03,1004,11599.42,1
2014-01-03,365,6958.84,2
2014-01-03,191,5599.44,3
2014-01-04,1004,7599.62,1


In [0]:
# Top 3 products per day for 2014-01-01 through 2014-03-31, ranked with the
# per-order_date window `spec` (must be the revenue window defined above).
display(
    daily_product_revenue_df
    .where("order_date BETWEEN '2014-01-01' AND '2014-03-31'")
    .withColumn('drnk', dense_rank().over(spec))
    .where('drnk <= 3')
    .sort('order_date', col('revenue').desc())
)

order_date,order_item_product_id,revenue,drnk
2014-01-01,1004,5599.72,1
2014-01-01,191,4399.56,2
2014-01-01,365,3839.36,3
2014-01-02,1004,4799.76,1
2014-01-02,957,2999.8,2
2014-01-02,365,2939.51,3
2014-01-03,1004,11599.42,1
2014-01-03,365,6958.84,2
2014-01-03,191,5599.44,3
2014-01-04,1004,7599.62,1


In [0]:
%sql

-- Small demo table; the cells below use it to compare rank() with dense_rank().
CREATE TABLE IF NOT EXISTS student_scores (
    student_id    INT,
    student_score INT
);

In [0]:
%sql

-- Replace the table contents with 10 fixed scores; the data contains tied
-- scores (980, 960, 940) so rank and dense_rank produce different results.
INSERT OVERWRITE TABLE student_scores
VALUES
    (1, 980), (2, 960), (3, 960), (4, 990), (5, 920),
    (6, 960), (7, 980), (8, 960), (9, 940), (10, 940);

num_affected_rows,num_inserted_rows
10,10


In [0]:
# Show all student scores, highest first. Several students share a score, so
# student_id is added as a tiebreaker; without it the relative order of tied
# rows is not guaranteed and can differ between runs.
display(
    spark.read.table('student_scores').
        orderBy(col('student_score').desc(), col('student_id'))
)

student_id,student_score
4,990
1,980
7,980
2,960
3,960
6,960
8,960
9,940
10,940
5,920


In [0]:
# Global window (no partitionBy): every student is ranked against all others,
# highest score first. NOTE: this rebinds `spec`, which earlier cells used for
# the per-order_date revenue window.
score_desc = col('student_score').desc()
spec = Window.orderBy(score_desc)

In [0]:
# DataFrame over the student_scores table created by the SQL cells above.
student_scores = spark.table('student_scores')

In [0]:
from pyspark.sql.functions import rank, dense_rank

In [0]:
# Compare rank() and dense_rank() over the same score-descending window `spec`.
# rank() leaves gaps after ties (1, 2, 2, 4, ...); dense_rank() does not
# (1, 2, 2, 3, ...). student_id is used only as a display tiebreaker: it is not
# part of the window, so tied scores still share the same rank values.
display(
    student_scores. \
        withColumn('rnk', rank().over(spec)). \
        withColumn('drnk', dense_rank().over(spec)). \
        orderBy(col('student_score').desc(), col('student_id'))
)

student_id,student_score,rnk,drnk
4,990,1,1
1,980,2,2
7,980,2,2
2,960,4,3
3,960,4,3
6,960,4,3
8,960,4,3
9,940,8,4
10,940,8,4
5,920,10,5
