
**install_df**: get all unique installation_ids to build installation_list, which later cells iterate through to get the relevant data for every installation \
**df**: get all PORT_CLOSEBIN (tag 249) ASLog entries, with values[0] exposed as BinNo (an earlier version used BIN_UPDATE entries where bin mode is PORT) \
**timestamp_df**: the local timestamp of each entry in df, converted to a day of the week (built from df in a later cell)

In [0]:
import pandas
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder.appName("Installation Data").getOrCreate()

LOGS_TABLE = 'test_unify_analytics.bronze.logs_partition_1'
# ASLog tag 249 --> PORT_CLOSEBIN.
# (An earlier version of this analysis used BIN_UPDATE entries instead: tag 242 with
#  values[5] == 14, i.e. bin in PORT.)
PORT_CLOSEBIN_TAG = 249

# Every installation that appears in the table under any tag, so installations without
# PORT_CLOSEBIN entries are included as well.
# NULL ids are excluded because later cells build per-installation CSV file names from these
# values; ordering makes the iteration order reproducible between runs.
install_df = spark.sql(f"""
    select distinct installation_id
    from {LOGS_TABLE}
    where installation_id is not null
    order by installation_id
""")
installation_list = install_df.toPandas()['installation_id'].tolist()

# All PORT_CLOSEBIN entries, with values[0] exposed as BinNo.
df = spark.sql(f"""
    select installation_id, tag, local_installation_timestamp, unix_timestamp, values, values[0] as BinNo
    from {LOGS_TABLE}
    where tag = {PORT_CLOSEBIN_TAG}
""")


Using **df**, create one CSV file per installation in **installation_list** (named after its **installation_id**) that lists, for every bin in that installation, the average time between consecutive presentations of that bin, in seconds and in minutes.

In [0]:
import pandas
from pyspark.sql.functions import *
from pyspark.sql.window import Window

from pyspark.sql import functions as F

# Writes one CSV per installation with the average gap between consecutive presentations of
# each bin (seconds and minutes), computed from the PORT_CLOSEBIN entries in df.
AVG_BIN_TIME_DIR = 'dbfs:/FileStore/tables/avg_bin_time'
AVG_BIN_TIME_LOCAL_PATH = '/dbfs/FileStore/tables/avg_bin_time/'  # same directory, local-path form
AVG_SEC_COL = 'avg time between presentations (sec)'
AVG_MIN_COL = 'avg time between presentations (min)'
OUTPUT_COLS = ['bin num', AVG_SEC_COL, AVG_MIN_COL]

dbutils.fs.mkdirs(AVG_BIN_TIME_DIR)

df_bintimes = df.select('installation_id', 'unix_timestamp', F.col('values')[0].alias('bin num'))

# Gap to the previous presentation of the same bin in the same installation. Partitioning by
# installation as well as bin computes every installation in ONE Spark job instead of
# filtering the frame once per installation.
presentation_window = Window.partitionBy('installation_id', 'bin num').orderBy('unix_timestamp')
bin_avg_times = (
    df_bintimes
    .withColumn('prev_timestamp', F.lag('unix_timestamp').over(presentation_window))
    .withColumn('time diff (sec)', F.col('unix_timestamp') - F.col('prev_timestamp'))
    .groupBy('installation_id', 'bin num')
    # mean() ignores the NULL gap of each bin's first presentation; a bin presented only
    # once therefore gets a NULL average.
    .agg(F.round(F.mean('time diff (sec)')).alias(AVG_SEC_COL))
    .withColumn(AVG_MIN_COL, F.round(F.col(AVG_SEC_COL) / 60))
    .toPandas()
)

results_by_installation = {
    installation_id: group
    for installation_id, group in bin_avg_times.groupby('installation_id')
}
no_results = bin_avg_times.iloc[0:0]

for installation in installation_list:
    if installation is None:
        continue  # no usable file name
    # Installations without entries still get a header-only CSV.
    pdf_bintimes = (
        results_by_installation.get(installation, no_results)[OUTPUT_COLS]
        # Ascending with NULLs first, matching Spark's default ascending orderBy.
        .sort_values(AVG_SEC_COL, na_position='first', kind='stable')
        .reset_index(drop=True)
    )
    pdf_bintimes.to_csv(f'{AVG_BIN_TIME_LOCAL_PATH}{installation}.csv')

files = dbutils.fs.ls(AVG_BIN_TIME_DIR)
print('num files: ', len(files))

Using **timestamp_df**, count the presentations of each bin on each day of the week over the entire timespan of the table, separately for each installation in **installation_list**, and save the counts to one CSV file per installation.

In [0]:
import pandas as pd
from datetime import *
from pyspark.sql.functions import *

from pyspark.sql import functions as F

# Writes one CSV per installation with the number of PORT_CLOSEBIN entries per
# (BinNo, day of week), ordered by BinNo then count, both descending.
DOTW_DIR = 'dbfs:/FileStore/tables/dotw'
DOTW_LOCAL_PATH = '/dbfs/FileStore/tables/dotw/'  # same directory, local-path form
DOTW_COLS = ['BinNo', 'dotw', 'count']

dbutils.fs.mkdirs(DOTW_DIR)

# Day-of-week name ('EEEE' pattern, e.g. 'Monday') of each entry's local timestamp.
timestamp_df = (
    df.select('installation_id', 'local_installation_timestamp', 'BinNo')
    .withColumn('dotw', F.date_format(F.col('local_installation_timestamp'), 'EEEE'))
)

# A single aggregation over all installations instead of one Spark job per installation.
dotw_counts = timestamp_df.groupBy('installation_id', 'BinNo', 'dotw').count().toPandas()
counts_by_installation = {
    installation_id: group
    for installation_id, group in dotw_counts.groupby('installation_id')
}
no_counts = dotw_counts.iloc[0:0]

for installation in installation_list:
    if installation is None:
        continue  # no usable file name
    # Installations without entries still get a header-only CSV.
    pd_dotw = (
        counts_by_installation.get(installation, no_counts)[DOTW_COLS]
        # Descending with NULLs last, matching Spark's desc() ordering.
        .sort_values(['BinNo', 'count'], ascending=False)
        .reset_index(drop=True)
    )
    pd_dotw.to_csv(f'{DOTW_LOCAL_PATH}{installation}.csv')

files = dbutils.fs.ls(DOTW_DIR)
print('num files: ', len(files))

In [0]:
import random
import numpy as np
import pandas as pd

import plotly.express as px
import plotly.graph_objects as go
import webbrowser as wb

# Weekday presentation profile of the busiest bins in one installation, read back from the
# per-installation CSVs written to /dbfs/FileStore/tables/dotw/.
top_num = 1000        # number of busiest bins (by weekly total) to plot and summarise
STD_MULTIPLIER = 2.5  # half-height of the MEAN error bars, in standard deviations

installation = "a037a00000pwUFlAAM"
base_path = '/dbfs/FileStore/tables/dotw/'
read_filepath = base_path + installation + ".csv"
# "Unnamed: 0" is the index column that DataFrame.to_csv wrote alongside the data.
df_dotw_raw = pd.read_csv(read_filepath).drop("Unnamed: 0", axis=1)

day_order = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']
# 'Monday' -> 'count_mon', ..., 'Sunday' -> 'count_sun'
weekday_cols = [f'count_{day[:3].lower()}' for day in day_order]

# Every (day, bin) pair, with a zero count where the bin was never presented on that day,
# so every bin contributes exactly seven points.
df_bins = df_dotw_raw[['BinNo']].drop_duplicates()
df_all_combos = pd.DataFrame({'dotw': day_order}).merge(df_bins, how='cross')
df_dotw = (
    df_all_combos
    .merge(df_dotw_raw, on=['dotw', 'BinNo'], how='left')
    .fillna({'count': 0})
    .astype({'count': int})
)
df_dotw['dotw'] = pd.Categorical(df_dotw['dotw'], categories=day_order, ordered=True)
df_dotw = df_dotw.sort_values(['dotw', 'count'], ascending=[True, False])

# Total presentations per bin, busiest first. 'count' is selected explicitly because the
# first positional argument of GroupBy.sum is numeric_only, not a column name.
df_sum = (
    df_dotw.groupby('BinNo', as_index=False)['count'].sum()
    .sort_values('count', ascending=False, kind='stable')
    .reset_index(drop=True)
)
# Long format (one row per bin and day) for the top_num busiest bins.
df_top = df_sum[['BinNo']].head(top_num).merge(df_dotw, on='BinNo', how='left')

# Wide format: one row per bin, one count column per weekday plus the weekly total.
df_join = (
    df_dotw.assign(dotw=df_dotw['dotw'].astype(str))
    .pivot(index='BinNo', columns='dotw', values='count')
    .reindex(columns=day_order)
    .set_axis(weekday_cols, axis=1)
    .reset_index()
    .merge(df_sum.rename(columns={'count': 'count_tot'}), on='BinNo')
    .sort_values('count_tot', ascending=False, kind='stable')
)

# Per-weekday mean and sample standard deviation across the top_num busiest bins.
df_join_top = df_join.head(top_num)
df_stats = pd.DataFrame({
    'dotw': day_order,
    'mean': df_join_top[weekday_cols].mean().to_numpy(),
    'std': df_join_top[weekday_cols].std().to_numpy(),
})

fig = go.Figure()

# One line per bin, in busiest-first order.
for bin_no, df_bin in df_top.groupby('BinNo', sort=False):
    df_bin = df_bin.sort_values('dotw')  # draw each line Monday -> Sunday
    fig.add_trace(go.Scatter(
        x=df_bin['dotw'],
        y=df_bin['count'],
        name=f'Bin {bin_no}',
        mode='lines+markers',
        marker=dict(symbol='circle'),
        line=dict(width=1),
    ))

fig.add_trace(go.Scatter(
    x=df_stats['dotw'],
    y=df_stats['mean'],
    # The error bars span STD_MULTIPLIER standard deviations; the legend says so.
    name=f'MEAN ± {STD_MULTIPLIER}·STD',
    mode='markers',
    marker=dict(color='black', symbol='diamond', size=10),
    legendrank=1,
    error_y=dict(
        type='data',
        array=STD_MULTIPLIER * df_stats['std'],
        visible=True,
    ),
))
fig.update_layout(
    title=f'Presentations per day of week - top {top_num} bins, installation {installation}',
    xaxis_title='day of week',
    yaxis_title='presentations',
)
fig.show()
display(df_join.head(top_num))

In [0]:
import matplotlib.pyplot as plt
from sklearn.cluster import KMeans
import pandas as pd
import numpy as np

# Cluster content codes by how often they occur and report the high-throughput cluster.
N_CLUSTERS = 2
KMEANS_SEED = 0  # fixed so the clustering (and the printed groupings) is reproducible

# FIXME: this cell needs one row per content code with 'content_code' and 'count' columns,
# but the selection below only has local_installation_timestamp, tag and values, so looking
# up row['content_code'] fails on the first row. Point df_cb_content at the intended
# content-code counts; until then, stop with a clear message before collecting anything.
df_cb_content = df.select(['local_installation_timestamp', 'tag', 'values'])
missing_cols = {'content_code', 'count'} - set(df_cb_content.columns)
if missing_cols:
    raise ValueError(f"df_cb_content is missing required column(s): {sorted(missing_cols)}")

data = tuple(
    (row['content_code'], row['count'])
    for row in df_cb_content.select('content_code', 'count').collect()
)
print(len(data))
if len(data) == 0:
    dbutils.notebook.exit("no data")
if len(data) < N_CLUSTERS:
    # KMeans.fit needs at least n_clusters samples.
    dbutils.notebook.exit(f"only {len(data)} content code(s); need at least {N_CLUSTERS} to cluster")

content_code, count = zip(*data)

# NOTE(review): content_code and count are both used as unscaled features, so distances are
# dominated by whichever spans the larger range - confirm this is intended.
kmeans = KMeans(n_clusters=N_CLUSTERS, n_init=10, random_state=KMEANS_SEED)
kmeans.fit(data)
centers = kmeans.cluster_centers_
grouping = kmeans.labels_
# KMeans numbers its clusters arbitrarily, so pick the high-throughput cluster by the count
# of its centre instead of assuming it is label 1.
high_label = int(np.argmax(centers[:, 1]))

fig, axs = plt.subplots(1, 2)

axs[0].scatter(content_code, count, c=grouping, s=10)
axs[0].scatter(centers[:, 0], centers[:, 1], c='red', s=20, alpha=0.5, marker='X')
axs[0].set_title('content codes by cluster')
axs[0].set_xlabel('content code')
axs[0].set_ylabel('count')

axs[1].scatter(centers[:, 0], centers[:, 1], c='red', s=20, alpha=0.5, marker='X')
axs[1].set_title('cluster centres')
axs[1].set_xlabel('content code')
axs[1].set_ylabel('count')

for ax in axs:
    ax.set_yscale('log')
    # A log axis cannot start at 0, so only cap the top and let matplotlib pick the bottom.
    ax.set_ylim(top=1e9)
    ax.grid(True, which='both', linestyle='--', linewidth=0.5)

plt.tight_layout()
plt.show()

high_throughput = [code for code, label in zip(content_code, grouping) if label == high_label]
print('content_code:', content_code)
print('count:', count)
print('groupings:', grouping)
print('high throughput content codes:', high_throughput)

