In [95]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,lit,when,expr,substring,lower,sum,to_date,month,year,count
from pyspark.sql.types import IntegerType, StringType, DateType
import re 

In [2]:
cust_Schema = ('id int , age int, gender string, income float, education string, region string, loyalty_status string, purchase_frequency string, purchase_amount float , product_category string, promotion_usage int, satisfaction_score int')

In [3]:
spark = SparkSession.builder.appName('customer_puschases').master('local[2]').getOrCreate()

In [4]:
custdf = spark.read.format('csv')\
                   .option('header','true')\
                   .schema(cust_Schema)\
                   .load(r'C:\Users\kkafk\IdeaProjects\pycode\scenario based\customer_data.csv')


In [5]:
custdf.show()

+---+---+------+-------+----------+------+--------------+------------------+---------------+----------------+---------------+------------------+
| id|age|gender| income| education|region|loyalty_status|purchase_frequency|purchase_amount|product_category|promotion_usage|satisfaction_score|
+---+---+------+-------+----------+------+--------------+------------------+---------------+----------------+---------------+------------------+
|  1| 27|  Male|40682.0|  Bachelor|  East|          Gold|          frequent|        18249.0|           Books|              0|                 6|
|  2| 29|  Male|15317.0|   Masters|  West|       Regular|              rare|         4557.0|        Clothing|              1|                 6|
|  3| 37|  Male|38849.0|  Bachelor|  West|        Silver|              rare|        11822.0|        Clothing|              0|                 6|
|  4| 30|  Male|11568.0|HighSchool| South|       Regular|          frequent|         4098.0|            Food|              0|     

COL ,WHEN , LIT usecases

In [6]:
custdf.select('*').filter("education == 'NULL'").show()

+---+---+------+-------+---------+------+--------------+------------------+---------------+----------------+---------------+------------------+
| id|age|gender| income|education|region|loyalty_status|purchase_frequency|purchase_amount|product_category|promotion_usage|satisfaction_score|
+---+---+------+-------+---------+------+--------------+------------------+---------------+----------------+---------------+------------------+
| 12| 32|Female|40044.0|     NULL| North|        Silver|              rare|        13608.0|     Electronics|              0|                 5|
| 27| 34|  Male|39287.0|     NULL| North|       Regular|        occasional|        11944.0|            Food|              0|                 6|
| 47| 28|  Male| 6755.0|     NULL| South|       Regular|        occasional|         2394.0|     Electronics|              0|                 6|
| 79| 31|Female|29452.0|     NULL|  West|       Regular|              rare|         9814.0|            Home|              0|            

In [7]:
customer = custdf.distinct()
df = customer.withColumn("education", when(((col("income")>1.0) & (col("income")<100000.0)),lit("College")).otherwise(lit("Bachelors")))


In [8]:
df.filter("education == 'NULL'").show()

+---+---+------+------+---------+------+--------------+------------------+---------------+----------------+---------------+------------------+
| id|age|gender|income|education|region|loyalty_status|purchase_frequency|purchase_amount|product_category|promotion_usage|satisfaction_score|
+---+---+------+------+---------+------+--------------+------------------+---------------+----------------+---------------+------------------+
+---+---+------+------+---------+------+--------------+------------------+---------------+----------------+---------------+------------------+



Remove certain rows based on condition

In [9]:
df2 = df.filter("loyalty_status != 'Regular'").drop()

In [10]:
df2.select("*").filter("loyalty_status == 'Regular'").show()

+---+---+------+------+---------+------+--------------+------------------+---------------+----------------+---------------+------------------+
| id|age|gender|income|education|region|loyalty_status|purchase_frequency|purchase_amount|product_category|promotion_usage|satisfaction_score|
+---+---+------+------+---------+------+--------------+------------------+---------------+----------------+---------------+------------------+
+---+---+------+------+---------+------+--------------+------------------+---------------+----------------+---------------+------------------+



Substrings

In [11]:
subdf = custdf.withColumn("loyalty_subName", substring("loyalty_status",1,3))
subdf.show()

+---+---+------+-------+----------+------+--------------+------------------+---------------+----------------+---------------+------------------+---------------+
| id|age|gender| income| education|region|loyalty_status|purchase_frequency|purchase_amount|product_category|promotion_usage|satisfaction_score|loyalty_subName|
+---+---+------+-------+----------+------+--------------+------------------+---------------+----------------+---------------+------------------+---------------+
|  1| 27|  Male|40682.0|  Bachelor|  East|          Gold|          frequent|        18249.0|           Books|              0|                 6|            Gol|
|  2| 29|  Male|15317.0|   Masters|  West|       Regular|              rare|         4557.0|        Clothing|              1|                 6|            Reg|
|  3| 37|  Male|38849.0|  Bachelor|  West|        Silver|              rare|        11822.0|        Clothing|              0|                 6|            Sil|
|  4| 30|  Male|11568.0|HighSchool

You have a sales dataset with columns transaction_id, product_id, customer_id, date, and amount.

In [80]:
salesschema = "Row_ID int,OrderID string,OrderDate Date, ShipDate date,ShipMode string,CustomerID string,Customer_Name string,Segment string, Country string,City string,   State  string,PostalCode string,Region string,ProductID  string,Category  string,SubCategory  string,ProductName string,Sales float"

In [82]:
sales = spark.read.format('csv')\
                  .option("header","true")\
                  .option("dateFormat","dd/mm/yyyy")\
                  .schema(salesschema)\
                  .load(r'C:\Users\kkafk\IdeaProjects\pycode\scenario based\sales_dataset.csv')

sales.show(5)

+------+--------------+----------+----------+--------------+----------+---------------+---------+-------------+---------------+----------+----------+------+---------------+---------------+-----------+--------------------+--------+
|Row_ID|       OrderID| OrderDate|  ShipDate|      ShipMode|CustomerID|  Customer_Name|  Segment|      Country|           City|     State|PostalCode|Region|      ProductID|       Category|SubCategory|         ProductName|   Sales|
+------+--------------+----------+----------+--------------+----------+---------------+---------+-------------+---------------+----------+----------+------+---------------+---------------+-----------+--------------------+--------+
|     1|CA-2017-152156|2017-01-08|2017-01-11|  Second Class|  CG-12520|    Claire Gute| Consumer|United States|      Henderson|  Kentucky|     42420| South|FUR-BO-10001798|      Furniture|  Bookcases|Bush Somerset Col...|  261.96|
|     2|CA-2017-152156|2017-01-08|2017-01-11|  Second Class|  CG-12520|    C

In [84]:
salesdf = sales.toDF(*[column.replace(' ','') for column in sales.columns])

salesdf1 = sales.toDF(*([re.sub("[^a-zA-Z0-9]","",i) for i in sales.columns]))
salesdf1 = salesdf1.toDF(*[column.lower() for column in salesdf1.columns])


sdf = salesdf1.select("rowid","orderid","customerid","productid","orderdate","sales")
                     
sdf.show(2)

+-----+--------------+----------+---------------+----------+------+
|rowid|       orderid|customerid|      productid| orderdate| sales|
+-----+--------------+----------+---------------+----------+------+
|    1|CA-2017-152156|  CG-12520|FUR-BO-10001798|2017-01-08|261.96|
|    2|CA-2017-152156|  CG-12520|FUR-CH-10000454|2017-01-08|731.94|
+-----+--------------+----------+---------------+----------+------+
only showing top 2 rows



In [93]:
sdf.select("*").filter("productid == 'OFF-EN-10002600'").show()

+-----+--------------+----------+---------------+----------+-----+
|rowid|       orderid|customerid|      productid| orderdate|sales|
+-----+--------------+----------+---------------+----------+-----+
|  115|CA-2015-115259|  RC-19960|OFF-EN-10002600|2015-01-25| 4.72|
| 1007|CA-2016-105312|  MT-17815|OFF-EN-10002600|2016-01-06| 7.08|
| 1428|US-2016-164448|  DK-12835|OFF-EN-10002600|2016-01-31|14.75|
| 3872|CA-2016-132486|  JF-15355|OFF-EN-10002600|2016-01-23| 11.8|
| 5286|CA-2018-105991|  LH-17020|OFF-EN-10002600|2018-01-05|21.24|
| 6023|CA-2015-130869|  CB-12025|OFF-EN-10002600|2015-01-17| 7.08|
| 8296|CA-2017-166380|  EB-13750|OFF-EN-10002600|2017-01-30|26.55|
+-----+--------------+----------+---------------+----------+-----+



In [99]:
transactiondf = sdf.groupBy("productid").agg(sum("sales").alias("total_sales"))
# transactiondf.show()
transactiondf.select("*").sort("total_Sales" ,ascending = False).limit(5)

salesmonthly = sdf.withColumn("month",month("orderdate"))
salesmonthly.groupBy("month").agg(sum("sales").alias("monthlysales"))

df = sdf.groupBy("customerid").agg(count("productid").alias("products")).sort("products",ascending = False).limit(5)
df.show()

+----------+--------+
|customerid|products|
+----------+--------+
|  WB-21850|      35|
|  PP-18955|      34|
|  MA-17560|      34|
|  JL-15835|      33|
|  CK-12205|      32|
+----------+--------+

