In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [0]:
# Sample records: (name, {attribute name -> integer value}).
data = [
    ('shrikant', {'age': 21, 'year': 2022, 'salary': 50000}),
    ('Sanket', {'age': 24, 'year': 2022, 'salary': 53000}),
    ('Pratham', {'age': 43, 'year': 2022, 'salary': 100000}),
    ('Prachi', {'age': 22, 'year': 2022, 'salary': 60000}),
    ('Nidhi', {'age': 22, 'year': 2022, 'salary': 64000}),
]

# Explicit schema instead of relying on inference: 'Info' is a map from
# string keys to integer values (Python ints map to LongType), not a string.
info_schema = StructType([
    StructField('Name', StringType(), True),
    StructField('Info', MapType(StringType(), LongType()), True),
])

df = spark.createDataFrame(data=data, schema=info_schema)
df.display()

Name,Info
shrikant,"Map(salary -> 50000, year -> 2022, age -> 21)"
Sanket,"Map(salary -> 53000, year -> 2022, age -> 24)"
Pratham,"Map(salary -> 100000, year -> 2022, age -> 43)"
Prachi,"Map(salary -> 60000, year -> 2022, age -> 22)"
Nidhi,"Map(salary -> 64000, year -> 2022, age -> 22)"


In [0]:
# Turn each (key, value) entry of the 'Info' map into its own row;
# explode() on a map column yields the columns 'key' and 'value'.
df1 = df.select(
    col('Name'),
    explode(col('Info')),
)
df1.display()

Name,key,value
shrikant,salary,50000
shrikant,year,2022
shrikant,age,21
Sanket,salary,53000
Sanket,year,2022
Sanket,age,24
Pratham,salary,100000
Pratham,year,2022
Pratham,age,43
Prachi,salary,60000


In [0]:
# Reshape from long to wide: one row per Name, one column per distinct key,
# with the values of each key summed per person.
df2 = (
    df1
    .groupBy('Name')
    .pivot('key')
    .sum('value')
)
df2.display()

Name,age,salary,year
shrikant,21,50000,2022
Prachi,22,60000,2022
Pratham,43,100000,2022
Sanket,24,53000,2022
Nidhi,22,64000,2022


In [0]:
# Label each salary: >= 100000 is 'High', >= 60000 is 'Good', anything else
# is 'Average'. The conditions are evaluated in order, first match wins.
salary_label = (
    when(df2['salary'] >= 100000, 'High')
    .when(df2['salary'] >= 60000, 'Good')
    .otherwise('Average')
)
df3 = df2.withColumn('salary dest', salary_label)
df3.display()

Name,age,salary,year,salary dest
shrikant,21,50000,2022,Average
Prachi,22,60000,2022,Good
Pratham,43,100000,2022,High
Sanket,24,53000,2022,Average
Nidhi,22,64000,2022,Good


In [0]:
# Second batch of records with the same (name, info-map) layout as `data`.
# 'Pratham' and 'Prachi' repeat rows that already exist in the first batch.
data_new = [
    ('John', {'age': 21, 'year': 2022, 'salary': 50000}),
    ('Ethos', {'age': 24, 'year': 2022, 'salary': 53000}),
    ('Pratham', {'age': 43, 'year': 2022, 'salary': 100000}),
    ('Prachi', {'age': 22, 'year': 2022, 'salary': 60000}),
    ('Riya', {'age': 22, 'year': 2022, 'salary': 64000}),
]
df_new = spark.createDataFrame(schema=['Name', 'Info'], data=data_new)
df_new.display()

Name,Info
John,"Map(salary -> 50000, year -> 2022, age -> 21)"
Ethos,"Map(salary -> 53000, year -> 2022, age -> 24)"
Pratham,"Map(salary -> 100000, year -> 2022, age -> 43)"
Prachi,"Map(salary -> 60000, year -> 2022, age -> 22)"
Riya,"Map(salary -> 64000, year -> 2022, age -> 22)"


In [0]:
# Stage 1: stack both DataFrames row-wise (union keeps duplicate rows).
people_all = df.union(df_new)
print('Union function dataframe : ')
people_all.display()

# Stage 2: one row per (Name, key, value) entry of the 'Info' map.
people_long = people_all.select(col('Name'), explode(col('Info')))
print('After Apply explode function : ')
people_long.display()

# Stage 3: back to one row per Name, taking the first value seen for each key.
people_wide = people_long.groupBy('Name').pivot('key').agg({"value": 'first'})
print('After Apply pivot function : ')
people_wide.display()

# Stage 4: remove any fully identical rows from the pivoted result.
df4 = people_wide.dropDuplicates()
print('After Apply drop_duplicates function : ')
df4.display()

Name,Info
shrikant,"Map(salary -> 50000, year -> 2022, age -> 21)"
Sanket,"Map(salary -> 53000, year -> 2022, age -> 24)"
Pratham,"Map(salary -> 100000, year -> 2022, age -> 43)"
Prachi,"Map(salary -> 60000, year -> 2022, age -> 22)"
Nidhi,"Map(salary -> 64000, year -> 2022, age -> 22)"
John,"Map(salary -> 50000, year -> 2022, age -> 21)"
Ethos,"Map(salary -> 53000, year -> 2022, age -> 24)"
Pratham,"Map(salary -> 100000, year -> 2022, age -> 43)"
Prachi,"Map(salary -> 60000, year -> 2022, age -> 22)"
Riya,"Map(salary -> 64000, year -> 2022, age -> 22)"


Name,key,value
shrikant,salary,50000
shrikant,year,2022
shrikant,age,21
Sanket,salary,53000
Sanket,year,2022
Sanket,age,24
Pratham,salary,100000
Pratham,year,2022
Pratham,age,43
Prachi,salary,60000


Name,age,salary,year
shrikant,21,50000,2022
Prachi,22,60000,2022
Ethos,24,53000,2022
Pratham,43,100000,2022
Riya,22,64000,2022
John,21,50000,2022
Sanket,24,53000,2022
Nidhi,22,64000,2022


Name,age,salary,year
shrikant,21,50000,2022
Prachi,22,60000,2022
Ethos,24,53000,2022
Pratham,43,100000,2022
Riya,22,64000,2022
John,21,50000,2022
Sanket,24,53000,2022
Nidhi,22,64000,2022


In [0]:
# Flat records (Name, Age, City, Marks); two rows deliberately contain a None
# so the null-handling cells have something to work on.
data = [
    ('shrikant', 25, 'satara', 63),
    ('john', 45, 'UP', 92),
    ('thomas', 35, 'Tokiyo', 43),
    ('nancy', 15, None, 56),
    ('rutu', 26, 'pune', None),
]
df5 = spark.createDataFrame(schema=['Name', 'Age', 'City', 'Marks'], data=data)
df5.display()

Name,Age,City,Marks
shrikant,25,satara,63.0
john,45,UP,92.0
thomas,35,Tokiyo,43.0
nancy,15,,56.0
rutu,26,pune,


In [0]:
# Rows where Marks is missing, then rows where City is missing.
df5.filter(col('Marks').isNull()).display()
df5.filter(col('City').isNull()).display()

Name,Age,City,Marks
rutu,26,pune,


Name,Age,City,Marks
nancy,15,,56


In [0]:
# Rows that have a Marks value, then rows that have a City value.
df5.where(col('Marks').isNotNull()).display()
df5.where(col('City').isNotNull()).display()

Name,Age,City,Marks
shrikant,25,satara,63
john,45,UP,92
thomas,35,Tokiyo,43
nancy,15,,56


Name,Age,City,Marks
shrikant,25,satara,63.0
john,45,UP,92.0
thomas,35,Tokiyo,43.0
rutu,26,pune,


In [0]:
# Keep only the rows that have no null in any column.
df5.na.drop().display()

Name,Age,City,Marks
shrikant,25,satara,63
john,45,UP,92
thomas,35,Tokiyo,43


In [0]:
# Show the SparkContext's default level of parallelism, i.e. the partition
# count Spark uses when the caller does not specify one.
sc.defaultParallelism

In [0]:
# Read the session's current value of 'spark.sql.files.maxPartitionBytes'
# (documented by Spark as the maximum number of bytes packed into a single
# partition when reading files).
spark.conf.get('spark.sql.files.maxPartitionBytes')

In [0]:
# Same five sample rows as before, used here to look at partitioning.
data = [
    ('shrikant', 25, 'satara', 63),
    ('john', 45, 'UP', 92),
    ('thomas', 35, 'Tokiyo', 43),
    ('nancy', 15, None, 56),
    ('rutu', 26, 'pune', None),
]
df6 = spark.createDataFrame(schema=['Name', 'Age', 'City', 'Marks'], data=data)

# Number of partitions backing the freshly created DataFrame.
df6.rdd.getNumPartitions()

In [0]:
# Show which rows live in which partition: glom() groups the rows of each
# partition into one list, collect() brings those lists to the driver.
# The result is intentionally NOT assigned back to `df6`: rebinding `df6` to
# an RDD would break the later cells that call `df6.rdd` and `df6.toPandas()`,
# which only exist on a DataFrame.
df6.rdd.glom().collect()


In [0]:
# Redistribute the rows over exactly two partitions.
df6 = df6.repartition(numPartitions=2)

# List the rows held by each partition after the reshuffle.
rows_by_partition = df6.rdd.glom()
rows_by_partition.collect()

In [0]:
# Convert df6 to a pandas DataFrame (all rows are loaded into driver memory)
# and display it.
# NOTE(review): this rebinds `df6` to the pandas result, so the cell is not
# re-runnable as-is (a pandas DataFrame has no `toPandas`), and any later Spark
# call on `df6` will fail. Consider a separate name if that matters.
df6 =df6.toPandas()
df6

Unnamed: 0,Name,Age,City,Marks
0,shrikant,25,satara,63.0
1,thomas,35,Tokiyo,43.0
2,rutu,26,pune,
3,john,45,UP,92.0
4,nancy,15,,56.0
