In [None]:
pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.functions import col, count, collect_set, udf, explode
from pyspark.sql.types import ArrayType, StringType, BooleanType, StructField, IntegerType, StructType

In [None]:
# Create (or reuse) the Spark session for this notebook.
spark = (
    SparkSession.builder
    .appName("Apriori")
    .getOrCreate()
)

# Load the semicolon-delimited transactions file; the first row is the header.
# No schema is supplied, so every column is read as a string.
df = spark.read.csv(
    '/content/Assignment-1_Data.csv',
    sep=';',
    header=True,
)
df.printSchema()

root
 |-- BillNo: string (nullable = true)
 |-- Itemname: string (nullable = true)
 |-- Quantity: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Price: string (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- Country: string (nullable = true)



In [None]:
# Keep only rows with a positive quantity, then discard every row that still
# has a null in any column.
df = df.where(col("Quantity") > 0).na.drop()
df.show(n=5)

+------+--------------------+--------+----------------+-----+----------+--------------+
|BillNo|            Itemname|Quantity|            Date|Price|CustomerID|       Country|
+------+--------------------+--------+----------------+-----+----------+--------------+
|536365|WHITE HANGING HEA...|       6|01.12.2010 08:26| 2,55|     17850|United Kingdom|
|536365| WHITE METAL LANTERN|       6|01.12.2010 08:26| 3,39|     17850|United Kingdom|
|536365|CREAM CUPID HEART...|       8|01.12.2010 08:26| 2,75|     17850|United Kingdom|
|536365|KNITTED UNION FLA...|       6|01.12.2010 08:26| 3,39|     17850|United Kingdom|
|536365|RED WOOLLY HOTTIE...|       6|01.12.2010 08:26| 3,39|     17850|United Kingdom|
+------+--------------------+--------+----------------+-----+----------+--------------+
only showing top 5 rows



In [None]:
# Build one basket per bill: the set of distinct item names on that bill.
df_group = (
    df.groupBy("BillNo")
    .agg(collect_set(col("Itemname")).alias("Basket"))
)
df_group.show(n=5)

+------+--------------------+
|BillNo|              Basket|
+------+--------------------+
|536366|[HAND WARMER UNIO...|
|536367|[FELTCRAFT PRINCE...|
|536371|[PAPER CHAIN KIT ...|
|536374|[VICTORIAN SEWING...|
|536375|[EDWARDIAN PARASO...|
+------+--------------------+
only showing top 5 rows



In [None]:
# Work on a 100-basket subset so the Apriori passes below stay cheap.
# NOTE(review): limit() is applied without an explicit ordering — confirm that
# an arbitrary subset is acceptable.
df_small = df_group.limit(100)
df_small.show(n=5)

+------+--------------------+
|BillNo|              Basket|
+------+--------------------+
|536366|[HAND WARMER UNIO...|
|536367|[FELTCRAFT PRINCE...|
|536371|[PAPER CHAIN KIT ...|
|536374|[VICTORIAN SEWING...|
|536375|[EDWARDIAN PARASO...|
+------+--------------------+
only showing top 5 rows



In [None]:
def generate_candidate_itemsets(frequent_itemsets, k):
    """Join frequent (k-1)-itemsets pairwise into candidate k-itemsets.

    Args:
        frequent_itemsets: Frequent itemsets of size ``k - 1``. When ``k == 2``
            every element is a single item (e.g. a string); otherwise every
            element is an iterable of items.
        k: Size of the candidate itemsets to produce.

    Returns:
        A sorted list of unique candidates. Each candidate is a tuple of ``k``
        items in ascending order, so one itemset always has exactly one
        representation. Items must be mutually orderable.
    """
    from itertools import combinations

    # Normalise each itemset to a frozenset once, rather than inside the
    # quadratic pair loop. For k == 2 the inputs are bare items, so wrap them.
    if k == 2:
        itemsets = [frozenset([itemset]) for itemset in frequent_itemsets]
    else:
        itemsets = [frozenset(itemset) for itemset in frequent_itemsets]

    candidate_itemsets = set()
    for first, second in combinations(itemsets, 2):
        union_itemset = first | second
        if len(union_itemset) == k:
            # tuple(set) has no canonical order; sorting prevents the same
            # itemset from being emitted as several differently ordered tuples.
            candidate_itemsets.add(tuple(sorted(union_itemset)))
    return sorted(candidate_itemsets)

In [None]:
def count_freq(transaction,items):
  """Count how many transactions contain every item of an itemset.

  Args:
    transaction: Iterable of transactions, each an iterable of items.
    items: The candidate itemset (an iterable of items).

  Returns:
    A ``(items, support)`` tuple, where ``items`` is the argument unchanged and
    ``support`` is the number of transactions that contain all of its items.
  """
  # Build the candidate set once instead of once per transaction.
  wanted = set(items)
  support = sum(1 for trans in transaction if wanted.issubset(trans))
  return (items, support)

In [None]:
def apriori(dataset,threshold):
  """Mine frequent itemsets from a DataFrame of baskets with Apriori.

  Args:
    dataset: Spark DataFrame with a 'Basket' column that holds the collection
      of items of each transaction.
    threshold: Minimum number of baskets an itemset must occur in to be kept
      (an absolute support count).

  Returns:
    A list of all frequent itemsets. Frequent single items are returned as bare
    values; itemsets of size two or more are returned as tuples.
  """
  sc = SparkContext.getOrCreate()
  baskets = dataset.select('Basket').rdd.map(lambda row: row[0])

  # Pass 1: support of single items. Each basket is de-duplicated so an item is
  # counted at most once per basket, matching the set-based counting used for
  # the larger itemsets below.
  frequent_itemsets = baskets.flatMap(lambda basket: [(item, 1) for item in set(basket)]) \
                             .reduceByKey(lambda a, b: a + b) \
                             .filter(lambda x: x[1] >= threshold) \
                             .map(lambda x: x[0]) \
                             .collect()
  Frequent_set = list(frequent_itemsets)
  if not frequent_itemsets:
    return Frequent_set

  # The basket list is the same for every pass: collect it once and broadcast
  # it, instead of re-collecting it and shipping it in every task closure.
  transactions = sc.broadcast(baskets.collect())

  # Pass k (k >= 2): build candidates from the previous frequent itemsets and
  # keep those that reach the support threshold.
  k = 2
  while frequent_itemsets:
    candidate_itemsets = generate_candidate_itemsets(frequent_itemsets, k)
    if candidate_itemsets:
      # Every candidate appears exactly once, so its count is already final
      # and no reduceByKey shuffle is needed.
      frequent_itemsets = sc.parallelize(candidate_itemsets) \
                            .map(lambda items: count_freq(transactions.value, items)) \
                            .filter(lambda x: x[1] >= threshold) \
                            .map(lambda x: x[0]) \
                            .collect()
    else:
      # Nothing to count: skip the Spark job and let the loop terminate.
      frequent_itemsets = []
    Frequent_set.extend(frequent_itemsets)
    print('K =',k)
    print(frequent_itemsets)
    k += 1

  return Frequent_set

In [None]:
# Mine the sampled baskets, keeping itemsets found in at least 7 of them.
F_set = apriori(dataset=df_small, threshold=7)

In [None]:
# Display the frequent itemsets returned by apriori(): single items appear as
# plain strings, larger itemsets as tuples.
F_set

['HAND WARMER UNION JACK',
 'HAND WARMER RED POLKA DOT',
 'ASSORTED COLOUR BIRD ORNAMENT',
 "PAPER CHAIN KIT 50'S CHRISTMAS",
 'WOOD S/3 CABINET ANT WHITE FINISH',
 'VINTAGE BILLBOARD DRINK ME MUG',
 'RED WOOLLY HOTTIE WHITE HEART.',
 'SET 7 BABUSHKA NESTING BOXES',
 'CREAM CUPID HEARTS COAT HANGER',
 'GLASS STAR FROSTED T-LIGHT HOLDER',
 'WOOD 2 DRAWER CABINET WHITE FINISH',
 'SAVE THE PLANET MUG',
 'RETRO COFFEE MUGS ASSORTED',
 'VINTAGE BILLBOARD LOVE/HATE MUG',
 'KNITTED UNION FLAG HOT WATER BOTTLE',
 'WHITE HANGING HEART T-LIGHT HOLDER',
 'WOODEN PICTURE FRAME WHITE FINISH',
 'WOODEN FRAME ANTIQUE WHITE',
 'WHITE METAL LANTERN',
 'JAM MAKING SET PRINTED',
 'JUMBO BAG RED RETROSPOT',
 'VINTAGE HEADS AND TAILS CARD GAME',
 'HAND WARMER OWL DESIGN',
 'HAND WARMER SCOTTY DOG DESIGN',
 'HAND WARMER RED RETROSPOT',
 'HAND WARMER BIRD DESIGN',
 'PACK OF 72 RETROSPOT CAKE CASES',
 'PAPER CHAIN KIT VINTAGE CHRISTMAS',
 'REGENCY CAKESTAND 3 TIER',
 'JUMBO SHOPPER VINTAGE RED PAISLEY',
 ('VI