In [1]:
import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import lower,col
from sparkmeasure import StageMetrics

In [2]:
spark = SparkSession \
    .builder \
    .master('spark://192.168.56.1:7077') \
    .appName('Virtual cluster') \
    .config("spark.jars", "/spark-measure_2.12-0.17.jar") \
    .getOrCreate()

In [3]:
spark

In [4]:
stagemetrics = StageMetrics(spark)

In [5]:
movies = (spark.read
           .format('csv')
           .option('header', 'true')
           .option('delimiter', ',')
           .option('inferSchema', 'true')
           .load('movie.csv'))

rating = (spark.read
           .format('csv')
           .option('header', 'true')
           .option('delimiter', ',')
           .option('inferSchema', 'true')
           .load('rating.csv'))

tag = (spark.read
           .format('csv')
           .option('header', 'true')
           .option('delimiter', ',')
           .option('inferSchema', 'true')
           .load('new_tag.csv'))

In [6]:
tag = tag.withColumn('tag', lower(col('tag')))

+------+-------+-------------+-------------------+-------------+
|userId|movieId|          tag|          timestamp|      new_tag|
+------+-------+-------------+-------------------+-------------+
|    18|   4141|  mark waters|2009-04-24 18:19:40|  mark waters|
|    65|    208|    dark hero|2013-05-10 01:41:18|    dark hero|
|    65|    353|    dark hero|2013-05-10 01:41:19|    dark hero|
|    65|    521|noir thriller|2013-05-10 01:39:43|noir thriller|
|    65|    592|    dark hero|2013-05-10 01:41:18|    dark hero|
+------+-------+-------------+-------------------+-------------+
only showing top 5 rows



In [7]:
movies.createOrReplaceTempView('movies')
rating.createOrReplaceTempView('ratings')
tag.createOrReplaceTempView('tags')

In [8]:
##1
stagemetrics.begin()
spark.sql("SELECT COUNT(userId) AS users FROM ratings WHERE movieId = \
                    (SELECT movieId FROM movies WHERE title='Jumanji (1995)')").show()
stagemetrics.end()

+-----+
|users|
+-----+
|22243|
+-----+



In [9]:
stagemetrics.print_report()


Scheduling mode = FIFO
Spark Context default degree of parallelism = 8
Aggregated Spark stage metrics:
numStages => 3
numTasks => 10
elapsedTime => 3253 (3 s)
stageDuration => 3172 (3 s)
executorRunTime => 21427 (21 s)
executorCpuTime => 19748 (20 s)
executorDeserializeTime => 388 (0,4 s)
executorDeserializeCpuTime => 108 (0,1 s)
resultSerializationTime => 0 (0 ms)
jvmGCTime => 162 (0,2 s)
shuffleFetchWaitTime => 0 (0 ms)
shuffleWriteTime => 26 (26 ms)
resultSize => 16004 (15,0 KB)
diskBytesSpilled => 0 (0 Bytes)
memoryBytesSpilled => 0 (0 Bytes)
peakExecutionMemory => 0
recordsRead => 20000264
bytesRead => 692422957 (660,0 MB)
recordsWritten => 0
bytesWritten => 0 (0 Bytes)
shuffleRecordsRead => 8
shuffleTotalBlocksFetched => 8
shuffleLocalBlocksFetched => 8
shuffleRemoteBlocksFetched => 0
shuffleTotalBytesRead => 472 (472 Bytes)
shuffleLocalBytesRead => 472 (472 Bytes)
shuffleRemoteBytesRead => 0 (0 Bytes)
shuffleRemoteBytesReadToDisk => 0 (0 Bytes)
shuffleBytesWritten => 472 (472 B

In [10]:
##2
stagemetrics.begin()
spark.sql("SELECT DISTINCT(title) FROM movies INNER JOIN tags ON tags.movieId=movies.movieID WHERE tag='boring' ORDER BY 1 ASC LIMIT 5").show(truncate=0)
stagemetrics.end()

+------------------------------------+
|title                               |
+------------------------------------+
|(500) Days of Summer (2009)         |
|101 Reykjavik (101 Reykjavík) (2000)|
|12 Years a Slave (2013)             |
|1408 (2007)                         |
|1492: Conquest of Paradise (1992)   |
+------------------------------------+



In [11]:
stagemetrics.print_report()


Scheduling mode = FIFO
Spark Context default degree of parallelism = 8
Aggregated Spark stage metrics:
numStages => 3
numTasks => 208
elapsedTime => 2271 (2 s)
stageDuration => 2134 (2 s)
executorRunTime => 8363 (8 s)
executorCpuTime => 3608 (4 s)
executorDeserializeTime => 2037 (2 s)
executorDeserializeCpuTime => 1264 (1 s)
resultSerializationTime => 52 (52 ms)
jvmGCTime => 41 (41 ms)
shuffleFetchWaitTime => 18 (18 ms)
shuffleWriteTime => 581 (0,6 s)
resultSize => 1392879 (1360,0 KB)
diskBytesSpilled => 0 (0 Bytes)
memoryBytesSpilled => 0 (0 Bytes)
peakExecutionMemory => 845152256
recordsRead => 492826
bytesRead => 28305226 (26,0 MB)
recordsWritten => 0
bytesWritten => 0 (0 Bytes)
shuffleRecordsRead => 910
shuffleTotalBlocksFetched => 657
shuffleLocalBlocksFetched => 657
shuffleRemoteBlocksFetched => 0
shuffleTotalBytesRead => 67093 (65,0 KB)
shuffleLocalBytesRead => 67093 (65,0 KB)
shuffleRemoteBytesRead => 0 (0 Bytes)
shuffleRemoteBytesReadToDisk => 0 (0 Bytes)
shuffleBytesWritten 

In [12]:
##3
stagemetrics.begin()
spark.sql("SELECT DISTINCT(ratings.userId) FROM ratings INNER JOIN tags ON tags.movieId = ratings.movieId \
                      AND tags.userId = ratings.userId WHERE tag='bollywood' AND rating>3 ORDER BY ratings.userId").show(5)
stagemetrics.end()

+------+
|userId|
+------+
| 10573|
| 19837|
| 23333|
| 25004|
| 31338|
+------+
only showing top 5 rows



In [13]:
stagemetrics.print_report()


Scheduling mode = FIFO
Spark Context default degree of parallelism = 8
Aggregated Spark stage metrics:
numStages => 3
numTasks => 215
elapsedTime => 3800 (4 s)
stageDuration => 3733 (4 s)
executorRunTime => 23840 (24 s)
executorCpuTime => 21514 (22 s)
executorDeserializeTime => 903 (0,9 s)
executorDeserializeCpuTime => 639 (0,6 s)
resultSerializationTime => 5 (5 ms)
jvmGCTime => 625 (0,6 s)
shuffleFetchWaitTime => 0 (0 ms)
shuffleWriteTime => 94 (94 ms)
resultSize => 1311196 (1280,0 KB)
diskBytesSpilled => 0 (0 Bytes)
memoryBytesSpilled => 0 (0 Bytes)
peakExecutionMemory => 140509184
recordsRead => 12661114
bytesRead => 717740887 (684,0 MB)
recordsWritten => 0
bytesWritten => 0 (0 Bytes)
shuffleRecordsRead => 21
shuffleTotalBlocksFetched => 21
shuffleLocalBlocksFetched => 21
shuffleRemoteBlocksFetched => 0
shuffleTotalBytesRead => 1239 (1239 Bytes)
shuffleLocalBytesRead => 1239 (1239 Bytes)
shuffleRemoteBytesRead => 0 (0 Bytes)
shuffleRemoteBytesReadToDisk => 0 (0 Bytes)
shuffleBytesW

In [14]:
##4
stagemetrics.begin()
spark.sql("SELECT title,AVG(rating) FROM movies INNER JOIN ratings ON movies.movieId=ratings.movieId WHERE timestamp LIKE '2005%' GROUP BY 1 ORDER BY 2 DESC, movies.title LIMIT 10").show(10,truncate=0)
stagemetrics.end()

+----------------------------------------------------------------------------+-----------+
|title                                                                       |avg(rating)|
+----------------------------------------------------------------------------+-----------+
|Before the Fall (NaPolA - Elite für den Führer) (2004)                      |5.0        |
|Dancemaker (1998)                                                           |5.0        |
|Fear Strikes Out (1957)                                                     |5.0        |
|Gate of Heavenly Peace, The (1995)                                          |5.0        |
|Life Is Rosy (a.k.a. Life Is Beautiful) (Vie est belle, La) (1987)          |5.0        |
|Married to It (1991)                                                        |5.0        |
|My Life and Times With Antonin Artaud (En compagnie d'Antonin Artaud) (1993)|5.0        |
|Not Love, Just Frenzy (Más que amor, frenesí) (1996)                        |5.0        |

In [15]:
stagemetrics.print_report()


Scheduling mode = FIFO
Spark Context default degree of parallelism = 8
Aggregated Spark stage metrics:
numStages => 4
numTasks => 210
elapsedTime => 4385 (4 s)
stageDuration => 4293 (4 s)
executorRunTime => 30479 (30 s)
executorCpuTime => 26451 (26 s)
executorDeserializeTime => 918 (0,9 s)
executorDeserializeCpuTime => 655 (0,7 s)
resultSerializationTime => 0 (0 ms)
jvmGCTime => 958 (1,0 s)
shuffleFetchWaitTime => 0 (0 ms)
shuffleWriteTime => 1445 (1 s)
resultSize => 993665 (970,0 KB)
diskBytesSpilled => 0 (0 Bytes)
memoryBytesSpilled => 0 (0 Bytes)
peakExecutionMemory => 891289600
recordsRead => 1830436
bytesRead => 692422957 (660,0 MB)
recordsWritten => 0
bytesWritten => 0 (0 Bytes)
shuffleRecordsRead => 57877
shuffleTotalBlocksFetched => 1800
shuffleLocalBlocksFetched => 1800
shuffleRemoteBlocksFetched => 0
shuffleTotalBytesRead => 2325484 (2,0 MB)
shuffleLocalBytesRead => 2325484 (2,0 MB)
shuffleRemoteBytesRead => 0 (0 Bytes)
shuffleRemoteBytesReadToDisk => 0 (0 Bytes)
shuffleByte

In [28]:
##5
stagemetrics.begin()
spark.sql("SELECT movies.title as title, collect_set(tags.tag) AS tags FROM movies INNER JOIN tags ON tags.movieId = movies.movieId WHERE tags.timestamp LIKE '%2015%' GROUP BY title ORDER BY title").show(5)
stagemetrics.end()

+--------------------+--------------------+
|               title|                tags|
+--------------------+--------------------+
|""Great Performan...|              [bd-r]|
|  'burbs, The (1989)|[1980's, dark com...|
|(500) Days of Sum...|[zooey deschanel,...|
|...tick... tick.....|              [bd-r]|
|            1 (2014)|           [sukumar]|
+--------------------+--------------------+
only showing top 5 rows



In [29]:
stagemetrics.print_report()


Scheduling mode = FIFO
Spark Context default degree of parallelism = 8
Aggregated Spark stage metrics:
numStages => 3
numTasks => 208
elapsedTime => 1079 (1 s)
stageDuration => 1014 (1 s)
executorRunTime => 4948 (5 s)
executorCpuTime => 2982 (3 s)
executorDeserializeTime => 525 (0,5 s)
executorDeserializeCpuTime => 577 (0,6 s)
resultSerializationTime => 0 (0 ms)
jvmGCTime => 48 (48 ms)
shuffleFetchWaitTime => 0 (0 ms)
shuffleWriteTime => 931 (0,9 s)
resultSize => 1472696 (1438,0 KB)
diskBytesSpilled => 0 (0 Bytes)
memoryBytesSpilled => 0 (0 Bytes)
peakExecutionMemory => 19454456
recordsRead => 47202
bytesRead => 28305226 (26,0 MB)
recordsWritten => 0
bytesWritten => 0 (0 Bytes)
shuffleRecordsRead => 5380
shuffleTotalBlocksFetched => 1311
shuffleLocalBlocksFetched => 1311
shuffleRemoteBlocksFetched => 0
shuffleTotalBytesRead => 694603 (678,0 KB)
shuffleLocalBytesRead => 694603 (678,0 KB)
shuffleRemoteBytesRead => 0 (0 Bytes)
shuffleRemoteBytesReadToDisk => 0 (0 Bytes)
shuffleBytesWritt

In [18]:
##6
stagemetrics.begin()
spark.sql("SELECT title,COUNT(rating) FROM movies JOIN ratings ON movies.movieId=ratings.movieId GROUP BY 1 ORDER BY 2 DESC").show(5,truncate=0)
stagemetrics.end()

+--------------------------------+-------------+
|title                           |count(rating)|
+--------------------------------+-------------+
|Pulp Fiction (1994)             |67310        |
|Forrest Gump (1994)             |66172        |
|Shawshank Redemption, The (1994)|63366        |
|Silence of the Lambs, The (1991)|63299        |
|Jurassic Park (1993)            |59715        |
+--------------------------------+-------------+
only showing top 5 rows



In [19]:
stagemetrics.print_report()


Scheduling mode = FIFO
Spark Context default degree of parallelism = 8
Aggregated Spark stage metrics:
numStages => 3
numTasks => 209
elapsedTime => 3935 (4 s)
stageDuration => 3880 (4 s)
executorRunTime => 28159 (28 s)
executorCpuTime => 25858 (26 s)
executorDeserializeTime => 520 (0,5 s)
executorDeserializeCpuTime => 561 (0,6 s)
resultSerializationTime => 0 (0 ms)
jvmGCTime => 386 (0,4 s)
shuffleFetchWaitTime => 0 (0 ms)
shuffleWriteTime => 1084 (1 s)
resultSize => 1373567 (1341,0 KB)
diskBytesSpilled => 0 (0 Bytes)
memoryBytesSpilled => 0 (0 Bytes)
peakExecutionMemory => 891289600
recordsRead => 20027541
bytesRead => 692422957 (660,0 MB)
recordsWritten => 0
bytesWritten => 0 (0 Bytes)
shuffleRecordsRead => 147890
shuffleTotalBlocksFetched => 1600
shuffleLocalBlocksFetched => 1600
shuffleRemoteBlocksFetched => 0
shuffleTotalBytesRead => 5305804 (5,0 MB)
shuffleLocalBytesRead => 5305804 (5,0 MB)
shuffleRemoteBytesRead => 0 (0 Bytes)
shuffleRemoteBytesReadToDisk => 0 (0 Bytes)
shuffle

In [20]:
##7
stagemetrics.begin()
spark.sql("SELECT userId, COUNT(rating) FROM ratings WHERE SUBSTRING(timestamp,1,4) = '1995' GROUP BY userId, SUBSTRING(timestamp,1,4)").show()
stagemetrics.end()

+------+-------------+
|userId|count(rating)|
+------+-------------+
| 28507|            1|
|131160|            3|
+------+-------------+



In [21]:
stagemetrics.print_report()


Scheduling mode = FIFO
Spark Context default degree of parallelism = 8
Aggregated Spark stage metrics:
numStages => 6
numTasks => 208
elapsedTime => 4253 (4 s)
stageDuration => 4235 (4 s)
executorRunTime => 31012 (31 s)
executorCpuTime => 29794 (30 s)
executorDeserializeTime => 406 (0,4 s)
executorDeserializeCpuTime => 670 (0,7 s)
resultSerializationTime => 0 (0 ms)
jvmGCTime => 426 (0,4 s)
shuffleFetchWaitTime => 0 (0 ms)
shuffleWriteTime => 28 (28 ms)
resultSize => 352821 (344,0 KB)
diskBytesSpilled => 0 (0 Bytes)
memoryBytesSpilled => 0 (0 Bytes)
peakExecutionMemory => 30408704
recordsRead => 20000263
bytesRead => 690929309 (658,0 MB)
recordsWritten => 0
bytesWritten => 0 (0 Bytes)
shuffleRecordsRead => 2
shuffleTotalBlocksFetched => 2
shuffleLocalBlocksFetched => 2
shuffleRemoteBlocksFetched => 0
shuffleTotalBytesRead => 156 (156 Bytes)
shuffleLocalBytesRead => 156 (156 Bytes)
shuffleRemoteBytesRead => 0 (0 Bytes)
shuffleRemoteBytesReadToDisk => 0 (0 Bytes)
shuffleBytesWritten => 

In [22]:
##8
stagemetrics.begin()
spark.sql("SELECT genres, first(title) as title, MAX(count) as total_ratings FROM (SELECT movies.genres, movies.title, COUNT(ratings.rating) AS count FROM movies INNER JOIN ratings ON ratings.movieId = movies.movieId GROUP BY movies.genres, movies.title ORDER BY genres, count DESC) GROUP BY genres ORDER BY genres").show(5,truncate=0)
stagemetrics.end()

+-----------------------------------+------------------------------------------------------------------------------+-------------+
|genres                             |title                                                                         |total_ratings|
+-----------------------------------+------------------------------------------------------------------------------+-------------+
|(no genres listed)                 |Doctor Who: The Time of the Doctor (2013)                                     |36           |
|Action                             |Delta Force, The (1986)                                                       |8335         |
|Action|Adventure                   |Raiders of the Lost Ark (Indiana Jones and the Raiders of the Lost Ark) (1981)|43295        |
|Action|Adventure|Animation         |How to Train Your Dragon 2 (2014)                                             |678          |
|Action|Adventure|Animation|Children|Brave (2012)                                  

In [23]:
stagemetrics.print_report()


Scheduling mode = FIFO
Spark Context default degree of parallelism = 8
Aggregated Spark stage metrics:
numStages => 6
numTasks => 809
elapsedTime => 11052 (11 s)
stageDuration => 10897 (11 s)
executorRunTime => 77792 (1,3 min)
executorCpuTime => 59435 (59 s)
executorDeserializeTime => 2154 (2 s)
executorDeserializeCpuTime => 2154 (2 s)
resultSerializationTime => 1 (1 ms)
jvmGCTime => 798 (0,8 s)
shuffleFetchWaitTime => 1 (1 ms)
shuffleWriteTime => 26310 (26 s)
resultSize => 2466371 (2,0 MB)
diskBytesSpilled => 0 (0 Bytes)
memoryBytesSpilled => 0 (0 Bytes)
peakExecutionMemory => 891289600
recordsRead => 20027541
bytesRead => 692422957 (660,0 MB)
recordsWritten => 0
bytesWritten => 0 (0 Bytes)
shuffleRecordsRead => 324109
shuffleTotalBlocksFetched => 23218
shuffleLocalBlocksFetched => 23218
shuffleRemoteBlocksFetched => 0
shuffleTotalBytesRead => 17412322 (16,0 MB)
shuffleLocalBytesRead => 17412322 (16,0 MB)
shuffleRemoteBytesRead => 0 (0 Bytes)
shuffleRemoteBytesReadToDisk => 0 (0 Byte

In [30]:
##9
stagemetrics.begin()
spark.sql("SELECT SUM(total) FROM (SELECT COUNT(userId) as total FROM ratings GROUP BY movieId, SUBSTRING(timestamp,6,9) HAVING total > 1 ORDER BY total DESC)").show()
stagemetrics.end()

+----------+
|sum(total)|
+----------+
|  12003573|
+----------+



In [31]:
stagemetrics.print_report()


Scheduling mode = FIFO
Spark Context default degree of parallelism = 8
Aggregated Spark stage metrics:
numStages => 3
numTasks => 209
elapsedTime => 10722 (11 s)
stageDuration => 10718 (11 s)
executorRunTime => 80999 (1,3 min)
executorCpuTime => 75889 (1,3 min)
executorDeserializeTime => 550 (0,6 s)
executorDeserializeCpuTime => 609 (0,6 s)
resultSerializationTime => 0 (0 ms)
jvmGCTime => 2823 (3 s)
shuffleFetchWaitTime => 0 (0 ms)
shuffleWriteTime => 1701 (2 s)
resultSize => 829094 (809,0 KB)
diskBytesSpilled => 158937509 (151,0 MB)
memoryBytesSpilled => 2176843776 (2,0 GB)
peakExecutionMemory => 2097152000
recordsRead => 20000263
bytesRead => 690929309 (658,0 MB)
recordsWritten => 0
bytesWritten => 0 (0 Bytes)
shuffleRecordsRead => 18003080
shuffleTotalBlocksFetched => 1800
shuffleLocalBlocksFetched => 1800
shuffleRemoteBlocksFetched => 0
shuffleTotalBytesRead => 212319210 (202,0 MB)
shuffleLocalBytesRead => 212319210 (202,0 MB)
shuffleRemoteBytesRead => 0 (0 Bytes)
shuffleRemoteByt

In [26]:
##10
stagemetrics.begin()
spark.sql("SELECT movies.genres, COUNT(ratings.movieId) FROM ratings INNER JOIN movies ON movies.movieId = ratings.movieId INNER JOIN tags ON tags.movieId = ratings.movieId WHERE ratings.rating > 3.5 AND tags.tag LIKE '%funny%' GROUP BY movies.genres ORDER BY genres").show(5, truncate=0)
stagemetrics.end()

+--------------------------------------------------+--------------+
|genres                                            |count(movieId)|
+--------------------------------------------------+--------------+
|Action|Adventure                                  |2140          |
|Action|Adventure|Animation|Children|Comedy        |89936         |
|Action|Adventure|Animation|Children|Comedy|Fantasy|1472          |
|Action|Adventure|Animation|Children|Comedy|IMAX   |410           |
|Action|Adventure|Animation|Children|Comedy|Sci-Fi |366           |
+--------------------------------------------------+--------------+
only showing top 5 rows



In [27]:
stagemetrics.print_report()


Scheduling mode = FIFO
Spark Context default degree of parallelism = 8
Aggregated Spark stage metrics:
numStages => 4
numTasks => 216
elapsedTime => 3713 (4 s)
stageDuration => 3741 (4 s)
executorRunTime => 25451 (25 s)
executorCpuTime => 23545 (24 s)
executorDeserializeTime => 506 (0,5 s)
executorDeserializeCpuTime => 484 (0,5 s)
resultSerializationTime => 0 (0 ms)
jvmGCTime => 487 (0,5 s)
shuffleFetchWaitTime => 0 (0 ms)
shuffleWriteTime => 646 (0,6 s)
resultSize => 1518121 (1482,0 KB)
diskBytesSpilled => 0 (0 Bytes)
memoryBytesSpilled => 0 (0 Bytes)
peakExecutionMemory => 698351616
recordsRead => 10488236
bytesRead => 719234535 (685,0 MB)
recordsWritten => 0
bytesWritten => 0 (0 Bytes)
shuffleRecordsRead => 2188
shuffleTotalBlocksFetched => 1220
shuffleLocalBlocksFetched => 1220
shuffleRemoteBlocksFetched => 0
shuffleTotalBytesRead => 153832 (150,0 KB)
shuffleLocalBytesRead => 153832 (150,0 KB)
shuffleRemoteBytesRead => 0 (0 Bytes)
shuffleRemoteBytesReadToDisk => 0 (0 Bytes)
shuffl