In [None]:
import json
import logging
import os
from typing import List

import matplotlib
import matplotlib.pyplot as plt
import pandas as pd

from confluent_kafka import Consumer, KafkaError, KafkaException


In [None]:
import warnings
warnings.filterwarnings('ignore')


# Consume events from Kafka

In [None]:
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Kafka connection settings. The broker address and topic can be overridden through the
# KAFKA_BOOTSTRAP_SERVERS / KAFKA_TOPIC environment variables so the notebook also runs
# outside the original author's machine; the defaults are the previous hard-coded values.
kafka_config = {
    'bootstrap.servers': os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'MBP-tommy:9092'),
    'group.id': 'data_quality_group',
    # Only applies when this group has no committed offset yet.
    # NOTE(review): if offsets are auto-committed, re-running the notebook with the same
    # group.id resumes after the last committed message instead of re-reading the topic.
    'auto.offset.reset': 'earliest',
}
topic = os.environ.get('KAFKA_TOPIC', 'water_quality_data')


In [None]:
# Column list, presumably for the aggregated dataset variant (name suggests so — confirm).
# NOTE(review): nothing below uses cols_agg, and the empty data_df created here is
# replaced by the cols_eqr-based frame in the next cell.
cols_agg = [
    'CountryCode',
    'observedPropertyDeterminandBiologyEQRCode',
    'parameterWaterBodyCategory',
    'parameterNCSWaterBodyType',
    'parameterWFDIntercalibrationWaterBodyType',
    'parameterNaturalAWBHMWB',
    'parameterICStatusOfDeterminandBiologyEQR',
    'parameterBoundaryValueClasses12',
    'parameterBoundaryValueClasses23',
    'parameterBoundaryValueClasses34',
    'parameterBoundaryValueClasses45',
    'procedureBiologicalAnalyticalMethodDescription',
    'resultObservationStatus',
    'Remarks',
    'metadata_versionId',
    'metadata_beginLifeSpanVersion',
    'metadata_statusCode',
    'metadata_observationStatus',
    'metadata_statements',
    'UID',
]

data_df = pd.DataFrame(columns=cols_agg)


In [None]:
# Fields kept from each consumed Kafka message. The consumption cells build every chunk with
# pd.DataFrame(..., columns=cols_eqr), so any message field not listed here is dropped.
cols_eqr = [
    'monitoringSiteIdentifier',
    'monitoringSiteIdentifierScheme',
    'parameterWaterBodyCategory',
    'parameterNCSWaterBodyType',
    'observedPropertyDeterminandBiologyEQRCode',
    'phenomenonTimeReferenceYear',
    'parameterSamplingPeriod',
    'resultEcologicalStatusClassValue',
    'resultNumberOfSamples',
    # Required by the LOQ analysis (fillna + calculate_aboveLOQ_share). Without it the field
    # is discarded at ingestion and those cells fail with KeyError. Messages lacking the
    # field simply get NaN here.
    'resultQualityNumberOfSamplesBelowLOQ',
    'resultEQRValue',
    'resultNormalisedEQRValue',
    'resultObservationStatus',
    'Remarks',
    'metadata_versionId',
    'metadata_beginLifeSpanVersion',
    'metadata_statusCode',
    'metadata_observationStatus',
    'metadata_statements',
    'UID',
]

data_df = pd.DataFrame(columns=cols_eqr)


In [None]:
def consume_messages():
    """Yield the JSON-decoded value of each message on ``topic``, polling indefinitely.

    Uses the module-level ``kafka_config``, ``topic`` and ``logger``.

    - Polls with a 1-second timeout; empty polls and partition-EOF events are skipped.
    - Any other consumer error is logged and ends the generator.
    - Messages with a null value, or whose value is not valid UTF-8 JSON, are logged
      and skipped.
    - A ``KafkaException`` (including one raised while creating the consumer) is logged
      and ends the generator.
    - The consumer is closed whenever the generator finishes, is closed, or raises.
    """
    # Initialised before the try so that `finally` never touches an unbound name when
    # Consumer(...) itself raises.
    consumer = None
    try:
        consumer = Consumer(kafka_config)
        consumer.subscribe([topic])

        while True:
            message = consumer.poll(1.0)
            if message is None:
                continue

            error = message.error()
            if error:
                if error.code() == KafkaError._PARTITION_EOF:
                    continue
                logger.error("Error while consuming: %s", error)
                break

            value = message.value()
            if value is None:
                # json.loads(None) would raise an uncaught TypeError; skip null payloads instead.
                logger.warning("Skipping message with empty value")
                continue

            try:
                data_dict = json.loads(value)
            except (json.JSONDecodeError, UnicodeDecodeError) as e:
                logger.error("Error decoding JSON: %s", e)
                continue

            # Yield outside the try so exceptions thrown into the generator are not
            # mistaken for decode errors.
            yield data_dict

    except KafkaException as e:
        logger.error("Kafka error: %s", e)
    finally:
        if consumer is not None:
            consumer.close()


In [None]:
# Drain consume_messages() into data_df, buffering `chunk_size` records at a time so that
# concat runs once per block rather than once per message. consume_messages() only stops
# on a consumer error, so in practice this runs until the cell is interrupted; rows already
# flushed to data_df are kept, and the leftover buffer is handled by the next cell.
chunk_size = 1000
processed_data_thres = 5000  # emit a progress log line every this many events
chunk = []
processed_data_count = 0

for message_data in consume_messages():
    # A message whose JSON payload is `null` decodes to None and ends consumption.
    if message_data is None:
        break
    chunk.append(message_data)
    processed_data_count += 1

    if len(chunk) >= chunk_size:
        chunk_df = pd.DataFrame(chunk, columns=cols_eqr)
        data_df = pd.concat([data_df, chunk_df], ignore_index=True)
        chunk = []

    if not processed_data_count % processed_data_thres:
        logger.info("Received events: %s", processed_data_count)


In [None]:
# Append the final partial buffer (fewer than chunk_size records) left after consumption stopped.
# The buffer is emptied afterwards so re-running this cell does not append the same rows again,
# and nothing is concatenated when the buffer is already empty.
if chunk:
    chunk_df = pd.DataFrame(chunk, columns=cols_eqr)
    data_df = pd.concat([data_df, chunk_df], ignore_index=True)
    chunk = []


In [None]:
# (number of rows, number of columns) of the consumed data
len(data_df.index), len(data_df.columns)


In [None]:
# Last five rows, transposed so that each column of data_df is displayed as a row.
data_df.tail(5).transpose()


# Data quality analysis

In [None]:
# Take the leading two letters of the monitoring-site identifier as the country code
# (assumed to be a country prefix — confirm against the identifier scheme).
# Identifiers that do not start with two letters, or are missing, give NaN.
# expand=False returns the single capture group as a Series instead of a one-column DataFrame.
data_df['countryCode'] = data_df['monitoringSiteIdentifier'].str.extract(r'^([A-Za-z]{2})', expand=False)


## Share of missing values per column

In [None]:
# Percentage (0-100) of missing values in each column of data_df.
null_percentage = data_df.isna().sum().div(len(data_df)).mul(100)
null_percentage


In [None]:
# Bar chart of null_percentage. Uses the explicit Axes API and ends with plt.show(), like
# plot_share_bar_chart. tight_layout keeps the 90°-rotated column names from being clipped.
fig, ax = plt.subplots(figsize=(10, 6))
ax.bar(null_percentage.index, null_percentage.values, color='skyblue')
ax.set_xlabel('Columns')
ax.set_ylabel('Percentage of Null Values')
ax.set_title('Percentage of Null Values in Each Column')
ax.tick_params(axis='x', labelrotation=90)
ax.grid(axis='y', linestyle='--', alpha=0.7)
fig.tight_layout()
plt.show()


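## Water data quality: share of samples above the LOQ

For each group, the share of samples above the limit of quantification (LOQ) is computed as $1 - \frac{\sum \text{samples below LOQ}}{\sum \text{samples}}$. Missing below-LOQ counts are treated as zero.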

In [None]:
# Treat a missing below-LOQ count as "no samples below the LOQ" (0).
# The result is assigned back rather than using data_df[col].fillna(inplace=True): that form
# is chained assignment, which pandas deprecates and which has no effect under Copy-on-Write.
below_loq_col = "resultQualityNumberOfSamplesBelowLOQ"
if below_loq_col not in data_df.columns:
    raise KeyError(
        f"{below_loq_col!r} is not in data_df; it must be listed in cols_eqr, otherwise it is "
        "dropped when the consumed messages are turned into DataFrames."
    )
data_df[below_loq_col] = data_df[below_loq_col].fillna(0)


In [None]:
def calculate_aboveLOQ_share(df: pd.DataFrame, numerator_col: str, denominator_col: str, group_by_cols: List[str]) -> pd.DataFrame:
    """Compute, per group, the share of samples above the limit of quantification (LOQ).

    For each group of ``group_by_cols``:
    ``share = 1 - sum(numerator_col) / sum(denominator_col)``.

    Args:
        df: input rows; not modified.
        numerator_col: column with per-row counts of samples below the LOQ.
        denominator_col: column with per-row total sample counts.
        group_by_cols: columns to group by. Rows whose key is NaN are excluded
            (pandas groupby default).

    Returns:
        DataFrame with the ``group_by_cols`` key columns followed by
        ``aboveLOQ_share_result``. The share is NaN for groups whose total sample
        count is zero.

    Both count columns are first coerced with ``pd.to_numeric(errors='coerce')``, so
    numeric strings are added as numbers rather than concatenated. Unparseable values
    become NaN and are skipped by ``sum``.
    """
    counts = df.assign(**{
        numerator_col: pd.to_numeric(df[numerator_col], errors='coerce'),
        denominator_col: pd.to_numeric(df[denominator_col], errors='coerce'),
    })
    grouped = counts.groupby(group_by_cols)
    below_loq = grouped[numerator_col].sum()
    total = grouped[denominator_col].sum()
    # A zero total would give -inf (or NaN for 0/0); mark the share as undefined instead.
    share = 1 - below_loq / total.where(total != 0)
    # Name the Series explicitly instead of relying on an unnamed result becoming column 0.
    return share.rename('aboveLOQ_share_result').reset_index()


In [None]:
def plot_share_bar_chart(grouped_data, group_by_cols, share_col):
    """Show a bar chart of ``share_col`` per group, ordered from highest to lowest share.

    Args:
        grouped_data: DataFrame with one row per group, holding the ``group_by_cols`` key
            columns and the ``share_col`` value column (e.g. the output of
            ``calculate_aboveLOQ_share``). Not modified.
        group_by_cols: key columns; their values are joined with ", " to label each bar.
        share_col: column giving the bar heights.

    Displays the figure with ``plt.show()`` and returns None.
    """
    # Sort, then renumber rows 0..n-1 so bar positions follow the sorted order. Plotting
    # against the original index put each bar back at its pre-sort position (and left gaps
    # where rows had been dropped), which silently undid the sort.
    ordered = grouped_data.sort_values(share_col, ascending=False).reset_index(drop=True)
    positions = list(range(len(ordered)))
    # itertuples (rather than apply(axis=1)) still yields a plain list for an empty frame.
    labels = [', '.join(map(str, row)) for row in ordered[group_by_cols].itertuples(index=False)]

    fig, ax = plt.subplots(figsize=(10, 6))
    ax.bar(positions, ordered[share_col], color='skyblue')
    ax.set_xlabel(', '.join(group_by_cols))
    ax.set_ylabel(f'Share of {share_col}')
    ax.set_title(f'Share of {share_col} by {", ".join(group_by_cols)}')
    ax.set_xticks(positions)
    ax.set_xticklabels(labels, rotation=90)
    ax.grid(axis='y', linestyle='--', alpha=0.7)
    fig.tight_layout()

    plt.show()


In [None]:
# Above-LOQ share per country (country code taken from the monitoring-site identifier).
df_aboveLOQ_share_by_countryCode = calculate_aboveLOQ_share(
    data_df,
    numerator_col='resultQualityNumberOfSamplesBelowLOQ',
    denominator_col='resultNumberOfSamples',
    group_by_cols=['countryCode'],
)
plot_share_bar_chart(
    df_aboveLOQ_share_by_countryCode,
    group_by_cols=['countryCode'],
    share_col='aboveLOQ_share_result',
)


In [None]:
# Above-LOQ share per reference year; rows containing NaN (e.g. an undefined share) are
# dropped before plotting.
df_aboveLOQ_share_by_year = calculate_aboveLOQ_share(
    data_df,
    numerator_col='resultQualityNumberOfSamplesBelowLOQ',
    denominator_col='resultNumberOfSamples',
    group_by_cols=['phenomenonTimeReferenceYear'],
).dropna()
plot_share_bar_chart(
    df_aboveLOQ_share_by_year,
    group_by_cols=['phenomenonTimeReferenceYear'],
    share_col='aboveLOQ_share_result',
)


In [None]:
# Above-LOQ share per water-body category.
df_aboveLOQ_share_by_water_body_category = calculate_aboveLOQ_share(
    data_df,
    numerator_col='resultQualityNumberOfSamplesBelowLOQ',
    denominator_col='resultNumberOfSamples',
    group_by_cols=['parameterWaterBodyCategory'],
)
plot_share_bar_chart(
    df_aboveLOQ_share_by_water_body_category,
    group_by_cols=['parameterWaterBodyCategory'],
    share_col='aboveLOQ_share_result',
)


In [None]:
# Above-LOQ share per monitoring-site identifier *scheme*.
# NOTE(review): despite the variable name, this groups by the identifier scheme, not by individual site.
df_aboveLOQ_share_by_monitoring_site = calculate_aboveLOQ_share(
    data_df,
    numerator_col='resultQualityNumberOfSamplesBelowLOQ',
    denominator_col='resultNumberOfSamples',
    group_by_cols=['monitoringSiteIdentifierScheme'],
)
plot_share_bar_chart(
    df_aboveLOQ_share_by_monitoring_site,
    group_by_cols=['monitoringSiteIdentifierScheme'],
    share_col='aboveLOQ_share_result',
)


## Save to Parquet

In [None]:
# Persist the consumed data in the working directory; engine='auto' is pandas' default choice.
data_df.to_parquet('data_df.parquet', engine='auto')
