In [1]:
from pyspark.sql import SparkSession
import getpass
# Current OS user; used below to build a per-user warehouse directory.
username = getpass.getuser()

# Build (or reuse, via getOrCreate) a Hive-enabled Spark session with YARN as master.
# NOTE(review): a UI port of '0' presumably lets Spark pick any free port - confirm
# against the Spark configuration reference.
spark = (
    SparkSession.builder
    .config("spark.ui.port", "0")
    .config("spark.shuffle.useOldFetchProtocol", "true")
    .config("spark.sql.warehouse.dir", f"/user/{username}/warehouse")
    .enableHiveSupport()
    .master("yarn")
    .getOrCreate()
)

In [2]:
# Schema string passed to the CSV reader: one "name type" pair per column.
# The pieces concatenate to exactly the original single-line string
# (including its uneven spacing around the commas).
orders_schema = (
    "order_id long , "
    "order_date date, "
    "customer_id long,"
    "order_status string"
)

In [3]:
# Load the orders CSV using the explicit schema string (no reader options are set).
orders_df = (
    spark.read
    .format("csv")
    .schema(orders_schema)
    .load("/public/trendytech/orders/orders_1gb.csv")
)

In [5]:
# Request caching of the full orders DataFrame. The call's return value is the
# cell's last expression, so the notebook renders it as the preview below.
# NOTE(review): Spark caching is lazy - presumably only the partitions needed for
# this preview are materialized here; confirm in the Spark UI Storage tab.
orders_df.cache()

order_id,order_date,customer_id,order_status
1,2013-07-25,11599,CLOSED
2,2013-07-25,256,PENDING_PAYMENT
3,2013-07-25,12111,COMPLETE
4,2013-07-25,8827,CLOSED
5,2013-07-25,11318,COMPLETE
6,2013-07-25,7130,COMPLETE
7,2013-07-25,4530,COMPLETE
8,2013-07-25,2911,PROCESSING
9,2013-07-25,5657,PENDING_PAYMENT
10,2013-07-25,5648,PENDING_PAYMENT


In [6]:
# Keep only the orders of customer 10000000. The show() output recorded further
# down is an empty table, i.e. no row matched this id in that run.
filtered_results = orders_df.filter(
    "customer_id == 10000000"
)

In [7]:
# Print the filtered rows as a text table (header only when nothing matches).
filtered_results.show()

+--------+----------+-----------+------------+
|order_id|order_date|customer_id|order_status|
+--------+----------+-----------+------------+
+--------+----------+-----------+------------+



In [8]:
# NOTE(review): no visible cell calls cache()/persist() on filtered_results itself
# (only orders_df was cached), so this unpersist() presumably has nothing to
# release - confirm in the Spark UI Storage tab.
filtered_results.unpersist()

order_id,order_date,customer_id,order_status


In [9]:
# Drop the cache that was requested earlier with orders_df.cache(); the returned
# DataFrame is displayed as the cell output.
orders_df.unpersist()

order_id,order_date,customer_id,order_status
1,2013-07-25,11599,CLOSED
2,2013-07-25,256,PENDING_PAYMENT
3,2013-07-25,12111,COMPLETE
4,2013-07-25,8827,CLOSED
5,2013-07-25,11318,COMPLETE
6,2013-07-25,7130,COMPLETE
7,2013-07-25,4530,COMPLETE
8,2013-07-25,2911,PROCESSING
9,2013-07-25,5657,PENDING_PAYMENT
10,2013-07-25,5648,PENDING_PAYMENT


In [10]:
# Cache the (order_id, order_status) projection restricted to CLOSED orders.
# The cached DataFrame is not bound to a name, so later cells can only refer to
# it by rebuilding the same select/filter expression.
(
    orders_df
    .select("order_id", "order_status")
    .filter("order_status == 'CLOSED'")
    .cache()
)

order_id,order_status
1,CLOSED
4,CLOSED
12,CLOSED
18,CLOSED
24,CLOSED
25,CLOSED
37,CLOSED
51,CLOSED
57,CLOSED
61,CLOSED


In [12]:
# Count CLOSED orders using the same select-then-filter chain that the earlier
# cache() cell was called on.
# NOTE(review): presumably answered from that cache because the expression is
# identical - confirm in the Spark UI.
(
    orders_df
    .select("order_id", "order_status")
    .filter("order_status == 'CLOSED'")
    .count()
)

2833500

In [14]:
# Same CLOSED-orders count, but with filter applied before select. The recorded
# result matches the select-then-filter variant.
# NOTE(review): whether this operator order still reuses the cached data is not
# visible here - check the query plan / Spark UI.
(
    orders_df
    .filter("order_status == 'CLOSED'")
    .select("order_id", "order_status")
    .count()
)

2833500

In [15]:
# Row count of the two-column projection with no filter at all.
(
    orders_df
    .select("order_id", "order_status")
    .count()
)

25831125

In [16]:
# Same projection as the cached CLOSED query, but with a different predicate
# (COMPLETE instead of CLOSED).
(
    orders_df
    .select("order_id", "order_status")
    .filter("order_status == 'COMPLETE'")
    .count()
)

8587125

In [17]:
# Cache the (order_id, order_status) projection for orders with order_id > 10.
# As with the earlier anonymous cache() cell, the result is not assigned to a name.
(
    orders_df
    .select("order_id", "order_status")
    .filter("order_id > 10")
    .cache()
)

order_id,order_status
11,PAYMENT_REVIEW
12,CLOSED
13,PENDING_PAYMENT
14,PROCESSING
15,COMPLETE
16,PENDING_PAYMENT
17,COMPLETE
18,CLOSED
19,PENDING_PAYMENT
20,PROCESSING


In [18]:
# Count using exactly the select/filter chain that cache() was just called on.
(
    orders_df
    .select("order_id", "order_status")
    .filter("order_id > 10")
    .count()
)

25827375

In [19]:
# Same projection, stricter predicate (order_id > 100 instead of > 10).
(
    orders_df
    .select("order_id", "order_status")
    .filter("order_id > 100")
    .count()
)

25793625

In [20]:
# Only order_status is selected, yet the filter refers to order_id; the recorded
# output shows the query runs and yields the same count as the two-column variant.
(
    orders_df
    .select("order_status")
    .filter("order_id > 10")
    .count()
)

25827375

In [21]:
# Same projection/filter as the anonymous cache cell above, but this time the
# cached DataFrame is bound to a name so later cells can query it directly.
cached_df = (
    orders_df
    .select("order_id", "order_status")
    .filter("order_id > 10")
    .cache()
)

In [22]:
# Count the rows of the named cached DataFrame.
# NOTE(review): count() scans every partition, so this is presumably the action
# that fully populates the cache - confirm in the Spark UI Storage tab.
cached_df.count()

25827375

In [23]:
# Query on top of cached_df. The select repeats the two columns cached_df was
# built with, so only the stricter order_id > 100 filter narrows the result.
(
    cached_df
    .select("order_id", "order_status")
    .filter("order_id > 100")
    .count()
)

25793625