In [1]:
import pandas as pd
import requests 
from requests.exceptions import HTTPError
from datetime import datetime, timedelta
from time import time, sleep
from itertools import repeat
import json
from jsondiff import diff
import random
import os
import sys
import boto3
import botocore
import concurrent.futures
import plotly.express as px
import plotly.graph_objects as go
import gzip

import secrets_veson
import secrets_aws
import secrets_proxy

from nonExposedTablesVeson import tablesOutOfScopeVeson
from VIP_data_observability_helper_function import initAWSClient, prepare_folder, getTimeSeries, getMissingTimeSlots, getListFilesDay, getDeltaJobsbMetrics, getAllJobsbMetrics, getDeltaTablesbMetrics, getAllTablesbMetrics, getDeltaJobsSizeMetrics, getAllJobsSizeMetrics,getListTablesCurrentSchema, loadSchema, getListTablesCurrentJob, aggregateJobMetrics, aggregateTableMetrics,drawScatterSingleLine, drawScatterDoubleLine, drawScatterTripleLine

In [2]:
## Designed to run on-premises (analytics data is kept locally); it can be adapted
## to run on AWS Glue or Lambda with the data stored on S3.

# Names of the CSV files that hold the historical observability metrics
historical_job_metrics_file_name = 'historical_job_metrics.csv'
historical_table_metrics_file_name = 'historical_table_metrics.csv'
historical_jobSize_metrics_file_name = 'historical_job_size.csv'

# AWS credentials, target bucket and proxy settings are read from local secrets modules
proxy = secrets_proxy.proxies
bucket = secrets_aws.bucket
aws_secret_access_key = secrets_aws.secret_key_id
aws_access_key_id = secrets_aws.access_key_id

# Build the AWS handles used by the observability helpers
s3, client, BUCKET = initAWSClient(
    aws_access_key_id,
    aws_secret_access_key,
    bucket,
    proxy,
)

# Directory receiving the csv files produced by the observability procedures
# (clear=False is passed so that existing content is presumably kept -- verify in prepare_folder)
folderName = 'csv'
csv_directory = prepare_folder(folderName, clear=False)

### Prepare the timeframe for the telemetry
# All boundaries below are written as '%Y/%m/%d %H:%M'. The format is given explicitly to
# pd.to_datetime because the `infer_datetime_format` flag is deprecated since pandas 2.0.
TIMESTAMP_FORMAT = '%Y/%m/%d %H:%M'

# Prod suffix is dedicated to the post-PoC period (2022). Job configurations are different
Prod_t0 = pd.to_datetime('2022/02/08 08:00', format=TIMESTAMP_FORMAT)
# Naive UTC "now", so that it stays comparable with the naive Prod_t0.
# NOTE(review): datetime.utcnow() is deprecated from Python 3.12; switching to an aware
# datetime requires checking how getTimeSeries compares its bounds first.
Prod_now = datetime.utcnow()
Prod_security = 30 # Minutes buffer to include or not a timeslot

# PoC suffix is dedicated to the PoC August-September 2021 period. Job configurations are
# different and PoC & Prod datasets should therefore not be mixed
PoC_t0 = pd.to_datetime('2021/08/25 15:00', format=TIMESTAMP_FORMAT)
PoC_t1 = pd.to_datetime('2021/09/15 13:00', format=TIMESTAMP_FORMAT)
PoC_security = 30 # Minutes buffer to include or not a timeslot

# Synthetic time slot lists: Prod runs from Prod_t0 up to the current time,
# PoC covers the closed window PoC_t0 -> PoC_t1
Prod_TS = getTimeSeries(Prod_t0, Prod_now, Prod_security)   # Prod time series
PoC_TS = getTimeSeries(PoC_t0, PoC_t1, PoC_security)        # PoC time series


#*********************************************************************
#               OPTIONAL
#*********************************************************************

#### Generate the initial dataframes if the historical CSV files are not yet available for incremental update
## Initial Job Metrics
# getAllJobsbMetrics(BUCKET, ts=Prod_TS, csv=True, csv_directory=csv_directory, filename=historical_job_metrics_file_name)
## Initial Job Completeness Metrics
# getAllTablesbMetrics(client, BUCKET, bucket, tablesOutOfScopeVeson, ts=Prod_TS, csv=True, csv_directory=csv_directory, filename=historical_table_metrics_file_name)
## Initial Job Size Metrics
# getAllJobsSizeMetrics(BUCKET, ts=Prod_TS,csv=True, csv_directory=csv_directory,filename=historical_jobSize_metrics_file_name)


#*********************************************************************
#               BASIC DATA OBSERVABILITY
#*********************************************************************

# Buffer, in minutes, used when checking for missing telemetry records
# (example: a job running at 12 AM is considered as potentially completed at 12:30 AM)
security = 30

### Hourly jobs: list the time slots missing from the job history, then fetch their metrics
missingTimeSlots_job = getMissingTimeSlots(
    csv_directory,
    historical_job_metrics_file_name,
    security,
)
print(f'missingTimeSlots_job: {missingTimeSlots_job}')
jobsbMetrics = getDeltaJobsbMetrics(
    BUCKET,
    csv_directory,
    missingTimeSlots_job,
    historical_job_metrics_file_name,
    security,
    csv=False,
)

### Job completeness: missing tables, schema changes
missingTimeSlots_table = getMissingTimeSlots(
    csv_directory,
    historical_table_metrics_file_name,
    security,
)
print(f'missingTimeSlots_table: {missingTimeSlots_table}')
tablesMetrics = getDeltaTablesbMetrics(
    client,
    BUCKET,
    bucket,
    tablesOutOfScopeVeson,
    ts=missingTimeSlots_table,
    csv=False,
    csv_directory=csv_directory,
    filename=historical_table_metrics_file_name,
    security=security,
)

### Job size fluctuation: reveals changes in snapshot logic or in schema
missingTimeSlots_jobSize = getMissingTimeSlots(
    csv_directory,
    historical_jobSize_metrics_file_name,
    security,
)
print(f'missingTimeSlots_jobSize: {missingTimeSlots_jobSize}')
jobSizeMetrics = getDeltaJobsSizeMetrics(
    BUCKET,
    security,
    ts=missingTimeSlots_jobSize,
    csv=False,
    csv_directory=csv_directory,
    filename=historical_jobSize_metrics_file_name,
)


#*********************************************************************
#               DATAVIZ
#*********************************************************************

## Optional filtering for readability: only the time slots with more than one file are kept.
## NOTE(review): the stated intent is to drop unsuccessful jobs (size = 0), yet the test is
## on the number of files (> 1) -- confirm that this threshold is the intended one.
jobSizeMetrics = jobSizeMetrics[jobSizeMetrics['files'] > 1]

## Aggregation for daily figures
jobsbMetricsAggr = aggregateJobMetrics(jobsbMetrics)
tablesMetricsAggr = aggregateTableMetrics(tablesMetrics)

## Veson VIP products SLA
SLA = 0.994 # (99.4% reliability and accuracy)

## Settings shared by every figure below
plot_layout = dict(xaxis='Time', w=1000, h=400, font=dict(size=11))
sla_label = f'VIP SLA: {SLA}'

## X axes and point counts of the daily aggregates
job_days = pd.to_datetime(jobsbMetricsAggr.index)
table_days = pd.to_datetime(tablesMetricsAggr.index)
n_job_days = len(jobsbMetricsAggr)
n_table_days = len(tablesMetricsAggr)

#*************************************
#   PLOTS
#*************************************

######### VOLUME
# Files Delivered (partitions, schema, scripts included) in Successful Jobs, Hourly
drawScatterSingleLine(
    go, x=jobSizeMetrics['date'], y=jobSizeMetrics['files'], track="Delivered",
    title="Files Delivered (partitions, schema, scripts included) in Successful Jobs, Hourly",
    yaxis='Number of Files Delivered', **plot_layout)

# Size of Files Delivered (partitions, schema, scripts included), Hourly
drawScatterSingleLine(
    go, x=jobSizeMetrics['date'], y=jobSizeMetrics['size'], track="Delivered",
    title="Size of Files Delivered (partitions, schema, scripts included), Hourly",
    yaxis='Size KB', **plot_layout)

######### JOB LEVEL
# Job Delivery Completeness, Averaged Daily
# The SLA line is scaled by the expected number of jobs of each day (same approach as the
# table plots) instead of a fixed 24, so that it never exceeds "Expected" on partial days.
drawScatterTripleLine(
    go, x=job_days,
    y1=jobsbMetricsAggr['jobDelivered'],
    y2=jobsbMetricsAggr['JobExpected'],
    y3=SLA * jobsbMetricsAggr['JobExpected'],
    track1="Delivered", track2='Expected', track3=sla_label,
    title="Job Delivery Completeness, Averaged Daily",
    yaxis='Jobs Delivered', **plot_layout)

# Job Delivery Completeness Ratio, Averaged Daily (y axis is a ratio, not a job count)
drawScatterTripleLine(
    go, x=job_days,
    y1=jobsbMetricsAggr['successRatio'],
    y2=[1] * n_job_days,
    y3=[SLA] * n_job_days,
    track1="Delivered", track2='Expected', track3=sla_label,
    title="Job Delivery Completeness Ratio, Averaged Daily",
    yaxis='Completeness Ratio', **plot_layout)

# Job Delivery Completeness (out of 1, hourly)
drawScatterDoubleLine(
    go, x=pd.to_datetime(jobsbMetrics['date']),
    y1=jobsbMetrics['jobDelivered'],
    y2=[1] * len(jobsbMetrics),
    track1="Delivered", track2='Expected',
    title="Job Delivery Completeness (out of 1, hourly)",
    yaxis='Jobs Delivered', **plot_layout)

######### TABLE LEVEL
# Table Delivery Completeness, Averaged Daily
drawScatterTripleLine(
    go, x=table_days,
    y1=tablesMetricsAggr['tablesDelivered'],
    y2=tablesMetricsAggr['tablesExpected'],
    y3=SLA * tablesMetricsAggr['tablesExpected'],
    track1="Delivered", track2='Expected', track3=sla_label,
    title="Table Delivery Completeness, Averaged Daily",
    yaxis='Number of Files Delivered', **plot_layout)

# Table Delivery Completeness Ratio, Averaged Daily (y axis is a ratio, not a file count)
drawScatterTripleLine(
    go, x=table_days,
    y1=tablesMetricsAggr['completenessRatio'],
    y2=[1] * n_table_days,
    y3=[SLA] * n_table_days,
    track1="Delivered", track2='Expected', track3=sla_label,
    title="Table Delivery Completeness Ratio, Averaged Daily",
    yaxis='Completeness Ratio', **plot_layout)

# Table Delivery Completeness, Hourly
drawScatterTripleLine(
    go, x=tablesMetrics['date'],
    y1=tablesMetrics['tablesDelivered'],
    y2=tablesMetrics['tablesExpected'],
    y3=SLA * tablesMetrics['tablesExpected'],
    track1="Delivered", track2='Expected', track3=sla_label,
    title="Table Delivery Completeness, Hourly",
    yaxis='Number of Files Delivered', **plot_layout)

# Table Delivery Failure, Averaged Daily
# First trace is the count of missing tables; the third one is the share of the expected
# tables that may be missing while still meeting the SLA.
drawScatterTripleLine(
    go, x=table_days,
    y1=tablesMetricsAggr['missingTables'],
    y2=[0] * n_table_days,
    y3=(1 - SLA) * tablesMetricsAggr['tablesExpected'],
    track1="Missing", track2='Expected', track3=sla_label,
    title="Table Delivery Failure, Averaged Daily",
    yaxis='Number of Missing Files', **plot_layout)

# Table Delivery Failure, Hourly
drawScatterTripleLine(
    go, x=tablesMetrics['date'],
    y1=tablesMetrics['missingTables'],
    y2=[0] * len(tablesMetrics),
    y3=(1 - SLA) * tablesMetrics['tablesExpected'],
    track1="Missing", track2='Expected', track3=sla_label,
    title="Table Delivery Failure, Hourly",
    yaxis='Number of Missing Files', **plot_layout)

AWS S3 client created
folder "csv" created
missingTimeSlots_job: ['2022060106', '2022060107']
Saved
missingTimeSlots_table: ['2022053121', '2022053122', '2022053123', '2022060100', '2022060101', '2022060102', '2022060103', '2022060104', '2022060105', '2022060106', '2022060107']
missingTimeSlots_jobSize: ['2022060106', '2022060107']

