In [None]:
from google.colab import drive

# Attach the user's Google Drive to the Colab VM at /content/drive.
drive.mount(mountpoint='/content/drive')

Mounted at /content/drive


In [None]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317130 sha256=753031395ce3f6b025b573ff23a03b3af61c2a4c89ab879b382cf73a684280d1
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [None]:
import pyspark
from pyspark.sql import SparkSession

In [None]:
from pyspark.mllib.recommendation import ALS, Rating

# Reuse the active Spark session if there is one; otherwise create it.
spark = (
    SparkSession.builder
    .appName('Movie Recommendation')
    .getOrCreate()
)

# The ratings file as an RDD of raw text lines (parsed further down).
raw = spark.sparkContext.textFile("/content/ratings.dat")

# One extra hand-written entry given as a (product, rating) pair.
mydata = [(2, 0.01)]

# Attribute every hand-written pair to user 0.
mydatardd = spark.sparkContext.parallelize(mydata).map(
    lambda pair: Rating(0, pair[0], pair[1])
)

def parseRating(str):
    """Convert one ``::``-delimited text line into a ``Rating``.

    The line must split into exactly four fields. The first two are read as
    integers (user, product) and the third as a float (rating); the fourth
    field is not used.

    Raises:
        AssertionError: if the line does not contain exactly four fields.
        ValueError: if a field cannot be converted to ``int`` / ``float``.
    """
    parts = str.split("::")
    assert len(parts) == 4
    user_field, product_field, rating_field = parts[:3]
    return Rating(int(user_field), int(product_field), float(rating_field))

# Parse every raw line into a Rating, then append the hand-written rating(s).
ratings = raw.map(parseRating)
totalratings = ratings.union(mydatardd)

# ALS starts from randomly initialised factors; a fixed seed makes repeated
# runs start from the same initialisation so the output below is repeatable.
model = ALS.train(totalratings, rank=8, iterations=5, lambda_=1.0, seed=42)

# Ten highest-scoring products for user 1.
# NOTE(review): the hand-written rating above is attributed to user 0, yet the
# recommendations are requested for user 1 -- confirm which user is intended.
products = model.recommendProducts(1, 10)

products

[Rating(user=1, product=3382, rating=4.436747925406144),
 Rating(user=1, product=989, rating=4.0262556400755205),
 Rating(user=1, product=3233, rating=3.9094207185718495),
 Rating(user=1, product=787, rating=3.9083432772314546),
 Rating(user=1, product=1830, rating=3.9037972119182323),
 Rating(user=1, product=557, rating=3.8986426937960825),
 Rating(user=1, product=3607, rating=3.8560229948069913),
 Rating(user=1, product=3172, rating=3.838212642618937),
 Rating(user=1, product=3656, rating=3.7965952425691736),
 Rating(user=1, product=2503, rating=3.792973560809293)]

In [None]:
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.stat import Statistics

# Represent each Rating as a dense (user, product, rating) vector.
vectorRDD = totalratings.map(
    lambda r: Vectors.dense(r.user, r.product, r.rating)
)

# Column-wise summary statistics over the three vector components.
summary = Statistics.colStats(vectorRDD)

# One line each: per-column mean, variance and count of non-zero entries.
print(summary.mean(), summary.variance(), summary.numNonzeros(), sep="\n")

[3024.50932404 1865.53803501    3.58156088]
[2.98741660e+06 1.20130746e+06 1.24792804e+00]
[1000209. 1000210. 1000210.]


In [None]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.linalg import Vectors

# Each line of the file becomes one row with a single string column "value".
data = spark.read.text("/content/kmeans_data.txt")

# Parse every space-separated line into a dense vector and wrap it in a
# 1-tuple so the RDD converts into a single-column ("features") DataFrame.
parsedData = (
    data.rdd
    .map(lambda row: (Vectors.dense([float(tok) for tok in row.value.split(' ')]),))
    .toDF(["features"])
)

# Cluster the data into two classes using KMeans
numClusters = 2
numIterations = 20
kmeans = KMeans(k=numClusters, maxIter=numIterations)
model = kmeans.fit(parsedData)

# Assign each data point to a cluster
predictions = model.transform(parsedData)

# Training cost reported by the fitted model's summary
wssse = model.summary.trainingCost
print("Within Set Sum of Squared Errors = " + str(wssse))

# Show the predicted cluster for each data point
predictions.select("prediction").show()


Within Set Sum of Squared Errors = 0.1199999999999996
+----------+
|prediction|
+----------+
|         0|
|         0|
|         0|
|         1|
|         1|
|         1|
+----------+



In [None]:
from pyspark.mllib.clustering import KMeans, KMeansModel
from numpy import array
from math import sqrt


# Load and parse the data: one numpy array of floats per space-separated line.
data = spark.sparkContext.textFile("/content/kmeans_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))

# Build the model (cluster the data) with the RDD-based KMeans imported above.
# Random initialisation is stochastic; a fixed seed makes repeated runs start
# from the same initial centers.
clusters = KMeans.train(
    parsedData,
    k=2,
    maxIterations=10,
    initializationMode="random",
    seed=42,
)

# Evaluate clustering by computing Within Set Sum of Squared Errors
def error(point):
    """Return the squared Euclidean distance from ``point`` to its cluster center.

    The center is the one the module-level ``clusters`` model predicts for
    ``point``. The value is deliberately the *squared* distance (no square
    root): summing it over all points gives the Within Set Sum of Squared
    Errors, whereas summing plain distances does not.

    Args:
        point: numeric array supporting element-wise subtraction with a center.

    Returns:
        The squared distance as a Python ``float``.
    """
    center = clusters.centers[clusters.predict(point)]
    return float(sum(x ** 2 for x in (point - center)))

import shutil

# Sum the per-point error over the whole data set.
WSSSE = parsedData.map(error).reduce(lambda x, y: x + y)

print("Within Set Sum of Squared Error = " + str(WSSSE))

# Save and load model.
# Use a dedicated directory rather than "/content/" itself: saving there
# scatters the model's files among the input data, and Spark refuses to write
# into output directories that already exist, so re-running the cell fails.
# Clearing the dedicated directory first keeps the cell re-runnable.
# NOTE(review): assumes Spark's default filesystem is the local disk (Colab
# local mode) -- confirm if this is ever run against HDFS or similar.
MODEL_PATH = "/content/kmeans_model"
shutil.rmtree(MODEL_PATH, ignore_errors=True)
clusters.save(spark.sparkContext, MODEL_PATH)
sameModel = KMeansModel.load(spark.sparkContext, MODEL_PATH)

Within Set Sum of Squared Error = 0.6928203230275529
