In [1]:
from pyspark.sql import SparkSession
import getpass
username = getpass.getuser()
spark = SparkSession. \
builder. \
config('spark.ui.port', '0'). \
config('spark.shuffle.useOldFetchProtocol', 'true'). \
config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
enableHiveSupport(). \
master('yarn'). \
getOrCreate()

In [2]:
orders_schema = "order_id long , order_date string, customer_id long,order_status string"

In [3]:
orders_df = spark.read \
.format("csv") \
.schema(orders_schema) \
.load("/public/trendytech/orders/orders_1gb.csv")

In [4]:
orders_df.show()

+--------+--------------------+-----------+---------------+
|order_id|          order_date|customer_id|   order_status|
+--------+--------------------+-----------+---------------+
|       1|2013-07-25 00:00:...|      11599|         CLOSED|
|       2|2013-07-25 00:00:...|        256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:...|      12111|       COMPLETE|
|       4|2013-07-25 00:00:...|       8827|         CLOSED|
|       5|2013-07-25 00:00:...|      11318|       COMPLETE|
|       6|2013-07-25 00:00:...|       7130|       COMPLETE|
|       7|2013-07-25 00:00:...|       4530|       COMPLETE|
|       8|2013-07-25 00:00:...|       2911|     PROCESSING|
|       9|2013-07-25 00:00:...|       5657|PENDING_PAYMENT|
|      10|2013-07-25 00:00:...|       5648|PENDING_PAYMENT|
|      11|2013-07-25 00:00:...|        918| PAYMENT_REVIEW|
|      12|2013-07-25 00:00:...|       1837|         CLOSED|
|      13|2013-07-25 00:00:...|       9149|PENDING_PAYMENT|
|      14|2013-07-25 00:00:...|       98

In [5]:
orders_df.createOrReplaceTempView("orders")

In [6]:
spark.sql("select * from order")

AnalysisException: Table or view not found: order; line 1 pos 14;
'Project [*]
+- 'UnresolvedRelation [order], [], false


In [7]:
spark.sql(""" select order_id, order_status from
(select order_id, customer_id, order_status from orders
where order_id < 500) where order_id < 200
""").show()

+--------+---------------+
|order_id|   order_status|
+--------+---------------+
|       1|         CLOSED|
|       2|PENDING_PAYMENT|
|       3|       COMPLETE|
|       4|         CLOSED|
|       5|       COMPLETE|
|       6|       COMPLETE|
|       7|       COMPLETE|
|       8|     PROCESSING|
|       9|PENDING_PAYMENT|
|      10|PENDING_PAYMENT|
|      11| PAYMENT_REVIEW|
|      12|         CLOSED|
|      13|PENDING_PAYMENT|
|      14|     PROCESSING|
|      15|       COMPLETE|
|      16|PENDING_PAYMENT|
|      17|       COMPLETE|
|      18|         CLOSED|
|      19|PENDING_PAYMENT|
|      20|     PROCESSING|
+--------+---------------+
only showing top 20 rows



In [8]:
spark.sql(""" select order_id, order_status from
(select order_id, customer_id, order_status from orders
where order_id < 500) where order_id < 200
""").explain(True)

== Parsed Logical Plan ==
'Project ['order_id, 'order_status]
+- 'Filter ('order_id < 200)
   +- 'SubqueryAlias __auto_generated_subquery_name
      +- 'Project ['order_id, 'customer_id, 'order_status]
         +- 'Filter ('order_id < 500)
            +- 'UnresolvedRelation [orders], [], false

== Analyzed Logical Plan ==
order_id: bigint, order_status: string
Project [order_id#0L, order_status#3]
+- Filter (order_id#0L < cast(200 as bigint))
   +- SubqueryAlias __auto_generated_subquery_name
      +- Project [order_id#0L, customer_id#2L, order_status#3]
         +- Filter (order_id#0L < cast(500 as bigint))
            +- SubqueryAlias orders
               +- Relation[order_id#0L,order_date#1,customer_id#2L,order_status#3] csv

== Optimized Logical Plan ==
Project [order_id#0L, order_status#3]
+- Filter ((isnotnull(order_id#0L) AND (order_id#0L < 500)) AND (order_id#0L < 200))
   +- Relation[order_id#0L,order_date#1,customer_id#2L,order_status#3] csv

== Physical Plan ==
*(1) Filter 

In [9]:
customer_schema = " customerid long , customer_fname string , customer_lname string , user_name string,password string , address string, city string, state string, pincode long "

In [10]:
customers_df = spark.read \
.format("csv") \
.schema(customer_schema) \
.load("/public/trendytech/retail_db/customers")

In [16]:
customers_df.createOrReplaceTempView("customers")

In [17]:
spark.sql("""select * from orders inner join customers 
on orders.customer_id == customers.customerid where 
order_status = 'CLOSED'""").show()

+--------+--------------------+-----------+------------+----------+--------------+--------------+---------+---------+--------------------+------------+-----+-------+
|order_id|          order_date|customer_id|order_status|customerid|customer_fname|customer_lname|user_name| password|             address|        city|state|pincode|
+--------+--------------------+-----------+------------+----------+--------------+--------------+---------+---------+--------------------+------------+-----+-------+
|       1|2013-07-25 00:00:...|      11599|      CLOSED|     11599|          Mary|        Malone|XXXXXXXXX|XXXXXXXXX|8708 Indian Horse...|     Hickory|   NC|  28601|
|       4|2013-07-25 00:00:...|       8827|      CLOSED|      8827|         Brian|        Wilson|XXXXXXXXX|XXXXXXXXX|   8396 High Corners| San Antonio|   TX|  78240|
|      12|2013-07-25 00:00:...|       1837|      CLOSED|      1837|          Mary|          Vega|XXXXXXXXX|XXXXXXXXX|  4312 Bright Corner|      Caguas|   PR|    725|
|   

In [18]:
spark.sql("""select * from orders inner join customers
on orders.customer_id == customers.customerid where
order_status = 'CLOSED'""").explain(True)

== Parsed Logical Plan ==
'Project [*]
+- 'Filter ('order_status = CLOSED)
   +- 'Join Inner, ('orders.customer_id = 'customers.customerid)
      :- 'UnresolvedRelation [orders], [], false
      +- 'UnresolvedRelation [customers], [], false

== Analyzed Logical Plan ==
order_id: bigint, order_date: string, customer_id: bigint, order_status: string, customerid: bigint, customer_fname: string, customer_lname: string, user_name: string, password: string, address: string, city: string, state: string, pincode: bigint
Project [order_id#0L, order_date#1, customer_id#2L, order_status#3, customerid#44L, customer_fname#45, customer_lname#46, user_name#47, password#48, address#49, city#50, state#51, pincode#52L]
+- Filter (order_status#3 = CLOSED)
   +- Join Inner, (customer_id#2L = customerid#44L)
      :- SubqueryAlias orders
      :  +- Relation[order_id#0L,order_date#1,customer_id#2L,order_status#3] csv
      +- SubqueryAlias customers
         +- Relation[customerid#44L,customer_fname#45,cus

In [19]:
spark.sql("""select customer_id, count(1) from
(select * from orders where customer_id in (1,2,3,4,5))
where customer_id in (1,2,3) group by customer_id
having customer_id = 1""").show()

+-----------+--------+
|customer_id|count(1)|
+-----------+--------+
|          1|     375|
+-----------+--------+



In [None]:
spark.sql("""select customer_id, count(1) from
(select * from orders where customer_id in (1,2,3,4,5))
where customer_id in (1,2,3) group by customer_id
having customer_id = 1""").explai