![walmartecomm](walmartecomm.jpg)




E-comm LTD is the largest retailer in the United States. Like its competitors, it has been expanding the e-commerce side of its business. By the end of 2022, e-commerce accounted for $80 billion in sales, or 13% of E-comm LTD's total sales. Public holidays and major events such as the Super Bowl, Labour Day, Thanksgiving, and Christmas are among the main factors that affect its sales.

In this project, I have been tasked with creating a data pipeline for analysing supply and demand around the holidays and with conducting a preliminary analysis of the data. I will work with two data sources: grocery sales and complementary data. The `grocery_sales` dataset is provided in CSV format with the following features:

# `grocery_sales`
- `"index"` - unique ID of the row
- `"Store_ID"` - the store number
- `"Date"` - the week of sales
- `"Weekly_Sales"` - sales for the given store

Also, I have the `extra_data.parquet` file that contains complementary data:

# `extra_data.parquet`
- `"IsHoliday"` - Whether the week contains a public holiday - 1 if yes, 0 if no.
- `"Temperature"` - Temperature on the day of sale
- `"Fuel_Price"` - Cost of fuel in the region
- `"CPI"` - Prevailing consumer price index
- `"Unemployment"` - The prevailing unemployment rate
- `"MarkDown1"`, `"MarkDown2"`, `"MarkDown3"`, `"MarkDown4"`, `"MarkDown5"` - number of promotional markdowns
- `"Dept"` - Department Number in each store
- `"Size"` - size of the store
- `"Type"` - type of the store (depends on `Size` column)

I will need to merge those files and perform some data manipulations. The transformed DataFrame can then be stored as the `clean_data` variable containing the following columns:
- `"Store_ID"`
- `"Month"`
- `"Dept"`
- `"IsHoliday"`
- `"Weekly_Sales"`
- `"CPI"`
- `"Unemployment"`

After merging and cleaning the data, I will analyze the monthly sales of E-comm LTD and store the results as the `agg_data` variable, which should look like:

|  Month | Weekly_Sales  | 
|---|---|
| 1.0  |  33174.178494 |
|  2.0 |  34333.326579 |
|  ... | ...  |  

Finally, I will save `clean_data` and `agg_data` as CSV files.
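
As a rough illustration of the target layout listed above, the sketch below compares a list of column names against the expected `clean_data` columns. The helper name `missing_columns` is hypothetical and not part of the original notebook; in the pipeline it would presumably be called with `clean_data.columns`.

```python
# Expected columns of clean_data, taken from the list in the task description
EXPECTED_COLUMNS = [
    "Store_ID", "Month", "Dept", "IsHoliday",
    "Weekly_Sales", "CPI", "Unemployment",
]


def missing_columns(actual, expected=EXPECTED_COLUMNS):
    """Return the expected column names absent from `actual` (hypothetical helper)."""
    actual_set = set(actual)
    return [col for col in expected if col not in actual_set]


# Made-up example: a column list that lacks two of the expected names
example = ["Store_ID", "Month", "Dept", "Weekly_Sales", "CPI"]
gaps = missing_columns(example)
print(gaps)
```

An empty list means every expected column is present.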


In [14]:
### import the relevant libraries
import pandas as pd
import os


In [20]:

# Define the extract function
def extract(store_data, extra_data):
    extra_df = pd.read_parquet(extra_data)
    store_df = pd.read_csv(store_data, index_col = 0)
    merged_df = store_df.merge(extra_df, on = "index")
    return merged_df

# Call the extract() function and store it as the "merged_df" variable
merged_df = extract("sales.csv", "extra_data.parquet")

In [22]:
## display the head of the extracted dataset
merged_df.head()

Unnamed: 0,index,Store_ID,Date,Dept,Weekly_Sales,IsHoliday,Temperature,Fuel_Price,MarkDown1,MarkDown2,MarkDown3,MarkDown4,MarkDown5,CPI,Unemployment,Type,Size
0,0,1,2010-02-05,1,24924.5,0,42.31,2.572,0.0,0.0,0.0,0.0,0.0,211.096358,8.106,3.0,151315.0
1,1,1,2010-02-05,26,11737.12,0,42.31,2.572,0.0,0.0,0.0,0.0,0.0,211.096358,8.106,3.0,151315.0
2,2,1,2010-02-05,17,13223.76,0,42.31,2.572,0.0,0.0,0.0,0.0,0.0,211.096358,8.106,3.0,151315.0
3,3,1,2010-02-05,45,37.44,0,42.31,2.572,0.0,0.0,0.0,0.0,0.0,211.096358,,3.0,151315.0
4,4,1,2010-02-05,28,1085.29,0,42.31,2.572,0.0,0.0,0.0,0.0,0.0,211.096358,,3.0,151315.0
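
The inner merge in `extract()` silently drops rows whose `index` value appears in only one file. The sketch below, on made-up toy frames rather than the project data, shows one way to inspect such rows with the `validate` and `indicator` options of `DataFrame.merge`. It is an optional check, not something the original notebook does.

```python
import pandas as pd

# Toy stand-ins for the two sources (values are illustrative only)
store_df = pd.DataFrame({
    "index": [0, 1, 2],
    "Store_ID": [1, 1, 2],
    "Weekly_Sales": [24924.5, 11737.12, 13223.76],
})
extra_df = pd.DataFrame({
    "index": [0, 1, 3],
    "IsHoliday": [0, 0, 1],
    "CPI": [211.1, 211.1, 134.5],
})

# validate raises MergeError if "index" is duplicated on either side;
# indicator adds a "_merge" column saying where each row came from
merged = store_df.merge(
    extra_df, on="index", how="outer", validate="one_to_one", indicator=True
)

# Rows present in only one of the two sources
unmatched = merged[merged["_merge"] != "both"]
print(unmatched[["index", "_merge"]])
```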


In [24]:
# Create the transform() function with one parameter: "raw_data"
def transform(raw_data):
    # Fill missing values in the provided DataFrame with the column means
    raw_data.fillna(
        value = {
            "Weekly_Sales" : raw_data["Weekly_Sales"].mean(),
            "CPI" : raw_data["CPI"].mean(),
            "Unemployment" : raw_data["Unemployment"].mean(),
            "Type" : raw_data["Type"].mean(),
            "Size" : raw_data["Size"].mean()
        }, inplace = True)
    
    ## Extract the month from the date column
    raw_data["Month"] = pd.to_datetime(raw_data["Date"]).dt.month

    ## Keep only rows with weekly sales above 10,000
    clean_data = raw_data.loc[raw_data["Weekly_Sales"] > 10000, :]

    ## Drop unnecessary columns
    clean_data = clean_data.drop(["index", "Date","Temperature", "Fuel_Price", "MarkDown1", "MarkDown2", 
                                  "MarkDown3", "MarkDown4", "MarkDown5", "Type", "Size"], axis = 1)

    return clean_data
            

In [26]:
# Call the transform() function and pass the merged DataFrame
clean_data = transform(merged_df)
clean_data

Unnamed: 0,Store_ID,Dept,Weekly_Sales,IsHoliday,CPI,Unemployment,Month
0,1,1,24924.50,0,211.096358,8.106000,2.0
1,1,26,11737.12,0,211.096358,8.106000,2.0
2,1,17,13223.76,0,211.096358,8.106000,2.0
5,1,79,46729.77,0,211.096358,7.500052,2.0
6,1,55,21249.31,0,211.096358,7.500052,2.0
...,...,...,...,...,...,...,...
231513,24,40,45396.26,0,134.514367,8.212000,5.0
231515,24,93,41295.84,0,134.514367,8.212000,5.0
231516,24,9,24024.18,0,134.514367,8.212000,5.0
231517,24,8,49471.07,0,134.514367,8.212000,5.0
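
Note that `transform()` fills missing values with `inplace = True`, so it also modifies `merged_df`. The sketch below shows a variant that works on a copy and selects the wanted columns by name. The function name `transform_no_mutation`, the `min_sales` parameter, and the toy data are assumptions for illustration. It only imputes the three numeric columns that survive into `clean_data`.

```python
import pandas as pd

KEEP = ["Store_ID", "Month", "Dept", "IsHoliday", "Weekly_Sales", "CPI", "Unemployment"]


def transform_no_mutation(raw_data, min_sales=10000):
    """Hypothetical variant of transform() that leaves its input untouched."""
    df = raw_data.copy()
    # Mean-impute the numeric columns that are kept in the output
    fill_cols = ["Weekly_Sales", "CPI", "Unemployment"]
    df = df.fillna({col: df[col].mean() for col in fill_cols})
    # Extract the month number from the date column
    df["Month"] = pd.to_datetime(df["Date"]).dt.month
    # Filter on sales and keep only the target columns
    return df.loc[df["Weekly_Sales"] > min_sales, KEEP]


# Toy data (made up) with one missing value per imputed column
toy = pd.DataFrame({
    "Store_ID": [1, 1, 2],
    "Date": ["2010-02-05", "2010-03-12", "2010-03-19"],
    "Dept": [1, 2, 3],
    "IsHoliday": [0, 0, 1],
    "Weekly_Sales": [20000.0, 500.0, None],
    "CPI": [210.0, None, 212.0],
    "Unemployment": [8.0, 7.0, None],
})
result = transform_no_mutation(toy)
print(result)
```

The missing sale is imputed with the mean of the other two (10250.0), so that row passes the filter, which is a side effect of mean imputation worth keeping in mind when reading the aggregated results.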


In [28]:
# Create the avg_weekly_sales_per_month function that takes in the cleaned data from the last step
def avg_weekly_sales_per_month(clean_data):
    # Isolate the month and weekly_sales column and calculate the average weekly sales per month
    clean_data = clean_data[["Month", "Weekly_Sales"]]
    agg_sales = clean_data.groupby(by = ["Month"]).agg("mean").reset_index().round(2)
    agg_sales = agg_sales.rename(columns = {"Weekly_Sales" : "Avg_Sales"})
    return agg_sales

In [30]:
# Call the avg_weekly_sales_per_month() function and pass the cleaned DataFrame
agg_data = avg_weekly_sales_per_month(clean_data)
agg_data

Unnamed: 0,Month,Avg_Sales
0,1.0,33174.18
1,2.0,34333.33
2,3.0,33220.89
3,4.0,33392.37
4,5.0,33339.89
5,6.0,34582.47
6,7.0,33922.76
7,8.0,33644.79
8,9.0,33258.05
9,10.0,32736.99
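
The group-then-rename step can also be written with pandas named aggregation, which produces the `Avg_Sales` column directly. This is a sketch on made-up numbers and is only an alternative spelling, not the notebook's own code.

```python
import pandas as pd

# Toy data (made up): two weeks in month 1, one week in month 2
toy = pd.DataFrame({
    "Month": [1, 1, 2],
    "Weekly_Sales": [10.0, 20.0, 40.0],
})

# Named aggregation: output column name = (input column, aggregation)
agg = (
    toy.groupby("Month")
    .agg(Avg_Sales=("Weekly_Sales", "mean"))
    .reset_index()
    .round(2)
)
print(agg)
```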


In [32]:
# Create the load() function that takes in the cleaned DataFrame and the aggregated one with the paths where they are going to be stored
def load(full_data, full_data_file_path, agg_data, agg_data_file_path):
    # Save each DataFrame to the path supplied by the caller
    full_data.to_csv(full_data_file_path, index = False)
    agg_data.to_csv(agg_data_file_path, index = False)

In [34]:
# Call the load() function and pass the cleaned and aggregated DataFrames with their paths
load(clean_data, "clean_data.csv", agg_data, "agg_data.csv")

In [36]:
# Create the validation() function with one parameter: file_path - to check whether the previous function was correctly executed
def validation(file_path):
    # Print the path if the file exists; otherwise fail with a descriptive error
    if os.path.exists(file_path):
        print(f"{file_path}")
    else:
        raise FileNotFoundError(f"There is no file at the path {file_path}")

In [38]:
# Call the validation() function and pass first, the cleaned DataFrame path, and then the aggregated DataFrame path
validation("clean_data.csv")
validation("agg_data.csv")

clean_data.csv
agg_data.csv
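
An existence check passes even when a file was written empty. As a possible extension, the sketch below also flags zero-byte files. The helper name `missing_or_empty` is hypothetical, and the example runs in a temporary directory so it does not touch the real outputs.

```python
import os
import tempfile


def missing_or_empty(paths):
    """Return the paths that do not exist or are zero bytes long (hypothetical helper)."""
    return [p for p in paths if not os.path.isfile(p) or os.path.getsize(p) == 0]


with tempfile.TemporaryDirectory() as tmp:
    good = os.path.join(tmp, "clean_data.csv")
    empty = os.path.join(tmp, "agg_data.csv")
    absent = os.path.join(tmp, "not_written.csv")

    # One file with content, one empty file, and one that is never created
    with open(good, "w") as fh:
        fh.write("Month,Avg_Sales\n")
    open(empty, "w").close()

    problems = [os.path.basename(p) for p in missing_or_empty([good, empty, absent])]

print(problems)
```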
