# Brand Logo and Name Extraction Pipeline

## Import Libraries

In [1]:
import os
import string

import pandas as pd
import cv2
from pathlib import Path
import logging
from tqdm import tqdm
import numpy as np
import matplotlib.pyplot as plt
from copy import deepcopy

from fuzzywuzzy import fuzz
from ultralytics import YOLO
from paddleocr import PaddleOCR
from skimage.metrics import structural_similarity as ssim

In [2]:
root_dir = Path.cwd().parent.parent
root_dir

PosixPath('/mnt/d/Projects_D/Brand_Extractor')

In [3]:
%env YOLO_VERBOSE = False

env: YOLO_VERBOSE=False


In [4]:
logging.basicConfig(level=logging.ERROR)

## Config

In [5]:
class Config:
    sample_fps = 1 # Sample sample_fps frame per second
    padding = 10 # Padding for the logo bounding box
    min_brand_name_len = 3 # Minimum length of the brand name
    brand_name_match_threshold = 80 # Fuzzy matching threshold for brand names
    max_brand_name_occurance_ratio = 0.1 # Maximum occurance ratio of a brand name in the frames


In [6]:
CONFIG = Config()

## Data

### Video Data

In [7]:
data_dir = root_dir / 'data' / 'video_preprocessing' / 'videos'

In [8]:
videos = list(data_dir.glob('*.mp4'))
videos

[PosixPath('/mnt/d/Projects_D/Brand_Extractor/data/video_preprocessing/videos/cities_walk_dubai_mall.mp4'),
 PosixPath('/mnt/d/Projects_D/Brand_Extractor/data/video_preprocessing/videos/london_city_tour_luxury_shopping.mp4'),
 PosixPath('/mnt/d/Projects_D/Brand_Extractor/data/video_preprocessing/videos/royal_houseguard_westfield_mall.mp4'),
 PosixPath('/mnt/d/Projects_D/Brand_Extractor/data/video_preprocessing/videos/tanishq_mishra_vlogs_lulu_mall.mp4'),
 PosixPath('/mnt/d/Projects_D/Brand_Extractor/data/video_preprocessing/videos/the_explorer_raj_dlf_mall.mp4'),
 PosixPath('/mnt/d/Projects_D/Brand_Extractor/data/video_preprocessing/videos/travel_with_chris_crazy_market_spree.mp4'),
 PosixPath('/mnt/d/Projects_D/Brand_Extractor/data/video_preprocessing/videos/walking_tours_macys_nordstorm.mp4')]

### Model Data

In [9]:
model_dir = root_dir / 'training_history' / 'yolov8' / 'logos'
model_dir

PosixPath('/mnt/d/Projects_D/Brand_Extractor/training_history/yolov8/logos')

In [10]:
model_checkpoint = model_dir / 'weights' / 'best.pt'
model_checkpoint

PosixPath('/mnt/d/Projects_D/Brand_Extractor/training_history/yolov8/logos/weights/best.pt')

## Pipeline Steps

The pipeline consists of the following steps:
1. Video processing (extracting frames)
2. Detect brand logo (Object Detection)
3. Extracting brand name (Text Recognition)
4. Saving the extracted brand logo and name

## Utils

In [11]:
class UnionFind:
    def __init__(self, n):
        self.parent = list(range(n))
        self.rank = [0] * n

    def find(self, x):
        if self.parent[x] != x:
            self.parent[x] = self.find(self.parent[x])
        return self.parent[x]

    def union(self, x, y):
        root_x, root_y = self.find(x), self.find(y)
        if root_x != root_y:
            if self.rank[root_x] > self.rank[root_y]:
                self.parent[root_y] = root_x
            else:
                self.parent[root_x] = root_y
                if self.rank[root_x] == self.rank[root_y]:
                    self.rank[root_y] += 1

## Detect Brand Logo

In [12]:
class BrandLogoDetector:
    def __init__(self):
        
        self.brand_logo_frames = []
        self.__load_model()
        
    def __load_model(self):
        self.model = YOLO(model_checkpoint)
        print(self.model.info())    
    
    def extract_brand_logo(self, frame_data):
        results = self.model(frame_data['frame'], verbose=False)
        if len(results[0].boxes) > 0:
            frame_data['logo_detection'] = results[0].boxes
            frame_data['contains_logo'] = True
        
        return frame_data

## Extracting Brand Name

In [13]:
class BrandNameExtractor:
    def __init__(self, frame_shape):
        self.__load_model()
        self.frame_shape = frame_shape
                
    def __load_model(self):
        self.ocr = PaddleOCR(show_log = False, use_angle_cls=True, use_gpu=True)
        logging.getLogger('ppocr').handlers = []
        
    def extract_brand_name(self, frame_data):
        
        if frame_data.get('logo_detection') is None:
            return frame_data
        
        results = frame_data.get('logo_detection')
        
        for box in results:
            x1 = int(box.xyxy[0][0].item())
            y1 = int(box.xyxy[0][1].item())
            x2 = int(box.xyxy[0][2].item())
            y2 = int(box.xyxy[0][3].item())
            
            # add some padding to the box
            x1 = max(0, x1 - CONFIG.padding)
            y1 = max(0, y1 - CONFIG.padding)
            x2 = min(self.frame_shape[1], x2 + CONFIG.padding)
            y2 = min(self.frame_shape[0], y2 + CONFIG.padding)
            
            cropped_frame = frame_data['frame'][y1:y2, x1:x2]
            result = self.ocr.ocr(cropped_frame, cls=True, det=True, rec=True)

            if result[0] is not None:

                frame_data['contains_brand_name'] = True
                frame_data['brand_names'] = []
                for items in result[0]:
                    brand_name = items[1][0]
                
                    brand_name_data = {
                        'brand_name': brand_name,
                        'box_coordinates': (x1, y1, x2, y2)
                    }
                    frame_data['brand_names'].append(brand_name_data)
        
        return frame_data


## Video Processing

In [14]:
class VideoProcessor:
    def __init__(self, video_path):

        self.video_path = video_path
        self.video_name = video_path.stem
        self.video = cv2.VideoCapture(str(video_path))
        self.fps = self.video.get(cv2.CAP_PROP_FPS)
        self.total_frames = int(self.video.get(cv2.CAP_PROP_FRAME_COUNT))
        self.frame_width = int(self.video.get(cv2.CAP_PROP_FRAME_WIDTH))
        self.frame_height = int(self.video.get(cv2.CAP_PROP_FRAME_HEIGHT))
        self.frame_shape = (self.frame_height, self.frame_width)
        self.sample_fps = CONFIG.sample_fps
        
        self.frames = []
        self.brand_data = []
        
        self.logo_detector = BrandLogoDetector()
        self.brand_name_extractor = BrandNameExtractor(frame_shape=self.frame_shape)

    def extract_frames(self, frame_interval=1):
        step = max(1, int(self.fps / self.sample_fps)) * frame_interval
        current_frame = 0
        self.video.set(cv2.CAP_PROP_POS_FRAMES, 0)

        for i in tqdm(range(self.total_frames)):
            ret, frame = self.video.read()
            if not ret:
                break
            if i % step == 0:
                frame_data = {
                    'frame_index': int(current_frame),
                    'timestamp': round(self.video.get(cv2.CAP_PROP_POS_MSEC)/1000, 2),
                    'frame': cv2.cvtColor(frame, cv2.COLOR_BGR2RGB),
                    'contains_logo': False,
                    'contains_brand_name': False
                }
                
                current_frame += 1

                frame_data = self.logo_detector.extract_brand_logo(frame_data)
                frame_data = self.brand_name_extractor.extract_brand_name(frame_data)
                frame_data.pop('frame')
                
                self.frames.append(frame_data)
        
        self.video.release()
        self.process_brand_name()
        
        return self.frames

    def process_brand_name(self):

        # remove names less than 3 characters or only numbers

        for frame in self.frames:
            if frame['contains_brand_name']:
                frame['brand_names'] = [brand for brand in frame['brand_names'] if len(brand['brand_name']) > CONFIG.min_brand_name_len and not brand['brand_name'].isdigit()]

        brand_names = []

        for frame in self.frames:
            if frame['contains_brand_name']:
                brand_names.extend([(brand["brand_name"], frame['frame_index'], index) for index, brand in enumerate(frame['brand_names'])])

        brand_names = np.array(brand_names)

        unique_brand_names = np.unique(brand_names[:, 0])

        # find similar names using fuzzy matching with partial ratio > 80
        similar_names = []


        for name1 in unique_brand_names:
            for name2 in unique_brand_names:
                if name1 != name2 and fuzz.partial_ratio(name1, name2) > CONFIG.brand_name_match_threshold and len(name1) - len(name2) < 3:
                    similar_names.append((name1, name2))

        # print(similar_names)

        # create clusters of similar names
        uf = UnionFind(len(unique_brand_names))
        for name1, name2 in similar_names:
            uf.union(np.where(unique_brand_names == name1)[0][0], np.where(unique_brand_names == name2)[0][0])

        clusters = {}

        for i in range(len(unique_brand_names)):
            root = uf.find(i)
            if root not in clusters:
                clusters[root] = []
            clusters[root].append(unique_brand_names[i])

        # print(clusters)

        # replace the name with lower word count with the higher word count for similar names cluster

        for cluster in clusters.values():
            cluster = sorted(cluster, key=lambda x: len([i for i in x if i not in string.punctuation]), reverse=True)
            print(cluster)
            for i in range(1, len(cluster)):
                brand_names[brand_names == cluster[i]] = cluster[0]

        # assign the processed brand names to the frames

        for frame in self.frames:
            if frame['contains_brand_name']:
                brand_data = brand_names[brand_names[:, 1] == str(frame['frame_index'])]
                frame['brand_names'] = [{'brand_name': brand[0], 'box_coordinates': frame['brand_names'][int(brand[2])]['box_coordinates']} for brand in brand_data]
                if len(frame['brand_names']) == 0:
                    frame['contains_brand_name'] = False


        # remove names that occur in more than 10% of the frames

        brand_names = []
        frame_with_brand = 0
        for frame in self.frames:
            if frame['contains_brand_name']:
                brand_names.extend([brand['brand_name'] for brand in frame['brand_names']])
                frame_with_brand += 1

        unique_brand_names = np.unique(brand_names)

        for name in unique_brand_names:
            count = 0
            for frame in self.frames:
                if frame['contains_brand_name']:
                    for brand in frame['brand_names']:
                        if brand['brand_name'].lower() == name.lower():
                            count += 1
                            break
            if count > CONFIG.max_brand_name_occurance_ratio * frame_with_brand:
                for frame in self.frames:
                    if frame['contains_brand_name']:
                        frame['brand_names'] = [brand for brand in frame['brand_names'] if brand['brand_name'] != name]
                        if len(frame['brand_names']) == 0:
                            frame['contains_brand_name'] = False

## Saving Extracted Brand Logo and Name

In [15]:
def merge_nearby_boxes(brand_boxes):
    import numpy as np

    if not brand_boxes:
        return []

    # Convert box coordinates into a more usable format
    def convert(box):
        x1, y1, x2, y2 = box['box_coordinates']
        return x1, y1, x2, y2

    boxes = [convert(box) for box in brand_boxes]

    # Calculate intersections and unions for merging
    def intersects(box1, box2):
        x1, y1, x2, y2 = box1
        x3, y3, x4, y4 = box2
        return not (x2 < x3 or x4 < x1 or y2 < y3 or y4 < y1)

    def merge_boxes(box1, box2):
        x1, y1, x2, y2 = box1
        x3, y3, x4, y4 = box2
        new_x1 = min(x1, x3)
        new_y1 = min(y1, y3)
        new_x2 = max(x2, x4)
        new_y2 = max(y2, y4)
        return new_x1, new_y1, new_x2, new_y2

    # Initialize merged list with dictionaries to store merged details
    merged_details = [{'coordinates': box, 'brands': set([b['brand_name']])} for box, b in zip(boxes, brand_boxes)]

    # Iteratively merge all intersecting boxes with brand accumulation
    merged = True
    while merged:
        merged = False
        new_details = []
        while merged_details:
            current = merged_details.pop()
            found = False
            for i, other in enumerate(new_details):
                if intersects(current['coordinates'], other['coordinates']):
                    # Merge the boxes
                    other['coordinates'] = merge_boxes(current['coordinates'], other['coordinates'])
                    # Accumulate brand names
                    other['brands'].update(current['brands'])
                    found = True
                    merged = True
                    break
            if not found:
                new_details.append(current)
        merged_details = new_details

    # Map merged boxes and brands back to original brand data format
    merged_brand_boxes = []
    for detail in merged_details:
        representative = next((b for b in brand_boxes if intersects(detail['coordinates'], convert(b))), None)
        if representative:
            merged_brand_boxes.append({
                'frame_index': representative['frame_index'],
                'timestamp': representative['timestamp'],
                'brand_name': ' '.join(detail['brands']),  # Join all brands as a comma-separated string
                'box_coordinates': detail['coordinates']
            })

    return merged_brand_boxes

In [16]:
def save_brand_name_data(sample_frames, video_name):
    # store the output in csv file
    output_dir = root_dir / 'output' / video_name
    output_dir.mkdir(parents=True, exist_ok=True)
        
    frame_data = []
    
    for frame in sample_frames:
        if frame['contains_brand_name']:
            brands = []
            for brand in frame['brand_names']:
                brands.append({
                    'frame_index': frame['frame_index'],
                    'timestamp': frame['timestamp'],
                    'brand_name': brand['brand_name'],
                    'box_coordinates': brand['box_coordinates']
                })

            merged_boxes = merge_nearby_boxes(brands)
            
            frame_data.extend(merged_boxes)
                
    df = pd.DataFrame(frame_data)
                
    csv_file = output_dir / f'{video_name}.csv'
    
    df.to_csv(csv_file, index=False)
    
    return csv_file

## Pipeline

### Video

In [17]:
video = videos[2]
video

PosixPath('/mnt/d/Projects_D/Brand_Extractor/data/video_preprocessing/videos/royal_houseguard_westfield_mall.mp4')

### Video Processor

In [18]:
video_processor = VideoProcessor(video)

Model summary: 295 layers, 25856899 parameters, 0 gradients, 79.1 GFLOPs
(295, 25856899, 0, 79.0656)


In [19]:
video_processor.fps

30.0

In [20]:
frame_data = video_processor.extract_frames()

100%|██████████| 37667/37667 [03:48<00:00, 164.94it/s]


['&stradivarius', 'stradivarius', 'stradivarrus', '&stradivariu', 'tradivarius', '&stradivari', 'stradiva', 'stradiv', '&stra']
['60%OFF']
['7Martens', 'D.Marlens', 'D.Martens', 'D.Marte']
['@pullandbear']
['AANDMIADE']
['AFFE']
['AFRICA']
['AILANO']
['H·SAMUEI', 'H·SAMUEL', 'H.SAMUEL', 'H.SAMULL', 'SAMUEI', 'AMUEI', 'AMUEL', 'MUEL']
['LANCOME', 'ANCOME']
['ANDEY']
['ARKET']
['ARMANI EXCHANGE', 'ARMANI EXCHANG', 'ARMANI']
['ASPERS']
['BALANS']
['BANK HERE']
['BEAOO']
['HUGO BOSS', 'HUGO BOS', 'HUGOBOSS', 'JUGO BOS', 'MOSSBROS', 'HUGO OS', 'HUGOBOS', 'BOSS', 'BROS', 'HUGO']
['LOMBON', 'BPON']
['BROWNS']
['BUZZBIKE', 'BUZZOIKE', 'DUZZBIKE', 'DUZZDIKE', 'DUZZOIKE']
['BUZZOZRE']
['BoOLs']
['BooLs']
['Bools']
['Boots']
['Bots']
['Broadway']
['Bupenthy']
['Buy Now']
['CALLIOENLE']
['CALLLOLNLL']
['CALZEDONIA', 'CALZEDON', 'CALZEDO']
['CARVELA', 'CARVI', 'CARV']
['CASINO']
['PHONE CHARGING HERE', 'CHARGING']
['CHOICE·', 'CHGICE', 'CHOICE', 'HOICE']
['CLINIC', 'INIC']
['CONS']
['COSMETICS']
['

In [21]:
len(video_processor.frames)

1256

### Saving Extracted Brand Name

In [22]:
output_file = save_brand_name_data(frame_data, video_processor.video_name)

In [23]:
df = pd.read_csv(output_file)
df.head()

Unnamed: 0,frame_index,timestamp,brand_name,box_coordinates
0,0,0.0,Levi's,"(1119, 361, 1258, 412)"
1,1,1.0,&stradivarius,"(295, 476, 393, 523)"
2,2,2.0,CARVELA,"(676, 433, 801, 497)"
3,3,3.0,CARVELA,"(1059, 429, 1185, 495)"
4,4,4.0,@pullandbear,"(283, 674, 418, 749)"


# End