# Testando Spark

In [133]:
import pyspark
import pandas as pd
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import desc
import time

In [134]:
SparkContext.stop(sc)

In [135]:
def initialize_NewSparkContext(memory_fraction, message_maxSize, executor_cores, parallelism):
    conf = pyspark.SparkConf()
    conf.set("spark.storage.memoryFraction", "0.5")
    conf.set('spark.rpc.message.maxSize', '300')
    conf.set('spark.executor.cores', '5')
    conf.set('spark.default.parallelism','50')
    return pyspark.SparkContext(master = "spark://node5:7077", appName='testando Spark', conf=conf)


def calculate_join_time(df, dictionary):
    start = time.time()
    df_copy = df.alias("df_copy")
    new_df = df.join(df_copy, df.NUMERODN == df_copy.NUMERODN, how='left')
    end = time.time()

    dictionary['time_to_run_join'].append(end - start)
    

def calculate_groubyAndSort_time(df, dictionary):
    start = time.time()
    df_copy = df.alias("df_copy")
    new_df = df.join(df_copy, df.NUMERODN == df_copy.NUMERODN, how='left')
    end = time.time()

    dictionary['time_to_run_groubySort'].append(end - start)

    
def calculate_count_time(df, dictionary):
    start = time.time()
    df.count()
    end = time.time()
    dictionary['time_to_run_count'].append(end - start)

In [136]:
memory_fraction_list = ["0.5","0.8"]
message_maxSize_list = ['300','500']
executor_cores_list = ['5','3','1']
parallelism_list =  ['50','500']

result_data = {
                'experiment_block':[],
                'experiment_turn': [],
                'memory_fraction': [],
                'message_maxSize': [],
                'executor_cores': [],
                'parallelism': [],
                'time_to_run_join': [],
                'time_to_run_groubySort': [],  
                'time_to_run_count': [],  
            }

turn = 0
for memory_fraction in memory_fraction_list:
    for message_maxSize in message_maxSize_list:
        for executor_cores in executor_cores_list:
            for parallelism in parallelism_list:
                turn = turn + 1
                sc.stop()
                sc = initialize_NewSparkContext(memory_fraction, message_maxSize, executor_cores, parallelism)
                spark = SparkSession(sc)
                df = spark.read.csv('/home/bigdata/repos/fundamentos-big-data/codes/DNPBA2017.csv', header=True)
                
#                 print(sc.getConf().getAll())
                experiment_turns = 3
                for i in range(1,experiment_turns+1):
                    result_data['experiment_block'].append('Experiment Block: ' + str(turn))
                    result_data['experiment_turn'].append('Experiment turn: ' + str(i))
                                       
                    calculate_join_time(df, result_data)
                    calculate_groubyAndSort_time(df, result_data)
                    calculate_count_time(df, result_data)

                    result_data['memory_fraction'].append(memory_fraction)
                    result_data['message_maxSize'].append(message_maxSize)
                    result_data['executor_cores'].append(executor_cores)
                    result_data['parallelism'].append(parallelism)

   
                
                

In [137]:
df_result= pd.DataFrame(result_data)
df_result['total_time'] = df_result['time_to_run_join'] + df_result['time_to_run_groubySort'] +  df_result['time_to_run_count']
df_result.sort_values(by=['total_time']).head(50)

Unnamed: 0,experiment_block,experiment_turn,memory_fraction,message_maxSize,executor_cores,parallelism,time_to_run_join,time_to_run_groubySort,time_to_run_count,total_time
35,Experiment Block: 12,Experiment turn: 3,0.5,500,1,500,0.026824,0.026304,0.16118,0.214309
11,Experiment Block: 4,Experiment turn: 3,0.5,300,3,500,0.027222,0.026313,0.166204,0.219739
68,Experiment Block: 23,Experiment turn: 3,0.8,500,1,50,0.028579,0.027845,0.165311,0.221734
5,Experiment Block: 2,Experiment turn: 3,0.5,300,5,500,0.028181,0.028062,0.166465,0.222708
14,Experiment Block: 5,Experiment turn: 3,0.5,300,1,50,0.027745,0.028277,0.166927,0.22295
53,Experiment Block: 18,Experiment turn: 3,0.8,300,1,500,0.029345,0.028326,0.167509,0.22518
47,Experiment Block: 16,Experiment turn: 3,0.8,300,3,500,0.029606,0.028301,0.176199,0.234106
32,Experiment Block: 11,Experiment turn: 3,0.5,500,1,50,0.027712,0.026989,0.182362,0.237062
8,Experiment Block: 3,Experiment turn: 3,0.5,300,3,50,0.028813,0.027182,0.182648,0.238643
56,Experiment Block: 19,Experiment turn: 3,0.8,500,5,50,0.031102,0.029712,0.181616,0.24243


In [138]:
df_result

Unnamed: 0,experiment_block,experiment_turn,memory_fraction,message_maxSize,executor_cores,parallelism,time_to_run_join,time_to_run_groubySort,time_to_run_count,total_time
0,Experiment Block: 1,Experiment turn: 1,0.5,300,5,50,0.033102,0.029054,0.603614,0.665769
1,Experiment Block: 1,Experiment turn: 2,0.5,300,5,50,0.029103,0.027899,0.281200,0.338201
2,Experiment Block: 1,Experiment turn: 3,0.5,300,5,50,0.029410,0.028676,0.200861,0.258947
3,Experiment Block: 2,Experiment turn: 1,0.5,300,5,500,0.030464,0.027407,0.597176,0.655047
4,Experiment Block: 2,Experiment turn: 2,0.5,300,5,500,0.030718,0.029504,0.227072,0.287294
...,...,...,...,...,...,...,...,...,...,...
67,Experiment Block: 23,Experiment turn: 2,0.8,500,1,50,0.031355,0.029947,0.226376,0.287677
68,Experiment Block: 23,Experiment turn: 3,0.8,500,1,50,0.028579,0.027845,0.165311,0.221734
69,Experiment Block: 24,Experiment turn: 1,0.8,500,1,500,0.038094,0.029830,0.602646,0.670570
70,Experiment Block: 24,Experiment turn: 2,0.8,500,1,500,0.029675,0.028590,0.215816,0.274082


In [140]:
df_result.to_csv('Resultado_Tarefa01_eq03.csv', index = False)