In [40]:
from pyspark.sql import SparkSession
from pyspark.sql.types import * 
from pyspark.sql.functions import *

spark = SparkSession.builder.appName('Spark_SQL').config('spark.sql.shuffle.partitions',2).getOrCreate()
#read the csv file in dataframe
OrderSchema = StructType([StructField("order_id", IntegerType()),StructField("order_date", StringType()), StructField("customer_id", IntegerType()),StructField("order_status", StringType())])
orderDF = spark.read.csv(path='/home/hduser/Documents/RddTransformations/Data/orders.csv', sep=",", schema=OrderSchema)

orderItemschema = StructType([StructField("order_item_id", IntegerType()),StructField("order_item_order_id", IntegerType()),StructField("order_item_product_id", IntegerType()),StructField("order_item_quantity", IntegerType()),StructField("order_item_subtotal", FloatType()), StructField("order_item_product_price", FloatType())])
orderItemsDF = spark.read.csv(path='/home/hduser/Documents/RddTransformations/Data/order_items.csv', sep=",", schema=orderItemschema)

In [41]:
orderDF.registerTempTable('orders')
orderItemsDF.registerTempTable('order_items')

#to Drop temp table
#spark.catalog.dropTempView('order')

In [42]:
#spark.sql whatever it executes it returns DataFrame
spark.sql('show tables').show()

+--------+-----------+-----------+
|database|  tableName|isTemporary|
+--------+-----------+-----------+
|        |order_items|       true|
|        |     orders|       true|
+--------+-----------+-----------+



In [56]:
spark.sql('select * from orders').show(5)

+--------+-------------------+-----------+---------------+
|order_id|         order_date|customer_id|   order_status|
+--------+-------------------+-----------+---------------+
|       1|2013-07-25 00:00:00|      11599|         CLOSED|
|       2|2013-07-25 00:00:00|        256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:00|      12111|       COMPLETE|
|       4|2013-07-25 00:00:00|       8827|         CLOSED|
|       5|2013-07-25 00:00:00|      11318|       COMPLETE|
+--------+-------------------+-----------+---------------+
only showing top 5 rows



In [44]:
#get the order_items whose order_status is complete or closed
#1. using in operator
 spark.sql('select * from order_items where order_item_order_id in \
 (select order_id from orders where order_status in ("COMPLETE","CLOSED"))').show(5)
#2 Using Join 
#spark.sql('select * from order_items join orders on \
#order_items.order_item_order_id = orders.order_id where orders.order_status in ("COMPLETE","CLOSED")').show()

#3 Using ASCII standard sql code (suggested)
#spark.sql('select * from order_items, orders where order_items.order_item_order_id = orders.order_id \
#and orders.order_status in ("COMPLETE","CLOSED")').show()

+-------------+-------------------+---------------------+-------------------+-------------------+------------------------+
|order_item_id|order_item_order_id|order_item_product_id|order_item_quantity|order_item_subtotal|order_item_product_price|
+-------------+-------------------+---------------------+-------------------+-------------------+------------------------+
|            1|                  1|                  957|                  1|             299.98|                  299.98|
|            5|                  4|                  897|                  2|              49.98|                   24.99|
|            6|                  4|                  365|                  5|             299.95|                   59.99|
|            7|                  4|                  502|                  3|              150.0|                    50.0|
|            8|                  4|                 1014|                  4|             199.92|                   49.98|
+-------------+-

In [45]:
#show all the orders whose order_id  is not in order_items
#spark.sql('select * from orders where order_id not in (select order_item_order_id from order_items)').count()
#11452
#using Join
spark.sql('select * from orders left outer join order_items on\
    orders.order_id = order_items.order_item_order_id where order_items.order_item_order_id is null').count()
#11452

11452

In [53]:
#find the revenue per day per product item
spark.sql('select order_date, order_item_product_id,sum(order_item_subtotal) from orders join order_items on \
          orders.order_id = order_items.order_item_order_id group by order_date,order_item_product_id').show(5)

+-------------------+---------------------+------------------------+
|         order_date|order_item_product_id|sum(order_item_subtotal)|
+-------------------+---------------------+------------------------+
|2013-07-25 00:00:00|                 1014|       6397.439922332764|
|2013-07-25 00:00:00|                  926|       79.94999694824219|
|2013-07-25 00:00:00|                  191|        8499.15005493164|
|2013-07-25 00:00:00|                  134|                   100.0|
|2013-07-25 00:00:00|                  276|       255.9199981689453|
+-------------------+---------------------+------------------------+
only showing top 5 rows



In [47]:
#spark.sql('select order_date, order_item_product_id,sum(order_item_subtotal) from orders join order_items on \
#          orders.order_id = order_items.order_item_order_id group by order_date,order_item_product_id')\
#          .write.csv('/home/hduser/Documents/RddTransformations/sparksql/')

In [54]:
#using group by functions. We can have those columns in select by clause which we are doing group by.
spark.sql('select order_item_order_id, sum(order_item_subtotal) from order_items\
           group by order_item_order_id').show(5)

+-------------------+------------------------+
|order_item_order_id|sum(order_item_subtotal)|
+-------------------+------------------------+
|                  2|       579.9800109863281|
|                  4|       699.8500099182129|
|                  5|      1129.8600387573242|
|                 10|        651.920015335083|
|                 12|      1299.8700256347656|
+-------------------+------------------------+
only showing top 5 rows



In [64]:
#if you need more columns you need to self join and in this situation partition by comes
spark.sql('select ot.order_item_order_id,ot.order_item_product_id ,ot.order_item_subtotal,o.revenue \
           from order_items ot join \
          (select order_item_order_id, sum(order_item_subtotal)as revenue \
          from order_items group by order_item_order_id)o\
          on ot.order_item_order_id = o.order_item_order_id').show(5)

+-------------------+---------------------+-------------------+-----------------+
|order_item_order_id|order_item_product_id|order_item_subtotal|          revenue|
+-------------------+---------------------+-------------------+-----------------+
|                  1|                  957|             299.98|299.9800109863281|
|                  2|                 1073|             199.99|579.9800109863281|
|                  2|                  502|              250.0|579.9800109863281|
|                  2|                  403|             129.99|579.9800109863281|
|                  4|                  897|              49.98|699.8500099182129|
+-------------------+---------------------+-------------------+-----------------+
only showing top 5 rows



In [65]:
#using partition By we can get multiple columns value as well. Not needed to join and exeution is also fast
#get the total revenue as per product id.
spark.sql('select order_item_order_id,order_item_product_id ,order_item_subtotal ,\
sum(order_item_subtotal) over (partition by order_item_order_id)as revenue from order_items').show(5)

+-------------------+---------------------+-------------------+-----------------+
|order_item_order_id|order_item_product_id|order_item_subtotal|          revenue|
+-------------------+---------------------+-------------------+-----------------+
|                  2|                 1073|             199.99|579.9800109863281|
|                  2|                  502|              250.0|579.9800109863281|
|                  2|                  403|             129.99|579.9800109863281|
|                  4|                  897|              49.98|699.8500099182129|
|                  4|                  365|             299.95|699.8500099182129|
+-------------------+---------------------+-------------------+-----------------+
only showing top 5 rows



In [76]:
#get the % of each product id
spark.sql('select order_item_order_id,order_item_product_id ,order_item_subtotal ,\
sum(order_item_subtotal) over (partition by order_item_order_id)as revenue,\
round(order_item_subtotal/(sum(order_item_subtotal) over (partition by order_item_order_id))* 100 ,2)\
as Percentage from order_items').show(5)

+-------------------+---------------------+-------------------+-----------------+----------+
|order_item_order_id|order_item_product_id|order_item_subtotal|          revenue|Percentage|
+-------------------+---------------------+-------------------+-----------------+----------+
|                  2|                 1073|             199.99|579.9800109863281|     34.48|
|                  2|                  502|              250.0|579.9800109863281|      43.1|
|                  2|                  403|             129.99|579.9800109863281|     22.41|
|                  4|                  897|              49.98|699.8500099182129|      7.14|
|                  4|                  365|             299.95|699.8500099182129|     42.86|
+-------------------+---------------------+-------------------+-----------------+----------+
only showing top 5 rows



In [83]:
#Rank the records based on order_item_subtotal of each order_id.
#these analytical functions can be used in select clause.
#execution of sql happens like
#from --> where and/or join --> group by --> having --> select --> order by
spark.sql('select order_item_order_id,\
order_item_product_id,\
order_item_subtotal,\
sum(order_item_subtotal) over(partition by order_item_order_id) as revenue,\
round(order_item_subtotal/sum(order_item_subtotal) over(partition by order_item_order_id), 2)as Percentage, \
rank() over (partition by order_item_order_id order by order_item_subtotal) as rank \
from order_items').show(7)

+-------------------+---------------------+-------------------+-----------------+----------+----+
|order_item_order_id|order_item_product_id|order_item_subtotal|          revenue|Percentage|rank|
+-------------------+---------------------+-------------------+-----------------+----------+----+
|                  2|                  403|             129.99|579.9800109863281|      0.22|   1|
|                  2|                 1073|             199.99|579.9800109863281|      0.34|   2|
|                  2|                  502|              250.0|579.9800109863281|      0.43|   3|
|                  4|                  897|              49.98|699.8500099182129|      0.07|   1|
|                  4|                  502|              150.0|699.8500099182129|      0.21|   2|
|                  4|                 1014|             199.92|699.8500099182129|      0.29|   3|
|                  4|                  365|             299.95|699.8500099182129|      0.43|   4|
+-------------------

In [105]:
#get top 2 items of each order_id
spark.sql('select  *from (select \
order_item_order_id, \
order_item_subtotal, \
round(sum(order_item_subtotal) over (partition by order_item_order_id), 2) as revenue, \
rank() over(partition by order_item_order_id order by order_item_subtotal) as ranking \
from order_items ) where ranking < 3').show()

+-------------------+-------------------+-------+-------+
|order_item_order_id|order_item_subtotal|revenue|ranking|
+-------------------+-------------------+-------+-------+
|                  2|             129.99| 579.98|      1|
|                  2|             199.99| 579.98|      2|
|                  4|              49.98| 699.85|      1|
|                  4|              150.0| 699.85|      2|
|                  5|              99.96|1129.86|      1|
|                  5|             129.99|1129.86|      2|
|                 10|              21.99| 651.92|      1|
|                 10|              99.96| 651.92|      2|
|                 12|              100.0|1299.87|      1|
|                 12|             149.94|1299.87|      2|
|                 13|             127.96| 127.96|      1|
|                 14|               50.0| 549.94|      1|
|                 14|              99.96| 549.94|      2|
|                 18|             119.98| 449.96|      1|
|             

In [100]:
# we use lead() and lag() to get the next or previous item for the column specified in the functions.
spark.sql('select order_item_order_id,\
sum(order_item_subtotal) over(partition by order_item_order_id) as revenue,\
order_item_subtotal,\
lead(order_item_subtotal) over (partition by order_item_order_id order by order_item_subtotal desc) as leading \
from order_items').show(5)

+-------------------+-----------------+-------------------+-------+
|order_item_order_id|          revenue|order_item_subtotal|leading|
+-------------------+-----------------+-------------------+-------+
|                  2|579.9800109863281|              250.0| 199.99|
|                  2|579.9800109863281|             199.99| 129.99|
|                  2|579.9800109863281|             129.99|   null|
|                  4|699.8500099182129|             299.95| 199.92|
|                  4|699.8500099182129|             199.92|  150.0|
+-------------------+-----------------+-------------------+-------+
only showing top 5 rows



In [92]:
#using lagging 
spark.sql('select order_item_order_id,\
round(sum(order_item_subtotal) over(partition by order_item_order_id), 2) as revenue,\
order_item_subtotal,\
lag(order_item_subtotal) over (partition by order_item_order_id order by order_item_subtotal desc) as lagging \
from order_items').show()

+-------------------+-------+-------------------+-------+
|order_item_order_id|revenue|order_item_subtotal|lagging|
+-------------------+-------+-------------------+-------+
|                  2| 579.98|              250.0|   null|
|                  2| 579.98|             199.99|  250.0|
|                  2| 579.98|             129.99| 199.99|
|                  4| 699.85|             299.95|   null|
|                  4| 699.85|             199.92| 299.95|
|                  4| 699.85|              150.0| 199.92|
|                  4| 699.85|              49.98|  150.0|
|                  5|1129.86|             299.98|   null|
|                  5|1129.86|             299.98| 299.98|
|                  5|1129.86|             299.95| 299.98|
|                  5|1129.86|             129.99| 299.95|
|                  5|1129.86|              99.96| 129.99|
|                 10| 651.92|             199.99|   null|
|                 10| 651.92|             199.99| 199.99|
|             

In [110]:
spark.sql('select * from order_items').show(5)

+-------------+-------------------+---------------------+-------------------+-------------------+------------------------+
|order_item_id|order_item_order_id|order_item_product_id|order_item_quantity|order_item_subtotal|order_item_product_price|
+-------------+-------------------+---------------------+-------------------+-------------------+------------------------+
|            1|                  1|                  957|                  1|             299.98|                  299.98|
|            2|                  2|                 1073|                  1|             199.99|                  199.99|
|            3|                  2|                  502|                  5|              250.0|                    50.0|
|            4|                  2|                  403|                  1|             129.99|                  129.99|
|            5|                  4|                  897|                  2|              49.98|                   24.99|
+-------------+-

In [115]:
#get Daily top N product per day.
spark.sql('select orders.order_id,\
order_items.order_item_subtotal, \
orders.order_date, \
sum (order_items.order_item_subtotal) over (partition by orders.order_date) as Dailyrevenue,\
rank() over (partition by order_items.order_item_product_id order by order_items.order_item_subtotal) as rank \
from orders join order_items on orders.order_id = order_items.order_item_order_id').show()

+--------+-------------------+-------------------+------------------+----+
|order_id|order_item_subtotal|         order_date|      Dailyrevenue|rank|
+--------+-------------------+-------------------+------------------+----+
|    7507|              34.99|2013-09-09 00:00:00|124454.49238586426|   1|
|    9837|              34.99|2013-09-24 00:00:00|126898.56241798401|   1|
|    9866|              34.99|2013-09-24 00:00:00|126898.56241798401|   1|
|    9985|              34.99|2013-09-25 00:00:00|141775.64274215698|   1|
|   14074|              34.99|2013-10-21 00:00:00| 53567.91116333008|   1|
|   14120|              34.99|2013-10-21 00:00:00| 53567.91116333008|   1|
|   21855|              34.99|2013-12-06 00:00:00|127911.60246276855|   1|
|   22380|              34.99|2013-12-09 00:00:00| 73927.68151283264|   1|
|   32076|              34.99|2014-02-08 00:00:00| 82435.65173149109|   1|
|   33848|              34.99|2014-02-18 00:00:00|104887.89197158813|   1|
|   35570|              3