In [None]:
%store -r
import os
import numpy as np
import pandas as pd
import glob
import cv2
import requests, json
from numpy import random
import datetime
import io
import re
import time
import boto3
from dkube.sdk.api import DkubeApi
import warnings
warnings.filterwarnings('ignore')

In [None]:
# Unpack the experiment settings from `image_exp_config` (not defined in this
# notebook; presumably restored by the `%store -r` magic in the first cell).
# The names on the left are assigned positionally from the keys on the right,
# so both sequences must stay in the same order.
(
    MONITOR_NAME,
    DKUBEUSERNAME,
    TOKEN,
    DKUBE_URL,
    MINIO_KEY,
    MINIO_SECRET_KEY,
    MINIO_ENDPOINT,
    RUN_FREQUENCY,
    INFERENCE_URL,
    DEPLOYMENT_ID,
    MINIO_BUCKET,
) = (
    image_exp_config[config_key]
    for config_key in (
        'MONITOR_NAME',
        'DKUBEUSERNAME',
        'TOKEN',
        'DKUBE_URL',
        'MINIO_KEY',
        'MINIO_SECRET_KEY',
        'MINIO_ENDPOINT',
        'RUN_FREQUENCY',
        'INFERENCE_URL',
        'DEPLOYMENT_ID',
        'MINIO_BUCKET',
    )
)

In [None]:
# Number of iterations of the data-push loop; each iteration uploads one labelled CSV.
no_of_monitoring_runs = 10

In [None]:
# File-name suffixes that are globbed as images. Files that OpenCV cannot
# decode (cv2.imread returns None) are skipped by the reader below.
image_types = ('.jpg', '.jpeg', '.png', '.svg')

class ImageData():
    """Helpers that load image folders into NumPy arrays using OpenCV."""

    def __init__(self):
        pass

    def read_data_from_dir(self, imagedir, grayscale=True, read_labels=False, recursive=True):
        """Read every image under ``imagedir`` into a single array.

        Args:
            imagedir: Directory to scan.
            grayscale: When True images are read with ``cv2.IMREAD_GRAYSCALE``,
                otherwise with OpenCV's default read mode.
            read_labels: When True, also load labels from the last CSV file
                returned by the glob (last column only).
            recursive: When True (default, the historical behaviour) sub-folders
                are scanned as well; when False only files directly inside
                ``imagedir`` are considered.

        Returns:
            ``None`` when no file matches ``image_types``. Otherwise the image
            array, or ``(images, labels)`` when ``read_labels`` is True.
        """
        # Without recursion "**" would behave like "*" and match exactly one
        # sub-directory level, so a plain "*" is needed to stay in imagedir.
        pattern_prefix = "**/*" if recursive else "*"
        image_files = list()
        for file_type in image_types:
            image_files.extend(
                glob.glob(os.path.join(imagedir, pattern_prefix + file_type), recursive=recursive)
            )
        if len(image_files) == 0:
            return None
        images = []
        for each_image_file in image_files:
            if grayscale:
                img = cv2.imread(each_image_file, cv2.IMREAD_GRAYSCALE)
            else:
                img = cv2.imread(each_image_file)
            # imread signals an unreadable file by returning None instead of raising.
            if img is not None:
                images.append(img)
        train_x = np.asarray(images)
        if read_labels:
            csv_files = glob.glob(os.path.join(imagedir, pattern_prefix + ".csv"), recursive=recursive)
            label_data = pd.read_csv(csv_files[-1])
            train_y = label_data.iloc[:,-1:].values
            return train_x, train_y
        else:
            return train_x

    def read_classification_data(self, datadir):
        """Load a folder-per-class dataset rooted at ``datadir``.

        Every directory that directly contains images contributes those images,
        labelled with the directory's own name.

        Returns:
            ``None`` when no image was found, otherwise
            ``(train_x, (class_names, class_indices))`` where ``class_indices``
            index into ``class_names`` (as produced by ``np.unique``).
        """
        train_x = list()
        train_y = list()
        for dp, dn, filenames in os.walk(datadir):
            if len(filenames) == 0:
                continue
            # os.walk already visits each sub-directory, so read only the files
            # directly in `dp`; a recursive read would load nested images again
            # under the parent's label.
            current_class_data = self.read_data_from_dir(dp, recursive=False)
            if current_class_data is None:
                # Directory holds files, but none of them are images.
                continue
            # normpath drops a trailing separator so "data/" yields "data", not "".
            class_name = os.path.basename(os.path.normpath(dp))
            train_x.extend(current_class_data)
            train_y.extend([class_name] * len(current_class_data))
        if len(train_x) == 0:
            return None
        train_x = np.asarray(train_x)
        train_y = np.asarray(train_y)
        train_y_classes, train_y = np.unique(train_y, return_inverse=True)
        return train_x, (train_y_classes, train_y)

    def resize_images(self, images, new_shape):
        """Resize each image to ``new_shape`` with bilinear interpolation.

        ``new_shape`` is handed to ``cv2.resize`` as the target size
        (OpenCV's ``dsize`` convention is ``(width, height)``).
        Returns the resized images stacked into one array.
        """
        resized_images = [
            cv2.resize(each_image, new_shape, interpolation=cv2.INTER_LINEAR)
            for each_image in images
        ]
        return np.asarray(resized_images)

In [None]:
class ImageDataGenerator:
    """Times and uploads CSV datasets to an S3-compatible bucket.

    The class-level attributes are shared handles and credentials. Each one is
    filled in by ``__init__`` only if it has not already been given a truthy
    value on the class.
    """

    BUCKET = None
    S3_CLIENT = None
    DB_ENGINE = None
    API_CLIENT = None
    TOKEN = None
    USERNAME = None
    INFERENCE_URL = None
    URL = None

    def __init__(
        self,
        start_time: datetime.datetime = None,
        frequency="1H",
        model_frequency=10,
        duration: str = "10:24:12",
        margin=180,
    ):
        """Store the schedule settings and lazily create the shared clients.

        Args:
            start_time: Base time used by ``end``. Anything that is not a
                ``datetime`` (including ``None``) falls back to the current
                UTC time so ``end`` always has a valid base.
            frequency: ``"<int><unit>"`` with unit one of s, m, h, d, w
                (case-insensitive), e.g. ``"5m"``.
            model_frequency: Stored on the instance; not used inside this class.
            duration: ``"H[:M[:S]]"`` string consumed by ``end``.
            margin: Seconds subtracted from the aligned boundary in
                ``frequency_ts``.
        """
        self.frequency = frequency
        self.margin = margin
        self.model_frequency = model_frequency
        self.duration = duration
        self.start_time = (
            start_time
            if isinstance(start_time, datetime.datetime)
            else datetime.datetime.utcnow()
        )

        klass = type(self)
        if not klass.BUCKET:
            klass.BUCKET = MINIO_BUCKET
        if not klass.S3_CLIENT:
            klass.S3_CLIENT = boto3.client("s3", aws_access_key_id=MINIO_KEY,
                                                 aws_secret_access_key=MINIO_SECRET_KEY,
                                                 endpoint_url = MINIO_ENDPOINT)
        if not klass.TOKEN:
            klass.TOKEN = os.getenv("DKUBE_USER_ACCESS_TOKEN",TOKEN)
        if not klass.USERNAME:
            klass.USERNAME= DKUBEUSERNAME
        if not klass.API_CLIENT:
            klass.API_CLIENT = DkubeApi(URL=os.getenv('DKUBE_URL',DKUBE_URL),token=klass.TOKEN)

    def save_dataset(self, data, data_name:str, s3_prefix):
        """Instance-level convenience wrapper around ``save_dataset_to_s3``."""
        klass = type(self)
        return klass.save_dataset_to_s3(data, data_name, s3_prefix)

    @classmethod
    def save_dataset_to_s3(cls, data, name, s3_prefix):
        """Upload ``data`` as ``<s3_prefix>/<name>.csv`` to ``cls.BUCKET``.

        Args:
            data: Object exposing ``to_csv(buffer, index=False)`` (a DataFrame).
            name: File name without extension.
            s3_prefix: Key prefix inside the bucket.

        Returns:
            The object key on HTTP 200, otherwise ``None``.
        """
        # Object keys always use "/", so join with posixpath rather than the
        # platform-dependent os.path.
        import posixpath

        file_name = name + ".csv"
        file_path = posixpath.join(s3_prefix, file_name)
        with io.StringIO() as csv_buffer:
            data.to_csv(csv_buffer, index=False)
            csv_text = csv_buffer.getvalue()
        response = cls.S3_CLIENT.put_object(
            Bucket=cls.BUCKET, Key=file_path, Body=csv_text
        )
        status = response.get("ResponseMetadata", {}).get("HTTPStatusCode")
        if status == 200:
            print(f"Successful S3 put_object response. Status - {status}")
            return file_path
        print(f"Unsuccessful S3 put_object response. Status - {status}")
        return None

    @property
    def frequency_ts(self):
        """Seconds to wait before the next push.

        For hour and minute frequencies the wait is aligned to the next
        hour / minute-multiple boundary minus ``margin`` seconds. If that
        aligned wait is not within ``(0, period]`` the full period is used.
        For the remaining units (s, d, w) the full period is returned.

        Raises:
            KeyError: if the unit suffix is not one of s, m, h, d, w.
        """
        value = int(self.frequency[:-1])
        unit = self.frequency[-1].lower()
        seconds_per_unit = {"s": 1, "m": 60, "h": 3600, "d": 86400, "w": 604800}
        seconds_count = value * seconds_per_unit[unit]
        now = datetime.datetime.utcnow()
        margin = datetime.timedelta(seconds=self.margin)

        new_time = None
        if unit == "h":
            new_time = (now + datetime.timedelta(hours=value)).replace(
                minute=0, second=0, microsecond=0
            ) - margin
        elif unit == "m":
            # Minutes until the next multiple of `value`; 0 means we are on one,
            # in which case the next boundary is a full period away.
            diff = abs(now.minute % -value)
            delta = datetime.timedelta(minutes=diff if diff else value)
            new_time = (now + delta).replace(second=0, microsecond=0) - margin
            if new_time < now:
                new_time = new_time + datetime.timedelta(minutes=value)

        if new_time is None:
            result = seconds_count
        else:
            # total_seconds() keeps the sign; timedelta.seconds would turn a
            # target in the past into a wait of almost 24 hours.
            second_remaining = int((new_time - now).total_seconds())
            result = second_remaining if 0 < second_remaining <= seconds_count else seconds_count
        print(f"Next Push after {datetime.timedelta(seconds=result)}")
        return result

    @property
    def awsS3Secret(self):
        """Return AWS credentials as ``{"access_key", "secret_key"}`` or ``None``.

        When a notebook-level ``DATA_SOURCE`` equals ``'aws_s3'`` the
        ``AWS_ACCESS_KEY_ID`` / ``AWS_SECRET_ACCESS_KEY`` environment variables
        are used (falling back to notebook-level ``ACCESS_KEY`` / ``SECRET_KEY``
        if defined). Otherwise, or when those are empty, the ``[default]``
        section of ``$HOME/.aws/credentials`` is read.
        """
        from configparser import ConfigParser

        # DATA_SOURCE / ACCESS_KEY / SECRET_KEY are not defined in this notebook;
        # look them up defensively so a missing name does not raise NameError.
        notebook_globals = globals()
        access_key = None
        secret_key = None
        if notebook_globals.get("DATA_SOURCE") == 'aws_s3':
            access_key = os.getenv("AWS_ACCESS_KEY_ID", notebook_globals.get("ACCESS_KEY"))
            secret_key = os.getenv("AWS_SECRET_ACCESS_KEY", notebook_globals.get("SECRET_KEY"))
        # The key is deliberately not printed: cell output is saved with the notebook.
        if access_key and secret_key:
            return {"access_key":access_key, "secret_key": secret_key}

        home_dir = os.getenv("HOME")
        if not home_dir:
            return None
        creds_path = os.path.join(home_dir, ".aws","credentials")
        if not os.path.isfile(creds_path):
            return None
        config = ConfigParser()
        config.read(creds_path)
        if "default" not in config:
            return None
        access_key = config["default"].get("aws_access_key_id")
        secret_key = config["default"].get("aws_secret_access_key")
        if access_key and secret_key:
            return {"access_key":access_key, "secret_key": secret_key}
        return None

    @property
    def end(self):
        """``start_time`` plus ``duration`` (``"H[:M[:S]]"``; missing parts count as 0)."""
        duration = self.duration.split(":")
        # Pad to exactly hours, minutes, seconds.
        duration.extend(["0"] * (3 - len(duration)))
        return self.start_time + datetime.timedelta(
            hours=int(duration[0]), minutes=int(duration[1]), seconds=int(duration[2])
        )


In [None]:
# Bind the DKube endpoint, token and API client on the class so that every
# generator instance shares them.
ImageDataGenerator.URL = DKUBE_URL
ImageDataGenerator.TOKEN = TOKEN
ImageDataGenerator.API_CLIENT = DkubeApi(URL=DKUBE_URL, token=TOKEN)
# Python 3 cannot raise a bare string (that itself fails with TypeError), so a
# real exception type is used. An empty string is rejected as well as None,
# matching the wording of the message.
if INFERENCE_URL:
    ImageDataGenerator.INFERENCE_URL = INFERENCE_URL
else:
    raise ValueError("INFERENCE_URL is Empty, Provide value for variable INFERENCE_URL")

In [None]:
# The constructor's first positional parameter is `start_time` (a datetime),
# so the monitor name must not be passed positionally; only the schedule
# settings are supplied, by keyword.
generator = ImageDataGenerator(frequency=f"{RUN_FREQUENCY}m",
                               model_frequency=RUN_FREQUENCY)

In [None]:
imd = ImageData()
classification_data = imd.read_classification_data("data/")
# read_classification_data returns None when it finds no images; fail with a
# clear message instead of an unpacking TypeError.
if classification_data is None:
    raise FileNotFoundError('No readable images found under "data/"')
train_x, (train_y_classes, train_y) = classification_data
resized_train_x = imd.resize_images(train_x, (200,200))
# Add a trailing axis of size 1 so each sample has shape (200, 200, 1).
resized_train_x = resized_train_x.reshape(resized_train_x.shape[0], 200, 200, 1)
resized_train_x.shape

## Shuffling data

In [None]:
# Draw one random ordering and apply it to both arrays so every image stays
# paired with its label.
indices = np.random.permutation(resized_train_x.shape[0])

resized_train_x, train_y = resized_train_x[indices], train_y[indices]
resized_train_x.shape, train_y

In [None]:
predict_url = INFERENCE_URL
# Fall back to the configured TOKEN when the environment variable is not set
# (same lookup ImageDataGenerator.__init__ uses); otherwise `token` would be
# None and building the "Bearer ..." header would fail.
token = os.getenv("DKUBE_USER_ACCESS_TOKEN", TOKEN)

## Data push

In [None]:
# Formats 1 -> "1st", 2 -> "2nd", 11 -> "11th", ... for the progress message.
ordinal = lambda n: "%d%s" % (n,"tsnrhtdd"[(n//10%10!=1)*(n%10<4)*n%10::4])
push_count = 1
# The two loops use distinct variables: the file name below must carry the run
# number, which was previously overwritten by the inner sample counter.
for run_index in range(no_of_monitoring_runs):
    outputs = []
    labels = []
    # Wait until the next scheduled push slot.
    second_remaining = generator.frequency_ts
    time.sleep(second_remaining)
    # `random` is numpy.random here, so the upper bound is exclusive: 10-14 samples.
    no_of_samples = random.randint(10,15)
    print("Generating data")
    for sample_index in range(no_of_samples):
        ch = random.choice(range(resized_train_x.shape[0]))
        if sample_index%2:
            x = resized_train_x[ch:ch+1]
        else:
            x = resized_train_x[ch:ch+1].T # rotating image for drift
        payload = {
            "inputs": {'input_1': x.tolist()}
        }
        # NOTE(review): verify=False disables TLS certificate verification -
        # presumably for a self-signed endpoint; confirm this is intended.
        r = requests.post(predict_url, json=payload, headers = {'authorization': "Bearer " + token}, verify = False)
        # Surface HTTP errors here rather than as a KeyError on "outputs" below.
        r.raise_for_status()
        prediction = json.loads(r.content.decode('utf-8'))
        each_output = np.array(prediction["outputs"])
        each_output = train_y_classes[each_output.argmax(axis=1)].tolist()
        each_label = train_y_classes[train_y[ch:ch+1]].tolist()
        outputs.extend(each_output)
        labels.extend(each_label)
        time.sleep(2)

    # Spread the samples evenly over a 10 second window starting now (UTC).
    start = datetime.datetime.utcnow()
    end = start + datetime.timedelta(seconds=10)
    timestamps = pd.date_range(start, end, periods=len(outputs))
    labelled_df = pd.DataFrame({
        "timestamp": timestamps,
        "output": outputs,
        "label": labels
    })
    filename = f"lablled_data_{run_index+1}"
    g_path = generator.save_dataset(labelled_df, filename, DEPLOYMENT_ID + "/livedata")
    if g_path:
        print(g_path)
    print(f"Pushed data for {ordinal(push_count)} time, Remaining pushes: {no_of_monitoring_runs-push_count}, Monitor name: {MONITOR_NAME}")
    push_count += 1
print("***************** DATA GENERATION COMPLETED ******************************")