#Getting Started with PySpark in Google Colab

PySpark is Python interface for Apache Spark. The primary use cases for PySpark are to work with huge amounts of data and for creating data pipelines.

You don't need to work with big data to benefit from PySpark. I find that the SparkSQL is a great tool for performing routine data anlysis. Pandas can get slow and you may find yourself writing a lot of code for data cleaning whereas the same actions take much less code in SQL. Let's get started!

See more here! http://spark.apache.org/docs/latest/api/python/

# 1. Installing PySpark in Google Colab

In [1]:
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
#Check this site for the latest download link https://www.apache.org/dyn/closer.lua/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!tar xf spark-3.2.1-bin-hadoop3.2.tgz
!pip install -q findspark
!pip install pyspark
!pip install py4j
!pip install recommenders

import os
import sys
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"


import findspark
findspark.init()
findspark.find()

import sys
import pyspark
from pyspark.ml.recommendation import ALS
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import StringType, FloatType, IntegerType, LongType
from pyspark.ml.feature import StringIndexer

from recommenders.utils.timer import Timer
from recommenders.datasets import movielens
from recommenders.utils.notebook_utils import is_jupyter
from recommenders.datasets.spark_splitters import spark_random_split
from recommenders.evaluation.spark_evaluation import SparkRatingEvaluation, SparkRankingEvaluation
from recommenders.utils.spark_utils import start_or_get_spark

[33m0% [Working][0m            Hit:1 http://archive.ubuntu.com/ubuntu jammy InRelease
[33m0% [Waiting for headers] [Connecting to security.ubuntu.com (185.125.190.82)] [[0m                                                                               Hit:2 http://archive.ubuntu.com/ubuntu jammy-updates InRelease
[33m0% [Connecting to security.ubuntu.com (185.125.190.82)] [Connected to cloud.r-p[0m                                                                               Hit:3 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Hit:4 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
Hit:5 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Hit:6 http://security.ubuntu.com/ubuntu jammy-security InRelease
Hit:7 https://r2u.stat.illinois.edu/ubuntu jammy InRelease
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy 

In [2]:
!pip install -q handyspark

In [3]:
print(f"System version: {sys.version}")
print("Spark version: {}".format(pyspark.__version__))

System version: 3.11.11 (main, Dec  4 2024, 08:55:07) [GCC 11.4.0]
Spark version: 3.5.4


In [4]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# 2. Set up parameters

In [5]:
# top k items to recommend
TOP_K = 10

DATA_SIZE = '1m'

COL_USER = "user"
COL_ITEM = "business"
COL_RATING = "rating"
COL_TIMESTAMP = "timestamp"

## 3. Get datasets

In [6]:
import os
import requests
import gzip, shutil

# URLs of the files to download
urls = {
    "rating-New_York.csv.gz": "https://datarepo.eng.ucsd.edu/mcauley_group/gdrive/googlelocal/rating-New_York.csv.gz",
    "meta-New_York.json.gz": "https://datarepo.eng.ucsd.edu/mcauley_group/gdrive/googlelocal/meta-New_York.json.gz"
}

for filename, url in urls.items():
    if not os.path.isfile(filename):
        print(f"Downloading {filename}...")

        with requests.get(url, stream=True) as r:
            r.raise_for_status()
            with open(filename, 'wb') as f:
                for chunk in r.iter_content(chunk_size=8192):
                    f.write(chunk)
        print(f"{filename} downloaded successfully.")

        with gzip.open(filename, 'rb') as f_in:
          with open(filename[:-3], 'wb') as f_out:
              shutil.copyfileobj(f_in, f_out)

    else:
        print(f"{filename} already exists.")


rating-New_York.csv.gz already exists.
meta-New_York.json.gz already exists.


## 3. Check with pandas

In [7]:
import pandas as pd

df = pd.read_csv("rating-New_York.csv")
df.head()

Unnamed: 0,business,user,rating,timestamp
0,0x89c24469c758686b:0x641f5b84cb9bedfa,101855823232666695168,1,1629141186463
1,0x89c24469c758686b:0x641f5b84cb9bedfa,105821946869087882225,1,1528477593994
2,0x89c24469c758686b:0x641f5b84cb9bedfa,108990883320903443748,1,1424830512308
3,0x89c24469c758686b:0x641f5b84cb9bedfa,117021514778630212205,5,1512641660497
4,0x89c25fc9494dce47:0x6d63c807b59a55,113722104692308235141,5,1603494795361


In [8]:
df.isnull().sum()

Unnamed: 0,0
business,0
user,0
rating,0
timestamp,0


#3. Build PySpark DataFrames

In [9]:
spark = SparkSession.builder \
    .appName("ALS PySpark: New York Reco") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.kryoserializer.buffer.max", "2047m") \
    .config("spark.sql.analyzer.failAmbiguousSelfJoin", "false") \
    .getOrCreate()


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/17 21:24:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [10]:
# Define schema with user and business as StringType
schema = StructType([
    StructField("user", StringType(), nullable=False),
    StructField("business", StringType(), nullable=False),
    StructField("rating", FloatType(), nullable=False),
    StructField("timestamp", LongType(), nullable=False),
])

# Read the CSV file with the specified schema
data = spark.read.csv("rating-New_York.csv", schema=schema, header=True)

if DATA_SIZE == '100k':
  data = data.limit(100000)
elif DATA_SIZE == '1m':
  data = data.limit(1000000)
elif DATA_SIZE == '10m':
  data = data.limit(10000000)
elif DATA_SIZE == '20m':
  data = data.limit(20000000)

# Extract unique user and business IDs
unique_users = data.select("user").distinct().rdd.map(lambda row: row[0])
unique_businesses = data.select("business").distinct().rdd.map(lambda row: row[0])

# Assign unique integers to each unique ID
user_mapping = unique_users.zipWithUniqueId().toDF(["user", "user_id"])
business_mapping = unique_businesses.zipWithUniqueId().toDF(["business", "business_id"])

# Join the mappings back to the original data
data = data.join(user_mapping, on="user").join(business_mapping, on="business")

[Stage 0:>                                                                             (0 + 0) / 20][Stage 0:>                                                                             (0 + 2) / 20]25/02/17 21:24:34 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: business
 Schema: user
Expected: user but found: business
CSV file: file:///content/rating-New_York.csv
25/02/17 21:25:10 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: user
 Schema: business
Expected: business but found: user
CSV file: file:///content/rating-New_York.csv


In [11]:

# Select only the necessary columns for ALS
als_data = data.select("user_id", "business_id", "rating")

# Show the transformed data
als_data.show(5)

25/02/17 21:25:52 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: business, user, rating
 Schema: user, business, rating
Expected: user but found: business
CSV file: file:///content/rating-New_York.csv


+-------+-----------+------+
|user_id|business_id|rating|
+-------+-----------+------+
|   8204|     147342|   1.0|
|  36765|     711069|   1.0|
|      1|         48|   5.0|
|      1|         48|   5.0|
|  26901|     510398|   5.0|
+-------+-----------+------+
only showing top 5 rows



## 4. Exploratory Data Analysis

In [12]:
# Amount of rows (given by DATA_SIZE)

als_data.count()

25/02/17 21:27:01 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: business, user
 Schema: user, business
Expected: user but found: business
CSV file: file:///content/rating-New_York.csv


1000000

In [13]:
# General characteristics of the dataset

als_data.describe().show()

25/02/17 21:28:00 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: business, user, rating
 Schema: user, business, rating
Expected: user but found: business
CSV file: file:///content/rating-New_York.csv


+-------+------------------+------------------+-----------------+
|summary|           user_id|       business_id|           rating|
+-------+------------------+------------------+-----------------+
|  count|           1000000|           1000000|          1000000|
|   mean|       22087.54184|     373225.790252|         4.385448|
| stddev|12901.062564483873|241792.87838352047|1.280113853525324|
|    min|                 0|                 0|              1.0|
|    max|             43462|            816414|              5.0|
+-------+------------------+------------------+-----------------+



                                                                                                    

In [14]:
# Visualize dataset

# handy_data = als_data.toHandy()

AttributeError: 'DataFrame' object has no attribute 'toHandy'

## 5. Split data in train and test using Spark's built in features


In [15]:
train, test = spark_random_split(data, ratio=0.75, seed=42)
print("N train", train.cache().count())
print("N test", test.cache().count())

25/02/17 21:29:06 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: business, user, rating, timestamp
 Schema: user, business, rating, timestamp
Expected: user but found: business
CSV file: file:///content/rating-New_York.csv


N train 750224


25/02/17 21:30:43 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: business, user, rating, timestamp
 Schema: user, business, rating, timestamp
Expected: user but found: business
CSV file: file:///content/rating-New_York.csv


N test 249776


                                                                                                    

## 6. Train ALS model and get top k recommendations

In [16]:
header = {
    "userCol": "user_id",
    "itemCol": "business_id",
    "ratingCol": COL_RATING,
}

als = ALS(
    rank=10,
    maxIter=15,
    implicitPrefs=False,
    regParam=0.05,
    coldStartStrategy='drop',
    nonnegative=False,
    seed=43,
    **header
)

In [17]:
with Timer() as train_time:
  model = als.fit(train)

print(f"Took {train_time.interval} seconds to train the model...")



Took 132.17122379899956 seconds to train the model...


## 5. Run predictions on the test set

In [19]:
from pyspark.sql.functions import col, explode

test_users = test.select("user_id").distinct()
top_n = 10

with Timer() as test_time:
    recommendations = model.recommendForUserSubset(test_users, top_n)
    recommendations = recommendations.select("user_id", explode("recommendations").alias("rec"))
    recommendations = recommendations.select("user_id", col("rec.business_id"), col("rec.rating"))

    recommendations.cache().count()

print(f"Took {test_time.interval} seconds to generate recommendations...")



Took 1110.1797772400005 seconds to generate recommendations...


                                                                                                    

In [24]:
recommendations.show(5)

ConnectionRefusedError: [Errno 111] Connection refused

## 6. Evaluate the model

In [22]:
rank_eval = SparkRankingEvaluation(test, recommendations, k=TOP_K,
                                   col_user="user_id", col_item="business_id",
                                   col_rating="rating", col_prediction="rating",
                                   relevancy_method="top_k")

print("Model:\tALS",
      "Top K:\t%d" % rank_eval.k,
      "MAP:\t%f" % rank_eval.map_at_k(),
      "NDCG:\t%f" % rank_eval.ndcg_at_k(),
      "Precision@K:\t%f" % rank_eval.precision_at_k(),
      "Recall@K:\t%f" % rank_eval.recall_at_k(), sep='\n')

ConnectionRefusedError: [Errno 111] Connection refused

## 7. Evaluate the predictions

In [None]:
prediction = model.transform(test).cache()
prediction.show()

rating_eval = SparkRatingEvaluation(test, prediction, col_user="user_id", col_item="business_id",
                                    col_rating="rating", col_prediction="prediction")

print("Model:\tALS rating prediction",
      "RMSE:\t%f" % rating_eval.rmse(),
      "MAE:\t%f" % rating_eval.mae(),
      "Explained variance:\t%f" % rating_eval.exp_var(),
      "R squared:\t%f" % rating_eval.rsquared(), sep='\n')

t java.base/java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1046)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2357)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputS