In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
  Preparing metadata (setup.py) ... done
Collecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=5d78794d1064dbca2f2ce76d041f12b0da113997a87f58e5619fcf4f37f2a3af
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built p

In [2]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

import os
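import pyspark  # installed with pip above, so findspark.init() is not needed
from pyspark.sql import SparkSession
# May take a while locally
spark = SparkSession.builder.appName("Pyspark_2").getOrCreate()

# Note: this counts the block managers registered with the driver (1 in local mode), not CPU cores
cores = spark._jsc.sc().getExecutorMemoryStatus().keySet().size()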
print("You are working with", cores, "core(s)")
spark

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/01/14 13:33:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


You are working with 1 core(s)


In [3]:
path =""

# Some csv data
wine_quality = spark.read.csv(path+'/kaggle/input/red-wine-quality-cortez-et-al-2009/winequality-red.csv',
                          inferSchema=True,header=True)

In [4]:
wine_quality.limit(5).toPandas()

Unnamed: 0,fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,total sulfur dioxide,density,pH,sulphates,alcohol,quality
0,7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,3.51,0.56,9.4,5
1,7.8,0.88,0.0,2.6,0.098,25.0,67.0,0.9968,3.2,0.68,9.8,5
2,7.8,0.76,0.04,2.3,0.092,15.0,54.0,0.997,3.26,0.65,9.8,5
3,11.2,0.28,0.56,1.9,0.075,17.0,60.0,0.998,3.16,0.58,9.8,6
4,7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,3.51,0.56,9.4,5


In [5]:
wine_quality.printSchema()

root
 |-- fixed acidity: double (nullable = true)
 |-- volatile acidity: double (nullable = true)
 |-- citric acid: double (nullable = true)
 |-- residual sugar: double (nullable = true)
 |-- chlorides: double (nullable = true)
 |-- free sulfur dioxide: double (nullable = true)
 |-- total sulfur dioxide: double (nullable = true)
 |-- density: double (nullable = true)
 |-- pH: double (nullable = true)
 |-- sulphates: double (nullable = true)
 |-- alcohol: double (nullable = true)
 |-- quality: integer (nullable = true)



# Creating New Columns and Features

#### Suppose we want a new feature named 'sq_chlorides' that holds the square of the chlorides column. The withColumn() method of a PySpark DataFrame does this.

The first argument to withColumn() is the name of the new column. The second is a Column expression that produces its values, usually built from existing columns of the same DataFrame. The method returns a new DataFrame; the one it is called on is left unchanged.

In [6]:
wine_quality.withColumn("sq_chlorides", wine_quality.chlorides**2)

DataFrame[fixed acidity: double, volatile acidity: double, citric acid: double, residual sugar: double, chlorides: double, free sulfur dioxide: double, total sulfur dioxide: double, density: double, pH: double, sulphates: double, alcohol: double, quality: int, sq_chlorides: double]

In [7]:
%%timeit
wine_quality_v1 = wine_quality.withColumn("sq_chlorides", wine_quality.chlorides**2)

19.9 ms ± 4.9 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [8]:
wine_quality.limit(5).toPandas()

Unnamed: 0,fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,total sulfur dioxide,density,pH,sulphates,alcohol,quality
0,7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,3.51,0.56,9.4,5
1,7.8,0.88,0.0,2.6,0.098,25.0,67.0,0.9968,3.2,0.68,9.8,5
2,7.8,0.76,0.04,2.3,0.092,15.0,54.0,0.997,3.26,0.65,9.8,5
3,11.2,0.28,0.56,1.9,0.075,17.0,60.0,0.998,3.16,0.58,9.8,6
4,7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,3.51,0.56,9.4,5


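The output above still shows only the original columns. withColumn() returns a new DataFrame and leaves wine_quality untouched, and a name assigned inside a %%timeit cell is not kept once the cell finishes. To work with the new column, assign the result outside %%timeit and display that DataFrame instead. Note also that withColumn() is a lazy transformation, so the timing above measures how long Spark takes to extend the query plan, not how long it takes to compute the squares. There is another way to create this feature, which we look at next.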

# User Defined Functions

Spark gets its advantage from distributing work across workers/executors: each one executes a subset of the larger task, and the partial results are combined into the final outcome.

Built-in column expressions such as wine_quality.chlorides**2 run inside the JVM and can be optimised by Spark. When the logic we need cannot be written with built-in functions and column operators, we can wrap ordinary Python code so that Spark is able to call it on every row.

This wrapper is called a User Defined Function (UDF): udf() takes a Python function together with its return type. The trade-off is speed. Each value has to be serialised, sent to a Python worker process and sent back, so a Python UDF is generally slower than the equivalent built-in expression and is best kept for logic that built-ins cannot express.

In [9]:
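from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

def square(x):
    # chlorides is a double below 1, so int(x**2) would truncate every value to 0
    return None if x is None else float(x) ** 2

# The declared return type must match what the Python function returns
square_udf = udf(square, DoubleType())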

In [10]:
%%timeit
wine_quality.withColumn("udf_sq_chlorides", square_udf(wine_quality.chlorides))

4.8 ms ± 1.05 ms per loop (mean ± std. dev. of 7 runs, 100 loops each)


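#### The two %%timeit results should not be read as a speed comparison. withColumn() is lazy, so both cells only measured how long it takes to add a step to the query plan; no value was squared in either of them. Once an action such as show() or count() forces execution, Python UDFs are generally slower than built-in column expressions.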


# Changing data types after read in
In the last notebook we covered reading a data source with the data types we want it to have.

Now we consider the situation where we need to change the data type of a specific column after the data source has already been read.

#### Available types:
    - DataType
    - NullType
    - StringType
    - BinaryType
    - BooleanType
    - DateType
    - TimestampType
    - DecimalType
    - DoubleType
    - FloatType
    - ByteType
    - IntegerType
    - LongType
    - ShortType
    - ArrayType
    - MapType
    - StructField
    - StructType
    

In [11]:
from pyspark.sql.functions import * 
from pyspark.sql.types import * # IntegerType

df = wine_quality.withColumn("quality", wine_quality["quality"].cast(DoubleType())) \
        .withColumn("density", wine_quality["density"].cast(IntegerType())) 
        
df.printSchema()


root
 |-- fixed acidity: double (nullable = true)
 |-- volatile acidity: double (nullable = true)
 |-- citric acid: double (nullable = true)
 |-- residual sugar: double (nullable = true)
 |-- chlorides: double (nullable = true)
 |-- free sulfur dioxide: double (nullable = true)
 |-- total sulfur dioxide: double (nullable = true)
 |-- density: integer (nullable = true)
 |-- pH: double (nullable = true)
 |-- sulphates: double (nullable = true)
 |-- alcohol: double (nullable = true)
 |-- quality: double (nullable = true)


The schema of df now shows the types we requested for the two columns. The key method is cast(), which is called on a column and takes one of the types available in pyspark.sql.types. Note that casting a double to an integer drops the fractional part, so density, which lies between roughly 0.99 and 1.004 in this data set, is reduced to 0 or 1. The cast is shown here only to illustrate the mechanics.

# Renaming Columns

The withColumnRenamed() method takes the old column name as the first argument and the new column name as the second argument in the call. It returns a new DataFrame with the updates.

In [12]:
df.withColumnRenamed('density','density_new_name')

DataFrame[fixed acidity: double, volatile acidity: double, citric acid: double, residual sugar: double, chlorides: double, free sulfur dioxide: double, total sulfur dioxide: double, density_new_name: int, pH: double, sulphates: double, alcohol: double, quality: double]

# Dropping a Column

The drop() method returns a new DataFrame without the named column(s).

In [13]:
df.drop("quality")

DataFrame[fixed acidity: double, volatile acidity: double, citric acid: double, residual sugar: double, chlorides: double, free sulfur dioxide: double, total sulfur dioxide: double, density: int, pH: double, sulphates: double, alcohol: double]

# Data Manipulation Methods

In [14]:
# Adding a text column to run some text manipulation functions on.
df1 = df.withColumn("text_col", lit("    TEST TEXT   "))

**Trim**

The "trim" function removes leading and trailing white space from a cell.

In [15]:
df1 = df.withColumn("text_col", lit("    TEST TEXT   "))
df1.select("text_col").show(5,False)

+----------------+
|text_col        |
+----------------+
|    TEST TEXT   |
|    TEST TEXT   |
|    TEST TEXT   |
|    TEST TEXT   |
|    TEST TEXT   |
+----------------+
only showing top 5 rows



In [16]:
df = df1.withColumn('text_col',trim(df1.text_col)) 
df.select("text_col").show(5,False)

+---------+
|text_col |
+---------+
|TEST TEXT|
|TEST TEXT|
|TEST TEXT|
|TEST TEXT|
|TEST TEXT|
+---------+
only showing top 5 rows



**Lower**

The "lower" function converts every character of a string column to lowercase.

In [17]:
df1 = df.withColumn('text_col',lower(df.text_col)) 
df1.select("text_col").show(5,False)

+---------+
|text_col |
+---------+
|test text|
|test text|
|test text|
|test text|
|test text|
+---------+
only showing top 5 rows



**Split a string around a pattern**

In [18]:
df1.select(split(df1.text_col, ' ').alias('split_text_col')).show(1,False)

+--------------+
|split_text_col|
+--------------+
|[test, text]  |
+--------------+
only showing top 1 row



**Regex to replace text**


In [19]:
df1.select('text_col',regexp_replace(df1.text_col, 'test', 'replaced').alias('regex_replaced_text')).show(1, False)

+---------+-------------------+
|text_col |regex_replaced_text|
+---------+-------------------+
|test text|replaced text      |
+---------+-------------------+
only showing top 1 row



# Condition Based Feature Creation

In [20]:
print("Option#1: select or withColumn() using when-otherwise without Otherwise condition")
from pyspark.sql.functions import when
df_subset = df.select("pH",(when(df.pH > 3, 'Good')).alias("Acidity Category"))

df_subset.select([count(when(col(c).contains('None') | \
                            col(c).contains('NULL') | \
                            (col(c) == '' ) | \
                            col(c).isNull() | \
                            isnan(c), c 
                           )).alias(c)
                    for c in df_subset.columns]).show()

Option#1: select or withColumn() using when-otherwise without Otherwise condition
+---+----------------+
| pH|Acidity Category|
+---+----------------+
|  0|              35|
+---+----------------+



In [21]:
print("Option#1: select or withColumn() using when-otherwise with Otherwise condition")

df_subset_2 = df.select("pH",(when(df.pH >= 3.0, 'Good').otherwise('Bad')).alias("Acidity Category"))

df_subset_2.select([count(when(col(c).contains('None') | \
                            col(c).contains('NULL') | \
                            (col(c) == '' ) | \
                            col(c).isNull() | \
                            isnan(c), c 
                           )).alias(c)
                    for c in df_subset_2.columns]).show()

Option#1: select or withColumn() using when-otherwise with Otherwise condition
+---+----------------+
| pH|Acidity Category|
+---+----------------+
|  0|               0|
+---+----------------+



In [22]:
print("Option2: select or withColumn() using expr function")
from pyspark.sql.functions import expr 
df.select("pH",expr("CASE WHEN pH >= 3.0 THEN 'Good' ELSE 'Bad' END").alias('Acidity Category')).show(3)

Option2: select or withColumn() using expr function
+----+----------------+
|  pH|Acidity Category|
+----+----------------+
|3.51|            Good|
| 3.2|            Good|
|3.26|            Good|
+----+----------------+
only showing top 3 rows



In [23]:
print("Option 3: selectExpr() using SQL equivalent CASE expression")
df.selectExpr("pH","CASE WHEN pH >= 3.0 THEN  'Good' ELSE 'Bad' END AS Acidity_Category").show(3)

Option 3: selectExpr() using SQL equivalent CASE expression
+----+----------------+
|  pH|Acidity_Category|
+----+----------------+
|3.51|            Good|
| 3.2|            Good|
|3.26|            Good|
+----+----------------+
only showing top 3 rows



# PySpark DF Immutability

#### DataFrames are built on top of RDDs, which are immutable, so DataFrames are immutable as well.

#### Most DataFrame operations therefore return a new DataFrame. If the statement is an assignment, the variable is simply rebound to that new DataFrame.

#### To verify this, we can call id() on the DataFrame's underlying rdd and compare the identifier before and after an operation.

In [24]:
org_df = spark.createDataFrame([('Apple','Steve Jobs')], ['Organisation', 'Founder'])
org_df.show()
print(org_df.rdd.id())

+------------+----------+
|Organisation|   Founder|
+------------+----------+
|       Apple|Steve Jobs|
+------------+----------+

68


In [25]:
org_df = org_df.select(org_df.Organisation,org_df.Founder,concat_ws('-', org_df.Organisation, org_df.Founder).alias('concat'))

In [26]:
org_df.show()
print(org_df.rdd.id())

+------------+----------+----------------+
|Organisation|   Founder|          concat|
+------------+----------+----------------+
|       Apple|Steve Jobs|Apple-Steve Jobs|
+------------+----------+----------------+

74


#### The variable name is the same, but the rdd id is different after the update: select() built a new DataFrame and org_df was rebound to it. The original DataFrame was not modified; it is simply no longer referenced.


withColumn() and similar operations likewise return a new DataFrame instead of updating the existing one.

Each withColumn() call also adds a projection to the query plan, so calling it many times, for example in a loop, can make the plan large and slow to process. The PySpark documentation warns:

> This method introduces a projection internally. Therefore, calling it multiple times, for instance, via loops in order to add multiple columns can generate big plans which can cause performance issues and even StackOverflowException. To avoid this, use select() with the multiple columns at once.

https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.withColumn.html

# Aggregating DataFrames in PySpark

groupBy() followed by count() produces a frequency table, similar to the value_counts() method in pandas.

In [27]:
wine_quality.groupBy("quality").count().show()

+-------+-----+
|quality|count|
+-------+-----+
|      6|  638|
|      3|   10|
|      5|  681|
|      4|   53|
|      8|   18|
|      7|  199|
+-------+-----+



In [28]:
wine_quality.groupBy("quality").mean("density").show()

+-------+------------------+
|quality|      avg(density)|
+-------+------------------+
|      6|0.9966150626959255|
|      3|0.9974640000000001|
|      5|0.9971036270190888|
|      4|0.9965424528301888|
|      8|0.9952122222222223|
|      7|0.9961042713567828|
+-------+------------------+



Distinct count method

In [29]:
wine_quality.select(countDistinct("quality").alias('CountDistinct_quality')).show()

+---------------------+
|CountDistinct_quality|
+---------------------+
|                    6|
+---------------------+



# Multiple Aggregations

agg() accepts several aggregate expressions, so multiple aggregations can be computed in a single call.

In [30]:
wine_quality.groupBy("quality").agg(round(min(wine_quality.chlorides),3).alias("Min chlorides"),max(wine_quality.chlorides).alias("Max chlorides")).show()

+-------+-------------+-------------+
|quality|Min chlorides|Max chlorides|
+-------+-------------+-------------+
|      6|        0.034|        0.415|
|      3|        0.061|        0.267|
|      5|        0.039|        0.611|
|      4|        0.045|         0.61|
|      8|        0.044|        0.086|
|      7|        0.012|        0.358|
+-------+-------------+-------------+



# Aggregation Without Groupby

In [31]:
wine_quality.agg(min(wine_quality.density).alias("Min density"),max(wine_quality.density).alias("Max density")).show()

+------------------+-----------+
|       Min density|Max density|
+------------------+-----------+
|0.9900700000000001|    1.00369|
+------------------+-----------+



# Pivots

In [32]:
wine_quality.toPandas().head()

Unnamed: 0,fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,total sulfur dioxide,density,pH,sulphates,alcohol,quality
0,7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,3.51,0.56,9.4,5
1,7.8,0.88,0.0,2.6,0.098,25.0,67.0,0.9968,3.2,0.68,9.8,5
2,7.8,0.76,0.04,2.3,0.092,15.0,54.0,0.997,3.26,0.65,9.8,5
3,11.2,0.28,0.56,1.9,0.075,17.0,60.0,0.998,3.16,0.58,9.8,6
4,7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,3.51,0.56,9.4,5


In [33]:
# creating a categorical column to check the effect of pH on quality
wine_quality_v2 = wine_quality.select('*', (when(wine_quality.pH >= 3.0, 'pH>=3').otherwise('pH<3')).alias("ph_category"))

In [34]:
wine_quality_v2.groupBy("quality").pivot("ph_category").count().show(10)

+-------+----+-----+
|quality|pH<3|pH>=3|
+-------+----+-----+
|      6|  10|  628|
|      3|null|   10|
|      5|  13|  668|
|      4|   1|   52|
|      8|   2|   16|
|      7|   3|  196|
+-------+----+-----+



With a single argument, the pivot method creates one output column for every distinct value of the column passed to it.

In [35]:
wine_quality.groupBy("quality").pivot("quality").count().show(10)

+-------+----+----+----+----+----+----+
|quality|   3|   4|   5|   6|   7|   8|
+-------+----+----+----+----+----+----+
|      6|null|null|null| 638|null|null|
|      3|  10|null|null|null|null|null|
|      5|null|null| 681|null|null|null|
|      4|null|  53|null|null|null|null|
|      8|null|null|null|null|null|  18|
|      7|null|null|null|null| 199|null|
+-------+----+----+----+----+----+----+



To restrict the cross-tab to specific values, pass a list of those values as the second argument to pivot. Only the listed values become columns, and Spark does not have to scan the data first to find the distinct values.

In [36]:
wine_quality.groupBy("quality").pivot("free sulfur dioxide", [11,25]).count().show(10)

+-------+----+----+
|quality|  11|  25|
+-------+----+----+
|      6|  26|   8|
|      3|null|null|
|      5|  20|  14|
|      4|   5|null|
|      8|null|null|
|      7|   8|   2|
+-------+----+----+



# Aggregation + Pivot

In [37]:
wine_quality_v2.groupBy("quality").pivot("ph_category").agg(
    mean(wine_quality_v2.chlorides).alias("Mean Chlorides"),
    mean(wine_quality_v2["free sulfur dioxide"]).alias("Mean free sulfur dioxide"),
).toPandas()

Unnamed: 0,quality,pH<3_Mean Chlorides,pH<3_Mean free sulfur dioxide,pH>=3_Mean Chlorides,pH>=3_Mean free sulfur dioxide
0,6,0.1203,16.3,0.084393,15.702229
1,3,,,0.1225,11.0
2,5,0.095,12.0,0.092692,17.080838
3,4,0.61,32.0,0.080692,11.884615
4,8,0.0775,24.0,0.067312,11.9375
5,7,0.082333,13.0,0.0765,14.061224


# Joins

In [38]:
valuesP = [('koala',1,'yes'),('caterpillar',2,'yes'),('deer',3,'yes'),('human',4,'yes')]
eats_plants = spark.createDataFrame(valuesP,['name','id','eats_plants'])

valuesM = [('shark',5,'yes'),('lion',6,'yes'),('tiger',7,'yes'),('human',4,'yes')]
eats_meat = spark.createDataFrame(valuesM,['name','id','eats_meat'])

print("Plant eaters (herbivores)")
eats_plants.show()
print("Meat eaters (carnivores)")
eats_meat.show()

Plant eaters (herbivores)
+-----------+---+-----------+
|       name| id|eats_plants|
+-----------+---+-----------+
|      koala|  1|        yes|
|caterpillar|  2|        yes|
|       deer|  3|        yes|
|      human|  4|        yes|
+-----------+---+-----------+

Meat eaters (carnivores)
+-----+---+---------+
| name| id|eats_meat|
+-----+---+---------+
|shark|  5|      yes|
| lion|  6|      yes|
|tiger|  7|      yes|
|human|  4|      yes|
+-----+---+---------+


# Inner Join

An inner join keeps ONLY the rows whose join keys appear in BOTH tables.

In [39]:
inner_join = eats_plants.join(eats_meat, ["name","id"],"inner")
print("Inner Join Example")
inner_join.show()

Inner Join Example
+-----+---+-----------+---------+
| name| id|eats_plants|eats_meat|
+-----+---+-----------+---------+
|human|  4|        yes|      yes|
+-----+---+-----------+---------+


# Left Join

A left join keeps every row of the left table and adds the columns of the right table; rows without a match in the right table get null in those columns. A quick quality check is to make sure that the human row has the value "yes" in both the eats_plants and eats_meat columns.

In [40]:
left_join = eats_plants.join(eats_meat, ["name","id"], how='left')
print("Left Join Example")
left_join.show()

Left Join Example
+-----------+---+-----------+---------+
|       name| id|eats_plants|eats_meat|
+-----------+---+-----------+---------+
|caterpillar|  2|        yes|     null|
|       deer|  3|        yes|     null|
|      human|  4|        yes|      yes|
|      koala|  1|        yes|     null|
+-----------+---+-----------+---------+


# Right Join

A right join keeps every row of the right table and adds the columns of the left table; rows without a match in the left table get null in those columns.

In [41]:
right_join = eats_plants.join(eats_meat,  ["name","id"],how='right')
print("Right Join")
right_join.show()

Right Join
+-----+---+-----------+---------+
| name| id|eats_plants|eats_meat|
+-----+---+-----------+---------+
|human|  4|        yes|      yes|
| lion|  6|       null|      yes|
|shark|  5|       null|      yes|
|tiger|  7|       null|      yes|
+-----+---+-----------+---------+


# Full Outer Join

A full outer join keeps all rows from both tables, along with the columns from both; rows without a match get null in the other table's columns.

In [42]:
full_outer_join = eats_plants.join(eats_meat, ["name","id"],how='full') # Could also use 'full_outer'
print("Full Outer Join")
full_outer_join.show()

Full Outer Join
+-----------+---+-----------+---------+
|       name| id|eats_plants|eats_meat|
+-----------+---+-----------+---------+
|caterpillar|  2|        yes|     null|
|       deer|  3|        yes|     null|
|      human|  4|        yes|      yes|
|      koala|  1|        yes|     null|
|       lion|  6|       null|      yes|
|      shark|  5|       null|      yes|
|      tiger|  7|       null|      yes|
+-----------+---+-----------+---------+


# Union

In [43]:
new_df = eats_plants

df_concat = eats_plants.union(new_df)
print("eats_plants df Counts:", eats_plants.count(), len(eats_plants.columns))
print("df_concat Counts:", df_concat.count(), len(df_concat.columns))
eats_plants.show()
df_concat.show()

eats_plants df Counts: 4 3
df_concat Counts: 8 3
+-----------+---+-----------+
|       name| id|eats_plants|
+-----------+---+-----------+
|      koala|  1|        yes|
|caterpillar|  2|        yes|
|       deer|  3|        yes|
|      human|  4|        yes|
+-----------+---+-----------+

+-----------+---+-----------+
|       name| id|eats_plants|
+-----------+---+-----------+
|      koala|  1|        yes|
|caterpillar|  2|        yes|
|       deer|  3|        yes|
|      human|  4|        yes|
|      koala|  1|        yes|
|caterpillar|  2|        yes|
|       deer|  3|        yes|
|      human|  4|        yes|
+-----------+---+-----------+


# Join with Different Column Names

In [44]:
import pyspark.sql.functions as F
# creating different name for the same column
eats_plants = eats_plants.withColumnRenamed("name", "name_1")
eats_plants.join(eats_meat, eats_plants['name_1'] == eats_meat['name'], how='outer')\
   .show()

+-----------+----+-----------+-----+----+---------+
|     name_1|  id|eats_plants| name|  id|eats_meat|
+-----------+----+-----------+-----+----+---------+
|caterpillar|   2|        yes| null|null|     null|
|       deer|   3|        yes| null|null|     null|
|      human|   4|        yes|human|   4|      yes|
|      koala|   1|        yes| null|null|     null|
|       null|null|       null| lion|   6|      yes|
|       null|null|       null|shark|   5|      yes|
|       null|null|       null|tiger|   7|      yes|
+-----------+----+-----------+-----+----+---------+



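#### This works, but because the join condition is an expression rather than a list of shared column names, the result keeps both name columns (name_1 and name) and both id columns. The duplicates can be merged or dropped once the join is complete.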

# Coalesce

#### What does coalesce do?
For each row, coalesce() returns the value of the first column that is not null. If all of the columns are null, the result is null.

Reference: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.coalesce.html

In [45]:
cDf = spark.createDataFrame([(None, None), (1, None), (None, 2)], ("a", "b"))
cDf.show()

+----+----+
|   a|   b|
+----+----+
|null|null|
|   1|null|
|null|   2|
+----+----+



In [46]:
cDf.select(coalesce(cDf["a"], cDf["b"])).show()

+--------------+
|coalesce(a, b)|
+--------------+
|          null|
|             1|
|             2|
+--------------+



# Coalesce in Joins

In [47]:
eats_plants.join(eats_meat, eats_plants['name_1'] == eats_meat['name'], how='outer')\
   .select('*', F.coalesce(F.col('name_1'), F.col('name')).alias('name_coalesced')) \
   .drop('name_1', 'name') \
   .show()

+----+-----------+----+---------+--------------+
|  id|eats_plants|  id|eats_meat|name_coalesced|
+----+-----------+----+---------+--------------+
|   2|        yes|null|     null|   caterpillar|
|   3|        yes|null|     null|          deer|
|   4|        yes|   4|      yes|         human|
|   1|        yes|null|     null|         koala|
|null|       null|   6|      yes|          lion|
|null|       null|   5|      yes|         shark|
|null|       null|   7|      yes|         tiger|
+----+-----------+----+---------+--------------+



The two name columns have been merged into name_coalesced and the originals dropped. The result still carries two id columns, which can be merged in the same way.

## Thanks and continue to follow for the next update in the same series!

## References

* https://www.educba.com/pyspark-withcolumn/
* https://medium.com/@manuzhang/the-hidden-cost-of-spark-withcolumn-8ffea517c015
* https://sparkbyexamples.com/pyspark/pyspark-withcolumn/
* https://sparkbyexamples.com/pyspark/pyspark-find-count-of-null-none-nan-values/
* https://stackoverflow.com/questions/53374140/if-dataframes-in-spark-are-immutable-why-are-we-able-to-modify-it-with-operatio