In [75]:
#lib cria tabelas/sessões do pyspark
import pyspark as pyspark 
from pyspark.sql import SparkSession

In [76]:
#lib cria tabelas/sessões do pyspark
from pyspark.sql.types import *

In [79]:
#lib groupby funções (agg, mean, etc...)
import pyspark.sql.functions as f
from pyspark.sql.types import FloatType
from pyspark.sql.functions import *

In [3]:
spark = SparkSession.builder.appName('Dados_Emp').getOrCreate()

In [4]:
path_site = r"C:\Users\tthia\Downloads\site.json"
path_orc = r"C:\Users\tthia\Downloads\loja.orc"
path_contabilidade = r"C:\Users\tthia\Downloads\contabilidade.csv.gz"

In [5]:
dados_site = spark.read.json(path_site)
dados_loja = spark.read.format('orc').load(path_orc)
dados_contabilidade = spark.read.format('csv').option('header', True).option('delimiter', ";").load(path_contabilidade)

In [11]:
dados_site.printSchema()

root
 |-- id_ped: long (nullable = true)
 |-- id_unidade: string (nullable = true)
 |-- preco_venda: string (nullable = true)
 |-- quantidade: string (nullable = true)



In [13]:
dados_contabilidade.printSchema()

root
 |-- id: string (nullable = true)
 |-- status: string (nullable = true)
 |-- data_pedido: string (nullable = true)



In [22]:
Schema = StructType([StructField('id', IntegerType(), True),
                    StructField('Status', StringType(), True),
                    StructField('data_pedido', TimestampType(), True)])

In [24]:
dados_contabilidade = spark.read.format('csv').schema(Schema).option('header', True).option('delimiter', ";").load(path_contabilidade)

In [25]:
dados_contabilidade.printSchema()

root
 |-- id: integer (nullable = true)
 |-- Status: string (nullable = true)
 |-- data_pedido: timestamp (nullable = true)



In [40]:
dados_loja.show(2)

+---------+-------------------+-----------+
|id_pedido|       data_entrega|frete_valor|
+---------+-------------------+-----------+
|965107906|2019-04-22 16:33:14|       22.5|
|965107906|2019-04-22 18:08:37|      30.94|
+---------+-------------------+-----------+
only showing top 2 rows



In [29]:
dados_contabilidade.show(2)

+-----------+------+-------------------+
|         id|Status|        data_pedido|
+-----------+------+-------------------+
|  420143590|   CAN|2019-02-11 10:31:00|
|-1085764026|   ETM|2019-02-11 07:13:00|
+-----------+------+-------------------+
only showing top 2 rows



In [34]:
dataframe_inner = dados_contabilidade.join(dados_site, dados_contabilidade.id == dados_site.id_ped)

In [50]:
dataframe_inner.show(5)

+-----------+------+-------------------+-----------+----------+-----------+----------+
|         id|Status|        data_pedido|     id_ped|id_unidade|preco_venda|quantidade|
+-----------+------+-------------------+-----------+----------+-----------+----------+
| -716752696|   NFS|2019-02-11 14:26:00| -716752696|         8|      83.73|         1|
|  788688814|   ENT|2019-04-09 16:30:00|  788688814|         8|       35.9|         1|
| 1036357147|   NFS|2019-04-09 21:17:00| 1036357147|         1|     699.92|         1|
|  788688814|   AXD|2019-02-15 13:29:00|  788688814|         8|       35.9|         1|
|-1345775998|   PAP|2019-02-15 15:52:00|-1345775998|         1|     229.09|         1|
+-----------+------+-------------------+-----------+----------+-----------+----------+
only showing top 5 rows



In [42]:
dataframe_site_loja = dados_loja.join(dados_site, dados_loja.id_pedido == dados_site.id_ped, how = 'full')

In [49]:
dataframe_site_loja.show(5)

+-----------+-------------------+-----------+-----------+----------+-----------+----------+
|  id_pedido|       data_entrega|frete_valor|     id_ped|id_unidade|preco_venda|quantidade|
+-----------+-------------------+-----------+-----------+----------+-----------+----------+
|-2145969532|2019-04-25 16:29:27|      10.71|-2145969532|         8|       59.9|         3|
|-2144499221|2019-05-02 19:08:03|      68.06|-2144499221|         8|      284.0|         1|
|-2114249380|2019-04-29 16:20:00|       77.0|-2114249380|         5|     492.48|         1|
|-2020035985|2019-04-29 15:29:29|      73.87|-2020035985|         1|      559.0|         1|
|-2018039854|2019-04-22 14:02:18|      32.98|-2018039854|         5|      849.0|         1|
+-----------+-------------------+-----------+-----------+----------+-----------+----------+
only showing top 5 rows



In [44]:
dataframe_site_loja.count()

57074

In [45]:
dataframe_site_loja_dis = dataframe_site_loja.distinct()

In [46]:
dataframe_site_loja_dis.count()

57044

In [48]:
dados_contabilidade.filter(dados_contabilidade.Status == 'ENT').show(5)

+-----------+------+-------------------+
|         id|Status|        data_pedido|
+-----------+------+-------------------+
|  283311229|   ENT|2019-02-11 10:16:00|
| -804282547|   ENT|2019-02-11 04:00:00|
|-1528509500|   ENT|2019-02-11 11:42:00|
| -118303931|   ENT|2019-02-11 10:10:00|
|-1507015997|   ENT|2019-02-11 04:00:00|
+-----------+------+-------------------+
only showing top 5 rows



In [51]:
dataframe_frevereiro = dados_contabilidade.filter(dados_contabilidade.Status == 'ENT')

In [52]:
dataframe_frevereiro

DataFrame[id: int, Status: string, data_pedido: timestamp]

In [63]:
dataframe_frevereiro = dataframe_frevereiro.filter(
    dataframe_frevereiro.data_pedido >= '2019-02-01').filter(
    dataframe_frevereiro.data_pedido <= '2019-02-28').filter(
    dataframe_frevereiro.Status == 'ENT')

In [64]:
dataframe_frevereiro.count()

399546

In [72]:
dataframe_groupby = dataframe_site_loja.groupBy(dataframe_site_loja.id_pedido).agg(f.sum(dataframe_site_loja.preco_venda).alias('preco venda agrupado'))

In [73]:
dataframe_groupby.show()

+-----------+--------------------+
|  id_pedido|preco venda agrupado|
+-----------+--------------------+
|-1327358791|                19.9|
| -359035740|              599.91|
| 1045323572|                18.9|
| -933767611|              2851.8|
|-1442837451|              1929.0|
|  690565871|             1899.98|
|-1761271152|               229.9|
| -232933969|              846.21|
|  767216253|               949.0|
| 1116223944|               399.9|
|-1419819680|              2549.0|
|-1765787008|              106.36|
|  619048573|              757.98|
|    8329599|              6236.0|
|  176572296|              1999.0|
|-1375692421|                85.0|
|  847022602|               558.0|
| 1973451197|              1190.0|
|-1955479379|               899.0|
|-1396752534|              165.76|
+-----------+--------------------+
only showing top 20 rows



In [80]:
#cria nova coluna que puxa o maior valor entre dois campos definidos
dataframe_coalesce = dataframe_site_loja.withColumn('nova_coluna', coalesce(dataframe_site_loja.preco_venda, dataframe_site_loja.frete_valor))

In [81]:
dataframe_coalesce.show(3)

+-----------+-------------------+-----------+-----------+----------+-----------+----------+-----------+
|  id_pedido|       data_entrega|frete_valor|     id_ped|id_unidade|preco_venda|quantidade|nova_coluna|
+-----------+-------------------+-----------+-----------+----------+-----------+----------+-----------+
|-2145969532|2019-04-25 16:29:27|      10.71|-2145969532|         8|       59.9|         3|       59.9|
|-2144499221|2019-05-02 19:08:03|      68.06|-2144499221|         8|      284.0|         1|      284.0|
|-2114249380|2019-04-29 16:20:00|       77.0|-2114249380|         5|     492.48|         1|     492.48|
+-----------+-------------------+-----------+-----------+----------+-----------+----------+-----------+
only showing top 3 rows



In [83]:
dataframe_coalesce.select(dataframe_coalesce.preco_venda, dataframe_coalesce.frete_valor, dataframe_coalesce.nova_coluna).show()

+-----------+-----------+-----------+
|preco_venda|frete_valor|nova_coluna|
+-----------+-----------+-----------+
|       59.9|      10.71|       59.9|
|      284.0|      68.06|      284.0|
|     492.48|       77.0|     492.48|
|      559.0|      73.87|      559.0|
|      849.0|      32.98|      849.0|
|      84.34|       0.19|      84.34|
|       49.9|       9.76|       49.9|
|     1159.0|       99.9|     1159.0|
|       16.8|       9.29|       16.8|
|      499.0|       19.9|      499.0|
|      339.0|       64.0|      339.0|
|      158.1|       60.2|      158.1|
|      709.9|      26.61|      709.9|
|      329.9|       32.7|      329.9|
|     849.98|       49.7|     849.98|
|     1319.8|        9.7|     1319.8|
|     1605.5|      49.99|     1605.5|
|       34.8|       30.0|       34.8|
|      559.9|        0.0|      559.9|
|      219.9|       24.0|      219.9|
+-----------+-----------+-----------+
only showing top 20 rows



In [85]:
dataframe_inner

DataFrame[id: int, Status: string, data_pedido: timestamp, id_ped: bigint, id_unidade: string, preco_venda: string, quantidade: string]

In [86]:
dataframe_inner.drop(dataframe_inner.id_ped)

DataFrame[id: int, Status: string, data_pedido: timestamp, id_unidade: string, preco_venda: string, quantidade: string]

In [87]:
dataframe_inner.printSchema()

root
 |-- id: integer (nullable = true)
 |-- Status: string (nullable = true)
 |-- data_pedido: timestamp (nullable = true)
 |-- id_ped: long (nullable = true)
 |-- id_unidade: string (nullable = true)
 |-- preco_venda: string (nullable = true)
 |-- quantidade: string (nullable = true)



In [88]:
dataframe_exclusao = dataframe_inner.drop(dataframe_inner.id_ped)

In [89]:
dataframe_exclusao.printSchema()

root
 |-- id: integer (nullable = true)
 |-- Status: string (nullable = true)
 |-- data_pedido: timestamp (nullable = true)
 |-- id_unidade: string (nullable = true)
 |-- preco_venda: string (nullable = true)
 |-- quantidade: string (nullable = true)



In [90]:
dataframe_nodup = dataframe_frevereiro.dropDuplicates()

In [91]:
dataframe_nodup.count()

374438

In [92]:
dataframe_frevereiro.count()

399546