# PySpark vs. MongoDB Connector for Spark

The `pyspark` package and the [MongoDB Connector for Spark](https://www.mongodb.com/docs/spark-connector/v10.2/) are pre-installed in the Docker image.

In [1]:
from pyspark.sql import SparkSession

## Load environment variables from a .env file

We'll read the connection strings from it later.

In [2]:
# !pip install python-dotenv
import os
from dotenv import load_dotenv
load_dotenv()  # reads key-value pairs from a .env file into os.environ

# e.g. "mongodb+srv://<username>:<password>@cluster0.mongodb.net/database.collection"
MONGO_INPUT_URI = os.getenv('MONGO_INPUT_URI', '')
# Print only the scheme so credentials don't end up in the notebook output
print(MONGO_INPUT_URI[0:14])

# e.g. jdbc:singlestore://host.docker.internal:3306/<database>?user=<username>&password=<password>
SINGLESTORE_JDBC_URI = os.getenv('SINGLESTORE_JDBC_URI', '')
print(SINGLESTORE_JDBC_URI[0:19])

mongodb+srv://
jdbc:singlestore://
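`os.getenv(..., '')` returns an empty string when a variable is missing, so a typo in the `.env` file only surfaces later as an obscure connection error. A minimal fail-fast sketch follows; `require_uri` is a hypothetical helper (not part of python-dotenv), and the accepted prefixes are assumptions based on the example URIs above.

```python
import os

def require_uri(name: str, prefixes: tuple) -> str:
    """Return env var `name`, failing fast if it is unset or has an unexpected scheme."""
    value = os.getenv(name, "")
    if not value:
        raise RuntimeError(f"{name} is not set; add it to the .env file")
    if not value.startswith(prefixes):
        # Never echo the value itself: connection strings usually embed credentials.
        raise ValueError(f"{name} does not start with any of {prefixes}")
    return value
```

With it, the cell above could read `MONGO_INPUT_URI = require_uri('MONGO_INPUT_URI', ('mongodb://', 'mongodb+srv://'))`. The error messages deliberately never include the value, so secrets stay out of the output.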


Ensure the Docker host is reachable by name.

In [8]:
import socket
socket.gethostbyname('host.docker.internal')

'172.17.0.1'

To make the port-forward reachable from Docker containers, bind it to all interfaces (or at least to the Docker host IP), e.g.:

```shell
kubectl port-forward -n singlestore svc/singlestore-ddl --address=0.0.0.0 3306:3306
```
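Resolving `host.docker.internal` only proves that name lookup works; it does not show that the forwarded port accepts connections from inside the container. A small standard-library check is sketched below; `port_is_open` is a hypothetical helper, and port 3306 is assumed from the `kubectl port-forward` command.

```python
import socket

def port_is_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and DNS failures (all OSError subclasses).
        return False
```

From the notebook, `port_is_open('host.docker.internal', 3306)` should return `True` once the forward listens on an address the container can reach.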

## Create a Spark session

In [3]:
# Uncomment to stop a session left over from a previous run:
# SparkSession.builder.master("spark://spark-master:7077").getOrCreate().stop()

In [4]:
# Define Spark session
spark = (SparkSession
    .builder
    .appName("Python Spark SQL MongoDB Atlas example")
    .master('spark://spark-master:7077')
    # Declaring the connector package is not required here because the JAR is
    # already bundled in the Docker image.
    # .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:10.2.0')
    .getOrCreate())

print(spark.version)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


2023-10-02 07:56:21 WARN  NativeCodeLoader:60 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
3.4.1
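Instead of passing `connection.uri` on every read, the MongoDB connector (10.x) also accepts read settings as Spark configuration under the `spark.mongodb.read.` prefix. The sketch below assumes that behavior; `mongo_read_conf` and `build_session` are hypothetical helpers, and the database and collection names are taken from the aggregation below.

```python
# Hypothetical helpers: move MongoDB read settings into the Spark session config.
# Assumption: MongoDB Spark Connector 10.x, which reads `spark.mongodb.read.*` keys.

def mongo_read_conf(uri: str, database: str, collection: str) -> dict:
    """Session-level equivalents of the per-read connection.uri/database/collection options."""
    return {
        "spark.mongodb.read.connection.uri": uri,
        "spark.mongodb.read.database": database,
        "spark.mongodb.read.collection": collection,
    }

def build_session(master: str, uri: str, database: str, collection: str):
    """Create (or reuse) a SparkSession with the MongoDB read settings applied."""
    from pyspark.sql import SparkSession  # imported here so mongo_read_conf needs no Spark

    builder = (SparkSession.builder
               .appName("Python Spark SQL MongoDB Atlas example")
               .master(master))
    for key, value in mongo_read_conf(uri, database, collection).items():
        builder = builder.config(key, value)
    return builder.getOrCreate()
```

For example, `build_session('spark://spark-master:7077', MONGO_INPUT_URI, 'request_insights', 'requests')` would let a read omit the three per-read options.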


## Run an aggregation against MongoDB Atlas

In [5]:
import json

pipeline = [
    {'$match': {'data._t': 'image_info'}},
    {'$limit': 10},
    {'$project': {
        '_id': 0,
        'rid': '$rid',
        'sha256': '$data.hashes.sha256',
        'md5': '$data.hashes.md5'}},
]

df = (spark.read.format('mongodb')
    .option('connection.uri', MONGO_INPUT_URI)
    .option('database', 'request_insights')
    .option('collection', 'requests')
    # Serialize the stages as JSON; option() would otherwise forward str(pipeline)
    .option('aggregation.pipeline', json.dumps(pipeline))
    .option('outputExtendedJson', 'true')
    .load())

df.show()


+--------------------+--------------------+--------------------+
|                 md5|                 rid|              sha256|
+--------------------+--------------------+--------------------+
|[D2 2C 18 18 55 8...|[41 82 60 9E F3 E...|[26 6F 75 A0 01 6...|
|[06 2C BF 06 88 F...|[58 04 F3 E1 A9 5...|[13 40 43 DA 51 C...|
|[0C 10 AE 2B B3 1...|[8B B4 68 0E 21 7...|[4A 5E 4E 73 12 0...|
|[CC FF CB 2E 37 F...|[60 3D D7 1E 26 2...|[8F 14 E7 A8 69 9...|
|[67 12 B9 65 FE F...|[A4 A4 E8 71 F9 9...|[AF C4 C1 36 6B E...|
|[0C 10 AE 2B B3 1...|[9A 78 F7 5C E6 E...|[4A 5E 4E 73 12 0...|
|[6D 74 35 34 29 F...|[9A 5A 1A 4C 93 4...|[D4 11 8E 7B 3B 7...|
|[34 72 34 84 36 7...|[4B 90 FE 53 C0 E...|[3A DC 26 F3 17 E...|
|[84 31 B6 CB 68 3...|[00 F7 5A E9 0D 8...|[A8 23 5F B3 6C E...|
|[1A C2 10 0B 88 A...|[12 23 54 79 19 2...|[8E C2 9E 1B 9E D...|
+--------------------+--------------------+--------------------+
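PySpark's `DataFrameReader.option()` converts non-string values with `str()`, so a Python list handed to it directly reaches the connector as a Python repr (single-quoted strings, `True`/`False`/`None`) rather than JSON. `json.dumps` produces a proper JSON array of stages. The snippet below only compares the two encodings in plain Python; it does not call Spark.

```python
import json

# Same stages as the aggregation above.
pipeline = [
    {'$match': {'data._t': 'image_info'}},
    {'$limit': 10},
    {'$project': {'_id': 0, 'rid': '$rid',
                  'sha256': '$data.hashes.sha256', 'md5': '$data.hashes.md5'}},
]

as_repr = str(pipeline)         # what option() forwards if given the list itself
as_json = json.dumps(pipeline)  # a JSON array of aggregation stages

print(as_repr[:30])
print(as_json[:30])
```

In the reader, this becomes `.option('aggregation.pipeline', json.dumps(pipeline))`.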


                                                                                

## Run a query against SingleStore (MemSQL)

As with the MongoDB connector, the SingleStore JDBC driver is bundled in the Docker image.

In [33]:
df_t = (spark.read.format("jdbc")
      .option("url", SINGLESTORE_JDBC_URI)
      # .option("dbtable", "images_sha256_v2.image_sha256_translation")
      # `:>` is SingleStore's cast operator
      .option("query", "SELECT (internal_id :> BIGINT UNSIGNED NOT NULL) AS internal_id, image_sha256 FROM image_sha256_translation LIMIT 10")
      .load())

# Spark reads BIGINT UNSIGNED as DECIMAL(20,0); cast it to a signed 64-bit bigint
df_t = df_t.withColumn('internal_id', df_t['internal_id'].cast('bigint'))
df_t = df_t.withColumnRenamed('internal_id', 'id')
df_t.head()

2023-10-02 08:27:00 WARN  TaskSetManager:72 - Lost task 0.0 in stage 18.0 (TID 69) (172.22.0.5 executor 1): java.sql.SQLDataException: value 'internal_id' cannot be decoded as BigDecimal
	at com.singlestore.jdbc.plugin.codec.BigDecimalCodec.decodeText(BigDecimalCodec.java:93)
	at com.singlestore.jdbc.plugin.codec.BigDecimalCodec.decodeText(BigDecimalCodec.java:21)
	at com.singlestore.jdbc.codec.TextRowDecoder.decode(TextRowDecoder.java:29)
	at com.singlestore.jdbc.codec.RowDecoder.getValue(RowDecoder.java:114)
	at com.singlestore.jdbc.client.result.Result.getBigDecimal(Result.java:501)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeGetter$3(JdbcUtils.scala:412)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeGetter$3$adapted(JdbcUtils.scala:410)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:361)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.


2023-10-02 08:27:00 ERROR TaskSetManager:76 - Task 0 in stage 18.0 failed 4 times; aborting job


Py4JJavaError: An error occurred while calling o264.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 18.0 failed 4 times, most recent failure: Lost task 0.3 in stage 18.0 (TID 72) (172.22.0.6 executor 3): java.sql.SQLDataException: value 'internal_id' cannot be decoded as BigDecimal
	at com.singlestore.jdbc.plugin.codec.BigDecimalCodec.decodeText(BigDecimalCodec.java:93)
	at com.singlestore.jdbc.plugin.codec.BigDecimalCodec.decodeText(BigDecimalCodec.java:21)
	at com.singlestore.jdbc.codec.TextRowDecoder.decode(TextRowDecoder.java:29)
	at com.singlestore.jdbc.codec.RowDecoder.getValue(RowDecoder.java:114)
	at com.singlestore.jdbc.client.result.Result.getBigDecimal(Result.java:501)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeGetter$3(JdbcUtils.scala:412)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeGetter$3$adapted(JdbcUtils.scala:410)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:361)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:343)
	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:971)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2263)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2284)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2303)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:530)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:483)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:61)
	at org.apache.spark.sql.Dataset.$anonfun$collectToPython$1(Dataset.scala:3997)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4167)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4165)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4165)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3994)
	at jdk.internal.reflect.GeneratedMethodAccessor48.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.sql.SQLDataException: value 'internal_id' cannot be decoded as BigDecimal
	at com.singlestore.jdbc.plugin.codec.BigDecimalCodec.decodeText(BigDecimalCodec.java:93)
	at com.singlestore.jdbc.plugin.codec.BigDecimalCodec.decodeText(BigDecimalCodec.java:21)
	at com.singlestore.jdbc.codec.TextRowDecoder.decode(TextRowDecoder.java:29)
	at com.singlestore.jdbc.codec.RowDecoder.getValue(RowDecoder.java:114)
	at com.singlestore.jdbc.client.result.Result.getBigDecimal(Result.java:501)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeGetter$3(JdbcUtils.scala:412)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeGetter$3$adapted(JdbcUtils.scala:410)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:361)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:343)
	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	... 1 more
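A plausible cause, judging from the message `value 'internal_id' cannot be decoded as BigDecimal`: Spark picks a JDBC dialect from the URL prefix and has no built-in dialect for `jdbc:singlestore:`, so it falls back to the generic one, which wraps column names in double quotes when it builds the final `SELECT ... FROM (query)`. Unless `ANSI_QUOTES` is enabled, MySQL-compatible servers treat `"internal_id"` as a string literal, so every row carries the text `internal_id`, which the driver then fails to decode as a number. The sketch below uses Spark's `sessionInitStatement` JDBC option to enable `ANSI_QUOTES` for each read session. It assumes SingleStore accepts that SQL mode; `singlestore_read_options` and `read_translation_sample` are hypothetical helpers, and assigning `sql_mode` this way replaces any other modes set on the session.

```python
# Hypothetical helpers for reading the SingleStore sample via Spark's JDBC source.
# Assumption: the server honors SET SESSION sql_mode = 'ANSI_QUOTES', so the
# double-quoted column names Spark generates are parsed as identifiers.

QUERY = (
    "SELECT (internal_id :> BIGINT UNSIGNED NOT NULL) AS internal_id, image_sha256 "
    "FROM image_sha256_translation LIMIT 10"
)

def singlestore_read_options(url: str, query: str) -> dict:
    """Options for spark.read.format("jdbc").options(**...)."""
    return {
        "url": url,
        "query": query,
        # Executed on each JDBC session Spark opens before it reads data.
        # Note: this replaces, rather than extends, the session's sql_mode.
        "sessionInitStatement": "SET SESSION sql_mode = 'ANSI_QUOTES'",
    }

def read_translation_sample(spark, url: str):
    """Read the sample and expose internal_id as a signed bigint column named id."""
    df = (spark.read.format("jdbc")
          .options(**singlestore_read_options(url, QUERY))
          .load())
    return (df.withColumn("internal_id", df["internal_id"].cast("bigint"))
              .withColumnRenamed("internal_id", "id"))
```

Usage: `read_translation_sample(spark, SINGLESTORE_JDBC_URI).head()`. Casting to `bigint` assumes every id fits in a signed 64-bit integer.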


In [None]:
# spark.stop()