# Operators in PySpark

### **select():** 
Projects specific columns from a DataFrame.

### **filter():** 
Filters rows based on a condition.

### **groupBy():** 
Groups data based on specific columns.

### **agg():** 
Performs aggregate functions on grouped data.

### **orderBy():** 
Sorts the DataFrame based on specified columns.

### **withColumn():** 
Adds a new column or replaces an existing one.

### **drop():** 
Removes one or more columns from the DataFrame.

### **distinct():** 
Returns distinct rows from the DataFrame.

### **dropDuplicates():** 
Drops duplicate rows, optionally comparing only a subset of columns.

### **limit():** 
Returns a new DataFrame containing at most the given number of rows.

In [1]:
# Import Spark packages
from pyspark.sql import SparkSession

In [2]:
# Create a Spark session (or reuse the active one)
spark = SparkSession.builder.getOrCreate()

In [3]:
spark

In [4]:
# Read the employee CSV into a DataFrame: first row is the header, column types are inferred
df = spark.read.csv('data/emp.csv', header=True, inferSchema=True)
df.show()

+------+----------+-----------------+----------+--------------+-------------+-------------+
|emp_id|  emp_name|  emp_designation|emp_salary|emp_department|emp_join_date| emp_location|
+------+----------+-----------------+----------+--------------+-------------+-------------+
|   101|John Smith|Software Engineer|     75000|            IT|   15-01-2022|     New York|
|   102|  Jane Doe|     Data Analyst|     60000|     Analytics|   20-08-2021|San Francisco|
|   103|Mike Brown|  Product Manager|     90000|       Product|   10-05-2023|       London|
|   104|Lisa Green|       HR Manager|     85000|            HR|   05-11-2020|         NULL|
+------+----------+-----------------+----------+--------------+-------------+-------------+



In [5]:
empDF = spark.read.csv('data/emp.csv', header=True, inferSchema=True)

In [6]:
empDF

DataFrame[emp_id: int, emp_name: string, emp_designation: string, emp_salary: int, emp_department: string, emp_join_date: string, emp_location: string]

In [7]:
print("The Type of object",type(empDF))

The Type of object <class 'pyspark.sql.dataframe.DataFrame'>


In [18]:
empDF.show()

+------+----------+-----------------+----------+--------------+-------------+-------------+
|emp_id|  emp_name|  emp_designation|emp_salary|emp_department|emp_join_date| emp_location|
+------+----------+-----------------+----------+--------------+-------------+-------------+
|   101|John Smith|Software Engineer|     75000|            IT|   15-01-2022|     New York|
|   102|  Jane Doe|     Data Analyst|     60000|     Analytics|   20-08-2021|San Francisco|
|   103|Mike Brown|  Product Manager|     90000|       Product|   10-05-2023|       London|
|   104|Lisa Green|       HR Manager|     85000|            HR|   05-11-2020|         null|
+------+----------+-----------------+----------+--------------+-------------+-------------+




# 1. select() Operator:   
The `select()` operator projects specific columns from a DataFrame and returns a new DataFrame. The cells below first select the employee name and department, then add the salary.

In [8]:
selected_df = df.select("emp_name", "emp_department")

selected_df.show()

+----------+--------------+
|  emp_name|emp_department|
+----------+--------------+
|John Smith|            IT|
|  Jane Doe|     Analytics|
|Mike Brown|       Product|
|Lisa Green|            HR|
+----------+--------------+



In [9]:
select_empDF = empDF.select("emp_name","emp_department","emp_salary")

In [10]:
print(type(select_empDF))

<class 'pyspark.sql.dataframe.DataFrame'>


In [11]:
select_empDF.show()

+----------+--------------+----------+
|  emp_name|emp_department|emp_salary|
+----------+--------------+----------+
|John Smith|            IT|     75000|
|  Jane Doe|     Analytics|     60000|
|Mike Brown|       Product|     90000|
|Lisa Green|            HR|     85000|
+----------+--------------+----------+



# 2. filter() Operator: 
The `filter()` operator keeps only the rows that satisfy a condition. The first cell below keeps rows where the salary is greater than 70000.

In [None]:
filtered_df = df.filter(df.emp_salary > 70000)

filtered_df.show()


In [47]:
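# && and || are not valid Python: combine Column conditions with & (and) or | (or), each in parentheses
filtered_empDF = select_empDF.filter((select_empDF.emp_salary > 100000) & (select_empDF.emp_department == 'IT'))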

In [48]:
print(type(filtered_empDF))

<class 'pyspark.sql.dataframe.DataFrame'>


In [49]:
filtered_empDF.show()

+--------+--------------+----------+
|emp_name|emp_department|emp_salary|
+--------+--------------+----------+
+--------+--------------+----------+



# 3. groupBy() and agg() Operators:
The `groupBy()` operator is used to group data based on specific columns, and the `agg()` operator is used to perform aggregate functions on grouped data.

In [13]:
empDF.show()

+------+----------+-----------------+----------+--------------+-------------+-------------+
|emp_id|  emp_name|  emp_designation|emp_salary|emp_department|emp_join_date| emp_location|
+------+----------+-----------------+----------+--------------+-------------+-------------+
|   101|John Smith|Software Engineer|     75000|            IT|   15-01-2022|     New York|
|   102|  Jane Doe|     Data Analyst|     60000|     Analytics|   20-08-2021|San Francisco|
|   103|Mike Brown|  Product Manager|     90000|       Product|   10-05-2023|       London|
|   104|Lisa Green|       HR Manager|     85000|            HR|   05-11-2020|         NULL|
+------+----------+-----------------+----------+--------------+-------------+-------------+



In [12]:
from pyspark.sql import functions as F
# Grouping data by department and calculating average salary
grouped_df = df.groupBy("emp_department").agg(F.avg("emp_salary").alias("avg_salary"))
grouped_df.show()

+--------------+----------+
|emp_department|avg_salary|
+--------------+----------+
|            HR|   85000.0|
|     Analytics|   60000.0|
|            IT|   75000.0|
|       Product|   90000.0|
+--------------+----------+



# 4. orderBy() Operator:
The `orderBy()` operator sorts the DataFrame by the specified columns, in ascending order by default. The cell below sorts the DataFrame by employee name in ascending order.


In [None]:
sorted_df = df.orderBy("emp_name")

sorted_df.show()

In [56]:
# Note: importing min and max by name shadows Python's built-in min() and max() in this session
from pyspark.sql.functions import avg, min, max

In [None]:
# Compute the average, minimum, and maximum salary over the whole DataFrame

In [57]:
# show() prints the result and returns None, so its return value is not assigned
empDF.select(avg(empDF.emp_salary)).show()

+---------------+
|avg(emp_salary)|
+---------------+
|        77500.0|
+---------------+



In [58]:
empDF.select(min(empDF.emp_salary)).show()

+---------------+
|min(emp_salary)|
+---------------+
|          60000|
+---------------+



In [59]:
empDF.select(max(empDF.emp_salary)).show()

+---------------+
|max(emp_salary)|
+---------------+
|          90000|
+---------------+

