In [None]:
import tensorflow as tf
#import tensorflow.tools

from pyspark import SparkFiles
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, IntegerType
from kafka import KafkaProducer

import gzip
import io
from fastavro import reader
from astropy.io import fits
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import time
import json

import os
import glob
import fastavro
from io import BytesIO
import warnings
from astropy.io.fits.verify import VerifyWarning

from os import listdir
from os.path import isfile, join


# Kafka

In [None]:
# Folder holding the alert packets (.avro). Built with os.path.join so the
# notebook is not tied to Windows path separators.
files_path = os.path.join(".", "datos")
# Keep regular files only (a sub-directory would make open() fail) and sort
# them so every run processes the files in the same order.
onlyfiles = [
    join(files_path, f)
    for f in sorted(listdir(files_path))
    if isfile(join(files_path, f))
]

# Features to forward for every alert, in the order they are emitted.
metadata = ["ra","dec","magpsf","sigmapsf","isdiffpos","diffmaglim","fwhm","sgscore1","sgscore2","sgscore3",
            "distpsnr1","distpsnr2","distpsnr3","classtar","ndethist","ncovhist","chinr","sharpnr","approx_nondet",
            "gal_lat","gal_lng","ecl_lat","ecl_lng"]
# Features read from the top-level record instead of record["candidate"];
# NaN is emitted when the record does not carry them.
record_level_keys = ("gal_lat", "gal_lng", "ecl_lat", "ecl_lng")

for file in onlyfiles:
    with open(file, 'rb') as f:
        avro_reader = reader(f)

        for record in avro_reader:
            print(record["objectId"])  # send

            candidate = record["candidate"]
            for key in metadata:
                if key in record_level_keys:
                    # NOTE(review): these are looked up on the record itself, not on
                    # the candidate -- confirm that is where they are expected.
                    print(record.get(key, np.nan))  # send (NaN when missing)
                elif key == "approx_nondet":
                    # Derived feature: ncovhist - ndethist. Both are indexed directly
                    # so a missing field fails with a clear KeyError instead of a
                    # "None - int" TypeError.
                    print(candidate["ncovhist"] - candidate["ndethist"])  # send
                else:
                    print(candidate[key])  # send

            for name in ["cutoutScience", "cutoutTemplate", "cutoutDifference"]:
                cutout = record[name]

                # A null cutout is reported the same way as one without stampData.
                if cutout is not None and 'stampData' in cutout:
                    # Extract and decompress the gzip-compressed FITS stamp.
                    compressed_data = cutout['stampData']
                    with gzip.GzipFile(fileobj=io.BytesIO(compressed_data)) as gz:
                        # Context manager closes the HDUList; the pixels are copied
                        # so they remain valid after both handles are closed.
                        with fits.open(gz) as hdulist:
                            image_data = hdulist[0].data.copy()
                            # send image
                    plt.imshow(image_data)
                    plt.show()
                else:
                    print(f"{name} no contiene datos de imagen.")

            break  # only the first record of each file is inspected



In [None]:

# Folder holding the alert packets (.avro). Built with os.path.join so the
# notebook is not tied to Windows path separators.
files_path = os.path.join(".", "datos")
# Keep regular files only (a sub-directory would make open() fail) and sort
# them so every run publishes in the same order.
onlyfiles = [
    join(files_path, f)
    for f in sorted(listdir(files_path))
    if isfile(join(files_path, f))
]
print(onlyfiles)


# Kafka topic
# KafkaProducer raises NoBrokersAvailable when no broker is reachable at this
# address, so the broker must be running before this cell is executed.
PRODUCER = KafkaProducer(bootstrap_servers='localhost:9092')
TOPIC = "Imagen"

try:
    for file in onlyfiles:
        with open(file, 'rb') as f:
            avro_reader = reader(f)
            for record in avro_reader:
                # Walk the three image cutouts of the alert.
                for name in ["cutoutScience", "cutoutTemplate", "cutoutDifference"]:
                    cutout = record[name]

                    # A null cutout is reported the same way as one without stampData.
                    if cutout is not None and 'stampData' in cutout:
                        # Extract and decompress the gzip-compressed FITS stamp.
                        compressed_data = cutout['stampData']
                        with gzip.GzipFile(fileobj=io.BytesIO(compressed_data)) as gz:
                            # Context manager closes the HDUList once the pixels are read.
                            with fits.open(gz) as hdulist:
                                image_data = hdulist[0].data
                                # KafkaProducer exposes send(), not send_messages(), and
                                # without a value_serializer the value must be bytes.
                                # tobytes() ships the raw pixel buffer only (no shape or
                                # dtype header).
                                # NOTE(review): the Spark cell of this notebook parses the
                                # topic as JSON with integer id/value fields, which will
                                # not decode this payload -- confirm the intended format.
                                PRODUCER.send(TOPIC, value=image_data.tobytes())
                    else:
                        print(f"{name} no contiene datos de imagen.")

                break  # only the first record of each file
finally:
    # send() is asynchronous; flush so buffered messages are actually delivered.
    PRODUCER.flush()



['.\\datos\\3045169090015010003.avro', '.\\datos\\3045169090115010000.avro', '.\\datos\\3045169090115010001.avro', '.\\datos\\3045169090215010002.avro', '.\\datos\\3045169090215010006.avro', '.\\datos\\3045169090215015001.avro', '.\\datos\\3045169090215015002.avro', '.\\datos\\3045169090315010002.avro', '.\\datos\\3045169090315010003.avro', '.\\datos\\3045169090415010004.avro', '.\\datos\\3045169090415010005.avro', '.\\datos\\3045169090415010006.avro', '.\\datos\\3045169090415010009.avro', '.\\datos\\3045169090515015000.avro', '.\\datos\\3045169090515015001.avro', '.\\datos\\3045169090515015002.avro', '.\\datos\\3045169090515015005.avro', '.\\datos\\3045169090515015012.avro', '.\\datos\\3045169090615010004.avro', '.\\datos\\3045169090615010005.avro', '.\\datos\\3045169090615010007.avro', '.\\datos\\3045169090615010010.avro', '.\\datos\\3045169090615015000.avro', '.\\datos\\3045169090615015004.avro', '.\\datos\\3045169090615015011.avro', '.\\datos\\3045169090715015000.avro', '.\\datos\\

NoBrokersAvailable: NoBrokersAvailable

In [None]:
"""
-Saved_model_path es una variable para ingresa la ruta del modelo
-filename es la variable que tendra el nombre del archivo con la red ya entrenado
"""
spark = SparkSession.builder \
    .appName("Clasificacion_imagenes") \
    .getOrCreate()
    
esquema = StructType() \
    .add("id", IntegerType()) \
    .add("value", IntegerType())

spark.sparkContext.addFile("saved_model_path")

def predecir(data):
    model_file_local = SparkFiles.get("filename")
    model = tf.keras.models.load_model(model_file_local, compile=False)
    model.predict(data)

# Source: subscribe to the "Imagen" topic on the local broker, starting from
# the earliest available offset.
df_raw = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "Imagen")
    .option("startingOffsets", "earliest")
    .load()
)

# Decode the message value as text, parse it as JSON using `esquema`, and
# flatten the parsed struct into top-level columns.
parsed_payload = from_json(col("value"), esquema).alias("data")
df = (
    df_raw
    .selectExpr("CAST(value AS STRING)")
    .select(parsed_payload)
    .select("data.*")
)

# Sink: append each new row to the console.
query = (
    df.writeStream
    .outputMode("append")
    .format("console")
    .start()
)

# Block this cell until the streaming query terminates.
query.awaitTermination()