In [None]:
!pip install cleanvision

In [2]:
from PIL import Image
from PIL import ImageStat, ImageFilter
import numpy as np
from typing import Union
from typing import List, Dict, Any
import math
import os

# Brightness Score

light_score = 1 - per_cal(5)

dark_score = per_cal(99)

In [3]:
def calculate_brightness(
    red: Union[float, "np.ndarray[Any, Any]"],
    green: Union[float, "np.ndarray[Any, Any]"],
    blue: Union[float, "np.ndarray[Any, Any]"],
) -> Union[float, "np.ndarray[Any, Any]"]:
    """Compute a weighted RMS brightness from red/green/blue values.

    Each channel is squared, weighted (0.241 / 0.691 / 0.068), summed,
    square-rooted and divided by 255. Works element-wise on arrays.
    """
    weighted_squares = (
        0.241 * (red * red) + 0.691 * (green * green) + 0.068 * (blue * blue)
    )
    return np.sqrt(weighted_squares) / 255
def calc_avg_brightness(image: "Image.Image") -> float:
    """Return the brightness of the image's per-band mean colour.

    Parameters
    ----------
    image : PIL image whose per-band means are read via ``ImageStat.Stat``.

    Returns
    -------
    float
        ``calculate_brightness`` applied to the mean red/green/blue values.

    Notes
    -----
    Images with three or more bands use their first three band means, so an
    extra band (e.g. the alpha band of RGBA) is ignored instead of pushing the
    image into the single-band fallback. Images with fewer than three bands
    reuse the first band for all three channels.
    NOTE(review): multi-band non-RGB modes (e.g. CMYK) are not converted here;
    their first three bands are used as-is — confirm whether callers need a
    ``convert("RGB")`` first.
    """
    band_means = ImageStat.Stat(image).mean
    if len(band_means) >= 3:
        red, green, blue = band_means[:3]
    else:
        # Single-band (black and white / grayscale) images: replicate the band.
        red = green = blue = band_means[0]
    cur_bright: float = calculate_brightness(red, green, blue)
    return cur_bright
def calc_percentile_brightness(
    image: Image, percentiles: List[int]
) -> "np.ndarray[Any, Any]":
    """Return the requested percentiles of per-pixel brightness.

    A 3-D pixel array has its first three channels fed to
    ``calculate_brightness``; any other array is simply divided by 255.
    """
    pixels = np.asarray(image)
    if len(pixels.shape) != 3:
        # No channel axis: treat the raw values as intensities.
        brightness_map = pixels / 255.0
    else:
        # Cast to int so squaring inside calculate_brightness cannot overflow
        # a narrow integer dtype.
        channels = [pixels[:, :, idx].astype("int") for idx in range(3)]
        brightness_map = calculate_brightness(*channels)
    return np.percentile(brightness_map, percentiles)

def calculate_brightness_score(image: Image) -> Dict[str, Union[float, str]]:
    """Collect brightness percentiles and the average brightness of an image.

    Returns a dict with one ``brightness_perc_<p>`` entry per percentile in
    [1, 5, 10, 15, 90, 95, 99] plus a ``brightness`` entry holding the result
    of ``calc_avg_brightness``.
    """
    percentiles = [1, 5, 10, 15, 90, 95, 99]
    scores = {}
    values = calc_percentile_brightness(image, percentiles=percentiles)
    for perc, value in zip(percentiles, values):
        scores[f"brightness_perc_{perc}"] = value
    scores["brightness"] = calc_avg_brightness(image)
    return scores

In [5]:
# Load one sample image from a hard-coded local path.
# NOTE(review): despite the name `gray_image`, nothing in this cell converts
# the image to grayscale — confirm the mode of the file on disk.
gray_image = Image.open('/media/tung/New Volume/Ubuntu/Programing/MQ Solutions/Task Data/data quality/dermet_image_train/03AnalExcoriation051204.jpg')
# Brightness computed from the per-band mean colour.
avg_brightness = calc_avg_brightness(gray_image)
print("avg_brightness :", avg_brightness)

# Per-pixel brightness at the same percentiles calculate_brightness_score uses.
per_brightness = calc_percentile_brightness(gray_image, [1, 5, 10, 15, 90, 95, 99])
print("per_brightness :", per_brightness)

# Combined dictionary: percentile entries plus the average brightness.
raw_values = calculate_brightness_score(image = gray_image)
print("raw_value :", raw_values)

avg_brightness : 0.447781200317847
per_brightness : [0.12905143 0.25272759 0.30606882 0.33616486 0.59083415 0.61968549
 0.66514291]
raw_value : {'brightness_perc_1': 0.12905142941503522, 'brightness_perc_5': 0.25272758880010526, 'brightness_perc_10': 0.30606882199684926, 'brightness_perc_15': 0.33616485950247893, 'brightness_perc_90': 0.590834154461015, 'brightness_perc_95': 0.6196854878131554, 'brightness_perc_99': 0.6651429068291944, 'brightness': 0.447781200317847}


# Aspect Ratio

aspect_ratio_score = min(width / height, height / width)

In [4]:
def calc_aspect_ratio_score(image: "Image.Image") -> float:
    """Score how close an image is to square.

    Parameters
    ----------
    image : object exposing ``size`` as a ``(width, height)`` pair.

    Returns
    -------
    float
        ``min(width / height, height / width)``: 1.0 for a square image,
        approaching 0 for extreme shapes. A degenerate image with a zero
        width or height scores 0.0 instead of raising ZeroDivisionError.
    """
    width, height = image.size
    if width <= 0 or height <= 0:
        # Degenerate shape: the most extreme aspect ratio possible.
        return 0.0
    return float(min(width / height, height / width))


# Entropy Score

low_information_score = entropy_score = image entropy / 10

In [51]:
def calc_entropy_score(image: Image) -> float:
    """Return the image's entropy, as reported by ``image.entropy()``, divided by 10."""
    raw_entropy = image.entropy()
    # PIL ships no type stubs, so check the return type at runtime.
    assert isinstance(raw_entropy, float)
    scaled_entropy = raw_entropy / 10
    return scaled_entropy

# Blurriness Score

blurriness_score = minimum(blur_scores + std_scores, 1)

In [70]:
# Longest side, in pixels, that calculate_blurriness_score allows before it
# downsizes the image for blur analysis.
MAX_RESOLUTION_FOR_BLURRY_DETECTION = 64
def get_edges(gray_image: Image) -> Image:
    """Return the image after applying PIL's FIND_EDGES filter."""
    return gray_image.filter(ImageFilter.FIND_EDGES)

def calc_blurriness(gray_image: Image) -> float:
    """Return the standard deviation of the first band of the edge-filtered image."""
    edge_variance = ImageStat.Stat(get_edges(gray_image)).var[0]
    return np.sqrt(edge_variance)  # type:ignore

def calc_std_grayscale(gray_image: Image) -> float:
    """Return the standard deviation of the image's histogram bin counts."""
    histogram_counts = gray_image.histogram()
    return np.std(histogram_counts)  # type: ignore

def calculate_blurriness_score(image: Image) -> Dict[str, Union[float, str]]:
    """Combine an edge-based and a histogram-based term into one score.

    The image is first shrunk so its longest side is at most
    MAX_RESOLUTION_FOR_BLURRY_DETECTION (smaller images are copied unchanged),
    then converted to mode "L". Each term is ``1 - exp(-x / 100)`` and their
    sum is capped at 1. The result is returned under the key ``"score"``.
    """
    longest_side = max(image.width, image.height)
    scale = longest_side / MAX_RESOLUTION_FOR_BLURRY_DETECTION
    if scale <= 1:
        working_image = image.copy()
    else:
        # Keep at least one pixel per side after the integer division.
        target_size = (
            max(int(image.width // scale), 1),
            max(int(image.height // scale), 1),
        )
        working_image = image.resize(target_size)

    gray = working_image.convert("L")
    edge_term = 1 - np.exp(-1 * calc_blurriness(gray) / 100)
    histogram_term = 1 - np.exp(-1 * calc_std_grayscale(gray) / 100)
    return {"score": np.minimum(edge_term + histogram_term, 1)}

# ColorSpace Score

grayscale_score = 1 if the image mode is RGB, else 0

In [104]:
def calc_color_space(image: Image) -> str:
    """Return the image's colour mode string, as determined by ``get_image_mode``."""
    mode = get_image_mode(image)
    return mode

def get_image_mode(image: Image) -> str:
    """Return the image's mode, inferring it from pixel data when ``mode`` is empty.

    When ``image.mode`` is truthy it is returned as-is. Otherwise a 2-D pixel
    array, or a 3-D array whose channels are all identical, is reported as
    "L"; anything else is reported as "UNK".
    """
    declared_mode = image.mode
    if declared_mode:
        assert isinstance(declared_mode, str)
        return declared_mode

    pixels = np.asarray(image)
    if len(pixels.shape) == 2:
        return "L"
    # Identical channels mean the differences between consecutive channels are all zero.
    if len(pixels.shape) == 3 and (np.diff(pixels.reshape(-1, 3).T, axis=0) == 0).all():
        return "L"
    return "UNK"

def calculate_space_color(image: "Image.Image") -> int:
    """Return 1 when the image's colour mode is "RGB", otherwise 0.

    The mode string comes from ``calc_color_space``. The return annotation is
    ``int`` because the function returns the literal 1 or 0, not a dictionary.
    """
    return 1 if calc_color_space(image) == "RGB" else 0


# Size Score

In [90]:
def calc_image_area_sqrt(image: Image) -> float:
    """Return sqrt(width) * sqrt(height) for the image."""
    width, height = image.size
    root_width = math.sqrt(width)
    root_height = math.sqrt(height)
    return root_width * root_height

def get_image_area_sqrt_sizes(folder_path: str) -> list:
    """Return ``calc_image_area_sqrt`` for every image file directly inside a folder.

    Files are selected by extension (case-insensitive). A file that raises
    IOError while being opened is reported on stdout and skipped.
    """
    supported_suffixes = ('.png', '.jpg', '.jpeg', '.tiff', '.bmp', '.gif')
    sizes = []
    for entry in os.listdir(folder_path):
        if not entry.lower().endswith(supported_suffixes):
            continue
        try:
            with Image.open(os.path.join(folder_path, entry)) as img:
                sizes.append(calc_image_area_sqrt(img))
        except IOError:
            print(f"Cannot open {entry}.")
    return sizes

def calculate_image_size_score(image: Image, folderpath: str, iqr_factor: float = 3.0) -> float:
    """Score an image's size against the size distribution of a folder.

    Thresholds are ``q1 - iqr_factor * IQR`` and ``q3 + iqr_factor * IQR`` of
    the folder's sqrt-area sizes. The score is 1 minus the image's distance
    from the midpoint of those thresholds, normalised by the threshold span
    (or by the midpoint when the span is not positive) and clipped to [0, 1].
    Note that the folder is re-scanned on every call.
    """
    folder_sizes = get_image_area_sqrt_sizes(folderpath)

    q1, q3 = np.percentile(folder_sizes, [25, 75])
    margin = iqr_factor * (q3 - q1)
    lower = q1 - margin
    upper = q3 + margin
    midpoint = (lower + upper) / 2

    span = upper - lower
    if span > 0:
        normaliser = span
    else:
        normaliser = midpoint

    offset = abs(calc_image_area_sqrt(image) - midpoint)
    return 1 - np.clip(offset / normaliser, 0, 1)


# Near Duplicates

### Using P hash

In [2]:
import os
from PIL import Image
import imagehash

def find_duplicate_images(folder_path, hash_size=8):
    """Group images in a folder whose perceptual hashes are identical.

    Returns a dict mapping one representative filename per group (the longest
    filename) to the list of the other filenames sharing the same pHash
    string. Only .png/.jpg/.jpeg files directly inside the folder are hashed.
    """
    hasher = imagehash.phash
    files_by_hash = {}

    for name in os.listdir(folder_path):
        if not name.lower().endswith(('.png', '.jpg', '.jpeg')):
            continue
        with Image.open(os.path.join(folder_path, name)) as img:
            key = str(hasher(img, hash_size=hash_size))
        files_by_hash.setdefault(key, []).append(name)

    groups = {}
    for names in files_by_hash.values():
        if len(names) <= 1:
            continue
        representative = max(names, key=len)
        groups[representative] = [other for other in names if other != representative]

    return groups

# Run the identical-pHash duplicate search over one dataset folder (hard-coded
# local path) and print each group.
folder_path = '/media/tung/New Volume/Ubuntu/Programing/MQSolutions/TaskData/dataset/fire_mq_data/'
duplicates = find_duplicate_images(folder_path)
# NOTE: the loop variable `img_hash` holds the representative filename (the
# keys returned by find_duplicate_images), not a hash value.
for img_hash, filenames in duplicates.items():
    print(f"Image: {img_hash} have {len(filenames)} Duplicates image : {filenames}")


Image: frame_check_13500.jpg have 7 Duplicates image : ['frame_check_9600.jpg', 'frame_check_17670.jpg', 'frame_check_17730.jpg', 'frame_check_13050.jpg', 'frame_check_12960.jpg', 'frame_check_12990.jpg', 'frame_check_13020.jpg']
Image: frame_check_20190.jpg have 19 Duplicates image : ['frame_check_20310.jpg', 'frame_check_20340.jpg', 'frame_check_20430.jpg', 'frame_check_20610.jpg', 'frame_check_8880.jpg', 'frame_check_8820.jpg', 'frame_check_8850.jpg', 'frame_check_5550.jpg', 'frame_check_20010.jpg', 'frame_check_20040.jpg', 'frame_check_20100.jpg', 'frame_check_20130.jpg', 'frame_check_8040.jpg', 'frame_check_8160.jpg', 'frame_check_8190.jpg', 'frame_check_8220.jpg', 'frame_check_8250.jpg', 'frame_check_20820.jpg', 'frame_check_21030.jpg']
Image: frame_check_20220.jpg have 2 Duplicates image : ['frame_check_7380.jpg', 'frame_check_20970.jpg']
Image: frame_check_20250.jpg have 5 Duplicates image : ['frame_check_20370.jpg', 'frame_check_20400.jpg', 'frame_check_20640.jpg', 'frame_chec

In [3]:
# Split images vertically
import os
from PIL import Image

def split_images_vertically(folder_path, output_folder):
    """Split every image in a folder into left and right halves saved as JPEG.

    Parameters
    ----------
    folder_path : folder whose .png/.jpg/.jpeg files (top level only) are split.
    output_folder : destination folder; created if missing.

    Each input produces ``<n>_left.jpg`` and ``<n>_right.jpg``, where ``n`` is
    the 1-based position of the file in ``os.listdir`` order. The split column
    is ``width // 2``, so for odd widths the right half is one pixel wider.
    Images whose mode the JPEG writer cannot store (e.g. RGBA or palette PNGs)
    are converted to RGB before saving instead of raising an error.
    """
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(output_folder, exist_ok=True)

    image_files = [f for f in os.listdir(folder_path) if f.lower().endswith(('.png', '.jpg', '.jpeg'))]

    for i, image_file in enumerate(image_files):
        image_path = os.path.join(folder_path, image_file)
        with Image.open(image_path) as img:
            # The output is always .jpg, so make sure the mode is JPEG-compatible.
            source = img if img.mode in ("L", "RGB", "CMYK") else img.convert("RGB")

            width, height = source.size
            split_position = width // 2

            left_image = source.crop((0, 0, split_position, height))
            right_image = source.crop((split_position, 0, width, height))

            left_image.save(os.path.join(output_folder, f"{i+1}_left.jpg"))
            right_image.save(os.path.join(output_folder, f"{i+1}_right.jpg"))

# Example usage: split every image in `folder_path` and write the halves to
# `output_folder`. Both are hard-coded local paths; this rebinds the
# notebook-level `folder_path` used by earlier cells.
folder_path = "/media/tung/New Volume/Ubuntu/Programing/MQSolutions/TaskData/dataset/camera_tampering/results/"
output_folder = "/media/tung/New Volume/Ubuntu/Programing/MQSolutions/TaskData/dataset/camera_tampering/camera_spliit_test/"
split_images_vertically(folder_path, output_folder)

### Using ANNOY

In [None]:
import os
import pickle
import numpy as np
import matplotlib.pyplot as plt
from keras.preprocessing import image
from keras.applications.resnet50 import ResNet50, preprocess_input
from tqdm import tqdm
from annoy import AnnoyIndex

# Initialize ResNet50 model with ImageNet weights and without the
# classification head (include_top=False), for 224x224 RGB inputs. The model
# is used purely as a feature extractor by extract_features below.
model = ResNet50(weights='imagenet', include_top=False, input_shape=(224, 224, 3))

# Function to extract features from an image
def extract_features(img_path, model):
    """Return a unit-length, flattened feature vector for one image.

    The image is loaded at 224x224, preprocessed with ResNet50's
    ``preprocess_input``, passed through ``model.predict`` as a batch of one,
    flattened, and divided by its L2 norm.
    """
    loaded = image.load_img(img_path, target_size=(224, 224))
    batch = np.expand_dims(image.img_to_array(loaded), axis=0)
    activations = model.predict(preprocess_input(batch))
    vector = activations.flatten()
    return vector / np.linalg.norm(vector)

# Function to collect image file paths under a directory tree
def get_file_list(root_dir, extensions=('.jpg', '.jpeg', '.png')):
    """Get a list of image paths from a directory, searched recursively.

    Parameters
    ----------
    root_dir : directory to walk (including all sub-directories).
    extensions : iterable of filename suffixes to accept. They are compared
        against the lower-cased filename, so pass them in lower case.

    Returns
    -------
    list of str
        Full paths (``os.path.join(root, filename)``) in ``os.walk`` order.
    """
    # An immutable default avoids the shared-mutable-default pitfall, and a
    # tuple lets str.endswith test every suffix in a single call.
    suffixes = tuple(extensions)
    return [
        os.path.join(root, filename)
        for root, _, filenames in os.walk(root_dir)
        for filename in filenames
        if filename.lower().endswith(suffixes)
    ]


In [None]:
# Load images and extract features: collect every image path under root_dir
# (hard-coded local path), then run each one through the ResNet50 feature
# extractor. `filenames` and `feature_list` are index-aligned; later cells
# rely on that alignment.
root_dir = '/media/tung/New Volume/Ubuntu/Programing/MQSolutions/TaskData/dataset/ads compare/TV/VIETTEL TV360 15s 0605/VIETTEL TV360 15s 0605/'
filenames = get_file_list(root_dir)
feature_list = [extract_features(filename, model) for filename in tqdm(filenames)]

In [4]:
feature_list[0].shape

(100352,)

In [1]:
import matplotlib.pyplot as plt
from PIL import Image
# Build ANNOY index over the extracted feature vectors, one item per image,
# keyed by the image's position in feature_list.
# NOTE: this cell needs `feature_list`, `AnnoyIndex` and `os` from earlier
# cells; the recorded NameError below indicates it was run before they existed.
f = len(feature_list[0])  # dimensionality of one feature vector
t = AnnoyIndex(f, 'euclidean')
for i, feature in enumerate(feature_list):
    t.add_item(i, feature)
t.build(150)

# Maximum Euclidean distance between two feature vectors for the images to be
# treated as near-duplicates.
duplicate_threshold = 0.4 

def visualize_similar_images(image_paths):
    """Show the given images side by side in one row, titled by file name."""
    plt.figure(figsize=(20, 10))
    column_count = len(image_paths)
    for position, path in enumerate(image_paths, start=1):
        picture = Image.open(path)
        plt.subplot(1, column_count, position)
        plt.imshow(picture)
        plt.title(os.path.basename(path))
        plt.axis('off')
    plt.show()

# Indices of images that have already been assigned to a group
processed_images = set()

duplicate_images = {}  # Dictionary to store near-duplicate image groups

for i in range(len(feature_list)):
    if i in processed_images:
        continue  # Skip images that have already been processed

    nearest_ids = t.get_nns_by_item(i, 30)  # Find the 30 nearest images
    near_duplicates = []
    for j in nearest_ids:
        # Keep only neighbours whose feature distance is below the threshold.
        if i != j and np.linalg.norm(np.array(feature_list[i]) - np.array(feature_list[j])) < duplicate_threshold:
            near_duplicates.append(filenames[j])
            processed_images.add(j)  # Mark this image as processed
    
    if near_duplicates:
        duplicate_images[filenames[i]] = near_duplicates
        processed_images.add(i)  # Mark the base image as processed too

# Now visualize each group of near-duplicate images
# NOTE: this loop rebinds the notebook-level name `duplicates`.
for base_image, duplicates in duplicate_images.items():
    print(f"Base Image: {base_image}")
    all_images = [base_image] + duplicates
    visualize_similar_images(all_images)


NameError: name 'feature_list' is not defined

In [7]:
# Count the images listed as near-duplicates across all groups (the base
# image of each group is not included in the count).
total_duplicates = 0
for base_image, duplicates in duplicate_images.items():
    total_duplicates += len(duplicates)

print(f"Total number of duplicate images: {total_duplicates}")

Total number of duplicate images: 293


In [9]:
# The bare expression below is not the last statement in the cell, so it
# produces no output; only the number of groups (dictionary keys) is printed.
duplicate_images
num_keys = len(duplicate_images.keys())
print("Number of keys in the dictionary:", num_keys)

Number of keys in the dictionary: 30


In [5]:
# NOTE(review): IssueAnalyst and sumary_issue are not defined in the visible
# part of this notebook — presumably they come from another module or cell.
if __name__ == "__main__":
    folder_path = '/media/tung/New Volume/Ubuntu/Programing/MQSolutions/TaskData/dataset/fire_mq_data'  # Update this to the folder that contains the images
    analyst = IssueAnalyst(folder_path)
    results_df = analyst.analyze_images(issue_types=['dark', 'light', 'blurry', 'duplicate', 'near_duplicate'])
    
    # Export the results to a CSV file using PySpark
    output_path = 'output.csv'
    results_df.write.csv(output_path, header=True, mode='overwrite')

    # Convert the PySpark DataFrame to a pandas DataFrame on the driver node so the summary can be printed
    results_pd_df = results_df.toPandas()
    
    # Print the analysis summary
    print(sumary_issue(results_pd_df))  # Print the analysis summary


<class 'numpy.float64'>


## Duplicates

### Using dHash (difference hash), verified by MSE and SSIM

In [None]:
import os
from PIL import Image
import cv2
from skimage.metrics import structural_similarity as ssim
# The progress-bar package is named "tqdm"; "tdqm" raises ModuleNotFoundError.
from tqdm import tqdm

def _dhash(self, image):
    """Compute a difference hash of the image as a Python integer.

    The image is resized to (hash_size + 1) x hash_size; bit i of the result
    is set when pixel i (row-major) is brighter than its left neighbour.
    """
    shrunk = cv2.resize(image, (self.hash_size + 1, self.hash_size))
    brighter_than_left = shrunk[:, 1:] > shrunk[:, :-1]
    hash_value = 0
    for bit_index, is_set in enumerate(brighter_than_left.flatten()):
        if is_set:
            hash_value += 2 ** bit_index
    return hash_value

def _mse(self, first_img, second_img):
    err = np.sum((first_img.astype("float") - second_img.astype("float")) ** 2)
    err /= float(first_img.shape[0] * first_img.shape[1])
    return err

def find_near_duplicate_image(self):
    """Find near-duplicate images inside ``self.folder_path``.

    Every readable file is loaded as grayscale and bucketed by its dHash.
    Within a bucket, two same-shaped images are paired when their MSE is
    below 20 and their SSIM is above 0.95. Returns a dict mapping each
    matched filename to the list of filenames it was paired with.
    """
    grayscale_by_name = {}
    names_by_hash = {}

    for entry in os.listdir(self.folder_path):
        pixels = cv2.imread(os.path.join(self.folder_path, entry), 0)
        if pixels is None:
            # Not a readable image; ignore it.
            continue
        # Key by filename rather than full path.
        grayscale_by_name[entry] = pixels
        names_by_hash.setdefault(self._dhash(pixels), []).append(entry)

    matches = {}
    for bucket in names_by_hash.values():
        if len(bucket) <= 1:
            continue
        for first_pos, first_name in enumerate(bucket):
            for second_name in bucket[first_pos + 1:]:
                first_img = grayscale_by_name[first_name]
                second_img = grayscale_by_name[second_name]
                if first_img.shape != second_img.shape:
                    continue
                mse_val = self._mse(first_img, second_img)
                ssim_val = ssim(first_img, second_img)
                if mse_val < 20 and ssim_val > 0.95:
                    # Record the match in both directions.
                    matches.setdefault(first_name, []).append(second_name)
                    matches.setdefault(second_name, []).append(first_name)

    # Drop empty entries and de-duplicate each partner list.
    return {name: list(set(partners)) for name, partners in matches.items() if len(partners) > 0}

## Sharpness Laplacian

In [1]:
import numpy as np
import cv2
from scipy.ndimage import gaussian_filter
from scipy import signal
import pywt
import scipy.stats
from PIL import Image

def eval_sharpness_laplacian(image: "Image.Image") -> float:
    '''
    Image sharpness metric based on the Laplacian transform

    Parameters
    ------------
    image: PIL image or array-like
        Input image, either with a channel axis (the first three channels are
        combined into luminance) or already 2-D grayscale

    Returns
    ------------
    sharpness: float
        Sum of the thresholded edge map (255 per pixel whose absolute
        Laplacian response exceeds 10) divided by the pixel count plus one.
        A 5-pixel border is excluded when the map is large enough. The value
        lies in [0, 255).
    '''

    # Convert the input to a numpy array
    pixels = np.array(image)

    if pixels.ndim == 2:
        # Already single-channel: use the intensities directly.
        gray = pixels.astype(float)
    else:
        gray = 0.299 * pixels[:, :, 0] + 0.587 * pixels[:, :, 1] + 0.114 * pixels[:, :, 2]
    # Light smoothing so single-pixel noise does not register as an edge.
    gray = gaussian_filter(gray, sigma=.5)

    # 4-neighbour Laplacian kernel.
    kernel = np.array([
        [0., 1., 0.],
        [1., -4., 1.],
        [0., 1., 0.],
    ])
    convoluted = np.absolute(signal.convolve2d(gray, kernel))

    threshold = 10
    crop = 5
    # The builtin float replaces np.float, which NumPy 1.24 removed.
    sharpness_result = np.zeros(convoluted.shape, float)
    sharpness_result[convoluted > threshold] = 255

    rows, cols = convoluted.shape
    if rows > crop * 2 and cols > crop * 2:
        # Ignore the border, where the zero-padded convolution creates artificial edges.
        ratio = np.sum(sharpness_result[crop:rows - crop, crop:cols - crop]) / \
                ((rows - crop * 2) * (cols - crop * 2) + 1)
    else:
        ratio = np.sum(sharpness_result) / (rows * cols + 1)
    # Return the computed value; without this the function returned None.
    return float(ratio)

In [None]:
from dom import DOM
import cv2

# Load a reference image and three progressively blurred versions of it
# (the flag 1 asks OpenCV for a colour image).
# NOTE(review): cv2.imread presumably yields None for a missing file rather
# than raising — confirm the relative paths resolve from the notebook's
# working directory.
#img = cv2.imread("images/image_quality_estimation/02_2sigma_blurred.tif", 1)
img1 = cv2.imread("images/image_quality_estimation/02.tif", 1)
img2 = cv2.imread("images/image_quality_estimation/02_2sigma_blurred.tif", 1)
img3 = cv2.imread("images/image_quality_estimation/02_3sigma_blurred.tif", 1)
img4 = cv2.imread("images/image_quality_estimation/02_5sigma_blurred.tif", 1)


# initialize DOM
iqa = DOM()

#Calculate scores
score1 = iqa.get_sharpness(img1)
score2 = iqa.get_sharpness(img2)
score3 = iqa.get_sharpness(img3)
score4 = iqa.get_sharpness(img4)

# Print the sharpness score of each variant for comparison.
print("Sharpness for reference image:", score1)
print("Sharpness for 2 sigma blurred image:", score2)
print("Sharpness for 3 sigma blurred image:", score3)
print("Sharpness for 5 sigma blurred image:", score4)

In [None]:
# Evaluate the Laplacian sharpness metric on one sample image (hard-coded
# local path). This rebinds the notebook-level name `img`.
img = Image.open('/media/tung/New Volume/Ubuntu/Programing/MQ Solutions/Task Data/Data_quality_local/data/dermet_image_train/1IMG014.jpg')
eval_sharpness_laplacian(img)

### BRISQUE

In [5]:
import numpy as np
from skimage import io, img_as_float
import imquality.brisque as brisque
from PIL import Image

# Compute the BRISQUE score of one image (hard-coded local path).
# NOTE(review): the recorded run of this cell ended with
# "TypeError: rescale() got an unexpected keyword argument 'multichannel'".
# Presumably the installed imquality and scikit-image versions are
# incompatible — confirm and pin versions.
#img = img_as_float(io.imread('noisy_images/BSE.jpg', as_gray=True))
img = Image.open("/media/tcb/New Volume/Ubuntu/Programing/MQSolutions/data-cleaning/dataset/ads_compare/raw_dataset/1_dataset 3/anh chup poster/VIETBANK (đủ ảnh)/101 LANG HA (45).jpg")
# Convert the image to a floating-point representation before scoring.
img = img_as_float(img)

score = brisque.score(img)
print("Brisque score = ", score)

TypeError: rescale() got an unexpected keyword argument 'multichannel'