<a href="https://colab.research.google.com/github/turatig/frequent_itemsets/blob/master/frequent_itemsets.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**MARKET BASKET ANALYSIS NOTEBOOK**

Download and preprocess the dataset

In [92]:
!pip install kaggle


import os,sys,time,zipfile,json,re
import functools as ft
import itertools as it
from datetime import datetime as dt

#Kaggle credentials must not be stored in the notebook (the key that used to be
#hard-coded here has been published and should be revoked).
#They are taken from the environment; when they are missing and no kaggle.json
#is available, they are asked interactively without echoing them.
from getpass import getpass

_kaggle_json=os.path.join(os.path.expanduser('~'),'.kaggle','kaggle.json')
if not os.path.exists(_kaggle_json):
  for _var in ('KAGGLE_USERNAME','KAGGLE_KEY'):
    if not os.environ.get(_var):
      os.environ[_var]=getpass('{0}: '.format(_var))

from kaggle.api.kaggle_api_extended import KaggleApi

def get_dataset():
  """
    Download 'old-newspaper.tsv' through the Kaggle API and unzip it,
    unless a file with that name is already in the working directory.
  """
  tsv_name='old-newspaper.tsv'

  #nothing to do when a previous run already fetched the file
  if tsv_name in os.listdir():
    return

  client=KaggleApi()
  client.authenticate()
  client.dataset_download_file('alvations/old-newspapers',tsv_name)

  #the download is unpacked from its zip archive in the working directory
  with zipfile.ZipFile('old-newspaper.tsv.zip','r') as archive:
    archive.extractall()

#Yield baskets (list of lists) reading from tsv
#languages: subset of languages to be considered during the market-basket analysis
#max_basket: maximum number of baskets to yield; None or any value <=0 means no limit
def read_dataset_from_tsv(languages=None,max_basket=-1):
  """
    Lazily yield one basket per selected row of 'old-newspaper.tsv'.
    A basket is the list of lower-cased alphabetical words found in the
    4th tab-separated column of the row; the 1st column is matched
    against languages (when given) to select the rows.
  """
  #0 stands for "no limit": None is accepted because callers use it as default
  limit=max_basket if max_basket is not None and max_basket>0 else 0
  count=0

  #explicit encoding so that the result does not depend on the locale of the machine
  with open('old-newspaper.tsv','r',encoding='utf-8') as f:
    #skip header line (an empty file simply yields nothing)
    next(f,None)
    for line in f:
      l=line.split('\t')
      #rows without the 4th column carry no text to build a basket from
      if len(l)<4: continue
      if languages is not None and l[0] not in languages: continue

      #get a list of words as basket skipping any sequence of non-alphabetical characters
      #and removing the empty strings produced at the borders
      basket=[word.lower() for word in re.split(r'[^a-zA-Z]+',l[3]) if word!='']
      count+=1
      yield basket

      if limit and count>=limit: break

#Create json which contains an array of baskets (lists of words)
def create_test_json_dataset(languages=None,max_basket=None):
  """
    Dump the baskets read from the tsv file into a json file and return its name.

    languages: languages to keep (None keeps every language)
    max_basket: maximum number of baskets to store; None or a value <=0 stores all of them

    The file is named after the lower-cased languages joined by '_'
    ('all_languages' when languages is None) followed by the number of
    baskets actually written, e.g. 'italian300.json'.
    An existing file with the same name is overwritten.
  """
  #the reader compares max_basket with 0: None must be translated into its "no limit" value
  limit=-1 if max_basket is None else max_basket
  baskets=list(read_dataset_from_tsv(languages,limit))

  filename='_'.join(languages).lower() if languages is not None else 'all_languages'
  filename+=str(len(baskets))+'.json'

  #one token per line (indent='\t'): this is the layout iter_baskets_from_json parses
  with open(filename,'w') as f:
    json.dump(baskets,f,indent='\t')

  return filename
  
  

#Yield baskets from json structures as array of arrays
def iter_baskets_from_json(filename,max_basket=-1):
  """
    Lazily yield the baskets stored in a json file written with one token
    per line (json.dumps(...,indent='\t') of a list of lists of alphabetical words).

    filename: path of the json file
    max_basket: maximum number of baskets to yield; None or a value <=0 means no limit

    Empty baskets (serialised as '[]' on a single line) are yielded as empty
    lists and blank lines are ignored.
  """
  limit=max_basket if max_basket is not None and max_basket>0 else 0
  #None while no inner array is open
  basket=None
  count=0

  with open(filename,'r') as f:
    #skip first square braket (an empty file simply yields nothing)
    next(f,None)
    for line in f:
      #first token of the line: a word, an empty array, an opening or a closing bracket
      m=re.search(r'[a-zA-Z]+|\[\s*\]|\[|\]',line)
      if m is None: continue
      token=m.group()

      complete=None
      if token=='[':
        basket=[]
      elif token[0]=='[':
        #empty basket written on one line
        complete=[]
      elif token==']':
        #the closing bracket of the outer array finds no open basket: complete stays None
        complete,basket=basket,None
      elif basket is not None:
        basket.append(token)

      if complete is None: continue
      yield complete
      count+=1
      if limit and count>=limit: break

#Just to iterate agnostically over basket file
def iter_baskets(basket_file,max_basket=-1):
  """
    Yield baskets from basket_file, which is either a callable returning an
    iterable of baskets when called with max_basket, or an iterable of baskets.
    For an iterable at most max_basket baskets are yielded when max_basket>0.
  """
  if callable(basket_file):
    #the callable receives max_basket and is in charge of applying it
    yield from basket_file(max_basket)
    return

  for seen,basket in enumerate(basket_file,start=1):
    yield basket
    if max_basket>0 and seen>=max_basket: return

    


#fetch the tsv file if needed, then build the json sample of Italian baskets (300 at most)
get_dataset()
create_test_json_dataset(languages=['Italian'],max_basket=300)






Utilities to log the execution

In [93]:
def sizeof_GB(obj):
  """Return sys.getsizeof(obj) converted to GB (10**9 bytes) as a string with six decimals."""
  size_in_bytes=sys.getsizeof(obj)
  return format(size_in_bytes/1000000000,'f')

#decorator to log time execution of function/method
def time_it(f):
  """
    Return a wrapper of f that prints how many seconds each call took
    and then returns the result of f unchanged.
  """
  #wraps keeps name and docstring of f on the wrapper (useful when decorators are stacked)
  @ft.wraps(f)
  def _wrap(*args,**kwargs):
    #perf_counter is monotonic: the measure is not affected by system clock adjustments
    start=time.perf_counter()
    res=f(*args,**kwargs)
    elapsed=time.perf_counter()-start

    print('\n'+'-'*30)
    print('Function {0} executed in {1} seconds'.format(f.__name__,elapsed))
    print('-'*30+'\n')
    return res
  return _wrap

#decorator to log the memory space used before and after a candidate itemset filtering operation
def log_filter(f):
  """
    Return a wrapper of a filtering function f(candidates,...) that prints
    number and size of the candidate itemsets before the call and of the
    itemsets returned by f after it. The result of f is returned unchanged.
    candidates is read from the first positional argument or, when no
    positional argument is given, from the 'candidates' keyword.
  """
  def _k(itemsets):
    #number of items of the itemsets, read from any key; 0 when there is no key to look at
    return len(next(iter(itemsets))) if itemsets else 0

  @ft.wraps(f)
  def _wrap(*args,**kwargs):
    #argument may be given by position or by key=value
    candidates=args[0] if len(args)>0 else kwargs['candidates']
    print('Total number of candidate {0}-itemsets: {1}\nSize in GB: {2}'.\
          format(_k(candidates),len(candidates),sizeof_GB(candidates)))

    res=f(*args,**kwargs)

    if res:
      print('Number of frequent {0}-itemsets: {1}\nSize in GB: {2}'.\
            format(_k(res),len(res),sizeof_GB(res)))
    else:
      print('Number of frequent {0}-itemsets: {1}\nSize in GB: {2}'.\
            format(0,0,0))
    print('-'*30+'\n')
    return res

  return _wrap

#Dump on json file the result of an algorithm run
def dump_result(algo,s,basket_count,freq_it_sets):
  """
    Write the result of a run in a json file and return the name of the file.

    algo: name of the algorithm, used as prefix of the file name
    s: support threshold used for the run
    basket_count: number of baskets read
    freq_it_sets: list of dictionaries {itemset (tuple): count}

    The file contains a list: a header with threshold and number of baskets,
    followed by one object for each dictionary of freq_it_sets.
  """
  def remap(dic):
    #json object keys must be strings: tuples are stored through their str()
    return {str(k):v for k,v in dic.items()}

  header_info={'support_threshold':s,'total_n_baskets':basket_count}

  #read the clock only once so that date and time in the name belong to the same instant
  now=str(dt.today())
  filename=algo+'_market_basket_analysis_'+now[:10]+'_'+now[11:]+'.json'

  with open(filename,'w') as f:
    json.dump([header_info]+[remap(dic) for dic in freq_it_sets],f,indent='\t')

  return filename

A-priori algorithm implementation

In [94]:
"""
  Filter candidate set of itemsets according to suppport threshold 
"""
"""@log_filter
@time_it"""
def filter_ck(candidates,s):
  """
    Return a new dictionary with the candidates whose count is at least s
  """
  frequent=dict()
  for itemset,support in candidates.items():
    if support>=s: frequent[itemset]=support
  return frequent

"""
  Discard unfrequent singletons from a basket
"""
def freq_sing(freq_it_sets,basket):
  """
    Return the words of basket, in their original order, whose 1-tuple
    is a key of freq_it_sets[1] (the frequent singletons)
  """
  kept=[]
  for word in basket:
    if (word,) in freq_it_sets[1]: kept.append(word)
  return kept

"""
  Check monotonicity property
  kuple is a possible k-itemset -> all immediate subsets (k-1 itemsets) are frequent itemsets.
""" 
def check_mono_prop(kuple,k,freq_it_sets):
  """
    Return True when every subset of kuple made of k-1 elements, taken
    as a sorted tuple, is a key of freq_it_sets[k-1]; False otherwise
  """
  for subset in it.combinations(kuple,r=k-1):
    if tuple(sorted(subset)) not in freq_it_sets[k-1]:
      return False
  return True

"""
  Utility to clean output of analysis
"""
def clean_output(freq_it_sets):
  """
    Return a copy of freq_it_sets without its first element (the set holding
    the empty itemset) and without its last element when that one is empty
  """
  #index where the copy stops: a trailing empty element is left out
  end=len(freq_it_sets) if freq_it_sets[-1] else len(freq_it_sets)-1
  return freq_it_sets[1:end]

"""
  Return candidate k-itemsets found after a basket_file pass
"""
"""@time_it"""  
def apr_get_ck(basket_file,k,freq_it_sets,max_basket=-1):
  """
    Make one pass over the baskets and count the candidate k-itemsets.

    basket_file: callable or iterable accepted by iter_baskets
    k: number of items of the itemsets to count
    freq_it_sets: list whose element i is the dictionary of the frequent i-itemsets found so far
    max_basket: maximum number of baskets to read (<=0: no limit)

    Return (candidates,basket_count): candidates maps each sorted k-tuple
    whose (k-1)-subsets are all frequent to the number of baskets that
    contain it; basket_count is the number of baskets read.
  """
  basket_count=0
  candidates=dict()

  for basket in iter_baskets(basket_file,max_basket):
    basket_count+=1
    #a basket is a set of items: a word repeated in the text must count once for the
    #support and must not produce itemsets such as ('a','a').
    #Sorting here makes every combination below come out already sorted
    items=sorted(set(basket))
    if k>1:
      #words that are not frequent singletons cannot be part of a candidate
      items=freq_sing(freq_it_sets,items)

    for kuple in it.combinations(items,r=k):
      if check_mono_prop(kuple,k,freq_it_sets):
        candidates[kuple]=candidates.get(kuple,0)+1

  return candidates,basket_count

"""
  Apriori algorithm iteration
  freq_it_sets:the k-1-th element is the set of frequent itemsets made of k elements
"""
def apriori(basket_file,s=0,max_basket=-1,max_k=-1,log=False):
    """
      Run the A-priori algorithm over the baskets.

      basket_file: callable or iterable accepted by iter_baskets
      s: support threshold; when <=0 it is set to the 1% of the number of baskets read
      max_basket: maximum number of baskets to read at every pass (<=0: no limit)
      max_k: size of the largest itemsets to look for (<0: no limit)
      log: when True the result is written on file through dump_result

      Return a list whose (k-1)-th element is the dictionary
      {frequent k-itemset (sorted tuple): count}.
    """
    #one pass is made for every k: a one-shot iterator (e.g. a generator) would be
    #exhausted by the first pass, so it is stored in a list
    if not callable(basket_file) and iter(basket_file) is basket_file:
      basket_file=list(basket_file)

    freq_it_sets=[{tuple():1}]
    #defined even if no pass is made (max_k=0), since it is needed by dump_result
    basket_count=0
    k=1
    #stop when no more frequent itemsets are found or k>max_k
    while freq_it_sets[-1] and (max_k<0 or k<=max_k):

      ck,basket_count=apr_get_ck(basket_file,k,freq_it_sets,max_basket)
      if s<=0:
        #set threshold to the 1% of the total number of baskets; never below 1, which
        #filters exactly like 0 (every candidate is counted at least once)
        s=max(1,basket_count//100)
      freq_it_sets.append(filter_ck(ck,s))
      k+=1

    freq_it_sets=clean_output(freq_it_sets)
    if log:dump_result("apriori",s,basket_count,freq_it_sets)

    return freq_it_sets
    

In [95]:
from functools import partial

#every call of basket_file starts a new pass over the baskets stored in the json file
basket_file=partial(iter_baskets_from_json,'italian300.json')
#frequent itemsets up to triples with a support threshold of 260 baskets
res=apriori(basket_file=basket_file,s=260,max_k=3)


PCY implementation

In [96]:
!pip install -q bitmap
from bitmap import BitMap

#map a tuple to a bucket of a table of size=s
def hash_tuple(t,s):
  """Return the index of the bucket of t: its hash modulo the size s of the table."""
  bucket=hash(t)%s
  return bucket
def set_all(bm):
  """Set every bit of the bitmap bm (any object exposing size() and set(position))."""
  #size() returns the number of bits, which is not iterable: positions are walked through range
  for i in range(bm.size()):
    bm.set(i)


In [97]:
"""
  Return candidate k-itemsets found after a basket_file pass.
  bm: bitmap of frequent buckets of couples
"""
"""@time_it"""  
def pcy_get_ck(basket_file,k,freq_it_sets,bm,max_basket=-1):
  """
    Make one pass over the baskets and count the candidate k-itemsets (PCY variant).

    basket_file: callable or iterable accepted by iter_baskets
    k: number of items of the itemsets to count
    freq_it_sets: list whose element i is the dictionary of the frequent i-itemsets found so far
    bm: bitmap of the frequent buckets of couples, read only when k==2
    max_basket: maximum number of baskets to read (<=0: no limit)

    Return (candidates,basket_count,buckets): candidates maps each sorted
    k-tuple to the number of baskets that contain it, basket_count is the
    number of baskets read and buckets holds, when k==1, the number of
    couples hashed to each of the bm.size() buckets (all zeros otherwise).
  """
  basket_count=0
  candidates=dict()
  n_buckets=bm.size()
  buckets=[0]*n_buckets

  for basket in iter_baskets(basket_file,max_basket):
    basket_count+=1
    #a basket is a set of items: a word repeated in the text must count once for the
    #support and must not produce itemsets such as ('a','a').
    #Sorting here makes every combination below come out already sorted, so the couples
    #hashed during the first pass are the same tuples tested during the second one
    items=sorted(set(basket))
    if k>1:
      #words that are not frequent singletons cannot be part of a candidate
      items=freq_sing(freq_it_sets,items)

    for kuple in it.combinations(items,r=k):
      #PCY variant: added constraint for couple -> must hash to a frequent bucket
      if check_mono_prop(kuple,k,freq_it_sets) and (k!=2 or bm[hash_tuple(kuple,n_buckets)]):
        candidates[kuple]=candidates.get(kuple,0)+1

    if k==1:
      #PCY variant: during the first pass hash couples to buckets
      for couple in it.combinations(items,r=2):
        buckets[hash_tuple(couple,n_buckets)]+=1

  return candidates,basket_count,buckets

"""
  PCY algorithm iteration
"""
def pcy(basket_file,s=0,bm_size=256,max_basket=-1,max_k=-1,log=False):
    """
      Run the PCY algorithm over the baskets.

      basket_file: callable or iterable accepted by iter_baskets
      s: support threshold; when <=0 it is set to the 1% of the number of baskets read
      bm_size: number of bits requested for the bitmap of the frequent buckets
      max_basket: maximum number of baskets to read at every pass (<=0: no limit)
      max_k: size of the largest itemsets to look for (<0: no limit)
      log: when True the result is written on file through dump_result

      Return a list whose (k-1)-th element is the dictionary
      {frequent k-itemset (sorted tuple): count}.
    """
    #one pass is made for every k: a one-shot iterator (e.g. a generator) would be
    #exhausted by the first pass, so it is stored in a list
    if not callable(basket_file) and iter(basket_file) is basket_file:
      basket_file=list(basket_file)

    freq_it_sets=[{tuple():1}]
    bm=BitMap(bm_size)
    #defined even if no pass is made (max_k=0), since it is needed by dump_result
    basket_count=0
    k=1

    #stop when no more frequent itemsets are found or k>max_k
    while freq_it_sets[-1] and (max_k<0 or k<=max_k):
      ck,basket_count,buckets=pcy_get_ck(basket_file,k,freq_it_sets,bm,max_basket)
      #1% of the baskets; never below 1, which filters exactly like 0
      if s<=0: s=max(1,basket_count//100)
      freq_it_sets.append(filter_ck(ck,s))

      if k==1:
        #PCY variant:set bit of frequent buckets in the bitmap
        for i,hits in enumerate(buckets):
          if hits>=s: bm.set(i)
      k+=1

    freq_it_sets=clean_output(freq_it_sets)
    if log:dump_result("pcy",s,basket_count,freq_it_sets)

    return freq_it_sets


In [98]:
#s keeps its default: pcy derives the threshold from the number of baskets read
res=pcy(basket_file=basket_file,max_k=3)

Spark installation

In [99]:
!sudo apt-get install openjdk-8-jdk-headless -qq > /dev/null
#archive.apache.org keeps every Spark release permanently, while download mirrors only serve recent ones
#-nc: the archive is not downloaded again when the cell is re-run
!wget -q -nc https://archive.apache.org/dist/spark/spark-2.4.8/spark-2.4.8-bin-hadoop2.7.tgz
!tar xf spark-2.4.8-bin-hadoop2.7.tgz
!pip install -q findspark

SON algorithm implementation

In [100]:
import findspark

#locations of the JDK and of the Spark distribution (relative to the working directory),
#set before findspark.init() is called
os.environ.update({
  'JAVA_HOME':'/usr/lib/jvm/java-8-openjdk-amd64',
  'SPARK_HOME':'spark-2.4.8-bin-hadoop2.7',
})

findspark.init()

from pyspark.sql import SparkSession
import pyspark


In [101]:
#local session (master "local[*]"); an already running session is reused by getOrCreate
spark=(
  SparkSession.builder
  .master("local[*]")
  .getOrCreate()
)
sc=spark.sparkContext

def from_it_to_list(it):
  """Return a list with the elements of the iterable it, in iteration order."""
  return [element for element in it]

basket_rdd=sc.parallelize(basket_file())
#SON first phase: every partition is mined with the global threshold (3 baskets) scaled by
#the number of partitions; never below 1, since a value <=0 makes apriori pick its own default
support_t=max(1,3//basket_rdd.getNumPartitions())

#apriori makes one pass over the chunk for every itemset size: the chunk is stored in a list
#by the same function that mines it, and the scaled threshold is passed explicitly
res=basket_rdd.\
  mapPartitions(lambda chunk: apriori(from_it_to_list(chunk),s=support_t,max_k=3)).collect()

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:38153)
Traceback (most recent call last):
  File "spark-2.4.8-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 929, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "spark-2.4.8-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1067, in start
    self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 111] Connection refused


Py4JNetworkError: ignored