# spark olist

This notebook is used to try out some pieces of code before starting to write the Scala application.

You need [spylon-kernel](https://pypi.org/project/spylon-kernel/) to execute the cells.

## Explore the orders data

In [113]:
val df = spark.read.option("header", true).csv("data/olist_orders_dataset.csv")
df.show
df.count

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|e481f51cbdc54678b...|9ef432eb625129730...|   delivered|     2017-10-02 10:56:33|2017-10-02 11:07:15|         2017-10-04 19:55:00|          2017-10-10 21:25:13|          2017-10-18 00:00:00|
|53cdb2fc8bc7dce0b...|b0830fb4747a6c6d2...|   delivered|     2018-07-24 20:41:37|2018-07-26 03:24:27|         2018-07-26 14:31:00|          2018-08-07 15:27:45|          2018-08-13 00:00:00|
|47770eb9100c2d0c4...|41ce2a54c0b03bf34...|  

df: org.apache.spark.sql.DataFrame = [order_id: string, customer_id: string ... 6 more fields]
res95: Long = 99441
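Every column is read as a string (see the schema line above). Here is a minimal sketch that casts the five date columns to proper timestamps up front. It assumes they all use the `yyyy-MM-dd HH:mm:ss` layout visible in the output. `timestampCols` and `withTimestamps` are hypothetical names, not part of the original notebook.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, to_timestamp}

// Reuses the notebook's session if one is already running
val spark = SparkSession.builder.getOrCreate()

// Date columns of olist_orders_dataset.csv, as listed in the header above
val timestampCols = Seq(
  "order_purchase_timestamp",
  "order_approved_at",
  "order_delivered_carrier_date",
  "order_delivered_customer_date",
  "order_estimated_delivery_date"
)

// Parse each string column into TimestampType; empty cells are already null
def withTimestamps(raw: DataFrame): DataFrame =
  timestampCols.foldLeft(raw) { (acc, c) =>
    acc.withColumn(c, to_timestamp(col(c), "yyyy-MM-dd HH:mm:ss"))
  }

val orders = withTimestamps(
  spark.read.option("header", true).csv("data/olist_orders_dataset.csv"))
orders.printSchema
```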


## Delay for orders without a customer delivery date

In [114]:
val df_null = df.filter($"order_delivered_customer_date".isNull)
df_null.show
df_null.count

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|136cce7faa42fdb2c...|ed0271e0b7da060a3...|    invoiced|     2017-04-11 12:22:08|2017-04-13 13:25:17|                        null|                         null|          2017-05-09 00:00:00|
|ee64d42b8cf066f35...|caded193e8e47b836...|     shipped|     2018-06-04 16:44:48|2018-06-05 04:31:18|         2018-06-05 14:32:00|                         null|          2018-06-28 00:00:00|
|0760a852e4e9d89eb...|d2a79636084590b74...|  

df_null: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [order_id: string, customer_id: string ... 6 more fields]
res96: Long = 2965


In [11]:
df.select("order_status").distinct.show

+------------+
|order_status|
+------------+
|     shipped|
|    canceled|
|    approved|
|    invoiced|
|     created|
|   delivered|
| unavailable|
|  processing|
+------------+



Identify the order statuses where the order is canceled.

I don't compute the delay for `canceled` and `unavailable` orders.

I have no information about when the data was exported, except that it was published on Kaggle in 2018. I will take the maximum purchase date as the export date.
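This sketch drops the two excluded statuses with `Column.isin` and counts the orders left in each remaining status, so you can see what the exclusion removes. `excludedStatuses` and `activeOrders` are illustrative names, not from the original notebook.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder.getOrCreate()

// Statuses for which no delivery delay is computed
val excludedStatuses = Seq("canceled", "unavailable")

// Keep only orders whose status is not one of the excluded ones
def activeOrders(orders: DataFrame): DataFrame =
  orders.filter(!col("order_status").isin(excludedStatuses: _*))

val orders = spark.read.option("header", true).csv("data/olist_orders_dataset.csv")
activeOrders(orders).groupBy("order_status").count.show
```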

In [59]:
val current_time = df.select(to_utc_timestamp($"order_purchase_timestamp","Brazil/East").alias("utc")).agg(max("utc")).collect

current_time: Array[org.apache.spark.sql.Row] = Array([2018-10-17 20:30:18.0])
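The cell above collects the maximum into an `Array[Row]`. The SQL cell below then hardcodes its reference date as a literal (`2018-09-03 12:06:57`), which is not the maximum shown above. This sketch instead extracts the maximum as a single `java.sql.Timestamp` and feeds it back into a delay computation with `lit`, so no value has to be copied by hand. `exportTime`, `withAge` and `days_since_purchase` are hypothetical names.

```scala
import java.sql.Timestamp
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, datediff, lit, max, to_utc_timestamp}

val spark = SparkSession.builder.getOrCreate()
val orders = spark.read.option("header", true).csv("data/olist_orders_dataset.csv")

// Latest purchase time, converted from Brazil/East to UTC, as one value
val exportTime: Timestamp = orders
  .select(to_utc_timestamp(col("order_purchase_timestamp"), "Brazil/East").alias("utc"))
  .agg(max("utc"))
  .head
  .getTimestamp(0)

// Days between each purchase (in UTC) and the assumed export time
val withAge = orders
  .withColumn("order_utc", to_utc_timestamp(col("order_purchase_timestamp"), "Brazil/East"))
  .withColumn("days_since_purchase", datediff(lit(exportTime), col("order_utc")))
```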


In [48]:
df.createOrReplaceTempView("orders")

In [227]:
val data = spark.sql(
"""
SELECT *,
       NULL AS delivered_utc, 
       Datediff(To_timestamp("2018-09-03 12:06:57"), order_utc) AS delay
FROM   (SELECT order_id,
               customer_id,
               order_status,
               To_utc_timestamp(order_purchase_timestamp, 'Brazil/East') AS order_utc
        FROM   orders
        WHERE  order_status NOT IN ( 'canceled', 'unavailable' ) AND order_delivered_customer_date IS NULL)
WHERE Datediff(To_timestamp("2018-09-03 12:06:57"), order_utc) >10 
ORDER  BY delay DESC 
""")
data.show
data.count

+--------------------+--------------------+------------+-------------------+-------------+-----+
|            order_id|         customer_id|order_status|          order_utc|delivered_utc|delay|
+--------------------+--------------------+------------+-------------------+-------------+-----+
|2e7a8482f6fb09756...|08c5351a6aca1c158...|     shipped|2016-09-05 00:15:19|         null|  728|
|711b9be9c346d9ecd...|81e4aed5ab4253757...|    invoiced|2016-10-04 16:38:37|         null|  699|
|dd359d3c294458c6d...|5c58d1ea5a893380e...|    invoiced|2016-10-04 16:02:10|         null|  699|
|5cb8558cbb7c0c2f0...|2ff5a6455514da421...|     shipped|2016-10-04 18:02:37|         null|  699|
|a6475bb7a50387e3c...|442d66f0d96f65609...|    invoiced|2016-10-04 19:28:25|         null|  699|
|cda873529ca7ab71f...|76c74aaff2f3f7355...|     shipped|2016-10-05 19:57:30|         null|  698|
|3f913d30288c117e4...|7aae6b74d7e0a2a11...|     shipped|2016-10-05 17:36:55|         null|  698|
|97d2f8fe76f2f253b...|5458c93e

data: org.apache.spark.sql.DataFrame = [order_id: string, customer_id: string ... 4 more fields]
res200: Long = 1735
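The same query can also be written with the DataFrame API. This sketch is meant to mirror the SQL cell: same filters, same hardcoded reference date, same column order. It was not checked against the count shown. One difference is an assumption of this sketch: it casts the null `delivered_utc` column to a timestamp, which the SQL does not do. `undelivered` and `referenceDate` are illustrative names.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, datediff, desc, lit, to_timestamp, to_utc_timestamp}
import org.apache.spark.sql.types.TimestampType

val spark = SparkSession.builder.getOrCreate()
val orders = spark.read.option("header", true).csv("data/olist_orders_dataset.csv")

// Same reference date as the SQL cell above
val referenceDate = to_timestamp(lit("2018-09-03 12:06:57"))

val undelivered: DataFrame = orders
  .filter(!col("order_status").isin("canceled", "unavailable") &&
    col("order_delivered_customer_date").isNull)
  .select(
    col("order_id"),
    col("customer_id"),
    col("order_status"),
    to_utc_timestamp(col("order_purchase_timestamp"), "Brazil/East").alias("order_utc"))
  // Typed null column, so it lines up with the delivered orders later
  .withColumn("delivered_utc", lit(null).cast(TimestampType))
  .withColumn("delay", datediff(referenceDate, col("order_utc")))
  .filter(col("delay") > 10)
  .orderBy(desc("delay"))
```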


## Delay for delivered orders (customer delivery date known)

In [116]:
val df_customer = spark.read.option("header", true).csv("data/olist_customers_dataset.csv")
df_customer.show

+--------------------+--------------------+------------------------+--------------------+--------------+
|         customer_id|  customer_unique_id|customer_zip_code_prefix|       customer_city|customer_state|
+--------------------+--------------------+------------------------+--------------------+--------------+
|06b8999e2fba1a1fb...|861eff4711a542e4b...|                   14409|              franca|            SP|
|18955e83d337fd6b2...|290c77bc529b7ac93...|                   09790|sao bernardo do c...|            SP|
|4e7b3e00288586ebd...|060e732b5b29e8181...|                   01151|           sao paulo|            SP|
|b2b6027bc5c5109e5...|259dac757896d24d7...|                   08775|     mogi das cruzes|            SP|
|4f2d8ab171c80ec83...|345ecd01c38d18a90...|                   13056|            campinas|            SP|
|879864dab9bc30475...|4c93744516667ad3b...|                   89254|      jaragua do sul|            SC|
|fd826e7cf63160e53...|addec96d2e059c80c...|            

df_customer: org.apache.spark.sql.DataFrame = [customer_id: string, customer_unique_id: string ... 3 more fields]


I need to link the location of each customer to their time zone.

![Brazil time zones](https://upload.wikimedia.org/wikipedia/commons/thumb/d/d0/Standard_Timezones_of_Brazil.svg/1022px-Standard_Timezones_of_Brazil.svg.png)

I can't use the state because some states span two time zones (see the red and green areas on the right).

So I would need to link each city to its time zone.

In [127]:
val df_city = df_customer.select("customer_city").distinct
df_city.show
df_city.count

+--------------------+
|       customer_city|
+--------------------+
|            camacari|
|           arapiraca|
|           itaberaba|
|           igrejinha|
|  aguas de sao pedro|
|            vermelho|
|                pote|
|jijoca de jericoa...|
|            barracao|
|                iepe|
|divino das laranj...|
|divino de sao lou...|
|              bacaxa|
|   redencao da serra|
|       astolfo dutra|
|            itanhaem|
|             brusque|
|           boa vista|
|  cachoeira paulista|
|            guaranta|
+--------------------+
only showing top 20 rows



df_city: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [customer_city: string]
res108: Long = 4119


Working at city level is too complicated (4,119 distinct cities). I will treat Amazonas as a single time zone and use the state instead. I will create a CSV file with the time zone of each state.

| State | UTC offset | tz |
|---|---|---|
| MA | −03:00 | Brazil/East |
| PI | −03:00 | Brazil/East |
| CE | −03:00 | Brazil/East |
| RN | −03:00 | Brazil/East |
| PB | −03:00 | Brazil/East |
| GO | −03:00 | Brazil/East |
| AP | −03:00 | Brazil/East |
| DF | −03:00 | Brazil/East |
| MG | −03:00 | Brazil/East |
| ES | −03:00 | Brazil/East |
| RJ | −03:00 | Brazil/East |
| SP | −03:00 | Brazil/East |
| PR | −03:00 | Brazil/East |
| SC | −03:00 | Brazil/East |
| RS | −03:00 | Brazil/East |
| PA | −03:00 | Brazil/East |
| PE | −03:00 | Brazil/East |
| TO | −03:00 | Brazil/East |
| AL | −03:00 | Brazil/East |
| SE | −03:00 | Brazil/East |
| BA | −03:00 | Brazil/East |
| MT | −04:00 | Brazil/West |
| RO | −04:00 | Brazil/West |
| RR | −04:00 | Brazil/West |
| AM | −04:00 | Brazil/West |
| MS | −04:00 | Brazil/West |
| AC | −05:00 | Brazil/Acre |
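The mapping in the table can also be built in code instead of maintained by hand in `data/time_zone.csv`. This sketch encodes the table (Amazonas in a single zone) and writes it out. The output directory `data/time_zone_generated` is hypothetical, chosen so it does not overwrite the hand-written file. Spark writes a directory of part files, which `spark.read.csv` can read back. The offsets here use an ASCII hyphen-minus.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.getOrCreate()
import spark.implicits._

// (State, UTC offset, tz) rows, copied from the table above
val east = Seq("MA", "PI", "CE", "RN", "PB", "GO", "AP", "DF", "MG", "ES", "RJ",
  "SP", "PR", "SC", "RS", "PA", "PE", "TO", "AL", "SE", "BA")
  .map(s => (s, "-03:00", "Brazil/East"))
val west = Seq("MT", "RO", "RR", "AM", "MS").map(s => (s, "-04:00", "Brazil/West"))
val acre = Seq(("AC", "-05:00", "Brazil/Acre"))

val dfTimezone = (east ++ west ++ acre).toDF("State", "utc_offset", "tz")

// Single CSV part file with a header, in a hypothetical output directory
dfTimezone.coalesce(1).write.mode("overwrite").option("header", "true")
  .csv("data/time_zone_generated")
```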

In [164]:
val df_state = df_customer.select("customer_state").distinct
df_state.show
df_state.count

+--------------+
|customer_state|
+--------------+
|            SC|
|            RO|
|            PI|
|            AM|
|            RR|
|            GO|
|            TO|
|            MT|
|            SP|
|            ES|
|            PB|
|            RS|
|            MS|
|            AL|
|            MG|
|            PA|
|            BA|
|            SE|
|            PE|
|            CE|
+--------------+
only showing top 20 rows



df_state: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [customer_state: string]
res144: Long = 27


In [132]:
df_customer.createOrReplaceTempView("customer")
df.createOrReplaceTempView("orders")

In [197]:
val df_timezone = spark.read.option("header", true).csv("data/time_zone.csv")
df_timezone.show
df_timezone.distinct.count

+-----+----------+-------------+
|State|utc_offset|           tz|
+-----+----------+-------------+
|   MA|    −03:00| Brazil/East |
|   PI|    −03:00| Brazil/East |
|   CE|    −03:00| Brazil/East |
|   AP|    −03:00| Brazil/East |
|   RN|    −03:00| Brazil/East |
|   PB|    −03:00| Brazil/East |
|   GO|    −03:00| Brazil/East |
|   DF|    −03:00| Brazil/East |
|   MG|    −03:00| Brazil/East |
|   ES|    −03:00| Brazil/East |
|   RJ|    −03:00| Brazil/East |
|   SP|    −03:00| Brazil/East |
|   PR|    −03:00| Brazil/East |
|   SC|    −03:00| Brazil/East |
|   RS|    −03:00| Brazil/East |
|   PA|    −03:00| Brazil/East |
|   PE|    −03:00| Brazil/East |
|   TO|    −03:00| Brazil/East |
|   AL|    −03:00| Brazil/East |
|   SE|    −03:00| Brazil/East |
+-----+----------+-------------+
only showing top 20 rows



df_timezone: org.apache.spark.sql.DataFrame = [State: string, utc_offset: string ... 1 more field]
res177: Long = 27

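In the `show` output above, the `tz` values print with a space on both sides (`| Brazil/East |`) while `State` does not. That suggests the CSV cells carry padding spaces, probably copied from the markdown table. Spark's CSV reader does not trim values by default. A zone ID with stray spaces may not be recognized by `to_utc_timestamp`: depending on the Spark version, it may fall back to UTC or raise an error. This sketch trims while reading, using the reader's standard whitespace options, and then trims `tz` once more explicitly. `dfTimezoneClean` is an illustrative name.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, trim}

val spark = SparkSession.builder.getOrCreate()

// Strip spaces around every cell while parsing (off by default when reading)
val dfTimezoneClean = spark.read
  .option("header", true)
  .option("ignoreLeadingWhiteSpace", true)
  .option("ignoreTrailingWhiteSpace", true)
  .csv("data/time_zone.csv")
  // Also trim the zone ID explicitly, in case the options miss a case
  .withColumn("tz", trim(col("tz")))

// Each distinct zone ID should now be a bare name such as Brazil/East
dfTimezoneClean.select("tz").distinct.show(false)
```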

In [195]:
df_timezone.createOrReplaceTempView("timezone")
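The next query uses an inner `JOIN timezone`, so any customer whose state is missing from the time zone file would silently drop out of the result. This sketch checks for that with a `left_anti` join, which keeps only the customer states that have no match. `unmatchedStates` is an illustrative name.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder.getOrCreate()
val customers = spark.read.option("header", true).csv("data/olist_customers_dataset.csv")
val timezones = spark.read.option("header", true).csv("data/time_zone.csv")

// Customer states with no row in the time zone table
val unmatchedStates = customers
  .select("customer_state").distinct
  .join(timezones, col("customer_state") === col("State"), "left_anti")

// An empty result means the inner join drops no customer
unmatchedStates.show
```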

In [228]:
val data3 = spark.sql(
"""
SELECT *,
       Datediff(delivered_utc, order_utc) AS delay
FROM   (SELECT o.order_id,
               o.customer_id,
               o.order_status,
               To_utc_timestamp(order_purchase_timestamp, 'Brazil/East') AS order_utc,
               To_utc_timestamp(o.order_delivered_customer_date, t.tz) AS delivered_utc
        FROM   customer c
               JOIN timezone t
                 ON c.customer_state = t.state
               JOIN orders o
                 ON c.customer_id = o.customer_id
        WHERE  o.order_delivered_customer_date IS NOT NULL)
WHERE  Datediff(delivered_utc, order_utc) > 10
ORDER  BY delay DESC 
"""
)
data3.show
data3.count

+--------------------+--------------------+------------+-------------------+-------------------+-----+
|            order_id|         customer_id|order_status|          order_utc|      delivered_utc|delay|
+--------------------+--------------------+------------+-------------------+-------------------+-----+
|1b3190b2dfa9d789e...|d306426abe5fca15e...|   delivered|2018-02-23 17:57:35|2018-09-20 02:24:07|  209|
|ca07593549f1816d2...|75683a92331068e2d...|   delivered|2017-02-22 02:31:27|2017-09-19 17:36:39|  209|
|440d0d17af552815d...|7815125148cfa1e8c...|   delivered|2017-03-08 02:59:51|2017-09-19 18:12:50|  195|
|2fb597c2f772eca01...|217906bc11a32c1e4...|   delivered|2017-03-08 21:09:02|2017-09-19 17:33:17|  195|
|0f4519c5f1c541dde...|1a8a4a30dc2969767...|   delivered|2017-03-09 16:26:57|2017-09-19 17:38:21|  194|
|285ab9426d6982034...|9cf2c3fa2632cee74...|   delivered|2017-03-09 01:47:40|2017-09-19 17:00:04|  194|
|47b40429ed8cce3ae...|cb2caaaead400c973...|   delivered|2018-01-03 11:44:

data3: org.apache.spark.sql.DataFrame = [order_id: string, customer_id: string ... 4 more fields]
res201: Long = 46744
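Here is a DataFrame API sketch of the query above. The zone differs per row and comes from the `tz` column, so the delivery conversion uses `expr` with the same SQL call as the original. The Scala overload `to_utc_timestamp(Column, Column)` only exists from Spark 2.4, so `expr` is the more portable choice. The sketch aims to return the same columns in the same order but was not checked against the count shown. `delivered` is an illustrative name.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, datediff, desc, expr, to_utc_timestamp}

val spark = SparkSession.builder.getOrCreate()
val orders = spark.read.option("header", true).csv("data/olist_orders_dataset.csv")
val customers = spark.read.option("header", true).csv("data/olist_customers_dataset.csv")
val timezones = spark.read.option("header", true).csv("data/time_zone.csv")

val delivered: DataFrame = customers
  .join(timezones, col("customer_state") === col("State"))
  // Joining on the column name keeps a single customer_id column
  .join(orders, Seq("customer_id"))
  .filter(col("order_delivered_customer_date").isNotNull)
  .select(
    col("order_id"),
    col("customer_id"),
    col("order_status"),
    to_utc_timestamp(col("order_purchase_timestamp"), "Brazil/East").alias("order_utc"),
    // Per-row zone taken from the tz column
    expr("to_utc_timestamp(order_delivered_customer_date, tz)").alias("delivered_utc"))
  .withColumn("delay", datediff(col("delivered_utc"), col("order_utc")))
  .filter(col("delay") > 10)
  .orderBy(desc("delay"))
```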


In [215]:
data3.select("customer_id").distinct.count

res192: Long = 46744


## Concatenate both for the final result

In [226]:
val df_final = data.union(data3).sort(desc("delay"))
df_final.show
df_final.count

+--------------------+--------------------+------------+-------------------+-------------+-----+
|            order_id|         customer_id|order_status|          order_utc|delivered_utc|delay|
+--------------------+--------------------+------------+-------------------+-------------+-----+
|2e7a8482f6fb09756...|08c5351a6aca1c158...|     shipped|2016-09-05 00:15:19|         null|  728|
|711b9be9c346d9ecd...|81e4aed5ab4253757...|    invoiced|2016-10-04 16:38:37|         null|  699|
|5cb8558cbb7c0c2f0...|2ff5a6455514da421...|     shipped|2016-10-04 18:02:37|         null|  699|
|a6475bb7a50387e3c...|442d66f0d96f65609...|    invoiced|2016-10-04 19:28:25|         null|  699|
|dd359d3c294458c6d...|5c58d1ea5a893380e...|    invoiced|2016-10-04 16:02:10|         null|  699|
|e04f1da1f48bf2bbf...|0d00d77134cae4c58...|    invoiced|2016-10-05 16:22:20|         null|  698|
|3f913d30288c117e4...|7aae6b74d7e0a2a11...|     shipped|2016-10-05 17:36:55|         null|  698|
|97d2f8fe76f2f253b...|5458c93e

df_final: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [order_id: string, customer_id: string ... 4 more fields]
res199: Long = 48479
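`union` matches columns by position, so it works here only because both queries list their columns in the same order. This sketch combines them by name with `unionByName`, available since Spark 2.3. Before the union, it gives the bare `NULL AS delivered_utc` column an explicit timestamp type. `combineDelays` is a hypothetical helper; `data` and `data3` are the notebook's DataFrames.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, desc}
import org.apache.spark.sql.types.TimestampType

// Give the undelivered rows a typed delivered_utc column (replaced in place),
// then union with the delivered rows by column name and sort by delay
def combineDelays(undelivered: DataFrame, delivered: DataFrame): DataFrame =
  undelivered
    .withColumn("delivered_utc", col("delivered_utc").cast(TimestampType))
    .unionByName(delivered)
    .orderBy(desc("delay"))

// In the notebook: combineDelays(data, data3).show
```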


## Save to CSV

In [238]:
df_final.repartition(1).write.mode("Overwrite").option("header", "true").csv("csv")