# Performing calculation using Apache Spark

In [1]:
import findspark
findspark.init()
from pyspark import SparkConf , SparkContext 
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql import functions
import collections
import random

# Connection to spark cluster using local machine

In [2]:
confi = SparkConf().setMaster("local").setAppName("rating")
sc = SparkContext(conf=  confi)


In [3]:
rating = "E:\\python folder\\working dataset\\ml-100k\\u.data"
name = "E:\\python folder\\working dataset\\ml-100k\\u.item"
# Second method filename = "file:///E:/python folder/working dataset/ml-100k/u.data"

# viewing data from the file

In [4]:
lines = sc.textFile(rating)

list(lines.collect())[:10]

['196\t242\t3\t881250949',
 '186\t302\t3\t891717742',
 '22\t377\t1\t878887116',
 '244\t51\t2\t880606923',
 '166\t346\t1\t886397596',
 '298\t474\t4\t884182806',
 '115\t265\t2\t881171488',
 '253\t465\t5\t891628467',
 '305\t451\t3\t886324817',
 '6\t86\t3\t883603013']

In [5]:
sc.defaultMinPartitions

1

# Number of workers

In [6]:
sc.defaultParallelism

1

# Extract Column 2

In [7]:
rating = lines.map(lambda x: x.split()[2])
list(rating.collect())[:10]


['3', '3', '1', '2', '1', '4', '2', '5', '3', '3']

# Total Count of all ratings

In [8]:
results = rating.countByValue()
final = collections.Counter(results)
print( sorted(final.items()))

[('1', 6110), ('2', 11370), ('3', 27145), ('4', 34174), ('5', 21201)]


# For filtering Data

In [9]:
a = (rating.filter(lambda x: x == '5').collect())
a.count('5')

21201

# To display first 5 records

In [10]:
lines.take(5)

['196\t242\t3\t881250949',
 '186\t302\t3\t891717742',
 '22\t377\t1\t878887116',
 '244\t51\t2\t880606923',
 '166\t346\t1\t886397596']

In [11]:
lines.takeSample(True, 5)

['197\t188\t3\t891409982',
 '709\t823\t3\t879849573',
 '894\t887\t4\t880993374',
 '435\t252\t2\t884134677',
 '249\t634\t5\t879572314']

# Total Number of records

In [12]:
lines.count()

100000

In [13]:
rating.countByValue()

defaultdict(int, {'3': 27145, '1': 6110, '2': 11370, '4': 34174, '5': 21201})

# function to return name when ID is provided

In [14]:
def loadname():
    names = {}
    with open(name) as f:
        for a in f:
            field = a.split("|")
            names[int(field[0])] = field[1]
    return names



In [15]:
def parse(a):
    field = a.split()
    return Row(ID = int(field[1]) , rating = float(field[2]) )

# Starting Spark Session

In [16]:
if __name__ == "__main__":
    spark = SparkSession.builder.appName("Popluar").getOrCreate()
    

# List of first 5 names

In [17]:
mname = loadname()
list(mname.items())[:5]


[(1, 'Toy Story (1995)'),
 (2, 'GoldenEye (1995)'),
 (3, 'Four Rooms (1995)'),
 (4, 'Get Shorty (1995)'),
 (5, 'Copycat (1995)')]

# Rating

In [18]:
mrating = lines.map(parse)
mrating.take(5)


[Row(ID=242, rating=3.0),
 Row(ID=302, rating=3.0),
 Row(ID=377, rating=1.0),
 Row(ID=51, rating=2.0),
 Row(ID=346, rating=1.0)]

# Creating DataFrame

In [19]:
df = spark.createDataFrame(mrating)
df.take(5)

[Row(ID=242, rating=3.0),
 Row(ID=302, rating=3.0),
 Row(ID=377, rating=1.0),
 Row(ID=51, rating=2.0),
 Row(ID=346, rating=1.0)]

# Average Rating

In [20]:
average = df.groupBy("ID").avg("rating")
average.take(5)

[Row(ID=474, avg(rating)=4.252577319587629),
 Row(ID=29, avg(rating)=2.6666666666666665),
 Row(ID=26, avg(rating)=3.452054794520548),
 Row(ID=964, avg(rating)=3.3333333333333335),
 Row(ID=1677, avg(rating)=3.0)]

# Total Rating Count

In [21]:
counts = df.groupBy("id").count()
counts.take(5)

[Row(id=474, count=194),
 Row(id=29, count=114),
 Row(id=26, count=73),
 Row(id=964, count=9),
 Row(id=1677, count=1)]

# Joining count and average rating

In [22]:
avgCount = counts.join(average , "ID")
avgCount.take(5)

[Row(id=26, count=73, avg(rating)=3.452054794520548),
 Row(id=29, count=114, avg(rating)=2.6666666666666665),
 Row(id=474, count=194, avg(rating)=4.252577319587629),
 Row(id=964, count=9, avg(rating)=3.3333333333333335),
 Row(id=1677, count=1, avg(rating)=3.0)]

In [23]:
top = avgCount.orderBy("avg(rating)").take(5)
top

[Row(id=830, count=1, avg(rating)=1.0),
 Row(id=858, count=3, avg(rating)=1.0),
 Row(id=1374, count=2, avg(rating)=1.0),
 Row(id=1671, count=1, avg(rating)=1.0),
 Row(id=1329, count=1, avg(rating)=1.0)]

# Getting names for rating

In [24]:
for l in top:
    print(mname[l[0]] , l[1] , l[2])

Power 98 (1995) 1 1.0
Amityville: Dollhouse (1996) 3 1.0
Falling in Love Again (1980) 2 1.0
Further Gesture, A (1996) 1 1.0
Low Life, The (1994) 1 1.0


# To stop spark context

In [25]:
sc.stop()