In [6]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.ml.classification import LogisticRegression
from pyspark.sql import SparkSession as spark
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import LogisticRegression
import numpy as np
from matplotlib import pyplot as plt
import time

In [2]:
def runExperiment(inputUri, outputUri, appName, numCores):
    """Time one load-and-train run of logistic regression on the Spark cluster.

    A Spark session is created against the standalone master with
    ``spark.cores.max`` set to ``numCores``. The MongoDB collection configured
    through ``inputUri`` is loaded, a binary label is derived from
    ``artist_hotttnesss`` (> 0.5 -> 1, otherwise 0), and a logistic regression
    model is fitted on the 70% training split.

    Args:
        inputUri: Value for ``spark.mongodb.input.uri``.
        outputUri: Value for ``spark.mongodb.output.uri``.
        appName: Name given to the Spark application.
        numCores: Value for ``spark.cores.max``.

    Returns:
        Elapsed seconds (float) measured from just before the load to just
        after the fit. Session start-up and shutdown are not included.
    """
    # No memory option is set here, so memory stays at whatever the cluster
    # defaults are; only the core count varies between experiments.
    sparkSession = (
        SparkSession.builder
        .master('spark://192.168.2.25:7077')
        .appName(appName)
        .config('spark.dynamicAllocation.enabled', True)
        .config('spark.dynamicAllocation.shuffleTracking.enabled', True)
        .config('spark.shuffle.service.enabled', False)
        .config('spark.dynamicAllocation.executorIdleTimeout', '30s')
        .config('spark.cores.max', numCores)
        .config('spark.driver.port', 9998)
        .config('spark.blockManager.port', 10005)
        .config('spark.mongodb.input.uri', inputUri)
        .config('spark.mongodb.output.uri', outputUri)
        .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:2.4.2')
        .getOrCreate()
    )

    features = ['year', 'loudness', 'tempo', 'duration', 'key_confidence']
    label = 'y_label'

    try:
        # perf_counter is monotonic, so the measurement cannot be skewed by
        # system clock adjustments during a long run.
        start = time.perf_counter()

        df = sparkSession.read.format('com.mongodb.spark.sql.DefaultSource').load()
        df = df.select('_id', 'year', 'loudness', 'tempo', 'artist_hotttnesss',
                       'beats_confidence', 'duration', 'artist_latitude',
                       'artist_longitude', 'key_confidence')
        df = df.withColumn(label, f.when(f.col('artist_hotttnesss') > 0.5, 1).otherwise(0))

        assembler = VectorAssembler()\
                .setInputCols(features)\
                .setOutputCol('vectorized_features')
        df = assembler.transform(df)

        # Only the training split is needed for the timing; the 30% remainder
        # is discarded.
        train, _ = df.randomSplit([0.7, 0.3], seed=2018)
        lr = LogisticRegression(featuresCol='vectorized_features', labelCol=label, maxIter=10)
        lr.fit(train)

        return time.perf_counter() - start
    finally:
        # Always release the session: a leaked session would be reused by the
        # next call's getOrCreate().
        sparkSession.stop()

## Strong Scaling with 1.5 million data

## 2

In [3]:
# Strong scaling: 2 cores on the songsOHalfM collection.
numCores = 2
numIterations = 2
appName = '2 cores'
inputUri = 'mongodb://127.0.0.1/million_songs.songsOHalfM'
outputUri = 'mongodb://127.0.0.1/million_songs.songsOHalfM'

# One timed run per iteration; each entry is the duration returned by runExperiment.
results = [
    runExperiment(inputUri, outputUri, appName, numCores)
    for run in range(numIterations)
]
print(f'{results}, Avg: {np.mean(results)}')

:: loading settings :: url = jar:file:/usr/local/spark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/ubuntu/.ivy2/cache
The jars for the packages stored in: /home/ubuntu/.ivy2/jars
org.mongodb.spark#mongo-spark-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-2061ec4b-90a3-42ad-bfff-3c29aa6e5b8b;1.0
	confs: [default]
	found org.mongodb.spark#mongo-spark-connector_2.12;2.4.2 in central
	found org.mongodb#mongo-java-driver;3.12.5 in central
:: resolution report :: resolve 340ms :: artifacts dl 7ms
	:: modules in use:
	org.mongodb#mongo-java-driver;3.12.5 from central in [default]
	org.mongodb.spark#mongo-spark-connector_2.12;2.4.2 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   2   |   0   |   0   |   0   ||   2   |   0  

[107.92764258384705, 75.34484505653381]


## 4

In [4]:
# Strong scaling: 4 cores on the songsOHalfM collection.
# The URIs are assigned to the names passed to runExperiment so that this cell
# does not depend on values left behind by a previously executed cell.
inputUri = 'mongodb://127.0.0.1/million_songs.songsOHalfM'
outputUri = 'mongodb://127.0.0.1/million_songs.songsOHalfM'
appName = '4 cores'
numCores = 4
numIterations = 2

results = [runExperiment(inputUri, outputUri, appName, numCores) for _ in range(numIterations)]
print(f'{results}, Avg: {np.mean(results)}')

22/03/19 14:30:44 WARN ExecutorAllocationManager: Dynamic allocation without a shuffle service is an experimental feature.
22/03/19 14:32:04 WARN ExecutorAllocationManager: Dynamic allocation without a shuffle service is an experimental feature.
                                                                                

NameError: name 'mean' is not defined

## 8 

In [None]:
# Strong scaling: 8 cores on the songsOHalfM collection.
numCores = 8
numIterations = 2
appName = '8 cores'
inputUri = 'mongodb://127.0.0.1/million_songs.songsOHalfM'
outputUri = 'mongodb://127.0.0.1/million_songs.songsOHalfM'

# One timed run per iteration; each entry is the duration returned by runExperiment.
results = [
    runExperiment(inputUri, outputUri, appName, numCores)
    for run in range(numIterations)
]
print(f'{results}, Avg: {np.mean(results)}')

## 12

In [5]:
# Strong scaling: 12 cores on the songsOHalfM collection.
numCores = 12
numIterations = 2
appName = '12 cores'
inputUri = 'mongodb://127.0.0.1/million_songs.songsOHalfM'
outputUri = 'mongodb://127.0.0.1/million_songs.songsOHalfM'

# One timed run per iteration; each entry is the duration returned by runExperiment.
results = [
    runExperiment(inputUri, outputUri, appName, numCores)
    for run in range(numIterations)
]
print(f'{results}, Avg: {np.mean(results)}')

22/03/19 14:44:43 WARN ExecutorAllocationManager: Dynamic allocation without a shuffle service is an experimental feature.
22/03/19 14:45:32 WARN ExecutorAllocationManager: Dynamic allocation without a shuffle service is an experimental feature.
                                                                                

NameError: name 'np' is not defined

## Weak Scaling

## 250000 data for 2 cores

In [None]:
# Weak scaling: 2 cores on the songsQuarterM collection.
numCores = 2
numIterations = 2
appName = '250000 data for 2 cores'
inputUri = 'mongodb://127.0.0.1/million_songs.songsQuarterM'
outputUri = 'mongodb://127.0.0.1/million_songs.songsQuarterM'

# One timed run per iteration; each entry is the duration returned by runExperiment.
results = [
    runExperiment(inputUri, outputUri, appName, numCores)
    for run in range(numIterations)
]
print(f'{results}, Avg: {np.mean(results)}')

## 500000 data for 4 cores

In [None]:
# Weak scaling: 4 cores on the songsHalfM collection.
numCores = 4
numIterations = 2
appName = '500000 data for 4 cores'
inputUri = 'mongodb://127.0.0.1/million_songs.songsHalfM'
outputUri = 'mongodb://127.0.0.1/million_songs.songsHalfM'

# One timed run per iteration; each entry is the duration returned by runExperiment.
results = [
    runExperiment(inputUri, outputUri, appName, numCores)
    for run in range(numIterations)
]
print(f'{results}, Avg: {np.mean(results)}')

## 1000000 data for 8 cores

In [None]:
# Weak scaling: 8 cores on the songsOneM collection.
numCores = 8
numIterations = 2
appName = '1000000 data for 8 cores'
inputUri = 'mongodb://127.0.0.1/million_songs.songsOneM'
outputUri = 'mongodb://127.0.0.1/million_songs.songsOneM'

# One timed run per iteration; each entry is the duration returned by runExperiment.
results = [
    runExperiment(inputUri, outputUri, appName, numCores)
    for run in range(numIterations)
]
print(f'{results}, Avg: {np.mean(results)}')

## 1500000 data for 12 cores

In [None]:
# Weak scaling: 12 cores on the songsOHalfM collection.
numCores = 12
numIterations = 2
appName = '1500000 data for 12 cores'
inputUri = 'mongodb://127.0.0.1/million_songs.songsOHalfM'
outputUri = 'mongodb://127.0.0.1/million_songs.songsOHalfM'

# One timed run per iteration; each entry is the duration returned by runExperiment.
results = [
    runExperiment(inputUri, outputUri, appName, numCores)
    for run in range(numIterations)
]
print(f'{results}, Avg: {np.mean(results)}')

## 3 workers with 8 cores