In [1]:
import sys
import pandas as pd
from spark_session import LocalSparkSession
from dataset import Dataset
from mr_id3 import MapReduceIDR3
from pyspark.mllib.tree import DecisionTree

In [2]:
from log import log
from sklearn.impute import SimpleImputer
from pyspark.sql.functions import lit
from pyspark.sql.functions import col, sum
from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType
from pyspark.sql import SparkSession
from pyspark import SparkContext
from decisiontree_pyspark import DecisionTreePySpark
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel
from pyspark.mllib.util import MLUtils
from pyspark.mllib.regression import LabeledPoint
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, LongType
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler

In [3]:
%time
num_fields = [
    'age', 'fnlwgt', 'education_num', 'capital_gain', 'capital_loss',
    'hours_per_week', ]

categorical_fields = [
    'workclass', 'education',
    'marital_status', 'occupation', 'relationship',
    'race', 'sex', 'native_country', ]

CPU times: user 2 µs, sys: 1e+03 ns, total: 3 µs
Wall time: 5.72 µs


In [4]:
%time
target = 'label'
filename = 'dataset/adult.data'

CPU times: user 2 µs, sys: 1 µs, total: 3 µs
Wall time: 5.01 µs


In [5]:
%time
numbers_of_cores = [4, ]
multiplication_factors = [1, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100, ]
metrics = []

CPU times: user 2 µs, sys: 1e+03 ns, total: 3 µs
Wall time: 5.25 µs


In [6]:
%time
for number_of_cores in numbers_of_cores:

    spark = LocalSparkSession(number_of_cores)
    spark.start()

    for f in multiplication_factors:

        dataset = Dataset(
            spark.spark, 
            f'dataset/adult_{f}x.data',
            num_fields, categorical_fields, target)
        dataset.load()
        dataset.select_only_numerical_features()
        
        df = dataset.df

        mr_id3 = DecisionTreePySpark(df)
        mr_id3.train()

        metric = mr_id3.get_metrics()
        metric['length_rows'] = df.count()
        metric['dataset_size_num'] = f
        metric['dataset_size'] = sys.getsizeof(df)
        metric['number_of_cores'] = number_of_cores
        metrics.append(metric)
        
        log(f"Metrics: Clusters {metric['number_of_cores']} - Dataset size {metric['f']}x - Time {metric['time']} seconds")

    spark.stop()

2022-10-16 18:17:51,841 [INFO] LocalSparkSession : Starting with 4 clusters


CPU times: user 3 µs, sys: 0 ns, total: 3 µs
Wall time: 6.2 µs
22/10/16 18:17:53 WARN Utils: Your hostname, Mac-Pro-de-MARCELO.local resolves to a loopback address: 127.0.0.1; using 192.168.0.62 instead (on interface en2)
22/10/16 18:17:53 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/10/16 18:17:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


2022-10-16 18:17:55,997 [INFO] Dataset : Starting
2022-10-16 18:17:55,998 [INFO] Dataset : Loading Dataset dataset/adult_1x.data
2022-10-16 18:17:59,170 [INFO] Dataset : Select Only Numerical Features
2022-10-16 18:17:59,234 [INFO] DecisionTreePySpark : Starting
2022-10-16 18:17:59,235 [INFO] DecisionTreePySpark : Training
2022-10-16 18:17:59,235 [INFO] DecisionTreePySpark : Setting Labeled Point
2022-10-16 18:18:00,001 [INFO] DecisionTreePySpark : Splitting
2022-10-16 18:18:00,002 [INFO] DecisionTreePySpark : Assembling
2022-10-16 18:18:05,880 [INFO] DecisionTreePySpark : Training time 5.639004 seconds
2022-10-16 18:18:05,881 [INFO] DecisionTreePySpark : Get metrics


KeyError: 'f'

In [None]:
%time
df = pd.DataFrame.from_dict(metrics)

In [None]:
df

In [None]:
%time
df[['dataset_size_num', 'time']].plot()