In [0]:
import pyspark
from pyspark.sql.functions import col
from pyspark.sql import functions as F
from pyspark.sql import SparkSession

spark=SparkSession.builder.getOrCreate()

In [0]:
orden_compra=spark.read.format("csv") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .option("nullValue", "NA") \
    .option("sep", ",") \
    .load("/FileStore/tables/orden_compra.csv")

orden_compra_producto=spark.read.format("csv") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .option("nullValue", "NA") \
    .option("sep", ",") \
    .load("/FileStore/tables/orden_compra_producto.csv")

In [0]:
display(orden_compra)
display(orden_compra_producto)

nroOrdenCompra,idContrato,FechaOrdenCompra,DireccionDestino
100008207568290,102762,2019-02-24,832 Village Drive
100008584802475,125078,2011-05-10,2 Weeping Birch Parkway
100008717287704,104954,2018-10-27,4 Talmadge Alley
100011757187804,120885,2013-04-19,30030 Kinsman Trail
100015219034897,46248,2022-01-06,5220 Fair Oaks Park
100016773137287,100123,2019-03-31,930 Sutteridge Crossing
100019923732714,107587,2017-05-14,403 Redwing Plaza
100027022138051,51901,2019-08-07,6735 Rutledge Street
100027381349784,13953,2020-07-24,06 Maryland Drive
100031874804782,985,2021-10-08,03110 Artisan Alley


idProducto,nroOrdenCompra,Cantidad,PrecioUnitario,Descripcion
1,100015219034897,3473,5.5,Esparrago Verde
1,100186049889598,354,5.5,Esparrago Verde
1,100189751318170,29,5.5,Esparrago Verde
1,100236649551131,165,5.5,Esparrago Verde
1,100322125850311,3226,5.5,Esparrago Verde
1,100364803950295,303,5.5,Esparrago Verde
1,100437291554606,6617,5.5,Esparrago Verde
1,100645512622367,7300,5.5,Esparrago Verde
1,100651746140722,5174,5.5,Esparrago Verde
1,100654650172008,9727,5.5,Esparrago Verde


In [0]:
# Total por orden de compra
display(
    orden_compra.join(orden_compra_producto, orden_compra.nroOrdenCompra == orden_compra_producto.nroOrdenCompra,"inner") \
    .groupBy(orden_compra.nroOrdenCompra) \
    .agg(F.round(F.sum(orden_compra_producto.Cantidad*orden_compra_producto.PrecioUnitario),3) \
         .alias("Monto Total de Compra"))
)

nroOrdenCompra,Monto Total de Compra
100127509325368,2277.0
100626122736699,2873.2
100654995379168,4080.0
100809479994216,4020.0
100873832397356,2965.6
101059652853038,478.5
101667038754457,5785.0
101716357716314,29130.5
101835852977283,20733.6
102955290976186,2150.0


In [0]:
display(
    orden_compra.join(orden_compra_producto, orden_compra.nroOrdenCompra == orden_compra_producto.nroOrdenCompra,"inner") \
    .groupBy(orden_compra.nroOrdenCompra) \
    .agg(F.sum(orden_compra_producto.Cantidad*orden_compra_producto.PrecioUnitario) \
         .alias("Monto Total de Compra"))
)

nroOrdenCompra,Monto Total de Compra
100127509325368,2277.0
100626122736699,2873.2000000000003
100654995379168,4080.0
100809479994216,4020.0
100873832397356,2965.6000000000004
101059652853038,478.5
101667038754457,5785.0
101716357716314,29130.5
101835852977283,20733.6
102955290976186,2150.0


In [0]:

#Creando mi tabla CTE_ProductoVendido
cte_productovendido = orden_compra.join(orden_compra_producto, orden_compra.nroOrdenCompra == orden_compra_producto.nroOrdenCompra,"inner") \
    .groupBy(F.year(orden_compra.FechaOrdenCompra).alias("year"), F.month(orden_compra.FechaOrdenCompra).alias("mes"), orden_compra_producto.Descripcion.alias("producto")) \
    .agg(F.sum(orden_compra_producto.Cantidad).cast("int") \
         .alias("cantidad")) \
    .orderBy(F.year(orden_compra.FechaOrdenCompra).asc(), F.month(orden_compra.FechaOrdenCompra).asc())

#Obteniendo para lista de filtros para ser incluida en isin()
cte_listado = cte_productovendido.groupBy(cte_productovendido.year, cte_productovendido.mes) \
            .agg(F.max(cte_productovendido.cantidad).alias("Max")) \
            .orderBy(cte_productovendido.year.asc(), cte_productovendido.mes.asc()) \
            .select("Max")

#Obteniendo los resultados finales
cte_productovendido.groupBy(cte_productovendido.year, cte_productovendido.mes, cte_productovendido.producto) \
    .agg(F.max(cte_productovendido.cantidad).alias("Max")) \
    .filter(F.col("Max").isin(cte_listado.rdd.flatMap(lambda x: x).collect())) \
    .show()


+----+---+--------------------+-----+
|year|mes|            producto|  Max|
+----+---+--------------------+-----+
|2010|  1|                Chia| 8590|
|2010|  2|                Chia| 8578|
|2010|  3|         Uva Allison| 7225|
|2010|  4|                Chia| 8643|
|2010|  5|           Alcachofa| 6751|
|2010|  6|                Chia| 6451|
|2010|  7|               Salsa| 8099|
|2010|  8|                Chia| 7650|
|2010|  9|       Papaya Andina| 7490|
|2010| 10|       Quinua Blanca| 8435|
|2010| 11|                Chia|10664|
|2010| 12|           Alcachofa| 8793|
|2011|  1|                Chia|12407|
|2011|  2|                Chia| 6682|
|2011|  3|                Chia| 9101|
|2011|  4|         Uva Arra 15| 8367|
|2011|  5|Quinua cocida neg...| 9529|
|2011|  6|                Chia| 7235|
|2011|  7|     Esparrago Verde| 6993|
|2011|  7|            Amaranto| 9029|
+----+---+--------------------+-----+
only showing top 20 rows



In [0]:
orden_compra.join(orden_compra_producto, orden_compra.nroOrdenCompra == orden_compra_producto.nroOrdenCompra,"inner") \
    .groupBy(F.year(orden_compra.FechaOrdenCompra).alias("year"), F.month(orden_compra.FechaOrdenCompra).alias("mes")) \
    .agg(F.sum(orden_compra_producto.Cantidad*orden_compra_producto.PrecioUnitario).cast("int") \
         .alias("Monto Vendido")) \
    .orderBy(F.year(orden_compra.FechaOrdenCompra).asc(), F.month(orden_compra.FechaOrdenCompra).asc()) \
    .show()

+----+---+-------------+
|year|mes|Monto Vendido|
+----+---+-------------+
|2010|  1|       671976|
|2010|  2|       647067|
|2010|  3|       773195|
|2010|  4|       658126|
|2010|  5|       650844|
|2010|  6|       509539|
|2010|  7|       744849|
|2010|  8|       680574|
|2010|  9|       697829|
|2010| 10|       707146|
|2010| 11|       782544|
|2010| 12|       649222|
|2011|  1|       710546|
|2011|  2|       673955|
|2011|  3|       781340|
|2011|  4|       675159|
|2011|  5|       660371|
|2011|  6|       561309|
|2011|  7|       707939|
|2011|  8|       687166|
+----+---+-------------+
only showing top 20 rows

