from pyspark import SparkConf, SparkContext

# Initialize Spark
conf = SparkConf().setAppName("MovieAvgRating").setMaster("local")
sc = SparkContext(conf=conf)

# Sample data (simulating lines in a CSV file).
# The code below reads field 0 as movie_id and field 2 as the rating.
ratings_data = [
    "1,101,4.0,881250949",
    "1,102,3.5,891717742",
    "2,101,5.0,881250949",
    "2,103,4.0,891717742",
    "3,104,2.0,891717742",
    "3,105,3.0,891717742"
]

# Parallelize dataset
rdd = sc.parallelize(ratings_data)

# Map step: Emit (movie_id, rating)
def map_function(line):
    fields = line.split(",")
    movie_id = fields[0]
    rating = float(fields[2])
    return (movie_id, rating)

mapped_rdd = rdd.map(map_function)

# Reduce step: Compute average rating
def avg_reduce(values):
    total = sum(values)
    count = len(values)
    return round(total / count, 2)

# Group ratings by movie
grouped_rdd = mapped_rdd.groupByKey()

# Apply average calculation
avg_ratings_rdd = grouped_rdd.mapValues(lambda ratings: avg_reduce(list(ratings)))

# Collect results to the driver and print them (sorted, since collect() order is not guaranteed)
for movie_id, avg_rating in sorted(avg_ratings_rdd.collect()):
    print(f"Movie {movie_id} has an average rating of {avg_rating}")

sc.stop()


Movie 1 has an average rating of 3.75
Movie 2 has an average rating of 4.5
Movie 3 has an average rating of 2.5


Viva-Style Explanation
Phase	Function	Explanation
Map	map_function	Converts each line into (movie_id, rating) pairs.
Shuffle	groupByKey()	Groups all ratings by movie_id.
Reduce	mapValues() with avg_reduce	Calculates average rating from list of ratings for each movie.
Output	collect()	Returns the final results to the driver for printing; suitable only when the result is small.
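
The phases in the table can be traced without Spark. The following is a minimal plain-Python sketch of the same Map → Shuffle → Reduce flow on the sample data. It is an illustration only: the `groups` dictionary stands in for what groupByKey() does across partitions, and nothing here runs in parallel.

```python
from collections import defaultdict

ratings_data = [
    "1,101,4.0,881250949",
    "1,102,3.5,891717742",
    "2,101,5.0,881250949",
    "2,103,4.0,891717742",
    "3,104,2.0,891717742",
    "3,105,3.0,891717742",
]

def map_function(line):
    # Same field choice as the notebook: field 0 is the key, field 2 the rating.
    fields = line.split(",")
    return (fields[0], float(fields[2]))

# Map: one (movie_id, rating) pair per input line.
pairs = [map_function(line) for line in ratings_data]

# Shuffle: collect all ratings that share a key (stand-in for groupByKey()).
groups = defaultdict(list)
for movie_id, rating in pairs:
    groups[movie_id].append(rating)

# Reduce: one average per key (stand-in for mapValues() with avg_reduce).
averages = {
    movie_id: round(sum(ratings) / len(ratings), 2)
    for movie_id, ratings in groups.items()
}

for movie_id in sorted(averages):
    print(f"Movie {movie_id} has an average rating of {averages[movie_id]}")
```

The printed lines match the notebook output above, which makes it easy to check each phase by inspecting `pairs` and `groups` directly.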
🧠 Big Data Concepts Applied:

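    RDDs: Immutable, partitioned collections that Spark processes in parallel and can rebuild after a failure.

    MapReduce Pattern: The job follows Map → Shuffle → Reduce: map() emits (movie_id, rating) pairs, groupByKey() shuffles them by key, and mapValues() reduces each group to an average.

    Parallelism: Each partition is processed independently, so the same code can handle millions of rows. Note that setMaster("local") runs Spark with a single worker thread; "local[*]" uses all local cores.

    Fault Tolerance: Spark tracks each RDD's lineage and recomputes lost partitions when a node fails.

    Scalability: The same code runs on a cluster once the master is no longer "local". For very large datasets, groupByKey() shuffles every rating across the network, so aggregating (sum, count) pairs with reduceByKey() or aggregateByKey() is usually cheaper.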
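
One detail behind the parallelism and scalability points: an average cannot be merged from partial averages, but a (sum, count) pair can. The sketch below shows this in plain Python. The split of one movie's ratings into `partition_a` and `partition_b` is hypothetical, and the `merge` function is only an example of the kind of associative function one might pass to reduceByKey() after mapping each rating to `(rating, 1)`.

```python
from functools import reduce

def to_accumulator(rating):
    # Each rating starts as a (sum, count) pair.
    return (rating, 1)

def merge(a, b):
    # Associative and commutative, so partial results can be combined in any order.
    return (a[0] + b[0], a[1] + b[1])

# Hypothetical: one movie's ratings spread unevenly over two partitions.
partition_a = [4.0, 3.5, 5.0]
partition_b = [2.0]

# Each partition is reduced locally first.
partial_a = reduce(merge, map(to_accumulator, partition_a))
partial_b = reduce(merge, map(to_accumulator, partition_b))

# Only the two small pairs need to be combined afterwards.
total, count = merge(partial_a, partial_b)
average = total / count

# Averaging the per-partition averages gives a different, wrong answer.
naive = (partial_a[0] / partial_a[1] + partial_b[0] / partial_b[1]) / 2

print(f"correct average: {average}")
print(f"average of averages: {naive:.4f}")
```

Because only one pair per key per partition has to cross the network, this shape of aggregation tends to shuffle far less data than grouping every rating first.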

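🚀 To Scale with a Real Hadoop Cluster:

    Put the ratings file on HDFS (e.g. hdfs:///input/movies.csv; the triple slash uses the cluster's default NameNode, whereas hdfs://input/... would treat "input" as a host name)

    Replace rdd = sc.parallelize(...) with:

rdd = sc.textFile("hdfs:///input/movies.csv")

    Remove .setMaster("local") from the SparkConf; a master set in code takes precedence over the one passed to spark-submit.

Run via (assuming the cluster uses YARN):

spark-submit --master yarn movie_avg_rating.py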
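
A real CSV file may contain a header row, blank lines, or malformed records, any of which would make float(fields[2]) in map_function raise an error. The following is a minimal sketch of a more defensive parser. The name `parse_line` and the header text in the sample are hypothetical; it returns a list so that it could be used with flatMap() in place of map(), dropping bad lines instead of failing the job.

```python
def parse_line(line):
    """Return [(movie_id, rating)] for a valid row, or [] for anything else."""
    fields = line.strip().split(",")
    if len(fields) < 3:
        return []  # blank or truncated line
    try:
        rating = float(fields[2])
    except ValueError:
        return []  # header row or non-numeric rating
    # Same field choice as the notebook: field 0 is the key, field 2 the rating.
    return [(fields[0], rating)]

# Hypothetical sample with a header, a blank line, and a bad rating.
sample_lines = [
    "movie_id,col2,rating,timestamp",
    "1,101,4.0,881250949",
    "",
    "2,101,not_a_number,881250949",
    "2,103,4.0,891717742",
]

parsed = [pair for line in sample_lines for pair in parse_line(line)]
print(parsed)
```

With Spark, the equivalent change would be mapped_rdd = rdd.flatMap(parse_line). Whether to skip bad rows silently or count and report them depends on how much the input can be trusted.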