In [117]:
import os

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types
from pyspark.sql.functions import *
from pyspark.sql.functions import *

# Local Spark session with the PostgreSQL JDBC driver on the classpath.
# The driver jar location is machine-specific: set PG_JDBC_JAR to point at it
# instead of editing this cell; the default is the original author's path.
spark = (
    SparkSession.builder
    .config(
        "spark.jars",
        os.environ.get(
            "PG_JDBC_JAR",
            "C:\\tools\\spark-3.3.2-bin-hadoop3\\jars\\postgresql-42.6.2.jar",
        ),
    )
    .master("local[*]")
    .appName('spark-app')
    .getOrCreate()
)

In [118]:
spark.version  # Show the version string of the Spark session created above.

'3.3.2'

In [135]:
# Write a DataFrame to the PostgreSQL database hosted on Neon.tech.
def write_to_pg(df, table_name, mode="errorifexists", url=None, user=None, password=None):
    """Save ``df`` into the PostgreSQL table ``table_name`` via Spark's JDBC writer.

    Connection settings come from the keyword arguments when given, otherwise
    from environment variables, so no secret has to live in the notebook:

    - ``PG_JDBC_URL``: JDBC URL (defaults to the Neon.tech ``brazilian_ecommerce`` database)
    - ``PG_USER``: database user (defaults to ``brazilian_ecommerce_owner``)
    - ``PG_PASSWORD``: database password (required)

    Args:
        df: Spark DataFrame to write.
        table_name: target table, passed as the JDBC ``dbtable`` option.
        mode: Spark save mode. The default ``"errorifexists"`` is Spark's own
            default and fails if the table already exists; pass
            ``"overwrite"`` to replace the table when re-running the notebook.
        url: optional JDBC URL overriding ``PG_JDBC_URL``.
        user: optional user overriding ``PG_USER``.
        password: optional password overriding ``PG_PASSWORD``.

    Raises:
        RuntimeError: if no password is passed and ``PG_PASSWORD`` is unset.
    """
    url = url or os.environ.get(
        "PG_JDBC_URL",
        "jdbc:postgresql://ep-small-salad-a2sbsxpd.eu-central-1.aws.neon.tech/brazilian_ecommerce",
    )
    user = user or os.environ.get("PG_USER", "brazilian_ecommerce_owner")
    password = password or os.environ.get("PG_PASSWORD")
    if not password:
        # NOTE(security): a password for this account was previously committed
        # in this notebook; it should be rotated.
        raise RuntimeError(
            "PostgreSQL password missing: set the PG_PASSWORD environment "
            "variable or pass password=..."
        )
    (
        df.write.format("jdbc")
        .option("url", url)
        .option("driver", "org.postgresql.Driver")
        .option("dbtable", table_name)
        .option("user", user)
        .option("password", password)
        .mode(mode)
        .save()
    )

In [4]:
# Root of the HDFS data lake; each table below is a parquet dataset under it.
# Override with the DATALAKE_DIR environment variable to point elsewhere.
DATALAKE_DIR = os.environ.get("DATALAKE_DIR", "hdfs://localhost:9000/home/datalake")


def read_datalake_table(name):
    """Read the parquet dataset ``<DATALAKE_DIR>/<name>`` into a Spark DataFrame."""
    return spark.read.parquet(f"{DATALAKE_DIR}/{name}")


df_orders = read_datalake_table("orders")
df_customers = read_datalake_table("customers")
df_order_items = read_datalake_table("order_items")
df_payments = read_datalake_table("payments")
df_products = read_datalake_table("products")
df_reviews = read_datalake_table("reviews")
df_sellers = read_datalake_table("sellers")
df_geolocation = read_datalake_table("geolocation")
df_category_name_translation = read_datalake_table("category_name_translation")

In [5]:
# Number of orders in each order status.
df_orders_2 = df_orders.groupBy("order_status").count()
df_orders_2.show()

+------------+-----+
|order_status|count|
+------------+-----+
|     shipped| 1107|
|    canceled|  625|
|    invoiced|  314|
|   delivered|96478|
| unavailable|  609|
|  processing|  301|
|     created|    5|
|    approved|    2|
+------------+-----+



In [126]:
write_to_pg(df=df_orders_2, table_name="orders_status")

In [11]:
# Top-selling product categories

In [12]:
# Attach each order item's product category, then swap the category for its
# English name.
# NOTE(review): both joins are inner, so items whose product is missing from
# df_products, or whose category has no row in df_category_name_translation,
# are dropped from everything computed from these frames.
df_order_item_product = (
    df_order_items
    .join(df_products, "product_id", "inner")
    .select("product_id", "product_category_name", "price")
)
df_order_item_product_en = (
    df_order_item_product
    .join(df_category_name_translation, "product_category_name", "inner")
    .drop("product_category_name")
)

In [33]:
# Per English category: number of order items ("count") and summed item price
# ("total", rounded to 2 decimals), most-sold category first.
popular_products = (
    df_order_item_product_en
    .groupBy("product_category_name_english")
    .agg(
        count("product_category_name_english").alias("count"),
        sum("price").alias("total"),
    )
    .orderBy("count", ascending=False)
    .withColumn("total", round(col("total"), 2))
)

In [34]:
popular_products.show(n=10)

+-----------------------------+-----+----------+
|product_category_name_english|count|     total|
+-----------------------------+-----+----------+
|               bed_bath_table|11115|1036988.68|
|                health_beauty| 9670|1258681.34|
|               sports_leisure| 8641| 988048.97|
|              furniture_decor| 8334| 729762.49|
|         computers_accesso...| 7827| 911954.32|
|                   housewares| 6964| 632248.66|
|                watches_gifts| 5991|1205005.68|
|                    telephony| 4545| 323667.53|
|                 garden_tools| 4347| 485256.46|
|                         auto| 4235| 592720.11|
+-----------------------------+-----+----------+
only showing top 10 rows



In [127]:
write_to_pg(df=popular_products, table_name="popular_products")

In [32]:
# Top sellers by revenue

In [128]:
# Revenue per seller: sum of item prices, rounded to 2 decimals, highest first.
df_sellers_orders_detail = (
    df_order_items
    .join(df_sellers, "seller_id", "inner")
    .drop("order_item_id", "shipping_limit_date", "freight_value", "seller_zip_code_prefix")
)

df_sellers_total = (
    df_sellers_orders_detail
    .groupBy("seller_id")
    .agg(sum("price").alias("total"))
    .orderBy("total", ascending=False)
    .withColumn("total", round(col("total"), 2))
)
df_sellers_total.show()

+--------------------+---------+
|           seller_id|    total|
+--------------------+---------+
|4869f7a5dfa277a7d...|229472.63|
|53243585a1d6dc264...|222776.05|
|4a3ca9315b744ce9f...|200472.92|
|fa1c13f2614d7b5c4...|194042.03|
|7c67e1448b00f6e96...|187923.89|
|7e93a43ef30c4f03f...|176431.87|
|da8622b14eb17ae28...|160236.57|
|7a67c85e85bb2ce85...|141745.53|
|1025f0e2d44d7041d...|138968.55|
|955fee9216a65b617...| 135171.7|
|46dc3b2cc0980fb8e...|128111.19|
|6560211a19b47992c...|123304.83|
|620c87c171fb2a6dd...| 114774.5|
|7d13fca1522535862...|113628.97|
|5dceca129747e92ff...|112155.53|
|1f50f920176fa81da...|106939.21|
|cc419e0650a3c5ba7...|104288.42|
|a1043bafd471dff53...|101901.16|
|3d871de0142ce09b7...|  94914.2|
|edb1ef5e36e0c8cd8...| 79284.55|
+--------------------+---------+
only showing top 20 rows



In [129]:
write_to_pg(df=df_sellers_total, table_name="top_sellers_total")

In [26]:
# Cities with the most orders (city names are paired with their state)

In [48]:
# Number of orders placed from each (city, state) pair, busiest first.
df_orders_customer = (
    df_orders
    .join(df_customers, "customer_id", "inner")
    .select("customer_city", "customer_state", "order_id")
    .groupBy("customer_city", "customer_state")
    .agg(count("order_id").alias("order_quantity"))
    .orderBy("order_quantity", ascending=False)
)
df_orders_customer.show()

+--------------------+--------------+--------------+
|       customer_city|customer_state|order_quantity|
+--------------------+--------------+--------------+
|           sao paulo|            SP|         15540|
|      rio de janeiro|            RJ|          6882|
|      belo horizonte|            MG|          2773|
|            brasilia|            DF|          2131|
|            curitiba|            PR|          1521|
|            campinas|            SP|          1444|
|        porto alegre|            RS|          1379|
|            salvador|            BA|          1245|
|           guarulhos|            SP|          1189|
|sao bernardo do c...|            SP|           938|
|             niteroi|            RJ|           849|
|         santo andre|            SP|           796|
|              osasco|            SP|           746|
|              santos|            SP|           713|
|             goiania|            GO|           692|
| sao jose dos campos|            SP|         

In [130]:
write_to_pg(df=df_orders_customer, table_name="top_purchased_cities")

In [72]:
# Which product category is most purchased in each city

In [131]:
from pyspark.sql import functions as f

# Number of order items per (customer city, product category).
df_city_category_counts = (
    df_orders
    .join(df_customers, "customer_id", "inner")
    .join(df_order_items, "order_id", "inner")
    .join(df_products, "product_id", "inner")
    .groupBy("customer_city", "product_category_name")
    .agg(f.count("product_category_name").alias("category_count"))
)

# Top category per city. max() over a struct compares field by field, so the
# numeric count decides first and ties go to the lexicographically greatest
# category name. A struct is used rather than an array because an array needs a
# single element type: the count would become a string and max() would compare
# counts lexicographically ("9" > "10"), picking the wrong category.
df_orders_customer_items = (
    df_city_category_counts
    .groupBy("customer_city")
    .agg(f.max(f.struct("category_count", "product_category_name")).alias("top_category"))
    .select(
        "customer_city",
        f.col("top_category.product_category_name").alias("most_purchased_product"),
        f.col("top_category.category_count").alias("count"),
    )
    .orderBy("customer_city")
)
df_orders_customer_items.show()

+-------------------+----------------------+-----+
|      customer_city|most_purchased_product|count|
+-------------------+----------------------+-----+
|abadia dos dourados|  livros_interesse_...|    1|
|          abadiania|       eletroportateis|    1|
|             abaete|    relogios_presentes|    3|
|         abaetetuba|  informatica_acess...|    5|
|            abaiara|            brinquedos|    1|
|             abaira|             telefonia|    1|
|              abare|    relogios_presentes|    1|
|             abatia|             telefonia|    1|
|      abdon batista|       cama_mesa_banho|    2|
|       abelardo luz|  utilidades_domest...|    2|
|           abrantes|  informatica_acess...|    1|
|         abre campo|  utilidades_domest...|    1|
|       abreu e lima|    ferramentas_jardim|    4|
|            acaiaca|  informatica_acess...|    1|
|         acailandia|    relogios_presentes|    2|
|          acajutiba|             telefonia|    1|
|             acarau|      mala

In [132]:
write_to_pg(df=df_orders_customer_items, table_name="most_purchased_product_of_cities")

In [74]:
# Most used payment types

In [80]:
# Number of payment records per payment type, most frequent first.
# GroupedData.count() already names its result column "count"; DataFrame.alias()
# would only set a table alias for the whole DataFrame, not rename a column.
df_top_payments_type = (
    df_payments
    .groupBy("payment_type")
    .count()
    .orderBy("count", ascending=False)
)
df_top_payments_type.show(10)

+------------+-----+
|payment_type|count|
+------------+-----+
| credit_card|76795|
|      boleto|19784|
|     voucher| 5775|
|  debit_card| 1529|
| not_defined|    3|
+------------+-----+



In [133]:
write_to_pg(df=df_top_payments_type, table_name="top_payments_type")

In [134]:
# Delivered orders that arrived later than their estimated delivery date

In [91]:
# Delivered orders with their actual and estimated delivery dates.
df_order_shipping = (
    df_orders
    .where(df_orders["order_status"] == "delivered")
    .select("order_id", "order_delivered_customer_date", "order_estimated_delivery_date")
)

In [96]:
df_order_shipping.show(n=20)

+--------------------+-----------------------------+-----------------------------+
|            order_id|order_delivered_customer_date|order_estimated_delivery_date|
+--------------------+-----------------------------+-----------------------------+
|5188f7ae18cdf7538...|          2017-10-13 20:52:37|          2017-10-06 00:00:00|
|084ac8d010dca0cb8...|          2018-02-26 16:13:34|          2018-03-07 00:00:00|
|045b201c094804e3f...|          2018-01-03 21:27:41|          2018-01-12 00:00:00|
|53a1965cb8e1d6cf4...|          2017-07-04 14:34:23|          2017-07-18 00:00:00|
|cd4d00f4cdfc2365a...|          2018-04-23 18:15:28|          2018-04-26 00:00:00|
|23a0c1ef37b1e2599...|          2017-09-19 16:39:58|          2017-08-04 00:00:00|
|3f78a6201286f6b6b...|          2018-04-02 22:50:06|          2018-04-06 00:00:00|
|b1127d37a7b6bdc4f...|          2018-08-22 17:21:39|          2018-08-28 00:00:00|
|9ffb45ae888742578...|          2017-08-29 20:53:04|          2017-09-14 00:00:00|
|639

In [93]:
# Keep only orders delivered after their estimated delivery date.
df_order_delay = df_order_shipping.where(
    df_order_shipping["order_delivered_customer_date"]
    > df_order_shipping["order_estimated_delivery_date"]
)

In [121]:
# Add delay_hours: hours between actual and estimated delivery, rounded to 2
# decimals. unix_timestamp() returns epoch seconds for timestamp columns and
# also parses 'yyyy-MM-dd HH:mm:ss' strings, whereas cast("long") on a string
# column yields null. NOTE(review): the column types are not visible here.
# withColumn replaces an existing delay_hours, so re-running this cell is safe.
df_order_delay = df_order_delay.withColumn(
    "delay_hours",
    round(
        (
            f.unix_timestamp("order_delivered_customer_date")
            - f.unix_timestamp("order_estimated_delivery_date")
        ) / 3600,
        2,
    ),
)

In [124]:
write_to_pg(df=df_order_delay, table_name="orders_delay")