### For Spark to talk to MongoDB, we need to initialize the Spark context with pointers to the Mongo URI and include the mongo-spark-connector package

### Additionally, whoever configures the cluster may need to make sure additional jars are installed in $SPARK_HOME/jars



In [None]:
# This is sample code of how to create the Spark context with a custom configuration to connect to MongoDB

# import os
# from pyspark import SparkConf, SparkContext
# from pyspark.sql import SparkSession

# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.1 pyspark-shell'

# def initspark(appname = "Test", servername = "local", mongo="mongodb://127.0.0.1/classroom"):
#     print ('initializing pyspark')
#     conf = SparkConf().setAppName(appname).setMaster(servername)
#     sc = SparkContext(conf=conf)
#     spark = SparkSession.builder.appName(appname) \
#     .config("spark.mongodb.input.uri", mongo) \
#     .config("spark.mongodb.output.uri", mongo) \
#     .enableHiveSupport().getOrCreate()
#     sc.setLogLevel("WARN")
#     print ('pyspark initialized')
#     return sc, spark, conf


In [None]:
import sys, os
#print(sys.version)
os.environ["SPARK_HOME"] = '/usr/local/spark'
# os.environ["PYTHON_PATH"] = 'python3'
# os.environ["PYSPARK_DRIVER_PYTHON"] = 'python3'

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

# PYSPARK_SUBMIT_ARGS is used to tell Spark to load additional packages when submitting the job.
# In this case the mongo-spark-connector

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.1 pyspark-shell'
sys.path.append('/class')
from initspark import *
sc, spark, conf = initspark()
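
The package coordinate in `PYSPARK_SUBMIT_ARGS` carries both a Scala version (`2.11`) and a connector version (`2.4.1`), and both generally need to match the Spark build on the cluster. As a minimal sketch, the string can be assembled from its parts so the versions are easy to change in one place. `build_submit_args` is a hypothetical helper, not part of PySpark or `initspark`; its defaults simply mirror the values used above.

```python
def build_submit_args(scala_version="2.11", connector_version="2.4.1"):
    """Build a PYSPARK_SUBMIT_ARGS value that pulls in the MongoDB connector.

    Hypothetical helper: the defaults mirror the coordinate used in this notebook.
    """
    package = "org.mongodb.spark:mongo-spark-connector_{}:{}".format(
        scala_version, connector_version
    )
    # Keep the trailing 'pyspark-shell' token, as in the original setting.
    return "--packages {} pyspark-shell".format(package)


print(build_submit_args())
```

The result would be assigned to `os.environ['PYSPARK_SUBMIT_ARGS']` before the Spark context is created.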


### To read from a Mongo collection

In [None]:
df = spark.read.format("mongo").option("uri", "mongodb://127.0.0.1/Northwind.regions").load()
df.show()
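
Because the same read pattern repeats for every collection, it may help to wrap it. The sketch below assumes the URI layout used in this notebook (`mongodb://host/database.collection`); `mongo_uri` and `read_collection` are hypothetical helper names, and `spark` is the session returned by `initspark()`.

```python
def mongo_uri(database, collection, host="127.0.0.1"):
    """Return a connection URI of the form mongodb://host/database.collection."""
    return "mongodb://{}/{}.{}".format(host, database, collection)


def read_collection(spark, database, collection, host="127.0.0.1"):
    """Load one MongoDB collection as a DataFrame using an existing SparkSession."""
    uri = mongo_uri(database, collection, host)
    return spark.read.format("mongo").option("uri", uri).load()


print(mongo_uri("Northwind", "regions"))
```

With these in place, `read_collection(spark, "Northwind", "regions").show()` would be equivalent to the cell above.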


### Explore the query plan and what is sent to Mongo by running the following commands in a mongosh session
```
use Northwind
db.setProfilingLevel(2)
db.system.profile.find().sort({ts:-1}).limit(1).pretty()
```

In [None]:
df.explain()


### Now, add a filter condition and see if it is pushed down to MongoDB

In [None]:
df2 = df.filter("RegionDescription='Western'")
df2.show()
df2.explain()

# Then, in mongosh, inspect the most recent profiled operation:
# db.system.profile.find().sort({ts:-1}).limit(1).pretty()
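
One way to check for pushdown from the Spark side is to look for a `PushedFilters` entry in the physical plan. The sketch below makes two assumptions: that PySpark writes the plan through Python's standard output (so it can be captured), and that the connector's scan node reports `PushedFilters` in the plan text. `explain_text` and `pushed_filters` are hypothetical helpers.

```python
import contextlib
import io
import re


def explain_text(df):
    """Capture what df.explain() prints and return it as a string."""
    buffer = io.StringIO()
    with contextlib.redirect_stdout(buffer):
        df.explain()
    return buffer.getvalue()


def pushed_filters(plan_text):
    """Return the contents of each 'PushedFilters: [...]' entry found in a plan."""
    return re.findall(r"PushedFilters: \[(.*?)\]", plan_text)


# Usage in the notebook, once df2 exists:
# print(pushed_filters(explain_text(df2)))
```

An empty result does not by itself prove that nothing was pushed down, so the Mongo profiler output remains the more direct evidence of what the server actually received.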

### We can also take a DataFrame and write it to a Mongo destination

In [None]:
x = sc.parallelize([('APAC', '5')])
x1 = spark.createDataFrame(x, schema = ['RegionDescription', 'RegionID'])
x1.write.format("mongo").options(collection="regions", database="Northwind").mode("append").save()
print('Done')
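
The write call can be wrapped in the same way as a read. This is a sketch only: `write_collection` is a hypothetical helper, and it assumes the host comes from the `spark.mongodb.output.uri` setting configured when the session was created, as in the cell above. The `mode` argument is Spark's standard save mode.

```python
def write_collection(df, database, collection, mode="append"):
    """Write a DataFrame to database.collection through the 'mongo' format."""
    (
        df.write.format("mongo")
        .options(database=database, collection=collection)
        .mode(mode)  # "append" adds documents to whatever is already there
        .save()
    )
```

For example, `write_collection(x1, "Northwind", "regions")` would perform the same append as the cell above.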


In [None]:
df = spark.read.format("mongo").option("uri", "mongodb://127.0.0.1/Northwind.regions").load()
df.show()


### Like any DataFrame, we can make it into a temporary view and use SparkSQL on it

In [None]:
df.createOrReplaceTempView('regions')
spark.sql('select * from regions where regionid between 2 and 4').show()


## From here, we can use Mongo collections just like tables from any other source and process them with Spark SQL or DataFrame methods

In [None]:
c = spark.read.format("mongo").option("uri", "mongodb://127.0.0.1/Northwind.categories").load()
p = spark.read.format("mongo").option("uri", "mongodb://127.0.0.1/Northwind.products").load()
c.createOrReplaceTempView('categories')
p.createOrReplaceTempView('products')
spark.sql('''select c.categoryid, c.categoryname, p.productid, p.productname
from products as p 
join categories as c on p.categoryid = c.categoryid 
order by c.categoryid, p.productid''').show()
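
The same join can be written with DataFrame methods instead of SQL. This is a sketch under the assumption that both DataFrames expose the column names used in the query above and that Spark's default case-insensitive column resolution is in effect; `products_with_categories` is a hypothetical function name.

```python
def products_with_categories(p, c):
    """Join products to categories and return the same columns as the SQL query."""
    return (
        p.join(c, on="categoryid", how="inner")
        .select("categoryid", "categoryname", "productid", "productname")
        .orderBy("categoryid", "productid")
    )
```

Calling `products_with_categories(p, c).show()` should give the same rows as the SQL version.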



## COLLECT_LIST can be used to create nested repeating fields instead of using the aggregation pipeline

In [None]:
spark.sql('''select c.categoryid, c.categoryname
, COLLECT_LIST(NAMED_STRUCT('productid', p.productid, 'productname', p.productname, 'unitprice', p.unitprice)) as products
from products as p 
join categories as c on p.categoryid = c.categoryid 
group by c.categoryid, c.categoryname
order by c.categoryid''').show()
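
To make the shape of the result concrete, here is a plain-Python sketch of what the `COLLECT_LIST(NAMED_STRUCT(...))` query does conceptually: it groups flat rows by category and gathers one small record per product into a nested list. The sample rows are illustrative stand-ins, not output read from the database, and `nest_products` is a hypothetical function.

```python
from collections import defaultdict

# Illustrative rows standing in for the joined products/categories result.
rows = [
    {"categoryid": 1, "categoryname": "Beverages", "productid": 2, "productname": "Chang", "unitprice": 19.0},
    {"categoryid": 1, "categoryname": "Beverages", "productid": 1, "productname": "Chai", "unitprice": 18.0},
    {"categoryid": 2, "categoryname": "Condiments", "productid": 3, "productname": "Aniseed Syrup", "unitprice": 10.0},
]


def nest_products(rows):
    """Group flat rows by category and collect each product into a nested list."""
    grouped = defaultdict(list)
    for row in rows:
        key = (row["categoryid"], row["categoryname"])
        # Counterpart of NAMED_STRUCT(...): one small dict per product.
        grouped[key].append({
            "productid": row["productid"],
            "productname": row["productname"],
            "unitprice": row["unitprice"],
        })
    # Counterpart of GROUP BY categoryid, categoryname ORDER BY categoryid.
    return [
        {"categoryid": cid, "categoryname": name, "products": products}
        for (cid, name), products in sorted(grouped.items(), key=lambda item: item[0])
    ]


for doc in nest_products(rows):
    print(doc["categoryid"], doc["categoryname"], [item["productname"] for item in doc["products"]])
```

Each returned dict has the same nesting as a document that would be written to Mongo from the grouped DataFrame. The nested lists keep arrival order, which is why a separate sort step is useful.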



## Adding the SORT_ARRAY function lets you sort the contents of the nested collection; structs are compared field by field, so here the products are ordered by productid

In [None]:
spark.sql('''select c.categoryid, c.categoryname
, SORT_ARRAY(COLLECT_LIST(NAMED_STRUCT('productid', p.productid, 'productname', p.productname, 'unitprice', p.unitprice))) as products
from products as p 
join categories as c on p.categoryid = c.categoryid 
group by c.categoryid, c.categoryname
order by c.categoryid''').show()
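
Sorting an array of structs compares the fields in the order they were declared, which behaves much like sorting tuples in Python. The sketch below is only an analogy with illustrative values; it shows why the first field of the struct decides the order.

```python
# Each tuple mirrors a NAMED_STRUCT: (productid, productname, unitprice).
products = [(2, "Chang", 19.0), (1, "Chai", 18.0), (3, "Aniseed Syrup", 10.0)]

# Tuples compare on the first field, then on later fields only to break ties.
by_first_field = sorted(products)

# Ordering by price needs the price to lead the comparison.
by_price = sorted(products, key=lambda item: item[2])

print(by_first_field)
print(by_price)
```

In the SQL query, the analogous move is to list `unitprice` first inside `NAMED_STRUCT` if the nested products should be ordered by price rather than by `productid`.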



## LAB: ## 
### Write shippers to Mongo and find all the shippers with an 800 number using a temporary view.
<br>
<details><summary>Click for <b>hint</b></summary>
<p>
Unlike Cassandra, Mongo does not require a collection to exist before writing to it, so just write the DataFrame to a new collection
<br>
Make a DataFrame from the new Mongo collection and turn it into a temporary view
<br>
Use a SQL LIKE expression to find the desired records
<br>
<br>
</p>
</details>

<details><summary>Click for <b>code</b></summary>
<p>

```python
shippers.write.format("mongo").options(collection="shippers", database="classroom").mode("append").save()

s=spark.read.format("mongo").option("uri","mongodb://127.0.0.1/classroom.shippers").load()
s.createOrReplaceTempView('shippers')
spark.sql("select * from shippers where phone like '%800%'").show()
```
</p>
</details>

## LAB: ## 
### From the Northwind database, read products and suppliers, join them, and display ProductID, ProductName, SupplierID, CompanyName, and Country. Limit the results to products from suppliers in the USA.
### Take a look at the query plan and the push down results with the following command in mongosh
```
db.system.profile.find().sort({ts:-1}).limit(2).pretty()
```
<br>

<details><summary>Click for <b>code</b></summary>
<p>

```python
s=spark.read.format("mongo").option("uri","mongodb://127.0.0.1/Northwind.suppliers").load()
s.createOrReplaceTempView('suppliers')
p=spark.read.format("mongo").option("uri","mongodb://127.0.0.1/Northwind.products").load()
p.createOrReplaceTempView('products')

j = spark.sql('''SELECT p.ProductID, p.ProductName, p.SupplierID, s.CompanyName, s.Country 
FROM products AS p
JOIN suppliers AS s on p.SupplierID = s.SupplierID
WHERE s.Country = 'USA'
''')
j.show()
# db.system.profile.find().sort({ts:-1}).limit(2).pretty()
```
</p>
</details>

## HOMEWORK: ## 
**First Challenge**

Map the following Mongo collections in Northwind to temporary views: customers, orders, order-details, products, and categories.

**Second Challenge**

Join all those tables and make another temporary view.

**Third Challenge**

Group the super join by OrderID, OrderDate, CustomerID, CompanyName and collect the order line items into a collection with the productid, productname, categoryid, categoryname, price paid, listed price, and quantity.

**Fourth Challenge**

Sort the collection by OrderID and save it to a new Mongo collection. 

**Bonus Challenge**

Turn the previous collection into one that is grouped by CustomerID with the Orders under them and the order line items under the orders.
<br>
<p></p>
<br>
<details><summary>Click for <b>hint</b></summary>
<p>
<br>
**First Challenge**
<br>
Read each table from the NoSQL source and turn it into a temporary view.
<br>
```python
x = spark.read.format("mongo").option("uri", "mongodb://127.0.0.1/db.table").load()
x.createOrReplaceTempView('table1')
```
<br>
<br>
**Second Challenge**
<br>
Write a SQL query to join all the tables. The common keys are CustomerID, OrderID, ProductID, and CategoryID.
<br>
```python
orderjoin = spark.sql('''SELECT ....
''')
```
<br>
<br>
**Third Challenge**
<br>
Use SORT_ARRAY, COLLECT_LIST, NAMED_STRUCT with GROUP BY clause.
<br>
**Fourth Challenge**
<br>
```python
x.write.format("mongo").options(collection="collection", database="database").mode("append").save()
```
<br>
<br>
**Bonus Challenge**
<br>
Make the result of the nested join into a temporary view and do another COLLECT_LIST on CustomerID to group the orders into that.
<br>
</p>
</details>
<br>
<details><summary>Click for <b>code</b></summary>
<p>

```python
cu = spark.read.format("mongo").option("uri", "mongodb://127.0.0.1/Northwind.customers").load()
o = spark.read.format("mongo").option("uri", "mongodb://127.0.0.1/Northwind.orders").load()
od = spark.read.format("mongo").option("uri", "mongodb://127.0.0.1/Northwind.order-details").load()
p = spark.read.format("mongo").option("uri", "mongodb://127.0.0.1/Northwind.products").load()
ca = spark.read.format("mongo").option("uri", "mongodb://127.0.0.1/Northwind.categories").load()

cu.createOrReplaceTempView('customers')
o.createOrReplaceTempView('orders')
od.createOrReplaceTempView('orderdetails')
p.createOrReplaceTempView('products')
ca.createOrReplaceTempView('categories')

orderjoin = spark.sql('''SELECT o.OrderID, o.OrderDate, o.CustomerID, cu.CompanyName
, od.ProductID, od.UnitPrice as PurchasePrice, od.Quantity
, p.ProductName, p.UnitPrice as ListPrice, ca.CategoryID, ca.CategoryName
FROM orders AS o
JOIN orderdetails AS od ON o.OrderID = od.OrderID
JOIN products AS p ON od.ProductID = p.ProductID
JOIN categories AS ca ON p.CategoryID = ca.CategoryID 
JOIN customers as cu ON o.CustomerID = cu.CustomerID
''')
orderjoin.createOrReplaceTempView('orderjoin')

ord1 = spark.sql('''
SELECT OrderID, OrderDate, CustomerID, CompanyName
, SORT_ARRAY(COLLECT_LIST(NAMED_STRUCT('ProductID', ProductID, 'ProductName', ProductName
               , 'CategoryID', CategoryID, 'CategoryName', CategoryName
               , 'ListPrice', ListPrice, 'PurchasePrice', PurchasePrice, 'Quantity', Quantity
               ))) AS LineItems

from orderjoin
GROUP BY OrderID, OrderDate, CustomerID, CompanyName
''')

#ord1.show()
ord1.createOrReplaceTempView('ord1')
# ord1.write.format("mongo").options(collection="orders1", database="Northwind2").mode("append").save()

ord2 = spark.sql('''
SELECT CustomerID, CompanyName
, SORT_ARRAY(COLLECT_LIST(NAMED_STRUCT('OrderID', OrderID, 'OrderDate', OrderDate, 'LineItems', LineItems))) AS Orders
from ord1
GROUP BY CustomerID, CompanyName
''')

ord2.show()

# ord2.write.format("mongo").options(collection="orders1", database="Northwind2").mode("append").save()
```
</p>
</details>


