# Loading Ratings to Kinesis Data Stream

In [1]:
import configparser
import datetime
import json
import math
import random
import threading
import time

import boto3
import numpy as np
import pandas as pd
from tqdm import tqdm

In [2]:
# Kinesis settings come from config.conf; the ratings are read from a local CSV.
config = configparser.ConfigParser()
config.read("config.conf")

stream_name = config.get(section="kinesis", option="stream_name")
file = "app-data/ratings.csv"

In [3]:
class MovieLensKinesisLoader:
    """Stream the rows of a CSV file into a Kinesis data stream, window by window.

    The CSV is read into a pandas DataFrame up front (``self.df``). ``loadx``
    then sends it ``window`` rows at a time -- one ``put_record`` call per
    window, whose payload is the JSON list of that window's rows -- on a
    background thread. It returns a widget box whose Start/Stop/Destroy
    buttons resume, pause or abort that thread.

    NOTE(review): ``widgets`` is not imported in the visible cells; it is
    presumably ``ipywidgets`` imported elsewhere in the notebook -- confirm.
    """

    AVAILABLE = "AVAILABLE"
    STOPPED = "STOPPED"
    RUNNING = "RUNNING"
    DESTROYED = "DESTROYED"

    # Partition key used when the caller does not supply one.
    DEFAULT_PARTITION_KEY = "movielens-ratings"
    # Seconds between state checks while a load is paused with the Stop button.
    _PAUSE_POLL_SECONDS = 0.5

    def __init__(self, file, stream_name, window=1, dtype=None, date_fields=None):
        """Read ``file`` into a DataFrame and build the control widgets.

        Args:
            file: Path of the CSV file to stream.
            stream_name: Name of the target Kinesis data stream.
            window: Number of rows sent per ``put_record`` call (>= 1).
            dtype: Optional column -> dtype mapping forwarded to ``pd.read_csv``.
            date_fields: Optional columns parsed as dates by ``pd.read_csv``.
                The resulting Timestamps are not encodable by ``json.dumps``.

        Raises:
            ValueError: If ``window`` is smaller than 1.
        """
        if window < 1:
            # A zero/negative window would make the window count meaningless.
            raise ValueError(f"window must be >= 1, got {window}")
        dtype = {} if dtype is None else dtype
        date_fields = [] if date_fields is None else date_fields

        self.file = file
        self.kinesis_client = boto3.client('kinesis')
        self.stream_name = stream_name
        self.window = window
        # Index of the last window sent; -1 means nothing has been sent yet.
        self.current_window = -1
        # Total number of rows sent so far.
        self.num_registers = 0
        self.df = pd.read_csv(self.file, dtype=dtype, parse_dates=date_fields)
        # Shared with the background thread and the button callbacks.
        self.__status = {"state": self.AVAILABLE}

        # -----------
        # UI ELEMENTS
        #
        self.out = widgets.Output()
        start_button = widgets.Button(description='Start')
        stop_button = widgets.Button(description='Stop')
        destroy_button = widgets.Button(description='Destroy Process')

        def start_button_clicked(_):
            # Resume a paused load. Ignored otherwise, so clicking Start while
            # idle cannot leave the loader stuck in RUNNING with no thread.
            if self.__status["state"] == self.STOPPED:
                self.__status["state"] = self.RUNNING

        def stop_button_clicked(_):
            # Pause a running load; the worker waits until Start or Destroy.
            if self.__status["state"] == self.RUNNING:
                self.__status["state"] = self.STOPPED

        def destroy_button_clicked(_):
            # Abort an in-progress (running or paused) load.
            if self.__status["state"] in (self.RUNNING, self.STOPPED):
                self.__status["state"] = self.DESTROYED

        start_button.on_click(start_button_clicked)
        stop_button.on_click(stop_button_clicked)
        destroy_button.on_click(destroy_button_clicked)

        self.buttons = widgets.HBox([start_button, stop_button, destroy_button])
        self.box = widgets.VBox([self.buttons, self.out])

    def __load(self, max_iters=None, partition_key=DEFAULT_PARTITION_KEY, delay=0, restart=False):
        """Send the remaining windows of ``self.df`` to Kinesis.

        Sending resumes after ``self.current_window``. It stops when the data
        is exhausted, when ``max_iters`` windows have been sent by this call,
        or when the state becomes DESTROYED. The state is reset to AVAILABLE
        on exit, including on error.

        Args:
            max_iters: Maximum number of windows to send in this call
                (None = no limit).
            partition_key: Kinesis partition key used for every record.
            delay: Seconds to sleep after each ``put_record`` call.
            restart: If True, start again from the first window.
        """
        try:
            if restart:
                # -1 so that window 0 is sent again.
                self.current_window = -1
            num_windows = math.ceil(len(self.df) / self.window)
            sent = 0

            for i in tqdm(range(self.current_window + 1, num_windows)):
                if max_iters is not None and sent >= max_iters:
                    break
                while self.__status["state"] == self.STOPPED:
                    time.sleep(self._PAUSE_POLL_SECONDS)
                if self.__status["state"] == self.DESTROYED:
                    break

                chunk = self.df.iloc[i * self.window:(i + 1) * self.window]
                self.kinesis_client.put_record(
                    StreamName=self.stream_name,
                    Data=json.dumps(chunk.to_dict(orient='records')),
                    PartitionKey=partition_key,
                )
                self.num_registers += len(chunk)
                self.current_window = i
                sent += 1
                time.sleep(delay)
        finally:
            self.__status["state"] = self.AVAILABLE

    def loadx(self, max_iters=None, partition_key=DEFAULT_PARTITION_KEY, delay=0, restart=False):
        """Start a background load and return the control widget box.

        If a load is already running or paused, nothing is started and a
        message is printed. The arguments are forwarded to the worker:
        ``max_iters`` caps the windows sent, ``partition_key`` is the Kinesis
        partition key, ``delay`` is the pause after each record, and
        ``restart`` starts from the first window.
        """
        if self.__status["state"] in (self.RUNNING, self.STOPPED):
            print("There is already a 'loadx' load in progress")
        else:
            # Mark RUNNING before the thread starts so an immediate second
            # call is rejected instead of starting a duplicate worker.
            self.__status["state"] = self.RUNNING
            threading.Thread(
                target=self.__load,
                args=(max_iters, partition_key, delay, restart),
                daemon=True,
            ).start()
        return self.box

    # Alias: the notebook cells call ``load``.
    load = loadx

    def status(self):
        """Return the load progress and current state as a dict."""
        return {
            "CurrentWindow": self.current_window,
            "RegistersSend": self.num_registers,
            "Window": self.window,
            "State": self.__status["state"],
        }

## 1. Read data from S3

In [None]:
# Build the loader: this reads the ratings CSV into ``mloader.df``.
mloader = MovieLensKinesisLoader(
    file,
    stream_name,
    window=1,
    dtype={
        "userId": np.int64,
        "movieId": np.int64,
        # ``np.str`` was removed in NumPy 1.24; the builtin ``str`` is what it aliased.
        "rating": str,
    },
    # date_fields is deliberately not set: parsed Timestamps are not
    # serializable by the json.dumps call the loader uses.
)
display(mloader.df.dtypes)
mloader.df.head(3)

## 2. Send Data to the Data Stream

In [None]:
# Send up to 100 windows in the background; the returned Start/Stop/Destroy
# widget box is shown as the cell output.
mloader.loadx(100)

In [None]:
# Progress so far: last window sent, rows sent, and the window size.
mloader.status()