##### FP Growth association rules is available only as a scala API

In [3]:
import org.apache.spark.mllib.fpm.FPGrowth
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.types._

import org.apache.spark.sql.types._

In [1]:
val data = sc.textFile("adl://srramhdi.azuredatalakestore.net/clusters/srramhdir/HdiSamples/HdiSamples/mbadataset11")
val tx = data.filter(s => s.trim.split(",")(0) != "TransactionID")
tx.take(4)

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
2,application_1509639485426_0010,spark,idle,Link,Link,✔


SparkSession available as 'spark'.
res1: Array[String] = Array(20170718249904392,MISCELLANEOUS ENGINEERED PUMPS, 20170718249904392,"HORIZ AXIALLY SPLIT, MULTISTAGE - HEAVY DUTY", 20170807096039672,ANSI/ASME METALLIC SEALED, 20170315096039672,ANSI/ASME METALLIC SEALED)

#### Example of a reflection

In [4]:
val schemaString = "transactionID,itemName"
val fields = schemaString.split(",").map(fieldName => StructField(fieldName,StringType, nullable = true))
val schema = StructType(fields)

schema: org.apache.spark.sql.types.StructType = StructType(StructField(transactionID,StringType,true), StructField(itemName,StringType,true))

In [5]:
val rowRDD = tx.map(_.split(",")).map(attributes => Row(attributes(0),attributes(1)))
val sqlContext = new SQLContext(sc)
val txDF = sqlContext.createDataFrame(rowRDD,schema)

txDF: org.apache.spark.sql.DataFrame = [transactionID: string, itemName: string]

In [6]:
txDF.take(2)

res2: Array[org.apache.spark.sql.Row] = Array([20170718249904392,MISCELLANEOUS ENGINEERED PUMPS], [20170718249904392,"HORIZ AXIALLY SPLIT])

##### operations on a data frame a powerful way to convert transactions into market basket with a single line of code

In [7]:
val txg :RDD[Array[String]] = txDF.groupBy("transactionID").agg(collect_set("itemName").alias("items")).select("items").as[Array[String]].rdd
/*.map(i => i(0).getString)*/

txg: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[14] at rdd at <console>:49

In [8]:
txg.take(4)

res5: Array[Array[String]] = Array(Array(ANSI/ASME METALLIC SEALED), Array(MISCELLANEOUS ENGINEERED PUMPS), Array(HORIZ.RADIALLY SPLIT DOUBLE SUCTION SINGLE STAGE, HORIZONTAL OVERHUNG SINGLE STAGE, PROGRESSIVE CAVITY, GEAR PUMPS, BARREL MULTISTAGE, MISCELLANEOUS ENGINEERED PUMPS), Array(MISCELLANEOUS ENGINEERED PUMPS))

In [9]:
import org.apache.spark.mllib.fpm.FPGrowth
import org.apache.spark.rdd.RDD
val fpg = new FPGrowth().setMinSupport(0.05).setNumPartitions(10)
val model = fpg.run(txg)

model: org.apache.spark.mllib.fpm.FPGrowthModel[String] = org.apache.spark.mllib.fpm.FPGrowthModel@64ce99ac

In [10]:
model.freqItemsets.collect().foreach { itemset =>
  println(itemset.items.mkString("[", ",", "]") + ", " + itemset.freq)
}

[ANSI/ASME METALLIC SEALED], 12706
[MISCELLANEOUS ENGINEERED PUMPS], 12232
[MISCELLANEOUS ENGINEERED PUMPS,ANSI/ASME METALLIC SEALED], 2973
[HORIZONTAL OVERHUNG SINGLE STAGE], 3696
[HORIZONTAL OVERHUNG SINGLE STAGE,MISCELLANEOUS ENGINEERED PUMPS], 2188
[GEAR PUMPS], 3595
[GEAR PUMPS,MISCELLANEOUS ENGINEERED PUMPS], 1668
[GEAR PUMPS,ANSI/ASME METALLIC SEALED], 2410
[HORIZ SINGLE STG FRAME MOUNTED], 1874
[ANSI/ASME NON-METALLIC SEALED], 1810
[HORIZ AXIALLY SPLIT SINGLE STG LTE 12 INCHES], 1712
[HORIZ.RADIALLY SPLIT DOUBLE SUCTION SINGLE STAGE], 1534

In [11]:
val minConfidence = 0.5
model.generateAssociationRules(minConfidence).collect().foreach { rule =>
  println(
    rule.antecedent.mkString("[", ",", "]")
      + " => " + rule.consequent .mkString("[", ",", "]")
      + ", " + rule.confidence)
}

[GEAR PUMPS] => [ANSI/ASME METALLIC SEALED], 0.6703755215577191
[HORIZONTAL OVERHUNG SINGLE STAGE] => [MISCELLANEOUS ENGINEERED PUMPS], 0.591991341991342