## CLICKHOUSE QUERY

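Create the view consumed by the analytics app. The view reads from `default.sales_data`, which is created in the following cell, so run that cell first.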

In [None]:
%%sql
-- CREATE VIEW
-- Analytics-facing view over default.sales_data. That table is created in the
-- next cell; it must exist before this statement can succeed.
--
-- FINAL is needed because sales_data is a ReplacingMergeTree: rows sharing the
-- ORDER BY key (date, name, market_area) are only collapsed by background
-- merges that run at an unspecified time. Without FINAL, readers of this view
-- can see (and sum) superseded versions of the same row.
CREATE VIEW sales_data_view
AS
SELECT
    `date`,
    name,
    market_area,
    toInt32(number_of_sales) AS number_of_sales,
    pricing_unit,
    number_of_sales * pricing_unit AS revenue,
    toDateTime64(insert_time_clickhouse, 3, 'Asia/Jakarta') AS insert_time_clickhouse,
    -- evaluated at query time, so consumers can filter relative to "now"
    toDateTime64(now(), 3, 'Asia/Jakarta') AS source_collected
FROM `default`.sales_data FINAL;

In [None]:
%%sql
-- CORRECTION FOR BASE, KAFKA QUEUE AND MV
-- Flow: Kafka topic -> sales_data_queue (Kafka engine)
--       -> sales_data_queue_mv (materialized view) -> sales_data (storage).

-- BASE
-- ReplacingMergeTree keeps one row per ORDER BY key (date, name, market_area)
-- once parts are merged; query it with FINAL to see deduplicated data.
CREATE TABLE default.sales_data
(
    `date` DateTime,
    `name` String,
    `market_area` String,
    -- Int32 to match sales_data_queue, so signed values arriving from Kafka
    -- are stored as-is instead of wrapping into large unsigned numbers.
    `number_of_sales` Int32,
    `pricing_unit` Float64,
    -- Plain DateTime: SimpleAggregateFunction columns are only aggregated by
    -- Aggregating/SummingMergeTree, so the wrapper had no effect here.
    -- Filled by the DEFAULT on insert (the MV below does not supply it).
    `insert_time_clickhouse` DateTime('Asia/Jakarta') DEFAULT now()
)
ENGINE = ReplacingMergeTree
-- NOTE(review): one partition per distinct name. Acceptable for a small fixed
-- set of names, but ClickHouse recommends low-cardinality partition keys
-- (e.g. toYYYYMM(date)); revisit if the number of names grows.
PARTITION BY name
ORDER BY (date, name, market_area)
SETTINGS index_granularity = 8192;

--KAFKA QUEUE
-- Kafka consumer table; `date` is presumably epoch microseconds (the MV below
-- divides it by 1,000,000) — confirm against the producer.
CREATE TABLE default.sales_data_queue
(
    `date` UInt64,
    `name` String,
    `market_area` String,
    `number_of_sales` Int32,
    `pricing_unit` Float64
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'stack-kafka-kafka-1:9092',
 kafka_topic_list = 'etl.public.sales_data',
 kafka_group_name = 'test-1',
 kafka_format = 'JSONEachRow';

-- MV
-- Moves each consumed Kafka batch into the base table, converting `date`.
CREATE MATERIALIZED VIEW default.sales_data_queue_mv TO default.sales_data
AS
SELECT
    -- integer division: microseconds -> whole seconds without a Float64 round-trip
    toDateTime(intDiv(`date`, 1000000)) AS `date`,
    name,
    market_area,
    number_of_sales,
    pricing_unit
FROM default.sales_data_queue;

## STREAMLIT SAMPLE DASHBOARD
Save the cell below as `app.py` and run it with the `streamlit` command:

`streamlit run app.py`

In [None]:
!pip install streamlit --no-cache-dir
!pip install clickhouse-sqlalchemy --no-cache-dir

In [None]:
import os
import time
from datetime import datetime

import pandas as pd
import streamlit as st
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
#

# Streamlit dashboard over `default`.sales_data_view. The whole script re-runs on
# every interaction and, when auto-refresh is on, every `sleep_time` seconds.

# st.set_page_config must be the first Streamlit command in the script.
st.set_page_config(layout="wide")

# Connection URL; set CLICKHOUSE_URL to avoid keeping credentials in the source.
conn_str = os.environ.get(
    "CLICKHOUSE_URL", "clickhouse://default:password@localhost:18123/default"
)

try:
    # create_engine only builds the engine; the connection itself is opened
    # lazily by pd.read_sql below.
    engine = create_engine(conn_str)
except Exception as e:
    st.error("Error while connecting to ClickHouse: " + str(e))
    # Nothing below can work without an engine.
    st.stop()


st.title("Sales Data Analytics Real-Time ▁ ▂ ▃ ▄ ▅ ▆ █")

dt_string = datetime.now().strftime("%d %B %Y %H:%M:%S")
st.write(f"Last update: {dt_string}")

# Seed per-session defaults on the first run only.
if "sleep_time" not in st.session_state:
    st.session_state.sleep_time = 5

if "auto_refresh" not in st.session_state:
    st.session_state.auto_refresh = True

# Time-window choices for the radio below. "period" is interpolated into the SQL
# text, so only these fixed values may ever reach the query.
mapping = {
    "1 hour": {"period": "60", "granularity": "minute", "raw": 60},
    "30 minutes": {"period": "30", "granularity": "minute", "raw": 30},
    "10 minutes": {"period": "10", "granularity": "minute", "raw": 10},
    "5 minutes": {"period": "5", "granularity": "minute", "raw": 5}
}

with st.expander("Configure Dashboard", expanded=True):
    left, right = st.columns(2)

    with left:
        auto_refresh = st.checkbox('Auto Refresh?', st.session_state.auto_refresh)

        if auto_refresh:
            # min_value=1: time.sleep() rejects negative values and 0 would
            # re-run the script in a tight loop.
            number = st.number_input(
                'Refresh rate in seconds',
                min_value=1,
                value=st.session_state.sleep_time,
            )
            st.session_state.sleep_time = number

    with right:
        time_ago = st.radio("Time period to cover", list(mapping), horizontal=True, key="time_ago")


st.header("Live Kafka Streaming Sales Data ..." )

minute = mapping[time_ago]["period"]
# Keep rows inserted within the last `minute` minutes. source_collected is the
# view's now(), so the window is anchored on the insert time, not the other way
# round (comparing now() against a past insert time minus N is always true).
# int() guarantees only a number is spliced into the SQL text.
query = f"""
SELECT
    date,
    name,
    market_area,
    number_of_sales,
    revenue
FROM `default`.sales_data_view
WHERE insert_time_clickhouse >= source_collected - INTERVAL {int(minute)} MINUTE
ORDER BY date
"""

try:
    df = pd.read_sql(query, engine)
except Exception as e:
    # Show the failure but keep rendering (and auto-refreshing) with no rows,
    # so the dashboard recovers on its own once ClickHouse is reachable again.
    st.error("Error while connecting to ClickHouse: " + str(e))
    df = pd.DataFrame(columns=["date", "name", "market_area", "number_of_sales", "revenue"])

metric1, metric2 = st.columns(2)

# Thousands separators for readability; sum() of an empty frame is 0.
metric1.metric(
    label="Total Number of Deals",
    value=f"{int(df['number_of_sales'].sum()):,}",
)

metric2.metric(
    label="Sales Revenue",
    value=f"{float(df['revenue'].sum()):,.2f}",
)

st.header(f"Incoming data completion since last {minute} minutes..." )

st.line_chart(data=df, x="date", y="number_of_sales")

data_1, data_2 = st.columns(2)
data_1.markdown('## Number of Member Deals')
data_1.dataframe(df.groupby(['name'])['number_of_sales'].sum())

data_2.markdown('## Number of Area Deals')
data_2.dataframe(df.groupby(['market_area'])['number_of_sales'].sum())

# Block for the configured interval, then re-execute the script from the top.
if auto_refresh:
    time.sleep(st.session_state.sleep_time)
    st.rerun()
