In [None]:
import subprocess
import findspark
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql.functions import from_unixtime
from pyspark.sql.functions import from_json
from pyspark.sql.functions import col,year,month,dayofmonth,coalesce,lit,from_json, hour
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [None]:
# Register the local Spark installation with findspark.
# NOTE(review): the pyspark imports in the first cell run before this call — confirm the intended order.
_spark_home = '/opt/spark'
findspark.init(spark_home=_spark_home)

In [None]:
# topics = subprocess.check_output("/opt/kafka/bin/kafka-topics.sh --list --bootstrap-server hadoop-namenode:9092", shell=True)
# topics = topics.split()
# topics = [ topic.decode("UTF-8") for topic in topics ]
# topics.pop()
# topics

In [None]:
# Spark session & context.
# The Kafka connector jars are listed once and joined with the separator each setting
# uses in the original configuration: ',' for spark.jars, ':' for the class/library paths.
_kafka_jars = [
    'file:///opt/smart-parking/Python/GetData/spark-sql-kafka-0-10_2.12-3.2.1.jar',
    'file:///opt/smart-parking/Python/GetData/kafka-clients-3.1.0.jar',
]
_jars_comma_separated = ','.join(_kafka_jars)
_jars_colon_separated = ':'.join(_kafka_jars)

# NOTE(review): 'spark.executor.extraLibrary' does not look like a documented Spark property
# (the documented key appears to be 'spark.executor.extraLibraryPath') — confirm it is needed.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('Parkings')
    .config('spark.jars', _jars_comma_separated)
    .config('spark.executor.extraClassPath', _jars_colon_separated)
    .config('spark.executor.extraLibrary', _jars_colon_separated)
    .config('spark.driver.extraClassPath', _jars_colon_separated)
    .getOrCreate()
)

# Keep a handle on the context and silence everything below ERROR.
sc = spark.sparkContext
sc.setLogLevel("ERROR")

In [None]:
# Subscribe to a single Kafka topic as a streaming source.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "hadoop-namenode:9092")
    .option("subscribe", "XBEESmartParkingModel")
    .load()
)
# The projection below is not assigned, so `df` keeps all of its original columns;
# the expression is only the cell's displayed value.
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

In [None]:
def CountOccupation(lista):
    """Count the occupied slots in a list of slot flags.

    Args:
        lista: Iterable of slot flags (``True`` means occupied), or ``None``
            when the slots array is missing.

    Returns:
        int: Number of truthy entries. ``None`` input and ``None`` entries
        contribute 0 instead of raising.
    """
    # A missing array (e.g. a message whose JSON could not be parsed) previously
    # raised TypeError inside the UDF and failed the whole micro-batch.
    if lista is None:
        return 0

    # Explicit loop on purpose: the notebook star-imports pyspark.sql.functions,
    # which shadows the builtin `sum`.
    occupied = 0
    for slot in lista:
        if slot:
            occupied += 1
    return occupied

# Spark UDF wrapper around CountOccupation; yields an integer column.
countOcuppation = udf(lambda slots: CountOccupation(slots), returnType=IntegerType())

In [None]:
def percentagetoString(sp_total, sp_ocupied):
    """Translate an occupancy ratio into a human-readable label.

    The percentage is ``sp_ocupied * 100 / sp_total`` truncated to an integer.

    Args:
        sp_total: Total number of spots.
        sp_ocupied: Number of occupied spots.

    Returns:
        str: ``"Empty"`` for 0-25 %, ``"Almost Empty"`` for 26-50 %,
        ``"Almost Full"`` for 51-75 %, ``"Full"`` for 76-100 %, and ``"Other"``
        when the inputs are missing, non-positive total, negative occupied,
        or the result exceeds 100 %.
    """
    # Missing or impossible inputs used to raise (TypeError / ZeroDivisionError)
    # inside the UDF; report them with the existing fallback label instead.
    if sp_total is None or sp_ocupied is None:
        return "Other"
    if sp_total <= 0 or sp_ocupied < 0:
        return "Other"

    percnt = int((sp_ocupied * 100) / sp_total)

    # 0 % is included in "Empty": a completely free area was previously "Other".
    if percnt <= 25:
        return "Empty"
    if percnt <= 50:
        return "Almost Empty"
    if percnt <= 75:
        return "Almost Full"
    if percnt <= 100:
        return "Full"
    return "Other"
    
# Spark UDF wrapper around percentagetoString; yields a string column.
percentageOcuppation = udf(
    lambda total, occupied: percentagetoString(total, occupied),
    returnType=StringType(),
)

In [None]:
# @pandas_udf("string", PandasUDFType.GROUPED_AGG)
# def merge_udf(v):
#     return ",".join(v)

# @pandas_udf("string", PandasUDFType.GROUPED_AGG)
# def merge_udf_total(v):
#     return "|".join(v)

In [None]:
def func(batch_df, batch_id):
    """Parse one Kafka micro-batch of parking messages and derive occupancy columns.

    Used as the ``foreachBatch`` callback of the streaming query. The Kafka
    ``value`` payload is decoded as JSON, reduced to one row per
    ``device_address`` and extended with per-area and per-level occupancy
    figures.

    Args:
        batch_df: Micro-batch DataFrame; only its ``value`` column is read.
        batch_id: Micro-batch identifier passed by Spark (unused).

    Returns:
        None. The derived DataFrame is only built lazily; nothing is written
        to a sink here yet.
    """
    # Expected layout of the JSON document carried in the Kafka `value` column.
    tableSchema = (
        StructType()
        .add("parking_name", StringType())
        .add("parking_address", StringType())
        .add("parking_description", StringType())
        .add("device_timestamp", TimestampType())
        .add("device_address", StringType())
        .add("parking_latitude", DoubleType())
        .add("parking_longitude", DoubleType())
        .add("parking_temperature", StringType())
        .add("parking_humidity", StringType())
        .add("parking_uuid", StringType())
        .add("parking_id", StringType())
        .add("level_id", StringType())
        .add("area_id", StringType())
        .add("area_name", StringType())
        .add("spots", IntegerType())
        .add("slots", ArrayType(BooleanType()))
    )

    batch_df.persist()
    try:
        df = batch_df.withColumn("value", col("value").cast("string"))
        prov = (
            df.select("*", from_json("value", tableSchema).alias("data_parsed"))
              .select("data_parsed.*")
        )
        # The former `len(prov.head()["slots"])` probe was dropped: its result was
        # never used, it ran an extra Spark job per batch, and it raised TypeError
        # on an empty batch (head() is None) or a row with null `slots`.

        # One row per device. Which row survives is not controlled here; ordering
        # by device_timestamp was left disabled in the original code.
        dfalldata = prov.dropDuplicates(['device_address'])
        dfalldata = (
            dfalldata
            .withColumn("area_occupied_slots", countOcuppation(col("slots")))
            .withColumn(
                "area_occupation",
                percentageOcuppation(col("spots"), col("area_occupied_slots")),
            )
        )

        # Level totals. `sum` is the Spark aggregate brought in by the star
        # import of pyspark.sql.functions, not the Python builtin.
        windowPartitionAgg = Window.partitionBy("level_id")
        df2 = (
            dfalldata
            .withColumn("level_total_spots", sum(col("spots")).over(windowPartitionAgg))
            .withColumn(
                "level_occupied_slots",
                sum(col("area_occupied_slots")).over(windowPartitionAgg),
            )
        )

        # Author's note (translated): because the stream is consumed through
        # foreachBatch, data arrives in fragments; this development is meant for
        # the Hadoop integration rather than the backend. See the part of the
        # notebook dedicated to the aggregation code.
        # TODO: write `df2` to the target sink; until then no Spark action runs here.
    finally:
        # Always release the cached batch, even if building the plan fails.
        batch_df.unpersist()

In [None]:
# Start the streaming query (one micro-batch every 5 seconds, each handed to `func`)
# and block this cell until the query terminates.
CHECKPOINT_DIRECTORY = 'file:///opt/smart-parking/Python/GetData/CommitLog'
(
    df.writeStream
    .trigger(processingTime='5 seconds')
    .outputMode("append")
    .option("checkpointLocation", CHECKPOINT_DIRECTORY)
    .foreachBatch(func)
    .start()
    .awaitTermination()
)

## Código para agregar en el otro notebook:

In [None]:
#        
# .withColumn("level_occupation", percentageOcuppation(col("level_total_spots"), col("level_occupied_slots")))\
# .withColumn('area_name', merge_udf(dfalldata['area_name']).over(windowPartitionAgg))\
# .withColumn('device_address', merge_udf(dfalldata['device_address']).over(windowPartitionAgg))\
# .withColumn('area_occupation', merge_udf(dfalldata['area_occupation']).over(windowPartitionAgg))\
# .withColumn('spots', merge_udf(dfalldata['spots'].cast("string")).over(windowPartitionAgg))\
# .withColumn('slots', merge_udf(dfalldata['slots'].cast("string")).over(windowPartitionAgg))\
# .withColumn('area_occupied_slots', merge_udf(dfalldata['area_occupied_slots'].cast("string")).over(windowPartitionAgg))\
# .dropDuplicates((['level_id']))

# windowPartitionAgg  = Window.partitionBy("parking_name")
# df3 = df2.withColumn("level_total_spots", merge_udf_total(df2['level_total_spots'].cast("string")).over(windowPartitionAgg))\
# .withColumn("level_occupied_slots", merge_udf_total(df2['level_occupied_slots'].cast("string")).over(windowPartitionAgg))\
# .withColumn("level_occupation", merge_udf_total(df2['level_occupation']).over(windowPartitionAgg))\
# .withColumn('area_name', merge_udf_total(df2['area_name']).over(windowPartitionAgg))\
# .withColumn('device_address', merge_udf_total(df2['device_address']).over(windowPartitionAgg))\
# .withColumn('area_occupation', merge_udf_total(df2['area_occupation']).over(windowPartitionAgg))\
# .withColumn('spots', merge_udf_total(df2['spots'].cast("string")).over(windowPartitionAgg))\
# .withColumn('slots', merge_udf_total(df2['slots'].cast("string")).over(windowPartitionAgg))\
# .withColumn('area_occupied_slots', merge_udf_total(df2['area_occupied_slots'].cast("string")).over(windowPartitionAgg))\
# .dropDuplicates((['parking_name']))
# 

In [None]:
# Second streaming query: same callback, "update" output mode, default trigger.
# NOTE(review): this reuses CHECKPOINT_DIRECTORY from the earlier query cell, and that
# cell blocks on awaitTermination(), so this one only runs after it stops — confirm
# that sharing the checkpoint location is intended.
(
    df.writeStream
    .outputMode("update")
    .option("checkpointLocation", CHECKPOINT_DIRECTORY)
    .foreachBatch(func)
    .start()
    .awaitTermination()
)