### Prerequisites
#### 1. Have the following jar files in PySpark root (`/anaconda3/lib/python3.6/site-packages/pyspark/jars`):
    1. bson-3.8.1-javadoc.jar
    2. mongodb-driver-core-3.8.1-javadoc.jar
    3. mongodb-driver-3.8.1-javadoc.jar
    Download: https://stackoverflow.com/a/52388477
#### 2. Start/stop mongodb service using homebrew:
`brew services <start/stop> mongodb-community@6.0`

In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import array_contains

### Connect to MongoDB

In [2]:
# CNCT_STR = 'mongodb://127.0.0.1:27017/svp_local.video_tags'
CNCT_STR = 'mongodb+srv://sri:<PASSWORD>@svp-cluster.1uzpyjf.mongodb.net/svp_database.video_tags?retryWrites=true'
FORMAT = 'com.mongodb.spark.sql.DefaultSource'

In [3]:
spark = SparkSession \
        .builder \
        .appName('spark_video_processing') \
        .master('local')\
        .config('spark.mongodb.input.uri', CNCT_STR) \
        .config('spark.mongodb.output.uri', CNCT_STR) \
        .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:3.0.1') \
        .getOrCreate()



### Read from MongoDB

In [4]:
video_tags = spark.read \
             .format(FORMAT) \
             .option('uri', CNCT_STR) \
             .load()

In [5]:
video_tags.printSchema()

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- tags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- video_id: long (nullable = true)



In [6]:
video_tags.show()

+--------------------+--------------------+--------+
|                 _id|                tags|video_id|
+--------------------+--------------------+--------+
|{635b67c84e6a9461...|[cat, dog, elephant]|     123|
|{635b67c84e6a9461...|    [dog, bear, cow]|     456|
|{635b687c4e6a9461...|[monkey, dog, zebra]|     567|
|{635ed4f44015f46b...|[penguin, monkey,...|    1003|
+--------------------+--------------------+--------+



In [7]:
VIDEO_ID = 1002

In [8]:
video_tags.filter(f'video_id = {VIDEO_ID}').show()

+---+----+--------+
|_id|tags|video_id|
+---+----+--------+
+---+----+--------+



In [9]:
QUERY_TAG = 'dog'
df = video_tags.filter(array_contains(video_tags.tags, QUERY_TAG))
df.show()
df.count() # get count

+--------------------+--------------------+--------+
|                 _id|                tags|video_id|
+--------------------+--------------------+--------+
|{635b67c84e6a9461...|[cat, dog, elephant]|     123|
|{635b67c84e6a9461...|    [dog, bear, cow]|     456|
|{635b687c4e6a9461...|[monkey, dog, zebra]|     567|
+--------------------+--------------------+--------+



3

#### Retrieve videos which either contain `dog` or `monkey`

In [10]:
QUERY_TAG1 = 'dog'
df1 = video_tags.filter(array_contains(video_tags.tags, QUERY_TAG1))
QUERY_TAG2 = 'monkey'
df2 = video_tags.filter(array_contains(video_tags.tags, QUERY_TAG2))
df1.union(df2).show()

+--------------------+--------------------+--------+
|                 _id|                tags|video_id|
+--------------------+--------------------+--------+
|{635b67c84e6a9461...|[cat, dog, elephant]|     123|
|{635b67c84e6a9461...|    [dog, bear, cow]|     456|
|{635b687c4e6a9461...|[monkey, dog, zebra]|     567|
|{635b687c4e6a9461...|[monkey, dog, zebra]|     567|
|{635ed4f44015f46b...|[penguin, monkey,...|    1003|
+--------------------+--------------------+--------+



#### Retrieve videos which contain `dog` and `monkey`

In [11]:
QUERY_TAG1 = 'dog'
df1 = video_tags.filter(array_contains(video_tags.tags, QUERY_TAG1))
QUERY_TAG2 = 'cow'
df2 = video_tags.filter(array_contains(video_tags.tags, QUERY_TAG2))
df1.intersect(df2).show()

+--------------------+----------------+--------+
|                 _id|            tags|video_id|
+--------------------+----------------+--------+
|{635b67c84e6a9461...|[dog, bear, cow]|     456|
+--------------------+----------------+--------+



#### Using SQL for retrieval

In [12]:
video_tags.createOrReplaceTempView('VIDEO_TAGS')

In [13]:
QUERY = f'SELECT * FROM VIDEO_TAGS WHERE `video_id` = {VIDEO_ID}'
spark.sql(QUERY).show()

+---+----+--------+
|_id|tags|video_id|
+---+----+--------+
+---+----+--------+



### Write to MongoDB

In [17]:
new_tags = spark.createDataFrame([{'video_id': 1004,
                                   'tags': ['zebra', 'tiger', 'rabbit']
                                  }
                                 ])

In [18]:
new_tags.printSchema()

root
 |-- tags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- video_id: long (nullable = true)



In [19]:
new_tags.write.format(FORMAT).mode('append').save()