In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Row

from pyspark.ml.fpm import FPGrowth
from pyspark.ml.fpm import PrefixSpan
from pyspark.ml.recommendation import ALS

In [2]:
%%time
# Show the active SparkSession. `spark` is not created in any cell shown here,
# so it is presumably provided by the notebook runtime — TODO confirm.
spark

CPU times: user 2 µs, sys: 1 µs, total: 3 µs
Wall time: 6.44 µs


## Read data

### Load files from Google Cloud Storage

In [3]:
%%time
sdf_orders = spark.read.option("multiline","true").json("gs://lz-order-compact-02/allcombined2.json")

23/10/11 19:58:34 WARN org.apache.hadoop.util.concurrent.ExecutorHelper: Thread (Thread[GetFileInfo #1,5,main]) interrupted: 
java.lang.InterruptedException
	at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:510)
	at com.google.common.util.concurrent.FluentFuture$TrustedFuture.get(FluentFuture.java:88)
	at org.apache.hadoop.util.concurrent.ExecutorHelper.logThrowableFromAfterExecute(ExecutorHelper.java:48)
	at org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor.afterExecute(HadoopThreadPoolExecutor.java:90)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1157)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
                                                                                

CPU times: user 8.79 ms, sys: 95 µs, total: 8.89 ms
Wall time: 11.7 s


## Orders

In [5]:
# Preview three orders that have a CustomerId set.
sdf_orders.where(F.col("CustomerId").isNotNull()).limit(3).toPandas()

                                                                                

Unnamed: 0,AnonymousId,CartId,CountryKey,CreatedAt,CustomerId,Id,LastModifiedAt,LineItems,Locale,OrderNumber,...,ShippingPrice,ShippingPriceTotalGross,ShippingState,State,Store,TaxMode,TotalGross,TotalNet,TotalPrice,TotalTax
0,0cbe06fd-3046-4441-b178-50ba88a761ff,1c091625-0311-4614-8b15-c5457ee222b8,DE,2023-09-14T23:35:31.663Z,4fa5a0fb-c80d-4006-92d3-6932cc0ba8b4,8f82f06b-d75a-415a-863a-efde9a01f3aa,2023-10-07T16:01:49.168Z,"[(2023-09-14T23:21:10.806Z, (999, None, True, ...",de,52732832,...,0.0,0.0,Shipped,Complete,storeDE,Platform,1008.97,847.85,847.85,161.12
1,13733ad2-9ba1-4496-9972-fc7fae658891,29d87e25-52a2-448e-b87f-21d74282b890,DE,2023-09-02T04:27:15.477Z,57cf49e5-f4da-4394-8cc2-f4805ae97023,24005627-de23-4d48-b8a4-b4718795b62d,2023-10-07T16:01:18.157Z,"[(2023-08-28T04:37:29.624Z, (202, None, True, ...",de,49971777,...,0.0,0.0,Shipped,Complete,storeDE,Platform,2656.68,2232.51,2232.51,424.17
2,,b802bd3b-6f17-48f1-9634-7fcc48030c29,DE,2023-07-11T14:21:44.947Z,879cb75b-2568-41aa-b05b-5d3e12d85cf8,b0bffd39-61ab-4d09-9a2a-255941c1d211,2023-10-07T15:59:25.668Z,"[(2023-07-10T17:27:54.336Z, None, a5e1d462-aa2...",de,16544379,...,0.0,0.0,Shipped,Complete,storeDE,Platform,251.28,211.15,211.15,40.13


### No need for a JSON schema: `LineItems` is already parsed into an array of structs

In [8]:
# Show the inferred type of the nested LineItems column.
sdf_orders.select(F.col("LineItems"))

DataFrame[LineItems: array<struct<AddedAt:string,Availability:struct<AvailableQuantity:bigint,Channels:struct<06df45e0-85be-40ff-b878-f2c8bbae9699:struct<AvailableQuantity:bigint,Id:string,IsOnStock:boolean,RestockableInDays:bigint,Version:bigint>,b56c0601-3416-4e90-82d5-de1ed4941b83:struct<AvailableQuantity:bigint,Id:string,IsOnStock:boolean,RestockableInDays:bigint,Version:bigint>>,IsOnStock:boolean,RestockableInDays:bigint>,Id:string,ImageUrl:string,LastModifiedAt:string,Name:string,Price:struct<Discounted:string,Value:double>,ProductId:string,ProductNumber:string,Tax:double,TaxedGrossPrice:double,TaxedNetPrice:double,TypeId:string>>]

Explode the array of line items so that each line item becomes its own row.

In [4]:
# One output row per line item, keeping the order's CustomerId and OrderNumber.
# explode() emits no row for orders whose LineItems array is null or empty.
sdf_orders_exploded = sdf_orders.select(
    "CustomerId",
    "OrderNumber",
    F.explode("LineItems").alias("LineItem"),
)

In [10]:
# Show the schema after exploding: each row now carries a single LineItem struct.
sdf_orders_exploded

DataFrame[CustomerId: string, OrderNumber: string, LineItem: struct<AddedAt:string,Availability:struct<AvailableQuantity:bigint,Channels:struct<06df45e0-85be-40ff-b878-f2c8bbae9699:struct<AvailableQuantity:bigint,Id:string,IsOnStock:boolean,RestockableInDays:bigint,Version:bigint>,b56c0601-3416-4e90-82d5-de1ed4941b83:struct<AvailableQuantity:bigint,Id:string,IsOnStock:boolean,RestockableInDays:bigint,Version:bigint>>,IsOnStock:boolean,RestockableInDays:bigint>,Id:string,ImageUrl:string,LastModifiedAt:string,Name:string,Price:struct<Discounted:string,Value:double>,ProductId:string,ProductNumber:string,Tax:double,TaxedGrossPrice:double,TaxedNetPrice:double,TypeId:string>]

In [11]:
# Preview the first three exploded rows as a pandas DataFrame.
sdf_orders_exploded.limit(3).toPandas()

                                                                                

Unnamed: 0,CustomerId,OrderNumber,LineItem
0,,15590393,"(2023-04-11T12:03:46.331Z, None, c6634465-bb54..."
1,,15590393,"(2023-04-11T12:03:49.894Z, None, f4561cf2-159c..."
2,,15590393,"(2023-04-11T12:03:54.642Z, None, 3b45c378-9e81..."


Access the individual fields within the struct

In [5]:
# Flatten the LineItem struct: each nested field becomes a top-level column named
# after its last path segment (e.g. LineItem.Price.Value -> Value).
# NOTE: this cell is not re-runnable on its own; after the first run the LineItem
# column no longer exists in sdf_orders_exploded.
sdf_orders_exploded = sdf_orders_exploded.select(
    [
        "CustomerId",
        "OrderNumber",
        "LineItem.TypeId",
        "LineItem.ProductNumber",
        "LineItem.Name",
        "LineItem.Price.Value",
        "LineItem.Price.Discounted",
        "LineItem.AddedAt",
        "LineItem.Availability.IsOnStock",
    ]
)

In [13]:
# Show the flattened schema (one column per selected line-item field).
sdf_orders_exploded

DataFrame[CustomerId: string, OrderNumber: string, TypeId: string, ProductNumber: string, Name: string, Value: double, Discounted: string, AddedAt: string, IsOnStock: boolean]

In [14]:
# Preview the first three flattened rows as a pandas DataFrame.
sdf_orders_exploded.limit(3).toPandas()

Unnamed: 0,CustomerId,OrderNumber,TypeId,ProductNumber,Name,Value,Discounted,AddedAt,IsOnStock
0,,15590393,d490fc5d-7fe1-4175-b08a-3b38443b9116,487555,Griff CS 70 EB VOLLST.,5.35,,2023-04-11T12:03:46.331Z,
1,,15590393,d490fc5d-7fe1-4175-b08a-3b38443b9116,219049,"Scheibe SCHIJF B 8,4 DIN 9021",1.0,,2023-04-11T12:03:49.894Z,
2,,15590393,d490fc5d-7fe1-4175-b08a-3b38443b9116,457150,Kopfstück BASIS 5 A,3.85,,2023-04-11T12:03:54.642Z,


Replace TypeIDs:
- `03f9ec1f-502b-44e3-a9fa-7d2078064198` => Product
- `d490fc5d-7fe1-4175-b08a-3b38443b9116` => SparePart

In [6]:
# Replace the two known TypeId UUIDs with readable labels. The inner call maps the
# spare-part id, the outer call maps the product id; any other value is left untouched.
sdf_orders_exploded = sdf_orders_exploded.withColumn(
    "TypeId",
    F.regexp_replace(
        F.regexp_replace(
            F.col("TypeId"),
            "d490fc5d-7fe1-4175-b08a-3b38443b9116",
            "SparePart",
        ),
        "03f9ec1f-502b-44e3-a9fa-7d2078064198",
        "Product",
    ),
)

In [16]:
# Preview three rows to check that TypeId now shows the readable labels.
sdf_orders_exploded.limit(3).toPandas()

Unnamed: 0,CustomerId,OrderNumber,TypeId,ProductNumber,Name,Value,Discounted,AddedAt,IsOnStock
0,,15590393,SparePart,487555,Griff CS 70 EB VOLLST.,5.35,,2023-04-11T12:03:46.331Z,
1,,15590393,SparePart,219049,"Scheibe SCHIJF B 8,4 DIN 9021",1.0,,2023-04-11T12:03:49.894Z,
2,,15590393,SparePart,457150,Kopfstück BASIS 5 A,3.85,,2023-04-11T12:03:54.642Z,


### Write to CSV for EDA

In [9]:
# GCS destination for the EDA export. Spark's CSV writer creates a directory of
# part files at this path rather than a single .csv file.
output_path = "gs://lz-gcs/eda/orders2.csv"

In [10]:
# Export the flattened line items with a header row, replacing any previous export.
(
    sdf_orders_exploded.write
    .mode("overwrite")
    .option("header", True)
    .csv(output_path)
)

                                                                                

## FP Growth

In [7]:
# Build one basket per order: the distinct product numbers of its non-spare-part items.
# Rows with a null TypeId are dropped as well, because the != comparison yields null.
sdf_orders_fp = (
    sdf_orders_exploded
    .where(F.col("TypeId") != "SparePart")
    .groupBy("OrderNumber")
    .agg(F.collect_set("ProductNumber").alias("SalesNumbers"))
)

In [16]:
# Preview three baskets (OrderNumber plus its set of product numbers).
sdf_orders_fp.limit(3).toPandas()

                                                                                

Unnamed: 0,OrderNumber,SalesNumbers
0,10036602,"[577325, 577474]"
1,10090022,"[492125, 492271]"
2,10092132,"[577932, 497303]"


### Filter for N products / N machines (no spare parts)

In [8]:
# How many of the most frequently ordered non-spare-part products to keep.
# NOTE(review): the recorded `top_machines` output further down lists only 10 entries,
# so it appears to come from a run with a different N — confirm the intended value.
N = 1000

In [9]:
# Rank non-spare-part products by the number of line items that reference them and
# collect the top N rows to the driver.
# ProductNumber is a secondary sort key so that ties in `count` are broken
# deterministically; ordering by count alone makes the top-N cut-off vary between runs.
# TODO: carried over from the original cell (left unspecified there).
top_machines = (
    sdf_orders_exploded
    .filter(F.col("TypeId") != "SparePart")
    .groupBy("ProductNumber")
    .count()
    .orderBy(F.col("count").desc(), F.col("ProductNumber").asc())
    .head(N)
)

                                                                                

In [10]:
# Reduce the collected Row objects to a plain list of product numbers.
top_machines = [row["ProductNumber"] for row in top_machines]

In [27]:
# Display the selected product numbers.
top_machines

['577474',
 '205398',
 '201464',
 '577315',
 '577366',
 '576984',
 '205399',
 '203994',
 '577221',
 '495207']

### Keep only orders that contain at least one of the top N products

In [11]:
values_to_check = top_machines

# Keep orders whose basket shares at least one product with the top-N list.
# A single arrays_overlap() predicate replaces a chain of N OR-ed array_contains()
# calls, which turns into a very deep expression tree for large N.
# None entries are skipped so the literal array never contains a null element.
lookup_literals = [F.lit(value) for value in values_to_check if value is not None]

if lookup_literals:
    filter_condition = F.arrays_overlap(
        F.col("SalesNumbers"),
        F.array(*lookup_literals),
    )
else:
    # Nothing to match against: select no rows instead of passing None to filter().
    filter_condition = F.lit(False)

# Filter rows where "SalesNumbers" contains any of the specified values
filtered_df = sdf_orders_fp.filter(filter_condition)

In [13]:
%%time
fpGrowth = FPGrowth(itemsCol="SalesNumbers", minConfidence=0.2, minSupport=0.01)

CPU times: user 2.75 ms, sys: 0 ns, total: 2.75 ms
Wall time: 10.5 ms


Note: the `fit` run recorded below was OOM-killed :(

In [14]:
%%time
model = fpGrowth.fit(filtered_df)

ERROR:root:Exception while sending command.                         (0 + 1) / 1]
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1207, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1033, in send_command
    response = connection.send_command(command)
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1211, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while receiving
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3508, in run_code
    exec(code_obj,

Py4JError: An error occurred while calling o3132.fit

Display frequent itemsets.

In [None]:
# Collect all frequent itemsets to pandas, ordered by the items array, descending.
model.freqItemsets.orderBy(F.col("items").desc()).toPandas()

Display generated association rules.

In [None]:
# Collect the association rules to pandas, ordered by antecedent and then consequent.
model.associationRules.orderBy("antecedent", "consequent").toPandas()

`transform` checks each row's items against all association rules and summarizes the matching consequents as the prediction.

In [49]:
# Apply the rules to every basket and keep only rows that received at least one prediction.
model.transform(filtered_df).where(F.size(F.col("prediction")) > 0).toPandas()

                                                                                

Unnamed: 0,OrderNumber,SalesNumbers,prediction
0,11926240,[577315],[577474]
1,13079805,"[203994, 201464]",[205398]
2,13917402,"[205398, 577474]",[205399]
3,16142058,"[576984, 500119, 205398]","[577474, 205399]"
4,16398237,"[202308, 577315, 498866, 204012, 576984, 57731...","[205398, 577474]"
...,...,...,...
312,63628848,"[577315, 576984, 577474]",[205398]
313,91624954,"[495207, 205399, 450158, 205398]",[577474]
314,23241218,"[203994, 577325, 205182, 205398]","[201464, 577474, 205399]"
315,40131859,"[577315, 205398, 498385]","[577474, 205399]"
