In [1]:
# Reload imported modules automatically before each cell runs, so edits to the
# local avg_age / top_10 modules are picked up without restarting the kernel.
# https://ipython.readthedocs.io/en/stable/config/extensions/autoreload.html?highlight=autoreload
%load_ext autoreload
%autoreload 2

In [2]:
import findspark

findspark.init()

from bson.son import SON
import pprint
from pymongo import MongoClient
import pyspark # only run after findspark.init()
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
from sqlalchemy import create_engine
from sqlalchemy import text

import avg_age
import top_10

In [3]:
def mysql_engine():
    """Build the SQLAlchemy engine for the local ``imdb`` MySQL database.

    Returns:
        Whatever ``create_engine`` returns for the PyMySQL URL below.
    """
    # NOTE(review): user and password are hard-coded in the URL; acceptable for a
    # local benchmark only.
    return create_engine('mysql+pymysql://imdb:imdb@localhost:3306/imdb')

def get_spark():
    """Create (or reuse) the SparkSession used by the notebook.

    The session is named ``jupyter``, targets the standalone master at
    ``spark://0.0.0.0:7077``, requests the MySQL JDBC driver and the MongoDB
    Spark connector through ``spark.jars.packages`` and sets the executor
    memory to 4g.

    Returns:
        The session produced by ``SparkSession.builder...getOrCreate()``.
    """
    extra_settings = (
        ('spark.jars.packages', 'mysql:mysql-connector-java:8.0.15,org.mongodb.spark:mongo-spark-connector_2.11:2.4.0'),
        ('spark.executor.memory', '4g'),
    )

    spark_conf = SparkConf().setAppName('jupyter').setMaster('spark://0.0.0.0:7077')
    for key, value in extra_settings:
        spark_conf = spark_conf.set(key, value)

    return SparkSession.builder.config(conf=spark_conf).getOrCreate()

# Init Spark: create (or reuse) the session shared by the Spark reads and benchmarks below
spark = get_spark()

In [None]:
# Init MySQL
# SQLAlchemy engine for the pure-Python benchmarks.
engine = mysql_engine()

# JDBC settings for the Spark benchmarks (same local `imdb` database).
url = "jdbc:mysql://localhost:3306/imdb"
properties = {
    # Connector/J 8.x (8.0.15 is requested via spark.jars.packages) renamed its
    # driver class; the old `com.mysql.jdbc.Driver` name is deprecated there.
    'driver': 'com.mysql.cj.jdbc.Driver',
    # NOTE(review): credentials are hard-coded; acceptable for a local benchmark only.
    'user': "imdb",
    'password': "imdb",
}

# One Spark DataFrame per IMDb table, read over JDBC.
name_basics = spark.read.jdbc(url=url, table="name_basics", properties=properties)
title_basics = spark.read.jdbc(url=url, table="title_basics", properties=properties)
title_principals = spark.read.jdbc(url=url, table="title_principals", properties=properties)
title_ratings = spark.read.jdbc(url=url, table="title_ratings", properties=properties)

In [4]:
# Init Mongo: client for the MongoDB server on localhost:27017, passed to py_mongo below
mongo_client = MongoClient('mongodb://localhost:27017/')

# Avg age

In [8]:
# Average-age benchmark: time the MySQL-backed variants from the avg_age module
# (one loop, one repeat each).
print("python mysql")
%timeit -n 1 -r 1 avg_age.py_mysql(engine)

print("spark mysql")
%timeit -n 1 -r 1 avg_age.spark_mysql(name_basics, title_basics, title_principals)

# NOTE(review): py_mongo and spark_mongo are not qualified with `avg_age.`; in this
# notebook they are only defined in later cells, so on a fresh kernel the two calls
# below raise NameError unless those cells ran first. Confirm whether
# avg_age.py_mongo / avg_age.spark_mongo was intended. Unlike the calls above,
# these two are also not wrapped in %timeit.
py_mongo(mongo_client, avg_age.pipeline)

spark_mongo(spark, avg_age.pipeline).collect()

# Top 10

In [13]:
def top_10_py_mysql(engine):
    """Run the Top-10 ranking query on MySQL through SQLAlchemy.

    For every (titleType, startYear) group the query numbers the rated titles
    by descending averageRating with ROW_NUMBER() and keeps the rows numbered
    1 to 10, ordered by titleType, startYear and descending averageRating.

    Args:
        engine: SQLAlchemy engine; ``engine.connect()`` must return a
            connection usable as a context manager.

    Returns:
        The rows returned by ``fetchall()`` on the query result.
    """
    query = """
       SELECT * FROM (
            SELECT
                ROW_NUMBER() OVER (PARTITION BY titles.titleType, titles.startYear ORDER BY averageRating DESC) AS row_num,
                originalTitle,
                titleType,
                startYear, 
                averageRating
            FROM
                title_basics titles
                    JOIN
                title_ratings ratings ON titles.tconst = ratings.tconst
            WHERE startYear IS NOT NULL
        ) AS ranking
        WHERE ranking.row_num <= 10
        ORDER BY titleType, startYear, averageRating DESC;
    """
    stmt = text(query)

    # The context manager closes the connection even when execute()/fetchall()
    # raises; the previous explicit close() was skipped on errors.
    with engine.connect() as conn:
        return conn.execute(stmt).fetchall()

In [17]:
%%timeit -n 1 -r 1

top_10_py_mysql(engine)

3.26 s ± 14.1 ms per loop (mean ± std. dev. of 2 runs, 3 loops each)


In [21]:
def spark_mysql(title_basics, title_ratings):
    """Build the Spark equivalent of the MySQL Top-10 ranking query.

    Joins ratings to titles on ``tconst``, numbers the rows of every
    (titleType, startYear) group by descending averageRating and keeps the
    first 10 per group.

    Args:
        title_basics: DataFrame with at least ``tconst``, ``startYear``,
            ``originalTitle`` and ``titleType``.
        title_ratings: DataFrame with at least ``tconst`` and ``averageRating``.

    Returns:
        A DataFrame ordered by titleType, startYear and descending
        averageRating. The ``rank`` column holds the position (1 to 10) within
        the group. Nothing is executed until an action such as ``collect()``
        is called on it.
    """
    titles = (
        title_basics
        .select('tconst', 'startYear', 'originalTitle', 'titleType')
        .withColumn('startYear', col('startYear').cast(IntegerType()))
        # Filter on the cast column by name. The previous code referenced the
        # column through the pre-cast DataFrame, so values that only become
        # null through the cast were presumably not filtered out.
        .where(col('startYear').isNotNull())
    )

    ratings = title_ratings.select('tconst', 'averageRating')

    joined = ratings.join(titles, on=['tconst'])

    window = Window.partitionBy(['titleType', 'startYear']).orderBy(desc('averageRating'))

    # row_number() matches the ROW_NUMBER() used by the MySQL query; rank()
    # gives tied ratings the same number and can keep more than 10 rows per group.
    # The column keeps its original name `rank` so downstream code is unaffected.
    top_rows = joined.select('*', row_number().over(window).alias('rank')).filter(col('rank') <= 10)

    return top_rows.orderBy('titleType', 'startYear', desc('averageRating'))

In [25]:
%%timeit -n 3 -r 2

# Time the Spark/JDBC variant (3 loops x 2 repeats); collect() is the action that
# runs the query and returns the rows to the driver.
spark_mysql(title_basics, title_ratings).collect()

18 s ± 814 ms per loop (mean ± std. dev. of 2 runs, 3 loops each)


In [17]:
# Aggregation pipeline for the Mongo benchmarks. Unlike the SQL/Spark variants it
# covers a single slice only: tvEpisode titles with startYear 2017 that have an
# averageRating, best-rated first, limited to 10 documents.
pipeline = [
    {"$match": {"startYear": {"$ne": "\\N"}}},
    {"$match": {"startYear": 2017}},
    {"$match": {"titleType": "tvEpisode"}},
    # NOTE(review): the $exists operand is the string "true", not the boolean True.
    {"$match": {"averageRating": {"$exists": "true"}}},
    {"$sort": {"averageRating": -1}},
    {"$limit": 10},
    {
        "$project": {
            "_id": 0,
            "titleType": 1,
            "startYear": 1,
            "originalTitle": 1,
            "averageRating": 1,
        }
    },
]

def py_mongo(mongo_client, pipeline):
    """Run an aggregation pipeline on ``imdb.titles`` with PyMongo.

    Args:
        mongo_client: Client (or mapping) from which the ``imdb`` database is
            looked up by name.
        pipeline: List of aggregation stages passed to ``aggregate``.

    Returns:
        A list with every document produced by the aggregation.
    """
    db = mongo_client['imdb']

    # aggregate() returns a cursor. Materialise it so the whole result is
    # actually fetched, like fetchall() / collect() in the MySQL and Spark
    # variants, and hand the documents back to the caller.
    # To inspect the query plan instead, run:
    #   db.command('aggregate', 'titles', pipeline=pipeline, explain=True)
    return list(db.titles.aggregate(pipeline))

In [18]:
%%timeit -n 3 -r 2

# Time the PyMongo aggregation with the Top-10 pipeline (3 loops x 2 repeats).
py_mongo(mongo_client, pipeline)

1.95 s ± 1.86 ms per loop (mean ± std. dev. of 2 runs, 3 loops each)


In [21]:
def spark_mongo(spark, pipeline):
    """Load ``imdb.titles`` through the MongoDB Spark connector with a pipeline.

    Args:
        spark: SparkSession used for the read.
        pipeline: Aggregation pipeline, either as a JSON string or as a Python
            list/dict of stages.

    Returns:
        The DataFrame returned by ``load()``.
    """
    import json  # local import: keeps this cell self-contained

    if not isinstance(pipeline, str):
        # Send real JSON to the connector. Passing the Python object relies on
        # its repr() happening to parse as JSON, which breaks as soon as the
        # pipeline contains True/False/None.
        try:
            pipeline = json.dumps(pipeline)
        except (TypeError, ValueError):
            # Not JSON-serialisable: fall back to the previous behaviour and
            # pass the object through unchanged.
            pass

    return (
        spark.read.format("com.mongodb.spark.sql.DefaultSource")
        .option("uri", "mongodb://localhost:27017/imdb.titles")
        .option("pipeline", pipeline)
        .load()
    )

In [22]:
%%timeit -n 3 -r 2

# Time the Spark/Mongo-connector variant (3 loops x 2 repeats); collect() returns
# the loaded rows to the driver.
spark_mongo(spark, pipeline).collect()

10.4 s ± 58.2 ms per loop (mean ± std. dev. of 2 runs, 3 loops each)


In [16]:
# Shut down the Spark session; run this only after all Spark benchmarks above are done
spark.stop()