In [153]:
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql.functions import col, to_date,date_format, to_timestamp
from pyspark.sql.types import StructType, TimestampType, StringType, StructField, StringType, IntegerType, BooleanType, DateType,DoubleType

In [32]:
# Create (or reuse, if one already exists in this kernel) a SparkSession that
# runs locally under the application name "Retail Analytics".
spark = (
    SparkSession.builder
    .appName("Retail Analytics")
    .master("local")
    .getOrCreate()
)

In [33]:
# Display the session object as the cell output to confirm it was created.
spark

In [34]:
# Base directory used to build data file paths. It is the kernel's current
# working directory, so the notebook must be started from the project root
# for the relative "data/" folder to resolve.
BASE_PATH = os.getcwd()
BASE_PATH

'/home/saurabh/Desktop/SparkStudy/Spark60Days'

In [184]:
# Explicit schema for the retail CSV: 30 columns, all nullable, listed in file
# order. Numeric columns (including the ID columns) are declared as doubles;
# everything else, including Date and Time, is read as a plain string.
schema = StructType([
    StructField(column_name, column_type(), True)
    for column_name, column_type in [
        ("Transaction_ID", DoubleType),
        ("Customer_ID", DoubleType),
        ("Name", StringType),
        ("Email", StringType),
        ("Phone", StringType),
        ("Address", StringType),
        ("City", StringType),
        ("State", StringType),
        ("Zipcode", StringType),
        ("Country", StringType),
        ("Age", DoubleType),
        ("Gender", StringType),
        ("Income", StringType),
        ("Customer_Segment", StringType),
        ("Date", StringType),
        ("Year", DoubleType),
        ("Month", StringType),
        ("Time", StringType),
        ("Total_Purchases", DoubleType),
        ("Amount", DoubleType),
        ("Total_Amount", DoubleType),
        ("Product_Category", StringType),
        ("Product_Brand", StringType),
        ("Product_Type", StringType),
        ("Feedback", StringType),
        ("Shipping_Method", StringType),
        ("Payment_Method", StringType),
        ("Order_Status", StringType),
        ("Ratings", DoubleType),
        ("products", StringType),
    ]
])

In [185]:
# Load the retail CSV from <BASE_PATH>/data using the explicit schema, treating
# the first line of the file as the header row.
#
# No `dateFormat` option is set on purpose: that option only affects columns
# declared as DateType, and the schema reads Date as a plain string. (If it is
# ever needed, Spark expects Java-style datetime patterns such as "M/d/yyyy",
# not Python strftime codes like "%d/%m/%y".)
retail = (
    spark.read.format("csv")
    .option("header", "true")
    .schema(schema)
    .load(f"file://{BASE_PATH}/data/new_retail_data.csv")
)

In [186]:
# Normalise the two temporal columns, replacing them in place:
#   * Date: parsed from an "M/d/yyyy" string into a real date value.
#   * Time: parsed as "H:mm:ss" and rendered back with the same pattern, so the
#     column stays a string.
retail = (
    retail
    .withColumn("Date", to_date(col("Date"), "M/d/yyyy"))
    .withColumn(
        "Time",
        date_format(to_timestamp(col("Time"), "H:mm:ss"), "H:mm:ss"),
    )
)

In [187]:
# Print the first 10 rows as a quick sanity check of the load and the
# Date/Time conversion.
retail.show(10)

+--------------+-----------+-------------------+-------------------+------------+--------------------+----------+---------------+-------+---------+----+------+------+----------------+----------+------+---------+--------+---------------+-----------+------------+----------------+-------------+------------+---------+---------------+--------------+------------+-------+-----------------+
|Transaction_ID|Customer_ID|               Name|              Email|       Phone|             Address|      City|          State|Zipcode|  Country| Age|Gender|Income|Customer_Segment|      Date|  Year|    Month|    Time|Total_Purchases|     Amount|Total_Amount|Product_Category|Product_Brand|Product_Type| Feedback|Shipping_Method|Payment_Method|Order_Status|Ratings|         products|
+--------------+-----------+-------------------+-------------------+------------+--------------------+----------+---------------+-------+---------+----+------+------+----------------+----------+------+---------+--------+--------

In [188]:
# Print the column names and types to confirm the resulting schema
# (Date should now be reported as a date, not a string).
retail.printSchema()

root
 |-- Transaction_ID: double (nullable = true)
 |-- Customer_ID: double (nullable = true)
 |-- Name: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- Phone: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Zipcode: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Income: string (nullable = true)
 |-- Customer_Segment: string (nullable = true)
 |-- Date: date (nullable = true)
 |-- Year: double (nullable = true)
 |-- Month: string (nullable = true)
 |-- Time: string (nullable = true)
 |-- Total_Purchases: double (nullable = true)
 |-- Amount: double (nullable = true)
 |-- Total_Amount: double (nullable = true)
 |-- Product_Category: string (nullable = true)
 |-- Product_Brand: string (nullable = true)
 |-- Product_Type: string (nullable = true)
 |-- Feedback: string (nullable = true

In [191]:
# Rows for State == "Berlin", Age above 20, Month == "January", and a Date
# strictly after 2023-01-01.
#
# The cutoff is written as a full ISO date rather than the bare string "2023".
# Spark coerces a string compared against a date column into a date, and a
# year-only string becomes January 1st of that year, so the full literal makes
# the actual boundary visible instead of implicit.
# NOTE(review): if the intent was "transactions in 2023 only", filter on the
# Year column (or add an upper bound) instead — confirm.
state1 = retail.filter(
    (col("State") == "Berlin")
    & (col("Age") > 20)
    & (col("Date") > "2023-01-01")
    & (col("Month") == "January")
)

In [196]:
# Count the filtered rows per Gender value and print the result.
state1.groupBy("Gender").count().show()

[Stage 55:>                                                         (0 + 1) / 1]

+------+-----+
|Gender|count|
+------+-----+
|  NULL|    2|
|Female| 1070|
|  Male| 3238|
+------+-----+



                                                                                

### 