<a href="https://colab.research.google.com/github/psword/big-data-pipeline-sparky/blob/Data-view/mongodb_spark_colab_compatible.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# ✅ PySpark + MongoDB Atlas (Compatible Setup for Google Colab)
This notebook uses Apache Spark **3.3.2** and MongoDB Spark Connector **10.1.1**, which are compatible.

In [2]:
# 📦 Step 1: Install Python dependencies
!pip install -q pyspark findspark pymongo

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.2/317.2 MB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m200.5/200.5 kB[0m [31m14.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [3]:
# ⚙️ Step 2: Install Java and Spark 3.3.2
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz
!tar xf spark-3.3.2-bin-hadoop3.tgz

In [1]:
# 🌱 Step 3: Set environment variables
import os
from getpass import getpass
from urllib.parse import quote_plus

# Locations of the JDK and the Spark distribution installed in Step 2.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.2-bin-hadoop3"

# Never hardcode credentials in the notebook: they end up in version control.
# If MONGO_URI is already set (e.g. injected from a secret store) it is kept;
# otherwise the credentials are prompted for and the URI is built from them.
# To re-enter credentials, run `del os.environ["MONGO_URI"]` and re-run this cell.
if not os.environ.get("MONGO_URI"):
    # Percent-encode so characters such as '@', ':' or '/' in the username or
    # password cannot break the connection string.
    _mongo_user = quote_plus(input("MongoDB Atlas username: "))
    _mongo_password = quote_plus(getpass("MongoDB Atlas password: "))
    os.environ["MONGO_URI"] = (
        f"mongodb+srv://{_mongo_user}:{_mongo_password}"
        "@bigdata.kvauode.mongodb.net/tmdb?retryWrites=true&w=majority"
    )

In [2]:
# 🚀 Step 4: Start Spark session
import findspark

# Keep this call ahead of the pyspark import below: findspark is what makes
# the Spark installation referenced by SPARK_HOME (Step 3) importable.
findspark.init()

from pyspark.sql import SparkSession

# Build (or reuse) a session that pulls in the MongoDB Spark Connector and
# uses the MONGO_URI environment variable for both reads and writes.
spark = (
    SparkSession.builder
    .appName("MongoDBIntegration")
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:10.1.1")
    .config("spark.mongodb.read.connection.uri", os.environ["MONGO_URI"])
    .config("spark.mongodb.write.connection.uri", os.environ["MONGO_URI"])
    .getOrCreate()
)

In [3]:
# 📄 Step 5: Read from MongoDB
# Load the "movies" collection of the "tmdb" database through the connector
# configured on the Spark session.
df = (
    spark.read.format("mongodb")
    .option("database", "tmdb")
    .option("collection", "movies")
    .load()
)

# Preview the first 5 rows
df.show(5)

+--------------------+-----+--------------------+---------+--------------------+--------------------+------+---------+--------------------+-----------------+---------------+--------------------+----------+--------------------+--------------------+--------------------+------------+-----------+-------+--------------------+--------+--------------------+---------------+------------+----------+
|                 _id|adult|       backdrop_path|   budget|              genres|            homepage|    id|  imdb_id|            keywords|original_language| original_title|            overview|popularity|         poster_path|production_companies|production_countries|release_date|    revenue|runtime|    spoken_languages|  status|             tagline|          title|vote_average|vote_count|
+--------------------+-----+--------------------+---------+--------------------+--------------------+------+---------+--------------------+-----------------+---------------+--------------------+----------+---------

In [None]:
# ⚙️ Step 6: Import additional Spark SQL functions
from pyspark.sql.functions import col, desc


In [None]:
# 🔥 Step 7: Process Data

# Cast the "adult" column to a boolean type
df = df.withColumn('adult', col('adult').cast('boolean'))

# Preview: first 5 rows
print("First 5 movies")
df.show(5)

# Non-adult movies whose revenue exceeds 1,000,000,000 (displays up to 20 rows)
print("First 20 movies with revenue over 1 billion")
over_billion_revenue = df.filter(~col('adult') & (col('revenue') > 1_000_000_000))
over_billion_revenue.show(20)

# Non-adult movies with vote_count > 1, sorted by revenue descending (displays 10 rows)
print("Top 10 movies with highest revenue")
highest_revenue = (
    df.filter(~col('adult') & (col('vote_count') > 1))
    .sort(desc('revenue'))
)
highest_revenue.show(10)