In [0]:
data=[('Alice','Badminton,Tennis'),('Bob','Tennis,Cricket'),('Jullie','Cricket,Carroms')]
schema=['name','hobbies']
df=spark.createDataFrame(data,schema)
display(df)

name,hobbies
Alice,"Badminton,Tennis"
Bob,"Tennis,Cricket"
Jullie,"Cricket,Carroms"


In [0]:
from pyspark.sql.functions import  *
from pyspark.sql.window import  *

In [0]:
df1=df.select(df.name,explode(split(df.hobbies,',')).alias('hobbies'))
display(df1)

name,hobbies
Alice,Badminton
Alice,Tennis
Bob,Tennis
Bob,Cricket
Jullie,Cricket
Jullie,Carroms


In [0]:
data=[('Goa','','AP'),('','AP',None),(None,'','BLR')]
schema=['city1','city2','city3']
df=spark.createDataFrame(data,schema)
display(df)

city1,city2,city3
Goa,,AP
,AP,
,,BLR


In [0]:
df=df.withColumn('new',coalesce(
    (when(df.city1=='',None).otherwise(df.city1)),
    (when(df.city2=='',None).otherwise(df.city2)),
    (when(df.city3=='',None).otherwise(df.city3))
    )
)
display(df.select(df.new))

new
Goa
AP
BLR


In [0]:
data=[(1,'Steve'),(2,'David'),(3,'Aryan')]
schema=['Student_id','Student_name']
df=spark.createDataFrame(data,schema)
display(df)

Student_id,Student_name
1,Steve
2,David
3,Aryan


In [0]:
data=[(1,'Pyspark',90),(1,'Sql',100),(2,'Sql',70),(2,'Pyspark',60),(3,'Sql',30),(3,'Pyspark',20)]
schema=['Student_id','Subject_name','Marks']
df1=spark.createDataFrame(data,schema)
display(df1)

Student_id,Subject_name,Marks
1,Pyspark,90
1,Sql,100
2,Sql,70
2,Pyspark,60
3,Sql,30
3,Pyspark,20


In [0]:
df2=df.join(df1,df.Student_id==df1.Student_id,"inner").drop(df1.Student_id)
df3=df2.select('*').groupBy(df2.Student_id,df2.Student_name).agg((sum('Marks')/count('*')).alias('Percentage'))
df4=df3.select('*',when(df3.Percentage>=70,'Distinction').
                   when((df3.Percentage<=70) & (df3.Percentage>=60),'First Class').
                   when((df3.Percentage<=60) & (df3.Percentage>=50),'Second Class').
                   when((df3.Percentage<=50) & (df3.Percentage>=40),'Third Class').
                   otherwise('Fail')
               )
display(df4)

Student_id,Student_name,Percentage,CASE WHEN (Percentage >= 70) THEN Distinction WHEN ((Percentage <= 70) AND (Percentage >= 60)) THEN First Class WHEN ((Percentage <= 60) AND (Percentage >= 50)) THEN Second Class WHEN ((Percentage <= 50) AND (Percentage >= 40)) THEN Third Class ELSE Fail END
1,Steve,95.0,Distinction
2,David,65.0,First Class
3,Aryan,25.0,Fail


In [0]:
data=[(1,'A',1000,'IT'),(2,'B',1500,'IT'),(3,'C',2500,'IT'),(4,'D',3000,'HR'),(5,'E',2000,'HR'),(6,'F',2000,'HR'),(7,'G',4000,'Sales'),(8,'H',4000,'Sales'),(9,'I',1000,'Sales'),(10,'J',2000,'Sales')]
schema=['EMPID','EMPNAME','SALARY','DEPARTMENT']
df=spark.createDataFrame(data,schema)
display(df)

EMPID,EMPNAME,SALARY,DEPARTMENT
1,A,1000,IT
2,B,1500,IT
3,C,2500,IT
4,D,3000,HR
5,E,2000,HR
6,F,2000,HR
7,G,4000,Sales
8,H,4000,Sales
9,I,1000,Sales
10,J,2000,Sales


In [0]:
df1=df.select('*',(dense_rank().over(Window().partitionBy(df.DEPARTMENT).orderBy(df.SALARY.desc())).alias('Rank')))
df2=df1.filter(df1.Rank==1).drop(df1.Rank)
display(df2)

EMPID,EMPNAME,SALARY,DEPARTMENT
4,D,3000,HR
3,C,2500,IT
7,G,4000,Sales
8,H,4000,Sales


In [0]:
data=[(1,'IT'),(2,'Sales')]
schema=['DeptID','DeptName']
df=spark.createDataFrame(data,schema)
display(df)

DeptID,DeptName
1,IT
2,Sales


In [0]:
data=[(100,'Raj',None,1,'01-05-23',50000),(200,'Venki',100,1,'13-04-23',4000),(200,'Venki',100,1,'01-04-23',4500),(200,'Venki',100,1,'14-05-23',4020)]
schema=['EmpID','EmpName','MgrID','DeptID','Sal_Date','Sal']
df1=spark.createDataFrame(data,schema)
df2=spark.createDataFrame(data,schema)
display(df1)
display(df2)

EmpID,EmpName,MgrID,DeptID,Sal_Date,Sal
100,Raj,,1,01-05-23,50000
200,Venki,100.0,1,13-04-23,4000
200,Venki,100.0,1,01-04-23,4500
200,Venki,100.0,1,14-05-23,4020


EmpID,EmpName,MgrID,DeptID,Sal_Date,Sal
100,Raj,,1,01-05-23,50000
200,Venki,100.0,1,13-04-23,4000
200,Venki,100.0,1,01-04-23,4500
200,Venki,100.0,1,14-05-23,4020


In [0]:
df3=df1.join(df2,df1.MgrID==df2.EmpID,'left').drop(df2.EmpID,df1.MgrID,df2.MgrID,df2.DeptID,df2.Sal_Date,df2.Sal).select(df1.EmpID,df1.EmpName,df1.DeptID,df1.Sal_Date,df2.EmpName.alias('ManagerName'),df1.Sal)
df3=df3.select('EmpID','EmpName','DeptID',to_date(df3.Sal_Date,'dd-MM-yy').alias('Sal_Date'),'ManagerName','Sal')
df3=df3.select('*',year(df3.Sal_Date).alias('Salary_Year'),month(df3.Sal_Date).alias('Salary_Month'))
df4=df.join(df3,df.DeptID==df3.DeptID,'inner')
df4=df4.select('*').groupBy(df4.EmpName,df4.DeptName,df4.ManagerName,df4.EmpName,df4.Salary_Year,df4.Salary_Month).sum('Sal')
display(df4)

EmpName,DeptName,ManagerName,EmpName.1,Salary_Year,Salary_Month,sum(Sal)
Raj,IT,,Raj,2023,5,50000
Venki,IT,Raj,Venki,2023,4,8500
Venki,IT,Raj,Venki,2023,5,4020


In [0]:
df=spark.read.option('header',True).csv('/FileStore/tables/products_2000000.csv')

In [0]:
df.rdd.getNumPartitions()

Out[3]: 8

In [0]:
df1=df.select(spark_partition_id().alias('Partyid')).groupBy('Partyid').count()
display(df1)

Partyid,count
0,254648
1,253913
2,253849
3,253764
4,252412
5,252328
6,252279
7,226807
