In [None]:
import cml.data_v1 as cmldata

import configparser
import uuid
import os
from typing import Dict
from pyspark.sql.functions import to_date, col
from pyspark import SparkConf
from pyspark.sql import SparkSession, DataFrameWriter
from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StructField, IntegerType, DateType, StringType
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from datetime import datetime, timedelta, date
from pyspark.sql.functions import year, month, count, sum, col, ceil, when
import pandas as pd

# Sample in-code customization of spark configurations
#from pyspark import SparkContext
#SparkContext.setSystemProperty('spark.executor.cores', '1')
#SparkContext.setSystemProperty('spark.executor.memory', '2g')

# Open the named CML data connection and obtain the Spark session bound to it;
# every later cell runs its queries through `spark`.
CONNECTION_NAME = "pdnd-prod-dl-1"
conn = cmldata.get_connection(CONNECTION_NAME)
spark = conn.get_spark_session()

# Connectivity smoke test: list the databases visible through this session.
EXAMPLE_SQL_QUERY = "show databases"
databases_df = spark.sql(EXAMPLE_SQL_QUERY)
databases_df.show()

In [None]:
# Sender (PA) whose notifications are analysed.
# NOTE(review): the export table name suggests this is INPS — confirm before reuse.
SENDER_PA_ID = "53b40136-65f2-424b-acfb-7fae17e35c60"

# Load the sender's notifications that have at least one of tms_viewed /
# tms_effective_date. tms_perfezionamento is the earlier of the two timestamps,
# falling back to whichever one is present.
# The literal is single-quoted: with ANSI mode and
# spark.sql.ansi.doubleQuotedIdentifiers enabled, "..." would be parsed as a
# column identifier instead of a string.
df_gold_notification = spark.sql(f"""
    SELECT iun,
           sentat,
           CASE
               WHEN tms_viewed IS NULL THEN tms_effective_date
               WHEN tms_effective_date IS NULL THEN tms_viewed
               WHEN tms_viewed < tms_effective_date THEN tms_viewed
               ELSE tms_effective_date
           END AS tms_perfezionamento
    FROM send.gold_notification_analytics
    WHERE senderpaid = '{SENDER_PA_ID}'
      AND (tms_viewed IS NOT NULL OR tms_effective_date IS NOT NULL)
""")

In [None]:
# Register the per-notification frame as session-scoped temp view DF_GOLD for ad-hoc SQL.
# The view captures the frame as it is at this point; later reassignments of
# df_gold_notification create new DataFrames and do not update it.
# NOTE(review): DF_GOLD is not queried anywhere else in this notebook.
df_gold_notification.createOrReplaceTempView(name="DF_GOLD")

In [None]:
# Normalise both time columns to Spark timestamps so they can be subtracted later.
# NOTE(review): if these arrive as strings without an offset, Spark interprets them
# in the session time zone — confirm this matches the source data.
for ts_column in ("sentat", "tms_perfezionamento"):
    df_gold_notification = df_gold_notification.withColumn(
        ts_column, F.col(ts_column).cast("timestamp")
    )

In [None]:
# Elapsed time between deposit (sentat) and completion (tms_perfezionamento),
# converted from seconds to days (86400 s/day) and rounded to 2 decimals.
elapsed_seconds = F.unix_timestamp("tms_perfezionamento") - F.unix_timestamp("sentat")
df_gold_notification = df_gold_notification.withColumn(
    "diff_sentat_perfezionamento",
    F.round(elapsed_seconds / 86400, 2),
)

In [None]:
# Total number of completed ("perfezionate") notifications for the sender.
# The source query already drops rows where tms_viewed and tms_effective_date are
# both NULL, so every row counts.
total_notifications = df_gold_notification.count()

total_notifications

In [None]:
# Derive the deposit year and month from sentat; they are the grouping keys for
# the monthly average completion time, e.g.
#   Dec 2024 -> sum of elapsed days / completed notifications deposited in Dec 2024
#   Jan 2025 -> sum of elapsed days / completed notifications deposited in Jan 2025
#   Feb 2025 -> sum of elapsed days / completed notifications deposited in Feb 2025
# NOTE(review): the original comment asked whether grouping should be by the
# deposit (sentat) month; the code groups by sentat — confirm this is intended.
df_grouped = df_gold_notification.select(
    "*",
    F.year("sentat").alias("anno_deposito"),
    F.month("sentat").alias("mese_deposito"),
)



In [None]:
# Inspect the columns of the per-notification frame. print() on a Spark DataFrame
# only emits a one-line repr; printSchema() lists each column with its type and
# nullability, without running a job or exposing row-level data.
df_grouped.printSchema()

In [None]:
# Per deposit (year, month): total elapsed days and number of completed notifications.
# Aggregates go through the explicit F namespace. The bare `sum` imported from
# pyspark.sql.functions shadows Python's builtin, which is fragile in a notebook
# whose import cell may be edited or re-ordered.
df_tempo_medio = df_grouped.groupBy("anno_deposito", "mese_deposito").agg(
    F.sum("diff_sentat_perfezionamento").alias("somma_tempistiche"),
    F.count("tms_perfezionamento").alias("notifiche_perfezionate"),
)

In [None]:
# Average completion time in days per deposit month (total elapsed days divided by
# completed notifications), rounded to 2 decimals.
# The F.when guard yields NULL when the count is 0. Plain division also gives NULL
# in non-ANSI mode, but raises DIVIDE_BY_ZERO when spark.sql.ansi.enabled is true.
df_tempo_medio = df_tempo_medio.withColumn(
    "tempo_medio_perfezionamento",
    F.when(
        F.col("notifiche_perfezionate") > 0,
        F.round(F.col("somma_tempistiche") / F.col("notifiche_perfezionate"), 2),
    ),
)

In [None]:
# Display the aggregated result (one row per deposit year/month) in chronological order.
# print() on a DataFrame only shows its schema repr, never the rows. The frame is
# already aggregated, so collecting it to pandas for rich notebook display is cheap.
df_tempo_medio.orderBy("anno_deposito", "mese_deposito").toPandas()

# Esportare il risultato in tabella

In [None]:
# Expose the monthly averages as session-scoped temp view DF_OUTPUT, the source
# of the Iceberg export.
df_tempo_medio.createOrReplaceTempView(name="DF_OUTPUT")

In [None]:
 spark.sql("""SELECT * FROM DF_OUTPUT""").writeTo("send_dev.inps_deposito_perfezionamento")\
                .using("iceberg")\
                .tableProperty("format-version","2")\
                .tableProperty("engine.hive.enabled","true")\
                .createOrReplace()
#print(datetime.now()-start)