In [1]:
%cd ../../

/home/PJWSTK/s14028/engineer/Practical


In [2]:
from typing import List, Dict, Tuple, Iterable

import os

import numpy as np
import pandas as pd
import keras
import scipy.io as mat

from common import *
from augmentation import add_pmap
from testing import mse
from testing import mae

from CNN.CNN_v9 import CNN_v9

Using TensorFlow backend.


In [3]:
# Perspective map of the scene, rescaled so that its smallest value becomes 1
# and then rounded to whole numbers stored as uint8.
perspective = mat.loadmat("mall_dataset/perspective_roi.mat")["pMapN"]
perspective /= np.min(perspective)
perspective = np.round(perspective).astype(np.uint8)

# data_sets() yields a (train, test) pair; positions 0, 1 and 2 of each side are
# used below as images, person coordinates and per-image counts.
train, test = data_sets()
image_tensors = (train[0], test[0])
person_coo_tensors = (train[1], test[1])
count_matrix = (train[2], test[2])

person_coo_train, person_coo_test = person_coo_tensors
count_train, count_test = (counts.astype(np.uint16) for counts in count_matrix)

# Presumably attaches the perspective map to every image as an extra channel
# (the network below expects 4 channels) - confirm in augmentation.add_pmap.
image_train, image_test = (add_pmap(images, perspective) for images in image_tensors)

In [4]:
# In this notebook `cnn` is only used to convert the test set through CNN_v9's
# private preparation helpers; the models evaluated later are built separately.
cnn = CNN_v9((480, 640, 4), split_into_parts=20)

# `images` / `anwsers` are consumed by the one-sample fit() call in the model
# construction cell. NOTE(review): their exact layout is defined by CNN_v9.
images = cnn._prepare_images(image_test)
anwsers = cnn._prepare_anwsers(person_coo_test)

In [5]:
# Everything this notebook reads or writes lives below CNN/CNN_v9.
working_directory = os.path.join("CNN", "CNN_v9")
weights_prefix = os.path.join(working_directory, "weights")

# Names of the saved weight sets to evaluate.
model_names = ["cnn_v9_1e_6_18", "cnn_v9_1e_6_99"]

# One weights path per model name, in the same order as model_names.
model_paths = [os.path.join(weights_prefix, weights_name) for weights_name in model_names]

In [6]:
# One fresh CNN_v9 per weights file, paired as (weights_path, model).
models = [CNN_v9((480, 640, 4), split_into_parts=20) for i in range(len(model_names))]
models = list(zip(model_paths, models))

# NOTE: despite its name, `model_name` holds an entry of model_paths (a path).
for model_name, model in models:
    model.def_model()
  
    # A single-sample fit runs before the model is wrapped; presumably this
    # initialises the network so the saved weights can be loaded - TODO confirm.
    model.model.fit(images[:1], anwsers[:1])
    # NOTE(review): gpus=2 is hard-coded, and multi_gpu_model is not available
    # in recent TensorFlow/Keras releases - verify against the installed version.
    model.model = keras.utils.multi_gpu_model(model.model, gpus=2, cpu_merge=False)
    model.model.compile(loss="binary_crossentropy", optimizer="adam", metrics=["accuracy"])
    
    # Loaded after wrapping, i.e. into the multi-GPU model object.
    model.model.load_weights(model_name)

Epoch 1/1
Epoch 1/1


In [7]:
# Added to an upper index so that range(upper + INCLUSIVE) also yields `upper`.
INCLUSIVE = 1

def get_all_possible_cut_of_points() -> np.ndarray:
    """Return 11 evenly spaced candidate cut-off points from 0 to 1 inclusive.

    Note: a later cell defines a function with the same name that drops the
    first and last point; once that cell has run, this version is shadowed.
    """
    return np.linspace(0, 1, 11)

def get_all_possible_lower_and_upper_cut_off_points(cut_off_points: np.ndarray) -> Iterable[Tuple[float, float]]:
    """Lazily yield every (lower, upper) pair of cut-off values.

    The index pairs come from get_lower_and_upper_cut_off_points_indices and
    are looked up in `cut_off_points`, so the yielded tuples contain the
    cut-off values themselves (floats), not indices.

    Args:
        cut_off_points: candidate cut-off values.

    Returns:
        A generator of (lower_value, upper_value) tuples.
    """
    index_pairs = get_lower_and_upper_cut_off_points_indices(cut_off_points)
    return ((cut_off_points[lower], cut_off_points[upper]) for lower, upper in index_pairs)

def get_lower_and_upper_cut_off_points_indices(cut_off_points: np.ndarray) -> Iterable[Tuple[int, int]]:
    """Yield every index pair (lower, upper) with lower <= upper.

    Pairs are grouped by `upper` in increasing order and, within a group,
    ordered by increasing `lower`.
    """
    for upper in range(len(cut_off_points)):
        # INCLUSIVE extends the range so that lower == upper is produced too.
        yield from ((lower, upper) for lower in range(upper + INCLUSIVE))

cut_of_points: np.ndarray = get_all_possible_cut_of_points()

# Materialised as a list because the pairs are iterated more than once later
# (once per model, and again when building the cut-off DataFrame).
lower_and_upper_cut_off_points: List[Tuple[float, float]] = list(
    get_all_possible_lower_and_upper_cut_off_points(cut_of_points)
)

In [8]:
# Slice bounds used to drop the first and the last candidate cut-off point.
WITHOUT_FIRST = 1
WITHOUT_LAST = -1
# Added to an upper index so that range(upper + INCLUSIVE) also yields `upper`.
INCLUSIVE = 1

def get_all_possible_cut_of_points() -> np.ndarray:
    """Return the interior candidate cut-off points.

    Starts from 11 evenly spaced points between 0 and 1 and slices off the
    first and the last one using WITHOUT_FIRST / WITHOUT_LAST.
    """
    return np.linspace(0, 1, 11)[WITHOUT_FIRST:WITHOUT_LAST]

def get_all_possible_lower_and_upper_cut_off_points(cut_off_points: np.ndarray) -> Iterable[Tuple[float, float]]:
    """Lazily yield every (lower, upper) pair of cut-off values.

    The index pairs come from get_lower_and_upper_cut_off_points_indices and
    are looked up in `cut_off_points`, so the yielded tuples contain the
    cut-off values themselves (floats), not indices.

    Args:
        cut_off_points: candidate cut-off values.

    Returns:
        A generator of (lower_value, upper_value) tuples.
    """
    index_pairs = get_lower_and_upper_cut_off_points_indices(cut_off_points)
    return ((cut_off_points[lower], cut_off_points[upper]) for lower, upper in index_pairs)

def get_lower_and_upper_cut_off_points_indices(cut_off_points: np.ndarray) -> Iterable[Tuple[int, int]]:
    """Yield every index pair (lower, upper) with lower <= upper.

    Pairs are grouped by `upper` in increasing order and, within a group,
    ordered by increasing `lower`.
    """
    for upper in range(len(cut_off_points)):
        # INCLUSIVE extends the range so that lower == upper is produced too.
        yield from ((lower, upper) for lower in range(upper + INCLUSIVE))

cut_of_points: np.ndarray = get_all_possible_cut_of_points()

# Materialised as a list because the pairs are iterated more than once later
# (once per model, and again when building the cut-off DataFrame).
lower_and_upper_cut_off_points: List[Tuple[float, float]] = list(
    get_all_possible_lower_and_upper_cut_off_points(cut_of_points)
)

In [9]:
# Value written into the crowd-count matrix for each probability band:
# not above the lower cut-off, above the lower cut-off only, and above the
# upper cut-off.
LOWER_CATEGORY_FILLER = 0
MIDDLE_CATEGORY_FILLER = 1 / 2
UPPER_CATEGORY_FILLER = 1

def count_crowd(probabilities_for_each_image: np.ndarray, cut_off_points: List[Tuple[float, float]]) -> Iterable[np.ndarray]:
    """Lazily yield one crowd-count result per (lower, upper) cut-off pair.

    Args:
        probabilities_for_each_image: model probabilities, passed through
            unchanged to count_crowd_with_lower_and_upper_cut_off_points.
        cut_off_points: (lower, upper) cut-off values to evaluate, in order.

    Yields:
        The counts computed for each pair, in the order of `cut_off_points`.
    """
    for lower, upper in cut_off_points:
        yield count_crowd_with_lower_and_upper_cut_off_points(probabilities_for_each_image, lower, upper)

def count_crowd_with_lower_and_upper_cut_off_points(probabilities_for_each_image: np.ndarray, lower: float, upper: float) -> np.ndarray:
    """Sum the per-element crowd-count matrix along axis 1.

    The matrix is produced by
    get_crowd_count_matrix_for_lower_and_upper_cut_off_points for the given
    pair of cut-off values.
    """
    per_element_counts = get_crowd_count_matrix_for_lower_and_upper_cut_off_points(
        probabilities_for_each_image, lower, upper
    )
    return np.sum(per_element_counts, axis=1)

def get_crowd_count_matrix_for_lower_and_upper_cut_off_points(probabilities_for_each_image: np.ndarray, lower: float, upper: float) -> np.ndarray:
    """Map every probability onto one of three fixed filler values.

    Elements not above `lower` keep the value provided by
    create_empty_crowd_counts, elements above `lower` get
    MIDDLE_CATEGORY_FILLER and elements above `upper` get
    UPPER_CATEGORY_FILLER.
    """
    crowd_counts = create_empty_crowd_counts(probabilities_for_each_image)

    above_lower = probabilities_for_each_image > lower
    above_upper = probabilities_for_each_image > upper

    # Order matters: the upper band is written last so it overrides the middle one.
    for band_mask, filler in ((above_lower, MIDDLE_CATEGORY_FILLER), (above_upper, UPPER_CATEGORY_FILLER)):
        crowd_counts[band_mask] = filler

    return crowd_counts

def create_empty_crowd_counts(probabilities_for_each_image: np.ndarray) -> np.ndarray:
    """Return a float array shaped like the input, filled with LOWER_CATEGORY_FILLER.

    The builtin `float` is used as dtype: the `np.float` alias was deprecated
    in NumPy 1.20 and removed in 1.24, where it raises AttributeError.
    """
    return np.full_like(probabilities_for_each_image, LOWER_CATEGORY_FILLER, dtype=float)

In [10]:
def calculate_statistics(difference: np.ndarray) -> Dict[str, float]:
    """Evaluate accuracy, mse and mae (in that key order) on `difference`.

    `difference` is passed unchanged to calculate_accuracy, mse and mae.
    """
    metrics = (
        ("accuracy", calculate_accuracy),
        ("mse", mse),
        ("mae", mae),
    )
    return {metric_name: metric(difference) for metric_name, metric in metrics}

def calculate_accuracy(difference: np.ndarray) -> float:
    """Return the mean of calculate_accuracy_core(difference), scaled to percent."""
    per_image_accuracy = calculate_accuracy_core(difference)
    return np.mean(per_image_accuracy) * 100

def calculate_accuracy_core(difference: np.ndarray) -> np.ndarray:
    """Return the element-wise accuracy 1 - |difference| / count_test, clipped to [0, 1].

    Reads the notebook-level `count_test` array as the ground truth, so
    `difference` must broadcast against it.
    NOTE(review): a zero entry in `count_test` would divide by zero (inf / nan
    result) - confirm the data set never contains an empty frame.

    Args:
        difference: predicted minus true counts.

    Returns:
        An array of accuracies (not a scalar), one per element of the
        broadcast result.
    """
    relative_error = np.abs(difference) / count_test
    return np.clip(1 - relative_error, 0, 1)

# Every (lower, upper) pair from the cut-off grid is evaluated.
cut_off_points = lower_and_upper_cut_off_points

# One prediction per loaded model; kept as a list because later sections of
# the notebook reuse these probabilities with other counting schemes.
crowd_counts_probabilities: List[np.ndarray] = [model.predict_proba(image_test) for _, model in models]
# Lazy: per model, a generator producing one crowd-count array per cut-off pair.
crowd_counts_generators: Iterable[Iterable[np.ndarray]] = (count_crowd(probabilities, cut_off_points) for probabilities in crowd_counts_probabilities)

# Lazy: predicted minus true counts. Nothing is computed until the list
# comprehension below consumes the generators (which also exhausts them).
# NOTE(review): assumes the counts and count_test broadcast element-wise - confirm shapes.
counts_differences_generators: Iterable[Iterable[np.ndarray]] = ((crowd_counts - count_test for crowd_counts in generator) for generator in crowd_counts_generators)
# models_statistics[m][c] is the statistics dict of model m for cut-off pair c.
models_statistics: List[List[Dict[str, float]]] = [[calculate_statistics(difference) for difference in generator] for generator in counts_differences_generators]

In [11]:
def groupby_keys(statistics: List[Dict[str, float]]) -> Dict[str, List[float]]:
    """Turn a list of dicts into a dict of lists, grouping values by key.

    The groups are created by create_empty_groups and filled, record by
    record in input order, by append_substatistics_to_groups.
    """
    groups = create_empty_groups(statistics)

    for record in statistics:
        groups = append_substatistics_to_groups(record, groups)

    return groups

def create_empty_groups(statistics: List[Dict[str, float]]) -> Dict[str, List[float]]:
    """Create one empty list per key of the first statistics dict.

    Only the first element is inspected; later elements are assumed to share
    its keys. An empty `statistics` yields an empty dict of groups.

    Args:
        statistics: statistics dicts that will later be grouped by key.

    Returns:
        A dict mapping every key of the first element to a new empty list.
    """
    # next(iter(...)) peeks at the first element without copying the rest.
    first_statistics = next(iter(statistics), {})
    return {key: [] for key in first_statistics}

def append_substatistics_to_groups(substatistics: Dict[str, float], groups: Dict[str, List[float]]) -> Dict[str, List[float]]:
    """Append each value of `substatistics` to the list stored under its key.

    `groups` is modified in place and the same object is returned. A key
    that is missing from `groups` raises KeyError.
    """
    for key in substatistics:
        groups[key].append(substatistics[key])

    return groups

# Per model: list of statistics dicts -> dict of lists (one entry per cut-off pair).
# Rebinds `models_statistics`, so this cell must not be run twice in a row.
models_statistics: List[Dict[str, List[float]]] = list(map(groupby_keys, models_statistics))

In [12]:
def concatenate(left_frame: pd.DataFrame, right_frame: pd.DataFrame) -> pd.DataFrame:
    """Join two frames side by side (column-wise), aligned on their index."""
    side_by_side = (left_frame, right_frame)
    return pd.concat(side_by_side, axis=1, sort=False)

# One statistics table per model; rows follow the order of cut_off_points.
models_statistics_frames = list(map(pd.DataFrame, models_statistics))
cut_off_points_frame = pd.DataFrame(cut_off_points, columns=["lower", "upper"])

# Attach the (lower, upper) pair that produced each row of statistics.
models_statistics_with_cut_off_points_frames = [
    concatenate(statistics_frame, cut_off_points_frame)
    for statistics_frame in models_statistics_frames
]

In [13]:
def make_directory(path: str) -> None:
    """Create the directory `path`; do nothing if it already exists.

    Missing parent directories are created as well.

    Args:
        path: directory to create.

    Raises:
        FileExistsError: if `path` exists but is not a directory.
    """
    os.makedirs(path, exist_ok=True)

# Directory (below working_directory) that receives every CSV written by the
# cells that follow.
statistics_path = os.path.join(working_directory, "statistics")
make_directory(statistics_path)

In [14]:
# Write each model's full table (statistics plus cut-off pairs) to
# <statistics_path>/<model_name>.csv.
for model_name, statistics in zip(model_names, models_statistics_with_cut_off_points_frames):
    current_statistics_path = os.path.join(statistics_path, f"{model_name}.csv")
    statistics.to_csv(current_statistics_path)

In [15]:
# Write the describe() summary of each model's statistics (cut-off columns
# excluded, since the frames without them are used) to <model_name>_description.csv.
for model_name, statistics in zip(model_names, models_statistics_frames):
    statistics_description = statistics.describe()
    statistics_description_path = os.path.join(statistics_path, f"{model_name}_description.csv")
    statistics_description.to_csv(statistics_description_path)

In [16]:
# For every model pick the single row (statistics plus its cut-off pair) that is
# best for one metric; each result has one column per model name.

# Row with the highest accuracy.
highest_accuracy_statistics = pd.concat(
    [
        pd.Series(frame.loc[frame.loc[:, "accuracy"].idxmax()], name=name)
        for name, frame in zip(model_names, models_statistics_with_cut_off_points_frames)
    ],
    axis=1,
)

# Row with the lowest mean absolute error.
lowest_mae_statistics = pd.concat(
    [
        pd.Series(frame.loc[frame.loc[:, "mae"].idxmin()], name=name)
        for name, frame in zip(model_names, models_statistics_with_cut_off_points_frames)
    ],
    axis=1,
)

# Row with the lowest mean squared error.
lowest_mse_statistics = pd.concat(
    [
        pd.Series(frame.loc[frame.loc[:, "mse"].idxmin()], name=name)
        for name, frame in zip(model_names, models_statistics_with_cut_off_points_frames)
    ],
    axis=1,
)

In [17]:
# Persist the three best-row summaries next to the per-model tables.
# The file names are fixed, so plain string literals are used.
highest_accuracy_statistics_path = os.path.join(statistics_path, "highest_accuracy_statistics.csv")
highest_accuracy_statistics.to_csv(highest_accuracy_statistics_path)

lowest_mae_statistics_path = os.path.join(statistics_path, "lowest_mae_statistics.csv")
lowest_mae_statistics.to_csv(lowest_mae_statistics_path)

lowest_mse_statistics_path = os.path.join(statistics_path, "lowest_mse_statistics.csv")
lowest_mse_statistics.to_csv(lowest_mse_statistics_path)

### Crowd count without cut-off-point

In [18]:
def count_crowd(probabilities_for_each_image: np.ndarray) -> Iterable[np.ndarray]:
    """Yield a single array: the probabilities summed along axis 1, then rounded.

    No cut-off is applied. The result is wrapped in a one-item generator,
    which matches the iterable shape produced by the cut-off based variant.
    Note: this definition replaces the earlier two-argument `count_crowd`.
    """
    summed_probabilities = np.sum(probabilities_for_each_image, axis=1)
    yield np.around(summed_probabilities)

In [19]:
# Uses the single-argument count_crowd defined in the cell above, so each
# model's generator yields exactly one crowd-count array.
crowd_counts_generators: Iterable[Iterable[np.ndarray]] = (count_crowd(probabilities) for probabilities in crowd_counts_probabilities)

# Lazy: predicted minus true counts; consumed (and exhausted) by the list
# comprehension below.
counts_differences_generators: Iterable[Iterable[np.ndarray]] = ((crowd_counts - count_test for crowd_counts in generator) for generator in crowd_counts_generators)
# One inner list per model, holding a single statistics dict.
models_statistics: List[List[Dict[str, float]]] = [[calculate_statistics(difference) for difference in generator] for generator in counts_differences_generators]

In [20]:
# Index of the only entry in each per-model statistics list.
FIRST = 0

# Per model: list of statistics dicts -> dict of (single-item) lists.
models_statistics: List[Dict[str, List[float]]] = list(map(groupby_keys, models_statistics))

# Unwrap the single-item lists, then regroup across models so that every
# statistic maps to one value per model.
flatten_models_statistics = groupby_keys(
    [
        {key: values[FIRST] for key, values in statistics.items()}
        for statistics in models_statistics
    ]
)

# Rows are statistics, columns are model names.
models_statistics_frames = pd.DataFrame(flatten_models_statistics, index=model_names).T

In [21]:
# Persist the statistics obtained without any cut-off point.
# The file name is fixed, so a plain string literal is used.
models_statistics_path = os.path.join(statistics_path, "identity_filler.csv")
models_statistics_frames.to_csv(models_statistics_path)

### Crowd count with linear filler between double cut-off-points

In [22]:
# Values used for elements not above the lower cut-off and for elements above
# the upper cut-off. Elements in between keep their own probability, so no
# middle filler is used by the functions in this cell.
LOWER_CATEGORY_FILLER = 0
UPPER_CATEGORY_FILLER = 1

def count_crowd(probabilities_for_each_image: np.ndarray, cut_off_points: List[Tuple[float, float]]) -> Iterable[np.ndarray]:
    """Lazily yield one crowd-count result per (lower, upper) cut-off pair.

    Note: this definition replaces the single-argument `count_crowd` used for
    the identity filler.

    Args:
        probabilities_for_each_image: model probabilities, passed through
            unchanged to count_crowd_with_lower_and_upper_cut_off_points.
        cut_off_points: (lower, upper) cut-off values to evaluate, in order.

    Yields:
        The counts computed for each pair, in the order of `cut_off_points`.
    """
    for lower, upper in cut_off_points:
        yield count_crowd_with_lower_and_upper_cut_off_points(probabilities_for_each_image, lower, upper)

def count_crowd_with_lower_and_upper_cut_off_points(probabilities_for_each_image: np.ndarray, lower: float, upper: float) -> np.ndarray:
    """Sum the per-element crowd-count matrix along axis 1.

    The matrix is produced by
    get_crowd_count_matrix_for_lower_and_upper_cut_off_points for the given
    pair of cut-off values.
    """
    per_element_counts = get_crowd_count_matrix_for_lower_and_upper_cut_off_points(
        probabilities_for_each_image, lower, upper
    )
    return np.sum(per_element_counts, axis=1)

def get_crowd_count_matrix_for_lower_and_upper_cut_off_points(probabilities_for_each_image: np.ndarray, lower: float, upper: float) -> np.ndarray:
    """Clamp probabilities outside the (lower, upper] band to fixed fillers.

    Elements not above `lower` become LOWER_CATEGORY_FILLER, elements above
    both cut-offs become UPPER_CATEGORY_FILLER, and everything in between
    keeps its original probability.
    """
    above_lower = probabilities_for_each_image > lower
    above_both = above_lower & (probabilities_for_each_image > upper)

    kept_probabilities = np.where(above_lower, probabilities_for_each_image, LOWER_CATEGORY_FILLER)
    return np.where(above_both, UPPER_CATEGORY_FILLER, kept_probabilities)

In [23]:
# Re-evaluates the stored probabilities with the count_crowd defined in the
# cell above (linear filler), once per (lower, upper) pair in cut_off_points.
crowd_counts_generators: Iterable[Iterable[np.ndarray]] = (count_crowd(probabilities, cut_off_points) for probabilities in crowd_counts_probabilities)

# Lazy: predicted minus true counts; consumed (and exhausted) by the list
# comprehension below.
counts_differences_generators: Iterable[Iterable[np.ndarray]] = ((crowd_counts - count_test for crowd_counts in generator) for generator in crowd_counts_generators)
# models_statistics[m][c] is the statistics dict of model m for cut-off pair c.
models_statistics: List[List[Dict[str, float]]] = [[calculate_statistics(difference) for difference in generator] for generator in counts_differences_generators]

In [24]:
# Per model: list of statistics dicts -> dict of lists -> DataFrame whose rows
# follow the order of cut_off_points.
models_statistics: List[Dict[str, List[float]]] = list(map(groupby_keys, models_statistics))
models_statistics_frames = list(map(pd.DataFrame, models_statistics))
cut_off_points_frame = pd.DataFrame(cut_off_points, columns=["lower", "upper"])

# Attach the (lower, upper) pair that produced each row of statistics.
models_statistics_with_cut_off_points_frames = [
    concatenate(statistics_frame, cut_off_points_frame)
    for statistics_frame in models_statistics_frames
]

In [25]:
# Write each model's full linear-filler table (statistics plus cut-off pairs)
# to <statistics_path>/linear_filler_<model_name>.csv.
for model_name, statistics in zip(model_names, models_statistics_with_cut_off_points_frames):
    current_statistics_path = os.path.join(statistics_path, f"linear_filler_{model_name}.csv")
    statistics.to_csv(current_statistics_path)

In [26]:
# Write the describe() summary of each model's linear-filler statistics
# (frames without the cut-off columns) to linear_filler_<model_name>_description.csv.
for model_name, statistics in zip(model_names, models_statistics_frames):
    statistics_description = statistics.describe()
    statistics_description_path = os.path.join(statistics_path, f"linear_filler_{model_name}_description.csv")
    statistics_description.to_csv(statistics_description_path)

In [27]:
# For every model pick the single row (statistics plus its cut-off pair) that is
# best for one metric; each result has one column per model name.

# Row with the highest accuracy.
highest_accuracy_statistics = pd.concat(
    [
        pd.Series(frame.loc[frame.loc[:, "accuracy"].idxmax()], name=name)
        for name, frame in zip(model_names, models_statistics_with_cut_off_points_frames)
    ],
    axis=1,
)

# Row with the lowest mean absolute error.
lowest_mae_statistics = pd.concat(
    [
        pd.Series(frame.loc[frame.loc[:, "mae"].idxmin()], name=name)
        for name, frame in zip(model_names, models_statistics_with_cut_off_points_frames)
    ],
    axis=1,
)

# Row with the lowest mean squared error.
lowest_mse_statistics = pd.concat(
    [
        pd.Series(frame.loc[frame.loc[:, "mse"].idxmin()], name=name)
        for name, frame in zip(model_names, models_statistics_with_cut_off_points_frames)
    ],
    axis=1,
)

In [28]:
# Persist the three best-row summaries of the linear-filler evaluation.
# The file names are fixed, so plain string literals are used.
highest_accuracy_statistics_path = os.path.join(statistics_path, "linear_filler_highest_accuracy_statistics.csv")
highest_accuracy_statistics.to_csv(highest_accuracy_statistics_path)

lowest_mae_statistics_path = os.path.join(statistics_path, "linear_filler_lowest_mae_statistics.csv")
lowest_mae_statistics.to_csv(lowest_mae_statistics_path)

lowest_mse_statistics_path = os.path.join(statistics_path, "linear_filler_lowest_mse_statistics.csv")
lowest_mse_statistics.to_csv(lowest_mse_statistics_path)