In [1]:
# import libraries from pyspark 
from pyspark import SparkConf, SparkContext

# Spark configuration: run in local mode under the application name "data analysis"
conf = (
    SparkConf()
    .setMaster("local")
    .setAppName("data analysis")
)

# Reuse the active SparkContext when there is one; otherwise start a new one
# with the configuration built above
sc = SparkContext.getOrCreate(conf=conf)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/25 00:00:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/02/25 00:00:58 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [2]:
# Sanity check: referencing `sc` raises NameError when no Spark context has been
# created in this kernel; otherwise dump every (key, value) configuration pair.
try:
    sc
except NameError:
    print("spark context does not exist")
else:
    configurations = sc.getConf().getAll()
    for setting in configurations:
        print(setting)

('spark.master', 'local')
('spark.driver.extraJavaOptions', '-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false')
('spark.driver.host', '192.168.2.118')
('spark.app.id', 'local-1740430858304')
('

In [3]:
# Display the SparkContext object (a cell's last expression is rendered as its output)
sc

In [4]:
# Load the ratings file as an RDD of text lines (one string per line of the file)
ratingsRDD = sc.textFile("./ratings.dat")

In [5]:
# Peek at the first five raw lines. Fields are separated by '::' in the order:
# userid :: movieid :: rating :: timestamp
ratingsRDD.take(5)

['1::1193::5::978300760',
 '1::661::3::978302109',
 '1::914::3::978301968',
 '1::3408::4::978300275',
 '1::2355::5::978824291']

In [6]:
# Keep only the rating column: break every line on the '::' separator and
# take the field at index 2 (the third one). Values stay as strings.
ratings = ratingsRDD.map(lambda line: line.split('::')[2])

In [7]:
# Inspect the Python type of the RDD produced by the map transformation
type(ratings)

pyspark.rdd.PipelinedRDD

In [8]:
# Preview the first five extracted rating values
ratings.take(5)

['5', '3', '3', '4', '5']

In [9]:
# Count how often each distinct rating value occurs. countByValue() is an
# action: it runs the job and returns the counts to the driver as a Python
# mapping of value -> count.
ratingsCount = ratings.countByValue()
type(ratingsCount)

                                                                                

collections.defaultdict

In [10]:
# Show the raw rating -> count mapping (keys are the rating strings)
ratingsCount

defaultdict(int,
            {'5': 226310, '3': 261197, '4': 348971, '2': 107557, '1': 56174})

In [11]:
import collections

# Order the buckets by their numeric star value. The keys are strings, so a
# plain sorted() would compare them lexicographically ('10' would sort before
# '2'); sorting on int(key) keeps the order correct for any integer scale.
sortedRatingsCount = collections.OrderedDict(
    sorted(ratingsCount.items(), key=lambda item: int(item[0]))
)

print("ratings count")

# One row per rating: a bar of stars, left-aligned and padded to 10 columns
# so that the counts line up, followed by the number of ratings.
for key, value in sortedRatingsCount.items():
    print(f"{'★' * int(key):<10}{value}")

ratings count
★         56174
★★        107557
★★★       261197
★★★★      348971
★★★★★     226310


In [12]:
def loadMovieNames(path="./movies.dat", encoding="ISO-8859-1"):
    """Build a lookup table from movie id to movie title.

    Every non-blank line is split on '::'; the first field is parsed as the
    integer movie id and the second field is stored as the title. Any further
    fields on the line are ignored.

    Args:
        path: Location of the movies file. Defaults to "./movies.dat".
        encoding: Text encoding used to decode the file. Defaults to
            "ISO-8859-1".

    Returns:
        dict mapping int movie id -> str title.

    Raises:
        OSError: if the file cannot be opened.
        ValueError: if the first field of a non-blank line is not an integer.
        IndexError: if a non-blank line contains no '::' separator.
    """
    movieNames = {}
    with open(path, encoding=encoding) as f:
        for line in f:
            # Remove the line terminator so it can never end up inside a
            # title when the title happens to be the last field on the line.
            line = line.rstrip("\r\n")
            # Skip blank lines (e.g. a trailing empty line at end of file)
            # instead of crashing on int('').
            if not line.strip():
                continue
            fields = line.split('::')
            movieNames[int(fields[0])] = fields[1]
    return movieNames

In [13]:
# Broadcast the movie id -> title dictionary; functions applied to RDDs read it
# through nameDict.value
nameDict = sc.broadcast(loadMovieNames())

In [14]:
# Emit one (movie id, 1) pair per rating line; the movie id is the second
# '::'-separated field, converted to int
movies = ratingsRDD.map(lambda line: (int(line.split("::")[1]), 1))
movies.take(5)

[(1193, 1), (661, 1), (914, 1), (3408, 1), (2355, 1)]

In [15]:
# Add up the 1s per movie id, giving (movie id, number of ratings)
movieCounts = movies.reduceByKey(lambda left, right: left + right)
movieCounts.take(5)

                                                                                

[(1193, 1725), (661, 525), (914, 636), (3408, 1315), (2355, 1703)]

In [16]:
# Swap each pair to (number of ratings, movie id) so the count becomes the key
flipped = movieCounts.map(lambda idAndCount: (idAndCount[1], idAndCount[0]))
flipped.take(5)

[(1725, 1193), (525, 661), (636, 914), (1315, 3408), (1703, 2355)]

In [17]:
# Sort by key in descending order. After the flip the key is the rating count,
# so the most-rated movies come first.
sortedMovies = flipped.sortByKey(ascending=False)
sortedMovies.take(5)

[(3428, 2858), (2991, 260), (2990, 1196), (2883, 1210), (2672, 480)]

In [18]:
# Replace each movie id with its title from the broadcast dictionary and put
# the title first: (count, movie id) -> (title, count)
sortedMoviesWithNames = sortedMovies.map(
    lambda pair: (nameDict.value[pair[1]], pair[0])
)
sortedMoviesWithNames.take(10)

[('American Beauty (1999)', 3428),
 ('Star Wars: Episode IV - A New Hope (1977)', 2991),
 ('Star Wars: Episode V - The Empire Strikes Back (1980)', 2990),
 ('Star Wars: Episode VI - Return of the Jedi (1983)', 2883),
 ('Jurassic Park (1993)', 2672),
 ('Saving Private Ryan (1998)', 2653),
 ('Terminator 2: Judgment Day (1991)', 2649),
 ('Matrix, The (1999)', 2590),
 ('Back to the Future (1985)', 2583),
 ('Silence of the Lambs, The (1991)', 2578)]