Inteligência na Web e Big Data - 2018 - UFABC

Implementação do algoritmo Apriori em PySpark

Pedro Henrique Parreira

Base de dados utilizada, arquivo order_products__prior.csv em  <a href="https://www.kaggle.com/c/instacart-market-basket-analysis/data">Competição Instacart Market Basket Analysis, Kaggle</a>

Versão não distribuída.



In [1]:
%%time

from pyspark   import SparkContext,SparkConf
import os.path

fileName = os.path.join(os.getcwd(),"Base.csv")

#Distribuído para 1 thread.
conf = SparkConf().setMaster("local[1]")
sc = SparkContext(conf=conf)

#Carregar arquivo
arquivo     = sc.textFile(fileName, 1)
print(arquivo.take(2))



['order_id,product_id,add_to_cart_order,reordered', '2,33120,1,1']
Wall time: 12.7 s


O arquivo original contém em cada linha um item de cada transação, logo é feita uma transformação para reunir todos os tens da mesma transação em uma tupla.


In [2]:
%%time

#Formatando as entradas do arquivo.
base    = arquivo.map(lambda x: list(str(x).strip().rstrip(',').split(',')))\
                 .map(lambda x: (x[0], x[1]))\
                 .groupByKey().map(lambda x : tuple(x[1]))

numeroTransacoes = base.count()
print("Transações totais: " + str(base.count()))
print(base.take(1))

Transações totais: 3214875


[('23909', '22754', '24852', '28993', '41273', '23341', '46941', '39993', '19068', '21410', '17341', '40593', '46979', '31268')]
Wall time: 4min 15s


Definição das funções que gera os itemsets e que verifica se o suporte dos itemsets gerados é maior que o definido pelo usuário.


In [3]:
%%time

from itertools import combinations


# Definindo função que irá gerar as combinações dos elementos
# Ex: (A,B,C) para k = 2 => (A,B),(A,C),(B,C)
def gerarCombinacoes(k, elementos):
    x = list(combinations(elementos, k))
    return ((tuple(x[i]), 1) for i in range(len(x)))

# Definindo função recebe um RDD contendo os dados e retorna uma com os candidatos que
# tiverem o suporte maior que o definido pelo usuário.
def gerarCandidatos(k,rddDados,totalTransacao,suporteMinimo):
    return  rddDados.flatMap(lambda x: gerarCombinacoes(k,x))\
                    .reduceByKey(lambda x, y: y + x) \
                    .map(lambda p: (p[0],round((p[1]/(totalTransacao*1.0)),6)))\
                    .filter(lambda xy: xy[1] >= suporteMinimo)


Wall time: 0 ns


Geração dos itemsets com o suporte maior que a definida pelo usuário, caso não seja gerado nenhum itemset de tamanho k o laço é encerrado.
A cada novo k é feito a poda, ou seja, é deletado da base o item que não aparece como elemento dos itemsets.

In [4]:
%%time

suporteMinimo   = 0.005
confiancaMinimo = 0.01
liftMinimo      = 1.50

candidatosTotais  = sc.parallelize([],1)

k = 1
while(True):
    print("Iteração: " + str(k)) 	
    
    candidatos = gerarCandidatos(k, base, numeroTransacoes,suporteMinimo)
    if(candidatos.count()>0):
			
        print("Número de candidatos aceitos:" + str(candidatos.count()))
			
        elementos = candidatos.map(lambda x: x[0])\
                              .flatMap(lambda x: [(x[i]) for i in range(len(x))])\
                              .distinct().collect()

        base = base.map(lambda x: list(set(x).intersection(elementos)))\
                   .filter(lambda x: len(x) > 0)

        candidatosTotais = candidatosTotais.union(candidatos)
        k=k+1
    else:
        break
       
print("Candidatos totais: " + str(candidatosTotais.count()))

Iteração: 1


Número de candidatos aceitos:252


Iteração: 2


Número de candidatos aceitos:62


Iteração: 3


Candidatos totais: 314
Wall time: 8min 5s


É gerado as regras de associação fazendo o cálculo da confiança e lift.

In [5]:
%%time

from tabulate  import tabulate
print(candidatosTotais.count())
#Base com o total de itemsets.
candidatosTotais.cache()

dicionario  = candidatosTotais.collectAsMap()

candidatosTotais     = candidatosTotais.map(lambda x: x[0])
elementos            = candidatosTotais.collect()

associacoes = candidatosTotais.flatMap(lambda x: [(tuple(x), tuple(b)) for b in elementos])\
                              .filter(lambda x: any(t in x[0] for t in x[1]) == False)\
                              .map(lambda x: (x[0],x[1],tuple(sorted(set().union(x[0],x[1]))),0))\
                              .filter(lambda x: ( (x[2] in dicionario.keys()) ) == True)\
                              .map(lambda x: (x[0],x[1], 
                                        ((dicionario[x[2]])/(dicionario[x[0]]*1.0)),
                                        ((dicionario[x[2]])/(dicionario[x[0]]*dicionario[x[1]]*1.0)) ))\
                              .filter(lambda x: x[2] > confiancaMinimo) \
                              .filter(lambda x: x[3] > liftMinimo).collect()


resultado = tabulate(associacoes, headers=['De', 'Para','Confiança','Lift']
                                 ,tablefmt="fancy_grid")

print(resultado)

314


╒════════════╤════════════╤═════════════╤═════════╕
│ De         │ Para       │   Confiança │    Lift │
╞════════════╪════════════╪═════════════╪═════════╡
│ ('40706',) │ ('13176',) │   0.197611  │ 1.67426 │
├────────────┼────────────┼─────────────┼─────────┤
│ ('45007',) │ ('13176',) │   0.215911  │ 1.82931 │
├────────────┼────────────┼─────────────┼─────────┤
│ ('30391',) │ ('13176',) │   0.267856  │ 2.26941 │
├────────────┼────────────┼─────────────┼─────────┤
│ ('30391',) │ ('47209',) │   0.206111  │ 3.10239 │
├────────────┼────────────┼─────────────┼─────────┤
│ ('28204',) │ ('24852',) │   0.353372  │ 2.404   │
├────────────┼────────────┼─────────────┼─────────┤
│ ('27845',) │ ('13176',) │   0.192023  │ 1.62691 │
├────────────┼────────────┼─────────────┼─────────┤
│ ('27845',) │ ('21137',) │   0.148032  │ 1.79802 │
├────────────┼────────────┼─────────────┼─────────┤
│ ('13176',) │ ('40706',) │   0.043879  │ 1.67426 │
├────────────┼────────────┼─────────────┼─────────┤
│ ('13176',)