In [1]:
# findspark.init() makes a local Spark installation's pyspark importable,
# so it has to run BEFORE any `pyspark` import; calling it afterwards
# (as the original cell did) is too late to help a failing import.
import findspark
findspark.init()

from pymongo import MongoClient
from pyspark.sql import SparkSession

In [2]:
# MongoDB client for the local server, plus handles on the three
# MovieLens collections (ratings / movies / users) in the `test` database.
# These module-level handles are what run_load() writes into.
client = MongoClient('mongodb://localhost:27017/')
db = client['test']
collection_ratings, collection_movies, collection_users = (
    db[collection_name] for collection_name in ("ratings", "movies", "users")
)

def read_dat_records(path, to_document):
    """Parse a MovieLens-style ``.dat`` file into a list of documents.

    Every non-blank line is split on the ``::`` separator and the resulting
    list of field strings is handed to ``to_document``.

    Args:
        path: Path of the ``.dat`` file; it is decoded as latin-1.
        to_document: Callable mapping a list of field strings to the dict
            that should be stored.

    Returns:
        list[dict]: One document per non-blank line, in file order.
    """
    documents = []
    with open(path, encoding="latin-1") as handle:
        # Iterate the file lazily rather than materializing readlines().
        for line in handle:
            line = line.rstrip("\n")
            # A blank line (e.g. trailing newline at EOF) would otherwise
            # crash the int() conversions in the field builders.
            if not line.strip():
                continue
            documents.append(to_document(line.split("::")))
    return documents


def parse_rating(fields):
    """Build a ratings document from ``UserID::MovieID::Rating::Timestamp`` fields."""
    return {
        "UserID": int(fields[0]),
        "MovieID": int(fields[1]),
        "Rating": int(fields[2]),
        "Timestamp": int(fields[3]),
    }


def parse_movie(fields):
    """Build a movies document from ``MovieID::Title::Genres`` fields.

    Genres are ``|``-separated in the file and are stored as a lowercased list.
    """
    return {
        "MovieID": int(fields[0]),
        "Title": fields[1],
        "Genre": [genre.lower() for genre in fields[2].split("|")],
    }


def parse_user(fields):
    """Build a users document from ``UserID::Gender::Age::Occupation::Zip-code`` fields."""
    return {
        "UserID": int(fields[0]),
        "Gender": fields[1],
        # NOTE(review): Age is kept as a string (unlike Occupation) to stay
        # compatible with existing data/queries; confirm whether int is wanted.
        "Age": fields[2],
        "Occupation": int(fields[3]),
        "Zip-code": fields[4],
    }


def run_load(data_dir="ml-1m"):
    """Load the MovieLens ``.dat`` files into the MongoDB collections.

    All three files are parsed before anything is written, so a missing or
    malformed file raises before any document is inserted.

    Args:
        data_dir: Directory containing ``ratings.dat``, ``movies.dat`` and
            ``users.dat``. Defaults to ``"ml-1m"`` relative to the CWD.

    Note:
        Documents are appended with ``insert_many`` and nothing here
        deduplicates, so running this twice stores every record twice.
    """
    import os  # local import keeps this cell self-contained

    # os.path.join instead of a hard-coded backslash so the path also
    # resolves on Linux/macOS (on Windows it yields the same string as before).
    ratings_list = read_dat_records(os.path.join(data_dir, "ratings.dat"), parse_rating)
    movies_list = read_dat_records(os.path.join(data_dir, "movies.dat"), parse_movie)
    users_list = read_dat_records(os.path.join(data_dir, "users.dat"), parse_user)

    for collection, documents in (
        (collection_ratings, ratings_list),
        (collection_movies, movies_list),
        (collection_users, users_list),
    ):
        # insert_many rejects an empty document list, so skip empty files.
        if documents:
            collection.insert_many(documents)


#run_load()

* Read the MongoDB collections into Spark DataFrames using PySpark

In [2]:
# Local Spark session (all cores) with the MongoDB Spark connector pulled in
# through spark.jars.packages; getOrCreate() returns an existing session if
# one is already running in this kernel.
spark = (
    SparkSession.builder
    .appName("SparkJob")
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1")
    .master("local[*]")
    .getOrCreate()
)

# Connector URI prefix for the `test` database: append a collection name
# (e.g. mongo_ip + "users") to address one collection.
mongo_ip = "mongodb://127.0.0.1:27017/test."

In [3]:
def read_mongo_collection(collection_name):
    """Load one MongoDB collection as a Spark DataFrame.

    Args:
        collection_name: Collection name appended to the ``mongo_ip`` URI
            prefix (which names the database).

    Returns:
        pyspark.sql.DataFrame backed by the MongoDB Spark connector.
    """
    return (
        spark.read.format("com.mongodb.spark.sql.DefaultSource")
        .option("uri", mongo_ip + collection_name)
        .load()
    )


users = read_mongo_collection("users")
ratings = read_mongo_collection("ratings")
movies = read_mongo_collection("movies")

In [4]:
# Preview the first 20 ratings rows (long values such as _id are truncated).
ratings.show(n=20, truncate=True)

+-------+------+---------+------+--------------------+
|MovieID|Rating|Timestamp|UserID|                 _id|
+-------+------+---------+------+--------------------+
|   1193|     5|978300760|     1|{6647abe58c776a88...|
|    661|     3|978302109|     1|{6647abe58c776a88...|
|    914|     3|978301968|     1|{6647abe58c776a88...|
|   3408|     4|978300275|     1|{6647abe58c776a88...|
|   2355|     5|978824291|     1|{6647abe58c776a88...|
|   1197|     3|978302268|     1|{6647abe58c776a88...|
|   1287|     5|978302039|     1|{6647abe58c776a88...|
|   2804|     5|978300719|     1|{6647abe58c776a88...|
|    594|     4|978302268|     1|{6647abe58c776a88...|
|    919|     4|978301368|     1|{6647abe58c776a88...|
|    595|     5|978824268|     1|{6647abe58c776a88...|
|    938|     4|978301752|     1|{6647abe58c776a88...|
|   2398|     4|978302281|     1|{6647abe58c776a88...|
|   2918|     4|978302124|     1|{6647abe58c776a88...|
|   1035|     5|978301753|     1|{6647abe58c776a88...|
|   2791| 