In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
.appName('integration')\
.getOrCreate()

25/09/16 14:15:37 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [2]:
hdfs_path = '/data/olist/'

In [3]:
customers_df = spark.read.csv(hdfs_path+"olist_customers_dataset.csv", header=True, inferSchema=True)
geo_df = spark.read.csv(hdfs_path+"olist_geolocation_dataset.csv", header=True, inferSchema=True)
items_df = spark.read.csv(hdfs_path+"olist_order_items_dataset.csv", header=True, inferSchema=True)
payments_df = spark.read.csv(hdfs_path+"olist_order_payments_dataset.csv", header=True, inferSchema=True)
reviews_df = spark.read.csv(hdfs_path+"olist_order_reviews_dataset.csv", header=True, inferSchema=True)
orders_df = spark.read.csv(hdfs_path+"olist_orders_dataset.csv", header=True, inferSchema=True)
products_df = spark.read.csv(hdfs_path+"olist_products_dataset.csv", header=True, inferSchema=True)
sellers_df = spark.read.csv(hdfs_path+"olist_sellers_dataset.csv", header=True, inferSchema=True)
category_name_df = spark.read.csv(hdfs_path+"product_category_name_translation.csv", header=True, inferSchema=True)

                                                                                

In [4]:
geo_df.columns

['geolocation_zip_code_prefix',
 'geolocation_lat',
 'geolocation_lng',
 'geolocation_city',
 'geolocation_state']

In [5]:
customers_df.columns

['customer_id',
 'customer_unique_id',
 'customer_zip_code_prefix',
 'customer_city',
 'customer_state']

In [6]:
orders_df.cache()
items_df.cache()
customers_df.cache()

DataFrame[customer_id: string, customer_unique_id: string, customer_zip_code_prefix: int, customer_city: string, customer_state: string]

In [7]:
orders_items_joined = orders_df.join(items_df, 'order_id', 'inner')

In [8]:
orders_items_products_df = orders_items_joined.join(products_df, 'product_id', 'inner')

In [9]:
orders_items_products_sellers_df = orders_items_products_df.join(sellers_df, 'seller_id', 'inner') #use broadcast for optimization

In [10]:
full_orders_df = orders_items_products_sellers_df.join(customers_df, 'customer_id', 'inner')

In [11]:
full_orders_df = full_orders_df.join(geo_df, full_orders_df.customer_zip_code_prefix==geo_df.geolocation_zip_code_prefix, 'left') #use broadcast for optimization

In [12]:
full_orders_df.columns

['customer_id',
 'seller_id',
 'product_id',
 'order_id',
 'order_status',
 'order_purchase_timestamp',
 'order_approved_at',
 'order_delivered_carrier_date',
 'order_delivered_customer_date',
 'order_estimated_delivery_date',
 'order_item_id',
 'shipping_limit_date',
 'price',
 'freight_value',
 'product_category_name',
 'product_name_lenght',
 'product_description_lenght',
 'product_photos_qty',
 'product_weight_g',
 'product_length_cm',
 'product_height_cm',
 'product_width_cm',
 'seller_zip_code_prefix',
 'seller_city',
 'seller_state',
 'customer_unique_id',
 'customer_zip_code_prefix',
 'customer_city',
 'customer_state',
 'geolocation_zip_code_prefix',
 'geolocation_lat',
 'geolocation_lng',
 'geolocation_city',
 'geolocation_state']

In [13]:
full_orders_df = full_orders_df.join(reviews_df, 'order_id', 'left') #use broadcast for optimization

In [14]:
full_orders_df = full_orders_df.join(payments_df, 'order_id', 'left')

In [15]:
full_orders_df.cache()

25/09/16 14:16:08 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


DataFrame[order_id: string, customer_id: string, seller_id: string, product_id: string, order_status: string, order_purchase_timestamp: timestamp, order_approved_at: timestamp, order_delivered_carrier_date: timestamp, order_delivered_customer_date: timestamp, order_estimated_delivery_date: timestamp, order_item_id: int, shipping_limit_date: timestamp, price: double, freight_value: double, product_category_name: string, product_name_lenght: int, product_description_lenght: int, product_photos_qty: int, product_weight_g: int, product_length_cm: int, product_height_cm: int, product_width_cm: int, seller_zip_code_prefix: int, seller_city: string, seller_state: string, customer_unique_id: string, customer_zip_code_prefix: int, customer_city: string, customer_state: string, geolocation_zip_code_prefix: int, geolocation_lat: double, geolocation_lng: double, geolocation_city: string, geolocation_state: string, review_id: string, review_score: string, review_comment_title: string, review_commen

In [16]:
from pyspark.sql.functions import *

In [17]:
#total orders per customer
#average review score per seller
#most sold products
#top customers by spending

In [18]:
full_orders_df.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)
 |-- product_category_name: string (nullable = true)
 |-- product_name_lenght: integer (nullable = true)
 |-- product_description_lenght: integer (nullable = true)
 |-- product_photos_qty: integer (nullable = true)
 |-- product_weight_g: integer (nullable = true)
 |-- product_length_cm: integer (null

In [19]:
total_orders_per_customer = full_orders_df.groupBy('customer_id')\
                            .agg(count(when(col('order_status')=='delivered', 1).otherwise(0)).alias('total_orders'))\
                            .orderBy('total_orders', ascending=False)

In [20]:
total_orders_per_customer.show(5)



+--------------------+------------+
|         customer_id|total_orders|
+--------------------+------------+
|351e40989da90e704...|       11427|
|50920f8cd0681fd86...|       10752|
|9b43e2a62de9bab3a...|        8556|
|270c23a11d024a44c...|        8001|
|5c87184371002d49e...|        6876|
+--------------------+------------+
only showing top 5 rows



                                                                                

In [21]:
average_review_per_seller = full_orders_df.groupby('seller_id')\
                            .agg(round(avg('review_score'),2).alias('average_review'))\
                            .orderBy('average_review', ascending=False)

In [22]:
average_review_per_seller.show(5)



+--------------------+--------------+
|           seller_id|average_review|
+--------------------+--------------+
|5d9f3746e112e696f...|           5.0|
|398cb257329ef7af7...|           5.0|
|51e0557bc7b86de13...|           5.0|
|8fb67a334bacec338...|           5.0|
|0b36063d5818f81cc...|           5.0|
+--------------------+--------------+
only showing top 5 rows



                                                                                

In [23]:
most_sold_prodcuts = full_orders_df.groupBy('product_id')\
                        .agg(count('*').alias('total_sold'))\
                        .orderBy('total_sold', ascending=False)

In [24]:
most_sold_prodcuts.show(10)



+--------------------+----------+
|          product_id|total_sold|
+--------------------+----------+
|aca2eb7d00ea1a7b8...|     86740|
|422879e10f4668299...|     81110|
|99a4788cb24856965...|     78775|
|389d119b48cf3043d...|     60248|
|d1c427060a0f73f6b...|     59274|
|368c6c730842d7801...|     58358|
|53759a2ecddad2bb8...|     52654|
|53b36df67ebb7c415...|     52105|
|154e7e31ebfa09220...|     42700|
|3dd2a17168ec895c7...|     40787|
+--------------------+----------+
only showing top 10 rows



                                                                                

In [25]:
top_customers = full_orders_df.groupBy('customer_id')\
                .agg(sum('price').alias('total_amount_spent'))\
                .orderBy('total_amount_spent', ascending=False)

In [26]:
top_customers.show(5)



+--------------------+------------------+
|         customer_id|total_amount_spent|
+--------------------+------------------+
|d3e82ccec3cb5f956...|         6662844.0|
|df55c14d1476a9a34...|         3565657.0|
|fe5113a38e3575c04...|         3293604.0|
|ec5b2ba62e5743423...|         2556120.0|
|63b964e79dee32a35...|         2501664.0|
+--------------------+------------------+
only showing top 5 rows



                                                                                

In [27]:
customer_aov = full_orders_df.groupBy('customer_id')\
               .agg(
                count('order_id').alias('total_orders'),
                sum('price').alias('total_amount'),
                round(avg('price'),2).alias('AOV')
                )\
               .orderBy('total_amount', ascending=False)

In [28]:
customer_aov.show(5)



+--------------------+------------+------------+------+
|         customer_id|total_orders|total_amount|   AOV|
+--------------------+------------+------------+------+
|d3e82ccec3cb5f956...|        6876|   6662844.0| 969.0|
|df55c14d1476a9a34...|         743|   3565657.0|4799.0|
|fe5113a38e3575c04...|        2292|   3293604.0|1437.0|
|ec5b2ba62e5743423...|        1428|   2556120.0|1790.0|
|63b964e79dee32a35...|        6072|   2501664.0| 412.0|
+--------------------+------------+------------+------+
only showing top 5 rows



                                                                                

In [29]:
seller_performance = full_orders_df.groupBy('seller_id')\
                        .agg(
                          count('order_id').alias('total_orders'),
                          avg('review_score').alias('average_review'),
                          sum('price').alias('total_revenue'),
                        )\
                        .orderBy('total_revenue', ascending=False)


In [30]:
seller_performance.show(5)



+--------------------+------------+------------------+--------------------+
|           seller_id|total_orders|    average_review|       total_revenue|
+--------------------+------------+------------------+--------------------+
|4869f7a5dfa277a7d...|      184587| 4.093344031789699| 3.613871731999996E7|
|53243585a1d6dc264...|       54514| 4.118026783071914| 3.429159294999999E7|
|4a3ca9315b744ce9f...|      330661|3.7684451702537354| 3.375957084000011E7|
|7c67e1448b00f6e96...|      233306|3.4183987368185553|3.2282321789999794E7|
|fa1c13f2614d7b5c4...|       87686| 4.378364698845075| 3.013938630999998E7|
+--------------------+------------+------------------+--------------------+
only showing top 5 rows



                                                                                

In [31]:
product_popularity = full_orders_df.groupBy('product_id')\
                        .agg(
                            count('order_id').alias('total_sales'),
                            sum('price').alias('total_revenue'),
                            round(avg('price'), 2).alias('avg_price'),
                            round(stddev('price'), 2).alias('price_volatility'),
                            collect_set('seller_id').alias('unique_sellers')
                            )\
                         .orderBy(desc('total_sales'))   

In [32]:
product_popularity.show(5)

[Stage 80:>                                                         (0 + 4) / 4]

+--------------------+-----------+------------------+---------+----------------+--------------------+
|          product_id|total_sales|     total_revenue|avg_price|price_volatility|      unique_sellers|
+--------------------+-----------+------------------+---------+----------------+--------------------+
|aca2eb7d00ea1a7b8...|      86740|6164630.3000000175|    71.07|            3.17|[955fee9216a65b61...|
|422879e10f4668299...|      81110| 4442791.510000011|    54.77|            4.46|[1f50f920176fa81d...|
|99a4788cb24856965...|      78775| 6921762.710000021|    87.87|            4.08|[4a3ca9315b744ce9...|
|389d119b48cf3043d...|      60248|3280533.1300000115|    54.45|            4.37|[1f50f920176fa81d...|
|d1c427060a0f73f6b...|      59274| 8220103.329999989|   138.68|           16.58|[a1043bafd471dff5...|
+--------------------+-----------+------------------+---------+----------------+--------------------+
only showing top 5 rows



                                                                                

In [33]:
monthly_revenue_oreder = full_orders_df.groupBy(month('order_purchase_timestamp').alias('month'))\
                            .agg(
                                count('order_id').alias('total_orders'),
                                sum('price').alias('total_revenue'),
                                round(avg('price'),2).alias('AOV'),
                                )\
                            .orderBy('month')

In [34]:
monthly_revenue_oreder.show(5)



+-----+------------+--------------------+------+
|month|total_orders|       total_revenue|   AOV|
+-----+------------+--------------------+------+
|    1|     1495580|1.7153290149999994E8|114.69|
|    2|     1551163|1.7878178406999967E8|115.26|
|    3|     1809467|2.1868116843000054E8|120.85|
|    4|     1693860| 2.171569691299998E8| 128.2|
|    5|     1918571| 2.400611519699976E8|125.12|
+-----+------------+--------------------+------+
only showing top 5 rows



                                                                                

In [35]:
customer_retention = full_orders_df.withColumn("order_purchase_timestamp", to_timestamp("order_purchase_timestamp"))\
                        .groupBy('customer_id')\
                        .agg(
                            min('order_purchase_timestamp').alias('first_time'),
                            max('order_purchase_timestamp').alias('last_time'),
                            count('order_id').alias('total_orders'),
                            round(avg('price'),2).alias('avg_price')
                            )\
                         .orderBy('total_orders')

In [36]:
customer_retention.show() ########



+--------------------+-------------------+-------------------+------------+---------+
|         customer_id|         first_time|          last_time|total_orders|avg_price|
+--------------------+-------------------+-------------------+------------+---------+
|02469bb79cf009697...|2018-08-12 13:51:25|2018-08-12 13:51:25|           1|    59.99|
|f33b127cad2e1ad3d...|2017-11-24 13:10:02|2017-11-24 13:10:02|           1|    714.3|
|64553969cd28069b6...|2018-04-05 16:33:16|2018-04-05 16:33:16|           1|     35.9|
|73348a24fb5172a8c...|2017-05-08 00:25:30|2017-05-08 00:25:30|           1|   127.48|
|80ca2676141288b0f...|2017-09-24 06:47:11|2017-09-24 06:47:11|           1|     99.9|
|07e37b9181238afc5...|2018-08-15 14:01:43|2018-08-15 14:01:43|           1|    229.9|
|89586072c05686f65...|2018-05-02 20:35:55|2018-05-02 20:35:55|           1|   259.99|
|1f7d089d663f7a5be...|2018-01-21 14:35:46|2018-01-21 14:35:46|           1|     29.5|
|3e1180884767238f0...|2018-05-09 13:05:32|2018-05-09 1

                                                                                

In [37]:
# spark.stop()

In [42]:
full_orders_df = full_orders_df.withColumn('is_delivered', when(col('order_status')=='delivered', lit(1)).otherwise(lit(0)))\
                               .withColumn('is_cancelled', when(col('order_status')=='canceled', lit(1)).otherwise(lit(0)))

In [43]:
full_orders_df.select('customer_id', 'is_cancelled', 'is_delivered').show(5)

+--------------------+------------+------------+
|         customer_id|is_cancelled|is_delivered|
+--------------------+------------+------------+
|88b0bac2d79ffc975...|           0|           1|
|88b0bac2d79ffc975...|           0|           1|
|88b0bac2d79ffc975...|           0|           1|
|88b0bac2d79ffc975...|           0|           1|
|88b0bac2d79ffc975...|           0|           1|
+--------------------+------------+------------+
only showing top 5 rows



In [47]:
full_orders_df = full_orders_df.withColumn('order_revenue', round(col('price')+col('freight_value'),2))

In [48]:
full_orders_df.select('price', 'order_revenue', 'freight_value').show(5)

+-----+-------------+-------------+
|price|order_revenue|freight_value|
+-----+-------------+-------------+
|28.99|        36.45|         7.46|
|28.99|        36.45|         7.46|
|28.99|        36.45|         7.46|
|28.99|        36.45|         7.46|
|28.99|        36.45|         7.46|
+-----+-------------+-------------+
only showing top 5 rows



In [49]:
customer_aov.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- total_orders: long (nullable = false)
 |-- total_amount: double (nullable = true)
 |-- AOV: double (nullable = true)



In [50]:
customer_segment = customer_aov.withColumn('segment', 
                                            when(col('AOV')>=1200,'High Valued')
                                            .when((col('AOV')<1200)&(col('AOV')>=700), 'Medium Valued')
                                            .otherwise('Low Valued')
                                          )

In [51]:
customer_segment.show(5)



+--------------------+------------+------------+------+-------------+
|         customer_id|total_orders|total_amount|   AOV|      segment|
+--------------------+------------+------------+------+-------------+
|d3e82ccec3cb5f956...|        6876|   6662844.0| 969.0|Medium Valued|
|df55c14d1476a9a34...|         743|   3565657.0|4799.0|  High Valued|
|fe5113a38e3575c04...|        2292|   3293604.0|1437.0|  High Valued|
|ec5b2ba62e5743423...|        1428|   2556120.0|1790.0|  High Valued|
|63b964e79dee32a35...|        6072|   2501664.0| 412.0|   Low Valued|
+--------------------+------------+------------+------+-------------+
only showing top 5 rows



                                                                                

In [53]:
full_orders_df = full_orders_df.join(customer_segment.select('customer_id', 'segment'), 'customer_id', 'left')

In [55]:
full_orders_df.select('customer_id', 'segment').show(5)

                                                                                

+--------------------+----------+
|         customer_id|   segment|
+--------------------+----------+
|88b0bac2d79ffc975...|Low Valued|
|88b0bac2d79ffc975...|Low Valued|
|88b0bac2d79ffc975...|Low Valued|
|88b0bac2d79ffc975...|Low Valued|
|88b0bac2d79ffc975...|Low Valued|
+--------------------+----------+
only showing top 5 rows



In [56]:
# spark.stop()