In [1]:
from pyspark.sql import SparkSession

# Connection string for the rs0 replica set; the connector's default read and
# write targets are both the Stocks.Source collection, so it is defined once.
MONGO_URI = "mongodb://mongo1:27017,mongo2:27018,mongo3:27019/Stocks.Source?replicaSet=rs0"

# Build (or reuse, via getOrCreate) a session on the standalone Spark master.
# NOTE(review): if a session already exists in this kernel, getOrCreate returns it
# and some of these configs may not take effect — restart the kernel to be sure.
spark = (
    SparkSession.builder
    .appName("pyspark-notebook2")
    .master("spark://spark-master:7077")
    .config("spark.executor.memory", "1g")
    .config("spark.mongodb.input.uri", MONGO_URI)
    .config("spark.mongodb.output.uri", MONGO_URI)
    # Maven coordinates of the MongoDB Spark connector, resolved when the session starts.
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.0")
    .getOrCreate()
)

In [2]:
# Load the collection named by spark.mongodb.input.uri into a DataFrame and show
# the schema the connector inferred for it (printed below).
df = spark.read.load(format="mongo")

df.printSchema()

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- company_name: string (nullable = true)
 |-- company_symbol: string (nullable = true)
 |-- price: double (nullable = true)
 |-- tx_time: string (nullable = true)



In [3]:
# tx_time is read as a string (see the schema above); cast it to a Spark
# timestamp so trades can be ordered and compared by time.
df = df.withColumn("tx_time", df["tx_time"].cast("timestamp"))

In [4]:
# Centered 3-row moving average of price per company symbol: each trade is
# averaged with the previous and next trade of the same symbol (only two rows
# contribute at the first and last trade of a symbol).
from pyspark.sql.window import Window
from pyspark.sql import functions as F

# The frame must be ordered by trade time: without orderBy, rows inside a
# partition have no defined order, so "previous" and "next" would be arbitrary
# and the average could change from run to run.
moving_window = (
    Window.partitionBy("company_symbol")
    .orderBy("tx_time")
    .rowsBetween(-1, 1)
)

movAvg = df.withColumn("movingAverage", F.avg("price").over(moving_window))
movAvg.show()

+--------------------+--------------------+--------------+-----+-------------------+------------------+
|                 _id|        company_name|company_symbol|price|            tx_time|     movingAverage|
+--------------------+--------------------+--------------+-----+-------------------+------------------+
|[5f527ac32f6a1552...|ITCHY ACRE CORPOR...|           IAC|43.39|2020-09-04 13:34:59|            43.405|
|[5f527ac42f6a1552...|ITCHY ACRE CORPOR...|           IAC|43.42|2020-09-04 13:35:00|43.419999999999995|
|[5f527ac52f6a1552...|ITCHY ACRE CORPOR...|           IAC|43.45|2020-09-04 13:35:01|43.443333333333335|
|[5f527ac62f6a1552...|ITCHY ACRE CORPOR...|           IAC|43.46|2020-09-04 13:35:02|             43.46|
|[5f527ac72f6a1552...|ITCHY ACRE CORPOR...|           IAC|43.47|2020-09-04 13:35:03| 43.47666666666667|
|[5f527ac82f6a1552...|ITCHY ACRE CORPOR...|           IAC| 43.5|2020-09-04 13:35:04| 43.49666666666667|
|[5f527ac92f6a1552...|ITCHY ACRE CORPOR...|           IAC|43.52|

In [5]:
# Save the DataFrame with the moving averages back to MongoDB.
# NOTE(review): the write target is spark.mongodb.output.uri, which is the same
# Stocks.Source collection the data was read from. With replaceDocument=true,
# documents sharing an _id are presumably replaced in place — confirm against the
# connector docs. movAvg is not cached, so later cells that use it re-read the
# rewritten collection. The null tx_time values in the Spark SQL output further
# down may stem from this. Consider writing to a separate collection instead.
(
    movAvg.write
    .format("mongo")
    .option("replaceDocument", "true")
    .mode("append")
    .save()
)

In [6]:
# Load the result of a MongoDB aggregation pipeline instead of the raw collection:
# $group computes the maximum price per company_name, then $sort orders the
# groups by that maximum, highest first.
pipeline = "[{'$group': {_id:'$company_name', 'maxprice': {$max:'$price'}}},{$sort:{'maxprice':-1}}]"
aggPipelineDF = spark.read.load(format="mongo", pipeline=pipeline)
aggPipelineDF.show()

+--------------------+--------+
|                 _id|maxprice|
+--------------------+--------+
|FRUSTRATING CHAOS...|    87.6|
|HOMELY KIOSK UNLI...|   86.48|
| CREEPY GIT HOLDINGS|    83.4|
|GREASY CHAMPION C...|   81.76|
|COMBATIVE TOWNSHI...|   72.18|
|FROTHY MIDNIGHT P...|   66.81|
|ITCHY ACRE CORPOR...|   44.42|
|LACKADAISICAL SAV...|   42.34|
|CORNY PRACTITIONE...|   38.55|
|TRITE JACKFRUIT P...|   22.62|
+--------------------+--------+



In [7]:
# Register movAvg as the temporary view "avgs" so it can be queried with Spark SQL.
movAvg.createOrReplaceTempView("avgs")

# Keep only rows whose moving average exceeds a hard-coded threshold of 43.0.
sqlDF = spark.sql(
    "SELECT * FROM avgs "
    "WHERE movingAverage > 43.0"
)

sqlDF.show()

+--------------------+--------------------+--------------+-----+-------+------------------+
|                 _id|        company_name|company_symbol|price|tx_time|     movingAverage|
+--------------------+--------------------+--------------+-----+-------+------------------+
|[5f527ac32f6a1552...|ITCHY ACRE CORPOR...|           IAC|43.39|   null|            43.405|
|[5f527ac42f6a1552...|ITCHY ACRE CORPOR...|           IAC|43.42|   null|43.419999999999995|
|[5f527ac52f6a1552...|ITCHY ACRE CORPOR...|           IAC|43.45|   null|43.443333333333335|
|[5f527ac62f6a1552...|ITCHY ACRE CORPOR...|           IAC|43.46|   null|             43.46|
|[5f527ac72f6a1552...|ITCHY ACRE CORPOR...|           IAC|43.47|   null| 43.47666666666667|
|[5f527ac82f6a1552...|ITCHY ACRE CORPOR...|           IAC| 43.5|   null| 43.49666666666667|
|[5f527ac92f6a1552...|ITCHY ACRE CORPOR...|           IAC|43.52|   null|             43.52|
|[5f527aca2f6a1552...|ITCHY ACRE CORPOR...|           IAC|43.54|   null| 43.5366