![walmartecomm](walmartecomm.jpg)

Walmart is the largest retailer in the United States. Like other retailers, it has been expanding the e-commerce part of its business. By the end of 2022, e-commerce accounted for $80 billion in sales, or 13% of Walmart's total sales. One of the main factors that affects its sales is public holidays, such as the Super Bowl, Labour Day, Thanksgiving, and Christmas.

In this project, you have been tasked with creating a data pipeline for analyzing supply and demand around the holidays, along with conducting a preliminary analysis of the data. You will be working with two data sources: grocery sales and complementary data. You have been provided with the `grocery_sales` table in a `PostgreSQL` database, which has the following features:

# `grocery_sales`
- `"index"` - unique ID of the row
- `"Store_ID"` - the store number
- `"Date"` - the week of sales
- `"Weekly_Sales"` - sales for the given store

Also, you have the `extra_data.parquet` file that contains complementary data:

# `extra_data.parquet`
- `"IsHoliday"` - whether the week contains a public holiday (1 if yes, 0 if no)
- `"Temperature"` - temperature on the day of sale
- `"Fuel_Price"` - cost of fuel in the region
- `"CPI"` - prevailing consumer price index
- `"Unemployment"` - prevailing unemployment rate
- `"MarkDown1"`, `"MarkDown2"`, `"MarkDown3"`, `"MarkDown4"` - number of promotional markdowns
- `"Dept"` - department number in each store
- `"Size"` - size of the store
- `"Type"` - type of the store (depends on the `Size` column)

You will need to merge these two data sources and perform some data manipulations. The transformed DataFrame should then be stored as the `clean_data` variable containing the following columns:
- `"Store_ID"`
- `"Month"`
- `"Dept"`
- `"IsHoliday"`
- `"Weekly_Sales"`
- `"CPI"`
- `"Unemployment"`
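
As a rough illustration of the column list above, the sketch below derives `Month` from `Date` and keeps only the seven requested columns. The helper name `select_clean_columns`, the constant `CLEAN_COLUMNS`, and the two sample rows are made up for this example; it does not handle missing values or any filtering, which the project may also require.

```python
import pandas as pd

# Columns requested for clean_data in the task description
CLEAN_COLUMNS = [
    "Store_ID", "Month", "Dept", "IsHoliday",
    "Weekly_Sales", "CPI", "Unemployment",
]


def select_clean_columns(merged: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical helper: add a Month column and keep the clean_data columns."""
    df = merged.copy()  # leave the caller's DataFrame untouched
    # errors="coerce" turns missing or unparseable dates into NaT
    df["Month"] = pd.to_datetime(df["Date"], errors="coerce").dt.month
    return df[CLEAN_COLUMNS]


# Two made-up rows shaped like the merged data (values are illustrative only)
sample = pd.DataFrame({
    "index": [0, 1],
    "Store_ID": [1, 1],
    "Date": ["2010-02-05", "2010-03-12"],
    "Dept": [45, 28],
    "Weekly_Sales": [37.44, 1085.29],
    "IsHoliday": [0, 0],
    "Temperature": [42.31, 42.31],
    "CPI": [211.096358, 211.096358],
    "Unemployment": [8.1, 8.1],
})

clean_sample = select_clean_columns(sample)
print(clean_sample.columns.tolist())
```

Selecting with an explicit list also fixes the column order, which can make the saved CSV easier to compare against the expected layout.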

After merging and cleaning the data, you will have to analyze monthly sales of Walmart and store the results of your analysis as the `agg_data` variable that should look like:

|  Month | Weekly_Sales  | 
|---|---|
| 1.0  |  33174.178494 |
|  2.0 |  34333.326579 |
|  ... | ...  |  

Finally, you should save `clean_data` and `agg_data` as CSV files.

It is recommended to use `pandas` for this project. 

In [72]:
import pandas as pd
import os

# Extract function is already implemented for you 
def extract(store_data, extra_data):
    extra_df = pd.read_parquet(extra_data)
    merged_df = store_data.merge(extra_df, on = "index")
    return merged_df

# Load the sales data, then call the extract() function and store the result as the "merged_df" variable
grocery_sales = pd.read_csv("grocery_sales.csv")
merged_df = extract(grocery_sales, "extra_data.parquet")
# Collect the rows that have at least one missing value for inspection
is_na_df = merged_df[merged_df.isna().any(axis=1)]
merged_df.info()
is_na_df.head()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 231522 entries, 0 to 231521
Data columns (total 18 columns):
 #   Column        Non-Null Count   Dtype  
---  ------        --------------   -----  
 0   Unnamed: 0    231522 non-null  int64  
 1   index         231522 non-null  int64  
 2   Store_ID      231522 non-null  int64  
 3   Date          231483 non-null  object 
 4   Dept          231522 non-null  int64  
 5   Weekly_Sales  231484 non-null  float64
 6   IsHoliday     231522 non-null  int64  
 7   Temperature   231522 non-null  float64
 8   Fuel_Price    231522 non-null  float64
 9   MarkDown1     231522 non-null  float64
 10  MarkDown2     231522 non-null  float64
 11  MarkDown3     231522 non-null  float64
 12  MarkDown4     231521 non-null  float64
 13  MarkDown5     231521 non-null  float64
 14  CPI           231475 non-null  float64
 15  Unemployment  231485 non-null  float64
 16  Type          231521 non-null  float64
 17  Size          231521 non-null  float64
dtypes: f

Unnamed: 0.1,Unnamed: 0,index,Store_ID,Date,Dept,Weekly_Sales,IsHoliday,Temperature,Fuel_Price,MarkDown1,MarkDown2,MarkDown3,MarkDown4,MarkDown5,CPI,Unemployment,Type,Size
3,3,3,1,2010-02-05,45,37.44,0,42.31,2.572,0.0,0.0,0.0,0.0,0.0,211.096358,,3.0,151315.0
4,4,4,1,2010-02-05,28,1085.29,0,42.31,2.572,0.0,0.0,0.0,0.0,0.0,211.096358,,3.0,151315.0
5,5,5,1,2010-02-05,79,46729.77,0,42.31,2.572,0.0,0.0,0.0,0.0,0.0,211.096358,,3.0,151315.0
6,6,6,1,2010-02-05,55,21249.31,0,42.31,2.572,0.0,0.0,0.0,0.0,0.0,211.096358,,3.0,151315.0
7,7,7,1,2010-02-05,5,32229.38,0,42.31,2.572,0.0,0.0,0.0,0.0,0.0,211.096358,,3.0,151315.0
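
The non-null counts above imply that a few columns contain gaps. One way to see this at a glance, sketched here on a small made-up frame rather than the real `merged_df`, is to count missing values per column and keep only the columns that have any:

```python
import pandas as pd

# Made-up sample with a few gaps; the real data would be merged_df
sample = pd.DataFrame({
    "Date": ["2010-02-05", None, "2010-02-12"],
    "Weekly_Sales": [37.44, 1085.29, None],
    "CPI": [211.096358, 211.096358, 211.242170],
})

# Count missing values in each column
missing_per_column = sample.isna().sum()

# Keep only the columns that actually have missing values
missing_only = missing_per_column[missing_per_column > 0]
print(missing_only)
```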


In [73]:
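# Create the transform() function with one parameter: "raw_data"
def transform(raw_data: pd.DataFrame):
    # Fill missing values in the numeric columns with 0
    values = {
        "Weekly_Sales": 0.00,
        "CPI": 0.00,
        "Unemployment": 0.00,
    }
    # fillna() returns a new DataFrame, so the caller's data is not modified
    raw_data = raw_data.fillna(value=values)

    # Extract the month number from the "Date" column
    raw_data["Month"] = pd.to_datetime(raw_data["Date"]).dt.month

    # Keep only the rows with weekly sales above 10,000
    raw_data = raw_data[raw_data["Weekly_Sales"] > 10000]

    # Keep only the columns used in the monthly aggregation
    col_to_keep = ["Month", "Weekly_Sales"]
    return raw_data.loc[:, col_to_keep]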

In [74]:
# Call the transform() function and pass the merged DataFrame
clean_data = transform(merged_df)
clean_data.head()

Unnamed: 0,Month,Weekly_Sales
0,2.0,24924.5
1,2.0,11737.12
2,2.0,13223.76
5,2.0,46729.77
6,2.0,21249.31


In [75]:
# Create the avg_weekly_sales_per_month function that takes in the cleaned data from the last step
def avg_weekly_sales_per_month(clean_data):
    # Average the weekly sales for each month, rounded to two decimals
    avg_sales_df = (
        clean_data
        .groupby("Month")["Weekly_Sales"]
        .agg("mean")
        .reset_index()
        .round(2)
    )
    return avg_sales_df

In [76]:
# Call the avg_weekly_sales_per_month() function and pass the cleaned DataFrame
avg_weekly_sales_df = avg_weekly_sales_per_month(clean_data)
avg_weekly_sales_df.head()

Unnamed: 0,Month,Weekly_Sales
0,1.0,33174.18
1,2.0,34342.44
2,3.0,33227.31
3,4.0,33414.78
4,5.0,33339.89


In [None]:
# Create the load() function that takes in the cleaned DataFrame and the aggregated one with the paths where they are going to be stored
def load(full_data, full_data_file_path, agg_data, agg_data_file_path):
    # Write your code here
    pass

In [None]:
# Call the load() function and pass the cleaned and aggregated DataFrames with their paths    
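
The `load()` stub above is left unimplemented. A minimal sketch of one way it might work, assuming both DataFrames are written with `DataFrame.to_csv` and without the row index, is shown below. The name `load_sketch`, the sample frames, and the file names `clean_data.csv` and `agg_data.csv` are assumptions for illustration; the project does not specify the output paths.

```python
import os
import tempfile

import pandas as pd


def load_sketch(full_data, full_data_file_path, agg_data, agg_data_file_path):
    """Hypothetical load step: write both DataFrames to CSV files."""
    # index=False keeps the pandas row index out of the files
    full_data.to_csv(full_data_file_path, index=False)
    agg_data.to_csv(agg_data_file_path, index=False)


# Made-up stand-ins for clean_data and agg_data
clean_sample = pd.DataFrame(
    {"Month": [1, 1, 2], "Weekly_Sales": [100.0, 300.0, 500.0]}
)
agg_sample = clean_sample.groupby("Month")["Weekly_Sales"].mean().reset_index()

# Write into a temporary directory so the example leaves the project folder alone
out_dir = tempfile.mkdtemp()
clean_path = os.path.join(out_dir, "clean_data.csv")  # assumed file name
agg_path = os.path.join(out_dir, "agg_data.csv")      # assumed file name
load_sketch(clean_sample, clean_path, agg_sample, agg_path)

# Read one file back to confirm the round trip
reloaded = pd.read_csv(agg_path)
print(reloaded)
```

Writing with `index=False` avoids the extra `Unnamed: 0` column that appears when a CSV saved with its index is read back.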

In [None]:
# Create the validation() function with one parameter: file_path - to check whether the previous function was correctly executed
def validation(file_path):
    # Write your code here
    pass

In [None]:
# Call the validation() function and pass first, the cleaned DataFrame path, and then the aggregated DataFrame path
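
The `validation()` stub is also left open. One possible reading of "correctly executed" is that a non-empty file exists at the given path; the sketch below checks exactly that with `os.path`. The name `validation_sketch`, the return-a-boolean design, and the file names are assumptions, and the project may expect a different check, such as raising an exception.

```python
import os
import tempfile


def validation_sketch(file_path):
    """Hypothetical check: True if a non-empty file exists at file_path."""
    return os.path.isfile(file_path) and os.path.getsize(file_path) > 0


# Create one small file and leave the other path missing on purpose
tmp_dir = tempfile.mkdtemp()
existing = os.path.join(tmp_dir, "clean_data.csv")  # assumed file name
with open(existing, "w") as fh:
    fh.write("Month,Weekly_Sales\n2,24924.5\n")
missing = os.path.join(tmp_dir, "agg_data.csv")     # never written

result_existing = validation_sketch(existing)
result_missing = validation_sketch(missing)
print(result_existing, result_missing)
```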
