<a href="https://colab.research.google.com/github/vsubu1/PySpark_Tutorial/blob/main/PySparkTraining.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
### 1. **Select specific columns from a DataFrame**
###   Write a PySpark transformation to select only the columns `col1`, `col2`, and `col3` from a DataFrame.

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

spark = SparkSession.builder.master("local").appName("Select Columns").getOrCreate()

data = [("John",25, "USA"), ("Mike",35, "Japan"),("Peter",32,"USA")]
columns = ["Name","Age","Country"]
df = spark.createDataFrame(data,columns)
df.show()

selected_df = df.select("Name","Age")
selected_df.show()



+-----+---+-------+
| Name|Age|Country|
+-----+---+-------+
| John| 25|    USA|
| Mike| 35|  Japan|
|Peter| 32|    USA|
+-----+---+-------+

+-----+---+
| Name|Age|
+-----+---+
| John| 25|
| Mike| 35|
|Peter| 32|
+-----+---+



In [None]:
### 2. Write a PySpark transformation to rename column `old_name` to `new_name` in a DataFrame.
df_renamed = df.withColumnRenamed("Name","New Name")
df_renamed.show()

+--------+---+-------+
|New Name|Age|Country|
+--------+---+-------+
|    John| 25|    USA|
|    Mike| 35|  Japan|
|   Peter| 32|    USA|
+--------+---+-------+



In [None]:
### 3.  Add a new column 'new_column' with the constant value 100
df_newColumn = df.withColumn("Salary",lit(1000))
df_newColumn.show()

+-----+---+-------+------+
| Name|Age|Country|Salary|
+-----+---+-------+------+
| John| 25|    USA|  1000|
| Mike| 35|  Japan|  1000|
|Peter| 32|    USA|  1000|
+-----+---+-------+------+



In [None]:
### 4. Drop column
df_dropColumn = df_newColumn.drop("Salary")
df_dropColumn.show()


+-----+---+-------+
| Name|Age|Country|
+-----+---+-------+
| John| 25|    USA|
| Mike| 35|  Japan|
|Peter| 32|    USA|
+-----+---+-------+



In [None]:
### 5. **Perform an inner join on two DataFrames**
###   Write a PySpark transformation to perform an inner join between two DataFrames `df1` and `df2` based on the common column `id`.

spark= SparkSession.builder.master("local").appName("Inner join").getOrCreate()

data1 = [(1,"John","5000"), (2,"Peter",2000), (3,"Mike",3000),(1,"Britto",5000),(2,"Alpha",6000),(3,"Stanley",3000),(4,"Subu",1000)]
columns1 = ["id","name","salary"]

data2 = [(1,"Science"),(2,"Maths"),(3,"Engineering"),(5,"Medicine")]
columns2 = ["id","Department"]

df1 = spark.createDataFrame(data1,columns1)
df2 = spark.createDataFrame(data2,columns2)

idf = df1.join(df2,on="id",how="inner")
idf.show()



+---+-------+------+-----------+
| id|   name|salary| Department|
+---+-------+------+-----------+
|  1|   John|  5000|    Science|
|  1| Britto|  5000|    Science|
|  2|  Peter|  2000|      Maths|
|  2|  Alpha|  6000|      Maths|
|  3|   Mike|  3000|Engineering|
|  3|Stanley|  3000|Engineering|
+---+-------+------+-----------+



In [None]:
### 6. **Group by a column and calculate the average**
###   Write a PySpark transformation that groups a DataFrame by the column `department` and calculates the average salary for each department.

from pyspark.sql.functions import avg

idf.groupBy("Department").agg(avg("salary").alias("Avg Salary")).show()


+-----------+----------+
| Department|Avg Salary|
+-----------+----------+
|    Science|    5000.0|
|Engineering|    3000.0|
|      Maths|    4000.0|
+-----------+----------+



In [None]:
### 7. Write a PySpark transformation that returns distinct values from the `Department` column.

idf.select("id","department").distinct().show()

+---+-----------+
| id| department|
+---+-----------+
|  1|    Science|
|  2|      Maths|
|  3|Engineering|
+---+-----------+



In [None]:
### 8. **Apply a UDF to a column**
###  Write a PySpark transformation that applies a user-defined function (UDF) to the `salary` column to increase salary by $3000

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col

def hikeSalary(salary) :
  return(salary+3000)

# Register the UDF with the appropriate return type (IntegerType)
hikeSalaryUDF = udf(hikeSalary,IntegerType())

# Cast the 'salary' column to IntegerType before applying the UDF
hdf = idf.withColumn("newSalary",hikeSalary(col("salary").cast(IntegerType())))
hdf.show()


+---+-------+------+-----------+---------+
| id|   name|salary| Department|newSalary|
+---+-------+------+-----------+---------+
|  1|   John|  5000|    Science|     8000|
|  1| Britto|  5000|    Science|     8000|
|  2|  Peter|  2000|      Maths|     5000|
|  2|  Alpha|  6000|      Maths|     9000|
|  3|   Mike|  3000|Engineering|     6000|
|  3|Stanley|  3000|Engineering|     6000|
+---+-------+------+-----------+---------+



In [None]:
### 9. Write a PySpark transformation that calculates the length of the strings in the `name` column and adds the result as a new column `name_length`.

from pyspark.sql.functions import length
ldf = hdf.withColumn("name_length",length(col("name")))
ldf.show()



+---+-------+------+-----------+---------+-----------+
| id|   name|salary| Department|newSalary|name_length|
+---+-------+------+-----------+---------+-----------+
|  1|   John|  5000|    Science|     8000|          4|
|  1| Britto|  5000|    Science|     8000|          6|
|  2|  Peter|  2000|      Maths|     5000|          5|
|  2|  Alpha|  6000|      Maths|     9000|          5|
|  3|   Mike|  3000|Engineering|     6000|          4|
|  3|Stanley|  3000|Engineering|     6000|          7|
+---+-------+------+-----------+---------+-----------+



In [None]:
### 10. Write a PySpark transformation that creates a new column `status`, which is `True` if the value in the `salary` column is greater than 5000, else `False`.
from pyspark.sql.functions import when

cdf = ldf.withColumn("status",when(col("newSalary") > 5000,True).otherwise(False))
cdf.show()


+---+-------+------+-----------+---------+-----------+------+
| id|   name|salary| Department|newSalary|name_length|status|
+---+-------+------+-----------+---------+-----------+------+
|  1|   John|  5000|    Science|     8000|          4|  true|
|  1| Britto|  5000|    Science|     8000|          6|  true|
|  2|  Peter|  2000|      Maths|     5000|          5| false|
|  2|  Alpha|  6000|      Maths|     9000|          5|  true|
|  3|   Mike|  3000|Engineering|     6000|          4|  true|
|  3|Stanley|  3000|Engineering|     6000|          7|  true|
+---+-------+------+-----------+---------+-----------+------+



In [None]:
### 11. **Find the maximum value in a column**
###  Write a PySpark transformation to find the maximum value in the `salary` column.

from pyspark.sql.functions import max
cdf.agg(max("salary")).show()

+-----------+
|max(salary)|
+-----------+
|       6000|
+-----------+



In [None]:
### 12. **Perform a left outer, Right Outer and Inner joins**
### Write a PySpark transformation to perform a left outer join between two DataFrames `df1` and `df2` on the `id` column.

data1 = [(1,"John",2000),(2,"Peter",3000),(3,"Mike",4000),(4,"Subu",5000)]
columns1 = ["id","name","salary"]

data2 = [(1,"Science"),(2,"Maths"),(3,"Engineering"),(5,"Medicine")]
columns2 = ["id","Department"]

df1 = spark.createDataFrame(data1,columns1)
df2 = spark.createDataFrame(data2,columns2)

ldf = df1.join(df2,on="id",how="left")
ldf.show()

rdf = df1.join(df2,on="id",how="right")
rdf.show()

idf = df1.join(df2,on="id",how="inner")
idf.show()


+---+-----+------+-----------+
| id| name|salary| Department|
+---+-----+------+-----------+
|  1| John|  2000|    Science|
|  3| Mike|  4000|Engineering|
|  2|Peter|  3000|      Maths|
|  4| Subu|  5000|       NULL|
+---+-----+------+-----------+

+---+-----+------+-----------+
| id| name|salary| Department|
+---+-----+------+-----------+
|  5| NULL|  NULL|   Medicine|
|  1| John|  2000|    Science|
|  3| Mike|  4000|Engineering|
|  2|Peter|  3000|      Maths|
+---+-----+------+-----------+

+---+-----+------+-----------+
| id| name|salary| Department|
+---+-----+------+-----------+
|  1| John|  2000|    Science|
|  2|Peter|  3000|      Maths|
|  3| Mike|  4000|Engineering|
+---+-----+------+-----------+



In [None]:
### 13 Write a pyspark code to find second highest salary from a data frame using windowing functions

from pyspark.sql.functions import row_number, col, desc, asc
from pyspark.sql.window import Window

windowspec = Window.orderBy(col("Salary").desc())
wdf = idf.withColumn("RowId",row_number().over(windowspec))
wdf.show()

wdf.filter(col("RowId") == 2).show()



+---+-----+------+-----------+-----+
| id| name|salary| Department|RowId|
+---+-----+------+-----------+-----+
|  3| Mike|  4000|Engineering|    1|
|  2|Peter|  3000|      Maths|    2|
|  1| John|  2000|    Science|    3|
+---+-----+------+-----------+-----+

+---+-----+------+----------+-----+
| id| name|salary|Department|RowId|
+---+-----+------+----------+-----+
|  2|Peter|  3000|     Maths|    2|
+---+-----+------+----------+-----+



In [None]:
### 14 Write a pyspark code to find second highest salary from each department using windowing functions

data1 = [(1,"John","5000"), (2,"Peter",2000), (3,"Mike",4000),(1,"Britto",6000),(2,"Alpha",6000),(3,"Stanley",3000),(4,"Subu",1000)]
columns1 = ["id","name","salary"]

data2 = [(1,"Science"),(2,"Maths"),(3,"Engineering"),(5,"Medicine")]
columns2 = ["id","Department"]

df1 = spark.createDataFrame(data1,columns1)

windowspec = Window.partitionBy("id").orderBy(col("Salary").desc())
wdf = df1.withColumn("RowNumber",row_number().over(windowspec))
wdf.show()

wdf.filter(col("RowNumber")==2).show()


+---+-------+------+---------+
| id|   name|salary|RowNumber|
+---+-------+------+---------+
|  1| Britto|  6000|        1|
|  1|   John|  5000|        2|
|  2|  Alpha|  6000|        1|
|  2|  Peter|  2000|        2|
|  3|   Mike|  4000|        1|
|  3|Stanley|  3000|        2|
|  4|   Subu|  1000|        1|
+---+-------+------+---------+

+---+-------+------+---------+
| id|   name|salary|RowNumber|
+---+-------+------+---------+
|  1|   John|  5000|        2|
|  2|  Peter|  2000|        2|
|  3|Stanley|  3000|        2|
+---+-------+------+---------+



In [None]:
### 15. **Drop rows with missing values**
### Write a PySpark transformation that removes rows with `null` values in any column.

data1 = [(1,"John",2000),(2,"Peter",3000),(3,"Mike",4000),(4,"Subu",5000)]
columns1 = ["id","name","salary"]

data2 = [(1,"Science"),(2,"Maths"),(3,"Engineering"),(5,"Medicine")]
columns2 = ["id","Department"]

df1 = spark.createDataFrame(data1,columns1)
df2 = spark.createDataFrame(data2,columns2)

ldf = df1.join(df2,on="id",how="left")
ldf.show()

rdf = df1.join(df2,on="id",how="right")
rdf.show()

idf = df1.join(df2,on="id",how="inner")
idf.show()

ldf.na.drop().show()
rdf.na.drop().show()





+---+-----+------+-----------+
| id| name|salary| Department|
+---+-----+------+-----------+
|  1| John|  2000|    Science|
|  3| Mike|  4000|Engineering|
|  2|Peter|  3000|      Maths|
|  4| Subu|  5000|       NULL|
+---+-----+------+-----------+

+---+-----+------+-----------+
| id| name|salary| Department|
+---+-----+------+-----------+
|  5| NULL|  NULL|   Medicine|
|  1| John|  2000|    Science|
|  3| Mike|  4000|Engineering|
|  2|Peter|  3000|      Maths|
+---+-----+------+-----------+

+---+-----+------+-----------+
| id| name|salary| Department|
+---+-----+------+-----------+
|  1| John|  2000|    Science|
|  2|Peter|  3000|      Maths|
|  3| Mike|  4000|Engineering|
+---+-----+------+-----------+

+---+-----+------+-----------+
| id| name|salary| Department|
+---+-----+------+-----------+
|  1| John|  2000|    Science|
|  3| Mike|  4000|Engineering|
|  2|Peter|  3000|      Maths|
+---+-----+------+-----------+

+---+-----+------+-----------+
| id| name|salary| Department|
+---

In [None]:
### 16. **Convert a column to lowercase**
### Write a PySpark transformation that converts all the values in the column `name` to lowercase.

from pyspark.sql.functions import lower

xdf = ldf.withColumn("name",lower(col("name"))).withColumn("Department",lower(col("Department")))
xdf.show()


+---+-----+------+-----------+
| id| name|salary| Department|
+---+-----+------+-----------+
|  1| john|  2000|    science|
|  3| mike|  4000|engineering|
|  2|peter|  3000|      maths|
|  4| subu|  5000|       NULL|
+---+-----+------+-----------+



In [None]:
### 17. **Extract a substring from a column**
### Write a PySpark transformation that extracts the first 3 characters from the `name` column.

from pyspark.sql.functions import substring

xdf.withColumn("name",substring(col("name"),1,3)).show()


+---+----+------+-----------+
| id|name|salary| Department|
+---+----+------+-----------+
|  1| joh|  2000|    science|
|  3| mik|  4000|engineering|
|  2| pet|  3000|      maths|
|  4| sub|  5000|       NULL|
+---+----+------+-----------+



In [None]:
### 17. **Extract a replace  column values**
### Write a PySpark transformation that replaces NULL with a "Not Available"

xdf.na.fill("Not Available").show()


+---+-----+------+-------------+
| id| name|salary|   Department|
+---+-----+------+-------------+
|  1| john|  2000|      science|
|  3| mike|  4000|  engineering|
|  2|peter|  3000|        maths|
|  4| subu|  5000|Not Available|
+---+-----+------+-------------+



In [None]:
### 18. Write a PySpark transformation that uses explode function

from pyspark.sql.functions import explode
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("Explode Example").getOrCreate()

data = [("John", ["Java", "Python"]), ("Mike", ["C++", "Scala", "R"]), ("Peter", ["Spark"])]
columns = ["Name", "Languages"]
df = spark.createDataFrame(data, columns)
df.show()

exploded_df = df.select("Name", explode("Languages").alias("Language"))
exploded_df.show()

+-----+---------------+
| Name|      Languages|
+-----+---------------+
| John| [Java, Python]|
| Mike|[C++, Scala, R]|
|Peter|        [Spark]|
+-----+---------------+

+-----+--------+
| Name|Language|
+-----+--------+
| John|    Java|
| John|  Python|
| Mike|     C++|
| Mike|   Scala|
| Mike|       R|
|Peter|   Spark|
+-----+--------+



In [None]:
### 19. Write a PySpark code to explain all windowing functions with  a simple example each

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, rank, dense_rank, ntile, lag, lead, percent_rank, cume_dist, col, avg, sum, min, max

# Create a sample DataFrame for windowing examples
data_window = [
    ("Sales", "John", 5000),
    ("Sales", "Peter", 6000),
    ("Sales", "Mike", 5500),
    ("IT", "Alpha", 7000),
    ("IT", "Britto", 6500),
    ("IT", "Stanley", 7200),
    ("HR", "Subu", 4500),
    ("HR", "Mary", 4800)
]
columns_window = ["Department", "Name", "Salary"]
df_window = spark.createDataFrame(data_window, columns_window)
df_window.show()

# Define a window specification partitioned by Department and ordered by Salary in descending order
window_spec_salary_desc = Window.partitionBy("Department").orderBy(col("Salary").desc())

# 1. row_number()
# Assigns a sequential rank to each row within its partition, starting from 1.
df_window_rownumber = df_window.withColumn("row_number", row_number().over(window_spec_salary_desc))
print("row_number(): Assigns a sequential rank to each row within its partition.")
df_window_rownumber.show()

# 2. rank()
# Assigns a rank to each row within its partition. Rows with the same value receive the same rank, and there are gaps in the sequence if ranks are skipped.
df_window_rank = df_window.withColumn("rank", rank().over(window_spec_salary_desc))
print("rank(): Assigns a rank with gaps for ties.")
df_window_rank.show()

# 3. dense_rank()
# Assigns a rank to each row within its partition. Rows with the same value receive the same rank, and there are no gaps in the sequence.
df_window_dense_rank = df_window.withColumn("dense_rank", dense_rank().over(window_spec_salary_desc))
print("dense_rank(): Assigns a rank without gaps for ties.")
df_window_dense_rank.show()

# 4. ntile(n)
# Divides the rows within a partition into n approximately equal groups (buckets) and assigns a bucket number (from 1 to n) to each row.
# Let's divide each department into 2 groups based on salary
df_window_ntile = df_window.withColumn("ntile_2", ntile(2).over(window_spec_salary_desc))
print("ntile(n): Divides rows into n groups.")
df_window_ntile.show()

# Define a window specification ordered by Salary ascending for lag and lead
window_spec_salary_asc = Window.partitionBy("Department").orderBy(col("Salary").asc())

# 5. lag(column, offset, default)
# Returns the value of an expression from a preceding row within the partition.
# offset specifies how many rows back to look (default is 1).
# default specifies the value to return when the offset goes beyond the partition boundaries.
df_window_lag = df_window.withColumn("previous_salary", lag("Salary", 1, 0).over(window_spec_salary_asc))
print("lag(): Value from a preceding row.")
df_window_lag.show()

# 6. lead(column, offset, default)
# Returns the value of an expression from a subsequent row within the partition.
# offset specifies how many rows forward to look (default is 1).
# default specifies the value to return when the offset goes beyond the partition boundaries.
df_window_lead = df_window.withColumn("next_salary", lead("Salary", 1, 0).over(window_spec_salary_asc))
print("lead(): Value from a subsequent row.")
df_window_lead.show()

# 7. percent_rank()
# Calculates the percentile rank of a row within its partition. It is (rank - 1) / (total_rows - 1).
df_window_percent_rank = df_window.withColumn("percent_rank", percent_rank().over(window_spec_salary_desc))
print("percent_rank(): Percentile rank of a row.")
df_window_percent_rank.show()

# 8. cume_dist()
# Calculates the cumulative distribution of a value within its partition. It is the number of rows less than or equal to the current row divided by the total number of rows in the partition.
df_window_cume_dist = df_window.withColumn("cume_dist", cume_dist().over(window_spec_salary_desc))
print("cume_dist(): Cumulative distribution of a value.")
df_window_cume_dist.show()

# Aggregate Window Functions
# These functions perform aggregations (like sum, avg, min, max, count) over a window of rows.
# We can define different window frames for these.

# Define a window specification partitioned by Department (no ordering needed for some aggregates, but important for others like moving average)
window_spec_department = Window.partitionBy("Department")

# 9. avg()
# Calculates the average of a column within the window.
df_window_avg = df_window.withColumn("avg_dept_salary", avg("Salary").over(window_spec_department))
print("avg(): Average salary within the department.")
df_window_avg.show()

# 10. sum()
# Calculates the sum of a column within the window.
df_window_sum = df_window.withColumn("total_dept_salary", sum("Salary").over(window_spec_department))
print("sum(): Total salary within the department.")
df_window_sum.show()

# 11. min()
# Finds the minimum value of a column within the window.
df_window_min = df_window.withColumn("min_dept_salary", min("Salary").over(window_spec_department))
print("min(): Minimum salary within the department.")
df_window_min.show()

# 12. max()
# Finds the maximum value of a column within the window.
df_window_max = df_window.withColumn("max_dept_salary", max("Salary").over(window_spec_department))
print("max(): Maximum salary within the department.")
df_window_max.show()

# Example of a moving average using a window frame
# Calculate the average salary of the current row and the preceding row within each department, ordered by salary.
window_spec_moving_avg = Window.partitionBy("Department").orderBy("Salary").rowsBetween(-1, 0)
df_window_moving_avg = df_window.withColumn("moving_avg_salary", avg("Salary").over(window_spec_moving_avg))
print("avg() with moving window: Average salary of current and previous row.")
df_window_moving_avg.show()

spark.stop()


+----------+-------+------+
|Department|   Name|Salary|
+----------+-------+------+
|     Sales|   John|  5000|
|     Sales|  Peter|  6000|
|     Sales|   Mike|  5500|
|        IT|  Alpha|  7000|
|        IT| Britto|  6500|
|        IT|Stanley|  7200|
|        HR|   Subu|  4500|
|        HR|   Mary|  4800|
+----------+-------+------+

row_number(): Assigns a sequential rank to each row within its partition.
+----------+-------+------+----------+
|Department|   Name|Salary|row_number|
+----------+-------+------+----------+
|        HR|   Mary|  4800|         1|
|        HR|   Subu|  4500|         2|
|        IT|Stanley|  7200|         1|
|        IT|  Alpha|  7000|         2|
|        IT| Britto|  6500|         3|
|     Sales|  Peter|  6000|         1|
|     Sales|   Mike|  5500|         2|
|     Sales|   John|  5000|         3|
+----------+-------+------+----------+

rank(): Assigns a rank with gaps for ties.
+----------+-------+------+----+
|Department|   Name|Salary|rank|
+----------