# MongoDb connection test

In [18]:
from pymongo import MongoClient
import pprint

In [19]:
my_client = MongoClient()

In [20]:
db= my_client.test

In [21]:
restaurants= db.restaurants

In [23]:
print('total record for the collection:'+ str(restaurants.count()))

total record for the collection:25359


In [24]:
for record in restaurants.find().limit(1):
    pprint.pprint(record)

{'_id': ObjectId('5a705f566d0dc0e6fa9b53a6'),
 'address': {'building': '1007',
             'coord': [-73.856077, 40.848447],
             'street': 'Morris Park Ave',
             'zipcode': '10462'},
 'borough': 'Bronx',
 'cuisine': 'Bakery',
 'grades': [{'date': datetime.datetime(2014, 3, 3, 0, 0),
             'grade': 'A',
             'score': 2},
            {'date': datetime.datetime(2013, 9, 11, 0, 0),
             'grade': 'A',
             'score': 6},
            {'date': datetime.datetime(2013, 1, 24, 0, 0),
             'grade': 'A',
             'score': 10},
            {'date': datetime.datetime(2011, 11, 23, 0, 0),
             'grade': 'A',
             'score': 9},
            {'date': datetime.datetime(2011, 3, 10, 0, 0),
             'grade': 'B',
             'score': 14}],
 'name': 'Morris Park Bake Shop',
 'restaurant_id': '30075445'}


# mongo to spark connection

In [25]:
import os
import sys

In [26]:
spark_path = '/home/ubuntu/anaconda3/envs/bigdata/lib/python3.6/site-packages/pyspark'
os.environ['SPARK_HOME']=spark_path
os.environ['HADOOP_HOME']=spark_path
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.mongodb.spark:mongo-spark-connector_2.10:2.2.1 pyspark-shell'
sys.path.append(spark_path + "/bin")
sys.path.append(spark_path + "/python")
sys.path.append(spark_path + "/python/pyspark/")
sys.path.append(spark_path + "/python/lib")
sys.path.append(spark_path + "/python/lib/pyspark.zip")
sys.path.append(spark_path + "/python/lib/py4j-0.10.4-src.zip")
#$SPARK_HOME/bin/spark-shell --packages org.mongodb.spark:mongo-spark-connector_2.10:2.2.1

In [27]:
from pyspark.sql import SparkSession

In [28]:
my_spark = SparkSession\
.builder.appName('my_app')\
.config('spark.mongodb.input.uri','mongodb://localhost/test.restaurant')\
.config('spark.mongodb.output.uri','mongodb://localhost/test.sparkdb')\
.getOrCreate()


In [29]:
people = my_spark.createDataFrame([("Bilbo Baggins",  50), ("Gandalf", 1000)\
                                   , ("Thorin", 195), ("Balin", 178), ("Kili", 77)\
                                   ,("Dwalin", 169), ("Oin", 167), ("Gloin", 158)\
                                   , ("Fili", 82), ("Bombur", None)], ["name", "age"])


In [30]:
people.count()

10

In [31]:
people.head()

Row(name='Bilbo Baggins', age=50)

In [35]:
people.write.format("com.mongodb.spark.sql.DefaultSource").mode('append').option("database","test").option("collection", "people").save()

In [35]:
df =my_spark.read.format('com.mongodb.spark.sql.DefaultSource').option('uri','mongodb://localhost/test.people').load()

In [35]:
df.count()

In [28]:
df.printSchema()

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [29]:
df.collect()

[Row(_id=Row(oid='5a71b15700661807c24a7ac5'), age=50, name='Bilbo Baggins'),
 Row(_id=Row(oid='5a71b15700661807c24a7ac6'), age=1000, name='Gandalf'),
 Row(_id=Row(oid='5a71b15700661807c24a7ac7'), age=195, name='Thorin'),
 Row(_id=Row(oid='5a71b15700661807c24a7ac8'), age=178, name='Balin'),
 Row(_id=Row(oid='5a71b15700661807c24a7ac9'), age=77, name='Kili'),
 Row(_id=Row(oid='5a71b15700661807c24a7aca'), age=169, name='Dwalin'),
 Row(_id=Row(oid='5a71b15700661807c24a7acb'), age=167, name='Oin'),
 Row(_id=Row(oid='5a71b15700661807c24a7acc'), age=158, name='Gloin'),
 Row(_id=Row(oid='5a71b15700661807c24a7acd'), age=82, name='Fili'),
 Row(_id=Row(oid='5a71b15700661807c24a7ace'), age=None, name='Bombur'),
 Row(_id=Row(oid='5a71b1c700661807c24a7ad0'), age=169, name='Dwalin'),
 Row(_id=Row(oid='5a71b1c700661807c24a7ad1'), age=167, name='Oin'),
 Row(_id=Row(oid='5a71b1c700661807c24a7ad2'), age=158, name='Gloin'),
 Row(_id=Row(oid='5a71b1c700661807c24a7ad3'), age=82, name='Fili'),
 Row(_id=Row(oi

In [33]:
df.distinct().collect()

[Row(_id=Row(oid='5a71b1c700661807c24a7ad3'), age=82, name='Fili'),
 Row(_id=Row(oid='5a71b15700661807c24a7ac8'), age=178, name='Balin'),
 Row(_id=Row(oid='5a71b1c700661807c24a7ad6'), age=1000, name='Gandalf'),
 Row(_id=Row(oid='5a71b15700661807c24a7ac5'), age=50, name='Bilbo Baggins'),
 Row(_id=Row(oid='5a71b1c700661807c24a7ad7'), age=195, name='Thorin'),
 Row(_id=Row(oid='5a71b1c700661807c24a7ad1'), age=167, name='Oin'),
 Row(_id=Row(oid='5a71b15700661807c24a7acc'), age=158, name='Gloin'),
 Row(_id=Row(oid='5a71b15700661807c24a7ace'), age=None, name='Bombur'),
 Row(_id=Row(oid='5a71b1c700661807c24a7ad0'), age=169, name='Dwalin'),
 Row(_id=Row(oid='5a71b15700661807c24a7ac7'), age=195, name='Thorin'),
 Row(_id=Row(oid='5a71b1c700661807c24a7ad2'), age=158, name='Gloin'),
 Row(_id=Row(oid='5a71b1c700661807c24a7ad5'), age=50, name='Bilbo Baggins'),
 Row(_id=Row(oid='5a71b1c700661807c24a7ad4'), age=None, name='Bombur'),
 Row(_id=Row(oid='5a71b15700661807c24a7acd'), age=82, name='Fili'),
 R

In [34]:
type(df)

pyspark.sql.dataframe.DataFrame

In [37]:
df.describe().collect()

[Row(summary='count', age='18', name='20'),
 Row(summary='mean', age='230.66666666666666', name=None),
 Row(summary='stddev', age='284.3800274280879', name=None),
 Row(summary='min', age='50', name='Balin'),
 Row(summary='max', age='1000', name='Thorin')]

In [None]:
df.plo