# Liquor Sales Data Analysis

## Objective

In this case study, you will learn the principles of hands-on data processing and analysis using
a detailed dataset on liquor sales spanning 2020 to 2025. This assignment aims to give you
practical experience in processing large datasets using tech stacks such as AWS RDS, HBase,
and Hadoop MapReduce. You will not only apply the techniques covered in the project modules,
but also gain insights into the complexities of business data analytics. At the end of this
assignment, you will have developed a robust understanding of the following:

* Data ingestion processes using cloud-based tools like AWS RDS and HBase.
* Data cleaning and preparation to ensure high-quality analysis.
* Applying MapReduce to solve real-world analytics problems.
* Creating actionable insights and recommendations based on the analysis.

This hands-on approach will bridge the gap between theoretical learning and practical implementation,
preparing you for real-world challenges in the field of data analytics.



## Business Objective
The liquor industry is a significant contributor to the retail economy, particularly in regions where
sales are highly regulated and tracked. For liquor businesses, understanding sales trends is vital to
maintaining competitive advantage, meeting customer demand, and ensuring efficient operations.
As an analyst, you are tasked with analyzing detailed liquor sales data from 2020 to 2025 to uncover
patterns and insights that can drive strategic decision-making. The objective is to identify trends in
consumer preferences, regional sales performance, and product popularity, enabling stakeholders to
optimize inventory management, boost profitability, and enhance customer satisfaction.

## Hadoop and MapReduce Assignment Tasks:

<br> Data Cleaning Tasks: </br>
<br>Task 1:  Data Cleaning</br>

<br> Data Ingestion Tasks: </br>
<br>Task 2: Upload Liquor Sales Data to AWS RDS</br>
<br>Task 3: Ingest Data into HBase</br>


<br>Data Analysis Using MapReduce:</br>
<br>Task 4: Total Revenue by Store</br>
<br>Task 5: Top-Selling Liquor Categories</br>
<br>Task 6: County-Level Sales Analysis</br>
<br>Task 7: Store Performance Analysis</br>
<br>Task 8: Trends in Liquor Sales Over Time</br>
<br>Task 9: Vendor Performance</br>


**NOTE:** The marks shown next to headings and sub-headings are cumulative totals for that section and are given for your reference only. For example, the marks shown with heading *2* or sub-heading *2.1* are cumulative.<br>

The marks you actually earn are specified with each individual task.

For example, suppose two tasks are marked as follows:
* 2.1.1 [3 marks]
* 3.2.2 [2 marks]

Then you will earn 3 marks for completing task 2.1.1 and 2 marks for completing task 3.2.2.

---

## Data Understanding
The dataset can be downloaded from this [link](https://liquor-data.s3.us-east-1.amazonaws.com/Liquor_Sales.csv).
The dataset contains liquor sales data from multiple stores across various states, providing rich information for analysis. The fields are as follows:


| Variable              | Class            | Description                                                     |
|-----------------------|------------------|-----------------------------------------------------------------|
| Invoice/Item Number   | String/Integer   | Unique identifier for each sale.                                |
| Date                  | Date             | The date of the sale.                                           |
| Store Number          | Integer          | Unique identifier for the store.                                |
| Store Name            | String           | Name of the store.                                              |
| Address               | String           | Store address.                                                  |
| City                  | String           | City where the store is located.                                |
| Zip Code              | String/Integer   | ZIP code of the store location.                                 |
| Store Location        | String/GeoPoint  | GPS coordinates of the store.                                   |
| County Number         | Integer          | Unique identifier for the county.                               |
| County                | String           | Name of the county.                                             |
| Category              | Integer          | Liquor category code.                                           |
| Category Name         | String           | Name of the liquor category (e.g., Whiskey, Vodka).             |
| Vendor Number         | Integer          | Vendor's unique identifier.                                     |
| Vendor Name           | String           | Name of the vendor/distributor.                                 |
| Item Number           | Integer          | Product's unique identifier.                                    |
| Item Description      | String           | Description of the liquor product.                              |
| Pack                  | Integer          | Number of bottles in a pack.                                    |
| Bottle Volume (ml)    | Float/Integer    | Volume of a single bottle in milliliters.                       |
| State Bottle Cost     | Float            | Cost per bottle for the state.                                  |
| State Bottle Retail   | Float            | Retail price per bottle.                                        |
| Bottles Sold          | Integer          | Number of bottles sold.                                         |
| Sale (Dollars)        | Float            | Total revenue from the sale.                                    |
| Volume Sold (Liters)  | Float            | Volume sold in liters.                                          |
| Volume Sold (Gallons) | Float            | Volume sold in gallons.                                         |
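
The MapReduce jobs in section 3 read fields by position rather than by name, so it can help to derive each column's zero-based index from the table above. The following is a minimal sketch, assuming the file's columns appear in exactly the order listed in the table; `COLUMNS` and `column_index` are illustrative names, not part of the dataset or of any library.

```python
# Column names in the order given by the data dictionary above
# (assumption: the data file uses this same column order).
COLUMNS = [
    "Invoice/Item Number", "Date", "Store Number", "Store Name", "Address",
    "City", "Zip Code", "Store Location", "County Number", "County",
    "Category", "Category Name", "Vendor Number", "Vendor Name",
    "Item Number", "Item Description", "Pack", "Bottle Volume (ml)",
    "State Bottle Cost", "State Bottle Retail", "Bottles Sold",
    "Sale (Dollars)", "Volume Sold (Liters)", "Volume Sold (Gallons)",
]

# Hypothetical helper: map each column name to its zero-based position.
column_index = {name: position for position, name in enumerate(COLUMNS)}

# Print the positions of the fields used by the MapReduce jobs.
for name in ("Date", "Store Name", "County", "Category Name", "Vendor Name",
             "Bottles Sold", "Sale (Dollars)", "Volume Sold (Liters)",
             "Volume Sold (Gallons)"):
    print(f"{name}: {column_index[name]}")
```

If the actual file has a different column order, the indices used in the mappers would need to be adjusted to match.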


### Import Libraries and Load Dataset

In [None]:
!pip install mrjob==0.7.4

In [None]:
# Import the libraries you will be using for analysis
import pandas as pd
import numpy as np
from mrjob.job import MRJob
import csv
from sqlalchemy import create_engine
import subprocess
import zipfile
import os
import requests

In [None]:
# Define the URL and file paths
url = 'https://kh3-ls-storage.s3.us-east-1.amazonaws.com/UPGrad/Liquor_Sales.zip'
zip_file_path = '/mnt/data/Liquor_Sales.zip'
extracted_folder_path = '/mnt/data/'
excel_file_name = 'Liquor_Sales.xlsx'
#cleaned_file_path = '/mnt/data/cleaned_liquor_sales.xlsx'

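# Download the ZIP file
os.makedirs(extracted_folder_path, exist_ok=True)  # make sure the target folder exists
response = requests.get(url, timeout=60)
response.raise_for_status()  # stop here if the download failed
with open(zip_file_path, 'wb') as file:
    file.write(response.content)
print("ZIP file downloaded successfully.")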

# Unzip the file
with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
    zip_ref.extractall(extracted_folder_path)
print("ZIP file extracted successfully.")

# Load the data from the extracted Excel file
excel_file_path = os.path.join(extracted_folder_path, excel_file_name)
data = pd.read_excel(excel_file_path, sheet_name='Sheet1')

## **1** Data Cleaning
<font color = red>[5 marks]</font> <br>

#### **1.1** Fixing Columns

In [None]:
data.columns = [col.strip().replace(' ', '_').replace('/', '_') for col in data.columns]

# Display the updated column names
print("Updated Column Names:")
print(data.columns)

# Ensure correct data types based on the data dictionary.
# Nullable dtypes ('string', 'Int64') keep missing values as missing, so the cast
# neither fails on blank cells nor turns them into the literal text 'nan';
# the missing values themselves are handled in section 1.3.
data['Date'] = pd.to_datetime(data['Date'], errors='coerce')

string_columns = [
    'Invoice_Item_Number', 'Store_Name', 'Address', 'City', 'Zip_Code',
    'Store_Location', 'County', 'Category_Name', 'Vendor_Name', 'Item_Description',
]
integer_columns = [
    'Store_Number', 'County_Number', 'Category', 'Vendor_Number',
    'Item_Number', 'Pack', 'Bottles_Sold',
]
float_columns = [
    'Bottle_Volume_(ml)', 'State_Bottle_Cost', 'State_Bottle_Retail',
    'Sale_(Dollars)', 'Volume_Sold_(Liters)', 'Volume_Sold_(Gallons)',
]

for col in string_columns:
    data[col] = data[col].astype('string')
for col in integer_columns:
    data[col] = pd.to_numeric(data[col], errors='coerce').astype('Int64')
for col in float_columns:
    data[col] = pd.to_numeric(data[col], errors='coerce').astype(float)

# Remove rows with invalid dates
data.dropna(subset=['Date'], inplace=True)

# Standardize categorical columns
data['City'] = data['City'].str.title()
data['County'] = data['County'].str.title()
data['Store_Name'] = data['Store_Name'].str.title()
data['Category_Name'] = data['Category_Name'].str.title()
data['Vendor_Name'] = data['Vendor_Name'].str.title()

#### **1.2** Fixing Rows

In [1]:
# Remove duplicate records
rows_before_dedup = data.shape[0]
data.drop_duplicates(inplace=True)
rows_after_dedup = data.shape[0]

print(f"Rows before removing duplicates: {rows_before_dedup}")
print(f"Rows after removing duplicates: {rows_after_dedup}")


#### **1.3** Handling Missing Values

In [2]:
# Report missing values per column before dropping anything
print("Missing Values Before Removal:")
print(data.isnull().sum())

# Remove rows with missing or incomplete values
initial_row_count = data.shape[0]
data.dropna(inplace=True)
final_row_count = data.shape[0]

print(f"Rows before removing missing values: {initial_row_count}")
print(f"Rows after removing missing values: {final_row_count}")

# Confirm that no missing values remain
missing_values = data.isnull().sum()
print("Missing Values After Cleaning:")
print(missing_values)


#### **1.4** Handling Outliers

In [None]:
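# Identify numerical columns, excluding identifier codes (store, county, category,
# vendor, and item numbers), where an IQR-based filter is not meaningful
id_columns = ['Store_Number', 'County_Number', 'Category', 'Vendor_Number', 'Item_Number']
numerical_columns = data.select_dtypes(include='number').columns.difference(id_columns)

# Apply IQR method to each numerical column.
# Note: the filter is applied sequentially, so each column's bounds are computed
# on the rows that survived the filters for the previous columns.
for col in numerical_columns:
    Q1 = data[col].quantile(0.25)
    Q3 = data[col].quantile(0.75)
    IQR = Q3 - Q1
    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR
    initial_outlier_count = data.shape[0]
    data = data[(data[col] >= lower_bound) & (data[col] <= upper_bound)]
    final_outlier_count = data.shape[0]
    print(f"Column: {col}")
    print(f"Rows before removing outliers: {initial_outlier_count}")
    print(f"Rows after removing outliers: {final_outlier_count}")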
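
To make the IQR rule used above concrete, here is a small worked example in plain Python. It is only a sketch: the sale amounts are made up, and `quantile` and `iqr_bounds` are hypothetical helpers written for this illustration. The quantile uses linear interpolation between neighbouring values, which is also the default behaviour of pandas' `quantile`.

```python
def quantile(sorted_values, q):
    """Linearly interpolated quantile of an already sorted list (0 <= q <= 1)."""
    position = (len(sorted_values) - 1) * q
    lower = int(position)
    upper = min(lower + 1, len(sorted_values) - 1)
    fraction = position - lower
    return sorted_values[lower] + (sorted_values[upper] - sorted_values[lower]) * fraction


def iqr_bounds(values):
    """Return the (lower, upper) fences of the 1.5 * IQR rule."""
    ordered = sorted(values)
    q1 = quantile(ordered, 0.25)
    q3 = quantile(ordered, 0.75)
    iqr = q3 - q1
    return q1 - 1.5 * iqr, q3 + 1.5 * iqr


# Made-up sale amounts; 500.0 is the deliberate outlier.
sales = [10.0, 12.0, 11.0, 13.0, 12.0, 14.0, 500.0]
low, high = iqr_bounds(sales)
kept = [amount for amount in sales if low <= amount <= high]

print(f"Lower fence: {low}, upper fence: {high}")
print(f"Values kept: {kept}")
```

Here Q1 is 11.5 and Q3 is 13.5, so the fences are 8.5 and 16.5 and only the 500.0 sale is dropped.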

## **2** Data Ingestion Tasks
<font color = red>[15 marks]</font> <br>

#### 2.1 Upload Liquor Sales Data to AWS RDS
<font color = red>[5 marks]</font> <br>

In [None]:
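from urllib.parse import quote_plus

# Define AWS RDS connection details (replace the placeholders with your own values)
rds_host = 'your_rds_host'
rds_port = 'your_rds_port'
rds_dbname = 'your_rds_dbname'
rds_user = 'your_rds_user'
rds_password = 'your_rds_password'

# Create a connection to AWS RDS; quote_plus escapes special characters
# (such as '@' or '/') in the credentials so the connection URL stays valid
engine = create_engine(
    f'mysql+pymysql://{quote_plus(rds_user)}:{quote_plus(rds_password)}'
    f'@{rds_host}:{rds_port}/{rds_dbname}'
)

# Upload data to AWS RDS in batches rather than as one very large insert
data.to_sql('liquor_sales', con=engine, if_exists='replace', index=False, chunksize=10000)
print("Data uploaded to AWS RDS successfully.")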

#### 2.2 Ingest Data into HBase
<font color = red>[10 marks]</font> <br>

In [None]:
# Define HBase schema
hbase_table = 'liquor_sales'
column_family = 'cf'

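# Use Apache Sqoop to transfer data from AWS RDS to HBase.
# --hbase-create-table creates the target table if it does not exist yet.
# -m 1 runs a single mapper, which Sqoop needs when the source table has no
# primary key (DataFrame.to_sql does not create one) and no --split-by column is given.
sqoop_command = f"""
sqoop import \
--connect jdbc:mysql://{rds_host}:{rds_port}/{rds_dbname} \
--username {rds_user} \
--password {rds_password} \
--table liquor_sales \
--hbase-table {hbase_table} \
--column-family {column_family} \
--hbase-row-key Invoice_Item_Number \
--hbase-create-table \
-m 1
"""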

# Execute the Sqoop command
subprocess.run(sqoop_command, shell=True, check=True)
print("Data ingested to HBase successfully.")

## **3** Analytics Queries using MapReduce
<font color = red>[60 marks]</font> <br>

#### 3.1 Total Revenue by Store
<font color = red>[10 marks]</font> <br>

In [None]:
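class MRTotalRevenueByStore(MRJob):

    # Mapper: emit (store name, sale amount) for each record
    def mapper(self, _, line):
        # csv.reader keeps quoted fields that contain commas intact
        fields = next(csv.reader([line]))
        try:
            store_name = fields[3]
            sale_dollars = float(fields[21])
        except (IndexError, ValueError):
            return  # skip the header row and malformed records
        yield store_name, sale_dollars

    # Reducer: sum the sale amounts for each store
    def reducer(self, store_name, sales):
        yield store_name, sum(sales)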

if __name__ == '__main__':
    MRTotalRevenueByStore.run()
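
The map, shuffle, and reduce phases of the job above can be traced on a few records without a cluster. The sketch below is a plain-Python simulation, not mrjob itself: `make_line`, `map_record`, and `run_job` are hypothetical helpers, and the store names and amounts are made up. It also shows why splitting a line on commas is fragile when a field such as the store name contains a comma.

```python
import csv
import io
from collections import defaultdict


def make_line(store_name, sale_dollars):
    """Build one CSV line in the 24-column layout (other fields left empty)."""
    row = [""] * 24
    row[3] = store_name
    row[21] = sale_dollars
    buffer = io.StringIO()
    csv.writer(buffer).writerow(row)
    return buffer.getvalue().rstrip("\r\n")


def map_record(line):
    """Map phase: emit (store name, sale amount) for one CSV line."""
    fields = next(csv.reader([line]))
    yield fields[3], float(fields[21])


def run_job(lines):
    """Simulate shuffle (group by key) and reduce (sum per key) in memory."""
    grouped = defaultdict(list)
    for line in lines:
        for key, value in map_record(line):
            grouped[key].append(value)
    return {key: sum(values) for key, values in grouped.items()}


# Made-up records; the first store name contains a comma.
lines = [
    make_line("Example Liquor, Inc.", "100.50"),
    make_line("Sample Spirits", "20.00"),
    make_line("Example Liquor, Inc.", "49.50"),
]
totals = run_job(lines)
print(totals)

# A naive split sees one field too many because of the comma in the name.
print(len(lines[0].split(",")), "fields via split,",
      len(next(csv.reader([lines[0]]))), "fields via csv.reader")
```

With a naive `split(',')`, every index after the store name would be shifted by one for the first record, so the sale amount would be read from the wrong column.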

#### 3.2 Top-Selling Categories
<font color = red>[10 marks]</font> <br>

In [None]:
class MRTopSellingLiquorCategories(MRJob):

    # Mapper: emit (category name, (bottles sold, sale amount)) for each record
    def mapper(self, _, line):
        # csv.reader keeps quoted fields that contain commas intact
        fields = next(csv.reader([line]))
        try:
            category_name = fields[11]
            bottles_sold = int(fields[20])
            sale_dollars = float(fields[21])
        except (IndexError, ValueError):
            return  # skip the header row and malformed records
        yield category_name, (bottles_sold, sale_dollars)

    # Reducer: total bottles sold and total sales for each category
    def reducer(self, category_name, values):
        total_bottles_sold = 0
        total_sales = 0
        for bottles_sold, sale_dollars in values:
            total_bottles_sold += bottles_sold
            total_sales += sale_dollars
        yield category_name, (total_bottles_sold, total_sales)

if __name__ == '__main__':
    MRTopSellingLiquorCategories.run()
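
The job above produces one total per category but does not rank them. One possible way to pick the top sellers is a small post-processing step over the job output, sketched below with `heapq.nlargest`. The category names and figures are invented for illustration, and `top_categories` is a hypothetical helper, not part of mrjob.

```python
import heapq

# Made-up reducer output: category -> (total bottles sold, total sales in dollars)
category_totals = {
    "Category A": (1200, 18000.0),
    "Category B": (3400, 25500.0),
    "Category C": (800, 30250.0),
    "Category D": (2100, 9900.0),
}


def top_categories(totals, n, by="bottles"):
    """Return the n largest (category, totals) pairs by bottles or by sales."""
    position = 0 if by == "bottles" else 1
    return heapq.nlargest(n, totals.items(), key=lambda item: item[1][position])


top_by_bottles = top_categories(category_totals, 2, by="bottles")
top_by_sales = top_categories(category_totals, 2, by="sales")

print("Top 2 by bottles sold:", [name for name, _ in top_by_bottles])
print("Top 2 by sales:", [name for name, _ in top_by_sales])
```

Ranking by bottles and ranking by revenue can disagree, as in this made-up data, so it is worth stating which measure "top-selling" refers to.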

#### 3.3 County-Level Sales Analysis
<font color = red>[10 marks]</font> <br>

In [None]:
class MRCountyLevelSalesAnalysis(MRJob):

    # Mapper: emit (county, (sales, liters, gallons)) for each record
    def mapper(self, _, line):
        # csv.reader keeps quoted fields that contain commas intact
        fields = next(csv.reader([line]))
        try:
            county = fields[9]
            sale_dollars = float(fields[21])
            volume_sold_liters = float(fields[22])
            volume_sold_gallons = float(fields[23])
        except (IndexError, ValueError):
            return  # skip the header row and malformed records
        yield county, (sale_dollars, volume_sold_liters, volume_sold_gallons)

    # Reducer: total sales and volumes for each county
    def reducer(self, county, values):
        total_sales = 0
        total_volume_liters = 0
        total_volume_gallons = 0
        for sale_dollars, volume_sold_liters, volume_sold_gallons in values:
            total_sales += sale_dollars
            total_volume_liters += volume_sold_liters
            total_volume_gallons += volume_sold_gallons
        yield county, (total_sales, total_volume_liters, total_volume_gallons)


if __name__ == '__main__':
    MRCountyLevelSalesAnalysis.run()

#### 3.4 Store Performance Analysis
<font color = red>[10 marks]</font> <br>

In [None]:
class MRStorePerformanceAnalysis(MRJob):

    # Mapper: emit (store name, (sale amount, liters sold)) for each record
    def mapper(self, _, line):
        # csv.reader keeps quoted fields that contain commas intact
        fields = next(csv.reader([line]))
        try:
            store_name = fields[3]
            sale_dollars = float(fields[21])
            volume_sold_liters = float(fields[22])
        except (IndexError, ValueError):
            return  # skip the header row and malformed records
        yield store_name, (sale_dollars, volume_sold_liters)

    # Reducer: total sales, total volume, and average sale per transaction
    def reducer(self, store_name, values):
        total_sales = 0
        total_volume = 0
        count = 0
        for sale_dollars, volume_sold_liters in values:
            total_sales += sale_dollars
            total_volume += volume_sold_liters
            count += 1
        avg_sales_per_transaction = total_sales / count
        yield store_name, (total_sales, total_volume, avg_sales_per_transaction)


if __name__ == '__main__':
    MRStorePerformanceAnalysis.run()
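
The reducer above computes the average by counting transactions itself. If a combiner were added to reduce shuffle traffic (the job as written does not use one), it would need to forward partial (sum, count) pairs rather than partial averages, because averages of averages are generally wrong. The sketch below illustrates this in plain Python with made-up amounts; `combine` is a hypothetical helper.

```python
def combine(partials):
    """Merge (total_sales, transaction_count) pairs into a single pair."""
    total = 0.0
    count = 0
    for partial_total, partial_count in partials:
        total += partial_total
        count += partial_count
    return total, count


# Made-up sale amounts for one store, split across two mappers.
mapper_one = [10.0, 20.0, 30.0]
mapper_two = [100.0]

# Each "combiner" turns its mapper's values into one (sum, count) pair.
partial_one = combine((amount, 1) for amount in mapper_one)
partial_two = combine((amount, 1) for amount in mapper_two)

# The "reducer" merges the pairs and divides only once, at the very end.
total, count = combine([partial_one, partial_two])
correct_average = total / count

# Averaging the per-mapper averages gives a different, incorrect result.
average_of_averages = (partial_one[0] / partial_one[1]
                       + partial_two[0] / partial_two[1]) / 2

print(f"Correct average: {correct_average}")
print(f"Average of averages: {average_of_averages}")
```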

#### 3.5 Trends in Liquor Sales Over Time
<font color = red>[10 marks]</font> <br>

In [None]:
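class MRLiquorSalesTrends(MRJob):

    # Mapper: emit (month, (sale amount, liters sold)) for each record
    def mapper(self, _, line):
        # csv.reader keeps quoted fields that contain commas intact
        fields = next(csv.reader([line]))
        try:
            date = fields[1]
            sale_dollars = float(fields[21])
            volume_sold_liters = float(fields[22])
        except (IndexError, ValueError):
            return  # skip the header row and malformed records
        month = date[:7]  # Extract YYYY-MM (assumes dates formatted as YYYY-MM-DD)
        yield month, (sale_dollars, volume_sold_liters)

    # Reducer: total sales and total volume for each month
    def reducer(self, month, values):
        total_sales = 0
        total_volume = 0
        for sale_dollars, volume_sold_liters in values:
            total_sales += sale_dollars
            total_volume += volume_sold_liters
        yield month, (total_sales, total_volume)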


if __name__ == '__main__':
    MRLiquorSalesTrends.run()
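
Slicing the first seven characters of the date only yields a `YYYY-MM` key when dates are stored as `YYYY-MM-DD`. If the input file uses another layout, the month key could be derived by parsing the date instead. The following is a minimal sketch: the list of candidate formats is an assumption and should be adjusted to the actual file, and `month_key` is a hypothetical helper.

```python
from datetime import datetime

# Candidate date layouts (assumptions; adjust to match the actual input file).
DATE_FORMATS = ("%Y-%m-%d", "%m/%d/%Y", "%Y-%m-%d %H:%M:%S")


def month_key(raw_date):
    """Return 'YYYY-MM' for a recognised date string, or None otherwise."""
    for date_format in DATE_FORMATS:
        try:
            parsed = datetime.strptime(raw_date.strip(), date_format)
        except ValueError:
            continue  # try the next candidate format
        return parsed.strftime("%Y-%m")
    return None


print(month_key("2021-03-15"))
print(month_key("03/15/2021"))
print(month_key("2021-03-15 00:00:00"))
print(month_key("Date"))  # a header cell is not a date
```

A mapper could call such a helper and skip the record when it returns `None`, which would also filter out the header row.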

#### 3.6 Vendor Performance
<font color = red>[10 marks]</font> <br>

In [None]:
class MRVendorPerformance(MRJob):

    # Mapper: emit (vendor name, (sale amount, liters sold)) for each record
    def mapper(self, _, line):
        # csv.reader keeps quoted fields that contain commas intact
        fields = next(csv.reader([line]))
        try:
            vendor_name = fields[13]
            sale_dollars = float(fields[21])
            volume_sold_liters = float(fields[22])
        except (IndexError, ValueError):
            return  # skip the header row and malformed records
        yield vendor_name, (sale_dollars, volume_sold_liters)

    # Reducer: total sales and total volume for each vendor
    def reducer(self, vendor_name, values):
        total_sales = 0
        total_volume = 0
        for sale_dollars, volume_sold_liters in values:
            total_sales += sale_dollars
            total_volume += volume_sold_liters
        yield vendor_name, (total_sales, total_volume)


if __name__ == '__main__':
    MRVendorPerformance.run()