In [2]:
import os
# Point this process at the local Java 8 and Spark 2.4.7 installations.
# These are set before findspark.init() is called further down the cell.
os.environ.update(
    JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64",
    SPARK_HOME="/home/rhyme/spark-2.4.7-bin-hadoop2.7",
)
# findspark.init() runs before the pyspark imports below; presumably it uses the
# SPARK_HOME set above to make the pyspark package importable - keep this order.
import findspark
findspark.init()
from pyspark.sql import SparkSession, SQLContext
from pyspark import SparkConf, SparkContext
import pyspark
from pyspark.sql.functions import col

In [None]:
# Task 1 - start the MongoDB server, open MongoDB Compass and connect, then load the movies dataset into MongoDB (run the commands below from the Project folder)
# sudo mongod --nojournal --dbpath /data/db
# mongorestore --host localhost --port 27017 --db sample_mflix --dir ./sample_mflix

In [5]:
# Task 2 - connect to MongoDB, read the collection, inspect the schema.
#
# Builds (or reuses) a local SparkSession with the MongoDB Spark connector:
#   * default read source : sample_mflix.movies   (spark.mongodb.input.uri)
#   * default write target: sample_mflix.analyses (spark.mongodb.output.uri)
# NOTE(review): the connector is pinned to 2.2.0 while SPARK_HOME points at
# Spark 2.4.7 - confirm this pairing is intended.
spark = (
    SparkSession.builder
    .master('local')
    .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.11:2.2.0')
    .config('spark.mongodb.input.uri', 'mongodb://127.0.0.1/sample_mflix.movies')
    .config('spark.mongodb.output.uri', 'mongodb://127.0.0.1/sample_mflix.analyses')
    .getOrCreate()
)

In [6]:
# Load the collection named by spark.mongodb.input.uri into a DataFrame.
data = (
    spark.read
    .format('com.mongodb.spark.sql.DefaultSource')
    .load()
)

In [7]:
# Print the DataFrame schema as a tree (nested structs and arrays included).
data.printSchema()

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- awards: struct (nullable = true)
 |    |-- wins: integer (nullable = true)
 |    |-- nominations: integer (nullable = true)
 |    |-- text: string (nullable = true)
 |-- cast: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- countries: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- directors: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- fullplot: string (nullable = true)
 |-- genres: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- imdb: struct (nullable = true)
 |    |-- rating: double (nullable = true)
 |    |-- votes: integer (nullable = true)
 |    |-- id: integer (nullable = true)
 |-- languages: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- lastupdated: string (nullable = true)
 |-- metacritic: integer (nullable = true)
 |-- num_mflix_comments: integ

In [14]:
# Task 3 - selecting and grouping.
# Start with the total number of rows in the movies DataFrame.
data.count()

23539

In [15]:
# Preview the first 5 rows; long cell values are truncated in the printed table.
data.show(5)

+--------------------+--------------+--------------------+---------+--------------------+--------------------+--------------------+-----------------+---------+--------------------+----------+------------------+--------------------+--------------------+-------+-------------------+-------+--------------------+--------------------+-----+--------------------+----+
|                 _id|        awards|                cast|countries|           directors|            fullplot|              genres|             imdb|languages|         lastupdated|metacritic|num_mflix_comments|                plot|              poster|  rated|           released|runtime|               title|            tomatoes| type|             writers|year|
+--------------------+--------------+--------------------+---------+--------------------+--------------------+--------------------+-----------------+---------+--------------------+----------+------------------+--------------------+--------------------+-------+--------------

In [18]:
# First five (title, fullplot) pairs, returned to the driver as Row objects.
data.select('title', 'fullplot').head(5)

[Row(title='Blacksmith Scene', fullplot='A stationary camera looks at a large anvil with a blacksmith behind it and one on either side. The smith in the middle draws a heated metal rod from the fire, places it on the anvil, and all three begin a rhythmic hammering. After several blows, the metal goes back in the fire. One smith pulls out a bottle of beer, and they each take a swig. Then, out comes the glowing metal and the hammering resumes.'),
 Row(title='The Great Train Robbery', fullplot="Among the earliest existing films in American cinema - notable as the first film that presented a narrative story to tell - it depicts a group of cowboy outlaws who hold up a train and rob the passengers. They are then pursued by a Sheriff's posse. Several scenes have color included - all hand tinted."),
 Row(title='The Land Beyond the Sunset', fullplot="Thanks to the Fresh Air Fund, a slum child escapes his drunken mother for a day's outing in the country. Upon arriving, he and the other children 

In [19]:
# `imdb` is a struct column, so each result holds a nested Row (id, rating, votes).
data.select('imdb').head(5)

[Row(imdb=Row(id=5, rating='6.2', votes='1189')),
 Row(imdb=Row(id=439, rating='7.4', votes='9847')),
 Row(imdb=Row(id=488, rating='7.1', votes='448')),
 Row(imdb=Row(id=832, rating='6.6', votes='1375')),
 Row(imdb=Row(id=1737, rating='7.3', votes='1034'))]

In [20]:
# Dotted path 'imdb.rating' pulls the nested field out as a top-level `rating` column.
data.select('countries', 'imdb.rating').head(5)

[Row(countries=['USA'], rating='6.2'),
 Row(countries=['USA'], rating='7.4'),
 Row(countries=['USA'], rating='7.1'),
 Row(countries=['USA'], rating='6.6'),
 Row(countries=['USA'], rating='7.3')]

In [26]:
# Average IMDb rating over all movies; the result column is named avg(rating).
(
    data
    .select('imdb.rating')
    .agg({'rating': 'average'})
    .show()
)

+---------------+
|    avg(rating)|
+---------------+
|6.6934662236988|
+---------------+



In [14]:
# Count movies per distinct `countries` array (e.g. [UK, USA] and [USA, UK]
# are separate groups), then list the most frequent combinations first.
# NOTE: despite its name, `genre` holds per-country counts, not genre counts;
# the name is kept because later cells reference it.
genre = data.groupBy('countries').count()
genre.orderBy(col('count').desc()).show()

+---------------+-----+
|      countries|count|
+---------------+-----+
|          [USA]| 8985|
|           [UK]| 1126|
|       [France]|  848|
|        [Japan]|  671|
|        [India]|  555|
|       [Canada]|  535|
|        [Italy]|  481|
|      [Germany]|  383|
|      [UK, USA]|  373|
|        [Spain]|  289|
|    [Australia]|  262|
|  [South Korea]|  254|
|  [USA, Canada]|  241|
|      [USA, UK]|  228|
|      [Finland]|  213|
|       [Russia]|  198|
|[Italy, France]|  191|
|    [Hong Kong]|  191|
|       [Sweden]|  174|
| [USA, Germany]|  161|
+---------------+-----+
only showing top 20 rows



In [31]:
# Task 4 - write the results to CSV.
# Spark's CSV data source does not support array columns, so `countries` is
# cast to its string form before writing.
# mode('overwrite') makes the cell re-runnable: with the default save mode the
# write fails as soon as ./genre_count.csv exists from a previous run.
(
    genre
    .withColumn('countries', col('countries').cast('string'))
    .write
    .mode('overwrite')
    .option('header', 'true')
    .csv('./genre_count.csv')
)

In [32]:
# Same export as the previous CSV write, but coalesce(1) collapses the data to
# one partition so the output directory contains a single part file.
# mode('overwrite') makes the cell re-runnable: with the default save mode the
# write fails as soon as ./genre_count2.csv exists from a previous run.
(
    genre
    .withColumn('countries', col('countries').cast('string'))
    .coalesce(1)
    .write
    .mode('overwrite')
    .option('header', 'true')
    .csv('./genre_count2.csv')
)

In [8]:
# Task 5 - register the DataFrame as a SQL view and run SQL queries.
# createOrReplaceTempView replaces registerTempTable, which has been
# deprecated since Spark 2.0; re-running the cell simply replaces the view.
data.createOrReplaceTempView('data_sql')

In [11]:
# Movies per `countries` combination, keeping only combinations with more than
# 100 movies, most frequent first.
# The view name is spelled exactly as registered ('data_sql') so the query does
# not depend on Spark's case-insensitive identifier resolution.
countries = spark.sql("""
select countries, count(_id) as count from data_sql
group by countries having count > 100 order by count desc
""")

In [12]:
# Display the SQL result (show() prints the first 20 rows by default).
countries.show()

+---------------+-----+
|      countries|count|
+---------------+-----+
|          [USA]| 8985|
|           [UK]| 1126|
|       [France]|  848|
|        [Japan]|  671|
|        [India]|  555|
|       [Canada]|  535|
|        [Italy]|  481|
|      [Germany]|  383|
|      [UK, USA]|  373|
|        [Spain]|  289|
|    [Australia]|  262|
|  [South Korea]|  254|
|  [USA, Canada]|  241|
|      [USA, UK]|  228|
|      [Finland]|  213|
|       [Russia]|  198|
|[Italy, France]|  191|
|    [Hong Kong]|  191|
|       [Sweden]|  174|
| [USA, Germany]|  161|
+---------------+-----+
only showing top 20 rows



In [None]:
# task 6 - writing results back to MongoDB


In [15]:
 # Save the per-country counts through the MongoDB connector; the destination
 # is the collection configured in spark.mongodb.output.uri.
 # NOTE(review): with mode('append') a re-run presumably inserts the same rows
 # again - confirm whether duplicates are acceptable.
 (genre.write
      .format('com.mongodb.spark.sql.DefaultSource')
      .mode('append')
      .save())

In [16]:
# Shut down the Spark session; cells above need a new session before they can run again.
spark.stop()