# New Section

In [1]:
!pip install pyspark




In [3]:
!spark-submit --version

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.5.1
      /_/
                        
Using Scala version 2.12.18, OpenJDK 64-Bit Server VM, 11.0.28
Branch HEAD
Compiled by user heartsavior on 2024-02-15T11:24:58Z
Revision fd86f85e181fc2dc0f50a096855acf83a6cc5d9c
Url https://github.com/apache/spark
Type --help for more information.


In [20]:
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('practice').config('spark.sql.shuffle.partition','4').enableHiveSupport().getOrCreate()
print(spark.conf.get('spark.app.name'))
print(spark.conf.get('spark.app.id'))
print(spark.conf.get('spark.sql.shuffle.partition'))
# print(spark.sparkContext.getConf().getAll())

practice
local-1755544272361
4


In [24]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

In [41]:
data = [("Alice", 25), ("Bob", 30), ("Charlie", 29)]
df = spark.createDataFrame(data, ["name", "age"])
# Tasks:

# Add 5 to age → future_age

# Filter age > 27

# Rename name → full_name

# Group by age → count


In [42]:
#1.1
from pyspark.sql.functions import col
df1=df.withColumn("age",col('age')+5)
df1.show()

+-------+---+
|   name|age|
+-------+---+
|  Alice| 30|
|    Bob| 35|
|Charlie| 34|
+-------+---+



In [43]:
#1.2
df2=df.filter(col('age')>27)
df2.show()

+-------+---+
|   name|age|
+-------+---+
|    Bob| 30|
|Charlie| 29|
+-------+---+



In [44]:
#1.3
df3=df.withColumnRenamed('name','full_name')
df3.show()

+---------+---+
|full_name|age|
+---------+---+
|    Alice| 25|
|      Bob| 30|
|  Charlie| 29|
+---------+---+



In [45]:
#1.4
df4=df.groupby('age').count()
df4.show()

+---+-----+
|age|count|
+---+-----+
| 25|    1|
| 29|    1|
| 30|    1|
+---+-----+



In [47]:
df5=df.groupby(col('age')).agg(F.count('age')).alias('count_of_age')
df5.show()

+---+----------+
|age|count(age)|
+---+----------+
| 25|         1|
| 29|         1|
| 30|         1|
+---+----------+



In [58]:
employees = [(1, "Alice", 1), (2, "Bob", 2), (3, "Charlie", 2), (4, "David", 3)]
departments = [(1, "HR"), (2, "IT"), (3, "Finance")]
df_emp = spark.createDataFrame(employees, ["id", "name", "dept_id"])
df_dept = spark.createDataFrame(departments, ["dept_id", "dept_name"])
df_emp.show(),df_dept.show()

+---+-------+-------+
| id|   name|dept_id|
+---+-------+-------+
|  1|  Alice|      1|
|  2|    Bob|      2|
|  3|Charlie|      2|
|  4|  David|      3|
+---+-------+-------+

+-------+---------+
|dept_id|dept_name|
+-------+---------+
|      1|       HR|
|      2|       IT|
|      3|  Finance|
+-------+---------+



(None, None)

In [53]:
# Tasks:

# Do an inner join on dept_id.

# Do a left join, so employees without a department are retained.

# Register both DataFrames as SQL tables and run this query:

# SELECT dept_name, AVG(id) as avg_id
# FROM employee e JOIN department d ON e.dept_id = d.dept_id
# GROUP BY dept_name


In [55]:
#2.1
joined_df = df_emp.join(df_dept,on='dept_id',how='inner')
joined_df.show()


+-------+---+-------+---------+
|dept_id| id|   name|dept_name|
+-------+---+-------+---------+
|      1|  1|  Alice|       HR|
|      2|  2|    Bob|       IT|
|      2|  3|Charlie|       IT|
|      3|  4|  David|  Finance|
+-------+---+-------+---------+



In [60]:
#2.2
all_emp_df = df_emp.join(df_dept,on='dept_id',how='left')
all_emp_df.show()

+-------+---+-------+---------+
|dept_id| id|   name|dept_name|
+-------+---+-------+---------+
|      1|  1|  Alice|       HR|
|      2|  2|    Bob|       IT|
|      3|  4|  David|  Finance|
|      2|  3|Charlie|       IT|
+-------+---+-------+---------+



In [63]:
#2.3
df_emp.createOrReplaceTempView('employee')
df_dept.createOrReplaceTempView('department')
df=spark.sql("SELECT dept_name, AVG(id) as avg_id \
FROM employee e JOIN department d ON e.dept_id = d.dept_id \
GROUP BY dept_name ")
df.show()

+---------+------+
|dept_name|avg_id|
+---------+------+
|       HR|   1.0|
|  Finance|   4.0|
|       IT|   2.5|
+---------+------+



In [65]:
salaries = [
    (1, "Alice", 5000, 1),
    (2, "Bob", 6000, 1),
    (3, "Charlie", 7000, 2),
    (4, "David", 8000, 2),
    (5, "Eve", 9000, 3)
]
df_sal = spark.createDataFrame(salaries, ["id", "name", "salary", "dept_id"])
df_sal.show()

+---+-------+------+-------+
| id|   name|salary|dept_id|
+---+-------+------+-------+
|  1|  Alice|  5000|      1|
|  2|    Bob|  6000|      1|
|  3|Charlie|  7000|      2|
|  4|  David|  8000|      2|
|  5|    Eve|  9000|      3|
+---+-------+------+-------+



In [66]:
# Tasks:

# Use a Window function to rank employees by salary within each dept.

# Extract the top 2 salaries per department.

In [85]:
#3.1
from pyspark.sql.window import Window as w
from pyspark.sql import functions as F
windowSpec=w.partitionBy('dept_id').orderBy(F.desc('salary'))
df_rank = df_sal.withColumn("rank",F.dense_rank().over(windowSpec))
df_rank.show()

+---+-------+------+-------+----+
| id|   name|salary|dept_id|rank|
+---+-------+------+-------+----+
|  2|    Bob|  6000|      1|   1|
|  1|  Alice|  5000|      1|   2|
|  4|  David|  8000|      2|   1|
|  3|Charlie|  7000|      2|   2|
|  5|    Eve|  9000|      3|   1|
+---+-------+------+-------+----+



In [83]:
#3.1
df_rank=df_sal.withColumn('rank',F.rank().over(w.partitionBy('dept_id').orderBy(F.desc('salary'))))
df_rank.show()

+---+-------+------+-------+----+
| id|   name|salary|dept_id|rank|
+---+-------+------+-------+----+
|  2|    Bob|  6000|      1|   1|
|  1|  Alice|  5000|      1|   2|
|  4|  David|  8000|      2|   1|
|  3|Charlie|  7000|      2|   2|
|  5|    Eve|  9000|      3|   1|
+---+-------+------+-------+----+



In [90]:
#3.2
df_rank_top_2=df_rank.select('dept_id','salary').filter(col('rank')<=2).distinct()
df_rank_top_2.show()

+-------+------+
|dept_id|salary|
+-------+------+
|      1|  6000|
|      1|  5000|
|      2|  8000|
|      2|  7000|
|      3|  9000|
+-------+------+

