## Retail Inventory Management System

Create `Product` and `Order` classes to manage a retail inventory system: creating, updating, and deleting products, and placing orders.

You will develop a comprehensive inventory management system for a retail business by applying your knowledge of object-oriented programming (OOP). The business is a rapidly growing online retailer that sells a wide range of products, including electronics, clothing, and home goods. As the company expands, managing inventory efficiently becomes crucial to smooth operations and customer satisfaction.

Object-oriented programming (OOP) is a programming paradigm that organizes software design around data or objects rather than functions and logic. OOP allows for modular, reusable, and maintainable code, which is particularly beneficial for complex systems like inventory management systems.

You will define two classes, `Product` and `Order`, using the implementation requirements detailed below:

### `Product`

- Constructor parameter(s): `self`, `product_id`, `name`, `category`, `quantity`, `price`, and `supplier`.
- Class-level variable(s): `inventory`.

#### `Product` class method(s)

##### `add_product()`
- Parameter(s): `cls`, `name`, `category`, `quantity`, `price`, and `supplier`.
- Behavior: 
    - Define the `product_id` assuming it's auto-generated incrementally, without any duplicate `product_id` values.
    - Define a `new_product` variable that will call the constructor of the Product class.
    - Return the message `"Product added successfully"` to confirm that the product was added.

##### `update_product()`
- Parameter(s): `cls`, `product_id`, `quantity`, `price`, and `supplier`.
    - `quantity`, `price`, and `supplier` should have default values of `None`. 
- Behavior: 
    - Check if the `product_id` already exists in the `inventory`.
    - If `product_id` exists, update the product with each of the given parameters that has a value (is not `None`).
    - Return either one of these messages: `"Product information updated successfully"` or `"Product not found"`.

##### `delete_product()`
- Parameter(s): `cls`, `product_id`.
- Behavior: 
    - Check whether the `product_id` passed as a parameter exists in the inventory list.
    - If `product_id` exists, remove the product from the list.
    - Return either one of these messages: `"Product deleted successfully"` or `"Product not found"`.


### `Order`

- Constructor parameter(s): `self`, `order_id`, `products`, and `customer_info`.
    - `customer_info` should have a default value of `None`. 

#### `Order` method(s)

##### `place_order()`
- Parameter(s): `self`, `product_id`, `quantity`, and `customer_info`.
    - `customer_info` should have a default value of `None`.
- Behavior: 
    - Append to the `products` list a tuple containing `product_id` and `quantity`.
    - Assume that each order can only take **one** product. 
    - Return the message: `"Order placed successfully. Order ID: {self.order_id}"`.


<br>


As an example, your code must be able to create products like this:

`p1 = Product.add_product("Laptop", "Electronics", 50, 1000, "Supplier A")`

Update them like this:

`update_p1 = Product.update_product(1, quantity=45, price=950)`

Delete them like this:

`delete_p1 = Product.delete_product(1)`

And, create and place orders like this:

`order = Order(order_id=1, products=[])`

`order_placement = order.place_order(1, 2, customer_info="John Doe")`


In [6]:
class Product:
    inventory = []  # Class-level variable to store all products. This is shared among all instances of Product.

    def __init__(self, product_id, name, category, quantity, price, supplier):
        # The __init__ method is the constructor. It initializes a new instance of the Product class.
        self.product_id = product_id  # Instance variable to store the product's ID.
        self.name = name  # Instance variable to store the product's name.
        self.category = category  # Instance variable to store the product's category.
        self.quantity = quantity  # Instance variable to store the quantity of the product.
        self.price = price  # Instance variable to store the price of the product.
        self.supplier = supplier  # Instance variable to store the supplier of the product.
        Product.inventory.append(self)  # Add this new product instance to the class-level inventory list.

    @classmethod
    def add_product(cls, name, category, quantity, price, supplier):
        # This is a class method that adds a new product to the inventory.
        product_id = cls.inventory[-1].product_id + 1 if len(cls.inventory) > 0 else 1  # Generate a new product ID: the last product's ID plus 1, or 1 if the inventory is empty.
        new_product = cls(product_id, name, category, quantity, price, supplier)  # Create a new product instance.
        return "Product added successfully"  # Return a confirmation message.

    @classmethod
    def update_product(cls, product_id, quantity=None, price=None, supplier=None):
        # This class method updates the details of an existing product.
        for product in cls.inventory:  # Loop through the inventory to find the product with the given ID.
            if product.product_id == product_id:  # Check if this is the product we want to update.
                if quantity is not None:  # If a new quantity is provided, update it.
                    product.quantity = quantity
                if price is not None:  # If a new price is provided, update it.
                    product.price = price
                if supplier is not None:  # If a new supplier is provided, update it.
                    product.supplier = supplier
                return "Product information updated successfully"  # Return a confirmation message.
        return "Product not found"  # If the product ID was not found, return this message.

    @classmethod
    def delete_product(cls, product_id):
        # This class method deletes a product from the inventory.
        for i, product in enumerate(cls.inventory):  # Loop through the inventory with index to find the product.
            if product.product_id == product_id:  # Check if this is the product we want to delete.
                del cls.inventory[i]  # Delete the product from the inventory list.
                return "Product deleted successfully"  # Return a confirmation message.
        return "Product not found"  # If the product ID was not found, return this message.
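
One consequence of the `inventory[-1].product_id + 1` scheme is that an ID is handed out again if the most recently added product is deleted before the next one is added. If IDs must never be reused, one option is a class-level counter. The sketch below is a hypothetical variation, not part of the assignment: the `CountedProduct` class and its `_id_counter` attribute are illustrative names, and the constructor is trimmed to two fields to keep the example short.

```python
import itertools


class CountedProduct:
    """Hypothetical variant of Product that never reuses an ID."""

    inventory = []
    _id_counter = itertools.count(start=1)  # assumed helper: yields 1, 2, 3, ...

    def __init__(self, product_id, name):
        self.product_id = product_id
        self.name = name
        CountedProduct.inventory.append(self)

    @classmethod
    def add_product(cls, name):
        # Draw the next ID from the counter instead of inspecting the inventory.
        cls(next(cls._id_counter), name)
        return "Product added successfully"

    @classmethod
    def delete_product(cls, product_id):
        for i, product in enumerate(cls.inventory):
            if product.product_id == product_id:
                del cls.inventory[i]
                return "Product deleted successfully"
        return "Product not found"


CountedProduct.add_product("Laptop")    # receives ID 1
CountedProduct.add_product("Monitor")   # receives ID 2
CountedProduct.delete_product(2)        # the newest product is removed
CountedProduct.add_product("Keyboard")  # receives ID 3, not a recycled 2
ids = [p.product_id for p in CountedProduct.inventory]
print(ids)  # [1, 3]
```

The counter trades a little extra state for IDs that stay unique over the whole lifetime of the class, which matters if old order records still refer to deleted products.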

In [7]:

class Order:
    def __init__(self, order_id, products, customer_info=None):
        # The __init__ method is the constructor. It initializes a new instance of the Order class.
        self.order_id = order_id  # Instance variable to store the order's ID.
        self.products = products  # Instance variable to store the list of products in the order. Each product is represented as a tuple (product_id, quantity).
        self.customer_info = customer_info  # Instance variable to store optional customer information.

    def place_order(self, product_id, quantity, customer_info=None):
        # This method adds a product to the order.
        for product in Product.inventory:
            if product.product_id == product_id and product.quantity >= quantity:  # Check that the product exists in the inventory and that there is enough stock.
                product.quantity -= quantity  # Update the stock of the product by reducing it by the ordered quantity
                self.products.append((product_id, quantity))  # Add the product and quantity as a tuple to the order's products list.
                if customer_info:  # If customer information is provided, update it.
                    self.customer_info = customer_info
                return f"Order placed successfully. Order ID: {self.order_id}"  # Return a confirmation message with the order ID.
        return "Order could not be placed. Product not found or insufficient quantity."


## Orders Dataset Cleaning

Prepare `"orders_data.parquet"` so that it can be used to build a forecasting model.

* Clean the orders dataset as per the requirements specified in the Workbook.
* Save the updated file as `orders_data_clean.parquet`.

<br>

As a Data Engineer at an electronics e-commerce company, Voltmart, you have been requested by a peer Machine Learning team to clean the data containing the information about orders made last year. They are planning to further use this cleaned data to build a demand forecasting model. To achieve this, they have shared their requirements regarding the desired output table format.

An analyst shared a parquet file called `"orders_data.parquet"` for you to clean and preprocess. 

You can see the dataset schema below along with the **cleaning requirements**:

<br>


`orders_data.parquet`

| column | data type | description | cleaning requirements | 
|--------|-----------|-------------|-----------------------|
| `order_date` | `timestamp` | Date and time when the order was made | _Modify: Remove orders placed between 12am and 5am (inclusive); convert from timestamp to date_ |
| `time_of_day` | `string` | Period of the day when the order was made | _New column containing (lower bound inclusive, upper bound exclusive): "morning" for orders placed 5am-12pm, "afternoon" for orders placed 12pm-6pm, and "evening" for 6pm-12am_ |
| `order_id` | `long` | Order ID | _N/A_ |
| `product` | `string` | Name of a product ordered | _Remove rows containing "TV" as the company has stopped selling this product; ensure all values are lowercase_ |
| `product_ean` | `double` | Product ID | _N/A_ |
| `category` | `string` | Broader category of a product | _Ensure all values are lowercase_ |
| `purchase_address` | `string` | Address line where the order was made ("House Street, City, State Zipcode") | _N/A_ |
| `purchase_state` | `string` | US State of the purchase address | _New column containing: the State that the purchase was ordered from_ |
| `quantity_ordered` | `long` | Number of product units ordered | _N/A_ |
| `price_each` | `double` | Price of a product unit | _N/A_ |
| `cost_price` | `double` | Cost of production per product unit | _N/A_ |
| `turnover` | `double` | Total amount paid for a product (quantity x price) | _N/A_ |
| `margin` | `double` | Profit made by selling a product (turnover - cost) | _N/A_ |

<br>
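
The hour boundaries can be checked in plain Python before writing the Spark expression. This is a minimal sketch, assuming the boundaries used by the solution in this notebook: hours 0 through 5 count as night and are dropped, so "morning" effectively covers hours 6 through 11. The `time_of_day` helper is an illustrative name, and the last sample timestamp is made up.

```python
from datetime import datetime
from typing import Optional


def time_of_day(ts: datetime) -> Optional[str]:
    """Return the period label for a timestamp, or None for night orders."""
    hour = ts.hour
    if hour <= 5:
        return None  # 12:00am through 5:59am: the row is removed
    if hour <= 11:
        return "morning"
    if hour <= 17:
        return "afternoon"
    return "evening"


samples = [
    datetime(2023, 1, 22, 21, 25),  # 9:25pm, taken from the data preview
    datetime(2023, 1, 28, 14, 15),  # 2:15pm, taken from the data preview
    datetime(2023, 12, 9, 6, 43),   # 6:43am, taken from the data preview
    datetime(2023, 1, 1, 5, 30),    # made-up night order
]
labels = [time_of_day(ts) for ts in samples]
print(labels)  # ['evening', 'afternoon', 'morning', None]
```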

In [3]:
from pyspark.sql import (
    SparkSession,
    types,
    functions as F,
)

# Initiate a Spark session
spark = (
    SparkSession
    .builder
    .appName('cleaning_orders_dataset_with_pyspark')
    .getOrCreate()
)



In [4]:
# IMPORT DATA
# Read data from the parquet file

orders_data = spark.read.parquet('../data_raw/orders_data.parquet')
orders_data.toPandas()

Unnamed: 0,order_date,order_id,product,product_id,category,purchase_address,quantity_ordered,price_each,cost_price,turnover,margin
0,2023-01-22 21:25:00,141234,iPhone,5.638009e+12,Vêtements,"944 Walnut St, Boston, MA 02215",1,700.00,231.0000,700.00,469.0000
1,2023-01-28 14:15:00,141235,Lightning Charging Cable,5.563320e+12,Alimentation,"185 Maple St, Portland, OR 97035",1,14.95,7.4750,14.95,7.4750
2,2023-01-17 13:33:00,141236,Wired Headphones,2.113973e+12,Vêtements,"538 Adams St, San Francisco, CA 94016",2,11.99,5.9950,23.98,11.9900
3,2023-01-05 20:33:00,141237,27in FHD Monitor,3.069157e+12,Sports,"738 10th St, Los Angeles, CA 90001",1,149.99,97.4935,149.99,52.4965
4,2023-01-25 11:59:00,141238,Wired Headphones,9.692681e+12,Électronique,"387 10th St, Austin, TX 73301",1,11.99,5.9950,11.99,5.9950
...,...,...,...,...,...,...,...,...,...,...,...
185945,2023-12-11 20:58:00,319666,Lightning Charging Cable,6.545974e+12,Électronique,"14 Madison St, San Francisco, CA 94016",1,14.95,7.4750,14.95,7.4750
185946,2023-12-01 12:01:00,319667,AA Batteries (4-pack),5.352480e+12,Électronique,"549 Willow St, Los Angeles, CA 90001",2,3.84,1.9200,7.68,3.8400
185947,2023-12-09 06:43:00,319668,Vareebadd Phone,2.674213e+12,Alimentation,"273 Wilson St, Seattle, WA 98101",1,400.00,132.0000,400.00,268.0000
185948,2023-12-03 10:39:00,319669,Wired Headphones,5.216304e+12,Alimentation,"778 River St, Dallas, TX 75001",1,11.99,5.9950,11.99,5.9950


In [10]:
# DATA CLEANING AND PREPROCESSING

orders_data = (
    orders_data
    # Create a new column time_of_day
    .withColumn(
        'time_of_day',
        # When/otherwise (similar to case/when/else) statements extracting hour from timestamp
        F.when((F.hour('order_date') >= 0) & (F.hour('order_date') <= 5), 'night')
         .when((F.hour('order_date') >= 6) & (F.hour('order_date') <= 11), 'morning')
         .when((F.hour('order_date') >= 12) & (F.hour('order_date') <= 17), 'afternoon')
         .when((F.hour('order_date') >= 18) & (F.hour('order_date') <= 23), 'evening')
        # You can keep the otherwise statement as None to validate whether the conditions are exhaustive
         .otherwise(None)
    )
    # Filter by time of day
    .filter(
        F.col('time_of_day') != 'night'
    )
    # Cast order_date to date as it is originally a timestamp
    .withColumn(
        'order_date',
        F.col('order_date').cast(types.DateType())
    )
)


orders_data = (
    orders_data
    # Make product and category columns lowercase
    .withColumn(
        'product',
        F.lower('product')
    )
    .withColumn(
        'category',
        F.lower('category')
    )
    # Remove rows where product column contains "tv" (as you have already made it lowercase)
    .filter(
        ~F.col('product').contains('tv')
    )
)


orders_data = (
    orders_data
    # First you split the purchase address by space (" ")
    .withColumn(
        'address_split',
        F.split('purchase_address', ' ')
    )
    # If you look at the address lines, you can see that the state abbreviation is always at the 2nd last position
    .withColumn(
        'purchase_state',
        # F.col('address_split').getItem(F.size('address_split') - 2)
        F.col('address_split')[F.size('address_split') - 2]
    )
    # Drop the address_split column as it is a temporary technical column
    .drop('address_split')
)

# Use distinct and count to calculate the number of unique values
n_states = (
    orders_data
    .select('purchase_state')
    .distinct()
    .count()
)
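
The split-and-index logic used for `purchase_state` can be mirrored in plain Python to see why the second-to-last token is the state. This is a minimal sketch, assuming every address ends with "State Zipcode" as described in the schema; `extract_state` is an illustrative helper name, and the addresses come from the data preview.

```python
def extract_state(address: str) -> str:
    """Return the second-to-last space-separated token of an address."""
    return address.split(" ")[-2]


addresses = [
    "944 Walnut St, Boston, MA 02215",
    "185 Maple St, Portland, OR 97035",
]
states = [extract_state(address) for address in addresses]
print(states)  # ['MA', 'OR']
```

Indexing from the end works regardless of how many words the street or city name contains, which is why the Spark version uses `size - 2` rather than a fixed position.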

In [11]:
# EXPORT
# Export the resulting table to parquet format with the new name
(
    orders_data
    .write
    .parquet(
        '../data_cleaned/orders_data_clean.parquet',
        mode='overwrite',
    )
)


In [12]:
# jupyter nbconvert --to HTML orders-dataset-cleaning.ipynb

## Build Retail Data Pipeline

Build a data pipeline using custom functions to extract, transform, aggregate, and load e-commerce data. 

1. Implement a function named `transform()` that takes one argument, `merged_df`. It should fill missing numerical values (using any method of your choice), add a `"Month"` column, keep only the rows where weekly sales exceed $10,000, and drop the unnecessary columns. It should return a DataFrame, which you store as the `clean_data` variable.

2. Implement the function `avg_weekly_sales_per_month()` with one argument (the cleaned data). It calculates the average weekly sales for each month. Select the `"Month"` and `"Weekly_Sales"` columns, as they are the only ones needed for this analysis, then chain `groupby()`, `agg()`, `reset_index()`, and `round()`: group by `"Month"`, compute the mean of `"Weekly_Sales"`, call `reset_index()` to start a new index order, and round the results to two decimal places.

3. Create a function called `load()` that takes the cleaned and aggregated DataFrames, and their paths, and saves them as `clean_data.csv` and `agg_data.csv` respectively, without an index.

4. Lastly, define a `validation()` function that checks whether the two CSV files written by `load()` exist in the current working directory.
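
The aggregation chain described in step 2 can be tried on a toy DataFrame first. This is a minimal sketch, assuming pandas is installed; the three rows are made-up values, and the `Avg_Sales` output column name is an illustrative choice rather than a requirement.

```python
import pandas as pd

# Made-up rows: two weekly sales figures for month 1, one for month 2.
toy = pd.DataFrame(
    {
        "Month": [1, 1, 2],
        "Weekly_Sales": [10000.456, 20000.0, 15000.129],
    }
)

avg_sales = (
    toy[["Month", "Weekly_Sales"]]
    .groupby("Month")                         # one group per month
    .agg(Avg_Sales=("Weekly_Sales", "mean"))  # named aggregation: mean per group
    .reset_index()                            # turn "Month" back into a column
    .round(2)                                 # keep two decimal places
)
print(avg_sales)
```

Without `reset_index()`, `"Month"` would remain the index of the result and would be lost when the frame is later saved with `index=False`.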

<br>

Walmart is the biggest retail store in the United States. Like other retailers, it has been expanding the e-commerce part of its business. By the end of 2022, e-commerce represented a roaring $80 billion in sales, which is 13% of Walmart's total sales. One of the main factors that affects its sales is public holidays, like the Super Bowl, Labour Day, Thanksgiving, and Christmas.

In this project, you have been tasked with creating a data pipeline for the analysis of supply and demand around the holidays, along with conducting a preliminary analysis of the data. You will be working with two data sources: grocery sales and complementary data. 

`grocery_sales`
- `"index"` - unique ID of the row
- `"Store_ID"` - the store number
- `"Date"` - the week of sales
- `"Weekly_Sales"` - sales for the given store

Also, you have the `extra_data.parquet` file that contains complementary data:

`extra_data.parquet`
- `"IsHoliday"` - Whether the week contains a public holiday - 1 if yes, 0 if no.
- `"Temperature"` - Temperature on the day of sale
- `"Fuel_Price"` - Cost of fuel in the region
- `"CPI"` – Prevailing consumer price index
- `"Unemployment"` - The prevailing unemployment rate
- `"MarkDown1"`, `"MarkDown2"`, `"MarkDown3"`, `"MarkDown4"` - number of promotional markdowns
- `"Dept"` - Department Number in each store
- `"Size"` - size of the store
- `"Type"` - type of the store (depends on `Size` column)

You will need to merge those files and perform some data manipulations. The transformed DataFrame can then be stored as the `clean_data` variable containing the following columns:
- `"Store_ID"`
- `"Month"`
- `"Dept"`
- `"IsHoliday"`
- `"Weekly_Sales"`
- `"CPI"`
- `"Unemployment"`

After merging and cleaning the data, you will have to analyze monthly sales of Walmart and store the results of your analysis as the `agg_data` variable that should look like:

|  Month | Weekly_Sales  | 
|---|---|
| 1.0  |  33174.178494 |
|  2.0 |  34333.326579 |
|  ... | ...  |  

Finally, you should save `clean_data` and `agg_data` as CSV files.

In [13]:
import pandas as pd
import sqlite3
from sqlalchemy import create_engine
import os

In [14]:
# Load CSV into DataFrame
df = pd.read_csv('../data_raw/grocery_sales.csv')

# Create SQLite database
conn = sqlite3.connect("../data_raw/grocery_sales.db")
df.to_sql("grocery_sales", conn, if_exists="replace", index=False)
conn.close()
print("Data loaded into SQLite database successfully.")

Data loaded into SQLite database successfully.


In [15]:
query = """ SELECT * FROM grocery_sales; """
engine = create_engine("sqlite:///../data_raw/grocery_sales.db")
grocery_sales = pd.read_sql_query(query, engine)
grocery_sales

Unnamed: 0.1,Unnamed: 0,index,Store_ID,Date,Dept,Weekly_Sales
0,0,0,1,2010-02-05,1,24924.50
1,1,1,1,2010-02-05,26,11737.12
2,2,2,1,2010-02-05,17,13223.76
3,3,3,1,2010-02-05,45,37.44
4,4,4,1,2010-02-05,28,1085.29
...,...,...,...,...,...,...
231517,231517,232414,24,2011-05-06,8,49471.07
231518,231518,232415,24,2011-05-06,50,1210.00
231519,231519,232416,24,2011-05-06,87,25893.32
231520,231520,232417,24,2011-05-06,85,1357.83


In [16]:
# # Just use the CSV file directly
# grocery_sales = pd.read_csv('../data_raw/grocery_sales.csv')

# Extract function is already implemented for you 
def extract(store_data, extra_data):
    extra_df = pd.read_parquet(extra_data)
    merged_df = store_data.merge(extra_df, on = "index")
    return merged_df

# Call the extract() function and store it as the "merged_df" variable
merged_df = extract(grocery_sales, "../data_raw/extra_data.parquet")
merged_df

Unnamed: 0.1,Unnamed: 0,index,Store_ID,Date,Dept,Weekly_Sales,IsHoliday,Temperature,Fuel_Price,MarkDown1,MarkDown2,MarkDown3,MarkDown4,MarkDown5,CPI,Unemployment,Type,Size
0,0,0,1,2010-02-05,1,24924.50,0,42.31,2.572,0.0,0.0,0.0,0.0,0.0,211.096358,8.106,3.0,151315.0
1,1,1,1,2010-02-05,26,11737.12,0,42.31,2.572,0.0,0.0,0.0,0.0,0.0,211.096358,8.106,3.0,151315.0
2,2,2,1,2010-02-05,17,13223.76,0,42.31,2.572,0.0,0.0,0.0,0.0,0.0,211.096358,8.106,3.0,151315.0
3,3,3,1,2010-02-05,45,37.44,0,42.31,2.572,0.0,0.0,0.0,0.0,0.0,211.096358,,3.0,151315.0
4,4,4,1,2010-02-05,28,1085.29,0,42.31,2.572,0.0,0.0,0.0,0.0,0.0,211.096358,,3.0,151315.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
231517,231517,232414,24,2011-05-06,8,49471.07,0,55.75,4.192,0.0,0.0,0.0,0.0,0.0,134.514367,8.212,3.0,203819.0
231518,231518,232415,24,2011-05-06,50,1210.00,0,55.75,4.192,0.0,0.0,0.0,0.0,0.0,134.514367,8.212,3.0,203819.0
231519,231519,232416,24,2011-05-06,87,25893.32,0,55.75,4.192,0.0,0.0,0.0,0.0,0.0,134.514367,8.212,3.0,203819.0
231520,231520,232417,24,2011-05-06,85,1357.83,0,55.75,4.192,0.0,0.0,0.0,0.0,0.0,134.514367,8.212,3.0,203819.0


In [17]:
# Create the transform() function with one parameter: "raw_data"
def transform(raw_data):
    # Fill NaNs using the mean since we are dealing with numeric columns
    # Set inplace = True to do the replacing on the current DataFrame
    raw_data.fillna(
        {
            'CPI': raw_data['CPI'].mean(),
            'Weekly_Sales': raw_data['Weekly_Sales'].mean(),
            'Unemployment': raw_data['Unemployment'].mean(),
        },
        inplace=True,
    )

    # Define the type of the "Date" column and its format
    raw_data["Date"] = pd.to_datetime(raw_data["Date"], format = "%Y-%m-%d")
    # Extract the month value from the "Date" column to calculate monthly sales later on
    raw_data["Month"] = raw_data["Date"].dt.month

    # Filter the entire DataFrame using the "Weekly_Sales" column. Use .loc to access a group of rows
    raw_data = raw_data.loc[raw_data["Weekly_Sales"] > 10000, :]
    
    # Drop unnecessary columns. Set axis = 1 to specify that the columns should be removed
    raw_data = raw_data.drop(["index", "Temperature", "Fuel_Price", "MarkDown1", "MarkDown2", "MarkDown3", "MarkDown4", "MarkDown5", "Type", "Size", "Date"], axis = 1)
    return raw_data


# Call the transform() function and pass the merged DataFrame
clean_data = transform(merged_df)
clean_data

Unnamed: 0.1,Unnamed: 0,Store_ID,Dept,Weekly_Sales,IsHoliday,CPI,Unemployment,Month
0,0,1,1,24924.50,0,211.096358,8.106000,2.0
1,1,1,26,11737.12,0,211.096358,8.106000,2.0
2,2,1,17,13223.76,0,211.096358,8.106000,2.0
5,5,1,79,46729.77,0,211.096358,7.500052,2.0
6,6,1,55,21249.31,0,211.096358,7.500052,2.0
...,...,...,...,...,...,...,...,...
231513,231513,24,40,45396.26,0,134.514367,8.212000,5.0
231515,231515,24,93,41295.84,0,134.514367,8.212000,5.0
231516,231516,24,9,24024.18,0,134.514367,8.212000,5.0
231517,231517,24,8,49471.07,0,134.514367,8.212000,5.0


In [18]:
# Create the avg_weekly_sales_per_month function that takes in the cleaned data from the last step
def avg_weekly_sales_per_month(clean_data):
    # Select the "Month" and "Weekly_Sales" columns as they are the only ones needed for this analysis
    holidays_sales = clean_data[["Month", "Weekly_Sales"]]
    # Create a chain operation with groupby(), agg(), reset_index(), and round() functions
    # Group by the "Month" column and calculate the average weekly sales for each month
    # Call reset_index() to start a new index order
    # Round the results to two decimal places
    holidays_sales = (
        holidays_sales
        .groupby("Month")
        .agg(Avg_Sales=("Weekly_Sales", "mean"))
        .reset_index()
        .round(2)
    )
    return holidays_sales


# Call the avg_weekly_sales_per_month() function and pass the cleaned DataFrame
agg_data = avg_weekly_sales_per_month(clean_data)
agg_data

Unnamed: 0,Month,Avg_Sales
0,1.0,33174.18
1,2.0,34333.33
2,3.0,33220.89
3,4.0,33392.37
4,5.0,33339.89
5,6.0,34582.47
6,7.0,33922.76
7,8.0,33644.79
8,9.0,33258.05
9,10.0,32736.99


In [19]:
# Create the load() function that takes in the cleaned DataFrame and the aggregated one with the paths where they are going to be stored
def load(full_data, full_data_file_path, agg_data, agg_data_file_path):
    # Save both DataFrames as CSV files. Set index = False to drop the index columns
    full_data.to_csv(full_data_file_path, index = False)
    agg_data.to_csv(agg_data_file_path, index = False)

# Call the load() function and pass the cleaned and aggregated DataFrames with their paths
load(clean_data, "../data_cleaned/clean_data.csv", agg_data, "../data_cleaned/agg_data.csv")

In [20]:
# Create the validation() function with one parameter: file_path - to check whether the previous function was correctly executed
def validation(file_path):
    # Use the "os" package to check whether a path exists
    file_exists = os.path.exists(file_path)
    # Raise an exception if the path doesn't exist, hence, if there is no file found on a given path
    if not file_exists:
        raise Exception(f"There is no file at the path {file_path}")


# Call the validation() function and pass first, the cleaned DataFrame path, and then the aggregated DataFrame path
validation("../data_cleaned/clean_data.csv")
validation("../data_cleaned/agg_data.csv")
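
A possible variation on the check above reports every missing file in one pass and raises the more specific `FileNotFoundError`. This is a minimal sketch, not the required solution: `validate_outputs` is a hypothetical name, and the example writes to a temporary directory so that it does not depend on the notebook's output folder.

```python
import os
import tempfile


def validate_outputs(*file_paths):
    """Raise FileNotFoundError listing every path that does not exist."""
    missing = [path for path in file_paths if not os.path.exists(path)]
    if missing:
        raise FileNotFoundError(f"Missing output file(s): {missing}")


with tempfile.TemporaryDirectory() as tmp_dir:
    present = os.path.join(tmp_dir, "clean_data.csv")
    absent = os.path.join(tmp_dir, "agg_data.csv")
    with open(present, "w") as handle:
        handle.write("Store_ID,Month\n")

    validate_outputs(present)  # passes silently: the file exists
    try:
        validate_outputs(present, absent)  # agg_data.csv was never written
        caught = False
    except FileNotFoundError:
        caught = True

print(caught)  # True
```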

In [21]:
# jupyter nbconvert --to html walmart-retail-data-pipeline.ipynb

## Online Retail Forecasting Model

Analyze the `Online Retail.csv` dataset and build a forecasting model to predict `'Quantity'` of products sold.

* Split the data into two sets based on the splitting date, `"2011-09-25"`. All data up to and including this date should be in the training set, while data after this date should be in the test set. Return a pandas DataFrame, `pd_daily_train_data`, containing, at least, the columns `"Country"`, `"StockCode"`, `"InvoiceDate"`, `"Quantity"`.
* Using your test set, calculate the Mean Absolute Error (MAE) of your forecast model for the `'Quantity'` sold. Return a double (float) named `mae`.
* How many units are expected to be sold during week `39` of 2011? Store the answer as an integer variable called `quantity_sold_w39`.
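
MAE is the average of the absolute differences between the actual and predicted values: the sum of |actual - predicted| over all rows, divided by the number of rows. The sketch below works through that arithmetic in plain Python; the function name `mean_absolute_error` and the three value pairs are made up for illustration and are unrelated to the dataset.

```python
def mean_absolute_error(actual, predicted):
    """Average of |actual - predicted| over paired observations."""
    if not actual or len(actual) != len(predicted):
        raise ValueError("Inputs must be non-empty and of equal length")
    return sum(abs(a - p) for a, p in zip(actual, predicted)) / len(actual)


actual = [10, 4, 7]            # made-up observed quantities
predicted = [8.0, 5.0, 10.0]   # made-up forecasts
mae_example = mean_absolute_error(actual, predicted)
print(mae_example)  # (2 + 1 + 3) / 3 = 2.0
```

Because the errors are not squared, MAE is expressed in the same units as `'Quantity'` and is less dominated by a few very large misses than a squared-error metric.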

<br>

It's simple to buy any product with a click and have it delivered to your door. Online shopping has been rapidly evolving over the last few years, making our lives easier. But behind the scenes, e-commerce companies face a complex challenge that needs to be addressed. 

Uncertainty plays a big role in how supply chains plan and organize their operations to ensure that products are delivered on time. These uncertainties can lead to challenges such as stockouts, delayed deliveries, and increased operational costs.

You work for the Sales & Operations Planning (S&OP) team at a multinational e-commerce company. They need your help planning for the upcoming end-of-the-year sales. They want to use your insights to plan for promotional opportunities and manage their inventory. The goal is to have the right products in stock when needed and to keep customers satisfied with prompt delivery to their doorstep.

You are provided with a sales dataset to use. A summary and preview are provided below.

`Online Retail.csv`

| Column     | Description              |
|------------|--------------------------|
| `'InvoiceNo'` | A 6-digit number uniquely assigned to each transaction |
| `'StockCode'` | A 5-digit number uniquely assigned to each distinct product |
| `'Description'` | The product name |
| `'Quantity'` | The quantity of each product (item) per transaction |
| `'UnitPrice'` | Product price per unit |
| `'CustomerID'` | A 5-digit number uniquely assigned to each customer |
| `'Country'` | The name of the country where each customer resides |
| `'InvoiceDate'` | The day and time when each transaction was generated `"MM/DD/YYYY"` |
| `'Year'` | The year when each transaction was generated |
| `'Month'` | The month when each transaction was generated |
| `'Week'` | The week when each transaction was generated (`1`-`52`) |
| `'Day'` | The day of the month when each transaction was generated (`1`-`31`) |
| `'DayOfWeek'` | The day of the week when each transaction was generated <br>(`0` = Monday, `6` = Sunday) |

In [2]:
# Import required libraries
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.sql.functions import col, dayofmonth, month, year,  to_date, to_timestamp, weekofyear, dayofweek
from pyspark.ml.feature import StringIndexer
from pyspark.ml.evaluation import RegressionEvaluator

# Initialize Spark session
my_spark = SparkSession.builder.appName("SalesForecast").getOrCreate()

# Importing sales data
sales_data = my_spark.read.csv(
    "../data_raw/Online Retail.csv", header=True, inferSchema=True, sep=",")

# Parse InvoiceDate as a timestamp and keep only the date part
sales_data = sales_data.withColumn("InvoiceDate", to_date(
    to_timestamp(col("InvoiceDate"), "d/M/yyyy H:mm")))


In [3]:
# Insert the code necessary to solve the assigned problems. Use as many code cells as you need.

# Aggregate data into daily intervals
daily_sales_data = sales_data.groupBy(
    "Country", "StockCode", "InvoiceDate", "Year", "Month", "Day", "Week", "DayOfWeek"
).agg({"Quantity": "sum", "UnitPrice": "avg"})
# Rename the target column
daily_sales_data = daily_sales_data.withColumnRenamed(
    "sum(Quantity)", "Quantity")

# Split the data into two sets based on the splitting date, "2011-09-25". All data up to and including this date goes in the training set; data after this date goes in the test set. Return a pandas DataFrame, pd_daily_train_data, containing at least the columns ["Country", "StockCode", "InvoiceDate", "Quantity"].

split_date_train_test = "2011-09-25"

# Creating the train and test datasets
train_data = daily_sales_data.filter(
    col("InvoiceDate") <= split_date_train_test)
test_data = daily_sales_data.filter(col("InvoiceDate") > split_date_train_test)

pd_daily_train_data = train_data.toPandas()

# Creating indexer for categorical columns
country_indexer = StringIndexer(
    inputCol="Country", outputCol="CountryIndex").setHandleInvalid("keep")
stock_code_indexer = StringIndexer(
    inputCol="StockCode", outputCol="StockCodeIndex").setHandleInvalid("keep")

# Selecting feature columns
feature_cols = ["CountryIndex", "StockCodeIndex", "Month", "Year",
                "DayOfWeek", "Day", "Week"]

# Using vector assembler to combine features
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Initializing a Random Forest model
rf = RandomForestRegressor(
    featuresCol="features",
    labelCol="Quantity",
    maxBins=4000
)

# Create a pipeline for staging the processes
pipeline = Pipeline(stages=[country_indexer, stock_code_indexer, assembler, rf])

# Training the model
model = pipeline.fit(train_data)

# Getting test predictions
test_predictions = model.transform(test_data)
test_predictions = test_predictions.withColumn(
    "prediction", col("prediction").cast("double"))

# Provide the Mean Absolute Error (MAE) for your forecast. Return a double/float "mae".

# Initializing the evaluator
mae_evaluator = RegressionEvaluator(
    labelCol="Quantity", predictionCol="prediction", metricName="mae")

# Obtaining MAE
mae = mae_evaluator.evaluate(test_predictions)
print(f"Mean Absolute Error: {mae}")

# How many units will be sold during week 39 of 2011? Return an integer `quantity_sold_w39`.

# Getting the weekly sales of all countries
weekly_test_predictions = test_predictions.groupBy("Year", "Week").agg({"prediction": "sum"})

# Finding the quantity sold in week 39
promotion_week = weekly_test_predictions.filter(col('Week')==39)

# Storing the prediction as quantity_sold_w39
quantity_sold_w39 = int(promotion_week.select("sum(prediction)").collect()[0][0])
print(f"Quantity sold on week 39: {quantity_sold_w39}")

# Stop the Spark session
my_spark.stop()




Mean Absolute Error: 9.413134348459366


                                                                                

Quantity sold on week 39: 88665
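
Week 39 can be read entirely from the test-set predictions because the split date lines up with a week boundary. The sketch below checks this with the standard library, assuming the dataset's `Week` column follows ISO 8601 numbering; that is an assumption, since the dataset description does not say how the column was derived.

```python
from datetime import date

split_date = date(2011, 9, 25)      # last day of the training set (a Sunday)
first_test_day = date(2011, 9, 26)  # first day of the test set (a Monday)

# isocalendar() returns (ISO year, ISO week number, ISO weekday).
split_week = split_date.isocalendar()[1]
first_test_week = first_test_day.isocalendar()[1]
print(split_week, first_test_week)  # 38 39
```

Under that assumption, every day of week 39 falls after the split date, so summing the test predictions for `Week == 39` covers the whole week.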


In [None]:
# jupyter nbconvert --to html framework.ipynb