## Setup connection to glue developer endpoint

Edit file $HOME/.sparkmagic/config.json and replace localhost with Glue Developer Endpoint IP address

restart Pyspark Kernel

In [1]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrameCollection
from awsglue.dynamicframe import DynamicFrame

import pyspark.sql.functions as F
import time
from pyspark.sql import Window


glueContext = GlueContext(SparkContext.getOrCreate())

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
34,application_1602576689786_0035,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Analyze database schema

Accessing directly to source Oracle Database Tabes using Catalog

In [2]:
oracle_soggetti = glueContext.create_dynamic_frame.from_catalog(database="oracle_source", table_name="orcl_admin_soggetti")
print("Count: " + str(oracle_soggetti.count()))
oracle_soggetti.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Count: 19541434
root
|-- KEY_SOGGETTI: decimal
|-- NOME: string
|-- COGNOME: string
|-- COMUNE_RESIDENZA: string
|-- NAZIONE_NASCITA: string
|-- TITOLO_DI_STUDIO: string
|-- ETA_CLIENTE: decimal
|-- DATA_ISCRIZIONE_SPORTELLO_ONLINE: timestamp
|-- CANALE_CONTATTO_PREFERENZIALE: string
|-- RATING_CREDITIZIO: string
|-- TIPOLOGIA_CLIENTE: string
|-- RETENTION_VALUE: string
|-- VAS: string

Access data saved by DMS in cdc, csv format

In [3]:
import time
start = time.time()

cdc_soggetti = glueContext.create_dynamic_frame.from_catalog(database="cdc", table_name="soggetti")
print("Count: " + str(cdc_soggetti .count()))
cdc_soggetti.printSchema()

end = time.time()
print("Execution time:",end - start)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Count: 19541434
root
|-- op: string
|-- key_soggetti: long
|-- nome: string
|-- cognome: string
|-- comune_residenza: string
|-- nazione_nascita: string
|-- titolo_di_studio: string
|-- eta_cliente: long
|-- data_iscrizione_sportello_online: string
|-- canale_contatto_preferenziale: string
|-- rating_creditizio: string
|-- tipologia_cliente: string
|-- retention_value: string
|-- vas: string

Execution time: 139.19863963127136

## Trasformazioni

### Deduplica soggetti

In [4]:
dyf_soggetti = glueContext.create_dynamic_frame.from_catalog(database="datalake", table_name="l_orcl_admin_soggetti")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
deduplica_soggetti=dyf_soggetti.toDF()
deduplica_soggetti.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- nazione_nascita: string (nullable = true)
 |-- rating_creditizio: string (nullable = true)
 |-- retention_value: string (nullable = true)
 |-- comune_residenza: string (nullable = true)
 |-- cognome: string (nullable = true)
 |-- eta_cliente: decimal(10,0) (nullable = true)
 |-- vas: string (nullable = true)
 |-- canale_contatto_preferenziale: string (nullable = true)
 |-- titolo_di_studio: string (nullable = true)
 |-- tipologia_cliente: string (nullable = true)
 |-- data_iscrizione_sportello_online: timestamp (nullable = true)
 |-- key_soggetti: decimal(10,0) (nullable = true)
 |-- nome: string (nullable = true)

In [6]:
windowSpec=Window.partitionBy(deduplica_soggetti.nome,deduplica_soggetti.cognome).\
orderBy(F.col("key_soggetti").desc())

deduplica_soggetti=deduplica_soggetti.withColumn("rank",F.row_number().over (windowSpec)).filter (F.col("rank")==1)

deduplica_soggetti.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- nazione_nascita: string (nullable = true)
 |-- rating_creditizio: string (nullable = true)
 |-- retention_value: string (nullable = true)
 |-- comune_residenza: string (nullable = true)
 |-- cognome: string (nullable = true)
 |-- eta_cliente: decimal(10,0) (nullable = true)
 |-- vas: string (nullable = true)
 |-- canale_contatto_preferenziale: string (nullable = true)
 |-- titolo_di_studio: string (nullable = true)
 |-- tipologia_cliente: string (nullable = true)
 |-- data_iscrizione_sportello_online: timestamp (nullable = true)
 |-- key_soggetti: decimal(10,0) (nullable = true)
 |-- nome: string (nullable = true)
 |-- rank: integer (nullable = true)

In [7]:
import time
start = time.time()

deduplica_soggetti.show()

end = time.time()
print("Execution time:",end - start)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------------+-----------------+---------------+------------------+----------+-----------+----+-----------------------------+-----------------+-----------------+--------------------------------+------------+------+----+
|nazione_nascita|rating_creditizio|retention_value|  comune_residenza|   cognome|eta_cliente| vas|canale_contatto_preferenziale| titolo_di_studio|tipologia_cliente|data_iscrizione_sportello_online|key_soggetti|  nome|rank|
+---------------+-----------------+---------------+------------------+----------+-----------+----+-----------------------------+-----------------+-----------------+--------------------------------+------------+------+----+
|         Italia|                B|         BRONZE|            Pesaro|   Angotti|         30|null|                     Telefono|     Laurea breve|         Business|             2018-08-09 00:00:00|    19590663|Abelie|   1|
|         Italia|                G|           null| Quartu Sant'Elena|   Argenta|         49|null|          

### Numero Contratti per soggetto

In [8]:

dyf_contratti = glueContext.create_dynamic_frame.from_catalog(database="datalake", table_name="l_orcl_admin_contratti")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [9]:
df_contratti_per_soggetto=dyf_contratti.toDF()
df_contratti_per_soggetto.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- codice_contratto: string (nullable = true)
 |-- anno_prima_attivazione_fornitura: decimal(10,0) (nullable = true)
 |-- key_contratti: decimal(10,0) (nullable = true)
 |-- data_cessazione_fornitura: timestamp (nullable = true)
 |-- data_attivazione_fornitura: timestamp (nullable = true)
 |-- nome_commerciale: string (nullable = true)
 |-- canale_di_vendita: string (nullable = true)
 |-- key_punti_di_fornitura: decimal(10,0) (nullable = true)
 |-- key_soggetti: decimal(10,0) (nullable = true)
 |-- vettore: string (nullable = true)

In [10]:
df_contratti_per_soggetto=df_contratti_per_soggetto.groupBy("key_soggetti").count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
df_contratti_per_soggetto.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- key_soggetti: decimal(10,0) (nullable = true)
 |-- count: long (nullable = false)

In [12]:
import time
start = time.time()

df_contratti_per_soggetto.show()

end = time.time()
print("Execution time:",end - start)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------+-----+
|key_soggetti|count|
+------------+-----+
|      142030|    3|
|     2161320|    3|
|      263659|    3|
|     1573061|    3|
|      257944|    3|
|     1333120|    3|
|     4906015|    2|
|      765951|    2|
|       10871|    1|
|     1671355|    3|
|     3402851|    3|
|     3937275|    2|
|      861714|    3|
|     5969360|    3|
|     3731174|    2|
|     1780560|    3|
|     3654783|    3|
|     3440039|    2|
|     4447387|    2|
|      781594|    3|
+------------+-----+
only showing top 20 rows

Execution time: 146.50318956375122

### Debito Medio per Soggetto

In [13]:

def MyTransform (glueContext, dfc) -> DynamicFrameCollection:
    df = dfc.select(list(dfc.keys())[0]).toDF()
    
    df=df.withColumn("d_importo", (sf.col("importo").substr(sf.lit(1), sf.instr(sf.col("importo"), '€')-2)).cast('double'))
    debito_per_soggetto=df.groupBy("key_soggetti").agg (sf.mean("d_importo"))
    dyf_debito_per_soggetto = DynamicFrame.fromDF(debito_per_soggetto, glueContext, "debito_per_soggetto")
    return(DynamicFrameCollection({"CustomTransform0": dyf_debito_per_soggetto}, glueContext))   

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [19]:

#dfc=DynamicFrameCollection({"CustomTransform0": dyf_credito}, glueContext)
#dynres=MyTransform(glueContext,dfc)
#df=dynres.select(list(dynres.keys())[0]).toDF()
#df.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [20]:
dyf_credito=glueContext.create_dynamic_frame.from_catalog(database="datalake", table_name="l_orcl_admin_credito")
df_debito_medio_per_cliente=dyf_credito.toDF()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [21]:
df_debito_medio_per_cliente=df_debito_medio_per_cliente.withColumn("d_importo", (F.col("importo").substr(F.lit(1), F.instr(F.col("importo"), '€')-2)).cast('double'))
df_debito_medio_per_cliente=df_debito_medio_per_cliente.groupBy("key_soggetti").agg (F.mean("d_importo"))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [22]:
df_debito_medio_per_cliente=df_debito_medio_per_cliente.withColumnRenamed("avg(d_importo)","debito_medio")
df_debito_medio_per_cliente.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- key_soggetti: decimal(10,0) (nullable = true)
 |-- debito_medio: double (nullable = true)

In [23]:
import time
start = time.time()

df_debito_medio_per_cliente.show()

end = time.time()
print("Execution time:",end - start)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------+------------+
|key_soggetti|debito_medio|
+------------+------------+
|     3058693|      639.91|
|     4930281|       14.55|
|      496357|      559.04|
|     3424872|      233.99|
|     7304798|       609.9|
|     2193250|      245.99|
|     6626091|       628.2|
|     7077330|      600.47|
|     5754383|      661.06|
|     7566074|      553.87|
|     7560804|      225.31|
|     5424120|      179.81|
|     4950014|       82.46|
|     1998503|      989.79|
|     6569124|      387.74|
|     5414421|      462.32|
|     2461396|      479.63|
|     6593646|      661.04|
|     2958607|      965.62|
|     7031711|        62.5|
+------------+------------+
only showing top 20 rows

Execution time: 22.983094930648804

### ELE e Gas Medio, minimo anno_prima_attivazione_fornitura su contratti attivo

In [24]:
dyf_contratti = glueContext.create_dynamic_frame.from_catalog(database="datalake", table_name="l_orcl_admin_contratti")
df=dyf_contratti.toDF()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [25]:
df.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- codice_contratto: string (nullable = true)
 |-- anno_prima_attivazione_fornitura: decimal(10,0) (nullable = true)
 |-- key_contratti: decimal(10,0) (nullable = true)
 |-- data_cessazione_fornitura: timestamp (nullable = true)
 |-- data_attivazione_fornitura: timestamp (nullable = true)
 |-- nome_commerciale: string (nullable = true)
 |-- canale_di_vendita: string (nullable = true)
 |-- key_punti_di_fornitura: decimal(10,0) (nullable = true)
 |-- key_soggetti: decimal(10,0) (nullable = true)
 |-- vettore: string (nullable = true)

Converto le stringhe in timestamp

In [26]:

df=df.withColumn("ts_data_attivazione_fornitura",F.to_timestamp (df.data_attivazione_fornitura)).\
withColumn ("ts_data_cessazione_fornitura",F.to_timestamp (df.data_cessazione_fornitura)).\
drop("key_punti_di_fornitura","data_cessazione_fornitura","codice_contratto","key_contratti","canale_di_vendita","anno_prima_attivazione_fornitura")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Aggiungo i filtri sui contratti attivi

In [27]:
cd=F.current_timestamp()
df_contratti=df.filter (df.ts_data_cessazione_fornitura >= cd).\
filter (df.ts_data_attivazione_fornitura <= cd)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### Carico i prodotti

In [28]:
dyf_prodotti = glueContext.create_dynamic_frame.from_catalog(database="datalake", table_name="l_orcl_admin_prodotti")
df_prodotti=dyf_prodotti.toDF()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [29]:
df_prodotti.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- data_inizio_validita: timestamp (nullable = true)
 |-- data_fine_validita: timestamp (nullable = true)
 |-- gas: decimal(10,6) (nullable = true)
 |-- nome_prodotto: string (nullable = true)
 |-- f0: decimal(10,6) (nullable = true)
 |-- f1: decimal(10,6) (nullable = true)
 |-- f2: decimal(10,6) (nullable = true)
 |-- key_prodotti: decimal(10,0) (nullable = true)
 |-- f3: decimal(10,6) (nullable = true)

Formatto le colonne in formato timestamp e elimino le colonne corrisppndenti che erano stringe

In [30]:
df_prodotti=df_prodotti.withColumn("ts_data_inizio_validita",F.to_timestamp (df_prodotti.data_inizio_validita)).\
withColumn ("ts_data_fine_validita",F.to_timestamp (df_prodotti.data_fine_validita)).\
drop ('data_inizio_validita','data_fine_validita','key_prodotti')


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Rinomino la colonna di join

In [31]:
df_prodotti=df_prodotti.withColumnRenamed("nome_prodotto","nome_commerciale")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Calcolo la media delle tariffe elettriche relative al prodotto

In [32]:
df_prodotti=df_prodotti.withColumn("ELE", (df_prodotti.f0+ df_prodotti.f1+ df_prodotti.f2+ df_prodotti.f3)/4).\
drop('f0','f1','f2','f3')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [33]:
df_contratti.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- data_attivazione_fornitura: timestamp (nullable = true)
 |-- nome_commerciale: string (nullable = true)
 |-- key_soggetti: decimal(10,0) (nullable = true)
 |-- vettore: string (nullable = true)
 |-- ts_data_attivazione_fornitura: timestamp (nullable = true)
 |-- ts_data_cessazione_fornitura: timestamp (nullable = true)

Metto a null le tariffe nella join relativi al prodotto non legato alla riga di contratto in esame al fine di poterle epurare dal calcolo delle medie

In [34]:
df_tariffe_contratti=df_contratti.join (df_prodotti,"nome_commerciale").\
withColumn ("realGas",F.when (F.col('vettore')=='GAS',df_prodotti.gas).otherwise (None)).\
withColumn ("realEle",F.when (F.col('vettore')=='ELE',df_prodotti.ELE).otherwise (None)).\
drop ('gas','ELE')
df_tariffe_contratti.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- nome_commerciale: string (nullable = true)
 |-- data_attivazione_fornitura: timestamp (nullable = true)
 |-- key_soggetti: decimal(10,0) (nullable = true)
 |-- vettore: string (nullable = true)
 |-- ts_data_attivazione_fornitura: timestamp (nullable = true)
 |-- ts_data_cessazione_fornitura: timestamp (nullable = true)
 |-- ts_data_inizio_validita: timestamp (nullable = true)
 |-- ts_data_fine_validita: timestamp (nullable = true)
 |-- realGas: decimal(10,6) (nullable = true)
 |-- realEle: decimal(15,8) (nullable = true)

Calcolo la data minima e il valore medio per le tariffe luce e gas

In [35]:
df_tariffe_soggetti=df_tariffe_contratti.groupby (df_tariffe_contratti.key_soggetti).\
agg(F.min("data_attivazione_fornitura"),F.mean("realGas"),F.mean("realEle"))


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [36]:
df_tariffe_soggetti.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- key_soggetti: decimal(10,0) (nullable = true)
 |-- min(data_attivazione_fornitura): timestamp (nullable = true)
 |-- avg(realGas): decimal(14,10) (nullable = true)
 |-- avg(realEle): decimal(19,12) (nullable = true)

Rinomino le colonne per semplicità

In [37]:
df_tariffe_soggetti=df_tariffe_soggetti.withColumnRenamed("min(data_attivazione_fornitura)","data_attivazione_fornitura").\
withColumnRenamed("avg(realGas)","media_GAS").\
withColumnRenamed("avg(realEle)","media_ELE")

df_tariffe_soggetti.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- key_soggetti: decimal(10,0) (nullable = true)
 |-- data_attivazione_fornitura: timestamp (nullable = true)
 |-- media_GAS: decimal(14,10) (nullable = true)
 |-- media_ELE: decimal(19,12) (nullable = true)

In [38]:
import time
start = time.time()

df_tariffe_soggetti.show()

end = time.time()
print("Execution time:",end - start)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------+--------------------------+------------+--------------+
|key_soggetti|data_attivazione_fornitura|   media_GAS|     media_ELE|
+------------+--------------------------+------------+--------------+
|     2876727|       2019-03-16 00:00:00|0.2759241875|0.069715976563|
|     4228748|       2019-01-01 00:00:00|        null|0.069753239583|
|     7544023|       2019-10-12 00:00:00|0.2560663125|0.070321359375|
|    10173668|       2019-01-01 00:00:00|0.2759241875|0.069196578125|
|     9421818|       2019-05-15 00:00:00|0.2801066250|0.068470687500|
|    11024074|       2019-09-30 00:00:00|0.2625706875|0.070321359375|
|    12905429|       2019-04-21 00:00:00|0.2759241875|0.068656218750|
|    15399553|       2019-02-27 00:00:00|0.2759241875|0.071934015625|
|    16913997|       2019-11-18 00:00:00|        null|0.070855820313|
|    15175821|       2019-02-20 00:00:00|        null|0.070855820313|
|    18603071|       2019-06-23 00:00:00|0.2512031875|0.070321359375|
|     1796059|      

## Calcolo dell'indice di churn e del canale di contatto preferenziale

In [39]:
dyf_contratti = glueContext.create_dynamic_frame.from_catalog(database="datalake", table_name="l_orcl_admin_contratti")
df_contratti=dyf_contratti.toDF()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [40]:
df_contratti.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- codice_contratto: string (nullable = true)
 |-- anno_prima_attivazione_fornitura: decimal(10,0) (nullable = true)
 |-- key_contratti: decimal(10,0) (nullable = true)
 |-- data_cessazione_fornitura: timestamp (nullable = true)
 |-- data_attivazione_fornitura: timestamp (nullable = true)
 |-- nome_commerciale: string (nullable = true)
 |-- canale_di_vendita: string (nullable = true)
 |-- key_punti_di_fornitura: decimal(10,0) (nullable = true)
 |-- key_soggetti: decimal(10,0) (nullable = true)
 |-- vettore: string (nullable = true)

In [41]:

df_soggetti_canale_vendita=df_contratti.groupBy (df_contratti.key_soggetti,df_contratti.canale_di_vendita).agg (F.count(df_contratti.canale_di_vendita))
df_soggetti_canale_vendita.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- key_soggetti: decimal(10,0) (nullable = true)
 |-- canale_di_vendita: string (nullable = true)
 |-- count(canale_di_vendita): long (nullable = false)

In [42]:
windowSpec=Window.partitionBy("key_soggetti").\
orderBy(F.col("count(canale_di_vendita)").desc())


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [43]:
df_soggetti_canale_vendita=df_soggetti_canale_vendita.withColumn("rank",F.row_number().over (windowSpec)).filter (F.col("rank")==1)
df_soggetti_canale_vendita.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- key_soggetti: decimal(10,0) (nullable = true)
 |-- canale_di_vendita: string (nullable = true)
 |-- count(canale_di_vendita): long (nullable = false)
 |-- rank: integer (nullable = true)

In [44]:
df_soggetti_canale_vendita=df_soggetti_canale_vendita.drop ('count(canale_di_vendita)','rank')
df_soggetti_canale_vendita.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- key_soggetti: decimal(10,0) (nullable = true)
 |-- canale_di_vendita: string (nullable = true)

In [45]:
df_contratti_churn=df_contratti.withColumn ("hadChurn",F.when (F.col('data_cessazione_fornitura')<cd,1).otherwise (0))


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [53]:
df_contratti_churn=df_contratti_churn.groupBy("key_soggetti").agg(F.sum("hadChurn"))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [54]:
df_contratti_churn=df_contratti_churn.withColumn("Churn",F.when (F.col('sum(hadChurn)')>=1,1).otherwise (0)).drop("sum(hadChurn)")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [55]:
df_contratti_churn.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- key_soggetti: decimal(10,0) (nullable = true)
 |-- Churn: integer (nullable = false)

In [56]:
import time
start = time.time()

df_contratti_churn.show()

end = time.time()
print("Execution time:",end - start)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------+-----+
|key_soggetti|Churn|
+------------+-----+
|     8561429|    0|
|     9601660|    0|
|     9122857|    0|
|     9504386|    0|
|     8598775|    1|
|    10685034|    0|
|    10327079|    1|
|     9969250|    0|
|    10574149|    1|
|     7633313|    0|
|     7813673|    0|
|     8162895|    0|
|    10801897|    0|
|     9677413|    0|
|    10381113|    0|
|     9809614|    0|
|     7217907|    0|
|     7127130|    0|
|     9464453|    0|
|     7154945|    0|
+------------+-----+
only showing top 20 rows

Execution time: 95.7476737499237

### Unpivot tabella regioni

In [57]:
dyf_contratti = glueContext.create_dynamic_frame.from_catalog(database="datalake", table_name="l_orcl_admin_contratti")
df_contratti=dyf_contratti.toDF()
df_contratti.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- codice_contratto: string (nullable = true)
 |-- anno_prima_attivazione_fornitura: decimal(10,0) (nullable = true)
 |-- key_contratti: decimal(10,0) (nullable = true)
 |-- data_cessazione_fornitura: timestamp (nullable = true)
 |-- data_attivazione_fornitura: timestamp (nullable = true)
 |-- nome_commerciale: string (nullable = true)
 |-- canale_di_vendita: string (nullable = true)
 |-- key_punti_di_fornitura: decimal(10,0) (nullable = true)
 |-- key_soggetti: decimal(10,0) (nullable = true)
 |-- vettore: string (nullable = true)

In [58]:
dyf_fornitura = glueContext.create_dynamic_frame.from_catalog(database="datalake", table_name="l_orcl_admin_punti_di_fornitura")
df_fornitura=dyf_fornitura.toDF()
df_fornitura.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- indirizzo: string (nullable = true)
 |-- regione: string (nullable = true)
 |-- key_punti_di_fornitura: decimal(10,0) (nullable = true)

In [59]:
df_fonitura_per_contratto=df_contratti.join(df_fornitura,"key_punti_di_fornitura").groupBy ("key_soggetti","regione").pivot("regione").agg (F.count("key_punti_di_fornitura"))
df_fonitura_per_contratto.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- key_soggetti: decimal(10,0) (nullable = true)
 |-- regione: string (nullable = true)
 |-- Abruzzo: long (nullable = true)
 |-- Basilicata: long (nullable = true)
 |-- Calabria: long (nullable = true)
 |-- Campania: long (nullable = true)
 |-- Emilia romagna: long (nullable = true)
 |-- Friuli venezia giulia: long (nullable = true)
 |-- Lazio: long (nullable = true)
 |-- Liguria: long (nullable = true)
 |-- Lombardia: long (nullable = true)
 |-- Marche: long (nullable = true)
 |-- Molise: long (nullable = true)
 |-- Piemonte: long (nullable = true)
 |-- Puglia: long (nullable = true)
 |-- Sardegna: long (nullable = true)
 |-- Sicilia: long (nullable = true)
 |-- Toscana: long (nullable = true)
 |-- Trentino alto adige: long (nullable = true)
 |-- Umbria: long (nullable = true)
 |-- Valle d aosta: long (nullable = true)
 |-- Veneto: long (nullable = true)

## Tutto Insieme

In [60]:
output=deduplica_soggetti.join (df_contratti_per_soggetto,"key_soggetti").\
join(df_debito_medio_per_cliente,"key_soggetti").\
join(df_tariffe_soggetti,"key_soggetti").\
join(df_contratti_churn,"key_soggetti").\
join (df_soggetti_canale_vendita,"key_soggetti").\
join (df_fonitura_per_contratto,"key_soggetti")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [61]:
output.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- key_soggetti: decimal(10,0) (nullable = true)
 |-- nazione_nascita: string (nullable = true)
 |-- rating_creditizio: string (nullable = true)
 |-- retention_value: string (nullable = true)
 |-- comune_residenza: string (nullable = true)
 |-- cognome: string (nullable = true)
 |-- eta_cliente: decimal(10,0) (nullable = true)
 |-- vas: string (nullable = true)
 |-- canale_contatto_preferenziale: string (nullable = true)
 |-- titolo_di_studio: string (nullable = true)
 |-- tipologia_cliente: string (nullable = true)
 |-- data_iscrizione_sportello_online: timestamp (nullable = true)
 |-- nome: string (nullable = true)
 |-- rank: integer (nullable = true)
 |-- count: long (nullable = false)
 |-- debito_medio: double (nullable = true)
 |-- data_attivazione_fornitura: timestamp (nullable = true)
 |-- media_GAS: decimal(14,10) (nullable = true)
 |-- media_ELE: decimal(19,12) (nullable = true)
 |-- Churn: integer (nullable = false)
 |-- canale_di_vendita: string (nullable = true)
 |--

In [62]:
import time
start = time.time()

output.show()

end = time.time()
print("Execution time:",end - start)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------+---------------+-----------------+---------------+----------------+-------+-----------+----+-----------------------------+-----------------+-----------------+--------------------------------+---------+----+-----+------------+--------------------------+------------+--------------+-----+-------------------+---------+-------+----------+--------+--------+--------------+---------------------+-----+-------+---------+------+------+--------+------+--------+-------+-------+-------------------+------+-------------+------+
|key_soggetti|nazione_nascita|rating_creditizio|retention_value|comune_residenza|cognome|eta_cliente| vas|canale_contatto_preferenziale| titolo_di_studio|tipologia_cliente|data_iscrizione_sportello_online|     nome|rank|count|debito_medio|data_attivazione_fornitura|   media_GAS|     media_ELE|Churn|  canale_di_vendita|  regione|Abruzzo|Basilicata|Calabria|Campania|Emilia romagna|Friuli venezia giulia|Lazio|Liguria|Lombardia|Marche|Molise|Piemonte|Puglia|Sardegna|S