In [20]:
from pyspark.sql import SparkSession

import time
import nltk
import matplotlib.pyplot as plt

from pyspark.sql import functions as F
from nltk.sentiment.vader import SentimentIntensityAnalyzer

In [21]:
# Remove stale lock files from the local metastore_db directory (presumably left by an
# earlier Spark session that did not shut down cleanly). -f keeps this a silent no-op
# when no lock files (or no metastore_db directory) exist, e.g. on a fresh checkout.
! rm -f ./metastore_db/*.lck

In [22]:
# Connect to the standalone Spark master with a static executor setup:
# dynamic allocation and the external shuffle service are both disabled,
# and each executor is given 2 cores.
spark_session = (
    SparkSession.builder
    .master("spark://192.168.2.104:7077")
    .appName("weak-test-one-executor")
    .config("spark.dynamicAllocation.enabled", False)
    .config("spark.shuffle.service.enabled", False)
    # NOTE(review): per the Spark configuration docs this timeout only applies when
    # dynamic allocation is enabled, so it has no effect with the setting above.
    .config("spark.dynamicAllocation.executorIdleTimeout", "30s")
    .config("spark.executor.cores", 2)
    .getOrCreate()
)

In [24]:
# Load the Python API for the sparkMeasure package and attach its StageMetrics
# listener (stage-level metrics) to the Spark session created above.
# A measurement window is opened with stagemetrics.begin() and closed with
# stagemetrics.end(); print_report() then summarises the stages run in between.

from sparkmeasure import StageMetrics
stagemetrics = StageMetrics(spark_session)

In [25]:
# Define cell and line magic to wrap the instrumentation
from IPython.core.magic import (register_line_magic, register_cell_magic, register_line_cell_magic)

@register_line_cell_magic
def sparkmeasure(line, cell=None):
    """Run and measure a Spark workload. Use: %sparkmeasure <code> or %%sparkmeasure.

    The code runs in the notebook's user namespace. It may therefore contain
    statements (assignments, several lines), not just a single expression, and any
    names it defines stay visible in later cells. Metrics collection is always
    closed, even if the workload raises. In that case the exception propagates and
    no report is printed.

    :param line: text after ``%sparkmeasure`` (used for the line-magic form).
    :param cell: body of a ``%%sparkmeasure`` cell, or None for the line-magic form.
    """
    # Function-scope import: get_ipython only returns a shell inside an IPython kernel,
    # which is always the case when this function is invoked as a magic.
    from IPython import get_ipython

    code = cell if cell is not None else line
    user_ns = get_ipython().user_ns
    stagemetrics.begin()
    try:
        # exec (not eval) so multi-statement cells work. Executing in user_ns avoids
        # this function's own locals (line/cell/code) shadowing the user's names.
        exec(code, user_ns)
    finally:
        stagemetrics.end()
    stagemetrics.print_report()

In [26]:
# Open the stage-metrics measurement window. Everything executed until
# stagemetrics.end() further down (data load, sentiment pipeline, correlation)
# is included in the printed report.
stagemetrics.begin()

In [27]:
# Start the wall-clock timer for the whole pipeline (reported by the "Time" cell).
start_time = time.time()

# Input files matching pRC_2009* in the Reddit sample on HDFS (presumably the 2009
# comments); point this at ".../reddit_sample/*" to read every file in the sample.
REDDIT_INPUT_PATH = "hdfs://192.168.2.104:9000/reddit_sample/pRC_2009*"
# .cache() is lazy: the DataFrame is materialised in memory by the first action.
df = spark_session.read.json(REDDIT_INPUT_PATH).cache()

In [29]:
# Comments longer than this many characters are skipped rather than scored:
# VADER is designed for sentence-level text, so long bodies are a poor fit.
MAX_BODY_LENGTH = 1000


def sentiment_mapper(comment):
    """Map one comment row to ``(score, compound_sentiment)``.

    VADER is designed to score a single sentence. As a naive first approach, the
    whole comment body is scored as if it were one sentence.

    :param comment: row exposing ``comment['score']`` and ``comment['body']``
        (``body`` must be a non-null string).
    :returns: ``(comment['score'], compound)``, where ``compound`` is VADER's
        compound polarity score for the body.
    """
    compound = sid.polarity_scores(comment['body'])['compound']
    return (comment['score'], compound)


def _is_scorable(comment):
    """Keep comments whose body is present and shorter than MAX_BODY_LENGTH."""
    body = comment['body']
    # A null body (e.g. a JSON record without `body`) would otherwise make len()
    # raise TypeError inside the Spark task and fail the whole job.
    return body is not None and len(body) < MAX_BODY_LENGTH


# The VADER lexicon must be available on the driver; run once per environment:
#   nltk.download('vader_lexicon')
sid = SentimentIntensityAnalyzer()
sentiment = df.rdd \
    .filter(_is_scorable) \
    .map(sentiment_mapper)

In [30]:
# Average VADER compound sentiment per comment score:
#   (score, compound) -> (score, (sum, count)) -> (score, (mean, count)),
# then ordered by mean sentiment, highest first.
# NOTE(review): PySpark's sortBy samples the data to build range partitions, which
# launches Spark jobs when this cell runs (it is not fully lazy) — confirm for the
# PySpark version in use.
avg_sentiment = (
    sentiment
    .mapValues(lambda compound: (compound, 1))
    .reduceByKey(lambda left, right: (left[0] + right[0], left[1] + right[1]))
    .mapValues(lambda total_and_count: (total_and_count[0] / total_and_count[1], total_and_count[1]))
    .sortBy(lambda pair: pair[1][0], ascending=False)
)

In [31]:
# Number of distinct comment scores: reduceByKey leaves one record per score.
avg_sentiment.count()

876

In [33]:
# One row per distinct score: (Score, mean Sentiment). The per-score comment count
# carried in avg_sentiment is dropped here.
df_sentiment = (
    avg_sentiment
    .mapValues(lambda mean_and_count: mean_and_count[0])
    .toDF(["Score", "Sentiment"])
)

In [34]:
# Pearson correlation between comment score and mean sentiment.
# NOTE(review): df_sentiment holds one row per distinct score, so every score value
# counts equally regardless of how many comments share it. This is not the
# per-comment score/sentiment correlation.
correlation = df_sentiment.corr("Score", "Sentiment")
correlation

0.0530682939971702

In [35]:
# Wall-clock seconds since start_time was recorded in the data-load cell.
# NOTE(review): in interactive use this also includes idle time between cell runs.
print(f"Time {time.time() - start_time}")

Time 820.7390441894531


In [36]:
# Close the measurement window opened by stagemetrics.begin() and print the
# aggregated stage metrics for everything that ran in between.
stagemetrics.end()
stagemetrics.print_report()


Scheduling mode = FIFO
Spark Context default degree of parallelism = 2
Aggregated Spark stage metrics:
numStages => 9
sum(numTasks) => 123
elapsedTime => 820484 (14 min)
sum(stageDuration) => 819387 (14 min)
sum(executorRunTime) => 1572399 (26 min)
sum(executorCpuTime) => 134650 (2.2 min)
sum(executorDeserializeTime) => 3192 (3 s)
sum(executorDeserializeCpuTime) => 987 (1.0 s)
sum(resultSerializationTime) => 26 (26 ms)
sum(jvmGCTime) => 25681 (26 s)
sum(shuffleFetchWaitTime) => 2 (2 ms)
sum(shuffleWriteTime) => 260 (0.3 s)
max(resultSize) => 44051 (43.0 KB)
sum(numUpdatedBlockStatuses) => 0
sum(diskBytesSpilled) => 0 (0 Bytes)
sum(memoryBytesSpilled) => 0 (0 Bytes)
max(peakExecutionMemory) => 16320
sum(recordsRead) => 7565928
sum(bytesRead) => 4463291454 (4.0 GB)
sum(recordsWritten) => 0
sum(bytesWritten) => 0 (0 Bytes)
sum(shuffleTotalBytesRead) => 487169 (475.0 KB)
sum(shuffleTotalBlocksFetched) => 1447
sum(shuffleLocalBlocksFetched) => 1447
sum(shuffleRemoteBlocksFetched) => 0
sum(

In [37]:
# Print sparkMeasure's aggregated task accumulables, presumably for the same
# begin()/end() window as the report above — confirm against sparkMeasure docs.
stagemetrics.print_accumulables() 


Aggregated Spark accumulables of type internal.metric. Sum of values grouped by metric name
Name => sum(value) [group by name]

diskBytesSpilled => 0 (0 Bytes)
executorCpuTime => 134653 (2.2 min)
executorDeserializeCpuTime => 991 (1.0 s)
executorDeserializeTime => 3192 (3 s)
executorRunTime => 1572399 (26 min)
input.bytesRead => 4463291454 (4.0 GB)
input.recordsRead => 7565928
jvmGCTime => 25681 (26 s)
memoryBytesSpilled => 0 (0 Bytes)
peakExecutionMemory => 19296
resultSerializationTime => 26 (26 ms)
resultSize => 252514 (246.0 KB)
shuffle.read.fetchWaitTime => 2 (2 ms)
shuffle.read.localBlocksFetched => 1447
shuffle.read.localBytesRead => 487169 (475.0 KB)
shuffle.read.recordsRead => 2347
shuffle.read.remoteBlocksFetched => 0
shuffle.read.remoteBytesRead => 0 (0 Bytes)
shuffle.read.remoteBytesReadToDisk => 0 (0 Bytes)
shuffle.write.bytesWritten => 177687 (173.0 KB)
shuffle.write.recordsWritten => 880
shuffle.write.writeTime => 262 (0.3 s)

SQL Metrics and other non-internal metrics.

In [38]:
# Shut down the Spark session (and its SparkContext), releasing this
# application's executors on the cluster.
spark_session.stop()