In [13]:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

In [14]:
/**
 * One line of the transaction file: ticket (transaction) id `TID` contains `Product`.
 *
 * @param TID     transaction / ticket identifier (first field of a line)
 * @param Product product identifier (second field of a line)
 */
case class Ticket(TID: String, Product: String)

/**
 * A weighted Source/Target product pair.
 *
 * NOTE(review): not referenced in this section; presumably meant to type the
 * Source/Target/Weight rows of `prodCart` — confirm before relying on it.
 *
 * @param Source product on the left-hand side of the pair
 * @param Target product on the right-hand side of the pair
 * @param Weight weight attached to the pair
 */
case class Frequency(Source: String, Target: String, Weight: Double)

In [15]:
// Load the raw data; fields on each line are semicolon-separated (";").
// Field 0 is the ticket id, field 1 the product.
// NOTE(review): a line with fewer than two fields makes `fields(1)` throw, and no
// header row is skipped — confirm the input file has neither.
val data = sc.textFile("Transactions.csv")
val kv = data.map { line =>
  val fields = line.split(";")
  Ticket(fields(0), fields(1))
}.cache()

// Wrap the tickets in a DataFrame and expose it to Spark SQL as "transactions".
val df = sqlContext.createDataFrame(kv)
df.registerTempTable("transactions")

In [16]:
// Product support.
// prodFreq: for each product, the number of distinct tickets it appears on
// (column "count", cast to double).
val prodFreq = sqlContext.sql(
  """SELECT Product, CAST(COUNT(DISTINCT TID) AS Double) as count
    |FROM transactions
    |GROUP BY Product""".stripMargin)

// N: total number of transactions, i.e. the number of distinct ticket ids.
val totFreq = df.select("TID").distinct().count()

// Support(product) = count / N — the fraction of all transactions containing it.
val productSupport = prodFreq
  .select(
    prodFreq("Product"),
    (prodFreq("count") / totFreq).cast("double").as("Support"))
  .cache()

In [17]:
// Co-occurrence counts for every ordered pair of different products.
// Weight is the number of distinct tickets that contain both Source and Target.
// Each unordered pair therefore appears twice, once per direction.
val prodCart = sqlContext.sql(
  """SELECT lhs.Product AS Source,
    |       rhs.Product AS Target,
    |       COUNT(DISTINCT lhs.TID) AS Weight
    |FROM transactions lhs
    |JOIN transactions rhs
    |  ON lhs.TID = rhs.TID
    |WHERE lhs.Product <> rhs.Product
    |GROUP BY lhs.Product, rhs.Product""".stripMargin
).cache()

In [18]:
// Rule support for Source --> Target: co-occurrence count divided by N, the total
// number of transactions.
val ruleSupport = prodCart
  .select(
    prodCart("Source"),
    prodCart("Target"),
    (prodCart("Weight").cast("double") / totFreq.toDouble).as("Support"))
  .cache()

// Rule confidence for Source --> Target: co-occurrence count divided by the number
// of transactions that contain Source (prodFreq's "count" for that product).
val ruleConfidence = prodCart
  .join(prodFreq, prodCart("Source") === prodFreq("Product"))
  .select(
    prodCart("Source"),
    prodCart("Target"),
    (prodCart("Weight").cast("double") / prodFreq("count")).as("Confidence"))
  .cache()

In [19]:
// One row per rule (Source --> Target) carrying both its support and its confidence.
// NOTE(review): ruleSupport and ruleConfidence both select prodCart("Source") and
// prodCart("Target"), so the two sides share those attributes. This join relies on
// Spark resolving the self-equality conditions to the left and right side
// respectively — confirm on the Spark version in use.
val ruleConfSup = ruleSupport
  .join(
    ruleConfidence,
    (ruleSupport("Source") === ruleConfidence("Source")) &&
      (ruleSupport("Target") === ruleConfidence("Target")))
  .select(
    ruleSupport("Source"),
    ruleSupport("Target"),
    ruleSupport("Support"),
    ruleConfidence("Confidence"))

In [20]:
// Register the intermediate DataFrames as Spark SQL temp tables, in this order.
// The lift query reads "ruleconfsup" and "productsupport".
for ((frame, tableName) <- Seq(
       ruleSupport -> "rulesupport",
       productSupport -> "productsupport",
       ruleConfSup -> "ruleconfsup")) {
  frame.registerTempTable(tableName)
}

In [21]:
// Lift of a rule X --> Y: how much more often X and Y occur together than they
// would if they were independent.
//   lift(X --> Y) = P(X U Y) / (P(X) * P(Y))
//                 = (n(X U Y) / N) / ((n(X) / N) * (n(Y) / N))
// Here n(.) is the number of transactions containing the item(s), and N is the
// total number of transactions.
// The Support columns of "ruleconfsup" and "productsupport" are already divided
// by N, so they are combined directly. Dividing by N again would give
// Support(X U Y) * Support(Y) * N / Support(X), which grows with N and is not lift.
// Equivalently, lift(X --> Y) = Confidence(X --> Y) / Support(Y).
val queryString =
  """SELECT rs.Source, rs.Target, rs.Support, rs.Confidence,
    |       rs.Support / (ss.Support * ts.Support) AS Lift
    |FROM ruleconfsup AS rs
    |JOIN productsupport AS ss ON rs.Source = ss.Product
    |JOIN productsupport AS ts ON rs.Target = ts.Product""".stripMargin

In [22]:
// Execute the lift query and cache the resulting rule table.
val fullMBA = sqlContext
  .sql(queryString)
  .cache()

In [23]:
// Save the results as JSON. Each DataFrame is first repartitioned to one partition,
// so each output directory gets a single part file. Rules are written first,
// then products.
// NOTE(review): `save(path, source)` is the Spark 1.3-era API. Later versions use
// `write.format(source).save(path)` — confirm against the Spark version in use.
Seq(fullMBA -> "Rules", productSupport -> "Products").foreach {
  case (frame, path) => frame.repartition(1).save(path, "json")
}