In [1]:
# frozenset() 返回一个冻结的集合，冻结后集合不能再添加或删除任何元素。
import numpy as np
import pandas as pd

from Config import *


# 倒入设置参数和加载数据集函数
from Config import *



# 返回只有单个元素的候选集
def createC1(dataSet):
    '''
        构建初始候选项集的列表，即所有候选项集只包含一个元素，
        C1是大小为1的所有候选项集的集合
    '''
    C1 = []
    for transaction in dataSet:
        for item in transaction:
            if [item] not in C1:
                C1.append([item])
    C1.sort()
    # return map( frozenset, C1 )
    # return [var for var in map(frozenset,C1)]
    return [frozenset(var) for var in C1]


def scanD(D, Ck, minSupport):
    '''
        计算Ck中的项集在数据集合D(记录或者transactions)中的支持度,
        返回满足最小支持度的项集的集合，和所有项集支持度信息的字典。
    '''
    print(len(Ck))
    ssCnt = {}
    for tid in D:  # 对于每一条transaction
        for can in Ck:  # 对于每一个候选项集can，检查是否是transaction的一部分 # 即该候选can是否得到transaction的支持
            flag = True
            for i in can:
                if i not in tid:
                    flag = False
                    
            if flag:
                ssCnt[can] = ssCnt.get(can, 0) + 1
                
#             if can.issubset(tid):
#                 ssCnt[can] = ssCnt.get(can, 0) + 1
    numItems = float(len(D))
    # print("ssCnt is",ssCnt)
    retList = []
    supportData = {}
    for key in ssCnt:
        support = ssCnt[key] / numItems  # 每个项集的支持度
        if support >= minSupport:  # 将满足最小支持度的项集，加入retList
            retList.insert(0, key)
        supportData[key] = support  # 汇总支持度数据
    return retList, supportData


def aprioriGen(Lk, k):  # Aprior算法
    '''
        由初始候选项集的集合Lk生成新的生成候选项集，
        k表示生成的新项集中所含有的元素个数
        注意其生成的过程中，首选对每个项集按元素排序，然后每次比较两个项集，只有在k-1项相同时才将这两项合并。这样做是因为函数并非要两两合并各个集合，那样生成的集合并非都是k+1项的。在限制项数为k+1的前提下，只有在前k-1项相同、最后一项不相同的情况下合并才为所需要的新候选项集。
    '''
    retList = set()
    lenLk = len(Lk)
    for i in range(lenLk):
        for j in range(i + 1, lenLk):
            
            L1 = Lk[i]
            L2 = Lk[j]
            cnt =0
            for m in L1:
                if m in L2:
                    cnt+=1
            if cnt == k-2:
                retList.add(Lk[i] | Lk[j])
    return retList


def apriori(dataSet, minSupport=0.5):
    """
    该函数为Apriori算法的主函数，按照前述伪代码的逻辑执行。Ck表示项数为k的候选项集，最初的C1通过createC1()函数生成。Lk表示项数为k的频繁项集，supK为其支持度，Lk和supK由scanD()函数通过Ck计算而来。
    :param dataSet:
    :param minSupport:
    :return:
    """
    C1 = createC1(
        dataSet)  # 构建初始候选项集C1  [frozenset({1}), frozenset({2}), frozenset({3}), frozenset({4}), frozenset({5})]

    D = [set(var) for var in dataSet]  # 集合化数据集
    L1, suppData = scanD(D, C1, minSupport)  # 构建初始的频繁项集，即所有项集只有一个元素
    L = [L1]  # 最初的L1中的每个项集含有一个元素，新生成的
    # print()
    k = 2  # 项集应该含有2个元素，所以 k=2

    while (len(L[k - 2]) > 0):
        print("iter is ", k)
        t = time.time()
        Ck = aprioriGen(L[k - 2], k)
        print(f'gen coast:{time.time() - t:.8f}s')
        
        t = time.time()
        Lk, supK = scanD(D, Ck, minSupport) # 筛选最小支持度的频繁项集
        print(f'scan coast:{time.time() - t:.8f}s')
        # print("iter is ")
        # print(Ck)
        # print(Lk)
        # print()
        suppData.update(supK)  # 将新的项集的支持度数据加入原来的总支持度字典中
        L.append(Lk)  # 将符合最小支持度要求的项集加入L
        k += 1  # 新生成的项集中的元素个数应不断增加
    return L, suppData  # 返回所有满足条件的频繁项集的列表，和所有候选项集的支持度信息


def calcConf(freqSet, H, supportData, brl, minConf=0.7):  # 规则生成与评价
    '''
        计算规则的可信度，返回满足最小可信度的规则。
        freqSet(frozenset):频繁项集
        H(frozenset):频繁项集中所有的元素
        supportData(dic):频繁项集中所有元素的支持度
        brl(tuple):满足可信度条件的关联规则
        minConf(float):最小可信度
    '''
    prunedH = []
    for conseq in H:
        conf = supportData[freqSet] / supportData[freqSet - conseq]
        if conf >= minConf:
            print(freqSet - conseq, '-->', conseq, 'conf:', conf)
            brl.append((freqSet - conseq, conseq, conf))
            prunedH.append(conseq)
    return prunedH


def rulesFromConseq(freqSet, H, supportData, brl, minConf=0.7):
    '''
        对频繁项集中元素超过2的项集进行合并。
        freqSet(frozenset):频繁项集
        H(frozenset):频繁项集中的所有元素，即可以出现在规则右部的元素
        supportData(dict):所有项集的支持度信息
        brl(tuple):生成的规则
    '''
    m = len(H[0])
    if len(freqSet) > m + 1:  # 查看频繁项集是否足够大，以到于移除大小为 m的子集，否则继续生成m+1大小的频繁项集
        Hmp1 = aprioriGen(H, m + 1)
        Hmp1 = calcConf(freqSet, Hmp1, supportData, brl, minConf)  # 对于新生成的m+1大小的频繁项集，计算新生成的关联规则的右则的集合
        if len(Hmp1) > 1:  # 如果不止一条规则满足要求（新生成的关联规则的右则的集合的大小大于1），进一步递归合并，
            # 这样做的结果就是会有“[1|多]->多”(右边只会是“多”，因为合并的本质是频繁子项集变大，
            # 而calcConf函数的关联结果的右侧就是频繁子项集）的关联结果
            rulesFromConseq(freqSet, Hmp1, supportData, brl, minConf)


def generateRules(L, supportData, minConf=0.7):
    '''
        根据频繁项集和最小可信度生成规则。
        L(list):存储频繁项集
        supportData(dict):存储着所有项集（不仅仅是频繁项集）的支持度
        minConf(float):最小可信度
    '''
    bigRuleList = []
    for i in range(1, len(L)):
        for freqSet in L[i]:  # 对于每一个频繁项集的集合freqSet
            H1 = [frozenset([item]) for item in freqSet]
            if i > 1:  # 如果频繁项集中的元素个数大于2，需要进一步合并，这样做的结果就是会有“[1|多]->多”(右边只会是“多”，
                # 因为合并的本质是频繁子项集变大，而calcConf函数的关联结果的右侧就是频繁子项集），的关联结果
                rulesFromConseq(freqSet, H1, supportData, bigRuleList, minConf)
            else:
                calcConf(freqSet, H1, supportData, bigRuleList, minConf)

    sorted(bigRuleList)
    return bigRuleList




In [2]:
myDat = loadTest()  # 导入数据集

myDat.shape

  ans = np.array(ans)


(5000,)

In [3]:

import time
t = time.time()

L, suppData = apriori(myDat, para['minSupport'])  # 选择频繁项集
# print(u"频繁项集L：", suppData)
# print(u"所有候选项集的支持度信息：", suppData)
print(f'花费的时间为:{time.time() - t:.8f}s')

60
iter is  2
gen coast:0.00062609s
528
scan coast:0.01784515s
iter is  3
gen coast:0.02405310s
4204
scan coast:0.10617709s
iter is  4
gen coast:0.54108167s
20145
scan coast:0.72195506s
iter is  5
gen coast:8.97044992s
65366
scan coast:3.34417105s
iter is  6


KeyboardInterrupt: 

In [4]:
for i in L:
    print(len(i))

25
148
415
664
658
413
161
37
4
0


##  Apriori 多进程 算法

In [5]:
from Config import *
import ray
ray.init()

RayContext(dashboard_url='', python_version='3.7.13', ray_version='1.12.0', ray_commit='f18fc31c7562990955556899090f8e8656b48d2d', address_info={'node_ip_address': '127.0.0.1', 'raylet_ip_address': '127.0.0.1', 'redis_address': None, 'object_store_address': '/tmp/ray/session_2022-05-02_00-35-45_254522_4401/sockets/plasma_store', 'raylet_socket_name': '/tmp/ray/session_2022-05-02_00-35-45_254522_4401/sockets/raylet', 'webui_url': '', 'session_dir': '/tmp/ray/session_2022-05-02_00-35-45_254522_4401', 'metrics_export_port': 62852, 'gcs_address': '127.0.0.1:63342', 'address': '127.0.0.1:63342', 'node_id': '60993580a92ec439cb64cedc9c1226e77edeec723fd0b88ce7c40ddb'})

In [6]:
# frozenset() 返回一个冻结的集合，冻结后集合不能再添加或删除任何元素。
import numpy as np
import pandas as pd


# 返回只有单个元素的候选集
def createC1(dataSet):
    '''
        构建初始候选项集的列表，即所有候选项集只包含一个元素，
        C1是大小为1的所有候选项集的集合
    '''
    C1 = []
    for transaction in dataSet:
        for item in transaction:
            if [item] not in C1:
                C1.append([item])
    C1.sort()
    # return map( frozenset, C1 )
    # return [var for var in map(frozenset,C1)]
    return [frozenset(var) for var in C1]


def scanD(D, Ck, minSupport):
    '''
        计算Ck中的项集在数据集合D(记录或者transactions)中的支持度,
        返回满足最小支持度的项集的集合，和所有项集支持度信息的字典。
    '''
    print(len(Ck))
    ssCnt = {}
    for tid in D:  # 对于每一条transaction
        for can in Ck:  # 对于每一个候选项集can，检查是否是transaction的一部分 # 即该候选can是否得到transaction的支持
            flag = True
            for i in can:
                if i not in tid:
                    flag = False
                    
            if flag:
                ssCnt[can] = ssCnt.get(can, 0) + 1

    numItems = float(len(D))
    # print("ssCnt is",ssCnt)
    retList = []
    supportData = {}
    for key in ssCnt:
        support = ssCnt[key] / numItems  # 每个项集的支持度
        if support >= minSupport:  # 将满足最小支持度的项集，加入retList
            retList.insert(0, key)
        supportData[key] = support  # 汇总支持度数据
    return retList, supportData



import copy
myDat = loadTest()  # 导入数据集

Ds = [copy.deepcopy(myDat) for i in range(10)]
def total_scan(D, Ck, minSupport):
    global Ds
    # 分为10核
    Cs = []
    Ck =list(Ck)
    print(len(Ck))
    single_size = int(len(Ck)/10)
    for i in range(10):
        Cs.append(Ck[i* single_size : (i+1)*single_size])
    Cs.append(Ck[10 * single_size: -1])
    
    t = time.time()
    ans = [single_scan.remote(Ds[i], Cs[i], minSupport) for i in range(10)]
    
    
    ans = ray.get(ans)
    print(f'scan coast:{time.time() - t:.8f}s')

    tmp = {}
    for i in ans:
        for j in i:
            if i[j] > minSupport:
                tmp[j] = i[j]
#     print(tmp)
    ret_list = [i for i in tmp]
    return ret_list, tmp
    

@ray.remote
def single_scan(D, Ck, minS):
    r, c = scanD(D, Ck,minS)
    return c


def aprioriGen(Lk, k):  # Aprior算法
    '''
        由初始候选项集的集合Lk生成新的生成候选项集，
        k表示生成的新项集中所含有的元素个数
        注意其生成的过程中，首选对每个项集按元素排序，然后每次比较两个项集，只有在k-1项相同时才将这两项合并。这样做是因为函数并非要两两合并各个集合，那样生成的集合并非都是k+1项的。在限制项数为k+1的前提下，只有在前k-1项相同、最后一项不相同的情况下合并才为所需要的新候选项集。
    '''
    retList = set()
    lenLk = len(Lk)
    for i in range(lenLk):
        for j in range(i + 1, lenLk):
            
            L1 = Lk[i]
            L2 = Lk[j]
            cnt =0
            for m in L1:
                if m in L2:
                    cnt+=1
            if cnt == k-2:
                retList.add(Lk[i] | Lk[j])
    return retList


def apriori(dataSet, minSupport=0.5):
    """
    该函数为Apriori算法的主函数，按照前述伪代码的逻辑执行。Ck表示项数为k的候选项集，最初的C1通过createC1()函数生成。Lk表示项数为k的频繁项集，supK为其支持度，Lk和supK由scanD()函数通过Ck计算而来。
    :param dataSet:
    :param minSupport:
    :return:
    """
    C1 = createC1(
        dataSet)  # 构建初始候选项集C1  [frozenset({1}), frozenset({2}), frozenset({3}), frozenset({4}), frozenset({5})]

    D = [set(var) for var in dataSet]  # 集合化数据集
    L1, suppData = scanD(D, C1, minSupport)  # 构建初始的频繁项集，即所有项集只有一个元素
    L = [L1]  # 最初的L1中的每个项集含有一个元素，新生成的
    # print()
    k = 2  # 项集应该含有2个元素，所以 k=2

    while (len(L[k - 2]) > 0):
        print("iter is ", k)
        t = time.time()
        Ck = aprioriGen(L[k - 2], k)
        print(f'gen coast:{time.time() - t:.8f}s')
        
        Lk, supK = total_scan(D, Ck, minSupport) # 筛选最小支持度的频繁项集
        # print("iter is ")
        # print(Ck)
        # print(Lk)
        # print()
        suppData.update(supK)  # 将新的项集的支持度数据加入原来的总支持度字典中
        L.append(Lk)  # 将符合最小支持度要求的项集加入L
        k += 1  # 新生成的项集中的元素个数应不断增加
    return L, suppData  # 返回所有满足条件的频繁项集的列表，和所有候选项集的支持度信息


def calcConf(freqSet, H, supportData, brl, minConf=0.7):  # 规则生成与评价
    '''
        计算规则的可信度，返回满足最小可信度的规则。
        freqSet(frozenset):频繁项集
        H(frozenset):频繁项集中所有的元素
        supportData(dic):频繁项集中所有元素的支持度
        brl(tuple):满足可信度条件的关联规则
        minConf(float):最小可信度
    '''
    prunedH = []
    for conseq in H:
        conf = supportData[freqSet] / supportData[freqSet - conseq]
        if conf >= minConf:
            print(freqSet - conseq, '-->', conseq, 'conf:', conf)
            brl.append((freqSet - conseq, conseq, conf))
            prunedH.append(conseq)
    return prunedH


def rulesFromConseq(freqSet, H, supportData, brl, minConf=0.7):
    '''
        对频繁项集中元素超过2的项集进行合并。
        freqSet(frozenset):频繁项集
        H(frozenset):频繁项集中的所有元素，即可以出现在规则右部的元素
        supportData(dict):所有项集的支持度信息
        brl(tuple):生成的规则
    '''
    m = len(H[0])
    if len(freqSet) > m + 1:  # 查看频繁项集是否足够大，以到于移除大小为 m的子集，否则继续生成m+1大小的频繁项集
        Hmp1 = aprioriGen(H, m + 1)
        Hmp1 = calcConf(freqSet, Hmp1, supportData, brl, minConf)  # 对于新生成的m+1大小的频繁项集，计算新生成的关联规则的右则的集合
        if len(Hmp1) > 1:  # 如果不止一条规则满足要求（新生成的关联规则的右则的集合的大小大于1），进一步递归合并，
            # 这样做的结果就是会有“[1|多]->多”(右边只会是“多”，因为合并的本质是频繁子项集变大，
            # 而calcConf函数的关联结果的右侧就是频繁子项集）的关联结果
            rulesFromConseq(freqSet, Hmp1, supportData, brl, minConf)


def generateRules(L, supportData, minConf=0.7):
    '''
        根据频繁项集和最小可信度生成规则。
        L(list):存储频繁项集
        supportData(dict):存储着所有项集（不仅仅是频繁项集）的支持度
        minConf(float):最小可信度
    '''
    bigRuleList = []
    for i in range(1, len(L)):
        for freqSet in L[i]:  # 对于每一个频繁项集的集合freqSet
            H1 = [frozenset([item]) for item in freqSet]
            if i > 1:  # 如果频繁项集中的元素个数大于2，需要进一步合并，这样做的结果就是会有“[1|多]->多”(右边只会是“多”，
                # 因为合并的本质是频繁子项集变大，而calcConf函数的关联结果的右侧就是频繁子项集），的关联结果
                rulesFromConseq(freqSet, H1, supportData, bigRuleList, minConf)
            else:
                calcConf(freqSet, H1, supportData, bigRuleList, minConf)

    sorted(bigRuleList)
    return bigRuleList




In [7]:
myDat = loadTest()  # 导入数据集
myDat.shape

(8123, 20)

In [8]:

import time
t = time.time()

L, suppData = apriori(myDat, para['minSupport'])  # 选择频繁项集
# print(u"频繁项集L：", suppData)
# print(u"所有候选项集的支持度信息：", suppData)
print(f'花费的时间为:{time.time() - t:.8f}s')

102
iter is  2
gen coast:0.00034595s
300
[2m[36m(single_scan pid=4426)[0m 30
[2m[36m(single_scan pid=4424)[0m 30
[2m[36m(single_scan pid=4430)[0m 30
[2m[36m(single_scan pid=4431)[0m 30
[2m[36m(single_scan pid=4425)[0m 30
[2m[36m(single_scan pid=4429)[0m 30
[2m[36m(single_scan pid=4432)[0m 30
[2m[36m(single_scan pid=4427)[0m 30
[2m[36m(single_scan pid=4433)[0m 30
[2m[36m(single_scan pid=4428)[0m 30
scan coast:1.68275499s
iter is  3
gen coast:0.00255108s
1202
[2m[36m(single_scan pid=4426)[0m 120
[2m[36m(single_scan pid=4430)[0m 120
[2m[36m(single_scan pid=4431)[0m 120
[2m[36m(single_scan pid=4425)[0m 120
[2m[36m(single_scan pid=4432)[0m 120
[2m[36m(single_scan pid=4427)[0m 120
[2m[36m(single_scan pid=4433)[0m 120
[2m[36m(single_scan pid=4428)[0m 120
[2m[36m(single_scan pid=4424)[0m 120
[2m[36m(single_scan pid=4429)[0m 120
scan coast:9.77492213s
iter is  4
gen coast:0.02339005s
2456
[2m[36m(single_scan pid=4426)[0m 245
[2m[3

In [9]:
for i in L:
    print(len(i))

25
148
415
662
658
411
158
37
4
0
[2m[36m(single_scan pid=4426)[0m 0
[2m[36m(single_scan pid=4426)[0m 0
[2m[36m(single_scan pid=4431)[0m 0
[2m[36m(single_scan pid=4431)[0m 0
[2m[36m(single_scan pid=4431)[0m 0
[2m[36m(single_scan pid=4431)[0m 0
[2m[36m(single_scan pid=4431)[0m 0
[2m[36m(single_scan pid=4429)[0m 0
[2m[36m(single_scan pid=4429)[0m 0
[2m[36m(single_scan pid=4429)[0m 0


# 多进程实验结果

不难看出，实验正确性一致。

并且实验的性能得到了巨大提升。

我这里使用10个进程，总效率提升了5倍。单scan部分提升了接近10倍。