Installing and importing the required modules:

In [None]:
!pip install pyspark

In [2]:
import sys
from pyspark import SparkContext, SparkConf
import re
import itertools
from functools import reduce
from itertools import chain
from collections import OrderedDict

Creating a Spark context:

In [3]:
# create Spark context with necessary configuration;
# getOrCreate reuses an existing context, so this cell can be re-run safely
sc = SparkContext.getOrCreate(
    SparkConf().setMaster("local").setAppName("PySpark Apriori")
)

Reading the file:

In [5]:
# Path to the local text file of baskets (one basket per line, items separated by whitespace)
file = "<path to local txt file>"  # TODO: replace the placeholder with the real path

Creating an RDD using a minimum of 30 partitions:

In [6]:
# Load the file as an RDD of text lines, requesting at least 30 partitions
fileRDD = sc.textFile(file, minPartitions=30)

In [7]:
# Preview the first five raw lines
fileRDD.take(5)

['FRO11987 ELE17451 ELE89019 SNA90258 GRO99222 ',
 'GRO99222 GRO12298 FRO12685 ELE91550 SNA11465 ELE26917 ELE52966 FRO90334 SNA30755 ELE17451 FRO84225 SNA80192 ',
 'ELE17451 GRO73461 DAI22896 SNA99873 FRO86643 ',
 'ELE17451 ELE37798 FRO86643 GRO56989 ELE23393 SNA11465 ',
 'ELE17451 SNA69641 FRO86643 FRO78087 SNA11465 GRO39357 ELE28573 ELE11375 DAI54444 ']

Converting all baskets to sets


In [8]:
# Split every line on whitespace and keep the distinct items of each basket as a set
fileRDD = fileRDD.map(lambda line: set(line.split()))

**Generating singletons above support level (>=85)**

In [9]:
# Count in how many baskets each item occurs and keep the items whose count is at least 85
fullsingleRDD = (
    fileRDD.flatMap(lambda basket: basket)
    .map(lambda item: (item, 1))
    .reduceByKey(lambda left, right: left + right)
    .filter(lambda item_count: item_count[1] >= 85)
)

In [10]:
# Preview ten (item, count) pairs that passed the support filter
fullsingleRDD.take(10)

[('GRO36567', 832),
 ('SNA66979', 703),
 ('FRO62970', 115),
 ('SNA14713', 188),
 ('FRO94523', 390),
 ('DAI88807', 1316),
 ('SNA96271', 1295),
 ('ELE15527', 332),
 ('SNA38068', 811),
 ('ELE55721', 169)]

Collecting all frequent singletons into a single list held by one RDD element, i.e. going from
exRDD = [a, b, c, d]
to
exRDD2 = [[a, b, c, d]]

In [14]:
# Gather every frequent singleton into one list held by a single RDD element.
# All items are given the same dummy key (1) so that reduceByKey concatenates them;
# concatenating lists (instead of joining and re-splitting a comma-separated string)
# keeps items intact even if they contain commas or are not strings.
# No extra count filter is needed: fullsingleRDD only holds items with count >= 85.
ts = fullsingleRDD.map(lambda x: (1, [x[0]])) \
                  .reduceByKey(lambda a, b: a + b) \
                  .map(lambda x: x[1])

In [21]:
ts.take(2)

[['GRO36567',
  'SNA66979',
  'FRO62970',
  'SNA14713',
  'FRO94523',
  'DAI88807',
  'SNA96271',
  'ELE15527',
  'SNA38068',
  'ELE55721',
  'SNA53220',
  'SNA68538',
  'SNA17270',
  'FRO33641',
  'SNA37544',
  'SNA50789',
  'GRO21047',
  'FRO13478',
  'GRO72531',
  'SNA12690',
  'GRO66698',
  'DAI54320',
  'GRO67376',
  'FRO19221',
  'ELE12808',
  'GRO74951',
  'DAI39727',
  'DAI87448',
  'GRO15017',
  'GRO35122',
  'SNA72003',
  'DAI86167',
  'SNA95666',
  'SNA90094',
  'ELE88031',
  'ELE69983',
  'DAI31081',
  'DAI34374',
  'DAI59546',
  'GRO90585',
  'GRO54048',
  'ELE87243',
  'GRO26160',
  'DAI34002',
  'FRO53088',
  'SNA37363',
  'FRO48038',
  'ELE92920',
  'ELE15428',
  'DAI18527',
  'FRO11987',
  'ELE59935',
  'ELE11111',
  'DAI32480',
  'GRO85051',
  'DAI73122',
  'SNA64534',
  'ELE85027',
  'SNA55617',
  'GRO52067',
  'SNA49028',
  'ELE69675',
  'SNA38067',
  'ELE20847',
  'GRO76157',
  'GRO81647',
  'SNA93177',
  'FRO71033',
  'FRO48055',
  'GRO85863',
  'SNA63157',
  'ELE

**Function to generate n-combinations from original basket RDD**:

In [15]:
def comb(x, broad, n):
  """Generate the candidate n-item combinations contributed by one basket.

  The basket contributes candidates only when it contains at least one of
  the itemsets in `broad`; in that case every n-combination of the basket's
  items is emitted with a count of 1.

  Parameters:
    x: A basket (an iterable of items; a set in this notebook).
    broad: Iterable of itemsets from the previous pass. For n == 2 each
      entry is treated as a single item, otherwise as an iterable of items.
    n: Size of the combinations to generate.

  Returns:
    A list of (sorted n-tuple, 1) pairs in lexicographic order, or None when
    the basket contains none of the itemsets in `broad` or when its items
    cannot be ordered.
  """
  basket = frozenset(x)
  if n == 2:
    # The previous pass produced bare items, so wrap each one before the subset test.
    previous = (frozenset([item]) for item in broad)
  else:
    previous = (frozenset(item) for item in broad)
  if not any(itemset.issubset(basket) for itemset in previous):
    return None
  try:
    # Sorting the basket once makes every generated combination already sorted.
    ordered = sorted(x)
  except TypeError:
    # Items of mixed, non-comparable types cannot form a canonical tuple.
    return None
  return [(combo, 1) for combo in itertools.combinations(ordered, n)]

**Function to generate the n-item subsets of each frequent itemset (the candidate rule antecedents), for association rule mining:**

In [16]:
def gen_comb(x, n):
  """Pair every n-item subset of an itemset with the itemset's full record.

  Parameters:
    x: A record whose first element is an iterable of items; in this
      notebook an (itemset, support count) pair.
    n: Size of the subsets to generate; must be >= 1.

  Returns:
    A list of (subset, list(x)) pairs, one per subset. For n == 1 the subset
    is the bare item; for n > 1 it is a tuple of n items.

  Raises:
    ValueError: If n < 1.
  """
  if n < 1:
    raise ValueError("n must be >= 1, got {!r}".format(n))
  items = list(x[0])
  # Combinations are only needed when n > 1; singletons are the items themselves.
  keys = items if n == 1 else itertools.combinations(items, n)
  # A fresh list(x) per entry so that the records do not share one mutable object.
  return [(key, list(x)) for key in keys]

**Function to compute the confidence of each rule and return it only if it is >= the confidence threshold passed to the function:**

In [17]:
def get_conf(x, c, n):
  """Turn one joined record into a formatted rule if it is confident enough.

  The record is indexed as
  (antecedent, (antecedent support, (itemset, itemset support))).

  Parameters:
    x: The joined record described above.
    c: Minimum confidence, in percent, for the rule to be returned.
    n: Size of the itemset; for n == 2 the antecedent is a single item,
      otherwise it is an iterable of items.

  Returns:
    (rule text, confidence) when the confidence is at least c, else None.
  """
  antecedent = [x[0]] if n == 2 else list(x[0])
  joined = x[1]
  # Confidence (I -> j) = Support (I u j) / Support (I), as a percentage
  confidence = round(joined[1][1] / joined[0] * 100, 2)
  if not confidence >= c:
    return None
  # Whatever is in the full itemset but not in the antecedent is the consequent.
  consequent = [item for item in list(joined[1][0]) if item not in antecedent]
  rule = "{} -> {} Confidence = {}%".format(antecedent, consequent, confidence)
  return (rule, confidence)


**Recursive Apriori function:**

In [22]:
# n starts at 2, because singletons have already been generated
n = 2
# Empty RDD that the association rules are accumulated into
rulez = sc.parallelize([])
def apriori(prevRDD, originalRDD, s, k, n, c, rulez):
  """Use the Apriori algorithm to generate association rules, recursing from n up to k.

  Each pass counts the n-combinations produced by `comb`, keeps those whose
  count is at least s, joins them with the previous pass to compute rule
  confidences, and then recurses with n + 1.

  Parameters:
    prevRDD: RDD of (itemset, count) pairs of frequent items from the previous
      iteration, used to generate candidate itemsets.
    originalRDD: The original RDD that has all the baskets.
    s: Minimum support count an n-combination must reach to be kept.
    k: Largest combination size to generate.
    n: Combination size at which this call starts.
    c: Confidence threshold (in percent) a rule must reach to be kept.
    rulez: RDD holding the association rules collected so far.

  Returns:
    The RDD of rule strings: `rulez` extended with the rules of every pass.
  """
  if n > k:
    return rulez
  prev = prevRDD.map(lambda x: x[0])
  # Broadcasting frequent (n-1)-combinations to use on the workers
  freq_broad = sc.broadcast(prev.collect())
  # Frequent n-combinations. Cached because this RDD is consumed again by the
  # rule join below and as prevRDD (collect + join) of the next pass.
  combos = originalRDD.map(lambda x: comb(x, freq_broad.value, n)) \
                      .filter(bool) \
                      .flatMap(lambda x: x) \
                      .reduceByKey(lambda a, b: a+b) \
                      .filter(lambda x: x[1] >= s) \
                      .cache()
  # Association rules
  # 1. Exploding frequent n-combinations into their (n-1)-subsets for the join
  explRDD = combos.map(lambda x: gen_comb(x,(n-1))) \
                  .flatMap(lambda x:x) \
                  .map(lambda x: (x[0], tuple(x[1])))
  # 2. Conducting an inner join between the previous frequent items and the frequent n-combinations
  jointRDD = prevRDD.join(explRDD)
  # 3. Getting association rules that have a confidence above the threshold
  #    (get_conf returns None below the threshold, so filter(bool) drops those)
  assocRD = jointRDD.map(lambda x: get_conf(x, c, n)) \
                    .filter(bool) \
                    .sortBy(lambda x: x[1], ascending = False) \
                    .map(lambda x: x[0])
  rulez = rulez.union(assocRD)
  freq_broad.unpersist()
  return apriori(combos, originalRDD, s, k, n+1, c, rulez)

In [23]:
# Generating association rules; every argument is passed by keyword
rools = apriori(
    prevRDD=fullsingleRDD,  # the RDD of frequent singletons
    originalRDD=fileRDD,    # the original RDD of baskets
    s=85,                   # support
    k=4,                    # desired number of items in frequent itemsets
    n=2,                    # starting at pairs
    c=90,                   # confidence, in percent
    rulez=rulez,            # stores the association rules to save to a text file later
)

Saving the generated association rules to a local file:

In [None]:
# Write all rules into a single partition of the output folder
rools.coalesce(1).saveAsTextFile("<path to local folder>")  # TODO: replace the placeholder with the real folder

In [24]:
# Preview five of the generated rules
rools.take(5)

["['DAI93865'] -> ['FRO40251'] Confidence = 100.0%",
 "['GRO85051'] -> ['FRO40251'] Confidence = 99.92%",
 "['GRO38636'] -> ['FRO40251'] Confidence = 99.07%",
 "['ELE12951'] -> ['FRO40251'] Confidence = 99.06%",
 "['DAI88079'] -> ['FRO40251'] Confidence = 98.67%"]