In [1]:
import os

import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession

# Create the session for this notebook, or reuse one that already exists.
# The master URL "local[*]" runs Spark in local mode rather than on a cluster.
spark = (
    SparkSession.builder
    .appName("MyApp")
    .master("local[*]")
    .getOrCreate()
)

25/06/07 20:10:27 WARN Utils: Your hostname, kirans-mac.local resolves to a loopback address: 127.0.0.1; using 172.18.197.149 instead (on interface en0)
25/06/07 20:10:27 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/06/07 20:10:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/06/07 20:10:27 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


# Create RDDs

In [9]:
# Root folder of the RetailDB sample data. Set the RETAIL_DB_DIR environment
# variable to point at your own copy; when it is unset, the original location
# is used, so existing behaviour is unchanged.
RETAIL_DB_DIR = os.environ.get(
    'RETAIL_DB_DIR', '/Users/kiranchinta/Downloads/RetailDB SalesData'
)

# textFile() produces an RDD with one string element per line of input.
# NOTE: `ord` shadows Python's built-in ord(); the name is kept because the
# later cells of this notebook refer to it.
ord = spark.sparkContext.textFile(os.path.join(RETAIL_DB_DIR, 'Orders'))
ordItems = spark.sparkContext.textFile(os.path.join(RETAIL_DB_DIR, 'Orders_items'))

In [10]:
# Peek at the first five raw order records (comma-separated text lines).
for record in ord.take(5):
    print(record)

1,2013-07-25 00:00:00.0,11599,CLOSED
2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT
3,2013-07-25 00:00:00.0,12111,COMPLETE
4,2013-07-25 00:00:00.0,8827,CLOSED
5,2013-07-25 00:00:00.0,11318,COMPLETE


# MAP transformations

### PS: Project all the Order_ids

In [11]:
# Keep only the first comma-separated field of every record: the order id.
ordmap = ord.map(lambda line: line.partition(',')[0])

for order_id in ordmap.take(5):
    print(order_id)

1
2
3
4
5


### PS: Project all the Orders and their status.

In [16]:
# Split each record once, then pair field 0 (order id) with field 3 (status).
ordmap = (
    ord
    .map(lambda line: line.split(','))
    .map(lambda cols: (cols[0], cols[3]))
)

for pair in ordmap.take(5):
    print(pair)

('1', 'CLOSED')
('2', 'PENDING_PAYMENT')
('3', 'COMPLETE')
('4', 'CLOSED')
('5', 'COMPLETE')


### PS: Combine Order id and status with ‘#’

In [18]:

# Split each record once, then glue order id and status together with '#'.
ordmap = (
    ord
    .map(lambda line: line.split(','))
    .map(lambda cols: '#'.join((cols[0], cols[3])))
)

for combined in ordmap.take(5):
    print(combined)

1#CLOSED
2#PENDING_PAYMENT
3#COMPLETE
4#CLOSED
5#COMPLETE


### PS: Convert the Order date into YYYY/MM/DD Format

In [19]:
# Take field 1 (the order timestamp) and swap every '-' for '/'.
# The time-of-day part of the value is left as it is.
ordmap = ord.map(lambda line: '/'.join(line.split(',')[1].split('-')))

for order_date in ordmap.take(5):
    print(order_date)

2013/07/25 00:00:00.0
2013/07/25 00:00:00.0
2013/07/25 00:00:00.0
2013/07/25 00:00:00.0
2013/07/25 00:00:00.0


### PS: Apply a user-defined function to convert the status to lowercase

In [24]:
def lowerCase(str):
    """Return a lowercase copy of the given text.

    The parameter is named ``str`` (shadowing the built-in type inside this
    function) to keep the existing signature.
    """
    lowered = str.lower()
    return lowered

# Pull out the last comma-separated field (the status) and lowercase it
# with the user-defined helper.
ordmap = (
    ord
    .map(lambda line: line.rsplit(',', 1)[-1])
    .map(lowerCase)
)

for status in ordmap.take(5):
    print(status)


closed
pending_payment
complete
closed
complete


# flatMap Transformations

In [32]:
# map() yields exactly one output element per record: here, the list of
# fields produced by splitting the line on commas.
ordmap = ord.map(lambda line: line.split(","))

for fields in ordmap.take(5):
    print(fields)

['1', '2013-07-25 00:00:00.0', '11599', 'CLOSED']
['2', '2013-07-25 00:00:00.0', '256', 'PENDING_PAYMENT']
['3', '2013-07-25 00:00:00.0', '12111', 'COMPLETE']
['4', '2013-07-25 00:00:00.0', '8827', 'CLOSED']
['5', '2013-07-25 00:00:00.0', '11318', 'COMPLETE']


In [35]:
# flatMap() flattens the per-record lists, so every individual field
# becomes its own element of the resulting RDD.
ordflatmap = ord.flatMap(lambda line: line.split(","))

for field in ordflatmap.take(5):
    print(field)

1
2013-07-25 00:00:00.0
11599
CLOSED
2


In [36]:
# Break every record into its fields and tag each field with a count of 1.
wordCount = (
    ord
    .flatMap(lambda line: line.split(','))
    .map(lambda token: (token, 1))
)

for pair in wordCount.take(5):
    print(pair)

('1', 1)
('2013-07-25 00:00:00.0', 1)
('11599', 1)
('CLOSED', 1)
('2', 1)


In [37]:
# Classic word count: emit (field, 1) for every field, then add up the
# ones per distinct field with reduceByKey.
wordCount = (
    ord
    .flatMap(lambda line: line.split(','))
    .map(lambda token: (token, 1))
    .reduceByKey(lambda left, right: left + right)
)

for pair in wordCount.take(5):
    print(pair)

('2013-07-25 00:00:00.0', 143)
('11599', 6)
('CLOSED', 7556)
('256', 11)
('3', 8)


# Filter transformations

### PS: Print all the orders that are CLOSED or COMPLETE and were placed in the year 2014.

In [45]:
def _closed_or_complete_in_2014(line):
    """Return True for records whose status is CLOSED/COMPLETE and whose year is 2014."""
    cols = line.split(',')
    # cols[3] is the status; cols[1] is the timestamp, whose text before the
    # first '-' is the year.
    return cols[3] in ("CLOSED", "COMPLETE") and cols[1].partition('-')[0] == '2014'


filteredOrd = ord.filter(_closed_or_complete_in_2014)

for record in filteredOrd.take(5):
    print(record)

25882,2014-01-01 00:00:00.0,4598,COMPLETE
25888,2014-01-01 00:00:00.0,6735,COMPLETE
25889,2014-01-01 00:00:00.0,10045,COMPLETE
25891,2014-01-01 00:00:00.0,3037,CLOSED
25895,2014-01-01 00:00:00.0,1044,COMPLETE


In [46]:
# Total number of records in the orders RDD.
ord.count()

68883

In [47]:
# Number of records kept in filteredOrd.
filteredOrd.count()

16831

# mapValues Transformations

In [51]:
# Small key/value RDD for the demo: each key maps to a tuple of numbers,
# and the key "a" appears twice.
rdd = spark.sparkContext.parallelize(
    (
        ("a", (1, 2, 3)),
        ("b", (3, 4, 5)),
        ("a", (1, 2, 3, 4, 5)),
    )
)

for pair in rdd.take(5):
    print(pair)

('a', (1, 2, 3))
('b', (3, 4, 5))
('a', (1, 2, 3, 4, 5))


In [52]:
def f(x):
    """Return the number of items in ``x``."""
    size = len(x)
    return size

In [53]:
# mapValues applies f to the value of each pair; the keys are left unchanged.
rdd.mapValues(f).collect()

[('a', 3), ('b', 3), ('a', 5)]