In [None]:
from pyspark.sql import SparkSession

# Local session (master "local[*]") with the driver memory set to 5g.
spark = (
    SparkSession.builder
    .config("spark.driver.memory", "5g")
    .appName("sesion_1")
    .master("local[*]")
    .getOrCreate()
)

# Turn Adaptive Query Execution off for this session.
spark.conf.set("spark.sql.adaptive.enabled", "false")

In [None]:
# Import both names from the public IPython.display module instead of relying
# on the notebook-injected `display` global and the internal core path.
from IPython.display import HTML, display

# Force <pre> blocks not to wrap; presumably so wide DataFrame.show() tables
# stay aligned in the notebook output.
display(HTML("<style>pre { white-space: pre !important; }</style>"))

In [None]:
# A threshold of -1 switches automatic broadcast joins off for this session.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

In [None]:
spark.sparkContext.uiWebUrl

In [None]:
import pyspark.sql.functions as f
import pyspark.sql.types as t

In [None]:
# Contracts table: the "cod_titular" column is exposed as "cod_client".
big_contracts_df = (
    spark.read
    .parquet("../../resources/data/parquet/big_contracts")
    .withColumnRenamed("cod_titular", "cod_client")
)

big_clients_df = spark.read.parquet("../../resources/data/parquet/big_clients")

# Print the first row of each table.
big_contracts_df.show(1)
big_clients_df.show(1)

In [None]:
print("Contracts:", big_contracts_df.count())
print("Clients:", big_clients_df.count())

### Coalesce y Repartition

##### Número de particiones

In [None]:
print(big_contracts_df.rdd.getNumPartitions())
print(big_clients_df.rdd.getNumPartitions())

In [None]:
big_clients_df.groupBy(f.spark_partition_id()).count().show()

In [None]:
big_contracts_df.groupBy(f.spark_partition_id()).count().show()

In [None]:
big_contracts_df.join(big_clients_df, ["id"]).rdd.getNumPartitions()

In [None]:
big_contracts_df.join(big_clients_df, ["id"]).explain()

In [None]:
big_contracts_df.join(big_clients_df.sample(False, 0.0001), "id").count()

##### repartition -> Aumenta o disminuye el número de particiones

In [None]:
big_contracts_df.repartition(16).explain()

In [None]:
big_contracts_df.repartition(16).groupBy(f.spark_partition_id()).count().show(20)

In [None]:
big_contracts_df.repartition(f.col("cod_client")).explain()

In [None]:
big_contracts_df.repartition(30, f.col("cod_client")).explain()

In [None]:
big_contracts_df.select("cod_client").distinct().count()

In [None]:
big_contracts_df.repartition(30, f.col("cod_client")).groupBy(f.spark_partition_id()).count().show()

In [None]:
big_contracts_df.repartition(30, f.col("cod_client")).groupBy(f.spark_partition_id(), f.col("cod_client")).count().show()

In [None]:
big_contracts_df.repartition(30, f.col("id")).groupBy(f.spark_partition_id(), f.col("id")).count().show()

In [None]:
big_contracts_df.repartition(30, f.col("id")).groupBy(f.spark_partition_id()).count().show()

In [None]:
big_contracts_df.repartition(30, f.col("id")).write.mode("overwrite").parquet("../../resources/data/parquet/t_repartition")

##### coalesce -> Únicamente disminuye el número de particiones, puede provocar DATA SKEW

In [None]:
big_contracts_df.coalesce(9).rdd.getNumPartitions()

In [None]:
big_contracts_df.repartition(f.col("cod_client")).groupBy(f.spark_partition_id()).count().show()

In [None]:
# Rows per partition after shrinking the cod_client-partitioned data to 4
# partitions with coalesce (which merges existing partitions).
big_contracts_df.repartition(f.col("cod_client")).coalesce(4).groupBy(f.spark_partition_id()).count().show()

In [None]:
# Rows per partition after redistributing the cod_client-partitioned data into
# 4 partitions with repartition (a second shuffle).
big_contracts_df.repartition(f.col("cod_client")).repartition(4).groupBy(f.spark_partition_id()).count().show()

##### Aplicamos transformaciones con tabla grande

In [None]:
#join_tables_df = big_contracts_df.join(big_clients_df, ["id"], "outer") // -> Transformation_1
from pyspark.sql.window import Window
from pyspark.sql import DataFrame

def transformation1(contracts_df: DataFrame, clients_df: DataFrame) -> DataFrame:
    """Join contracts with a small client sample and keep large amount drops.

    The clients table is sampled (without replacement, fraction 0.001,
    seed 0), loses its ``id`` column and is joined to the contracts on
    ``cod_client``. Per-``id`` window columns are then added, the rows are
    filtered, and the distinct projection is returned.

    Args:
        contracts_df: Left side of the join.
        clients_df: Right side of the join; it is sampled and its ``id``
            column is dropped before joining.

    Returns:
        Distinct rows with the columns ``id``, ``amount``, ``diff_amount``,
        ``fec_alta``, ``operation_day``, ``count_vip_true`` and
        ``count_vip_false``.

    Note:
        The joined data must expose ``vip``, ``amount``, ``operation_day``
        and ``fec_alta``; which input supplies each one is not visible here.
    """
    per_id = Window.partitionBy("id")
    # Same partitioning, ordered from the latest operation_day to the earliest.
    per_id_latest_first = per_id.orderBy(f.col("operation_day").desc())

    sampled_clients = clients_df.sample(
        withReplacement=False, fraction=0.001, seed=0
    ).drop("id")

    # Column-level comparisons (not Python booleans); a null "vip" matches neither.
    is_vip = f.col("vip") == True  # noqa: E712
    is_not_vip = f.col("vip") == False  # noqa: E712

    flagged = (
        contracts_df
        .join(sampled_clients, ["cod_client"])
        .withColumn("vip_true", f.when(is_vip, f.lit(1)).otherwise(f.lit(0)))
        .withColumn("vip_false", f.when(is_not_vip, f.lit(1)).otherwise(f.lit(0)))
    )

    with_window_columns = (
        flagged
        .withColumn("count_vip_true", f.sum("vip_true").over(per_id))
        .withColumn("count_vip_false", f.sum("vip_false").over(per_id))
        # Amount of the row five positions earlier in the per-id ordering.
        .withColumn("prev_amount", f.lag(f.col("amount"), 5).over(per_id_latest_first))
        .withColumn("diff_amount", f.col("amount") - f.col("prev_amount"))
    )

    # Filters run after the window columns are computed, so they do not change
    # which rows the windows see.
    kept = (
        with_window_columns
        .filter(f.col("diff_amount") < -800)
        .filter(f.col("fec_alta") > "2015-01-01")
    )

    output_columns = [
        "id",
        "amount",
        "diff_amount",
        "fec_alta",
        "operation_day",
        "count_vip_true",
        "count_vip_false",
    ]
    return kept.select(*output_columns).distinct()

join_tables_df = transformation1(big_contracts_df, big_clients_df)

In [None]:
join_tables_df.write.mode("overwrite").parquet("../../resources/data/parquet/join_tables_df")

In [None]:
def transformation2(df: DataFrame) -> DataFrame:
    """Attach per-``fec_alta`` statistics of ``diff_amount`` to every ``id``.

    Args:
        df: DataFrame exposing ``id``, ``fec_alta`` and ``diff_amount``.

    Returns:
        One output row per input row with the columns ``id``,
        ``min_diff_amount``, ``max_diff_amount``, ``mean_diff_amount`` and
        ``stddev_diff_amount`` (population standard deviation), each computed
        over the rows sharing the same ``fec_alta``.
    """
    per_fec_alta = Window.partitionBy("fec_alta")
    diff = f.col("diff_amount")

    # (output column name, aggregate function) in output order.
    aggregates = (
        ("min_diff_amount", f.min),
        ("max_diff_amount", f.max),
        ("mean_diff_amount", f.mean),
        ("stddev_diff_amount", f.stddev_pop),
    )
    windowed_columns = [
        aggregate(diff).over(per_fec_alta).alias(name)
        for name, aggregate in aggregates
    ]
    return df.select(f.col("id"), *windowed_columns)


def transformation3(df: DataFrame) -> DataFrame:
    """Derive five synthetic columns from the VIP counters and ``fec_alta``.

    Args:
        df: DataFrame exposing ``count_vip_true``, ``count_vip_false`` and
            ``fec_alta``.

    Returns:
        DataFrame with the columns ``id``, ``new_fec_alta``,
        ``new_operation_day``, ``new_count_vip_true`` and
        ``new_count_vip_false``.
    """
    vip_true = f.col("count_vip_true")
    vip_false = f.col("count_vip_false")
    fec_alta = f.col("fec_alta")
    int_type = t.IntegerType()

    return df.select(
        # The output "id" is the absolute difference of the two counters,
        # not the input "id".
        f.abs(vip_true - vip_false).alias("id"),
        f.date_sub(fec_alta, vip_false.cast(int_type)).alias("new_fec_alta"),
        # NOTE(review): derived from fec_alta, not operation_day - confirm intent.
        f.date_sub(fec_alta, vip_true.cast(int_type)).alias("new_operation_day"),
        # NOTE(review): both columns below are the same sum - confirm intent.
        (vip_true + vip_false).alias("new_count_vip_true"),
        (vip_false + vip_true).alias("new_count_vip_false"),
    )


In [None]:
# Build the base DataFrame and derive both side tables from it.
join_tables_df = transformation1(big_contracts_df, big_clients_df)

transformation2_df = transformation2(join_tables_df)
transformation3_df = transformation3(join_tables_df)

# join_tables_df is used three times and is not persisted here, so its
# lineage may be evaluated once per use.
expensive_df = (
    join_tables_df
    .join(transformation2_df, ["id"])
    .join(transformation3_df, ["id"])
)

In [None]:
expensive_df.write.mode("overwrite").parquet("../../resources/data/parquet/expensive_df")

### Cache y Persist

### What is Caching?
In applications that reuse the same datasets over and over, one of the most useful optimizations is caching. Caching will place a DataFrame or table into temporary storage across the executors in your cluster and make subsequent reads faster.

**Without Spark Caching**
```
          +------------------+     +------------------+
input --> | Transformation 1 | --> | Transformation 2 | --> Output 1
          +------------------+     +------------------+
          +------------------+     +------------------+
input --> | Transformation 1 | --> | Transformation 3 | --> Output 2
          +------------------+     +------------------+
          +------------------+     +------------------+
input --> | Transformation 1 | --> | Transformation 4 | --> Output 3
          +------------------+     +------------------+
```
**With Spark Caching**
```
                                                     +------------------+
                                               +---> | Transformation 2 | --> Output 1
                                               |     +------------------+
          +------------------+     +-------+   |     +------------------+
input --> | Transformation 1 | --> | Cache | --+---> | Transformation 3 | --> Output 2
          +------------------+     +-------+   |     +------------------+
                                               |     +------------------+
                                               +---> | Transformation 4 | --> Output 3
                                                     +------------------+
```

In [2]:
# - Store the contents of a DataFrame at any stage of the processing

# Default storage level for DataFrames -> MEMORY_AND_DISK_DESER
# Default storage level for RDDs -> MEMORY_ONLY
from pyspark import StorageLevel

In [None]:
# Mark join_tables_df for persistence. The storage level is spelled out in
# full: the previous dangling `StorageLevel.` was a SyntaxError.
join_persisted_df = join_tables_df.persist(StorageLevel.MEMORY_AND_DISK)

In [None]:
# Caching is lazy, so it only takes effect once an action runs
join_persisted_df.show()

In [None]:
join_persisted_df.count()

##### unpersist

In [None]:
join_persisted_df.unpersist()

##### aplicación de cache/persist

In [None]:
join_tables_df = transformation1(big_contracts_df, big_clients_df)

# Persist the shared base so the three uses below can read the stored copy.
join_persisted_df = join_tables_df.persist(StorageLevel.MEMORY_AND_DISK)

transformation2_df = transformation2(join_persisted_df)
transformation3_df = transformation3(join_persisted_df)

expensive_df = (
    join_persisted_df
    .join(transformation2_df, ["id"])
    .join(transformation3_df, ["id"])
)

In [None]:
expensive_df.write.mode("overwrite").parquet("../../resources/data/parquet/expensive_df")

In [None]:
join_persisted_df.unpersist()

### Broadcast

In [None]:
# Join against the sampled clients with no broadcast hint and write the result.
sampled_clients_df = big_clients_df.sample(False, 0.001, 0).drop("id")
(
    big_contracts_df
    .join(sampled_clients_df, ["cod_client"])
    .write.mode("overwrite")
    .parquet("../../resources/data/parquet/join_tables_df")
)

In [None]:
# Same join, with an explicit broadcast hint on the sampled clients.
broadcast_clients_df = f.broadcast(big_clients_df.sample(False, 0.001, 0).drop("id"))
(
    big_contracts_df
    .join(broadcast_clients_df, ["cod_client"])
    .write.mode("overwrite")
    .parquet("../../resources/data/parquet/join_tables_df")
)

In [None]:
# Re-enable automatic broadcast joins with a 10485760-byte (10 MiB) threshold.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10485760")

In [None]:
# Unpersisted pipeline with no explicit broadcast hints.
join_tables_df = transformation1(big_contracts_df, big_clients_df)

transformation2_df = transformation2(join_tables_df)
transformation3_df = transformation3(join_tables_df)

expensive_df = (
    join_tables_df
    .join(transformation2_df, ["id"])
    .join(transformation3_df, ["id"])
)

In [None]:
expensive_df.write.mode("overwrite").parquet("../../resources/data/parquet/expensive_df")

#### Broadcast + persist

In [None]:
join_tables_df = transformation1(big_contracts_df, big_clients_df)

# Persist the shared base, then hint both derived tables as broadcast sides.
join_persisted_df = join_tables_df.persist(StorageLevel.MEMORY_AND_DISK)

transformation2_df = transformation2(join_persisted_df)
transformation3_df = transformation3(join_persisted_df)

expensive_df = (
    join_persisted_df
    .join(f.broadcast(transformation2_df), ["id"])
    .join(f.broadcast(transformation3_df), ["id"])
)

In [None]:
expensive_df.write.mode("overwrite").parquet("../../resources/data/parquet/expensive_df")

In [None]:
join_persisted_df.unpersist()