In [0]:
# Execute Manipulating, Dropping, Sorting, Aggregations, Joining, GroupBy in DataFrames
import pyspark
from pyspark.sql import SparkSession

# Start the Spark session for this notebook (getOrCreate reuses one if it already exists).
spark = (
    SparkSession.builder
    .appName("Manipulation in dataframes")
    .getOrCreate()
)

# Small in-memory sample: one tuple per person, in the same order as `columns`.
columns = ["Name", "Age", "Gender", "Salary"]
data = [
    ("Mitushi", 22, "F", 1000),
    ("Vishesh", 24, "M", 2000),
    ("Tanisha", 22, "F", 3000),
    ("Zamran", 38, "M", 5000),
]
df = spark.createDataFrame(data, schema=columns)
df.show()

+-------+---+------+------+
|   Name|Age|Gender|Salary|
+-------+---+------+------+
|Mitushi| 22|     F|  1000|
|Vishesh| 24|     M|  2000|
|Tanisha| 22|     F|  3000|
| Zamran| 38|     M|  5000|
+-------+---+------+------+



In [0]:
# Execute Manipulating 
# Adding new column using withColumn() with lit()
from pyspark.sql.functions import lit 

# lit() turns the Python string into a constant column, so every row gets "India".
df_newcolumn = df.withColumn(colName="Country", col=lit("India"))
df_newcolumn.show()

+-------+---+------+------+-------+
|   Name|Age|Gender|Salary|Country|
+-------+---+------+------+-------+
|Mitushi| 22|     F|  1000|  India|
|Vishesh| 24|     M|  2000|  India|
|Tanisha| 22|     F|  3000|  India|
| Zamran| 38|     M|  5000|  India|
+-------+---+------+------+-------+



In [0]:
# Filtering data with filter(): keep only the rows whose Age is strictly above 22.
age_above_22 = df.Age > 22
df_filter = df.filter(age_above_22)
df_filter.show()

+-------+---+------+------+
|   Name|Age|Gender|Salary|
+-------+---+------+------+
|Vishesh| 24|     M|  2000|
| Zamran| 38|     M|  5000|
+-------+---+------+------+



In [0]:
# Selecting columns: project the frame down to the Name column only.
df.select("Name").show()

+-------+
|   Name|
+-------+
|Mitushi|
|Vishesh|
|Tanisha|
| Zamran|
+-------+



In [0]:
# Execute Dropping
# Load the jobs CSV: the first line supplies the column names and the
# column types are inferred from the data.
csv_file = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('/FileStore/tables/jobs_in_data___Copy.csv')
)
csv_file.show()

+---------+--------------------+------+------------------+------------+----------------+
|work_year|           job_title|salary|employee_residence|work_setting|company_location|
+---------+--------------------+------+------------------+------------+----------------+
|     2023|Data DevOps Engineer| 88000|           Germany|      Hybrid|         Germany|
|     2023|      Data Architect|186000|     United States|   In-person|   United States|
|     2023|      Data Architect| 81800|              null|   In-person|   United States|
|     2023|      Data Scientist|212000|     United States|   In-person|   United States|
|     2023|                null| 93300|     United States|   In-person|   United States|
|     2023|      Data Scientist|130000|     United States|      Remote|   United States|
|     2023|      Data Scientist|100000|     United States|      Remote|            null|
|     null|Machine Learning ...|  null|     United States|   In-person|   United States|
|     2023|Machine Le

In [0]:
# Dropping columns: remove work_year and work_setting from the result.
# Columns are passed by name: PySpark releases before 3.4 raise TypeError when
# several Column objects are given to drop(), while names work on every version.
csv_file.drop('work_year', 'work_setting').show()

+--------------------+------+------------------+----------------+
|           job_title|salary|employee_residence|company_location|
+--------------------+------+------------------+----------------+
|Data DevOps Engineer| 88000|           Germany|         Germany|
|      Data Architect|186000|     United States|   United States|
|      Data Architect| 81800|              null|   United States|
|      Data Scientist|212000|     United States|   United States|
|                null| 93300|     United States|   United States|
|      Data Scientist|130000|     United States|   United States|
|      Data Scientist|100000|     United States|            null|
|Machine Learning ...|  null|     United States|   United States|
|Machine Learning ...|138700|     United States|   United States|
|       Data Engineer|210000|     United States|   United States|
|                null|168000|     United States|   United States|
|Machine Learning ...|224400|     United States|   United States|
|Machine L

In [0]:
# Dropping null values: discard every row that has a null in any column.
csv_file.dropna().show()

+---------+--------------------+------+------------------+------------+----------------+
|work_year|           job_title|salary|employee_residence|work_setting|company_location|
+---------+--------------------+------+------------------+------------+----------------+
|     2023|Data DevOps Engineer| 88000|           Germany|      Hybrid|         Germany|
|     2023|      Data Architect|186000|     United States|   In-person|   United States|
|     2023|      Data Scientist|212000|     United States|   In-person|   United States|
|     2023|      Data Scientist|130000|     United States|      Remote|   United States|
|     2023|       Data Engineer|210000|     United States|      Remote|   United States|
|     2023|Machine Learning ...|224400|     United States|   In-person|   United States|
|     2023|Machine Learning ...|138700|     United States|   In-person|   United States|
|     2023|      Data Scientist| 35000|    United Kingdom|   In-person|  United Kingdom|
|     2023|      Data

In [0]:
# Sorting
# sort(): names in descending order
df.sort('Name', ascending=False).show()
# orderBy(): ages in ascending order (the default direction, spelled out here)
df.orderBy(df['Age'].asc()).show()

+-------+---+------+------+
|   Name|Age|Gender|Salary|
+-------+---+------+------+
| Zamran| 38|     M|  5000|
|Vishesh| 24|     M|  2000|
|Tanisha| 22|     F|  3000|
|Mitushi| 22|     F|  1000|
+-------+---+------+------+

+-------+---+------+------+
|   Name|Age|Gender|Salary|
+-------+---+------+------+
|Mitushi| 22|     F|  1000|
|Tanisha| 22|     F|  3000|
|Vishesh| 24|     M|  2000|
| Zamran| 38|     M|  5000|
+-------+---+------+------+



In [0]:
# GroupBy and Aggregations
# Total salary per job title. show() shortens long strings, which is why
# distinct long titles can look identical in the printed table.
(
    csv_file
    .groupBy('job_title')
    .sum('salary')
    .show()
)

+--------------------+-----------+
|           job_title|sum(salary)|
+--------------------+-----------+
|                null|     495300|
|Machine Learning ...|     363100|
|      Data Scientist|     807000|
|        Data Analyst|     170000|
|Data DevOps Engineer|      88000|
|      Data Architect|     267800|
|Machine Learning ...|     138700|
|       Data Engineer|     210000|
+--------------------+-----------+



In [0]:
# Lowest salary recorded for each job title.
(
    csv_file
    .groupBy('job_title')
    .min('salary')
    .show()
)

+--------------------+-----------+
|           job_title|min(salary)|
+--------------------+-----------+
|                null|      93300|
|Machine Learning ...|     138700|
|      Data Scientist|      30000|
|        Data Analyst|      75000|
|Data DevOps Engineer|      88000|
|      Data Architect|      81800|
|Machine Learning ...|     138700|
|       Data Engineer|     210000|
+--------------------+-----------+



In [0]:
# Average salary for each job title.
(
    csv_file
    .groupBy('job_title')
    .avg('salary')
    .show()
)

+--------------------+-----------+
|           job_title|avg(salary)|
+--------------------+-----------+
|                null|   165100.0|
|Machine Learning ...|   181550.0|
|      Data Scientist|   134500.0|
|        Data Analyst|    85000.0|
|Data DevOps Engineer|    88000.0|
|      Data Architect|   133900.0|
|Machine Learning ...|   138700.0|
|       Data Engineer|   210000.0|
+--------------------+-----------+



In [0]:
# Number of rows for each job title.
(
    csv_file
    .groupBy('job_title')
    .count()
    .show()
)

+--------------------+-----+
|           job_title|count|
+--------------------+-----+
|                null|    3|
|Machine Learning ...|    2|
|      Data Scientist|    6|
|        Data Analyst|    2|
|Data DevOps Engineer|    1|
|      Data Architect|    2|
|Machine Learning ...|    2|
|       Data Engineer|    1|
+--------------------+-----+



In [0]:
# Joins in pyspark
# Employee sample data. Note that emp_dept_id is stored as a string ("10")
# while dept_id below is an integer (10), so the join conditions in the
# following cells compare a string column with an integer column.
empColumns = [
    "emp_id", "name", "superior_emp_id", "year_joined",
    "emp_dept_id", "gender", "salary",
]
emp = [
    (1, "Smith", -1, "2018", "10", "M", 3000),
    (2, "Rose", 1, "2010", "20", "M", 4000),
    (3, "Williams", 1, "2010", "10", "M", 1000),
    (4, "Jones", 2, "2005", "10", "F", 2000),
    (5, "Brown", 2, "2010", "40", "", -1),
    (6, "Brown", 2, "2010", "50", "", -1),
]
empDF = spark.createDataFrame(emp, schema=empColumns)
empDF.show()

# Department lookup table: (dept_name, dept_id).
deptColumns = ["dept_name", "dept_id"]
dept = [
    ("Finance", 10),
    ("Marketing", 20),
    ("Sales", 30),
    ("IT", 40),
]
deptDF = spark.createDataFrame(dept, schema=deptColumns)
deptDF.show()

+------+--------+---------------+-----------+-----------+------+------+
|emp_id|    name|superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|     1|   Smith|             -1|       2018|         10|     M|  3000|
|     2|    Rose|              1|       2010|         20|     M|  4000|
|     3|Williams|              1|       2010|         10|     M|  1000|
|     4|   Jones|              2|       2005|         10|     F|  2000|
|     5|   Brown|              2|       2010|         40|      |    -1|
|     6|   Brown|              2|       2010|         50|      |    -1|
+------+--------+---------------+-----------+-----------+------+------+

+---------+-------+
|dept_name|dept_id|
+---------+-------+
|  Finance|     10|
|Marketing|     20|
|    Sales|     30|
|       IT|     40|
+---------+-------+



In [0]:
# Inner join: only employee/department pairs whose ids match are returned.
(
    empDF
    .join(deptDF, on=empDF["emp_dept_id"] == deptDF["dept_id"], how="inner")
    .show()
)

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|    name|superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|     1|   Smith|             -1|       2018|         10|     M|  3000|  Finance|     10|
|     3|Williams|              1|       2010|         10|     M|  1000|  Finance|     10|
|     4|   Jones|              2|       2005|         10|     F|  2000|  Finance|     10|
|     2|    Rose|              1|       2010|         20|     M|  4000|Marketing|     20|
|     5|   Brown|              2|       2010|         40|      |    -1|       IT|     40|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+



In [0]:
# Full outer join: unmatched rows from either side are kept, padded with nulls.
(
    empDF
    .join(deptDF, on=empDF["emp_dept_id"] == deptDF["dept_id"], how="outer")
    .show()
)

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|    name|superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|     1|   Smith|             -1|       2018|         10|     M|  3000|  Finance|     10|
|     3|Williams|              1|       2010|         10|     M|  1000|  Finance|     10|
|     4|   Jones|              2|       2005|         10|     F|  2000|  Finance|     10|
|     2|    Rose|              1|       2010|         20|     M|  4000|Marketing|     20|
|  null|    null|           null|       null|       null|  null|  null|    Sales|     30|
|     5|   Brown|              2|       2010|         40|      |    -1|       IT|     40|
|     6|   Brown|              2|       2010|         50|      |    -1|     null|   null|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+



In [0]:
# Left join: every employee is kept; department columns are null when there is no match.
(
    empDF
    .join(deptDF, on=empDF["emp_dept_id"] == deptDF["dept_id"], how="left")
    .show()
)

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|    name|superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|     1|   Smith|             -1|       2018|         10|     M|  3000|  Finance|     10|
|     2|    Rose|              1|       2010|         20|     M|  4000|Marketing|     20|
|     3|Williams|              1|       2010|         10|     M|  1000|  Finance|     10|
|     4|   Jones|              2|       2005|         10|     F|  2000|  Finance|     10|
|     5|   Brown|              2|       2010|         40|      |    -1|       IT|     40|
|     6|   Brown|              2|       2010|         50|      |    -1|     null|   null|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+



In [0]:
# Right join: every department is kept; employee columns are null when there is no match.
(
    empDF
    .join(deptDF, on=empDF["emp_dept_id"] == deptDF["dept_id"], how="right")
    .show()
)

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|    name|superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|     4|   Jones|              2|       2005|         10|     F|  2000|  Finance|     10|
|     3|Williams|              1|       2010|         10|     M|  1000|  Finance|     10|
|     1|   Smith|             -1|       2018|         10|     M|  3000|  Finance|     10|
|     2|    Rose|              1|       2010|         20|     M|  4000|Marketing|     20|
|  null|    null|           null|       null|       null|  null|  null|    Sales|     30|
|     5|   Brown|              2|       2010|         40|      |    -1|       IT|     40|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+



In [0]:
# Left semi join: employees that have a matching department, employee columns only.
(
    empDF
    .join(deptDF, on=empDF["emp_dept_id"] == deptDF["dept_id"], how="leftsemi")
    .show()
)

+------+--------+---------------+-----------+-----------+------+------+
|emp_id|    name|superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|     1|   Smith|             -1|       2018|         10|     M|  3000|
|     3|Williams|              1|       2010|         10|     M|  1000|
|     4|   Jones|              2|       2005|         10|     F|  2000|
|     2|    Rose|              1|       2010|         20|     M|  4000|
|     5|   Brown|              2|       2010|         40|      |    -1|
+------+--------+---------------+-----------+-----------+------+------+



In [0]:
# Left anti join: employees that have NO matching department, employee columns only.
(
    empDF
    .join(deptDF, on=empDF["emp_dept_id"] == deptDF["dept_id"], how="leftanti")
    .show()
)

+------+-----+---------------+-----------+-----------+------+------+
|emp_id| name|superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+-----+---------------+-----------+-----------+------+------+
|     6|Brown|              2|       2010|         50|      |    -1|
+------+-----+---------------+-----------+-----------+------+------+



In [0]:
# Execute Pyspark - sparkSQL joins & Applying Functions in a Pandas DataFrame
# Spark SQL joins
# Register both DataFrames as temporary views so they can be queried by name in SQL.
empDF.createOrReplaceTempView("EmployeeView")
deptDF.createOrReplaceTempView("DeptView")

# Read each view back through SQL to confirm its contents.
employee_rows = spark.sql("SELECT * from EmployeeView")
employee_rows.show()
dept_rows = spark.sql("SELECT * from DeptView")
dept_rows.show()

+------+--------+---------------+-----------+-----------+------+------+
|emp_id|    name|superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|     1|   Smith|             -1|       2018|         10|     M|  3000|
|     2|    Rose|              1|       2010|         20|     M|  4000|
|     3|Williams|              1|       2010|         10|     M|  1000|
|     4|   Jones|              2|       2005|         10|     F|  2000|
|     5|   Brown|              2|       2010|         40|      |    -1|
|     6|   Brown|              2|       2010|         50|      |    -1|
+------+--------+---------------+-----------+-----------+------+------+

+---------+-------+
|dept_name|dept_id|
+---------+-------+
|  Finance|     10|
|Marketing|     20|
|    Sales|     30|
|       IT|     40|
+---------+-------+



In [0]:
# Inner join in Spark SQL: departments paired with their employees.
# The aliases are written with the same casing where they are declared and
# where they are used, so the query does not depend on case-insensitive name
# resolution, and the standard SQL `=` comparison is used.
spark.sql(
    "SELECT * FROM DeptView deptDF "
    "JOIN EmployeeView empDF "
    "ON empDF.emp_dept_id = deptDF.dept_id"
).show()

+---------+-------+------+--------+---------------+-----------+-----------+------+------+
|dept_name|dept_id|emp_id|    name|superior_emp_id|year_joined|emp_dept_id|gender|salary|
+---------+-------+------+--------+---------------+-----------+-----------+------+------+
|  Finance|     10|     1|   Smith|             -1|       2018|         10|     M|  3000|
|  Finance|     10|     3|Williams|              1|       2010|         10|     M|  1000|
|  Finance|     10|     4|   Jones|              2|       2005|         10|     F|  2000|
|Marketing|     20|     2|    Rose|              1|       2010|         20|     M|  4000|
|       IT|     40|     5|   Brown|              2|       2010|         40|      |    -1|
+---------+-------+------+--------+---------------+-----------+-----------+------+------+



In [0]:
# Left join in Spark SQL: every department is kept; employee columns are null
# for departments without employees.
# The aliases are written with the same casing where they are declared and
# where they are used, so the query does not depend on case-insensitive name
# resolution, and the standard SQL `=` comparison is used.
spark.sql(
    "SELECT * FROM DeptView deptDF "
    "LEFT JOIN EmployeeView empDF "
    "ON empDF.emp_dept_id = deptDF.dept_id"
).show()

+---------+-------+------+--------+---------------+-----------+-----------+------+------+
|dept_name|dept_id|emp_id|    name|superior_emp_id|year_joined|emp_dept_id|gender|salary|
+---------+-------+------+--------+---------------+-----------+-----------+------+------+
|  Finance|     10|     4|   Jones|              2|       2005|         10|     F|  2000|
|  Finance|     10|     3|Williams|              1|       2010|         10|     M|  1000|
|  Finance|     10|     1|   Smith|             -1|       2018|         10|     M|  3000|
|Marketing|     20|     2|    Rose|              1|       2010|         20|     M|  4000|
|    Sales|     30|  null|    null|           null|       null|       null|  null|  null|
|       IT|     40|     5|   Brown|              2|       2010|         40|      |    -1|
+---------+-------+------+--------+---------------+-----------+-----------+------+------+



In [0]:
# Right join in Spark SQL: every employee is kept; department columns are null
# for employees whose department id has no match.
# The aliases are written with the same casing where they are declared and
# where they are used, so the query does not depend on case-insensitive name
# resolution, and the standard SQL `=` comparison is used.
spark.sql(
    "SELECT * FROM DeptView deptDF "
    "RIGHT JOIN EmployeeView empDF "
    "ON empDF.emp_dept_id = deptDF.dept_id"
).show()

+---------+-------+------+--------+---------------+-----------+-----------+------+------+
|dept_name|dept_id|emp_id|    name|superior_emp_id|year_joined|emp_dept_id|gender|salary|
+---------+-------+------+--------+---------------+-----------+-----------+------+------+
|  Finance|     10|     1|   Smith|             -1|       2018|         10|     M|  3000|
|Marketing|     20|     2|    Rose|              1|       2010|         20|     M|  4000|
|  Finance|     10|     3|Williams|              1|       2010|         10|     M|  1000|
|  Finance|     10|     4|   Jones|              2|       2005|         10|     F|  2000|
|       IT|     40|     5|   Brown|              2|       2010|         40|      |    -1|
|     null|   null|     6|   Brown|              2|       2010|         50|      |    -1|
+---------+-------+------+--------+---------------+-----------+-----------+------+------+



In [0]:
# Full outer join in Spark SQL: unmatched departments and unmatched employees
# are both kept, padded with nulls.
# The aliases are written with the same casing where they are declared and
# where they are used, so the query does not depend on case-insensitive name
# resolution, and the standard SQL `=` comparison is used.
spark.sql(
    "SELECT * FROM DeptView deptDF "
    "FULL OUTER JOIN EmployeeView empDF "
    "ON empDF.emp_dept_id = deptDF.dept_id"
).show()

+---------+-------+------+--------+---------------+-----------+-----------+------+------+
|dept_name|dept_id|emp_id|    name|superior_emp_id|year_joined|emp_dept_id|gender|salary|
+---------+-------+------+--------+---------------+-----------+-----------+------+------+
|  Finance|     10|     1|   Smith|             -1|       2018|         10|     M|  3000|
|  Finance|     10|     3|Williams|              1|       2010|         10|     M|  1000|
|  Finance|     10|     4|   Jones|              2|       2005|         10|     F|  2000|
|Marketing|     20|     2|    Rose|              1|       2010|         20|     M|  4000|
|    Sales|     30|  null|    null|           null|       null|       null|  null|  null|
|       IT|     40|     5|   Brown|              2|       2010|         40|      |    -1|
|     null|   null|     6|   Brown|              2|       2010|         50|      |    -1|
+---------+-------+------+--------+---------------+-----------+-----------+------+------+



In [0]:
# Left anti join in Spark SQL: departments that have no employee at all
# (only the department columns are returned).
# The aliases are written with the same casing where they are declared and
# where they are used, so the query does not depend on case-insensitive name
# resolution, and the standard SQL `=` comparison is used.
spark.sql(
    "SELECT * FROM DeptView deptDF "
    "LEFT ANTI JOIN EmployeeView empDF "
    "ON empDF.emp_dept_id = deptDF.dept_id"
).show()

+---------+-------+
|dept_name|dept_id|
+---------+-------+
|    Sales|     30|
+---------+-------+



In [0]:
# Left semi join in Spark SQL: departments that have at least one employee
# (only the department columns are returned).
# The aliases are written with the same casing where they are declared and
# where they are used, so the query does not depend on case-insensitive name
# resolution, and the standard SQL `=` comparison is used.
spark.sql(
    "SELECT * FROM DeptView deptDF "
    "LEFT SEMI JOIN EmployeeView empDF "
    "ON empDF.emp_dept_id = deptDF.dept_id"
).show()

+---------+-------+
|dept_name|dept_id|
+---------+-------+
|  Finance|     10|
|Marketing|     20|
|       IT|     40|
+---------+-------+



In [0]:
# Applying Functions in a Pandas DataFrame
import pyspark.pandas as ps
# Two-column pandas-on-Spark frame used to demonstrate apply().
pandasdf = ps.DataFrame(
    {
        'a': [1, 2, 3],
        'b': [4, 5, 6],
    }
)
def pandas_plus(pser):
    """Add one to ``pser`` and return the result."""
    incremented = pser + 1
    return incremented
# Run pandas_plus over the frame; axis=0 is apply()'s default, spelled out here.
# The bare expression is left last so the notebook renders the resulting frame.
pandasdf.apply(pandas_plus, axis=0)

Unnamed: 0,a,b
0,2,5
1,3,6
2,4,7


In [0]:
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType

@pandas_udf(DoubleType())
def square_udf(age_series: pd.Series) -> pd.Series:
    """Square every value of ``age_series`` in a single vectorized operation.

    Args:
        age_series: A batch of values from the input column, as a pandas Series.

    Returns:
        A float64 Series of the same length holding the squared values, which
        matches the ``DoubleType`` declared in the decorator.
    """
    # Cast up front so the result dtype agrees with the declared DoubleType,
    # then square the whole Series at once instead of calling a Python lambda
    # for each element.
    return age_series.astype("float64") ** 2

# Apply the Pandas UDF to a column
# NOTE(review): this rebinds `df`, so cells that use `df` will include the
# AgeSquared column if they are re-run after this one.
df = df.withColumn("AgeSquared", square_udf(df["Age"]))
df.show()

+-------+---+------+------+----------+
|   Name|Age|Gender|Salary|AgeSquared|
+-------+---+------+------+----------+
|Mitushi| 22|     F|  1000|     484.0|
|Vishesh| 24|     M|  2000|     576.0|
|Tanisha| 22|     F|  3000|     484.0|
| Zamran| 38|     M|  5000|    1444.0|
+-------+---+------+------+----------+

