In [1]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m1.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=0fbbdcfcbad2e7feb1c24dc859b9c72391194b1f72b35a2f80b2d7abf2b6623c
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2


In [17]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

import datetime as dt

In [3]:
ecommerce_session = SparkSession.builder.appName("Ecommerce").getOrCreate()

In [4]:
ecommerce_schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("order_status", StringType(), True),
    StructField("order_products_value", FloatType(), True),
    StructField("order_freight_value", FloatType(), True),
    StructField("order_items_qty", IntegerType(), True),
    StructField("customer_city", StringType(), True),
    StructField("customer_state", StringType(), True),
    StructField("customer_zip_code_prefix", IntegerType(), True),
    StructField("product_name_lenght", IntegerType(), True),
    StructField("product_description_lenght", IntegerType(), True),
    StructField("product_photos_qty", IntegerType(), True),
    StructField("review_score", IntegerType(), True),
    StructField("order_purchase_timestamp", StringType(), True),
    StructField("order_aproved_at", StringType(), True),
    StructField("order_delivered_customer_date", StringType(), True)
])

In [37]:
ecommerce_df = ecommerce_session.read.format("csv").option("header", "False").schema(ecommerce_schema).load("olist_public_dataset.csv")
print((ecommerce_df.count(), len(ecommerce_df.columns)))
ecommerce_df.printSchema()

(100000, 15)
root
 |-- id: integer (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_products_value: float (nullable = true)
 |-- order_freight_value: float (nullable = true)
 |-- order_items_qty: integer (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)
 |-- customer_zip_code_prefix: integer (nullable = true)
 |-- product_name_lenght: integer (nullable = true)
 |-- product_description_lenght: integer (nullable = true)
 |-- product_photos_qty: integer (nullable = true)
 |-- review_score: integer (nullable = true)
 |-- order_purchase_timestamp: string (nullable = true)
 |-- order_aproved_at: string (nullable = true)
 |-- order_delivered_customer_date: string (nullable = true)



In [38]:
ecommerce_session.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

In [39]:
ecommerce_df = ecommerce_df.withColumn("order_purchase_as_date",to_date(col("order_purchase_timestamp"),'dd/MM/yy'))
ecommerce_df = ecommerce_df.withColumn("order_approved_at_as_date",to_date(col("order_aproved_at"),'dd/MM/yy'))
ecommerce_df = ecommerce_df.withColumn("order_delivery_customer_date_as_date",to_date(col("order_delivered_customer_date"),'dd/MM/yy'))

In [104]:
ecommerce_df = ecommerce_df.withColumn("order_purchase_as_datetime",to_timestamp(col("order_purchase_timestamp"),'dd/MM/yy HH:mm'))
ecommerce_df = ecommerce_df.withColumn("order_approved_at_as_datetime",to_timestamp(col("order_aproved_at"),'dd/MM/yy HH:mm'))
ecommerce_df = ecommerce_df.withColumn("order_delivery_customer_date_as_datetime",to_timestamp(col("order_delivered_customer_date"),'dd/MM/yy HH:mm'))

+---+------------+--------------------+-------------------+---------------+-------------------+--------------+------------------------+-------------------+--------------------------+------------------+------------+------------------------+----------------+-----------------------------+----------------------+-------------------------+------------------------------------+-----------+-------------+--------------------------+-----------------------------+----------------------------------------+
| id|order_status|order_products_value|order_freight_value|order_items_qty|      customer_city|customer_state|customer_zip_code_prefix|product_name_lenght|product_description_lenght|product_photos_qty|review_score|order_purchase_timestamp|order_aproved_at|order_delivered_customer_date|order_purchase_as_date|order_approved_at_as_date|order_delivery_customer_date_as_date|day_of_week|week_of_month|order_purchase_as_datetime|order_approved_at_as_datetime|order_delivery_customer_date_as_datetime|
+---+-

In [42]:
ecommerce_df.printSchema()
ecommerce_df.show()

root
 |-- id: integer (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_products_value: float (nullable = true)
 |-- order_freight_value: float (nullable = true)
 |-- order_items_qty: integer (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)
 |-- customer_zip_code_prefix: integer (nullable = true)
 |-- product_name_lenght: integer (nullable = true)
 |-- product_description_lenght: integer (nullable = true)
 |-- product_photos_qty: integer (nullable = true)
 |-- review_score: integer (nullable = true)
 |-- order_purchase_timestamp: string (nullable = true)
 |-- order_aproved_at: string (nullable = true)
 |-- order_delivered_customer_date: string (nullable = true)
 |-- order_purchase_as_date: date (nullable = true)
 |-- order_approved_at_as_date: date (nullable = true)
 |-- order_delivery_customer_date_as_date: date (nullable = true)



In [43]:
def dayAsString(day_int):
  day_of_week_string = {
    1: "Monday",
    2: "Tuesday",
    3: "Wednesday",
    4: "Thursday",
    5: "Friday",
    6: "Saturday",
    7: "Sunday"
  }
  return day_of_week_string[day_int]

In [44]:
day_as_string_udf = udf(lambda x: dayAsString(x), StringType())

In [45]:
becommerce_df = ecommerce_df.withColumn("day_of_week",day_as_string_udf(dayofweek(col("order_purchase_as_date"))))
ecommerce_df.show()

+---+------------+--------------------+-------------------+---------------+-------------------+--------------+------------------------+-------------------+--------------------------+------------------+------------+------------------------+----------------+-----------------------------+----------------------+-------------------------+------------------------------------+-----------+
| id|order_status|order_products_value|order_freight_value|order_items_qty|      customer_city|customer_state|customer_zip_code_prefix|product_name_lenght|product_description_lenght|product_photos_qty|review_score|order_purchase_timestamp|order_aproved_at|order_delivered_customer_date|order_purchase_as_date|order_approved_at_as_date|order_delivery_customer_date_as_date|day_of_week|
+---+------------+--------------------+-------------------+---------------+-------------------+--------------+------------------------+-------------------+--------------------------+------------------+------------+----------------

In [46]:
ecommerce_df.select("day_of_week").distinct().show()

+-----------+
|day_of_week|
+-----------+
|  Wednesday|
|    Tuesday|
|     Friday|
|   Thursday|
|   Saturday|
|     Monday|
|     Sunday|
+-----------+



In [48]:
def weekOfMonth(date1):
  month = date1.month
  year = date1.year
  month_start = dt.date(year, month, 1)
  if (dt.date(year, 1, 1) == month_start):
    week_of_year_month_start = 1
  else:
    week_of_year_month_start = month_start.isocalendar()[1]
  week_of_year_date1 = date1.isocalendar()[1]
  return week_of_year_date1 - week_of_year_month_start + 1

In [49]:
weekOfMonth_udf = udf(lambda x: weekOfMonth(x), IntegerType())

In [50]:
ecommerce_df = ecommerce_df.withColumn("week_of_month",weekOfMonth_udf(col("order_purchase_as_date")))
ecommerce_df.show()

+---+------------+--------------------+-------------------+---------------+-------------------+--------------+------------------------+-------------------+--------------------------+------------------+------------+------------------------+----------------+-----------------------------+----------------------+-------------------------+------------------------------------+-----------+-------------+
| id|order_status|order_products_value|order_freight_value|order_items_qty|      customer_city|customer_state|customer_zip_code_prefix|product_name_lenght|product_description_lenght|product_photos_qty|review_score|order_purchase_timestamp|order_aproved_at|order_delivered_customer_date|order_purchase_as_date|order_approved_at_as_date|order_delivery_customer_date_as_date|day_of_week|week_of_month|
+---+------------+--------------------+-------------------+---------------+-------------------+--------------+------------------------+-------------------+--------------------------+------------------+-

In [52]:
ecommerce_df.select("week_of_month").distinct().show()

+-------------+
|week_of_month|
+-------------+
|            1|
|            6|
|            3|
|            5|
|            4|
|            2|
+-------------+



In [53]:
ecommerce_df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_products_value: float (nullable = true)
 |-- order_freight_value: float (nullable = true)
 |-- order_items_qty: integer (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)
 |-- customer_zip_code_prefix: integer (nullable = true)
 |-- product_name_lenght: integer (nullable = true)
 |-- product_description_lenght: integer (nullable = true)
 |-- product_photos_qty: integer (nullable = true)
 |-- review_score: integer (nullable = true)
 |-- order_purchase_timestamp: string (nullable = true)
 |-- order_aproved_at: string (nullable = true)
 |-- order_delivered_customer_date: string (nullable = true)
 |-- order_purchase_as_date: date (nullable = true)
 |-- order_approved_at_as_date: date (nullable = true)
 |-- order_delivery_customer_date_as_date: date (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- week_of_month: integer (nu

In [72]:
ecommerce_datasets = {}

# Insight on Sales

In [73]:
total_sales_df = ecommerce_df.agg(round(sum("order_products_value"),2).alias("total_sales"))
total_sales_df.show()

ecommerce_datasets["total_sales"] = total_sales_df

+-------------+
|  total_sales|
+-------------+
|1.284147698E7|
+-------------+



## Sales by Day

In [74]:
total_sales_by_day_df = ecommerce_df.groupBy("day_of_week").agg(round(sum("order_products_value"),2).alias("total_sales")).orderBy("total_sales")
total_sales_by_day_df.show()

ecommerce_datasets["total_sales_by_day"] = total_sales_by_day_df

+-----------+-----------+
|day_of_week|total_sales|
+-----------+-----------+
|     Sunday| 1406302.11|
|     Monday| 1530648.75|
|   Saturday| 1811400.14|
|     Friday| 1922774.44|
|   Thursday| 1989730.72|
|  Wednesday| 2077208.79|
|    Tuesday| 2103412.03|
+-----------+-----------+



In [75]:
total_sales_by_day_and_city_df = ecommerce_df.groupBy("day_of_week","customer_city").agg(round(sum("order_products_value"),2).alias("total_sales")).orderBy("customer_city")
total_sales_by_day_and_city_df.show()

ecommerce_datasets["total_sales_by_day_and_city"] = total_sales_by_day_and_city_df

+-----------+--------------------+-----------+
|day_of_week|       customer_city|total_sales|
+-----------+--------------------+-----------+
|     Friday|ALMIRANTE TAMANDA...|       49.9|
|    Tuesday|ALMIRANTE TAMANDA...|      99.99|
|     Sunday|ALTA FLORESTA D'O...|     708.99|
|    Tuesday|ALTO ALEGRE DOS P...|      299.0|
|     Sunday|ALTO ALEGRE DOS P...|      299.0|
|     Monday|ALTO ALEGRE DOS P...|     314.99|
|     Friday|    ALVORADA D'OESTE|      328.0|
|    Tuesday|    ALVORADA D'OESTE|     359.98|
|   Saturday| Abadia dos Dourados|       39.9|
|    Tuesday| Abadia dos Dourados|      319.0|
|    Tuesday|           Abadiania|       68.9|
|     Friday|           Abadiania|     949.99|
|   Thursday|              Abaete|     398.79|
|    Tuesday|              Abaete|      449.0|
|  Wednesday|              Abaete|      321.6|
|     Friday|              Abaete|      56.99|
|  Wednesday|          Abaetetuba|     435.41|
|     Monday|          Abaetetuba|     115.99|
|   Thursday|

In [76]:
total_sales_by_day_and_state_df = ecommerce_df.groupBy("day_of_week","customer_state").agg(round(sum("order_products_value"),2).alias("total_sales")).orderBy("customer_state")
total_sales_by_day_and_state_df.show()

ecommerce_datasets["total_sales_by_day_and_state"] = total_sales_by_day_and_state_df

+-----------+--------------+-----------+
|day_of_week|customer_state|total_sales|
+-----------+--------------+-----------+
|   Thursday|            AC|    2862.77|
|     Monday|            AC|     627.57|
|     Sunday|            AC|     853.49|
|     Friday|            AC|    2542.04|
|   Saturday|            AC|    3987.15|
|  Wednesday|            AC|    3478.97|
|    Tuesday|            AC|    2548.83|
|   Saturday|            AL|    7661.65|
|     Sunday|            AL|    8837.77|
|     Monday|            AL|   13089.56|
|    Tuesday|            AL|   14769.35|
|  Wednesday|            AL|   10002.48|
|     Friday|            AL|   11160.45|
|   Thursday|            AL|   12779.28|
|     Sunday|            AM|    2519.56|
|     Friday|            AM|    2063.87|
|  Wednesday|            AM|    2145.55|
|   Saturday|            AM|    3192.41|
|     Monday|            AM|     3949.5|
|   Thursday|            AM|    2505.38|
+-----------+--------------+-----------+
only showing top

## Sales by Week

In [77]:
total_sales_by_week_df = ecommerce_df.groupBy("week_of_month").agg(round(sum("order_products_value"),2).alias("total_sales")).orderBy("week_of_month")
total_sales_by_week_df.show()

ecommerce_datasets["total_sales_by_week"] = total_sales_by_week_df

+-------------+-----------+
|week_of_month|total_sales|
+-------------+-----------+
|            1| 1542155.29|
|            2| 3089498.12|
|            3|  2957336.5|
|            4| 3027674.28|
|            5| 2058842.95|
|            6|  165969.84|
+-------------+-----------+



In [78]:
total_sales_by_week_and_city_df = ecommerce_df.groupBy("week_of_month","customer_city").agg(round(sum("order_products_value"),2).alias("total_sales")).orderBy("customer_city")
total_sales_by_week_and_city_df.show()

ecommerce_datasets["total_sales_by_week_and_city"] = total_sales_by_week_and_city_df

+-------------+--------------------+-----------+
|week_of_month|       customer_city|total_sales|
+-------------+--------------------+-----------+
|            2|ALMIRANTE TAMANDA...|      99.99|
|            5|ALMIRANTE TAMANDA...|       49.9|
|            4|ALTA FLORESTA D'O...|     349.99|
|            1|ALTA FLORESTA D'O...|      359.0|
|            3|ALTO ALEGRE DOS P...|      299.0|
|            4|ALTO ALEGRE DOS P...|     613.99|
|            1|    ALVORADA D'OESTE|      328.0|
|            5|    ALVORADA D'OESTE|     359.98|
|            3| Abadia dos Dourados|      358.9|
|            1|           Abadiania|     949.99|
|            5|           Abadiania|       68.9|
|            5|              Abaete|      515.7|
|            1|              Abaete|      69.99|
|            4|              Abaete|     176.89|
|            3|              Abaete|      208.9|
|            2|              Abaete|      254.9|
|            1|          Abaetetuba|     134.99|
|            4|     

In [79]:
total_sales_by_week_and_state_df = ecommerce_df.groupBy("week_of_month","customer_state").agg(round(sum("order_products_value"),2).alias("total_sales")).orderBy("customer_state")
total_sales_by_week_and_state_df.show()

ecommerce_datasets["total_sales_by_week_and_state"] = total_sales_by_week_and_state_df

+-------------+--------------+-----------+
|week_of_month|customer_state|total_sales|
+-------------+--------------+-----------+
|            1|            AC|     3787.2|
|            4|            AC|    2092.61|
|            5|            AC|    4023.55|
|            3|            AC|    4034.82|
|            6|            AC|      699.0|
|            2|            AC|    2263.64|
|            6|            AL|    1057.29|
|            4|            AL|   18700.29|
|            1|            AL|    7343.29|
|            3|            AL|   17451.02|
|            2|            AL|   19961.02|
|            5|            AL|   13787.63|
|            4|            AM|    5775.42|
|            3|            AM|    5308.23|
|            5|            AM|     1554.8|
|            2|            AM|    6492.69|
|            6|            AM|      255.4|
|            1|            AM|    2789.03|
|            2|            AP|    5580.18|
|            5|            AP|    2929.67|
+----------

# Insights on Orders

In [80]:
total_order_df = ecommerce_df.agg(count("id").alias("total_orders"))
total_order_df.show()

ecommerce_datasets["total_orders"] = total_order_df

+------------+
|total_orders|
+------------+
|      100000|
+------------+



## Orders by Day

In [81]:
total_order_by_day_df = ecommerce_df.groupBy("day_of_week").agg(count("id").alias("total_orders")).orderBy("total_orders")
total_order_by_day_df.show()

ecommerce_datasets["total_orders_by_day"] = total_order_by_day_df

+-----------+------------+
|day_of_week|total_orders|
+-----------+------------+
|     Sunday|       10944|
|     Monday|       12034|
|   Saturday|       14199|
|     Friday|       14857|
|   Thursday|       15634|
|  Wednesday|       16045|
|    Tuesday|       16287|
+-----------+------------+



In [82]:
total_order_by_day_and_city_df = ecommerce_df.groupBy("day_of_week","customer_city").agg(count("id").alias("total_orders")).orderBy("customer_city")
total_order_by_day_and_city_df.show()

ecommerce_datasets["total_orders_by_day_and_city"] = total_order_by_day_and_city_df

+-----------+--------------------+------------+
|day_of_week|       customer_city|total_orders|
+-----------+--------------------+------------+
|     Friday|ALMIRANTE TAMANDA...|           1|
|    Tuesday|ALMIRANTE TAMANDA...|           1|
|     Sunday|ALTA FLORESTA D'O...|           2|
|    Tuesday|ALTO ALEGRE DOS P...|           1|
|     Sunday|ALTO ALEGRE DOS P...|           1|
|     Monday|ALTO ALEGRE DOS P...|           1|
|     Friday|    ALVORADA D'OESTE|           1|
|    Tuesday|    ALVORADA D'OESTE|           1|
|   Saturday| Abadia dos Dourados|           1|
|    Tuesday| Abadia dos Dourados|           2|
|    Tuesday|           Abadiania|           1|
|     Friday|           Abadiania|           1|
|   Thursday|              Abaete|           3|
|    Tuesday|              Abaete|           1|
|  Wednesday|              Abaete|           3|
|     Friday|              Abaete|           1|
|  Wednesday|          Abaetetuba|           3|
|     Monday|          Abaetetuba|      

In [83]:
total_order_by_day_and_state_df = ecommerce_df.groupBy("day_of_week","customer_state").agg(count("id").alias("total_orders")).orderBy("customer_state")
total_order_by_day_and_state_df.show()

ecommerce_datasets["total_orders_by_day_and_state"] = total_order_by_day_and_state_df

+-----------+--------------+------------+
|day_of_week|customer_state|total_orders|
+-----------+--------------+------------+
|   Thursday|            AC|          10|
|     Monday|            AC|           7|
|     Sunday|            AC|          11|
|     Friday|            AC|          15|
|   Saturday|            AC|          16|
|  Wednesday|            AC|          13|
|    Tuesday|            AC|          12|
|   Saturday|            AL|          53|
|     Sunday|            AL|          54|
|     Monday|            AL|          59|
|    Tuesday|            AL|          80|
|  Wednesday|            AL|          52|
|     Friday|            AL|          71|
|   Thursday|            AL|          65|
|     Sunday|            AM|          21|
|     Friday|            AM|          12|
|  Wednesday|            AM|          22|
|   Saturday|            AM|          21|
|     Monday|            AM|          20|
|   Thursday|            AM|          28|
+-----------+--------------+------

## Orders by Week

In [84]:
total_order_by_week_df = ecommerce_df.groupBy("week_of_month").agg(count("id").alias("total_orders")).orderBy("week_of_month")
total_order_by_week_df.show()

ecommerce_datasets["total_orders_by_week"] = total_order_by_week_df

+-------------+------------+
|week_of_month|total_orders|
+-------------+------------+
|            1|       12202|
|            2|       23676|
|            3|       23255|
|            4|       23392|
|            5|       16160|
|            6|        1315|
+-------------+------------+



In [85]:
total_order_by_week_and_city_df = ecommerce_df.groupBy("week_of_month","customer_city").agg(count("id").alias("total_orders")).orderBy("customer_city")
total_order_by_week_and_city_df.show()

ecommerce_datasets["total_orders_by_week_and_city"] = total_order_by_week_and_city_df

+-------------+--------------------+------------+
|week_of_month|       customer_city|total_orders|
+-------------+--------------------+------------+
|            2|ALMIRANTE TAMANDA...|           1|
|            5|ALMIRANTE TAMANDA...|           1|
|            4|ALTA FLORESTA D'O...|           1|
|            1|ALTA FLORESTA D'O...|           1|
|            3|ALTO ALEGRE DOS P...|           1|
|            4|ALTO ALEGRE DOS P...|           2|
|            1|    ALVORADA D'OESTE|           1|
|            5|    ALVORADA D'OESTE|           1|
|            3| Abadia dos Dourados|           3|
|            1|           Abadiania|           1|
|            5|           Abadiania|           1|
|            5|              Abaete|           3|
|            1|              Abaete|           1|
|            4|              Abaete|           2|
|            3|              Abaete|           1|
|            2|              Abaete|           1|
|            1|          Abaetetuba|           1|


In [86]:
total_order_by_week_and_state_df = ecommerce_df.groupBy("week_of_month","customer_state").agg(count("id").alias("total_orders")).orderBy("customer_state")
total_order_by_week_and_state_df.show()

ecommerce_datasets["total_orders_by_week_and_state"] = total_order_by_week_and_state_df

+-------------+--------------+------------+
|week_of_month|customer_state|total_orders|
+-------------+--------------+------------+
|            1|            AC|          15|
|            4|            AC|          20|
|            5|            AC|          15|
|            3|            AC|          20|
|            6|            AC|           1|
|            2|            AC|          13|
|            6|            AL|           4|
|            4|            AL|         104|
|            1|            AL|          52|
|            3|            AL|         113|
|            2|            AL|          89|
|            5|            AL|          72|
|            4|            AM|          41|
|            3|            AM|          30|
|            5|            AM|          18|
|            2|            AM|          41|
|            6|            AM|           2|
|            1|            AM|          22|
|            2|            AP|          25|
|            5|            AP|  

# Average of Misc. Columns

## By Day

In [99]:
average_review_score_by_day_df = ecommerce_df.groupBy("day_of_week").agg(round(avg("review_score"),2).alias("average_review_score")).orderBy("average_review_score")
average_review_score_by_day_df.show()

ecommerce_datasets["average_review_score_by_day"] = average_review_score_by_day_df

+-----------+--------------------+
|day_of_week|average_review_score|
+-----------+--------------------+
|    Tuesday|                4.04|
|     Friday|                4.04|
|  Wednesday|                4.05|
|   Saturday|                4.05|
|     Sunday|                4.05|
|   Thursday|                4.06|
|     Monday|                4.06|
+-----------+--------------------+



In [101]:
average_freight_value_by_day_df = ecommerce_df.groupBy("day_of_week").agg(round(avg("order_freight_value"),2).alias("average_freight_value")).orderBy("average_freight_value")
average_freight_value_by_day_df.show()

ecommerce_datasets["average_freight_value_by_day"] = average_freight_value_by_day_df

+-----------+---------------------+
|day_of_week|average_freight_value|
+-----------+---------------------+
|     Monday|                21.49|
|    Tuesday|                 21.5|
|  Wednesday|                 21.7|
|   Saturday|                21.79|
|   Thursday|                21.85|
|     Friday|                21.92|
|     Sunday|                21.98|
+-----------+---------------------+



In [115]:
ecommerce_df = ecommerce_df.withColumn("time_to_approve_order",col("order_approved_at_as_datetime").cast("long") - col('order_purchase_as_datetime').cast("long"))
ecommerce_df = ecommerce_df.withColumn("time_to_deliver_order",col("order_delivery_customer_date_as_datetime").cast("long") - col('order_purchase_as_datetime').cast("long"))
ecommerce_df.show()

+---+------------+--------------------+-------------------+---------------+-------------------+--------------+------------------------+-------------------+--------------------------+------------------+------------+------------------------+----------------+-----------------------------+----------------------+-------------------------+------------------------------------+-----------+-------------+--------------------------+-----------------------------+----------------------------------------+---------------------+---------------------+
| id|order_status|order_products_value|order_freight_value|order_items_qty|      customer_city|customer_state|customer_zip_code_prefix|product_name_lenght|product_description_lenght|product_photos_qty|review_score|order_purchase_timestamp|order_aproved_at|order_delivered_customer_date|order_purchase_as_date|order_approved_at_as_date|order_delivery_customer_date_as_date|day_of_week|week_of_month|order_purchase_as_datetime|order_approved_at_as_datetime|orde

In [117]:
average_time_to_approve_order_by_day_df = ecommerce_df.groupBy("day_of_week").agg(round(avg("time_to_approve_order"),2).alias("average_time_to_approve_order")).orderBy("average_time_to_approve_order")
average_time_to_approve_order_by_day_df.show()

ecommerce_datasets["average_time_to_approve_order_by_day"] = average_time_to_approve_order_by_day_df

+-----------+-----------------------------+
|day_of_week|average_time_to_approve_order|
+-----------+-----------------------------+
|   Thursday|                     31410.68|
|     Friday|                     31577.61|
|  Wednesday|                     32383.93|
|    Tuesday|                     33473.55|
|     Monday|                     38871.57|
|   Saturday|                     48351.09|
|     Sunday|                     52226.07|
+-----------+-----------------------------+



In [116]:
average_time_to_deliver_order_by_day_df = ecommerce_df.groupBy("day_of_week").agg(round(avg("time_to_deliver_order"),2).alias("average_time_to_approve_order")).orderBy("average_time_to_approve_order")
average_time_to_deliver_order_by_day_df.show()

ecommerce_datasets["average_time_to_deliver_order_by_day"] = average_time_to_deliver_order_by_day_df

+-----------+-----------------------------+
|day_of_week|average_time_to_approve_order|
+-----------+-----------------------------+
|     Monday|                   1033518.33|
|    Tuesday|                   1036488.73|
|  Wednesday|                   1043121.45|
|   Thursday|                   1075211.61|
|     Friday|                   1101193.49|
|     Sunday|                   1154172.92|
|   Saturday|                   1172419.75|
+-----------+-----------------------------+



## By Week

In [113]:
average_review_score_by_week_df = ecommerce_df.groupBy("week_of_month").agg(round(avg("review_score"),2).alias("average_review_score")).orderBy("average_review_score")
average_review_score_by_week_df.show()

ecommerce_datasets["average_review_score_by_week"] = average_review_score_by_week_df

+-------------+--------------------+
|week_of_month|average_review_score|
+-------------+--------------------+
|            3|                4.04|
|            5|                4.04|
|            4|                4.05|
|            2|                4.06|
|            1|                4.07|
|            6|                4.07|
+-------------+--------------------+



In [114]:
average_freight_value_by_week_df = ecommerce_df.groupBy("week_of_month").agg(round(avg("order_freight_value"),2).alias("average_freight_value")).orderBy("average_freight_value")
average_freight_value_by_week_df.show()

ecommerce_datasets["average_freight_value_by_week"] = average_freight_value_by_week_df

+-------------+---------------------+
|week_of_month|average_freight_value|
+-------------+---------------------+
|            6|                21.43|
|            3|                21.62|
|            5|                21.71|
|            2|                21.78|
|            1|                21.82|
|            4|                21.82|
+-------------+---------------------+



In [118]:
average_time_to_approve_order_by_week_df = ecommerce_df.groupBy("week_of_month").agg(round(avg("time_to_approve_order"),2).alias("average_time_to_approve_order")).orderBy("average_time_to_approve_order")
average_time_to_approve_order_by_week_df.show()

ecommerce_datasets["average_time_to_approve_order_by_week"] = average_time_to_approve_order_by_week_df

+-------------+-----------------------------+
|week_of_month|average_time_to_approve_order|
+-------------+-----------------------------+
|            3|                     34514.17|
|            2|                     37290.22|
|            5|                     37842.18|
|            6|                     37842.89|
|            4|                     39152.78|
|            1|                      40009.4|
+-------------+-----------------------------+



In [119]:
average_time_to_deliver_order_by_week_df = ecommerce_df.groupBy("week_of_month").agg(round(avg("time_to_deliver_order"),2).alias("average_time_to_deliver_order")).orderBy("average_time_to_deliver_order")
average_time_to_deliver_order_by_week_df.show()

ecommerce_datasets["average_time_to_deliver_order_by_week"] = average_time_to_deliver_order_by_week_df

+-------------+-----------------------------+
|week_of_month|average_time_to_deliver_order|
+-------------+-----------------------------+
|            6|                    812527.32|
|            5|                   1073336.96|
|            3|                   1082696.05|
|            2|                   1089760.94|
|            4|                   1095327.44|
|            1|                   1105146.08|
+-------------+-----------------------------+



#Extra Look at Freight Costs

In [122]:
total_freight_charges_df = ecommerce_df.agg(round(sum("order_freight_value"),2).alias("total_freight_charges"))
total_freight_charges_df.show()

ecommerce_datasets["total_freight_charges"] = total_freight_charges_df

+---------------------+
|total_freight_charges|
+---------------------+
|           2174127.92|
+---------------------+



In [131]:
average_freight_charges_by_city_df = ecommerce_df.groupBy("customer_city").agg(round(avg("order_freight_value"),2).alias("average_freight_charges"))
average_freight_charges_by_city_df.show()

ecommerce_datasets["average_freight_charges_by_city"] = average_freight_charges_by_city_df

+-----------------+-----------------------+
|    customer_city|average_freight_charges|
+-----------------+-----------------------+
|         Araruama|                   24.3|
|         Guidoval|                  17.61|
|      Piranguinho|                  16.18|
|      Tres Pontas|                  22.49|
| Senador Guiomard|                  68.55|
|         Rio Novo|                  15.56|
|        Carrancas|                  16.09|
|        Fronteira|                  20.64|
|           Utinga|                  26.07|
|     Assis Brasil|                  24.84|
|           Pianco|                  65.21|
|         Macaubas|                  45.09|
|       Livramento|                  25.47|
|     Cristalandia|                  48.81|
|Tiradentes do Sul|                  28.99|
|            Apodi|                  37.52|
|Porto dos Gauchos|                   34.0|
|         Machados|                  42.51|
|     Rio do Campo|                  25.14|
|       Purilandia|             

In [134]:
average_freight_charges_by_state_df = ecommerce_df.groupBy("customer_state").agg(round(avg("order_freight_value"),2).alias("average_freight_charges"))
average_freight_charges_by_state_df.show()

ecommerce_datasets["average_freight_charges_by_state"] = average_freight_charges_by_state_df

+--------------+-----------------------+
|customer_state|average_freight_charges|
+--------------+-----------------------+
|            SC|                  23.25|
|            RO|                  51.16|
|            PI|                  41.18|
|            AM|                  36.32|
|            RR|                  46.53|
|            GO|                  24.44|
|            TO|                  38.92|
|            MT|                   29.8|
|            SP|                  16.63|
|            ES|                  23.97|
|            PB|                  41.15|
|            RS|                  23.64|
|            MS|                  24.97|
|            AL|                  37.71|
|            MG|                  22.02|
|            PA|                  37.95|
|            BA|                  29.07|
|            SE|                  39.24|
|            PE|                  34.65|
|            CE|                  35.23|
+--------------+-----------------------+
only showing top

# Exporting Data

In [139]:
for i in ecommerce_datasets:
  ecommerce_datasets[i].write.format("csv").option("header", "True").save('{}'.format(i,'r'))
  ecommerce_datasets[i].write.format("csv").option("header", "True").save('{}'.format(i,'r'))

  config = {
  'endpoint': 'https://cosmosdb-xg-sql.documents.azure.com:443/',
  'masterkey': 'WVkKMa27deTM6hH1emUN68KJMSeJmvi1ZJpYSOQxaSZ6qFNURAaNyvOjutbwYizQFfaSXKSWlVmIACDbcbmCdA==',
  'database': 'synapselinkdb',
  'collection': i,
  }
  ecommerce_datasets[i].write.format("com.microsoft.azure.cosmosdb.spark").options(**config).mode('append').save()