In [29]:
import csv
import json
import urllib
import os

from ipykernel import get_connection_file

from google.cloud import storage
from google.cloud.storage.retry import DEFAULT_RETRY
from google.cloud import aiplatform

In [30]:
gcs_bucket = "jkwng-vi-api-files"
video_bucket_prefix = "input"
dataset_bucket_prefix = "datasets"
dataset_eval_prefix = "eval"
local_datasets_prefix = "datasets"
local_eval_prefix = "eval"
output_prediction_prefix = "prediction"

neg_input_csv_file = 'auto_ml_video_classifier/not_talking_to_camera - not_talking_to_camera.csv'
pos_input_csv_file = 'auto_ml_video_classifier/Talking_to_camera - Talking_to_camera.csv'

print(f"positive examples: {pos_input_csv_file}")
print(f"negative examples: {neg_input_csv_file}")
print(f"input path: gs://{gcs_bucket}/{video_bucket_prefix}")
print(f"datasets path: gs://{gcs_bucket}/{dataset_bucket_prefix}")
print(f"eval path : gs://{gcs_bucket}/{dataset_eval_prefix}")

current_workdir = os.getcwd()

print(f"local dataset dir: {current_workdir}/{local_datasets_prefix}/")
print(f"local eval dir: {current_workdir}/{local_eval_prefix}/")

try:
    os.mkdir(f"{current_workdir}/{local_datasets_prefix}")
except FileExistsError:
    pass

try:
    os.mkdir(f"{current_workdir}/{local_eval_prefix}")
except FileExistsError:
    pass

positive examples: auto_ml_video_classifier/Talking_to_camera - Talking_to_camera.csv
negative examples: auto_ml_video_classifier/not_talking_to_camera - not_talking_to_camera.csv
input path: gs://jkwng-vi-api-files/input
datasets path: gs://jkwng-vi-api-files/datasets
eval path : gs://jkwng-vi-api-files/eval
local dataset dir: /home/jupyter/auto_ml_video_classifier/datasets/
local eval dir: /home/jupyter/auto_ml_video_classifier/eval/


In [11]:
storage_client = storage.Client()
bucket = storage_client.bucket(gcs_bucket)

url_downloader = urllib.request.URLopener()

#blob = bucket.blob("test_blob")

# Mode can be specified as wb/rb for bytes mode.
# See: https://docs.python.org/3/library/io.html
#with blob.open("w") as f:
#    f.write("Hello world")
#
#with blob.open("r") as f:
#    print(f.read())



def num_rows(filename):
    with open(filename) as csvfile:
        csvreader = csv.DictReader(csvfile, quotechar='"')
        num_rows = len(list(csvreader))
        return num_rows

def write_blob_to_gcs(url, gcs_path):
    blob_name = url.split('/')[-1]
    #print(f"{url}, basename: {blob_name}, gcs_path: {gcs_path}")
    blob = bucket.blob(f"{video_bucket_prefix}/{blob_name}")
    if blob.exists():
        # print(f"blob exists")
        return

    # print(f"blob does not exist, downloading")
    url_downloader.retrieve(url, f"/tmp/{blob_name}")
    blob.upload_from_filename(f"/tmp/{blob_name}", retry=DEFAULT_RETRY)
    # print(f"blob uploaded")
    os.remove(f"/tmp/{blob_name}")

def upload_local_file_to_gcs(filename, gcs_path):
    obj = bucket.blob(gcs_path)
    obj.upload_from_filename(filename, retry=DEFAULT_RETRY)
    #os.remove(filename)
    print(f"wrote local file {filename} to GCS: {gcs_path}")

def write_training_dataset_record(filename, gcs_path, label, use):

    # generate the jsonl
#        {
#            "videoGcsUri": "gs://bucket/filename.ext",
#            "timeSegmentAnnotations": [{
#                "displayName": "LABEL",
#                "startTime": "start_time_of_segment",
#                "endTime": "end_time_of_segment"
#            }],
#            "dataItemResourceLabels": {
#                "aiplatform.googleapis.com/ml_use": "train|test"
#            }
#        }

    obj = {
        "videoGcsUri": gcs_path,
        "timeSegmentAnnotations": [{
            "displayName": label,
            "startTime": "0s",
            "endTime": "Infinity",
        }],
    }

    with open(filename, "a") as f:
        f.write(f"{json.dumps(obj)}\n")
        f.close()

def write_eval_dataset_record(gcs_path, filename):
    # write a file we can use to start a batch training job
    obj2 = {
        'content': gcs_path, 
        'mimeType': 'video/mp4', 
        'timeSegmentStart': '0s', 
        'timeSegmentEnd': 'Infinity'
    }

    with open(filename, "a") as f:
        f.write(f"{json.dumps(obj2)}\n")
        f.close()


1. read csv files
2. download and upload the files to input bucket if they don't exist
3. create jsonl file with positive and negative examples
4. write json files to GCS
5. write eval datasets to GCS

In [93]:

num_neg = num_rows(neg_input_csv_file)
num_pos = num_rows(pos_input_csv_file)

print(f"number of rows talking_to_camera: {num_pos}")
print(f"number of rows not_talking_to_camera: {num_neg}")



number of rows talking_to_camera: 388
number of rows not_talking_to_camera: 500


- TODO: use VI API shot detection to split up files by shot
- TODO: if we can classify talking/not_talking by shot, it may improve model accuracy

In [13]:
multi_label_dataset = {
    "train": f"{local_datasets_prefix}/vi_api_multi_label_train.jsonl",
    "test": f"{local_datasets_prefix}/vi_api_multi_label_test.jsonl",
}


ttc_dataset = {
    "pos": f"{local_datasets_prefix}/vi_api_out_ttc_pos.jsonl",
    "neg": f"{local_datasets_prefix}/vi_api_out_ttc_neg.jsonl"
}
ttc_eval = {
    "pos": f"{local_eval_prefix}/vi_api_eval_ttc_pos.jsonl",
    "neg": f"{local_eval_prefix}/vi_api_eval_ttc_neg.jsonl"
}

In [99]:
from collections import defaultdict

- split the dataset into the following:
  - train 60%
  - test 20%
  - validate 20%

In [107]:
# reset everything
all_content_types = defaultdict(list)

for filename in os.listdir(local_datasets_prefix):
    if not filename.endswith("jsonl"):
        continue
        
    print(f"removing: {local_datasets_prefix}/{filename}")
    os.remove(f"{local_datasets_prefix}/{filename}")
    
for filename in os.listdir(local_eval_prefix):
    if not filename.endswith("jsonl"):
        continue
    
    print(f"removing: {local_eval_prefix}/{filename}")
    os.remove(f"{local_eval_prefix}/{filename}")


In [108]:
with open(neg_input_csv_file) as csvfile:
    csvreader = csv.DictReader(csvfile, quotechar='"')
    rowcount = 0
    train_rowcount = 0
    eval_rowcount = 0
    
    for row in csvreader:
        # write a line to the training output:
        # train 60%
        # test 20%
        # validate 20%
        if rowcount < num_neg * 0.6:
            use = "train"
        elif rowcount < num_neg * 0.8:
            use = "test"
        else:
            use = "evaluate"

        # get the basename
        blob_name = row['preview_url'].split('/')[-1]
        gcs_path = f"gs://{gcs_bucket}/{video_bucket_prefix}/{blob_name}"
        
        # get the blob
        write_blob_to_gcs(row['preview_url'], gcs_path)

        content_types = set(map(lambda x: x.replace("{", "").replace("}", "").replace("\"", "").replace(" ", "_").lower(), row['content_types'].split(",")))
        content_types.discard("")
        for content_type in content_types:
            all_content_types[content_type].append(gcs_path)

        if use == "evaluate":
            eval_rowcount += 1
            write_eval_dataset_record(gcs_path, ttc_eval["neg"])
        else:
            train_rowcount += 1
            write_training_dataset_record(ttc_dataset["neg"], gcs_path, "not_talking_to_camera", use)

        rowcount += 1
        
print(f"total {rowcount} negative records; train: {train_rowcount}, eval: {eval_rowcount}")

total 500 negative records; train: 400, eval: 100


In [109]:
with open(pos_input_csv_file ) as csvfile:
    csvreader = csv.DictReader(csvfile, quotechar='"')

    rowcount = 0
    train_rowcount = 0
    eval_rowcount = 0
    
    for row in csvreader:
        if rowcount < num_pos * 0.6:
            use = "train"
        elif rowcount < num_pos * 0.8:
            use = "test"
        else:
            use = "evaluate"

        # get the basename
        blob_name = row['preview_url'].split('/')[-1]
        gcs_path = f"gs://{gcs_bucket}/{video_bucket_prefix}/{blob_name}"
        
        # get the blob
        write_blob_to_gcs(row['preview_url'], gcs_path)

        content_types = set(map(lambda x: x.replace("{", "").replace("}", "").replace("\"", "").replace(" ", "_").lower(), row['content_types'].split(",")))
        content_types.discard("")
        for content_type in content_types:
            all_content_types[content_type].append(gcs_path)

        # write a line to the training output:
        # train 60%
        # test 20%
        # validate 20%

        if use == "evaluate":
            eval_rowcount += 1
            write_eval_dataset_record(gcs_path, ttc_eval["pos"])
        else:
            train_rowcount += 1
            write_training_dataset_record(ttc_dataset["pos"], gcs_path, "talking_to_camera", use)

        rowcount += 1

print(f"total {rowcount} positive records; train: {train_rowcount}, eval: {eval_rowcount}")

total 388 positive records; train: 311, eval: 77


In [110]:
#for content_type, itemlist in all_content_types.items():
#    print(f"{content_type}: {len(itemlist)}")

total_records_train = 0
total_records_eval = 0

for content_type, itemlist in all_content_types.items():
    train_rowcount = 0
    eval_rowcount = 0
    
    rowcount = 0
    for row in itemlist:
        if rowcount < len(itemlist) * 0.6:
            use = "train"
        elif rowcount < len(itemlist) * 0.8:
            use = "test"
        else:
            use = "evaluate"
        
        gcs_path = row
        
        if use == "evaluate":
            eval_rowcount += 1
            write_eval_dataset_record(gcs_path, f"{local_eval_prefix}/vi_api_eval_{content_type}.jsonl")
        else:
            train_rowcount += 1
            write_training_dataset_record(multi_label_dataset[use], gcs_path, content_type, use)
        
        rowcount += 1
        
        
    print(f"content_type: {content_type}, total: {rowcount},  train: {train_rowcount}, eval: {eval_rowcount}")
    
    total_records_train += train_rowcount
    total_records_eval += eval_rowcount
    
print(f"total records: train: {total_records_train}, eval: {total_records_eval}")
    

content_type: recipe, total: 50,  train: 40, eval: 10
content_type: voiceover, total: 274,  train: 220, eval: 54
content_type: texture, total: 77,  train: 62, eval: 15
content_type: educational, total: 266,  train: 213, eval: 53
content_type: product_focus, total: 557,  train: 446, eval: 111
content_type: aesthetic, total: 292,  train: 234, eval: 58
content_type: trend, total: 158,  train: 127, eval: 31
content_type: lifestyle, total: 434,  train: 348, eval: 86
content_type: unboxing, total: 115,  train: 92, eval: 23
content_type: asmr, total: 26,  train: 21, eval: 5
content_type: humor, total: 74,  train: 60, eval: 14
content_type: talking_to_camera, total: 388,  train: 311, eval: 77
total records: train: 2174, eval: 537


In [111]:
for dataset in multi_label_dataset.values():
    upload_local_file_to_gcs(dataset, f"{dataset_bucket_prefix}/{os.path.basename(dataset)}")

#print(f"all content types: {all_content_types}")
for content_type in all_content_types:
    upload_local_file_to_gcs(f"{local_eval_prefix}/vi_api_eval_{content_type}.jsonl", f"{dataset_eval_prefix}/vi_api_eval_{content_type}.jsonl")

for dataset in ttc_dataset.values():
    upload_local_file_to_gcs(dataset, f"{dataset_bucket_prefix}/{os.path.basename(dataset)}")

for file in ttc_eval.values():
    upload_local_file_to_gcs(file, f"{dataset_eval_prefix}/{file}")

wrote local file datasets/vi_api_multi_label_train.jsonl to GCS: datasets/vi_api_multi_label_train.jsonl
wrote local file datasets/vi_api_multi_label_test.jsonl to GCS: datasets/vi_api_multi_label_test.jsonl
wrote local file eval/vi_api_eval_recipe.jsonl to GCS: eval/vi_api_eval_recipe.jsonl
wrote local file eval/vi_api_eval_voiceover.jsonl to GCS: eval/vi_api_eval_voiceover.jsonl
wrote local file eval/vi_api_eval_texture.jsonl to GCS: eval/vi_api_eval_texture.jsonl
wrote local file eval/vi_api_eval_educational.jsonl to GCS: eval/vi_api_eval_educational.jsonl
wrote local file eval/vi_api_eval_product_focus.jsonl to GCS: eval/vi_api_eval_product_focus.jsonl
wrote local file eval/vi_api_eval_aesthetic.jsonl to GCS: eval/vi_api_eval_aesthetic.jsonl
wrote local file eval/vi_api_eval_trend.jsonl to GCS: eval/vi_api_eval_trend.jsonl
wrote local file eval/vi_api_eval_lifestyle.jsonl to GCS: eval/vi_api_eval_lifestyle.jsonl
wrote local file eval/vi_api_eval_unboxing.jsonl to GCS: eval/vi_api_e

In [6]:
BUCKET_URI = f"gs://{gcs_bucket}/staging"  # @param {type:"string"}
from google.cloud import aiplatform

aiplatform.init()
#aiplatform_v1.init()

In [116]:
# create the talking_to_camera dataset

ttc_dataset = aiplatform.VideoDataset.create(
    display_name="talking_to_camera_v2",
    gcs_source=[
        f"gs://{gcs_bucket}/{dataset_bucket_prefix}/{os.path.basename(ttc_dataset['pos'])}",
        f"gs://{gcs_bucket}/{dataset_bucket_prefix}/{os.path.basename(ttc_dataset['neg'])}",
    ],
    import_schema_uri=aiplatform.schema.dataset.ioformat.video.classification,
)

print(dataset.resource_name)

projects/205512073711/locations/us-central1/datasets/5040254760213544960


In [117]:
# create multi-label training dataset

multi_label_gcs_sources = []
for dataset in multi_label_dataset.values():
    multi_label_gcs_sources.append(f"gs://{gcs_bucket}/{dataset_bucket_prefix}/{os.path.basename(dataset)}")

multi_label_dataset = aiplatform.VideoDataset.create(
    display_name="multi_label_classifier_v2",
    gcs_source=multi_label_gcs_sources,
    import_schema_uri=aiplatform.schema.dataset.ioformat.video.classification,
)

print(dataset.resource_name)

AttributeError: 'str' object has no attribute 'resource_name'

In [115]:
# create the training job for ttc

ttc_job = aiplatform.AutoMLVideoTrainingJob(
    display_name="ttc_training_job",
    prediction_type="classification",
)

print(job)

<google.cloud.aiplatform.training_jobs.AutoMLVideoTrainingJob object at 0x7fa2e6899f00>


In [None]:
model = ttc_job.run(
    dataset=ttc_dataset,
    model_display_name="talking_to_camera",
    training_fraction_split=0.8,
    test_fraction_split=0.2,
)

Run a batch prediction on one record

In [28]:
# create a batch prediction
ttc_model = aiplatform.Model("talking_to_camera_v1")
print(f"{ttc_model}")

# load the eval file with a positive prediction
eval_file = bucket.blob(f"{dataset_eval_prefix}/{ttc_eval['pos']}")
blobreader = storage.fileio.BlobReader(eval_file)
print(blobreader.tell())

line = blobreader.readline().decode('utf-8')
obj = json.loads(line)
print(f"{obj}")

prediction_input = bucket.blob(f"prediction/test_prediction.jsonl")
blobwriter = storage.fileio.BlobWriter(prediction_input)
blobwriter.write(json.dumps(obj).encode())
blobwriter.close()


job = ttc_model.batch_predict(
    job_display_name="talking_to_camera_pos",
    gcs_source=f"gs://{gcs_bucket}/prediction/test_prediction.jsonl",
    gcs_destination_prefix=f"gs://{gcs_bucket}/{output_prediction_prefix}",
    sync=True,
)



<google.cloud.aiplatform.models.Model object at 0x7faf9e53dba0> 
resource name: projects/205512073711/locations/us-central1/models/talking_to_camera_v1
0
{'content': 'gs://jkwng-vi-api-files/input/9cc7590c-fc07-48ae-b3aa-5c67de3dc95b.mp4', 'mimeType': 'video/mp4', 'timeSegmentStart': '0s', 'timeSegmentEnd': 'Infinity'}
