In [41]:
import pathlib
from typing import Iterator

import numpy as np
from bing_image_downloader import downloader
import hashlib
import cv2
import re

# Show the kernel's working directory; the relative path below is resolved against it.
print(pathlib.Path.cwd())


# Absolute path of the "bronze" data folder, located two directory levels above the working directory.
OUTPUT_PATH = pathlib.Path("../../data/bronze").resolve()
# Bare expression: the notebook displays the resolved path as the cell output.
OUTPUT_PATH

/home/paolo/git/wild-boar-detection/src/dataset_collecion


PosixPath('/home/paolo/git/wild-boar-detection/data/bronze')

In [42]:
def download_images() -> None:
    """Run one Bing image download per search term.

    The terms cover "wild boar" in English and Italian ("cinghiale"), each
    with a day and a night variant. Every term is handed to
    ``downloader.download`` with a limit of 1,000 results, ``OUTPUT_PATH`` as
    the output directory, a 60 second timeout, and without forcing the
    replacement of existing content.
    """
    search_terms = {"wild boar", "wild boar night", "cinghiale", "cinghiale notte"}

    for query in search_terms:
        downloader.download(
            query,
            limit=1_000,
            output_dir=OUTPUT_PATH,
            adult_filter_off=True,
            force_replace=False,
            timeout=60,
            verbose=False,
        )


def _sha256_of_file(path: pathlib.Path, chunk_size: int = 65536) -> str:
    """Return the hexadecimal SHA-256 digest of the file at ``path``.

    The file is read in pieces of ``chunk_size`` bytes so that memory usage
    does not grow with the size of the file.
    """
    digest = hashlib.sha256()
    with open(path, "rb") as stream:
        while chunk := stream.read(chunk_size):
            digest.update(chunk)
    return digest.hexdigest()


def delete_duplicate_images(root: "pathlib.Path | None" = None) -> None:
    """Delete ``*.jpg`` files whose content duplicates an earlier file.

    Files are compared by the SHA-256 digest of their bytes. They are visited
    in sorted path order, so the first path (in that order) of each group of
    identical files is kept and every later one is removed. A single pass is
    enough: once it finishes, every surviving file has a distinct digest.

    Args:
        root: Directory to scan recursively. Defaults to ``OUTPUT_PATH`` when
            omitted, which is what the original zero-argument call did.

    Files that cannot be read are reported and left untouched.
    """
    search_root = OUTPUT_PATH if root is None else pathlib.Path(root)

    # Maps a content digest to the first file seen with that content.
    first_seen = {}

    # Materialise the listing before deleting anything so the traversal is
    # not affected by the removals, and sort it so the kept copy is stable.
    for image in sorted(search_root.rglob("*.jpg")):
        try:
            hash_value = _sha256_of_file(image)
        except OSError as error:
            # Best effort: an unreadable file is skipped, but not silently.
            print(f"Could not read {image}, skipping it: {error}")
            continue

        if hash_value in first_seen:
            print(f"Image already present in hash table. {image}")
            image.unlink(missing_ok=True)
        else:
            first_seen[hash_value] = image

In [43]:
def extract_frames_from_videos(paths: Iterator[pathlib.Path]) -> None:
    """Save one frame per second of each video as a JPEG next to the video.

    For every path the video is sampled at 1 s, 2 s, 3 s, ... (the frame at
    0 s is not sampled) until a read fails. Frames are written to an
    ``images`` folder beside the video, named
    ``<lower-cased file name>_frame_<n>.jpg``. The folder is only created
    once a first frame has been read successfully.

    Note: a later cell defines another function with this same name; once
    that cell has run, it replaces this definition.

    Args:
        paths: Video files to process.
    """
    for path in paths:
        print(path)
        output_dir: pathlib.Path = path.parent / "images"
        vidcap = cv2.VideoCapture(str(path))
        try:
            frame_counter = 1
            while True:
                # Seek to the n-th second before grabbing a frame.
                vidcap.set(cv2.CAP_PROP_POS_MSEC, 1000 * frame_counter)
                success, image = vidcap.read()

                if not success:
                    break

                if frame_counter == 1:
                    output_dir.mkdir(parents=True, exist_ok=True)

                output_name = str(output_dir / f"{path.name.lower()}_frame_{frame_counter}.jpg")
                cv2.imwrite(output_name, image)

                frame_counter += 1
        finally:
            # Free the capture handle even if reading or writing raised.
            vidcap.release()

# extract_frames_from_videos(pathlib.Path(OUTPUT_PATH / "youtube_playlist" / "wild_boar").rglob("*.mp4"))

## Other animals

In [44]:
def extract_frames_from_videos(paths: Iterator[pathlib.Path], is_other_animal: bool = False) -> None:
    """Save one frame per second of each video into the shared images folder.

    Frames go to ``../../data/bronze/images/wild_boar`` or, when
    ``is_other_animal`` is true, ``../../data/bronze/images/other_animals``
    (resolved against the current working directory and created if missing).
    Each video is sampled at 1 s, 2 s, 3 s, ... (the frame at 0 s is not
    sampled) until a read fails. File names are the lower-cased video file
    name plus ``_frame_<n>``, with every non-word character replaced by
    ``_``, followed by ``.jpg``. Frames whose file already exists are left
    untouched.

    Args:
        paths: Video files to process.
        is_other_animal: Selects the ``other_animals`` folder instead of
            ``wild_boar``.
    """
    animal_dir = "other_animals" if is_other_animal else "wild_boar"

    output_dir: pathlib.Path = pathlib.Path(f"../../data/bronze/images/{animal_dir}").resolve()
    output_dir.mkdir(parents=True, exist_ok=True)

    print(f"output_dir {output_dir}")

    for path in paths:
        print(f"path: {path}")
        vidcap = cv2.VideoCapture(str(path))
        try:
            frame_counter = 0
            while True:
                # Advance at the top of the loop: skipping an existing frame
                # below must still move on to the next second, otherwise the
                # same timestamp would be read forever.
                frame_counter += 1
                vidcap.set(cv2.CAP_PROP_POS_MSEC, 1000 * frame_counter)
                success, image = vidcap.read()

                if not success:
                    break

                output_name = re.sub(r"\W", "_", f"{path.name.lower()}_frame_{frame_counter}")
                output_name = f"{output_name}.jpg"
                target = output_dir / output_name
                print(str(target))

                if target.exists():
                    continue

                cv2.imwrite(str(target), image)
        finally:
            # Free the capture handle even if reading or writing raised.
            vidcap.release()

# extract_frames_from_videos(pathlib.Path(OUTPUT_PATH / "youtube_playlist" / "other_animals").rglob("*"), is_other_animal=True)

In [46]:
import pandas as pd

# Index every JPEG below OUTPUT_PATH. Paths are stored relative to the
# directory two levels above OUTPUT_PATH (so they start with "data/bronze/")
# and use "/" separators. Deriving that base from OUTPUT_PATH, rather than
# slicing a fixed number of leading path components, keeps the result the
# same wherever the repository is checked out. Sorting makes the row order
# reproducible between runs.
dataframe = pd.DataFrame(
    data={
        "path": [
            image.relative_to(OUTPUT_PATH.parents[1]).as_posix()
            for image in sorted(OUTPUT_PATH.rglob("*.jpg"))
        ]
    }
)

# Label: 1 when the relative path contains "wild_boar", 0 otherwise.
# NOTE(review): folders created by download_images are presumably named after
# the search term (e.g. "wild boar" with a space) -- if so, those images do
# not match "wild_boar" and are labelled 0. TODO confirm this is intended.
dataframe["target"] = dataframe["path"].str.contains("wild_boar", regex=False).astype("int64")

In [49]:
# Store the index inside the bronze folder. OUTPUT_PATH already points there,
# so no machine-specific absolute path is needed.
dataframe.to_parquet(OUTPUT_PATH / "data.parquet")