# ITVERSITY CCA-175 Practice

## For LOCAL MACHINE SETUP

In [1]:
import findspark as fs
import os
fs.init()
fs.find()

data_path=os.path.dirname("D://Bigdata Tutorials//data//retail_db//")
data_path_json=os.path.dirname("D://Bigdata Tutorials//data//retail_db_json//")

from pyspark.sql import SparkSession,SQLContext,HiveContext
spark=SparkSession.builder.appName('ITVERSITY').master('local').getOrCreate()
sc=spark.sparkContext
sqlcontext=SQLContext(sc)
sc

## For VAGRANT SETUP

In [1]:
!start-dfs.sh
!start-yarn.sh


Starting namenodes on [localhost]
Starting datanodes
Starting secondary namenodes [shockWAVE]
Starting resourcemanager
Starting nodemanagers
5456 DataNode
6002 ResourceManager
6309 Jps
5688 SecondaryNameNode
5291 NameNode
6175 NodeManager


In [2]:
import findspark as fs
import os
fs.init()
fs.find()
from pyspark.sql import SparkSession,SQLContext,HiveContext
spark=SparkSession.builder.appName('Big_DATA').master('yarn').getOrCreate()
sc=spark.sparkContext
sqlcontext=SQLContext(sc)
sc

In [3]:
df=spark.sql("show databases")
df.show()

+------------+
|databaseName|
+------------+
|     default|
+------------+



In [6]:
!stop-dfs.sh
!stop-yarn.sh

Stopping namenodes on [localhost]
Stopping datanodes
Stopping secondary namenodes [shockWAVE]
Stopping nodemanagers
Stopping resourcemanager


<div align=center><img align=left src="Share/files/images/db.png" width="auto" height=500 /></div>

## Let's LOAD some DATA
### LOCAL FILE

In [2]:
orderItems=sc.textFile(os.path.join(data_path,"order_items"))
orders=sc.textFile(os.path.join(data_path,"orders"))

### For VM

In [4]:
orderItems=sc.textFile("/public/retail_db/order_items")
orders=sc.textFile("/public/retail_db/orders")


## This is the same for both systems
### Let's find Some stuff out

In [5]:
# It contains OrderID
(int(orderItems.first().split(",")[1]),float(orderItems.first().split(",")[4]))

(1, 299.98)

In [6]:
orderItemsMap=orderItems.map(lambda x:(int(x.split(",")[1]),float(x.split(",")[4])))

In [7]:
orderItemsMap.take(5)

[(1, 299.98), (2, 199.99), (2, 250.0), (2, 129.99), (4, 49.98)]

### Now we can hold 1st element as KEY and reduce the Values using add.

In [6]:
from operator import add
revenuePerOrder=orderItemsMap.reduceByKey(add)
for i in revenuePerOrder.take(10): print(i)

(1, 299.98)
(2, 579.98)
(4, 699.85)
(5, 1129.8600000000001)
(7, 579.9200000000001)
(8, 729.8399999999999)
(9, 599.96)
(10, 651.9200000000001)
(11, 919.79)
(12, 1299.8700000000001)


### Transformations follow LazyEvaluation.

#### LazyEvaluation simply uses a DAG(Directed Acyclic Graph) to store all the information related to the Transformations being made.

#### As soon as an Action is 'run', spark executes the DAG first and then the action.

Let's find DAG of transformations:


In [8]:
orderItems.toDebugString()

'(1) D://Bigdata Tutorials//data//retail_db\\order_items MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0 []\n |  D://Bigdata Tutorials//data//retail_db\\order_items HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:0 []'

In [9]:
orderItemsMap.toDebugString()

'(1) PythonRDD[12] at RDD at PythonRDD.scala:53 []\n |  D://Bigdata Tutorials//data//retail_db\\order_items MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0 []\n |  D://Bigdata Tutorials//data//retail_db\\order_items HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:0 []'

In [10]:
revenuePerOrder.toDebugString()

'(1) PythonRDD[13] at RDD at PythonRDD.scala:53 []\n |  MapPartitionsRDD[10] at mapPartitions at PythonRDD.scala:133 []\n |  ShuffledRDD[9] at partitionBy at NativeMethodAccessorImpl.java:0 []\n +-(1) PairwiseRDD[8] at reduceByKey at <ipython-input-6-409d4c32b590>:2 []\n    |  PythonRDD[7] at reduceByKey at <ipython-input-6-409d4c32b590>:2 []\n    |  D://Bigdata Tutorials//data//retail_db\\order_items MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0 []\n    |  D://Bigdata Tutorials//data//retail_db\\order_items HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:0 []'

### Now we can open spark job UI and look into DAG visualization.


### NOTE: DO NOT USE collect() to preview data in REAL LIFE ENVIRONMENT!!!

### Another way tp create an RDD is to open a file using open and making a LIST out of it. (a collection)

In [12]:
productsRaw=open(data_path+"//products").read().splitlines()
print(type(productsRaw))
### Now, we can create an RDD using Parallelize from the collection.
productsRaw=sc.parallelize(productsRaw)
print(type(productsRaw))
print(productsRaw.first())
print(type(productsRaw.first()))

<type 'list'>
<class 'pyspark.rdd.RDD'>
1,2,Quest Q64 10 FT. x 10 FT. Slant Leg Instant U,,59.98,http://images.acmesports.sports/Quest+Q64+10+FT.+x+10+FT.+Slant+Leg+Instant+Up+Canopy
<type 'str'>


## DATA FRAME

#### --provided by sqlContext

#### Now we can try loading multiple file formats.

In [50]:
df = spark.read.json(data_path_json+"//order_items")
df.printSchema()

root
 |-- order_item_id: long (nullable = true)
 |-- order_item_order_id: long (nullable = true)
 |-- order_item_product_id: long (nullable = true)
 |-- order_item_product_price: double (nullable = true)
 |-- order_item_quantity: long (nullable = true)
 |-- order_item_subtotal: double (nullable = true)



In [49]:
df1=spark.read.format("json").load(data_path_json+"//orders")
df1.printSchema()

root
 |-- order_customer_id: long (nullable = true)
 |-- order_date: string (nullable = true)
 |-- order_id: long (nullable = true)
 |-- order_status: string (nullable = true)



## Let's continue with the Orders and OrderItems Example

## Use Cases
### 1. Extract OrderStatus: (YYYYMMDD,count)

In [7]:
ordersMap=orders.map(lambda x: (x.split(",")[1].split(" ")[0].replace("-",""),x.split(",")[3]))
ordersMap.first()

(u'20130725', u'CLOSED')

### 2. Filtering CLOSED/COMPLETE

In [17]:
# To get data of COMPLETE ORDERS OR CLOSED orders
ordersComplete=orders.filter(lambda x: x.split(",")[3] == 'COMPLETE' or x.split(",")[3] == 'CLOSED')
ordersComplete.take(5)

[u'1,2013-07-25 00:00:00.0,11599,CLOSED',
 u'3,2013-07-25 00:00:00.0,12111,COMPLETE',
 u'4,2013-07-25 00:00:00.0,8827,CLOSED',
 u'5,2013-07-25 00:00:00.0,11318,COMPLETE',
 u'6,2013-07-25 00:00:00.0,7130,COMPLETE']

### 3. Filtering CLOSED/COMPLETE in 2014-01

In [19]:
#To get CLOSED/COMPLETE orders in 2014-01
ordersComplete=orders.filter(lambda x: (x.split(",")[1][:7]=='2014-01') and (x.split(",")[3] in ['COMPLETE','CLOSED'] ))
ordersComplete.take(5)

#Note: (x.split(",")[3] in ['COMPLETE','CLOSED']) is same as (x.split(",")[3] == 'COMPLETE' or x.split(",")[3] == 'CLOSED')

[u'25882,2014-01-01 00:00:00.0,4598,COMPLETE',
 u'25888,2014-01-01 00:00:00.0,6735,COMPLETE',
 u'25889,2014-01-01 00:00:00.0,10045,COMPLETE',
 u'25891,2014-01-01 00:00:00.0,3037,CLOSED',
 u'25895,2014-01-01 00:00:00.0,1044,COMPLETE']

### 4. Join datasets where C{self} has (k,v) and C{other} has (k,w)
#### Note: After join --> (k,(v,w))
#### Thus, we need two tables; Parent & Child Tables

### We know that, 
* "orders" has 1st element as order_id
* "orderItems" has 2nd element as FK to order_id
### We need to convert RDDs to (k,v) format

In [101]:
#Converting orders RDD to (k,v)
ordersMapJoin=orders.map(lambda x: (int(x.split(",")[0]),str(x.split(",")[3])))
ordersMapJoin.take(5)

[(1, 'CLOSED'),
 (2, 'PENDING_PAYMENT'),
 (3, 'COMPLETE'),
 (4, 'CLOSED'),
 (5, 'COMPLETE')]

In [100]:
#Converting orderItems RDD to (k,w)
# int(x.split(",")[1]) - Order ID
#int(x.split(",")[2]) - Product ID 
#float(x.split(",")[5]) - Product - Price
#int(x.split(",")[3]) - Product Count
#float(x.split(",")[4]) - Subtotal 

orderItemsMapJoin=orderItems.map(lambda x: (int(x.split(",")[1]),(int(x.split(",")[2]),float(x.split(",")[5]),int(x.split(",")[3]),float(x.split(",")[4]))))
orderItemsMapJoin.take(5)



[(1, (957, 299.98, 1, 299.98)),
 (2, (1073, 199.99, 1, 199.99)),
 (2, (502, 50.0, 5, 250.0)),
 (2, (403, 129.99, 1, 129.99)),
 (4, (897, 24.99, 2, 49.98))]

### Now, we can join these 2 RDDs to get information about ORDER_ID with details relating to the orderItems present in that order.

In [105]:
ordersJoin=ordersMapJoin.join(orderItemsMapJoin)
for i in ordersJoin.take(10): print(i)

(2, ('PENDING_PAYMENT', (1073, 199.99, 1, 199.99)))
(2, ('PENDING_PAYMENT', (502, 50.0, 5, 250.0)))
(2, ('PENDING_PAYMENT', (403, 129.99, 1, 129.99)))
(4, ('CLOSED', (897, 24.99, 2, 49.98)))
(4, ('CLOSED', (365, 59.99, 5, 299.95)))
(4, ('CLOSED', (502, 50.0, 3, 150.0)))
(4, ('CLOSED', (1014, 49.98, 4, 199.92)))
(8, ('PROCESSING', (365, 59.99, 3, 179.97)))
(8, ('PROCESSING', (365, 59.99, 5, 299.95)))
(8, ('PROCESSING', (1014, 49.98, 4, 199.92)))


### 5. Let's Try to figure out the STATUS and Revenue on on a particular order:

##### (Say, To get details of an order) ex. Order 2 contains 3 items - 1073,502,403

In [78]:
ordersJoin.reduceByKey(add).first()

(2,
 ('PENDING_PAYMENT',
  (1073, 199.99, 1, 199.99),
  'PENDING_PAYMENT',
  (502, 50.0, 5, 250.0),
  'PENDING_PAYMENT',
  (403, 129.99, 1, 129.99)))

### Now trying to get total revenue from the previous RDD, we can:

In [83]:
ordersJoin.map(lambda x:(x[0],x[1][1][3])).reduceByKey(add).take(5)


[(2, 579.98),
 (4, 699.85),
 (8, 729.8399999999999),
 (10, 651.9200000000001),
 (12, 1299.8700000000001)]

In [116]:
ordersJoin.map(lambda x:(x[0],x[1][1][3])).reduce(add).take(5)


### 6. Order is successful but no corresponding entry in orderItems - TROUBLESHOOT PROBLEM
#### We can use outer joins:

* leftOuterJoin: Keep parent table on the left.
    * Followed by filter

* rightOuterJoin: Keep the parent table on the right.
    * Followed by map

* fullOuterJoin: When both tables have some unique values that need to be joined together.

In [108]:
ordersLOJoin=ordersMapJoin.leftOuterJoin(orderItemsMapJoin)
ordersLOJoin.filter(lambda x: x[1][1]==None).take(5)

[(6, ('COMPLETE', None)),
 (22, ('COMPLETE', None)),
 (26, ('COMPLETE', None)),
 (32, ('COMPLETE', None)),
 (40, ('PENDING_PAYMENT', None))]

In [111]:
# We can do the same thing with right outer Join
ordersROJoin=orderItemsMap.rightOuterJoin(ordersMap)
ordersROJoin.filter(lambda x: x[1][0]==None).take(5)

##THIS STILL GIVES US DATA about a complete order.

[(u'20140119', (None, u'CANCELED')),
 (u'20140119', (None, u'PROCESSING')),
 (u'20140119', (None, u'PENDING_PAYMENT')),
 (u'20140119', (None, u'COMPLETE')),
 (u'20140119', (None, u'CANCELED'))]

In [114]:
# Note: if we had NOT placed the parent folder on the right:
ordersROJoin=ordersMap.rightOuterJoin(orderItemsMap)
ordersROJoin.filter(lambda x: x[1][0]==None).take(5)

#THIS WON'T HELP US!!!

[(2, (None, 199.99)),
 (2, (None, 250.0)),
 (2, (None, 129.99)),
 (4, (None, 49.98)),
 (4, (None, 299.95))]

### 7. Find the orderItem with minimum value(cost) in a order

#### We will need to:

* Filter the RDD to show a particular order - O/P another RDD
* reduce the RDD to get the minimum value orderItem.

In [30]:
orderItems.filter(lambda x: int(x.split(",")[1])==4).reduce(lambda x,y: x if(float(x.split(",")[4]) < float(y.split(",")[4])) else y)

u'5,4,897,2,49.98,24.99'

### 8. Find the orderItem with minimum value(cost) in all orders

#### We will need to:

* Map the RDD to (k,v) order - O/P another RDD
* reduceByKey the RDD to get the minimum value of all orders. (using the same logic)

In [32]:
##to generalize: WE CAN USE reduceByKey
orderItems.map(lambda x: (int(x.split(",")[1]),x)).reduceByKey(lambda x,y: x if(float(x.split(",")[4]) < float(y.split(",")[4])) else y).take(5)

[(1, u'1,1,957,1,299.98,299.98'),
 (2, u'4,2,403,1,129.99,129.99'),
 (4, u'5,4,897,2,49.98,24.99'),
 (5, u'11,5,1014,2,99.96,49.98'),
 (7, u'16,7,926,5,79.95,15.99')]

### 9. Group the cost of each item and finally get TotalRevenue per order.

#### We can use groupByKey() followed by map - (Instead of better option - reduceByKey())

In [46]:
orderItemsMap.groupByKey().map(lambda x: (x[0],sum(x[1]))).take(5)


[(1, 299.98),
 (2, 579.98),
 (4, 699.85),
 (5, 1129.8600000000001),
 (7, 579.9200000000001)]

### 10. Group orders based of revenue of orderItem in DESC (sort)

In [60]:
# THIS IS FOR VIEW ONLY

for k,v in orderItemsMap.groupByKey().take(5):
    print(k,sorted(list(v),reverse=True))

(1, [299.98])
(2, [250.0, 199.99, 129.99])
(4, [299.95, 199.92, 150.0, 49.98])
(5, [299.98, 299.98, 299.95, 129.99, 99.96])
(7, [299.98, 199.99, 79.95])


In [71]:
# WE WILL DO THIS IN PRODUCTION

##In general
orderItemsMap.groupByKey(). \
map(lambda x: (x[0],sorted(list(x[1])))). \
take(5)

[(1, [299.98]),
 (2, [129.99, 199.99, 250.0]),
 (4, [49.98, 150.0, 199.92, 299.95]),
 (5, [99.96, 129.99, 299.95, 299.98, 299.98]),
 (7, [79.95, 199.99, 299.98])]

In [70]:
#OR

## If we need individual values:

orderItemsMap.groupByKey(). \
flatMap(lambda x: (x[0],sorted(list(x[1])))). \
take(5)


[1, [299.98], 2, [129.99, 199.99, 250.0], 4]