In [1]:
from pyspark import SparkContext
sc = SparkContext()

import math
import time

In [2]:
# carregar base de dados
import os.path
fileName = os.path.join('C:\spark\Data', 'DelayedFlights.csv')
numPartitions = 1
rawData = sc.textFile(fileName, numPartitions)


In [3]:
Cancelled = rawData.map(lambda x: x.split(",")[22])
featMonth = rawData.map(lambda x: x.split(",")[2])
featDayofMonth = rawData.map(lambda x: x.split(",")[3])
featDayofWeek = rawData.map(lambda x: x.split(",")[4])
featOrigin =  rawData.map(lambda x: x.split(",")[17])
featDest =  rawData.map(lambda x: x.split(",")[18])

In [4]:
def Entropia (classe):
    """Calcula a Entropia de Shannon de uma distribuição de dados.

    Args:
        classe (RDD): RDD contendo conjunto de dados a ser calculado a entropia.
                      Valores devem ser categóricos.

    Returns:
        float: valor de Entropia de Shannon calculado para o RDD.
    """
    #counts calcula paralelamente o conteúdo do RDD
    #como tuplas contendo (tipo, quantidade)
    counts = (classe.map(lambda x: (x, 1))
                    .reduceByKey(lambda a,b: a + b))
    # n recebe o valor total de itens do RDD
    n = classe.count()
    
    # probs calcula a probabilidade de cada um dos estados do RDD
    probs = counts.map(lambda x: x[1]/float(n))
    
    # Entropia calcula a entropia do RDD
    ## a função map faz o calculo da Entropia de cada um dos estados
    ## a função reduce faz o somatório da entropia de Shannon
    entropia = (probs.map(lambda p: -p*math.log(p,2))
                     .reduce(lambda a,b: a + b))
    
    # retorna valor escalar referênte a entropia do RDD.
    return entropia

 
def infoGain (feature, classe, H):
    """Calcula o ganho de informação de um atributo em relação a uma classe.

    Args:
        feature (RDD): RDD contendo os conjuntos de dados do atributo a ser
                       calculado o Ganho de Informação
        
        classe (RDD): RDD contendo conjunto de dados da classe
        
        H (float): Entropia da Classe, previamente calculada.

    Returns:
        float: valor de ganho de informação (redução da Entropia) que o atributo fornece sobre a classe
    """
    # calcula paralelamente o conteúdo do RDD
    # como tuplas contendo (tipo, quantidade)
    feat_count = feature.map(lambda x: (x, 1))\
                        .reduceByKey(lambda a,b: a + b)\
                        .collect()
    # calcula as Entropias de um conjunto da classe dado cada um dos estados do atributo   
    entropiasN = [Entropia(classe.zip(feature).filter(lambda x: x[1]==v).map(lambda x: x[0]))  for v,_ in feat_count]
    
    # calcula a quantidade de itens no atributo
    n = classe.count()
        
    # calcula o ganho de informação do atributo.
    ig = H - sum([(f[1]/float(n))*p for f,p in zip(feat_count, entropiasN)])
    
    return ig
    

In [6]:
# carregar base de dados
numPartitions = 1
rawData = sc.textFile(fileName, numPartitions)
Cancelled = rawData.map(lambda x: x.split(",")[22])
featDayofWeek = rawData.map(lambda x: x.split(",")[4])


start_time = time.time()
H =  Entropia (Cancelled)
print H
print("--- %s seconds ---" % (time.time() - start_time))

start_time = time.time()
igDayofWeek = infoGain(featDayofWeek, Cancelled, H)
print igDayofWeek
print("--- %s seconds ---" % (time.time() - start_time))

print "numer of partition:", numPartitions


0.00425591582633
--- 31.870000124 seconds ---
1.24766472434e-05
--- 490.332000017 seconds ---
numer of partition: 1


In [7]:
# carregar base de dados
numPartitions = 2
rawData = sc.textFile(fileName, numPartitions)
Cancelled = rawData.map(lambda x: x.split(",")[22])
featDayofWeek = rawData.map(lambda x: x.split(",")[4])


start_time = time.time()
H =  Entropia (Cancelled)
print H
print("--- %s seconds ---" % (time.time() - start_time))

start_time = time.time()
igDayofWeek = infoGain(featDayofWeek, Cancelled, H)
print igDayofWeek
print("--- %s seconds ---" % (time.time() - start_time))

print "numer of partition:", numPartitions


0.00425591582633
--- 32.8069999218 seconds ---
1.24766472434e-05
--- 495.953000069 seconds ---
numer of partition: 2


In [8]:
# carregar base de dados
numPartitions = 4
rawData = sc.textFile(fileName, numPartitions)
Cancelled = rawData.map(lambda x: x.split(",")[22])
featDayofWeek = rawData.map(lambda x: x.split(",")[4])


start_time = time.time()
H =  Entropia (Cancelled)
print H
print("--- %s seconds ---" % (time.time() - start_time))

start_time = time.time()
igDayofWeek = infoGain(featDayofWeek, Cancelled, H)
print igDayofWeek
print("--- %s seconds ---" % (time.time() - start_time))

print "numer of partition:", numPartitions


0.00425591582633
--- 32.2950000763 seconds ---
1.24766472434e-05
--- 492.785000086 seconds ---
numer of partition: 4


In [9]:
# carregar base de dados
numPartitions = 6
rawData = sc.textFile(fileName, numPartitions)
Cancelled = rawData.map(lambda x: x.split(",")[22])
featDayofWeek = rawData.map(lambda x: x.split(",")[4])


start_time = time.time()
H =  Entropia (Cancelled)
print H
print("--- %s seconds ---" % (time.time() - start_time))

start_time = time.time()
igDayofWeek = infoGain(featDayofWeek, Cancelled, H)
print igDayofWeek
print("--- %s seconds ---" % (time.time() - start_time))

print "numer of partition:", numPartitions


0.00425591582633
--- 32.1840000153 seconds ---
1.24766472434e-05
--- 489.861999989 seconds ---
numer of partition: 6


In [10]:
# carregar base de dados
numPartitions = 8
rawData = sc.textFile(fileName, numPartitions)
Cancelled = rawData.map(lambda x: x.split(",")[22])
featDayofWeek = rawData.map(lambda x: x.split(",")[4])


start_time = time.time()
H =  Entropia (Cancelled)
print H
print("--- %s seconds ---" % (time.time() - start_time))

start_time = time.time()
igDayofWeek = infoGain(featDayofWeek, Cancelled, H)
print igDayofWeek
print("--- %s seconds ---" % (time.time() - start_time))

print "numer of partition:", numPartitions


0.00425591582633
--- 32.4440000057 seconds ---
1.24766472434e-05
--- 489.858999968 seconds ---
numer of partition: 8


In [11]:
# carregar base de dados
numPartitions = 16
rawData = sc.textFile(fileName, numPartitions)
Cancelled = rawData.map(lambda x: x.split(",")[22])
featDayofWeek = rawData.map(lambda x: x.split(",")[4])


start_time = time.time()
H =  Entropia (Cancelled)
print H
print("--- %s seconds ---" % (time.time() - start_time))

start_time = time.time()
igDayofWeek = infoGain(featDayofWeek, Cancelled, H)
print igDayofWeek
print("--- %s seconds ---" % (time.time() - start_time))

print "numer of partition:", numPartitions


0.00425591582633
--- 37.8569998741 seconds ---
1.24766472434e-05
--- 594.200999975 seconds ---
numer of partition: 16


In [12]:
# carregar base de dados
numPartitions = 32
rawData = sc.textFile(fileName, numPartitions)
Cancelled = rawData.map(lambda x: x.split(",")[22])
featDayofWeek = rawData.map(lambda x: x.split(",")[4])


start_time = time.time()
H =  Entropia (Cancelled)
print H
print("--- %s seconds ---" % (time.time() - start_time))

start_time = time.time()
igDayofWeek = infoGain(featDayofWeek, Cancelled, H)
print igDayofWeek
print("--- %s seconds ---" % (time.time() - start_time))

print "numer of partition:", numPartitions


0.00425591582633
--- 68.2899999619 seconds ---
1.24766472434e-05
--- 815.203999996 seconds ---
numer of partition: 32
