In [1]:
from pyspark import RDD, SparkConf, SparkContext
import os
import numpy as np
import math

from tree import Tree, TreeNode
import threading

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import time

# Driver memory size passed on the pyspark-shell command line.
memory = '10g'
pyspark_submit_args = ' --driver-memory {} pyspark-shell'.format(memory)

# Both variables are exported before any SparkContext is created in this notebook.
# NOTE(review): PYTHONHASHSEED is presumably pinned so that Python hashing is
# consistent across Spark worker processes — confirm.
os.environ.update({
    "PYSPARK_SUBMIT_ARGS": pyspark_submit_args,
    "PYTHONHASHSEED": str(232),
})

In [2]:
def scanDB(path, seperation):
    """Read a transaction database from a delimited text file.

    Each non-blank line is one transaction: integer item ids separated by
    ``seperation``. The items of a transaction are sorted numerically and
    then converted back to strings.

    Args:
        path: Path of the text file to read.
        seperation: Delimiter between item ids on a line (e.g. ``" "``).

    Returns:
        A list of transactions, each a list of item-id strings in ascending
        numeric order. Blank lines produce no transaction.

    Raises:
        OSError: If the file cannot be opened.
        ValueError: If a token is not an integer.
    """
    db = []
    # `with` guarantees the handle is closed even if int() raises mid-file.
    with open(path, 'r') as f:
        for line in f:
            # Drop empty tokens so blank lines and doubled/leading separators
            # do not reach int('') and crash the whole scan.
            tokens = [tok for tok in line.strip().split(seperation) if tok.strip()]
            if not tokens:
                continue
            items = sorted(int(tok) for tok in tokens)
            db.append([str(item) for item in items])
    return db

In [3]:
def runFreno(transactions, minsup):
    """Build a fresh Tree from a collection of transactions.

    A new ``Tree`` is constructed with ``minsup`` and every transaction is
    inserted starting at the tree's root node.

    Args:
        transactions: Iterable of transactions to insert.
        minsup: Value forwarded to the ``Tree`` constructor
            (presumably the minimum-support threshold — see the tree module).

    Returns:
        The populated ``Tree`` instance.
    """
    built = Tree(minsup)
    for itemset in transactions:
        built.insert(built._root, itemset)
    return built
    
def concatFreno(transactions, tree):
    """Insert additional transactions into an existing tree.

    Args:
        transactions: Iterable of transactions to insert.
        tree: Object exposing ``insert(node, transaction)`` and ``_root``;
            it is updated in place.

    Returns:
        The same ``tree`` object that was passed in.
    """
    for itemset in transactions:
        # The root is looked up again for every insert, as the tree owns it.
        tree.insert(tree._root, itemset)
    return tree

In [4]:
def incFreno(incDBPath, minsup, sc, k, freqRange, res):
    """Fold one incremental database file into the existing per-bucket trees.

    Every transaction read from ``incDBPath`` is expanded into all of its
    suffixes; the suffixes are grouped by ``int(first_item) % k`` and each
    group is inserted into the tree of the matching bucket.

    Args:
        incDBPath: Path of the incremental transaction file (space separated).
        minsup: Unused here; kept so the signature parallels ``distFreno``.
        sc: SparkContext used to parallelize the suffixes.
        k: Number of buckets.
        freqRange: RDD of bucket ids; each id is used to index ``res``.
        res: Per-bucket trees from the previous step, indexed by bucket id.

    Returns:
        ``(freqRange, trees)`` where ``trees`` is the collected list of
        updated trees, in ``freqRange`` order.
    """
    deltaDBRaw = scanDB(incDBPath, " ")

    # Every suffix of every transaction, e.g. [a, b, c] -> [a,b,c], [b,c], [c].
    suffixes = [trx[i:] for trx in deltaDBRaw for i in range(len(trx))]

    grouped = (
        sc.parallelize(suffixes)
        .groupBy(lambda v: int(v[0]) % k)
        .map(lambda v: (v[0], list(v[1])))
        .collect()
    )
    # Look groups up by bucket key, not by list position: collect() order is
    # not guaranteed to follow the key, and buckets that received no suffix
    # are simply absent from the result.
    byBucket = dict(grouped)

    prevTrees = res
    updated = freqRange.map(
        lambda v: concatFreno(byBucket.get(v, []), prevTrees[v])
    ).collect()
    return freqRange, updated
    

In [5]:
def distFreno(inFile, minsup, sc, k, freqRange):
    """Build the initial per-bucket trees from a transaction file.

    Every transaction read from ``inFile`` is expanded into all of its
    suffixes; the suffixes are grouped by ``int(first_item) % k`` and one
    tree is built per bucket id in ``freqRange`` via ``runFreno``.

    Args:
        inFile: Path of the transaction file (space separated).
        minsup: Value forwarded to ``runFreno`` for every bucket.
        sc: SparkContext used to parallelize the suffixes.
        k: Number of buckets.
        freqRange: RDD of bucket ids to build trees for.

    Returns:
        ``(freqRange, trees)`` where ``trees`` is the collected list of
        trees, in ``freqRange`` order.
    """
    transDataRaw = scanDB(inFile, " ")

    # Every suffix of every transaction, e.g. [a, b, c] -> [a,b,c], [b,c], [c].
    suffixes = [trx[i:] for trx in transDataRaw for i in range(len(trx))]

    grouped = (
        sc.parallelize(suffixes)
        .groupBy(lambda v: int(v[0]) % k)
        .map(lambda v: (v[0], list(v[1])))
        .collect()
    )
    # Look groups up by bucket key, not by list position: collect() order is
    # not guaranteed to follow the key, and buckets that received no suffix
    # are simply absent from the result.
    byBucket = dict(grouped)

    print("number of partitions used: {}".format(sc.defaultParallelism))

    # phase 3: Freno from k-itemsets
    res = freqRange.map(lambda v: runFreno(byBucket.get(v, []), minsup)).collect()
    return freqRange, res

In [6]:
# Experiment grid: dataset names, relative supports, bucket count and
# increment interval sizes (the interval is part of the input directory name).
testFiles = ["retail"]
support = [0.4]
partition = 8
interval = [20000]

# File holding the initial database inside every increment directory.
baseName = "db_0.txt"


def _incOrder(name):
    """Sort key: numeric suffix of names like ``db_12.txt``; other names last."""
    stem = os.path.splitext(name)[0]
    suffix = stem.rsplit("_", 1)[-1]
    if suffix.isdigit():
        return (0, int(suffix), name)
    return (1, 0, name)


conf = SparkConf().setAppName("")
sc = SparkContext.getOrCreate(conf=conf)

# try/finally so the SparkContext is released even when a run fails.
try:
    spark = SparkSession(sc)
    schema = StructType([
        StructField("algorithm", StringType(), False),
        StructField("datasets", StringType(), False),
        StructField("support", FloatType(), False)
    ])
    for i in range(1):
        schema.add("test{}".format(i+1), FloatType(), True)
    #experiments = []

    for f in testFiles:
        for s in support:
            for i in interval:
                for t in range(1):
                    # Absolute support threshold is derived from the full dataset size.
                    transDataRaw = scanDB("./datasets/{}.txt".format(f), " ")
                    numTrans = len(transDataRaw)
                    minsup = s * numTrans

                    incDir = "./incdatasets/interval_{0}_{1}".format(f,i)
                    # os.listdir returns names in arbitrary order, so exclude the
                    # base file explicitly and replay increments in numeric order
                    # (db_2 before db_10) instead of relying on list position.
                    incNames = sorted(
                        (name for name in os.listdir(incDir) if name != baseName),
                        key=_incOrder,
                    )

                    freqRange = sc.parallelize(range(0, partition))
                    freqRange, res = distFreno(os.path.join(incDir, baseName), minsup, sc, partition, freqRange)

                    for incName in incNames:
                        freqRange, res = incFreno(os.path.join(incDir,incName), minsup, sc, partition, freqRange, res)
                    print(res)
finally:
    sc.stop()

number of partitions used: 8
[['40'], ['49'], [], [], [], [], [], []]
