In [None]:
import findspark
# Locate the Spark installation
findspark.init()

import pyspark
from pyspark import StorageLevel
from pyspark.sql.functions import col
from pyspark.sql.functions import regexp_replace
from pyspark.sql import *

# Build (or reuse) a Spark session that runs locally on every available core.
# The legacy spark.storage.memoryFraction / spark.shuffle.memoryFraction settings
# were removed: they are deprecated and only read when legacy memory mode is
# enabled, which this session never does, so they had no effect.
spark = (
    SparkSession.builder
    .appName("MongoInsert")
    .master("local[*]")
    .config("spark.driver.memory", "5g")
    .config("spark.executor.memory", "5g")
    # "0" means no limit on the total size of results collected to the driver
    .config("spark.driver.maxResultSize", "0")
    .getOrCreate()
)

In [None]:
from pyspark.sql.types import StructType, StructField, ArrayType, StringType, LongType

# Column layout of the review files, in file order: (column name, Spark type).
schema_columns = [
    ('movie', StringType),
    ('rating', LongType),
    ('genre', StringType),
    ('review_date', StringType),
    ('review_detail', StringType),
    ('review_id', StringType),
    ('review_summary', StringType),
    ('reviewer', StringType),
    ('spoiler_tag', LongType),
    ('helpful_upvotes', LongType),
    ('helpful_total', LongType),
]

# Every column is declared nullable
schema = StructType([
    StructField(column_name, column_type(), True)
    for column_name, column_type in schema_columns
])

In [None]:
# Load the tab-separated review files from the HDFS directory, applying the
# declared schema; records that cannot be parsed are dropped (DROPMALFORMED)
df = (
    spark.read
    .schema(schema)
    .option("sep", "\t")
    .option("mode", "DROPMALFORMED")
    .csv('hdfs://localhost:54310/user/data/filtered')
)
# Preview the first five rows as a pandas DataFrame
df.limit(5).toPandas()

                                                                                

Unnamed: 0,movie,rating,genre,review_date,review_detail,review_id,review_summary,reviewer,spoiler_tag,helpful_upvotes,helpful_total
0,Star Trek II: The Wrath of Khan (1982),9.0,Action|Adventure|Sci-Fi|Thriller,19 July 2003,Star Trek had been a long-running TV-series be...,rw0197040,Defining in Ways Both Good and bad,OttoVonB,0,4,5
1,Star Trek II: The Wrath of Khan (1982),10.0,Action|Adventure|Sci-Fi|Thriller,13 July 2003,This is by far the best Star Trek movie. Howe...,rw0197039,"Excellent, except for the fake Vulcan",antigraviton,0,0,0
2,Star Trek II: The Wrath of Khan (1982),,Action|Adventure|Sci-Fi|Thriller,3 July 2003,"Okay, so I wrote the review just for that head...",rw0197038,KHHHHHAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAANNNN!!!!...,jaffakree340,0,0,0
3,Star Trek II: The Wrath of Khan (1982),10.0,Action|Adventure|Sci-Fi|Thriller,21 June 2003,"It was 1982 and on Friday, June 4 two friends ...",rw0197037,Still the greatest of them all,jtkirkfan2002,0,2,3
4,Star Trek II: The Wrath of Khan (1982),9.0,Action|Adventure|Sci-Fi|Thriller,17 June 2003,I love The Wrath of Khan! This was a very well...,rw0197036,Great Movie!,trekmanryan,0,0,0


In [None]:
# Randomly subsample the data to work on a manageable sample in the sandbox environment
number_of_samples = 300000
# Fixed seed so that re-running the notebook over the same input draws the same sample
sample_seed = 42
total_rows = df.count()
# Sampling without replacement needs a fraction in [0, 1]: cap it at 1.0 when the
# dataset holds fewer rows than requested, and avoid dividing by zero when it is empty.
# The fraction is a per-row probability, so the sample size is only approximately
# number_of_samples.
sample_fraction = min(1.0, number_of_samples / total_rows) if total_rows else 0.0
sample_df = df.sample(withReplacement=False, fraction=sample_fraction, seed=sample_seed)

                                                                                

In [None]:
# Count the rows of the sample and report the figure, to check how many rows
# were actually extracted from the original data
rows = sample_df.count()
print(f'Rows extracted = {rows}')

In [None]:
import pymongo

# Open a client on the local MongoDB instance (default connection settings) and
# pick the "project" database, which serves as the repository for the dataset
mongo = pymongo.MongoClient()
mongo_db = mongo['project']
# Start from an empty "reviews" collection by deleting every document it holds
mongo_db['reviews'].delete_many({})

<pymongo.results.DeleteResult at 0x7fc38c6ec100>

In [None]:
# Collect the sample as a list of plain Python dicts, one per review.
# Row.asDict() keeps SQL NULLs as None and long columns as int; going through
# pandas instead turns NULLs in the nullable long columns (e.g. rating) into
# float NaN, which would be stored in MongoDB as NaN doubles rather than nulls.
# The list is deliberately not named `dict`, so the builtin is not shadowed.
review_docs = [row.asDict() for row in sample_df.collect()]
# Insert the reviews in the collection of the project MongoDB database;
# insert_many() rejects an empty list, so skip the call when nothing was sampled
if review_docs:
    mongo_db.reviews.insert_many(review_docs)

                                                                                

In [None]:
# Teardown: release the MongoDB client opened earlier in the notebook
mongo.close()

# Stop the Spark session (and its underlying Spark context); `spark` cannot be
# used after this point without building a new session
spark.stop()