# **Sole Haven Store**

Sole Haven, a legendary shoe store, faces a formidable challenge: how to raise its profits and reignite the spark of success in its beloved store. <br>
As the data analysts assigned to the task, we dig into the store's large collection of shoe data to chart a course towards prosperity.

In [1]:
! pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488493 sha256=044fdc611e0ed67cae8ccd916480dd535fa7023df013604f8e14f30ef295a15c
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [2]:
from pyspark.sql import SparkSession
session= SparkSession.builder.appName("examplefeature").getOrCreate()

from pyspark.sql.functions import regexp_extract,col, regexp_replace
from pyspark.sql.functions import rand

We start by exploring the data: we drop the unneeded `Brand` column, count the available rows, and summarize each column.

In [3]:
data=session.read.csv('All_Shoes.csv', header=True, inferSchema=True)
data = data.drop('Brand')
print(data.count())
data.describe().show()

44476
+-------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+--------------------+--------------------+
|summary|                Name|            Category|              Colors|               Price|               Sizes|         Count_Sizes|          Color_Name|        product_Code|            Review|              Rating|            Features|             Comfort|
+-------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+--------------------+--------------------+
|  count|               44469|               15945|                7416|                5121|                3224|                2621|                2933|                2796|              2579|                22

The summary reveals an important issue: most rows have a null price. We drop these rows to protect the integrity of the analysis and to lay the groundwork for well-informed decisions about Sole Haven's profitability.

In [4]:
data=data.na.drop(subset="Price")
print(data.count())

5121


Understanding consumer demand is essential. The dataset has no sales figures, so we simulate a `Units_Sold` column with random integers from 0 to 99. This stand-in lets us prototype the analysis of product popularity and customer behavior, so that the store's tactics and product offers can later be matched to the changing needs of its customer base.

In [5]:
data = data.withColumn("Units_Sold", (rand() * 100).cast("int"))

In [6]:
data.show(5)

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------+------+------+--------------------+-------+----------+
|                Name|            Category|              Colors|               Price|               Sizes|         Count_Sizes|          Color_Name|product_Code|Review|Rating|            Features|Comfort|Units_Sold|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------+------+------+--------------------+-------+----------+
|     Air Force 1 '07|                 Men|                 2.0|            7 495.00|['7', '7.5', '8',...|                13.0|         White/White|  CW2288-111|1311.0|  NULL|LEGENDARY STYLE R...|   NULL|        35|
|    Debuting in 1982| the AF-1 was the...| revolutionising ...| the Air Force 1 ...|The stitched over...|                NULL|         

In [7]:
data = data.withColumn("Price", regexp_replace(data["price"], " ", ""))
data.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------+------+--------------------+-------+----------+
|                Name|            Category|              Colors|               Price|               Sizes|         Count_Sizes|          Color_Name|        product_Code|Review|Rating|            Features|Comfort|Units_Sold|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------+------+--------------------+-------+----------+
|     Air Force 1 '07|                 Men|                 2.0|             7495.00|['7', '7.5', '8',...|                13.0|         White/White|          CW2288-111|1311.0|  NULL|LEGENDARY STYLE R...|   NULL|        35|
|    Debuting in 1982| the AF-1 was the...| revolutionising ...|theAirForce1stays...|The stitched over..
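The listing above also shows rows where description text has spilled into the `Price` column, so stripping spaces alone does not guarantee a numeric value. As a minimal plain-Python sketch of the same cleaning step (the helper name `parse_price` is hypothetical and not part of the notebook), the following converts strings such as `7 495.00` to floats and returns `None` for anything that is not a plain decimal number:

```python
import re
from typing import Optional


def parse_price(raw: Optional[str]) -> Optional[float]:
    """Remove spaces used as thousands separators and convert to float.

    Returns None when the cleaned text is not a plain decimal number.
    parse_price is an illustrative helper, not part of the notebook.
    """
    if raw is None:
        return None
    cleaned = raw.replace(" ", "")
    if re.fullmatch(r"\d+(\.\d+)?", cleaned):
        return float(cleaned)
    return None


# Two price strings as they appear in the raw data, one spilled text fragment, one null
samples = ["7 495.00", "12 795.00", "the Air Force 1 stays", None]
parsed = [parse_price(s) for s in samples]
print(parsed)
```

Rows that come back as `None` here correspond to rows that would need to be filtered out or repaired before any numeric work on the price.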

Multiplying price by units sold gives a new `Revenue_Generated` column. It estimates the revenue of each product, which lets us prioritize high-performing items and shape profitability strategies for Sole Haven.

In [8]:
data = data.withColumn("Revenue_Generated", col("Price") * col("Units_Sold"))
data.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------+------+--------------------+-------+----------+-----------------+
|                Name|            Category|              Colors|               Price|               Sizes|         Count_Sizes|          Color_Name|        product_Code|Review|Rating|            Features|Comfort|Units_Sold|Revenue_Generated|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------+------+--------------------+-------+----------+-----------------+
|     Air Force 1 '07|                 Men|                 2.0|             7495.00|['7', '7.5', '8',...|                13.0|         White/White|          CW2288-111|1311.0|  NULL|LEGENDARY STYLE R...|   NULL|        35|         262325.0|
|    Debuting in 1982| the AF-1 
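The first row of the output can be checked by hand: a price of 7495.00 and 35 units sold give a revenue of 262325.0. A minimal plain-Python sketch of the same calculation follows; the function name `revenue` is hypothetical, and returning `None` for a missing price or unit count is an assumption made for this sketch.

```python
from typing import Optional


def revenue(price: Optional[float], units_sold: Optional[int]) -> Optional[float]:
    """Revenue for one product row; None when either input is missing."""
    if price is None or units_sold is None:
        return None
    return price * units_sold


# Values taken from the first row of the output above
first_row = revenue(7495.00, 35)
print(first_row)

# A row without a usable price has no meaningful revenue
print(revenue(None, 12))
```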

We also add a dummy `Discount` column of random integers from 0 to 9, so that different pricing scenarios can be simulated and their potential to draw customers, boost sales, and increase revenue can be explored.

In [9]:
data = data.withColumn("Discount", (rand() * 10).cast("int"))
data.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------+------+--------------------+-------+----------+-----------------+--------+
|                Name|            Category|              Colors|               Price|               Sizes|         Count_Sizes|          Color_Name|        product_Code|Review|Rating|            Features|Comfort|Units_Sold|Revenue_Generated|Discount|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------+------+--------------------+-------+----------+-----------------+--------+
|     Air Force 1 '07|                 Men|                 2.0|             7495.00|['7', '7.5', '8',...|                13.0|         White/White|          CW2288-111|1311.0|  NULL|LEGENDARY STYLE R...|   NULL|        35|         262325.0|      

We narrow the study to the top 300 entries by revenue generated, which lets us concentrate on the most important products and trends when designing targeted tactics. Focusing on this subset uses resources efficiently and aims the analysis at the items most likely to drive revenue growth and strengthen Sole Haven's standing in the cutthroat shoe industry.

In [10]:
data = data.orderBy(col("Revenue_Generated").desc())

In [11]:
All_Shoes_new = data.limit(300)

In [12]:
All_Shoes_new.show()

+--------------------+--------+------+--------+--------------------+-----------+--------------------+------------+------+------+--------------------+-------+----------+-----------------+--------+
|                Name|Category|Colors|   Price|               Sizes|Count_Sizes|          Color_Name|product_Code|Review|Rating|            Features|Comfort|Units_Sold|Revenue_Generated|Discount|
+--------------------+--------+------+--------+--------------------+-----------+--------------------+------------+------+------+--------------------+-------+----------+-----------------+--------+
|Phantom Luna Elit...|  Unisex|   1.0|26795.00|['4.5', '5', '5.5...|       12.0|Fuchsia Dream/Bar...|  FQ8033-500|   0.0|   0.0|                    |   NULL|        88|        2357960.0|       0|
| Phantom GX Elite SE|  Unisex|   1.0|23795.00|               ['7']|        1.0|Fuchsia Dream/Bar...|  FD0565-500|   0.0|   0.0|                    |   NULL|        94|        2236730.0|       1|
|          Alphafly 
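Sorting by revenue in descending order and then taking a fixed number of rows is a top-N selection. The idea can be sketched in plain Python with `heapq.nlargest`; the helper name `top_n_by_revenue` is hypothetical, and the four (name, revenue) pairs are a small sample copied from outputs in this notebook, not the full dataset.

```python
import heapq
from typing import List, Tuple


def top_n_by_revenue(rows: List[Tuple[str, float]], n: int) -> List[Tuple[str, float]]:
    """Return the n rows with the highest revenue, largest first."""
    return heapq.nlargest(n, rows, key=lambda row: row[1])


# (Name, Revenue_Generated) pairs sampled from the displayed outputs
rows = [
    ("Air Force 1 '07", 262325.0),
    ("Phantom GX Elite SE", 2236730.0),
    ("Vaporfly 3", 1862550.0),
    ("Alphafly 2", 2144043.0),
]

top_two = top_n_by_revenue(rows, 2)
for name, rev in top_two:
    print(name, rev)
```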

# **Business Objective 1:** What insights can be leveraged to optimize marketing strategies and drive sales revenue across different target segments?

We compute aggregate statistics on shoe sales and revenue across the categories (Men, Women, Unisex). Grouping shoes by category and summing units sold and revenue generated shows which categories perform best. This helps us understand the relative performance of the categories, so that the store can allocate resources, adjust marketing strategies, and prioritize product offerings to optimize profitability and meet customer demand.

In [13]:
category_sales = All_Shoes_new.groupBy('Category') \
    .agg({'Units_Sold': 'sum', 'Revenue_Generated': 'sum'}) \
    .withColumnRenamed('sum(Units_Sold)', 'Total_Units_Sold') \
    .withColumnRenamed('sum(Revenue_Generated)', 'Total_Revenue_Generated')
category_sales.show()

+--------+-----------------------+----------------+
|Category|Total_Revenue_Generated|Total_Units_Sold|
+--------+-----------------------+----------------+
|  Unisex|            6.4249602E7|            3688|
|   Women|            1.2076013E8|            7932|
|     Men|           1.99562794E8|           12908|
+--------+-----------------------+----------------+



**Analysis:**
The analysis of shoe sales by category reveals that men's shoes generate the highest total revenue, indicating strong demand in this segment. Women's shoes also contribute significantly to revenue, although units sold are comparatively lower. Unisex shoes, while accounting for fewer units sold, still make a notable contribution to total revenue. This suggests potential areas for targeted marketing and product development strategies to further capitalize on the strong performance of men's shoes and explore growth opportunities in the women's and unisex segments.
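To put the category totals on a comparable footing, one can derive the revenue share and the average revenue per unit for each category. The sketch below is plain Python and uses only the three rows of the table above; the dictionary layout and variable names are illustrative assumptions.

```python
# Totals copied from the category_sales output above
category_totals = {
    "Unisex": {"revenue": 6.4249602e7, "units": 3688},
    "Women": {"revenue": 1.2076013e8, "units": 7932},
    "Men": {"revenue": 1.99562794e8, "units": 12908},
}

total_revenue = sum(t["revenue"] for t in category_totals.values())

# Share of overall revenue contributed by each category
revenue_share = {
    name: t["revenue"] / total_revenue for name, t in category_totals.items()
}

# Average revenue earned per unit sold in each category
revenue_per_unit = {
    name: t["revenue"] / t["units"] for name, t in category_totals.items()
}

for name in category_totals:
    print(f"{name}: share={revenue_share[name]:.1%}, per unit={revenue_per_unit[name]:.2f}")
```

On these figures, men's shoes account for a little over half of the revenue, while unisex shoes earn the most per unit sold, which is consistent with the observation that the unisex segment contributes notably despite its lower volume.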

The code below lists the best-selling products by total units sold across the categories. This highlights the most popular items among customers, which supports inventory management, marketing strategies, and product development.

In [16]:
best_selling_products = All_Shoes_new.groupBy('Name', 'Category') \
    .agg({'Units_Sold': 'sum', 'Revenue_Generated': 'sum'}) \
    .withColumnRenamed('sum(Units_Sold)', 'Total_Units_Sold') \
    .withColumnRenamed('sum(Revenue_Generated)', 'Total_Revenue_Generated') \
    .orderBy(col('Total_Units_Sold').desc())


In [17]:
best_selling_products.show()

+--------------------+--------+-----------------------+----------------+
|                Name|Category|Total_Revenue_Generated|Total_Units_Sold|
+--------------------+--------+-----------------------+----------------+
|        Invincible 3|   Women|            1.0669145E7|             643|
|          Vaporfly 3|     Men|            1.3059906E7|             642|
|Air VaporMax 2023...|     Men|            1.1320935E7|             593|
|        InfinityRN 4|   Women|              8832055.0|             589|
|          Air Max 97|     Men|              9499882.0|             574|
|        Invincible 3|     Men|              7446591.0|             445|
|Pegasus Trail 4 G...|     Men|              6373330.0|             438|
|        InfinityRN 4|     Men|              6237920.0|             416|
|          Alphafly 2|   Women|              8947545.0|             407|
|          Vaporfly 3|   Women|              8093561.0|             395|
|Air Jordan 1 Elev...|   Women|              438448

From the output, we observe that the top-selling products vary across categories. In the rows shown, "Invincible 3" and "InfinityRN 4" lead the women's category, while "Vaporfly 3" and an Air VaporMax 2023 model lead the men's category in total units sold. Because `Units_Sold` is randomly generated, these rankings change from run to run. The presence of "Air Jordan XXXVII" among the best-selling products in the unisex category suggests its universal appeal.

This analysis guides retailers in understanding consumer preferences, optimizing product assortment, and tailoring marketing campaigns to maximize sales and revenue across different target segments.

As we dive deeper into Sole Haven's shoe data, we run into missing values in key columns such as colors, units sold, count of sizes, price, and rating. These gaps could distort the analysis and lead to inaccurate insights, so we add a preprocessing step to handle them.

First, we select the relevant columns: colors, price, rating, units sold, and count of sizes. We then cast each column to its appropriate data type (integer or float) so that computations behave as expected.

Next, we deal with the missing values. For the integer columns (colors, units sold, and count of sizes), we replace missing entries with random integers drawn from the observed range of each column. This keeps the imputed values within plausible bounds, although it does not reproduce the column's actual distribution.

With this preprocessing in place, the later analyses run on complete data, which supports more meaningful insights and better-informed decisions.
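The imputation idea can be illustrated without Spark. The sketch below is plain Python; the helper name `impute_random_in_range` and the sample values are hypothetical. It fills each missing entry with its own random integer between the observed minimum and maximum, and leaves observed values untouched.

```python
import random
from typing import List, Optional


def impute_random_in_range(values: List[Optional[int]], rng: random.Random) -> List[Optional[int]]:
    """Replace None entries with random integers within the observed range.

    If no value is observed at all, the column is returned unchanged.
    """
    observed = [v for v in values if v is not None]
    if not observed:
        return list(values)
    low, high = min(observed), max(observed)
    # A separate draw per missing entry, so imputed rows can differ from each other
    return [v if v is not None else rng.randint(low, high) for v in values]


rng = random.Random(42)  # fixed seed so the sketch is reproducible
count_sizes = [12, 1, None, 16, None, 9]  # illustrative values
filled = impute_random_in_range(count_sizes, rng)
print(filled)
```

Drawing a separate value for every missing entry matters: a single draw reused for the whole column would give all imputed rows the same number.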

In [18]:
from pyspark.sql.functions import col, when, avg
from pyspark.sql.types import FloatType, IntegerType
import random

# Select relevant columns
selected_columns = ["Colors", "Price", "Rating", "Units_sold", "Count_Sizes"]

# Convert columns to their respective types
for col_name in selected_columns:
    if col_name == "Colors" or col_name == "Units_sold" or col_name == "Count_Sizes":
        All_Shoes_new = All_Shoes_new.withColumn(col_name, All_Shoes_new[col_name].cast(IntegerType()))
    elif col_name == "Price" or col_name == "Rating":
        All_Shoes_new = All_Shoes_new.withColumn(col_name, All_Shoes_new[col_name].cast(FloatType()))

# Impute missing values with random integers within the range of each column
for col_name in selected_columns:
    if col_name in ["Colors", "Units_sold", "Count_Sizes"]:
        # Find the range of the column
        column_min = All_Shoes_new.agg({col_name: "min"}).collect()[0][0]
        column_max = All_Shoes_new.agg({col_name: "max"}).collect()[0][0]

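        # Impute missing values with a random integer per row within the range.
        # rand() (imported in an earlier cell) is evaluated for each row, whereas
        # random.randint would run once and give every missing row the same value.
        random_int = (rand() * (column_max - column_min + 1)).cast("int") + column_min
        All_Shoes_new = All_Shoes_new.withColumn(col_name, when(col(col_name).isNull(), random_int).otherwise(col(col_name)))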

# Show the updated DataFrame
All_Shoes_new.show(5)


+--------------------+--------+------+-------+--------------------+-----------+--------------------+------------+------+------+--------------------+-------+----------+-----------------+--------+
|                Name|Category|Colors|  Price|               Sizes|Count_Sizes|          Color_Name|product_Code|Review|Rating|            Features|Comfort|Units_sold|Revenue_Generated|Discount|
+--------------------+--------+------+-------+--------------------+-----------+--------------------+------------+------+------+--------------------+-------+----------+-----------------+--------+
|Phantom Luna Elit...|  Unisex|     1|26795.0|['4.5', '5', '5.5...|         12|Fuchsia Dream/Bar...|  FQ8033-500|   0.0|   0.0|                    |   NULL|        88|        2357960.0|       0|
| Phantom GX Elite SE|  Unisex|     1|23795.0|               ['7']|          1|Fuchsia Dream/Bar...|  FD0565-500|   0.0|   0.0|                    |   NULL|        94|        2236730.0|       1|
|          Alphafly 2|   

In [19]:
from pyspark.ml.feature import VectorAssembler, OneHotEncoder, StringIndexer
from pyspark.ml.feature import StandardScaler
from pyspark.ml.clustering import KMeans
from pyspark.ml import Pipeline

# **Business Objective 2:** How can we enhance customer segmentation, optimize marketing strategies, and improve inventory management in the shoe store, ultimately driving competitive advantage and customer loyalty?

K-means clustering groups the shoes into segments with similar attributes, which gives insight into product segmentation and, indirectly, customer preferences. By identifying distinct clusters in the shoe data, we can optimize product placement, personalize customer experiences, and adapt quickly to evolving market trends, which strengthens the store's competitive edge and customer loyalty.

In [20]:
data = All_Shoes_new.select("Name", "Category", "Colors", "Price", "Color_Name", "Rating", "Features", "Comfort", "Units_Sold", "Revenue_Generated", "Discount", "Count_Sizes")
print(data.count())

# Use col_name as the loop variable so the imported col() function is not shadowed
for col_name in ["Colors", "Price", "Rating", "Units_sold", "Count_Sizes"]:
    data = data.withColumn(col_name, data[col_name].cast("float"))

category_indexer = StringIndexer(inputCol="Category", outputCol="CategoryIndex")
encoder = OneHotEncoder(inputCols=["CategoryIndex"], outputCols=["New_Category"])
pipeline = Pipeline(stages=[category_indexer, encoder])
pipeline_model = pipeline.fit(data)
data = pipeline_model.transform(data)

# Assemble features into a single column
feature_columns = ["Colors", "Price", "Rating", "New_Category", "Units_sold", "Count_Sizes"]


assembler = VectorAssembler(inputCols=feature_columns, outputCol="assembled_features", handleInvalid="skip")
data = assembler.transform(data)

# Scale features
scaler = StandardScaler(inputCol="assembled_features", outputCol="scaled_features", withStd=True, withMean=False)
scaler_model = scaler.fit(data)
data = scaler_model.transform(data)

# Perform k-means clustering
kmeans = KMeans(featuresCol="scaled_features").setK(3)
results = kmeans.fit(data).transform(data)

# Show the results
results.show(10)


300
+--------------------+--------+------+-------+--------------------+------+--------------------+-------+----------+-----------------+--------+-----------+-------------+-------------+--------------------+--------------------+----------+
|                Name|Category|Colors|  Price|          Color_Name|Rating|            Features|Comfort|Units_sold|Revenue_Generated|Discount|Count_Sizes|CategoryIndex| New_Category|  assembled_features|     scaled_features|prediction|
+--------------------+--------+------+-------+--------------------+------+--------------------+-------+----------+-----------------+--------+-----------+-------------+-------------+--------------------+--------------------+----------+
|Phantom Luna Elit...|  Unisex|   1.0|26795.0|Fuchsia Dream/Bar...|   0.0|                    |   NULL|      88.0|        2357960.0|       0|       12.0|          2.0|    (2,[],[])|[1.0,26795.0,0.0,...|[0.36895869487533...|         1|
| Phantom GX Elite SE|  Unisex|   1.0|23795.0|Fuchsia Dr
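The scaler in the cell above is configured with `withStd=True` and `withMean=False`, so each feature is divided by its standard deviation without being centered. A minimal plain-Python sketch of that transformation follows, assuming the corrected sample standard deviation is used (as Spark's documentation describes). The helper name `scale_by_std` is hypothetical, and the five prices are copied from rows displayed in this notebook.

```python
import statistics
from typing import List


def scale_by_std(column: List[float]) -> List[float]:
    """Divide every value by the column's sample standard deviation.

    No mean is subtracted. A constant column is returned as zeros in this sketch.
    """
    std = statistics.stdev(column)
    if std == 0:
        return [0.0 for _ in column]
    return [value / std for value in column]


prices = [26795.0, 23795.0, 21995.0, 19295.0, 18395.0]
scaled_prices = scale_by_std(prices)
print(scaled_prices)

# After scaling, the column has unit standard deviation but keeps a non-zero mean
print(statistics.stdev(scaled_prices))
```

Scaling matters for k-means because distances would otherwise be dominated by the feature with the largest numeric range, which here is the price.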

In [21]:
cluster1=results.filter(results['prediction']==0)
cluster1.select(["Name", "Colors", "Price", "Revenue_Generated", "New_Category", "Units_sold", "Count_Sizes"]).show()
print(cluster1.count())
print(cluster1.agg(avg('Price')).collect()[0][0])
print(int(cluster1.agg(avg('Count_Sizes')).collect()[0][0]))

+--------------------+------+-------+-----------------+-------------+----------+-----------+
|                Name|Colors|  Price|Revenue_Generated| New_Category|Units_sold|Count_Sizes|
+--------------------+------+-------+-----------------+-------------+----------+-----------+
|Mercurial Vapor 1...|   4.0|21995.0|        2133515.0|    (2,[],[])|      97.0|       16.0|
|Mercurial Vapor 1...|   4.0|21995.0|        2023540.0|    (2,[],[])|      92.0|        2.0|
|Air VaporMax 2023...|   9.0|19295.0|        1871615.0|(2,[0],[1.0])|      97.0|       13.0|
|          Vaporfly 3|   6.0|19657.0|        1769130.0|(2,[0],[1.0])|      90.0|       16.0|
| Air Jordan 6 'Aqua'|   1.0|18395.0|        1765920.0|(2,[0],[1.0])|      96.0|        9.0|
|          Vaporfly 3|   4.0|20695.0|        1717685.0|(2,[0],[1.0])|      83.0|       13.0|
|Mercurial Vapor 1...|   4.0|21995.0|        1693615.0|    (2,[],[])|      77.0|        2.0|
|        Invincible 3|   7.0|16995.0|        1665510.0|(2,[0],[1.0])| 
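The `New_Category` column is printed in Spark's sparse-vector notation `(size,[indices],[values])`. With three categories and the encoder's default of dropping the last one, the vector has two slots, and `(2,[],[])` is the all-zero vector that stands for category index 2 (shown for Unisex in the earlier output). A plain-Python sketch of this encoding follows; the helper name `one_hot_drop_last` is hypothetical.

```python
from typing import List


def one_hot_drop_last(index: int, num_categories: int) -> List[float]:
    """One-hot encode a category index, omitting the slot for the last category.

    The last category is represented by a vector of zeros.
    """
    vector = [0.0] * (num_categories - 1)
    if index < num_categories - 1:
        vector[index] = 1.0
    return vector


for category_index in range(3):
    print(category_index, one_hot_drop_last(category_index, 3))
```

Read this way, `(2,[0],[1.0])` corresponds to `[1.0, 0.0]` and `(2,[1],[1.0])` to `[0.0, 1.0]`.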

In [22]:
cluster2=results.filter(results['prediction']==1)
cluster2.select(["Name", "Colors", "Price", "Revenue_Generated", "New_Category","Units_sold", "Count_Sizes"]).show()
print(cluster2.count())
print(cluster2.agg(avg('Price')).collect()[0][0])
print(int(cluster2.agg(avg('Count_Sizes')).collect()[0][0]))

+--------------------+------+-------+-----------------+-------------+----------+-----------+
|                Name|Colors|  Price|Revenue_Generated| New_Category|Units_sold|Count_Sizes|
+--------------------+------+-------+-----------------+-------------+----------+-----------+
|Phantom Luna Elit...|   1.0|26795.0|        2357960.0|    (2,[],[])|      88.0|       12.0|
| Phantom GX Elite SE|   1.0|23795.0|        2236730.0|    (2,[],[])|      94.0|        1.0|
|    Phantom GX Elite|   2.0|21995.0|        1737605.0|    (2,[],[])|      79.0|        5.0|
|LeBron XX Premium EP|   1.0|19295.0|        1736550.0|    (2,[],[])|      90.0|        5.0|
|Air Jordan XXXVII...|   3.0|18395.0|        1729130.0|    (2,[],[])|      94.0|        9.0|
|    Phantom GX Elite|   2.0|21995.0|        1605635.0|    (2,[],[])|      73.0|       12.0|
|Superfly 9 Elite ...|   1.0|25095.0|        1555890.0|    (2,[],[])|      62.0|       11.0|
|    G.T. Hustle 2 EP|   1.0|15995.0|        1551515.0|(2,[0],[1.0])| 

In [23]:
cluster3=results.filter(results['prediction']==2)
cluster3.select(["Name", "Colors", "Price", "Revenue_Generated", "New_Category","Units_sold", "Count_Sizes"]).show()
print(cluster3.count())
print(cluster3.agg(avg('Price')).collect()[0][0])
print(int(cluster3.agg(avg('Count_Sizes')).collect()[0][0]))

+--------------------+------+-------+-----------------+-------------+----------+-----------+
|                Name|Colors|  Price|Revenue_Generated| New_Category|Units_sold|Count_Sizes|
+--------------------+------+-------+-----------------+-------------+----------+-----------+
|          Alphafly 2|   4.0|21657.0|        2144043.0|(2,[1],[1.0])|      99.0|       16.0|
|          Alphafly 2|   4.0|21657.0|        2144043.0|(2,[1],[1.0])|      99.0|       16.0|
|          Alphafly 2|   4.0|21657.0|        1992444.0|(2,[1],[1.0])|      92.0|       16.0|
|          Vaporfly 3|   5.0|20695.0|        1862550.0|(2,[1],[1.0])|      90.0|        1.0|
|          Vaporfly 3|   5.0|20695.0|        1841855.0|(2,[1],[1.0])|      89.0|       10.0|
|Air Jordan 1 Elev...|   1.0|18395.0|        1784315.0|(2,[1],[1.0])|      97.0|        6.0|
|   Air Max 97 Futura|   2.0|17495.0|        1679520.0|(2,[1],[1.0])|      96.0|        3.0|
|        Invincible 3|  10.0|16995.0|        1631520.0|(2,[1],[1.0])| 

**In the provided clusters:**

**Cluster 1 (prediction 0):** Shows higher-priced shoes with moderate to high units sold, indicating demand for premium products.

**Cluster 2 (prediction 1):** Exhibits a mix of mid to high-priced shoes with varied colors and sizes, suggesting diversity in consumer preferences.

**Cluster 3 (prediction 2):** Displays higher-priced shoes with comparatively lower units sold, indicating niche or specialized products.

Analyzing these clusters can help in tailoring marketing strategies, optimizing inventory, and identifying trends. For instance, Cluster 1 may be targeted towards high-end consumers, while Cluster 2 might represent products that appeal to a broader audience.
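Cluster descriptions like these are easier to check against per-cluster averages. The sketch below shows the idea in plain Python. The helper name `summarize_clusters` is hypothetical, and the six rows are only the first few entries copied from the three cluster listings above, so the numbers illustrate the method rather than describe the full clusters.

```python
from collections import defaultdict
from statistics import mean
from typing import Dict, List, Tuple

# (prediction, Price, Units_sold) for two displayed rows per cluster
rows: List[Tuple[int, float, float]] = [
    (0, 21995.0, 97.0),
    (0, 19295.0, 97.0),
    (1, 26795.0, 88.0),
    (1, 23795.0, 94.0),
    (2, 21657.0, 99.0),
    (2, 20695.0, 90.0),
]


def summarize_clusters(rows: List[Tuple[int, float, float]]) -> Dict[int, Dict[str, float]]:
    """Group rows by cluster label and average price and units sold."""
    grouped = defaultdict(list)
    for prediction, price, units in rows:
        grouped[prediction].append((price, units))
    return {
        prediction: {
            "n": len(items),
            "avg_price": mean(price for price, _ in items),
            "avg_units": mean(units for _, units in items),
        }
        for prediction, items in sorted(grouped.items())
    }


summary = summarize_clusters(rows)
for prediction, stats in summary.items():
    print(prediction, stats)
```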

In [24]:
import pandas as pd

# Convert PySpark DataFrame to Pandas DataFrame
pandas_df = All_Shoes_new.toPandas()

# Export Pandas DataFrame to a CSV file
pandas_df.to_csv('All_Shoes_new.csv', index=False)

In [25]:
import os

# Read the exported data once to infer the schema
shoesdata = session.read.csv('All_Shoes_new.csv', header=True, inferSchema=True)

# Make sure the streaming folder exists, then read it as a continuous stream with that schema
os.makedirs('shoes_stream', exist_ok=True)
shoesdata_stream = session.readStream.schema(shoesdata.schema).csv('shoes_stream/')

# Write the stream to an in-memory table. The filter drops header rows, and
# append mode adds each new batch to the existing table.
shoesquery = shoesdata_stream.filter("Name != 'Name'").writeStream.queryName("shoestable").format("memory").outputMode("append").start()

In [26]:
# Copy the All_Shoes_new.csv file into the streaming folder; shutil copies files from one place to another
import shutil
src=r"All_Shoes_new.csv"
dest = r"shoes_stream"
shutil.copy(src,dest)


'shoes_stream/All_Shoes_new.csv'

In [27]:
session.sql("select * from shoestable limit 5").show()

+--------------------+--------+------+-------+--------------------+-----------+--------------------+------------+------+------+--------------------+-------+----------+-----------------+--------+
|                Name|Category|Colors|  Price|               Sizes|Count_Sizes|          Color_Name|product_Code|Review|Rating|            Features|Comfort|Units_sold|Revenue_Generated|Discount|
+--------------------+--------+------+-------+--------------------+-----------+--------------------+------------+------+------+--------------------+-------+----------+-----------------+--------+
|Phantom Luna Elit...|  Unisex|     1|26795.0|['4.5', '5', '5.5...|         12|Fuchsia Dream/Bar...|  FQ8033-500|   0.0|   0.0|                    |   NULL|        88|        2357960.0|       0|
| Phantom GX Elite SE|  Unisex|     1|23795.0|               ['7']|          1|Fuchsia Dream/Bar...|  FD0565-500|   0.0|   0.0|                    |   NULL|        94|        2236730.0|       1|
|          Alphafly 2|   

# **Business Objective 3:** How does implementing streaming analytics in our shoe store affect profitability and fame? Can real-time insights optimize inventory, pricing, and marketing to drive profitability and enhance customer satisfaction, thereby boosting the store's reputation?

Streaming analytics can affect both profitability and fame. Real-time insights let the store tune its inventory so that popular products stay available while excess stock is kept down, which reduces inventory costs and protects sales opportunities. Dynamic pricing based on real-time data can help capture more value from each sale. Real-time data can also drive tailored marketing campaigns, personalized recommendations, and timely promotions, which raise customer satisfaction and loyalty and, in turn, the store's reputation. In short, streaming supports profitability through efficient operations and builds fame through better customer experiences.

In [28]:
import time

# Each pass prints the current totals, then copies the same file into the
# streaming folder under a new name so the stream ingests one more batch
for i in range(10):
    session.sql("""
        SELECT Category, sum(Units_Sold) AS Total_Units_Sold,
               sum(Revenue_Generated) AS Total_Revenue_Generated,
               sum(Discount) AS Total_Discount_Given
        FROM shoestable GROUP BY Category
    """).show()
    newfile = "shoes_stream/Shoes" + str(i) + ".csv"
    shutil.copy(src, newfile)
    time.sleep(5)

+--------+----------------+-----------------------+--------------------+
|Category|Total_Units_Sold|Total_Revenue_Generated|Total_Discount_Given|
+--------+----------------+-----------------------+--------------------+
|     Men|           12908|           1.99562794E8|                 755|
|   Women|            7932|            1.2076013E8|                 452|
|  Unisex|            3688|            6.4249602E7|                 196|
+--------+----------------+-----------------------+--------------------+

+--------+----------------+-----------------------+--------------------+
|Category|Total_Units_Sold|Total_Revenue_Generated|Total_Discount_Given|
+--------+----------------+-----------------------+--------------------+
|     Men|           25816|           3.99125588E8|                1510|
|   Women|           15864|            2.4152026E8|                 904|
|  Unisex|            7376|           1.28499204E8|                 392|
+--------+----------------+-----------------------
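The totals in the second table are exactly twice those in the first because the loop feeds the same file into the stream again, and append mode adds every new batch to the table. The plain-Python sketch below mimics that accumulation with the unit totals from the first table; the function name `ingest` and the use of a `Counter` are illustrative assumptions, not how Spark implements streaming.

```python
from collections import Counter
from typing import Dict, List

# Units sold per category in one copy of the file (from the first table above)
batch_totals = {"Men": 12908, "Women": 7932, "Unisex": 3688}


def ingest(running: Counter, batch: Dict[str, int]) -> Counter:
    """Append-mode behaviour in miniature: add the new batch to the running totals."""
    running.update(batch)
    return running


running_totals: Counter = Counter()
snapshots: List[Dict[str, int]] = []
for _ in range(3):
    ingest(running_totals, batch_totals)
    snapshots.append(dict(running_totals))

for batch_number, snapshot in enumerate(snapshots, start=1):
    print(batch_number, snapshot)
```

The growth in these tables therefore reflects repeated ingestion of the same 300 rows rather than new sales. With genuinely new files arriving in the folder, the same query would show live totals.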

# **Business Objective 4:** How are different shoe models related to each other based on factors like color availability and category similarity, and how can these relationships inform personalized product recommendations and marketing strategies?


By analyzing the graph of shoe relationships, the store can identify similar products based on attributes like colors available, category, and other features. This allows the store to offer personalized recommendations to customers, increasing the likelihood of sales and enhancing customer satisfaction.

Understanding the relationships between products can also inform the development of engaging customer experiences, such as curated collections, thematic product displays, and interactive online features. By creating compelling narratives around product relationships, the store can captivate customers and foster brand loyalty.

In [1]:
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://bitbucket.org/habedi/datasets/raw/b6769c4664e7ff68b001e2f43bc517888cbe3642/spark/spark-3.0.2-bin-hadoop2.7.tgz
!tar xf spark-3.0.2-bin-hadoop2.7.tgz
!rm -rf spark-3.0.2-bin-hadoop2.7.tgz*
!pip -q install findspark pyspark graphframes

In [2]:
import os
os.environ["PYSPARK_DRIVER_PYTHON"] = "jupyter"
os.environ["PYSPARK_DRIVER_PYTHON_OPTS"] = "notebook"
os.environ["PYSPARK_SUBMIT_ARGS"] = "--packages graphframes:graphframes:0.8.1-spark3.0-s_2.12 pyspark-shell"

In [3]:
from graphframes import *
from pyspark import *
from pyspark.sql import *
spark = SparkSession.builder.appName('function').getOrCreate()

In [4]:
vertices = spark.read.option('header', 'true').csv('Shoes_Nodes.csv')
edges = spark.read.option('header', 'true').csv('Shoes_Edges.csv')

In [5]:
vertices.show()
edges.show()

+--------------------+--------------------+--------------------+--------------------+------------+--------------------+--------------------+--------------------+------------+------+------+--------+-------+
|                  id|                Name|            Category|              Colors|       Price|               Sizes|         Count_Sizes|          Color_Name|product_Code|Review|Rating|Features|Comfort|
+--------------------+--------------------+--------------------+--------------------+------------+--------------------+--------------------+--------------------+------------+------+------+--------+-------+
|                   1|           Air Max 1|                 Men|                   5|   12 795.00|['6', '6.5', '7',...|                  17|White/Photon Dust...|  FD9082-103|    88|   4.8|        |   NULL|
|Meet the leader o...| the Air Max 1 bl...| wavy mudguard an...| this classic ico...|        NULL|                NULL|                NULL|                NULL|        NULL|  

In [6]:
mygraph = GraphFrame(vertices, edges)



In [7]:
mygraph.degrees.show(4)



+---+------+
| id|degree|
+---+------+
| 51|     2|
|307|     2|
|205|     2|
| 54|     4|
+---+------+
only showing top 4 rows



In [8]:
result = mygraph.filterVertices("Name=='Air Jordan 1 Mid' and Category=='Unisex'")
result.vertices.show()

+---+----------------+--------+------+---------+--------------------+-----------+-----------------+------------+------+------+--------------------+-------+
| id|            Name|Category|Colors|    Price|               Sizes|Count_Sizes|       Color_Name|product_Code|Review|Rating|            Features|Comfort|
+---+----------------+--------+------+---------+--------------------+-----------+-----------------+------------+------+------+--------------------+-------+
| 37|Air Jordan 1 Mid|  Unisex|     1|11 495.00|['8', '8.5', '9',...|          9|White/White/White|  554724-136|   995|   4.8|FRESH COLOUR, FAM...|   NULL|
+---+----------------+--------+------+---------+--------------------+-----------+-----------------+------------+------+------+--------------------+-------+



In [9]:
result3 = mygraph.filterEdges("relation='same_category_unisex'")
result3.edges.show()

+---+---+--------------------+
|src|dst|            relation|
+---+---+--------------------+
| 37|255|same_category_unisex|
| 47|276|same_category_unisex|
| 54|293|same_category_unisex|
| 65|300|same_category_unisex|
| 75|307|same_category_unisex|
| 90|321|same_category_unisex|
|138|322|same_category_unisex|
|139|338|same_category_unisex|
|155| 37|same_category_unisex|
|156| 47|same_category_unisex|
|164| 54|same_category_unisex|
|171| 65|same_category_unisex|
|175| 75|same_category_unisex|
|182| 90|same_category_unisex|
|255|138|same_category_unisex|
|276|139|same_category_unisex|
|293|155|same_category_unisex|
|300|156|same_category_unisex|
|307|164|same_category_unisex|
|321|171|same_category_unisex|
+---+---+--------------------+
only showing top 20 rows
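A vertex's degree counts every edge that touches it, whether the vertex is the source or the destination, and its neighbours are natural candidates for "similar product" recommendations. The plain-Python sketch below illustrates both ideas on four of the `same_category_unisex` edges listed above. The helper names are hypothetical, and because only a subset of edges is used, the degrees differ from those of the full graph.

```python
from collections import Counter
from typing import List, Set, Tuple

# Four (src, dst) pairs copied from the filtered edge listing above
edges: List[Tuple[int, int]] = [(37, 255), (155, 37), (54, 293), (164, 54)]


def degrees(edges: List[Tuple[int, int]]) -> Counter:
    """Count how many edges touch each vertex, ignoring direction."""
    counts: Counter = Counter()
    for src, dst in edges:
        counts[src] += 1
        counts[dst] += 1
    return counts


def neighbours(edges: List[Tuple[int, int]], vertex: int) -> Set[int]:
    """Vertices connected to the given vertex by an edge in either direction."""
    linked = set()
    for src, dst in edges:
        if src == vertex:
            linked.add(dst)
        elif dst == vertex:
            linked.add(src)
    return linked


degree_of = degrees(edges)
print(degree_of[37], sorted(neighbours(edges, 37)))
```

Vertex 37 is the unisex "Air Jordan 1 Mid" from the earlier vertex filter, so its neighbours in this relation are other unisex shoes that could be suggested alongside it.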



In [None]:
mygraph.triangleCount().show()

In [None]:
import networkx as nx
import matplotlib.pyplot as plt

# Plot the source and destination nodes and connect them with undirected lines
def plot_undirected_graph(edge_list):
    plt.figure(figsize=(9,9))
    gplot=nx.Graph()
    for row in edge_list.select("src", "dst").take(1000):
        gplot.add_edge(row["src"], row["dst"])
    nx.draw(gplot, with_labels=True, font_weight="bold", node_size=3500)
    plt.show()

plot_undirected_graph(mygraph.edges)