1. PySpark Setup & Initialization

In [1]:
from pyspark.sql import SparkSession

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("BotCampus Intermediate Session") \
    .master("local[*]") \
    .getOrCreate()

# Load starter data
data = [("Ananya", "Bangalore", 24),
        ("Ravi", "Hyderabad", 28),
        ("Kavya", "Delhi", 22),
        ("Meena", "Chennai", 25)]
columns = ["name", "city", "age"]

df = spark.createDataFrame(data, columns)
df.show()


+------+---------+---+
|  name|     city|age|
+------+---------+---+
|Ananya|Bangalore| 24|
|  Ravi|Hyderabad| 28|
| Kavya|    Delhi| 22|
| Meena|  Chennai| 25|
+------+---------+---+



2. RDDs & Transformations

In [2]:
# Create feedback RDD
feedback = spark.sparkContext.parallelize([
    "Ravi from Bangalore loved the mobile app",
    "Meena from Delhi reported poor response time",
    "Ajay from Pune liked the delivery speed",
    "Ananya from Hyderabad had an issue with UI",
    "Rohit from Mumbai gave positive feedback"
])


In [3]:
# a) Count total number of words
word_count = feedback.flatMap(lambda line: line.split()).count()
print("Total words:", word_count)

Total words: 35


In [4]:
# b) Top 3 most common words
from collections import Counter
words = feedback.flatMap(lambda line: line.lower().split()).collect()
top3 = Counter(words).most_common(3)
print("Top 3 common words:", top3)

Top 3 common words: [('from', 5), ('the', 2), ('ravi', 1)]


In [5]:
# c) Remove stop words
stop_words = {"from", "with", "the", "an", "had", "and", "of"}
filtered_words = [word for word in words if word not in stop_words]

In [None]:
# d) Dictionary of word → count
word_dict = dict(Counter(filtered_words))
print("Word frequency without stop words:", word_dict)

3. DataFrames Transformations

In [6]:
from pyspark.sql.functions import when, avg, rank, col, upper
from pyspark.sql.window import Window

# Create DataFrame
scores = [
    ("Ravi", "Math", 88),
    ("Ananya", "Science", 92),
    ("Kavya", "English", 79),
    ("Ravi", "English", 67),
    ("Neha", "Math", 94),
    ("Meena", "Science", 85)
]
columns = ["name", "subject", "score"]
df_scores = spark.createDataFrame(scores, columns)

In [7]:
# a) Add grade column
df_scores = df_scores.withColumn("grade", when(col("score") >= 90, "A")
                                  .when((col("score") >= 80), "B")
                                  .when((col("score") >= 70), "C")
                                  .otherwise("D"))

In [8]:
# b) Average score per subject
df_scores.groupBy("subject").agg(avg("score").alias("avg_score")).show()

+-------+---------+
|subject|avg_score|
+-------+---------+
|Science|     88.5|
|   Math|     91.0|
|English|     73.0|
+-------+---------+



In [9]:
# c) Classify subject difficulty
df_scores = df_scores.withColumn("difficulty", when(col("subject").isin("Math", "Science"), "Difficult").otherwise("Easy"))

In [10]:
# d) Rank students per subject
windowSpec = Window.partitionBy("subject").orderBy(col("score").desc())
df_scores = df_scores.withColumn("rank", rank().over(windowSpec))

In [11]:
# e) UDF to format names to uppercase
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def to_upper(name):
    return name.upper()

upper_udf = udf(to_upper, StringType())
df_scores = df_scores.withColumn("name_upper", upper_udf(col("name")))
df_scores.show()

+------+-------+-----+-----+----------+----+----------+
|  name|subject|score|grade|difficulty|rank|name_upper|
+------+-------+-----+-----+----------+----+----------+
| Kavya|English|   79|    C|      Easy|   1|     KAVYA|
|  Ravi|English|   67|    D|      Easy|   2|      RAVI|
|  Neha|   Math|   94|    A| Difficult|   1|      NEHA|
|  Ravi|   Math|   88|    B| Difficult|   2|      RAVI|
|Ananya|Science|   92|    A| Difficult|   1|    ANANYA|
| Meena|Science|   85|    B| Difficult|   2|     MEENA|
+------+-------+-----+-----+----------+----+----------+



4. Ingest CSV & JSON Save to Parquet

Load

In [19]:
with open("students.csv", "w") as f:
    f.write("""id,name,department,city,salary
1,Amit,IT,Bangalore,78000
2,Kavya,HR,Chennai,62000
3,Arjun,Finance,Hyderabad,55000
""")


In [20]:
df_csv = spark.read.option("header", True).csv("students.csv")
df_csv.show()



+---+-----+----------+---------+------+
| id| name|department|     city|salary|
+---+-----+----------+---------+------+
|  1| Amit|        IT|Bangalore| 78000|
|  2|Kavya|        HR|  Chennai| 62000|
|  3|Arjun|   Finance|Hyderabad| 55000|
+---+-----+----------+---------+------+



In [22]:
with open("employee_nested.json", "w") as f:
    f.write("""
[
  {
    "id": 101,
    "name": "Sneha",
    "address": {
      "city": "Mumbai",
      "pincode": 400001
    },
    "skills": ["Python", "Spark"]
  }
]
""")



In [23]:
df_json = spark.read.option("multiline", True).json("employee_nested.json")
df_json.printSchema()
df_json.show(truncate=False)

root
 |-- address: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- pincode: long (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- skills: array (nullable = true)
 |    |-- element: string (containsNull = true)

+----------------+---+-----+---------------+
|address         |id |name |skills         |
+----------------+---+-----+---------------+
|{Mumbai, 400001}|101|Sneha|[Python, Spark]|
+----------------+---+-----+---------------+



In [24]:
#flatten Json
from pyspark.sql.functions import explode

df_flat = df_json.select(
    "id", "name",
    col("address.city").alias("city"),
    col("address.pincode").alias("pincode"),
    explode("skills").alias("skill")
)
df_flat.show(truncate=False)

+---+-----+------+-------+------+
|id |name |city  |pincode|skill |
+---+-----+------+-------+------+
|101|Sneha|Mumbai|400001 |Python|
|101|Sneha|Mumbai|400001 |Spark |
+---+-----+------+-------+------+



In [25]:
#write to parquet
df_csv.write.mode("overwrite").parquet("/tmp/output/csv_output")
df_flat.write.mode("overwrite").parquet("/tmp/output/json_output")

Spark SQL Temp Views & Queries

In [26]:
# Create temp view
df_scores.createOrReplaceTempView("exam_scores")

In [27]:
# a) Top scorer per subject
spark.sql("""
SELECT subject, name, MAX(score) as max_score
FROM exam_scores
GROUP BY subject, name
ORDER BY subject, max_score DESC
""").show()

+-------+------+---------+
|subject|  name|max_score|
+-------+------+---------+
|English| Kavya|       79|
|English|  Ravi|       67|
|   Math|  Neha|       94|
|   Math|  Ravi|       88|
|Science|Ananya|       92|
|Science| Meena|       85|
+-------+------+---------+



In [28]:
# b) Count of students per grade
spark.sql("SELECT grade, COUNT(*) as count FROM exam_scores GROUP BY grade").show()

+-----+-----+
|grade|count|
+-----+-----+
|    B|    2|
|    C|    1|
|    A|    2|
|    D|    1|
+-----+-----+



In [29]:
# c) Students with multiple subjects
spark.sql("""
SELECT name, COUNT(DISTINCT subject) as subjects
FROM exam_scores
GROUP BY name
HAVING subjects > 1
""").show()

+----+--------+
|name|subjects|
+----+--------+
|Ravi|       2|
+----+--------+



In [30]:
# d) Subjects with average score above 85
spark.sql("""
SELECT subject, AVG(score) as avg_score
FROM exam_scores
GROUP BY subject
HAVING avg_score > 85
""").show()

+-------+---------+
|subject|avg_score|
+-------+---------+
|Science|     88.5|
|   Math|     91.0|
+-------+---------+



Join attendance

In [32]:
attendance = [("Ravi", 22), ("Ananya", 18), ("Meena", 25), ("Kavya", 19), ("Neha", 23)]
df_attendance = spark.createDataFrame(attendance, ["name", "days_present"])

df_joined = df_scores.join(df_attendance, on="name", how="left")

# Downgrade grade if days_present < 20
df_final = df_joined.withColumn("adjusted_grade",
    when(col("days_present") < 20,
         when(col("grade") == "A", "B")
        .when(col("grade") == "B", "C")
        .when(col("grade") == "C", "D")
        .otherwise("D"))
    .otherwise(col("grade"))
)
df_final.select("name", "subject", "score", "grade", "days_present", "adjusted_grade").show()

+------+-------+-----+-----+------------+--------------+
|  name|subject|score|grade|days_present|adjusted_grade|
+------+-------+-----+-----+------------+--------------+
|Ananya|Science|   92|    A|          18|             B|
|  Ravi|   Math|   88|    B|          22|             B|
| Kavya|English|   79|    C|          19|             D|
|  Ravi|English|   67|    D|          22|             D|
|  Neha|   Math|   94|    A|          23|             A|
| Meena|Science|   85|    B|          25|             B|
+------+-------+-----+-----+------------+--------------+



Partitioned Load (Full + Incremental)

In [33]:
# Initial Load
df_scores.write.mode("overwrite").partitionBy("subject").parquet("/tmp/scores/")

In [34]:
# Incremental Load
incremental = [("Meena", "Math", 93)]
df_inc = spark.createDataFrame(incremental, ["name", "subject", "score"])
df_inc.write.mode("append").partitionBy("subject").parquet("/tmp/scores/")

In [35]:
# List all folders (only works in local filesystem or with Hadoop API)
import os
print("Folders in /tmp/scores:")
print(os.listdir("/tmp/scores"))

Folders in /tmp/scores:
['._SUCCESS.crc', 'subject=Science', 'subject=English', '_SUCCESS', 'subject=Math']


In [36]:
# Read only Math partition
df_math = spark.read.parquet("/tmp/scores/subject=Math")
df_math.show()

+-----+-----+-----+----------+----+----------+
| name|score|grade|difficulty|rank|name_upper|
+-----+-----+-----+----------+----+----------+
| Neha|   94|    A| Difficult|   1|      NEHA|
| Ravi|   88|    B| Difficult|   2|      RAVI|
|Meena|   93| NULL|      NULL|NULL|      NULL|
+-----+-----+-----+----------+----+----------+



7. ETL: Clean, Transform, Load

In [37]:
with open("raw_employees.csv", "w") as f:
    f.write("""emp_id,name,dept,salary,bonus
1,Arjun,IT,78000,5000
2,Kavya,HR,62000,
3,Sneha,Finance,55000,3000
""")


In [38]:
# Load raw CSV with header
df_raw = spark.read.option("header", True).csv("raw_employees.csv")

In [39]:
# Fill missing bonus with 2000
df_clean = df_raw.fillna({"bonus": "2000"})

In [40]:
# Convert bonus & salary to IntegerType
from pyspark.sql.types import IntegerType
df_clean = df_clean.withColumn("salary", col("salary").cast(IntegerType()))
df_clean = df_clean.withColumn("bonus", col("bonus").cast(IntegerType()))

In [41]:
# Calculate total_ctc
df_clean = df_clean.withColumn("total_ctc", col("salary") + col("bonus"))

In [42]:
# Filter where total_ctc > 60000
df_filtered = df_clean.filter(col("total_ctc") > 60000)

In [43]:
# Save to Parquet and JSON
df_filtered.write.mode("overwrite").parquet("/tmp/final_etl/parquet")
df_filtered.write.mode("overwrite").json("/tmp/final_etl/json")

In [44]:
df.show()

+------+---------+---+
|  name|     city|age|
+------+---------+---+
|Ananya|Bangalore| 24|
|  Ravi|Hyderabad| 28|
| Kavya|    Delhi| 22|
| Meena|  Chennai| 25|
+------+---------+---+



<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>