In [1]:
from time import sleep
from json import dumps
from kafka import KafkaProducer
import pickle
import datetime
import pandas as pd
import psycopg2
import yaml
import numpy as np




In [2]:
#retrieve data from pickle 
def restore_db(pickle_name):
    """
    pickle_name is a string, that is how the compressed file is named
    this function returns the data in the same format it was stored
    """
    infile = open(pickle_name,'rb')
    source= pickle.load(infile)
    print('File type:',type(source))
    infile.close()
    return source

In [3]:
def get_timestamp_previous_day():
    now=datetime.datetime.now()-datetime.timedelta(hours=5.5)
    before=now-datetime.timedelta(hours=24)
    return(before,now)

In [4]:
def get_timestamp_from_epoch(_epoch):
    return datetime.datetime.fromtimestamp(_epoch/1000).strftime('%Y-%m-%d %H:%M:%S.%f')

In [5]:

#pickle the dataframe for future retrieval if any
def store_db(dataframe,pickle_name):
    """
    pickle_name is a string, that is how the compressed file is named
    dataframe is name of the dataframe
    
    """
    outfile = open(pickle_name,'wb')
    pickle.dump(dataframe,outfile)
    outfile.close()
    print('Data saved as {0} successfully'.format(pickle_name))


In [6]:
def query_raw_data(monitor, start_time, end_time, login_detail, IST=False):
    # by default start time and endtime are UTC string in GMT zone
    # if 'ist_time' flag is True, the input values are considered as IST and converted to GMT format

    if IST:
        start_time = pd.to_datetime(start_time) - datetime.timedelta(hours=5.5)
        end_time = pd.to_datetime(end_time) - datetime.timedelta(hours=5.5)

    try:

        _user = login_detail[0]
        _pwd = login_detail[1]
        _host = login_detail[2]
        _port = login_detail[3]
        _db = login_detail[4]

        connection = psycopg2.connect(user=_user,
                                      password=_pwd,
                                      host=_host,
                                      port=_port,
                                      database=_db)

        cursor = connection.cursor()

        query = f"""
        SELECT * FROM "raw_acceleration_data" 
        WHERE "monitor_id" = '{monitor}' 
        AND "time" >= '{start_time}'
        AND "time" <= '{end_time}' 
        LIMIT 50
        """

        cursor.execute(query)
        records = cursor.fetchall()

        result = []

        for row in records:
            data = {}
            data['x_raw'] = row[1]
            data['y_raw'] = row[2]
            data['z_raw'] = row[3]
            data['block_size'] = row[4]
            data['mac'] = row[5]
            data['fw_version'] = row[6]
            data['monitor_id'] = row[7]
            data['sampling_rate'] = row[8]
            data['timestamp'] = row[9]

            result.append(data)

    except (Exception, psycopg2.Error) as error:
        print('Error while connecting to PostgreSQL', error)
        raise Exception('Error while connecting to PostgreSQL')
    finally:
        if (connection):
            cursor.close()
            connection.close()
    return result

In [7]:
import os
def load_env_variables():
    try:
        user = os.getenv('USER')
        password = os.getenv('PASSWORD')
        host = os.getenv('IP')
        port = os.getenv('PORT')
        database = os.getenv('DATABASE')
        #logger.info('Login USER: {}'.format(user))
        #logger.info('Login DATABASE: {}'.format(database))
        return [user, password, host, port, database]
    except Exception as e:
        raise Exception("Login details not available in Environment variables", e)

In [8]:
login=load_env_variables()
login

[None, None, None, None, None]

In [None]:
producer1 = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'],
                         value_serializer=lambda x: 
                         dumps(x).encode('utf-8'))


In [None]:
start_time=int(pd.to_datetime('2021-12-27 18:30:00').timestamp()*1000)
end_time=int(pd.to_datetime('2021-12-28 18:30:00').timestamp()*1000)
message={"startTime":start_time,
        "endTime":end_time
       }
producer1.send('diagnostic_recommendation_request1', value=message)

#"plantList":["UTCL_Dadri","UTCL_Dalla","UTCL_Bara"]
