In [None]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 317.0/317.0 MB 2.8 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=4b9780870db051fde985fd4a293a9a9764753e8075647007ef6d0082a7d2fa5e
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [None]:
import pyspark

In [None]:
from pyspark.sql import SparkSession

# One SparkSession for the notebook; getOrCreate() hands back the running
# session when one already exists instead of starting a second one.
spark = (
    SparkSession.builder
    .appName("emp_join")
    .getOrCreate()
)

# Load both employee files. Despite the rdd1/rdd2 names these are DataFrames:
# the first line of each file is treated as the header and column types are inferred.
rdd1 = spark.read.options(header=True, inferSchema=True).csv("emp1.txt")
rdd2 = spark.read.options(header=True, inferSchema=True).csv("emp2.txt")


Q1 — Inner join of emp1 and emp2 on `emp_id`

In [None]:
# Inner join on the shared emp_id key. Passing the key by name keeps a single
# emp_id column in the output instead of one from each side.
result = rdd1.join(rdd2, on="emp_id", how="inner")

# Render the joined rows (show() prints up to 20 rows by default)
result.show()

+------+--------+--------------------+----------+--------------+
|emp_id|emp_name|            emp_dept|emp_salary|emp_experience|
+------+--------+--------------------+----------+--------------+
|     1|    John|   Digital Marketing|     50000|             2|
|     2|   Alice|Software Development|     65000|             5|
|     3|     Bob|        Data Science|     70000|             3|
|     4| Charlie|     Human Resources|     55000|             1|
|     5|     Eve|             Finance|     60000|             4|
|     6| Michael|Software Development|     80000|             6|
|     7|  Olivia|        Data Science|     90000|             3|
|     8|  Sophia|             Finance|     75000|             2|
|     9|    Liam|        Data Science|     55000|             4|
|    10|   Emily|Software Development|     70000|             5|
|    11|   James|             Finance|     60000|             3|
|    12|Benjamin|   Digital Marketing|     45000|             1|
|    13|    Emma|Software

Q2 — Left outer join on `emp_id` (keep every emp1 row)

In [None]:
# Left outer join on emp_id: every rdd1 row is kept, and rows with no match in
# rdd2 get nulls in rdd2's columns ("left" is Spark's alias for "leftouter").
result = rdd1.join(rdd2, on="emp_id", how="left")

# Render the joined rows
result.show()

+------+--------+--------------------+----------+--------------+
|emp_id|emp_name|            emp_dept|emp_salary|emp_experience|
+------+--------+--------------------+----------+--------------+
|     1|    John|   Digital Marketing|     50000|             2|
|     2|   Alice|Software Development|     65000|             5|
|     3|     Bob|        Data Science|     70000|             3|
|     4| Charlie|     Human Resources|     55000|             1|
|     5|     Eve|             Finance|     60000|             4|
|     6| Michael|Software Development|     80000|             6|
|     7|  Olivia|        Data Science|     90000|             3|
|     8|  Sophia|             Finance|     75000|             2|
|     9|    Liam|        Data Science|     55000|             4|
|    10|   Emily|Software Development|     70000|             5|
|    11|   James|             Finance|     60000|             3|
|    12|Benjamin|   Digital Marketing|     45000|             1|
|    13|    Emma|Software

Q3 — Total salary per employee (`emp_salary` × `emp_experience`)

In [None]:
# Total salary = emp_salary * emp_experience for each employee present in both files.
joined = rdd1.join(rdd2, on=["emp_id"])
result = joined.select(
    "emp_id",
    "emp_name",
    "emp_dept",
    # The parentheses matter: .alias() must wrap the whole product. Without them the
    # alias binds only to the cast, and the output column is named
    # "(emp_salary * CAST(emp_experience AS DOUBLE) AS total_salary)".
    (joined["emp_salary"] * joined["emp_experience"].cast("double")).alias("total_salary"),
)

# Display the result
result.show()

+------+--------+--------------------+-------------------------------------------------------------+
|emp_id|emp_name|            emp_dept|(emp_salary * CAST(emp_experience AS DOUBLE) AS total_salary)|
+------+--------+--------------------+-------------------------------------------------------------+
|     1|    John|   Digital Marketing|                                                     100000.0|
|     2|   Alice|Software Development|                                                     325000.0|
|     3|     Bob|        Data Science|                                                     210000.0|
|     4| Charlie|     Human Resources|                                                      55000.0|
|     5|     Eve|             Finance|                                                     240000.0|
|     6| Michael|Software Development|                                                     480000.0|
|     7|  Olivia|        Data Science|                                                     

Q4 — Broadcast join (emp1 shipped to executors as a lookup table)

In [None]:
from pyspark.sql import SparkSession
from pyspark.broadcast import Broadcast
from pyspark.sql.types import StructType

# SparkSession (getOrCreate reuses the session started earlier in the notebook, if any)
spark = SparkSession.builder.appName("broadcast_join").getOrCreate()

# Read the CSV files into DataFrames
rdd1 = spark.read.csv("emp1.txt", header=True, inferSchema=True)
rdd2 = spark.read.csv("emp2.txt", header=True, inferSchema=True)

# Broadcast the first DataFrame as an {emp_id: Row} lookup table. collectAsMap() pulls
# every rdd1 row to the driver, so this approach only suits a small table.
# (The DataFrame-API equivalent would be rdd2.join(pyspark.sql.functions.broadcast(rdd1), ...).)
broadcast_rdd1 = spark.sparkContext.broadcast(rdd1.rdd.map(lambda x: (x["emp_id"], x)).collectAsMap())


def join_with_broadcast(row):
    """Return (emp_id, emp_salary, emp_name, emp_dept) for one rdd2 row.

    emp_name/emp_dept come from the broadcast rdd1 table. Ids missing from rdd1
    get None for both (left-join semantics).
    """
    match = broadcast_rdd1.value.get(row["emp_id"])  # one lookup per row, not two
    if match is None:
        return (row["emp_id"], row["emp_salary"], None, None)
    return (row["emp_id"], row["emp_salary"], match["emp_name"], match["emp_dept"])


# Take the output schema from the source DataFrames instead of letting toDF() infer it
# from sampled rows, which fails when the sampled emp_name/emp_dept values are all None.
result_schema = StructType([
    rdd2.schema["emp_id"],
    rdd2.schema["emp_salary"],
    rdd1.schema["emp_name"],
    rdd1.schema["emp_dept"],
])

# Perform the broadcast join on the emp_id column
result = spark.createDataFrame(rdd2.rdd.map(join_with_broadcast), schema=result_schema)

# Display the result
result.show()

+------+----------+--------+--------------------+
|emp_id|emp_salary|emp_name|            emp_dept|
+------+----------+--------+--------------------+
|     1|     50000|    John|   Digital Marketing|
|     2|     65000|   Alice|Software Development|
|     3|     70000|     Bob|        Data Science|
|     4|     55000| Charlie|     Human Resources|
|     5|     60000|     Eve|             Finance|
|     6|     80000| Michael|Software Development|
|     7|     90000|  Olivia|        Data Science|
|     8|     75000|  Sophia|             Finance|
|     9|     55000|    Liam|        Data Science|
|    10|     70000|   Emily|Software Development|
|    11|     60000|   James|             Finance|
|    12|     45000|Benjamin|   Digital Marketing|
|    13|     85000|    Emma|Software Development|
|    14|     75000|     Ava|        Data Science|
|    15|     70000|Isabella|     Human Resources|
|    16|     80000|   Lucas|             Finance|
|    17|     55000|   Avery|Software Development|


Q5 — Summing 1–10 with an accumulator

In [None]:
from pyspark import SparkConf, SparkContext

# Reuse the active SparkContext, or start a local one if none exists. The SparkSession
# created earlier already owns a context, and constructing SparkContext(...) directly
# raises "ValueError: Cannot run multiple SparkContexts at once", so this cell failed
# on Restart & Run All. NOTE: if a context is already running, this conf is ignored.
sc = SparkContext.getOrCreate(SparkConf().setMaster("local").setAppName("Accumulator app"))
rdd = sc.parallelize(range(1, 11))

# Define accumulator: tasks add to it; only the driver reads the total via .value.
accumulator = sc.accumulator(0)


def update_acc(x):
    """Add one RDD element to the shared accumulator (runs inside Spark tasks)."""
    accumulator.add(x)


# foreach is an action, so each task's accumulator update is applied exactly once.
rdd.foreach(update_acc)

# Print final value of accumulator (1 + 2 + ... + 10)
print("\nSum of Values using Accumulator:", accumulator.value)


Sum of Values using Accumulator: 55
