<a href="https://colab.research.google.com/github/t-abs/Apache-Spark/blob/main/Pyspark_9.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Transformations (2) in Spark

In [23]:
from pyspark.sql import SparkSession


In [24]:
from pyspark.sql import SparkSession

# Reuse the active Spark session if there is one; otherwise start a new one.
spark = (
    SparkSession.builder
    .appName("Read CSV Example")
    .getOrCreate()
)

# Load the employees CSV: the first row supplies the column names and the
# column types are inferred from the data.
df = (
    spark.read
    .format("csv")
    .options(header="true", inferSchema="true")
    .load("/content/employees.csv")
)

# Preview the first rows, then print the inferred column types.
df.show()
df.printSchema()

+----------+------+----------+---------------+------+-------+-----------------+--------------------+
|First Name|Gender|Start Date|Last Login Time|Salary|Bonus %|Senior Management|                Team|
+----------+------+----------+---------------+------+-------+-----------------+--------------------+
|   Douglas|  Male|  8/6/1993|       12:42 PM| 97308|  6.945|             true|           Marketing|
|    Thomas|  Male| 3/31/1996|        6:53 AM| 61933|   4.17|             true|                NULL|
|     Maria|Female| 4/23/1993|       11:17 AM|130590| 11.858|            false|             Finance|
|     Jerry|  Male|  3/4/2005|        1:00 PM|138705|   9.34|             true|             Finance|
|     Larry|  Male| 1/24/1998|        4:47 PM|101004|  1.389|             true|     Client Services|
|    Dennis|  Male| 4/18/1987|        1:35 AM|115163| 10.125|            false|               Legal|
|      Ruby|Female| 8/17/1987|        4:20 PM| 65476| 10.012|             true|            

1. Alias

In [25]:
# `col` is imported here because the notebook's functions import sits in a
# later cell; without it a top-to-bottom run fails on this cell with NameError.
from pyspark.sql.functions import col

# Select two columns and rename them in the result with alias().
df.select(
    col("First Name").alias("fn"),
    col("Salary").alias("sn"),
).show()

+--------+------+
|      fn|    sn|
+--------+------+
| Douglas| 97308|
|  Thomas| 61933|
|   Maria|130590|
|   Jerry|138705|
|   Larry|101004|
|  Dennis|115163|
|    Ruby| 65476|
|    NULL| 45906|
|  Angela| 95570|
| Frances|139852|
|  Louise| 63241|
|   Julie|102508|
| Brandon|112807|
|    Gary|109831|
|Kimberly| 41426|
| Lillian| 59414|
|  Jeremy| 90370|
|   Shawn|111737|
|   Diana|132940|
|   Donna| 81014|
+--------+------+
only showing top 20 rows


In [22]:
from pyspark.sql.functions import *
from pyspark.sql.types import *


In [6]:
# Project a single column, referenced through the DataFrame itself.
df.select(df["First Name"]).show()

+----------+
|First Name|
+----------+
|   Douglas|
|    Thomas|
|     Maria|
|     Jerry|
|     Larry|
|    Dennis|
|      Ruby|
|      NULL|
|    Angela|
|   Frances|
|    Louise|
|     Julie|
|   Brandon|
|      Gary|
|  Kimberly|
|   Lillian|
|    Jeremy|
|     Shawn|
|     Diana|
|     Donna|
+----------+
only showing top 20 rows


2. Filter/Where - both are the same (`where()` is an alias for `filter()`)

In [39]:
# Keep only rows that satisfy both conditions; each comparison is
# parenthesised because `&` binds tighter than `>`.
df.filter(
    (col("Salary") > 15000)
    & (col("Bonus %") > 5)
).show()

+----------+------+----------+---------------+------+-------+-----------------+--------------------+
|First Name|Gender|Start Date|Last Login Time|Salary|Bonus %|Senior Management|                Team|
+----------+------+----------+---------------+------+-------+-----------------+--------------------+
|   Douglas|  Male|  8/6/1993|       12:42 PM| 97308|  6.945|             true|           Marketing|
|     Maria|Female| 4/23/1993|       11:17 AM|130590| 11.858|            false|             Finance|
|     Jerry|  Male|  3/4/2005|        1:00 PM|138705|   9.34|             true|             Finance|
|    Dennis|  Male| 4/18/1987|        1:35 AM|115163| 10.125|            false|               Legal|
|      Ruby|Female| 8/17/1987|        4:20 PM| 65476| 10.012|             true|             Product|
|      NULL|Female| 7/20/2015|       10:43 AM| 45906| 11.598|             NULL|             Finance|
|    Angela|Female|11/22/2005|        6:29 AM| 95570| 18.523|             true|         Eng

3. Adding columns

In [31]:
# Append a constant-valued "Surname" column after all existing columns.
df.select("*", lit("Singh").alias("Surname")).show()

+----------+------+----------+---------------+------+-------+-----------------+--------------------+-------+
|First Name|Gender|Start Date|Last Login Time|Salary|Bonus %|Senior Management|                Team|Surname|
+----------+------+----------+---------------+------+-------+-----------------+--------------------+-------+
|   Douglas|  Male|  8/6/1993|       12:42 PM| 97308|  6.945|             true|           Marketing|  Singh|
|    Thomas|  Male| 3/31/1996|        6:53 AM| 61933|   4.17|             true|                NULL|  Singh|
|     Maria|Female| 4/23/1993|       11:17 AM|130590| 11.858|            false|             Finance|  Singh|
|     Jerry|  Male|  3/4/2005|        1:00 PM|138705|   9.34|             true|             Finance|  Singh|
|     Larry|  Male| 1/24/1998|        4:47 PM|101004|  1.389|             true|     Client Services|  Singh|
|    Dennis|  Male| 4/18/1987|        1:35 AM|115163| 10.125|            false|               Legal|  Singh|
|      Ruby|Female|

4. Renaming columns

In [33]:
# Rename "First Name" to "fn"; every other column is left as it is.
df.withColumnRenamed(existing="First Name", new="fn").show()

+--------+------+----------+---------------+------+-------+-----------------+--------------------+
|      fn|Gender|Start Date|Last Login Time|Salary|Bonus %|Senior Management|                Team|
+--------+------+----------+---------------+------+-------+-----------------+--------------------+
| Douglas|  Male|  8/6/1993|       12:42 PM| 97308|  6.945|             true|           Marketing|
|  Thomas|  Male| 3/31/1996|        6:53 AM| 61933|   4.17|             true|                NULL|
|   Maria|Female| 4/23/1993|       11:17 AM|130590| 11.858|            false|             Finance|
|   Jerry|  Male|  3/4/2005|        1:00 PM|138705|   9.34|             true|             Finance|
|   Larry|  Male| 1/24/1998|        4:47 PM|101004|  1.389|             true|     Client Services|
|  Dennis|  Male| 4/18/1987|        1:35 AM|115163| 10.125|            false|               Legal|
|    Ruby|Female| 8/17/1987|        4:20 PM| 65476| 10.012|             true|             Product|
|    NULL|

5. Casting data type

In [40]:
# Add "Salary_as_int" holding Salary cast to integer, then print the
# resulting schema (astype is an alias of cast).
df.withColumn(
    "Salary_as_int",
    df["Salary"].astype("Integer"),
).printSchema()

root
 |-- First Name: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Start Date: string (nullable = true)
 |-- Last Login Time: string (nullable = true)
 |-- Salary: integer (nullable = true)
 |-- Bonus %: double (nullable = true)
 |-- Senior Management: boolean (nullable = true)
 |-- Team: string (nullable = true)
 |-- Salary_as_int: integer (nullable = true)



6. Removing columns

In [35]:
# Show the DataFrame without the "First Name" column.
df.drop(col("First Name")).show()

+------+----------+---------------+------+-------+-----------------+--------------------+
|Gender|Start Date|Last Login Time|Salary|Bonus %|Senior Management|                Team|
+------+----------+---------------+------+-------+-----------------+--------------------+
|  Male|  8/6/1993|       12:42 PM| 97308|  6.945|             true|           Marketing|
|  Male| 3/31/1996|        6:53 AM| 61933|   4.17|             true|                NULL|
|Female| 4/23/1993|       11:17 AM|130590| 11.858|            false|             Finance|
|  Male|  3/4/2005|        1:00 PM|138705|   9.34|             true|             Finance|
|  Male| 1/24/1998|        4:47 PM|101004|  1.389|             true|     Client Services|
|  Male| 4/18/1987|        1:35 AM|115163| 10.125|            false|               Legal|
|Female| 8/17/1987|        4:20 PM| 65476| 10.012|             true|             Product|
|Female| 7/20/2015|       10:43 AM| 45906| 11.598|             NULL|             Finance|
|Female|11

In [9]:
# Project two columns by name, passed as a single list.
df.select(["First Name", "Salary"]).show()

+----------+------+
|First Name|Salary|
+----------+------+
|   Douglas| 97308|
|    Thomas| 61933|
|     Maria|130590|
|     Jerry|138705|
|     Larry|101004|
|    Dennis|115163|
|      Ruby| 65476|
|      NULL| 45906|
|    Angela| 95570|
|   Frances|139852|
|    Louise| 63241|
|     Julie|102508|
|   Brandon|112807|
|      Gary|109831|
|  Kimberly| 41426|
|   Lillian| 59414|
|    Jeremy| 90370|
|     Shawn|111737|
|     Diana|132940|
|     Donna| 81014|
+----------+------+
only showing top 20 rows


In [36]:
# Register the DataFrame as a global temporary view so it can be queried
# with SQL as global_temp.emp_tb.
df.createOrReplaceGlobalTempView(name="emp_tb")

In [37]:
# Read every row and column back from the global temporary view.
spark.sql(sqlQuery="""
select * from global_temp.emp_tb
""").show()


+----------+------+----------+---------------+------+-------+-----------------+--------------------+
|First Name|Gender|Start Date|Last Login Time|Salary|Bonus %|Senior Management|                Team|
+----------+------+----------+---------------+------+-------+-----------------+--------------------+
|   Douglas|  Male|  8/6/1993|       12:42 PM| 97308|  6.945|             true|           Marketing|
|    Thomas|  Male| 3/31/1996|        6:53 AM| 61933|   4.17|             true|                NULL|
|     Maria|Female| 4/23/1993|       11:17 AM|130590| 11.858|            false|             Finance|
|     Jerry|  Male|  3/4/2005|        1:00 PM|138705|   9.34|             true|             Finance|
|     Larry|  Male| 1/24/1998|        4:47 PM|101004|  1.389|             true|     Client Services|
|    Dennis|  Male| 4/18/1987|        1:35 AM|115163| 10.125|            false|               Legal|
|      Ruby|Female| 8/17/1987|        4:20 PM| 65476| 10.012|             true|            

In [19]:
# Preview the DataFrame using show()'s defaults spelled out explicitly:
# the first 20 rows, with long cell values truncated.
df.show(n=20, truncate=True)

+----------+------+----------+---------------+------+-------+-----------------+--------------------+
|First Name|Gender|Start Date|Last Login Time|Salary|Bonus %|Senior Management|                Team|
+----------+------+----------+---------------+------+-------+-----------------+--------------------+
|   Douglas|  Male|  8/6/1993|       12:42 PM| 97308|  6.945|             true|           Marketing|
|    Thomas|  Male| 3/31/1996|        6:53 AM| 61933|   4.17|             true|                NULL|
|     Maria|Female| 4/23/1993|       11:17 AM|130590| 11.858|            false|             Finance|
|     Jerry|  Male|  3/4/2005|        1:00 PM|138705|   9.34|             true|             Finance|
|     Larry|  Male| 1/24/1998|        4:47 PM|101004|  1.389|             true|     Client Services|
|    Dennis|  Male| 4/18/1987|        1:35 AM|115163| 10.125|            false|               Legal|
|      Ruby|Female| 8/17/1987|        4:20 PM| 65476| 10.012|             true|            

In [41]:
# Project the salary column for rows whose salary exceeds 15000.
spark.sql(sqlQuery="""
select salary  from global_temp.emp_tb where salary > 15000
""").show()


+------+
|salary|
+------+
| 97308|
| 61933|
|130590|
|138705|
|101004|
|115163|
| 65476|
| 45906|
| 95570|
|139852|
| 63241|
|102508|
|112807|
|109831|
| 41426|
| 59414|
| 90370|
|111737|
|132940|
| 81014|
+------+
only showing top 20 rows


In [47]:
# Append two constant string columns to every row of the view.
# The literals are single-quoted (standard SQL): double-quoted text is parsed
# as an identifier, not a string, when Spark's ANSI
# `spark.sql.ansi.doubleQuotedIdentifiers` option is enabled.
# The alias `last name` needs backticks because it contains a space.
spark.sql("""
select *, 'kumar' as `last name`, 'id' as emp_id from global_temp.emp_tb
""").show()

+----------+------+----------+---------------+------+-------+-----------------+--------------------+---------+------+
|First Name|Gender|Start Date|Last Login Time|Salary|Bonus %|Senior Management|                Team|last name|emp_id|
+----------+------+----------+---------------+------+-------+-----------------+--------------------+---------+------+
|   Douglas|  Male|  8/6/1993|       12:42 PM| 97308|  6.945|             true|           Marketing|    kumar|    id|
|    Thomas|  Male| 3/31/1996|        6:53 AM| 61933|   4.17|             true|                NULL|    kumar|    id|
|     Maria|Female| 4/23/1993|       11:17 AM|130590| 11.858|            false|             Finance|    kumar|    id|
|     Jerry|  Male|  3/4/2005|        1:00 PM|138705|   9.34|             true|             Finance|    kumar|    id|
|     Larry|  Male| 1/24/1998|        4:47 PM|101004|  1.389|             true|     Client Services|    kumar|    id|
|    Dennis|  Male| 4/18/1987|        1:35 AM|115163| 10

In [48]:
# Print the schema of the view extended with two constant string columns.
# The literals are single-quoted (standard SQL): double-quoted text is parsed
# as an identifier, not a string, when Spark's ANSI
# `spark.sql.ansi.doubleQuotedIdentifiers` option is enabled.
# The alias `last name` needs backticks because it contains a space.
spark.sql("""
select *, 'kumar' as `last name`, 'id' as emp_id from global_temp.emp_tb
""").printSchema()

root
 |-- First Name: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Start Date: string (nullable = true)
 |-- Last Login Time: string (nullable = true)
 |-- Salary: integer (nullable = true)
 |-- Bonus %: double (nullable = true)
 |-- Senior Management: boolean (nullable = true)
 |-- Team: string (nullable = true)
 |-- last name: string (nullable = false)
 |-- emp_id: string (nullable = false)

