In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [31]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import aggregate,avg,count,sum,col,expr,to_timestamp,year,day,row_number,rand

In [3]:
spark = (
    SparkSession.builder.appName('e-commerce').master('local[*]').getOrCreate()
)
print('spark Version',spark.version)

spark Version 4.0.1


In [4]:
df_customer = (
    spark.read
    .option('header',True)
    .option('inferschema',True)
    .csv('customers.csv')
)
print(df_customer.head())

Row(customer_id='06b8999e2fba1a1fbc88172c00ba8bc7', customer_unique_id='861eff4711a542e4b93843c6dd7febb0', customer_zip_code_prefix=14409, customer_city='franca', customer_state='SP')


In [5]:

orders = (
    spark.read
    .option('header',True)
    .option('inferschema',True)
    .csv('orders.csv')
)
orders.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)



## Juniors


In [6]:
orders.show(10)


+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|e481f51cbdc54678b...|9ef432eb625129730...|   delivered|     2017-10-02 10:56:33|2017-10-02 11:07:15|         2017-10-04 19:55:00|          2017-10-10 21:25:13|          2017-10-18 00:00:00|
|53cdb2fc8bc7dce0b...|b0830fb4747a6c6d2...|   delivered|     2018-07-24 20:41:37|2018-07-26 03:24:27|         2018-07-26 14:31:00|          2018-08-07 15:27:45|          2018-08-13 00:00:00|
|47770eb9100c2d0c4...|41ce2a54c0b03bf34...|  

In [7]:
orders.count()


99441

In [8]:
orders.select("order_id", "order_status").show()


+--------------------+------------+
|            order_id|order_status|
+--------------------+------------+
|e481f51cbdc54678b...|   delivered|
|53cdb2fc8bc7dce0b...|   delivered|
|47770eb9100c2d0c4...|   delivered|
|949d5b44dbf5de918...|   delivered|
|ad21c59c0840e6cb8...|   delivered|
|a4591c265e18cb1dc...|   delivered|
|136cce7faa42fdb2c...|    invoiced|
|6514b8ad8028c9f2c...|   delivered|
|76c6e866289321a7c...|   delivered|
|e69bfb5eb88e0ed6a...|   delivered|
|e6ce16cb79ec1d90b...|   delivered|
|34513ce0c4fab462a...|   delivered|
|82566a660a982b15f...|   delivered|
|5ff96c15d0b717ac6...|   delivered|
|432aaf21d85167c2c...|   delivered|
|dcb36b511fcac050b...|   delivered|
|403b97836b0c04a62...|   delivered|
|116f0b09343b49556...|   delivered|
|85ce859fd6dc634de...|   delivered|
|83018ec114eee8641...|   delivered|
+--------------------+------------+
only showing top 20 rows


In [9]:
orders.filter(col("order_status") == "delivered").show()


+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|e481f51cbdc54678b...|9ef432eb625129730...|   delivered|     2017-10-02 10:56:33|2017-10-02 11:07:15|         2017-10-04 19:55:00|          2017-10-10 21:25:13|          2017-10-18 00:00:00|
|53cdb2fc8bc7dce0b...|b0830fb4747a6c6d2...|   delivered|     2018-07-24 20:41:37|2018-07-26 03:24:27|         2018-07-26 14:31:00|          2018-08-07 15:27:45|          2018-08-13 00:00:00|
|47770eb9100c2d0c4...|41ce2a54c0b03bf34...|  

In [10]:
orders.withColumn("order_purchase_timestamp", to_timestamp("order_purchase_timestamp"))


DataFrame[order_id: string, customer_id: string, order_status: string, order_purchase_timestamp: timestamp, order_approved_at: timestamp, order_delivered_carrier_date: timestamp, order_delivered_customer_date: timestamp, order_estimated_delivery_date: timestamp]

In [11]:
orders.orderBy(col("order_purchase_timestamp").desc()).show()


+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|10a045cdf6a5650c2...|a4b417188addbc05b...|    canceled|     2018-10-17 17:30:18|               NULL|                        NULL|                         NULL|          2018-10-30 00:00:00|
|b059ee4de278302d5...|856336203359aa6a6...|    canceled|     2018-10-16 20:16:02|               NULL|                        NULL|                         NULL|          2018-11-12 00:00:00|
|a2ac6dad85cf8af5b...|4c2ec60c29d10c34b...|  

In [12]:
orders.filter(col("order_status") == "delivered").filter(year(col('order_purchase_timestamp'))==2018).show()

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|53cdb2fc8bc7dce0b...|b0830fb4747a6c6d2...|   delivered|     2018-07-24 20:41:37|2018-07-26 03:24:27|         2018-07-26 14:31:00|          2018-08-07 15:27:45|          2018-08-13 00:00:00|
|47770eb9100c2d0c4...|41ce2a54c0b03bf34...|   delivered|     2018-08-08 08:38:49|2018-08-08 08:55:23|         2018-08-08 13:50:00|          2018-08-17 18:06:29|          2018-09-04 00:00:00|
|ad21c59c0840e6cb8...|8ab97904e6daea886...|  

In [13]:
orders.dropDuplicates(["order_id"]).count()


99441

In [14]:
orders.groupBy("order_status").count().show()


+------------+-----+
|order_status|count|
+------------+-----+
|     shipped| 1107|
|    canceled|  625|
|    approved|    2|
|    invoiced|  314|
|     created|    5|
|   delivered|96478|
| unavailable|  609|
|  processing|  301|
+------------+-----+



## Intermediates

In [15]:
orders.join(df_customer, "customer_id", "inner").show()

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+--------------------+------------------------+--------------------+--------------+
|         customer_id|            order_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|  customer_unique_id|customer_zip_code_prefix|       customer_city|customer_state|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+--------------------+------------------------+--------------------+--------------+
|9ef432eb625129730...|e481f51cbdc54678b...|   delivered|     2017-10-02 10:56:33|2017-10-02 11:07:15|         2017-10-04 19:55:00|          2017-10-10 21:25:13|          2017-10-18 

In [16]:
orders.join(df_customer, "customer_id", "left").show()


+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+--------------------+------------------------+--------------------+--------------+
|         customer_id|            order_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|  customer_unique_id|customer_zip_code_prefix|       customer_city|customer_state|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+--------------------+------------------------+--------------------+--------------+
|9ef432eb625129730...|e481f51cbdc54678b...|   delivered|     2017-10-02 10:56:33|2017-10-02 11:07:15|         2017-10-04 19:55:00|          2017-10-10 21:25:13|          2017-10-18 

In [17]:
# Broadcast Join
from pyspark.sql.functions import broadcast
orders.join(broadcast(df_customer), "customer_id").show()


+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+--------------------+------------------------+--------------------+--------------+
|         customer_id|            order_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|  customer_unique_id|customer_zip_code_prefix|       customer_city|customer_state|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+--------------------+------------------------+--------------------+--------------+
|9ef432eb625129730...|e481f51cbdc54678b...|   delivered|     2017-10-02 10:56:33|2017-10-02 11:07:15|         2017-10-04 19:55:00|          2017-10-10 21:25:13|          2017-10-18 

In [18]:
orders.groupBy("customer_id").count().show()


+--------------------+-----+
|         customer_id|count|
+--------------------+-----+
|f54a9f0e6b351c431...|    1|
|2a1dfb647f32f4390...|    1|
|4f28355e5c17a4a42...|    1|
|4632eb5a8f175f6fe...|    1|
|843ff05b30ce4f75b...|    1|
|a4156bb8aff5d6722...|    1|
|1099d033c74a027a7...|    1|
|7a3bd3b37285f0ab2...|    1|
|5fff39f1b59dc4d2f...|    1|
|18f6ca10777417c93...|    1|
|f1e46939e6408b3e6...|    1|
|a8695124db570d100...|    1|
|545b9a267af9ba134...|    1|
|90d7075599361b694...|    1|
|a340ce6c3570e68d4...|    1|
|ce0681e1c3f70e145...|    1|
|ee905ec97794ec6e9...|    1|
|b74ca180d63f9ae04...|    1|
|0de46efc7d10114ac...|    1|
|8bdae6b4ff9bc7f4c...|    1|
+--------------------+-----+
only showing top 20 rows


In [19]:
orders = orders.repartition(10)
orders = orders.coalesce(5)

In [20]:
# repartition = expensive but balanced
# coalesce = cheap but risky

In [21]:
orders.cache()
orders.count()


99441

In [22]:
orders.persist()

DataFrame[order_id: string, customer_id: string, order_status: string, order_purchase_timestamp: timestamp, order_approved_at: timestamp, order_delivered_carrier_date: timestamp, order_delivered_customer_date: timestamp, order_estimated_delivery_date: timestamp]

In [23]:
# Window Function (ranking)
# from pyspark.sql.window import Window

# w = Window.partitionBy("customer_id").orderBy("order_date")
# orders.withColumn("rank", row_number().over(w)).show()

In [24]:
orders.fillna({"order_status": "unknown"}).show()


+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|6172acc45ea968ebf...|4b3b525e0cf9bfbd2...|   delivered|     2018-03-30 20:48:54|2018-03-30 21:07:35|         2018-04-03 01:52:51|          2018-04-13 23:26:36|          2018-04-19 00:00:00|
|23799ee0d85541263...|98209b8b1b97a90f4...|     shipped|     2017-05-01 10:23:49|2017-05-01 10:35:20|         2017-05-02 11:50:50|                         NULL|          2017-05-23 00:00:00|
|a118f629bb45d3d29...|113e46e208d4e04bc...|  

In [25]:
orders.dropna().count()


96461

In [26]:
orders.explain(True)

== Parsed Logical Plan ==
Repartition 5, false
+- Repartition 10, true
   +- Relation [order_id#44,customer_id#45,order_status#46,order_purchase_timestamp#47,order_approved_at#48,order_delivered_carrier_date#49,order_delivered_customer_date#50,order_estimated_delivery_date#51] csv

== Analyzed Logical Plan ==
order_id: string, customer_id: string, order_status: string, order_purchase_timestamp: timestamp, order_approved_at: timestamp, order_delivered_carrier_date: timestamp, order_delivered_customer_date: timestamp, order_estimated_delivery_date: timestamp
Repartition 5, false
+- Repartition 10, true
   +- Relation [order_id#44,customer_id#45,order_status#46,order_purchase_timestamp#47,order_approved_at#48,order_delivered_carrier_date#49,order_delivered_customer_date#50,order_estimated_delivery_date#51] csv

== Optimized Logical Plan ==
InMemoryRelation [order_id#44, customer_id#45, order_status#46, order_purchase_timestamp#47, order_approved_at#48, order_delivered_carrier_date#49, ord

In [27]:
orders.write.mode("overwrite").parquet("output/orders")


## Advanced / Senior

In [28]:
orders.write.partitionBy("order_status").parquet("warehouse/orders")

In [29]:
spark.read.parquet("warehouse/orders/order_status=delivered").count()


96478

In [30]:
orders.filter(col("order_status") == "delivered").count()


96478

In [33]:
orders.withColumn("salt", rand()).repartition("customer_id", "salt").show()


+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+--------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|                salt|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+--------------------+
|bf210c81ef1a94cbf...|d98fbfa5100d65442...|   delivered|     2017-08-18 17:58:20|2017-08-18 18:24:26|         2017-08-21 21:03:52|          2017-08-25 22:32:58|          2017-09-08 00:00:00| 0.09718532572048677|
|ccb3dbd615aa85a6e...|319a71d1061c0bff9...|   delivered|     2018-06-19 19:59:48|2018-06-19 20:16:21|         2018-06-20 14:16:00|          2018-06-22 1

In [34]:
orders.rdd.getNumPartitions()


5

In [36]:
# ETL Job كامل
raw = spark.read.csv("orders.csv", header=True)
clean = raw.filter(col("order_status").isNotNull())
clean.write.parquet("gold/orders")

In [37]:
gold_orders = spark.read.parquet("gold/orders")
gold_orders.show()

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|e481f51cbdc54678b...|9ef432eb625129730...|   delivered|     2017-10-02 10:56:33|2017-10-02 11:07:15|         2017-10-04 19:55:00|          2017-10-10 21:25:13|          2017-10-18 00:00:00|
|53cdb2fc8bc7dce0b...|b0830fb4747a6c6d2...|   delivered|     2018-07-24 20:41:37|2018-07-26 03:24:27|         2018-07-26 14:31:00|          2018-08-07 15:27:45|          2018-08-13 00:00:00|
|47770eb9100c2d0c4...|41ce2a54c0b03bf34...|  