# Document Database Model

- Examples From Video Lecture 

In [1]:
# Spark init
import os

import pyspark
from pyspark.sql import SparkSession

# Connection string for the MongoDB server (host "mongo"). Set the MONGO_URI
# environment variable to target another server or to keep credentials out of
# the notebook; the fallback is the value this notebook has always used.
mongo_uri = os.environ.get(
    "MONGO_URI",
    "mongodb://admin:mongopw@mongo:27017/admin?authSource=admin",
)

# Build (or reuse) a local Spark session that uses mongo_uri for both
# reading from and writing to MongoDB.
spark = (
    SparkSession.builder
    .master("local")
    .appName('jupyter-pyspark')
    .config("spark.mongodb.input.uri", mongo_uri)
    .config("spark.mongodb.output.uri", mongo_uri)
    # Resolves the Mongo Spark connector package when the session starts.
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1")
    .getOrCreate()
)
sc = spark.sparkContext
sc.setLogLevel("ERROR")  # only log errors to keep cell output readable

Ivy Default Cache set to: /home/jovyan/.ivy2/cache
The jars for the packages stored in: /home/jovyan/.ivy2/jars
org.mongodb.spark#mongo-spark-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-4ea8c498-c724-445f-8e87-c795b6883f3c;1.0
	confs: [default]


:: loading settings :: url = jar:file:/usr/local/spark-3.1.2-bin-hadoop3.2/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


	found org.mongodb.spark#mongo-spark-connector_2.12;3.0.1 in central
	found org.mongodb#mongodb-driver-sync;4.0.5 in central
	found org.mongodb#bson;4.0.5 in central
	found org.mongodb#mongodb-driver-core;4.0.5 in central
downloading https://repo1.maven.org/maven2/org/mongodb/spark/mongo-spark-connector_2.12/3.0.1/mongo-spark-connector_2.12-3.0.1.jar ...
	[SUCCESSFUL ] org.mongodb.spark#mongo-spark-connector_2.12;3.0.1!mongo-spark-connector_2.12.jar (93ms)
downloading https://repo1.maven.org/maven2/org/mongodb/mongodb-driver-sync/4.0.5/mongodb-driver-sync-4.0.5.jar ...
	[SUCCESSFUL ] org.mongodb#mongodb-driver-sync;4.0.5!mongodb-driver-sync.jar (30ms)
downloading https://repo1.maven.org/maven2/org/mongodb/bson/4.0.5/bson-4.0.5.jar ...
	[SUCCESSFUL ] org.mongodb#bson;4.0.5!bson.jar (58ms)
downloading https://repo1.maven.org/maven2/org/mongodb/mongodb-driver-core/4.0.5/mongodb-driver-core-4.0.5.jar ...
	[SUCCESSFUL ] org.mongodb#mongodb-driver-core;4.0.5!mongodb-driver-core.jar (149ms)
:

## Loading Sample Data

- Run this code to load some sample data into MongoDb

In [3]:
# Loading sample data: push the bundled JSON samples into MongoDB's "demo" database.
from pyspark.sql.functions import col

# Stock quotes are only read into a DataFrame here; nothing is written by this statement.
s = (
    spark.read
    .option("multiline","true")
    .json("file:///home/jovyan/datasets/json-samples/stocks.json")
)

# europe.json -> demo.europe
(
    spark.read
    .option("multiline","true")
    .json("file:///home/jovyan/datasets/json-samples/europe.json")
    .write.format("mongo")
    .mode("overwrite")
    .option("database","demo")
    .option("collection","europe")
    .save()
)

# fudgemart-products.json -> demo.products, with product_id copied into "_id"
(
    spark.read
    .option("multiline","true")
    .json("file:///home/jovyan/datasets/json-samples/fudgemart-products.json")
    .withColumn("_id", col("product_id"))
    .write.format("mongo")
    .mode("overwrite")
    .option("database","demo")
    .option("collection","products")
    .save()
)

# US-Senators.json -> demo.senators
(
    spark.read
    .option("multiline","true")
    .json("file:///home/jovyan/datasets/json-samples/US-Senators.json")
    .write.format("mongo")
    .mode("overwrite")
    .option("database","demo")
    .option("collection","senators")
    .save()
)


                                                                                

## MongoDB Clients and Applications:

- The **Mongo Db Shell** is the official client where you can type in the MQL (Mongo Query Language)  
  `PS> docker-compose exec mongo mongosh -u admin -p mongopw --authenticationDatabase=admin`
- **Mongo Express** is a web-based database administration application, http://localhost:8881
- There is a **Sample Python Application** here: http://localhost:5081
- To Connect MongoDb to Tools like Tableau or PowerBI, use an ODBC driver like this one here:  
  https://github.com/mongodb/mongo-bi-connector-odbc-driver/releases/
  
  
## Mongo Shell Queries


### MQL: Create and Read

```
# current Database 
db

# Show all databases
show databases


# Use a database, does not have to exist - mongo don't care!
use demo

#show collections 
show collections

# insert some data – how did it make the collection – mongo don’t care!
db.cars.insertOne({ "make": "Chevy", "model" : "Cruze" })

# insert a couple....
db.cars.insertMany( [ { "make": "Chevy", "model" : "Traverse" }, { "make": "Chevy", "model" : "Trax", "mpg" : 36} ] )

# show all documents
db.cars.find()

# Not a car? Mongo don't care!
db.cars.insertOne( { "name" : "Mike Fudge", "age" : 50 } )
db.cars.find()


# Insert same thing twice … mongo don’t care!
db.cars.insertOne({ "make": "Honda", "model" : "Civic"})

# oops no schema but I forgot mpg...
db.cars.insertOne({ "make": "Honda", "model" : "Civic", "mpg" : 40 })

## Simple query By Value

db.cars.find( { "make" : "Chevy" })

```

### MQL: Understanding _id

```
# Insert Multiple Times 

db.cars.insertOne( { "make" : "Honda", "model" : "CRV"})
db.cars.insertOne( { "make" : "Honda", "model" : "CRV"})

# added once
db.cars.insertOne( { "make" : "Honda", "model" : "CRV", "_id" : 1 })

# cannot be added again! At least there is key integrity.
db.cars.insertOne( { "make" : "Honda", "model" : "CRV", "_id" : 1 })
```

### MQL: Updating and Deleting Data


```
# delete the first one that matches 
db.cars.deleteOne( { "model" : "CRV" } )

db.cars.find()

# delete all that match
db.cars.deleteMany( { "model" : "CRV" } )

# delete something that's not there
db.cars.deleteMany( { "model" : "Fabio" })

# delete by Object Id
db.cars.find({ "name" : "Mike Fudge" })
(record object ID)
db.cars.deleteOne({"_id" : ObjectId("6203d8c48017a5f57d121bf6")})

# delete by an object ID, Non Surrogate
db.cars.deleteOne( { "_id" : 1 } )

# replace – no partial updates
db.cars.insertOne( { "make" : "Honda", "model" : "CRV", "_id" : 2 })

db.cars.replaceOne({ "_id" : 2},  { "mpg" : 26 } )

#where did it go?
db.cars.find()

# full overwrite, so you must replace
db.cars.replaceOne({ "_id" : 2},  { "make" : "Honda", "model" : "CRV", "mpg": 26, "_id" : 2 })


# updates - add MPG to traverse

db.cars.updateOne({model: 'Traverse'}, {$set : {mpg:18}})

# update multiple values
db.cars.updateOne({model: 'Traverse'}, {$set : {mpg:16, "type": "SUV" }})

#update several documents

db.cars.updateMany({}, { $set: { "owner": "mafudge"}} )

```

### Find Queries

```
# no filter,  just ask for 3 columns (notice we get nothing for license plate)

db.cars.find({}, { make:1, model:2, "license plate":3 }) 

# here's a complex filter: make is chevy and includes an mpg
db.cars.find({ $and : [  {make : "Chevy"}, {mpg : { $exists: true } } ] })

# let's combine that:
db.cars.find({ $and : [  {make : "Chevy"}, {mpg : { $exists: true } } ] }, { make:1, model:2, mpg:3, "license plate":6 })

#and let's sort that
db.cars.find({ $and : [  {make : "Chevy"}, {mpg : { $exists: true } } ] }, { make:1, model:2, mpg:3, "license plate":6 }).sort({mpg:-1})

```

### Indexing


```
# querying by region!
db.europe.find( {"subregion" : "Eastern Europe"}).explain("executionStats")

Searches through all 53 countries…. Blah. (docsExamined)
COLLSCAN is like a TABLE SCAN in SQL

# Let’s add an index.
db.europe.createIndex( {subregion:1})

db.europe.find( {"subregion" : "Eastern Europe"}).explain("executionStats")
db.europe.find( {"subregion" : "Southern Europe"}).explain("executionStats")

Now it's doing an IXSCAN (index scan) and looking at the keysExamined!
```

## Drilling Mongo

```

Storage config is easy!

{
  "type": "mongo",
  "connection": "mongodb://admin:mongopw@mongo:27017/admin",
  "enabled": true
}

some drills:

# avg population by subregion 
select subregion, avg(population) as avg_pop, count(*) as country_count 
    from mongo.demo.europe
    group by subregion 

# Russian time zones
select name, population, flatten(timezones) from mongo.demo.europe where name = 'Russia'

```

## MongoDb In Spark

- MongoDb has first-class support for Spark
- When using filters with DataFrames API, the underlying Mongo Connector code constructs an aggregation pipeline to filter the data in MongoDB before sending it to Spark.
- This ensures only the data needed is retrieved from MongoDb


In [4]:
# Write the stocks to MongoDB as-is (no "_id" column supplied: surrogate key ID).
s = (
    spark.read
    .option("multiline","true")
    .json("file:///home/jovyan/datasets/json-samples/stocks.json")
)
(
    s.write.format("mongo")
    .mode("overwrite")
    .option("database","demo")
    .option("collection","stocks")
    .save()
)

In [5]:
# Write the stocks again, this time using the existing symbol column as the document "_id".
(
    s.withColumn("_id",s.symbol)
    .write.format("mongo")
    .mode("overwrite")
    .option("database","demo")
    .option("collection","stocks2")
    .save()
)

In [6]:
# Load the demo.stocks2 collection back from MongoDB and preview its rows.
s2 = (
    spark.read
    .format("mongo")
    .option("database","demo")
    .option("collection","stocks2")
    .load()
)
s2.show()

+----+-------+------+
| _id|  price|symbol|
+----+-------+------+
|AAPL| 126.82|  AAPL|
|AMZN|3098.12|  AMZN|
|  FB| 251.11|    FB|
|GOOG|1725.05|  GOOG|
| IBM| 128.39|   IBM|
|MSFT| 212.55|  MSFT|
| NET|   78.0|   NET|
|NFLX|  497.0|  NFLX|
|TSLA|  823.8|  TSLA|
|TWTR|  45.11|  TWTR|
+----+-------+------+



In [15]:
# Expose the MongoDB-backed DataFrame as a temp view so it can be queried with Spark SQL.
s2.createOrReplaceTempView("stocks")
s3 = spark.sql(
    "SELECT * FROM stocks WHERE symbol='IBM'"
)
s3.show()

+---+------+------+
|_id| price|symbol|
+---+------+------+
|IBM|128.39|   IBM|
+---+------+------+



In [8]:
# Filter the MongoDB-backed DataFrame by "_id", then print the physical plan Spark will use.
nfstock = s2.filter("_id = 'NFLX'")
nfstock.explain()

== Physical Plan ==
*(1) Filter (isnotnull(_id#351) AND (_id#351 = NFLX))
+- *(1) Scan MongoRelation(MongoRDD[65] at RDD at MongoRDD.scala:51,Some(StructType(StructField(_id,StringType,true), StructField(price,DoubleType,true), StructField(symbol,StringType,true)))) [_id#351,price#352,symbol#353] PushedFilters: [IsNotNull(_id), EqualTo(_id,NFLX)], ReadSchema: struct<_id:string,price:double,symbol:string>




In [11]:
# Same filter, but on the DataFrame read from the local JSON file, to compare its plan
# with the plan of the MongoDB-backed DataFrame.
nfs = s.withColumn("_id",s.symbol).filter("_id = 'NFLX'")
nfs.explain()

== Physical Plan ==
*(1) Project [price#322, symbol#323, symbol#323 AS _id#411]
+- *(1) Filter (isnotnull(symbol#323) AND (symbol#323 = NFLX))
   +- FileScan json [price#322,symbol#323] Batched: false, DataFilters: [isnotnull(symbol#323), (symbol#323 = NFLX)], Format: JSON, Location: InMemoryFileIndex[file:/home/jovyan/datasets/json-samples/stocks.json], PartitionFilters: [], PushedFilters: [IsNotNull(symbol), EqualTo(symbol,NFLX)], ReadSchema: struct<price:double,symbol:string>




## Understanding how the Mongo Spark Connector Builds the aggregation pipeline

In this example we demonstrate how the MongoDb connector passes most of the DataFrame transformation logic directly to MongoDb, thereby reducing the amount of computational effort expected of the Spark cluster.

- `localq` processes all the transformations on spark. This consumes more memory and CPU on the spark cluster
- `mongoq` applies PushedFilters and a ReadSchema to MongoDb, meaning only "Northern Europe" documents and only the "alpha3Code", "name","subregion", "population", and "borders" columns are being sent from MongoDb to Spark. 


In [12]:
# Read the Europe sample from the local JSON file; "multiline" lets records span several lines.
local_euro = spark.read.option("multiline","true").json("file:///home/jovyan/datasets/json-samples/europe.json")

In [13]:
# Copy the file-based DataFrame into MongoDB (database "fdoc", collection "europe") ...
(
    local_euro.write
    .format("mongo")
    .mode("overwrite")
    .option("database","fdoc")
    .option("collection","europe")
    .save()
)
# ... then read that same collection back as a MongoDB-backed DataFrame.
mongo_euro = (
    spark.read
    .format("mongo")
    .option("database","fdoc")
    .option("collection","europe")
    .load()
)

In [14]:
# Import only the functions used here: a wildcard import of pyspark.sql.functions
# would shadow Python builtins such as sum, min, max and round in this kernel.
from pyspark.sql.functions import col, explode
#local_euro.printSchema()


def northern_europe_borders(countries):
    """Return one row per border of each Northern Europe country.

    Parameters
    ----------
    countries : DataFrame
        Must provide the columns "alpha3Code", "name", "subregion",
        "population" and an array column "borders".

    Returns
    -------
    DataFrame
        The four scalar columns plus "borderAlpha3Code" (one exploded entry of
        "borders" per row), restricted to subregion = 'Northern Europe'.
    """
    return (
        countries
        .select(
            "alpha3Code", "name", "subregion", "population",
            explode(col("borders")).alias("borderAlpha3Code"),
        )
        .filter("subregion = 'Northern Europe'")
    )


# Here's a DataFrame transformation. This could be in SQL too.
# The identical transformation is applied to both sources so only the plans differ.
localq = northern_europe_borders(local_euro)
mongoq = northern_europe_borders(mongo_euro)

localq.show(7)
mongoq.show(7)

print("plan for local file")
localq.explain()

print("plan for MongoDb")
mongoq.explain()

+----------+-------------------+---------------+----------+----------------+
|alpha3Code|               name|      subregion|population|borderAlpha3Code|
+----------+-------------------+---------------+----------+----------------+
|       DNK|            Denmark|Northern Europe|   5678348|             DEU|
|       EST|            Estonia|Northern Europe|   1313271|             LVA|
|       EST|            Estonia|Northern Europe|   1313271|             RUS|
|       FIN|            Finland|Northern Europe|   5485215|             NOR|
|       FIN|            Finland|Northern Europe|   5485215|             SWE|
|       FIN|            Finland|Northern Europe|   5485215|             RUS|
|       IRL|Republic of Ireland|Northern Europe|   6378000|             GBR|
+----------+-------------------+---------------+----------+----------------+
only showing top 7 rows

+----------+-------------------+---------------+----------+----------------+
|alpha3Code|               name|      subregion|pop

In [None]:
# Print the column names and types of the MongoDB-backed query result.
mongoq.printSchema()

In [None]:
# Bare expression: the notebook shows the DataFrame's repr as this cell's output.
mongo_euro