In [36]:
import numpy as np
import pandas as pd
import pyspark
import re
import matplotlib.pyplot as plt

from pyspark import SparkContext
from pyspark.sql import SparkSession, Row, DataFrame
from pyspark.sql import functions as F
from pyspark.sql.types import StructField, StructType, BooleanType, StringType, DoubleType, IntegerType, ArrayType

from stpm.visualization import plot_pattern
from stpm.statistics import sum_delta_distrib, compute_mean_delta, count_delta_diff_from_zero, count_n_elements
from collections import defaultdict

from typing import List, Union, Dict, Optional, Tuple

In [None]:
# Shell command (IPython `!` escape): recursively archive the local `stpm` directory into stpm.zip.
!zip -r stpm.zip stpm

In [38]:
# Hand the stpm.zip archive to `sc` via addPyFile.
# NOTE(review): `sc` is not defined in this notebook; presumably the kernel-provided
# SparkContext, and the intent is to make `stpm` importable inside Spark tasks - confirm.
sc.addPyFile('stpm.zip')

In [40]:
def split(s: str) -> List[List[str]]:
    """Parse a bracketed sequence string into a list of token groups.

    The string is scanned one character at a time. Spaces are skipped, ``[``
    opens a nesting level, ``,`` ends the token being read, and a ``]`` that
    closes an inner level (one that still has an enclosing ``[`` open) ends
    the current group and appends it to the result. The ``]`` that closes the
    outermost level emits nothing. For example ``'[[a, b], [c]]'`` yields
    ``[['a', 'b'], ['c']]``.

    Raises ``AssertionError`` when ``s`` is not a ``str`` and ``IndexError``
    when a ``]`` is met while no ``[`` is open.
    """
    assert isinstance(s, str)
    groups = []
    open_brackets = []
    group = []
    token = ''
    for ch in s:
        if ch == ' ':
            continue
        if ch == '[':
            open_brackets.append(ch)
            continue
        if ch not in (']', ','):
            token += ch
            continue
        if ch == ']':
            open_brackets.pop()
            if not open_brackets:
                # Outermost bracket closed: nothing is emitted and no state is reset.
                continue
        # A comma or an inner closing bracket ends the token being read.
        if token:
            group.append(token)
            token = ''
        if ch == ']':
            groups.append(group)
            group = []
    return groups

In [44]:
# Where the pattern CSVs live and which mode's files are analysed.
basepath = 'datasets/SFBayAreaBikeSharing/'
mode = 'radius'
subfolder = f'patterns_{mode}_100m_DEF/'

# One CSV per file-name prefix, all named '<prefix>_<mode>.csv'.
files = [f'{prefix}_{mode}.csv' for prefix in ('global', 'mv', 'pa', 'rc', 'sf', 'sj')]

# Full path of every file: basepath + subfolder + file name.
fullpaths = [f'{basepath}{subfolder}{name}' for name in files]

In [45]:
# UDF 'seqparser': applies `split` to a string column, declared as array<array<string>>.
splitter = spark.udf.register('seqparser', split, ArrayType(ArrayType(StringType())))


def _center_in_first_itemset(it) -> bool:
    """Return True when any item of the first group of `it` contains 'S0_T0'.

    `it` is expected to be a parsed sequence (a list of lists of strings).
    An empty or null sequence has no first group, so it is reported as False;
    without this guard `it[0]` would raise and abort the Spark job. `split`
    returns an empty list for input that has no nested brackets.
    """
    if not it:
        return False
    return any('S0_T0' in x for x in it[0])


# UDF 'filter_center': boolean predicate, also callable by name in SQL expressions.
filter_center = spark.udf.register('filter_center', _center_in_first_itemset, 'boolean')

def load_files(paths: List[str], filter_data: bool=True) -> List[DataFrame]:
    """Read each CSV in `paths` into a DataFrame with a parsed 'sequence' column.

    Every file is read without schema inference using the fixed columns
    (freq: int, relFreq: double, confidence: double, sequence: string); the
    'sequence' column is then replaced by the output of the `splitter` UDF.

    Parameters
    ----------
    paths : paths of the CSV files to read.
    filter_data : when True, keep only the rows for which the SQL function
        'filter_center' is true on 'sequence'.

    Returns
    -------
    One DataFrame per path, in the same order as `paths`.
    """
    pattern_schema = StructType([
        StructField('freq', IntegerType()),
        StructField('relFreq', DoubleType()),
        StructField('confidence', DoubleType()),
        StructField('sequence', StringType())
    ])

    def read_patterns(path):
        raw = spark.read.csv(path, header=None, inferSchema=False, schema=pattern_schema)
        parsed = raw.withColumn('sequence', splitter(F.col('sequence')))
        return parsed.filter('filter_center(sequence)') if filter_data else parsed

    return [read_patterns(path) for path in paths]

In [46]:
# Load every file listed in `fullpaths`, with the filter_center row filter enabled.
dfs = load_files(paths=fullpaths, filter_data=True)

In [24]:
def generate_nrules_table(file_list: List[str], df_list: List[DataFrame]) -> pd.DataFrame:
    """Count the rows of each DataFrame and tabulate the counts per file.

    `file_list` and `df_list` are paired positionally. One line is printed
    per pair, and the result is a pandas DataFrame with the columns
    'filename' and 'count'.
    """
    names = []
    counts = []
    for name, frame in zip(file_list, df_list):
        n_rules = frame.count()
        print(f'{name} - Number of rules: {n_rules}')
        names.append(name)
        counts.append(n_rules)

    return pd.DataFrame({'filename': names, 'count': counts})

def compute_statistics(file_list: List[str], df_list: List[DataFrame]) -> pd.DataFrame:
    """Compute and print per-file summary statistics of the loaded patterns.

    `file_list` and `df_list` are paired positionally. For each pair the
    DataFrame is cached, the statistics below are evaluated, and the cache is
    released again (also when one of the computations raises):

    * total number of rows;
    * mean of `count_n_elements` over the 'sequence' column;
    * `compute_mean_delta` for the 'spatial' and 'temporal' delta types;
    * number of rows for which `count_delta_diff_from_zero` is > 0, for the
      'spatial', 'temporal' and 'both' delta types.

    Returns
    -------
    pd.DataFrame with one row per file and the columns 'files',
    'tot_patterns', 'mean_num_el', 'mean_spat_delta', 'mean_temp_delta',
    'patterns_spat_delta_gt0', 'patterns_temp_delta_gt0' and
    'patterns_both_delta_gt0'.
    """
    res_dict = defaultdict(list)
    for f, df in zip(file_list, df_list):
        df = df.cache()
        try:
            # Count once, while the data is cached: every extra count() issued
            # after unpersist() would recompute the whole DataFrame.
            tot_patterns = df.count()
            mean_n_elements = df.rdd.map(lambda it: count_n_elements(it['sequence'])).mean()
            mean_spatial_delta = compute_mean_delta(df, delta_type='spatial')
            mean_temporal_delta = compute_mean_delta(df, delta_type='temporal')
            npatterns_spat_delta_gt0 = df.rdd.filter(
                lambda it: count_delta_diff_from_zero(it['sequence'], delta_type='spatial') > 0).count()
            npatterns_temp_delta_gt0 = df.rdd.filter(
                lambda it: count_delta_diff_from_zero(it['sequence'], delta_type='temporal') > 0).count()
            npatterns_both_delta_gt0 = df.rdd.filter(
                lambda it: count_delta_diff_from_zero(it['sequence'], delta_type='both') > 0).count()
        finally:
            # Release the cache even if one of the statistics failed.
            df.unpersist()

        def share(n_patterns):
            # Fraction of all patterns; NaN instead of ZeroDivisionError when there are none.
            return n_patterns / tot_patterns if tot_patterns else float('nan')

        res_dict['files'].append(f)
        res_dict['tot_patterns'].append(tot_patterns)
        res_dict['mean_num_el'].append(mean_n_elements)
        res_dict['mean_spat_delta'].append(mean_spatial_delta)
        res_dict['mean_temp_delta'].append(mean_temporal_delta)
        res_dict['patterns_spat_delta_gt0'].append(npatterns_spat_delta_gt0)
        res_dict['patterns_temp_delta_gt0'].append(npatterns_temp_delta_gt0)
        res_dict['patterns_both_delta_gt0'].append(npatterns_both_delta_gt0)

        print('#' * 50)
        print(f)
        print(f'total patterns: {tot_patterns}')
        print(f'mean number of elements: {mean_n_elements}')
        print(f'mean spatial delta: {mean_spatial_delta}')
        print(f'mean temporal delta: {mean_temporal_delta}')
        print(f'number of patterns with at least one element with spatial delta > 0: {npatterns_spat_delta_gt0} ({share(npatterns_spat_delta_gt0)})')
        print(f'number of patterns with at least one element with temporal delta > 0: {npatterns_temp_delta_gt0} ({share(npatterns_temp_delta_gt0)})')
        print(f'number of patterns with at least one element with spatial AND temporal delta > 0: {npatterns_both_delta_gt0} ({share(npatterns_both_delta_gt0)})')

    return pd.DataFrame(dict(res_dict))

In [None]:
# Tabulate the number of rules in each loaded DataFrame (one line is printed per file).
nrules_table = generate_nrules_table(file_list=files, df_list=dfs)

In [None]:
# Per-file pattern statistics; the returned pandas DataFrame is the cell's output.
compute_statistics(file_list=files, df_list=dfs)

In [None]:
def _compression_metrics(rcount: int, acount: int, mode: str, f: str) -> Tuple[float, float]:
    """Return ``(compression, saving)`` for one relative/absolute count pair.

    ``saving`` is ``1 - rcount / acount`` and ``compression`` is
    ``acount / rcount``. When the respective denominator is zero a notice is
    printed and the value is the sentinel ``-1``.
    """
    if acount == 0:
        print(f'found empty dataframe in absolute {mode} mode for file {f}')
        saving = -1
    else:
        saving = 1 - rcount / acount
    if rcount == 0:
        print(f'found empty dataframe in relative {mode} mode for file {f}')
        compression = -1
    else:
        compression = acount / rcount
    return compression, saving


def compute_compression_df(relative_radius: List[str],
                           absolute_radius: List[str],
                           relative_incoming: List[str],
                           absolute_incoming: List[str],
                           file_list: List[str],
                           mapper_dict: Optional[Dict[str, str]] = None) -> pd.DataFrame:
    """Compare relative and absolute pattern counts for the radius and incoming modes.

    The four path lists are loaded with `load_files`; relative files are
    loaded with ``filter_data=True`` and absolute files with
    ``filter_data=False``. The loaded DataFrames are paired positionally with
    `file_list`, and for each entry the row counts, the compression
    (absolute / relative) and the rules saving (1 - relative / absolute) are
    recorded per mode. A value of ``-1`` marks a ratio whose denominator was
    zero.

    Parameters
    ----------
    relative_radius, absolute_radius, relative_incoming, absolute_incoming :
        paths of the CSV files, in the same order as `file_list`.
    file_list : one label per position, used as key into `mapper_dict`.
    mapper_dict : maps each entry of `file_list` to the value stored in the
        'city' column; defaults to the identity mapping over `file_list`.

    Returns
    -------
    pd.DataFrame with the columns 'city', 'relative_radius',
    'absolute_radius', 'radius_compression', 'radius_rules_saving',
    'relative_incoming', 'absolute_incoming', 'incoming_compression' and
    'incoming_rules_saving'.

    Raises
    ------
    KeyError if an entry of `file_list` is missing from `mapper_dict`.
    """
    if mapper_dict is None:
        mapper_dict = {x: x for x in file_list}
    relative_radius_dfs = load_files(relative_radius, filter_data=True)
    absolute_radius_dfs = load_files(absolute_radius, filter_data=False)
    relative_incoming_dfs = load_files(relative_incoming, filter_data=True)
    absolute_incoming_dfs = load_files(absolute_incoming, filter_data=False)

    res_dict = {
        'city': [],
        'relative_radius': [],
        'absolute_radius': [],
        'radius_compression': [],
        'radius_rules_saving': [],
        'relative_incoming': [],
        'absolute_incoming': [],
        'incoming_compression': [],
        'incoming_rules_saving': []
    }
    # Iterate over the `file_list` argument rather than a notebook-level
    # variable, so the labels always match the DataFrames loaded above.
    for f, rdf_radius, adf_radius, rdf_inc, adf_inc in zip(file_list,
                                                           relative_radius_dfs,
                                                           absolute_radius_dfs,
                                                           relative_incoming_dfs,
                                                           absolute_incoming_dfs):
        rcount_radius = rdf_radius.count()
        acount_radius = adf_radius.count()
        compression_radius, saving_radius = _compression_metrics(
            rcount_radius, acount_radius, 'radius', f)

        rcount_inc = rdf_inc.count()
        acount_inc = adf_inc.count()
        compression_inc, saving_inc = _compression_metrics(
            rcount_inc, acount_inc, 'incoming', f)

        res_dict['city'].append(mapper_dict[f])
        res_dict['relative_radius'].append(rcount_radius)
        res_dict['absolute_radius'].append(acount_radius)
        res_dict['radius_compression'].append(compression_radius)
        res_dict['radius_rules_saving'].append(saving_radius)
        res_dict['relative_incoming'].append(rcount_inc)
        res_dict['absolute_incoming'].append(acount_inc)
        res_dict['incoming_compression'].append(compression_inc)
        res_dict['incoming_rules_saving'].append(saving_inc)

    return pd.DataFrame(res_dict)

In [None]:
# Compression rate for the 100m case: the extracted projected (relative) patterns
# are compared against the extracted absolute patterns.
basepath = 'datasets/SFBayAreaBikeSharing/'

relative_radius_path = f'{basepath}patterns_radius_100m_DEF/'
absolute_radius_path = f'{basepath}patterns_radius_absolute_100m_DEF/'

relative_incoming_path = f'{basepath}patterns_incoming_100m_future_DEF/'
absolute_incoming_path = f'{basepath}patterns_incoming_absolute_100m_DEF/'

# File-name templates; the '{}' placeholder is filled with the mode name below.
files = ['pa_{}.csv', 'mv_{}.csv', 'sj_{}.csv', 'rc_{}.csv']

radius_files = [template.format('radius') for template in files]
incoming_files = [template.format('incoming') for template in files]

relative_radius_fullpaths = [f'{relative_radius_path}{name}' for name in radius_files]
absolute_radius_fullpaths = [f'{absolute_radius_path}{name}' for name in radius_files]
relative_incoming_fullpaths = [f'{relative_incoming_path}{name}' for name in incoming_files]
absolute_incoming_fullpaths = [f'{absolute_incoming_path}{name}' for name in incoming_files]

# Label stored in the 'city' column for each file-name template.
mapper_dict = {
    'pa_{}.csv': 'Palo Alto',
    'mv_{}.csv': 'Mountain View',
    'sj_{}.csv': 'San Jose',
    'rc_{}.csv': 'Redwood City',
}

compression_df = compute_compression_df(
    relative_radius=relative_radius_fullpaths,
    absolute_radius=absolute_radius_fullpaths,
    relative_incoming=relative_incoming_fullpaths,
    absolute_incoming=absolute_incoming_fullpaths,
    file_list=files,
    mapper_dict=mapper_dict,
)
compression_df