# README

In [1]:
# insert here your kaggle credentials
KAGGLE_USERNAME = ''
KAGGLE_KEY = ''

# if you want to visualize frequent itemsets
# N.B. execution time will increase a lot because a dataframe will be created
# with DUMP=False only the counts of frequent itemsets will be shown
DUMP = False
# if you want to save frequent itemsets on file
SAVE = False

# Setup

In [2]:
!pip install kaggle==1.5.12
!pip install pyspark==3.1.2

Collecting pyspark==3.1.2
  Downloading pyspark-3.1.2.tar.gz (212.4 MB)
[K     |████████████████████████████████| 212.4 MB 63 kB/s 
[?25hCollecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 22.9 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.1.2-py2.py3-none-any.whl size=212880768 sha256=e78030fc721ed7e7d54369d399907adb9911fee2fce5b300b390206a148fa329
  Stored in directory: /root/.cache/pip/wheels/a5/0a/c1/9561f6fecb759579a7d863dcd846daaa95f598744e71b02c77
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.2


In [3]:
import csv
import logging
import os
import psutil
import random
import shutil

from collections import defaultdict
from datetime import datetime
from itertools import combinations
from pathlib import Path
from pyspark import AccumulatorParam, RDD, Broadcast, Accumulator
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql import Window
from types import FunctionType
from typing import Any, Union, Callable, Dict, Optional, Iterator, Iterable, List, Tuple, Set

In [4]:
def delete_path(path: Union[str, Path]) -> None:
    """
    Remove folder and all subfolders.
    
    :param path: path to be removed
    """
    shutil.rmtree(path, ignore_errors=True)


def get_path(base_path: Union[str, Path], *args: Any, create: bool = False, delete: bool = False) -> Path:
    """
    Join path with default separator. Optionally it can create and/or clean the specified path if it is a folder.

    :param base_path: base path to use to new path
    :param args: eventually subfolders
    :param create: create the folder if not exists
    :param delete: clear folder if it already exists
    :return: the required path
    """
    path = Path(base_path, *[str(arg) for arg in args if arg])
    if delete:
        delete_path(path)
    if create:
        os.makedirs(path, exist_ok=True)
    return path


def get_filename(path: Union[str, Path]) -> str:
    """
    Get filename or last folder from a path.

    :param path: path of a file
    :return: the filename or last folder
    """
    return Path(path).stem


def set_env_variables(**kwargs: Any) -> None:
    """
    Set environment variables.

    :param kwargs: dictionary of key-value environment variables
    """
    for key, value in kwargs.items():
        os.environ[key] = str(value)


def is_empty(path: Union[str, Path]) -> bool:
    """
    Check if a folder is empty.

    :param path: path of a folder
    :return: true if path does not exists of if it has no files
    """
    path = Path(path)

    return not (path.exists() and bool(os.listdir(path)))


def timer(func: FunctionType) -> Callable:
    """
    Calculating execution time for a specific function.
    
    :param func: function to be executed
    :return: wrapper for the function
    """
    def wrapper(*args, **kwargs) -> Any:
        start = datetime.now()
        result = func(*args, **kwargs)
        end = datetime.now()
        print(f'Execution time for {func.__name__ }: {end - start}')

        return result

    return wrapper


def memory_used(func: FunctionType) -> Callable:
    """
    Calculating memory usage for a specific function.
    
    :param func: function to be executed
    :return: wrapper for the function
    """
    def wrapper(*args, **kwargs) -> Any:
        start_mem = psutil.virtual_memory().used
        result = func(*args, **kwargs)
        end_mem = psutil.virtual_memory().used
        print(f'Memory used for {func.__name__ }: {end_mem - start_mem}')
        
        return result
    
    return wrapper


def read_csvfile(path: Union[str, Path]) -> Iterator[Tuple[str, str]]:
    """
    Read csv file as list of fields skipping header.

    :param path: path to csv file
    :param header: if header is present
    :return:
    """
    logging.info(f'Reading csv path {path}')
    with open(path) as file:
        reader = csv.reader(file)
        for row in reader:
            yield row


def save_csvfile(data: Iterator, path: Union[str, Path]) -> None:
    """
    Save an iterator as csv file.

    :param data: dict to save to file
    :param path: path where save csv file
    :return:
    """
    with open(path, 'w') as file:
        writer = csv.writer(file)
        writer.writerows(data)

In [5]:
def get_spark() -> SparkSession:
    """
    Create a SparkSession with predefined values.

    :return: a SparkSession object
    """
    return (SparkSession.builder
            .appName('amd')
            .getOrCreate())
    

def check_empty(df: DataFrame) -> bool:
    """
    Check if a dataframe is empty.

    :param df: dataframe to be checked
    :return: true if dataframe has 0 rows, false otherwise
    """
    return df.take(1).count == 0


def read_csv_rdd(path: Union[Path, str], sep: str = ',') -> Optional[RDD]:
    """
    Read a csv file and put it on a RDD. It returns None if no such path exists.

    :param path: path to csv file
    :param sep: separator used in csv file
    :return: dataframe from selected path
    """
    logging.info(f'Reading csv path {path} with sep:{sep}')

    path = Path(path)
    if path.exists():
        return (get_spark()
                .sparkContext
                .textFile(str(path))
                .map(lambda row: row.split(sep))
                )

    return None


def read_csv_df(path: Union[Path, str], header: bool = True, sep: str = ',') -> Optional[DataFrame]:
    """
    Read a csv file and put it on a DataFrame. It will escape quote to allow reading multiple lines column.
    It returns None if no such path exists.

    :param path: path to csv file
    :param header: if file has a first row with column names
    :param sep: separator used in csv file
    :return: dataframe from selected path
    """
    logging.info(f'Reading csv path {path} with header:{header} and sep:{sep}')

    path = Path(path)
    if path.exists():
        return (get_spark()
                .read
                .option('header', header)
                .option('multiLine', True)
                .option("escape", "\"")
                .option('sep', sep)
                .csv(str(path)))

    return None


@timer
def save_csv_df(df: DataFrame, path: Union[Path, str], header: bool = True) -> None:
    """
    Save a DataFrame in a csv file.
    
    :param df: dataframe to be saved
    :param path: path where to save the file
    :param header: if csv file will have an header
    :return: 
    """
    logging.info(f'Saving csv path {path}')
    
    df.write.csv(str(path), header=header)


def read_parquet(path: Union[Path, str]) -> Optional[DataFrame]:
    """
    Read a parquet file and put in a DataFrame. It returns None if no such path exists.
    
    :param path: path to parquet file
    :return: dataframe from selected path
    """
    logging.info(f'Reading parquet path {path}')

    path = Path(path)
    if path.exists():
        return get_spark().read.parquet(str(path))

    return None


@timer
def save_parquet(df: DataFrame, path: Union[Path, str]) -> None:
    """
    Save a DataFrame in a parquet file.
    
    :param df: dataframe to be saved
    :param path: path where to save the file
    :return: 
    """
    logging.info(f'Saving parquet path {path}')
    
    df.write.parquet(str(path))



class DictParam(AccumulatorParam):
    """
    It allows to have a dict accumulator.
    """
    def zero(self, value: Dict = None) -> Dict:
        return {}

    def addInPlace(self, value1: Dict, value2: Dict) -> Dict:
        value1.update(value2)

        return value1


# Configuration

In [6]:
# directories
ROOT_DIR = Path('.').parent.absolute()
DATASETS = get_path(ROOT_DIR, 'datasets', create=True)
RESULTS = get_path(ROOT_DIR, 'results', create=True)

In [7]:
# Kaggle
set_env_variables(KAGGLE_USERNAME=KAGGLE_USERNAME, KAGGLE_KEY=KAGGLE_KEY)

In [8]:
# dataset
DATASET = 'ashirwadsangwan/imdb-dataset'
APRIORI_THRESHOLD = 30
SON_CHUNKS = 5
TOIVONEN_MAX_ITERATIONS = 5
TOIVONEN_SIZE_SAMPLE = 50000
TOIVONEN_THRESHOLD_ADJUST = 0.8
DATASET_PATH = get_path(DATASETS, DATASET)
RAW_PATH = get_path(DATASET_PATH, 'raw')

In [9]:
# loggging
LOG_FORMAT = '[%(levelname)s %(asctime)-15s]:%(name)s: %(message)s'
LOG_LEVEL = logging.INFO

logging.basicConfig(format=LOG_FORMAT, level=LOG_LEVEL)

# Download dataset from Kaggle


In [10]:
@timer
def download_dataset(name: str, force: bool = False) -> None:
    """
    Download the specified dataset if not already download.
@timer
    :param name: name of the dataset
    :param force: force downlaod if dataset already exists
    """
    path = get_path(DATASETS, name, create=True, delete=force)
    if not is_empty(path):
        logging.info(f'Dataset {name} already downloaded!')
        return

    # require authentication before importing
    from kaggle import api

    api.dataset_download_cli(dataset=name, path=path, unzip=True)
    logging.info(f'Dataset {name} downloaded!')


In [11]:
download_dataset(DATASET)

  0%|          | 0.00/1.44G [00:00<?, ?B/s]

Downloading imdb-dataset.zip to /content/datasets/ashirwadsangwan/imdb-dataset


100%|██████████| 1.44G/1.44G [00:14<00:00, 108MB/s] 





[INFO 2021-07-20 11:03:19,796]:root: Dataset ashirwadsangwan/imdb-dataset downloaded!


Execution time for download_dataset: 0:02:09.299070


# Data extraction and preprocessing

In [12]:
@timer
def extract_data(force: bool = False) -> DataFrame:
    """
    Extract dataframe using movies, actors and names.

    :return: a dataframe
    """
    path = get_path(RAW_PATH, delete=force)
    raw_parquet = get_path(path, 'parquet')
    raw_csv = get_path(path, 'csv')

    if not is_empty(raw_parquet):
        logging.info('Reading already extracted data')

        return read_parquet(raw_parquet)

    if not is_empty(raw_csv):
        logging.info('Reading already extracted data')

        return read_csv_df(raw_csv)

    logging.info('Extracing data from movies, actors and names')

    movies = read_csv_df(get_path(DATASET_PATH, 'title.basics.tsv.gz'), sep='\t').filter('titleType = "movie"')
    actors = (read_csv_df(get_path(DATASET_PATH, 'title.principals.tsv.gz'), sep='\t')
              .filter('category IN ("actor", "actress")'))
    names = read_csv_df(get_path(DATASET_PATH, 'name.basics.tsv.gz'), sep='\t')

    df = (actors
          .join(movies, on='tconst')
          .join(names, on='nconst', how='left')
          .select('tconst', 'nconst', 'primaryName')
          .withColumnRenamed('tconst', 'movie')
          .withColumnRenamed('nconst', 'actor1')
          .withColumnRenamed('primaryName', 'actor1Name')
          ).persist()
          
    save_parquet(df, raw_parquet)
    save_csv_df(df.select('movie', 'actor1')
               .groupBy('movie').agg(F.collect_list('actor1').alias('actors'))
               .withColumn('actors', F.concat_ws('|', 'actors'))
               .coalesce(1), raw_csv, header=False)

    return df

In [13]:
dataframe = extract_data()

[INFO 2021-07-20 11:03:19,836]:root: Extracing data from movies, actors and names
[INFO 2021-07-20 11:03:19,838]:root: Reading csv path /content/datasets/ashirwadsangwan/imdb-dataset/title.basics.tsv.gz with header:True and sep:	
[INFO 2021-07-20 11:03:34,819]:root: Reading csv path /content/datasets/ashirwadsangwan/imdb-dataset/title.principals.tsv.gz with header:True and sep:	
[INFO 2021-07-20 11:03:35,113]:root: Reading csv path /content/datasets/ashirwadsangwan/imdb-dataset/name.basics.tsv.gz with header:True and sep:	
[INFO 2021-07-20 11:03:36,414]:root: Saving parquet path /content/datasets/ashirwadsangwan/imdb-dataset/raw/parquet
[INFO 2021-07-20 11:05:27,505]:root: Saving csv path /content/datasets/ashirwadsangwan/imdb-dataset/raw/csv


Execution time for save_parquet: 0:01:50.949921
Execution time for save_csv_df: 0:00:22.556925
Execution time for extract_data: 0:02:30.230504


In [14]:
dataframe.show()

+---------+---------+--------------+
|    movie|   actor1|    actor1Name|
+---------+---------+--------------+
|tt0083109|nm0000086|Louis de Funès|
|tt0062120|nm0000086|Louis de Funès|
|tt0057422|nm0000086|Louis de Funès|
|tt0058089|nm0000086|Louis de Funès|
|tt0067274|nm0000086|Louis de Funès|
|tt0079200|nm0000086|Louis de Funès|
|tt0048994|nm0000086|Louis de Funès|
|tt8768374|nm0000086|Louis de Funès|
|tt0066423|nm0000086|Louis de Funès|
|tt0063674|nm0000086|Louis de Funès|
|tt0056906|nm0000086|Louis de Funès|
|tt0078528|nm0000086|Louis de Funès|
|tt0046453|nm0000086|Louis de Funès|
|tt0061728|nm0000086|Louis de Funès|
|tt0057051|nm0000086|Louis de Funès|
|tt0069747|nm0000086|Louis de Funès|
|tt0054394|nm0000086|Louis de Funès|
|tt0059168|nm0000086|Louis de Funès|
|tt0057414|nm0000086|Louis de Funès|
|tt0052686|nm0000086|Louis de Funès|
+---------+---------+--------------+
only showing top 20 rows



# Data analysis

In [15]:
Itemset = Tuple[int, ...]
CandidateFrequentItemsets = Dict[Itemset, int]
FrequentItemsets = CandidateFrequentItemsets
BitMap = Dict[int, bool]
Transaction = Tuple[str, str]


class State:
    """
    State of an algorithm.
    At least k (size of the itemset), lk (frequent itemset) and force (if it is needed to recalculate) are present.
    """
    def __init__(self, **state):
        self.state = state

    def __setitem__(self, key: str, value: Any) -> None:
        self.state[key] = value

    def __getitem__(self, item: str) -> Any:
        return self.state[item]

    def __add__(self, other: 'State') -> 'State':
        state = {**self.state}
        state.update(other.state)

        return State(**state)

    def update(self, other: 'State') -> None:
        self.state.update(other.state)


Algorithm = Iterator[State]

In [16]:
@timer
def dump_frequent_itemsets_stats(df: DataFrame, n: int) -> None:
    """
    Dump some useful information about frequent itemsets.
    
    :param df: dataframe which contains frequent itemsets
    :param n: lenght of frequent itemset
    :return: 
    """
    logging.info('Calculating frequent itemsets for dataframe')
    columns = [column.format(i) for i in range(1, n + 1) for column in ['actor{}', 'actor{}Name']]
    # distinct to remove duplicate from eliminating movie column
    df = df.select('support', *columns).distinct().orderBy(F.col('support').desc())
    df.show()
    stats = df.select(
        F.count('*').alias('count'),
        F.avg('support').alias('avg'),
        F.max('support').alias('max'),
        F.min('support').alias('min')
    )
    stats.show()


def create_temp_df(lk: CandidateFrequentItemsets, names: DataFrame) -> DataFrame:
    """
    Create a dataframe starting from frequent itemsets.

    :param lk: frequent itemsets with count
    :param names: names of the actors
    :return: dataframe for lk
    """
    df = get_spark().createDataFrame(lk.items(), ['actors', 'support'])
    df = df.select('actors.*', 'support')
    columns = ['support']

    for column in df.columns:
        if column.startswith('_'):
            number = column[1:]
            new_column = f'actor{number}'
            name_column = f'{new_column}Name'
            df = (df
                  .withColumn(column, F.concat(F.lit('nm'), F.lpad(column, 7, '0')))
                  .withColumnRenamed(column, new_column)
                  )
            df = (df
                  .join(names, names['nconst'] == F.col(new_column), how='left')
                  .withColumnRenamed('PrimaryName', name_column)
                  .select(*df.columns, new_column, name_column)
                  )

            columns.append(new_column)
            columns.append(name_column)

    return df.select(*columns)


def find_csv(path: Union[str, Path]) -> Path:
    """
    Find the csv file created by spark.

    :param path: path where to search the csv file
    :return: the correct path to csv file
    """
    for f in os.listdir(path):
        if f.endswith('.csv'):
            return get_path(path, f)


def read_frequent_itemsets(path: Union[str, Path]) -> FrequentItemsets:
    """
    Read csv file as frequent itemsets.

    :param path: path where frequent itemsets are stored
    :return: frequent itemsets in the csv file
    """
    path = get_path(path, 'results.csv')

    return {tuple([int(item) for item in row[0].split('|')]): int(row[1]) for row in read_csvfile(path)}


def save_frequent_itemsets(itemsets: FrequentItemsets, path: Union[str, Path]) -> None:
    """
    Save frequent itemsets as csv file where keys are the first column and values the second column.

    :param itemsets:
    :param path:
    :return:
    """
    get_path(path, create=True)
    save_csvfile((('|'.join([str(item) for item in key]), value) for key, value in itemsets.items()),
                 get_path(path, 'results.csv'))

In [17]:
file = find_csv(get_path(RAW_PATH, 'csv'))
file

PosixPath('/content/datasets/ashirwadsangwan/imdb-dataset/raw/csv/part-00000-52e9e128-7801-425a-aba9-349daecf1e8d-c000.csv')

## Apriori

In [18]:
@timer
def get_ck(df: DataFrame, *cols: str) -> DataFrame:
    """
    Extract candidate sets of lenght + 1 with respect to existing itemsets.

    :param df: dataframe which contains itemsets
    :param cols: columns needed to create new candidate itemsets
    :return: a dataframe which contains candidate itemsets
    """

    size = len(cols)

    # singleton
    if not size:
        return df

    column = cols[-1]
    next_column = f'actor{size + 1}'

    small_df = (df.select('movie', f'{column}Name', *cols)
                .withColumnRenamed(column, next_column)
                .withColumnRenamed(f'{column}Name', f'{next_column}Name'))

    # join on movie and all actors needed
    join_cond = ['movie'] + list(cols)[:-1]
    return df.join(small_df, on=join_cond).filter(f'{column} < {next_column}').persist()


@timer
def get_lk(state: State) -> DataFrame:
    """
    Extract frequent itemsets from candidate itemsets.

    :param state: state of the algorithm
    """
    df = state['df']
    threshold = state['threshold']
    size = state['k']
    cols = [f'actor{i}' for i in range(1, size + 1)]
    force = state['force']

    path = get_path(RESULTS, f'apriori_{threshold}_{size}', 'parquet', 'apriori', delete=force)

    if not is_empty(path):
        logging.info('Reading already extracted data')

        return read_parquet(path)

    logging.info(f'Executing apriori algorithm with {size} items and {threshold} as threshold')

    actors_movies = Window.partitionBy(*cols)

    df = get_ck(df, *cols[:-1])

    df_with_count = df.withColumn('support', F.count('*').over(actors_movies))
    df = df_with_count.filter(f'support >= {threshold}').persist()
    # dataframe has movie column so without distinct there are duplicates
    logging.info(f'Found {df.select(*cols).distinct().count()} frequent itemsets')

    if state['save']:
        save_parquet(df, path)

    return df


def apriori_algorithm(state: State) -> Algorithm:
    """
    Executing apriori algorithm starting from data and a given threshold.

    :param state: state of the algorithm:
        - df: dataframe which contains candidate itemsets
        - threshold: threshold for the apriori algorithm
        - force: to force recalculating frequent itemsets
    :return: dataframe with frequent itemsets
    """
    state = State(k=1, force=False, save=SAVE) + state
    while not check_empty(state['df']):
        state['df'] = get_lk(state)
        state['k'] += 1

        yield state

In [19]:
algorithm = apriori_algorithm(State(df=dataframe, threshold=APRIORI_THRESHOLD))

singleton = next(algorithm)['df']
doubleton = next(algorithm)['df']
triple = next(algorithm)['df']
quadruple = next(algorithm)['df']
quintuple = next(algorithm)['df']

[INFO 2021-07-20 11:05:50,810]:root: Executing apriori algorithm with 1 items and 30 as threshold


Execution time for get_ck: 0:00:00.000014


[INFO 2021-07-20 11:05:56,994]:root: Found 7453 frequent itemsets
[INFO 2021-07-20 11:05:57,129]:root: Executing apriori algorithm with 2 items and 30 as threshold


Execution time for get_lk: 0:00:06.190069
Execution time for get_ck: 0:00:00.173500


[INFO 2021-07-20 11:06:28,806]:root: Found 373 frequent itemsets
[INFO 2021-07-20 11:06:28,973]:root: Executing apriori algorithm with 3 items and 30 as threshold


Execution time for get_lk: 0:00:31.682864
Execution time for get_ck: 0:00:00.294696


[INFO 2021-07-20 11:06:34,865]:root: Found 90 frequent itemsets


Execution time for get_lk: 0:00:05.900464


[INFO 2021-07-20 11:06:35,133]:root: Executing apriori algorithm with 4 items and 30 as threshold


Execution time for get_ck: 0:00:00.311984


[INFO 2021-07-20 11:06:40,858]:root: Found 34 frequent itemsets


Execution time for get_lk: 0:00:05.738673


[INFO 2021-07-20 11:06:41,248]:root: Executing apriori algorithm with 5 items and 30 as threshold


Execution time for get_ck: 0:00:00.491614


[INFO 2021-07-20 11:06:47,607]:root: Found 5 frequent itemsets


Execution time for get_lk: 0:00:06.369266


In [20]:
if DUMP:
    dump_frequent_itemsets_stats(singleton, 1)
    dump_frequent_itemsets_stats(doubleton, 2)
    dump_frequent_itemsets_stats(triple, 3)
    dump_frequent_itemsets_stats(quadruple, 4)
    dump_frequent_itemsets_stats(quintuple, 5)

[INFO 2021-07-20 11:06:47,641]:root: Calculating frequent itemsets for dataframe


+-------+---------+------------------+
|support|   actor1|        actor1Name|
+-------+---------+------------------+
|    797|nm0103977|      Brahmanandam|
|    585|nm0006982|       Adoor Bhasi|
|    565|nm0648803|  Matsunosuke Onoe|
|    506|nm0305182|      Eddie Garcia|
|    436|nm0623427|        Prem Nazir|
|    411|nm0793813|      Sung-il Shin|
|    391|nm0246703|      Paquito Diaz|
|    387|nm0619107|  Masayoshi Nogami|
|    380|nm0007123|         Mammootty|
|    356|nm7390393|    Aachi Manorama|
|    348|nm0046850|           Bahadur|
|    344|nm0482320|          Mohanlal|
|    330|nm0149822|Mithun Chakraborty|
|    323|nm0304262|   Shivaji Ganesan|
|    315|nm0706691|       Sultan Rahi|
|    313|nm0619309|            Nagesh|
|    311|nm0007106|     Shakti Kapoor|
|    303|nm0001000|         Tom Byron|
|    303|nm0419653|       Jayabharati|
|    303|nm0659250|       Pandharibai|
+-------+---------+------------------+
only showing top 20 rows



[INFO 2021-07-20 11:06:52,120]:root: Calculating frequent itemsets for dataframe


+-----+-----------------+---+---+
|count|              avg|max|min|
+-----+-----------------+---+---+
| 7453|57.35167046826781|797| 30|
+-----+-----------------+---+---+

Execution time for dump_frequent_itemsets_stats: 0:00:04.478142
+-------+---------+--------------------+---------+--------------------+
|support|   actor1|          actor1Name|   actor2|          actor2Name|
+-------+---------+--------------------+---------+--------------------+
|    236|nm0006982|         Adoor Bhasi|nm0623427|          Prem Nazir|
|    169|nm0006982|         Adoor Bhasi|nm0046850|             Bahadur|
|    162|nm0006982|         Adoor Bhasi|nm0419653|         Jayabharati|
|    147|nm0648803|    Matsunosuke Onoe|nm2082516|        Kijaku Ôtani|
|    126|nm0648803|    Matsunosuke Onoe|nm2373718|    Kitsuraku Arashi|
|    122|nm0006982|         Adoor Bhasi|nm0619779|Thikkurisi Sukuma...|
|    113|nm0648803|    Matsunosuke Onoe|nm2077739|   Suminojo Ichikawa|
|    113|nm2082516|        Kijaku Ôtani|nm237

[INFO 2021-07-20 11:06:56,524]:root: Calculating frequent itemsets for dataframe


+-----+------------------+---+---+
|count|               avg|max|min|
+-----+------------------+---+---+
|  373|45.501340482573724|236| 30|
+-----+------------------+---+---+

Execution time for dump_frequent_itemsets_stats: 0:00:04.404519
+-------+---------+-------------------+---------+--------------------+---------+------------------+
|support|   actor1|         actor1Name|   actor2|          actor2Name|   actor3|        actor3Name|
+-------+---------+-------------------+---------+--------------------+---------+------------------+
|    112|nm0648803|   Matsunosuke Onoe|nm2082516|        Kijaku Ôtani|nm2373718|  Kitsuraku Arashi|
|    100|nm0648803|   Matsunosuke Onoe|nm2077739|   Suminojo Ichikawa|nm2082516|      Kijaku Ôtani|
|     95|nm0648803|   Matsunosuke Onoe|nm2077739|   Suminojo Ichikawa|nm2373718|  Kitsuraku Arashi|
|     87|nm2077739|  Suminojo Ichikawa|nm2082516|        Kijaku Ôtani|nm2373718|  Kitsuraku Arashi|
|     80|nm0648803|   Matsunosuke Onoe|nm1770187| Sen'nosuke

[INFO 2021-07-20 11:07:00,797]:root: Calculating frequent itemsets for dataframe


+-----+-----------------+---+---+
|count|              avg|max|min|
+-----+-----------------+---+---+
|   90|45.44444444444444|112| 30|
+-----+-----------------+---+---+

Execution time for dump_frequent_itemsets_stats: 0:00:04.272377
+-------+---------+-------------------+---------+-------------------+---------+-----------------+---------+------------------+
|support|   actor1|         actor1Name|   actor2|         actor2Name|   actor3|       actor3Name|   actor4|        actor4Name|
+-------+---------+-------------------+---------+-------------------+---------+-----------------+---------+------------------+
|     86|nm0648803|   Matsunosuke Onoe|nm2077739|  Suminojo Ichikawa|nm2082516|     Kijaku Ôtani|nm2373718|  Kitsuraku Arashi|
|     62|nm0648803|   Matsunosuke Onoe|nm1770187|Sen'nosuke Nakamura|nm2082516|     Kijaku Ôtani|nm2373718|  Kitsuraku Arashi|
|     54|nm0648803|   Matsunosuke Onoe|nm1770187|Sen'nosuke Nakamura|nm2077739|Suminojo Ichikawa|nm2082516|      Kijaku Ôtani|
|  

[INFO 2021-07-20 11:07:05,587]:root: Calculating frequent itemsets for dataframe


+-----+------------------+---+---+
|count|               avg|max|min|
+-----+------------------+---+---+
|   34|39.705882352941174| 86| 30|
+-----+------------------+---+---+

Execution time for dump_frequent_itemsets_stats: 0:00:04.790236
+-------+---------+----------------+---------+-------------------+---------+-----------------+---------+----------------+---------+------------------+
|support|   actor1|      actor1Name|   actor2|         actor2Name|   actor3|       actor3Name|   actor4|      actor4Name|   actor5|        actor5Name|
+-------+---------+----------------+---------+-------------------+---------+-----------------+---------+----------------+---------+------------------+
|     44|nm0648803|Matsunosuke Onoe|nm1770187|Sen'nosuke Nakamura|nm2077739|Suminojo Ichikawa|nm2082516|    Kijaku Ôtani|nm2373718|  Kitsuraku Arashi|
|     42|nm0648803|Matsunosuke Onoe|nm2077739|  Suminojo Ichikawa|nm2082516|     Kijaku Ôtani|nm2373151|  Chosei Kataoka|nm2373718|  Kitsuraku Arashi|
|    

## Apriori in memory

In [21]:
with open(file) as csvfile:
  print(next(csvfile))
  print(next(csvfile))

tt0002591,nm0509573|nm0029806

tt0003689,nm0585503|nm0694718|nm0101071|nm0910564|nm0527801|nm0399988|nm0728289



In [22]:
@timer
def get_ck_v2(transactions: Iterator[Transaction], k: int,
           monotonicity_filter: Callable[[Itemset], int]) -> CandidateFrequentItemsets:
    """
    Scan the file and extract candidate frequent itemsets checking the monotonicity condition.

    :param transactions: iterator of transactions
    :param k: size of the itemset
    :param monotonicity_filter: filter for the monotonicity condition
    :return: candidate frequent itemsets
    """
    accumulator = defaultdict(int)
    for row in transactions:
        raw_actors = [int(actor[2:]) for actor in row[1].split('|')]
        for comb in combinations(sorted(raw_actors), k):
            if monotonicity_filter(comb):
                accumulator[comb] += 1
    return accumulator


@timer
def get_lk_v2(transactions: Iterator[Transaction], state: State) -> FrequentItemsets:
    """
    Extract frequent itemsets checking the support.

    :param transactions: iterator of transactions
    :param state: state of the algorithm
    :return: frequent itemsets
    """
    size = state['k']
    old_lk = state['lk']
    threshold = state['threshold']
    force = state['force']

    path = get_path(RESULTS, f'apriori_{threshold}_{size}', 'csv', 'apriori', delete=force)

    if not is_empty(path):
        logging.info('Reading already extracted data')

        return read_frequent_itemsets(path)

    logging.info(f'Executing apriori algorithm with {size} items and {threshold} as threshold')

    if size == 1:
        ck = get_ck_v2(transactions, size, lambda _: True)
    else:
        ck = get_ck_v2(transactions, size,
                    lambda itemset: all([item in old_lk for item in combinations(itemset, size - 1)]))

    lk = {item: support
          for item, support in ck.items()
          if support >= threshold}
    logging.info(f'Found {len(lk)} frequent itemsets')

    if state['save']:
        save_frequent_itemsets(lk, path)

    return lk


def apriori_algorithm_v2(transactions: Callable[[], Iterator[Transaction]], state: State) -> Algorithm:
    """
    Executing apriori algorithm starting from data and a given threshold.

    :param transactions: callable to an iterator of transactions
    :param state: state of the algorithm:
        - threshold: threshold for the apriori algorithm
        - k: size of the itemsets
        - lk: frequent itemsets with size k-1
        - force: to force recalculating frequent itemsets
    :return: dict of frequent itemsets
    """
    state = State(k=1, lk={}, force=False, save=SAVE) + state

    while state['k'] == 1 or state['lk']:
        state['lk'] = get_lk_v2(transactions(), state)
        state['k'] += 1

        yield state

In [23]:
algorithm_v2 = apriori_algorithm_v2(lambda: read_csvfile(file), State(threshold=APRIORI_THRESHOLD))

singleton_v2 = next(algorithm_v2)['lk']
doubleton_v2 = next(algorithm_v2)['lk']
triple_v2 = next(algorithm_v2)['lk']
quadruple_v2 = next(algorithm_v2)['lk']
quintuple_v2 = next(algorithm_v2)['lk']

[INFO 2021-07-20 11:07:11,007]:root: Executing apriori algorithm with 1 items and 30 as threshold
[INFO 2021-07-20 11:07:11,010]:root: Reading csv path /content/datasets/ashirwadsangwan/imdb-dataset/raw/csv/part-00000-52e9e128-7801-425a-aba9-349daecf1e8d-c000.csv
[INFO 2021-07-20 11:07:13,843]:root: Found 7453 frequent itemsets
[INFO 2021-07-20 11:07:13,867]:root: Executing apriori algorithm with 2 items and 30 as threshold
[INFO 2021-07-20 11:07:13,870]:root: Reading csv path /content/datasets/ashirwadsangwan/imdb-dataset/raw/csv/part-00000-52e9e128-7801-425a-aba9-349daecf1e8d-c000.csv


Execution time for get_ck_v2: 0:00:02.794869
Execution time for get_lk_v2: 0:00:02.859134


[INFO 2021-07-20 11:07:18,751]:root: Found 373 frequent itemsets
[INFO 2021-07-20 11:07:18,764]:root: Executing apriori algorithm with 3 items and 30 as threshold
[INFO 2021-07-20 11:07:18,767]:root: Reading csv path /content/datasets/ashirwadsangwan/imdb-dataset/raw/csv/part-00000-52e9e128-7801-425a-aba9-349daecf1e8d-c000.csv


Execution time for get_ck_v2: 0:00:04.868524
Execution time for get_lk_v2: 0:00:04.896488


[INFO 2021-07-20 11:07:24,127]:root: Found 90 frequent itemsets
[INFO 2021-07-20 11:07:24,128]:root: Executing apriori algorithm with 4 items and 30 as threshold
[INFO 2021-07-20 11:07:24,136]:root: Reading csv path /content/datasets/ashirwadsangwan/imdb-dataset/raw/csv/part-00000-52e9e128-7801-425a-aba9-349daecf1e8d-c000.csv


Execution time for get_ck_v2: 0:00:05.359565
Execution time for get_lk_v2: 0:00:05.364041


[INFO 2021-07-20 11:07:29,758]:root: Found 34 frequent itemsets
[INFO 2021-07-20 11:07:29,760]:root: Executing apriori algorithm with 5 items and 30 as threshold
[INFO 2021-07-20 11:07:29,766]:root: Reading csv path /content/datasets/ashirwadsangwan/imdb-dataset/raw/csv/part-00000-52e9e128-7801-425a-aba9-349daecf1e8d-c000.csv


Execution time for get_ck_v2: 0:00:05.622153
Execution time for get_lk_v2: 0:00:05.631350


[INFO 2021-07-20 11:07:34,710]:root: Found 5 frequent itemsets


Execution time for get_ck_v2: 0:00:04.943719
Execution time for get_lk_v2: 0:00:04.951622


In [24]:
if DUMP:
    names = read_csv_df(get_path(DATASET_PATH, 'name.basics.tsv.gz'), sep='\t')
    dump_frequent_itemsets_stats(create_temp_df(singleton_v2, names), 1)
    dump_frequent_itemsets_stats(create_temp_df(doubleton_v2, names), 2)
    dump_frequent_itemsets_stats(create_temp_df(triple_v2, names), 3)
    dump_frequent_itemsets_stats(create_temp_df(quadruple_v2, names), 4)
    dump_frequent_itemsets_stats(create_temp_df(quintuple_v2, names), 5)

[INFO 2021-07-20 11:07:34,729]:root: Reading csv path /content/datasets/ashirwadsangwan/imdb-dataset/name.basics.tsv.gz with header:True and sep:	
[INFO 2021-07-20 11:07:35,802]:root: Calculating frequent itemsets for dataframe


+-------+---------+------------------+
|support|   actor1|        actor1Name|
+-------+---------+------------------+
|    797|nm0103977|      Brahmanandam|
|    585|nm0006982|       Adoor Bhasi|
|    565|nm0648803|  Matsunosuke Onoe|
|    506|nm0305182|      Eddie Garcia|
|    436|nm0623427|        Prem Nazir|
|    411|nm0793813|      Sung-il Shin|
|    391|nm0246703|      Paquito Diaz|
|    387|nm0619107|  Masayoshi Nogami|
|    380|nm0007123|         Mammootty|
|    356|nm7390393|    Aachi Manorama|
|    348|nm0046850|           Bahadur|
|    344|nm0482320|          Mohanlal|
|    330|nm0149822|Mithun Chakraborty|
|    323|nm0304262|   Shivaji Ganesan|
|    315|nm0706691|       Sultan Rahi|
|    313|nm0619309|            Nagesh|
|    311|nm0007106|     Shakti Kapoor|
|    303|nm0659250|       Pandharibai|
|    303|nm0419653|       Jayabharati|
|    303|nm0001000|         Tom Byron|
+-------+---------+------------------+
only showing top 20 rows

+-----+-----------------+---+---+
|cou

[INFO 2021-07-20 11:08:27,677]:root: Calculating frequent itemsets for dataframe


+-------+---------+--------------------+---------+--------------------+
|support|   actor1|          actor1Name|   actor2|          actor2Name|
+-------+---------+--------------------+---------+--------------------+
|    236|nm0006982|         Adoor Bhasi|nm0623427|          Prem Nazir|
|    169|nm0006982|         Adoor Bhasi|nm0046850|             Bahadur|
|    162|nm0006982|         Adoor Bhasi|nm0419653|         Jayabharati|
|    147|nm0648803|    Matsunosuke Onoe|nm2082516|        Kijaku Ôtani|
|    126|nm0648803|    Matsunosuke Onoe|nm2373718|    Kitsuraku Arashi|
|    122|nm0006982|         Adoor Bhasi|nm0619779|Thikkurisi Sukuma...|
|    113|nm2082516|        Kijaku Ôtani|nm2373718|    Kitsuraku Arashi|
|    113|nm0648803|    Matsunosuke Onoe|nm2077739|   Suminojo Ichikawa|
|    109|nm0046850|             Bahadur|nm0419653|         Jayabharati|
|    103|nm0659173|            Panchito|nm1006879|              Dolphy|
|    101|nm0619779|Thikkurisi Sukuma...|nm0623427|          Prem

[INFO 2021-07-20 11:09:26,629]:root: Calculating frequent itemsets for dataframe


+-------+---------+-------------------+---------+--------------------+---------+------------------+
|support|   actor1|         actor1Name|   actor2|          actor2Name|   actor3|        actor3Name|
+-------+---------+-------------------+---------+--------------------+---------+------------------+
|    112|nm0648803|   Matsunosuke Onoe|nm2082516|        Kijaku Ôtani|nm2373718|  Kitsuraku Arashi|
|    100|nm0648803|   Matsunosuke Onoe|nm2077739|   Suminojo Ichikawa|nm2082516|      Kijaku Ôtani|
|     95|nm0648803|   Matsunosuke Onoe|nm2077739|   Suminojo Ichikawa|nm2373718|  Kitsuraku Arashi|
|     87|nm2077739|  Suminojo Ichikawa|nm2082516|        Kijaku Ôtani|nm2373718|  Kitsuraku Arashi|
|     80|nm0648803|   Matsunosuke Onoe|nm1770187| Sen'nosuke Nakamura|nm2082516|      Kijaku Ôtani|
|     75|nm0006982|        Adoor Bhasi|nm0046850|             Bahadur|nm0419653|       Jayabharati|
|     74|nm0006982|        Adoor Bhasi|nm0619779|Thikkurisi Sukuma...|nm0623427|        Prem Nazir|


[INFO 2021-07-20 11:10:32,789]:root: Calculating frequent itemsets for dataframe


+-------+---------+-------------------+---------+-------------------+---------+-----------------+---------+------------------+
|support|   actor1|         actor1Name|   actor2|         actor2Name|   actor3|       actor3Name|   actor4|        actor4Name|
+-------+---------+-------------------+---------+-------------------+---------+-----------------+---------+------------------+
|     86|nm0648803|   Matsunosuke Onoe|nm2077739|  Suminojo Ichikawa|nm2082516|     Kijaku Ôtani|nm2373718|  Kitsuraku Arashi|
|     62|nm0648803|   Matsunosuke Onoe|nm1770187|Sen'nosuke Nakamura|nm2082516|     Kijaku Ôtani|nm2373718|  Kitsuraku Arashi|
|     54|nm0648803|   Matsunosuke Onoe|nm1770187|Sen'nosuke Nakamura|nm2077739|Suminojo Ichikawa|nm2082516|      Kijaku Ôtani|
|     51|nm1698868|    Enshô Jitsukawa|nm2366585|      Ritoku Arashi|nm2367854|     Shôzô Arashi|nm2384746|       Hôshô Bandô|
|     51|nm0648803|   Matsunosuke Onoe|nm1770187|Sen'nosuke Nakamura|nm2077739|Suminojo Ichikawa|nm2373718|  Ki

[INFO 2021-07-20 11:11:47,681]:root: Calculating frequent itemsets for dataframe


+-------+---------+----------------+---------+-------------------+---------+-----------------+---------+----------------+---------+------------------+
|support|   actor1|      actor1Name|   actor2|         actor2Name|   actor3|       actor3Name|   actor4|      actor4Name|   actor5|        actor5Name|
+-------+---------+----------------+---------+-------------------+---------+-----------------+---------+----------------+---------+------------------+
|     44|nm0648803|Matsunosuke Onoe|nm1770187|Sen'nosuke Nakamura|nm2077739|Suminojo Ichikawa|nm2082516|    Kijaku Ôtani|nm2373718|  Kitsuraku Arashi|
|     42|nm0648803|Matsunosuke Onoe|nm2077739|  Suminojo Ichikawa|nm2082516|     Kijaku Ôtani|nm2373151|  Chosei Kataoka|nm2373718|  Kitsuraku Arashi|
|     31|nm0648803|Matsunosuke Onoe|nm2077739|  Suminojo Ichikawa|nm2082516|     Kijaku Ôtani|nm2373718|Kitsuraku Arashi|nm2426685|Kakumatsuro Arashi|
|     30|nm0648803|Matsunosuke Onoe|nm2077739|  Suminojo Ichikawa|nm2082516|     Kijaku Ôtani|

## PCY multihash


In [25]:
@timer
def get_ck_v3(transactions: Iterator[Transaction], monotonicity_filter: Callable[[Itemset], bool],
           bitmaps_filter: Callable[[Itemset], bool],
           state: State) -> Tuple[CandidateFrequentItemsets, List[BitMap]]:
    """
    Scan the file and extract candidate frequent itemsets checking the monotonicity condition and bitmaps.

    :param transactions: iterator of transactions
    :param monotonicity_filter: filter for the monotonicity condition
    :param bitmaps_filter: filter for the bitmaps
    :param state: state of the algorithm
    :return: candidate frequent itemsets
    """
    k = state['k']
    hash_functions = state['hash_functions']
    buckets = state['buckets']
    threshold = state['threshold']

    accumulator = defaultdict(int)
    counters = [defaultdict(int)] * len(hash_functions)
    for row in transactions:
        raw_actors = [int(actor[2:]) for actor in row[1].split('|')]
        for comb in combinations(sorted(raw_actors), k):
            if monotonicity_filter(comb) and bitmaps_filter(comb):
                accumulator[comb] += 1
        for comb in combinations(sorted(raw_actors), k + 1):
            for counter, hash_function in zip(counters, hash_functions):
                bucket = hash_function(buckets, comb)
                counter[bucket] += 1
    return accumulator, [
        {bucket: count >= threshold for bucket, count in counter.items()}
        for counter in counters
    ]


@timer
def get_lk_v3(transactions: Iterator[Transaction], state: State) -> Tuple[FrequentItemsets, List[BitMap]]:
    """
    Extract frequent itemsets checking the support.

    :param transactions: iterator of transactions
    :param state: state of the algorithm
    :return:
    """
    size = state['k']
    old_lk = state['lk']
    threshold = state['threshold']
    bitmaps = state['bitmaps']
    old_buckets = state['old_buckets']
    hash_functions = state['hash_functions']
    force = state['force']

    path = get_path(RESULTS, f'apriori_{threshold}_{size}', 'csv', 'pcy', delete=force)

    if not is_empty(path):
        logging.info('Reading already extracted data')

        return read_frequent_itemsets(path), []

    logging.info(f'Executing apriori algorithm with {size} items and {threshold} as threshold')

    def monotonicity(itemset: Itemset) -> bool:
        return size == 1 or all([item in old_lk for item in combinations(itemset, size - 1)])

    def check_bitmaps(itemset: Itemset) -> bool:
        return all([bitmap[hash_function(old_buckets, itemset)]]
                   for bitmap, hash_function in zip(bitmaps, hash_functions))

    ck, bitmaps = get_ck_v3(transactions, monotonicity, check_bitmaps, state)
    lk = {item: support
          for item, support in ck.items()
          if support >= threshold}
    logging.info(f'Found {len(lk)} frequent itemsets')

    if state['save']:
        save_frequent_itemsets(lk, path)

    return lk, bitmaps


def pcy_algorithm(transactions: Callable[[], Iterator[Transaction]], state: State) -> Algorithm:
    """
    Executing apriori algorithm starting from data and a given threshold.

    :param transactions: callable to an iterator of transactions
    :param state: state of the algorithm:
        - threshold: threshold for the apriori algorithm
        - k: size of the itemsets
        - lk: frequent itemsets with size k-1
        - bitmaps: bitmaps previously calculated for frequent itemset with size k+1
        - hash_functions: hash function to be applied to candidate frequent itemsets
        - old_buckets: number of buckets for the previous bitmaps
        - buckets: number of buckets for the current bitmaps
        - force: to force recalculating frequent itemsets
    :return: dict of frequent itemsets
    """
    length = len(state['hash_functions'])
    buckets = int(psutil.virtual_memory().free * 0.7 / length)
    state = State(k=1, lk={}, bitmaps=[], old_buckets=buckets, buckets=buckets, force=False, save=SAVE) + state

    while state['k'] == 1 or state['lk']:
        lk, bitmaps = get_lk_v3(transactions(), state)
        state['lk'] = lk
        state['bitmaps'] = bitmaps
        state['k'] += 1
        old_buckets, buckets = buckets, int(psutil.virtual_memory().free * 0.7 / length)

        yield state

In [26]:
def hash_function1(buckets: int, itemset: Itemset) -> int:
    acc = 1
    for item in itemset:
        acc *= item

    return acc % buckets


def hash_function2(buckets: int, itemset: Itemset) -> int:
    coeff = list(range(1, len(itemset) + 1))

    acc = 1
    for c, item in zip(coeff, itemset):
        acc *= c * item

    return acc % buckets

In [27]:
algorithm_v3 = pcy_algorithm(lambda: read_csvfile(file), State(threshold=APRIORI_THRESHOLD,
                                                               hash_functions=(hash_function1, hash_function2)))

singleton_v3 = next(algorithm_v3)['lk']
doubleton_v3 = next(algorithm_v3)['lk']
triple_v3 = next(algorithm_v3)['lk']
quadruple_v3 = next(algorithm_v3)['lk']
quintuple_v3 = next(algorithm_v3)['lk']

[INFO 2021-07-20 11:13:12,580]:root: Executing apriori algorithm with 1 items and 30 as threshold
[INFO 2021-07-20 11:13:12,582]:root: Reading csv path /content/datasets/ashirwadsangwan/imdb-dataset/raw/csv/part-00000-52e9e128-7801-425a-aba9-349daecf1e8d-c000.csv
[INFO 2021-07-20 11:13:33,218]:root: Found 7453 frequent itemsets
[INFO 2021-07-20 11:13:33,260]:root: Executing apriori algorithm with 2 items and 30 as threshold
[INFO 2021-07-20 11:13:33,263]:root: Reading csv path /content/datasets/ashirwadsangwan/imdb-dataset/raw/csv/part-00000-52e9e128-7801-425a-aba9-349daecf1e8d-c000.csv


Execution time for get_ck_v3: 0:00:20.597911
Execution time for get_lk_v3: 0:00:20.678678


[INFO 2021-07-20 11:14:03,526]:root: Found 373 frequent itemsets
[INFO 2021-07-20 11:14:03,643]:root: Executing apriori algorithm with 3 items and 30 as threshold
[INFO 2021-07-20 11:14:03,645]:root: Reading csv path /content/datasets/ashirwadsangwan/imdb-dataset/raw/csv/part-00000-52e9e128-7801-425a-aba9-349daecf1e8d-c000.csv


Execution time for get_ck_v3: 0:00:30.248995
Execution time for get_lk_v3: 0:00:30.290497


[INFO 2021-07-20 11:14:30,824]:root: Found 90 frequent itemsets
[INFO 2021-07-20 11:14:30,971]:root: Executing apriori algorithm with 4 items and 30 as threshold
[INFO 2021-07-20 11:14:30,973]:root: Reading csv path /content/datasets/ashirwadsangwan/imdb-dataset/raw/csv/part-00000-52e9e128-7801-425a-aba9-349daecf1e8d-c000.csv


Execution time for get_ck_v3: 0:00:27.178608
Execution time for get_lk_v3: 0:00:27.185568


[INFO 2021-07-20 11:14:52,388]:root: Found 34 frequent itemsets
[INFO 2021-07-20 11:14:52,553]:root: Executing apriori algorithm with 5 items and 30 as threshold
[INFO 2021-07-20 11:14:52,555]:root: Reading csv path /content/datasets/ashirwadsangwan/imdb-dataset/raw/csv/part-00000-52e9e128-7801-425a-aba9-349daecf1e8d-c000.csv


Execution time for get_ck_v3: 0:00:21.414002
Execution time for get_lk_v3: 0:00:21.418590


[INFO 2021-07-20 11:15:06,477]:root: Found 5 frequent itemsets


Execution time for get_ck_v3: 0:00:13.921718
Execution time for get_lk_v3: 0:00:13.925489


In [28]:
if DUMP:
    names = read_csv_df(get_path(DATASET_PATH, 'name.basics.tsv.gz'), sep='\t')
    dump_frequent_itemsets_stats(create_temp_df(singleton_v3, names), 1)
    dump_frequent_itemsets_stats(create_temp_df(doubleton_v3, names), 2)
    dump_frequent_itemsets_stats(create_temp_df(triple_v3, names), 3)
    dump_frequent_itemsets_stats(create_temp_df(quadruple_v3, names), 4)
    dump_frequent_itemsets_stats(create_temp_df(quintuple_v3, names), 5)

[INFO 2021-07-20 11:15:06,605]:root: Reading csv path /content/datasets/ashirwadsangwan/imdb-dataset/name.basics.tsv.gz with header:True and sep:	
[INFO 2021-07-20 11:15:07,350]:root: Calculating frequent itemsets for dataframe


+-------+---------+------------------+
|support|   actor1|        actor1Name|
+-------+---------+------------------+
|    797|nm0103977|      Brahmanandam|
|    585|nm0006982|       Adoor Bhasi|
|    565|nm0648803|  Matsunosuke Onoe|
|    506|nm0305182|      Eddie Garcia|
|    436|nm0623427|        Prem Nazir|
|    411|nm0793813|      Sung-il Shin|
|    391|nm0246703|      Paquito Diaz|
|    387|nm0619107|  Masayoshi Nogami|
|    380|nm0007123|         Mammootty|
|    356|nm7390393|    Aachi Manorama|
|    348|nm0046850|           Bahadur|
|    344|nm0482320|          Mohanlal|
|    330|nm0149822|Mithun Chakraborty|
|    323|nm0304262|   Shivaji Ganesan|
|    315|nm0706691|       Sultan Rahi|
|    313|nm0619309|            Nagesh|
|    311|nm0007106|     Shakti Kapoor|
|    303|nm0419653|       Jayabharati|
|    303|nm0001000|         Tom Byron|
|    303|nm0659250|       Pandharibai|
+-------+---------+------------------+
only showing top 20 rows



[INFO 2021-07-20 11:15:57,646]:root: Calculating frequent itemsets for dataframe


+-----+-----------------+---+---+
|count|              avg|max|min|
+-----+-----------------+---+---+
| 7453|57.35167046826781|797| 30|
+-----+-----------------+---+---+

Execution time for dump_frequent_itemsets_stats: 0:00:50.123674
+-------+---------+--------------------+---------+--------------------+
|support|   actor1|          actor1Name|   actor2|          actor2Name|
+-------+---------+--------------------+---------+--------------------+
|    236|nm0006982|         Adoor Bhasi|nm0623427|          Prem Nazir|
|    169|nm0006982|         Adoor Bhasi|nm0046850|             Bahadur|
|    162|nm0006982|         Adoor Bhasi|nm0419653|         Jayabharati|
|    147|nm0648803|    Matsunosuke Onoe|nm2082516|        Kijaku Ôtani|
|    126|nm0648803|    Matsunosuke Onoe|nm2373718|    Kitsuraku Arashi|
|    122|nm0006982|         Adoor Bhasi|nm0619779|Thikkurisi Sukuma...|
|    113|nm2082516|        Kijaku Ôtani|nm2373718|    Kitsuraku Arashi|
|    113|nm0648803|    Matsunosuke Onoe|nm207

[INFO 2021-07-20 11:16:52,698]:root: Calculating frequent itemsets for dataframe


+-------+---------+-------------------+---------+--------------------+---------+------------------+
|support|   actor1|         actor1Name|   actor2|          actor2Name|   actor3|        actor3Name|
+-------+---------+-------------------+---------+--------------------+---------+------------------+
|    112|nm0648803|   Matsunosuke Onoe|nm2082516|        Kijaku Ôtani|nm2373718|  Kitsuraku Arashi|
|    100|nm0648803|   Matsunosuke Onoe|nm2077739|   Suminojo Ichikawa|nm2082516|      Kijaku Ôtani|
|     95|nm0648803|   Matsunosuke Onoe|nm2077739|   Suminojo Ichikawa|nm2373718|  Kitsuraku Arashi|
|     87|nm2077739|  Suminojo Ichikawa|nm2082516|        Kijaku Ôtani|nm2373718|  Kitsuraku Arashi|
|     80|nm0648803|   Matsunosuke Onoe|nm1770187| Sen'nosuke Nakamura|nm2082516|      Kijaku Ôtani|
|     75|nm0006982|        Adoor Bhasi|nm0046850|             Bahadur|nm0419653|       Jayabharati|
|     74|nm0006982|        Adoor Bhasi|nm0619779|Thikkurisi Sukuma...|nm0623427|        Prem Nazir|


[INFO 2021-07-20 11:17:57,697]:root: Calculating frequent itemsets for dataframe


+-------+---------+-------------------+---------+-------------------+---------+-----------------+---------+------------------+
|support|   actor1|         actor1Name|   actor2|         actor2Name|   actor3|       actor3Name|   actor4|        actor4Name|
+-------+---------+-------------------+---------+-------------------+---------+-----------------+---------+------------------+
|     86|nm0648803|   Matsunosuke Onoe|nm2077739|  Suminojo Ichikawa|nm2082516|     Kijaku Ôtani|nm2373718|  Kitsuraku Arashi|
|     62|nm0648803|   Matsunosuke Onoe|nm1770187|Sen'nosuke Nakamura|nm2082516|     Kijaku Ôtani|nm2373718|  Kitsuraku Arashi|
|     54|nm0648803|   Matsunosuke Onoe|nm1770187|Sen'nosuke Nakamura|nm2077739|Suminojo Ichikawa|nm2082516|      Kijaku Ôtani|
|     51|nm1698868|    Enshô Jitsukawa|nm2366585|      Ritoku Arashi|nm2367854|     Shôzô Arashi|nm2384746|       Hôshô Bandô|
|     51|nm0648803|   Matsunosuke Onoe|nm1770187|Sen'nosuke Nakamura|nm2077739|Suminojo Ichikawa|nm2373718|  Ki

[INFO 2021-07-20 11:19:11,132]:root: Calculating frequent itemsets for dataframe


+-------+---------+----------------+---------+-------------------+---------+-----------------+---------+----------------+---------+------------------+
|support|   actor1|      actor1Name|   actor2|         actor2Name|   actor3|       actor3Name|   actor4|      actor4Name|   actor5|        actor5Name|
+-------+---------+----------------+---------+-------------------+---------+-----------------+---------+----------------+---------+------------------+
|     44|nm0648803|Matsunosuke Onoe|nm1770187|Sen'nosuke Nakamura|nm2077739|Suminojo Ichikawa|nm2082516|    Kijaku Ôtani|nm2373718|  Kitsuraku Arashi|
|     42|nm0648803|Matsunosuke Onoe|nm2077739|  Suminojo Ichikawa|nm2082516|     Kijaku Ôtani|nm2373151|  Chosei Kataoka|nm2373718|  Kitsuraku Arashi|
|     31|nm0648803|Matsunosuke Onoe|nm2077739|  Suminojo Ichikawa|nm2082516|     Kijaku Ôtani|nm2373718|Kitsuraku Arashi|nm2426685|Kakumatsuro Arashi|
|     30|nm0648803|Matsunosuke Onoe|nm2077739|  Suminojo Ichikawa|nm2082516|     Kijaku Ôtani|

## SON

In [29]:
@timer
def get_ck_v4(rdd: RDD, algorithm: Callable[[Any], Algorithm],
           old_state: Broadcast, new_state: Accumulator) -> Set[Itemset]:
    """
    Scan the chunk and extract candidate frequent itemsets.

    :param rdd: rdd which contains transactions
    :param algorithm: algorithm to extract frequent itemsets
    :param old_state: previous state of the algorithm
    :param new_state: next state of the algorithm
    :return: candidate frequent itemsets
    """

    def apply_algorithm(index: int, transactions: Iterable[Transaction]) -> CandidateFrequentItemsets:
        name = f'partitions_{index}'
        if name not in old_state.value:
            buckets = [bucket for bucket in transactions]
            state = old_state.value
            state['threshold'] *= len(buckets) / old_state.value['n']
            state['save'] = False
            alg = algorithm(lambda: buckets, State(**old_state.value))
        else:
            state = old_state.value[name]
            state['lk'] = old_state.value['lk']
            state['save'] = False
            alg = algorithm(lambda: transactions, State(**state))

        state = next(alg)
        new_state.add({name: state.state})
        return {itemset: 1 for itemset in state['lk'].items()}

    # set is useful to lookup. RDD cannot be used inside mapPartitions
    return set(rdd.mapPartitionsWithIndex(apply_algorithm).map(lambda x: x[0]).collect())


@timer
def get_lk_v4(rdd: RDD, algorithm: Callable[[Any], Algorithm], state: State) -> FrequentItemsets:
    """
    Extract frequent itemsets checking the support.

    :param rdd: rdd which contains transactions
    :param algorithm: algorithm to extract frequent itemsets
    :param state: previous state of the algorithm
    :return:
    """
    threshold = state['threshold']
    size = state['k']
    force = state['force']

    path = get_path(RESULTS, f'apriori_{threshold}_{size}', 'csv', 'son', delete=force)

    if not is_empty(path):
        logging.info('Reading already extracted data')

        return read_frequent_itemsets(path)

    logging.info(f'Executing apriori algorithm with {size} items and {threshold} as threshold')

    old_state = get_spark().sparkContext.broadcast(state.state)
    new_state = get_spark().sparkContext.accumulator(state.state, DictParam())
    ck = get_spark().sparkContext.broadcast(get_ck_v4(rdd, algorithm, old_state, new_state))
    state.update(State(**new_state.value))

    def get_support(transactions: Iterable[Transaction]) -> Iterable[Tuple[Itemset, int]]:
        accumulator = defaultdict(int)
        for row in transactions:
            raw_actors = [int(actor[2:]) for actor in row[1].split('|')]
            for comb in combinations(sorted(raw_actors), size):
                if comb in ck.value:
                    accumulator[comb] += 1
        return accumulator.items()

    lk = dict(rdd.mapPartitions(get_support)
              .reduceByKey(lambda x, y: x + y)
              .filter(lambda x: x[1] >= threshold)
              .collect())
    logging.info(f'Found {len(lk)} frequent itemsets')

    if state['save']:
        save_frequent_itemsets(lk, path)

    return lk


def son_algorithm(rdd: RDD, algorithm: Callable[[Any], Algorithm], state: State) -> Algorithm:
    """
    Executing son algorithm starting from data and a given threshold.

    :param rdd: dataframe which contains transactions
    :param algorithm: algorithm to extract frequent itemsets
    :param state: state of the algorithm:
        - threshold: threshold for the algorithm
        - k: size of the itemsets
        - lk: frequent itemsets with size k-1
        - n: number of distinct buckets
        - force: to force recalculating frequent itemsets
    :return: dict of frequent itemsets
    """
    state = State(n=rdd.count(), chunks=rdd.getNumPartitions(), k=1, lk={}, force=False, save=SAVE) + state

    while state['k'] == 1 or state['lk']:
        state['lk'] = get_lk_v4(rdd, algorithm, state)
        state['k'] += 1

        yield state

In [30]:
data = read_csv_rdd(file).repartition(SON_CHUNKS).map(lambda row: (row[0], row[1])).persist()

algorithm_v4 = son_algorithm(data, apriori_algorithm_v2, State(threshold=APRIORI_THRESHOLD))

singleton_v4 = next(algorithm_v4)['lk']
doubleton_v4 = next(algorithm_v4)['lk']
triple_v4 = next(algorithm_v4)['lk']
quadruple_v4 = next(algorithm_v4)['lk']
quintuple_v4 = next(algorithm_v4)['lk']

[INFO 2021-07-20 11:20:34,509]:root: Reading csv path /content/datasets/ashirwadsangwan/imdb-dataset/raw/csv/part-00000-52e9e128-7801-425a-aba9-349daecf1e8d-c000.csv with sep:,
[INFO 2021-07-20 11:20:38,270]:root: Executing apriori algorithm with 1 items and 30 as threshold


Execution time for get_ck_v4: 0:00:03.265958


[INFO 2021-07-20 11:20:49,163]:root: Found 7453 frequent itemsets
[INFO 2021-07-20 11:20:49,168]:root: Executing apriori algorithm with 2 items and 30 as threshold


Execution time for get_lk_v4: 0:00:10.896021
Execution time for get_ck_v4: 0:00:05.264171


[INFO 2021-07-20 11:20:59,536]:root: Found 373 frequent itemsets
[INFO 2021-07-20 11:20:59,539]:root: Executing apriori algorithm with 3 items and 30 as threshold


Execution time for get_lk_v4: 0:00:10.369836
Execution time for get_ck_v4: 0:00:05.339171


[INFO 2021-07-20 11:21:07,828]:root: Found 90 frequent itemsets
[INFO 2021-07-20 11:21:07,831]:root: Executing apriori algorithm with 4 items and 30 as threshold


Execution time for get_lk_v4: 0:00:08.292175
Execution time for get_ck_v4: 0:00:05.410620


[INFO 2021-07-20 11:21:16,093]:root: Found 34 frequent itemsets
[INFO 2021-07-20 11:21:16,096]:root: Executing apriori algorithm with 5 items and 30 as threshold


Execution time for get_lk_v4: 0:00:08.264625
Execution time for get_ck_v4: 0:00:04.952236


[INFO 2021-07-20 11:21:23,379]:root: Found 5 frequent itemsets


Execution time for get_lk_v4: 0:00:07.284737


In [31]:
if DUMP:
    names = read_csv_df(get_path(DATASET_PATH, 'name.basics.tsv.gz'), sep='\t')
    dump_frequent_itemsets_stats(create_temp_df(singleton_v4, names), 1)
    dump_frequent_itemsets_stats(create_temp_df(doubleton_v4, names), 2)
    dump_frequent_itemsets_stats(create_temp_df(triple_v4, names), 3)
    dump_frequent_itemsets_stats(create_temp_df(quadruple_v4, names), 4)
    dump_frequent_itemsets_stats(create_temp_df(quintuple_v4, names), 5)

[INFO 2021-07-20 11:21:23,397]:root: Reading csv path /content/datasets/ashirwadsangwan/imdb-dataset/name.basics.tsv.gz with header:True and sep:	
[INFO 2021-07-20 11:21:23,884]:root: Calculating frequent itemsets for dataframe


+-------+---------+------------------+
|support|   actor1|        actor1Name|
+-------+---------+------------------+
|    797|nm0103977|      Brahmanandam|
|    585|nm0006982|       Adoor Bhasi|
|    565|nm0648803|  Matsunosuke Onoe|
|    506|nm0305182|      Eddie Garcia|
|    436|nm0623427|        Prem Nazir|
|    411|nm0793813|      Sung-il Shin|
|    391|nm0246703|      Paquito Diaz|
|    387|nm0619107|  Masayoshi Nogami|
|    380|nm0007123|         Mammootty|
|    356|nm7390393|    Aachi Manorama|
|    348|nm0046850|           Bahadur|
|    344|nm0482320|          Mohanlal|
|    330|nm0149822|Mithun Chakraborty|
|    323|nm0304262|   Shivaji Ganesan|
|    315|nm0706691|       Sultan Rahi|
|    313|nm0619309|            Nagesh|
|    311|nm0007106|     Shakti Kapoor|
|    303|nm0001000|         Tom Byron|
|    303|nm0419653|       Jayabharati|
|    303|nm0659250|       Pandharibai|
+-------+---------+------------------+
only showing top 20 rows



[INFO 2021-07-20 11:22:15,555]:root: Calculating frequent itemsets for dataframe


+-----+-----------------+---+---+
|count|              avg|max|min|
+-----+-----------------+---+---+
| 7453|57.35167046826781|797| 30|
+-----+-----------------+---+---+

Execution time for dump_frequent_itemsets_stats: 0:00:51.514500
+-------+---------+--------------------+---------+--------------------+
|support|   actor1|          actor1Name|   actor2|          actor2Name|
+-------+---------+--------------------+---------+--------------------+
|    236|nm0006982|         Adoor Bhasi|nm0623427|          Prem Nazir|
|    169|nm0006982|         Adoor Bhasi|nm0046850|             Bahadur|
|    162|nm0006982|         Adoor Bhasi|nm0419653|         Jayabharati|
|    147|nm0648803|    Matsunosuke Onoe|nm2082516|        Kijaku Ôtani|
|    126|nm0648803|    Matsunosuke Onoe|nm2373718|    Kitsuraku Arashi|
|    122|nm0006982|         Adoor Bhasi|nm0619779|Thikkurisi Sukuma...|
|    113|nm2082516|        Kijaku Ôtani|nm2373718|    Kitsuraku Arashi|
|    113|nm0648803|    Matsunosuke Onoe|nm207

[INFO 2021-07-20 11:23:16,209]:root: Calculating frequent itemsets for dataframe


+-------+---------+-------------------+---------+--------------------+---------+------------------+
|support|   actor1|         actor1Name|   actor2|          actor2Name|   actor3|        actor3Name|
+-------+---------+-------------------+---------+--------------------+---------+------------------+
|    112|nm0648803|   Matsunosuke Onoe|nm2082516|        Kijaku Ôtani|nm2373718|  Kitsuraku Arashi|
|    100|nm0648803|   Matsunosuke Onoe|nm2077739|   Suminojo Ichikawa|nm2082516|      Kijaku Ôtani|
|     95|nm0648803|   Matsunosuke Onoe|nm2077739|   Suminojo Ichikawa|nm2373718|  Kitsuraku Arashi|
|     87|nm2077739|  Suminojo Ichikawa|nm2082516|        Kijaku Ôtani|nm2373718|  Kitsuraku Arashi|
|     80|nm0648803|   Matsunosuke Onoe|nm1770187| Sen'nosuke Nakamura|nm2082516|      Kijaku Ôtani|
|     75|nm0006982|        Adoor Bhasi|nm0046850|             Bahadur|nm0419653|       Jayabharati|
|     74|nm0006982|        Adoor Bhasi|nm0619779|Thikkurisi Sukuma...|nm0623427|        Prem Nazir|


[INFO 2021-07-20 11:24:23,884]:root: Calculating frequent itemsets for dataframe


+-------+---------+-------------------+---------+-------------------+---------+-----------------+---------+------------------+
|support|   actor1|         actor1Name|   actor2|         actor2Name|   actor3|       actor3Name|   actor4|        actor4Name|
+-------+---------+-------------------+---------+-------------------+---------+-----------------+---------+------------------+
|     86|nm0648803|   Matsunosuke Onoe|nm2077739|  Suminojo Ichikawa|nm2082516|     Kijaku Ôtani|nm2373718|  Kitsuraku Arashi|
|     62|nm0648803|   Matsunosuke Onoe|nm1770187|Sen'nosuke Nakamura|nm2082516|     Kijaku Ôtani|nm2373718|  Kitsuraku Arashi|
|     54|nm0648803|   Matsunosuke Onoe|nm1770187|Sen'nosuke Nakamura|nm2077739|Suminojo Ichikawa|nm2082516|      Kijaku Ôtani|
|     51|nm1698868|    Enshô Jitsukawa|nm2366585|      Ritoku Arashi|nm2367854|     Shôzô Arashi|nm2384746|       Hôshô Bandô|
|     51|nm0648803|   Matsunosuke Onoe|nm1770187|Sen'nosuke Nakamura|nm2077739|Suminojo Ichikawa|nm2373718|  Ki

[INFO 2021-07-20 11:25:39,035]:root: Calculating frequent itemsets for dataframe


+-------+---------+----------------+---------+-------------------+---------+-----------------+---------+----------------+---------+------------------+
|support|   actor1|      actor1Name|   actor2|         actor2Name|   actor3|       actor3Name|   actor4|      actor4Name|   actor5|        actor5Name|
+-------+---------+----------------+---------+-------------------+---------+-----------------+---------+----------------+---------+------------------+
|     44|nm0648803|Matsunosuke Onoe|nm1770187|Sen'nosuke Nakamura|nm2077739|Suminojo Ichikawa|nm2082516|    Kijaku Ôtani|nm2373718|  Kitsuraku Arashi|
|     42|nm0648803|Matsunosuke Onoe|nm2077739|  Suminojo Ichikawa|nm2082516|     Kijaku Ôtani|nm2373151|  Chosei Kataoka|nm2373718|  Kitsuraku Arashi|
|     31|nm0648803|Matsunosuke Onoe|nm2077739|  Suminojo Ichikawa|nm2082516|     Kijaku Ôtani|nm2373718|Kitsuraku Arashi|nm2426685|Kakumatsuro Arashi|
|     30|nm0648803|Matsunosuke Onoe|nm2077739|  Suminojo Ichikawa|nm2082516|     Kijaku Ôtani|

## Toivonen

In [38]:
@timer
def get_ck_v5(transactions: Iterator[Transaction], k: int,
           monotonicity_filter: Callable[[Itemset], int]) -> CandidateFrequentItemsets:
    """
    Scan the sample and extract candidate frequent itemsets checking the monotonicity condition.

    :param transactions: iterator of transactions
    :param k: size of the itemset
    :param monotonicity_filter: filter for the monotonicity condition
    :return: candidate frequent itemsets
    """
    accumulator = defaultdict(int)
    for row in transactions:
        raw_actors = [int(actor[2:]) for actor in row[1].split('|')]
        for comb in combinations(sorted(raw_actors), k):
            if monotonicity_filter(comb):
                accumulator[comb] += 1
    return accumulator


@timer
def get_lk_v5(transactions: Iterator[Transaction], sample: List[Transaction], state: State) -> Optional[FrequentItemsets]:
    """
    Extract frequent itemsets checking the support.

    :param transactions: iterator of transactions
    :param sample: sample of all transactions
    :param state: state of the algorithm
    :return: frequent itemsets
    """
    n = state['n']
    size = state['k']
    old_lk = state['lk']
    threshold_adjust = state['threshold_adjust']
    threshold = threshold_adjust * len(sample) / n * state['threshold']
    force = state['force']

    path = get_path(RESULTS, f'apriori_{threshold}_{size}', 'csv', 'toivonen', delete=force)

    if not is_empty(path):
        logging.info('Reading already extracted data')

        return read_frequent_itemsets(path)

    logging.info(f'Executing apriori algorithm with {size} items and {threshold} as threshold')

    if size == 1:
        ck = get_ck_v5(sample, size, lambda _: True)
    else:
        ck = get_ck_v5(sample, size,
                    lambda itemset: all([item in old_lk for item in combinations(itemset, size - 1)]))

    lk = {item
          for item, support in ck.items()
          if support >= threshold}
    negative_border = {item
            for item, support in ck.items()
            if support < threshold}

    full_support = last_full_scan(transactions, size, lk, negative_border)
    real_lk = {item: support
          for item, support in full_support.items()
          if support >= state['threshold']}

    common = set(real_lk.keys()) & negative_border
    if common:
        logging.warning(f'Found {len(common)} frequent itemsets in the negative border.\n'
                        f'Example: {[(i, ck[i], full_support[i]) for i in list(common)[:10]]}')
        return None

    not_found = set(real_lk.keys()) - lk
    if not_found:
        logging.warning(f'Found {len(not_found)} frequent itemsets not found.\n'
                        f'Example: {not_found}')
        return None

    logging.info(f'Found {len(real_lk)} frequent itemsets')

    if state['save']:
        save_frequent_itemsets(real_lk, path)

    return real_lk


def sampling(df: DataFrame, k: int) -> DataFrame:
    """
    Get a random sample of the dataframe of size k.

    :param df: dataframe to be sampled
    :param k: size of the sample
    :return: sampled dataframe
    """
    n = df.count()
    df = (df
          .withColumn('fake', F.lit(0))
          .withColumn('row', F.row_number().over(Window.partitionBy('fake').orderBy('fake')))
          .drop('fake')
          )
    sample = get_spark().createDataFrame(random.choices(range(1, n + 1), k=k), T.IntegerType())
    return df.join(sample, on=df.row == sample.value).drop('row', 'value')


def last_full_scan(transactions: Iterator[Transaction], k: int,
                   lk: Set[Itemset], negative_border: Set[Itemset]) -> FrequentItemsets:
    """
    Scan the file and extract candidate frequent itemsets.

    :param transactions: iterator of transactions
    :param k: size of the itemset
    :param lk: candidate frequent itemset
    :param negative_border: infrequent itemset
    :return: candidate frequent itemsets
    """
    accumulator = defaultdict(int)
    for row in transactions:
        raw_actors = [int(actor[2:]) for actor in row[1].split('|')]
        for comb in combinations(sorted(raw_actors), k):
            if comb in lk or comb in negative_border:
                accumulator[comb] += 1
    return accumulator


def toivonen_algorithm(transactions: Callable[[], Iterator[Transaction]], 
                       sample: Iterator[Transaction],  state: State) -> Algorithm:
    """
    Executing apriori algorithm starting from data and a given threshold.

    :param transactions: callable to an iterator of transactions
    :param state: state of the algorithm:
        - threshold: threshold for the apriori algorithm
        - k: size of the itemsets
        - lk: frequent itemsets with size k-1
        - force: to force recalculating frequent itemsets
    :return: dict of frequent itemsets
    """
    state = State(k=1, lk={}, force=False, save=SAVE) + state

    while state['k'] == 1 or state['lk']:
        state['lk'] = get_lk_v5(transactions(), sample, state)
        if state['lk'] is None:
            return state
        state['k'] += 1

        yield state

In [33]:
df1 = read_csv_df(file, header=False)
n = df1.count()

[INFO 2021-07-20 11:27:04,378]:root: Reading csv path /content/datasets/ashirwadsangwan/imdb-dataset/raw/csv/part-00000-52e9e128-7801-425a-aba9-349daecf1e8d-c000.csv with header:False and sep:,


In [39]:
for i in range(TOIVONEN_MAX_ITERATIONS):
    print(f'\tIteration number {i}')
    sample = [(row['_c0'], row['_c1']) for row in sampling(df1, TOIVONEN_SIZE_SAMPLE).collect()]
    algorithm_v5 = toivonen_algorithm(lambda: read_csvfile(file), sample,
                                    State(threshold_adjust=TOIVONEN_THRESHOLD_ADJUST,
                                          threshold=APRIORI_THRESHOLD, n=n))
    try:
        singleton_v5 = next(algorithm_v5)['lk']
        doubleton_v5 = next(algorithm_v5)['lk']
        triple_v5 = next(algorithm_v5)['lk']
        quadruple_v5 = next(algorithm_v5)['lk']
        quintuple_v5 = next(algorithm_v5)['lk']
        print('Toivonen completed')
        break
    except StopIteration:
      pass

	Iteration number 0


[INFO 2021-07-20 11:32:24,164]:root: Executing apriori algorithm with 1 items and 3.047549389347291 as threshold
[INFO 2021-07-20 11:32:24,486]:root: Reading csv path /content/datasets/ashirwadsangwan/imdb-dataset/raw/csv/part-00000-52e9e128-7801-425a-aba9-349daecf1e8d-c000.csv


Execution time for get_ck_v5: 0:00:00.279458


Example: [((687958,), 3, 38), ((1069512,), 3, 32), ((131209,), 2, 45), ((223591,), 3, 30), ((284608,), 3, 34), ((719749,), 3, 51), ((888978,), 3, 36), ((661293,), 2, 32), ((7545,), 3, 34), ((176039,), 2, 31)]


Execution time for get_lk_v5: 0:00:03.290164
	Iteration number 1


[INFO 2021-07-20 11:32:32,620]:root: Executing apriori algorithm with 1 items and 3.047549389347291 as threshold
[INFO 2021-07-20 11:32:32,953]:root: Reading csv path /content/datasets/ashirwadsangwan/imdb-dataset/raw/csv/part-00000-52e9e128-7801-425a-aba9-349daecf1e8d-c000.csv


Execution time for get_ck_v5: 0:00:00.287669


Example: [((687958,), 1, 38), ((1186,), 3, 43), ((1069512,), 1, 32), ((131209,), 3, 45), ((137638,), 3, 39), ((7217,), 2, 42), ((661293,), 2, 32), ((720979,), 2, 34), ((1774,), 3, 38), ((7545,), 3, 34)]


Execution time for get_lk_v5: 0:00:03.355296
	Iteration number 2


[INFO 2021-07-20 11:32:40,335]:root: Executing apriori algorithm with 1 items and 3.047549389347291 as threshold
[INFO 2021-07-20 11:32:40,652]:root: Reading csv path /content/datasets/ashirwadsangwan/imdb-dataset/raw/csv/part-00000-52e9e128-7801-425a-aba9-349daecf1e8d-c000.csv


Execution time for get_ck_v5: 0:00:00.272786


Example: [((3132784,), 2, 51), ((36208,), 3, 34), ((284608,), 1, 34), ((820865,), 3, 46), ((1259779,), 2, 36), ((661293,), 3, 32), ((1774,), 3, 38), ((387340,), 3, 31), ((176039,), 3, 31), ((748430,), 3, 31)]


Execution time for get_lk_v5: 0:00:03.296022
	Iteration number 3


[INFO 2021-07-20 11:32:48,243]:root: Executing apriori algorithm with 1 items and 3.047549389347291 as threshold
[INFO 2021-07-20 11:32:48,558]:root: Reading csv path /content/datasets/ashirwadsangwan/imdb-dataset/raw/csv/part-00000-52e9e128-7801-425a-aba9-349daecf1e8d-c000.csv


Execution time for get_ck_v5: 0:00:00.268261


Example: [((1227581,), 1, 37), ((135624,), 3, 35), ((1069512,), 3, 32), ((36208,), 3, 34), ((223591,), 2, 30), ((705384,), 1, 39), ((661293,), 2, 32), ((720979,), 1, 34), ((1774,), 3, 38), ((248890,), 3, 44)]


Execution time for get_lk_v5: 0:00:03.289923
	Iteration number 4


[INFO 2021-07-20 11:32:56,289]:root: Executing apriori algorithm with 1 items and 3.047549389347291 as threshold
[INFO 2021-07-20 11:32:56,600]:root: Reading csv path /content/datasets/ashirwadsangwan/imdb-dataset/raw/csv/part-00000-52e9e128-7801-425a-aba9-349daecf1e8d-c000.csv


Execution time for get_ck_v5: 0:00:00.271795


Example: [((279072,), 3, 55), ((284608,), 3, 34), ((595115,), 3, 58), ((888978,), 3, 36), ((1774,), 3, 38), ((725,), 2, 30), ((631102,), 1, 43), ((1993,), 3, 47), ((1962458,), 2, 31), ((837430,), 3, 40)]


Execution time for get_lk_v5: 0:00:03.253238


In [40]:
for i in range(TOIVONEN_MAX_ITERATIONS):
    print(f'\tIteration number {i}')
    sample = [(row['_c0'], row['_c1']) for row in sampling(df1, 200000).collect()]
    algorithm_v5 = toivonen_algorithm(lambda: read_csvfile(file), sample,
                                    State(threshold_adjust=0.25,
                                          threshold=APRIORI_THRESHOLD, n=n))
    try:
        singleton_v5 = next(algorithm_v5)['lk']
        doubleton_v5 = next(algorithm_v5)['lk']
        triple_v5 = next(algorithm_v5)['lk']
        quadruple_v5 = next(algorithm_v5)['lk']
        quintuple_v5 = next(algorithm_v5)['lk']
        print('Toivonen completed')
        break
    except StopIteration:
      pass

	Iteration number 0


[INFO 2021-07-20 11:33:06,862]:root: Executing apriori algorithm with 1 items and 3.8094367366841144 as threshold
[INFO 2021-07-20 11:33:08,088]:root: Reading csv path /content/datasets/ashirwadsangwan/imdb-dataset/raw/csv/part-00000-52e9e128-7801-425a-aba9-349daecf1e8d-c000.csv


Execution time for get_ck_v5: 0:00:01.113258


[INFO 2021-07-20 11:33:11,621]:root: Found 7453 frequent itemsets
[INFO 2021-07-20 11:33:11,658]:root: Executing apriori algorithm with 2 items and 3.8094367366841144 as threshold


Execution time for get_lk_v5: 0:00:04.793771


[INFO 2021-07-20 11:33:13,987]:root: Reading csv path /content/datasets/ashirwadsangwan/imdb-dataset/raw/csv/part-00000-52e9e128-7801-425a-aba9-349daecf1e8d-c000.csv


Execution time for get_ck_v5: 0:00:02.285314


[INFO 2021-07-20 11:33:16,954]:root: Found 373 frequent itemsets
[INFO 2021-07-20 11:33:16,976]:root: Executing apriori algorithm with 3 items and 3.8094367366841144 as threshold


Execution time for get_lk_v5: 0:00:05.317190


[INFO 2021-07-20 11:33:19,517]:root: Reading csv path /content/datasets/ashirwadsangwan/imdb-dataset/raw/csv/part-00000-52e9e128-7801-425a-aba9-349daecf1e8d-c000.csv


Execution time for get_ck_v5: 0:00:02.539058


[INFO 2021-07-20 11:33:21,646]:root: Found 90 frequent itemsets
[INFO 2021-07-20 11:33:21,649]:root: Executing apriori algorithm with 4 items and 3.8094367366841144 as threshold


Execution time for get_lk_v5: 0:00:04.672669


[INFO 2021-07-20 11:33:24,284]:root: Reading csv path /content/datasets/ashirwadsangwan/imdb-dataset/raw/csv/part-00000-52e9e128-7801-425a-aba9-349daecf1e8d-c000.csv


Execution time for get_ck_v5: 0:00:02.631854


[INFO 2021-07-20 11:33:26,266]:root: Found 34 frequent itemsets
[INFO 2021-07-20 11:33:26,268]:root: Executing apriori algorithm with 5 items and 3.8094367366841144 as threshold


Execution time for get_lk_v5: 0:00:04.618827


[INFO 2021-07-20 11:33:28,682]:root: Reading csv path /content/datasets/ashirwadsangwan/imdb-dataset/raw/csv/part-00000-52e9e128-7801-425a-aba9-349daecf1e8d-c000.csv


Execution time for get_ck_v5: 0:00:02.410882


[INFO 2021-07-20 11:33:30,814]:root: Found 5 frequent itemsets


Execution time for get_lk_v5: 0:00:04.548032
Toivonen completed


In [41]:
if DUMP:
    names = read_csv_df(get_path(DATASET_PATH, 'name.basics.tsv.gz'), sep='\t')
    dump_frequent_itemsets_stats(create_temp_df(singleton_v5, names), 1)
    dump_frequent_itemsets_stats(create_temp_df(doubleton_v5, names), 2)
    dump_frequent_itemsets_stats(create_temp_df(triple_v5, names), 3)
    dump_frequent_itemsets_stats(create_temp_df(quadruple_v5, names), 4)
    dump_frequent_itemsets_stats(create_temp_df(quintuple_v5, names), 5)

[INFO 2021-07-20 11:33:30,833]:root: Reading csv path /content/datasets/ashirwadsangwan/imdb-dataset/name.basics.tsv.gz with header:True and sep:	
[INFO 2021-07-20 11:33:31,533]:root: Calculating frequent itemsets for dataframe


+-------+---------+------------------+
|support|   actor1|        actor1Name|
+-------+---------+------------------+
|    797|nm0103977|      Brahmanandam|
|    585|nm0006982|       Adoor Bhasi|
|    565|nm0648803|  Matsunosuke Onoe|
|    506|nm0305182|      Eddie Garcia|
|    436|nm0623427|        Prem Nazir|
|    411|nm0793813|      Sung-il Shin|
|    391|nm0246703|      Paquito Diaz|
|    387|nm0619107|  Masayoshi Nogami|
|    380|nm0007123|         Mammootty|
|    356|nm7390393|    Aachi Manorama|
|    348|nm0046850|           Bahadur|
|    344|nm0482320|          Mohanlal|
|    330|nm0149822|Mithun Chakraborty|
|    323|nm0304262|   Shivaji Ganesan|
|    315|nm0706691|       Sultan Rahi|
|    313|nm0619309|            Nagesh|
|    311|nm0007106|     Shakti Kapoor|
|    303|nm0001000|         Tom Byron|
|    303|nm0419653|       Jayabharati|
|    303|nm0659250|       Pandharibai|
+-------+---------+------------------+
only showing top 20 rows



[INFO 2021-07-20 11:34:26,825]:root: Calculating frequent itemsets for dataframe


+-----+-----------------+---+---+
|count|              avg|max|min|
+-----+-----------------+---+---+
| 7453|57.35167046826781|797| 30|
+-----+-----------------+---+---+

Execution time for dump_frequent_itemsets_stats: 0:00:55.114554
+-------+---------+--------------------+---------+--------------------+
|support|   actor1|          actor1Name|   actor2|          actor2Name|
+-------+---------+--------------------+---------+--------------------+
|    236|nm0006982|         Adoor Bhasi|nm0623427|          Prem Nazir|
|    169|nm0006982|         Adoor Bhasi|nm0046850|             Bahadur|
|    162|nm0006982|         Adoor Bhasi|nm0419653|         Jayabharati|
|    147|nm0648803|    Matsunosuke Onoe|nm2082516|        Kijaku Ôtani|
|    126|nm0648803|    Matsunosuke Onoe|nm2373718|    Kitsuraku Arashi|
|    122|nm0006982|         Adoor Bhasi|nm0619779|Thikkurisi Sukuma...|
|    113|nm2082516|        Kijaku Ôtani|nm2373718|    Kitsuraku Arashi|
|    113|nm0648803|    Matsunosuke Onoe|nm207

[INFO 2021-07-20 11:35:32,117]:root: Calculating frequent itemsets for dataframe


+-------+---------+-------------------+---------+--------------------+---------+------------------+
|support|   actor1|         actor1Name|   actor2|          actor2Name|   actor3|        actor3Name|
+-------+---------+-------------------+---------+--------------------+---------+------------------+
|    112|nm0648803|   Matsunosuke Onoe|nm2082516|        Kijaku Ôtani|nm2373718|  Kitsuraku Arashi|
|    100|nm0648803|   Matsunosuke Onoe|nm2077739|   Suminojo Ichikawa|nm2082516|      Kijaku Ôtani|
|     95|nm0648803|   Matsunosuke Onoe|nm2077739|   Suminojo Ichikawa|nm2373718|  Kitsuraku Arashi|
|     87|nm2077739|  Suminojo Ichikawa|nm2082516|        Kijaku Ôtani|nm2373718|  Kitsuraku Arashi|
|     80|nm0648803|   Matsunosuke Onoe|nm1770187| Sen'nosuke Nakamura|nm2082516|      Kijaku Ôtani|
|     75|nm0006982|        Adoor Bhasi|nm0046850|             Bahadur|nm0419653|       Jayabharati|
|     74|nm0006982|        Adoor Bhasi|nm0619779|Thikkurisi Sukuma...|nm0623427|        Prem Nazir|


[INFO 2021-07-20 11:36:44,867]:root: Calculating frequent itemsets for dataframe


+-------+---------+-------------------+---------+-------------------+---------+-----------------+---------+------------------+
|support|   actor1|         actor1Name|   actor2|         actor2Name|   actor3|       actor3Name|   actor4|        actor4Name|
+-------+---------+-------------------+---------+-------------------+---------+-----------------+---------+------------------+
|     86|nm0648803|   Matsunosuke Onoe|nm2077739|  Suminojo Ichikawa|nm2082516|     Kijaku Ôtani|nm2373718|  Kitsuraku Arashi|
|     62|nm0648803|   Matsunosuke Onoe|nm1770187|Sen'nosuke Nakamura|nm2082516|     Kijaku Ôtani|nm2373718|  Kitsuraku Arashi|
|     54|nm0648803|   Matsunosuke Onoe|nm1770187|Sen'nosuke Nakamura|nm2077739|Suminojo Ichikawa|nm2082516|      Kijaku Ôtani|
|     51|nm1698868|    Enshô Jitsukawa|nm2366585|      Ritoku Arashi|nm2367854|     Shôzô Arashi|nm2384746|       Hôshô Bandô|
|     51|nm0648803|   Matsunosuke Onoe|nm1770187|Sen'nosuke Nakamura|nm2077739|Suminojo Ichikawa|nm2373718|  Ki

[INFO 2021-07-20 11:38:06,264]:root: Calculating frequent itemsets for dataframe


+-----+----+---+---+
|count| avg|max|min|
+-----+----+---+---+
|    5|35.4| 44| 30|
+-----+----+---+---+

Execution time for dump_frequent_itemsets_stats: 0:01:30.737729
