# Image processing pipeline with Google Cloud Storage python client

In [1]:
cd ../../../../Apps/Python/cams-rio

C:\Users\luisr\Desktop\Repositories\Apps\Python\cams-rio


### Write and upload videos from images - Custom class

In [31]:
import os, cv2
from tempfile import NamedTemporaryFile
from modules.googlecloudstorage import GCS

class Video:
    '''
    Build videos from image frames with OpenCV and upload them to cloud storage.

    shape: output frame size as (width, height), or 'auto' to take it from the first frame.
    fps: frames per second of the written video.
    ext: extension given to the temporary video file (e.g. '.mp4').
    codec: four-character codec code, expanded into `cv2.VideoWriter_fourcc`.
    skip: optional callable `skip(image)`; images for which it returns a truthy value are left out.
    gcs: storage client object exposing `is_blob_in_bucket` and `upload_from_filename`.
    '''

    def __init__(self, shape:str='auto', fps:int=3, ext:str='.mp4', codec:str='mp4v', skip=None, gcs=None):
        self.shape = shape
        self.fps = fps
        self.ext = ext
        self.codec = codec
        self.skip = skip
        self.gcs = gcs

    def write(self, frames:list, path:str):
        '''
        Write `frames` to a video file at `path`.

        frames: list of image arrays; with shape 'auto' the first frame's height
            and width set the video size, so an empty list raises IndexError.
        path: destination path of the video file.
        '''
        if isinstance(self.shape, str) and self.shape == 'auto':
            # Only the first two dimensions matter, so 2-D (grayscale) frames are accepted too.
            height, width = frames[0].shape[:2]
            shape = (width, height)
        else:
            shape = self.shape
        fourcc = cv2.VideoWriter_fourcc(*self.codec)
        writer = cv2.VideoWriter(path, fourcc, self.fps, shape)
        try:
            for frame in frames:
                writer.write(frame)
        finally:
            # Always release so the file is finalized and the handle is freed, even on error.
            writer.release()

    def read_images_from_folder(self, folder:str, ext:str='.jpg'):
        '''
        Load the images found directly inside `folder`.

        folder: full path to folder containing images.
        ext: only file names ending with this extension are read.

        Returns the list of loaded images in file-name order. Files that fail
        to load are reported and dropped, as are images rejected by `self.skip`.
        '''
        # os.listdir returns entries in arbitrary order; sort so frame order is deterministic.
        stamp_files = sorted(name for name in os.listdir(folder) if name.endswith(ext))
        stamp_images = []
        for stamp_file in stamp_files:
            stamp_path = f'{folder}/{stamp_file}'
            image = cv2.imread(stamp_path)
            if image is None:
                print(f'IMAGE READ FAILED. FILE: {stamp_file} · FOLDER: {folder}')
                continue
            if self.skip is not None and self.skip(image):
                continue
            stamp_images.append(image)
        return stamp_images

    def upload(self, frames:list, blob_name:str, bucket_name:str, content_type:str='video/mp4', overwrite:bool=False):
        '''
        Encode `frames` into a temporary video file and upload it as `blob_name`.

        Returns False (after printing the reason) when `frames` is empty or when
        the blob already exists and `overwrite` is False; returns True otherwise.
        '''
        if not len(frames):
            print(f'VIDEO UPLOAD FAILED. EMPTY FRAME LIST PROVIDED. BLOB: {blob_name} · BUCKET: {bucket_name}')
            return False
        if not overwrite and self.gcs.is_blob_in_bucket(blob_name, bucket_name):
            print(f'VIDEO UPLOAD FAILED. BLOB ALREADY EXISTS. BLOB: {blob_name} · BUCKET: {bucket_name}')
            return False
        # Reserve a uniquely named path that already carries the video extension.
        # delete=False + close() lets the writer reopen the path (required on Windows);
        # the file is removed explicitly below so no temporary video is left behind.
        temp = NamedTemporaryFile(suffix=self.ext, delete=False)
        temp.close()
        try:
            self.write(frames, temp.name)
            self.gcs.upload_from_filename(temp.name, blob_name, bucket_name, content_type, overwrite=True)
        finally:
            os.remove(temp.name)
        print(f'VIDEO UPLOAD SUCCESS. BLOB: {blob_name} · BUCKET: {bucket_name}')
        return True

    def upload_from_images(self, folder:str, blob_name:str, bucket_name:str, ext:str='.jpg', content_type:str='video/mp4', overwrite:bool=False):
        '''
        Read the images in `folder`, turn them into a video and upload it as `blob_name`.

        The existence check runs first so that no image is read from disk when
        the blob already exists and `overwrite` is False. Returns the result of `upload`.
        '''
        if not overwrite and self.gcs.is_blob_in_bucket(blob_name, bucket_name):
            print(f'VIDEO UPLOAD FAILED. BLOB ALREADY EXISTS. BLOB: {blob_name} · BUCKET: {bucket_name}')
            return False
        # The blob was already checked above, so the inner upload can skip its own check.
        return self.upload(self.read_images_from_folder(folder, ext), blob_name, bucket_name, content_type, overwrite=True)


---

# Image processing pipeline

### 1. Download blob files to folder

In [3]:
from IPython.display import clear_output as co

#### Google Cloud Storage module

In [4]:
from modules.googlecloudstorage import GCS

# Credentials file handed to the GCS wrapper (presumably a service-account key — confirm).
sa_json = 'auth/octacity-iduff.json' # 'auth/pluvia-sa.json'
# Second argument of the GCS wrapper (presumably the billing/user project — confirm against the module).
user_project = 'octacity'

# Storage client shared by the cells below.
gcs = GCS(sa_json, user_project)

#### List blobs in bucket

In [8]:
# Query settings: which bucket/prefix to scan and which extension to count.
bucket_name = 'city-camera-images'
prefix = 'rain/'
delimiter = None
ext = '.jpg'

blobs = gcs.list_blobs(bucket_name, prefix, delimiter)

# Split the listing into blobs carrying the extension and folder placeholders (names ending in '/').
blobs_ext, folders = [], []
for i, blob_name in enumerate(blobs):
    # Report progress every 500 names; clear_output(wait=True) swaps the line on the next print.
    if i % 500 == 0:
        print()
        print(f'Blobs Searched: {i+1}/{len(blobs)}')
        co(True)
    if blob_name.endswith(ext):
        blobs_ext.append(blob_name)
    if blob_name.endswith('/'):
        folders.append(blob_name)

print()
print(f'Blobs Query: {len(blobs_ext)}/{len(blobs)}')


Blobs Query: 173601/191452


#### File count per folder

    file_count = {
        'pics': '756982',
        'rain': '173601',
    }

#### Download blobs in bucket to folder

In [7]:
# Local destination and source bucket for the download.
folder = '../../../Dados/Downloads'
bucket_name = 'city-camera-images'
prefix = 'test/'  # overwritten by the loop variable below whenever the list is non-empty
delimiter = None

# Prefixes to download; uncomment an entry to include it in the run.
pipeline_download = [
#     'auto/flood',
#     'flood',
#     'manual',
    'rain', # missing
#     'pics', # missing
#     'test',

]

# One download pass per selected prefix.
for prefix in pipeline_download:
    gcs.download_to_folder(
        folder,
        bucket_name,
        prefix,
        delimiter,
        overwrite=False,
        report_freq=10,
        skip=82000,
    )


PREFIX: rain · RUNNING: 145.7 min · RATE: 0.0955 s/file · FINISH-ESTIMATE: 0.0 min · PROGRESS: 173600/173601 · DOWNLOADS: 91324/91601


### 2. Bucket image and video file processing pipeline

- Read files from disk
- Convert images to video
- Upload to cloud storage bucket

#### Computer vision method settings (Image Similarity)

In [8]:
from modules.image_similarity import similarity_classifier

# Reference images handed to the similarity classifier.
baseimgs = ['static/gabaritos/cam.jpg', 'static/gabaritos/dark.jpg']

# Second classifier argument (presumably a similarity threshold — confirm in modules.image_similarity).
p = 0.05
clf_diff = similarity_classifier(baseimgs, p)

# Bare expression: displays the bound method that is later passed to Video as the `skip` callable.
clf_diff.predict_any

  from pandas.core.computation.check import NUMEXPR_INSTALLED


<bound method similarity_classifier.predict_any of <modules.image_similarity.similarity_classifier object at 0x000001A86CCDF5E0>>

#### Set pipeline parameters

In [34]:
import pandas as pd
from time import time

# Camera table indexed by integer camera code, with integer cluster ids.
cameras = (
    pd.read_csv('static/city/cameras.csv')
    .astype({'Codigo': int, 'cluster_id': int})
    .set_index('Codigo')
)


# Storage client used for the uploads.
gcs = GCS('auth/octacity-iduff.json', 'octacity')

# Video builder: frames flagged by the similarity classifier are skipped.
video = Video(skip=clf_diff.predict_any, gcs=gcs)

# Local root holding one sub-folder per 'event_type', and the destination bucket.
path = '../../../Dados/Downloads'
to_bucket_name = 'flood-video-collection'

# One dict per pipeline stage. Keys as read by the loop cell:
#   'event_type'    sub-folder of `path` whose entries are processed
#   'bucket_folder' leading part of every destination blob name
#   'blob_type'     'polygon' inserts the camera's cluster_id into the blob name (default 'camera')
#   'upload_from'   'video_file' uploads each entry as-is; 'image_folder' builds a video
#                   from the images inside each entry (default 'video_file')
#   'skip'          number of leading code folders to skip (default 0)
#   'overwrite'     forwarded to the upload calls (default False)
pipeline_config = [{ 
#     'event_type': 'auto/flood',
#     'bucket_folder': 'polygons/flood',
#     'blob_type': 'polygon',
#     'upload_from': 'video_file',
# }, {
#     'event_type': 'flood',
#     'bucket_folder': 'polygons/flood',
#     'blob_type': 'polygon',
#     'upload_from': 'image_folder',
# }, {
#     'event_type': 'manual/2023-08-02',
#     'bucket_folder': 'polygons/manual',
#     'blob_type': 'polygon',
#     'upload_from': 'image_folder',
# }, {
    'event_type': 'rain', # missing
    'bucket_folder': 'rain',
    'blob_type': 'camera',
    'upload_from': 'image_folder',
    'skip': 0,
    'overwrite': True,
# }, {
#     'event_type': 'pics', # missing
#     'bucket_folder': 'polygons/flood',
#     'blob_type': 'polygon',
#     'upload_from': 'image_folder',
}]

# obs. 'event_type' values 'auto/rain', 'comando' and 'waze' were transferred to new bucket manually.

#### Loop through pipeline stages

In [35]:
# Walk every stage in `pipeline_config`: for each camera-code folder and each
# timestamp entry inside it, upload either the entry itself (a video file) or
# a video built from the images it contains, reporting progress and an ETA.
start = time()  # overall start of the run
for i, config in enumerate(pipeline_config):

    # Apply defaults once per stage, before any key is read.
    config.setdefault('overwrite', False)
    config.setdefault('skip', 0)
    config.setdefault('blob_type', 'camera')
    config.setdefault('upload_from', 'video_file')

    event_type = config['event_type'] # Get 'event_type' folder config
    bucket_folder = config['bucket_folder']
    blob_type = config['blob_type']
    upload_from = config['upload_from']
    skip = config['skip']
    overwrite = config['overwrite']

    # Fail early instead of looping through every file without uploading anything.
    if upload_from not in ('video_file', 'image_folder'):
        raise ValueError(f"Unknown 'upload_from' value: {upload_from!r}")

    event_folder = f'{path}/{event_type}' # Get 'event_type' folder
    # os.listdir order is arbitrary; sort so that the positional `skip` resumes reliably.
    event_codes = sorted(os.listdir(event_folder))

    n = len(event_codes) - skip  # codes to process in this stage
    done = 0                     # codes fully processed in this stage
    stage_start = time()         # rate and estimates are per stage, like `done` and `n`
    for j, code in enumerate(event_codes): # Get 'event_type' code
        if j < skip: continue
        if blob_type == 'polygon':
            polygon_id = cameras.loc[int(code), 'cluster_id']

        code_folder = f'{event_folder}/{code}' # Get 'code' folder
        code_stamps = sorted(os.listdir(code_folder))

        for k, stamp in enumerate(code_stamps): # get 'code' timestamp

            stamp_folder = f'{code_folder}/{stamp}'
            to_blob_name =  f'{bucket_folder}'
            if blob_type == 'polygon': to_blob_name += f'/{polygon_id}'
            to_blob_name += f'/{code}'
            to_blob_name += f'/{stamp}' if upload_from == 'video_file' else f'/CODE{code} {stamp}.mp4'

            co(True)
            if upload_from == 'video_file':
                status = video.gcs.upload_from_filename(stamp_folder, to_blob_name, to_bucket_name, content_type='video/mp4', overwrite=overwrite)
            else:
                status = video.upload_from_images(
                    stamp_folder, to_blob_name, to_bucket_name,
                    ext='.jpg', content_type='video/mp4', overwrite=overwrite
                )

            # Progress is measured in codes (same unit as `n`): completed codes
            # plus the fraction of the current code's entries already handled.
            progress = done + (k + 1) / len(code_stamps)
            left = n - progress
            running = time() - stage_start
            rate = running / progress  # seconds per code
            expect_total = rate * n
            expect_finish = rate * left
            running_min = round(running / 60, 1)
            expect_total_min = round(expect_total / 60, 1)
            expect_finish_min = round(expect_finish / 60, 1)
            print(); print(f'- PIPELINE: {config["event_type"]} · BLOB: {to_blob_name}')
            print(); print(f'- STAGE: {i+1}/{len(pipeline_config)} · CODES: {done + 1}/{n} · FILES: {k+1}/{len(code_stamps)}')
            print(); print(f'- RUNNING: {running_min} / {expect_total_min} min · EXPECT-FINISH: {expect_finish_min} min')

        done += 1  # one more code finished (also counts codes with no entries)

KeyboardInterrupt: 

### 3. Clean up storage

#### Delete blobs that match provided extension

In [77]:
# Target of the clean-up: every blob under `prefix` whose name ends with `ext`.
bucket_name = 'flood-video-collection'
prefix = 'rain/'
delimiter = None

ext = '.mp4.mp4'
# pattern = 'CODE'  # alternative filter: len(name.split(pattern)) - 1 == 2

# First pass: count the matches so the progress line has totals to show.
blobs = gcs.list_blobs(bucket_name, prefix, delimiter)
blobs_ext = [blob_name for blob_name in blobs if blob_name.endswith(ext)]

print('Blobs:', len(blobs), '· Query:', len(blobs_ext))

# Second pass: iterate the blob objects themselves and delete the matches.
bucket = gcs.get_bucket(bucket_name)

deleted = 0
for i, blob in enumerate(bucket.list_blobs(prefix=prefix, delimiter=delimiter)):
    if blob.name.endswith(ext):
        blob.delete()
        deleted += 1

    co(True)
    print(f'Iteration: {i+1}/{len(blobs)} · Deleted: {deleted}/{len(blobs_ext)}')

Iteration: 18/18 · Deleted: 0/0


#### Check result

In [66]:
# Re-list the same bucket/prefix and count the names still ending with `ext`.
blobs = gcs.list_blobs(bucket_name, prefix, delimiter)
blobs_ext = [blob_name for blob_name in blobs if blob_name.endswith(ext)]

print(f'Blobs: {len(blobs)} · Query: {len(blobs_ext)}')

Blobs: 577 · Query: 0
