In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder.appName('marketing').getOrCreate()

marketingDF = spark.read.option('inferSchema','true').option('header','true')\
    .csv(r"\Users\Admin\Desktop\pyspark\data\ECommerce_consumer behaviour.csv")

marketingDF.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- order_number: integer (nullable = true)
 |-- order_dow: integer (nullable = true)
 |-- order_hour_of_day: integer (nullable = true)
 |-- days_since_prior_order: double (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- add_to_cart_order: integer (nullable = true)
 |-- reordered: integer (nullable = true)
 |-- department_id: integer (nullable = true)
 |-- department: string (nullable = true)
 |-- product_name: string (nullable = true)



In [2]:
#checking for missing values
marketingDF.select([col(c).isNull().alias(c) for c in marketingDF.columns]).show()

marketingDF.select([sum(col(c).isNull().cast('int')).alias(c) for c in marketingDF.columns]).show()

+--------+-------+------------+---------+-----------------+----------------------+----------+-----------------+---------+-------------+----------+------------+
|order_id|user_id|order_number|order_dow|order_hour_of_day|days_since_prior_order|product_id|add_to_cart_order|reordered|department_id|department|product_name|
+--------+-------+------------+---------+-----------------+----------------------+----------+-----------------+---------+-------------+----------+------------+
|   false|  false|       false|    false|            false|                  true|     false|            false|    false|        false|     false|       false|
|   false|  false|       false|    false|            false|                  true|     false|            false|    false|        false|     false|       false|
|   false|  false|       false|    false|            false|                  true|     false|            false|    false|        false|     false|       false|
|   false|  false|       false|    false

In [3]:
marketingDF.drop(marketingDF['days_since_prior_order']).show(5)

+--------+-------+------------+---------+-----------------+----------+-----------------+---------+-------------+----------+------------------+
|order_id|user_id|order_number|order_dow|order_hour_of_day|product_id|add_to_cart_order|reordered|department_id|department|      product_name|
+--------+-------+------------+---------+-----------------+----------+-----------------+---------+-------------+----------+------------------+
| 2425083|  49125|           1|        2|               18|        17|                1|        0|           13|    pantry|baking ingredients|
| 2425083|  49125|           1|        2|               18|        91|                2|        0|           16|dairy eggs|   soy lactosefree|
| 2425083|  49125|           1|        2|               18|        36|                3|        0|           16|dairy eggs|            butter|
| 2425083|  49125|           1|        2|               18|        83|                4|        0|            4|   produce|  fresh vegetables|

In [4]:
#checking for duplicates (output clearly shows presence of duplicates)
marketingDF.select(count("user_id")).show()
marketingDF.select(countDistinct("user_id")).show()
marketingDF = marketingDF.dropDuplicates()
marketingDF.select(count("user_id")).show()


+--------------+
|count(user_id)|
+--------------+
|       2019501|
+--------------+

+-----------------------+
|count(DISTINCT user_id)|
+-----------------------+
|                 105273|
+-----------------------+

+--------------+
|count(user_id)|
+--------------+
|       2019501|
+--------------+



In [5]:
#distribution of product categories (counts of product categories sold,sum of products sold)

# using this we can: 1.Get total sales of the supermarket
#                    2.Top & least selling products by revenue
#                    3.Total revenue generated by product catgery(ddepartment)
#                    4. 

marketingDF.groupBy('department_id').count().orderBy("count", ascending=True).show()
marketingDF.groupBy('department').count().orderBy("count",ascending=False).show()
marketingDF.groupBy('product_name').count().orderBy("count",ascending=True).show()



+-------------+------+
|department_id| count|
+-------------+------+
|           10|  2133|
|            2|  2240|
|           21|  4749|
|            8|  6013|
|            5|  9439|
|            6| 16738|
|           18| 25940|
|           11| 28134|
|           12| 44271|
|           14| 44605|
|           17| 46446|
|            9| 54054|
|           20| 65176|
|           15| 66053|
|            3| 72983|
|           13|116262|
|            1|139536|
|            7|168126|
|           19|180692|
|           16|336915|
+-------------+------+
only showing top 20 rows

+---------------+------+
|     department| count|
+---------------+------+
|        produce|588996|
|     dairy eggs|336915|
|         snacks|180692|
|      beverages|168126|
|         frozen|139536|
|         pantry|116262|
|         bakery| 72983|
|   canned goods| 66053|
|           deli| 65176|
|dry goods pasta| 54054|
|      household| 46446|
|      breakfast| 44605|
|   meat seafood| 44271|
|  personal care| 2813

In [None]:
#variation of product sales in a certail hours
marketingDF.groupBy('order_id').count().orderBy('count',ascending=False).show()

+--------+-----+
|order_id|count|
+--------+-----+
|  790903|  137|
| 2621625|  109|
|  416346|  100|
| 1031566|   95|
| 2409933|   93|
| 1930316|   93|
| 2926893|   92|
|   45973|   92|
|  936852|   87|
|   70299|   83|
|  293169|   77|
| 3310628|   76|
| 2869702|   76|
|  810928|   74|
| 1355077|   74|
| 1220886|   72|
| 2749239|   72|
|   28332|   71|
|  346891|   71|
| 1208941|   71|
+--------+-----+
only showing top 20 rows



In [None]:
# product sold/odered at each hour,How does product sales performance vary by hour
marketingDF.createOrReplaceTempView('sales')
sales_per_hour = spark.sql(
    "SELECT product_id,count(order_id),product_name,order_hour_of_day FROM sales GROUP BY order_hour_of_day,product_name,product_id"
    )
sales_per_hour.show()

+----------+---------------+--------------------+-----------------+
|product_id|count(order_id)|        product_name|order_hour_of_day|
+----------+---------------+--------------------+-----------------+
|        58|            216|frozen breads doughs|               13|
|       131|           1321|           dry pasta|               12|
|        45|            843|     candy chocolate|                8|
|         4|            606|       instant foods|                8|
|        49|            533|    packaged poultry|               17|
|        11|            113|    cold flu allergy|               15|
|        25|            134|                soap|               20|
|         8|            173|     bakery desserts|               10|
|        23|            235|       popcorn jerky|                7|
|       117|            892|nuts seeds dried ...|               18|
|        22|            151|           hair care|               15|
|       111|             55|plates bowls cups...

In [None]:
#average number of items in an order
avg_items_per_order = spark.sql(
    "SELECT order_id,avg(add_to_cart_order) AS items_per_order FROM sales group by order_id"
    )
avg_items_per_order.show()

+--------+---------------+
|order_id|items_per_order|
+--------+---------------+
| 3060194|           12.5|
|  596351|           18.0|
| 2229935|           11.0|
| 2527139|            6.5|
|  599057|            4.0|
|  692091|            4.0|
| 1296515|            9.0|
| 1115188|            4.5|
| 2599587|            8.5|
| 3095942|            8.0|
|  930298|           11.5|
| 2664273|            4.0|
| 2448538|            7.0|
| 3261003|            5.5|
|  683541|            4.5|
|  579502|           13.0|
| 1957852|           19.5|
| 2749827|            8.0|
| 1609475|            6.0|
| 1074329|            4.5|
+--------+---------------+
only showing top 20 rows



In [24]:
most_sales_per_dept = spark.sql(
    "SELECT SUM(add_to_cart_order),department_id FROM sales GROUP BY department_id"
    )
most_sales_per_dept.show()

+----------------------+-------------+
|sum(add_to_cart_order)|department_id|
+----------------------+-------------+
|                378104|           12|
|               1263140|            1|
|               1119987|           13|
|               2536790|           16|
|                167096|            6|
|                589153|            3|
|                568239|           20|
|                 48776|            5|
|               1663523|           19|
|                653360|           15|
|                551369|            9|
|                394174|           17|
|               4732268|            4|
|                 45866|            8|
|               1165314|            7|
|                 17995|           10|
|                 43744|           21|
|                248205|           11|
|                412021|           14|
|                 18203|            2|
+----------------------+-------------+
only showing top 20 rows



In [15]:
#customers with most orders(for marketing in form of offering vouchers)
customers_with_most_orders = spark.sql(
    "SELECT user_id,count(order_id) AS total_orders FROM sales GROUP BY user_id ORDER BY total_orders DESC"
    )
customers_with_most_orders.show()

+-------+------------+
|user_id|total_orders|
+-------+------------+
| 176478|         460|
| 129928|         405|
| 126305|         384|
| 201268|         347|
| 115495|         283|
| 100330|         271|
|  31903|         270|
|  15503|         258|
| 105213|         245|
| 203166|         240|
| 193539|         238|
|  17738|         236|
|  60694|         232|
|  97816|         227|
| 175294|         223|
|  96305|         213|
|  11375|         213|
|  33731|         212|
| 180203|         212|
| 167069|         210|
+-------+------------+
only showing top 20 rows

