In [1]:
from pyspark.sql import SparkSession, dataframe
from pyspark.sql.functions import when, col, sum, count, isnan, round
from pyspark.sql.functions import regexp_replace, concat_ws, sha2, rtrim
from pyspark.sql.functions import unix_timestamp, from_unixtime, to_date
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType
from pyspark.sql import HiveContext

from pyspark.sql.functions import regexp_replace
from pyspark.sql.functions import when

from pyspark.sql.functions import countDistinct 
from pyspark.sql.functions import year, month, dayofmonth, quarter 

spark = SparkSession.builder.master("local[*]")\
    .enableHiveSupport()\
    .getOrCreate()

### 01.Criar dataframes das tabelas que estão no hive

In [7]:
# Tabela Categorias
df_categorias = spark.sql("""
                    SELECT *
                    FROM aula_hive_stg.tbl_categoria
                    WHERE id_categoria != 'id_categoria'
                    """)
df_categorias.show()

+------------+---------------+-------------+--------+
|id_categoria|   ds_categoria|perc_parceiro| dt_foto|
+------------+---------------+-------------+--------+
|           1|Categoria - 001|          2.0|20230603|
|           2|Categoria - 002|          2.0|20230603|
|           3|Categoria - 003|          2.0|20230603|
|           4|Categoria - 004|          2.0|20230603|
|           5|Categoria - 005|          5.0|20230603|
|           6|Categoria - 006|          1.0|20230603|
|           7|Categoria - 007|          5.0|20230603|
|           8|Categoria - 008|          3.0|20230603|
|           9|Categoria - 009|          5.0|20230603|
|          10|Categoria - 010|          6.0|20230603|
|          11|Categoria - 011|          6.0|20230603|
|          12|Categoria - 012|          4.0|20230603|
|          13|Categoria - 013|          3.0|20230603|
|          14|Categoria - 014|          1.0|20230603|
|          15|Categoria - 015|          4.0|20230603|
|          16|Categoria - 01

In [8]:
# Tabela cidade
df_cidade = spark.sql("""
                SELECT *
                FROM aula_hive_stg.tbl_cidade
                WHERE id_cidade != 'id_cidade'
                """)
df_cidade.show()

+---------+----------------+---------+--------+
|id_cidade|       ds_cidade|id_estado| dt_foto|
+---------+----------------+---------+--------+
|     1058|           Betim|        1|20230603|
|       33|      ACRELANDIA|        2|20230603|
|      485|    ASSIS BRASIL|        2|20230603|
|      958|       BRASILEIA|        2|20230603|
|     1388|        CAPIXABA|        2|20230603|
|     1851| CRUZEIRO DO SUL|        2|20230603|
|     2022|       Cravinhos|        2|20230603|
|     2232|  EPITACIOLANDIA|        2|20230603|
|     2347|           FEIJO|        2|20230603|
|     3879|     MANCIO LIMA|        2|20230603|
|     3895|   MANOEL URBANO|        2|20230603|
|     5628|      RIO BRANCO|        2|20230603|
|     5767|      Rio Branco|        2|20230603|
|     6585|  SENA MADUREIRA|        2|20230603|
|     6593|SENADOR GUIOMARD|        2|20230603|
|     6845|       SÃO PAULO|        2|20230603|
|     6987|        TARAUACA|        2|20230603|
|     7241|      UBERLANDIA|        2|20

In [9]:
# Tabela estado
df_estado = spark.sql("""
                SELECT *
                FROM aula_hive_stg.tbl_estado
                WHERE id_estado != 'id_estado'
                """)
df_estado.show()

+---------+---------+--------+
|id_estado|ds_estado| dt_foto|
+---------+---------+--------+
|        1|       31|20230603|
|        2|       AC|20230603|
|        3|       AL|20230603|
|        4|       AM|20230603|
|        5|       AP|20230603|
|        6|       BA|20230603|
|        7|       CE|20230603|
|        8|       DF|20230603|
|        9|       Df|20230603|
|       10|       ES|20230603|
|       11|       GO|20230603|
|       12|       Go|20230603|
|       13|       MA|20230603|
|       14|       MG|20230603|
|       15|       MS|20230603|
|       16|       MT|20230603|
|       17|       Mg|20230603|
|       18|       PA|20230603|
|       19|       PB|20230603|
|       20|       PE|20230603|
+---------+---------+--------+
only showing top 20 rows



In [11]:
# Tabela clientes
df_clientes = spark.sql("""
                    SELECT *
                    FROM aula_hive_stg.tbl_cliente
                    WHERE id_cliente != 'id_cliente'
                    """)
df_clientes.show()

+----------+--------------------+---------+--------+
|id_cliente|          nm_cliente|flag_ouro| dt_foto|
+----------+--------------------+---------+--------+
|  78262350|Cliente Magalu - ...|        0|20230603|
|  57301020|Cliente Magalu - ...|        0|20230603|
|   4639167|Cliente Magalu - ...|        1|20230603|
|  53130287|Cliente Magalu - ...|        0|20230603|
|  16456085|Cliente Magalu - ...|        0|20230603|
| 124100202|Cliente Magalu - ...|        0|20230603|
|  15659567|Cliente Magalu - ...|        0|20230603|
|  37375617|Cliente Magalu - ...|        1|20230603|
|  52082515|Cliente Magalu - ...|        0|20230603|
|  56214875|Cliente Magalu - ...|        0|20230603|
| 137980527|Cliente Magalu - ...|        0|20230603|
|  11806492|Cliente Magalu - ...|        0|20230603|
| 112690290|Cliente Magalu - ...|        1|20230603|
| 120559842|Cliente Magalu - ...|        1|20230603|
| 140697495|Cliente Magalu - ...|        0|20230603|
|  64431627|Cliente Magalu - ...|        0|202

In [12]:
# Tabela filial
df_filial = spark.sql("""
                SELECT *
                FROM aula_hive_stg.tbl_filial
                WHERE id_filial != 'id_filial'
                """)
df_filial.show()

+---------+---------------+---------+--------+
|id_filial|      ds_filial|id_cidade| dt_foto|
+---------+---------------+---------+--------+
|        6|Filial - 000006|       22|20230603|
|        9|Filial - 000009|       22|20230603|
|       88|Filial - 000088|       22|20230603|
|       98|Filial - 000098|       22|20230603|
|      118|Filial - 000118|       22|20230603|
|      146|Filial - 000146|       22|20230603|
|      223|Filial - 000223|       22|20230603|
|      268|Filial - 000268|       22|20230603|
|      353|Filial - 000353|       22|20230603|
|      363|Filial - 000363|       22|20230603|
|      372|Filial - 000372|       22|20230603|
|      373|Filial - 000373|       22|20230603|
|      417|Filial - 000417|       22|20230603|
|      464|Filial - 000464|       22|20230603|
|      470|Filial - 000470|       22|20230603|
|      531|Filial - 000531|       22|20230603|
|      538|Filial - 000538|       22|20230603|
|      617|Filial - 000617|       22|20230603|
|      620|Fi

In [13]:
# Tabela item_pedido
df_item_pedido = spark.sql("""
                        SELECT *
                        FROM aula_hive_stg.tbl_item_pedido
                        WHERE id_pedido != 'id_pedido'
                        ORDER BY vr_unitario DESC
                        """)
df_item_pedido.show()

+-----------+----------+----------+-----------+--------+
|  id_pedido|id_produto|quantidade|vr_unitario| dt_foto|
+-----------+----------+----------+-----------+--------+
| 4972773226|   1357853|         1|    9998.17|20230603|
| 4782631506|   5017608|         1|     999.57|20230603|
| 4793147056|    721095|         1|     999.57|20230603|
| 4800263156|    721095|         1|     999.57|20230603|
| 4810312456|    721095|         1|     999.57|20230603|
| 4770011776|   5017608|         1|     999.57|20230603|
|48935276716|   3277163|         8|      998.3|20230603|
| 4896805005|   2282017|         1|     998.27|20230603|
| 4748781753|     81781|         1|     996.97|20230603|
| 4708762423|   1158116|         1|     996.97|20230603|
| 4747737453|     21525|         1|     996.97|20230603|
| 4727352703|   2589693|         1|     996.97|20230603|
| 4748424453|     21525|         1|     996.97|20230603|
| 4748822373|   1831256|         1|     996.97|20230603|
| 4688052453|   1831256|       

In [14]:
# Tabela parceiro
df_parceiro = spark.sql("""
                    SELECT *
                    FROM aula_hive_stg.tbl_parceiro
                    WHERE id_parceiro != 'id_parceiro'
                    """)
df_parceiro.show()

+-----------+--------------------+--------+
|id_parceiro|         nm_parceiro| dt_foto|
+-----------+--------------------+--------+
|          1|Parceiro Magalu - 01|20230603|
|          2|Parceiro Magalu - 02|20230603|
|          3|Parceiro Magalu - 03|20230603|
|          4|Parceiro Magalu - 04|20230603|
|          5|Parceiro Magalu - 05|20230603|
|          6|Parceiro Magalu - 06|20230603|
|          7|Parceiro Magalu - 07|20230603|
|          8|Parceiro Magalu - 08|20230603|
|          9|Parceiro Magalu - 09|20230603|
|         10|Parceiro Magalu - 10|20230603|
|         11|Parceiro Magalu - 11|20230603|
|         12|Parceiro Magalu - 12|20230603|
|         13|Parceiro Magalu - 13|20230603|
|         14|Parceiro Magalu - 14|20230603|
|         15|Parceiro Magalu - 15|20230603|
|         16|Parceiro Magalu - 16|20230603|
+-----------+--------------------+--------+



In [15]:
# Tabela produtos
df_produtos = spark.sql("""
                    SELECT *
                    FROM aula_hive_stg.tbl_produto
                    WHERE id_produto != 'id_produto'
                    """)
df_produtos.show()

+----------+--------------------+---------------+--------+
|id_produto|          ds_produto|id_subcategoria| dt_foto|
+----------+--------------------+---------------+--------+
|     12006|Produto - 0000012006|            572|20230603|
|     45183|Produto - 0000045183|           2142|20230603|
|     78905|Produto - 0000078905|            140|20230603|
|     79758|Produto - 0000079758|           3132|20230603|
|    117196|Produto - 0000117196|             96|20230603|
|    169903|Produto - 0000169903|           1735|20230603|
|    185406|Produto - 0000185406|             82|20230603|
|    201415|Produto - 0000201415|           1519|20230603|
|    284396|Produto - 0000284396|           3132|20230603|
|    293376|Produto - 0000293376|           1309|20230603|
|    299033|Produto - 0000299033|            336|20230603|
|    301419|Produto - 0000301419|           2370|20230603|
|    312382|Produto - 0000312382|           1610|20230603|
|    315401|Produto - 0000315401|           2880|2023060

In [16]:
# Tabela subcategoria
df_subcategoria = spark.sql("""
                        SELECT *
                        FROM aula_hive_stg.tbl_subcategoria
                        WHERE id_subcategoria != 'id_subcategoria'
                        """)
df_subcategoria.show()

+---------------+--------------------+------------+--------+
|id_subcategoria|     ds_subcategoria|id_categoria| dt_foto|
+---------------+--------------------+------------+--------+
|            132|Sub-categoria - 0...|           1|20230603|
|            137|Sub-categoria - 0...|           1|20230603|
|            288|Sub-categoria - 0...|           1|20230603|
|            380|Sub-categoria - 0...|           1|20230603|
|            397|Sub-categoria - 0...|           1|20230603|
|            498|Sub-categoria - 0...|           1|20230603|
|            508|Sub-categoria - 0...|           1|20230603|
|            514|Sub-categoria - 0...|           1|20230603|
|            586|Sub-categoria - 0...|           1|20230603|
|            668|Sub-categoria - 0...|           1|20230603|
|            751|Sub-categoria - 0...|           1|20230603|
|            923|Sub-categoria - 0...|           1|20230603|
|           1040|Sub-categoria - 0...|           1|20230603|
|           1170|Sub-cat

In [17]:
# Tabela pedido
df_pedido = spark.sql("""
                    SELECT *
                    FROM aula_hive_stg.tbl_pedido
                    WHERE id_pedido != 'id_pedido'
                    ORDER BY vr_total_pago DESC
                    """)
df_pedido.show()

+-----------+--------------------+-----------+----------+---------+-------------+--------+
|  id_pedido|           dt_pedido|id_parceiro|id_cliente|id_filial|vr_total_pago| dt_foto|
+-----------+--------------------+-----------+----------+---------+-------------+--------+
| 4972773226|2021-08-08T00:00:...|          6|  10176110|        9|      9998.17|20230603|
| 4713322026|2021-06-13T00:00:...|          6|  54099842|      329|      9992.97|20230603|
|48387711216|2021-07-10T00:00:...|         16| 141774350|        3|       999.69|20230603|
|49280204216|2021-07-29T00:00:...|         16|  34170242|      231|       999.66|20230603|
| 4770011776|2021-06-25T00:00:...|          6|   1674237|      371|       999.57|20230603|
| 4810312456|2021-07-05T00:00:...|          6| 121001967|        3|       999.57|20230603|
| 4793147056|2021-06-30T00:00:...|          6| 141065652|      257|       999.57|20230603|
| 4782631506|2021-06-28T00:00:...|          6|  45642457|      371|       999.57|20230603|

### 02.Criar um dataframe (df_pedidos) esta dataframe deve ser a união de pedido e item_pedido;

In [18]:
df_pedido.createOrReplaceTempView("pedido")
df_item_pedido.createOrReplaceTempView("item_pedido")

In [23]:
query = """
        SELECT
            P.*,
            IP.id_produto,
            IP.quantidade,
            IP.vr_unitario
        FROM
            pedido as P
        JOIN item_pedido AS IP
        ON
            P.id_pedido = IP.id_pedido
        ORDER BY quantidade DESC
        """

df_pedidos = spark.sql(query)

df_pedidos.filter(df_pedidos["id_pedido"] != "id_pedido").show()

+-----------+--------------------+-----------+----------+---------+-------------+--------+----------+----------+-----------+
|  id_pedido|           dt_pedido|id_parceiro|id_cliente|id_filial|vr_total_pago| dt_foto|id_produto|quantidade|vr_unitario|
+-----------+--------------------+-----------+----------+---------+-------------+--------+----------+----------+-----------+
| 4706802305|2021-06-11T00:00:...|          5|  75664277|      141|       410.67|20230603|    390538|         9|      45.63|
| 5080690026|2021-08-31T00:00:...|          6| 146166857|      329|      8939.97|20230603|   4355238|         9|     993.33|
|49403134213|2021-08-01T00:00:...|         13| 102149515|        3|      2377.91|20230603|    728063|         9|     233.88|
|49345371213|2021-07-31T00:00:...|         13|  73656702|        3|      2420.82|20230603|   1064898|         9|     268.98|
| 4713322026|2021-06-13T00:00:...|          6|  54099842|      329|      9992.97|20230603|   4355238|         9|    1110.33|


#### 2.1.Ver a quantidade de pedidos?

In [25]:
qtd_pedidos = df_pedidos.select("id_pedido").distinct().count()
print(f"A quantidade de pedidos é de:{qtd_pedidos}")

Py4JJavaError: An error occurred while calling o494.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: ShuffleMapStage 32 (count at NativeMethodAccessorImpl.java:0) has failed the maximum allowable number of times: 4. Most recent failure reason: org.apache.spark.shuffle.FetchFailedException: Stream is corrupted 	at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:554) 	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:470) 	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:64) 	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435) 	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441) 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) 	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31) 	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.agg_doAggregateWithKeys_0$(Unknown Source) 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.agg_doAggregateWithoutKey_0$(Unknown Source) 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source) 	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) 	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636) 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) 	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125) 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55) 	at org.apache.spark.scheduler.Task.run(Task.scala:121) 	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:403) 	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:409) 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 	at java.lang.Thread.run(Thread.java:748) Caused by: java.io.IOException: Stream is corrupted 	at net.jpountz.lz4.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:202) 	at net.jpountz.lz4.LZ4BlockInputStream.read(LZ4BlockInputStream.java:157) 	at net.jpountz.lz4.LZ4BlockInputStream.read(LZ4BlockInputStream.java:170) 	at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply$mcJ$sp(Utils.scala:361) 	at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:348) 	at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:348) 	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) 	at org.apache.spark.util.Utils$.copyStream(Utils.scala:369) 	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:462) 	... 23 more 
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1493)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2107)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:299)
	at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2830)
	at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2829)
	at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
	at org.apache.spark.sql.Dataset.count(Dataset.scala:2829)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


#### 2.2.Quantidade de produtos e agrupa-los por pedido

In [26]:
produtos_pedido = (df_pedidos
                   .groupby("id_pedido")
                   .agg(sum("quantidade")
                        .alias("quantidade_produtos"))
                   .orderBy("quantidade_produtos", ascending=False))
produtos_pedido.show()

+-----------+-------------------+
|  id_pedido|quantidade_produtos|
+-----------+-------------------+
|49001989016|               50.0|
|48013645016|               45.0|
|48483039513|               40.0|
|47272431213|               40.0|
|48704946213|               40.0|
|48265171013|               36.0|
|47769750516|               32.0|
|50903766713|               31.0|
|48474595213|               30.0|
|48456340013|               30.0|
|47741988510|               30.0|
|49910739213|               30.0|
|48813362513|               30.0|
|48112853213|               30.0|
|48394480213|               30.0|
|51367310716|               30.0|
|49284138513|               30.0|
|46972298716|               30.0|
|49235898013|               30.0|
|49630885213|               28.0|
+-----------+-------------------+
only showing top 20 rows



#### 2.3.Quantidade de pedidos por cliente

In [16]:
qtd_pedido_cliente = (df_pedidos
                      .groupby("id_cliente")
                      .agg(countDistinct("id_pedido")
                           .alias("pedidos_por_cliente"))
                      .orderBy("pedidos_por_cliente", ascending=False))
qtd_pedido_cliente.show()

+----------+-------------------+
|id_cliente|pedidos_por_cliente|
+----------+-------------------+
|  19912727|                182|
| 133452557|                174|
| 125627750|                120|
|  13565460|                112|
|  20325327|                 97|
|  32115700|                 76|
|  10344587|                 68|
|   4806975|                 67|
|   1501190|                 66|
|  90954057|                 66|
|  97294812|                 65|
| 105584482|                 58|
| 119699252|                 55|
|  41453777|                 53|
|  62353192|                 52|
| 124140150|                 50|
|  54170967|                 47|
|  16352982|                 47|
| 138871907|                 46|
|  65586217|                 45|
+----------+-------------------+
only showing top 20 rows



#### 2.4.Quantidade de pedidos por parceiro

In [17]:
qtd_pedido_parceiro = (df_pedidos
                       .groupby("id_parceiro")
                       .agg(countDistinct("id_pedido")
                            .alias("pedidos_parceiro"))
                       .orderBy("pedidos_parceiro", ascending=False))
qtd_pedido_parceiro.show()

+-----------+----------------+
|id_parceiro|pedidos_parceiro|
+-----------+----------------+
|         16|          332413|
|         13|          306170|
|          6|           62142|
|          5|           19193|
|         10|            8822|
|          3|            3408|
|          1|             705|
|         11|             446|
|          4|              88|
|          8|              44|
|          2|              40|
+-----------+----------------+



#### 2.5.Quantidade de pedido por filial

In [18]:
qtd_pedido_filial = (df_pedidos
                     .groupby("id_filial")
                     .agg(countDistinct("id_pedido")
                          .alias("pedidos_filial"))
                     .orderBy("pedidos_filial", ascending=False))
qtd_pedido_filial.show()

+---------+--------------+
|id_filial|pedidos_filial|
+---------+--------------+
|      231|        204263|
|        3|        181948|
|      257|         19380|
|      883|         15514|
|      228|         14119|
|      366|         12327|
|      494|         12214|
|      117|         11741|
|      547|          9831|
|     1680|          7995|
|      276|          7376|
|     1692|          7083|
|      439|          7056|
|      229|          6937|
|      435|          6728|
|        4|          6270|
|      141|          5961|
|      224|          5471|
|      884|          5136|
|     1730|          3728|
+---------+--------------+
only showing top 20 rows



### 03.Juntar criar df_filial que deverá ser a junção das tabelas filial, cidade e estado;

In [27]:
df_filial.createOrReplaceTempView("filial")
df_cidade.createOrReplaceTempView("cidade")
df_estado.createOrReplaceTempView("estado")

In [28]:
query = """
        SELECT
            F.id_filial,
            C.id_cidade,
            E.id_estado,
            F.ds_filial,
            C.ds_cidade,
            E.ds_estado
        FROM
            filial AS F
        JOIN
            cidade AS C ON F.id_cidade = C.id_cidade
        JOIN
            estado AS E ON C.id_estado = E.id_estado
        """

df_filiais = spark.sql(query)
df_filiais.filter(df_filiais["id_filial"] != "id_filial").show()

+---------+---------+---------+---------------+-----------+---------+
|id_filial|id_cidade|id_estado|      ds_filial|  ds_cidade|ds_estado|
+---------+---------+---------+---------------+-----------+---------+
|      805|      711|        7|Filial - 000805|   BARREIRA|       CE|
|     1002|      711|        7|Filial - 001002|   BARREIRA|       CE|
|     1233|      711|        7|Filial - 001233|   BARREIRA|       CE|
|     1474|      986|        7|Filial - 001474|BREJO SANTO|       CE|
|      325|      351|        7|Filial - 000325|    ARACATI|       CE|
|      755|      351|        7|Filial - 000755|    ARACATI|       CE|
|      800|      351|        7|Filial - 000800|    ARACATI|       CE|
|      831|      351|        7|Filial - 000831|    ARACATI|       CE|
|      863|      351|        7|Filial - 000863|    ARACATI|       CE|
|      976|      351|        7|Filial - 000976|    ARACATI|       CE|
|     1079|      351|        7|Filial - 001079|    ARACATI|       CE|
|     1251|      351

#### 3.1.Juntar com o df_pedidos

In [29]:
df_pedidos.createOrReplaceTempView("pedidos")
df_filiais.createOrReplaceTempView("filiais")

In [30]:
query = """
        SELECT
            P.*,
            F.id_cidade,
            F.id_estado,
            F.ds_cidade,
            F.ds_estado,
            F.ds_filial
        FROM
            pedidos AS P
        JOIN
            filiais AS F ON P.id_filial = F.id_filial
        """

df_pedidos_filiais = spark.sql(query)
df_pedidos_filiais.show(truncate=False)

+-----------+------------------------+-----------+----------+---------+-------------+--------+----------+----------+-----------+---------+---------+----------------+---------+---------------+
|id_pedido  |dt_pedido               |id_parceiro|id_cliente|id_filial|vr_total_pago|dt_foto |id_produto|quantidade|vr_unitario|id_cidade|id_estado|ds_cidade       |ds_estado|ds_filial      |
+-----------+------------------------+-----------+----------+---------+-------------+--------+----------+----------+-----------+---------+---------+----------------+---------+---------------+
|48443031713|2021-07-12T00:00:00.000Z|13         |136800692 |1090     |129.99       |20230603|4335308   |1         |129.99     |737      |33       |BARUERI         |SP       |Filial - 001090|
|48472409513|2021-07-12T00:00:00.000Z|13         |69267635  |1090     |259.99       |20230603|1375916   |1         |259.99     |737      |33       |BARUERI         |SP       |Filial - 001090|
|48454600516|2021-07-12T00:00:00.000Z|16

#### 3.2.Ver a quantidade de pedidos por estado

In [23]:
df_pedidos_por_estado = (df_pedidos_filiais
                      .groupBy("id_estado", "ds_estado")
                      .agg(countDistinct("id_pedido").alias("pedidos_por_estado"))
                      .orderBy("pedidos_por_estado", ascending=False))
df_pedidos_por_estado.show(27)

+---------+---------+------------------+
|id_estado|ds_estado|pedidos_por_estado|
+---------+---------+------------------+
|       14|       MG|            317571|
|       18|       PA|            235021|
|       11|       GO|             36273|
|       20|       PE|             36204|
|       31|       SC|             26252|
|       33|       SP|             14683|
|       13|       MA|             13833|
|        6|       BA|             10541|
|       28|       RS|              9024|
|        7|       CE|              8042|
|       22|       PR|              7315|
|       25|       RN|              5419|
|        3|       AL|              2822|
|       10|       ES|              2308|
|       32|       SE|              2202|
|       19|       PB|              1992|
|       21|       PI|              1213|
|       24|       RJ|              1189|
|       16|       MT|               675|
|        8|       DF|               461|
|       15|       MS|               270|
|       35|     

#### 3.3.Top 10 filial que mais vendeu

In [24]:
df_vendas_por_filial = (df_pedidos_filiais
                     .groupBy("id_filial", "ds_filial")
                     .agg(round(sum("quantidade"), 2).alias("volume_vendas"))
                     .orderBy("volume_vendas", ascending=False))
df_vendas_por_filial.show(10)

+---------+---------------+-------------+
|id_filial|      ds_filial|volume_vendas|
+---------+---------------+-------------+
|      231|Filial - 000231|       240806|
|        3|Filial - 000003|       220768|
|      257|Filial - 000257|        23725|
|      883|Filial - 000883|        17717|
|      228|Filial - 000228|        15851|
|      366|Filial - 000366|        13735|
|      494|Filial - 000494|        13461|
|      117|Filial - 000117|        13459|
|      547|Filial - 000547|        11169|
|     1680|Filial - 001680|         9531|
+---------+---------------+-------------+
only showing top 10 rows



### 04.Criar o dataframe df_stage juntando todas as bases do nosso modelo relacional

In [33]:
#df_pedidos com df_item_pedido
df_stage = df_pedido.join(df_item_pedido, df_pedido.id_pedido == df_item_pedido.id_pedido, 'left')
df_stage = df_stage.drop(df_item_pedido.id_pedido)

#df_stage com df_parceiro
df_stage = df_stage.join(df_parceiro, df_stage.id_parceiro == df_parceiro.id_parceiro, 'left')
df_stage = df_stage.drop(df_parceiro.id_parceiro)

#df_stage com df_cliente
df_stage = df_stage.join(df_clientes, df_stage.id_cliente == df_clientes.id_cliente, 'left')
df_stage = df_stage.drop(df_clientes.id_cliente)

#df_stage com df_filial
df_stage = df_stage.join(df_filial, df_stage.id_filial == df_filial.id_filial, 'left')
df_stage = df_stage.drop(df_filial.id_filial)

#df_stage com df_cidade
df_stage = df_stage.join(df_cidade, df_stage.id_cidade == df_cidade.id_cidade, 'left')
df_stage = df_stage.drop(df_cidade.id_cidade)

#df_stage com df_estado
df_stage = df_stage.join(df_estado, df_stage.id_estado == df_estado.id_estado, 'left')
df_stage = df_stage.drop(df_estado.id_estado)

# df_stage com df_produtos
#df_stage = df_stage.join(df_produtos, df_stage.ds_produto == df_produtos.ds_produto, 'left')
#df_stage = df_stage.drop(df_produtos.ds_produto)

# df_stage com df_subcategoria
#df_stage = df_stage.join(df_subcategoria, df_stage.id_subcategoria == df_subcategoria.id_subcategoria, 'left')
#df_stage = df_stage.drop(df_subcategoria.id_subcategoria)

# df_stage com df_categorias
#df_stage = df_stage.join(df_categorias, df_stage.id_categoria == df_categorias.id_categoria, 'left')
#df_stage = df_stage.drop(df_categorias.id_categoria)

#show
#df_stage.show(truncate=False)

# Trabalhando com calendario

#### 1. Com o dataframe df_stage criado, criar as colunas do calendário

In [34]:
df_stage = (df_stage
            .withColumn('Ano', year(df_stage.dt_pedido))
            .withColumn('Mês', month(df_stage.dt_pedido))
            .withColumn('Dia', dayofmonth(df_stage.dt_pedido))
            .withColumn('Trimestre', quarter(df_stage.dt_pedido))
           )

df_stage.show(truncate=False)

Py4JJavaError: An error occurred while calling o740.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: ShuffleMapStage 112 (showString at <unknown>:0) has failed the maximum allowable number of times: 4. Most recent failure reason: org.apache.spark.shuffle.FetchFailedException: Stream is corrupted 	at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:554) 	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:470) 	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:64) 	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435) 	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441) 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) 	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31) 	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage16.sort_addToSorter_0$(Unknown Source) 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage16.processNext(Unknown Source) 	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) 	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636) 	at org.apache.spark.sql.execution.RowIteratorFromScala.advanceNext(RowIterator.scala:83) 	at org.apache.spark.sql.execution.joins.SortMergeJoinScanner.advancedStreamed(SortMergeJoinExec.scala:794) 	at org.apache.spark.sql.execution.joins.SortMergeJoinScanner.findNextOuterJoinRows(SortMergeJoinExec.scala:755) 	at org.apache.spark.sql.execution.joins.OneSideOuterIterator.advanceStream(SortMergeJoinExec.scala:917) 	at org.apache.spark.sql.execution.joins.OneSideOuterIterator.advanceNext(SortMergeJoinExec.scala:953) 	at org.apache.spark.sql.execution.RowIteratorToScala.hasNext(RowIterator.scala:68) 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage19.processNext(Unknown Source) 	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) 	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636) 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) 	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125) 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55) 	at org.apache.spark.scheduler.Task.run(Task.scala:121) 	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:403) 	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:409) 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 	at java.lang.Thread.run(Thread.java:748) Caused by: java.io.IOException: Stream is corrupted 	at net.jpountz.lz4.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:202) 	at net.jpountz.lz4.LZ4BlockInputStream.read(LZ4BlockInputStream.java:157) 	at net.jpountz.lz4.LZ4BlockInputStream.read(LZ4BlockInputStream.java:170) 	at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply$mcJ$sp(Utils.scala:361) 	at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:348) 	at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:348) 	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) 	at org.apache.spark.util.Utils$.copyStream(Utils.scala:369) 	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:462) 	... 31 more 
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1493)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2107)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3383)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
	at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2544)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2758)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
	at sun.reflect.GeneratedMethodAccessor134.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


In [27]:
#apenas para mostrar apenas o que foi pedido na atividade
df_stage.select('dt_pedido', 'Ano', 'Mês', 'Dia', 'Trimestre').show()

+--------------------+----+---+---+---------+
|           dt_pedido| Ano|Mês|Dia|Trimestre|
+--------------------+----+---+---+---------+
|2021-06-02T00:00:...|2021|  6|  2|        2|
|2021-06-02T00:00:...|2021|  6|  2|        2|
|2021-06-02T00:00:...|2021|  6|  2|        2|
|2021-06-02T00:00:...|2021|  6|  2|        2|
|2021-06-02T00:00:...|2021|  6|  2|        2|
|2021-06-02T00:00:...|2021|  6|  2|        2|
|2021-06-02T00:00:...|2021|  6|  2|        2|
|2021-06-02T00:00:...|2021|  6|  2|        2|
|2021-06-02T00:00:...|2021|  6|  2|        2|
|2021-06-02T00:00:...|2021|  6|  2|        2|
|2021-06-02T00:00:...|2021|  6|  2|        2|
|2021-06-02T00:00:...|2021|  6|  2|        2|
|2021-06-02T00:00:...|2021|  6|  2|        2|
|2021-06-02T00:00:...|2021|  6|  2|        2|
|2021-06-02T00:00:...|2021|  6|  2|        2|
|2021-06-02T00:00:...|2021|  6|  2|        2|
|2021-06-02T00:00:...|2021|  6|  2|        2|
|2021-06-02T00:00:...|2021|  6|  2|        2|
|2021-06-02T00:00:...|2021|  6|  2