In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName("Demo").master("local[2]").getOrCreate()


In [5]:
spark

In [6]:
#Create RDD using textFile API
rdd = spark.sparkContext.textFile('c:/practice/emp.txt')
rdd.take(5)


['e1,babjee,50000,10',
 'e2,naveen,40000,20',
 'e3,praveen,60000,10',
 'e4,sundar,20000,10',
 'e5,raheem,80000,30']

In [8]:
for i in rdd.take(5):print(i)

e1,babjee,50000,10
e2,naveen,40000,20
e3,praveen,60000,10
e4,sundar,20000,10
e5,raheem,80000,30


In [9]:
rdd.getNumPartitions()

1

In [12]:
#Get the Number of elements in each partition
rdd.glom().map(len).collect()


[633]

In [14]:
#Create RDD using textFile API and a defined number of partitions
rdd = spark.sparkContext.textFile('c:/practice/emp.txt',10)


['e1,babjee,50000,10',
 'e2,naveen,40000,20',
 'e3,praveen,60000,10',
 'e4,sundar,20000,10',
 'e5,raheem,80000,30']

In [15]:
#Get the Number of Partitions in the RDD
rdd.getNumPartitions()

10

In [16]:
#Get the Number of elements in each partition
rdd.glom().map(len).collect()

[64, 63, 63, 63, 63, 64, 63, 63, 63, 64]

In [17]:
#Create a RDD from a Python List
lst = [1,2,3,4,5,6,7]
rdd = spark.sparkContext.parallelize(lst)
for i in rdd.take(5) : print(i)


1
2
3
4
5


In [18]:
#Create a RDD from local file
lst = open('c:/practice/emp.txt').read().splitlines()
lst[0:10]
rdd = spark.sparkContext.parallelize(lst)
for i in rdd.take(5) : print(i)


e1,babjee,50000,10
e2,naveen,40000,20
e3,praveen,60000,10
e4,sundar,20000,10
e5,raheem,80000,30


In [19]:
#Create RDD from range function
lst1 = range(10)
rdd = spark.sparkContext.parallelize(lst1)
for i in rdd.take(5) : print(i)


0
1
2
3
4


In [20]:
#Create RDD from a DataFrame
df=spark.createDataFrame(data=(('robert',35),('Mike',45)),schema=('name','age'))
df.printSchema()
df.show()
rdd1= df.rdd
type(rdd1)
for i in rdd1.take(2) : print(i)

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)

+------+---+
|  name|age|
+------+---+
|robert| 35|
|  Mike| 45|
+------+---+

Row(name='robert', age=35)
Row(name='Mike', age=45)


In [3]:
ord =spark.sparkContext.textFile('c:/practice/orders')

In [4]:
ordItems=spark.sparkContext.textFile('c:/practice/order_items')

In [6]:
ord.take(5)

['1,2013-07-25 00:00:00.0,11599,CLOSED',
 '2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT',
 '3,2013-07-25 00:00:00.0,12111,COMPLETE',
 '4,2013-07-25 00:00:00.0,8827,CLOSED',
 '5,2013-07-25 00:00:00.0,11318,COMPLETE']

In [7]:
ordItems.take(5)

['1,1,957,1,299.98,299.98',
 '2,2,1073,1,199.99,199.99',
 '3,2,502,5,250.0,50.0',
 '4,2,403,1,129.99,129.99',
 '5,4,897,2,49.98,24.99']

In [8]:
#project OrderIds
mapRdd =ord.map(lambda x :x.split(',')[0])

In [9]:
mapRdd.take(5)

['1', '2', '3', '4', '5']

In [10]:
#project OrderIds and Status
maprdd=ord.map(lambda x :(x.split(',')[0], x.split(',')[3]))

In [33]:
maprdd.take(5)

[('1', 'CLOSED'),
 ('2', 'PENDING_PAYMENT'),
 ('3', 'COMPLETE'),
 ('4', 'CLOSED'),
 ('5', 'COMPLETE')]

In [11]:
#Combine OrderIds and Status with #
maprdd=ord.map(lambda x :x.split(',')[0]+"#" +x.split(',')[3] )

In [12]:
for i in maprdd.take(5):print(i)

1#CLOSED
2#PENDING_PAYMENT
3#COMPLETE
4#CLOSED
5#COMPLETE


In [None]:
ord.

In [13]:
#Convert Order date into yyyy/mm/dd format
maprdd = ord.map(lambda x : x.split(',')[1])\
.map(lambda x : x.split(' ')[0].replace('-','/'))

In [14]:
maprdd.take(5)

['2013/07/25', '2013/07/25', '2013/07/25', '2013/07/25', '2013/07/25']

In [15]:
#create key-value pair with key as Order id and vlue as whole record
maprdd = ord.map(lambda x : x.split(',')[0]+":"+x)

In [16]:
for i in maprdd.take(5):print(i)

1:1,2013-07-25 00:00:00.0,11599,CLOSED
2:2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT
3:3,2013-07-25 00:00:00.0,12111,COMPLETE
4:4,2013-07-25 00:00:00.0,8827,CLOSED
5:5,2013-07-25 00:00:00.0,11318,COMPLETE


In [55]:
#project all order_item_ids and subtotal
ordItems.take(5)

['1,1,957,1,299.98,299.98',
 '2,2,1073,1,199.99,199.99',
 '3,2,502,5,250.0,50.0',
 '4,2,403,1,129.99,129.99',
 '5,4,897,2,49.98,24.99']

In [17]:
maprdd =ordItems.map(lambda x:(x.split(',')[0] , x.split(',')[4]))

In [18]:
maprdd.take(5)

[('1', '299.98'),
 ('2', '199.99'),
 ('3', '250.0'),
 ('4', '129.99'),
 ('5', '49.98')]

In [58]:
#apply user defined function to convert status to lower case

ord.take(5)

['1,2013-07-25 00:00:00.0,11599,CLOSED',
 '2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT',
 '3,2013-07-25 00:00:00.0,12111,COMPLETE',
 '4,2013-07-25 00:00:00.0,8827,CLOSED',
 '5,2013-07-25 00:00:00.0,11318,COMPLETE']

In [19]:
def lwcase(str):
    return str.lower()

In [20]:
maprdd = ord.map(lambda x :lwcase(x.split(',')[3]))

In [21]:
maprdd.take(5)

['closed', 'pending_payment', 'complete', 'closed', 'complete']

In [24]:
maprdd =ord.map(lambda x :x.split(',')[3].lower())

In [25]:
maprdd.take(5)

['closed', 'pending_payment', 'complete', 'closed', 'complete']

In [26]:
#flat Map 
#number of words in order file
maprdd =ord.flatMap(lambda x: x.split(','))

In [28]:
maprdd.take(5)

['1', '2013-07-25 00:00:00.0', '11599', 'CLOSED', '2']

In [29]:
maprdd =ord.flatMap(lambda x: x.split(',')).map(lambda w: (w,1)).reduceByKey(lambda x,y:x+y)

In [9]:
maprdd.take(100)

[('2013-07-25 00:00:00.0', 143),
 ('11599', 6),
 ('CLOSED', 7556),
 ('256', 11),
 ('3', 8),
 ('12111', 7),
 ('COMPLETE', 22899),
 ('4', 7),
 ('8827', 7),
 ('5', 5),
 ('6', 5),
 ('7130', 8),
 ('7', 9),
 ('4530', 11),
 ('2911', 7),
 ('PROCESSING', 8275),
 ('10', 3),
 ('918', 6),
 ('PAYMENT_REVIEW', 729),
 ('13', 7),
 ('14', 10),
 ('15', 5),
 ('16', 8),
 ('17', 6),
 ('18', 9),
 ('9488', 8),
 ('21', 5),
 ('22', 7),
 ('333', 7),
 ('4367', 6),
 ('24', 6),
 ('9503', 4),
 ('26', 6),
 ('29', 4),
 ('10039', 11),
 ('31', 7),
 ('6983', 7),
 ('32', 6),
 ('33', 3),
 ('5793', 4),
 ('34', 8),
 ('4840', 3),
 ('5649', 3),
 ('38', 7),
 ('11586', 10),
 ('8214', 6),
 ('40', 8),
 ('8136', 8),
 ('9776', 6),
 ('43', 3),
 ('44', 7),
 ('45', 7),
 ('2636', 7),
 ('1549', 5),
 ('8487', 8),
 ('12186', 5),
 ('1871', 9),
 ('5225', 10),
 ('CANCELED', 1428),
 ('51', 9),
 ('4701', 5),
 ('10628', 8),
 ('55', 7),
 ('2052', 8),
 ('10519', 7),
 ('57', 7),
 ('7073', 4),
 ('58', 10),
 ('9213', 9),
 ('8365', 10),
 ('61', 6),
 

In [10]:
#Filter
#Print the all the all orders which are close or complete and ordered in the year 2023

In [30]:
filrdd=ord.filter(lambda x :  x.split(',')[3]in ['COMPLETE','CLOSED'] \
                  and  x.split(',')[1].split('-')[0]=='2013' )

In [31]:
filrdd.take(10)

['1,2013-07-25 00:00:00.0,11599,CLOSED',
 '3,2013-07-25 00:00:00.0,12111,COMPLETE',
 '4,2013-07-25 00:00:00.0,8827,CLOSED',
 '5,2013-07-25 00:00:00.0,11318,COMPLETE',
 '6,2013-07-25 00:00:00.0,7130,COMPLETE',
 '7,2013-07-25 00:00:00.0,4530,COMPLETE',
 '12,2013-07-25 00:00:00.0,1837,CLOSED',
 '15,2013-07-25 00:00:00.0,2568,COMPLETE',
 '17,2013-07-25 00:00:00.0,2667,COMPLETE',
 '18,2013-07-25 00:00:00.0,1205,CLOSED']

In [30]:
filrdd.count()

13624

In [32]:
rdd =spark.sparkContext.parallelize\
([("a",[1,2,3,4]) ,("b",[2,4,5,6]) , ("a",[1,2,3,4,5])])

In [9]:
#mapValues api

In [33]:
rdd.mapValues(len).collect()

[('a', 4), ('b', 4), ('a', 5)]

In [8]:
rdd.mapValues(sum).collect()

[('a', 10), ('b', 17), ('a', 15)]

In [13]:
#user defined function
def f(num):return len(num)

In [12]:
rdd.mapValues(f).collect()

[('a', 4), ('b', 4), ('a', 5)]

In [14]:
#join api
#join
#left outer join
#right outer join
#full outer join


In [15]:
ord = spark.sparkContext.textFile('c:/practice/orders')

In [16]:
ord.take(5)

['1,2013-07-25 00:00:00.0,11599,CLOSED',
 '2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT',
 '3,2013-07-25 00:00:00.0,12111,COMPLETE',
 '4,2013-07-25 00:00:00.0,8827,CLOSED',
 '5,2013-07-25 00:00:00.0,11318,COMPLETE']

In [21]:
ordItems = spark.sparkContext.textFile('c:/practice/order_items')

In [22]:
ordItems.take(5)

['1,1,957,1,299.98,299.98',
 '2,2,1073,1,199.99,199.99',
 '3,2,502,5,250.0,50.0',
 '4,2,403,1,129.99,129.99',
 '5,4,897,2,49.98,24.99']

In [23]:
#find the subtotal for each customerid

In [36]:
ordMap= ord.map(lambda x:(x.split(',')[0] , x.split(',')[2]))

In [37]:
ordMap.take(5)

[('1', '11599'), ('2', '256'), ('3', '12111'), ('4', '8827'), ('5', '11318')]

In [38]:
ordItemMap = ordItems.map(lambda x : (x.split(',')[1] ,x.split(',')[4]))

In [39]:
ordItemMap.take(5)

[('1', '299.98'),
 ('2', '199.99'),
 ('2', '250.0'),
 ('2', '129.99'),
 ('4', '49.98')]

In [84]:
joinRdd =ordMap.join(ordItemMap)


In [85]:
joinRdd.take(5)

[('4', ('8827', '49.98')),
 ('4', ('8827', '299.95')),
 ('4', ('8827', '150.0')),
 ('4', ('8827', '199.92')),
 ('5', ('11318', '299.98'))]

In [38]:
joinRdd.map(lambda x : x[1]).take(5)

[('8827', '49.98'),
 ('8827', '299.95'),
 ('8827', '150.0'),
 ('8827', '199.92'),
 ('11318', '299.98')]

In [39]:
joinRdd.map(lambda x :x[1][0]+','+ x[1][1]).take(5)

['8827,49.98', '8827,299.95', '8827,150.0', '8827,199.92', '11318,299.98']

In [44]:
joinRdd.map(lambda x :x[1][0]+','+ x[1][1]).first()

'8827,49.98'

In [111]:
empRdd=spark.sparkContext.textFile('c:/practice/emp.txt')

In [112]:
empRdd.take(5)

['e1,babjee,50000,10',
 'e2,naveen,40000,20',
 'e3,praveen,60000,10',
 'e4,sundar,20000,10',
 'e5,raheem,80000,30']

In [116]:
deptRdd=spark.sparkContext.textFile('c:/practice/dept.txt')

In [114]:
deptRdd.take(5)

['10,Accounts', '20,hr', '40,it']

In [115]:
mapEmp =empRdd.map(lambda x:(x.split(',')[3],x.split(',')[1]))

In [105]:
mapEmp.take(5)

[('10', 'babjee'),
 ('20', 'naveen'),
 ('10', 'praveen'),
 ('10', 'sundar'),
 ('30', 'raheem')]

In [119]:
mapDept=deptRdd.map(lambda x:(x.split(',')[0],x.split(',')[1]))

In [120]:
mapDept.collect()

[('10', 'Accounts'), ('20', 'hr'), ('40', 'it')]

In [122]:
joinRdd =mapEmp.join(mapDept)

In [123]:
joinRdd.collect()

[('10', ('babjee', 'Accounts')),
 ('10', ('praveen', 'Accounts')),
 ('10', ('sundar', 'Accounts')),
 ('20', ('naveen', 'hr'))]

In [125]:
empleftRdd=mapEmp.leftOuterJoin(mapDept)

In [126]:
empleftRdd.collect()

[('10', ('babjee', 'Accounts')),
 ('10', ('praveen', 'Accounts')),
 ('10', ('sundar', 'Accounts')),
 ('20', ('naveen', 'hr')),
 ('30', ('raheem', None)),
 ('30', ('suil', None))]

In [152]:
empleftRdd.map(lambda x : (x[0] , x[1][0] ,x[1][1])).collect()

[('10', 'babjee', 'Accounts'),
 ('10', 'praveen', 'Accounts'),
 ('10', 'sundar', 'Accounts'),
 ('20', 'naveen', 'hr'),
 ('30', 'raheem', None),
 ('30', 'suil', None)]

In [154]:
emprightRdd=mapEmp.rightOuterJoin(mapDept)

In [157]:
emprightRdd.collect()

[('10', ('babjee', 'Accounts')),
 ('10', ('praveen', 'Accounts')),
 ('10', ('sundar', 'Accounts')),
 ('20', ('naveen', 'hr')),
 ('40', (None, 'it'))]

In [158]:
empFullRdd=emprightRdd=mapEmp.fullOuterJoin(mapDept)

In [159]:
empFullRdd.collect()

[('10', ('babjee', 'Accounts')),
 ('10', ('praveen', 'Accounts')),
 ('10', ('sundar', 'Accounts')),
 ('20', ('naveen', 'hr')),
 ('40', (None, 'it')),
 ('30', ('raheem', None)),
 ('30', ('suil', None))]

In [51]:
#Cogroup

In [160]:
x = spark.sparkContext.parallelize([('a',2 ), ('b',4)])
y = spark.sparkContext.parallelize([('a',3 ), ('b',5)])
 

In [161]:
xy = x.cogroup(y)

In [162]:
xy.collect()

[('b',
  (<pyspark.resultiterable.ResultIterable at 0x28049ad7110>,
   <pyspark.resultiterable.ResultIterable at 0x28049228210>)),
 ('a',
  (<pyspark.resultiterable.ResultIterable at 0x28049c1e010>,
   <pyspark.resultiterable.ResultIterable at 0x28049c1df90>))]

In [163]:
for i, j in xy.collect():
    print(i, list(map(list,j)))

b [[4], [5]]
a [[2], [3]]


In [52]:
#Cross Join

In [53]:
rdd = spark.sparkContext.parallelize([1,2,3])

In [55]:
rdd.cartesian(rdd).collect()

[(1, 1), (1, 2), (1, 3), (2, 1), (3, 1), (2, 2), (2, 3), (3, 2), (3, 3)]

In [56]:
sorted(rdd.cartesian(rdd).collect())

[(1, 1), (1, 2), (1, 3), (2, 1), (2, 2), (2, 3), (3, 1), (3, 2), (3, 3)]

In [57]:
#Total Aggriation (Actions)

In [29]:
ord.take(5)

['1,2013-07-25 00:00:00.0,11599,CLOSED',
 '2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT',
 '3,2013-07-25 00:00:00.0,12111,COMPLETE',
 '4,2013-07-25 00:00:00.0,8827,CLOSED',
 '5,2013-07-25 00:00:00.0,11318,COMPLETE']

In [59]:
ordItems.take(5)

['1,1,957,1,299.98,299.98',
 '2,2,1073,1,199.99,199.99',
 '3,2,502,5,250.0,50.0',
 '4,2,403,1,129.99,129.99',
 '5,4,897,2,49.98,24.99']

In [60]:
#Count the Number of orde which are closed

In [30]:
ordfilter = ord.filter(lambda x : x.split(',')[3]=='CLOSED')

In [31]:
ordfilter.count()

7556

In [69]:
# find the total quantity sold for order id 1-10

In [78]:
#Reduce

In [32]:
ordItems.take(5)

['1,1,957,1,299.98,299.98',
 '2,2,1073,1,199.99,199.99',
 '3,2,502,5,250.0,50.0',
 '4,2,403,1,129.99,129.99',
 '5,4,897,2,49.98,24.99']

In [33]:
ordItems.filter(lambda x : int(x.split(',')[1]) <11).\
map(lambda x : int(x.split(',')[3])).reduce(lambda x , y : x+y)

62

In [34]:
from operator import add

In [35]:
ordItems.filter(lambda x : int(x.split(',')[1]) <11).\
map(lambda x : int(x.split(',')[3])).reduce(add)

62

In [84]:
#For given order 10  find the maximum subtotal 

In [85]:
ordItems.take(5)

['1,1,957,1,299.98,299.98',
 '2,2,1073,1,199.99,199.99',
 '3,2,502,5,250.0,50.0',
 '4,2,403,1,129.99,129.99',
 '5,4,897,2,49.98,24.99']

In [88]:
ordItems.filter(lambda x : int(x.split(',')[1]) ==10).collect()

['24,10,1073,1,199.99,199.99',
 '25,10,1014,2,99.96,49.98',
 '26,10,403,1,129.99,129.99',
 '27,10,917,1,21.99,21.99',
 '28,10,1073,1,199.99,199.99']

In [37]:
ordItems.filter(lambda x : int(x.split(',')[1]) ==10).\
reduce(lambda a,b : a if (float(a .split(',')[4]) ) > \
       (float(b .split(',')[4]) ) else b )

'28,10,1073,1,199.99,199.99'

In [40]:
ordItems.filter(lambda x : int(x.split(',')[1]) ==10).\
map(lambda x : float(x.split(',')[4])).reduce(min)

21.99

In [97]:
#Group By key

In [98]:
#find total revenue for each product

In [99]:
ordItems.take(5)

['1,1,957,1,299.98,299.98',
 '2,2,1073,1,199.99,199.99',
 '3,2,502,5,250.0,50.0',
 '4,2,403,1,129.99,129.99',
 '5,4,897,2,49.98,24.99']

In [47]:
filter =ordItems.filter(lambda x:len(x.split(',')[2])== 0)

In [48]:
filter.collect()

[]

In [62]:
ordGrp = ordItems.map(lambda x : ( int(x.split(',')[2]) , \
                                  float(x.split(',')[4])))

In [63]:
ordGrp.reduceByKey(sum).take(5)


Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 38.0 failed 1 times, most recent failure: Lost task 0.0 in stage 38.0 (TID 66) (DESKTOP-LJE0474 executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\Spark\python\lib\pyspark.zip\pyspark\worker.py", line 1247, in main
  File "C:\Spark\python\lib\pyspark.zip\pyspark\worker.py", line 1237, in process
  File "C:\Users\babje\anaconda3\Lib\site-packages\pyspark\rdd.py", line 5434, in pipeline_func
    return func(split, prev_func(split, iterator))
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\babje\anaconda3\Lib\site-packages\pyspark\rdd.py", line 5434, in pipeline_func
    return func(split, prev_func(split, iterator))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\babje\anaconda3\Lib\site-packages\pyspark\rdd.py", line 840, in func
    return f(iterator)
           ^^^^^^^^^^^
  File "C:\Users\babje\anaconda3\Lib\site-packages\pyspark\rdd.py", line 3983, in combineLocally
    merger.mergeValues(iterator)
  File "C:\Spark\python\lib\pyspark.zip\pyspark\shuffle.py", line 258, in mergeValues
    d[k] = comb(d[k], v) if k in d else creator(v)
           ^^^^^^^^^^^^^
  File "C:\Users\babje\anaconda3\Lib\site-packages\pyspark\util.py", line 83, in wrapper
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
TypeError: 'float' object is not iterable

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:784)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1211)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2844)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2780)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2779)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2779)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1242)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3048)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2971)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:984)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:181)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:75)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\Spark\python\lib\pyspark.zip\pyspark\worker.py", line 1247, in main
  File "C:\Spark\python\lib\pyspark.zip\pyspark\worker.py", line 1237, in process
  File "C:\Users\babje\anaconda3\Lib\site-packages\pyspark\rdd.py", line 5434, in pipeline_func
    return func(split, prev_func(split, iterator))
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\babje\anaconda3\Lib\site-packages\pyspark\rdd.py", line 5434, in pipeline_func
    return func(split, prev_func(split, iterator))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\babje\anaconda3\Lib\site-packages\pyspark\rdd.py", line 840, in func
    return f(iterator)
           ^^^^^^^^^^^
  File "C:\Users\babje\anaconda3\Lib\site-packages\pyspark\rdd.py", line 3983, in combineLocally
    merger.mergeValues(iterator)
  File "C:\Spark\python\lib\pyspark.zip\pyspark\shuffle.py", line 258, in mergeValues
    d[k] = comb(d[k], v) if k in d else creator(v)
           ^^^^^^^^^^^^^
  File "C:\Users\babje\anaconda3\Lib\site-packages\pyspark\util.py", line 83, in wrapper
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
TypeError: 'float' object is not iterable

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:784)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1211)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	... 1 more


In [14]:
from operator import add

In [56]:
ordGrp.take(5)



Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 33.0 failed 1 times, most recent failure: Lost task 0.0 in stage 33.0 (TID 59) (DESKTOP-LJE0474 executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\Spark\python\lib\pyspark.zip\pyspark\worker.py", line 1247, in main
  File "C:\Spark\python\lib\pyspark.zip\pyspark\worker.py", line 1239, in process
  File "C:\Spark\python\lib\pyspark.zip\pyspark\serializers.py", line 274, in dump_stream
    vs = list(itertools.islice(iterator, batch))
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\babje\anaconda3\Lib\site-packages\pyspark\rdd.py", line 2849, in takeUpToNumLeft
    yield next(iterator)
          ^^^^^^^^^^^^^^
  File "C:\Spark\python\lib\pyspark.zip\pyspark\util.py", line 83, in wrapper
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "C:\Users\babje\AppData\Local\Temp\ipykernel_15884\4211828731.py", line 2, in <lambda>
ValueError: invalid literal for int() with base 10: '299.98'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:784)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.api.python.PythonRDD$.$anonfun$runJob$1(PythonRDD.scala:181)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2438)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2844)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2780)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2779)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2779)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1242)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3048)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2971)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:984)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:181)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:75)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\Spark\python\lib\pyspark.zip\pyspark\worker.py", line 1247, in main
  File "C:\Spark\python\lib\pyspark.zip\pyspark\worker.py", line 1239, in process
  File "C:\Spark\python\lib\pyspark.zip\pyspark\serializers.py", line 274, in dump_stream
    vs = list(itertools.islice(iterator, batch))
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\babje\anaconda3\Lib\site-packages\pyspark\rdd.py", line 2849, in takeUpToNumLeft
    yield next(iterator)
          ^^^^^^^^^^^^^^
  File "C:\Spark\python\lib\pyspark.zip\pyspark\util.py", line 83, in wrapper
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "C:\Users\babje\AppData\Local\Temp\ipykernel_15884\4211828731.py", line 2, in <lambda>
ValueError: invalid literal for int() with base 10: '299.98'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:784)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.api.python.PythonRDD$.$anonfun$runJob$1(PythonRDD.scala:181)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2438)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	... 1 more


In [53]:
result.take(5)

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 30.0 failed 1 times, most recent failure: Lost task 0.0 in stage 30.0 (TID 54) (DESKTOP-LJE0474 executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\Spark\python\lib\pyspark.zip\pyspark\worker.py", line 1247, in main
  File "C:\Spark\python\lib\pyspark.zip\pyspark\worker.py", line 1239, in process
  File "C:\Spark\python\lib\pyspark.zip\pyspark\serializers.py", line 274, in dump_stream
    vs = list(itertools.islice(iterator, batch))
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\babje\anaconda3\Lib\site-packages\pyspark\rdd.py", line 2849, in takeUpToNumLeft
    yield next(iterator)
          ^^^^^^^^^^^^^^
  File "C:\Spark\python\lib\pyspark.zip\pyspark\util.py", line 83, in wrapper
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "C:\Users\babje\AppData\Local\Temp\ipykernel_15884\4211828731.py", line 2, in <lambda>
ValueError: invalid literal for int() with base 10: '299.98'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:784)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.api.python.PythonRDD$.$anonfun$runJob$1(PythonRDD.scala:181)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2438)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2844)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2780)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2779)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2779)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1242)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3048)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2971)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:984)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:181)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:75)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\Spark\python\lib\pyspark.zip\pyspark\worker.py", line 1247, in main
  File "C:\Spark\python\lib\pyspark.zip\pyspark\worker.py", line 1239, in process
  File "C:\Spark\python\lib\pyspark.zip\pyspark\serializers.py", line 274, in dump_stream
    vs = list(itertools.islice(iterator, batch))
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\babje\anaconda3\Lib\site-packages\pyspark\rdd.py", line 2849, in takeUpToNumLeft
    yield next(iterator)
          ^^^^^^^^^^^^^^
  File "C:\Spark\python\lib\pyspark.zip\pyspark\util.py", line 83, in wrapper
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "C:\Users\babje\AppData\Local\Temp\ipykernel_15884\4211828731.py", line 2, in <lambda>
ValueError: invalid literal for int() with base 10: '299.98'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:784)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.api.python.PythonRDD$.$anonfun$runJob$1(PythonRDD.scala:181)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2438)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	... 1 more


In [54]:


sales_data = [("apple", 3), ("banana", 5), ("orange", 2), ("apple", 4), ("banana", 3), ("orange", 6)] 
sales_rdd = spark.sparkContext.parallelize(sales_data) 

# Group sales data by product using groupByKey 
grouped_sales_rdd = sales_rdd.groupByKey()

# Collect and print the results 
print(grouped_sales_rdd.collect()) 

[('apple', <pyspark.resultiterable.ResultIterable object at 0x00000191FAB5C910>), ('banana', <pyspark.resultiterable.ResultIterable object at 0x00000191FBD2C2D0>), ('orange', <pyspark.resultiterable.ResultIterable object at 0x0000019180D1F610>)]


In [137]:
#Reduce By Key

In [57]:
rdd = spark.sparkContext.parallelize( [('a',10),('b',20),('a',20)])

In [58]:
from operator import add
rdd.reduceByKey(add).collect()

[('b', 20), ('a', 30)]

In [8]:
#Total revenue for each order

In [12]:
ordItems.take(4)

['1,1,957,1,299.98,299.98',
 '2,2,1073,1,199.99,199.99',
 '3,2,502,5,250.0,50.0',
 '4,2,403,1,129.99,129.99']

In [13]:
ordGrp = ordItems.map(lambda x : ( int(x.split(',')[1]),float(x.split(',')[4])  ))

In [14]:
ordGrp.take(5)

[(1, 299.98), (2, 199.99), (2, 250.0), (2, 129.99), (4, 49.98)]

In [66]:
ordGrp.reduceByKey(add).collect()

[(502, 3147800.0),
 (1014, 2888993.9399996493),
 (926, 14870.699999999972),
 (134, 20025.0),
 (276, 29398.809999999947),
 (1004, 6929653.499999708),
 (828, 28982.939999999948),
 (810, 16031.97999999997),
 (906, 22865.84999999997),
 (924, 14151.149999999976),
 (886, 21766.289999999954),
 (572, 35191.19999999998),
 (778, 21116.549999999963),
 (278, 36576.86999999999),
 (642, 27840.0),
 (804, 18810.589999999967),
 (564, 27210.0),
 (792, 12951.359999999982),
 (172, 27210.0),
 (822, 38391.99999999997),
 (818, 41415.37),
 (116, 44585.09000000002),
 (282, 27127.519999999957),
 (728, 61490.0),
 (44, 56330.61000000003),
 (666, 9349.149999999987),
 (858, 11799.40999999999),
 (24, 18477.689999999988),
 (982, 8999.39999999999),
 (652, 7539.419999999991),
 (724, 22700.0),
 (306, 17638.039999999997),
 (646, 20697.929999999997),
 (730, 15600.0),
 (216, 12096.0),
 (258, 19567.93999999999),
 (786, 11699.34999999999),
 (78, 18598.139999999992),
 (364, 10799.639999999994),
 (58, 8699.709999999995),
 (768

In [19]:
sorted(ordGrp.reduceByKey(lambda x,y : x+y).collect())

[(1, 299.98),
 (2, 579.98),
 (4, 699.85),
 (5, 1129.8600000000001),
 (7, 579.9200000000001),
 (8, 729.8399999999999),
 (9, 599.96),
 (10, 651.9200000000001),
 (11, 919.79),
 (12, 1299.8700000000001),
 (13, 127.96),
 (14, 549.94),
 (15, 925.9100000000001),
 (16, 419.93),
 (17, 694.84),
 (18, 449.96000000000004),
 (19, 699.96),
 (20, 879.8599999999999),
 (21, 372.91),
 (23, 299.98),
 (24, 829.97),
 (25, 399.98),
 (27, 749.97),
 (28, 1159.9),
 (29, 1109.85),
 (30, 100.0),
 (31, 499.95),
 (33, 659.89),
 (34, 299.98),
 (35, 129.99),
 (36, 799.96),
 (37, 159.95),
 (38, 359.96000000000004),
 (39, 199.99),
 (41, 327.88),
 (42, 739.9200000000001),
 (43, 529.96),
 (44, 399.98),
 (45, 499.84999999999997),
 (46, 229.95),
 (48, 99.96),
 (49, 549.95),
 (50, 429.97),
 (51, 449.98),
 (52, 399.84000000000003),
 (56, 699.89),
 (57, 637.9),
 (58, 799.94),
 (59, 619.95),
 (61, 639.92),
 (62, 1149.94),
 (63, 899.9200000000001),
 (64, 659.97),
 (65, 299.98),
 (66, 679.94),
 (67, 150.0),
 (68, 299.98),
 (69,

In [17]:
#find maximu revenue for each order

In [67]:
sorted(ordGrp.reduceByKey(max).collect())

[(19, 124.99),
 (24, 399.95),
 (35, 159.99),
 (37, 174.95),
 (44, 299.95),
 (58, 299.99),
 (60, 999.99),
 (61, 299.99),
 (78, 499.95),
 (93, 124.95),
 (116, 224.95),
 (127, 329.99),
 (134, 125.0),
 (135, 110.0),
 (172, 150.0),
 (191, 499.95),
 (203, 399.99),
 (208, 1999.99),
 (216, 189.0),
 (226, 599.99),
 (235, 174.95),
 (249, 274.85),
 (251, 449.95),
 (258, 474.95),
 (273, 139.95),
 (276, 159.95),
 (278, 224.95),
 (282, 159.95),
 (295, 499.75),
 (303, 399.99),
 (305, 199.0),
 (306, 449.95),
 (311, 109.95),
 (359, 499.95),
 (364, 299.99),
 (365, 299.95),
 (403, 129.99),
 (502, 250.0),
 (564, 150.0),
 (565, 350.0),
 (567, 125.0),
 (572, 199.95),
 (607, 249.99),
 (625, 199.99),
 (627, 199.95),
 (642, 150.0),
 (646, 499.95),
 (647, 134.99),
 (652, 129.99),
 (666, 109.99),
 (671, 209.99),
 (677, 499.95),
 (691, 399.95),
 (703, 99.95),
 (705, 119.99),
 (715, 129.99),
 (724, 500.0),
 (725, 108.0),
 (728, 325.0),
 (730, 400.0),
 (743, 169.99),
 (768, 299.99),
 (771, 199.95),
 (773, 249.99),


In [23]:
ordItems.map(lambda x : ( int(x.split(',')[1]),float(x.split(',')[4])  )).reduceByKey(lambda x,y :x if x >y else y).take(4)

[(2, 250.0), (4, 299.95), (8, 299.95), (10, 199.99)]

In [24]:
ordItems.map(lambda x : ( int(x.split(',')[1]),x  )).\
reduceByKey(lambda x,y :x if float(x.split(',')[4]) > float(y.split(',')[4]) else y).take(4)

[(2, '3,2,502,5,250.0,50.0'),
 (4, '6,4,365,5,299.95,59.99'),
 (8, '18,8,365,5,299.95,59.99'),
 (10, '28,10,1073,1,199.99,199.99')]

In [34]:
lst=[(2,'joseph',200),(2,'jimmy',250),(2,'tina',130),(4,'jimmy',50),(4,'tina',300),(4,'joseph',150),(4,'ram',200),
     (7,'joseph',300),(7,'jimmy',80)]
orditems=spark.sparkContext.parallelize(lst,2)

In [35]:
ordPair =orditems.map(lambda x: (x[0],(x[1],x[2])))

In [36]:
ordPair.take(5)

[(2, ('joseph', 200)),
 (2, ('jimmy', 250)),
 (2, ('tina', 130)),
 (4, ('jimmy', 50)),
 (4, ('tina', 300))]

In [37]:
zero_val =0

In [38]:
def seq_opp(acc,ele):
    if acc > ele[1]:
        return acc
    else:
        return ele[1]

In [39]:
def comb_opp(acc1,acc2):
    if acc1 > acc2:
        return acc1
    else:
        return acc2

In [40]:
agg_result =ordPair.aggregateByKey(zero_val,seq_opp,comb_opp)

In [41]:
agg_result.take(5)

[(2, 250), (4, 300), (7, 300)]

In [42]:
#return customer_name along with max value

In [43]:
zero_val =('',0)

In [52]:
def seq_opp(acc,ele):
    if acc[1] > ele[1]:
        return acc
    else:
        return ele

In [53]:
def comb_opp(acc1,acc2):
    if acc1[1] > acc2[1]:
        return acc1
    else:
        return acc2

In [54]:
agg_result =ordPair.aggregateByKey(zero_val,seq_opp,comb_opp)

In [55]:
for i in agg_result.collect():print(i)

(2, ('jimmy', 250))
(4, ('tina', 300))
(7, ('joseph', 300))


In [56]:
#Sum all revenues and number of records for each order

In [1]:
#County By Key

In [2]:
from pyspark.sql import SparkSession

In [4]:
spark = SparkSession.builder.appName("SparkDemo").master("local[1]").getOrCreate()

In [3]:
ord = spark.sparkContext.textFile('c:/practice/orders')

In [7]:
ord.take(5)

['1,2013-07-25 00:00:00.0,11599,CLOSED',
 '2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT',
 '3,2013-07-25 00:00:00.0,12111,COMPLETE',
 '4,2013-07-25 00:00:00.0,8827,CLOSED',
 '5,2013-07-25 00:00:00.0,11318,COMPLETE']

In [69]:
ordMap = ord.map(lambda x: (x.split(',')[3],1))

In [70]:
ordMap.take(5)

[('CLOSED', 1),
 ('PENDING_PAYMENT', 1),
 ('COMPLETE', 1),
 ('CLOSED', 1),
 ('COMPLETE', 1)]

In [72]:
result = ordMap.countByKey()

AttributeError: 'collections.defaultdict' object has no attribute 'collect'

In [76]:
for k,v in result.items():
    print(k,v)

CLOSED 7556
PENDING_PAYMENT 15030
COMPLETE 22899
PROCESSING 8275
PAYMENT_REVIEW 729
PENDING 7610
ON_HOLD 3798
CANCELED 1428
SUSPECTED_FRAUD 1558


In [16]:
#Sort api

In [17]:
#sort orders using customer id

In [4]:
ord.take(5)

['1,2013-07-25 00:00:00.0,11599,CLOSED',
 '2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT',
 '3,2013-07-25 00:00:00.0,12111,COMPLETE',
 '4,2013-07-25 00:00:00.0,8827,CLOSED',
 '5,2013-07-25 00:00:00.0,11318,COMPLETE']

In [78]:
ordPair = ord.map(lambda x:(int(x.split(',')[2]),x))

In [79]:
ordPair.take(5)

[(11599, '1,2013-07-25 00:00:00.0,11599,CLOSED'),
 (256, '2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT'),
 (12111, '3,2013-07-25 00:00:00.0,12111,COMPLETE'),
 (8827, '4,2013-07-25 00:00:00.0,8827,CLOSED'),
 (11318, '5,2013-07-25 00:00:00.0,11318,COMPLETE')]

In [80]:
srtRdd =ordPair.sortByKey()

In [16]:
srtRdd.take(10)

[(1, '22945,2013-12-13 00:00:00.0,1,COMPLETE'),
 (2, '15192,2013-10-29 00:00:00.0,2,PENDING_PAYMENT'),
 (2, '33865,2014-02-18 00:00:00.0,2,COMPLETE'),
 (2, '57963,2013-08-02 00:00:00.0,2,ON_HOLD'),
 (2, '67863,2013-11-30 00:00:00.0,2,COMPLETE'),
 (3, '22646,2013-12-11 00:00:00.0,3,COMPLETE'),
 (3, '23662,2013-12-19 00:00:00.0,3,COMPLETE'),
 (3, '35158,2014-02-26 00:00:00.0,3,COMPLETE'),
 (3, '46399,2014-05-09 00:00:00.0,3,PROCESSING'),
 (3, '56178,2014-07-15 00:00:00.0,3,PENDING')]

In [81]:
srtRdd =ordPair.sortByKey(ascending=False)

In [22]:
for i in srtRdd.take(10):print(i[1])

41643,2014-04-08 00:00:00.0,12435,PENDING
61629,2013-12-21 00:00:00.0,12435,CANCELED
1868,2013-08-03 00:00:00.0,12434,CLOSED
4799,2013-08-23 00:00:00.0,12434,PENDING_PAYMENT
5303,2013-08-26 00:00:00.0,12434,PENDING
6160,2013-09-02 00:00:00.0,12434,COMPLETE
13544,2013-10-16 00:00:00.0,12434,PENDING
42915,2014-04-16 00:00:00.0,12434,COMPLETE
51800,2014-06-14 00:00:00.0,12434,ON_HOLD
61777,2013-12-26 00:00:00.0,12434,COMPLETE


In [23]:
#Sort Order using Customer and Status

In [24]:
ordPair = ord.map(lambda x :((int(x.split(',')[2]), x.split(',')[3]) ,x))

In [26]:
ordPair.take(5)

[((11599, 'CLOSED'), '1,2013-07-25 00:00:00.0,11599,CLOSED'),
 ((256, 'PENDING_PAYMENT'), '2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT'),
 ((12111, 'COMPLETE'), '3,2013-07-25 00:00:00.0,12111,COMPLETE'),
 ((8827, 'CLOSED'), '4,2013-07-25 00:00:00.0,8827,CLOSED'),
 ((11318, 'COMPLETE'), '5,2013-07-25 00:00:00.0,11318,COMPLETE')]

In [83]:
srtRdd =ordPair.sortByKey(ascending=False)

In [84]:
for i in srtRdd.take(10):print(i[1])

41643,2014-04-08 00:00:00.0,12435,PENDING
61629,2013-12-21 00:00:00.0,12435,CANCELED
1868,2013-08-03 00:00:00.0,12434,CLOSED
4799,2013-08-23 00:00:00.0,12434,PENDING_PAYMENT
5303,2013-08-26 00:00:00.0,12434,PENDING
6160,2013-09-02 00:00:00.0,12434,COMPLETE
13544,2013-10-16 00:00:00.0,12434,PENDING
42915,2014-04-16 00:00:00.0,12434,COMPLETE
51800,2014-06-14 00:00:00.0,12434,ON_HOLD
61777,2013-12-26 00:00:00.0,12434,COMPLETE


In [29]:
#Ranking

In [85]:
prod = spark.sparkContext.textFile('c:/practice/product')

In [86]:
for i in prod.take(5):print(i)

1,2,Quest Q64 10 FT. x 10 FT. Slant Leg Instant U,,59.98,http://images.acmesports.sports/Quest+Q64+10+FT.+x+10+FT.+Slant+Leg+Instant+Up+Canopy
2,2,Under Armour Men's Highlight MC Football Clea,,129.99,http://images.acmesports.sports/Under+Armour+Men%27s+Highlight+MC+Football+Cleat
3,2,Under Armour Men's Renegade D Mid Football Cl,,89.99,http://images.acmesports.sports/Under+Armour+Men%27s+Renegade+D+Mid+Football+Cleat
4,2,Under Armour Men's Renegade D Mid Football Cl,,89.99,http://images.acmesports.sports/Under+Armour+Men%27s+Renegade+D+Mid+Football+Cleat
5,2,Riddell Youth Revolution Speed Custom Footbal,,199.99,http://images.acmesports.sports/Riddell+Youth+Revolution+Speed+Custom+Football+Helmet


In [87]:
prodPair = prod.map(lambda x : (float(x.split(',')[3]),x))

In [88]:
prodPair.take(5)

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 63.0 failed 1 times, most recent failure: Lost task 0.0 in stage 63.0 (TID 110) (DESKTOP-LJE0474 executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\Spark\python\lib\pyspark.zip\pyspark\worker.py", line 1247, in main
  File "C:\Spark\python\lib\pyspark.zip\pyspark\worker.py", line 1239, in process
  File "C:\Spark\python\lib\pyspark.zip\pyspark\serializers.py", line 274, in dump_stream
    vs = list(itertools.islice(iterator, batch))
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\babje\anaconda3\Lib\site-packages\pyspark\rdd.py", line 2849, in takeUpToNumLeft
    yield next(iterator)
          ^^^^^^^^^^^^^^
  File "C:\Spark\python\lib\pyspark.zip\pyspark\util.py", line 83, in wrapper
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "C:\Users\babje\AppData\Local\Temp\ipykernel_15884\1381186240.py", line 1, in <lambda>
ValueError: could not convert string to float: ''

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:784)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.api.python.PythonRDD$.$anonfun$runJob$1(PythonRDD.scala:181)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2438)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2844)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2780)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2779)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2779)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1242)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3048)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2971)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:984)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:181)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:75)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\Spark\python\lib\pyspark.zip\pyspark\worker.py", line 1247, in main
  File "C:\Spark\python\lib\pyspark.zip\pyspark\worker.py", line 1239, in process
  File "C:\Spark\python\lib\pyspark.zip\pyspark\serializers.py", line 274, in dump_stream
    vs = list(itertools.islice(iterator, batch))
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\babje\anaconda3\Lib\site-packages\pyspark\rdd.py", line 2849, in takeUpToNumLeft
    yield next(iterator)
          ^^^^^^^^^^^^^^
  File "C:\Spark\python\lib\pyspark.zip\pyspark\util.py", line 83, in wrapper
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "C:\Users\babje\AppData\Local\Temp\ipykernel_15884\1381186240.py", line 1, in <lambda>
ValueError: could not convert string to float: ''

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:784)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.api.python.PythonRDD$.$anonfun$runJob$1(PythonRDD.scala:181)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2438)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	... 1 more


In [89]:
prod.filter(lambda x : x.split(',')[4]=='').collect()

['685,31,"TaylorMade SLDR Irons - (Steel) 4-PW, AW",,899.99,http://images.acmesports.sports/TaylorMade+SLDR+Irons+-+%28Steel%29+4-PW%2C+AW']

In [90]:
prod1 =prod.filter(lambda x : x.split(',')[4]!='')

In [91]:
prodPair = prod1.map(lambda x : (float(x.split(',')[4]),x))

In [92]:
for i in prodPair.take(5):print(i)

(59.98, '1,2,Quest Q64 10 FT. x 10 FT. Slant Leg Instant U,,59.98,http://images.acmesports.sports/Quest+Q64+10+FT.+x+10+FT.+Slant+Leg+Instant+Up+Canopy')
(129.99, "2,2,Under Armour Men's Highlight MC Football Clea,,129.99,http://images.acmesports.sports/Under+Armour+Men%27s+Highlight+MC+Football+Cleat")
(89.99, "3,2,Under Armour Men's Renegade D Mid Football Cl,,89.99,http://images.acmesports.sports/Under+Armour+Men%27s+Renegade+D+Mid+Football+Cleat")
(89.99, "4,2,Under Armour Men's Renegade D Mid Football Cl,,89.99,http://images.acmesports.sports/Under+Armour+Men%27s+Renegade+D+Mid+Football+Cleat")
(199.99, '5,2,Riddell Youth Revolution Speed Custom Footbal,,199.99,http://images.acmesports.sports/Riddell+Youth+Revolution+Speed+Custom+Football+Helmet')


In [93]:
top5rdd =prodPair.sortByKey(ascending=False).take(5)

In [94]:
for i in top5rdd:print(i[1])

208,10,SOLE E35 Elliptical,,1999.99,http://images.acmesports.sports/SOLE+E35+Elliptical
66,4,SOLE F85 Treadmill,,1799.99,http://images.acmesports.sports/SOLE+F85+Treadmill
199,10,SOLE F85 Treadmill,,1799.99,http://images.acmesports.sports/SOLE+F85+Treadmill
496,22,SOLE F85 Treadmill,,1799.99,http://images.acmesports.sports/SOLE+F85+Treadmill
1048,47,"Spalding Beast 60"" Glass Portable Basketball ",,1099.99,http://images.acmesports.sports/Spalding+Beast+60%22+Glass+Portable+Basketball+Hoop


In [7]:
#takeOrder api

In [95]:
rdd =spark.sparkContext.parallelize([10,8,20,15,30])

In [98]:
rdd.takeOrdered(3).sort(reverse=True)

In [10]:
rdd.takeOrdered(2,lambda x : -x)

[30, 20]

In [12]:
prod.take(5)

['1,2,Quest Q64 10 FT. x 10 FT. Slant Leg Instant U,,59.98,http://images.acmesports.sports/Quest+Q64+10+FT.+x+10+FT.+Slant+Leg+Instant+Up+Canopy',
 "2,2,Under Armour Men's Highlight MC Football Clea,,129.99,http://images.acmesports.sports/Under+Armour+Men%27s+Highlight+MC+Football+Cleat",
 "3,2,Under Armour Men's Renegade D Mid Football Cl,,89.99,http://images.acmesports.sports/Under+Armour+Men%27s+Renegade+D+Mid+Football+Cleat",
 "4,2,Under Armour Men's Renegade D Mid Football Cl,,89.99,http://images.acmesports.sports/Under+Armour+Men%27s+Renegade+D+Mid+Football+Cleat",
 '5,2,Riddell Youth Revolution Speed Custom Footbal,,199.99,http://images.acmesports.sports/Riddell+Youth+Revolution+Speed+Custom+Football+Helmet']

In [17]:
prod1.takeOrdered(2, lambda x : -float(x.split(',')[4]))

['208,10,SOLE E35 Elliptical,,1999.99,http://images.acmesports.sports/SOLE+E35+Elliptical',
 '66,4,SOLE F85 Treadmill,,1799.99,http://images.acmesports.sports/SOLE+F85+Treadmill']

In [19]:
#Ranking Per Group
#Top 2 products per category

In [24]:
prodDf = prod.filter(lambda x: int(x.split(',')[1] )in (2,3,4))

In [25]:
prodDf.count()

72

In [26]:
prodDf.collect()


['1,2,Quest Q64 10 FT. x 10 FT. Slant Leg Instant U,,59.98,http://images.acmesports.sports/Quest+Q64+10+FT.+x+10+FT.+Slant+Leg+Instant+Up+Canopy',
 "2,2,Under Armour Men's Highlight MC Football Clea,,129.99,http://images.acmesports.sports/Under+Armour+Men%27s+Highlight+MC+Football+Cleat",
 "3,2,Under Armour Men's Renegade D Mid Football Cl,,89.99,http://images.acmesports.sports/Under+Armour+Men%27s+Renegade+D+Mid+Football+Cleat",
 "4,2,Under Armour Men's Renegade D Mid Football Cl,,89.99,http://images.acmesports.sports/Under+Armour+Men%27s+Renegade+D+Mid+Football+Cleat",
 '5,2,Riddell Youth Revolution Speed Custom Footbal,,199.99,http://images.acmesports.sports/Riddell+Youth+Revolution+Speed+Custom+Football+Helmet',
 "6,2,Jordan Men's VI Retro TD Football Cleat,,134.99,http://images.acmesports.sports/Jordan+Men%27s+VI+Retro+TD+Football+Cleat",
 '7,2,Schutt Youth Recruit Hybrid Custom Football H,,99.99,http://images.acmesports.sports/Schutt+Youth+Recruit+Hybrid+Custom+Football+Helmet+20

In [29]:
prodGrp=prodDf.map(lambda x :(int(x.split(',')[1]),x)).groupByKey()

In [30]:
prodGrp.take(5)

[(2, <pyspark.resultiterable.ResultIterable at 0x21405368790>),
 (4, <pyspark.resultiterable.ResultIterable at 0x21405368f50>),
 (3, <pyspark.resultiterable.ResultIterable at 0x214053ac910>)]

In [33]:
first = prodGrp.first()

In [35]:
first

(2, <pyspark.resultiterable.ResultIterable at 0x21405532d50>)

In [39]:
sorted(first[1],key =lambda x : float(x.split(',')[4]) ,reverse=True)

['16,2,Riddell Youth 360 Custom Football Helmet,,299.99,http://images.acmesports.sports/Riddell+Youth+360+Custom+Football+Helmet',
 '11,2,Fitness Gear 300 lb Olympic Weight Set,,209.99,http://images.acmesports.sports/Fitness+Gear+300+lb+Olympic+Weight+Set',
 '5,2,Riddell Youth Revolution Speed Custom Footbal,,199.99,http://images.acmesports.sports/Riddell+Youth+Revolution+Speed+Custom+Football+Helmet',
 '14,2,Quik Shade Summit SX170 10 FT. x 10 FT. Canop,,199.99,http://images.acmesports.sports/Quik+Shade+Summit+SX170+10+FT.+x+10+FT.+Canopy',
 "12,2,Under Armour Men's Highlight MC Alter Ego Fla,,139.99,http://images.acmesports.sports/Under+Armour+Men%27s+Highlight+MC+Alter+Ego+Flash+Football...",
 "23,2,Under Armour Men's Highlight MC Alter Ego Hul,,139.99,http://images.acmesports.sports/Under+Armour+Men%27s+Highlight+MC+Alter+Ego+Hulk+Football...",
 "6,2,Jordan Men's VI Retro TD Football Cleat,,134.99,http://images.acmesports.sports/Jordan+Men%27s+VI+Retro+TD+Football+Cleat",
 "2,2,Und

In [52]:
result=prodGrp.flatMap(lambda x :sorted(x[1],key =lambda x : float(x.split(',')[4]) ,reverse=True)[:2])


In [53]:
result.collect()

['16,2,Riddell Youth 360 Custom Football Helmet,,299.99,http://images.acmesports.sports/Riddell+Youth+360+Custom+Football+Helmet',
 '11,2,Fitness Gear 300 lb Olympic Weight Set,,209.99,http://images.acmesports.sports/Fitness+Gear+300+lb+Olympic+Weight+Set',
 '66,4,SOLE F85 Treadmill,,1799.99,http://images.acmesports.sports/SOLE+F85+Treadmill',
 '60,4,SOLE E25 Elliptical,,999.99,http://images.acmesports.sports/SOLE+E25+Elliptical',
 '40,3,Quik Shade Summit SX170 10 FT. x 10 FT. Canop,,199.99,http://images.acmesports.sports/Quik+Shade+Summit+SX170+10+FT.+x+10+FT.+Canopy',
 "32,3,PUMA Men's evoPOWER 1 Tricks FG Soccer Cleat,,189.99,http://images.acmesports.sports/PUMA+Men%27s+evoPOWER+1+Tricks+FG+Soccer+Cleat"]

In [54]:
#Union
#Number of customer placed orders in july and august

In [99]:
ord.take(5)

['1,2013-07-25 00:00:00.0,11599,CLOSED',
 '2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT',
 '3,2013-07-25 00:00:00.0,12111,COMPLETE',
 '4,2013-07-25 00:00:00.0,8827,CLOSED',
 '5,2013-07-25 00:00:00.0,11318,COMPLETE']

In [100]:
jlyOrd =ord.filter(lambda x : x.split(',')[1].split('-')[1]=="07" ).map(lambda x : x.split(',')[2])

In [61]:
jlyOrd.count()

6001

In [101]:
augOrd =ord.filter(lambda x : x.split(',')[1].split('-')[1]=="08" ).map(lambda x : x.split(',')[2])

In [63]:
augOrd.count()

5680

In [102]:
jlyOrd.union(augOrd).count()

11681

In [103]:
jlyOrd.union(augOrd).distinct().count()

7633

In [104]:
jlyOrd.intersection(augOrd).count()

1759

In [105]:
jlyOrd.subtract(augOrd).count()

3836

In [107]:
rdd =spark.sparkContext.parallelize(range(100))

In [108]:
rdd.sample(withReplacement=False,fraction=0.1,seed=10).collect()

[5, 19, 20, 29, 35, 47, 61, 62, 71, 72, 77, 81, 86, 87, 91]

In [77]:
rdd.sample(withReplacement=True,fraction=0.1,seed=10).collect()

[3, 3, 14, 14, 18, 45, 76, 89]

In [84]:
rdd.sample(withReplacement=True,fraction=0.1,seed=10).collect()

[3, 3, 14, 14, 18, 45, 76, 89]

In [91]:
rdd.takeSample(withReplacement=True,num=10,seed=10)

[15, 56, 81, 11, 71, 22, 96, 34, 4, 82]

In [92]:
rdd.takeSample(withReplacement=False,num=10,seed=10)

[87, 23, 24, 71, 20, 9, 91, 77, 84, 68]

In [105]:
df =spark.range(1000000)

In [106]:
df =df.select(df.id, df.id * 2, df.id*3)

In [151]:
rdd =df.rdd.map(lambda x : str(x[0]) +','+str(x[1])+','+str(x[2]))


In [152]:
rdd.take(4)

['0,0,0', '1,2,3', '2,4,6', '3,6,9']

In [109]:
rdd =spark.sparkContext.textFile('c:/practice/orders')

In [110]:
rdd.getNumPartitions()

2

In [111]:
rdd1= rdd.repartition(5)


In [112]:
rdd1.getNumPartitions()

5

In [7]:
#data frame

In [3]:
lst = [('babjee',45),('ram',40),('sham',40),('sruthi',25)]

In [4]:
df =spark.createDataFrame(lst,schema=["Name","Age"])

In [5]:
df.show()

+------+---+
|  Name|Age|
+------+---+
|babjee| 45|
|   ram| 40|
|  sham| 40|
|sruthi| 25|
+------+---+



In [11]:
df.rdd.getNumPartitions()

2

In [12]:
df.rdd.glom().collect()

[[Row(Name='babjee', Age=45), Row(Name='ram', Age=40)],
 [Row(Name='sham', Age=40), Row(Name='sruthi', Age=25)]]

In [13]:
#data types

In [6]:
df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: long (nullable = true)



In [8]:
spark.version

'3.5.0'

In [9]:
spark.sparkContext.version

'3.5.0'

In [10]:
df =spark.range(1000)

In [11]:
df.show()

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
| 10|
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 18|
| 19|
+---+
only showing top 20 rows



In [12]:
help(spark.range)

Help on method range in module pyspark.sql.session:

range(start: int, end: Optional[int] = None, step: int = 1, numPartitions: Optional[int] = None) -> pyspark.sql.dataframe.DataFrame method of pyspark.sql.session.SparkSession instance
    Create a :class:`DataFrame` with single :class:`pyspark.sql.types.LongType` column named
    ``id``, containing elements in a range from ``start`` to ``end`` (exclusive) with
    step value ``step``.
    
    .. versionadded:: 2.0.0
    
    .. versionchanged:: 3.4.0
        Supports Spark Connect.
    
    Parameters
    ----------
    start : int
        the start value
    end : int, optional
        the end value (exclusive)
    step : int, optional
        the incremental step (default: 1)
    numPartitions : int, optional
        the number of partitions of the DataFrame
    
    Returns
    -------
    :class:`DataFrame`
    
    Examples
    --------
    >>> spark.range(1, 7, 2).show()
    +---+
    | id|
    +---+
    |  1|
    |  3|
    | 

In [13]:
df = spark.range(0,11,2)

In [14]:
df.show()

+---+
| id|
+---+
|  0|
|  2|
|  4|
|  6|
|  8|
| 10|
+---+



In [16]:
df = spark.range(1000,numPartitions=5)

In [17]:
df.rdd.getNumPartitions()

5

In [18]:
type(df)

pyspark.sql.dataframe.DataFrame

In [19]:
#creating data frame from list

In [20]:
lst=[('Robert',40),('James',25)]

In [21]:
df =spark.createDataFrame(lst)

In [23]:
df.show()


+------+---+
|    _1| _2|
+------+---+
|Robert| 40|
| James| 25|
+------+---+



In [24]:
df.printSchema()

root
 |-- _1: string (nullable = true)
 |-- _2: long (nullable = true)



In [25]:
df =spark.createDataFrame(lst,["Name","Age"])

In [26]:
df.show()

+------+---+
|  Name|Age|
+------+---+
|Robert| 40|
| James| 25|
+------+---+



In [27]:
df =spark.createDataFrame(lst ,schema=('Name string, Age int'))

In [28]:
df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)



In [29]:
dic =[{"Name":"Robert", "Age":20},{"Name":"Jame", "Age":25}]

In [30]:
df =spark.createDataFrame(dic)

In [31]:
df.show()

+---+------+
|Age|  Name|
+---+------+
| 20|Robert|
| 25|  Jame|
+---+------+



In [32]:
lst = [('Robert',40),('James',40)]

In [33]:
rdd =spark.sparkContext.parallelize(lst)

In [34]:
df =spark.createDataFrame(rdd,['Name','Age'])

In [35]:
df.show()

+------+---+
|  Name|Age|
+------+---+
|Robert| 40|
| James| 40|
+------+---+



In [37]:
from pyspark.sql import Row

In [38]:
r =Row(Name='Ram',Age=35)

In [39]:
r.Name

'Ram'

In [40]:
r.Age

35

In [44]:
rdd =spark.sparkContext.parallelize([Row(Name='Ram',Age=30),Row(Name='James',Age=25)])

In [45]:
for i in rdd.collect():print(i)

Row(Name='Ram', Age=30)
Row(Name='James', Age=25)


In [46]:
df =spark.createDataFrame(rdd)

In [47]:
df.show()

+-----+---+
| Name|Age|
+-----+---+
|  Ram| 30|
|James| 25|
+-----+---+



In [49]:
import prands as pd

In [50]:
#Creating data frame using pyhon pands
data =[['tom',30],['james',30],['sunil',20]]
df_pandas =pd.DataFrame(data,columns=['Name','Age'])
df=spark.createDataFrame(df_panda)

In [51]:
lst =[('Robet',101),('James',102)]

In [55]:
df =spark.createDataFrame(lst,["EmpName","DeptNo"])

In [59]:
df.createOrReplaceTempView("emp")

In [61]:
spark.sql("select * from emp").show()

+-------+------+
|EmpName|DeptNo|
+-------+------+
|  Robet|   101|
|  James|   102|
+-------+------+



In [64]:
spark.sql("select * from emp where EmpName='James'").show()

+-------+------+
|EmpName|DeptNo|
+-------+------+
|  James|   102|
+-------+------+



In [75]:
#GlobalTempView
df.createOrReplaceGlobalTempView("emp1")

In [71]:
spark.sql("select * from emp1").show()

AnalysisException: [TABLE_OR_VIEW_NOT_FOUND] The table or view `emp1` cannot be found. Verify the spelling and correctness of the schema and catalog.
If you did not qualify the name with a schema, verify the current_schema() output, or qualify the name with the correct schema and catalog.
To tolerate the error on drop use DROP VIEW IF EXISTS or DROP TABLE IF EXISTS.; line 1 pos 14;
'Project [*]
+- 'UnresolvedRelation [emp1], [], false


In [74]:
spark.sql("select * from global_temp.emp1 where deptno=101").show()

+-------+------+
|EmpName|DeptNo|
+-------+------+
|  Robet|   101|
+-------+------+



In [76]:
#Table function

In [78]:
df.createOrReplaceTempView("emp_v")

In [79]:
spark.sql("select * from emp_v").show()

+-------+------+
|EmpName|DeptNo|
+-------+------+
|  Robet|   101|
|  James|   102|
+-------+------+



In [82]:
df =spark.table("emp_v")

In [83]:
df.show()

+-------+------+
|EmpName|DeptNo|
+-------+------+
|  Robet|   101|
|  James|   102|
+-------+------+



In [84]:
spark.sparkContext

In [85]:
spark.conf.get('spark.sql.shuffle.partitions')

'200'

In [86]:
spark.conf.set('spark.sql.shuffle.partitions',300)

In [87]:
spark.conf.get('spark.sql.shuffle.partitions')

'300'

In [96]:
df =spark.read.load('c:/practice/orders',format='csv',sep=',')

In [90]:
df.show()

+---+--------------------+-----+---------------+
|_c0|                 _c1|  _c2|            _c3|
+---+--------------------+-----+---------------+
|  1|2013-07-25 00:00:...|11599|         CLOSED|
|  2|2013-07-25 00:00:...|  256|PENDING_PAYMENT|
|  3|2013-07-25 00:00:...|12111|       COMPLETE|
|  4|2013-07-25 00:00:...| 8827|         CLOSED|
|  5|2013-07-25 00:00:...|11318|       COMPLETE|
|  6|2013-07-25 00:00:...| 7130|       COMPLETE|
|  7|2013-07-25 00:00:...| 4530|       COMPLETE|
|  8|2013-07-25 00:00:...| 2911|     PROCESSING|
|  9|2013-07-25 00:00:...| 5657|PENDING_PAYMENT|
| 10|2013-07-25 00:00:...| 5648|PENDING_PAYMENT|
| 11|2013-07-25 00:00:...|  918| PAYMENT_REVIEW|
| 12|2013-07-25 00:00:...| 1837|         CLOSED|
| 13|2013-07-25 00:00:...| 9149|PENDING_PAYMENT|
| 14|2013-07-25 00:00:...| 9842|     PROCESSING|
| 15|2013-07-25 00:00:...| 2568|       COMPLETE|
| 16|2013-07-25 00:00:...| 7276|PENDING_PAYMENT|
| 17|2013-07-25 00:00:...| 2667|       COMPLETE|
| 18|2013-07-25 00:0

In [91]:
ord=spark.read.load('c:/practice/orders',format='csv',schema=('OrderId int, OrderDate String,CustomerId int,OrderStatus String'))

In [92]:
df.show()

+-------+--------------------+----------+---------------+
|OrderId|           OrderDate|CustomerId|    OrderStatus|
+-------+--------------------+----------+---------------+
|      1|2013-07-25 00:00:...|     11599|         CLOSED|
|      2|2013-07-25 00:00:...|       256|PENDING_PAYMENT|
|      3|2013-07-25 00:00:...|     12111|       COMPLETE|
|      4|2013-07-25 00:00:...|      8827|         CLOSED|
|      5|2013-07-25 00:00:...|     11318|       COMPLETE|
|      6|2013-07-25 00:00:...|      7130|       COMPLETE|
|      7|2013-07-25 00:00:...|      4530|       COMPLETE|
|      8|2013-07-25 00:00:...|      2911|     PROCESSING|
|      9|2013-07-25 00:00:...|      5657|PENDING_PAYMENT|
|     10|2013-07-25 00:00:...|      5648|PENDING_PAYMENT|
|     11|2013-07-25 00:00:...|       918| PAYMENT_REVIEW|
|     12|2013-07-25 00:00:...|      1837|         CLOSED|
|     13|2013-07-25 00:00:...|      9149|PENDING_PAYMENT|
|     14|2013-07-25 00:00:...|      9842|     PROCESSING|
|     15|2013-

In [93]:
df.printSchema()

root
 |-- OrderId: integer (nullable = true)
 |-- OrderDate: string (nullable = true)
 |-- CustomerId: integer (nullable = true)
 |-- OrderStatus: string (nullable = true)



In [95]:
df.show(5,truncate=False)

+-------+---------------------+----------+---------------+
|OrderId|OrderDate            |CustomerId|OrderStatus    |
+-------+---------------------+----------+---------------+
|1      |2013-07-25 00:00:00.0|11599     |CLOSED         |
|2      |2013-07-25 00:00:00.0|256       |PENDING_PAYMENT|
|3      |2013-07-25 00:00:00.0|12111     |COMPLETE       |
|4      |2013-07-25 00:00:00.0|8827      |CLOSED         |
|5      |2013-07-25 00:00:00.0|11318     |COMPLETE       |
+-------+---------------------+----------+---------------+
only showing top 5 rows



In [98]:
df =spark.read.load('c:/practice/orders',format='csv')

In [99]:
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)



In [100]:
df =spark.read.load(path='c:/practice/orders',format='csv',inferSchema=True)

In [101]:
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- _c1: timestamp (nullable = true)
 |-- _c2: integer (nullable = true)
 |-- _c3: string (nullable = true)



In [102]:
df =spark.read.load('c:/practice/orders1',format='csv',inferSchema=True,header=True)

In [103]:
df.show()

+-------+-------------------+----------+---------------+
|OrderId|          OrderDate|CustomerId|    OrderStatus|
+-------+-------------------+----------+---------------+
|      1|2013-07-25 00:00:00|     11599|         CLOSED|
|      2|2013-07-25 00:00:00|       256|PENDING_PAYMENT|
|      3|2013-07-25 00:00:00|     12111|       COMPLETE|
|      4|2013-07-25 00:00:00|      8827|         CLOSED|
|      5|2013-07-25 00:00:00|     11318|       COMPLETE|
|      6|2013-07-25 00:00:00|      7130|       COMPLETE|
|      7|2013-07-25 00:00:00|      4530|       COMPLETE|
|      8|2013-07-25 00:00:00|      2911|     PROCESSING|
|      9|2013-07-25 00:00:00|      5657|PENDING_PAYMENT|
|     10|2013-07-25 00:00:00|      5648|PENDING_PAYMENT|
|     11|2013-07-25 00:00:00|       918| PAYMENT_REVIEW|
|     12|2013-07-25 00:00:00|      1837|         CLOSED|
|     13|2013-07-25 00:00:00|      9149|PENDING_PAYMENT|
|     14|2013-07-25 00:00:00|      9842|     PROCESSING|
|     15|2013-07-25 00:00:00|  

In [104]:
df.printSchema()

root
 |-- OrderId: integer (nullable = true)
 |-- OrderDate: timestamp (nullable = true)
 |-- CustomerId: integer (nullable = true)
 |-- OrderStatus: string (nullable = true)



In [105]:
#textfile

In [106]:
df =spark.read.load('c:/practice/orders',format='text')

In [108]:
df.show(truncate=False)

+---------------------------------------------+
|value                                        |
+---------------------------------------------+
|1,2013-07-25 00:00:00.0,11599,CLOSED         |
|2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT  |
|3,2013-07-25 00:00:00.0,12111,COMPLETE       |
|4,2013-07-25 00:00:00.0,8827,CLOSED          |
|5,2013-07-25 00:00:00.0,11318,COMPLETE       |
|6,2013-07-25 00:00:00.0,7130,COMPLETE        |
|7,2013-07-25 00:00:00.0,4530,COMPLETE        |
|8,2013-07-25 00:00:00.0,2911,PROCESSING      |
|9,2013-07-25 00:00:00.0,5657,PENDING_PAYMENT |
|10,2013-07-25 00:00:00.0,5648,PENDING_PAYMENT|
|11,2013-07-25 00:00:00.0,918,PAYMENT_REVIEW  |
|12,2013-07-25 00:00:00.0,1837,CLOSED         |
|13,2013-07-25 00:00:00.0,9149,PENDING_PAYMENT|
|14,2013-07-25 00:00:00.0,9842,PROCESSING     |
|15,2013-07-25 00:00:00.0,2568,COMPLETE       |
|16,2013-07-25 00:00:00.0,7276,PENDING_PAYMENT|
|17,2013-07-25 00:00:00.0,2667,COMPLETE       |
|18,2013-07-25 00:00:00.0,1205,CLOSED   

In [109]:
#Parquet

In [125]:
df =spark.read.load('c:/practice/Order.parquet',format='parquet')

In [127]:
df.show(truncate=False)

+--------+---------------------+--------+---------------+
|column_1|column_2             |column_3|column_4       |
+--------+---------------------+--------+---------------+
|1       |2013-07-25 00:00:00.0|11599   |CLOSED         |
|2       |2013-07-25 00:00:00.0|256     |PENDING_PAYMENT|
|3       |2013-07-25 00:00:00.0|12111   |COMPLETE       |
|4       |2013-07-25 00:00:00.0|8827    |CLOSED         |
|5       |2013-07-25 00:00:00.0|11318   |COMPLETE       |
|6       |2013-07-25 00:00:00.0|7130    |COMPLETE       |
|7       |2013-07-25 00:00:00.0|4530    |COMPLETE       |
|8       |2013-07-25 00:00:00.0|2911    |PROCESSING     |
|9       |2013-07-25 00:00:00.0|5657    |PENDING_PAYMENT|
|10      |2013-07-25 00:00:00.0|5648    |PENDING_PAYMENT|
|11      |2013-07-25 00:00:00.0|918     |PAYMENT_REVIEW |
|12      |2013-07-25 00:00:00.0|1837    |CLOSED         |
|13      |2013-07-25 00:00:00.0|9149    |PENDING_PAYMENT|
|14      |2013-07-25 00:00:00.0|9842    |PROCESSING     |
|15      |2013

In [130]:
df =spark.read.load('c:/practice/order.avro',format='avro')

AnalysisException: Failed to find data source: avro. Avro is built-in but external data source module since Spark 2.4. Please deploy the application as per the deployment section of Apache Avro Data Source Guide.

In [3]:
#Spark ETl

In [4]:
#Select

In [3]:
ord =spark.read.load('c:/practice/orders',format='csv',sep=',', schema=('order_id int, order_date timestamp,\
                                                                         order_customer_id int, order_status string'))

In [8]:
ord.show(5)

+--------+-------------------+-----------------+---------------+
|order_id|         order_date|order_customer_id|   order_status|
+--------+-------------------+-----------------+---------------+
|       1|2013-07-25 00:00:00|            11599|         CLOSED|
|       2|2013-07-25 00:00:00|              256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:00|            12111|       COMPLETE|
|       4|2013-07-25 00:00:00|             8827|         CLOSED|
|       5|2013-07-25 00:00:00|            11318|       COMPLETE|
+--------+-------------------+-----------------+---------------+
only showing top 5 rows



In [4]:
data =[('Robet',30,40,40),('Ram',31,33,29),('Ram',31,33,99),('Ram',31,33,29)]
emp=spark.createDataFrame(data =data, schema=['Name','score1','score2','score3'])

In [35]:
emp.show()

+-----+------+------+------+
| Name|score1|score2|score3|
+-----+------+------+------+
|Robet|    30|    40|    40|
|  Ram|    31|    33|    29|
|  Ram|    31|    33|    99|
|  Ram|    31|    33|    29|
+-----+------+------+------+



In [6]:
ord.select(ord.order_id, 'order_id', "order_id", (ord.order_id+10).alias('order10')).show()

+--------+--------+--------+-------+
|order_id|order_id|order_id|order10|
+--------+--------+--------+-------+
|       1|       1|       1|     11|
|       2|       2|       2|     12|
|       3|       3|       3|     13|
|       4|       4|       4|     14|
|       5|       5|       5|     15|
|       6|       6|       6|     16|
|       7|       7|       7|     17|
|       8|       8|       8|     18|
|       9|       9|       9|     19|
|      10|      10|      10|     20|
|      11|      11|      11|     21|
|      12|      12|      12|     22|
|      13|      13|      13|     23|
|      14|      14|      14|     24|
|      15|      15|      15|     25|
|      16|      16|      16|     26|
|      17|      17|      17|     27|
|      18|      18|      18|     28|
|      19|      19|      19|     29|
|      20|      20|      20|     30|
+--------+--------+--------+-------+
only showing top 20 rows



In [14]:
from pyspark.sql.functions import lower

In [21]:
ord.select(ord.order_status, lower(ord.order_status).alias('order_status_lower')).show()

+---------------+------------------+
|   order_status|order_status_lower|
+---------------+------------------+
|         CLOSED|            closed|
|PENDING_PAYMENT|   pending_payment|
|       COMPLETE|          complete|
|         CLOSED|            closed|
|       COMPLETE|          complete|
|       COMPLETE|          complete|
|       COMPLETE|          complete|
|     PROCESSING|        processing|
|PENDING_PAYMENT|   pending_payment|
|PENDING_PAYMENT|   pending_payment|
| PAYMENT_REVIEW|    payment_review|
|         CLOSED|            closed|
|PENDING_PAYMENT|   pending_payment|
|     PROCESSING|        processing|
|       COMPLETE|          complete|
|PENDING_PAYMENT|   pending_payment|
|       COMPLETE|          complete|
|         CLOSED|            closed|
|PENDING_PAYMENT|   pending_payment|
|     PROCESSING|        processing|
+---------------+------------------+
only showing top 20 rows



In [23]:
from pyspark.sql.functions import substring

In [24]:
ord.select(substring(ord.order_date,1,4).alias('orderyear')).show()

+---------+
|orderyear|
+---------+
|     2013|
|     2013|
|     2013|
|     2013|
|     2013|
|     2013|
|     2013|
|     2013|
|     2013|
|     2013|
|     2013|
|     2013|
|     2013|
|     2013|
|     2013|
|     2013|
|     2013|
|     2013|
|     2013|
|     2013|
+---------+
only showing top 20 rows



In [34]:
ord.selectExpr("substring(order_date,1,4) as order_year").show()

+----------+
|order_year|
+----------+
|      2013|
|      2013|
|      2013|
|      2013|
|      2013|
|      2013|
|      2013|
|      2013|
|      2013|
|      2013|
|      2013|
|      2013|
|      2013|
|      2013|
|      2013|
|      2013|
|      2013|
|      2013|
|      2013|
|      2013|
+----------+
only showing top 20 rows



In [8]:
ord.selectExpr(" lower(order_status) as lower_order_status").show()

+------------------+
|lower_order_status|
+------------------+
|            closed|
|   pending_payment|
|          complete|
|            closed|
|          complete|
|          complete|
|          complete|
|        processing|
|   pending_payment|
|   pending_payment|
|    payment_review|
|            closed|
|   pending_payment|
|        processing|
|          complete|
|   pending_payment|
|          complete|
|            closed|
|   pending_payment|
|        processing|
+------------------+
only showing top 20 rows



In [12]:
from pyspark.sql.functions import *

In [13]:
ord.select(lower(ord.order_status)).show()

+-------------------+
|lower(order_status)|
+-------------------+
|             closed|
|    pending_payment|
|           complete|
|             closed|
|           complete|
|           complete|
|           complete|
|         processing|
|    pending_payment|
|    pending_payment|
|     payment_review|
|             closed|
|    pending_payment|
|         processing|
|           complete|
|    pending_payment|
|           complete|
|             closed|
|    pending_payment|
|         processing|
+-------------------+
only showing top 20 rows



In [14]:
#With colum api

In [16]:
ord.show(4)

+--------+-------------------+-----------------+---------------+
|order_id|         order_date|order_customer_id|   order_status|
+--------+-------------------+-----------------+---------------+
|       1|2013-07-25 00:00:00|            11599|         CLOSED|
|       2|2013-07-25 00:00:00|              256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:00|            12111|       COMPLETE|
|       4|2013-07-25 00:00:00|             8827|         CLOSED|
+--------+-------------------+-----------------+---------------+
only showing top 4 rows



In [17]:
ord.withColumn('order_year',substring(ord.order_date,1,4)).show()

+--------+-------------------+-----------------+---------------+----------+
|order_id|         order_date|order_customer_id|   order_status|order_year|
+--------+-------------------+-----------------+---------------+----------+
|       1|2013-07-25 00:00:00|            11599|         CLOSED|      2013|
|       2|2013-07-25 00:00:00|              256|PENDING_PAYMENT|      2013|
|       3|2013-07-25 00:00:00|            12111|       COMPLETE|      2013|
|       4|2013-07-25 00:00:00|             8827|         CLOSED|      2013|
|       5|2013-07-25 00:00:00|            11318|       COMPLETE|      2013|
|       6|2013-07-25 00:00:00|             7130|       COMPLETE|      2013|
|       7|2013-07-25 00:00:00|             4530|       COMPLETE|      2013|
|       8|2013-07-25 00:00:00|             2911|     PROCESSING|      2013|
|       9|2013-07-25 00:00:00|             5657|PENDING_PAYMENT|      2013|
|      10|2013-07-25 00:00:00|             5648|PENDING_PAYMENT|      2013|
|      11|20

In [18]:
#if you give alias name as colum name it will modify existing colum
ord.withColumn('order_date',substring(ord.order_date,1,4)).show()

+--------+----------+-----------------+---------------+
|order_id|order_date|order_customer_id|   order_status|
+--------+----------+-----------------+---------------+
|       1|      2013|            11599|         CLOSED|
|       2|      2013|              256|PENDING_PAYMENT|
|       3|      2013|            12111|       COMPLETE|
|       4|      2013|             8827|         CLOSED|
|       5|      2013|            11318|       COMPLETE|
|       6|      2013|             7130|       COMPLETE|
|       7|      2013|             4530|       COMPLETE|
|       8|      2013|             2911|     PROCESSING|
|       9|      2013|             5657|PENDING_PAYMENT|
|      10|      2013|             5648|PENDING_PAYMENT|
|      11|      2013|              918| PAYMENT_REVIEW|
|      12|      2013|             1837|         CLOSED|
|      13|      2013|             9149|PENDING_PAYMENT|
|      14|      2013|             9842|     PROCESSING|
|      15|      2013|             2568|       CO

In [19]:
#withColumnRenamed

In [25]:
ord.withColumnRenamed('order_date','New_Date').show()

+--------+-------------------+-----------------+---------------+
|order_id|           New_Date|order_customer_id|   order_status|
+--------+-------------------+-----------------+---------------+
|       1|2013-07-25 00:00:00|            11599|         CLOSED|
|       2|2013-07-25 00:00:00|              256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:00|            12111|       COMPLETE|
|       4|2013-07-25 00:00:00|             8827|         CLOSED|
|       5|2013-07-25 00:00:00|            11318|       COMPLETE|
|       6|2013-07-25 00:00:00|             7130|       COMPLETE|
|       7|2013-07-25 00:00:00|             4530|       COMPLETE|
|       8|2013-07-25 00:00:00|             2911|     PROCESSING|
|       9|2013-07-25 00:00:00|             5657|PENDING_PAYMENT|
|      10|2013-07-25 00:00:00|             5648|PENDING_PAYMENT|
|      11|2013-07-25 00:00:00|              918| PAYMENT_REVIEW|
|      12|2013-07-25 00:00:00|             1837|         CLOSED|
|      13|2013-07-25 00:0

In [26]:
ord.show()

+--------+-------------------+-----------------+---------------+
|order_id|         order_date|order_customer_id|   order_status|
+--------+-------------------+-----------------+---------------+
|       1|2013-07-25 00:00:00|            11599|         CLOSED|
|       2|2013-07-25 00:00:00|              256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:00|            12111|       COMPLETE|
|       4|2013-07-25 00:00:00|             8827|         CLOSED|
|       5|2013-07-25 00:00:00|            11318|       COMPLETE|
|       6|2013-07-25 00:00:00|             7130|       COMPLETE|
|       7|2013-07-25 00:00:00|             4530|       COMPLETE|
|       8|2013-07-25 00:00:00|             2911|     PROCESSING|
|       9|2013-07-25 00:00:00|             5657|PENDING_PAYMENT|
|      10|2013-07-25 00:00:00|             5648|PENDING_PAYMENT|
|      11|2013-07-25 00:00:00|              918| PAYMENT_REVIEW|
|      12|2013-07-25 00:00:00|             1837|         CLOSED|
|      13|2013-07-25 00:0

In [27]:
ord.drop('order_id','order_date').show()

+-----------------+---------------+
|order_customer_id|   order_status|
+-----------------+---------------+
|            11599|         CLOSED|
|              256|PENDING_PAYMENT|
|            12111|       COMPLETE|
|             8827|         CLOSED|
|            11318|       COMPLETE|
|             7130|       COMPLETE|
|             4530|       COMPLETE|
|             2911|     PROCESSING|
|             5657|PENDING_PAYMENT|
|             5648|PENDING_PAYMENT|
|              918| PAYMENT_REVIEW|
|             1837|         CLOSED|
|             9149|PENDING_PAYMENT|
|             9842|     PROCESSING|
|             2568|       COMPLETE|
|             7276|PENDING_PAYMENT|
|             2667|       COMPLETE|
|             1205|         CLOSED|
|             9488|PENDING_PAYMENT|
|             9198|     PROCESSING|
+-----------------+---------------+
only showing top 20 rows



In [29]:
#dropDuplicate


In [37]:
emp.show()

+-----+------+------+------+
| Name|score1|score2|score3|
+-----+------+------+------+
|Robet|    30|    40|    40|
|  Ram|    31|    33|    29|
|  Ram|    31|    33|    99|
|  Ram|    31|    33|    29|
+-----+------+------+------+



In [32]:
emp.dropDuplicates().show()

+-----+------+------+------+
| Name|score1|score2|score3|
+-----+------+------+------+
|Robet|    30|    40|    40|
|  Ram|    31|    33|    99|
|  Ram|    31|    33|    29|
+-----+------+------+------+



In [38]:
emp.dropDuplicates(['Name','score1','score2']).show()

+-----+------+------+------+
| Name|score1|score2|score3|
+-----+------+------+------+
|  Ram|    31|    33|    29|
|Robet|    30|    40|    40|
+-----+------+------+------+



In [39]:
#filer api

In [40]:
ord.select(ord.order_id <5).show()

+--------------+
|(order_id < 5)|
+--------------+
|          true|
|          true|
|          true|
|          true|
|         false|
|         false|
|         false|
|         false|
|         false|
|         false|
|         false|
|         false|
|         false|
|         false|
|         false|
|         false|
|         false|
|         false|
|         false|
|         false|
+--------------+
only showing top 20 rows



In [43]:
ord.where(ord.order_id <5).show()

+--------+-------------------+-----------------+---------------+
|order_id|         order_date|order_customer_id|   order_status|
+--------+-------------------+-----------------+---------------+
|       1|2013-07-25 00:00:00|            11599|         CLOSED|
|       2|2013-07-25 00:00:00|              256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:00|            12111|       COMPLETE|
|       4|2013-07-25 00:00:00|             8827|         CLOSED|
+--------+-------------------+-----------------+---------------+



In [45]:
ord.where((ord.order_id <10) & (ord.order_status=='CLOSED')) .show()

+--------+-------------------+-----------------+------------+
|order_id|         order_date|order_customer_id|order_status|
+--------+-------------------+-----------------+------------+
|       1|2013-07-25 00:00:00|            11599|      CLOSED|
|       4|2013-07-25 00:00:00|             8827|      CLOSED|
+--------+-------------------+-----------------+------------+



In [46]:
ord.select(ord.order_id,ord.order_status).where((ord.order_id <10) & (ord.order_status=='CLOSED')) .show()

+--------+------------+
|order_id|order_status|
+--------+------------+
|       1|      CLOSED|
|       4|      CLOSED|
+--------+------------+



In [51]:
ord.where("order_status in ('CLOSED','COMPLETE')").show()

+--------+-------------------+-----------------+------------+
|order_id|         order_date|order_customer_id|order_status|
+--------+-------------------+-----------------+------------+
|       1|2013-07-25 00:00:00|            11599|      CLOSED|
|       3|2013-07-25 00:00:00|            12111|    COMPLETE|
|       4|2013-07-25 00:00:00|             8827|      CLOSED|
|       5|2013-07-25 00:00:00|            11318|    COMPLETE|
|       6|2013-07-25 00:00:00|             7130|    COMPLETE|
|       7|2013-07-25 00:00:00|             4530|    COMPLETE|
|      12|2013-07-25 00:00:00|             1837|      CLOSED|
|      15|2013-07-25 00:00:00|             2568|    COMPLETE|
|      17|2013-07-25 00:00:00|             2667|    COMPLETE|
|      18|2013-07-25 00:00:00|             1205|      CLOSED|
|      22|2013-07-25 00:00:00|              333|    COMPLETE|
|      24|2013-07-25 00:00:00|            11441|      CLOSED|
|      25|2013-07-25 00:00:00|             9503|      CLOSED|
|      2

In [54]:
ord.where(ord.order_status.isin(['CLOSED'])).show()

+--------+-------------------+-----------------+------------+
|order_id|         order_date|order_customer_id|order_status|
+--------+-------------------+-----------------+------------+
|       1|2013-07-25 00:00:00|            11599|      CLOSED|
|       4|2013-07-25 00:00:00|             8827|      CLOSED|
|      12|2013-07-25 00:00:00|             1837|      CLOSED|
|      18|2013-07-25 00:00:00|             1205|      CLOSED|
|      24|2013-07-25 00:00:00|            11441|      CLOSED|
|      25|2013-07-25 00:00:00|             9503|      CLOSED|
|      37|2013-07-25 00:00:00|             5863|      CLOSED|
|      51|2013-07-25 00:00:00|            12271|      CLOSED|
|      57|2013-07-25 00:00:00|             7073|      CLOSED|
|      61|2013-07-25 00:00:00|             4791|      CLOSED|
|      62|2013-07-25 00:00:00|             9111|      CLOSED|
|      87|2013-07-25 00:00:00|             3065|      CLOSED|
|      90|2013-07-25 00:00:00|             9131|      CLOSED|
|     10

In [58]:
ord.filter(ord.order_id <10).show()

+--------+-------------------+-----------------+---------------+
|order_id|         order_date|order_customer_id|   order_status|
+--------+-------------------+-----------------+---------------+
|       1|2013-07-25 00:00:00|            11599|         CLOSED|
|       2|2013-07-25 00:00:00|              256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:00|            12111|       COMPLETE|
|       4|2013-07-25 00:00:00|             8827|         CLOSED|
|       5|2013-07-25 00:00:00|            11318|       COMPLETE|
|       6|2013-07-25 00:00:00|             7130|       COMPLETE|
|       7|2013-07-25 00:00:00|             4530|       COMPLETE|
|       8|2013-07-25 00:00:00|             2911|     PROCESSING|
|       9|2013-07-25 00:00:00|             5657|PENDING_PAYMENT|
+--------+-------------------+-----------------+---------------+



In [6]:
#Sort api

In [5]:
ord.sort(ord.order_status.asc()).show()

+--------+-------------------+-----------------+------------+
|order_id|         order_date|order_customer_id|order_status|
+--------+-------------------+-----------------+------------+
|     527|2013-07-28 00:00:00|             5426|    CANCELED|
|    1435|2013-08-01 00:00:00|             1879|    CANCELED|
|     552|2013-07-28 00:00:00|             1445|    CANCELED|
|     112|2013-07-26 00:00:00|             5375|    CANCELED|
|     564|2013-07-28 00:00:00|             2216|    CANCELED|
|     955|2013-07-30 00:00:00|             8117|    CANCELED|
|    1383|2013-08-01 00:00:00|             1753|    CANCELED|
|     962|2013-07-30 00:00:00|             9492|    CANCELED|
|     607|2013-07-28 00:00:00|             6376|    CANCELED|
|    1013|2013-07-30 00:00:00|             1903|    CANCELED|
|     667|2013-07-28 00:00:00|             4726|    CANCELED|
|    1169|2013-07-31 00:00:00|             3971|    CANCELED|
|     717|2013-07-29 00:00:00|             8208|    CANCELED|
|    118

In [10]:
ord.sort(ord.order_date.asc(),ord.order_status.desc()).show()

+--------+-------------------+-----------------+---------------+
|order_id|         order_date|order_customer_id|   order_status|
+--------+-------------------+-----------------+---------------+
|      69|2013-07-25 00:00:00|             2821|SUSPECTED_FRAUD|
|   57770|2013-07-25 00:00:00|             7451|SUSPECTED_FRAUD|
|      29|2013-07-25 00:00:00|              196|     PROCESSING|
|       8|2013-07-25 00:00:00|             2911|     PROCESSING|
|      14|2013-07-25 00:00:00|             9842|     PROCESSING|
|      20|2013-07-25 00:00:00|             9198|     PROCESSING|
|      34|2013-07-25 00:00:00|             4189|     PROCESSING|
|      38|2013-07-25 00:00:00|            11586|     PROCESSING|
|      48|2013-07-25 00:00:00|            12186|     PROCESSING|
|      53|2013-07-25 00:00:00|             4701|     PROCESSING|
|      81|2013-07-25 00:00:00|              674|     PROCESSING|
|      84|2013-07-25 00:00:00|             6789|     PROCESSING|
|      94|2013-07-25 00:0

In [13]:
ord.sort(ord.order_date,ord.order_status,ascending=[1,0]).show()

+--------+-------------------+-----------------+---------------+
|order_id|         order_date|order_customer_id|   order_status|
+--------+-------------------+-----------------+---------------+
|      69|2013-07-25 00:00:00|             2821|SUSPECTED_FRAUD|
|   57770|2013-07-25 00:00:00|             7451|SUSPECTED_FRAUD|
|      29|2013-07-25 00:00:00|              196|     PROCESSING|
|       8|2013-07-25 00:00:00|             2911|     PROCESSING|
|      14|2013-07-25 00:00:00|             9842|     PROCESSING|
|      20|2013-07-25 00:00:00|             9198|     PROCESSING|
|      34|2013-07-25 00:00:00|             4189|     PROCESSING|
|      38|2013-07-25 00:00:00|            11586|     PROCESSING|
|      48|2013-07-25 00:00:00|            12186|     PROCESSING|
|      53|2013-07-25 00:00:00|             4701|     PROCESSING|
|      81|2013-07-25 00:00:00|              674|     PROCESSING|
|      84|2013-07-25 00:00:00|             6789|     PROCESSING|
|      94|2013-07-25 00:0

In [14]:
data =[('a',1),('d',4),('c',3),('b',3),('e',5)]
df =spark.createDataFrame(data,schema='col1 string, col2 int')

In [15]:
df.show()

+----+----+
|col1|col2|
+----+----+
|   a|   1|
|   d|   4|
|   c|   3|
|   b|   3|
|   e|   5|
+----+----+



In [20]:
df.rdd.glom().collect()

[[Row(col1='a', col2=1), Row(col1='d', col2=4)],
 [Row(col1='c', col2=3), Row(col1='b', col2=3), Row(col1='e', col2=5)]]

In [21]:
df.sortWithinPartitions( df.col2.desc()).show()

+----+----+
|col1|col2|
+----+----+
|   d|   4|
|   a|   1|
|   e|   5|
|   c|   3|
|   b|   3|
+----+----+



In [22]:
#Set Operators

In [30]:
df1 =spark.createDataFrame([('apple',10),('orange',20),('mango',10),('pinaple',10)],schema='Name string, price int')

In [28]:
df1.show()

+-------+-----+
|   Name|price|
+-------+-----+
|  apple|   10|
| orange|   20|
|  mango|   10|
|pinaple|   10|
+-------+-----+



In [31]:
df2 =spark.createDataFrame([('apple',10),('orange',20),('sapota',10),('fig',10)],schema='Name string, price int')

In [32]:
df2.show()

+------+-----+
|  Name|price|
+------+-----+
| apple|   10|
|orange|   20|
|sapota|   10|
|   fig|   10|
+------+-----+



In [34]:
df1.union(df2).show()

+-------+-----+
|   Name|price|
+-------+-----+
|  apple|   10|
| orange|   20|
|  mango|   10|
|pinaple|   10|
|  apple|   10|
| orange|   20|
| sapota|   10|
|    fig|   10|
+-------+-----+



In [35]:
df1.union(df2).drop_duplicates().show()

+-------+-----+
|   Name|price|
+-------+-----+
|  apple|   10|
| orange|   20|
|  mango|   10|
|pinaple|   10|
| sapota|   10|
|    fig|   10|
+-------+-----+



In [36]:
df1.unionAll(df2).show()

+-------+-----+
|   Name|price|
+-------+-----+
|  apple|   10|
| orange|   20|
|  mango|   10|
|pinaple|   10|
|  apple|   10|
| orange|   20|
| sapota|   10|
|    fig|   10|
+-------+-----+



In [44]:
df1 = spark.createDataFrame([('a',1),('b',2)] ,schema='col1 string ,col2 int')

In [40]:
df1.show()

+----+----+
|col1|col2|
+----+----+
|   a|   1|
|   b|   2|
+----+----+



In [43]:
df2 = spark.createDataFrame([(3,'c'),(4,'d')] ,schema='col2 int ,col1 string')

In [45]:
df2.show()

+----+----+
|col2|col1|
+----+----+
|   3|   c|
|   4|   d|
+----+----+



In [47]:
df1.union(df2).show()

+----+----+
|col1|col2|
+----+----+
|   a|   1|
|   b|   2|
|   3|   c|
|   4|   d|
+----+----+



In [49]:
df1.unionByName(df2).show()

+----+----+
|col1|col2|
+----+----+
|   a|   1|
|   b|   2|
|   c|   3|
|   d|   4|
+----+----+



In [50]:
df1 =spark.createDataFrame([('apple',10),('orange',20),('mango',10),('pinaple',10)],schema='Name string, price int')

In [51]:
df2 =spark.createDataFrame([('apple',10),('orange',20),('sapota',10),('fig',10)],schema='Name string, price int')

In [52]:
df1.intersect(df2).show()

+------+-----+
|  Name|price|
+------+-----+
| apple|   10|
|orange|   20|
+------+-----+



In [53]:
df1.intersectAll(df2).show()

+------+-----+
|  Name|price|
+------+-----+
| apple|   10|
|orange|   20|
+------+-----+



In [55]:
df1.exceptAll(df2).show()

+-------+-----+
|   Name|price|
+-------+-----+
|  mango|   10|
|pinaple|   10|
+-------+-----+



In [56]:
#Join api

In [58]:
df1 = spark.createDataFrame([(1,'Robert'),(2,'James'),(3,'Riya')], schema='empid int, ename string')

In [60]:
df2 =spark.createDataFrame([(2,'India'),(4,'USA')], schema='empid int, country string')

In [61]:
df1.show()

+-----+------+
|empid| ename|
+-----+------+
|    1|Robert|
|    2| James|
|    3|  Riya|
+-----+------+



In [62]:
df2.show()

+-----+-------+
|empid|country|
+-----+-------+
|    2|  India|
|    4|    USA|
+-----+-------+



In [66]:
df1.join(df2,df1.empid ==df2.empid).select(df1.empid,df1.ename,df2.country).show()

+-----+-----+-------+
|empid|ename|country|
+-----+-----+-------+
|    2|James|  India|
+-----+-----+-------+



In [67]:
df1.join(df2,df1.empid ==df2.empid , 'inner').select(df1.empid,df1.ename,df2.country).show()

+-----+-----+-------+
|empid|ename|country|
+-----+-----+-------+
|    2|James|  India|
+-----+-----+-------+



In [68]:
df1.join(df2,df1.empid ==df2.empid , 'left').select(df1.empid,df1.ename,df2.country).show()

+-----+------+-------+
|empid| ename|country|
+-----+------+-------+
|    1|Robert|   NULL|
|    3|  Riya|   NULL|
|    2| James|  India|
+-----+------+-------+



In [69]:
df1.join(df2,df1.empid ==df2.empid , 'right').select(df1.empid,df1.ename,df2.country).show()

+-----+-----+-------+
|empid|ename|country|
+-----+-----+-------+
|    2|James|  India|
| NULL| NULL|    USA|
+-----+-----+-------+



In [70]:
df1.join(df2,df1.empid ==df2.empid , 'full').select(df1.empid,df1.ename,df2.country).show()

+-----+------+-------+
|empid| ename|country|
+-----+------+-------+
|    1|Robert|   NULL|
|    2| James|  India|
|    3|  Riya|   NULL|
| NULL|  NULL|    USA|
+-----+------+-------+



In [73]:
df1.join(df2,df1.empid ==df1.empid , 'cross').show()

+-----+------+-----+-------+
|empid| ename|empid|country|
+-----+------+-----+-------+
|    1|Robert|    2|  India|
|    1|Robert|    4|    USA|
|    2| James|    2|  India|
|    3|  Riya|    2|  India|
|    2| James|    4|    USA|
|    3|  Riya|    4|    USA|
+-----+------+-----+-------+



In [75]:
df1.join(df2,df1.empid ==df2.empid , 'leftanti').show()

+-----+------+
|empid| ename|
+-----+------+
|    1|Robert|
|    3|  Riya|
+-----+------+



In [76]:
#You cant select column from df2 data frame

In [78]:
df1.join(df2,df1.empid ==df2.empid , 'leftsemi').show()

+-----+-----+
|empid|ename|
+-----+-----+
|    2|James|
+-----+-----+



In [80]:
df1.crossJoin(df2).select(df1.empid,df1.ename,df2.country).show()

+-----+------+-------+
|empid| ename|country|
+-----+------+-------+
|    1|Robert|  India|
|    1|Robert|    USA|
|    2| James|  India|
|    3|  Riya|  India|
|    2| James|    USA|
|    3|  Riya|    USA|
+-----+------+-------+



In [84]:
df1 =spark.createDataFrame([(1,'Robert',2),(2,'Riya',3),(3,'James',5)],'empid int, ename string, managerid int')

In [85]:
df1.show()

+-----+------+---------+
|empid| ename|managerid|
+-----+------+---------+
|    1|Robert|        2|
|    2|  Riya|        3|
|    3| James|        5|
+-----+------+---------+



In [86]:
from pyspark.sql.functions import col

In [87]:
help(col)

Help on function col in module pyspark.sql.functions:

col(col: str) -> pyspark.sql.column.Column
    Returns a :class:`~pyspark.sql.Column` based on the given column name.
    
    .. versionadded:: 1.3.0
    
    .. versionchanged:: 3.4.0
        Supports Spark Connect.
    
    Parameters
    ----------
    col : str
        the name for the column
    
    Returns
    -------
    :class:`~pyspark.sql.Column`
        the corresponding column instance.
    
    Examples
    --------
    >>> col('x')
    Column<'x'>
    >>> column('x')
    Column<'x'>



In [88]:
df1.show()

+-----+------+---------+
|empid| ename|managerid|
+-----+------+---------+
|    1|Robert|        2|
|    2|  Riya|        3|
|    3| James|        5|
+-----+------+---------+



In [104]:
df1.alias("emp1").join(df1.alias("emp2"), col("emp1.managerid")==col("emp2.empid"), 'inner').\
    select(col("emp1.empid"),col("emp1.ename"),col("emp1.managerid"),col("emp2.ename").alias("managername")).show()

+-----+------+---------+-----------+
|empid| ename|managerid|managername|
+-----+------+---------+-----------+
|    1|Robert|        2|       Riya|
|    2|  Riya|        3|      James|
+-----+------+---------+-----------+



In [105]:
df1.alias("emp1").join(df1.alias("emp2"), col("emp1.managerid")==col("emp2.empid"), 'left').\
    select(col("emp1.empid"),col("emp1.ename"),col("emp1.managerid"),col("emp2.ename").alias("managername")).show()

+-----+------+---------+-----------+
|empid| ename|managerid|managername|
+-----+------+---------+-----------+
|    1|Robert|        2|       Riya|
|    2|  Riya|        3|      James|
|    3| James|        5|       NULL|
+-----+------+---------+-----------+



In [None]:
#Dataframe api summary

In [106]:
ordItems =spark.read.load('c:/practice/order_items',format='csv',sep=',',schema=\
        'order_item_id int, order_id int, product_id int, qunatity int, subtotal float, price float')

In [107]:
ordItems.show()

+-------------+--------+----------+--------+--------+------+
|order_item_id|order_id|product_id|qunatity|subtotal| price|
+-------------+--------+----------+--------+--------+------+
|            1|       1|       957|       1|  299.98|299.98|
|            2|       2|      1073|       1|  199.99|199.99|
|            3|       2|       502|       5|   250.0|  50.0|
|            4|       2|       403|       1|  129.99|129.99|
|            5|       4|       897|       2|   49.98| 24.99|
|            6|       4|       365|       5|  299.95| 59.99|
|            7|       4|       502|       3|   150.0|  50.0|
|            8|       4|      1014|       4|  199.92| 49.98|
|            9|       5|       957|       1|  299.98|299.98|
|           10|       5|       365|       5|  299.95| 59.99|
|           11|       5|      1014|       2|   99.96| 49.98|
|           12|       5|       957|       1|  299.98|299.98|
|           13|       5|       403|       1|  129.99|129.99|
|           14|       7|

In [109]:
ordItems.summary().show()

+-------+-----------------+------------------+-----------------+------------------+------------------+------------------+
|summary|    order_item_id|          order_id|       product_id|          qunatity|          subtotal|             price|
+-------+-----------------+------------------+-----------------+------------------+------------------+------------------+
|  count|           172198|            172198|           172198|            172198|            172198|            172198|
|   mean|          86099.5| 34442.56682423721|660.4877176273824|2.1821275508426345|199.32066922046081|133.75906959048717|
| stddev|49709.42516431533|19883.325171992223|310.5144727900077|1.4663523175387168|112.74303987146766| 118.5589363325847|
|    min|                1|                 1|               19|                 1|              9.99|              9.99|
|    25%|            43033|             17204|              403|                 1|            119.98|              50.0|
|    50%|            860

In [111]:
ordItems.select(ordItems.price).summary().show()

+-------+------------------+
|summary|             price|
+-------+------------------+
|  count|            172198|
|   mean|133.75906959048717|
| stddev| 118.5589363325847|
|    min|              9.99|
|    25%|              50.0|
|    50%|             59.99|
|    75%|            199.99|
|    max|           1999.99|
+-------+------------------+



In [112]:
from pyspark.sql.functions import *

In [116]:
ordItems.select(round(avg(ordItems.price),2).alias("avg")).show()

+------+
|   avg|
+------+
|133.76|
+------+



In [123]:
ordItems.select(round(avg(ordItems.price),2).alias("avg"),\
                max(ordItems.price).alias("max"),\
                min(ordItems.price).alias("max"),\
                sum(ordItems.price).alias("sum")).show(truncate=False)

+------+-------+----+--------------------+
|avg   |max    |max |sum                 |
+------+-------+----+--------------------+
|133.76|1999.99|9.99|2.3033044265342712E7|
+------+-------+----+--------------------+



In [126]:
ordItems.select(
                sum(ordItems.price).alias("sum"), sum_distinct(ordItems.price).alias("sumdistinct")).show(truncate=False)

+--------------------+----------------+
|sum                 |sumdistinct     |
+--------------------+----------------+
|2.3033044265342712E7|9832.41999053955|
+--------------------+----------------+



In [129]:
ordItems.select(count(ordItems.price).alias("count"), count_distinct(ordItems.price).alias("countdistinct")).show()

+------+-------------+
| count|countdistinct|
+------+-------------+
|172198|           57|
+------+-------------+



In [130]:
ordItems.sort(ordItems.price).show()

+-------------+--------+----------+--------+--------+-----+
|order_item_id|order_id|product_id|qunatity|subtotal|price|
+-------------+--------+----------+--------+--------+-----+
|         6533|    2605|       775|       4|   39.96| 9.99|
|        17878|    7140|       775|       4|   39.96| 9.99|
|         6813|    2721|       775|       4|   39.96| 9.99|
|          137|      58|       775|       2|   19.98| 9.99|
|         7024|    2797|       775|       5|   49.95| 9.99|
|          604|     243|       775|       3|   29.97| 9.99|
|         8935|    3595|       775|       5|   49.95| 9.99|
|         4264|    1713|       775|       2|   19.98| 9.99|
|         9199|    3698|       775|       4|   39.96| 9.99|
|         4548|    1820|       775|       4|   39.96| 9.99|
|         9252|    3720|       775|       3|   29.97| 9.99|
|         4866|    1944|       775|       1|    9.99| 9.99|
|         9757|    3926|       775|       4|   39.96| 9.99|
|         5409|    2159|       775|     

In [132]:
ordItems.sort(ordItems.price).select(first(ordItems.price)).show()

+------------+
|first(price)|
+------------+
|        9.99|
+------------+



In [133]:
ordItems.sort(ordItems.price).select(last(ordItems.price)).show()

+-----------+
|last(price)|
+-----------+
|    1999.99|
+-----------+



In [134]:
df =spark.createDataFrame([(1,100),(2,200),(3,50),(4,50)],'id int, salary int')

In [135]:
df.show()

+---+------+
| id|salary|
+---+------+
|  1|   100|
|  2|   200|
|  3|    50|
|  4|    50|
+---+------+



In [139]:
df.select(collect_list(df.salary).alias('list'), collect_set(df.salary).alias('set')).show(truncate=False)
#list contain duplicate values and set contain unique value

+------------------+--------------+
|list              |set           |
+------------------+--------------+
|[100, 200, 50, 50]|[100, 50, 200]|
+------------------+--------------+



In [17]:
data =[('James','Sales','NY',9000,34),
       ('Alicia','Sales','NY',8600,56),
       ('Robert','Sales','CA',8100,30),
       ('Lisa','Finance','CA',9000,24),
       ('Deja','Finance','CA',9900,40),
       ('Sugie','Finance','NY',8300,36),
       ('Ram','Finance','NY',7900,53),
       ('Kyle','Marketing','CA',8000,25),
       ('Reid','Marketing','NY',9100,50)]

In [18]:
schema=['ename','dept','state','salary','age'] 

In [19]:
df =spark.createDataFrame(data,schema)

In [144]:
df.show()

+------+---------+-----+------+---+
| ename|     dept|state|salary|age|
+------+---------+-----+------+---+
| James|    Sales|   NY|  9000| 34|
|Alicia|    Sales|   NY|  8600| 56|
|Robert|    Sales|   CA|  8100| 30|
|  Lisa|  Finance|   CA|  9000| 24|
|  Deja|  Finance|   CA|  9900| 40|
| Sugie|  Finance|   NY|  8300| 36|
|   Ram|  Finance|   NY|  7900| 53|
|  Kyle|Marketing|   CA|  8000| 25|
|  Reid|Marketing|   NY|  9100| 50|
+------+---------+-----+------+---+



In [145]:
df.printSchema()

root
 |-- ename: string (nullable = true)
 |-- dept: string (nullable = true)
 |-- state: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- age: long (nullable = true)



In [153]:
df.groupBy(df.dept).avg("salary").show()

+---------+-----------------+
|     dept|      avg(salary)|
+---------+-----------------+
|    Sales|8566.666666666666|
|  Finance|           8775.0|
|Marketing|           8550.0|
+---------+-----------------+



In [158]:
df.groupBy(df.dept).sum("salary").withColumnRenamed('sum(salary)','sum_sal').show()

+---------+-------+
|     dept|sum_sal|
+---------+-------+
|    Sales|  25700|
|  Finance|  35100|
|Marketing|  17100|
+---------+-------+



In [159]:
df.groupBy(df.dept).max("salary").withColumnRenamed('max(salary)','max_sal').show()

+---------+-------+
|     dept|max_sal|
+---------+-------+
|    Sales|   9000|
|  Finance|   9900|
|Marketing|   9100|
+---------+-------+



In [160]:
df.show()

+------+---------+-----+------+---+
| ename|     dept|state|salary|age|
+------+---------+-----+------+---+
| James|    Sales|   NY|  9000| 34|
|Alicia|    Sales|   NY|  8600| 56|
|Robert|    Sales|   CA|  8100| 30|
|  Lisa|  Finance|   CA|  9000| 24|
|  Deja|  Finance|   CA|  9900| 40|
| Sugie|  Finance|   NY|  8300| 36|
|   Ram|  Finance|   NY|  7900| 53|
|  Kyle|Marketing|   CA|  8000| 25|
|  Reid|Marketing|   NY|  9100| 50|
+------+---------+-----+------+---+



In [164]:
df.groupBy(df.dept,df.state).sum("salary").show()

+---------+-----+-----------+
|     dept|state|sum(salary)|
+---------+-----+-----------+
|    Sales|   CA|       8100|
|  Finance|   CA|      18900|
|    Sales|   NY|      17600|
|  Finance|   NY|      16200|
|Marketing|   NY|       9100|
|Marketing|   CA|       8000|
+---------+-----+-----------+



In [165]:
df.groupBy(df.dept,df.state).min("salary","age").show()

+---------+-----+-----------+--------+
|     dept|state|min(salary)|min(age)|
+---------+-----+-----------+--------+
|    Sales|   CA|       8100|      30|
|  Finance|   CA|       9000|      24|
|    Sales|   NY|       8600|      34|
|  Finance|   NY|       7900|      36|
|Marketing|   NY|       9100|      50|
|Marketing|   CA|       8000|      25|
+---------+-----+-----------+--------+



In [166]:
df.groupBy(df.dept).agg(max("salary").alias('max_sal'),
                        min("salary").alias('min_sal'),
                        avg("salary").alias('avg_sal'),
                       sum("salary").alias('sum_sal')).show()

+---------+-------+-------+-----------------+-------+
|     dept|max_sal|min_sal|          avg_sal|sum_sal|
+---------+-------+-------+-----------------+-------+
|    Sales|   9000|   8100|8566.666666666666|  25700|
|  Finance|   9900|   7900|           8775.0|  35100|
|Marketing|   9100|   8000|           8550.0|  17100|
+---------+-------+-------+-----------------+-------+



In [168]:
df.where(df.state=='NY').groupBy(df.dept).agg(max("salary").alias('max_sal'),
                        min("salary").alias('min_sal'),
                        avg("salary").alias('avg_sal'),
                       sum("salary").alias('sum_sal')).show()

+---------+-------+-------+-------+-------+
|     dept|max_sal|min_sal|avg_sal|sum_sal|
+---------+-------+-------+-------+-------+
|    Sales|   9000|   8600| 8800.0|  17600|
|  Finance|   8300|   7900| 8100.0|  16200|
|Marketing|   9100|   9100| 9100.0|   9100|
+---------+-------+-------+-------+-------+



In [170]:
df.groupBy(df.dept).agg(max("salary").alias('max_sal'),
                        min("salary").alias('min_sal'),
                        avg("salary").alias('avg_sal'),
                       sum("salary").alias('sum_sal')).show()

+---------+-------+-------+-----------------+-------+
|     dept|max_sal|min_sal|          avg_sal|sum_sal|
+---------+-------+-------+-----------------+-------+
|    Sales|   9000|   8100|8566.666666666666|  25700|
|  Finance|   9900|   7900|           8775.0|  35100|
|Marketing|   9100|   8000|           8550.0|  17100|
+---------+-------+-------+-----------------+-------+



In [171]:
df.groupBy(df.dept).agg(max("salary").alias('max_sal'),
                        min("salary").alias('min_sal'),
                        avg("salary").alias('avg_sal'),
                       sum("salary").alias('sum_sal')).where(col('min_sal') >=8000).show()

+---------+-------+-------+-----------------+-------+
|     dept|max_sal|min_sal|          avg_sal|sum_sal|
+---------+-------+-------+-----------------+-------+
|    Sales|   9000|   8100|8566.666666666666|  25700|
|Marketing|   9100|   8000|           8550.0|  17100|
+---------+-------+-------+-----------------+-------+



In [172]:
dft=df.groupBy(df.dept).pivot('state').sum('salary')

In [173]:
dft.show()

+---------+-----+-----+
|     dept|   CA|   NY|
+---------+-----+-----+
|    Sales| 8100|17600|
|  Finance|18900|16200|
|Marketing| 8000| 9100|
+---------+-----+-----+



In [174]:
#Window Functions

In [175]:
from pyspark.sql.window import *
from pyspark.sql.functions import *

In [177]:
spec =Window.orderBy(df.salary.desc())

In [183]:
df.select(df.ename,df.dept,df.salary).withColumn("row_number",row_number().over(spec)).show()

+------+---------+------+----------+
| ename|     dept|salary|row_number|
+------+---------+------+----------+
|  Deja|  Finance|  9900|         1|
|  Reid|Marketing|  9100|         2|
| James|    Sales|  9000|         3|
|  Lisa|  Finance|  9000|         4|
|Alicia|    Sales|  8600|         5|
| Sugie|  Finance|  8300|         6|
|Robert|    Sales|  8100|         7|
|  Kyle|Marketing|  8000|         8|
|   Ram|  Finance|  7900|         9|
+------+---------+------+----------+



In [186]:
spec =Window.partitionBy(df.dept).orderBy(df.salary.desc())

In [187]:
df.select(df.ename,df.dept,df.salary).withColumn("row_number",row_number().over(spec)).show()

+------+---------+------+----------+
| ename|     dept|salary|row_number|
+------+---------+------+----------+
|  Deja|  Finance|  9900|         1|
|  Lisa|  Finance|  9000|         2|
| Sugie|  Finance|  8300|         3|
|   Ram|  Finance|  7900|         4|
|  Reid|Marketing|  9100|         1|
|  Kyle|Marketing|  8000|         2|
| James|    Sales|  9000|         1|
|Alicia|    Sales|  8600|         2|
|Robert|    Sales|  8100|         3|
+------+---------+------+----------+



In [188]:
spec =Window.orderBy(df.salary.desc())

In [190]:
df.select(df.ename,df.dept,df.salary).withColumn("rank",rank().over(spec)).show()

+------+---------+------+----+
| ename|     dept|salary|rank|
+------+---------+------+----+
|  Deja|  Finance|  9900|   1|
|  Reid|Marketing|  9100|   2|
| James|    Sales|  9000|   3|
|  Lisa|  Finance|  9000|   3|
|Alicia|    Sales|  8600|   5|
| Sugie|  Finance|  8300|   6|
|Robert|    Sales|  8100|   7|
|  Kyle|Marketing|  8000|   8|
|   Ram|  Finance|  7900|   9|
+------+---------+------+----+



In [191]:
spec =Window.partitionBy(df.dept).orderBy(df.salary.desc())

In [192]:
df.select(df.ename,df.dept,df.salary).withColumn("rank",rank().over(spec)).show()

+------+---------+------+----+
| ename|     dept|salary|rank|
+------+---------+------+----+
|  Deja|  Finance|  9900|   1|
|  Lisa|  Finance|  9000|   2|
| Sugie|  Finance|  8300|   3|
|   Ram|  Finance|  7900|   4|
|  Reid|Marketing|  9100|   1|
|  Kyle|Marketing|  8000|   2|
| James|    Sales|  9000|   1|
|Alicia|    Sales|  8600|   2|
|Robert|    Sales|  8100|   3|
+------+---------+------+----+



In [193]:
spec =Window.orderBy(df.salary.desc())

In [194]:
df.select(df.ename,df.dept,df.salary).withColumn("dense_rank",dense_rank().over(spec)).show()

+------+---------+------+----------+
| ename|     dept|salary|dense_rank|
+------+---------+------+----------+
|  Deja|  Finance|  9900|         1|
|  Reid|Marketing|  9100|         2|
| James|    Sales|  9000|         3|
|  Lisa|  Finance|  9000|         3|
|Alicia|    Sales|  8600|         4|
| Sugie|  Finance|  8300|         5|
|Robert|    Sales|  8100|         6|
|  Kyle|Marketing|  8000|         7|
|   Ram|  Finance|  7900|         8|
+------+---------+------+----------+



In [196]:
df.select(df.ename,df.dept,df.salary)\
.withColumn("rank",rank().over(spec))\
.withColumn("dense_rank",dense_rank().over(spec)).show()

+------+---------+------+----+----------+
| ename|     dept|salary|rank|dense_rank|
+------+---------+------+----+----------+
|  Deja|  Finance|  9900|   1|         1|
|  Reid|Marketing|  9100|   2|         2|
| James|    Sales|  9000|   3|         3|
|  Lisa|  Finance|  9000|   3|         3|
|Alicia|    Sales|  8600|   5|         4|
| Sugie|  Finance|  8300|   6|         5|
|Robert|    Sales|  8100|   7|         6|
|  Kyle|Marketing|  8000|   8|         7|
|   Ram|  Finance|  7900|   9|         8|
+------+---------+------+----+----------+



In [198]:
df.select(df.ename,df.dept,df.salary)\
.withColumn("percent_rank",percent_rank().over(spec)).show()


+------+---------+------+------------+
| ename|     dept|salary|percent_rank|
+------+---------+------+------------+
|  Deja|  Finance|  9900|         0.0|
|  Reid|Marketing|  9100|       0.125|
| James|    Sales|  9000|        0.25|
|  Lisa|  Finance|  9000|        0.25|
|Alicia|    Sales|  8600|         0.5|
| Sugie|  Finance|  8300|       0.625|
|Robert|    Sales|  8100|        0.75|
|  Kyle|Marketing|  8000|       0.875|
|   Ram|  Finance|  7900|         1.0|
+------+---------+------+------------+



In [200]:
df.select(df.ename,df.dept,df.salary)\
.withColumn("cume_dist",cume_dist().over(spec)).show()


+------+---------+------+------------------+
| ename|     dept|salary|         cume_dist|
+------+---------+------+------------------+
|  Deja|  Finance|  9900|0.1111111111111111|
|  Reid|Marketing|  9100|0.2222222222222222|
| James|    Sales|  9000|0.4444444444444444|
|  Lisa|  Finance|  9000|0.4444444444444444|
|Alicia|    Sales|  8600|0.5555555555555556|
| Sugie|  Finance|  8300|0.6666666666666666|
|Robert|    Sales|  8100|0.7777777777777778|
|  Kyle|Marketing|  8000|0.8888888888888888|
|   Ram|  Finance|  7900|               1.0|
+------+---------+------+------------------+



In [203]:
df.select(df.ename,df.dept,df.salary)\
.withColumn("ntile",ntile(4).over(spec)).show()


+------+---------+------+-----+
| ename|     dept|salary|ntile|
+------+---------+------+-----+
|  Deja|  Finance|  9900|    1|
|  Reid|Marketing|  9100|    1|
| James|    Sales|  9000|    1|
|  Lisa|  Finance|  9000|    2|
|Alicia|    Sales|  8600|    2|
| Sugie|  Finance|  8300|    3|
|Robert|    Sales|  8100|    3|
|  Kyle|Marketing|  8000|    4|
|   Ram|  Finance|  7900|    4|
+------+---------+------+-----+



In [204]:
df.select(df.ename,df.dept,df.salary)\
.withColumn("lag",lag(df.salary,1,0).over(spec)).show()

+------+---------+------+----+
| ename|     dept|salary| lag|
+------+---------+------+----+
|  Deja|  Finance|  9900|   0|
|  Reid|Marketing|  9100|9900|
| James|    Sales|  9000|9100|
|  Lisa|  Finance|  9000|9000|
|Alicia|    Sales|  8600|9000|
| Sugie|  Finance|  8300|8600|
|Robert|    Sales|  8100|8300|
|  Kyle|Marketing|  8000|8100|
|   Ram|  Finance|  7900|8000|
+------+---------+------+----+



In [205]:
spec =Window.partitionBy(df.dept).orderBy(df.salary.desc())

In [206]:
df.select(df.ename,df.dept,df.salary)\
.withColumn("lag",lag(df.salary,1,0).over(spec)).show()

+------+---------+------+----+
| ename|     dept|salary| lag|
+------+---------+------+----+
|  Deja|  Finance|  9900|   0|
|  Lisa|  Finance|  9000|9900|
| Sugie|  Finance|  8300|9000|
|   Ram|  Finance|  7900|8300|
|  Reid|Marketing|  9100|   0|
|  Kyle|Marketing|  8000|9100|
| James|    Sales|  9000|   0|
|Alicia|    Sales|  8600|9000|
|Robert|    Sales|  8100|8600|
+------+---------+------+----+



In [214]:
spec =Window.partitionBy(df.dept)

In [208]:
df.select(df.ename,df.dept,df.salary)\
.withColumn("lead",lead(df.salary,1,0).over(spec)).show()

+------+---------+------+----+
| ename|     dept|salary|lead|
+------+---------+------+----+
|  Deja|  Finance|  9900|9100|
|  Reid|Marketing|  9100|9000|
| James|    Sales|  9000|9000|
|  Lisa|  Finance|  9000|8600|
|Alicia|    Sales|  8600|8300|
| Sugie|  Finance|  8300|8100|
|Robert|    Sales|  8100|8000|
|  Kyle|Marketing|  8000|7900|
|   Ram|  Finance|  7900|   0|
+------+---------+------+----+



In [217]:
spec =Window.partitionBy()

In [223]:
df.select(df.ename,df.dept,df.salary)\
.withColumn("sum",sum(df.salary).over(spec))\
.withColumn("avg",avg(df.salary).over(spec))\
.withColumn("max",max(df.salary).over(spec))\
.withColumn("min",min(df.salary).over(spec)).show()

+------+---------+------+-----+-----------------+----+----+
| ename|     dept|salary|  sum|              avg| max| min|
+------+---------+------+-----+-----------------+----+----+
| James|    Sales|  9000|77900|8655.555555555555|9900|7900|
|Alicia|    Sales|  8600|77900|8655.555555555555|9900|7900|
|Robert|    Sales|  8100|77900|8655.555555555555|9900|7900|
|  Lisa|  Finance|  9000|77900|8655.555555555555|9900|7900|
|  Deja|  Finance|  9900|77900|8655.555555555555|9900|7900|
| Sugie|  Finance|  8300|77900|8655.555555555555|9900|7900|
|   Ram|  Finance|  7900|77900|8655.555555555555|9900|7900|
|  Kyle|Marketing|  8000|77900|8655.555555555555|9900|7900|
|  Reid|Marketing|  9100|77900|8655.555555555555|9900|7900|
+------+---------+------+-----+-----------------+----+----+



In [225]:
spec =Window.partitionBy(df.dept)

In [226]:
df.select(df.ename,df.dept,df.salary)\
.withColumn("sum",sum(df.salary).over(spec))\
.withColumn("avg",avg(df.salary).over(spec))\
.withColumn("max",max(df.salary).over(spec))\
.withColumn("min",min(df.salary).over(spec)).show()

+------+---------+------+-----+-----------------+----+----+
| ename|     dept|salary|  sum|              avg| max| min|
+------+---------+------+-----+-----------------+----+----+
|  Lisa|  Finance|  9000|35100|           8775.0|9900|7900|
|  Deja|  Finance|  9900|35100|           8775.0|9900|7900|
| Sugie|  Finance|  8300|35100|           8775.0|9900|7900|
|   Ram|  Finance|  7900|35100|           8775.0|9900|7900|
|  Kyle|Marketing|  8000|17100|           8550.0|9100|8000|
|  Reid|Marketing|  9100|17100|           8550.0|9100|8000|
| James|    Sales|  9000|25700|8566.666666666666|9000|8100|
|Alicia|    Sales|  8600|25700|8566.666666666666|9000|8100|
|Robert|    Sales|  8100|25700|8566.666666666666|9000|8100|
+------+---------+------+-----+-----------------+----+----+



In [227]:
spec =Window.orderBy(df.salary.desc())

In [229]:
df.select(df.ename,df.dept,df.salary)\
.withColumn("first",first(df.salary).over(spec)).show()

+------+---------+------+-----+
| ename|     dept|salary|first|
+------+---------+------+-----+
|  Deja|  Finance|  9900| 9900|
|  Reid|Marketing|  9100| 9900|
| James|    Sales|  9000| 9900|
|  Lisa|  Finance|  9000| 9900|
|Alicia|    Sales|  8600| 9900|
| Sugie|  Finance|  8300| 9900|
|Robert|    Sales|  8100| 9900|
|  Kyle|Marketing|  8000| 9900|
|   Ram|  Finance|  7900| 9900|
+------+---------+------+-----+



In [3]:
#sampling api

In [4]:
df =spark.range(100)

In [7]:
df.sample(fraction=0.1).show()

+---+
| id|
+---+
|  1|
| 16|
| 22|
| 27|
| 34|
| 60|
| 69|
| 81|
| 91|
| 93|
| 96|
+---+



In [13]:
df.sample(withReplacement=True,fraction=0.2).show()

+---+
| id|
+---+
|  0|
|  5|
|  8|
|  9|
| 13|
| 17|
| 19|
| 20|
| 22|
| 37|
| 43|
| 54|
| 55|
| 55|
| 64|
| 71|
| 74|
| 80|
| 85|
| 86|
+---+
only showing top 20 rows



In [16]:
df.sample(withReplacement=True,fraction=0.2,seed=20).show()

+---+
| id|
+---+
|  0|
|  3|
| 11|
| 21|
| 21|
| 23|
| 32|
| 33|
| 34|
| 40|
| 40|
| 42|
| 51|
| 54|
| 60|
| 60|
| 62|
| 64|
| 67|
| 68|
+---+
only showing top 20 rows



In [20]:
from pyspark.sql.functions import *

In [22]:
df.show()

+------+---------+-----+------+---+
| ename|     dept|state|salary|age|
+------+---------+-----+------+---+
| James|    Sales|   NY|  9000| 34|
|Alicia|    Sales|   NY|  8600| 56|
|Robert|    Sales|   CA|  8100| 30|
|  Lisa|  Finance|   CA|  9000| 24|
|  Deja|  Finance|   CA|  9900| 40|
| Sugie|  Finance|   NY|  8300| 36|
|   Ram|  Finance|   NY|  7900| 53|
|  Kyle|Marketing|   CA|  8000| 25|
|  Reid|Marketing|   NY|  9100| 50|
+------+---------+-----+------+---+



In [None]:
#monotonically_increasing_id()

In [29]:
df.withColumn('id',monotonically_increasing_id()).show()

+------+---------+-----+------+---+----------+
| ename|     dept|state|salary|age|        id|
+------+---------+-----+------+---+----------+
| James|    Sales|   NY|  9000| 34|         0|
|Alicia|    Sales|   NY|  8600| 56|         1|
|Robert|    Sales|   CA|  8100| 30|         2|
|  Lisa|  Finance|   CA|  9000| 24|         3|
|  Deja|  Finance|   CA|  9900| 40|8589934592|
| Sugie|  Finance|   NY|  8300| 36|8589934593|
|   Ram|  Finance|   NY|  7900| 53|8589934594|
|  Kyle|Marketing|   CA|  8000| 25|8589934595|
|  Reid|Marketing|   NY|  9100| 50|8589934596|
+------+---------+-----+------+---+----------+



In [None]:
#lit

In [30]:
df.withColumn('Country',lit('USA')).show()

+------+---------+-----+------+---+-------+
| ename|     dept|state|salary|age|Country|
+------+---------+-----+------+---+-------+
| James|    Sales|   NY|  9000| 34|    USA|
|Alicia|    Sales|   NY|  8600| 56|    USA|
|Robert|    Sales|   CA|  8100| 30|    USA|
|  Lisa|  Finance|   CA|  9000| 24|    USA|
|  Deja|  Finance|   CA|  9900| 40|    USA|
| Sugie|  Finance|   NY|  8300| 36|    USA|
|   Ram|  Finance|   NY|  7900| 53|    USA|
|  Kyle|Marketing|   CA|  8000| 25|    USA|
|  Reid|Marketing|   NY|  9100| 50|    USA|
+------+---------+-----+------+---+-------+



In [31]:
df.withColumn('col1',concat(df.ename,lit(':'),df.salary)).show()

+------+---------+-----+------+---+-----------+
| ename|     dept|state|salary|age|       col1|
+------+---------+-----+------+---+-----------+
| James|    Sales|   NY|  9000| 34| James:9000|
|Alicia|    Sales|   NY|  8600| 56|Alicia:8600|
|Robert|    Sales|   CA|  8100| 30|Robert:8100|
|  Lisa|  Finance|   CA|  9000| 24|  Lisa:9000|
|  Deja|  Finance|   CA|  9900| 40|  Deja:9900|
| Sugie|  Finance|   NY|  8300| 36| Sugie:8300|
|   Ram|  Finance|   NY|  7900| 53|   Ram:7900|
|  Kyle|Marketing|   CA|  8000| 25|  Kyle:8000|
|  Reid|Marketing|   NY|  9100| 50|  Reid:9100|
+------+---------+-----+------+---+-----------+



In [32]:
#expr

In [34]:
df.withColumn('length',expr("length(ename)")).show()

+------+---------+-----+------+---+------+
| ename|     dept|state|salary|age|length|
+------+---------+-----+------+---+------+
| James|    Sales|   NY|  9000| 34|     5|
|Alicia|    Sales|   NY|  8600| 56|     6|
|Robert|    Sales|   CA|  8100| 30|     6|
|  Lisa|  Finance|   CA|  9000| 24|     4|
|  Deja|  Finance|   CA|  9900| 40|     4|
| Sugie|  Finance|   NY|  8300| 36|     5|
|   Ram|  Finance|   NY|  7900| 53|     3|
|  Kyle|Marketing|   CA|  8000| 25|     4|
|  Reid|Marketing|   NY|  9100| 50|     4|
+------+---------+-----+------+---+------+



In [35]:
df.withColumn('length',expr("upper(ename)")).show()

+------+---------+-----+------+---+------+
| ename|     dept|state|salary|age|length|
+------+---------+-----+------+---+------+
| James|    Sales|   NY|  9000| 34| JAMES|
|Alicia|    Sales|   NY|  8600| 56|ALICIA|
|Robert|    Sales|   CA|  8100| 30|ROBERT|
|  Lisa|  Finance|   CA|  9000| 24|  LISA|
|  Deja|  Finance|   CA|  9900| 40|  DEJA|
| Sugie|  Finance|   NY|  8300| 36| SUGIE|
|   Ram|  Finance|   NY|  7900| 53|   RAM|
|  Kyle|Marketing|   CA|  8000| 25|  KYLE|
|  Reid|Marketing|   NY|  9100| 50|  REID|
+------+---------+-----+------+---+------+



In [37]:
df.withColumn('age_des',expr("case when age >50 then 'Senior' else 'Adult' end")).show()

+------+---------+-----+------+---+-------+
| ename|     dept|state|salary|age|age_des|
+------+---------+-----+------+---+-------+
| James|    Sales|   NY|  9000| 34|  Adult|
|Alicia|    Sales|   NY|  8600| 56| Senior|
|Robert|    Sales|   CA|  8100| 30|  Adult|
|  Lisa|  Finance|   CA|  9000| 24|  Adult|
|  Deja|  Finance|   CA|  9900| 40|  Adult|
| Sugie|  Finance|   NY|  8300| 36|  Adult|
|   Ram|  Finance|   NY|  7900| 53| Senior|
|  Kyle|Marketing|   CA|  8000| 25|  Adult|
|  Reid|Marketing|   NY|  9100| 50|  Adult|
+------+---------+-----+------+---+-------+



In [45]:
df.withColumn('emp-dept',expr("ename ||'-'||dept")).show()

+------+---------+-----+------+---+--------------+
| ename|     dept|state|salary|age|      emp-dept|
+------+---------+-----+------+---+--------------+
| James|    Sales|   NY|  9000| 34|   James-Sales|
|Alicia|    Sales|   NY|  8600| 56|  Alicia-Sales|
|Robert|    Sales|   CA|  8100| 30|  Robert-Sales|
|  Lisa|  Finance|   CA|  9000| 24|  Lisa-Finance|
|  Deja|  Finance|   CA|  9900| 40|  Deja-Finance|
| Sugie|  Finance|   NY|  8300| 36| Sugie-Finance|
|   Ram|  Finance|   NY|  7900| 53|   Ram-Finance|
|  Kyle|Marketing|   CA|  8000| 25|Kyle-Marketing|
|  Reid|Marketing|   NY|  9100| 50|Reid-Marketing|
+------+---------+-----+------+---+--------------+



In [47]:
df.select(df.ename,df.age,expr("age+10").alias('age_10')).show()

+------+---+------+
| ename|age|age_10|
+------+---+------+
| James| 34|    44|
|Alicia| 56|    66|
|Robert| 30|    40|
|  Lisa| 24|    34|
|  Deja| 40|    50|
| Sugie| 36|    46|
|   Ram| 53|    63|
|  Kyle| 25|    35|
|  Reid| 50|    60|
+------+---+------+



In [48]:
#spark_partition_id

In [49]:
df.rdd.getNumPartitions()

2

In [51]:
df.withColumn("partitionid", spark_partition_id()).show()

+------+---------+-----+------+---+-----------+
| ename|     dept|state|salary|age|partitionid|
+------+---------+-----+------+---+-----------+
| James|    Sales|   NY|  9000| 34|          0|
|Alicia|    Sales|   NY|  8600| 56|          0|
|Robert|    Sales|   CA|  8100| 30|          0|
|  Lisa|  Finance|   CA|  9000| 24|          0|
|  Deja|  Finance|   CA|  9900| 40|          1|
| Sugie|  Finance|   NY|  8300| 36|          1|
|   Ram|  Finance|   NY|  7900| 53|          1|
|  Kyle|Marketing|   CA|  8000| 25|          1|
|  Reid|Marketing|   NY|  9100| 50|          1|
+------+---------+-----+------+---+-----------+



In [52]:
#rand

In [53]:
df.withColumn("rand",rand(20)).show()

+------+---------+-----+------+---+-------------------+
| ename|     dept|state|salary|age|               rand|
+------+---------+-----+------+---+-------------------+
| James|    Sales|   NY|  9000| 34| 0.5996723933366402|
|Alicia|    Sales|   NY|  8600| 56| 0.5649185186102964|
|Robert|    Sales|   CA|  8100| 30| 0.9388775895322536|
|  Lisa|  Finance|   CA|  9000| 24|0.17935290010205296|
|  Deja|  Finance|   CA|  9900| 40| 0.6396141227834357|
| Sugie|  Finance|   NY|  8300| 36|0.07022155145032771|
|   Ram|  Finance|   NY|  7900| 53| 0.3103281117295451|
|  Kyle|Marketing|   CA|  8000| 25|0.36386047804145194|
|  Reid|Marketing|   NY|  9100| 50| 0.7677060586249027|
+------+---------+-----+------+---+-------------------+



In [54]:
#randn

In [55]:
df.withColumn('randn',randn(70)).show()

+------+---------+-----+------+---+--------------------+
| ename|     dept|state|salary|age|               randn|
+------+---------+-----+------+---+--------------------+
| James|    Sales|   NY|  9000| 34|  0.5060974463351131|
|Alicia|    Sales|   NY|  8600| 56|  0.3202289933218433|
|Robert|    Sales|   CA|  8100| 30|  0.7885168481267597|
|  Lisa|  Finance|   CA|  9000| 24| -0.3496663257478285|
|  Deja|  Finance|   CA|  9900| 40|  1.6128589974640997|
| Sugie|  Finance|   NY|  8300| 36|-0.16386638151271365|
|   Ram|  Finance|   NY|  7900| 53| -0.6594558232170213|
|  Kyle|Marketing|   CA|  8000| 25|  0.8456550639941086|
|  Reid|Marketing|   NY|  9100| 50|  0.7301071180491999|
+------+---------+-----+------+---+--------------------+



In [56]:
#String Functions

In [63]:
#split

In [57]:
ord =spark.read.load('c:/practice/orders',format='csv',sep=',', schema=('order_id int, order_date timestamp,\
                                                                         order_customer_id int, order_status string'))

In [58]:
ord.show()

+--------+-------------------+-----------------+---------------+
|order_id|         order_date|order_customer_id|   order_status|
+--------+-------------------+-----------------+---------------+
|       1|2013-07-25 00:00:00|            11599|         CLOSED|
|       2|2013-07-25 00:00:00|              256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:00|            12111|       COMPLETE|
|       4|2013-07-25 00:00:00|             8827|         CLOSED|
|       5|2013-07-25 00:00:00|            11318|       COMPLETE|
|       6|2013-07-25 00:00:00|             7130|       COMPLETE|
|       7|2013-07-25 00:00:00|             4530|       COMPLETE|
|       8|2013-07-25 00:00:00|             2911|     PROCESSING|
|       9|2013-07-25 00:00:00|             5657|PENDING_PAYMENT|
|      10|2013-07-25 00:00:00|             5648|PENDING_PAYMENT|
|      11|2013-07-25 00:00:00|              918| PAYMENT_REVIEW|
|      12|2013-07-25 00:00:00|             1837|         CLOSED|
|      13|2013-07-25 00:0

In [61]:
ord.select(ord.order_date,split(ord.order_date,'-').alias('order_split')).show(truncate=False)

+-------------------+-----------------------+
|order_date         |order_split            |
+-------------------+-----------------------+
|2013-07-25 00:00:00|[2013, 07, 25 00:00:00]|
|2013-07-25 00:00:00|[2013, 07, 25 00:00:00]|
|2013-07-25 00:00:00|[2013, 07, 25 00:00:00]|
|2013-07-25 00:00:00|[2013, 07, 25 00:00:00]|
|2013-07-25 00:00:00|[2013, 07, 25 00:00:00]|
|2013-07-25 00:00:00|[2013, 07, 25 00:00:00]|
|2013-07-25 00:00:00|[2013, 07, 25 00:00:00]|
|2013-07-25 00:00:00|[2013, 07, 25 00:00:00]|
|2013-07-25 00:00:00|[2013, 07, 25 00:00:00]|
|2013-07-25 00:00:00|[2013, 07, 25 00:00:00]|
|2013-07-25 00:00:00|[2013, 07, 25 00:00:00]|
|2013-07-25 00:00:00|[2013, 07, 25 00:00:00]|
|2013-07-25 00:00:00|[2013, 07, 25 00:00:00]|
|2013-07-25 00:00:00|[2013, 07, 25 00:00:00]|
|2013-07-25 00:00:00|[2013, 07, 25 00:00:00]|
|2013-07-25 00:00:00|[2013, 07, 25 00:00:00]|
|2013-07-25 00:00:00|[2013, 07, 25 00:00:00]|
|2013-07-25 00:00:00|[2013, 07, 25 00:00:00]|
|2013-07-25 00:00:00|[2013, 07, 25

In [62]:
ord.select(ord.order_date,\
           split(ord.order_date,'-')[0].alias('order_year')).show(truncate=False)

+-------------------+----------+
|order_date         |order_year|
+-------------------+----------+
|2013-07-25 00:00:00|2013      |
|2013-07-25 00:00:00|2013      |
|2013-07-25 00:00:00|2013      |
|2013-07-25 00:00:00|2013      |
|2013-07-25 00:00:00|2013      |
|2013-07-25 00:00:00|2013      |
|2013-07-25 00:00:00|2013      |
|2013-07-25 00:00:00|2013      |
|2013-07-25 00:00:00|2013      |
|2013-07-25 00:00:00|2013      |
|2013-07-25 00:00:00|2013      |
|2013-07-25 00:00:00|2013      |
|2013-07-25 00:00:00|2013      |
|2013-07-25 00:00:00|2013      |
|2013-07-25 00:00:00|2013      |
|2013-07-25 00:00:00|2013      |
|2013-07-25 00:00:00|2013      |
|2013-07-25 00:00:00|2013      |
|2013-07-25 00:00:00|2013      |
|2013-07-25 00:00:00|2013      |
+-------------------+----------+
only showing top 20 rows



In [70]:
ord.select(ord.order_date,
           split(ord.order_date,'-')[0].alias('order_year'),
           split(ord.order_date,'-')[1].alias('order_month'),
           split(split(ord.order_date,'-')[2],' ')[0].alias('order_date')
          ).show(truncate=False)


+-------------------+----------+-----------+----------+
|order_date         |order_year|order_month|order_date|
+-------------------+----------+-----------+----------+
|2013-07-25 00:00:00|2013      |07         |25        |
|2013-07-25 00:00:00|2013      |07         |25        |
|2013-07-25 00:00:00|2013      |07         |25        |
|2013-07-25 00:00:00|2013      |07         |25        |
|2013-07-25 00:00:00|2013      |07         |25        |
|2013-07-25 00:00:00|2013      |07         |25        |
|2013-07-25 00:00:00|2013      |07         |25        |
|2013-07-25 00:00:00|2013      |07         |25        |
|2013-07-25 00:00:00|2013      |07         |25        |
|2013-07-25 00:00:00|2013      |07         |25        |
|2013-07-25 00:00:00|2013      |07         |25        |
|2013-07-25 00:00:00|2013      |07         |25        |
|2013-07-25 00:00:00|2013      |07         |25        |
|2013-07-25 00:00:00|2013      |07         |25        |
|2013-07-25 00:00:00|2013      |07         |25  

In [75]:
df1 =spark.createDataFrame([('ab12cd23ef34ij',)],['s',])

In [76]:
df1.show()

+--------------+
|             s|
+--------------+
|ab12cd23ef34ij|
+--------------+



In [82]:
df1.select(split(df1.s,'[1-9]+').alias('col1')).show()

+----------------+
|            col1|
+----------------+
|[ab, cd, ef, ij]|
+----------------+



In [83]:
#length

In [86]:
df.select(df.ename,length(df.ename).alias('no of char')).show()

+------+----------+
| ename|no of char|
+------+----------+
| James|         5|
|Alicia|         6|
|Robert|         6|
|  Lisa|         4|
|  Deja|         4|
| Sugie|         5|
|   Ram|         3|
|  Kyle|         4|
|  Reid|         4|
+------+----------+



In [90]:
ord.select(ord.order_status,initcap(ord.order_status).alias('initcap_status')).show()

+---------------+---------------+
|   order_status| initcap_status|
+---------------+---------------+
|         CLOSED|         Closed|
|PENDING_PAYMENT|Pending_payment|
|       COMPLETE|       Complete|
|         CLOSED|         Closed|
|       COMPLETE|       Complete|
|       COMPLETE|       Complete|
|       COMPLETE|       Complete|
|     PROCESSING|     Processing|
|PENDING_PAYMENT|Pending_payment|
|PENDING_PAYMENT|Pending_payment|
| PAYMENT_REVIEW| Payment_review|
|         CLOSED|         Closed|
|PENDING_PAYMENT|Pending_payment|
|     PROCESSING|     Processing|
|       COMPLETE|       Complete|
|PENDING_PAYMENT|Pending_payment|
|       COMPLETE|       Complete|
|         CLOSED|         Closed|
|PENDING_PAYMENT|Pending_payment|
|     PROCESSING|     Processing|
+---------------+---------------+
only showing top 20 rows



In [91]:
ord.select(ord.order_status,upper(ord.order_status).alias('upper_status')).show()

+---------------+---------------+
|   order_status|   upper_status|
+---------------+---------------+
|         CLOSED|         CLOSED|
|PENDING_PAYMENT|PENDING_PAYMENT|
|       COMPLETE|       COMPLETE|
|         CLOSED|         CLOSED|
|       COMPLETE|       COMPLETE|
|       COMPLETE|       COMPLETE|
|       COMPLETE|       COMPLETE|
|     PROCESSING|     PROCESSING|
|PENDING_PAYMENT|PENDING_PAYMENT|
|PENDING_PAYMENT|PENDING_PAYMENT|
| PAYMENT_REVIEW| PAYMENT_REVIEW|
|         CLOSED|         CLOSED|
|PENDING_PAYMENT|PENDING_PAYMENT|
|     PROCESSING|     PROCESSING|
|       COMPLETE|       COMPLETE|
|PENDING_PAYMENT|PENDING_PAYMENT|
|       COMPLETE|       COMPLETE|
|         CLOSED|         CLOSED|
|PENDING_PAYMENT|PENDING_PAYMENT|
|     PROCESSING|     PROCESSING|
+---------------+---------------+
only showing top 20 rows



In [92]:
ord.select(ord.order_status,lower(ord.order_status).alias('lower_status')).show()

+---------------+---------------+
|   order_status|   lower_status|
+---------------+---------------+
|         CLOSED|         closed|
|PENDING_PAYMENT|pending_payment|
|       COMPLETE|       complete|
|         CLOSED|         closed|
|       COMPLETE|       complete|
|       COMPLETE|       complete|
|       COMPLETE|       complete|
|     PROCESSING|     processing|
|PENDING_PAYMENT|pending_payment|
|PENDING_PAYMENT|pending_payment|
| PAYMENT_REVIEW| payment_review|
|         CLOSED|         closed|
|PENDING_PAYMENT|pending_payment|
|     PROCESSING|     processing|
|       COMPLETE|       complete|
|PENDING_PAYMENT|pending_payment|
|       COMPLETE|       complete|
|         CLOSED|         closed|
|PENDING_PAYMENT|pending_payment|
|     PROCESSING|     processing|
+---------------+---------------+
only showing top 20 rows



In [109]:
#left

In [108]:
ord.select(ord.order_status,expr("left(order_status,3)").alias("left_3")).show()


+---------------+------+
|   order_status|left_3|
+---------------+------+
|         CLOSED|   CLO|
|PENDING_PAYMENT|   PEN|
|       COMPLETE|   COM|
|         CLOSED|   CLO|
|       COMPLETE|   COM|
|       COMPLETE|   COM|
|       COMPLETE|   COM|
|     PROCESSING|   PRO|
|PENDING_PAYMENT|   PEN|
|PENDING_PAYMENT|   PEN|
| PAYMENT_REVIEW|   PAY|
|         CLOSED|   CLO|
|PENDING_PAYMENT|   PEN|
|     PROCESSING|   PRO|
|       COMPLETE|   COM|
|PENDING_PAYMENT|   PEN|
|       COMPLETE|   COM|
|         CLOSED|   CLO|
|PENDING_PAYMENT|   PEN|
|     PROCESSING|   PRO|
+---------------+------+
only showing top 20 rows



In [111]:
ord.select(ord.order_status,expr("right(order_status,3)").alias("rigth_3")).show()

+---------------+-------+
|   order_status|rigth_3|
+---------------+-------+
|         CLOSED|    SED|
|PENDING_PAYMENT|    ENT|
|       COMPLETE|    ETE|
|         CLOSED|    SED|
|       COMPLETE|    ETE|
|       COMPLETE|    ETE|
|       COMPLETE|    ETE|
|     PROCESSING|    ING|
|PENDING_PAYMENT|    ENT|
|PENDING_PAYMENT|    ENT|
| PAYMENT_REVIEW|    IEW|
|         CLOSED|    SED|
|PENDING_PAYMENT|    ENT|
|     PROCESSING|    ING|
|       COMPLETE|    ETE|
|PENDING_PAYMENT|    ENT|
|       COMPLETE|    ETE|
|         CLOSED|    SED|
|PENDING_PAYMENT|    ENT|
|     PROCESSING|    ING|
+---------------+-------+
only showing top 20 rows



In [None]:
#ltrim,rtrim,trim

In [122]:
df1 =spark.createDataFrame([('    spark',),('     spark   ',)],('col1',))

In [123]:
df1.select(df1.col1,trim(df1.col1).alias('ltrim')).show()

+-------------+-----+
|         col1|ltrim|
+-------------+-----+
|        spark|spark|
|     spark   |spark|
+-------------+-----+



In [124]:
#lpad,rpad

In [125]:
ord.show()

+--------+-------------------+-----------------+---------------+
|order_id|         order_date|order_customer_id|   order_status|
+--------+-------------------+-----------------+---------------+
|       1|2013-07-25 00:00:00|            11599|         CLOSED|
|       2|2013-07-25 00:00:00|              256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:00|            12111|       COMPLETE|
|       4|2013-07-25 00:00:00|             8827|         CLOSED|
|       5|2013-07-25 00:00:00|            11318|       COMPLETE|
|       6|2013-07-25 00:00:00|             7130|       COMPLETE|
|       7|2013-07-25 00:00:00|             4530|       COMPLETE|
|       8|2013-07-25 00:00:00|             2911|     PROCESSING|
|       9|2013-07-25 00:00:00|             5657|PENDING_PAYMENT|
|      10|2013-07-25 00:00:00|             5648|PENDING_PAYMENT|
|      11|2013-07-25 00:00:00|              918| PAYMENT_REVIEW|
|      12|2013-07-25 00:00:00|             1837|         CLOSED|
|      13|2013-07-25 00:0

In [127]:
ord.withColumn('lapd',lpad(ord.order_id,6,'0')).show()

+--------+-------------------+-----------------+---------------+------+
|order_id|         order_date|order_customer_id|   order_status|  lapd|
+--------+-------------------+-----------------+---------------+------+
|       1|2013-07-25 00:00:00|            11599|         CLOSED|000001|
|       2|2013-07-25 00:00:00|              256|PENDING_PAYMENT|000002|
|       3|2013-07-25 00:00:00|            12111|       COMPLETE|000003|
|       4|2013-07-25 00:00:00|             8827|         CLOSED|000004|
|       5|2013-07-25 00:00:00|            11318|       COMPLETE|000005|
|       6|2013-07-25 00:00:00|             7130|       COMPLETE|000006|
|       7|2013-07-25 00:00:00|             4530|       COMPLETE|000007|
|       8|2013-07-25 00:00:00|             2911|     PROCESSING|000008|
|       9|2013-07-25 00:00:00|             5657|PENDING_PAYMENT|000009|
|      10|2013-07-25 00:00:00|             5648|PENDING_PAYMENT|000010|
|      11|2013-07-25 00:00:00|              918| PAYMENT_REVIEW|

In [128]:
#reverse

In [130]:
ord.withColumn('reverstring',reverse(ord.order_status)).show()

+--------+-------------------+-----------------+---------------+---------------+
|order_id|         order_date|order_customer_id|   order_status|    reverstring|
+--------+-------------------+-----------------+---------------+---------------+
|       1|2013-07-25 00:00:00|            11599|         CLOSED|         DESOLC|
|       2|2013-07-25 00:00:00|              256|PENDING_PAYMENT|TNEMYAP_GNIDNEP|
|       3|2013-07-25 00:00:00|            12111|       COMPLETE|       ETELPMOC|
|       4|2013-07-25 00:00:00|             8827|         CLOSED|         DESOLC|
|       5|2013-07-25 00:00:00|            11318|       COMPLETE|       ETELPMOC|
|       6|2013-07-25 00:00:00|             7130|       COMPLETE|       ETELPMOC|
|       7|2013-07-25 00:00:00|             4530|       COMPLETE|       ETELPMOC|
|       8|2013-07-25 00:00:00|             2911|     PROCESSING|     GNISSECORP|
|       9|2013-07-25 00:00:00|             5657|PENDING_PAYMENT|TNEMYAP_GNIDNEP|
|      10|2013-07-25 00:00:0

In [131]:
#repeat

In [133]:
ord.withColumn('repeat',repeat(ord.order_status,3)).show(truncate=False)

+--------+-------------------+-----------------+---------------+---------------------------------------------+
|order_id|order_date         |order_customer_id|order_status   |repeat                                       |
+--------+-------------------+-----------------+---------------+---------------------------------------------+
|1       |2013-07-25 00:00:00|11599            |CLOSED         |CLOSEDCLOSEDCLOSED                           |
|2       |2013-07-25 00:00:00|256              |PENDING_PAYMENT|PENDING_PAYMENTPENDING_PAYMENTPENDING_PAYMENT|
|3       |2013-07-25 00:00:00|12111            |COMPLETE       |COMPLETECOMPLETECOMPLETE                     |
|4       |2013-07-25 00:00:00|8827             |CLOSED         |CLOSEDCLOSEDCLOSED                           |
|5       |2013-07-25 00:00:00|11318            |COMPLETE       |COMPLETECOMPLETECOMPLETE                     |
|6       |2013-07-25 00:00:00|7130             |COMPLETE       |COMPLETECOMPLETECOMPLETE                     |
|

In [134]:
#hex

In [135]:
ord.withColumn('hex',hex(ord.order_status)).show(truncate=False)

+--------+-------------------+-----------------+---------------+------------------------------+
|order_id|order_date         |order_customer_id|order_status   |hex                           |
+--------+-------------------+-----------------+---------------+------------------------------+
|1       |2013-07-25 00:00:00|11599            |CLOSED         |434C4F534544                  |
|2       |2013-07-25 00:00:00|256              |PENDING_PAYMENT|50454E44494E475F5041594D454E54|
|3       |2013-07-25 00:00:00|12111            |COMPLETE       |434F4D504C455445              |
|4       |2013-07-25 00:00:00|8827             |CLOSED         |434C4F534544                  |
|5       |2013-07-25 00:00:00|11318            |COMPLETE       |434F4D504C455445              |
|6       |2013-07-25 00:00:00|7130             |COMPLETE       |434F4D504C455445              |
|7       |2013-07-25 00:00:00|4530             |COMPLETE       |434F4D504C455445              |
|8       |2013-07-25 00:00:00|2911      

In [136]:
#concat

In [141]:
ord.withColumn('idstatus',concat(ord.order_id,lit(' '),ord.order_status)).show()

+--------+-------------------+-----------------+---------------+------------------+
|order_id|         order_date|order_customer_id|   order_status|          idstatus|
+--------+-------------------+-----------------+---------------+------------------+
|       1|2013-07-25 00:00:00|            11599|         CLOSED|          1 CLOSED|
|       2|2013-07-25 00:00:00|              256|PENDING_PAYMENT| 2 PENDING_PAYMENT|
|       3|2013-07-25 00:00:00|            12111|       COMPLETE|        3 COMPLETE|
|       4|2013-07-25 00:00:00|             8827|         CLOSED|          4 CLOSED|
|       5|2013-07-25 00:00:00|            11318|       COMPLETE|        5 COMPLETE|
|       6|2013-07-25 00:00:00|             7130|       COMPLETE|        6 COMPLETE|
|       7|2013-07-25 00:00:00|             4530|       COMPLETE|        7 COMPLETE|
|       8|2013-07-25 00:00:00|             2911|     PROCESSING|      8 PROCESSING|
|       9|2013-07-25 00:00:00|             5657|PENDING_PAYMENT| 9 PENDING_P

In [144]:
ord.withColumn('idstatus',concat_ws(' ',ord.order_id,ord.order_status)).show()

+--------+-------------------+-----------------+---------------+------------------+
|order_id|         order_date|order_customer_id|   order_status|          idstatus|
+--------+-------------------+-----------------+---------------+------------------+
|       1|2013-07-25 00:00:00|            11599|         CLOSED|          1 CLOSED|
|       2|2013-07-25 00:00:00|              256|PENDING_PAYMENT| 2 PENDING_PAYMENT|
|       3|2013-07-25 00:00:00|            12111|       COMPLETE|        3 COMPLETE|
|       4|2013-07-25 00:00:00|             8827|         CLOSED|          4 CLOSED|
|       5|2013-07-25 00:00:00|            11318|       COMPLETE|        5 COMPLETE|
|       6|2013-07-25 00:00:00|             7130|       COMPLETE|        6 COMPLETE|
|       7|2013-07-25 00:00:00|             4530|       COMPLETE|        7 COMPLETE|
|       8|2013-07-25 00:00:00|             2911|     PROCESSING|      8 PROCESSING|
|       9|2013-07-25 00:00:00|             5657|PENDING_PAYMENT| 9 PENDING_P

In [145]:
#substring

In [147]:
ord.withColumn('order_year',substring(ord.order_date,1,4)).show()

+--------+-------------------+-----------------+---------------+----------+
|order_id|         order_date|order_customer_id|   order_status|order_year|
+--------+-------------------+-----------------+---------------+----------+
|       1|2013-07-25 00:00:00|            11599|         CLOSED|      2013|
|       2|2013-07-25 00:00:00|              256|PENDING_PAYMENT|      2013|
|       3|2013-07-25 00:00:00|            12111|       COMPLETE|      2013|
|       4|2013-07-25 00:00:00|             8827|         CLOSED|      2013|
|       5|2013-07-25 00:00:00|            11318|       COMPLETE|      2013|
|       6|2013-07-25 00:00:00|             7130|       COMPLETE|      2013|
|       7|2013-07-25 00:00:00|             4530|       COMPLETE|      2013|
|       8|2013-07-25 00:00:00|             2911|     PROCESSING|      2013|
|       9|2013-07-25 00:00:00|             5657|PENDING_PAYMENT|      2013|
|      10|2013-07-25 00:00:00|             5648|PENDING_PAYMENT|      2013|
|      11|20

In [148]:
#substring_index

In [150]:
ord.withColumn('dummy',substring_index(ord.order_date,'-',1)).show()

+--------+-------------------+-----------------+---------------+-----+
|order_id|         order_date|order_customer_id|   order_status|dummy|
+--------+-------------------+-----------------+---------------+-----+
|       1|2013-07-25 00:00:00|            11599|         CLOSED| 2013|
|       2|2013-07-25 00:00:00|              256|PENDING_PAYMENT| 2013|
|       3|2013-07-25 00:00:00|            12111|       COMPLETE| 2013|
|       4|2013-07-25 00:00:00|             8827|         CLOSED| 2013|
|       5|2013-07-25 00:00:00|            11318|       COMPLETE| 2013|
|       6|2013-07-25 00:00:00|             7130|       COMPLETE| 2013|
|       7|2013-07-25 00:00:00|             4530|       COMPLETE| 2013|
|       8|2013-07-25 00:00:00|             2911|     PROCESSING| 2013|
|       9|2013-07-25 00:00:00|             5657|PENDING_PAYMENT| 2013|
|      10|2013-07-25 00:00:00|             5648|PENDING_PAYMENT| 2013|
|      11|2013-07-25 00:00:00|              918| PAYMENT_REVIEW| 2013|
|     

In [151]:
ord.withColumn('dummy',substring_index(ord.order_date,'-',2)).show()

+--------+-------------------+-----------------+---------------+-------+
|order_id|         order_date|order_customer_id|   order_status|  dummy|
+--------+-------------------+-----------------+---------------+-------+
|       1|2013-07-25 00:00:00|            11599|         CLOSED|2013-07|
|       2|2013-07-25 00:00:00|              256|PENDING_PAYMENT|2013-07|
|       3|2013-07-25 00:00:00|            12111|       COMPLETE|2013-07|
|       4|2013-07-25 00:00:00|             8827|         CLOSED|2013-07|
|       5|2013-07-25 00:00:00|            11318|       COMPLETE|2013-07|
|       6|2013-07-25 00:00:00|             7130|       COMPLETE|2013-07|
|       7|2013-07-25 00:00:00|             4530|       COMPLETE|2013-07|
|       8|2013-07-25 00:00:00|             2911|     PROCESSING|2013-07|
|       9|2013-07-25 00:00:00|             5657|PENDING_PAYMENT|2013-07|
|      10|2013-07-25 00:00:00|             5648|PENDING_PAYMENT|2013-07|
|      11|2013-07-25 00:00:00|              918| PA

In [152]:
ord.withColumn('dummy',substring_index(ord.order_date,'-',-1)).show()

+--------+-------------------+-----------------+---------------+-----------+
|order_id|         order_date|order_customer_id|   order_status|      dummy|
+--------+-------------------+-----------------+---------------+-----------+
|       1|2013-07-25 00:00:00|            11599|         CLOSED|25 00:00:00|
|       2|2013-07-25 00:00:00|              256|PENDING_PAYMENT|25 00:00:00|
|       3|2013-07-25 00:00:00|            12111|       COMPLETE|25 00:00:00|
|       4|2013-07-25 00:00:00|             8827|         CLOSED|25 00:00:00|
|       5|2013-07-25 00:00:00|            11318|       COMPLETE|25 00:00:00|
|       6|2013-07-25 00:00:00|             7130|       COMPLETE|25 00:00:00|
|       7|2013-07-25 00:00:00|             4530|       COMPLETE|25 00:00:00|
|       8|2013-07-25 00:00:00|             2911|     PROCESSING|25 00:00:00|
|       9|2013-07-25 00:00:00|             5657|PENDING_PAYMENT|25 00:00:00|
|      10|2013-07-25 00:00:00|             5648|PENDING_PAYMENT|25 00:00:00|

In [153]:
ord.withColumn('instr',instr(ord.order_status,'LO')).show()

+--------+-------------------+-----------------+---------------+-----+
|order_id|         order_date|order_customer_id|   order_status|instr|
+--------+-------------------+-----------------+---------------+-----+
|       1|2013-07-25 00:00:00|            11599|         CLOSED|    2|
|       2|2013-07-25 00:00:00|              256|PENDING_PAYMENT|    0|
|       3|2013-07-25 00:00:00|            12111|       COMPLETE|    0|
|       4|2013-07-25 00:00:00|             8827|         CLOSED|    2|
|       5|2013-07-25 00:00:00|            11318|       COMPLETE|    0|
|       6|2013-07-25 00:00:00|             7130|       COMPLETE|    0|
|       7|2013-07-25 00:00:00|             4530|       COMPLETE|    0|
|       8|2013-07-25 00:00:00|             2911|     PROCESSING|    0|
|       9|2013-07-25 00:00:00|             5657|PENDING_PAYMENT|    0|
|      10|2013-07-25 00:00:00|             5648|PENDING_PAYMENT|    0|
|      11|2013-07-25 00:00:00|              918| PAYMENT_REVIEW|    0|
|     

In [154]:
#rgular expression extract

In [167]:
df1 =spark.createDataFrame([('11ssl ab',)],schema=['col1'])

In [168]:
df1.show()

+--------+
|    col1|
+--------+
|11ssl ab|
+--------+



In [170]:
df1.select(regexp_extract(df1.col1,'(\d+)' ,1)).show()

+------------------------------+
|regexp_extract(col1, (\d+), 1)|
+------------------------------+
|                            11|
+------------------------------+



In [171]:
df1.select(regexp_extract(df1.col1,'(\d+)(\w+)' ,2)).show()

+-----------------------------------+
|regexp_extract(col1, (\d+)(\w+), 2)|
+-----------------------------------+
|                                ssl|
+-----------------------------------+



In [172]:
df1.select(regexp_extract(df1.col1,'(\d+)(\w+)(\s)' ,3)).show()

+---------------------------------------+
|regexp_extract(col1, (\d+)(\w+)(\s), 3)|
+---------------------------------------+
|                                       |
+---------------------------------------+



In [175]:
df1.select(regexp_extract(df1.col1,'(\d+)(\w+)(\s)([a-z]+)' ,4).alias("output")).show()

+------+
|output|
+------+
|    ab|
+------+



In [178]:
df1.select(regexp_replace(df1.col1,'(\d+)' ,"xx").alias("output")).show()

+--------+
|  output|
+--------+
|xxssl ab|
+--------+



In [179]:
add =spark.createDataFrame([(1,'Mount rd','chennai'),
                             (2, 'Church rd','chennai')],schema=['id','street','city'])

In [180]:
add.show()

+---+---------+-------+
| id|   street|   city|
+---+---------+-------+
|  1| Mount rd|chennai|
|  2|Church rd|chennai|
+---+---------+-------+



In [183]:
add.withColumn('Reg_replace',regexp_replace(add.street,'rd','road')).show()

+---+---------+-------+-----------+
| id|   street|   city|Reg_replace|
+---+---------+-------+-----------+
|  1| Mount rd|chennai| Mount road|
|  2|Church rd|chennai|Church road|
+---+---------+-------+-----------+



In [3]:
#date functions

In [5]:
ord =spark.read.load('c:/practice/orders',format='csv',sep=',', schema=('order_id int, order_date timestamp,\
                                                                         order_customer_id int, order_status string'))

In [8]:
from pyspark.sql.functions import *

In [9]:
ord_new=ord.withColumn('new_order_date',date_add( ord.order_date,50))

In [10]:
ord_new.show()

+--------+-------------------+-----------------+---------------+--------------+
|order_id|         order_date|order_customer_id|   order_status|new_order_date|
+--------+-------------------+-----------------+---------------+--------------+
|       1|2013-07-25 00:00:00|            11599|         CLOSED|    2013-09-13|
|       2|2013-07-25 00:00:00|              256|PENDING_PAYMENT|    2013-09-13|
|       3|2013-07-25 00:00:00|            12111|       COMPLETE|    2013-09-13|
|       4|2013-07-25 00:00:00|             8827|         CLOSED|    2013-09-13|
|       5|2013-07-25 00:00:00|            11318|       COMPLETE|    2013-09-13|
|       6|2013-07-25 00:00:00|             7130|       COMPLETE|    2013-09-13|
|       7|2013-07-25 00:00:00|             4530|       COMPLETE|    2013-09-13|
|       8|2013-07-25 00:00:00|             2911|     PROCESSING|    2013-09-13|
|       9|2013-07-25 00:00:00|             5657|PENDING_PAYMENT|    2013-09-13|
|      10|2013-07-25 00:00:00|          

In [11]:
#current_date

In [15]:
ord.select(current_date()).show()

+--------------+
|current_date()|
+--------------+
|    2024-03-27|
|    2024-03-27|
|    2024-03-27|
|    2024-03-27|
|    2024-03-27|
|    2024-03-27|
|    2024-03-27|
|    2024-03-27|
|    2024-03-27|
|    2024-03-27|
|    2024-03-27|
|    2024-03-27|
|    2024-03-27|
|    2024-03-27|
|    2024-03-27|
|    2024-03-27|
|    2024-03-27|
|    2024-03-27|
|    2024-03-27|
|    2024-03-27|
+--------------+
only showing top 20 rows



In [17]:
df = spark.range(1)

In [19]:
df.select(current_date()).show()

+--------------+
|current_date()|
+--------------+
|    2024-03-27|
+--------------+



In [None]:
#current_timestamp()

In [21]:
df.select(current_timestamp()).show(truncate=False)

+--------------------------+
|current_timestamp()       |
+--------------------------+
|2024-03-27 09:37:12.478795|
+--------------------------+



In [22]:
#next_day

In [24]:
df.select(next_day(current_date(),'sun')).show()

+-----------------------------+
|next_day(current_date(), sun)|
+-----------------------------+
|                   2024-03-31|
+-----------------------------+



In [25]:
#lastday

In [27]:
df.select(last_day(current_date())).show()

+------------------------+
|last_day(current_date())|
+------------------------+
|              2024-03-31|
+------------------------+



In [28]:
#dayofweek

In [29]:
df.select(dayofweek(current_date())).show()

+-------------------------+
|dayofweek(current_date())|
+-------------------------+
|                        4|
+-------------------------+



In [30]:
#dayofmonth

In [32]:
df.select(dayofmonth(current_date())).show()

+--------------------------+
|dayofmonth(current_date())|
+--------------------------+
|                        27|
+--------------------------+



In [33]:
#dayofyear

In [34]:
df.select(dayofyear(current_date())).show()

+-------------------------+
|dayofyear(current_date())|
+-------------------------+
|                       87|
+-------------------------+



In [35]:
df.select(weekofyear(current_date())).show()

+--------------------------+
|weekofyear(current_date())|
+--------------------------+
|                        13|
+--------------------------+



In [36]:
df.select (second(current_timestamp())).show()

+---------------------------+
|second(current_timestamp())|
+---------------------------+
|                         42|
+---------------------------+



In [37]:
df.select (hour(current_timestamp())).show()

+-------------------------+
|hour(current_timestamp())|
+-------------------------+
|                        9|
+-------------------------+



In [38]:
df.select (minute(current_timestamp())).show()

+---------------------------+
|minute(current_timestamp())|
+---------------------------+
|                         44|
+---------------------------+



In [39]:
df.select (year(current_timestamp())).show()

+-------------------------+
|year(current_timestamp())|
+-------------------------+
|                     2024|
+-------------------------+



In [40]:
df.select (quarter(current_timestamp())).show()

+----------------------------+
|quarter(current_timestamp())|
+----------------------------+
|                           1|
+----------------------------+



In [41]:
df.select (month(current_timestamp())).show()

+--------------------------+
|month(current_timestamp())|
+--------------------------+
|                         3|
+--------------------------+



In [42]:
df.select (day(current_timestamp())).show()

+------------------------+
|day(current_timestamp())|
+------------------------+
|                      27|
+------------------------+



In [43]:
df.select (week(current_timestamp())).show()

NameError: name 'week' is not defined

In [44]:
ord_new.show()

+--------+-------------------+-----------------+---------------+--------------+
|order_id|         order_date|order_customer_id|   order_status|new_order_date|
+--------+-------------------+-----------------+---------------+--------------+
|       1|2013-07-25 00:00:00|            11599|         CLOSED|    2013-09-13|
|       2|2013-07-25 00:00:00|              256|PENDING_PAYMENT|    2013-09-13|
|       3|2013-07-25 00:00:00|            12111|       COMPLETE|    2013-09-13|
|       4|2013-07-25 00:00:00|             8827|         CLOSED|    2013-09-13|
|       5|2013-07-25 00:00:00|            11318|       COMPLETE|    2013-09-13|
|       6|2013-07-25 00:00:00|             7130|       COMPLETE|    2013-09-13|
|       7|2013-07-25 00:00:00|             4530|       COMPLETE|    2013-09-13|
|       8|2013-07-25 00:00:00|             2911|     PROCESSING|    2013-09-13|
|       9|2013-07-25 00:00:00|             5657|PENDING_PAYMENT|    2013-09-13|
|      10|2013-07-25 00:00:00|          

In [48]:
ord_new.select(ord_new.new_order_date,ord_new.order_date,
               months_between(ord_new.new_order_date,ord_new.order_date,roundOff=False)).show()

+--------------+-------------------+-------------------------------------------------+
|new_order_date|         order_date|months_between(new_order_date, order_date, false)|
+--------------+-------------------+-------------------------------------------------+
|    2013-09-13|2013-07-25 00:00:00|                               1.6129032258064515|
|    2013-09-13|2013-07-25 00:00:00|                               1.6129032258064515|
|    2013-09-13|2013-07-25 00:00:00|                               1.6129032258064515|
|    2013-09-13|2013-07-25 00:00:00|                               1.6129032258064515|
|    2013-09-13|2013-07-25 00:00:00|                               1.6129032258064515|
|    2013-09-13|2013-07-25 00:00:00|                               1.6129032258064515|
|    2013-09-13|2013-07-25 00:00:00|                               1.6129032258064515|
|    2013-09-13|2013-07-25 00:00:00|                               1.6129032258064515|
|    2013-09-13|2013-07-25 00:00:00|       

In [49]:
df.select(date_add(current_date(),10)).show()

+----------------------------+
|date_add(current_date(), 10)|
+----------------------------+
|                  2024-04-06|
+----------------------------+



In [51]:
df.select( date_sub(current_date(),3)).show()

+---------------------------+
|date_sub(current_date(), 3)|
+---------------------------+
|                 2024-03-24|
+---------------------------+



In [52]:
df.select( add_months(current_date(),3)).show()

+-----------------------------+
|add_months(current_date(), 3)|
+-----------------------------+
|                   2024-06-27|
+-----------------------------+



In [55]:
ord.select( ord.order_date,current_date(),date_diff(current_date(), ord.order_date).alias('diffdate')).show()

+-------------------+--------------+--------+
|         order_date|current_date()|diffdate|
+-------------------+--------------+--------+
|2013-07-25 00:00:00|    2024-03-27|    3898|
|2013-07-25 00:00:00|    2024-03-27|    3898|
|2013-07-25 00:00:00|    2024-03-27|    3898|
|2013-07-25 00:00:00|    2024-03-27|    3898|
|2013-07-25 00:00:00|    2024-03-27|    3898|
|2013-07-25 00:00:00|    2024-03-27|    3898|
|2013-07-25 00:00:00|    2024-03-27|    3898|
|2013-07-25 00:00:00|    2024-03-27|    3898|
|2013-07-25 00:00:00|    2024-03-27|    3898|
|2013-07-25 00:00:00|    2024-03-27|    3898|
|2013-07-25 00:00:00|    2024-03-27|    3898|
|2013-07-25 00:00:00|    2024-03-27|    3898|
|2013-07-25 00:00:00|    2024-03-27|    3898|
|2013-07-25 00:00:00|    2024-03-27|    3898|
|2013-07-25 00:00:00|    2024-03-27|    3898|
|2013-07-25 00:00:00|    2024-03-27|    3898|
|2013-07-25 00:00:00|    2024-03-27|    3898|
|2013-07-25 00:00:00|    2024-03-27|    3898|
|2013-07-25 00:00:00|    2024-03-2

In [63]:
df.select(current_date(),date_format(current_date(),'dd/mm/yyyy').alias('date_formate')).show()

+--------------+------------+
|current_date()|date_formate|
+--------------+------------+
|    2024-03-27|  27/00/2024|
+--------------+------------+



In [79]:
df2 = spark.createDataFrame([('1924-01-01 10:03:30',)],schema=['t'])

In [80]:
df2.show()

+-------------------+
|                  t|
+-------------------+
|1924-01-01 10:03:30|
+-------------------+



In [81]:
df2.printSchema()

root
 |-- t: string (nullable = true)



In [82]:
df2.select(to_timestamp(df2.t)).printSchema()

root
 |-- to_timestamp(t): timestamp (nullable = true)



In [88]:
df3 =df2.select(to_date(df2.t,'yyyy-mm-dd'))

In [87]:
df2.show()

+-------------------+
|                  t|
+-------------------+
|1924-01-01 10:03:30|
+-------------------+



In [90]:
#null functions

In [93]:
df = spark.createDataFrame([('Robert',1,None,114.0),('Jhon',None,2577,float('nan'))], ["name","id","phone","stAddr"])

In [94]:
df.show()

+------+----+-----+------+
|  name|  id|phone|stAddr|
+------+----+-----+------+
|Robert|   1| NULL| 114.0|
|  Jhon|NULL| 2577|   NaN|
+------+----+-----+------+



In [101]:
df.where(isnull(df.phone)).show()

+------+---+-----+------+
|  name| id|phone|stAddr|
+------+---+-----+------+
|Robert|  1| NULL| 114.0|
+------+---+-----+------+



In [102]:
df.select(df.phone,isnull(df.phone)).show()

+-----+---------------+
|phone|(phone IS NULL)|
+-----+---------------+
| NULL|           true|
| 2577|          false|
+-----+---------------+



In [105]:
df.select(df.stAddr,isnan(df.stAddr)).show()

+------+-------------+
|stAddr|isnan(stAddr)|
+------+-------------+
| 114.0|        false|
|   NaN|         true|
+------+-------------+



In [108]:
df.select(df.phone,df.stAddr,coalesce(df.phone,df.stAddr)).show()

+-----+------+-----------------------+
|phone|stAddr|coalesce(phone, stAddr)|
+-----+------+-----------------------+
| NULL| 114.0|                  114.0|
| 2577|   NaN|                 2577.0|
+-----+------+-----------------------+



In [109]:
#collection functions

In [112]:
data =[('Alicia','Joesph',['Java','Scala','Spark'],{'hair':'black','eye':'brown'}),\
      ('Robert','Gee',['Spark','Java'],{'hair':'brown','eye':None}),\
      ('Mike','Bianca',['Csharp',''],{'hair':'red','eye':''}),\
      ('Jhon','Kumar',None,None),\
      ('Jeff','L',['1','2'],{})]




In [114]:
schema =['FirstName','LastName','Languages','Properties']

emp1 =spark.createDataFrame(data=data,schema=schema)

In [116]:
emp1.show(truncate=False)

+---------+--------+--------------------+-----------------------------+
|FirstName|LastName|Languages           |Properties                   |
+---------+--------+--------------------+-----------------------------+
|Alicia   |Joesph  |[Java, Scala, Spark]|{eye -> brown, hair -> black}|
|Robert   |Gee     |[Spark, Java]       |{eye -> NULL, hair -> brown} |
|Mike     |Bianca  |[Csharp, ]          |{eye -> , hair -> red}       |
|Jhon     |Kumar   |NULL                |NULL                         |
|Jeff     |L       |[1, 2]              |{}                           |
+---------+--------+--------------------+-----------------------------+



In [117]:
data =[('Robert',35,40,40),('Ram',31,33,29),('Jhon',95,89,91)]
schema =['name','score1','score2','score3']
emp2 =spark.createDataFrame(data=data,schema=schema)

In [119]:
emp2.show()

+------+------+------+------+
|  name|score1|score2|score3|
+------+------+------+------+
|Robert|    35|    40|    40|
|   Ram|    31|    33|    29|
|  Jhon|    95|    89|    91|
+------+------+------+------+



In [120]:
data =[('Jhon',[10,20,20],[25,11,10]),\
        ('Robert',[15,13,55],[5,None,29]),\
        ('James',[11,13,45],[5,89,79])]

schema =['empName','score_arr1','score_arr2']

emp3 =spark.createDataFrame(data=data,schema=schema)

In [121]:
emp3.show()

+-------+------------+-------------+
|empName|  score_arr1|   score_arr2|
+-------+------------+-------------+
|   Jhon|[10, 20, 20]| [25, 11, 10]|
| Robert|[15, 13, 55]|[5, NULL, 29]|
|  James|[11, 13, 45]|  [5, 89, 79]|
+-------+------------+-------------+



In [123]:
df =spark.sql("select array(struct(1,'a'),struct(2,'b')) as data")

In [124]:
df.printSchema()

root
 |-- data: array (nullable = false)
 |    |-- element: struct (containsNull = false)
 |    |    |-- col1: integer (nullable = false)
 |    |    |-- col2: string (nullable = false)



In [125]:
#size

In [133]:
emp1.show(truncate=False)

+---------+--------+--------------------+-----------------------------+
|FirstName|LastName|Languages           |Properties                   |
+---------+--------+--------------------+-----------------------------+
|Alicia   |Joesph  |[Java, Scala, Spark]|{eye -> brown, hair -> black}|
|Robert   |Gee     |[Spark, Java]       |{eye -> NULL, hair -> brown} |
|Mike     |Bianca  |[Csharp, ]          |{eye -> , hair -> red}       |
|Jhon     |Kumar   |NULL                |NULL                         |
|Jeff     |L       |[1, 2]              |{}                           |
+---------+--------+--------------------+-----------------------------+



In [132]:
emp1.select(size(emp1.Languages),size(emp1.Properties)).show()

+---------------+----------------+
|size(Languages)|size(Properties)|
+---------------+----------------+
|              3|               2|
|              2|               2|
|              2|               2|
|             -1|              -1|
|              2|               0|
+---------------+----------------+



In [134]:
#element_at

In [135]:
emp1.show(truncate=False)

+---------+--------+--------------------+-----------------------------+
|FirstName|LastName|Languages           |Properties                   |
+---------+--------+--------------------+-----------------------------+
|Alicia   |Joesph  |[Java, Scala, Spark]|{eye -> brown, hair -> black}|
|Robert   |Gee     |[Spark, Java]       |{eye -> NULL, hair -> brown} |
|Mike     |Bianca  |[Csharp, ]          |{eye -> , hair -> red}       |
|Jhon     |Kumar   |NULL                |NULL                         |
|Jeff     |L       |[1, 2]              |{}                           |
+---------+--------+--------------------+-----------------------------+



In [137]:
emp1.select(emp1.FirstName, element_at(emp1.Languages,1),element_at(emp1.Properties,'eye') ).show()

+---------+------------------------+---------------------------+
|FirstName|element_at(Languages, 1)|element_at(Properties, eye)|
+---------+------------------------+---------------------------+
|   Alicia|                    Java|                      brown|
|   Robert|                   Spark|                       NULL|
|     Mike|                  Csharp|                           |
|     Jhon|                    NULL|                       NULL|
|     Jeff|                       1|                       NULL|
+---------+------------------------+---------------------------+



In [138]:
#struct

In [139]:
emp1.select(emp1.FirstName,emp1.LastName,struct(emp1.FirstName,emp1.LastName)).show()

+---------+--------+---------------------------+
|FirstName|LastName|struct(FirstName, LastName)|
+---------+--------+---------------------------+
|   Alicia|  Joesph|           {Alicia, Joesph}|
|   Robert|     Gee|              {Robert, Gee}|
|     Mike|  Bianca|             {Mike, Bianca}|
|     Jhon|   Kumar|              {Jhon, Kumar}|
|     Jeff|       L|                  {Jeff, L}|
+---------+--------+---------------------------+



In [None]:
#arry

In [140]:
emp2.show()

+------+------+------+------+
|  name|score1|score2|score3|
+------+------+------+------+
|Robert|    35|    40|    40|
|   Ram|    31|    33|    29|
|  Jhon|    95|    89|    91|
+------+------+------+------+



In [142]:
emp2.select(emp2.name,array(emp2.score1,emp2.score2,emp2.score3)).show()

+------+-----------------------------+
|  name|array(score1, score2, score3)|
+------+-----------------------------+
|Robert|                 [35, 40, 40]|
|   Ram|                 [31, 33, 29]|
|  Jhon|                 [95, 89, 91]|
+------+-----------------------------+



In [143]:
emp2.select(emp2.name,array(emp2.score1,emp2.score2,emp2.score3)).printSchema()

root
 |-- name: string (nullable = true)
 |-- array(score1, score2, score3): array (nullable = false)
 |    |-- element: long (containsNull = true)



In [144]:
#arry_max

In [145]:
emp3.show()

+-------+------------+-------------+
|empName|  score_arr1|   score_arr2|
+-------+------------+-------------+
|   Jhon|[10, 20, 20]| [25, 11, 10]|
| Robert|[15, 13, 55]|[5, NULL, 29]|
|  James|[11, 13, 45]|  [5, 89, 79]|
+-------+------------+-------------+



In [None]:
#arry_max

In [148]:
emp3.select(emp3.empName,array_max(emp3.score_arr1)).show()

+-------+---------------------+
|empName|array_max(score_arr1)|
+-------+---------------------+
|   Jhon|                   20|
| Robert|                   55|
|  James|                   45|
+-------+---------------------+



In [149]:
#array_distinct

In [150]:
emp3.select(emp3.score_arr1, array_distinct(emp3.score_arr1)).show()

+------------+--------------------------+
|  score_arr1|array_distinct(score_arr1)|
+------------+--------------------------+
|[10, 20, 20]|                  [10, 20]|
|[15, 13, 55]|              [15, 13, 55]|
|[11, 13, 45]|              [11, 13, 45]|
+------------+--------------------------+



In [151]:
#arry_repeat

In [153]:
emp3.select(emp3.score_arr1, array_repeat(emp3.score_arr1,3)).show(truncate=False)

+------------+------------------------------------------+
|score_arr1  |array_repeat(score_arr1, 3)               |
+------------+------------------------------------------+
|[10, 20, 20]|[[10, 20, 20], [10, 20, 20], [10, 20, 20]]|
|[15, 13, 55]|[[15, 13, 55], [15, 13, 55], [15, 13, 55]]|
|[11, 13, 45]|[[11, 13, 45], [11, 13, 45], [11, 13, 45]]|
+------------+------------------------------------------+



In [154]:
emp1.show()

+---------+--------+--------------------+--------------------+
|FirstName|LastName|           Languages|          Properties|
+---------+--------+--------------------+--------------------+
|   Alicia|  Joesph|[Java, Scala, Spark]|{eye -> brown, ha...|
|   Robert|     Gee|       [Spark, Java]|{eye -> NULL, hai...|
|     Mike|  Bianca|          [Csharp, ]|{eye -> , hair ->...|
|     Jhon|   Kumar|                NULL|                NULL|
|     Jeff|       L|              [1, 2]|                  {}|
+---------+--------+--------------------+--------------------+



In [155]:
#slice

In [156]:
emp1.select(emp1.Languages, slice(emp1.Languages,2,2)).show()

+--------------------+----------------------+
|           Languages|slice(Languages, 2, 2)|
+--------------------+----------------------+
|[Java, Scala, Spark]|        [Scala, Spark]|
|       [Spark, Java]|                [Java]|
|          [Csharp, ]|                    []|
|                NULL|                  NULL|
|              [1, 2]|                   [2]|
+--------------------+----------------------+



In [None]:
#arry_position

In [158]:
emp3.show()

+-------+------------+-------------+
|empName|  score_arr1|   score_arr2|
+-------+------------+-------------+
|   Jhon|[10, 20, 20]| [25, 11, 10]|
| Robert|[15, 13, 55]|[5, NULL, 29]|
|  James|[11, 13, 45]|  [5, 89, 79]|
+-------+------------+-------------+



In [160]:
emp3.select(emp3.score_arr1,array_position(emp3.score_arr1,13)).show()

+------------+------------------------------+
|  score_arr1|array_position(score_arr1, 13)|
+------------+------------------------------+
|[10, 20, 20]|                             0|
|[15, 13, 55]|                             2|
|[11, 13, 45]|                             2|
+------------+------------------------------+



In [161]:
emp3.select(emp3.score_arr1,array_remove(emp3.score_arr1,13)).show()

+------------+----------------------------+
|  score_arr1|array_remove(score_arr1, 13)|
+------------+----------------------------+
|[10, 20, 20]|                [10, 20, 20]|
|[15, 13, 55]|                    [15, 55]|
|[11, 13, 45]|                    [11, 45]|
+------------+----------------------------+



In [162]:
emp3.show()

+-------+------------+-------------+
|empName|  score_arr1|   score_arr2|
+-------+------------+-------------+
|   Jhon|[10, 20, 20]| [25, 11, 10]|
| Robert|[15, 13, 55]|[5, NULL, 29]|
|  James|[11, 13, 45]|  [5, 89, 79]|
+-------+------------+-------------+



In [163]:
emp3.select(emp3.score_arr1,array_sort(emp3.score_arr1)).show()

+------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|  score_arr1|array_sort(score_arr1, lambdafunction((IF(((namedlambdavariable() IS NULL) AND (namedlambdavariable() IS NULL)), 0, (IF((namedlambdavariable() IS NULL), 1, (IF((namedlambdavariable() IS NULL), -1, (IF((namedlambdavariable() < namedlambdavariable()), -1, (IF((namedlambdavariable() > namedlambdavariable()), 1, 0)))))))))), namedlambdavariable(), namedlambdavariable()))|
+------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [165]:
#sort array

In [164]:
emp3.select(emp3.score_arr1,sort_array(emp3.score_arr1,asc=False)).show()

+------------+-----------------------------+
|  score_arr1|sort_array(score_arr1, false)|
+------------+-----------------------------+
|[10, 20, 20]|                 [20, 20, 10]|
|[15, 13, 55]|                 [55, 15, 13]|
|[11, 13, 45]|                 [45, 13, 11]|
+------------+-----------------------------+



In [166]:
#array_contains

In [169]:
emp3.select(emp3.score_arr2,array_contains(emp3.score_arr2,25)).show()

+-------------+------------------------------+
|   score_arr2|array_contains(score_arr2, 25)|
+-------------+------------------------------+
| [25, 11, 10]|                          true|
|[5, NULL, 29]|                          NULL|
|  [5, 89, 79]|                         false|
+-------------+------------------------------+



In [172]:
emp3.select(emp3.score_arr1,emp3.score_arr2,array_union(emp3.score_arr1,emp3.score_arr2)).show(truncate=False)

+------------+-------------+-----------------------------------+
|score_arr1  |score_arr2   |array_union(score_arr1, score_arr2)|
+------------+-------------+-----------------------------------+
|[10, 20, 20]|[25, 11, 10] |[10, 20, 25, 11]                   |
|[15, 13, 55]|[5, NULL, 29]|[15, 13, 55, 5, NULL, 29]          |
|[11, 13, 45]|[5, 89, 79]  |[11, 13, 45, 5, 89, 79]            |
+------------+-------------+-----------------------------------+



In [173]:
emp3.select(emp3.score_arr1,emp3.score_arr2,array_except(emp3.score_arr1,emp3.score_arr2)).show(truncate=False)

+------------+-------------+------------------------------------+
|score_arr1  |score_arr2   |array_except(score_arr1, score_arr2)|
+------------+-------------+------------------------------------+
|[10, 20, 20]|[25, 11, 10] |[20]                                |
|[15, 13, 55]|[5, NULL, 29]|[15, 13, 55]                        |
|[11, 13, 45]|[5, 89, 79]  |[11, 13, 45]                        |
+------------+-------------+------------------------------------+



In [175]:
emp3.select(emp3.score_arr1,emp3.score_arr2,array_intersect(emp3.score_arr1,emp3.score_arr2)).show(truncate=False)

+------------+-------------+---------------------------------------+
|score_arr1  |score_arr2   |array_intersect(score_arr1, score_arr2)|
+------------+-------------+---------------------------------------+
|[10, 20, 20]|[25, 11, 10] |[10]                                   |
|[15, 13, 55]|[5, NULL, 29]|[]                                     |
|[11, 13, 45]|[5, 89, 79]  |[]                                     |
+------------+-------------+---------------------------------------+



In [176]:
emp3.select(emp3.score_arr2,array_join(emp3.score_arr2,'#',null_replacement='*')).show()

+-------------+----------------------------+
|   score_arr2|array_join(score_arr2, #, *)|
+-------------+----------------------------+
| [25, 11, 10]|                    25#11#10|
|[5, NULL, 29]|                      5#*#29|
|  [5, 89, 79]|                     5#89#79|
+-------------+----------------------------+



In [179]:
emp3.select(emp3.score_arr1,emp3.score_arr2,arrays_zip(emp3.score_arr1,emp3.score_arr2).alias('arry_zip')).show(truncate=False)

+------------+-------------+-------------------------------+
|score_arr1  |score_arr2   |arry_zip                       |
+------------+-------------+-------------------------------+
|[10, 20, 20]|[25, 11, 10] |[{10, 25}, {20, 11}, {20, 10}] |
|[15, 13, 55]|[5, NULL, 29]|[{15, 5}, {13, NULL}, {55, 29}]|
|[11, 13, 45]|[5, 89, 79]  |[{11, 5}, {13, 89}, {45, 79}]  |
+------------+-------------+-------------------------------+



In [180]:
#create_map

In [181]:
emp1.show()

+---------+--------+--------------------+--------------------+
|FirstName|LastName|           Languages|          Properties|
+---------+--------+--------------------+--------------------+
|   Alicia|  Joesph|[Java, Scala, Spark]|{eye -> brown, ha...|
|   Robert|     Gee|       [Spark, Java]|{eye -> NULL, hai...|
|     Mike|  Bianca|          [Csharp, ]|{eye -> , hair ->...|
|     Jhon|   Kumar|                NULL|                NULL|
|     Jeff|       L|              [1, 2]|                  {}|
+---------+--------+--------------------+--------------------+



In [185]:
emp1.select(create_map(emp1.FirstName, emp1.LastName)).show()

+------------------------+
|map(FirstName, LastName)|
+------------------------+
|      {Alicia -> Joesph}|
|         {Robert -> Gee}|
|        {Mike -> Bianca}|
|         {Jhon -> Kumar}|
|             {Jeff -> L}|
+------------------------+



In [186]:
emp1.show()

+---------+--------+--------------------+--------------------+
|FirstName|LastName|           Languages|          Properties|
+---------+--------+--------------------+--------------------+
|   Alicia|  Joesph|[Java, Scala, Spark]|{eye -> brown, ha...|
|   Robert|     Gee|       [Spark, Java]|{eye -> NULL, hai...|
|     Mike|  Bianca|          [Csharp, ]|{eye -> , hair ->...|
|     Jhon|   Kumar|                NULL|                NULL|
|     Jeff|       L|              [1, 2]|                  {}|
+---------+--------+--------------------+--------------------+



In [187]:
emp1.select(map_keys(emp1.Properties)).show()

+--------------------+
|map_keys(Properties)|
+--------------------+
|         [eye, hair]|
|         [eye, hair]|
|         [eye, hair]|
|                NULL|
|                  []|
+--------------------+



In [188]:
emp1.select(map_values(emp1.Properties)).show()

+----------------------+
|map_values(Properties)|
+----------------------+
|        [brown, black]|
|         [NULL, brown]|
|               [, red]|
|                  NULL|
|                    []|
+----------------------+



In [193]:
emp2.show()

+------+------+------+------+
|  name|score1|score2|score3|
+------+------+------+------+
|Robert|    35|    40|    40|
|   Ram|    31|    33|    29|
|  Jhon|    95|    89|    91|
+------+------+------+------+



In [196]:
#Sequance

In [195]:
emp2.select(emp2.score1,emp2.score2,sequence(emp2.score1,emp2.score2)).show(truncate=False)

+------+------+----------------------------+
|score1|score2|sequence(score1, score2)    |
+------+------+----------------------------+
|35    |40    |[35, 36, 37, 38, 39, 40]    |
|31    |33    |[31, 32, 33]                |
|95    |89    |[95, 94, 93, 92, 91, 90, 89]|
+------+------+----------------------------+



In [197]:
data =[('Alice',80,60),('Bob',None,5),(None,None,None),('Robet',30,35)]
schema='name string,age int, hight int'
df =spark.createDataFrame(data=data,schema=schema)

In [198]:
df.show()

+-----+----+-----+
| name| age|hight|
+-----+----+-----+
|Alice|  80|   60|
|  Bob|NULL|    5|
| NULL|NULL| NULL|
|Robet|  30|   35|
+-----+----+-----+



In [205]:
df.na.drop(how='all').show()

+-----+----+-----+
| name| age|hight|
+-----+----+-----+
|Alice|  80|   60|
|  Bob|NULL|    5|
|Robet|  30|   35|
+-----+----+-----+



In [201]:
df.select(df.age).na.drop().show()

+---+
|age|
+---+
| 80|
| 30|
+---+



In [206]:
df.show()

+-----+----+-----+
| name| age|hight|
+-----+----+-----+
|Alice|  80|   60|
|  Bob|NULL|    5|
| NULL|NULL| NULL|
|Robet|  30|   35|
+-----+----+-----+



In [211]:
df.na.drop(subset='age').show()

+-----+---+-----+
| name|age|hight|
+-----+---+-----+
|Alice| 80|   60|
|Robet| 30|   35|
+-----+---+-----+



In [212]:
df.show()

+-----+----+-----+
| name| age|hight|
+-----+----+-----+
|Alice|  80|   60|
|  Bob|NULL|    5|
| NULL|NULL| NULL|
|Robet|  30|   35|
+-----+----+-----+



In [213]:
df.na.fill(50).show()

+-----+---+-----+
| name|age|hight|
+-----+---+-----+
|Alice| 80|   60|
|  Bob| 50|    5|
| NULL| 50|   50|
|Robet| 30|   35|
+-----+---+-----+



In [215]:
df.na.fill('Ram').show()

+-----+----+-----+
| name| age|hight|
+-----+----+-----+
|Alice|  80|   60|
|  Bob|NULL|    5|
|  Ram|NULL| NULL|
|Robet|  30|   35|
+-----+----+-----+



In [218]:
df.na.fill({'age':50, 'name':'ram','hight':30}).show()

+-----+---+-----+
| name|age|hight|
+-----+---+-----+
|Alice| 80|   60|
|  Bob| 50|    5|
|  ram| 50|   30|
|Robet| 30|   35|
+-----+---+-----+



In [219]:
df =spark.createDataFrame([(-5,0),(1,3),(7,9)],['COL1','COL2'])

In [220]:
df.show()

+----+----+
|COL1|COL2|
+----+----+
|  -5|   0|
|   1|   3|
|   7|   9|
+----+----+



In [221]:
df.select(abs('COL1')).show()

+---------+
|abs(COL1)|
+---------+
|        5|
|        1|
|        7|
+---------+



In [222]:
df.select('COL1',exp('COL1')).show()

+----+--------------------+
|COL1|           EXP(COL1)|
+----+--------------------+
|  -5|0.006737946999085467|
|   1|  2.7182818284590455|
|   7|  1096.6331584284585|
+----+--------------------+



In [223]:
    df.select('COL1',factorial('COL1')).show()

+----+---------------+
|COL1|factorial(COL1)|
+----+---------------+
|  -5|           NULL|
|   1|              1|
|   7|           5040|
+----+---------------+



In [225]:
df.select('COL1',sqrt('COL1')).show()

+----+------------------+
|COL1|        SQRT(COL1)|
+----+------------------+
|  -5|               NaN|
|   1|               1.0|
|   7|2.6457513110645907|
+----+------------------+



In [227]:
df.select('COL1',pow('COL1',3)).show()

+----+--------------+
|COL1|POWER(COL1, 3)|
+----+--------------+
|  -5|        -125.0|
|   1|           1.0|
|   7|         343.0|
+----+--------------+



In [229]:
    df.select('COL1',floor('COL1')).show()

+----+-----------+
|COL1|FLOOR(COL1)|
+----+-----------+
|  -5|         -5|
|   1|          1|
|   7|          7|
+----+-----------+



In [230]:
df.select('COL1',ceil('COL1')).show()

+----+----------+
|COL1|CEIL(COL1)|
+----+----------+
|  -5|        -5|
|   1|         1|
|   7|         7|
+----+----------+



In [232]:
df.show()

+----+----+
|COL1|COL2|
+----+----+
|  -5|   0|
|   1|   3|
|   7|   9|
+----+----+



In [235]:
df.select(sum(df.COL1)).show()

+---------+
|sum(COL1)|
+---------+
|        3|
+---------+



In [237]:
df.select(avg(df.COL1)).show()

+---------+
|avg(COL1)|
+---------+
|      1.0|
+---------+



In [238]:
df.select(max(df.COL1)).show()

+---------+
|max(COL1)|
+---------+
|        7|
+---------+



In [239]:
df.select(min(df.COL1)).show()

+---------+
|min(COL1)|
+---------+
|       -5|
+---------+



In [240]:
emp1.show()

+---------+--------+--------------------+--------------------+
|FirstName|LastName|           Languages|          Properties|
+---------+--------+--------------------+--------------------+
|   Alicia|  Joesph|[Java, Scala, Spark]|{eye -> brown, ha...|
|   Robert|     Gee|       [Spark, Java]|{eye -> NULL, hai...|
|     Mike|  Bianca|          [Csharp, ]|{eye -> , hair ->...|
|     Jhon|   Kumar|                NULL|                NULL|
|     Jeff|       L|              [1, 2]|                  {}|
+---------+--------+--------------------+--------------------+



In [243]:
emp1.select  (emp1.FirstName, explode(emp1.Languages)).show()

+---------+------+
|FirstName|   col|
+---------+------+
|   Alicia|  Java|
|   Alicia| Scala|
|   Alicia| Spark|
|   Robert| Spark|
|   Robert|  Java|
|     Mike|Csharp|
|     Mike|      |
|     Jeff|     1|
|     Jeff|     2|
+---------+------+



In [244]:
emp1.select  (emp1.FirstName, explode(emp1.Properties)).show()

+---------+----+-----+
|FirstName| key|value|
+---------+----+-----+
|   Alicia| eye|brown|
|   Alicia|hair|black|
|   Robert| eye| NULL|
|   Robert|hair|brown|
|     Mike| eye|     |
|     Mike|hair|  red|
+---------+----+-----+



In [245]:
emp1.select  (emp1.FirstName, explode_outer(emp1.Languages)).show()

+---------+------+
|FirstName|   col|
+---------+------+
|   Alicia|  Java|
|   Alicia| Scala|
|   Alicia| Spark|
|   Robert| Spark|
|   Robert|  Java|
|     Mike|Csharp|
|     Mike|      |
|     Jhon|  NULL|
|     Jeff|     1|
|     Jeff|     2|
+---------+------+



In [246]:
#format_number

In [12]:
ordItems =spark.read.load('c:/practice/order_items',format='csv',sep=',',schema=\
        'order_item_id int, order_id int, product_id int, qunatity int, subtotal float, price float')

In [13]:
ordItems.show()

+-------------+--------+----------+--------+--------+------+
|order_item_id|order_id|product_id|qunatity|subtotal| price|
+-------------+--------+----------+--------+--------+------+
|            1|       1|       957|       1|  299.98|299.98|
|            2|       2|      1073|       1|  199.99|199.99|
|            3|       2|       502|       5|   250.0|  50.0|
|            4|       2|       403|       1|  129.99|129.99|
|            5|       4|       897|       2|   49.98| 24.99|
|            6|       4|       365|       5|  299.95| 59.99|
|            7|       4|       502|       3|   150.0|  50.0|
|            8|       4|      1014|       4|  199.92| 49.98|
|            9|       5|       957|       1|  299.98|299.98|
|           10|       5|       365|       5|  299.95| 59.99|
|           11|       5|      1014|       2|   99.96| 49.98|
|           12|       5|       957|       1|  299.98|299.98|
|           13|       5|       403|       1|  129.99|129.99|
|           14|       7|

In [249]:
ordItems.select(ordItems.price,format_number(ordItems.price,1)).show()

+------+-----------------------+
| price|format_number(price, 1)|
+------+-----------------------+
|299.98|                  300.0|
|199.99|                  200.0|
|  50.0|                   50.0|
|129.99|                  130.0|
| 24.99|                   25.0|
| 59.99|                   60.0|
|  50.0|                   50.0|
| 49.98|                   50.0|
|299.98|                  300.0|
| 59.99|                   60.0|
| 49.98|                   50.0|
|299.98|                  300.0|
|129.99|                  130.0|
|199.99|                  200.0|
|299.98|                  300.0|
| 15.99|                   16.0|
| 59.99|                   60.0|
| 59.99|                   60.0|
| 49.98|                   50.0|
|  50.0|                   50.0|
+------+-----------------------+
only showing top 20 rows



In [256]:
spark.range(1).select(lit(3.5),format_number(lit(3.5),0)).show()

+---+---------------------+
|3.5|format_number(3.5, 0)|
+---+---------------------+
|3.5|                    4|
+---+---------------------+



In [257]:
spark.range(1).select(lit(4.5),format_number(lit(4.5),0)).show()

+---+---------------------+
|4.5|format_number(4.5, 0)|
+---+---------------------+
|4.5|                    4|
+---+---------------------+



In [258]:
df =spark.createDataFrame([(5,'hello')],['a','b'])

In [259]:
df.show()

+---+-----+
|  a|    b|
+---+-----+
|  5|hello|
+---+-----+



In [260]:
#format_string

In [262]:
df.select(format_string('%d %s', df.a, df.b)).show()

+--------------------------+
|format_string(%d %s, a, b)|
+--------------------------+
|                   5 hello|
+--------------------------+



In [14]:
ord=spark.read.load('c:/practice/orders',format='csv',
                    schema=('OrderId int, OrderDate String,CustomerId int,OrderStatus String'))

In [5]:
ord.show()

+-------+--------------------+----------+---------------+
|OrderId|           OrderDate|CustomerId|    OrderStatus|
+-------+--------------------+----------+---------------+
|      1|2013-07-25 00:00:...|     11599|         CLOSED|
|      2|2013-07-25 00:00:...|       256|PENDING_PAYMENT|
|      3|2013-07-25 00:00:...|     12111|       COMPLETE|
|      4|2013-07-25 00:00:...|      8827|         CLOSED|
|      5|2013-07-25 00:00:...|     11318|       COMPLETE|
|      6|2013-07-25 00:00:...|      7130|       COMPLETE|
|      7|2013-07-25 00:00:...|      4530|       COMPLETE|
|      8|2013-07-25 00:00:...|      2911|     PROCESSING|
|      9|2013-07-25 00:00:...|      5657|PENDING_PAYMENT|
|     10|2013-07-25 00:00:...|      5648|PENDING_PAYMENT|
|     11|2013-07-25 00:00:...|       918| PAYMENT_REVIEW|
|     12|2013-07-25 00:00:...|      1837|         CLOSED|
|     13|2013-07-25 00:00:...|      9149|PENDING_PAYMENT|
|     14|2013-07-25 00:00:...|      9842|     PROCESSING|
|     15|2013-

In [15]:
ordCountByStatus = ord.groupBy(ord.OrderStatus).count()

In [16]:
ordCountByStatus.show()

+---------------+-----+
|    OrderStatus|count|
+---------------+-----+
|PENDING_PAYMENT|15030|
|       COMPLETE|22899|
|        ON_HOLD| 3798|
| PAYMENT_REVIEW|  729|
|     PROCESSING| 8275|
|         CLOSED| 7556|
|SUSPECTED_FRAUD| 1558|
|        PENDING| 7610|
|       CANCELED| 1428|
+---------------+-----+



In [7]:
ordCountByStatus.rdd.getNumPartitions()

1

In [18]:
ordCountByStatus.coalesce(1).write.save("file:///C:/practice/out1.csv",format='csv',sep=',',mode='overwrite')



Py4JJavaError: An error occurred while calling o95.save.
: java.lang.RuntimeException: java.io.FileNotFoundException: Hadoop bin directory does not exist: C:\winutils\bin\bin -see https://wiki.apache.org/hadoop/WindowsProblems
	at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:735)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:270)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:286)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:978)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkOneDirWithMode(RawLocalFileSystem.java:660)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:700)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.ChecksumFileSystem.mkdirs(ChecksumFileSystem.java:788)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:356)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupJob(HadoopMapReduceCommitProtocol.scala:188)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:269)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:304)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:190)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:374)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:402)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:374)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:361)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:240)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:75)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.io.FileNotFoundException: Hadoop bin directory does not exist: C:\winutils\bin\bin -see https://wiki.apache.org/hadoop/WindowsProblems
	at org.apache.hadoop.util.Shell.getQualifiedBinInner(Shell.java:607)
	at org.apache.hadoop.util.Shell.getQualifiedBin(Shell.java:591)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:688)
	at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:79)
	at org.apache.hadoop.conf.Configuration.getTimeDurationHelper(Configuration.java:1907)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1867)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1840)
	at org.apache.hadoop.util.ShutdownHookManager.getShutdownTimeout(ShutdownHookManager.java:183)
	at org.apache.hadoop.util.ShutdownHookManager$HookEntry.<init>(ShutdownHookManager.java:207)
	at org.apache.hadoop.util.ShutdownHookManager.addShutdownHook(ShutdownHookManager.java:304)
	at org.apache.spark.util.SparkShutdownHookManager.install(ShutdownHookManager.scala:181)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks$lzycompute(ShutdownHookManager.scala:50)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks(ShutdownHookManager.scala:48)
	at org.apache.spark.util.ShutdownHookManager$.addShutdownHook(ShutdownHookManager.scala:153)
	at org.apache.spark.util.ShutdownHookManager$.<init>(ShutdownHookManager.scala:58)
	at org.apache.spark.util.ShutdownHookManager$.<clinit>(ShutdownHookManager.scala)
	at org.apache.spark.util.Utils$.createTempDir(Utils.scala:242)
	at org.apache.spark.util.SparkFileUtils.createTempDir(SparkFileUtils.scala:103)
	at org.apache.spark.util.SparkFileUtils.createTempDir$(SparkFileUtils.scala:102)
	at org.apache.spark.util.Utils$.createTempDir(Utils.scala:94)
	at org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:372)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:964)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:194)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:217)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1120)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1129)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


In [None]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
sc = SparkContext.getOrCreate()
ssc = StreamingContext(sc, 40)
lines = ssc.textFileStream("c:/training/words")
words = lines.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)
wordCounts.pprint()
ssc.start()             # Start the computation
ssc.awaitTermination()  # Wait for the computation to terminate



-------------------------------------------
Time: 2024-04-05 16:58:00
-------------------------------------------

-------------------------------------------
Time: 2024-04-05 16:58:40
-------------------------------------------

-------------------------------------------
Time: 2024-04-05 16:59:20
-------------------------------------------

-------------------------------------------
Time: 2024-04-05 17:00:00
-------------------------------------------

-------------------------------------------
Time: 2024-04-05 17:00:40
-------------------------------------------

-------------------------------------------
Time: 2024-04-05 17:01:20
-------------------------------------------

-------------------------------------------
Time: 2024-04-05 17:02:00
-------------------------------------------

-------------------------------------------
Time: 2024-04-05 17:02:40
-------------------------------------------

-------------------------------------------
Time: 2024-04-05 17:03:20
----------

-------------------------------------------
Time: 2024-04-05 17:46:00
-------------------------------------------

-------------------------------------------
Time: 2024-04-05 17:46:40
-------------------------------------------

-------------------------------------------
Time: 2024-04-05 17:47:20
-------------------------------------------

-------------------------------------------
Time: 2024-04-05 17:48:00
-------------------------------------------

-------------------------------------------
Time: 2024-04-05 17:48:40
-------------------------------------------

-------------------------------------------
Time: 2024-04-05 17:49:20
-------------------------------------------

-------------------------------------------
Time: 2024-04-05 17:50:00
-------------------------------------------

