In [1]:
from ksql import KSQLAPI
import socket

In [2]:
import json
import re
import pandas as pd
import time

In [3]:
# Address of the ksqlDB REST endpoint that every query in this notebook goes through.
KSQL_SERVER_URL = 'http://localhost:8088'
client = KSQLAPI(KSQL_SERVER_URL)

# Properties sent along with each query (see return_query_df).
streamProperties = {
    # read from the earliest available offset
    "ksql.streams.auto.offset.reset": "earliest",
    # allow pull queries that need a table scan
    "ksql.query.pull.table.scan.enabled": "true",
}

In [4]:
import datetime


In [5]:
def parse_columns(columns_str):
    """Extract the top-level column names and types from a ksql schema string.

    columns_str: text containing backtick-quoted column declarations such as
        '`ROWTIME` BIGINT, `KSQL_COL_0` STRING"'.

    Returns a list of {"name": ..., "type": ...} dicts in declaration order.
    Declarations sitting directly after a '<' (the first field of a nested
    type) or directly before a '>' are skipped by the look-arounds.
    """
    # Names may contain digits and lowercase letters (e.g. `KSQL_COL_0`); a
    # column that fails to match is silently dropped and shifts every value
    # that process_row assigns afterwards. The type class is spelled out as
    # [A-Za-z] because the range A-z also covers the punctuation [ \ ] ^ _ `.
    regex = r"(?<!\<)`(?P<name>[A-Za-z0-9_]+)` (?P<type>[A-Za-z]+)[\<, \"](?!\>)"

    return [
        {"name": match.group("name"), "type": match.group("type")}
        for match in re.finditer(regex, columns_str)
    ]


def process_row(row, column_names):
    """Turn one raw response line into a {column name: value} dict.

    row: a single line of the query response (JSON text, possibly carrying
        the list separators ',\\n' / ']\\n' or trailing ']' characters).
    column_names: list of dicts with a "name" key, as built by parse_columns.

    Returns None when the line is the closing "finalMessage" record,
    otherwise a dict pairing each value with the column name at the same
    position. Raises IndexError if the row has more values than there are
    column names.
    """
    # Strip the array punctuation so the line is a standalone JSON document.
    cleaned = row.replace(",\n", "").replace("]\n", "").rstrip("]")
    payload = json.loads(cleaned)

    if "finalMessage" in payload:
        return None

    return {
        column_names[position]["name"]: value
        for position, value in enumerate(payload["row"]["columns"])
    }

def process_query_result(results, return_objects=None):
    """Yield query results, either raw or parsed into dicts.

    results: iterable of response lines; the first line is expected to carry
        the schema and each following line one row.
    return_objects: when None the lines are yielded unchanged; any other
        value switches to parsed mode.

    In parsed mode the first line is handed to parse_columns and every later
    line to process_row; iteration stops at the first line process_row maps
    to None (the "finalMessage" record) or when the input is exhausted.
    """
    if return_objects is None:
        yield from results
        # Stop here: without this the generator would fall through and call
        # next() on an already-consumed (or non-iterator) input.
        return

    # Accept any iterable, not only iterators.
    lines = iter(results)

    # parse rows into objects
    try:
        header = next(lines)
    except StopIteration:
        return
    columns = parse_columns(header)

    for line in lines:
        row_obj = process_row(line, columns)
        if row_obj is None:
            return
        yield row_obj

In [6]:
def return_query_df(ksql_string):
    """Run a ksql query through the notebook's client and return a DataFrame.

    ksql_string: the query text, sent with the module-level streamProperties.

    Returns a DataFrame with one row per parsed result row.
    """
    raw_results = client.query(ksql_string, stream_properties=streamProperties)
    parsed_rows = process_query_result(raw_results, return_objects=True)
    return pd.DataFrame(list(parsed_rows))
    

In [7]:


def ts_unix_to_ts(time_unix):
    """Format a unix time in milliseconds as 'YYYY-MM-DDTHH:MM:SS.ffffff'.

    The conversion uses datetime.fromtimestamp, i.e. the machine's local time.
    """
    seconds = time_unix / 1000
    local_dt = datetime.datetime.fromtimestamp(seconds)
    return local_dt.strftime('%Y-%m-%dT%H:%M:%S.%f')
    
def ts_str_to_unix(time_str):
    """Convert a local-time string to unix time in milliseconds.

    time_str: 'YYYY-MM-DD HH:MM:SS.f'; the 'T'-separated form produced by
        ts_unix_to_ts is accepted as well.

    Returns an int. The string is interpreted in the machine's local time
    (time.mktime). Raises ValueError if the string does not match the format.
    """
    normalized = time_str.replace('T', ' ', 1)
    dt = datetime.datetime.strptime(normalized, '%Y-%m-%d %H:%M:%S.%f')
    # timetuple() carries no sub-second field, so add the parsed fraction
    # back using integer arithmetic to avoid float rounding.
    whole_seconds = int(time.mktime(dt.timetuple()))
    return whole_seconds * 1000 + dt.microsecond // 1000

In [27]:
# FUNCTION TO RETURN COUNTS FOR A SIGNAL, PHASE BETWEEN START AND END TIME. 5 sec aggregation
def return_ph_agg_counts(isc_name, start_time, end_time, type_str, phase):
    '''
    Return counts for one signal phase between start and end time, summed
    into 5-second bins.

    isc_name: isc name string
    start_time: unix time in ms (INT)
    end_time: unix time in ms (INT)
    type_str: 'stp' or 'adv' (str)
    phase: [1,8] (INT)

    Returns a DataFrame indexed by US/Eastern timestamps at a 5-second
    frequency. Zero-count rows are added at start_time and end_time so the
    bins span the requested window; if the query returns no rows the result
    holds only those zero counts.
    '''
    TABLE_NAME = 'ATSPM_STREAM_UNIX_COUNTS_PHASE_5SEC'  # THIS IS A TABLE
    sig_phase = f"{isc_name}_{type_str}_{phase}"
    ksql_string = (
        f"SELECT * FROM {TABLE_NAME} where SIG_PHASE_TYPE = '{sig_phase}'"
        f" AND WINDOWSTART>={start_time} AND WINDOWSTART <= {end_time}"
    )
    print(ksql_string)
    counts = return_query_df(ksql_string)

    # Zero-count sentinels that pin the first and last resample bin.
    boundaries = pd.DataFrame({'WINDOWSTART': [start_time, end_time], 'COUNT': [0, 0]})
    if counts.empty:
        # An empty result has no columns to drop; fall back to the sentinels.
        padded = boundaries
    else:
        counts = counts.drop(columns=['SIG_PHASE_TYPE', 'WINDOWEND'])
        # pd.concat replaces DataFrame.append, which pandas 2.0 removed.
        padded = pd.concat([counts, boundaries], ignore_index=True)

    timestamps = (
        pd.to_datetime(padded['WINDOWSTART'], unit='ms')
        .dt.tz_localize('UTC')
        .dt.tz_convert('US/Eastern')
    )
    final_df = (
        padded.assign(timestamp=timestamps)
        .set_index('timestamp')
        .drop(columns=['WINDOWSTART'])
        .sort_index()
        .resample('5s')
        .sum()
    )
    return final_df

# FUNCTION TO RETURN MOST RECENT TIMESTAMP FOR A INTERSECTION
def return_recent_data_ts(signalid):
    '''
    Query the status table for the timestamp stored for one intersection.

    signalid: isc name string

    Returns the query result as a DataFrame.
    '''
    TABLE_NAME = 'ATSPM_TABLE_STATUS'  # THIS IS A TABLE
    ksql_string = f"SELECT ts from {TABLE_NAME} where SIGNALID='{signalid}'"
    print(ksql_string)
    return return_query_df(ksql_string)

# FUNCTION TO RETURN COUNTS FOR A DETECTOR BETWEEN START AND END TIME. 5 sec aggregation
def return_det_counts_data(detector_id, time_start, time_end):
    '''
    Return counts for one detector between start and end time, summed into
    5-second bins.

    detector_id: detector_id string (signalID + channel value)
    time_start: unix time in ms (INT)
    time_end: unix time in ms (INT)

    Returns a DataFrame indexed by US/Eastern timestamps at a 5-second
    frequency. Zero-count rows are added at time_start and time_end so the
    bins span the requested window; if the query returns no rows the result
    holds only those zero counts.
    '''
    TABLE_NAME = 'ATSPM_STREAM_UNIX_COUNTS_AGG_5SEC_NEW'  # Detector ID to counts  # THIS IS A TABLE
    # Filter on the detector_id argument, not on a notebook-level variable.
    ksql_string = (
        f"SELECT * FROM {TABLE_NAME} where DETECTORID = '{detector_id}'"
        f" AND WINDOWSTART>={time_start} AND WINDOWSTART <= {time_end}"
    )
    print(ksql_string)
    counts = return_query_df(ksql_string)

    # Zero-count sentinels that pin the first and last resample bin.
    boundaries = pd.DataFrame({'WINDOWSTART': [time_start, time_end], 'COUNT': [0, 0]})
    if counts.empty:
        # An empty result has no columns to drop; fall back to the sentinels.
        padded = boundaries
    else:
        counts = counts.drop(columns=['DETECTORID', 'WINDOWEND'])
        # pd.concat replaces DataFrame.append, which pandas 2.0 removed.
        padded = pd.concat([counts, boundaries], ignore_index=True)

    timestamps = (
        pd.to_datetime(padded['WINDOWSTART'], unit='ms')
        .dt.tz_localize('UTC')
        .dt.tz_convert('US/Eastern')
    )
    final_df = (
        padded.assign(timestamp=timestamps)
        .set_index('timestamp')
        .drop(columns=['WINDOWSTART'])
        .sort_index()
        .resample('5s')
        .sum()
    )
    return final_df

In [54]:
# FUNCTION TO RETURN SIGNAL TIMING FOR A ISC BETWEEN START AND END TIME. NO AGGREGATION
def return_signal_timing_data(signalid, time_start, time_end):
    '''
    Fetch the un-aggregated signal timing rows for one intersection.

    signalid: isc name string
    time_start: unix time in ms (INT), lower bound on ROWTIME (inclusive)
    time_end: unix time in ms (INT), upper bound on ROWTIME (inclusive)

    Returns the query result as a DataFrame with duplicate rows removed.
    '''
    TABLE_NAME = 'ATSPM_STREAM_SIG_GREEN_RED'
    signal_filter = "SIGNALID = '" + signalid + "'"
    time_filter = f"ROWTIME>={time_start} AND ROWTIME <= {time_end}"
    ksql_string = f"SELECT * FROM {TABLE_NAME} where " + signal_filter + " AND " + time_filter
    print(ksql_string)
    timing_df = return_query_df(ksql_string)
    return timing_df.drop_duplicates()
    

In [51]:
# Anchor the analysis window on the newest timestamp recorded for the signal
# and reach back window_size seconds from there.
window_size = 400  # seconds
status_df = return_recent_data_ts('1430')
recent_ts = status_df['TS'].values[0]  # unix timestamp ms
time_start = recent_ts - window_size * 1000
print(f"window time start {ts_unix_to_ts(time_start)} end {ts_unix_to_ts(recent_ts)}")

SELECT ts from ATSPM_TABLE_STATUS where SIGNALID='1430'
window time start 2021-09-24T09:53:19.000000 end 2021-09-24T09:59:59.000000


In [52]:
# Signal timing rows for the window computed above.
signalid = '1430'
sig_df = return_signal_timing_data(
    signalid=signalid,
    time_start=time_start,
    time_end=recent_ts,
)

SELECT * FROM ATSPM_STREAM_SIG_GREEN_RED where SIGNALID = '1430' AND ROWTIME>=1632491599000 AND ROWTIME <= 1632491999000


In [29]:
# 5-second counts for a single detector over the same window.
det_id = '14301'
df_counts = return_det_counts_data(
    detector_id=det_id,
    time_start=time_start,
    time_end=recent_ts,
)

SELECT * FROM ATSPM_STREAM_UNIX_COUNTS_AGG_5SEC_NEW where DETECTORID = '14301' AND WINDOWSTART>=1632491599000 AND WINDOWSTART <= 1632491999000


In [56]:
# 5-second counts for one phase of the signal over a fixed window.
isc_name = '1430'
start_time = 1632488400000  # unix ms
end_time = 1632488475000  # unix ms
type_str = 'stp'
phase = 2
temp_df = return_ph_agg_counts(isc_name, start_time, end_time, type_str, phase)

SELECT * FROM ATSPM_STREAM_UNIX_COUNTS_PHASE_5SEC where SIG_PHASE_TYPE = '1430_stp_2' AND WINDOWSTART>=1632488400000 AND WINDOWSTART <= 1632488475000


In [None]:
# Preview the detector counts frame built earlier in the notebook.
# NOTE(review): the original name in this cell is not defined anywhere in the
# notebook; df_counts is the closest existing frame - confirm the intent.
df_counts.head()

In [None]:
def process_data(signal_id,city,date):
    '''
    Build the green-phase timeline and per-detector 5-second counts for one
    signal and date.

    signal_id: forwarded to return_orlando_data
    city: not used by this function
    date: forwarded to return_orlando_data

    Returns (time_plan, counts_df):
      time_plan: one row per interval between consecutive distinct event
          times among the EventCode 1/11 rows, with columns green_phases
          (comma-joined EventParam values currently in the active set),
          start_time, end_time and plan_number (left unfilled here).
      counts_df: for every EventParam seen with EventCode 81, the number of
          events per 5-second bin, with columns time, count, green_phases,
          detector_channel; rows without a matching time_plan interval are
          dropped.

    NOTE(review): assumes return_orlando_data yields a frame indexed by event
    timestamp with columns SignalID, EventCode, EventParam - confirm.
    '''
    events = return_orlando_data(date, signal_id)
    # Keep the event time as a regular column too (the index is used for
    # resampling further down).
    events = events.assign(t=events.index)

    detections = events[events['EventCode'].isin([81])]  # counts
    phase_events = events[events['EventCode'].isin([1, 11])]  # signal timing

    current = set()
    stime_list = []
    etime_list = []
    lis_ = []

    # Positional lookup: the frame is indexed by timestamps, not by 0..n-1.
    interval_start = phase_events['t'].iloc[0]

    phase_rows = zip(phase_events['EventCode'], phase_events['EventParam'], phase_events['t'])
    for code, phase, stamp in phase_rows:
        if stamp != interval_start:
            # The event time moved on: close the running interval with the
            # set of phases that was active during it.
            lis_.append(','.join(map(str, current)))
            stime_list.append(interval_start)
            etime_list.append(stamp)
            interval_start = stamp
        if code == 1:
            current.add(phase)
        elif code == 11:
            current.discard(phase)

    time_plan = pd.DataFrame(columns=['green_phases', 'start_time', 'end_time', 'plan_number'],
                             index=range(1, len(lis_) + 1))
    time_plan['green_phases'] = lis_
    time_plan['start_time'] = stime_list
    time_plan['end_time'] = etime_list

    list_res = []
    for channel in detections['EventParam'].unique():
        channel_events = detections[detections['EventParam'] == channel]

        # Pad the channel with one row just after midnight and one just
        # before the next midnight so the 5-second bins span the whole day.
        day = str(channel_events.index[0])[:10]
        day_start = pd.to_datetime(day + ' ' + '00' + ':00:01')
        day_end = pd.to_datetime(day + ' ' + '23' + ':59:59')
        boundaries = pd.DataFrame(
            {
                'SignalID': 1,
                'EventCode': 81,
                # NaN keeps the padding rows out of the per-bin count below;
                # they only exist to extend the time range.
                'EventParam': float('nan'),
                't': [day_start, day_end],
            },
            # Reuse the index name so the concatenated index keeps it.
            index=pd.DatetimeIndex([day_start, day_end], name=channel_events.index.name),
        )
        # pd.concat replaces DataFrame.append, which pandas 2.0 removed.
        padded = pd.concat([channel_events, boundaries]).sort_index()

        binned = padded.resample('5s').count()
        binned['time'] = binned.index.values
        binned = binned[['EventParam', 'time']]

        # NOTE(review): 'Timestamp' is not a column of `binned`; this relies
        # on the index being named 'Timestamp' - confirm against
        # return_orlando_data.
        merged = pd.merge_asof(binned, time_plan, left_on='Timestamp', right_on='start_time')
        merged = (
            merged[['time', 'EventParam', 'green_phases']]
            .rename(columns={'EventParam': 'count'})
            .assign(detector_channel=channel)
        )
        list_res.append(merged)

    counts_df = pd.concat(list_res).dropna()
    counts_df = counts_df[['time', 'count', 'green_phases', 'detector_channel']]

    return time_plan, counts_df