In [None]:
!pip install pyspark



In [None]:
from pyspark.sql import SparkSession
# Single SparkSession shared by every cell; getOrCreate() returns the active session if one exists.
spark = (
    SparkSession.builder
    .appName("Unit2Labs")
    .getOrCreate()
)

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

In [None]:
# Load the students dataset and normalise its text columns.
# The raw CSV contains stray whitespace (e.g. "Math " next to "Math"); left as-is,
# one subject is split into two groups in the averages and topper cells below.
students_raw = spark.read.option("header", True).option("inferSchema", True).csv("/content/students.csv")
df = (
    students_raw
    .withColumn("Name", F.trim(F.col("Name")))
    .withColumn("Subject", F.trim(F.col("Subject")))
)

In [None]:
# 1. Split students into Pass (Marks >= 50) and Fail (Marks < 50).
# Rows with a null Marks value satisfy neither predicate and appear in neither frame.
PASS_MARK = 50
is_pass = F.col("Marks") >= PASS_MARK
pass_df = df.filter(is_pass)
fail_df = df.filter(~is_pass)

# show both groups
for label, frame in (("Pass students:", pass_df), ("Fail students:", fail_df)):
    print(label)
    frame.show()



Pass students:
+-----+-------+-----+
| Name|Subject|Marks|
+-----+-------+-----+
|Dhana|   Math|   78|
|Meena|Physics|   82|
| Ravi|Physics|   66|
|  anu|   Math|   92|
+-----+-------+-----+

Fail students:
+-----+-------+-----+
| Name|Subject|Marks|
+-----+-------+-----+
| Ravi|  Math |   46|
|Meena|   Math|   48|
+-----+-------+-----+



In [None]:
# 2. Subject-wise average marks.
# Rounded to 2 decimals (as the stats and attendance cells do) and sorted by
# Subject so the displayed row order is deterministic.
avg_by_subject = (
    df.groupBy("Subject")
      .agg(F.round(F.avg("Marks"), 2).alias("avg_marks"))
      .orderBy("Subject")
)
avg_by_subject.show()



+-------+-----------------+
|Subject|        avg_marks|
+-------+-----------------+
|  Math |             46.0|
|   Math|72.66666666666667|
|Physics|             74.0|
+-------+-----------------+



In [None]:
# 3. Topper(s) per subject - show Name and Marks.
# rank() (not row_number()) so that every student tied for the highest mark is
# listed; row_number() would keep one of them arbitrarily.
w = Window.partitionBy("Subject").orderBy(F.desc("Marks"))
topper = (
    df.withColumn("rnk", F.rank().over(w))
      .filter(F.col("rnk") == 1)
      .select("Subject", "Name", "Marks")
)
topper.show()


+-------+-----+-----+
|Subject| Name|Marks|
+-------+-----+-----+
|   Math|  anu|   92|
|  Math | Ravi|   46|
|Physics|Meena|   82|
+-------+-----+-----+



In [None]:
# Transactions data: first row is the header, column types inferred from the values.
tx = spark.read.csv("/content/transactions2.csv", header=True, inferSchema=True)

In [None]:
# 1. Total debit and credit per customer.
# Long form: one row per (CustID, Type) pair.
totals = (
    tx.groupBy("CustID", "Type")
      .agg(F.sum("Amount").alias("total_amount"))
)
# Wide form: "debit" and "credit" become columns; a customer with no rows of a
# given type gets 0 instead of null.
totals_pivot = (
    tx.groupBy("CustID")
      .pivot("Type", ["debit", "credit"])
      .agg(F.sum("Amount"))
      .na.fill(0)
)
totals_pivot.show()


+------+-----+------+
|CustID|debit|credit|
+------+-----+------+
|  C003|    0| 78000|
|  C004|45000|     0|
|  C001|60000| 25000|
|  C002|15000| 51000|
+------+-----+------+



In [None]:
# 2. Customer with the highest single transaction (first row after sorting by Amount, largest first).
highest_tx = tx.sort(F.col("Amount").desc()).limit(1)
highest_tx.show()


+------+------+------+
|CustID|Amount|  Type|
+------+------+------+
|  C003| 78000|credit|
+------+------+------+



In [None]:
# 3. Suspicious transactions: any single Amount strictly above the threshold.
SUSPICIOUS_THRESHOLD = 50000
suspicious = tx.where(F.col("Amount") > SUSPICIOUS_THRESHOLD)
suspicious.show()


+------+------+------+
|CustID|Amount|  Type|
+------+------+------+
|  C001| 60000| debit|
|  C003| 78000|credit|
|  C002| 51000|credit|
+------+------+------+



In [None]:

# Attendance data: first row is the header, column types inferred from the values.
att = spark.read.csv("/content/attendance2.csv", header=True, inferSchema=True)


In [None]:
# total present and absent days per employee, plus attendance percentage (2 dp)
def _status_count(code):
    """Aggregate counting the rows whose Status equals ``code`` (others, incl. null, count 0)."""
    return F.sum(F.when(F.col("Status") == code, 1).otherwise(0))

agg_att = (
    att.groupBy("EmpID")
       .agg(
           F.count("*").alias("total_days"),
           _status_count("P").alias("present_days"),
           _status_count("A").alias("absent_days"),
       )
       .withColumn(
           "attendance_pct",
           F.round(100 * F.col("present_days") / F.col("total_days"), 2),
       )
)

agg_att.show()


+-----+----------+------------+-----------+--------------+
|EmpID|total_days|present_days|absent_days|attendance_pct|
+-----+----------+------------+-----------+--------------+
| E002|         2|           1|          1|          50.0|
| E003|         1|           1|          0|         100.0|
| E001|         3|           3|          0|         100.0|
+-----+----------+------------+-----------+--------------+



In [None]:
# employees whose attendance percentage is below the cut-off
LOW_ATTENDANCE_PCT = 75
low_att = agg_att.where(F.col("attendance_pct") < LOW_ATTENDANCE_PCT)
low_att.show()


+-----+----------+------------+-----------+--------------+
|EmpID|total_days|present_days|absent_days|attendance_pct|
+-----+----------+------------+-----------+--------------+
| E002|         2|           1|          1|          50.0|
+-----+----------+------------+-----------+--------------+



In [None]:
# most regular employee(s): every employee whose attendance_pct equals the maximum (ties kept)
max_pct = agg_att.select(F.max("attendance_pct")).first()[0]
most_regular = agg_att.where(F.col("attendance_pct") == max_pct)
most_regular.show()


+-----+----------+------------+-----------+--------------+
|EmpID|total_days|present_days|absent_days|attendance_pct|
+-----+----------+------------+-----------+--------------+
| E003|         1|           1|          0|         100.0|
| E001|         3|           3|          0|         100.0|
+-----+----------+------------+-----------+--------------+



In [None]:
# Numbers data: first row is the header, column types inferred from the values.
nums = spark.read.csv("/content/numbers.csv", header=True, inferSchema=True)

In [None]:

# even and odd values, split on the remainder modulo 2 (null values land in neither)
parity = F.col("Value") % 2
even = nums.where(parity == 0)
odd = nums.where(parity != 0)
for title, subset in (("Evens:", even), ("Odds:", odd)):
    print(title)
    subset.show()



Evens:
+-----+
|Value|
+-----+
|    2|
|    4|
|   20|
+-----+

Odds:
+-----+
|Value|
+-----+
|    3|
|   17|
|  121|
+-----+



In [None]:
# stats: max, min, sum, avg (avg rounded to 2 decimals) in a single-row result
value = F.col("Value")
stats = nums.select(
    F.max(value).alias("max_val"),
    F.min(value).alias("min_val"),
    F.sum(value).alias("sum_val"),
    F.round(F.avg(value), 2).alias("avg_val"),
)
stats.show()


+-------+-------+-------+-------+
|max_val|min_val|sum_val|avg_val|
+-------+-------+-------+-------+
|    121|      2|    167|  27.83|
+-------+-------+-------+-------+



In [None]:
# prime number check via UDF (suitable for teaching; not best for huge datasets)
from pyspark.sql.types import BooleanType
def is_prime(n):
    """Return True if ``n`` is a prime number, else False.

    Used as the body of a Spark UDF, so it must tolerate whatever Spark passes
    for a row: ``None`` (SQL NULL) and non-integral floats (2.5, nan, inf)
    return False, and integral floats such as ``7.0`` are tested as ints.
    Primality is checked by trial division with odd divisors up to sqrt(n).
    """
    if n is None:
        return False
    if isinstance(n, float):
        # Previously 2.5 / nan slipped past every check and returned True,
        # and inf never left the loop; none of them are integers.
        if not n.is_integer():
            return False
        n = int(n)
    if n < 2:
        return False
    if n % 2 == 0:
        # 2 is the only even prime.
        return n == 2
    divisor = 3
    while divisor * divisor <= n:
        if n % divisor == 0:
            return False
        divisor += 2
    return True

# Wrap the Python predicate as a Spark UDF returning BooleanType and keep the rows it accepts.
is_prime_udf = F.udf(is_prime, returnType=BooleanType())
primes = nums.where(is_prime_udf(F.col("Value")))
print("Primes:")
primes.show()



Primes:
+-----+
|Value|
+-----+
|    2|
|    3|
|   17|
+-----+



In [None]:

# Same summaries with the low-level RDD API.
# Null values are dropped first (F.sum in the stats cell also skips nulls, while
# here they would raise TypeError inside the lambdas), and fold() with a 0 seed
# returns 0 for an empty dataset where reduce() would raise ValueError.
rdd = nums.rdd.map(lambda row: row["Value"]).filter(lambda v: v is not None)
total_sum = rdd.fold(0, lambda a, b: a + b)
even_count = rdd.filter(lambda x: x % 2 == 0).count()
print("sum=", total_sum, "even_count=", even_count)



sum= 167 even_count= 3


In [None]:
# People data: first row is the header, column types inferred from the values.
people = spark.read.csv("/content/people.csv", header=True, inferSchema=True)

In [None]:
# 1. Categorize by age: Minor (< 18), Adult (18-59), Senior (60+).
# Senior is an explicit condition so that rows with a missing Age are labelled
# "Unknown" instead of falling through the otherwise() branch as "Senior".
age = F.col("Age")
people_cat = people.withColumn(
    "Category",
    F.when(age < 18, "Minor")
     .when(age < 60, "Adult")
     .when(age >= 60, "Senior")
     .otherwise("Unknown")
)
people_cat.show()


+-----+---+--------+
| Name|Age|Category|
+-----+---+--------+
|Dhana| 29|   Adult|
| Ravi| 16|   Minor|
|  Anu| 64|  Senior|
|Meena| 45|   Adult|
|Kumar| 12|   Minor|
+-----+---+--------+



In [None]:

# 2. Number of people in each age category
category_counts = people_cat.groupBy("Category").count()
category_counts.show()


+--------+-----+
|Category|count|
+--------+-----+
|  Senior|    1|
|   Minor|    2|
|   Adult|    2|
+--------+-----+



In [None]:
# 3. Oldest and youngest person.
# Rows with a null Age are excluded first: Spark's ascending sort places nulls
# first by default, so otherwise an unknown-age row would be reported as "youngest".
known_age = people.filter(F.col("Age").isNotNull())
oldest = known_age.orderBy(F.desc("Age")).limit(1)
youngest = known_age.orderBy(F.asc("Age")).limit(1)
print("Oldest:"); oldest.show()
print("Youngest:"); youngest.show()


Oldest:
+----+---+
|Name|Age|
+----+---+
| Anu| 64|
+----+---+

Youngest:
+-----+---+
| Name|Age|
+-----+---+
|Kumar| 12|
+-----+---+



In [None]:

# Sales data: first row is the header, column types inferred from the values.
sales = spark.read.csv("/content/sales.csv", header=True, inferSchema=True)


In [None]:
# 1. Revenue per product: Revenue = Quantity * Price per row, then summed per (Product, Category)
sales_with_rev = sales.withColumn("Revenue", F.col("Quantity") * F.col("Price"))
rev_per_product = (
    sales_with_rev
    .groupBy("Product", "Category")
    .agg(
        F.sum("Revenue").alias("total_revenue"),
        F.sum("Quantity").alias("total_qty"),
    )
)
rev_per_product.orderBy(F.col("total_revenue").desc()).show()


+--------+------------+-------------+---------+
| Product|    Category|total_revenue|total_qty|
+--------+------------+-------------+---------+
|    Rice|   Groceries|      20000.0|      500|
| Shampoo|PersonalCare|       9600.0|       80|
|   Sugar|   Groceries|       2700.0|       60|
|    Soap|PersonalCare|       2460.0|      120|
|Notebook|  Stationery|        750.0|       30|
+--------+------------+-------------+---------+



In [None]:

# 2. Best-selling product and category, ranked by total quantity sold
best_product = rev_per_product.sort(F.col("total_qty").desc()).limit(1)
best_product.show()

category_qty = (
    sales.groupBy("Category")
         .agg(F.sum("Quantity").alias("category_qty"))
)
best_category = category_qty.sort(F.col("category_qty").desc()).limit(1)
best_category.show()


+-------+---------+-------------+---------+
|Product| Category|total_revenue|total_qty|
+-------+---------+-------------+---------+
|   Rice|Groceries|      20000.0|      500|
+-------+---------+-------------+---------+

+---------+------------+
| Category|category_qty|
+---------+------------+
|Groceries|         560|
+---------+------------+



In [None]:
# 3. Products whose total quantity across the whole dataset is below the cut-off
LOW_SALES_QTY = 100
low_sales_products = rev_per_product.where(F.col("total_qty") < LOW_SALES_QTY)
low_sales_products.show()



+--------+------------+-------------+---------+
| Product|    Category|total_revenue|total_qty|
+--------+------------+-------------+---------+
|Notebook|  Stationery|        750.0|       30|
|   Sugar|   Groceries|       2700.0|       60|
| Shampoo|PersonalCare|       9600.0|       80|
+--------+------------+-------------+---------+

