# Harnessing the Power of Python with Apache Spark

### Imports

In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import collect_list
from itertools import combinations
import os

### Load the Dataset

In [2]:
# initialize spark session
spark = (SparkSession.builder.appName('OnlineRetailAnalysis')
         .getOrCreate())

# file path
file_path = 'online_retail_II_2010-2011.csv'

# read dataset
df = spark.read.csv(file_path, header=True, inferSchema=True)

# display dataframe
df.show()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/28 17:03:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

+-------+---------+--------------------+--------+--------------+-----+-----------+--------------+
|Invoice|StockCode|         Description|Quantity|   InvoiceDate|Price|Customer ID|       Country|
+-------+---------+--------------------+--------+--------------+-----+-----------+--------------+
| 536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26| 2.55|      17850|United Kingdom|
| 536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26| 3.39|      17850|United Kingdom|
| 536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26| 2.75|      17850|United Kingdom|
| 536365|   84029G|KNITTED UNION FLA...|       6|12/1/2010 8:26| 3.39|      17850|United Kingdom|
| 536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/2010 8:26| 3.39|      17850|United Kingdom|
| 536365|    22752|SET 7 BABUSHKA NE...|       2|12/1/2010 8:26| 7.65|      17850|United Kingdom|
| 536365|    21730|GLASS STAR FROSTE...|       6|12/1/2010 8:26| 4.25|      17850|United Kingdom|
| 536366|    22633|H

### Market Basket Analysis

In [3]:
# filter columns and remove nulls
df_filtered = df.select('Invoice', 'StockCode').na.drop()

# group by invoice and aggregate stockcodes
df_grouped = df_filtered.groupBy('Invoice').agg(collect_list('StockCode').alias('Items'))

# generate and count item pairs
item_pairs = (df_grouped.rdd.flatMap(lambda row: combinations(row[1], 2))
                                     .map(lambda pair: (pair, 1)))

pair_frequencies = item_pairs.reduceByKey(lambda a, b: a + b)

# sort pairs by frequency
sorted_pairs = pair_frequencies.sortBy(lambda x: x[1], ascending=False)

# display top 10 pairs
sorted_pairs.take(10)

                                                                                

[(('22697', '22698'), 618),
 (('22386', '85099B'), 544),
 (('22697', '22699'), 534),
 (('22411', '85099B'), 467),
 (('85099B', 'DOT'), 461),
 (('21931', '85099B'), 458),
 (('20725', '20727'), 443),
 (('22698', '22699'), 419),
 (('85099B', '85099C'), 396),
 (('22726', '22727'), 396)]