In [1]:
!pip install influxdb-client
!pip install python-telegram-bot==13.7

Collecting influxdb-client
  Downloading influxdb_client-1.48.0-py3-none-any.whl.metadata (65 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m65.6/65.6 kB[0m [31m2.7 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting reactivex>=4.0.4 (from influxdb-client)
  Downloading reactivex-4.0.4-py3-none-any.whl.metadata (5.5 kB)
Downloading influxdb_client-1.48.0-py3-none-any.whl (746 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m746.2/746.2 kB[0m [31m22.2 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading reactivex-4.0.4-py3-none-any.whl (217 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m217.8/217.8 kB[0m [31m13.4 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: reactivex, influxdb-client
Successfully installed influxdb-client-1.48.0 reactivex-4.0.4
Collecting python-telegram-bot==13.7
  Downloading python_telegram_bot-13.7-py3-none-any.whl.metadata (11 kB)
Collecting APScheduler==3.6.3 (from python-telegram-bot==13.7

In [None]:
import pandas as pd
import numpy as np
from influxdb_client import InfluxDBClient
import tensorflow as tf
import joblib
from sklearn.preprocessing import MinMaxScaler
from telegram.ext import Updater, CommandHandler
import logging
import os
import errno
import json
from tensorflow.keras import backend as K

# Configure logging
logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO)
logger = logging.getLogger(__name__)

# Directory holding the pre-trained artifacts: the four forecasting models listed below
# and the VAE files (scaler.joblib, encoder_model.keras, classifier.joblib,
# label_mapping.json) read by predict_with_vae.
# Set the AIRPREDICT_MODEL_DIR environment variable to point at another location
# (e.g. '/home/user/models/') instead of editing this cell.
MODEL_DIR = os.environ.get('AIRPREDICT_MODEL_DIR', '/kaggle/input/airpredict5/keras/default/1/')
MODEL_PATHS = {
    'temperature': os.path.join(MODEL_DIR, 'model_temperature.h5'),
    'humidity': os.path.join(MODEL_DIR, 'model_humidity.h5'),
    'co_ppm': os.path.join(MODEL_DIR, 'model_co_ppm.h5'),
    'dust_density': os.path.join(MODEL_DIR, 'model_dust_density.h5')
}

# Global cache for the forecasting models; filled lazily by load_models_once()
MODELS = None

# KLLossLayer is not registered for Keras serialization; loaders must pass it via custom_objects.
class KLLossLayer(tf.keras.layers.Layer):
    """Pass-through layer that attaches the VAE KL-divergence loss.

    ``call`` unpacks ``inputs`` as ``(z_mean, z_log_var)`` and returns ``inputs``
    unchanged. As a side effect, it registers
    ``mean(-0.5 * mean(1 + z_log_var - z_mean**2 - exp(z_log_var), axis=-1))``
    through ``add_loss``.
    """

    def call(self, inputs):
        mu, log_var = inputs
        kl_terms = 1 + log_var - K.square(mu) - K.exp(log_var)
        kl_along_last_axis = -0.5 * K.mean(kl_terms, axis=-1)
        self.add_loss(tf.reduce_mean(kl_along_last_axis))
        return inputs

    def get_config(self):
        # No extra constructor arguments, so the base config is complete.
        base_config = super().get_config()
        return base_config

def check_training_status():
    """Report whether every artifact needed for inference is available.

    Checks the four forecasting models in MODEL_PATHS and the VAE artifacts that
    predict_with_vae loads from MODEL_DIR. A missing file, or one whose open()
    fails with EACCES/EBUSY, is reported as "training may be in progress".

    Returns:
        (True, "") when every file exists and can be opened for reading; otherwise
        (False, message) describing the first problematic artifact.

    Raises:
        OSError: any other error raised while opening an artifact.
    """
    # predict_with_vae reads these files from MODEL_DIR. Checking them here turns a
    # missing artifact into a status message instead of a load error mid-pipeline.
    vae_artifacts = {
        'vae_scaler': 'scaler.joblib',
        'vae_encoder': 'encoder_model.keras',
        'vae_classifier': 'classifier.joblib',
        'vae_label_mapping': 'label_mapping.json',
    }
    required = dict(MODEL_PATHS)
    required.update(
        {name: os.path.join(MODEL_DIR, filename) for name, filename in vae_artifacts.items()}
    )

    for model_name, path in required.items():
        if not os.path.exists(path):
            logger.warning(f"Model file not found: {path}")
            return False, f"Model file for {model_name} is missing. Training may be in progress."
        try:
            # Best-effort lock probe: it only catches locks that make open() fail
            # with EACCES/EBUSY.
            with open(path, 'rb'):
                pass
        except OSError as e:
            if e.errno in (errno.EACCES, errno.EBUSY):
                logger.warning(f"Model file is locked: {path}")
                return False, f"Model file for {model_name} is locked. Training may be in progress."
            raise
    return True, ""

def connect_to_influxdb():
    """Create an InfluxDB client and return its query API.

    Connection settings come from the environment so that no credential is stored in
    the notebook:
      * INFLUXDB_TOKEN (required)
      * INFLUXDB_URL (optional, defaults to the previous cloud endpoint)
      * INFLUXDB_ORG (optional, defaults to the previous org id)

    Raises:
        RuntimeError: if INFLUXDB_TOKEN is not set.
    """
    # SECURITY: a token was previously hard-coded here and is still present in the
    # notebook history, so it should be revoked and rotated.
    token = os.environ.get("INFLUXDB_TOKEN")
    if not token:
        raise RuntimeError(
            "INFLUXDB_TOKEN environment variable is not set; cannot connect to InfluxDB."
        )
    client = InfluxDBClient(
        url=os.environ.get("INFLUXDB_URL", "https://us-east-1-1.aws.cloud2.influxdata.com"),
        token=token,
        org=os.environ.get("INFLUXDB_ORG", "f4ea8e890f77d114"),
    )
    # NOTE(review): only query_api is returned, so the client is never closed and a
    # new one is created on every call.
    return client.query_api()

def fetch_data(query_api):
    """Run the fixed Flux query against ``query_api`` and return its result.

    The query reads the last 7 hours of measurement "environment_data" from the
    "ESP32" bucket and returns every field except co2_ppm.

    Args:
        query_api: object exposing ``query(flux)``, e.g. the one returned by
            connect_to_influxdb().

    Returns:
        The value returned by ``query_api.query`` (the InfluxDB table list).
    """
    flux = '''
    from(bucket: "ESP32")
      |> range(start: -7h)
      |> filter(fn: (r) => r._measurement == "environment_data")
      |> filter(fn: (r) => r._field != "co2_ppm")
    '''
    return query_api.query(flux)

def process_influxdb_data(tables):
    """Flatten InfluxDB query tables into one row per (time, location).

    Each record's field name becomes a column holding its value. Duplicate
    (time, location, field) entries are combined by pivot_table's default
    aggregation (mean).

    Args:
        tables: iterable of tables, each with ``records`` exposing ``get_time()``,
            ``get_field()``, ``get_value()`` and a ``values`` dict.

    Returns:
        DataFrame with ``time`` and ``location`` columns plus one column per field.

    Raises:
        ValueError: if the query returned no records. Pivoting an empty frame would
            otherwise fail with an opaque KeyError.
    """
    records = [
        {
            "time": record.get_time(),
            "field": record.get_field(),
            "value": record.get_value(),
            "location": record.values.get("location", None),
        }
        for table in tables
        for record in table.records
    ]
    if not records:
        raise ValueError("No data returned from InfluxDB for the requested time range.")

    # NOTE(review): records without a location tag are presumably dropped by the
    # pivot (NaN index key); confirm that every ESP32 point carries "location".
    df = pd.DataFrame(records)
    return df.pivot_table(
        index=["time", "location"],
        columns="field",
        values="value"
    ).reset_index()

def _minmax_normalize(values):
    """Scale values to [0, 1]; empty, all-NaN or constant input maps to zeros instead of NaN."""
    arr = np.asarray(values, dtype=float)
    if arr.size == 0 or np.all(np.isnan(arr)):
        return np.zeros_like(arr)
    lo, hi = np.nanmin(arr), np.nanmax(arr)
    span = hi - lo
    if span == 0:
        return np.zeros_like(arr)
    return (arr - lo) / span


def preprocess_dataframe(df, rng=None):
    """Clean the pivoted sensor frame and derive the model features.

    Steps:
      1. Convert ``time`` to Asia/Ho_Chi_Minh and keep the 300 most recent rows in
         chronological order.
      2. Add date/hour/minute columns.
      3. Fill gaps in temperature, humidity, co_ppm and dust_density; zero dust
         readings are treated as missing.
      4. Add the SPKT location features.
      5. Replace co_ppm with a blend of measured CO and a synthetic
         temperature/humidity-driven signal, rescaled to [0.65, 3.72].

    Args:
        df: frame from process_influxdb_data with at least ``time`` (tz-aware),
            ``location``, ``temperature``, ``humidity`` and ``dust_density`` columns.
            It is not modified.
        rng: optional random source with ``normal``/``uniform`` methods (e.g.
            ``np.random.default_rng(seed)``) for reproducible synthetic CO. Defaults
            to the global ``np.random`` state.

    Returns:
        New DataFrame with columns time, date, hour, minute, location, temperature,
        humidity, co_ppm, dust_density, Proximity_to_Industrial_Areas, Population_Density.
    """
    rng = np.random if rng is None else rng

    # Convert timezone and keep the newest 300 rows in chronological order.
    # Work on a copy so the caller's frame is left untouched.
    df = df.copy()
    df['time'] = pd.to_datetime(df['time']).dt.tz_convert('Asia/Ho_Chi_Minh')
    df = df.sort_values(by='time', ascending=False).head(300).sort_values(by='time').copy()

    # Create date and time features
    df['date'] = df['time'].dt.date
    df['hour'] = df['time'].dt.hour
    df['minute'] = df['time'].dt.minute

    # Interpolate missing values
    for column in ('temperature', 'humidity'):
        df[column] = df[column].interpolate().ffill().bfill()

    # Add location-based features (NaN for any location other than 'SPKT').
    # NOTE(review): prepare_rf_input uses 10 / 500 for these features; confirm
    # which values the downstream classifier expects.
    is_spkt = df['location'] == 'SPKT'
    df['Proximity_to_Industrial_Areas'] = np.where(is_spkt, 5, np.nan)
    df['Population_Density'] = np.where(is_spkt, 3500, np.nan)

    # Normalize temperature and humidity for CO generation
    temp_norm = _minmax_normalize(df['temperature'])
    humid_norm = _minmax_normalize(df['humidity'])

    # Generate synthetic CO data.
    # NOTE(review): random noise is blended into the co_ppm feature, so this input is
    # partly fabricated and differs between runs unless a seeded rng is passed.
    n = len(df)
    a, b, c = 1.5, 0.5, 0.3
    noise = rng.normal(0, 0.15, n)
    extra_noise = rng.uniform(-0.1, 0.1, n)
    fake_data = a * (temp_norm ** 2) + b * temp_norm - c * humid_norm + noise + extra_noise

    # Normalize and skew the CO data
    fake_data = np.power(_minmax_normalize(fake_data), 2.)

    # Handle original CO data. Gaps are filled like the other sensors so that a
    # single missing reading cannot turn the whole normalized series into NaN.
    if 'co_ppm' in df.columns:
        original_co = (
            df['co_ppm'].interpolate().ffill().bfill().fillna(0.0).to_numpy(dtype=float)
        )
    else:
        original_co = np.zeros(n)
    # An all-zero (or constant) CO series normalizes to zeros.
    normalized_co = _minmax_normalize(original_co)
    combined_data = normalized_co * 0.5 + fake_data * 0.5

    min_val, max_val = 0.65, 3.72
    scaled_data = min_val + (max_val - min_val) * _minmax_normalize(combined_data)
    df['co_ppm'] = np.clip(scaled_data, min_val, max_val)

    # Process dust_density: zero readings are treated as missing.
    df['dust_density'] = df['dust_density'].replace(0, np.nan).interpolate().ffill().bfill()

    # Select relevant columns
    return df[[
        'time', 'date', 'hour', 'minute', 'location',
        'temperature', 'humidity', 'co_ppm', 'dust_density',
        'Proximity_to_Industrial_Areas', 'Population_Density'
    ]]

def load_models():
    """Load the four pre-trained forecasting models listed in MODEL_PATHS.

    Returns:
        Tuple of Keras models ordered (temperature, humidity, co_ppm, dust_density).
    """
    load_order = ('temperature', 'humidity', 'co_ppm', 'dust_density')
    return tuple(tf.keras.models.load_model(MODEL_PATHS[key]) for key in load_order)

def load_models_once():
    """Return the forecasting models, loading them only on the first call.

    The tuple produced by load_models() is stored in the module-level MODELS so
    later calls reuse it instead of reading the model files again.
    """
    global MODELS
    if MODELS is not None:
        return MODELS
    MODELS = load_models()
    return MODELS

def scale_features(df):
    """Fit one MinMaxScaler per forecast target on ``df`` and scale that column.

    Returns:
        An 8-tuple: the four fitted scalers (temperature, humidity, co_ppm,
        dust_density), followed by the four scaled column arrays in the same order.
    """
    # NOTE(review): the scalers are refit on the current window rather than loaded
    # from training; confirm the forecasting models were trained with the same scaling.
    targets = ('temperature', 'humidity', 'co_ppm', 'dust_density')
    fitted_scalers = []
    scaled_columns = []
    for target in targets:
        scaler = MinMaxScaler()
        scaled_columns.append(scaler.fit_transform(df[[target]].values))
        fitted_scalers.append(scaler)
    return tuple(fitted_scalers) + tuple(scaled_columns)

def predict_next_timesteps(models, scalers, scaled_data, sequence_length=300, forecast_steps=60):
    """Autoregressively forecast temperature, humidity, co_ppm and dust_density.

    Each variable has its own model. Starting from the last ``sequence_length``
    scaled values, every step predicts the next value, records it in original units,
    and slides that variable's window forward by appending the prediction.

    Args:
        models: (temperature, humidity, co_ppm, dust_density) models exposing
            ``predict(x, verbose=0)``.
        scalers: sequence whose first four items are the matching fitted scalers.
            Further items (e.g. the scaled arrays from scale_features) are ignored.
        scaled_data: the four scaled series, in the same order.
        sequence_length: window length fed to each model.
        forecast_steps: number of future steps to generate.

    Returns:
        List of ``forecast_steps`` dicts with keys 'timestep', 'temperature',
        'humidity', 'co_ppm', 'dust_density' (values inverse-transformed).

    Raises:
        ValueError: if a series holds fewer than ``sequence_length`` values.
    """
    names = ('temperature', 'humidity', 'co_ppm', 'dust_density')
    active_scalers = scalers[:4]

    windows = []
    for name, series in zip(names, scaled_data):
        series = np.asarray(series)
        if len(series) < sequence_length:
            raise ValueError(
                f"Need at least {sequence_length} {name} readings to forecast, got {len(series)}."
            )
        windows.append(series[-sequence_length:].reshape(1, sequence_length, 1))

    predictions = []
    for timestep in range(sequence_length, sequence_length + forecast_steps):
        step = {'timestep': timestep}
        for i, (name, model, scaler) in enumerate(zip(names, models, active_scalers)):
            scaled_pred = model.predict(windows[i], verbose=0)
            step[name] = scaler.inverse_transform(scaled_pred)[0][0]
            # Feed the model's scaled output straight back in. An
            # inverse_transform -> transform round trip would only reproduce it
            # (up to rounding) for an invertible scaler such as MinMaxScaler.
            # np.roll returns a copy, so the caller's arrays are never written to.
            windows[i] = np.roll(windows[i], -1, axis=1)
            windows[i][0, -1, 0] = scaled_pred[0][0]
        predictions.append(step)

    return predictions

def prepare_rf_input(pred, proximity_to_industrial_areas=10, population_density=500):
    """Build the single feature row fed to predict_with_vae.

    Args:
        pred: one forecast step, i.e. a dict with 'temperature', 'humidity',
            'dust_density' and 'co_ppm' keys (e.g. the last item returned by
            predict_next_timesteps).
        proximity_to_industrial_areas: value for the Proximity_to_Industrial_Areas feature.
        population_density: value for the Population_Density feature.

    Returns:
        float32 array of shape (1, 6). Columns are ordered temperature, humidity,
        dust_density, co_ppm, proximity, population density.
    """
    # NOTE(review): preprocess_dataframe assigns 5 / 3500 to these two features for
    # location 'SPKT', whereas the defaults here are 10 / 500. The defaults preserve
    # the original behaviour; confirm which values the VAE classifier was trained on.
    features = [
        pred['temperature'],
        pred['humidity'],
        pred['dust_density'],
        pred['co_ppm'],
        proximity_to_industrial_areas,
        population_density,
    ]
    return np.array(features, dtype=np.float32).reshape(1, -1)

# Loaded VAE artifacts keyed by model directory. Like the MODELS cache used for the
# forecasting models, this avoids reloading the encoder/classifier on every request.
_VAE_ARTIFACTS = {}


def _load_vae_artifacts(model_dir):
    """Return (scaler, encoder, classifier, inverse_label_mapping), loading once per model_dir."""
    artifacts = _VAE_ARTIFACTS.get(model_dir)
    if artifacts is None:
        # joblib.load unpickles arbitrary objects: model_dir must only contain trusted files.
        scaler = joblib.load(os.path.join(model_dir, 'scaler.joblib'))
        encoder = tf.keras.models.load_model(
            os.path.join(model_dir, 'encoder_model.keras'),
            custom_objects={'KLLossLayer': KLLossLayer}
        )
        classifier = joblib.load(os.path.join(model_dir, 'classifier.joblib'))
        with open(os.path.join(model_dir, 'label_mapping.json'), 'r') as f:
            label_mapping = json.load(f)
        # label_mapping maps class name -> encoded label; invert it to decode predictions.
        inverse_label_mapping = {v: k for k, v in label_mapping.items()}
        artifacts = (scaler, encoder, classifier, inverse_label_mapping)
        # Cache only after every artifact loaded successfully.
        _VAE_ARTIFACTS[model_dir] = artifacts
    return artifacts


def predict_with_vae(X_60min, model_dir=MODEL_DIR):
    """Classify feature rows via scaler -> VAE encoder -> classifier.

    Args:
        X_60min: feature matrix as built by prepare_rf_input.
        model_dir: directory containing scaler.joblib, encoder_model.keras,
            classifier.joblib and label_mapping.json.

    Returns:
        List of decoded class names, one per input row.

    Raises:
        Exception: any loading or prediction error, re-raised after logging.
    """
    try:
        scaler, encoder, classifier, inverse_label_mapping = _load_vae_artifacts(model_dir)

        # Standardize, extract latent features, then classify
        X_60min_scaled = scaler.transform(X_60min)
        X_60min_latent = encoder.predict(X_60min_scaled, verbose=0)
        y_pred = classifier.predict(X_60min_latent)

        # Decode predictions to class names
        return [inverse_label_mapping[pred] for pred in y_pred]
    except Exception as e:
        logger.error(f"Error in predict_with_vae: {str(e)}")
        raise

def format_predictions(predictions, y_pred_vae, row_count, df_head):
    """Render the final forecast step and the predicted air-quality class as chat text.

    Only ``predictions[-1]`` and ``y_pred_vae[0]`` are used. ``row_count`` and
    ``df_head`` are accepted for interface compatibility but are not included in
    the message.
    """
    last = predictions[-1]
    quality_label = y_pred_vae[0]
    lines = [
        "Dự đoán 1 giờ sau:",
        f"  Temperature: {last['temperature']:.2f} °C",
        f"  Humidity: {last['humidity']:.2f} %",
        f"  CO_PPM: {last['co_ppm']:.6f}",
        f"  Dust_Density: {last['dust_density']:.6f}",
        '-' * 50,
        f"Dự đoán chất lượng không khí 1 giờ sau: {quality_label}",
    ]
    return "\n".join(lines)

def run_inference():
    """Run the full inference pipeline and return the Telegram reply text.

    Steps:
      1. Verify the model artifacts.
      2. Query the last 7 hours of sensor data.
      3. Preprocess it.
      4. Forecast ahead with the per-variable models.
      5. Classify the final forecast step with the VAE classifier.

    Returns:
        The formatted forecast message, or the status message from
        check_training_status when the models are not ready.
    """
    is_ready, message = check_training_status()
    if not is_ready:
        return message

    query_api = connect_to_influxdb()
    tables = fetch_data(query_api)

    df = preprocess_dataframe(process_influxdb_data(tables))
    row_count = len(df)

    df_model = df[['temperature', 'humidity', 'dust_density', 'co_ppm',
                   'Proximity_to_Industrial_Areas', 'Population_Density']].reset_index(drop=True)
    df_head = df_model.head()

    models = load_models_once()
    scalers = scale_features(df_model)
    # scale_features returns four scalers followed by the four scaled arrays.
    scaled_data = scalers[4:]

    predictions = predict_next_timesteps(models, scalers, scaled_data)
    X_60min = prepare_rf_input(predictions[-1])
    y_pred_vae = predict_with_vae(X_60min)

    return format_predictions(predictions, y_pred_vae, row_count, df_head)

def predict_command(update, context):
    """Handle /predict: acknowledge, run the pipeline, and reply with the result.

    Any exception raised during inference is logged with its traceback and
    reported back to the chat instead of propagating to the dispatcher.
    """
    try:
        logger.info("Received /predict command")
        update.message.reply_text("Running inference, please wait...")
        result = run_inference()
        update.message.reply_text(result)
    except Exception as e:
        # logger.exception records the traceback, which logger.error omitted.
        logger.exception(f"Error during inference: {e}")
        # NOTE(review): str(e) is sent to the chat and may expose internal details
        # (paths, hosts); consider a generic message if the bot is public.
        update.message.reply_text(f"Error during inference: {str(e)}")

def start_command(update, context):
    """Reply to /start with a welcome message and a usage hint. ``context`` is unused."""
    welcome_text = "Welcome to the Air Quality Prediction Bot! Use /predict to get air quality forecasts."
    chat_message = update.message
    chat_message.reply_text(welcome_text)

def main():
    """Register the command handlers, start polling, and block in ``updater.idle()``.

    The bot token is read from the TELEGRAM_BOT_TOKEN environment variable rather
    than being stored in the notebook.

    Raises:
        RuntimeError: if TELEGRAM_BOT_TOKEN is not set.
    """
    # SECURITY: a token was previously hard-coded here and is still in the notebook
    # history, so it should be revoked via @BotFather and replaced.
    bot_token = os.environ.get("TELEGRAM_BOT_TOKEN")
    if not bot_token:
        raise RuntimeError(
            "TELEGRAM_BOT_TOKEN environment variable is not set; cannot start the bot."
        )
    updater = Updater(bot_token, use_context=True)
    dp = updater.dispatcher

    dp.add_handler(CommandHandler("start", start_command))
    dp.add_handler(CommandHandler("predict", predict_command))

    updater.start_polling()
    logger.info("Bot started, listening for commands...")
    updater.idle()

if __name__ == "__main__":
    # Top-level guard: report startup/runtime failures in both the log and the cell output.
    try:
        main()
    except Exception as exc:
        failure_text = f"Error running bot: {exc}"
        logger.error(failure_text)
        print(failure_text)

2025-05-14 05:38:56.193943: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1747201136.383795      35 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1747201136.444489      35 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
I0000 00:00:1747201154.090090     100 gpu_device.cc:2022] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 15513 MB memory:  -> device: 0, name: Tesla P100-PCIE-16GB, pci bus id: 0000:00:04.0, compute capability: 6.0
I0000 00:00:1747201156.277414     118 cuda_dnn.cc:529] Loaded cuDNN version 90300
I0000 00:00:1747201175.156735     117 service.cc:148] XLA service 0x7cd09a34c0b0 initialized for platform CUDA (this does 