In [0]:
# # UDF
# A UDF (user-defined function) lets us write custom logic in Python, register it under a function name with udf(), and then call it like any built-in Spark function.


In [0]:
data = [
    (1, 'lokesh', 2000, 100),
    (2, 'gopal', 3000, 200),
]
schema = ['id', 'name', 'salary', 'bonus']

# `schema` only names the columns here; Spark infers the column types from the data.
df = spark.createDataFrame(data=data, schema=schema)
display(df)

id,name,salary,bonus
1,lokesh,2000,100
2,gopal,3000,200


In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

In [0]:
def total(s, b):
    """Return the total pay: salary ``s`` plus bonus ``b``.

    Spark hands SQL NULL to a Python UDF as ``None``; in that case return
    ``None`` (NULL) instead of raising ``TypeError`` and failing the task.
    """
    if s is None or b is None:
        return None
    return s + b

# Register `total` as a Spark UDF named TotalPay that produces an integer column.
# Pass the function object itself rather than `lambda s, b: total(s, b)`: a
# lambda resolves the global name `total` lazily (when Spark first serializes
# the UDF), and later cells in this notebook rebind `total`, so TotalPay could
# silently end up calling the wrong object. Passing `total` binds it right here.
TotalPay = udf(total, IntegerType())

In [0]:
# Apply the UDF to the salary and bonus columns and append the result as 'Total Salary'.
df.withColumn(
    'Total Salary',
    TotalPay(df['salary'], df['bonus']),
).show()

+---+------+------+-----+------------+
| id|  name|salary|bonus|Total Salary|
+---+------+------+-----+------------+
|  1|lokesh|  2000|  100|        2100|
|  2| gopal|  3000|  200|        3200|
+---+------+------+-----+------------+



In [0]:
# Register a function as a UDF using the decorator form instead.
# The keyword is `returnType` (camelCase) and it expects a DataType instance,
# i.e. IntegerType() rather than the IntegerType class.
# Note: this rebinds the name `total` to a UDF object.
@udf(returnType=IntegerType())
def total(s, b):
    """Add salary ``s`` and bonus ``b``; a NULL (None) in either input yields NULL."""
    if s is None or b is None:
        return None
    return s + b

In [0]:
# Apply `total` (intended to be the decorated UDF from the previous cell) to the
# salary and bonus columns and append the result as 'Total Salary'.
df.withColumn(
    'Total Salary',
    total(df['salary'], df['bonus']),
).show()

+---+------+------+-----+------------+
| id|  name|salary|bonus|Total Salary|
+---+------+------+-----+------------+
|  1|lokesh|  2000|  100|        2100|
|  2| gopal|  3000|  200|        3200|
+---+------+------+-----+------------+



In [0]:
# Expose `df` to Spark SQL under the temporary view name 'students'.
df.createOrReplaceTempView(name='students')

In [0]:
# To call this logic from SQL it must be registered by name with
# spark.udf.register; first define the plain Python function.
def total(s, b):
    """Return salary ``s`` plus bonus ``b``, or None (SQL NULL) if either is None."""
    if s is None or b is None:
        return None
    return s + b

# Make `total` callable from SQL as TotalPay(...), returning an integer.
spark.udf.register(name='TotalPay', f=total, returnType=IntegerType())

Out[12]: <function __main__.total(s, b)>

In [0]:
%sql
-- TotalPay is the Python function registered with spark.udf.register.
SELECT *, TotalPay(salary, bonus) AS TotalSalary
FROM students

id,name,salary,bonus,TotalSalary
1,lokesh,2000,100,2100
2,gopal,3000,200,3200


In [0]:
# Resilient Distributed Dataset (RDD)
# An RDD is a distributed collection of objects, similar to a Python list. Its advantages are:
# it is immutable
# fault tolerance
# parallel processing
# partitioning
# in-memory processing
# It is the fundamental data structure in Spark.

In [0]:
# We create an RDD through the SparkContext instance, using its parallelize() function.

In [0]:
data = [
    (1, 'lokesh', 2000, 100),
    (2, 'gopal', 3000, 200),
]
# parallelize() turns the local Python list into an RDD.
rdd = spark.sparkContext.parallelize(data)
print(rdd.collect())

[(1, 'lokesh', 2000, 100), (2, 'gopal', 3000, 200)]


In [0]:
# Convert the RDD into a DataFrame, supplying the column names.
schema = ['id', 'name', 'salary', 'bonus']

df = rdd.toDF(schema=schema)
df.show()

+---+------+------+-----+
| id|  name|salary|bonus|
+---+------+------+-----+
|  1|lokesh|  2000|  100|
|  2| gopal|  3000|  200|
+---+------+------+-----+



In [0]:
# Alternatively, pass the RDD straight to spark.createDataFrame.
df1 = spark.createDataFrame(data=rdd, schema=schema)
df1.show()

+---+------+------+-----+
| id|  name|salary|bonus|
+---+------+------+-----+
|  1|lokesh|  2000|  100|
|  2| gopal|  3000|  200|
+---+------+------+-----+



In [0]:
# map()
# map() is an RDD transformation that applies a function to every element of the RDD (for example, to alter rows); we usually pass a lambda that takes one element and returns the transformed element.
# DataFrames have no map() function, so to use it we first convert the DataFrame to an RDD (df.rdd).


In [0]:
# Append salary + bonus (tuple positions 2 and 3) as a new last field of each tuple.
rdd2 = rdd.map(lambda row: (*row, row[2] + row[3]))
print(rdd2.collect())

[(1, 'lokesh', 2000, 100, 2100), (2, 'gopal', 3000, 200, 3200)]


In [0]:
# DataFrames have no map(), so go through df1.rdd, whose elements are Row objects.
# Rows allow access by field name, which is clearer than positional indices.
rdd3 = df1.rdd.map(lambda row: (*row, row.salary + row.bonus))
df2 = rdd3.toDF(['id', 'name', 'salary', 'bonus', 'total salary'])
df2.show()

+---+------+------+-----+------------+
| id|  name|salary|bonus|total salary|
+---+------+------+-----+------------+
|  1|lokesh|  2000|  100|        2100|
|  2| gopal|  3000|  200|        3200|
+---+------+------+-----+------------+



In [0]:
# flatMap()
# flatMap() is like map(), but each function call may return a list (or other iterable) whose elements are flattened into individual RDD elements, similar to explode() on DataFrames.

In [0]:
# map() produces exactly one output element per input element, so splitting
# each string with map() yields one list per line rather than separate words.
data = ['SAMPATHI LOKESH', 'RAVI GOPAL']
Rdd = spark.sparkContext.parallelize(data)
# split() with no argument splits on runs of whitespace and ignores
# leading/trailing blanks, so irregular spacing never produces '' tokens.
Rdd2 = Rdd.map(lambda line: line.split())
for item in Rdd2.collect():
    print(item)
# Each line comes back as a list of words; to flatten these lists into
# individual elements, use flatMap() instead.

['SAMPATHI', 'LOKESH']
['RAVI', 'GOPAL']


In [0]:
# flatMap() flattens the list returned for each line into separate elements,
# giving one word per RDD element. split() (no argument) splits on any run of
# whitespace, so doubled or trailing spaces do not emit empty-string words.
Rdd3 = Rdd.flatMap(lambda line: line.split())
for item in Rdd3.collect():
    print(item)
    

SAMPATHI
LOKESH
RAVI
GOPAL
