# PySpark with MongoDB Integration Example

This notebook demonstrates:
1. Connecting to a Spark cluster
2. Using PySpark for data processing
3. Using Spark ML for machine learning
4. Reading and writing data in MongoDB

## 1. Initialize Spark Session with MongoDB Support

In [11]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.classification import LogisticRegression
import pymongo

# Stop any existing Spark session first
try:
    spark.stop()
except NameError:
    # No variable named `spark` exists yet in this kernel
    pass

# Create the Spark session. The cluster master and the MongoDB connector
# options are currently commented out; uncomment them to enable the connector.
spark = (
    SparkSession.builder
    .appName("PySpark-MongoDB-Example")
    # .master("spark://spark-master:7077")
    # .config("spark.mongodb.input.uri", "mongodb://admin:admin@mongodb-dba:27017/test.collection?authSource=admin")
    # .config("spark.mongodb.output.uri", "mongodb://admin:admin@mongodb-dba:27017/test.collection?authSource=admin")
    # .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1")
    # .config("spark.executor.memory", "1g")
    # .config("spark.driver.memory", "1g")
    .getOrCreate()
)

print(f"Spark Version: {spark.version}")
#print(f"Spark Master: {spark.sparkContext.master}")
#print(f"Application ID: {spark.sparkContext.applicationId}")

IllegalArgumentException: requirement failed: Can only call getServletHandlers on a running MetricsSystem

## Alternative: Use Existing Spark Session (Simpler)

If you just want to use the pre-configured Spark session without the MongoDB connector:

In [4]:
# Use the pre-existing spark variable (already configured in this notebook image)
# This is simpler and works immediately

from pyspark.sql import SparkSession

# Get or create spark session (reuses existing one if available)
# spark = SparkSession.builder \
#     .appName("SimplePySparkExample") \
#     .getOrCreate()

print(f"Spark Version: {spark.version}")
print(f"Spark Master: {spark.sparkContext.master}")

Spark Version: 3.5.0
Spark Master: spark://spark-master:7077


## 2. Create Sample Data and Basic Operations

In [8]:
# Create sample data
data = [
    (1, "John", 28, 50000, "Engineering"),
    (2, "Jane", 32, 75000, "Marketing"),
    (3, "Bob", 45, 95000, "Engineering"),
    (4, "Alice", 29, 62000, "Sales"),
    (5, "Charlie", 38, 88000, "Engineering")
]

# DDL-style schema string: column names with their Spark SQL types
schema = "id: int, name: string, age: int, salary: int, department: string"
df = spark.createDataFrame(data, schema)

print("Sample DataFrame:")
df.show()

print("\nDataFrame Schema:")
df.printSchema()

Py4JJavaError: An error occurred while calling o58.defaultParallelism.
: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
This stopped SparkContext was created at:

org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77)
java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:499)
java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:480)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
py4j.Gateway.invoke(Gateway.java:238)
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
py4j.ClientServerConnection.run(ClientServerConnection.java:106)
java.base/java.lang.Thread.run(Thread.java:833)

The currently active SparkContext was created at:

(No active SparkContext.)
         
	at org.apache.spark.SparkContext.assertNotStopped(SparkContext.scala:122)
	at org.apache.spark.SparkContext.defaultParallelism(SparkContext.scala:2707)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)


## 3. Basic DataFrame Transformations

In [None]:
# Filter employees with salary > 60000
high_earners = df.filter(df.salary > 60000)
print("Employees with salary > 60000:")
high_earners.show()

# Group by department and calculate average salary and average age
print("\nAverage salary and age by department:")
df.groupBy("department").avg("salary", "age").show()

# Select specific columns
print("\nNames and Departments:")
df.select("name", "department").show()
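
As a cross-check on the `groupBy` output above, the same per-department averages can be computed in plain Python from the sample rows of section 2. This is a minimal sketch for verification only; the helper name `average_by_department` is hypothetical and not part of PySpark.

```python
from collections import defaultdict

# Same sample rows as in section 2: (id, name, age, salary, department)
rows = [
    (1, "John", 28, 50000, "Engineering"),
    (2, "Jane", 32, 75000, "Marketing"),
    (3, "Bob", 45, 95000, "Engineering"),
    (4, "Alice", 29, 62000, "Sales"),
    (5, "Charlie", 38, 88000, "Engineering"),
]


def average_by_department(records):
    """Return {department: (avg_salary, avg_age)} (hypothetical helper)."""
    grouped = defaultdict(list)
    for _id, _name, age, salary, department in records:
        grouped[department].append((salary, age))
    result = {}
    for department, values in grouped.items():
        count = len(values)
        avg_salary = sum(s for s, _ in values) / count
        avg_age = sum(a for _, a in values) / count
        result[department] = (avg_salary, avg_age)
    return result


averages = average_by_department(rows)
for department, (avg_salary, avg_age) in sorted(averages.items()):
    print(f"{department}: avg(salary)={avg_salary:.2f}, avg(age)={avg_age:.2f}")
```

If the Spark result differs from these numbers, the DataFrame was probably built from different rows than the ones listed in section 2.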

## 4. Write Data to MongoDB (if MongoDB connector is configured)

In [None]:
# NOTE: This requires the MongoDB Spark Connector JARs to be loaded.
# If you get an error, use PyMongo instead (see section 7).

try:
    df.write.format("mongo") \
        .mode("overwrite") \
        .option("uri", "mongodb://admin:admin@mongodb-dba:27017/test.employees?authSource=admin") \
        .save()
    print("Data written to MongoDB successfully!")
except Exception as e:
    print(f"Error writing to MongoDB via Spark: {e}")
    print("\nTip: Use PyMongo for direct MongoDB operations (see section 7)")
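
The connection URI in the cell above is written out by hand. When the user name or password contains reserved characters such as `@` or `:`, they need to be percent-encoded first. The sketch below assembles a URI in the same `database.collection` form used in this notebook; `build_mongo_uri` is a hypothetical helper, not part of PyMongo or the Spark connector.

```python
from urllib.parse import quote_plus


def build_mongo_uri(user, password, host, port, database, collection=None,
                    auth_source="admin"):
    """Assemble a MongoDB connection URI (hypothetical helper).

    The user name and password are percent-encoded so that reserved
    characters such as '@' or ':' do not break the URI.
    """
    namespace = database if collection is None else f"{database}.{collection}"
    return (
        f"mongodb://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}:{port}/{namespace}?authSource={auth_source}"
    )


# Reproduces the URI used for the employees collection in this notebook
uri = build_mongo_uri("admin", "admin", "mongodb-dba", 27017, "test", "employees")
print(uri)
```

Building the URI in one place also makes it easier to read credentials from environment variables instead of hard-coding them in every cell.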

## 5. Read Data from MongoDB (if connector is configured)

In [None]:
try:
    df_from_mongo = spark.read.format("mongo") \
        .option("uri", "mongodb://admin:admin@mongodb-dba:27017/test.employees?authSource=admin") \
        .load()
    
    print("Data read from MongoDB:")
    df_from_mongo.show()
except Exception as e:
    print(f"Error reading from MongoDB via Spark: {e}")
    print("\nTip: Use PyMongo for direct MongoDB operations (see section 7)")

## 6. Spark ML - Linear Regression Example

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

# Create sample data for regression
ml_data = [
    (1, 2.0, 3.0, 100.0),
    (2, 3.0, 4.0, 150.0),
    (3, 4.0, 5.0, 200.0),
    (4, 5.0, 6.0, 250.0),
    (5, 6.0, 7.0, 300.0),
    (6, 7.0, 8.0, 350.0),
    (7, 8.0, 9.0, 400.0),
]

ml_df = spark.createDataFrame(ml_data, ["id", "feature1", "feature2", "label"])

# Prepare features
assembler = VectorAssembler(
    inputCols=["feature1", "feature2"],
    outputCol="features"
)

ml_df_assembled = assembler.transform(ml_df)

print("Data with assembled features:")
ml_df_assembled.select("features", "label").show()

# Split data
train_data, test_data = ml_df_assembled.randomSplit([0.8, 0.2], seed=42)

# Train model
lr = LinearRegression(featuresCol="features", labelCol="label")
lr_model = lr.fit(train_data)

# Make predictions
predictions = lr_model.transform(test_data)

print("\nLinear Regression Predictions:")
predictions.select("features", "label", "prediction").show()

print(f"\nModel Coefficients: {lr_model.coefficients}")
print(f"Model Intercept: {lr_model.intercept}")
# lr_model.summary holds metrics computed on the training data, not on test_data
print(f"Training RMSE: {lr_model.summary.rootMeanSquaredError}")
print(f"Training R2: {lr_model.summary.r2}")
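
In the sample data above, `feature2` is always `feature1 + 1` and `label` is always `50 * feature1`, so the two features are perfectly collinear and more than one set of coefficients fits the data exactly. The plain-Python sketch below checks this for a few hand-picked candidates (chosen for illustration only); the coefficients Spark prints need not match any particular one of them.

```python
# Same rows as ml_data above: (id, feature1, feature2, label)
ml_rows = [
    (1, 2.0, 3.0, 100.0),
    (2, 3.0, 4.0, 150.0),
    (3, 4.0, 5.0, 200.0),
    (4, 5.0, 6.0, 250.0),
    (5, 6.0, 7.0, 300.0),
    (6, 7.0, 8.0, 350.0),
    (7, 8.0, 9.0, 400.0),
]

# Hand-picked candidates: (coef_feature1, coef_feature2, intercept)
candidates = [
    (50.0, 0.0, 0.0),
    (0.0, 50.0, -50.0),
    (25.0, 25.0, -25.0),
]


def max_abs_residual(coef1, coef2, intercept, rows):
    """Largest absolute error of the linear model over all rows."""
    return max(
        abs(coef1 * f1 + coef2 * f2 + intercept - label)
        for _id, f1, f2, label in rows
    )


for coef1, coef2, intercept in candidates:
    residual = max_abs_residual(coef1, coef2, intercept, ml_rows)
    print(f"coefficients=({coef1}, {coef2}), intercept={intercept}: "
          f"max |residual| = {residual}")
```

Because every candidate reproduces the labels exactly, the individual coefficient values carry little meaning on this toy dataset; only the predictions are well determined.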

## 7. Direct MongoDB Operations with PyMongo

In [None]:
import pymongo

# Connect to MongoDB using PyMongo
client = pymongo.MongoClient("mongodb://admin:admin@mongodb-dba:27017/?authSource=admin")
db = client.test
collection = db.employees

# Insert sample documents
employees = [
    {"id": 1, "name": "John", "age": 28, "salary": 50000, "department": "Engineering"},
    {"id": 2, "name": "Jane", "age": 32, "salary": 75000, "department": "Marketing"},
    {"id": 3, "name": "Bob", "age": 45, "salary": 95000, "department": "Engineering"},
]

# Clear existing data and insert new
collection.delete_many({})
collection.insert_many(employees)

print("Data inserted into MongoDB!")

# Query data
print("\nAll employees from MongoDB:")
for employee in collection.find():
    print(employee)

# Query with filter
print("\nEngineering employees:")
for employee in collection.find({"department": "Engineering"}):
    print(f"{employee['name']} - ${employee['salary']}")

# Count documents
print(f"\nTotal employees: {collection.count_documents({})}")

client.close()

## 8. Convert PyMongo Data to Spark DataFrame

In [None]:
import pymongo

# Fetch data from MongoDB using PyMongo
client = pymongo.MongoClient("mongodb://admin:admin@mongodb-dba:27017/?authSource=admin")
db = client.test
collection = db.employees

# Convert MongoDB cursor to list
mongo_data = list(collection.find({}, {"_id": 0}))  # Exclude _id field

# Create Spark DataFrame from MongoDB data
if mongo_data:
    df_from_pymongo = spark.createDataFrame(mongo_data)
    print("MongoDB data as Spark DataFrame:")
    df_from_pymongo.show()
    
    print("\nProcessing with Spark:")
    df_from_pymongo.groupBy("department").avg("salary").show()
else:
    print("No data in MongoDB collection")

client.close()
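
Documents in a MongoDB collection do not have to share the same fields, while a Spark DataFrame has one fixed set of columns. The sketch below gives every document the same keys before it is handed to `spark.createDataFrame`; `normalize_documents` is a hypothetical helper and the sample documents are invented for illustration.

```python
def normalize_documents(documents, drop_fields=("_id",)):
    """Give every document the same keys, filling gaps with None.

    Hypothetical helper: fields listed in drop_fields are removed, and the
    remaining keys are ordered by first appearance.
    """
    field_names = []
    for doc in documents:
        for key in doc:
            if key not in drop_fields and key not in field_names:
                field_names.append(key)
    return [{name: doc.get(name) for name in field_names} for doc in documents]


# Illustrative documents; the second one lacks "salary"
sample_docs = [
    {"_id": "abc", "id": 1, "name": "John", "salary": 50000},
    {"_id": "def", "id": 2, "name": "Jane"},
]

normalized = normalize_documents(sample_docs)
for doc in normalized:
    print(doc)
```

The normalized list can then be passed to `spark.createDataFrame`, ideally together with an explicit schema so that columns containing `None` still get the intended type.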

## 9. Read from HDFS (if you have data in HDFS)

In [None]:
# Example: Read CSV from HDFS
# df_hdfs = spark.read.csv("hdfs://namenode:9000/data/your_file.csv", header=True, inferSchema=True)
# df_hdfs.show()

# Example: Write to HDFS
# df.write.mode("overwrite").parquet("hdfs://namenode:9000/data/employees.parquet")

print("To read/write HDFS, uncomment the code above and provide the correct path")

## 10. Cleanup (Optional)

In [None]:
# Uncomment to stop Spark session when done
# spark.stop()
print("Keep Spark session running for further analysis.")
print("Uncomment spark.stop() above to stop the session.")