# 1) Colab setup — install Java, Spark binary, PySpark, findspark

In [None]:
# Install Java
!apt-get install openjdk-11-jdk -y

# Install PySpark
!pip install pyspark


Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
openjdk-11-jdk is already the newest version (11.0.28+6-1ubuntu1~22.04.1).
0 upgraded, 0 newly installed, 0 to remove and 38 not upgraded.


In [None]:
# Download Spark (prebuilt for Hadoop 3)
!wget https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz

# Extract it
!tar xvf spark-3.5.0-bin-hadoop3.tgz


--2025-10-03 23:45:02--  https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
Resolving archive.apache.org (archive.apache.org)... 65.108.204.189, 2a01:4f9:1a:a084::2
Connecting to archive.apache.org (archive.apache.org)|65.108.204.189|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 400395283 (382M) [application/x-gzip]
Saving to: ‘spark-3.5.0-bin-hadoop3.tgz.1’


2025-10-03 23:54:00 (728 KB/s) - ‘spark-3.5.0-bin-hadoop3.tgz.1’ saved [400395283/400395283]

spark-3.5.0-bin-hadoop3/
spark-3.5.0-bin-hadoop3/kubernetes/
spark-3.5.0-bin-hadoop3/kubernetes/tests/
spark-3.5.0-bin-hadoop3/kubernetes/tests/pyfiles.py
spark-3.5.0-bin-hadoop3/kubernetes/tests/decommissioning.py
spark-3.5.0-bin-hadoop3/kubernetes/tests/autoscale.py
spark-3.5.0-bin-hadoop3/kubernetes/tests/python_executable_check.py
spark-3.5.0-bin-hadoop3/kubernetes/tests/worker_memory_check.py
spark-3.5.0-bin-hadoop3/kubernetes/tests/py_container_checks.py
spark-3.5.0-bin-hadoop3

In [None]:
import os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.0-bin-hadoop3"
os.environ["PATH"] += ":/content/spark-3.5.0-bin-hadoop3/bin"


In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("RDD_Practical") \
    .config("spark.sql.shuffle.partitions", "2") \
    .getOrCreate()

sc = spark.sparkContext
print("SparkContext initialized:", sc)


SparkContext initialized: <SparkContext master=local[*] appName=RDD_Practical>


In [None]:
# Create RDD from Python list
data = [10, 20, 30, 40, 50]
rdd = sc.parallelize(data)
print(rdd.collect())  # Action to fetch data


[10, 20, 30, 40, 50]


In [None]:
# Suppose you have a text file 'sample.txt' in Colab
!echo -e "Hello\nWorld\nSpark" > sample.txt

text_rdd = sc.textFile("sample.txt")
print(text_rdd.collect())


In [None]:
# Create a CSV for demo
!echo -e "name,age\nAlice,30\nBob,25" > sample.csv

csv_rdd = sc.textFile("sample.csv") \
           .map(lambda line: line.split(","))  # Split by comma
print(csv_rdd.collect())


[['name', 'age'], ['Alice', '30'], ['Bob', '25']]


In [None]:
print("Count:", rdd.count())       # Number of elements
print("First:", rdd.first())       # First element
print("Max:", rdd.max())           # Maximum
print("Sum:", rdd.sum())           # Sum of all
print("Take 3:", rdd.take(3))      # Take first 3 elements


In [None]:
pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])
print("GroupByKey:", pairs.groupByKey().mapValues(list).collect())
print("ReduceByKey:", pairs.reduceByKey(lambda x, y: x+y).collect())


GroupByKey: [('b', [2]), ('a', [1, 3])]
ReduceByKey: [('b', 2), ('a', 4)]


In [None]:
print("Partitions before:", rdd.getNumPartitions())

rdd2 = rdd.repartition(3)   # Increase partitions (shuffling occurs)
print("Partitions after repartition:", rdd2.getNumPartitions())

rdd3 = rdd2.coalesce(2)     # Reduce partitions (without shuffle)
print("Partitions after coalesce:", rdd3.getNumPartitions())


Partitions before: 2
Partitions after repartition: 3
Partitions after coalesce: 2


In [None]:
# Set shuffle partitions
spark.conf.set("spark.sql.shuffle.partitions", "4")


In [None]:
# Broadcast variable (shared across all workers)
broadcastVar = sc.broadcast([1, 2, 3])
print(broadcastVar.value)


[1, 2, 3]


In [None]:
# Accumulator for counting
accum = sc.accumulator(0)

def add_to_acc(x):
    global accum
    accum += x

rdd.foreach(add_to_acc)
print("Accumulator value:", accum.value)


Accumulator value: 150


In [None]:
# RDD of tuples
rdd_data = sc.parallelize([("Alice", 30), ("Bob", 25)])
df = rdd_data.toDF(["Name", "Age"])
df.show()


+-----+---+
| Name|Age|
+-----+---+
|Alice| 30|
|  Bob| 25|
+-----+---+



In [None]:
!echo -e "File1\nData1" > file1.txt
!echo -e "File2\nData2" > file2.txt

multi_rdd = sc.textFile("file*.txt")
print(multi_rdd.collect())


In [None]:
empty_rdd = sc.emptyRDD()
print(empty_rdd.collect())  # []


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, when, udf
from pyspark.sql.types import StringType

spark = SparkSession.builder.appName("SparkSQL_Practical8").getOrCreate()


In [None]:
data = [(1,"Alice",23), (2,"Bob",30), (3,"Charlie",25), (4,"David",30)]
df = spark.createDataFrame(data, ["id","name","age"])
df.show()


In [None]:
df.filter(df.age > 25).show()
df.where(col("name") == "Alice").show()


+---+-----+---+
| id| name|age|
+---+-----+---+
|  2|  Bob| 30|
|  4|David| 30|
+---+-----+---+

+---+-----+---+
| id| name|age|
+---+-----+---+
|  1|Alice| 23|
+---+-----+---+



In [None]:
df2 = df.withColumn("age_plus_5", col("age") + 5)
df2.show()


In [None]:
df3 = df.withColumnRenamed("name","full_name")
df3.show()


+---+---------+---+
| id|full_name|age|
+---+---------+---+
|  1|    Alice| 23|
|  2|      Bob| 30|
|  3|  Charlie| 25|
|  4|    David| 30|
+---+---------+---+



In [None]:
df4 = df.drop("age")
df4.show()


In [None]:
df.distinct().show()


+---+-------+---+
| id|   name|age|
+---+-------+---+
|  1|  Alice| 23|
|  2|    Bob| 30|
|  4|  David| 30|
|  3|Charlie| 25|
+---+-------+---+



In [None]:
df.groupBy("age").count().show()


In [None]:
dept = [(1,"HR"), (2,"IT"), (3,"Finance")]
df_dept = spark.createDataFrame(dept, ["id","dept"])

df_join = df.join(df_dept, on="id", how="inner")
df_join.show()


+---+-------+---+-------+
| id|   name|age|   dept|
+---+-------+---+-------+
|  1|  Alice| 23|     HR|
|  2|    Bob| 30|     IT|
|  3|Charlie| 25|Finance|
+---+-------+---+-------+



In [None]:
rdd = df.rdd

mapped = rdd.map(lambda x: (x[1], x[2]+1)).collect()
print("map:", mapped)

mapped_part = rdd.mapPartitions(lambda part: [(x[1], x[2]*2) for x in part]).collect()
print("mapPartitions:", mapped_part)


In [None]:
print("foreach output:")
df.rdd.foreach(lambda x: print(x))

print("foreachPartition output:")
df.rdd.foreachPartition(lambda part: [print("Partition:", list(part))])


In [None]:
sales = [("Alice","Jan",200),
         ("Alice","Feb",250),
         ("Bob","Jan",300),
         ("Bob","Feb",100)]

df_sales = spark.createDataFrame(sales, ["name","month","amount"])
pivoted = df_sales.groupBy("name").pivot("month").sum("amount")
pivoted.show()


+-----+---+---+
| name|Feb|Jan|
+-----+---+---+
|Alice|250|200|
|  Bob|100|300|
+-----+---+---+



In [None]:
df_a = spark.createDataFrame([(5,"Eve",28)], ["id","name","age"])
df_union = df.union(df_a)
df_union.show()


In [None]:
df_cached = df.cache()
print("Cached count:", df_cached.count())

from pyspark import StorageLevel
df_persisted = df.persist(StorageLevel.MEMORY_AND_DISK)
print("Persisted count:", df_persisted.count())


In [None]:
def greet(name):
    return "Hello " + name

greet_udf = udf(greet, StringType())
df_udf = df.withColumn("greeting", greet_udf(col("name")))
df_udf.show()


+---+-------+---+-------------+
| id|   name|age|     greeting|
+---+-------+---+-------------+
|  1|  Alice| 23|  Hello Alice|
|  2|    Bob| 30|    Hello Bob|
|  3|Charlie| 25|Hello Charlie|
|  4|  David| 30|  Hello David|
+---+-------+---+-------------+



# Practical 8: Spark Streaming

In [38]:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder
  .appName("StreamingExample")
  .master("local[*]")
  .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

// Simulate input from socket (run `nc -lk 9999` in terminal)
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

val words = lines.as[String].flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()

// OutputMode: Append
val query1 = wordCounts.writeStream
  .outputMode("append")
  .format("console")
  .start()

// OutputMode: Complete
val query2 = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

// OutputMode: Update
val query3 = wordCounts.writeStream
  .outputMode("update")
  .format("console")
  .start()

query1.awaitTermination()


AnalysisException: [PATH_NOT_FOUND] Path does not exist: /content/input_dir.

In [None]:
val jsonDF = spark.readStream
  .schema("id INT, name STRING, age INT")
  .json("path/to/input/json_dir")  // place new JSON files here

val query = jsonDF.writeStream
  .format("console")
  .outputMode("append")
  .start()

query.awaitTermination()


In [None]:
val socketDF = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

socketDF.writeStream
  .format("console")
  .outputMode("append")
  .start()
  .awaitTermination()


In [None]:
val kafkaDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "jsonTopic")
  .load()

// Parse JSON
val parsed = kafkaDF.selectExpr("CAST(value AS STRING) as json")
  .select(from_json($"json", "id INT, name STRING").as("data"))
  .select("data.*")

parsed.writeStream
  .format("console")
  .outputMode("append")
  .start()
  .awaitTermination()


In [None]:
import org.apache.spark.sql.avro._

// Read from Kafka in Avro format
val avroDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "avroTopic")
  .load()

// Deserialize using from_avro()
val schema = """
  {"type":"record","name":"User","fields":[
    {"name":"id","type":"int"},
    {"name":"name","type":"string"}
  ]}
"""

val decoded = avroDF.select(from_avro($"value", schema).as("user"))
decoded.select("user.*").writeStream
  .format("console")
  .outputMode("append")
  .start()
  .awaitTermination()


In [None]:
val outputDF = decoded.select(to_avro(struct($"id", $"name")).as("value"))

outputDF.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "avroOutputTopic")
  .option("checkpointLocation", "/tmp/checkpoints")
  .start()
  .awaitTermination()


In [None]:
// Batch read
val batchDF = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "jsonTopic")
  .load()

batchDF.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .show(false)


In [None]:
# ---------------------------------------------------------
# Spark MLlib Practical 9: Estimator, Transformer, Param
# ---------------------------------------------------------
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

# Step 1: Start Spark Session
spark = SparkSession.builder \
    .appName("MLlib_Estimator_Transformer_Param") \
    .getOrCreate()

# Step 2: Create Sample Data
data = [
    (0.0, 1.0, 0.1, -1.0),
    (1.0, 2.0, 1.1, 1.0),
    (0.0, 2.0, 1.3, -0.5),
    (1.0, 3.0, 1.2, 1.3),
    (0.0, 3.0, 0.8, -0.7),
]
columns = ["label", "feature1", "feature2", "feature3"]

df = spark.createDataFrame(data, columns)

# Step 3: Transformer Example (VectorAssembler)
assembler = VectorAssembler(inputCols=["feature1", "feature2", "feature3"],
                            outputCol="features")
df_transformed = assembler.transform(df)

# Step 4: Transformer Example (StandardScaler)
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=False)
scaler_model = scaler.fit(df_transformed)   # Estimator produces Transformer
df_scaled = scaler_model.transform(df_transformed)

# Step 5: Estimator Example (Logistic Regression)
lr = LogisticRegression(featuresCol="scaledFeatures", labelCol="label", maxIter=10)

# Step 6: Pipeline (combining everything)
pipeline = Pipeline(stages=[assembler, scaler, lr])

# Fit the pipeline (Estimator -> Model)
model = pipeline.fit(df)

# Step 7: Make Predictions
predictions = model.transform(df)
predictions.select("label", "features", "scaledFeatures", "prediction", "probability").show()

# Step 8: Working with Params
print("Logistic Regression Parameters:")
print(lr.explainParams())

print("\nCurrent Max Iterations Param:", lr.getOrDefault("maxIter"))
print("Changing maxIter to 20...")
lr.setMaxIter(20)
print("Updated Max Iterations Param:", lr.getOrDefault("maxIter"))


In [None]:
# ---------------------------------------------------------
# Practical 10: Spark HDFS and Amazon S3 File Processing
# ---------------------------------------------------------

from pyspark.sql import SparkSession

# Step 1: Start Spark Session
spark = SparkSession.builder \
    .appName("Spark_HDFS_S3_FileOps") \
    .getOrCreate()

# -------------------------
# HDFS FILE PROCESSING
# -------------------------

# Text File from HDFS
text_df = spark.read.text("hdfs://namenode:8020/user/data/sample.txt")
print("HDFS TEXT FILE:")
text_df.show(5, truncate=False)

# CSV from HDFS
csv_df = spark.read.option("header", "true").csv("hdfs://namenode:8020/user/data/sample.csv")
print("HDFS CSV FILE:")
csv_df.show(5)

# Parquet from HDFS
parquet_df = spark.read.parquet("hdfs://namenode:8020/user/data/sample.parquet")
print("HDFS PARQUET FILE:")
parquet_df.show(5)

# Avro from HDFS
avro_df = spark.read.format("avro").load("hdfs://namenode:8020/user/data/sample.avro")
print("HDFS AVRO FILE:")
avro_df.show(5)

# JSON from HDFS
json_df = spark.read.json("hdfs://namenode:8020/user/data/sample.json")
print("HDFS JSON FILE:")
json_df.show(5)

# -------------------------
# AMAZON S3 FILE PROCESSING
# -------------------------

# NOTE: You need to set AWS credentials in Spark/Hadoop config before using S3
# Example (use your keys):
# spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", "YOUR_ACCESS_KEY")
# spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "YOUR_SECRET_KEY")
# spark._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "s3.amazonaws.com")

# Text file from S3
s3_text_df = spark.read.text("s3a://my-bucket/data/sample.txt")
print("S3 TEXT FILE:")
s3_text_df.show(5, truncate=False)

# JSON file from S3
s3_json_df = spark.read.json("s3a://my-bucket/data/sample.json")
print("S3 JSON FILE:")
s3_json_df.show(5)

# CSV file from S3
s3_csv_df = spark.read.option("header", "true").csv("s3a://my-bucket/data/sample.csv")
print("S3 CSV FILE:")
s3_csv_df.show(5)

# Parquet file from S3
s3_parquet_df = spark.read.parquet("s3a://my-bucket/data/sample.parquet")
print("S3 PARQUET FILE:")
s3_parquet_df.show(5)

# Avro file from S3
s3_avro_df = spark.read.format("avro").load("s3a://my-bucket/data/sample.avro")
print("S3 AVRO FILE:")
s3_avro_df.show(5)

# -------------------------
# END OF FILE OPS
# -------------------------
