In [1]:
import pandas as pd
import matplotlib.pyplot as plt
import os

# Read the deployment environment from the `Environment` variable (None when unset)
# and echo it, so the rendered notebook shows which environment it ran against.
# NOTE(review): `env` is not used by the Athena query below, which hardcodes the
# "prod_..." database — confirm that is intended.
env = os.getenv('Environment')
print("Environment: %r" % (env,))

Environment: 'prod'


In [2]:
# (Re)load the project's `jupyter_dmdg` IPython extension.
# NOTE(review): presumably this extension provides the `%%athena_to_df` cell magic used below — confirm.
%reload_ext jupyter_dmdg

#### Part 1: Obtain a dataframe with the process events that a building of a site has in the Data Lake (DL)

##### 1.1. Query process events from the DL

In [3]:
# Selecting these event variables is sufficient to obtain all process events
# In addition, we select site_id and building_id because we will filter based on these two variables

In [4]:
%%athena_to_df --out df_DL
-- Pull the process events from the prepared-data table into the dataframe `df_DL`
-- (the name given after --out). The site/building filter is applied later, in pandas.
-- NOTE(review): there is no WHERE clause, so every site and building is loaded;
-- consider filtering on site_id/building_id here if the magic supports it.
-- NOTE(review): the database is hardcoded to "prod_..." even though the first cell
-- reads the Environment variable — confirm intended.
SELECT
    site_id,
    building_id,
    event_name,
    event_name_full AS recipe_full,
    event_parameter_name AS parameter_name,  
    event_parameter_value AS parameter_value,
    event_start AS start_date,
    event_stop AS end_date
FROM "prod_plant_connectivity_prepared_data"."process_events"

##### 1.2. Enter a specific site_id and a specific building_id

In [5]:
# The resulting dataframe (df) will then only include events for the entered site and building

In [6]:
# Restrict the DL events to one site and one building.
# The filter columns are fixed: the query above selects exactly `site_id` and
# `building_id`, so asking for the column names is unnecessary (a typo there
# would raise a KeyError). Only the values are prompted for.
column_name1 = 'site_id'
column_name2 = 'building_id'
# .strip() guards against stray spaces typed at the prompt; value1/value2 are
# also used later to build the output file paths.
value1 = input("Enter your site_id: ").strip()
value2 = input("Enter your building_id: ").strip()
df_DL_site_building = df_DL[(df_DL[column_name1] == value1) & (df_DL[column_name2] == value2)]
if df_DL_site_building.empty:
    # Not necessarily an error (the whole building could be a datagap), but it can also
    # mean a mistyped id, in which case every source event would later be reported as missing.
    print(f"Warning: no DL process events found for site_id={value1!r}, building_id={value2!r}")

Enter site_id:  site_id
Enter your site_id:  MLE
Enter building_id:  building_id
Enter your building_id:  0V10


##### 1.3. Match the resulting df to the format of the csv containing the process events extracted directly from the sources

In [7]:
# You will be asked for the time difference (in hours) between the site's local time and UTC
# This is needed because the site's timestamp columns are usually filled in local time, while the DL uses UTC

In [8]:
# Shift the DL timestamps (UTC) to the site's local time and turn every column into
# text, so the frame can be joined with the source csv on string keys.
# NOTE: this cell is not idempotent — it overwrites `df_DL_site_building` with the
# shifted, stringified frame, so re-running it alone would fail on the string columns.
# Re-run the previous cell first.
df_DL_site_building = df_DL_site_building.copy()
# float (not int) so fractional offsets such as 5.5 (UTC+05:30) can be entered;
# whole-hour inputs behave exactly as before.
time_difference = float(input("Enter time difference: "))
# NOTE(review): one fixed offset ignores daylight-saving changes inside the analysed period — confirm acceptable.
utc_offset = pd.Timedelta(hours=time_difference)
df_DL_site_building.loc[:, 'start_date'] += utc_offset
df_DL_site_building.loc[:, 'end_date'] += utc_offset
# Presumably cast to float so values stringify like the source csv's numeric column — confirm.
df_DL_site_building.loc[:, 'parameter_value'] = df_DL_site_building['parameter_value'].astype(float)
df_DL_site_building = df_DL_site_building.astype(str)
len(df_DL_site_building)

Enter time difference:  1


141270

#### Part 2: Obtain a df with the process events extracted directly from the data sources (DS)

##### 2.1. Read the file coming from DS and take into account the deduplication of the process events in the DL

In [9]:
# This deduplication means that when two or more process events are identical except for their parameter_ts,
# only one of them is kept in the DL and the rest are deleted

In [10]:
# Parameter_ts is the time when the information became available/was saved in the historian
# Its column name depends on the site (e.g. for Marcy it is DATE_PARAMETRE); some sites do not use this column yet
# That is why you are asked to enter the name of this column (if the site does not use it yet, enter na)

In [11]:
# Load the source csv and give each row a string ID (its original row position),
# so datagaps found later can be mapped back to the untouched source rows.
source_file_to_read = input("Enter your source csv file: ").strip()
df_all_process_events = pd.read_csv(source_file_to_read)
df_all_process_events = df_all_process_events.reset_index().rename(columns={'index': 'ID'})
df_all_process_events['ID'] = df_all_process_events['ID'].astype(str)

# Name of the parameter_ts column. "na" (any case) or an empty answer means the site does not use one.
sort_column = input("Enter the parameter_ts as it appears in the source: ").strip()
if sort_column.lower() in ("na", ""):
    sort_column = "na"  # canonical sentinel checked by later cells

if sort_column != "na":
    # Latest parameter_ts first, so drop_duplicates(keep='first') keeps the most recent copy.
    # A stable sort makes the kept row (and thus its ID) deterministic when copies share a parameter_ts.
    # NOTE(review): values are sorted as read from the csv (strings), which is chronological
    # only for ISO-like formats — confirm.
    df_all_process_events = df_all_process_events.sort_values(
        by=sort_column, ascending=False, kind='mergesort'
    )
# Mimic the DL deduplication: events identical on these columns collapse into one row.
df_all_process_events = df_all_process_events.drop_duplicates(
    subset=['RECIPE_FULL', 'PARAMETER_NAME', 'PARAMETER_VALUE', 'START_DATE', 'END_DATE'],
    keep='first',
)
len(df_all_process_events)

Enter your source csv file:  MLE/0V10/MLE_0V10.csv
Enter the parameter_ts as it appears in the source:  DATE_PARAMETRE


5182

##### 2.2. Make sure that the process events come from mapped equipment

In [12]:
# Streaming Engine's Confluence space has a page that explains how to extract this information

In [None]:
# Keep only the source events whose equipment is listed in the mapped-equipment export
# (presumably only mapped equipment feeds the DL, so other events cannot be datagaps).
mapped_equipment_file_to_read = input("Enter your mapped equipment file: ")
df_mapped_equipment = pd.read_csv(mapped_equipment_file_to_read)
is_mapped = df_all_process_events['EQUIPMENT_NAME'].isin(df_mapped_equipment['EQUIPMENT_NAME'])
df_DS = df_all_process_events.loc[is_mapped]
len(df_DS)

##### 2.3. Adapt the resulting df to the format of the df with the process events extracted from the DL

In [None]:
# Align the source frame with the DL frame: lower-case column names (the DL query's
# aliases are lower-case) and compare every value as text.
df_DS.columns = df_DS.columns.str.lower()
df_DS = df_DS.astype(str)
# Strip a trailing fractional-seconds suffix (e.g. '.000' after HH:MM:SS), so source
# timestamps look like the DL's stringified ones (presumably 'YYYY-MM-DD HH:MM:SS').
# Unlike slicing off a fixed 4 characters, this works for any number of fractional
# digits. It is also a no-op on already-trimmed values, so re-running the cell is safe.
for timestamp_column in ('start_date', 'end_date'):
    df_DS[timestamp_column] = df_DS[timestamp_column].str.replace(r'(?<=:\d{2})\.\d+$', '', regex=True)
len(df_DS)

#### Part 3: Obtain a csv with datagaps only

##### 3.1. See if the process events extracted directly from the source are in the df extracted from the DL or not

In [None]:
# Left-join every source event onto the DL events of the selected site/building using
# the columns both frames share. Source events without a DL counterpart get NaN in
# the DL-only columns (e.g. event_name).
join_keys = ['recipe_full', 'parameter_name', 'parameter_value', 'start_date', 'end_date']
df_merge = df_DS.merge(df_DL_site_building, how='left', on=join_keys)

In [None]:
# Keep only those process events that do not appear in the DL 

In [None]:
# A source event is a datagap when the left join found no DL row for it, i.e. the
# DL-only column event_name is NaN (matched DL rows were stringified earlier, so
# they never carry NaN there). That helper column is dropped afterwards.
is_missing_in_DL = df_merge['event_name'].isna()
datagaps = df_merge.loc[is_missing_in_DL].drop(columns='event_name')
len(datagaps)

##### 3.2. Put the datagaps df in the same format as the input file with the source data

In [None]:
# Map the datagaps back to the source rows through the shared ID (`ID` in the source
# frame, lower-cased to `id` in df_DS), then drop that helper column so `result`
# has the same columns as the source csv.
mask = df_all_process_events['ID'].isin(datagaps['id'])
result = df_all_process_events.loc[mask].drop(columns='ID')
len(result)

##### 3.3. Keep only datagaps whose end_date is greater than or equal to parameter_ts

In [None]:
# The reason is that a process event is defined by a name, a start date and an end date
# If the parameter_ts is later than the end date, there is indeed a datagap,
# but its root cause is not Streaming Engine but a lower level
# This step only applies if the site uses the parameter_ts variable

In [None]:
# Keep the datagaps whose parameter_ts is not later than END_DATE; the others point to
# a root cause below Streaming Engine. Skipped when the site has no parameter_ts column.
if sort_column.strip().lower() != "na":
    # Compare as datetimes rather than raw csv strings: a string comparison is only
    # chronological when both columns share an identical ISO-like format.
    # Rows with an unparseable/missing date compare as False and are dropped.
    param_ts = pd.to_datetime(result[sort_column])
    end_ts = pd.to_datetime(result['END_DATE'])
    filtered_result = result[param_ts <= end_ts]
else:
    filtered_result = result
len(filtered_result)

In [None]:
# To get a csv with all the datagaps for the building of the site in question

In [None]:
# Write every datagap to <site_id>/<building_id>/datagaps_<site_id>_<building_id>.csv.
# The folder is created if it does not exist yet (the source csv may live elsewhere).
# The DataFrame index (each row's 0-based position in the source csv) is written
# as the first, unnamed column.
file_path = f'{value1}/{value2}/datagaps_{value1}_{value2}.csv'
os.makedirs(os.path.dirname(file_path), exist_ok=True)
filtered_result.to_csv(file_path)

In [None]:
# To represent the number of datagaps per equipment

In [None]:
# Bar chart of the number of datagaps per equipment, saved as a PNG next to the datagaps csv.
var1 = filtered_result['EQUIPMENT_NAME'].value_counts()
fig, ax = plt.subplots(figsize=(10, 5))
var1.plot(kind='bar', ax=ax)
ax.set_xlabel('Equipment name')
ax.set_ylabel('Number of datagaps')
ax.set_title('Number of datagaps per equipment')
fig_path = f'{value1}/{value2}/datagaps_{value1}_{value2}_per_equipment.png'
# bbox_inches='tight' keeps long, rotated equipment names from being clipped in the PNG.
fig.savefig(fig_path, bbox_inches='tight')
# Close the figure so it is not rendered inline nor kept in memory.
plt.close(fig)

In [None]:
# To represent the number of datagaps vs the day they were stored in the historian

In [None]:
# Daily bar chart of datagaps by the day their parameter_ts was stored in the historian.
# Uses the parameter_ts column entered earlier (sort_column), whose name varies by site,
# and is skipped when the site has no parameter_ts.
if sort_column.strip().lower() != "na":
    # Work on a re-indexed copy: `filtered_result` stays unchanged, so the cell can be re-run.
    datagaps_by_ts = filtered_result.copy()
    datagaps_by_ts[sort_column] = pd.to_datetime(datagaps_by_ts[sort_column])
    datagaps_by_ts = datagaps_by_ts.set_index(sort_column)
    # size() counts rows (one per datagap), including rows whose BATCH_ID is missing.
    daily_counts = datagaps_by_ts.groupby(pd.Grouper(freq='D')).size()
    fig, ax = plt.subplots(figsize=(20, 10))
    ax.bar(daily_counts.index, daily_counts.values)
    ax.set_xlabel('Date parameter day')
    ax.set_ylabel('Number of datagaps')
    ax.set_title('Number of datagaps per date parameter day')
    fig_path = f'{value1}/{value2}/datagaps_{value1}_{value2}_per_day.png'
    fig.savefig(fig_path)
    plt.close(fig)
else:
    print("No parameter_ts column for this site: skipping the per-day plot")

In [None]:
# Percentage of missing data 

In [None]:
# Datagaps as a percentage of all process events in the source csv. The denominator is
# the row count after the deduplication step, not of the raw file.
# NOTE(review): the denominator also includes events from unmapped equipment, while
# datagaps only come from mapped equipment — confirm this is the intended ratio.
total_events = len(df_all_process_events)
missing_pct = round(len(filtered_result) * 100 / total_events, 2) if total_events else 0.0
print("Considering total number of rows of the source csv after deduplication as total number of process events and only datagaps when date parameter is lower than or equal to end date: " + str(missing_pct) + " %")