Create the SparkSession and point it at our local MongoDB cluster.

In [1]:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("pyspark-notebook2")
    .master("spark://spark-master:7077")
    .config("spark.executor.memory", "512m")
    # Default database.collection (Stocks.Source) used for reads and writes
    .config("spark.mongodb.input.uri", "mongodb://mongo1:27017/Stocks.Source")
    .config("spark.mongodb.output.uri", "mongodb://mongo1:27017/Stocks.Source")
    # Fetch the MongoDB Spark Connector (Scala 2.12 build) when the session starts
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.0")
    .getOrCreate()
)

Next, load the data from MongoDB into a DataFrame:

In [2]:
df = spark.read.format("mongo").load()

Py4JJavaError: An error occurred while calling o51.load.
: com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting for a server that matches com.mongodb.client.internal.MongoClientDelegate$1@69aae5. Client view of cluster state is {type=REPLICA_SET, servers=[{address=mongo2:27018, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketException: mongo2}, caused by {java.net.UnknownHostException: mongo2}}, {address=mongo3:27019, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketException: mongo3}, caused by {java.net.UnknownHostException: mongo3}}]
	at com.mongodb.internal.connection.BaseCluster.createTimeoutException(BaseCluster.java:403)
	at com.mongodb.internal.connection.BaseCluster.selectServer(BaseCluster.java:118)
	at com.mongodb.internal.connection.AbstractMultiServerCluster.selectServer(AbstractMultiServerCluster.java:54)
	at com.mongodb.client.internal.MongoClientDelegate.getConnectedClusterDescription(MongoClientDelegate.java:149)
	at com.mongodb.client.internal.MongoClientDelegate.createClientSession(MongoClientDelegate.java:98)
	at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.getClientSession(MongoClientDelegate.java:278)
	at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:182)
	at com.mongodb.client.internal.MongoDatabaseImpl.executeCommand(MongoDatabaseImpl.java:194)
	at com.mongodb.client.internal.MongoDatabaseImpl.runCommand(MongoDatabaseImpl.java:163)
	at com.mongodb.client.internal.MongoDatabaseImpl.runCommand(MongoDatabaseImpl.java:158)
	at com.mongodb.spark.MongoConnector.$anonfun$hasSampleAggregateOperator$1(MongoConnector.scala:234)
	at com.mongodb.spark.MongoConnector.$anonfun$withDatabaseDo$1(MongoConnector.scala:171)
	at com.mongodb.spark.MongoConnector.withMongoClientDo(MongoConnector.scala:154)
	at com.mongodb.spark.MongoConnector.withDatabaseDo(MongoConnector.scala:171)
	at com.mongodb.spark.MongoConnector.hasSampleAggregateOperator(MongoConnector.scala:234)
	at com.mongodb.spark.rdd.MongoRDD.hasSampleAggregateOperator$lzycompute(MongoRDD.scala:221)
	at com.mongodb.spark.rdd.MongoRDD.hasSampleAggregateOperator(MongoRDD.scala:221)
	at com.mongodb.spark.sql.MongoInferSchema$.apply(MongoInferSchema.scala:68)
	at com.mongodb.spark.sql.DefaultSource.constructRelation(DefaultSource.scala:97)
	at com.mongodb.spark.sql.DefaultSource.createRelation(DefaultSource.scala:50)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:354)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:326)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$3(DataFrameReader.scala:308)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:308)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:226)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:829)


Let’s verify the data was loaded by looking at the schema:

In [None]:
df.printSchema()

We can see that the tx_time field is loaded as a string. We can convert it to a timestamp with a cast:

In [None]:
df = df.withColumn("tx_time", df.tx_time.cast("timestamp"))
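
As a rough plain-Python analogy for what the cast does to each value, the sketch below parses a string into a datetime and yields None when the string cannot be parsed, which mirrors how Spark's default (non-ANSI) cast produces null for unparseable input. The helper name to_timestamp_or_none and the sample strings are hypothetical; the actual format of tx_time in the collection is not shown here.

```python
from datetime import datetime


def to_timestamp_or_none(value):
    """Parse an ISO-style string; return None if it cannot be parsed."""
    try:
        return datetime.fromisoformat(value)
    except (ValueError, TypeError):
        # Unparseable or missing input becomes None, like a null in Spark
        return None


# Hypothetical sample values, not taken from the Stocks.Source collection
parsed = to_timestamp_or_none("2021-03-01 10:15:30")
failed = to_timestamp_or_none("not a time")
```

After the cast it is worth checking for unexpected nulls in tx_time, since they would indicate strings that did not match a timestamp format.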

Next, we can add a new ‘movingAverage’ column that averages each price with its neighboring rows (one row before and one row after) within the same company_symbol. To do this we use the PySpark Window class as follows:

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

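# Average each price with the rows just before and after it, per symbol.
# Ordering by tx_time makes "before" and "after" well defined.
window = Window.partitionBy("company_symbol").orderBy("tx_time").rowsBetween(-1, 1)
movAvg = df.withColumn("movingAverage", F.avg("price").over(window))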
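
To make the frame rowsBetween(-1, 1) concrete, here is a minimal plain-Python sketch of the same calculation on a handful of made-up rows. It assumes rows are ordered by tx_time within each symbol; the function name moving_average and the sample data are illustrative only and are not part of PySpark or the dataset.

```python
from collections import defaultdict


def moving_average(rows, before=1, after=1):
    """Average each price with up to `before` earlier and `after` later rows."""
    by_symbol = defaultdict(list)
    for row in rows:
        by_symbol[row["company_symbol"]].append(row)

    result = []
    for group in by_symbol.values():
        # Assumption of this sketch: order each partition by tx_time
        group.sort(key=lambda r: r["tx_time"])
        for i, row in enumerate(group):
            lo = max(0, i - before)              # frame is clipped at the start
            hi = min(len(group), i + after + 1)  # and at the end of the partition
            prices = [r["price"] for r in group[lo:hi]]
            result.append({**row, "movingAverage": sum(prices) / len(prices)})
    return result


# Hypothetical sample rows
sample = [
    {"company_symbol": "ABC", "tx_time": 1, "price": 10.0},
    {"company_symbol": "ABC", "tx_time": 2, "price": 20.0},
    {"company_symbol": "ABC", "tx_time": 3, "price": 60.0},
    {"company_symbol": "XYZ", "tx_time": 1, "price": 5.0},
]
averaged = moving_average(sample)
```

Note that the first and last rows of each partition average over fewer values, because the frame is clipped at the partition boundary.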

To see our data with the new moving average column, we can call movAvg.show().

In [None]:
movAvg.show()

To write the result back to our MongoDB cluster, we use the DataFrame writer's save method.

In [None]:
movAvg.write.format("mongo").option("replaceDocument", "true").mode("append").save()
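
The replaceDocument option matters when a saved row carries an _id that already exists in the collection. The sketch below models the difference with plain dictionaries, assuming the option behaves as the 3.x connector documentation describes: when true, the stored document is replaced wholesale; when false, only the fields present in the DataFrame are updated. The function save_document and the sample documents are hypothetical and do not call MongoDB.

```python
def save_document(stored, incoming, replace_document=True):
    """Model how one document with a matching _id is written back."""
    if replace_document:
        # Whole-document replace: fields missing from `incoming` are lost
        return dict(incoming)
    # Field-level update: fields missing from `incoming` are preserved
    merged = dict(stored)
    merged.update(incoming)
    return merged


# Hypothetical documents sharing the same _id
stored = {"_id": 1, "company_symbol": "ABC", "price": 10.0, "note": "keep me"}
incoming = {"_id": 1, "company_symbol": "ABC", "price": 10.0, "movingAverage": 15.0}

replaced = save_document(stored, incoming, replace_document=True)
updated = save_document(stored, incoming, replace_document=False)
```

Because movAvg was derived from a full read of the collection, replacing whole documents is safe here; with a DataFrame that selects only some columns, the field-level update is the less destructive choice.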

We can also use the power of the MongoDB Aggregation Framework to pre-filter, sort or aggregate our MongoDB data.

In [None]:
pipeline = "[{'$group': {_id:'$company_name', 'maxprice': {$max:'$price'}}},{$sort:{'maxprice':-1}}]"
aggPipelineDF = spark.read.format("mongo").option("pipeline", pipeline).option("partitioner", "MongoSinglePartitioner").load()
aggPipelineDF.show()
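
Hand-writing the pipeline as a string makes quoting mistakes easy. One alternative, sketched here, is to build the stages as ordinary Python structures and serialize them with json.dumps. This assumes the connector accepts strict JSON for the pipeline option just as it accepts the relaxed form above; the variable name stages is illustrative.

```python
import json

# Same two stages as above: max price per company, highest first
stages = [
    {"$group": {"_id": "$company_name", "maxprice": {"$max": "$price"}}},
    {"$sort": {"maxprice": -1}},
]

# Serialize to the string form passed to .option("pipeline", ...)
pipeline = json.dumps(stages)
```

The resulting pipeline string can then be passed to the reader exactly as in the cell above.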

Finally, we can use Spark SQL to run SQL queries against the MongoDB data by registering the DataFrame as a temporary view:

In [None]:
movAvg.createOrReplaceTempView("avgs")
sqlDF = spark.sql("SELECT * FROM avgs WHERE movingAverage > 43.0")
sqlDF.show()