### 01. Union and UnionByName Transformation

In [1]:
import findspark
findspark.init()

In [2]:
import pyspark
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.appName("PySpark_Practice_01").getOrCreate()

In [4]:
americans = spark.createDataFrame(
    [("bab", 42), ("lisa", 59)], ["first_name", "age"])

colombians = spark.createDataFrame(
    [("maria", 20), ("camilo", 31)], ["first_name", "age"])


In [5]:
res = americans.union(colombians)
res.show()

+----------+---+
|first_name|age|
+----------+---+
|       bab| 42|
|      lisa| 59|
|     maria| 20|
|    camilo| 31|
+----------+---+



In [6]:
data_frame1 = spark.createDataFrame(
                [("Nitya", 82.98), ("Abhishek", 80.31)],
                ["Student Name", "Overall Percentage"]
)

data_frame1.show()

+------------+------------------+
|Student Name|Overall Percentage|
+------------+------------------+
|       Nitya|             82.98|
|    Abhishek|             80.31|
+------------+------------------+



In [7]:
data_frame2 = spark.createDataFrame(
                [(91.123, "Naveen"), (90.51, "Sandeep"), (87.67, "Rakesh")],
                ["Overall Percentage", "Student Name"]
)

data_frame2.show()

+------------------+------------+
|Overall Percentage|Student Name|
+------------------+------------+
|            91.123|      Naveen|
|             90.51|     Sandeep|
|             87.67|      Rakesh|
+------------------+------------+



In [8]:
byName = data_frame1.unionByName(data_frame2)
byName.show()

+------------+------------------+
|Student Name|Overall Percentage|
+------------+------------------+
|       Nitya|             82.98|
|    Abhishek|             80.31|
|      Naveen|            91.123|
|     Sandeep|             90.51|
|      Rakesh|             87.67|
+------------+------------------+



In [9]:
data_frame3 = spark.createDataFrame(
            [("Bhuwanesh", 82.98, "Computer Science"), ("Harshit", 80.31, "Information Technology")],
            ["Student Name", "Overall Percentage", "Department"]
)

data_frame3.show()

+------------+------------------+--------------------+
|Student Name|Overall Percentage|          Department|
+------------+------------------+--------------------+
|   Bhuwanesh|             82.98|    Computer Science|
|     Harshit|             80.31|Information Techn...|
+------------+------------------+--------------------+



In [10]:
data_frame4 = spark.createDataFrame( 
    [("Naveen", 91.123), ("Piyush", 90.51)], 
    ["Student Name", "Overall Percentage"] )

data_frame4.show()

+------------+------------------+
|Student Name|Overall Percentage|
+------------+------------------+
|      Naveen|            91.123|
|      Piyush|             90.51|
+------------+------------------+



In [11]:
merged_df = data_frame3.unionByName(data_frame4, allowMissingColumns=True)
merged_df.show()

+------------+------------------+--------------------+
|Student Name|Overall Percentage|          Department|
+------------+------------------+--------------------+
|   Bhuwanesh|             82.98|    Computer Science|
|     Harshit|             80.31|Information Techn...|
|      Naveen|            91.123|                NULL|
|      Piyush|             90.51|                NULL|
+------------+------------------+--------------------+



### 02. PySpark Window Ranking Functions

In [12]:
from pyspark.sql.window import Window

In [13]:
data = (("Nitya", 28, "Sales", 3000),
        ("Abhishek", 33, "Sales", 4600),
        ("Sandeep", 40, "Sales", 4100),
        ("Rakesh", 25, "Finance", 3000),
        ("Ram", 28, "Sales", 3000),
        ("Srishti", 46, "Management", 3300),
        ("Arbind", 26, "Finance", 3900),
        ("Hitesh", 30, "Marketing", 3000),
        ("Kailash", 29, "Marketing", 2000),
        ("Sushma", 39, "Sales", 4100)
        )

In [14]:
schema = ["Employee_Name", "Age","Department", "Salary"]

In [15]:
df = spark.createDataFrame(data, schema)

df.show()

+-------------+---+----------+------+
|Employee_Name|Age|Department|Salary|
+-------------+---+----------+------+
|        Nitya| 28|     Sales|  3000|
|     Abhishek| 33|     Sales|  4600|
|      Sandeep| 40|     Sales|  4100|
|       Rakesh| 25|   Finance|  3000|
|          Ram| 28|     Sales|  3000|
|      Srishti| 46|Management|  3300|
|       Arbind| 26|   Finance|  3900|
|       Hitesh| 30| Marketing|  3000|
|      Kailash| 29| Marketing|  2000|
|       Sushma| 39|     Sales|  4100|
+-------------+---+----------+------+



In [16]:
windowPartition = Window.partitionBy("Department").orderBy("Age")

In [17]:
df.printSchema()

root
 |-- Employee_Name: string (nullable = true)
 |-- Age: long (nullable = true)
 |-- Department: string (nullable = true)
 |-- Salary: long (nullable = true)



In [18]:
from pyspark.sql.functions import cume_dist

In [19]:
df.withColumn("cume_dist", cume_dist().over(windowPartition)).show()

+-------------+---+----------+------+---------+
|Employee_Name|Age|Department|Salary|cume_dist|
+-------------+---+----------+------+---------+
|       Rakesh| 25|   Finance|  3000|      0.5|
|       Arbind| 26|   Finance|  3900|      1.0|
|      Srishti| 46|Management|  3300|      1.0|
|      Kailash| 29| Marketing|  2000|      0.5|
|       Hitesh| 30| Marketing|  3000|      1.0|
|        Nitya| 28|     Sales|  3000|      0.4|
|          Ram| 28|     Sales|  3000|      0.4|
|     Abhishek| 33|     Sales|  4600|      0.6|
|       Sushma| 39|     Sales|  4100|      0.8|
|      Sandeep| 40|     Sales|  4100|      1.0|
+-------------+---+----------+------+---------+



In [20]:
from pyspark.sql.functions import lag

In [21]:
df.withColumn("Lag", lag("Salary", 2).over(windowPartition)).show()

+-------------+---+----------+------+----+
|Employee_Name|Age|Department|Salary| Lag|
+-------------+---+----------+------+----+
|       Rakesh| 25|   Finance|  3000|NULL|
|       Arbind| 26|   Finance|  3900|NULL|
|      Srishti| 46|Management|  3300|NULL|
|      Kailash| 29| Marketing|  2000|NULL|
|       Hitesh| 30| Marketing|  3000|NULL|
|        Nitya| 28|     Sales|  3000|NULL|
|          Ram| 28|     Sales|  3000|NULL|
|     Abhishek| 33|     Sales|  4600|3000|
|       Sushma| 39|     Sales|  4100|3000|
|      Sandeep| 40|     Sales|  4100|4600|
+-------------+---+----------+------+----+



In [22]:
from pyspark.sql.functions import lead

In [23]:
df.withColumn("Lead", lead("Salary", 2).over(windowPartition)).show()

+-------------+---+----------+------+----+
|Employee_Name|Age|Department|Salary|Lead|
+-------------+---+----------+------+----+
|       Rakesh| 25|   Finance|  3000|NULL|
|       Arbind| 26|   Finance|  3900|NULL|
|      Srishti| 46|Management|  3300|NULL|
|      Kailash| 29| Marketing|  2000|NULL|
|       Hitesh| 30| Marketing|  3000|NULL|
|        Nitya| 28|     Sales|  3000|4600|
|          Ram| 28|     Sales|  3000|4100|
|     Abhishek| 33|     Sales|  4600|4100|
|       Sushma| 39|     Sales|  4100|NULL|
|      Sandeep| 40|     Sales|  4100|NULL|
+-------------+---+----------+------+----+



### 03. rank(), row_number(), dense_rank()  Function

In [24]:
sampleData = ((101, "Ram", "Biology", 80),
                (103, "Sita", "Social Science", 78),
                (104, "Lakshman", "Sanskrit", 58),
                (102, "Kunal", "Phisycs", 89),
                (101, "Ram", "Biology", 80),
                (106, "Srishti", "Maths", 70),
                (108, "Sandeep", "Physics", 75),
                (107, "Hitesh", "Maths", 88),
                (109, "Kailash", "Maths", 90),
                (105, "Abhishek", "Social Science", 84)
                )

columns = ["Roll_No", "Student_Name", "Subject", "Marks"]

In [25]:
df2 = spark.createDataFrame(sampleData, columns)

df2.show()

+-------+------------+--------------+-----+
|Roll_No|Student_Name|       Subject|Marks|
+-------+------------+--------------+-----+
|    101|         Ram|       Biology|   80|
|    103|        Sita|Social Science|   78|
|    104|    Lakshman|      Sanskrit|   58|
|    102|       Kunal|       Phisycs|   89|
|    101|         Ram|       Biology|   80|
|    106|     Srishti|         Maths|   70|
|    108|     Sandeep|       Physics|   75|
|    107|      Hitesh|         Maths|   88|
|    109|     Kailash|         Maths|   90|
|    105|    Abhishek|Social Science|   84|
+-------+------------+--------------+-----+



In [26]:
windowPartition2 = Window.partitionBy("Subject").orderBy("Marks")

In [27]:
df2.printSchema()

root
 |-- Roll_No: long (nullable = true)
 |-- Student_Name: string (nullable = true)
 |-- Subject: string (nullable = true)
 |-- Marks: long (nullable = true)



In [28]:
from pyspark.sql.functions import row_number

In [31]:
df2.withColumn("row_number", row_number().over(windowPartition2)).show()

+-------+------------+--------------+-----+----------+
|Roll_No|Student_Name|       Subject|Marks|row_number|
+-------+------------+--------------+-----+----------+
|    101|         Ram|       Biology|   80|         1|
|    101|         Ram|       Biology|   80|         2|
|    106|     Srishti|         Maths|   70|         1|
|    107|      Hitesh|         Maths|   88|         2|
|    109|     Kailash|         Maths|   90|         3|
|    102|       Kunal|       Phisycs|   89|         1|
|    108|     Sandeep|       Physics|   75|         1|
|    104|    Lakshman|      Sanskrit|   58|         1|
|    103|        Sita|Social Science|   78|         1|
|    105|    Abhishek|Social Science|   84|         2|
+-------+------------+--------------+-----+----------+



In [33]:
from pyspark.sql.functions import rank
df2.withColumn("rank", rank().over(windowPartition2)).show()

+-------+------------+--------------+-----+----+
|Roll_No|Student_Name|       Subject|Marks|rank|
+-------+------------+--------------+-----+----+
|    101|         Ram|       Biology|   80|   1|
|    101|         Ram|       Biology|   80|   1|
|    106|     Srishti|         Maths|   70|   1|
|    107|      Hitesh|         Maths|   88|   2|
|    109|     Kailash|         Maths|   90|   3|
|    102|       Kunal|       Phisycs|   89|   1|
|    108|     Sandeep|       Physics|   75|   1|
|    104|    Lakshman|      Sanskrit|   58|   1|
|    103|        Sita|Social Science|   78|   1|
|    105|    Abhishek|Social Science|   84|   2|
+-------+------------+--------------+-----+----+



In [34]:
from pyspark.sql.functions import dense_rank
df2.withColumn("rank", dense_rank().over(windowPartition2)).show()

+-------+------------+--------------+-----+----+
|Roll_No|Student_Name|       Subject|Marks|rank|
+-------+------------+--------------+-----+----+
|    101|         Ram|       Biology|   80|   1|
|    101|         Ram|       Biology|   80|   1|
|    106|     Srishti|         Maths|   70|   1|
|    107|      Hitesh|         Maths|   88|   2|
|    109|     Kailash|         Maths|   90|   3|
|    102|       Kunal|       Phisycs|   89|   1|
|    108|     Sandeep|       Physics|   75|   1|
|    104|    Lakshman|      Sanskrit|   58|   1|
|    103|        Sita|Social Science|   78|   1|
|    105|    Abhishek|Social Science|   84|   2|
+-------+------------+--------------+-----+----+



### 04. Aggregate Functions

In [35]:
emp_data = (("Ram", "Sales", 3000),
            ("Meena", "Sales", 4600),
            ("Abhishek", "Sales", 4100),
            ("Kunal", "Finance", 3000),
            ("Ram", "Sales", 3000),
            ("Srishti", "Management", 3300),
            ("Sandeep", "Finance", 3900),
            ("Hitesh", "Marketing", 3000),
            ("Kailash", "Marketing", 2000),
            ("Shyam", "Sales", 4100)
            )

col = ["Employee_name", "Department", "Salary"]

In [36]:
df3 = spark.createDataFrame(emp_data, col)
df3.show()

+-------------+----------+------+
|Employee_name|Department|Salary|
+-------------+----------+------+
|          Ram|     Sales|  3000|
|        Meena|     Sales|  4600|
|     Abhishek|     Sales|  4100|
|        Kunal|   Finance|  3000|
|          Ram|     Sales|  3000|
|      Srishti|Management|  3300|
|      Sandeep|   Finance|  3900|
|       Hitesh| Marketing|  3000|
|      Kailash| Marketing|  2000|
|        Shyam|     Sales|  4100|
+-------------+----------+------+



In [37]:
df3.printSchema()

root
 |-- Employee_name: string (nullable = true)
 |-- Department: string (nullable = true)
 |-- Salary: long (nullable = true)



In [38]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, avg, sum, min, max, row_number

In [39]:
windowPartitionAgg = Window.partitionBy("Department")

In [40]:
df3.withColumn("Avg", avg(col("salary")).over(windowPartitionAgg)).show()

+-------------+----------+------+------+
|Employee_name|Department|Salary|   Avg|
+-------------+----------+------+------+
|        Kunal|   Finance|  3000|3450.0|
|      Sandeep|   Finance|  3900|3450.0|
|      Srishti|Management|  3300|3300.0|
|       Hitesh| Marketing|  3000|2500.0|
|      Kailash| Marketing|  2000|2500.0|
|          Ram|     Sales|  3000|3760.0|
|        Meena|     Sales|  4600|3760.0|
|     Abhishek|     Sales|  4100|3760.0|
|          Ram|     Sales|  3000|3760.0|
|        Shyam|     Sales|  4100|3760.0|
+-------------+----------+------+------+



In [41]:
df3.withColumn("Sum", sum(col("salary")).over(windowPartitionAgg)).show()

+-------------+----------+------+-----+
|Employee_name|Department|Salary|  Sum|
+-------------+----------+------+-----+
|        Kunal|   Finance|  3000| 6900|
|      Sandeep|   Finance|  3900| 6900|
|      Srishti|Management|  3300| 3300|
|       Hitesh| Marketing|  3000| 5000|
|      Kailash| Marketing|  2000| 5000|
|          Ram|     Sales|  3000|18800|
|        Meena|     Sales|  4600|18800|
|     Abhishek|     Sales|  4100|18800|
|          Ram|     Sales|  3000|18800|
|        Shyam|     Sales|  4100|18800|
+-------------+----------+------+-----+



In [42]:
df3.withColumn("Min", min(col("salary")).over(windowPartitionAgg)).show()

+-------------+----------+------+----+
|Employee_name|Department|Salary| Min|
+-------------+----------+------+----+
|        Kunal|   Finance|  3000|3000|
|      Sandeep|   Finance|  3900|3000|
|      Srishti|Management|  3300|3300|
|       Hitesh| Marketing|  3000|2000|
|      Kailash| Marketing|  2000|2000|
|          Ram|     Sales|  3000|3000|
|        Meena|     Sales|  4600|3000|
|     Abhishek|     Sales|  4100|3000|
|          Ram|     Sales|  3000|3000|
|        Shyam|     Sales|  4100|3000|
+-------------+----------+------+----+



In [43]:
df3.withColumn("Max", max(col("salary")).over(windowPartitionAgg)).show()

+-------------+----------+------+----+
|Employee_name|Department|Salary| Max|
+-------------+----------+------+----+
|        Kunal|   Finance|  3000|3900|
|      Sandeep|   Finance|  3900|3900|
|      Srishti|Management|  3300|3300|
|       Hitesh| Marketing|  3000|3000|
|      Kailash| Marketing|  2000|3000|
|          Ram|     Sales|  3000|4600|
|        Meena|     Sales|  4600|4600|
|     Abhishek|     Sales|  4100|4600|
|          Ram|     Sales|  3000|4600|
|        Shyam|     Sales|  4100|4600|
+-------------+----------+------+----+



### 05. date and timestamp functions in timestamp