In [None]:
%pip install humanize

In [None]:
import sys
import os
# Make the two project checkouts importable. Order is kept: the inference
# stack sources are searched before the logo/face detector package.
sys.path.extend([
    "/Workspace/Users/pdacosta@integralads.com/.ide/ctx-inference_stack/src",
    "/Workspace/Users/pdacosta@integralads.com/.ide/ctx-logoface-detector-a4cd2ffb",
])
# Select the "saml" AWS profile for this process.
os.environ["AWS_PROFILE"] = "saml"

In [None]:
from compute_engine.units.generator import UnitGenerator, UnitDeclatation
from compute_engine.structures.entities import RunInfo
from compute_engine.structures.messages import MessageHeader, RunInfoMessage, EndOfComputeMessage
from compute_engine.structures.entities import FrameDetection, FrameDetections
from compute_engine.units.handler import Handler, RunVariable
from compute_engine.utils.message_capture import MessageCapture

from concurrent.futures import Future
import pyarrow.parquet as pq
import asyncio
import pyarrow as pa
import tqdm
import pandas as pd
from pyspark.sql.functions import concat, lit, col

In [None]:
# --- Detector settings (passed to the detector unit) ----------------------
n_workers = 2
model_batch_size = 16
confidence_threshold = 0.4
detection_min_size_percentage = 0.01

# --- Driver-loop settings --------------------------------------------------
batch_size = 16          # number of URIs submitted per loop iteration
generate_uris = True     # rebuild the URI list instead of reading the saved one
ignore_progress = True   # start from index 0 instead of reading progress.txt

# --- Output locations (local /dbfs view) -----------------------------------
dbfs_mnt_path_faces = '/dbfs/mnt/innovation/pdacosta/data/wider_face/preds_faces/xywh'
dbfs_mnt_path_uris = dbfs_mnt_path_faces + '/uris'
progress_file = os.path.join(dbfs_mnt_path_uris, 'progress.txt')

# --- Input locations -------------------------------------------------------
csv_mnt_path = "/dbfs/mnt/innovation/pdacosta/data/wider_face/dataset/"
s3_prefix = "s3://mls.us-east-1.innovation/pdacosta/data/wider_face/dataset/"

# --- Spark view of the same locations: identical paths without '/dbfs' ------
pyspark_mnt_path_faces, pyspark_mnt_path_uris, pyspark_csv_mnt_path = (
    local_path.replace('/dbfs', '')
    for local_path in (dbfs_mnt_path_faces, dbfs_mnt_path_uris, csv_mnt_path)
)

In [None]:
# Private run-id counter. It deliberately does NOT use a generic name such as
# `i`: notebook cells share one global namespace, so a top-level `for i in ...`
# loop in another cell would otherwise be clobbered by every gen_message() call
# (and would in turn reset the run ids).
_next_run_id = 0


def gen_message(url):
    """Build the RunInfo request for a single image.

    Args:
        url: Location of the image to process; stored as ``source["url"]``.

    Returns:
        A ``RunInfo`` with JSON export, ``semi_auto`` pipeline mode, an image
        source pointing at ``url`` and a run id taken from a counter that
        increases by one on every call.
    """
    global _next_run_id
    run_id = _next_run_id
    _next_run_id += 1
    return RunInfo(
        export='json',
        pipeline={"mode": "semi_auto"},
        source={"kind": "image", "url": url, "uuid": "0"},
        run={"id": run_id},
        company={"id": 0},
        atomic=None,
    )

In [None]:
# Configure the unit generator that builds the processing pipeline.
# The model engine is "tf"; the remaining settings are passed through
# set_config() as plain key/value pairs.
unit_generator = UnitGenerator(model_engine="tf")
unit_generator.set_config("database_client", None)
unit_generator.set_config("bucket_name", "reminiz.production")
unit_generator.set_config("cloud_provider", "aws")
unit_generator.set_config("models_local_path", "./")
unit_generator.set_config("s3_bucket_url", "")


# Build the pipeline stage by stage: downloader -> frame extractor ->
# frame resizer (target_size=416) -> face detector. Stages are combined with
# the `@=` operator; `input_unit` is kept as the entry point that later cells
# call with a RunInfo.
# NOTE(review): `units` collects `x` after the first three stages but not
# after the detector stage — confirm whether that is intentional.
units = []
with unit_generator:
    input_unit = unit_generator.units.downloader()
    x = input_unit
    units.append(x)
    x @= unit_generator.units.run_frame_extractor()
    units.append(x)
    x @= unit_generator.units.frame_resizer(target_size=416)
    units.append(x)
    # Detector parameters come from the configuration cell.
    x @= unit_generator.units.detector(
        max_workers= n_workers,
        model_path= "models/detectors/faces/face_detector_march_20-20230116-tf/",
        batchsize= model_batch_size,
        detection_min_size_percentage= detection_min_size_percentage,
        confidence_threshold=confidence_threshold
    )

In [None]:
# Build (or reload) the list of images to process.
#   generate_uris=True : derive it from the validation annotations, drop images
#                        that are too small or already have saved predictions,
#                        and persist the result as uris.csv.
#   generate_uris=False: reload the previously persisted uris.csv.
if generate_uris:

    # read the csv file with the validation annotations
    val_df = spark.read.csv(os.path.join(pyspark_csv_mnt_path, "val.csv"), header=True)
    df = val_df

    # create the correct uri for the images, we need to add the s3 prefix to the paths
    df = df.withColumn("uri", concat(lit(s3_prefix), col("path")))

    # filter out the images that are too small, cannot have a dimension smaller than 96 px
    df = df.filter((col("width") >= 96) & (col("height") >= 96))

    # keep only the columns we need
    df = df.select("asset", "uri")

    # Skip assets that already have saved predictions. This stays best-effort
    # (e.g. nothing has been written yet on the first run), but the failure is
    # now reported instead of being silently swallowed, and KeyboardInterrupt /
    # SystemExit are no longer trapped.
    try:
        assets_saved = spark.read.parquet(pyspark_mnt_path_faces)
        assets_saved = assets_saved.select("asset").distinct()
        # filter out the assets already saved
        df = df.join(assets_saved, on="asset", how="left_anti")
    except Exception as exc:
        print(
            f"Could not read saved predictions from {pyspark_mnt_path_faces} "
            f"({type(exc).__name__}: {exc}); no assets will be skipped."
        )

    # make sure we have no duplicates
    df = df.dropDuplicates(["asset"])

    # save df as a csv file
    df.write.mode("overwrite").csv(os.path.join(pyspark_mnt_path_uris, "uris.csv"), header=True)

else:

    df = spark.read.csv(os.path.join(pyspark_mnt_path_uris, "uris.csv"), header=True)


# collect the uris and assets into python lists (same row order for both)
rows = df.collect()
assets, uris = [row.asset for row in rows], [row.uri for row in rows]
df.unpersist()

print(f"Number of images to process: {len(assets)}")
print(f"Assets: {assets[:2]}")
print(f"Uris: {uris[:2]}")

In [None]:
# Smoke test: push the first URI through the pipeline and block on its result
# before launching the full run.
f = input_unit(run_info=gen_message(url=uris[0]))
f.result()

In [None]:
def save_batch(assets, uris, boxes, idx):
    """Write one checkpoint of predictions to parquet and record the progress.

    Args:
        assets: Asset identifiers, one per image.
        uris: Image URIs, aligned with ``assets``.
        boxes: Per-image lists of detections, aligned with ``assets``.
        idx: Progress index; used (zero-padded to 6 digits) in the parquet file
            name and written to the progress file.

    Raises:
        Exception: If the parquet file or the progress file cannot be written.
            The underlying error is attached as ``__cause__``.
    """
    records = [
        {"asset": asset, "uri": uri, "boxes": asset_boxes}
        for asset, uri, asset_boxes in zip(assets, uris, boxes)
    ]
    table = pa.Table.from_pandas(pd.DataFrame(records))
    parquet_filename = os.path.join(dbfs_mnt_path_faces, f"face_boxes_{str(idx).zfill(6)}.parquet")
    try:
        pq.write_table(table, parquet_filename)
        # The progress index is only recorded once the parquet write succeeded.
        with open(progress_file, 'w') as f:
            f.write(str(idx))
    except Exception as exc:
        # Chain the original error so the real cause stays in the traceback.
        raise Exception('Could not save the parquet file') from exc

In [None]:
# Decide where the driver loop starts. With ignore_progress the run always
# starts at 0; otherwise the index stored by save_batch() is reused.
if ignore_progress:
    progress = 0
else:
    try:
        with open(progress_file, 'r') as f:
            progress = int(f.read())
    except (OSError, ValueError):
        # Missing/unreadable file or non-integer content: start from scratch.
        # Narrowed from a bare `except` so interrupts are not swallowed.
        progress = 0

n_uris = len(uris)
print(f'Progress: {progress} out of {n_uris} images')

In [None]:

save_uris = []
save_boxes = []
save_assets = []
for i in range(progress, n_uris, batch_size):
    batch_uris = uris[i:i+batch_size]
    batch_assets = assets[i:i+batch_size]
    # we call the model with the batch of uris
    calls = [asyncio.wrap_future(input_unit(run_info=gen_message(url=uri))) for uri in batch_uris]
    
    results = await asyncio.gather(*calls)
    
    # we need to get the boxes now
    batch_boxes = []
    for res in results:
        frame_detections = res[0]["frame_detections"]
        detections = frame_detections.detections
        boxes = []
        if detections:
            for frame_detection in detections:
                score = frame_detection.classification[0][-1]
                box = frame_detection.box
                box = transf_any_box(box, "xyxy", "xywh")
                boxes.append({"score": score, "box": box})
                
        batch_boxes.append(boxes)
    
    save_uris += batch_uris
    save_boxes += batch_boxes
    save_assets += batch_assets
    
    
    if (i + batch_size) % 20000 == 0:
        
        save_batch(save_assets, save_uris, save_boxes, i+batch_size)

        # reset the batch count, save uris, and save boxes
        save_uris = []
        save_boxes = []
        save_assets = []
        
# Handle the final batch
if save_uris:
    save_batch(save_assets, save_uris, save_boxes, n_uris)