<a href="https://colab.research.google.com/github/vaishali-senthil/Big-Data/blob/main/spark/spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
pip install pyspark



In [2]:
import pyspark

## Lowest-rated movies by average rating (RDD API)

In [19]:
from pyspark import SparkConf, SparkContext

In [20]:
def loadMovieNames(path='u.item'):
    """Build a movie ID -> title lookup table from a pipe-delimited file.

    Every non-blank line is split on '|'. The first field is converted to
    an int and used as the key; the second field is stored as the value.

    Args:
        path: File to read. Defaults to 'u.item' in the current working
            directory, so existing zero-argument calls keep working.

    Returns:
        dict mapping int movie ID to its title string.

    Raises:
        OSError: if the file cannot be opened.
        ValueError: if the first field of a non-blank line is not an integer.
        IndexError: if a non-blank line contains no '|' separator.
    """
    movieNames = {}
    # The file is decoded as ISO-8859-1 (Latin-1), as in the original code.
    with open(path, encoding="ISO-8859-1") as f:
        for line in f:
            # Skip blank lines (e.g. a trailing newline at end of file);
            # without this guard int('') raises ValueError.
            if not line.strip():
                continue
            fields = line.split('|')
            movieNames[int(fields[0])] = fields[1]
    return movieNames

In [21]:
def parseInput(line):
    """Convert one whitespace-delimited ratings line into a keyed pair.

    The second token is read as the integer movie ID and the third as the
    float rating. The rating is paired with a count of 1.0 so that pairs
    for the same movie can be summed element-wise.

    Args:
        line: A text line with at least three whitespace-separated tokens.

    Returns:
        Tuple of the form (movieID, (rating, 1.0)).
    """
    tokens = line.split()
    movie_id = int(tokens[1])
    rating = float(tokens[2])
    return (movie_id, (rating, 1.0))

In [24]:
if __name__ == "__main__":
    # Build the configuration and pass it to getOrCreate so the app name is
    # actually applied when a new context is created. (Previously `conf` was
    # built but never used.) If a context is already running it is reused.
    conf = SparkConf().setAppName("WorstMovies")
    sc = SparkContext.getOrCreate(conf)

    # Load up our movie ID -> movie name lookup table
    movieNames = loadMovieNames()

    # Load up the raw u.data file as an RDD of text lines
    lines = sc.textFile("u.data")

    # Convert to (movieID, (rating, 1.0))
    movieRatings = lines.map(parseInput)

    # Reduce to (movieID, (sumOfRatings, totalRatings)) by adding the
    # (rating, count) pairs element-wise for each movie
    ratingTotalsAndCount = movieRatings.reduceByKey(lambda movie1, movie2: ( movie1[0] + movie2[0], movie1[1] + movie2[1] ) )

    # Map to (movieID, averageRating)
    averageRatings = ratingTotalsAndCount.mapValues(lambda totalAndCount : totalAndCount[0] / totalAndCount[1])

    # Sort ascending by average rating, so the lowest-rated movies come first
    sortedMovies = averageRatings.sortBy(lambda x: x[1])

    # Take the 40 lowest-rated movies
    results = sortedMovies.take(40)

    # Print them out as "<title> <averageRating>"
    for result in results:
        print(movieNames[result[0]], result[1])

Amityville: Dollhouse (1996) 1.0
Somebody to Love (1994) 1.0
Every Other Weekend (1990) 1.0
Homage (1995) 1.0
3 Ninjas: High Noon At Mega Mountain (1998) 1.0
Bird of Prey (1996) 1.0
Power 98 (1995) 1.0
Beyond Bedlam (1993) 1.0
Falling in Love Again (1980) 1.0
T-Men (1947) 1.0
Mighty, The (1998) 1.0
Venice/Venice (1992) 1.0
Bloody Child, The (1996) 1.0
Eye of Vichy, The (Oeil de Vichy, L') (1993) 1.0
Lashou shentan (1992) 1.0
Liebelei (1933) 1.0
Babyfever (1994) 1.0
Man from Down Under, The (1943) 1.0
Hungarian Fairy Tale, A (1987) 1.0
Vermont Is For Lovers (1992) 1.0
Gordy (1995) 1.0
Crude Oasis, The (1995) 1.0
Girl in the Cadillac (1995) 1.0
Quartier Mozart (1992) 1.0
Wend Kuuni (God's Gift) (1982) 1.0
The Courtyard (1995) 1.0
Chairman of the Board (1998) 1.0
King of New York (1990) 1.0
Symphonie pastorale, La (1946) 1.0
Mostro, Il (1994) 1.0
Mat' i syn (1997) 1.0
Shadows (Cienie) (1988) 1.0
JLG/JLG - autoportrait de décembre (1994) 1.0
To Cross the Rubicon (1991) 1.0
Pharaoh's Army (

## Lowest-rated movies by average rating (DataFrame API)

In [25]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql import functions

In [26]:
def loadMovieNames(path='u.item'):
    """Read a pipe-delimited file into a movie ID -> title dictionary.

    Each non-blank line is split on '|'; field 0 (as int) becomes the key
    and field 1 becomes the value.

    Args:
        path: File to read. Defaults to 'u.item' in the current working
            directory, so calls without arguments behave as before.

    Returns:
        dict mapping int movie ID to its title string.

    Raises:
        OSError: if the file cannot be opened.
        ValueError: if the first field of a non-blank line is not an integer.
        IndexError: if a non-blank line contains no '|' separator.
    """
    movieNames = {}
    # Decoded as ISO-8859-1 (Latin-1), unchanged from the original code.
    with open(path, encoding="ISO-8859-1") as f:
        for line in f:
            # Blank lines carry no record; int('') would raise ValueError.
            if not line.strip():
                continue
            fields = line.split('|')
            movieNames[int(fields[0])] = fields[1]
    return movieNames

In [27]:
def parseInput(line):
    """Convert one whitespace-delimited ratings line into a Row.

    The second token is read as the integer movie ID and the third as the
    float rating.

    Args:
        line: A text line with at least three whitespace-separated tokens.

    Returns:
        Row with fields `movieID` (int) and `rating` (float).
    """
    tokens = line.split()
    movie_id, score = int(tokens[1]), float(tokens[2])
    return Row(movieID=movie_id, rating=score)

In [28]:

if __name__ == "__main__":
    # Create (or reuse) a SparkSession for this job
    spark = SparkSession.builder.appName("PopularMovies").getOrCreate()

    # try/finally so the session is stopped even if a step below raises
    try:
        # Load up our movie ID -> name dictionary
        movieNames = loadMovieNames()

        # Get the raw data
        lines = spark.sparkContext.textFile("u.data")
        # Convert it to a RDD of Row objects with (movieID, rating)
        movies = lines.map(parseInput)
        # Convert that to a DataFrame
        movieDataset = spark.createDataFrame(movies)

        # Compute the rating count and the average rating per movieID in a
        # single aggregation instead of two groupBy passes joined together.
        # Resulting columns: movieID, ratingCount, avgRating.
        averagesAndCounts = movieDataset.groupBy("movieID").agg(
            functions.count(functions.lit(1)).alias("ratingCount"),
            functions.avg("rating").alias("avgRating"),
        )

        # Pull the ten movies with the lowest average rating (ascending sort)
        topTen = averagesAndCounts.orderBy("avgRating").take(10)

        # Print "<title> <ratingCount> <avgRating>", converting movie IDs
        # to names as we go.
        for movie in topTen:
            print (movieNames[movie[0]], movie[1], movie[2])
    finally:
        # Stop the session
        spark.stop()

Touki Bouki (Journey of the Hyena) (1973) 1 1.0
Amityville: Dollhouse (1996) 3 1.0
Quartier Mozart (1992) 1 1.0
Power 98 (1995) 1 1.0
Amityville: A New Generation (1993) 5 1.0
Lotto Land (1995) 1 1.0
Hostile Intentions (1994) 1 1.0
Falling in Love Again (1980) 2 1.0
The Courtyard (1995) 1 1.0
Bloody Child, The (1996) 1 1.0
