# Data Management

## This notebook downloads the dataset locally (if necessary)

## Imports

In [3]:
import json
import numpy as np
import os
import pandas as pd
import boto3  # boto3: high-level API
import random
# import language_tool_python
import sys

sys.path.append("../")

from botocore import UNSIGNED  # botocore: lower-level API and components
from botocore.config import Config
from IPython.display import Image
from pyngrok import ngrok
from onnxruntime import InferenceSession
# from transformers import CLIPProcessor
from typing import Optional

from great_expectations.checkpoint.types.checkpoint_result import (  # type: ignore[import]
    CheckpointResult,
)

from zenml.integrations.constants import GREAT_EXPECTATIONS, SKLEARN
# from zenml.integrations.great_expectations.steps import (
#     GreatExpectationsProfilerParameters,
#     GreatExpectationsProfilerStep,
#     GreatExpectationsValidatorParameters,
#     GreatExpectationsValidatorStep,
# )
# from zenml.integrations.great_expectations.visualizers import (
#     GreatExpectationsVisualizer,
# )
from zenml.pipelines import pipeline
from zenml.steps import BaseParameters, Output, step

from zenml.steps import (
    STEP_ENVIRONMENT_NAME,
    StepEnvironment,
)
from zenml.environment import Environment
from typing import cast

from zenml.config import DockerSettings

from zenml.post_execution import get_pipeline

# tool = language_tool_python.LanguageTool('en-US')



In [4]:

# Download the GTZAN genre dataset from the Activeloop hub into DATASET_PATH.
import deeplake
import torchaudio
from constants import DATASET_PATH

download_dir = str(DATASET_PATH)
print(download_dir)
# Set before loading; presumably read by deeplake as the local target directory
# when access_method="download" — TODO confirm against the deeplake docs.
os.environ["DEEPLAKE_DOWNLOAD_PATH"] = download_dir
ds = deeplake.load("hub://activeloop/gtzan-genre", access_method="download")

/tmp/pycharm_project_834/data/downloaded


In [None]:
# Artifact locations, relative to the kernel's working directory.
ARTIFACT_PATH = "../core/artifacts/"

# COCO caption annotations and the preprocessed remixer-pica dataset (JSON files)
ANN_PATH = f"{ARTIFACT_PATH}coco_annotations/"
CAPTIONS = f"{ANN_PATH}captions_train2014.json"
PREPROCESSED = f"{ANN_PATH}remixer-pica.json"

# Existing sample-index map (JSON) and CLIP question / image feature arrays (.npy)
SIMILARITY_PATH = f"{ARTIFACT_PATH}coco_clip_new/"
IDXS = f"{SIMILARITY_PATH}okvqa_qa_line2sample_idx_train2014.json"
QUESTION_FEATURES = f"{SIMILARITY_PATH}coco_clip_vitb16_train2014_okvqa_question.npy"
IMG_FEATURES = f"{SIMILARITY_PATH}coco_clip_vitb16_train2014_okvqa_convertedidx_image.npy"

# CLIP processor directory and the ONNX export of the CLIP model
clip_processor = f"{ARTIFACT_PATH}transformers/openai/clip-vit-base-patch16"
clip_onnx = f"{ARTIFACT_PATH}onnx/clip.onnx"

In [None]:
# install packages, answering all with yes
!zenml integration install great_expectations s3 dash -y

!zenml artifact-store register remixer_s3_great_expectations \
    --flavor=s3 \
    --path=s3: // remixer-pica-zenml-greatexpectations  # Register S3 bucket allowing ZenML write access

!zenml data-validator register great_expectations \
    --flavor=great_expectations

# set as active stack
!zenml stack register remixer-great-expectations \
    -o default \
    -a remixer_s3_great_expectations \
    -dv great_expectations \
    --set

## Exploration

In [None]:
# S3 objects live in buckets; a bucket may use "folder" prefixes for organization.
s3_bucket_name = "remixer-pica"
s3_directory_path = "images"

# Public virtual-hosted-style URL under which the images are served.
s3_host = f"{s3_bucket_name}.s3.us-west-1.amazonaws.com"
s3_url = f"https://{s3_host}/{s3_directory_path}"
s3_url

In [None]:
# Preview one image: files are named <person:3 digits>_<image:2 digits>.png
person_idx, image_idx = 1, 1
img_url = f"{s3_url}/{person_idx:03d}_{image_idx:02d}.png"
print(img_url)
Image(url=img_url, width=360)

## Local Annotation
- Note: currently unused, since annotations are obtained through Scale.ai; kept here for future use

In [None]:
username = "remixer@localhost"
password = "moonshine"

%env LABEL_STUDIO_USERNAME={username}
%env LABEL_STUDIO_PASSWORD={password}

In [None]:
import getpass

# Prompt for an ngrok auth token only if the pyngrok config file does not hold one yet.
config_file = ngrok.conf.DEFAULT_NGROK_CONFIG_PATH
config_file_exists = os.path.exists(config_file)
if config_file_exists:
    # Read the file directly rather than shelling out to `cat`.
    with open(config_file) as config_fh:
        config_file_contents = config_fh.read().splitlines()
else:
    config_file_contents = []

# The token line need not be the first line (e.g. configs that start with a
# `version:` entry), so search every line. A literal ": exit" line is treated as
# an interrupted config, as in the original check — TODO confirm that format.
auth_token_found = (
    any("authtoken" in line for line in config_file_contents)
    and ": exit" not in config_file_contents
)

if not auth_token_found:
    print("Enter your ngrok auth token, which can be copied from https://dashboard.ngrok.com/auth")
    # Save the token through pyngrok's API instead of interpolating the secret
    # into a shell command line.
    ngrok.set_auth_token(getpass.getpass())

In [None]:
LABEL_STUDIO_PORT = 8081
%env LABEL_STUDIO_PORT={LABEL_STUDIO_PORT}
# create ngrok tunnel for label_studio
https_tunnel = ngrok.connect(LABEL_STUDIO_PORT, bind_tls=True)
print(https_tunnel)

In [None]:
# Start Label Studio 
import conda
import subprocess

subprocess.run("conda deactivate", shell=True, check=True)

!python3 -m venv label-env
# !conda deactivate
!source label-env/bin/activate
!pip install -qqq label-studio
# !export LABEL_STUDIO_PORT=8081
!label-studio start --port=$LABEL_STUDIO_PORT

In [None]:
# Login details for Label Studio behind the public tunnel URL.
# The password is deliberately not echoed, because cell outputs are saved with the
# notebook and would leak it; it was exported as LABEL_STUDIO_PASSWORD above.
print(https_tunnel.public_url)
print("u:", username)
print("p:", "<hidden; see $LABEL_STUDIO_PASSWORD>")

### Uploading Data

In [None]:
# One URL per image: persons 001-103, each with images 01-12, person-major order.
img_urls = [
    f"{s3_url}/{person:03d}_{shot:02d}.png"
    for person in range(1, 104)
    for shot in range(1, 13)
]
len(img_urls)

### Teardown

In [None]:
# Close the ngrok tunnel(s) opened for Label Studio.
ngrok.kill()

# Switching conda environments cannot be done from this notebook: every `!`
# command runs in its own subshell, so `conda deactivate` / `conda activate`
# here would not change the kernel's environment. Run these in a terminal:
#     conda deactivate
#     conda activate remixer

## Preprocessing

### Load Data
- Depends on the annotation source

#### LabelStudio

#### Scale

In [None]:
# Goal: map each filename in `s3_filenames` to its URL in `img_urls` so the URLs
# can be stored in `df`. Start by comparing the first entry of each list.
# NOTE(review): `s3_filenames` is not defined anywhere in this notebook; it
# presumably comes from the (empty) Scale loading section — confirm.
first_filename = s3_filenames[0]
first_url = img_urls[0]
first_filename, first_url

In [None]:
# `img_urls` is generated in ascending (zero-padded, hence sorted) order, so
# after sorting the filenames, position i of `ordered_filenames` is assumed to
# correspond to position i of `img_urls`.
ordered_filenames = sorted(s3_filenames)
first_position = ordered_filenames.index(s3_filenames[0])
ordered_filenames[0], img_urls[first_position]

In [None]:
# Look up each Scale filename's S3 URL through a dict instead of calling
# `list.index` once per element (which was quadratic). `setdefault` keeps the
# first position of a repeated filename, matching `list.index` semantics.
assert len(s3_filenames) == len(img_urls), (
    f"expected one S3 filename per image URL, got {len(s3_filenames)} vs {len(img_urls)}"
)
url_by_filename = {}
for filename, url in zip(ordered_filenames, img_urls):
    url_by_filename.setdefault(filename, url)
s3_links = [url_by_filename[filename] for filename in s3_filenames]
s3_links[0]

In [None]:
# Draw one random image id per row of `df`. Ids that already exist (`keys`) and
# ids drawn earlier in this loop are both rejected, so every new id is unique
# (the original only checked `keys`, allowing duplicates among the new ids).
# NOTE(review): `df` and `keys` are not defined in this notebook. `keys` is
# assumed to hold existing image ids as ints — if they are strings, the
# membership check never matches. Confirm.
image_ids = []
question_ids = []
taken_ids = set(keys)
num_gen = 0

while num_gen < len(df):
    rand_num = random.randint(10000, 999999)
    if rand_num not in taken_ids:
        taken_ids.add(rand_num)
        image_ids.append(rand_num)
        # question id = the image id with a trailing "5"
        question_ids.append(str(rand_num) + '5')
        num_gen += 1

print(len(df), len(image_ids), len(question_ids), image_ids[0], question_ids[0])

### Saving preprocessed data

In [None]:
# Load the existing sample-index map and the question / image feature arrays
# that the new samples are appended to further below. Use a context manager so
# the JSON file handle is closed (json.load(open(...)) leaked it).
with open(IDXS, "r") as idx_file:
    train_idx = json.load(idx_file)
train_feature = np.load(QUESTION_FEATURES)
image_train_feature = np.load(IMG_FEATURES)
train_feature.shape, len(train_idx), image_train_feature.shape

In [None]:
# Register every new (image, question) pair under a fresh numeric string key.
# Keys start at 9009 (presumably the size of the original index — TODO confirm);
# each value has the form "<image_id><-><question_id>".
first_new_idx = 9009
combine_ids = [
    f"{image_id}<->{question_id}"
    for image_id, question_id in zip(df['image_id'], df['question_id'])
]
num_idx = [str(i) for i in range(first_new_idx, first_new_idx + len(df))]
idx_add = dict(zip(num_idx, combine_ids))
train_idx.update(idx_add)
len(train_idx)

In [None]:
# Encode the new questions and images with the exported CLIP ONNX model and
# append the text / image embeddings to the existing feature arrays.
clip_session = InferenceSession(str(clip_onnx))
# Keep the loaded processor under its own name. Re-binding `clip_processor`
# (the processor path from the config cell) made this cell fail on re-run.
# NOTE(review): the `CLIPProcessor` import is commented out in the imports cell
# and must be re-enabled for this line to run.
processor = CLIPProcessor.from_pretrained(clip_processor)

# NOTE(review): `DOWNLOADED_DATA_DIRNAME` is not defined in this notebook — confirm its source.
data_path = DOWNLOADED_DATA_DIRNAME / "remixer-pica/images/"
# Sort for a deterministic order: `os.listdir` order is arbitrary, but image i
# must line up with question i of `df` for the embeddings to be paired.
# TODO confirm that the rows of `df` follow sorted filename order.
images = sorted(
    str(data_path / f)
    for f in os.listdir(str(data_path))
    if os.path.isfile(os.path.join(str(data_path), f))
)
# One new index entry is created per row of `df`, so both feature arrays must
# grow by exactly len(df) rows.
assert len(images) == len(df), f"found {len(images)} images for {len(df)} questions"

images_pil = []
for image in images:
    # NOTE(review): `Image` here is `IPython.display.Image` (imported at the top),
    # which has no `open`; this needs `PIL.Image`.
    image_pil = Image.open(image)
    if image_pil.mode != "RGB":
        image_pil = image_pil.convert(mode="RGB")
    images_pil.append(image_pil)

inputs = processor(text=list(df['question']), images=images_pil, return_tensors="np", padding=True)
_logits_per_image, _logits_per_text, text_embeds, image_embeds = clip_session.run(
    output_names=["logits_per_image", "logits_per_text", "text_embeds", "image_embeds"], input_feed=dict(inputs)
)

train_feature = np.concatenate((train_feature, text_embeds))
image_train_feature = np.concatenate((image_train_feature, image_embeds))
train_feature.shape, image_train_feature.shape

In [None]:
# Persist the extended sample-index map and both feature arrays in place.
with open(IDXS, "w") as idx_file:
    json.dump(train_idx, idx_file)

with open(QUESTION_FEATURES, 'wb') as question_file:
    np.save(question_file, train_feature)

with open(IMG_FEATURES, 'wb') as image_file:
    np.save(image_file, image_train_feature)

## Data Validation

### Define ZenML Steps


In [None]:
from zenml.steps import BaseParameters, step, Output


class DataLoaderParameters(BaseParameters):
    """Parameters of the `importer` step."""

    # Forwarded unchanged as the step's `condition` output; it does not change
    # which data is loaded.
    reference_data: bool = True


@step
def importer(
        params: DataLoaderParameters,
) -> Output(dataset=pd.DataFrame, condition=bool):
    """Load the preprocessed remixer-pica dataset.

    Returns:
        dataset: DataFrame read from the PREPROCESSED JSON file.
        condition: ``params.reference_data``, passed through as-is.
    """
    dataset = pd.read_json(PREPROCESSED)
    condition = params.reference_data
    return dataset, condition

In [None]:
# Import the Great Expectations step classes here rather than in the imports
# cell. That cell runs before `zenml integration install great_expectations`
# (presumably why its import was commented out), which left these names undefined.
from zenml.integrations.great_expectations.steps import (
    GreatExpectationsProfilerParameters,
    GreatExpectationsProfilerStep,
    GreatExpectationsValidatorParameters,
    GreatExpectationsValidatorStep,
)

# The profiler and validator must use the same suite/asset names, so that the
# validator checks data against the suite generated by the profiler.
GE_EXPECTATION_SUITE_NAME = "remixer-pica"
GE_DATA_ASSET_NAME = "remixer-pica_test_df"

# instantiate a builtin Great Expectations data profiling step
ge_profiler_params = GreatExpectationsProfilerParameters(
    expectation_suite_name=GE_EXPECTATION_SUITE_NAME,
    data_asset_name=GE_DATA_ASSET_NAME,
)
ge_profiler_step = GreatExpectationsProfilerStep(params=ge_profiler_params)

# instantiate a builtin Great Expectations data validation step
ge_validator_params = GreatExpectationsValidatorParameters(
    expectation_suite_name=GE_EXPECTATION_SUITE_NAME,
    data_asset_name=GE_DATA_ASSET_NAME,
)
ge_validator_step = GreatExpectationsValidatorStep(params=ge_validator_params)

In [None]:
@step
def analyze_result(
        result: CheckpointResult,
) -> str:
    """Summarize a Great Expectations checkpoint result as a message.

    The message names the pipeline, run and step it came from, and says whether
    validation succeeded. It is printed and also returned (a string, not a bool).
    """
    env = cast(StepEnvironment, Environment()[STEP_ENVIRONMENT_NAME])
    header = (
        f"Pipeline {env.pipeline_name}, with run {env.pipeline_run_id}, "
        f"in step {env.step_name} produced the following output:\n\n"
    )
    outcome = "was successful!" if result.success else "failed!"
    message = f"{header}Great Expectations data validation {outcome}"
    print(message)
    return message

### Define ZenML Pipelines

In [None]:
# Docker images built for these pipelines must include the Great Expectations integration.
docker_settings = DockerSettings(
    required_integrations=[GREAT_EXPECTATIONS],
)


@pipeline(enable_cache=False, settings={"docker": docker_settings})
def profiling_pipeline(
        importer, profiler
):
    """Profile a reference dataset with Great Expectations.

    Feeds the importer's dataset into the builtin Great Expectations profiler
    step, which infers an expectation suite (validation rules) from the schema
    and statistical properties of the reference data. The importer's second
    output (`condition`) is not used here.

    Args:
        importer: step that loads the reference dataset
        profiler: Great Expectations data profiler step
    """
    reference_dataset, _unused_condition = importer()
    profiler(reference_dataset)

In [None]:
@pipeline(enable_cache=False, settings={"docker": docker_settings})
def validation_pipeline(
        importer, validator, checker
):
    """Validate a dataset with Great Expectations.

    Loads test data with the importer and validates it using the builtin Great
    Expectations validation step, against the expectation suite generated by the
    profiling pipeline. The checker step then summarizes the validation results.

    Args:
        importer: step that loads the test data
        validator: Great Expectations dataset validation step
        checker: step that inspects the validation results
    """
    dataset, condition = importer()
    validation_results = validator(dataset, condition)
    checker(validation_results)

### Run the pipelines

In [None]:
# Build the profiling pipeline from concrete step instances, then run it.
reference_importer = importer(params=DataLoaderParameters(reference_data=True))
profiling = profiling_pipeline(
    importer=reference_importer,
    profiler=ge_profiler_step,
)
profiling.run()

In [None]:
# Build the validation pipeline from concrete step instances, then run it.
validation_importer = importer(params=DataLoaderParameters(reference_data=True))
result_checker = analyze_result()
validation = validation_pipeline(
    importer=validation_importer,
    validator=ge_validator_step,
    checker=result_checker,
)
validation.run()

### Post execution workflow

In [None]:
def start_pipeline_visualizer(name: str):
    """Show the Dash lineage visualization for a run of the named pipeline.

    Args:
        name: name of the ZenML pipeline, e.g. "profiling_pipeline".
    """
    from zenml.integrations.dash.visualizers.pipeline_run_lineage_visualizer import (
        PipelineRunLineageVisualizer,
    )

    # NOTE(review): whether `runs` is ordered oldest-first or newest-first
    # depends on the ZenML version — confirm that [-1] is the latest run.
    runs = get_pipeline(name).runs
    latest_run = runs[-1]
    visualizer = PipelineRunLineageVisualizer()
    visualizer.visualize(latest_run)

In [None]:
def visualize_results(pipeline_name: str, step_name: str) -> None:
    """Render the Great Expectations output of one step of a pipeline's last run.

    Args:
        pipeline_name: name of the ZenML pipeline, e.g. "profiling_pipeline".
        step_name: name of the step within that pipeline, e.g. "profiler".
    """
    # Imported here because the module-level import is commented out, which left
    # `GreatExpectationsVisualizer` undefined. The integration is installed by an
    # earlier cell.
    from zenml.integrations.great_expectations.visualizers import (
        GreatExpectationsVisualizer,
    )

    # NOTE(review): whether `runs` is ordered oldest-first or newest-first
    # depends on the ZenML version — confirm that [-1] is the latest run.
    last_run = get_pipeline(pipeline_name).runs[-1]
    # Use local names that do not shadow the module-level `pipeline` / `step` decorators.
    step_view = last_run.get_step(step=step_name)
    GreatExpectationsVisualizer().visualize(step_view)

In [None]:
# start_pipeline_visualizer("profiling_pipeline")

In [None]:
# start_pipeline_visualizer("validation_pipeline")

In [None]:
# visualize_results("profiling_pipeline", "profiler")

In [None]:
# visualize_results("validation_pipeline", "validator")