In [1]:
import pandas as pd
# import fireducks.pandas as pd
import numpy as np
import re
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta
from pandas.api.types import is_numeric_dtype
from pymongo import MongoClient
import json

import warnings
warnings.filterwarnings('ignore')

In [2]:
def query_mongodb(client, collection: str = '', query_json: dict = None, query_project: dict = None, skip: int = 0, limit: int = 0, db_name: str = 'facsat2'):
    """Run a ``find`` query against one collection and return the documents as a list.

    Parameters
    ----------
    client
        Object indexable as ``client[db_name][collection]`` whose result exposes
        ``find(filter, projection)`` (a ``pymongo.MongoClient`` in this notebook).
    collection : str
        Name of the collection to query.
    query_json : dict, optional
        Filter document. ``None`` (the default) means "match everything".
    query_project : dict, optional
        Projection document. ``None`` or an empty dict requests full documents.
    skip : int
        Number of documents to skip before returning results.
    limit : int
        Maximum number of documents to return; ``0`` is passed through unchanged.
    db_name : str
        Name of the database to read from.

    Returns
    -------
    list
        The documents produced by the cursor.

    Raises
    ------
    Exception
        Any error raised while building or consuming the cursor is reported on
        stdout and then re-raised unchanged.
    """
    # Fresh objects per call instead of shared mutable default arguments.
    filter_doc = {} if query_json is None else query_json
    # An empty projection is normalised to None so that every driver version
    # returns whole documents (older PyMongo rewrote {} to {"_id": 1}).
    projection = query_project or None

    try:
        cursor = client[db_name][collection].find(filter_doc, projection).skip(skip).limit(limit)
        # The cursor is lazy: the server is only contacted while iterating, so the
        # materialisation has to happen inside the try block to be covered by it.
        return list(cursor)
    except Exception as e:
        print("No fue posible realizar la consulta\nError:\n")
        print(e)
        # Re-raise so callers never mistake a failed query for an empty result.
        raise

In [3]:

def _apply_validity_rules(values: pd.Series, valid_rules, invalid_rules):
    """Filter one column by its data-dictionary rule and count the rejected values.

    Returns ``(filtered_values, rejected_count)``. ``rejected_count`` is ``np.nan``
    when no rule exists for the column or when the rule cannot be interpreted.
    """
    try:
        if len(valid_rules) > 0:
            # The rule is stored as a JSON list of allowed values; only the first
            # distinct rule is used.
            keep = values.isin(json.loads(valid_rules[0]))
        elif len(invalid_rules) > 0:
            # Sentinel value(s) marking invalid readings; nulls are kept.
            keep = ~values.isin(invalid_rules)
        else:
            return values, np.nan
    except (TypeError, ValueError):
        # Malformed JSON or a rule that is not list-like: leave the column as is.
        return values, np.nan

    # count() skips nulls, so missing readings are not reported as invalid.
    return values[keep], values[~keep].count()


def _describe_values(values: pd.Series) -> dict:
    """Return min/max and, for numeric columns, mean, quartiles and population variance."""
    if is_numeric_dtype(values):
        present = values.dropna()
        return {
            'min': np.round(values.min(), 3),
            'max': np.round(values.max(), 3),
            'promedio': np.round(present.mean(), 3),
            # Series.quantile yields NaN for an empty column.
            'Q1': np.round(present.quantile(0.25), 3),
            'mediana': np.round(present.median(), 3),
            'Q3': np.round(present.quantile(0.75), 3),
            # ddof=0 keeps the population variance that np.var produced.
            'varianza': np.round(present.var(ddof=0), 3),
        }
    return {
        'min': values.min(),
        'max': values.max(),
        'promedio': np.nan,
        'Q1': np.nan,
        'mediana': np.nan,
        'Q3': np.nan,
        'varianza': np.nan,
    }


def column_summary(df: pd.DataFrame, system, node, tabla, data_info):
    """Build a one-row-per-column descriptive summary of ``df``.

    Each column (except ``"Tabla"``, which is ignored) is filtered with the rule
    found in ``data_info`` for the same ``Nodo``/``Tabla``/``Variable``: either a
    JSON list of allowed values (``Valido``) or sentinel value(s) (``Invalido``).
    Statistics are computed on the values that pass the rule.

    Every column is filtered independently, so rejecting a value in one column
    never removes readings from another column's statistics. The caller's
    ``df`` is not modified.

    Parameters
    ----------
    df : pd.DataFrame
        Data to summarise.
    system, node, tabla
        Labels copied to the ``sistema``/``nodo``/``tabla`` output columns;
        ``node`` and ``tabla`` also select the rules in ``data_info``.
    data_info : pd.DataFrame
        Data dictionary with columns ``Variable``, ``Nodo``, ``Tabla``,
        ``Valido`` and ``Invalido``.

    Returns
    -------
    pd.DataFrame
        One row per column with dtype, null/non-null/distinct counts, number of
        invalid values (NaN when no usable rule exists), min, max, mean,
        quartiles and variance (NaN for non-numeric columns).

    Raises
    ------
    KeyError
        If ``data_info`` lacks one of the required columns.
    """
    # Work on a copy without the table marker; tolerate frames that lack it.
    data = df.drop(columns=["Tabla"], errors="ignore")

    # Rules for this node/table are selected once instead of once per column.
    table_rules = data_info.loc[(data_info['Nodo'] == node) & (data_info['Tabla'] == tabla)]

    summary_data = []
    for col_name in data.columns:
        col_rules = table_rules.loc[table_rules['Variable'] == col_name]
        valid_rules = col_rules['Valido'].dropna().unique()
        invalid_rules = col_rules['Invalido'].dropna().unique()

        values, invalidos = _apply_validity_rules(data[col_name], valid_rules, invalid_rules)

        summary_data.append({
            'sistema': system,
            'nodo': node,
            'tabla': tabla,
            'variable': col_name,
            'tipo': values.dtype,
            'num_de_nulos': values.isnull().sum(),
            'num_de_no_nulos': values.notnull().sum(),
            'num_de_val_distintos': values.nunique(),
            'num_de_val_invalidos': invalidos,
            **_describe_values(values),
        })

    return pd.DataFrame(summary_data)

In [4]:
def crear_dataframe(data):
    """Turn a list of telemetry documents into a wide, one-row-per-date frame.

    The documents are flattened with ``pd.json_normalize``; ``Ts`` is converted
    to a local-time ``Date`` and ``Val`` is pivoted so that every parameter
    (``Param.Name``, optionally suffixed with its 1-based ``Param.Index``)
    becomes a column. A ``Tabla`` column holding the first ``Param.Table``
    value is appended. An empty frame is returned, after printing a notice,
    when ``data`` yields no rows.
    """
    records = pd.json_normalize(data)

    # Guard clause: nothing to reshape.
    if len(records) == 0:
        print("No hay informacion de telemetria!!")
        return pd.DataFrame()

    # Epoch seconds -> datetime
    records["Date"] = records["Ts"].map(datetime.fromtimestamp)

    if "Param.Index" not in records.columns:
        # Scalar parameters only: one column per parameter name
        wide = records.pivot_table(index="Date", columns=["Param.Name"], values="Val")
    else:
        # Shift existing indices to 1-based; a missing index becomes 0
        records["Param.Index"] = records["Param.Index"].add(1).fillna(0).astype(int)

        wide = records.pivot_table(index="Date", columns=["Param.Name", "Param.Index"], values="Val")

        # Flatten the (name, index) pairs; index 0 means "no suffix"
        flat_names = []
        for name, idx in wide.columns:
            flat_names.append(f"{name}_{idx}" if idx > 0 else f"{name}")
        wide.columns = flat_names

    # Tag every row with the table number
    wide['Tabla'] = records["Param.Table"].unique()[0]

    # Expose 'Date' as a regular column
    return wide.reset_index()


In [5]:
# End of the analysis window (alternative used before: datetime(2023, 3, 16, 0, 0))
ultima_fecha = datetime(year=2024, month=8, day=25, hour=0, minute=0)
# Start of the window: 30 days before the end date
un_mes_atras = ultima_fecha + timedelta(days=-30)
# Window start as an integer POSIX timestamp
timestamp_mes_atras = int(datetime.timestamp(un_mes_atras))


# Descripcion de los datos

In [None]:
# Connect to MongoDB
client = MongoClient("mongodb://localhost:27017/", connectTimeoutMS=30000)

# Number of documents requested per query
batch_size = 1000000

# Load the data dictionary, which holds the valid / invalid values of every variable
dic_datos = pd.read_csv("outputs/diccionario_de_datos.csv")
# Shift existing indices to 1-based and use 0 for "no index", the same convention
# crear_dataframe applies when it names the pivoted columns.
dic_datos.loc[dic_datos['Indice'] >= 0, 'Indice'] += 1
dic_datos['Indice'] = dic_datos['Indice'].fillna(0)
# Append "_<index>" to indexed variables so the names match the DataFrame columns
has_index = dic_datos['Indice'] > 0
dic_datos.loc[has_index, 'Variable'] = (
    dic_datos.loc[has_index, 'Variable'] + "_" + dic_datos.loc[has_index, 'Indice'].astype(int).astype(str)
)

# One row per (system, node, table) combination present in the dictionary;
# the aggregated 'Indice' value itself is not used.
loop_over_nodes = dic_datos.groupby(by=['Sistema','Nodo', 'Tabla'], as_index=False)['Indice'].mean().sort_values(by=['Nodo', 'Tabla'])
# loop_over_nodes = loop_over_nodes[0:2] # Quick test on a subset

# Summaries are collected in a list and concatenated once at the end
node_summaries = []

try:
    # Iterate over the node/table pairs and read each one in batches
    for sistema, num_nodo, num_tabla in zip(loop_over_nodes.Sistema, loop_over_nodes.Nodo, loop_over_nodes.Tabla):
        skip_count = 0
        print(f"Nodo {sistema}: {num_nodo}, Tabla # {num_tabla}")
        node_data_frames = []  # DataFrames built from each batch

        while True:
            # NOTE(review): skip/limit paging is issued without an explicit sort;
            # confirm the server returns a stable order across batches.
            node_dict = query_mongodb(client=client, collection='ParamData', query_json={"Param.Node": num_nodo, "Param.Table": num_tabla}, limit=batch_size, skip=skip_count)

            # Stop when the batch is empty
            if not node_dict:
                break

            # Convert the batch to a pandas DataFrame and keep it
            node_data_frames.append(crear_dataframe(node_dict))

            # A short batch is the last one: skip the extra empty query
            if len(node_dict) < batch_size:
                break

            skip_count += batch_size

        if node_data_frames:
            # Merge all batches of this node/table into a single DataFrame
            combined_node_df = pd.concat(node_data_frames)

            # Summarise once over the combined data
            node_summaries.append(column_summary(combined_node_df, sistema, num_nodo, num_tabla, dic_datos))
finally:
    # Release the connection even if a query or a summary fails
    client.close()

final_data_resume = pd.concat(node_summaries) if node_summaries else pd.DataFrame()
# Written next to the data dictionary: the relative "outputs/" directory used for
# the input above, not the filesystem root.
final_data_resume.to_csv("outputs/descripcion_de_datos.csv", float_format="%.2f", encoding="utf-8", index=False)

Nodo OBC: 1, Tabla # 4
Nodo OBC: 1, Tabla # 92
Nodo OBC: 1, Tabla # 93
Nodo ADCS: 4, Tabla # 1
Nodo ADCS: 4, Tabla # 4
Nodo ADCS: 4, Tabla # 5
Nodo ADCS: 4, Tabla # 150
Nodo ADCS: 4, Tabla # 151
Nodo ADCS: 4, Tabla # 152
Nodo ADCS: 4, Tabla # 153
Nodo ADCS: 4, Tabla # 154
Nodo ADCS: 4, Tabla # 156
Nodo AX2150: 5, Tabla # 4
Nodo P60 DOCK: 6, Tabla # 4
Nodo P60 PDU 1: 7, Tabla # 4
Nodo P60 PDU 2: 8, Tabla # 4
Nodo P60 ACU: 10, Tabla # 4
Nodo mddvbs2-control-app: 14, Tabla # 4
Nodo mddvbs2-control-app: 14, Tabla # 14
Nodo Payload App: 20, Tabla # 8
Nodo Payload App: 20, Tabla # 9
Nodo Payload App: 20, Tabla # 11
Nodo Payload App: 20, Tabla # 12
Nodo AFE8250: 23, Tabla # 4
