In [1]:
import boto3
from google.cloud import vision
import pandas as pd
import re
import nltk
from nltk.corpus import stopwords
import requests
from PIL import Image, ExifTags
from io import BytesIO
import os

def generate_presigned_url(bucket_name, object_name, expiration=3600):
    """Generate a presigned URL to share an S3 object."""
    s3_client = boto3.client('s3')
    try:
        response = s3_client.generate_presigned_url('get_object',
                                                    Params={'Bucket': bucket_name,
                                                            'Key': object_name},
                                                    ExpiresIn=expiration)
    except Exception as e:
        print(f"Error generating presigned URL for {object_name}: {e}")
        return None
    return response

def detect_logos_uri(uri):
    """Detects logos in the file located at the provided URI and returns logo annotations."""
    client = vision.ImageAnnotatorClient()
    image = vision.Image()
    image.source.image_uri = uri
    response = client.logo_detection(image=image)
    return response.logo_annotations  # Return the full annotations

def detect_text_uri(uri):
    """Detects text in the file located at the provided URI and returns a list of texts."""
    client = vision.ImageAnnotatorClient()
    image = vision.Image()
    image.source.image_uri = uri
    response = client.text_detection(image=image)
    texts = response.text_annotations

    # Return a list of descriptions, skipping the first element which usually contains all detections combined
    text_results = [text.description for text in texts[1:]]  # texts[0] is typically the entire block of text
    return text_results

# Download NLTK stop words the first time
nltk.download('stopwords')
stop_words = set(stopwords.words('english'))

def clean_text(texts):
    """Remove stop words and special characters from text detections."""
    filtered_texts = []
    for text in texts:
        # Removing special characters and digits
        cleaned_text = re.sub('[^a-zA-Z]', ' ', text)
        # Split into words and remove stop words
        words = cleaned_text.split()
        words = [word for word in words if word.lower() not in stop_words and len(word) > 1]
        filtered_texts.extend(words)
    return filtered_texts

def save_yolo_format(logos, file_path, image_width, image_height, class_id_map):
    """Save logo bounding box information in YOLO format including the confidence score."""
    if logos:
        with open(file_path, 'w') as file:
            for logo in logos:
                class_id = class_id_map.get(logo.description, -1)  # Use -1 if not found, which should never happen
                vertices = logo.bounding_poly.vertices
                x_min = min(vertex.x for vertex in vertices)
                x_max = max(vertex.x for vertex in vertices)
                y_min = min(vertex.y for vertex in vertices)
                y_max = max(vertex.y for vertex in vertices)
                x_center = ((x_min + x_max) / 2) / image_width
                y_center = ((y_min + y_max) / 2) / image_height
                bbox_width = (x_max - x_min) / image_width
                bbox_height = (y_max - y_min) / image_height
                file.write(f"{class_id} {x_center:.6f} {y_center:.6f} {bbox_width:.6f} {bbox_height:.6f} {logo.score:.6f}\n")

def get_image_dimensions_from_url(url):
    """
    Get the dimensions of an image from its URL, handling EXIF orientation.
    """
    response = requests.get(url, stream=True)
    response.raise_for_status()  # Ensure the request was successful
    with Image.open(BytesIO(response.content)) as img:
        # Handle EXIF orientation
        if hasattr(img, '_getexif'):
            exif = img._getexif()
            if exif is not None:
                orientation_key = next((key for key, value in ExifTags.TAGS.items() if value == 'Orientation'), None)
                if orientation_key and orientation_key in exif:
                    orientation = exif[orientation_key]
                    if orientation == 3:
                        img = img.rotate(180, expand=True)
                    elif orientation == 6:
                        img = img.rotate(270, expand=True)
                    elif orientation == 8:
                        img = img.rotate(90, expand=True)
        
        return img.width, img.height

def process_images_in_bucket(bucket_name):
    s3_client = boto3.client('s3')
    paginator = s3_client.get_paginator('list_objects_v2')
    page_iterator = paginator.paginate(Bucket=bucket_name)

    class_id_map = {}
    class_id_counter = 0
    total_processed = 0

    csv_path = 'olm-pics-s3-results.csv'  # CSV file path
    header_written = False  # Flag to check if header is already written

    for page in page_iterator:
        if 'Contents' in page:
            for obj in page['Contents']:
                print(f"Processing file: {obj['Key']}")
                file_uri = generate_presigned_url(bucket_name, obj['Key'])
                if file_uri:
                    logo_results = detect_logos_uri(file_uri)
                    text_results = detect_text_uri(file_uri)
                    cleaned_texts = clean_text(text_results)

                    for logo in logo_results:
                        if logo.description not in class_id_map:
                            class_id_map[logo.description] = class_id_counter
                            class_id_counter += 1

                    base_filename = os.path.splitext(obj['Key'])[0]
                    formatted_file_path = base_filename.replace('/', '_')
                    width, height = get_image_dimensions_from_url(file_uri)
                    save_yolo_format(logo_results, f"./logo_annotations/{formatted_file_path}.txt", width, height, class_id_map)

                    # Prepare data to write
                    result = {
                        's3_path': f"s3://{bucket_name}/{obj['Key']}",
                        'logos': ', '.join([logo.description for logo in logo_results]),
                        'texts': ', '.join(cleaned_texts)
                    }

                    # Write result to CSV
                    df = pd.DataFrame([result])
                    if not header_written:  # If header not written, write with header
                        df.to_csv(csv_path, mode='w', header=True, index=False)
                        header_written = True
                    else:  # Else append without header
                        df.to_csv(csv_path, mode='a', header=False, index=False)

                    print(f"Detected Logos for {obj['Key']}: {[logo.description for logo in logo_results]}")
                    print(f"Detected Texts for {obj['Key']}: {cleaned_texts}")

                    total_processed += 1
                    print(f"Processed {total_processed} images. Latest: {obj['Key']}")

    # Output class mappings at the end
    with open('./logo_annotations/class.txt', 'w') as class_file:
        for description, class_id in sorted(class_id_map.items(), key=lambda x: x[1]):
            class_file.write(f"{class_id}: {description}\n")

    print(f"Total processed images: {total_processed}")

# Example usage
process_images_in_bucket('olm-pics-s3')

[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/nickjohnson/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


Processing file: 1/tTyxAD9TsRwRm6yHD8TZrmin76zbW08A9NXCaxz9.jpeg
Detected Logos for 1/tTyxAD9TsRwRm6yHD8TZrmin76zbW08A9NXCaxz9.jpeg: ['Craftsman', 'Marketside']
Processed 1 images. Latest: 1/tTyxAD9TsRwRm6yHD8TZrmin76zbW08A9NXCaxz9.jpeg
Processing file: 112/18RfT51ZmG4CbSBoRLQKOSFB5J6p8Hc1XX3qosy6.jpeg
Detected Logos for 112/18RfT51ZmG4CbSBoRLQKOSFB5J6p8Hc1XX3qosy6.jpeg: []
Detected Texts for 112/18RfT51ZmG4CbSBoRLQKOSFB5J6p8Hc1XX3qosy6.jpeg: []
Processed 2 images. Latest: 112/18RfT51ZmG4CbSBoRLQKOSFB5J6p8Hc1XX3qosy6.jpeg
Processing file: 112/2ObwUbTXkBbQKOmW0xXLhf6msJCqk9G4pQDyHzpd.jpeg
Detected Logos for 112/2ObwUbTXkBbQKOmW0xXLhf6msJCqk9G4pQDyHzpd.jpeg: []
Detected Texts for 112/2ObwUbTXkBbQKOmW0xXLhf6msJCqk9G4pQDyHzpd.jpeg: ['DIET', 'TEA']
Processed 3 images. Latest: 112/2ObwUbTXkBbQKOmW0xXLhf6msJCqk9G4pQDyHzpd.jpeg
Processing file: 112/3pci4ANxHOGByqVp5xHuNUAjkJLUqnRIcBPTX237.jpeg
Detected Logos for 112/3pci4ANxHOGByqVp5xHuNUAjkJLUqnRIcBPTX237.jpeg: []
Detected Texts for 112/3pci4

ServiceUnavailable: 503 502:Bad Gateway

In [89]:
import pandas as pd

# Load the CSV file
df = pd.read_csv('olm-pics-s3-results.csv')

# Assuming 'logos' column contains the comma-separated logos
class_id_map = {}
for logos in df['logos'].dropna():
    for logo in logos.split(', '):
        if logo not in class_id_map:
            class_id_map[logo] = len(class_id_map)

# Output the mapping to a new file or print it
with open('recovered_classes.txt', 'w') as file:
    for logo, id_ in class_id_map.items():
        file.write(f"{id_}: {logo}\n")

print("Recovered class ID map is saved as 'recovered_classes.txt'.")

Recovered class ID map is saved as 'recovered_classes.txt'.


In [18]:
all_logos = df['logos'].dropna().str.cat(sep=', ').split(', ')

pd.DataFrame(all_logos).value_counts().to_csv('logo_counts.csv', header=['count'])

In [20]:
all_txt = df['texts'].dropna().str.cat(sep=', ').split(', ')

pd.DataFrame(all_txt).value_counts().to_csv('txt_counts.csv', header=['count'])

In [27]:
logo_words = set(word.lower() for logo in unique_logos for word in logo.split())

# Now, define a function that filters words based on their presence in the 'logo_words' set.
def filter_texts(text):
    # Split the text into words and filter them.
    filtered_words = [word for word in text.split() if word.lower() in logo_words]
    # Join the words back into a single string.
    return ' '.join(filtered_words)

# Apply this function to the 'texts' column of the dataframe.
df['texts'] = df['texts'].dropna().apply(filter_texts)

Unnamed: 0,s3_path,logos,texts
0,s3://olm-pics-s3/1/tTyxAD9TsRwRm6yHD8TZrmin76z...,"Craftsman, Marketside",
1,s3://olm-pics-s3/112/18RfT51ZmG4CbSBoRLQKOSFB5...,,
2,s3://olm-pics-s3/112/2ObwUbTXkBbQKOmW0xXLhf6ms...,,TEA
3,s3://olm-pics-s3/112/3pci4ANxHOGByqVp5xHuNUAjk...,,
4,s3://olm-pics-s3/112/3q5V6dsu75SItwjaXdJ7bvODc...,,
...,...,...,...
22103,s3://olm-pics-s3/2022/07/05/V4fUcScIgkUm89yq0M...,,
22104,s3://olm-pics-s3/2022/07/05/VETa5Gdd7U2ghpFLlZ...,,
22105,s3://olm-pics-s3/2022/07/05/VVK6X8vx6OmjbPuhy3...,,
22106,s3://olm-pics-s3/2022/07/05/WfI8uHNKYyjcMAD4LN...,,


In [95]:
import os
import yaml
import unidecode  # for handling accents and special characters

def normalize_class_name(name):
    name = unidecode.unidecode(name)
    name = name.replace('-', ' ').replace('_', ' ')
    name = ''.join(c for c in name if c.isalnum() or c.isspace()).lower()
    return ' '.join(name.split())

def class_match(google_class, roboflow_classes):
    google_class = normalize_class_name(google_class)
    for roboflow_class in roboflow_classes:
        if google_class in normalize_class_name(roboflow_class) or normalize_class_name(roboflow_class) in google_class:
            return roboflow_classes.index(roboflow_class)
    return None

def load_class_mapping(file_path):
    with open(file_path, 'r') as file:
        class_mapping = {}
        for line in file:
            # Split the line at the first colon and consider the rest as the brand name
            parts = line.strip().split(': ', 1)
            if len(parts) == 2:
                key, value = parts
                class_mapping[int(key)] = value.strip()
        return class_mapping

def load_roboflow_classes(file_path):
    with open(file_path, 'r') as file:
        return yaml.load(file, Loader=yaml.FullLoader)['names']

def process_annotations(google_path, roboflow_path, google_to_roboflow, roboflow_classes):
    for annotation in os.listdir(google_path):
        google_file_path = os.path.join(google_path, annotation)
        roboflow_file_path = os.path.join(roboflow_path, annotation)
        if not os.path.exists(roboflow_file_path):
            with open(google_file_path, 'rb') as file:
                encoding = chardet.detect(file.read(10000))['encoding']

            with open(google_file_path, 'r', encoding=encoding) as google_file:
                google_lines = google_file.readlines()

            updated_roboflow_lines = []

            for line in google_lines:
                parts = line.split()
                if len(parts) == 6 and float(parts[-1]) > 0.7:
                    class_id, *bbox = map(float, parts[:-1])
                    google_class = google_to_roboflow.get(int(class_id))
                    adjusted_class_id = class_match(google_class, roboflow_classes)
                    if adjusted_class_id is not None:
                        updated_roboflow_lines.append((adjusted_class_id, *bbox))

            with open(roboflow_file_path, 'w') as file:
                for line in updated_roboflow_lines:
                    file.write(' '.join(map(str, line)) + '\n')

def main():
    google_path = './google_api_data/logo_annotations'
    roboflow_path = './google_api_data/Capstone_OLM_Logo_Recognition.v4i.yolov8/train/labels'
    classes_path = './google_api_data/classes.txt'
    data_yaml_path = './google_api_data/Capstone_OLM_Logo_Recognition.v4i.yolov8/data.yaml'
    
    google_to_roboflow = load_class_mapping(classes_path)
    roboflow_classes = load_roboflow_classes(data_yaml_path)
    
    process_annotations(google_path, roboflow_path, google_to_roboflow, roboflow_classes)

if __name__ == '__main__':
    main()

In [96]:
for annotation in os.listdir('./google_api_data/Capstone_OLM_Logo_Recognition.v4i.yolov8/train/labels'):
    with open(f'./google_api_data/Capstone_OLM_Logo_Recognition.v4i.yolov8/train/labels/{annotation}', 'r') as file:
        lines = file.readlines()
        if not lines:
            os.remove(f'./google_api_data/Capstone_OLM_Logo_Recognition.v4i.yolov8/train/labels/{annotation}')