In [0]:
PySpark Groupby Explained with Example
Similar to SQL GROUP BY clause, PySpark groupBy() function is used to collect the identical data into groups on DataFrame and perform count, sum, avg, min, max functions on the grouped data. In this article, I will explain several groupBy() examples using PySpark (Spark with Python).
simpleData = [("James","Sales","NY",90000,34,10000),
    ("Michael","Sales","NY",86000,56,20000),
    ("Robert","Sales","CA",81000,30,23000),
    ("Maria","Finance","CA",90000,24,23000),
    ("Raman","Finance","CA",99000,40,24000),
    ("Scott","Finance","NY",83000,36,19000),
    ("Jen","Finance","NY",79000,53,15000),
    ("Jeff","Marketing","CA",80000,25,18000),
    ("Kumar","Marketing","NY",91000,50,21000)
  ]

schema = ["employee_name","department","state","salary","age","bonus"]
df = spark.createDataFrame(data=simpleData, schema = schema)
df.printSchema()
df.show(truncate=False)
2. PySpark groupBy on DataFrame Columns
Let’s do the groupBy() on department column of DataFrame and then find the sum of salary for each department using sum() function.
df.groupBy("department").sum("salary").show(truncate=False)
Similarly, we can calculate the number of employees in each department using.
df.groupBy("department").count()
Calculate the minimum salary of each department using min()
df.groupBy("department").min("salary")
Calculate the maximin salary of each department using max()
df.groupBy("department").max("salary")
Calculate the average salary of each department using avg()
df.groupBy("department").avg( "salary")
Calculate the mean salary of each department using mean()
df.groupBy("department").mean( "salary") 

In [0]:
import pyspark
simpleData = [("James","Sales","NY",90000,34,10000),
    ("Michael","Sales","NY",86000,56,20000),
    ("Robert","Sales","CA",81000,30,23000),
    ("Maria","Finance","CA",90000,24,23000),
    ("Raman","Finance","CA",99000,40,24000),
    ("Scott","Finance","NY",83000,36,19000),
    ("Jen","Finance","NY",79000,53,15000),
    ("Jeff","Marketing","CA",80000,25,18000),
    ("Kumar","Marketing","NY",91000,50,21000)
  ]
schema = ["employee_name","department","state","salary","age","bonus"]
df200 = spark.createDataFrame(data = simpleData,schema = schema)
df200.printSchema()
df200.show()
df200.groupBy("department").sum("salary").show()
df200.groupBy("department").count().show()
df200.groupBy("department").min("salary").show()
df200.groupBy("department").max("salary").show()
df200.groupBy("department").avg("salary").show()
df200.groupBy("department").mean("salary").show()


root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- state: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- age: long (nullable = true)
 |-- bonus: long (nullable = true)

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        James|     Sales|   NY| 90000| 34|10000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|        Raman|   Finance|   CA| 99000| 40|24000|
|        Scott|   Finance|   NY| 83000| 36|19000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
+-------------+----------+-----+------+---+-----+

+----------+-----------+
|department|sum(salary)|
+----------+-----------+
|     Sales|     257000|
|   Finance|   

In [0]:
3. Using Multiple columns
Similarly, we can also run groupBy and aggregate on two or more DataFrame columns, below example does group by on department,state and does sum() on salary and bonus columns.
//GroupBy on multiple columns
df.groupBy("department","state") \
    .sum("salary","bonus") \
    .show(false)

4. Running more aggregates at a time
Using agg() aggregate function we can calculate many aggregations at a time on a single statement using SQL functions sum(), avg(), min(), max() mean() e.t.c. In order to use these, we should import "from pyspark.sql.functions import sum,avg,max,min,mean,count"
from pyspark.sql.functions import sum,avg,max
df.groupBy("department") \
    .agg(sum("salary").alias("sum_salary"), \
         avg("salary").alias("avg_salary"), \
         sum("bonus").alias("sum_bonus"), \
         max("bonus").alias("max_bonus") \
     ) \
    .show(truncate=False)
This example does group on department column and calculates sum() and avg() of salary for each department and calculates sum() and max() of bonus for each department.
5. Using filter on aggregate data
Similar to SQL “HAVING” clause, On PySpark DataFrame we can use either where() or filter() function to filter the rows of aggregated data.
from pyspark.sql.functions import sum,avg,max
df.groupBy("department") \
    .agg(sum("salary").alias("sum_salary"), \
      avg("salary").alias("avg_salary"), \
      sum("bonus").alias("sum_bonus"), \
      max("bonus").alias("max_bonus")) \
    .where(col("sum_bonus") >= 50000) \
    .show(truncate=False)

In [0]:
from pyspark.sql.functions import sum,min,max,avg,col
df200.groupBy("department","state").sum("salary","bonus").show(truncate = False)
df200.groupBy("department").agg(sum("salary").alias("sum"),\
                               avg("salary").alias("salary"),\
                                min("salary").alias("min"),\
                                max("salary").alias("max")
                               ).show()
df200.groupBy("department").agg(sum("salary").alias("sum_salary"),avg("salary").alias("avg_salary"),
                                sum("bonus").alias("bonus_sum"),max("bonus").alias("max_bonus")).where(col("bonus_sum")>=50000).show()

[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
[0;32m<command-478151557241616>[0m in [0;36m<module>[0;34m[0m
[1;32m      1[0m [0;32mfrom[0m [0mpyspark[0m[0;34m.[0m[0msql[0m[0;34m.[0m[0mfunctions[0m [0;32mimport[0m [0msum[0m[0;34m,[0m[0mmin[0m[0;34m,[0m[0mmax[0m[0;34m,[0m[0mavg[0m[0;34m,[0m[0mcol[0m[0;34m[0m[0;34m[0m[0m
[0;32m----> 2[0;31m [0mdf200[0m[0;34m.[0m[0mgroupBy[0m[0;34m([0m[0;34m"department"[0m[0;34m,[0m[0;34m"state"[0m[0;34m)[0m[0;34m.[0m[0msum[0m[0;34m([0m[0;34m"salary"[0m[0;34m,[0m[0;34m"bonus"[0m[0;34m)[0m[0;34m.[0m[0mshow[0m[0;34m([0m[0mtruncate[0m [0;34m=[0m [0;32mFalse[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m[1;32m      3[0m df200.groupBy("department").agg(sum("salary").alias("sum"),\
[1;32m      4[0m                                [0mavg[0m[0;34m

In [0]:
1. PySpark Join Syntax
PySpark SQL join has a below syntax and it can be accessed directly from DataFrame.
join(self, other, on=None, how=None)
join() operation takes parameters as below and returns DataFrame.

param other: Right side of the join
param on: a string for the join column name
param how: default inner. Must be one of inner, cross, outer,full, full_outer, left, left_outer, right, right_outer,left_semi, and left_anti.
You can also write Join expression by adding where() and filter() methods on DataFrame and can have Join on multiple columns.

2. PySpark Join Types
Below are the different Join Types PySpark supports.
Join String	Equivalent SQL Join
inner	INNER JOIN
outer, full, fullouter, full_outer	FULL OUTER JOIN
left, leftouter, left_outer	LEFT JOIN
right, rightouter, right_outer	RIGHT JOIN
cross	
anti, leftanti, left_anti	
semi, leftsemi, left_semi	


emp = [(1,"Smith",-1,"2018","10","M",3000), \
    (2,"Rose",1,"2010","20","M",4000), \
    (3,"Williams",1,"2010","10","M",1000), \
    (4,"Jones",2,"2005","10","F",2000), \
    (5,"Brown",2,"2010","40","",-1), \
      (6,"Brown",2,"2010","50","",-1) \
  ]
empColumns = ["emp_id","name","superior_emp_id","year_joined", \
       "emp_dept_id","gender","salary"]

empDF = spark.createDataFrame(data=emp, schema = empColumns)
empDF.printSchema()
empDF.show(truncate=False)

dept = [("Finance",10), \
    ("Marketing",20), \
    ("Sales",30), \
    ("IT",40) \
  ]
deptColumns = ["dept_name","dept_id"]
deptDF = spark.createDataFrame(data=dept, schema = deptColumns)
deptDF.printSchema()
deptDF.show(truncate=False)

In [0]:
import pyspark 
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
emp=[(1,"smith",-1,"2018","10","M",3000),\
     (2,"Rose",1,"2010","20","M",4000),\
     (3,"Williams",1,"2010","10","M",1000),\
     (4,"Jones",2,"2005","10","F",2000), \
    (5,"Brown",2,"2010","40","",-1), \
      (6,"Brown",2,"2010","50","",-1) \
    ]

empcolumns = ["emp_id","name","superior_emp_id","year_joined","emp_dept_id","gender","salary"]
empdf = spark.createDataFrame(data = emp, schema = empcolumns)
#empdf.printSchema()
empdf.show()

dept = [("Finance",10), \
    ("Marketing",20), \
    ("Sales",30), \
    ("IT",40)   
]

deptcolumns=["dept_name","dept_id"]
deptdf = spark.createDataFrame(data = dept,schema=deptcolumns)
deptdf.show()

+------+--------+---------------+-----------+-----------+------+------+
|emp_id|    name|superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|     1|   smith|             -1|       2018|         10|     M|  3000|
|     2|    Rose|              1|       2010|         20|     M|  4000|
|     3|Williams|              1|       2010|         10|     M|  1000|
|     4|   Jones|              2|       2005|         10|     F|  2000|
|     5|   Brown|              2|       2010|         40|      |    -1|
|     6|   Brown|              2|       2010|         50|      |    -1|
+------+--------+---------------+-----------+-----------+------+------+

+---------+-------+
|dept_name|dept_id|
+---------+-------+
|  Finance|     10|
|Marketing|     20|
|    Sales|     30|
|       IT|     40|
+---------+-------+



In [0]:
#innerjoin
#syntax 
empdf.join(deptdf,empdf.emp_dept_id==deptdf.dept_id,"inner")\
.show()
empdf.join(deptdf,empdf.emp_dept_id == deptdf.dept_id,"outer").show(truncate = False)
empdf.join(deptdf,empdf.emp_dept_id == deptdf.dept_id,"full").show(2)
empdf.join(deptdf,empdf.emp_dept_id == deptdf.dept_id,"fullouter").show()
empdf.join(deptdf,empdf.emp_dept_id == deptdf.dept_id,"left").show()
empdf.join(deptdf,empdf.emp_dept_id == deptdf.dept_id,"leftouter").show()
empdf.join(deptdf,empdf.emp_dept_id == deptdf.dept_id,"right").show()
empdf.join(deptdf,empdf.emp_dept_id == deptdf.dept_id,"rightouter").show()
empdf.join(deptdf,empdf.emp_dept_id == deptdf.dept_id,"leftsemi").show()
empdf.join(deptdf,empdf.emp_dept_id == deptdf.dept_id,"leftanti").select("emp_id","name").show()

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|    name|superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|     1|   smith|             -1|       2018|         10|     M|  3000|  Finance|     10|
|     3|Williams|              1|       2010|         10|     M|  1000|  Finance|     10|
|     4|   Jones|              2|       2005|         10|     F|  2000|  Finance|     10|
|     2|    Rose|              1|       2010|         20|     M|  4000|Marketing|     20|
|     5|   Brown|              2|       2010|         40|      |    -1|       IT|     40|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+-

In [0]:
empdf.join(deptdf,empdf.emp_dept_id == deptdf.dept_id,"leftanti").select("emp_id","name").show()
empdf.join(deptdf,empdf.emp_dept_id == deptdf.dept_id,"inner").select(col("emp_id"),col("dept_id"),col("dept_name")).where(col("salary") >1000).show()
empdf.join(deptdf,empdf.emp_dept_id == deptdf.dept_id,"inner").select(empdf.emp_id,deptdf.dept_id).where((deptdf.dept_id == 10)& (empdf.salary>"1000")).show()
empdf.join(deptdf,empdf.emp_dept_id == deptdf.dept_id,"inner").select(empdf.emp_id,deptdf.dept_id).where((deptdf.dept_id == 10)& (empdf.salary.cast("int")>"1000")).show()

df = empdf.join(deptdf,empdf.emp_dept_id == deptdf.dept_id,"inner").select(empdf.emp_id,deptdf.dept_id).where((deptdf.dept_id == 10)& (empdf.salary.cast("int")>"1000"))
df.show()

+------+-----+
|emp_id| name|
+------+-----+
|     6|Brown|
+------+-----+

+------+-------+---------+
|emp_id|dept_id|dept_name|
+------+-------+---------+
|     1|     10|  Finance|
|     4|     10|  Finance|
|     2|     20|Marketing|
+------+-------+---------+

+------+-------+
|emp_id|dept_id|
+------+-------+
|     1|     10|
|     4|     10|
+------+-------+

+------+-------+
|emp_id|dept_id|
+------+-------+
|     1|     10|
|     4|     10|
+------+-------+

+------+-------+
|emp_id|dept_id|
+------+-------+
|     1|     10|
|     4|     10|
+------+-------+



In [0]:
#selef join
empdf.show()
empdf.alias("emp1").join(empdf.alias("emp2"),col("emp1.emp_id")==col("emp2.superior_emp_id"),"inner").show()
deptdf.show()
empdf.alias("emp11").join(empdf.alias("emp22"),col("emp11.emp_id")==col("emp22.superior_emp_id"),"inner").select("emp11.emp_id","emp11.name","emp22.salary").show()

+------+--------+---------------+-----------+-----------+------+------+
|emp_id|    name|superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|     1|   smith|             -1|       2018|         10|     M|  3000|
|     2|    Rose|              1|       2010|         20|     M|  4000|
|     3|Williams|              1|       2010|         10|     M|  1000|
|     4|   Jones|              2|       2005|         10|     F|  2000|
|     5|   Brown|              2|       2010|         40|      |    -1|
|     6|   Brown|              2|       2010|         50|      |    -1|
+------+--------+---------------+-----------+-----------+------+------+

+------+-----+---------------+-----------+-----------+------+------+------+--------+---------------+-----------+-----------+------+------+
|emp_id| name|superior_emp_id|year_joined|emp_dept_id|gender|salary|emp_id|    name|superior_emp_id|year_joined|emp_dept_id|gender|salary|
+

In [0]:
4. Using SQL Expression
Since PySpark SQL support native SQL syntax, we can also write join operations after creating temporary tables on DataFrames and use these tables on spark.sql().


empDF.createOrReplaceTempView("EMP")
deptDF.createOrReplaceTempView("DEPT")

joinDF = spark.sql("select * from EMP e, DEPT d where e.emp_dept_id == d.dept_id") \
  .show(truncate=False)

joinDF2 = spark.sql("select * from EMP e INNER JOIN DEPT d ON e.emp_dept_id == d.dept_id") \
  .show(truncate=False)

In [0]:
empdf.createOrReplaceTempView("EMP")
deptdf.createOrReplaceTempView("DEP")
joinDF = spark.sql("select * from EMP e,DEP d where e.emp_dept_id==d.dept_id").show()
joinDF2 = spark.sql("select * from EMP inner join DEP on EMP.emp_dept_id == DEP.dept_id").show()
joinDF2 = spark.sql("select * from EMP left join DEP on EMP.emp_dept_id == DEP.dept_id").show()
joinDF2 = spark.sql("select * from EMP right join DEP on EMP.emp_dept_id == DEP.dept_id").show()
joinDF2 = spark.sql("select * from EMP left outer join DEP on EMP.emp_dept_id == DEP.dept_id").show()
joinDF2 = spark.sql("select * from EMP right outer join DEP on EMP.emp_dept_id == DEP.dept_id").show()

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|    name|superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|     1|   smith|             -1|       2018|         10|     M|  3000|  Finance|     10|
|     3|Williams|              1|       2010|         10|     M|  1000|  Finance|     10|
|     4|   Jones|              2|       2005|         10|     F|  2000|  Finance|     10|
|     2|    Rose|              1|       2010|         20|     M|  4000|Marketing|     20|
|     5|   Brown|              2|       2010|         40|      |    -1|       IT|     40|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|    name|superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+-

In [0]:
5. PySpark SQL Join on multiple DataFrames
When you need to join more than two tables, you either use SQL expression after creating a temporary view on the DataFrame or use the result of join operation to join with another DataFrame like chaining them. for example


df1.join(df2,df1.id1 == df2.id2,"inner") \
   .join(df3,df1.id1 == df3.id3,"inner")

In [0]:
df1.join(df2,df1.id == df2.id,"inner").join(df3,df1.id==df2.id,"inner")

In [0]:
PySpark Union and UnionAll Explained
Dataframe union() – union() method of the DataFrame is used to merge two DataFrame’s of the same structure/schema. If schemas are not the same it returns an error.
DataFrame unionAll() – unionAll() is deprecated since Spark “2.0.0” version and replaced with union().


In [0]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("SparkByexample.com").getOrCreate()
simpledata = [("james","sales","NY",90000,34,10000),\
              ("Michael","Sales","NY",86000,56,20000), \
    ("Robert","Sales","CA",81000,30,23000), \
    ("Maria","Finance","CA",90000,24,23000)    
]
columns = ["employee_name","department","state","salary","age","bonus"]
df300 = spark.createDataFrame(data = simpledata, schema = columns)

simpleData2 = [("James","Sales","NY",90000,34,10000), \
    ("Maria","Finance","CA",90000,24,23000), \
    ("Jen","Finance","NY",79000,53,15000), \
    ("Jeff","Marketing","CA",80000,25,18000), \
    ("Kumar","Marketing","NY",91000,50,21000) \
  ]
column2= ["employee_name","department","state","salary","age","bonus"]
df301 = spark.createDataFrame(data = simpleData2 , schema = column2)
unionDF = df300.union(df301)
unionDF.show()

unionDF2 = df300.unionAll(df301)
unionDF.show()
unionDF2.count().cast('str').show()






+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        james|     sales|   NY| 90000| 34|10000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|        James|     Sales|   NY| 90000| 34|10000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
+-------------+----------+-----+------+---+-----+

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        james|     sales|   NY| 90000| 34|10000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|        Maria|   Finance|   CA| 90000| 24|23000|

[0;31m---------------------------------------------------------------------------[0m
[0;31mAttributeError[0m                            Traceback (most recent call last)
[0;32m<command-821191946760139>[0m in [0;36m<module>[0;34m[0m
[1;32m     23[0m [0munionDF2[0m [0;34m=[0m [0mdf300[0m[0;34m.[0m[0munionAll[0m[0;34m([0m[0mdf301[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[1;32m     24[0m [0munionDF[0m[0;34m.[0m[0mshow[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0;32m---> 25[0;31m [0munionDF2[0m[0;34m.[0m[0mcount[0m[0;34m([0m[0;34m)[0m[0;34m.[0m[0mcast[0m[0;34m([0m[0;34m'str'[0m[0;34m)[0m[0;34m.[0m[0mshow[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m[1;32m     26[0m [0;34m[0m[0m
[1;32m     27[0m [0;34m[0m[0m

[0;31mAttributeError[0m: 'int' object has no attribute 'cast'

In [0]:
PySpark Merge Two DataFrames with Different Columns
In PySpark to merge two DataFrames with different columns, will use the similar approach explain above and uses unionByName() transformation. First let’s create DataFrame’s with different number of columns.
#Create DataFrame df1 with columns name,dept & age
data = [("James","Sales",34), ("Michael","Sales",56), \
    ("Robert","Sales",30), ("Maria","Finance",24) ]
columns= ["name","dept","age"]
df1 = spark.createDataFrame(data = data, schema = columns)
df1.printSchema()

#Create DataFrame df1 with columns name,dep,state & salary
data2=[("James","Sales","NY",9000),("Maria","Finance","CA",9000), \
    ("Jen","Finance","NY",7900),("Jeff","Marketing","CA",8000)]
columns2= ["name","dept","state","salary"]
df2 = spark.createDataFrame(data = data2, schema = columns2)
df2.printSchema()


#Add missing columns 'state' & 'salary' to df1
from pyspark.sql.functions import lit
for column in [column for column in df2.columns if column not in df1.columns]:
    df1 = df1.withColumn(column, lit(None))

#Add missing column 'age' to df2
for column in [column for column in df1.columns if column not in df2.columns]:
    df2 = df2.withColumn(column, lit(None))
    
#Finally join two dataframe's df1 & df2 by name
merged_df=df1.unionByName(df2)
merged_df.show()


In [0]:
from pyspark.sql.functions import lit
data1 = [("James","Sales",34), ("Michael","Sales",56), \
    ("Robert","Sales",30), ("Maria","Finance",24) ]
columns1= ["name","dept","age"]
df1 = spark.createDataFrame(data = data1, schema = columns1)
df1.show()

#Create DataFrame df1 with columns name,dep,state & salary
data2=[("James","Sales","NY",9000),("Maria","Finance","CA",9000), \
    ("Jen","Finance","NY",7900),("Jeff","Marketing","CA",8000)]
columns2= ["name","dept","state","salary"]
df2 = spark.createDataFrame(data = data2, schema = columns2)
df2.show()

for column in [column for column in df2.columns if column not in df1.columns]:
    df1=df1.withColumn(column,lit(None))
for column in [column for column in df1.columns if column not in df2.columns]:
    df2=df2.withColumn(column,lit(None))
    
merged_df = df1.unionByName(df2)
merged_df.show()

+-------+-------+---+
|   name|   dept|age|
+-------+-------+---+
|  James|  Sales| 34|
|Michael|  Sales| 56|
| Robert|  Sales| 30|
|  Maria|Finance| 24|
+-------+-------+---+

+-----+---------+-----+------+
| name|     dept|state|salary|
+-----+---------+-----+------+
|James|    Sales|   NY|  9000|
|Maria|  Finance|   CA|  9000|
|  Jen|  Finance|   NY|  7900|
| Jeff|Marketing|   CA|  8000|
+-----+---------+-----+------+

+-------+---------+----+-----+------+
|   name|     dept| age|state|salary|
+-------+---------+----+-----+------+
|  James|    Sales|  34| null|  null|
|Michael|    Sales|  56| null|  null|
| Robert|    Sales|  30| null|  null|
|  Maria|  Finance|  24| null|  null|
|  James|    Sales|null|   NY|  9000|
|  Maria|  Finance|null|   CA|  9000|
|    Jen|  Finance|null|   NY|  7900|
|   Jeff|Marketing|null|   CA|  8000|
+-------+---------+----+-----+------+



In [0]:
PySpark map() Transformation
PySpark map (map()) is an RDD transformation that is used to apply the transformation function (lambda) on every element of RDD/DataFrame and returns a new RDD. In this article, you will learn the syntax and usage of the RDD map() transformation with an example and how to use it with DataFrame.

RDD map() transformation is used to apply any complex operations like adding a column, updating a column, transforming the data e.t.c, the output of map transformations would always have the same number of records as input.
Note1: DataFrame doesn’t have map() transformation to use with DataFrame hence you need to DataFrame to RDD first.
Note2: If you have a heavy initialization use PySpark mapPartitions() transformation instead of map(), as with mapPartitions() heavy initialization executes only once for each partition instead of every record.
First, let’s create an RDD from the list.

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]") \
    .appName("SparkByExamples.com").getOrCreate()

data = ["Project","Gutenberg’s","Alice’s","Adventures",
"in","Wonderland","Project","Gutenberg’s","Adventures",
"in","Wonderland","Project","Gutenberg’s"]

rdd=spark.sparkContext.parallelize(data)
map() Syntax

map(f, preservesPartitioning=False)
PySpark map() Example with RDD
In this PySpark map() example, we are adding a new element with value 1 for each element, the result of the RDD is PairRDDFunctions which contains key-value pairs, word of type String as Key and 1 of type Int as value.

rdd2=rdd.map(lambda x: (x,1))
for element in rdd2.collect():
    print(element)

In [0]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("fhdf").getOrCreate()
data = ["Project","Gutenberg","Alice","Adventures",
"in","Wonderland","Project","Gutenberg’s","Adventures",
"in","Wonderland","Project","Gutenberg’s"]
rdd = spark.sparkContext.parallelize(data)
rdd2 = rdd.map(lambda x: (x,2))
for element in rdd2.collect():print(element)

('Project', 2)
('Gutenberg', 2)
('Alice', 2)
('Adventures', 2)
('in', 2)
('Wonderland', 2)
('Project', 2)
('Gutenberg’s', 2)
('Adventures', 2)
('in', 2)
('Wonderland', 2)
('Project', 2)
('Gutenberg’s', 2)


In [0]:
PySpark map() Example with DataFrame
PySpark DataFrame doesn’t have map() transformation to apply the lambda function, when you wanted to apply the custom transformation, you need to convert the DataFrame to RDD and apply the map() transformation. Let’s use another dataset to explain this.
data301 = [('James','Smith','M',30),
  ('Anna','Rose','F',41),
  ('Robert','Williams','M',62), 
]
columns301 = ["firstname","lastname","gender","salary"]

df302 = spark.createDataFrame(data = data301,schema = columns301)
df302.show()
# Refering columns by index.
rdd2=df302.rdd.map(lambda x: 
    (x[0]+","+x[1],x[2],x[3]*2)
    )  
df2=rdd2.toDF(["name","gender","new_salary"]   )
df2.show()
#refering columns names
rdd2=df302.rdd.map(lambda x:(x.firstname+","+x.lastname,x.gender,x.salary*2))

You can also create a custom function to perform an operation. Below func1() function executes for every DataFrame row from the lambda function.

# By Calling function
def func1(x):
    firstName=x.firstname
    lastName=x.lastname
    name=firstName+","+lastName
    gender=x.gender.lower()
    salary=x.salary*2
    return (name,gender,salary)

rdd2=df.rdd.map(lambda x: func1(x))



[0;36m  File [0;32m"<command-3118337423178868>"[0;36m, line [0;32m1[0m
[0;31m    PySpark map() Example with DataFrame[0m
[0m            ^[0m
[0;31mSyntaxError[0m[0;31m:[0m invalid syntax


In [0]:
data301 = [('James','Smith','M',30),
  ('Anna','Rose','F',41),
  ('Robert','Williams','M',62), 
]

columns301 = ["firstname","lastname","gender","salary"]

df302 = spark.createDataFrame(data = data301,schema = columns301)
df302.show()
rdd = spark.sparkContext.parallelize(data301)
rdd3=df302.rdd.map(lambda x: (x[0]+","+x[1],x[2],x[3]*2))
df303 = rdd3.toDF(["name","gender","salary"])
df303.show()
rdd4=df302.rdd.map(lambda x:(x.firstname+","+x.lastname,x.gender,x.salary*2))
df304 = rdd4.toDF(["name","gender","salary"])
df304.show()
rdd5 = df302.rdd.map(lambda x:(x["firstname"]+","+x["lastname"],x["gender"],x["salary"]*2))
df305 = rdd5.toDF(["name","gender","salary"])
df305.show()

+---------+--------+------+------+
|firstname|lastname|gender|salary|
+---------+--------+------+------+
|    James|   Smith|     M|    30|
|     Anna|    Rose|     F|    41|
|   Robert|Williams|     M|    62|
+---------+--------+------+------+

+---------------+------+------+
|           name|gender|salary|
+---------------+------+------+
|    James,Smith|     M|    60|
|      Anna,Rose|     F|    82|
|Robert,Williams|     M|   124|
+---------------+------+------+

+---------------+------+------+
|           name|gender|salary|
+---------------+------+------+
|    James,Smith|     M|    60|
|      Anna,Rose|     F|    82|
|Robert,Williams|     M|   124|
+---------------+------+------+

+---------------+------+------+
|           name|gender|salary|
+---------------+------+------+
|    James,Smith|     M|    60|
|      Anna,Rose|     F|    82|
|Robert,Williams|     M|   124|
+---------------+------+------+



In [0]:
def func1(x):
    firstName = x.firstname
    lastName = x.lastname
    name = firstName+","+lastName
    gender = x.gender.lower()
    salary = x.salary*2
    return(name,gender,salary)
rdd6 = df302.rdd.map(lambda x: func1(x))
df306 = rdd6.toDF(["name","gender","salary"])
df306.show()



+---------------+------+------+
|           name|gender|salary|
+---------------+------+------+
|    James,Smith|     m|    60|
|      Anna,Rose|     f|    82|
|Robert,Williams|     m|   124|
+---------------+------+------+



In [0]:
PySpark flatMap() Transformation
PySpark flatMap() is a transformation operation that flattens the RDD/DataFrame (array/map DataFrame columns) after applying the function on every element and returns a new PySpark RDD/DataFrame. In this article, you will learn the syntax and usage of the PySpark flatMap() with an example.
data = ["Project Gutenberg’s",
        "Alice’s Adventures in Wonderland",
        "Project Gutenberg’s",
        "Adventures in Wonderland",
        "Project Gutenberg’s"]
rdd=spark.sparkContext.parallelize(data)
for element in rdd.collect():
    print(element)
    
    flatMap() Example
Now, let’s see with an example of how to apply a flatMap() transformation on RDD. In the below example, first, it splits each record by space in an RDD and finally flattens it. Resulting RDD consists of a single word on each record.
rdd2=rdd.flatMap(lambda x: x.split(" "))
for element in rdd2.collect():
    print(element)



In [0]:
data20 = ["Project Gutenberg’s",
        "Alice’s Adventures in Wonderland",
        "Project Gutenberg’s",
        "Adventures in Wonderland",
        "Project Gutenberg’s"]
rdd10=spark.sparkContext.parallelize(data20)
for i in rdd10.collect():print(i)

rdd11 =rdd10.flatMap(lambda x:x.split(" "))
for j in rdd11.collect():print(j)



Project Gutenberg’s
Alice’s Adventures in Wonderland
Project Gutenberg’s
Adventures in Wonderland
Project Gutenberg’s
Project
Gutenberg’s
Alice’s
Adventures
in
Wonderland
Project
Gutenberg’s
Adventures
in
Wonderland
Project
Gutenberg’s


In [0]:
Using flatMap() transformation on DataFrame
Unfortunately, PySpark DataFame doesn’t have flatMap() transformation however, DataFrame has explode() SQL function that is used to flatten the column. Below is a complete example.

import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('pyspark-by-examples').getOrCreate()

arrayData = [
        ('James',['Java','Scala'],{'hair':'black','eye':'brown'}),
        ('Michael',['Spark','Java',None],{'hair':'brown','eye':None}),
        ('Robert',['CSharp',''],{'hair':'red','eye':''}),
        ('Washington',None,None),
        ('Jefferson',['1','2'],{})]
df = spark.createDataFrame(data=arrayData, schema = ['name','knownLanguages','properties'])

from pyspark.sql.functions import explode
df2 = df.select(df.name,explode(df.knownLanguages))
df2.printSchema()
df2.show()


In [0]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
spark = SparkSession.builder.master("krishnareddy").appName("pyspark.com").getOrCreate()
arraydata = [('james',['java','scala'],{'hair':'black','eye':'brown'}),
        ('Michael',['Spark','Java',None],{'hair':'brown','eye':None}),
        ('Robert',['CSharp',''],{'hair':'red','eye':''}),
        ('Washington',None,None),
        ('Jefferson',['1','2'],{})]
df22 = spark.createDataFrame(data= arraydata ,schema = ['name','knownlanguage','properties'])
df21 = df22.select(df22.name,explode(df22.properties))
df21.printSchema()
df21.show()

root
 |-- name: string (nullable = true)
 |-- key: string (nullable = false)
 |-- value: string (nullable = true)

+-------+----+-----+
|   name| key|value|
+-------+----+-----+
|  james| eye|brown|
|  james|hair|black|
|Michael| eye| null|
|Michael|hair|brown|
| Robert| eye|     |
| Robert|hair|  red|
+-------+----+-----+



In [0]:
Using foreach() to Loop Through Rows in DataFrame
Similar to map(), foreach() also applied to every row of DataFrame, the difference being foreach() is an action and it returns nothing. Below are some examples to iterate through DataFrame using for each.
# Foreach example
def f(x): print(x)
df.foreach(f)

# Another example
df.foreach(lambda x: 
    print("Data ==>"+x["firstname"]+","+x["lastname"]+","+x["gender"]+","+str(x["salary"]*2))
    ) 
Using pandas() to Iterate
If you have a small dataset, you can also Convert PySpark DataFrame to Pandas and use pandas to iterate through. Use spark.sql.execution.arrow.enabled config to enable Apache Arrow with Spark. Apache Spark uses Apache Arrow which is an in-memory columnar format to transfer the data between Python and JVM.


# Using pandas
import pandas as pd
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandasDF = df.toPandas()
for index, row in pandasDF.iterrows():
    print(row['firstname'], row['gender'])
Collect Data As List and Loop Through
You can also Collect the PySpark DataFrame to Driver and iterate through Python, you can also use toLocalIterator().


# Collect the data to Python List
dataCollect = df.collect()
for row in dataCollect:
    print(row['firstname'] + "," +row['lastname'])

#Using toLocalIterator()
dataCollect=df.rdd.toLocalIterator()
for row in dataCollect:
    print(row['firstname'] + "," +row['lastname'])

In [0]:

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

data = [('James','Smith','M',30),('Anna','Rose','F',41),
  ('Robert','Williams','M',62), 
]
columns = ["firstname","lastname","gender","salary"]
df = spark.createDataFrame(data=data, schema = columns)
def f(x): print(x)
df.foreach(f)

df.foreach(lambda x:
          print("data==>"+x["firstname"]+","+x["lastname"]+","+x["gender"]+","+str(x["salary"]*2)
          ))

In [0]:
import pandas as pd
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandasDF = df.toPandas()
for index,row in pandasDF.iterrows():print(row['firstname'],row['gender'],)

James M
Anna F
Robert M


In [0]:
datacollect = df.collect()
for row in datacollect:print(row['firstname']+","+row['gender'])

datacollect1 = df.rdd.toLocalIterator()
for row in datacollect1:print(row['firstname']+","+str(row['salary']*2))

James,M
Anna,F
Robert,M
James,60
Anna,82
Robert,124


In [0]:
PySpark fillna() & fill() – Replace NULL/None Values
In PySpark, DataFrame.fillna() or DataFrameNaFunctions.fill() is used to replace NULL/None values on all or selected multiple DataFrame columns with either zero(0), empty string, space, or any constant literal values.

While working on PySpark DataFrame we often need to replace null values since certain operations on null value return error hence, we need to graciously handle nulls as the first step before processing. Also, while writing to a file, it’s always best practice to replace null values, not doing this result nulls on the output file.
As part of the cleanup, sometimes you may need to Drop Rows with NULL/None Values in PySpark DataFrame and Filter Rows by checking IS NULL/NOT NULL conditions.
In this article, I will use both fill() and fillna() to replace null/none values with an empty string, constant value, and zero(0) on Dataframe columns integer, string with Python examples.
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local[1]") \
    .appName("SparkByExamples.com") \
    .getOrCreate()

filePath="resources/small_zipcode.csv"
df = spark.read.options(header='true', inferSchema='true') \
          .csv(filePath)

df.printSchema()
df.show(truncate=False)

In [0]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[8]") \
    .appName("SparkByExamples.com") \
    .getOrCreate()

#filePath="D:\\Data\\small_zipcode.csv"

#df = spark.read.options(header='true', inferSchema='True').csv(filePath)
#df = spark.read.csv(filePath)

#df = spark.read.csv("D:\\Data\\small_zipcode.csv")

#df = spark.read.csv('D:\Data\small_zipcode.csv', inferSchema = True, header = True)
#df.printSchema()
#df.show(truncate=False)
simpleData9 = [("James","Sales","NY",90000,34,10000),
    ("Michael","Sales","NY",86000,56,20000),
    ("Robert","Sales","",81000,30,23000),
    ("Maria","Finance","CA",90000,24,23000),
    ("Raman","Finance","CA",99000,40,24000),
    ("Scott","Finance","",83000,36,19000),
    ("Jen","Finance","NY",79000,53,15000),
    ("Jeff","Marketing","CA",80000,25,14000),
    ("Kumar","Marketing","NY",91000,50,21000)
  ]
schema9 = ["employee_name","department","state","salary","age","bonus"]
df = spark.createDataFrame(data = simpleData9 , schema = schema9)
df.printSchema()
df.show()

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- state: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- age: long (nullable = true)
 |-- bonus: long (nullable = true)

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        James|     Sales|   NY| 90000| 34|10000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|       Robert|     Sales|     | 81000| 30|23000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|        Raman|   Finance|   CA| 99000| 40|24000|
|        Scott|   Finance|     | 83000| 36|19000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|         Jeff| Marketing|   CA| 80000| 25|14000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
+-------------+----------+-----+------+---+-----+



In [0]:
df.fillna(value ="IN",subset=["state"]).show()
df.na.fill({"state" :"unknow"}).show()
df.na.fill("unknown").show()

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        James|     Sales|   NY| 90000| 34|10000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|       Robert|     Sales|     | 81000| 30|23000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|        Raman|   Finance|   CA| 99000| 40|24000|
|        Scott|   Finance|     | 83000| 36|19000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|         Jeff| Marketing|   CA| 80000| 25|14000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
+-------------+----------+-----+------+---+-----+

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        James|     Sales|   NY| 90000| 34|10000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|       Robert|     Sales|     | 81000| 30|23000|
|        Maria|   Finance|   CA| 90000| 24|23000|

In [0]:
PySpark Pivot and Unpivot DataFrame
PySpark pivot() function is used to rotate/transpose the data from one column into multiple Dataframe columns and back using unpivot(). Pivot() It is an aggregation where one of the grouping columns values is transposed into individual columns with distinct data.

This tutorial describes and provides a PySpark example on how to create a Pivot table on DataFrame and Unpivot back.
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr
#Create spark session
data = [("Banana",1000,"USA"), ("Carrots",1500,"USA"), ("Beans",1600,"USA"), \
      ("Orange",2000,"USA"),("Orange",2000,"USA"),("Banana",400,"China"), \
      ("Carrots",1200,"China"),("Beans",1500,"China"),("Orange",4000,"China"), \
      ("Banana",2000,"Canada"),("Carrots",2000,"Canada"),("Beans",2000,"Mexico")]

columns= ["Product","Amount","Country"]
df = spark.createDataFrame(data = data, schema = columns)
df.printSchema()
df.show(truncate=False)
Pivot PySpark DataFrame
PySpark SQL provides pivot() function to rotate the data from one column into multiple columns. It is an aggregation where one of the grouping columns values is transposed into individual columns with distinct data. To get the total amount exported to each country of each product, will do group by Product, pivot by Country, and the sum of Amount.

pivotDF = df.groupBy("Product").pivot("Country").sum("Amount")
pivotDF.printSchema()
pivotDF.show(truncate=False)
This will transpose the countries from DataFrame rows into columns and produces the below output. where ever data is not present, it represents as null by default.
Pivot Performance improvement in PySpark 2.0
version 2.0 on-wards performance has been improved on Pivot, however, if you are using the lower version; note that pivot is a very expensive operation hence, it is recommended to provide column data (if known) as an argument to function as shown below.
countries = ["USA","China","Canada","Mexico"]
pivotDF = df.groupBy("Product").pivot("Country", countries).sum("Amount")
pivotDF.show(truncate=False)
Another approach is to do two-phase aggregation. PySpark 2.0 uses this implementation in order to improve the performance Spark-13749


pivotDF = df.groupBy("Product","Country") \
      .sum("Amount") \
      .groupBy("Product") \
      .pivot("Country") \
      .sum("sum(Amount)") \
pivotDF.show(truncate=False)
The above two examples return the same output but with better performance.

In [0]:
from pyspark.sql import SparkSession
#Create spark session
data = [("Banana",1000,"USA"), ("Carrots",1500,"USA"), ("Beans",1600,"USA"), \
      ("Orange",2000,"USA"),("Orange",2000,"USA"),("Banana",400,"China"), \
      ("Carrots",1200,"China"),("Beans",1500,"China"),("Orange",4000,"China"), \
      ("Banana",2000,"Canada"),("Carrots",2000,"Canada"),("Beans",2000,"Mexico")]

columns= ["Product","Amount","Country"]
df = spark.createDataFrame(data = data, schema = columns)
df.printSchema()
#df.show()
pivotdf = df.groupBy("Product").pivot("Country").sum("Amount").show()
pivotdf1 = df.groupBy("Country").pivot("Product").sum("Amount").show()
countries=["China","USA","Mexico","Canada"]
pivotdf = df.groupBy("Product").pivot("Country",countries).sum("Amount").show()
pivotdf3 = df.groupBy("Product","Country")\
            .sum("Amount")\
            .groupBy("Product")\
            .pivot("Country")\
            .sum("sum(Amount)")\
            .show()



root
 |-- Product: string (nullable = true)
 |-- Amount: long (nullable = true)
 |-- Country: string (nullable = true)

+-------+------+-----+------+----+
|Product|Canada|China|Mexico| USA|
+-------+------+-----+------+----+
| Orange|  null| 4000|  null|4000|
|  Beans|  null| 1500|  2000|1600|
| Banana|  2000|  400|  null|1000|
|Carrots|  2000| 1200|  null|1500|
+-------+------+-----+------+----+

+-------+------+-----+-------+------+
|Country|Banana|Beans|Carrots|Orange|
+-------+------+-----+-------+------+
|  China|   400| 1500|   1200|  4000|
|    USA|  1000| 1600|   1500|  4000|
| Mexico|  null| 2000|   null|  null|
| Canada|  2000| null|   2000|  null|
+-------+------+-----+-------+------+

+-------+-----+----+------+------+
|Product|China| USA|Mexico|Canada|
+-------+-----+----+------+------+
| Orange| 4000|4000|  null|  null|
|  Beans| 1500|1600|  2000|  null|
| Banana|  400|1000|  null|  2000|
|Carrots| 1200|1500|  null|  2000|
+-------+-----+----+------+------+

+-------+----

In [0]:
Unpivot PySpark DataFrame
Unpivot is a reverse operation, we can achieve by rotating column values into rows values. PySpark SQL doesn’t have unpivot function hence will use the stack() function. Below code converts column countries to row.
from pyspark.sql.functions import expr
unpivotExpr = "stack(3, 'Canada', Canada, 'China', China, 'Mexico', Mexico) as (Country,Total)"
unPivotDF = pivotDF.select("Product", expr(unpivotExpr)) \
    .where("Total is not null")
unPivotDF.show(truncate=False)
unPivotDF.show()

In [0]:
from pyspark.sql.functions import expr
unpivoexpr = "stack(3,'Canada',Canada,'China',China,'Mexico',Mexico) as (Country,Total)"
unpivotdf = pivotdf.select("Product",expr(unpivoexpr)).where("Total is not null")
unpivotdf.show(truncate = False)
unpivotdf.show()

[0;31m---------------------------------------------------------------------------[0m
[0;31mAttributeError[0m                            Traceback (most recent call last)
[0;32m<command-3484641498263362>[0m in [0;36m<module>[0;34m[0m
[1;32m      1[0m [0;32mfrom[0m [0mpyspark[0m[0;34m.[0m[0msql[0m[0;34m.[0m[0mfunctions[0m [0;32mimport[0m [0mexpr[0m[0;34m[0m[0;34m[0m[0m
[1;32m      2[0m [0munpivoexpr[0m [0;34m=[0m [0;34m"stack(3,'Canada',Canada,'China',China,'Mexico',Mexico) as (Country,Total)"[0m[0;34m[0m[0;34m[0m[0m
[0;32m----> 3[0;31m [0munpivotdf[0m [0;34m=[0m [0mpivotdf[0m[0;34m.[0m[0mselect[0m[0;34m([0m[0;34m"Product"[0m[0;34m,[0m[0mexpr[0m[0;34m([0m[0munpivoexpr[0m[0;34m)[0m[0;34m)[0m[0;34m.[0m[0mwhere[0m[0;34m([0m[0;34m"Total is not null"[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m[1;32m      4[0m [0munpivotdf[0m[0;34m.[0m[0mshow[0m[0;34m([0m[0mtruncate[0m [0;34m=[0m [0;32mFalse[0m[

In [0]:
PySpark ArrayType Column With Examples
PySpark pyspark.sql.types.ArrayType (ArrayType extends DataType class) is used to define an array data type column on DataFrame that holds the same type of elements, In this article, I will explain how to create a DataFrame ArrayType column using org.apache.spark.sql.types.ArrayType class and applying some SQL functions on the array columns with examples.

While working with structured files (Avro, Parquet e.t.c) or semi-structured (JSON) files, we often get data with complex structures like MapType, ArrayType, StructType e.t.c. I will try my best to cover some mostly used functions on ArrayType columns.
What is PySpark ArrayType
PySpark ArrayType is a collection data type that extends the DataType class which is a superclass of all types in PySpark. All elements of ArrayType should have the same type of elements.
Create PySpark ArrayType
You can create an instance of an ArrayType using ArraType() class, This takes arguments valueType and one optional argument valueContainsNull to specify if a value can accept null, by default it takes True. valueType should be a PySpark type that extends DataType class.
from pyspark.sql.types import StringType, ArrayType
arrayCol = ArrayType(StringType(),False)
Above example creates string array and doesn’t not accept null values.

Create PySpark ArrayType Column Using StructType
Let’s create a DataFrame with few array columns by using PySpark StructType & StructField classes.


data = [
 ("James,,Smith",["Java","Scala","C++"],["Spark","Java"],"OH","CA"),
 ("Michael,Rose,",["Spark","Java","C++"],["Spark","Java"],"NY","NJ"),
 ("Robert,,Williams",["CSharp","VB"],["Spark","Python"],"UT","NV")
]

from pyspark.sql.types import StringType, ArrayType,StructType,StructField
schema = StructType([ 
    StructField("name",StringType(),True), 
    StructField("languagesAtSchool",ArrayType(StringType()),True), 
    StructField("languagesAtWork",ArrayType(StringType()),True), 
    StructField("currentState", StringType(), True), 
    StructField("previousState", StringType(), True)
  ])

df = spark.createDataFrame(data=data,schema=schema)
df.printSchema()
df.show()
This snippet creates two Array columns languagesAtSchool and languagesAtWork which defines languages learned at School and languages using at work. For the rest of the article, I will use these array columns of DataFrame and provide examples of PySpark SQL array functions. printSchema() and show() from above snippet display below output.

PySpark ArrayType (Array) Functions
PySpark SQL provides several Array functions to work with the ArrayType column, In this section, we will see some of the most commonly used SQL functions.

explode()
Use explode() function to create a new row for each element in the given array column. There are various PySpark SQL explode functions available to work with Array columns.
from pyspark.sql.functions import explode
df.select(df.name,explode(df.languagesAtS

Split()
split() sql function returns an array type after splitting the string column by delimiter. Below example split the name column by comma delimiter.
 from pyspark.sql.functions import split
df.select(split(df.name,",").alias("nameAsArray")).show()

array()
Use array() function to create a new array column by merging the data from multiple columns. All input columns must have the same data type. The below example combines the data from currentState and previousState and creates a new column states.


from pyspark.sql.functions import array
df.select(df.name,array(df.currentState,df.previousState).alias("States")).show()


In [0]:
import pyspark
from pyspark.sql.types import StructType,StructField,StringType,IntegerType,ArrayType,MapType
from pyspark.sql.functions import explode,split,array,array_contains
data = [(("james","","smith"),["java","scala","c++"],["spark","Java"],"oh,HO","ca"),
         (("Michael","","Rose"),["Spark","Java","C++"],["Spark","Java"],"NY,YN","NJ"),
 (("Robert","","Williams"),["CSharp","VB"],["Spark","Python"],"UT,AP","NV")    
]
columns = StructType([StructField("name",StructType([StructField("fname",StringType(),True),StructField("lname",StringType(),True),StructField("mname",StringType(),True)])),
                      StructField("languageAtSchool",ArrayType(StringType()),True),
                      StructField("languageAtWork",ArrayType(StringType()),True),
                      StructField("currentstatus",StringType(),True),
                      StructField("previousstate",StringType(),True)    
])
DF5 = spark.createDataFrame(data= data , schema= columns)
DF5.show()
#DF5.select(DF5.name,DF5.currentstatus,DF5.languageAtWork).show()
DF5.select(DF5.name,DF5.currentstatus,explode(DF5.languageAtWork)).show()
DF5.select(DF5.name,DF5.previousstate,explode(DF5.languageAtSchool)).show()
#split()
DF5.select(split(DF5.currentstatus,",").alias("arrayname")).show()
#array()
DF5.select(DF5.name,array(DF5.currentstatus,DF5.previousstate).alias("states")).show()
DF5.select(array(DF5.previousstate,DF5.currentstatus).alias("states1")).show()
DF5.select(DF5.name,array_contains(DF5.languageAtSchool,"Spark")).show()
DF5.select(DF5.languageAtSchool,array_contains(DF5.languageAtSchool,"C++")).show()

+--------------------+------------------+---------------+-------------+-------------+
|                name|  languageAtSchool| languageAtWork|currentstatus|previousstate|
+--------------------+------------------+---------------+-------------+-------------+
|    {james, , smith}|[java, scala, c++]|  [spark, Java]|        oh,HO|           ca|
|   {Michael, , Rose}|[Spark, Java, C++]|  [Spark, Java]|        NY,YN|           NJ|
|{Robert, , Williams}|      [CSharp, VB]|[Spark, Python]|        UT,AP|           NV|
+--------------------+------------------+---------------+-------------+-------------+

+--------------------+-------------+------+
|                name|currentstatus|   col|
+--------------------+-------------+------+
|    {james, , smith}|        oh,HO| spark|
|    {james, , smith}|        oh,HO|  Java|
|   {Michael, , Rose}|        NY,YN| Spark|
|   {Michael, , Rose}|        NY,YN|  Java|
|{Robert, , Williams}|        UT,AP| Spark|
|{Robert, , Williams}|        UT,AP|Python|
+

In [0]:
PySpark MapType (Dict) Usage with Examples
PySpark MapType (also called map type) is a data type to represent Python Dictionary (dict) to store key-value pair, a MapType object comprises three fields, keyType (a DataType), valueType (a DataType) and valueContainsNull (a BooleanType).

What is PySpark MapType
PySpark MapType is used to represent map key-value pair similar to python Dictionary (Dict), it extends DataType class which is a superclass of all types in PySpark and takes two mandatory arguments keyType and valueType of type DataType and one optional boolean argument valueContainsNull. keyType and valueType can be any type that extends the DataType class. for e.g StringType, IntegerType, ArrayType, MapType, StructType (struct) e.t.c.
1. Create PySpark MapType
In order to use MapType data type first, you need to import it from pyspark.sql.types.MapType and use MapType() constructor to create a map object.

from pyspark.sql.types import StringType, MapType
mapCol = MapType(StringType(),StringType(),False)
The First param keyType is used to specify the type of the key in the map.
The Second param valueType is used to specify the type of the value in the map.
Third parm valueContainsNull is an optional boolean type that is used to specify if the value of the second param can accept Null/None values.
The key of the map won’t accept None/Null values.
PySpark provides several SQL functions to work with MapType.
2. Create MapType From StructType
Let’s see how to create a MapType by using PySpark StructType & StructField, StructType() constructor takes list of StructField, StructField takes a fieldname and type of the value.


from pyspark.sql.types import StructField, StructType, StringType, MapType
schema = StructType([
    StructField('name', StringType(), True),
    StructField('properties', MapType(StringType(),StringType()),True)
])
Now let’s create a DataFrame by using above StructType schema.


from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
dataDictionary = [
        ('James',{'hair':'black','eye':'brown'}),
        ('Michael',{'hair':'brown','eye':None}),
        ('Robert',{'hair':'red','eye':'black'}),
        ('Washington',{'hair':'grey','eye':'grey'}),
        ('Jefferson',{'hair':'brown','eye':''})
        ]
df = spark.createDataFrame(data=dataDictionary, schema = schema)
df.printSchema()
df.show(truncate=False)
df.printSchema() yields the Schema and df.show() yields the DataFrame output.

3. Access PySpark MapType Elements
Let’s see how to extract the key and values from the PySpark DataFrame Dictionary column. Here I have used PySpark map transformation to read the values of properties (MapType column)


df3=df.rdd.map(lambda x: \
    (x.name,x.properties["hair"],x.properties["eye"])) \
    .toDF(["name","hair","eye"])
df3.printSchema()
df3.show()

Let’s use another way to get the value of a key from Map using getItem() of Column type, this method takes a key as an argument and returns a value.


df.withColumn("hair",df.properties.getItem("hair")) \
  .withColumn("eye",df.properties.getItem("eye")) \
  .drop("properties") \
  .show()

df.withColumn("hair",df.properties["hair"]) \
  .withColumn("eye",df.properties["eye"]) \
  .drop("properties") \
  .show()


In [0]:
import pyspark 
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("sparkbyexample.com").getOrCreate()
dataDictonary = [('james',{'eye':'balck','hair':'black'}),
                 ('krishna',{'eye':'black','hair':'white'}),
                 ('mani',{'eye':'red','hair':'brown'})    
]
datrcolumns = StructType([StructField("name",StringType(),True),
                          StructField("properties",MapType(StringType(),StringType()),True)])
    
DF6= spark.createDataFrame(data = dataDictonary,schema=datrcolumns)
#DF6.show(truncate = False)
DF7 = DF6.rdd.map(lambda x:(x.name,x.properties["hair"],x.properties["eye"])).toDF(["name","hair","eye"])
DF8=DF6.rdd.map(lambda x:(x.properties["eye"],x.properties["hair"],x.name)).toDF(["eye","hair","name"])
#DF7.show()
#DF8.show()
DF6.withColumn("hair",DF6.properties.getItem("hair")).withColumn("eye",DF6.properties.getItem("eye")).drop("properties").show()
DF6.withColumn("eye",DF6.properties.getItem("eye")).withColumn("hair",DF6.properties.getItem("hair")).show()
DF6.withColumn("eye",DF6.properties["eye"]).withColumn("hair",DF6.properties["hair"]).drop("properties").show()

+-------+-----+-----+
|   name| hair|  eye|
+-------+-----+-----+
|  james|black|balck|
|krishna|white|black|
|   mani|brown|  red|
+-------+-----+-----+

+-------+--------------------+-----+-----+
|   name|          properties|  eye| hair|
+-------+--------------------+-----+-----+
|  james|{eye -> balck, ha...|balck|black|
|krishna|{eye -> black, ha...|black|white|
|   mani|{eye -> red, hair...|  red|brown|
+-------+--------------------+-----+-----+

+-------+-----+-----+
|   name|  eye| hair|
+-------+-----+-----+
|  james|balck|black|
|krishna|black|white|
|   mani|  red|brown|
+-------+-----+-----+



In [0]:
from pyspark.sql.functions import explode,map_keys,map_values
DF6.select(DF6.name,explode("properties")).show()
DF6.select(DF6.name,map_keys(DF6.properties)).show()
DF6.select(DF6.name,map_values(DF6.properties)).show()


+-------+----+-----+
|   name| key|value|
+-------+----+-----+
|  james| eye|balck|
|  james|hair|black|
|krishna| eye|black|
|krishna|hair|white|
|   mani| eye|  red|
|   mani|hair|brown|
+-------+----+-----+

+-------+--------------------+
|   name|map_keys(properties)|
+-------+--------------------+
|  james|         [eye, hair]|
|krishna|         [eye, hair]|
|   mani|         [eye, hair]|
+-------+--------------------+

+-------+----------------------+
|   name|map_values(properties)|
+-------+----------------------+
|  james|        [balck, black]|
|krishna|        [black, white]|
|   mani|          [red, brown]|
+-------+----------------------+

