In [2]:
import time
print ('Inicio: '+time.strftime("%c"))
inicio = time.perf_counter()

Inicio: Sun Nov  6 18:11:28 2022


In [3]:
#spark job monitoring
from pyspark.sql.types import StructType
from pyspark.sql.types import StringType,BooleanType,DateType,IntegerType
from pyspark.sql.functions import sum, col, desc
import pyspark.sql.functions as F
from pyspark.sql import Window

esquema = StructType() \
      .add("ctId",StringType(),True) \
      .add("fecRef",StringType(),True) \
      .add("dato",StringType(),True) \
      .add("saldo",IntegerType(),True) \
      .add("proc",StringType(),True)

"""
df = spark.read.option("mergeSchema", True).option("schema", "esquema").option("basePath", "file:///opt/workspace/datos/fakeContractsSinFechas.parquet/")\
               .format("parquet").load(['./datos/fakeContractsSinFechas.parquet/proc=CT0001', './datos/fakeContractsSinFechas.parquet/proc=CT0002', \
                                        './datos/fakeContractsSinFechas.parquet/proc=CT0015', './datos/fakeContractsSinFechas.parquet/proc=CT0016'])
"""
df = spark.read.option("mergeSchema", True).option("schema", "esquema").option("basePath", "file:///opt/workspace/datos/fakeContractsSinFechas.parquet/")\
               .format("parquet").load(['./datos/fakeContractsSinFechas.parquet'])

df.printSchema()

print(df.count())
df.show()
df.explain(extended=True)
df.explain(mode='cost')


                                                                                

root
 |-- ctId: string (nullable = true)
 |-- fecRef: string (nullable = true)
 |-- dato: string (nullable = true)
 |-- saldo: integer (nullable = true)
 |-- proc: string (nullable = true)



                                                                                

1276940


[Stage 4:>                                                          (0 + 1) / 1]

+------------+----------+--------------------+-----+------+
|        ctId|    fecRef|                dato|saldo|  proc|
+------------+----------+--------------------+-----+------+
|CT0003212981|2020-10-25|Dato del contrato...|16200|CT0003|
|CT0003213036|2021-03-06|Dato del contrato...|39900|CT0003|
|CT0003213036|2021-02-22|Dato del contrato...|34100|CT0003|
|CT0003213036|2020-09-01|Dato del contrato...|13800|CT0003|
|CT0003213036|2021-07-06|Dato del contrato...|24400|CT0003|
|CT0003213036|2021-01-28|Dato del contrato...| 8700|CT0003|
|CT0003213036|2020-05-26|Dato del contrato...| 1700|CT0003|
|CT0003213036|2020-03-27|Dato del contrato...|42800|CT0003|
|CT0003213036|2021-03-07|Dato del contrato...|30400|CT0003|
|CT0003213036|2020-10-03|Dato del contrato...|47500|CT0003|
|CT0003213036|2020-07-08|Dato del contrato...|28700|CT0003|
|CT0003213057|2020-01-27|Dato del contrato...| 8200|CT0003|
|CT0003213057|2021-08-17|Dato del contrato...|65900|CT0003|
|CT0003213057|2021-05-03|Dato del contra

                                                                                

In [4]:
print ('Final: '+time.strftime("%c"))
final = time.perf_counter()
print(f'Hecho en {round(final - inicio, 4)} segundo(s)')

Final: Sun Nov  6 18:11:40 2022
Hecho en 11.6521 segundo(s)


## Estudio de la función Window

### row_number: número de la fila dentro de cada "ventana" de registros

In [5]:
from pyspark.sql.functions import row_number

print ('Inicio: '+time.strftime("%c"))
inicio = time.perf_counter()

WinSpec = Window.partitionBy('ctId').orderBy('fecRef')

dfNew = df.withColumn("row_number",row_number().over(WinSpec)).show(50, truncate=False)

print ('Final: '+time.strftime("%c"))
final = time.perf_counter()
print(f'Hecho en {round(final - inicio, 4)} segundo(s)')

Inicio: Sun Nov  6 18:11:40 2022




+------------+----------+--------------------------------------------+-----+------+----------+
|ctId        |fecRef    |dato                                        |saldo|proc  |row_number|
+------------+----------+--------------------------------------------+-----+------+----------+
|CT0001123903|2021-11-12|Dato del contrato CT0001123903 en 2021-11-12|1100 |CT0001|1         |
|CT0001123903|2021-11-19|Dato del contrato CT0001123903 en 2021-11-19|38000|CT0001|2         |
|CT0001124153|2020-03-19|Dato del contrato CT0001124153 en 2020-03-19|39600|CT0001|1         |
|CT0001124153|2020-08-24|Dato del contrato CT0001124153 en 2020-08-24|27200|CT0001|2         |
|CT0001124153|2020-09-05|Dato del contrato CT0001124153 en 2020-09-05|15600|CT0001|3         |
|CT0001124153|2020-11-09|Dato del contrato CT0001124153 en 2020-11-09|23300|CT0001|4         |
|CT0001124153|2021-02-10|Dato del contrato CT0001124153 en 2021-02-10|74600|CT0001|5         |
|CT0001124153|2021-04-19|Dato del contrato CT00011

                                                                                

### Crea una columna con la fechas de referencia anterior y actual

In [6]:
WinSpec3 = Window.partitionBy('ctId').orderBy('fecRef').rowsBetween(-1, Window.currentRow)

df.withColumn("listaFechas",F.collect_list(F.col("fecRef")).over(WinSpec3)) \
  .show(50, truncate=False)



+------------+----------+--------------------------------------------+-----+------+------------------------+
|ctId        |fecRef    |dato                                        |saldo|proc  |listaFechas             |
+------------+----------+--------------------------------------------+-----+------+------------------------+
|CT0001123903|2021-11-12|Dato del contrato CT0001123903 en 2021-11-12|1100 |CT0001|[2021-11-12]            |
|CT0001123903|2021-11-19|Dato del contrato CT0001123903 en 2021-11-19|38000|CT0001|[2021-11-12, 2021-11-19]|
|CT0001124153|2020-03-19|Dato del contrato CT0001124153 en 2020-03-19|39600|CT0001|[2020-03-19]            |
|CT0001124153|2020-08-24|Dato del contrato CT0001124153 en 2020-08-24|27200|CT0001|[2020-03-19, 2020-08-24]|
|CT0001124153|2020-09-05|Dato del contrato CT0001124153 en 2020-09-05|15600|CT0001|[2020-08-24, 2020-09-05]|
|CT0001124153|2020-11-09|Dato del contrato CT0001124153 en 2020-11-09|23300|CT0001|[2020-09-05, 2020-11-09]|
|CT0001124153|2021-

                                                                                

### Ahora crea una columna con la fechas de referencia actual y siguiente

# Le pone la fecha de inicio y fin a cada registro basado en la fecha de referencia siguiente !!!!

In [7]:
# Declara la ventana por código de contrato y fecha de referencia ascendente pero tomando sólo el registro actual y el siguiente:

WinSpec4 = Window.partitionBy('ctId').orderBy('fecRef').rowsBetween(Window.currentRow, +1)

# Crea una columna tipo lista con dos elementos: la fecha de referencia del registro actual y la del siguiente

df2 = df.withColumn("listaFechas",F.collect_list(F.col("fecRef")).over(WinSpec4)) 

# Selecciona las columnas que queremos y deshace la lista. El campo fechFin sin valor se genera a null

df2 = df2.select('ctId', 'fecRef', 'dato', 'saldo', 'proc',
      F.col('listaFechas')[0].alias('fechIni'),
      F.col('listaFechas')[1].alias('fechFin')
      )

# Cambiamos lo nulos de la fecha de fin por el valor máximo de fecha:

df2 = df2.na.fill(value='9999-12-31',subset=['fechFin'])

# Et voila:

df2.show(24, truncate=False)

# Guardamos el DF:
df2.write.mode("overwrite").partitionBy("proc").parquet("./datos/contratosFecINIFIN.parquet")


                                                                                

+------------+----------+--------------------------------------------+-----+------+----------+----------+
|ctId        |fecRef    |dato                                        |saldo|proc  |fechIni   |fechFin   |
+------------+----------+--------------------------------------------+-----+------+----------+----------+
|CT0001123903|2021-11-12|Dato del contrato CT0001123903 en 2021-11-12|1100 |CT0001|2021-11-12|2021-11-19|
|CT0001123903|2021-11-19|Dato del contrato CT0001123903 en 2021-11-19|38000|CT0001|2021-11-19|9999-12-31|
|CT0001124153|2020-03-19|Dato del contrato CT0001124153 en 2020-03-19|39600|CT0001|2020-03-19|2020-08-24|
|CT0001124153|2020-08-24|Dato del contrato CT0001124153 en 2020-08-24|27200|CT0001|2020-08-24|2020-09-05|
|CT0001124153|2020-09-05|Dato del contrato CT0001124153 en 2020-09-05|15600|CT0001|2020-09-05|2020-11-09|
|CT0001124153|2020-11-09|Dato del contrato CT0001124153 en 2020-11-09|23300|CT0001|2020-11-09|2021-02-10|
|CT0001124153|2021-02-10|Dato del contrato CT0

                                                                                

### Aquí se usa como un group by. Para usar el row_number el Window tiene que estar ordenado y para usar las agregaciones no.
    Por eso se usan dos especificaciones de Window:

In [8]:
from pyspark.sql.functions import col,avg,sum,min,max,row_number 

WinSpec2 = Window.partitionBy('ctId')

df.withColumn("row",row_number().over(WinSpec)) \
  .withColumn("avg", avg(col("saldo")).over(WinSpec2)) \
  .withColumn("sum", sum(col("saldo")).over(WinSpec2)) \
  .withColumn("min", min(col("saldo")).over(WinSpec2)) \
  .withColumn("max", max(col("saldo")).over(WinSpec2)) \
  .withColumn("maxFec", max(col("fecRef")).over(WinSpec2)) \
  .where(col("row")==1).select("ctId","avg","sum","min","max", "maxFec") \
  .show()




+------------+------------------+------+-----+-----+----------+
|        ctId|               avg|   sum|  min|  max|    maxFec|
+------------+------------------+------+-----+-----+----------+
|CT0001123903|           19550.0| 39100| 1100|38000|2021-11-19|
|CT0001124153|           30510.0|305100| 3000|74600|2021-12-01|
|CT0001124186|21133.333333333332|253600| 1400|78700|2021-10-14|
|CT0001124241|23736.363636363636|261100| 5200|68000|2021-10-15|
|CT0001124381|           15000.0| 30000| 9400|20600|2021-11-01|
|CT0001124417|28872.727272727272|317600| 6600|57100|2021-11-06|
|CT0001124530|17633.333333333332|158700| 1400|41800|2021-07-30|
|CT0001125222|27116.666666666668|162700| 5100|52900|2021-11-26|
|CT0001125450|           24700.0| 98800|15500|33100|2021-01-16|
|CT0001126161|           28980.0|289800|14900|60000|2021-12-24|
|CT0001126562|22021.428571428572|308300| 1400|74800|2021-11-23|
|CT0001126762| 9433.333333333334| 28300| 1400|17700|2020-11-08|
|CT0001127321|29276.923076923078|380600|

                                                                                

### Ahora lo mismo de antes pero para todos los registros del window

In [9]:
df.withColumn("row",row_number().over(WinSpec)) \
  .withColumn("avg", avg(col("saldo")).over(WinSpec2)) \
  .withColumn("sum", sum(col("saldo")).over(WinSpec2)) \
  .withColumn("min", min(col("saldo")).over(WinSpec2)) \
  .withColumn("max", max(col("saldo")).over(WinSpec2)) \
  .withColumn("maxFec", max(col("fecRef")).over(WinSpec2)) \
  .show()


+------------+----------+--------------------+-----+------+---+------------------+------+----+-----+----------+
|        ctId|    fecRef|                dato|saldo|  proc|row|               avg|   sum| min|  max|    maxFec|
+------------+----------+--------------------+-----+------+---+------------------+------+----+-----+----------+
|CT0001123903|2021-11-12|Dato del contrato...| 1100|CT0001|  1|           19550.0| 39100|1100|38000|2021-11-19|
|CT0001123903|2021-11-19|Dato del contrato...|38000|CT0001|  2|           19550.0| 39100|1100|38000|2021-11-19|
|CT0001124153|2020-03-19|Dato del contrato...|39600|CT0001|  1|           30510.0|305100|3000|74600|2021-12-01|
|CT0001124153|2020-08-24|Dato del contrato...|27200|CT0001|  2|           30510.0|305100|3000|74600|2021-12-01|
|CT0001124153|2020-09-05|Dato del contrato...|15600|CT0001|  3|           30510.0|305100|3000|74600|2021-12-01|
|CT0001124153|2020-11-09|Dato del contrato...|23300|CT0001|  4|           30510.0|305100|3000|74600|2021

In [10]:
df.groupBy('proc').count().show()

+------+-----+
|  proc|count|
+------+-----+
|CT0010|79529|
|CT0014|80272|
|CT0002|80153|
|CT0008|79725|
|CT0015|79928|
|CT0005|80600|
|CT0007|79555|
|CT0004|79151|
|CT0013|80052|
|CT0009|79671|
|CT0016|79874|
|CT0001|79285|
|CT0011|79683|
|CT0003|79910|
|CT0006|80192|
|CT0012|79360|
+------+-----+



In [11]:
spark.sparkContext.stop()
print('Sacabao')

Sacabao
