# Liquor Sales Data Analysis

## Objective

In this case study, you will learn the principles of hands-on data processing and analysis using
a detailed dataset on liquor sales spanning 2020 to 2025. This assignment aims to give you
practical experience in processing large datasets using tech stacks such as AWS RDS, HBase,
and Hadoop MapReduce. You will not only apply the techniques covered in the project modules,
but also gain insights into the complexities of business data analytics. At the end of this
assignment, you will have developed a robust understanding of the following:

* Data ingestion processes using cloud-based tools like AWS RDS and HBase.
* Data cleaning and preparation to ensure high-quality analysis.
* Applying MapReduce to solve real-world analytics problems.
* Creating actionable insights and recommendations based on the analysis.

This hands-on approach will bridge the gap between theoretical learning and practical implementation,
preparing you for real-world challenges in the field of data analytics.



## Business Objective
The liquor industry is a significant contributor to the retail economy, particularly in regions where
sales are highly regulated and tracked. For liquor businesses, understanding sales trends is vital to
maintaining competitive advantage, meeting customer demand, and ensuring efficient operations.
As an analyst, you are tasked with analyzing detailed liquor sales data from 2020 to 2025 to uncover
patterns and insights that can drive strategic decision-making. The objective is to identify trends in
consumer preferences, regional sales performance, and product popularity, enabling stakeholders to
optimize inventory management, boost profitability, and enhance customer satisfaction.

## Hadoop and MapReduce Assignment Tasks:

**Data Cleaning Tasks:**
* Task 1: Data Cleaning

**Data Ingestion Tasks:**
* Task 2: Upload Liquor Sales Data to AWS RDS
* Task 3: Ingest Data into HBase

**Data Analysis Using MapReduce:**
* Task 4: Total Revenue by Store
* Task 5: Top-Selling Liquor Categories
* Task 6: County-Level Sales Analysis
* Task 7: Store Performance Analysis
* Task 8: Trends in Liquor Sales Over Time
* Task 9: Vendor Performance


**NOTE:** The marks shown next to headings and sub-headings are cumulative totals for that heading or sub-heading, given for your reference only. The marks you actually earn are specified within each task.

For example, the marks shown with heading *2* or sub-heading *2.1* are cumulative. If two tasks are marked as follows:
* 2.1.1 [3 marks]
* 3.2.2 [2 marks]

then you will earn 3 marks for completing task 2.1.1 and 2 marks for completing task 3.2.2.

---

## Data Understanding
The dataset can be downloaded from the following [link](https://liquor-data.s3.us-east-1.amazonaws.com/Liquor_Sales.csv).
The dataset contains liquor sales data from multiple stores across various states, providing rich information for analysis. The fields are as follows:


| Variable              | Class            | Description                                                     |
|-----------------------|------------------|-----------------------------------------------------------------|
| Invoice/Item Number   | String/Integer   | Unique identifier for each sale.                                |
| Date                  | Date             | The date of the sale.                                           |
| Store Number          | Integer          | Unique identifier for the store.                                |
| Store Name            | String           | Name of the store.                                              |
| Address               | String           | Store address.                                                  |
| City                  | String           | City where the store is located.                                |
| Zip Code              | String/Integer   | ZIP code of the store location.                                 |
| Store Location        | String/GeoPoint  | GPS coordinates of the store.                                   |
| County Number         | Integer          | Unique identifier for the county.                               |
| County                | String           | Name of the county.                                             |
| Category              | Integer          | Liquor category code.                                           |
| Category Name         | String           | Name of the liquor category (e.g., Whiskey, Vodka).             |
| Vendor Number         | Integer          | Vendor's unique identifier.                                     |
| Vendor Name           | String           | Name of the vendor/distributor.                                 |
| Item Number           | Integer          | Product's unique identifier.                                    |
| Item Description      | String           | Description of the liquor product.                              |
| Pack                  | Integer          | Number of bottles in a pack.                                    |
| Bottle Volume (ml)    | Float/Integer    | Volume of a single bottle in milliliters.                       |
| State Bottle Cost     | Float            | Cost per bottle for the state.                                  |
| State Bottle Retail   | Float            | Retail price per bottle.                                        |
| Bottles Sold          | Integer          | Number of bottles sold.                                         |
| Sale (Dollars)        | Float            | Total revenue from the sale.                                    |
| Volume Sold (Liters)  | Float            | Volume sold in liters.                                          |
| Volume Sold (Gallons) | Float            | Volume sold in gallons.                                         |


### Import Libraries and Load Dataset

In [1]:
!pip install mrjob==0.7.4

Collecting mrjob==0.7.4
  Downloading mrjob-0.7.4-py2.py3-none-any.whl.metadata (7.3 kB)
Downloading mrjob-0.7.4-py2.py3-none-any.whl (439 kB)
Installing collected packages: mrjob
Successfully installed mrjob-0.7.4


In [2]:
# Import the libraries you will be using for analysis
import pandas as pd
import numpy as np
from mrjob.job import MRJob
import csv

In [3]:
from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


In [7]:
# Load chunk of data into df
file_path = '/content/drive/MyDrive/Liquor_Sales.csv'
chunk_size = 10000

# Load only the first chunk
chunk_iter = pd.read_csv(file_path, chunksize=chunk_size)
df = next(chunk_iter)  # get the first 10,000 rows

# Preview
print(f"Loaded rows: {len(df)}")
df.head()

Loaded rows: 10000


Unnamed: 0,Invoice/Item Number,Date,Store Number,Store Name,Address,City,Zip Code,Store Location,County Number,County,...,Item Number,Item Description,Pack,Bottle Volume (ml),State Bottle Cost,State Bottle Retail,Bottles Sold,Sale (Dollars),Volume Sold (Liters),Volume Sold (Gallons)
0,S24127700024,02/19/2015,3678,"Smoke Shop, The",1918 SE 14TH ST,DES MOINES,50320,POINT (-93.597011 41.570844),77.0,Polk,...,41783,Uv Blue Raspberry Vodka Mini,6,500,4.89,7.34,2,14.68,1.0,0.26
1,S15066200002,10/10/2013,2633,Hy-Vee #3 / BDI / Des Moines,3221 SE 14TH ST,DES MOINES,50320,POINT (-93.596754 41.554101),77.0,Polk,...,904969,Sabe Premiom Sake Double Barrel,6,750,14.99,22.49,6,134.94,4.5,1.19
2,S19323500030,06/03/2014,2607,Hy-Vee Wine and Spirits / Shenandoah,520 SO FREMONT,SHENANDOAH,51601,POINT (-95.385111 40.761736),73.0,Page,...,45277,Paramount White Rum,12,1000,4.34,6.51,12,78.12,12.0,3.17
3,S23334500013,01/06/2015,4810,Kum & Go #518 / Ankeny,3603 NE OTTERVIEW CIRCLE,ANKENY,50021,POINT (-93.572458 41.760989),77.0,Polk,...,43121,Bacardi Superior Rum Mini,12,500,5.54,8.31,1,8.31,0.5,0.13
4,S09742200010,12/27/2012,4025,Karam Kaur Khasriya Llc,702 13TH ST,BELLE PLAINE,52208,POINT (-92.277759 41.897052),6.0,Benton,...,11298,Crown Royal Canadian Whisky,6,1750,31.0,46.49,2,92.98,3.5,0.92
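
The cell above keeps only the first 10,000-row chunk. As a rough sketch of how every chunk could be visited instead, the following uses only the standard library rather than pandas. The helper `iter_chunks` and the three-row in-memory sample (two columns taken from the preview above) are illustrative assumptions, not part of the notebook.

```python
import csv
import io
from itertools import islice


def iter_chunks(handle, chunk_size):
    """Yield lists of row dicts, with at most chunk_size rows per list."""
    reader = csv.DictReader(handle)
    while True:
        chunk = list(islice(reader, chunk_size))
        if not chunk:
            return
        yield chunk


# Small in-memory stand-in for Liquor_Sales.csv (only two of the columns).
sample = io.StringIO(
    "Store Number,Sale (Dollars)\n"
    "3678,14.68\n"
    "2633,134.94\n"
    "2607,78.12\n"
)

chunk_sizes = [len(chunk) for chunk in iter_chunks(sample, chunk_size=2)]
print(chunk_sizes)
```

With a real file, `sample` would be replaced by an open file handle, and each chunk could be cleaned and aggregated before the next one is read.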


## **1** Data Cleaning
<font color = red>[5 marks]</font> <br>

#### **1.1** Fixing Columns

In [8]:
# Rename columns safely
df.rename(columns={
    "Invoice/Item Number": "invoice_item_number",
    "Store Number": "store_number",
    "Store Name": "store_name",
    "Zip Code": "zip_code",
    "Store Location": "store_location",
    "County Number": "county_number",
    "Category Name": "category_name",
    "Vendor Number": "vendor_number",
    "Vendor Name": "vendor_name",
    "Item Number": "item_number",
    "Item Description": "item_description",
    "Bottle Volume (ml)": "bottle_volume_ml",
    "State Bottle Cost": "state_bottle_cost",
    "State Bottle Retail": "state_bottle_retail",
    "Bottles Sold": "bottles_sold",
    "Sale (Dollars)": "sale_dollars",
    "Volume Sold (Liters)": "volume_sold_liters",
    "Volume Sold (Gallons)": "volume_sold_gallons"
}, inplace=True)

In [9]:
df.columns

Index(['invoice_item_number', 'Date', 'store_number', 'store_name', 'Address',
       'City', 'zip_code', 'store_location', 'county_number', 'County',
       'Category', 'category_name', 'vendor_number', 'vendor_name',
       'item_number', 'item_description', 'Pack', 'bottle_volume_ml',
       'state_bottle_cost', 'state_bottle_retail', 'bottles_sold',
       'sale_dollars', 'volume_sold_liters', 'volume_sold_gallons'],
      dtype='object')

In [10]:
# --- Fix column names ---
# Strip whitespace, make lowercase, replace spaces with underscores, and remove parentheses
df.columns = (
    df.columns.str.strip()
                      .str.lower()
                      .str.replace(" ", "_")
                      .str.replace(r"[()]", "", regex=True)
)

In [11]:
df.columns

Index(['invoice_item_number', 'date', 'store_number', 'store_name', 'address',
       'city', 'zip_code', 'store_location', 'county_number', 'county',
       'category', 'category_name', 'vendor_number', 'vendor_name',
       'item_number', 'item_description', 'pack', 'bottle_volume_ml',
       'state_bottle_cost', 'state_bottle_retail', 'bottles_sold',
       'sale_dollars', 'volume_sold_liters', 'volume_sold_gallons'],
      dtype='object')
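
The explicit rename and the generic normalisation above could, in principle, be expressed as a single rule. The sketch below is one hedged way to do that in plain Python; `normalize_column` is a hypothetical helper, and it treats every run of non-alphanumeric characters (spaces, `/`, parentheses) as one underscore.

```python
import re


def normalize_column(name):
    """Lowercase a column name and collapse each non-alphanumeric run into one underscore."""
    return re.sub(r"[^0-9a-z]+", "_", name.strip().lower()).strip("_")


raw_names = ["Invoice/Item Number", "Bottle Volume (ml)", "Sale (Dollars)", " County "]
normalized = [normalize_column(name) for name in raw_names]
print(normalized)
```

On a DataFrame, the same function could be applied with `df.columns = [normalize_column(c) for c in df.columns]`.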

In [12]:
# Fix column data types
df['date'] = pd.to_datetime(df['date'], errors='coerce')
if 'zip_code' in df.columns:
    df['zip_code'] = df['zip_code'].astype(str)

In [25]:
columns_to_convert = ['county_number', 'category', 'vendor_number', 'item_number', 'year', 'month', 'day', 'day_of_week']
for col in columns_to_convert:
    if col in df.columns:
        df[col] = df[col].astype(str)


#### **1.2** Fixing Rows

In [13]:
# Drop duplicate rows
df.drop_duplicates(inplace=True)

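# Clean text fields
# The .str methods keep missing values as NaN (astype(str) would turn them into
# the string 'nan'), so the null check in section 1.3 can still find them.
df['vendor_name'] = df['vendor_name'].str.strip().str.title()
df['category_name'] = df['category_name'].str.strip().str.title()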


#### **1.3** Handling Missing Values

In [14]:
# Drop rows with nulls in essential columns
df.dropna(subset=['date', 'store_number', 'vendor_name', 'category_name'], inplace=True)

# Fill nulls in non-critical numeric fields with default values
df['bottles_sold'] = df['bottles_sold'].fillna(0)

#### **1.4** Feature Engineering

In [15]:
# Extract features from date
df['year'] = df['date'].dt.year
df['month'] = df['date'].dt.month
df['day'] = df['date'].dt.day
df['day_of_week'] = df['date'].dt.day_name()

In [16]:
# Extract longitude and latitude from 'store_location' column
import re

def extract_coordinates(point_str):
    match = re.match(r"POINT\s+\(([-\d.]+)\s+([-\d.]+)\)", str(point_str))
    if match:
        return float(match.group(1)), float(match.group(2))
    return None, None

df[['longitude', 'latitude']] = df['store_location'].apply(lambda x: pd.Series(extract_coordinates(x)))

# Drop the original 'store_location' column
df.drop(columns=['store_location'], inplace=True)

In [17]:
df.columns

Index(['invoice_item_number', 'date', 'store_number', 'store_name', 'address',
       'city', 'zip_code', 'county_number', 'county', 'category',
       'category_name', 'vendor_number', 'vendor_name', 'item_number',
       'item_description', 'pack', 'bottle_volume_ml', 'state_bottle_cost',
       'state_bottle_retail', 'bottles_sold', 'sale_dollars',
       'volume_sold_liters', 'volume_sold_gallons', 'year', 'month', 'day',
       'day_of_week', 'longitude', 'latitude'],
      dtype='object')

#### **1.5** Handling Outliers

In [18]:
from scipy.stats import zscore

# Step 1: Identify all numeric columns
numeric_cols = df.select_dtypes(include=['int64', 'float64']).columns.tolist()
print("Numeric columns for outlier check:", numeric_cols)

# Step 2: Drop rows with outliers (|z-score| > 3) for each numeric column
for col in numeric_cols:
    # Fill NaN with the column mean so every row gets a z-score, then filter
    col_zscore = zscore(df[col].fillna(df[col].mean()))
    df = df.loc[abs(col_zscore) <= 3]

# Step 3: Report new shape
print("Outlier removal completed.")
print("Shape after removing outliers:", df.shape)

Numeric columns for outlier check: ['store_number', 'county_number', 'category', 'vendor_number', 'item_number', 'pack', 'bottle_volume_ml', 'state_bottle_cost', 'state_bottle_retail', 'bottles_sold', 'sale_dollars', 'volume_sold_liters', 'volume_sold_gallons', 'longitude', 'latitude']
Outlier removal completed.
Shape after removing outliers: (8873, 29)
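
To make the filtering rule concrete, here is a minimal standard-library sketch of the same |z-score| <= 3 test on a single list of values. It uses the population standard deviation, which matches the default of `scipy.stats.zscore` (`ddof=0`). The function name `filter_by_zscore` and the sale amounts are made up for illustration.

```python
from statistics import mean, pstdev


def filter_by_zscore(values, threshold=3.0):
    """Keep values whose absolute z-score is at most threshold (population std)."""
    mu = mean(values)
    sigma = pstdev(values)
    if sigma == 0:
        return list(values)  # all values identical, nothing to drop
    return [v for v in values if abs((v - mu) / sigma) <= threshold]


# Made-up sale amounts: nineteen ordinary sales and one extreme one.
sales = [10.0] * 19 + [1000.0]
kept = filter_by_zscore(sales)
print(len(kept), max(kept))
```

Because the notebook loop applies this rule to every numeric column, identifier columns such as `store_number` are filtered too; restricting the loop to measure columns (prices, bottles, volumes) may be worth considering.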


In [20]:
# Clean column names
df.columns = df.columns.str.strip().str.replace('\n', '', regex=False)

# Define numeric columns (update this list as needed)
numeric_columns = [
    'store_number', 'zip_code', 'county_number', 'category', 'vendor_number',
    'item_number', 'pack', 'bottle_volume_ml', 'state_bottle_cost',
    'state_bottle_retail', 'bottles_sold', 'sale_dollars',
    'volume_sold_liters', 'volume_sold_gallons', 'year', 'month', 'day',
    'longitude', 'latitude'
]

# Convert to numeric where applicable
for col in numeric_columns:
    if col in df.columns:
        df[col] = pd.to_numeric(df[col], errors='coerce')

# Convert date column
if 'date' in df.columns:
    df['date'] = pd.to_datetime(df['date'], errors='coerce')

# Drop rows with all NaNs
df.dropna(how='all', inplace=True)

In [26]:
df.to_csv('/content/drive/MyDrive/Liquor_Sales_df_new1.csv', index=False)
print("Processed dataset saved.")


Processed dataset saved.


## **2** Data Ingestion Tasks
<font color = red>[15 marks]</font> <br>

#### 2.1 Upload Liquor Sales Data to AWS RDS
<font color = red>[5 marks]</font> <br>

### Step 1: Set Up EC2 Instance + SSH Access

  1. **Create EC2 Instance**
    - AMI: Amazon Linux 2023
    - Instance type: t2.micro (free tier)
    - Key pair: Create/download `my_key_val_tm.pem`
    - Security group:
      - Inbound: Port 22 (SSH) open to `0.0.0.0/0` (for development)
    - Enable **public IPv4** assignment

  2. **Set Permissions & SSH into EC2 (Mac Terminal)**

  ```bash
  chmod 400 ~/Downloads/my_key_val_tm.pem
  ssh -i ~/Downloads/my_key_val_tm.pem ec2-user@<public-ip-or-dns>
  ```

### Step 2: Launch EMR Cluster

  1. **Go to AWS EMR → Create cluster**

    - Name: LiquorSalesCluster
    - Software: Hadoop, Hive, Spark (choose minimal); also select HBase and Sqoop, which Task 2.2 requires
    - EC2: Use same key pair my_key_val_tm.pem
    - Choose same VPC, subnet, and security group as EC2
    - Enable Auto-termination if needed

  2. **After cluster is running, SSH using:**
  ```bash
  ssh -i ~/Downloads/my_key_val_tm.pem hadoop@<master-node-public-dns>
  ```


### Step 3: Create and Configure RDS (MySQL)
  1. **Go to RDS → Create Database**

    - Engine: MySQL 8.0
    - Template: Free tier
    - DB Identifier: liquor-db
    - Master username: admin
    - Set a strong password

  2. **Connectivity:**

    - Enable public access
    - Add inbound rule for port 3306 in security group: 0.0.0.0/0 (for testing)
    - Save the endpoint for connection


### Step 4: Connect via MySQL Workbench

  1. **Open Workbench → Create New Connection**
    - **Hostname**: `<your-rds-endpoint>`  
    - **Username**: `admin`  
    - **Password**: store it securely  
    - **Port**: `3306`

  2. **Test the Connection**
    - Ensure RDS instance is in "Available" state
    - Public access must be enabled
    - Inbound rule for port 3306 (MySQL) must be open to `0.0.0.0/0` for testing



### Step 5: Create SQL Table Schema in Workbench

  1. **Create a database** (if not done already)
    ```sql
    CREATE DATABASE liquor_sales;
    USE liquor_sales;
    ```

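
The table definition itself is not shown in this section. As a sketch only, a `CREATE TABLE` statement could be generated from the 29 cleaned column names. Every MySQL type below is an assumption, and `build_create_table` is a hypothetical helper; the table name `liquor_sales_df` is the one used by the load and verification queries.

```python
# Column order follows the cleaned CSV; every MySQL type here is an assumption.
COLUMN_TYPES = [
    ("invoice_item_number", "VARCHAR(32)"),
    ("date", "DATE"),
    ("store_number", "INT"),
    ("store_name", "VARCHAR(255)"),
    ("address", "VARCHAR(255)"),
    ("city", "VARCHAR(100)"),
    ("zip_code", "VARCHAR(16)"),
    ("county_number", "VARCHAR(16)"),
    ("county", "VARCHAR(100)"),
    ("category", "VARCHAR(16)"),
    ("category_name", "VARCHAR(255)"),
    ("vendor_number", "VARCHAR(16)"),
    ("vendor_name", "VARCHAR(255)"),
    ("item_number", "VARCHAR(16)"),
    ("item_description", "VARCHAR(255)"),
    ("pack", "INT"),
    ("bottle_volume_ml", "INT"),
    ("state_bottle_cost", "DECIMAL(10,2)"),
    ("state_bottle_retail", "DECIMAL(10,2)"),
    ("bottles_sold", "INT"),
    ("sale_dollars", "DECIMAL(12,2)"),
    ("volume_sold_liters", "DECIMAL(10,2)"),
    ("volume_sold_gallons", "DECIMAL(10,2)"),
    ("year", "INT"),
    ("month", "INT"),
    ("day", "INT"),
    ("day_of_week", "VARCHAR(16)"),
    ("longitude", "DOUBLE"),
    ("latitude", "DOUBLE"),
]


def build_create_table(table, column_types):
    """Return a CREATE TABLE statement with one backtick-quoted column per line."""
    body = ",\n".join(f"    `{name}` {sql_type}" for name, sql_type in column_types)
    return f"CREATE TABLE IF NOT EXISTS `{table}` (\n{body}\n);"


ddl = build_create_table("liquor_sales_df", COLUMN_TYPES)
print(ddl)
```

The printed statement can be pasted into the Workbench SQL editor after adjusting the types to the actual data.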

### Step 7: Import CSV Data into RDS via MySQL Workbench

  1. **Enable `local_infile` in MySQL RDS**
    - Open SQL Editor in Workbench and run:
      ```sql
      SHOW VARIABLES LIKE 'local_infile';
      SET GLOBAL local_infile = 1;
      ```

  2. **Enable “Allow LOAD DATA LOCAL INFILE” in Workbench**
    - Go to:
      - **Workbench** → **Preferences**
      - Navigate to **SQL Editor**
      - Check `Allow LOAD DATA LOCAL INFILE`
      - Reconnect to the RDS connection


  3. **Load CSV into RDS MySQL Table via Workbench**

    ```sql
    -- If the CSV uses Unix line endings, change '\r\n' below to '\n'.
    LOAD DATA LOCAL INFILE '/Users/tiyasamukherjee/Downloads/Liquor_Sales_cleaned_df.csv'
    INTO TABLE liquor_sales.liquor_sales_df
    FIELDS TERMINATED BY ','
    ENCLOSED BY '"'
    LINES TERMINATED BY '\r\n'
    IGNORE 1 ROWS;
    ```


  4. **Verify the Data**
    - Run:
      ```sql
      SELECT COUNT(*) FROM liquor_sales_df;
      SELECT * FROM liquor_sales_df LIMIT 10;
      ```



  

#### 2.2 Ingest Data to HBase
<font color = red>[10 marks]</font> <br>

## Prerequisites
- RDS MySQL DB (Public access enabled or in same VPC + SG as EMR)
- EMR cluster with HBase & Sqoop
- SSH access to EMR Master node (`.pem` key file)


## 1. SSH into EMR Master Node

```bash
ssh -i ~/Downloads/your-key.pem hadoop@<EMR-MASTER-PUBLIC-DNS>
```


## 2. Download the JDBC Driver

```bash
wget https://downloads.mysql.com/archives/get/p/3/file/mysql-connector-java-5.1.49.tar.gz
```

## 3. Extract the Archive

```bash
tar -xvzf mysql-connector-java-5.1.49.tar.gz
```

## 4. Copy the JAR File to Sqoop's lib Directory

```bash
sudo cp mysql-connector-java-5.1.49/mysql-connector-java-5.1.49-bin.jar /usr/lib/sqoop/lib/
```

## 5. Create HBase Table Manually
```bash
  hbase shell

  # Inside shell
  create 'liquor_sales', 'cf'
  list
  exit
```

## 6. Run Sqoop Command to Import Data into HBase
```bash
sqoop import \
  --connect jdbc:mysql://<RDS-ENDPOINT>:3306/liquor_sales \
  --username admin \
  --password 'my_password_here' \
  --table liquor_sales_df \
  --hbase-table liquor_sales_hbase \
  --column-family info \
  --hbase-create-table \
  --split-by invoice_item_number \
  --num-mappers 1
```


## **3** Analytics Queries using MapReduce
<font color = red>[60 marks]</font> <br>

In [10]:
# Load cleaned dataframe
clean_df = pd.read_csv('/content/drive/MyDrive/Liquor_Sales_df_new1.csv')

# Save as tab-separated file for MRJob (to avoid comma issues)
clean_df.to_csv('/content/liquor_sales.txt', sep='\t', index=False)

In [5]:
clean_df.head()

Unnamed: 0,invoice_item_number,date,store_number,store_name,address,city,zip_code,county_number,county,category,...,bottles_sold,sale_dollars,volume_sold_liters,volume_sold_gallons,year,month,day,day_of_week,longitude,latitude
0,S24127700024,2015-02-19,3678,"Smoke Shop, The",1918 SE 14TH ST,DES MOINES,50320.0,77.0,Polk,1031200.0,...,2,14.68,1.0,0.26,2015,2,19,Thursday,-93.597011,41.570844
1,S19323500030,2014-06-03,2607,Hy-Vee Wine and Spirits / Shenandoah,520 SO FREMONT,SHENANDOAH,51601.0,73.0,Page,1062200.0,...,12,78.12,12.0,3.17,2014,6,3,Tuesday,-95.385111,40.761736
2,S23334500013,2015-01-06,4810,Kum & Go #518 / Ankeny,3603 NE OTTERVIEW CIRCLE,ANKENY,50021.0,77.0,Polk,1062200.0,...,1,8.31,0.5,0.13,2015,1,6,Tuesday,-93.572458,41.760989
3,S15034600007,2013-10-09,4583,Kum & Go #5100 / Manson,208 MAIN ST,MANSON,50563.0,13.0,Calhoun,1081200.0,...,6,112.5,4.5,1.19,2013,10,9,Wednesday,-94.534532,42.517855
4,S25185100053,2015-04-21,5080,C's Liquor Store,719 2ND AVE W,SPENCER,51301.0,21.0,Clay,1081390.0,...,1,7.44,0.5,0.13,2015,4,21,Tuesday,-95.147741,43.14521


In [6]:
clean_df.columns

Index(['invoice_item_number', 'date', 'store_number', 'store_name', 'address',
       'city', 'zip_code', 'county_number', 'county', 'category',
       'category_name', 'vendor_number', 'vendor_name', 'item_number',
       'item_description', 'pack', 'bottle_volume_ml', 'state_bottle_cost',
       'state_bottle_retail', 'bottles_sold', 'sale_dollars',
       'volume_sold_liters', 'volume_sold_gallons', 'year', 'month', 'day',
       'day_of_week', 'longitude', 'latitude'],
      dtype='object')
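
The MapReduce jobs in this section read fields by position, for example `fields[20]` for `sale_dollars`. As a small sketch, those positions can be derived from the column list above instead of being counted by hand; `COLUMN_INDEX` and `get_field` are hypothetical names introduced only for this illustration.

```python
# Column order copied from the clean_df.columns output above.
COLUMNS = [
    "invoice_item_number", "date", "store_number", "store_name", "address",
    "city", "zip_code", "county_number", "county", "category",
    "category_name", "vendor_number", "vendor_name", "item_number",
    "item_description", "pack", "bottle_volume_ml", "state_bottle_cost",
    "state_bottle_retail", "bottles_sold", "sale_dollars",
    "volume_sold_liters", "volume_sold_gallons", "year", "month", "day",
    "day_of_week", "longitude", "latitude",
]

# Map each column name to its zero-based position in a tab-separated line.
COLUMN_INDEX = {name: position for position, name in enumerate(COLUMNS)}


def get_field(fields, name):
    """Return the value of the named column from a split line."""
    return fields[COLUMN_INDEX[name]]


print(COLUMN_INDEX["store_number"], COLUMN_INDEX["bottles_sold"], COLUMN_INDEX["sale_dollars"])
```

Looking fields up by name keeps the jobs correct if a column is later added or removed during cleaning.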

#### 3.1 Total Revenue by Store
<font color = red>[10 marks]</font> <br>

In [7]:
%%file MRTotalRevenueByStore.py
from mrjob.job import MRJob
import csv

class MRTotalRevenueByStore(MRJob):

    def mapper(self, _, line):
        try:
            fields = list(csv.reader([line], delimiter='\t'))[0]

            # Skip header row
            if fields[0] == "invoice_item_number":
                return

            store_number = fields[2]  # 'store_number'
            sale_dollars = float(fields[20])  # 'sale_dollars'

            yield store_number, sale_dollars

        except Exception:
            pass  # Ignore bad lines

    def reducer(self, store_number, sales):
        yield store_number, round(sum(sales), 2)

if __name__ == '__main__':
    MRTotalRevenueByStore.run()


Writing MRTotalRevenueByStore.py


In [22]:
!python MRTotalRevenueByStore.py /content/liquor_sales.txt --quiet > total_revenue_output.txt
!head -20 total_revenue_output.txt



"4200"	284.95
"4201"	347.67
"4202"	660.34
"4203"	737.84
"4204"	287.68
"4205"	899.22
"4209"	1733.64
"4214"	1386.33
"4219"	661.34
"4221"	226.56
"4222"	334.56
"4223"	323.19
"4225"	133.86
"4226"	9.76
"4228"	529.56
"4229"	234.16
"4230"	70.57
"4231"	492.96
"4232"	327.78
"4233"	987.21

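To show what the job does without running mrjob, the sketch below imitates the three phases (map, shuffle, reduce) in plain Python. It is an illustration under assumptions, not the mrjob runtime: `run_local` and `make_line` are hypothetical helpers, and the input lines are made up, with only the fields the mapper reads filled in.

```python
import csv
from collections import defaultdict


def mapper(line):
    """Emit (store_number, sale_dollars) for one tab-separated line."""
    fields = next(csv.reader([line], delimiter="\t"))
    if fields[0] == "invoice_item_number":
        return  # header row
    try:
        yield fields[2], float(fields[20])
    except (IndexError, ValueError):
        return  # malformed line


def run_local(lines):
    """Group mapper output by key (shuffle), then sum each group (reduce)."""
    grouped = defaultdict(list)
    for line in lines:
        for key, value in mapper(line):
            grouped[key].append(value)
    return {key: round(sum(values), 2) for key, values in sorted(grouped.items())}


def make_line(store_number, sale_dollars):
    """Build a 29-field line with only the fields the mapper reads filled in."""
    fields = [""] * 29
    fields[0], fields[2], fields[20] = "S000", store_number, sale_dollars
    return "\t".join(fields)


lines = [
    "\t".join(["invoice_item_number"] + ["x"] * 28),  # header
    make_line("4200", "100.50"),
    make_line("4201", "20.00"),
    make_line("4200", "9.25"),
    "bad line",
]
totals = run_local(lines)
print(totals)
```
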

#### 3.2 Top-Selling Categories
<font color = red>[10 marks]</font> <br>

In [12]:
%%file MRTopSellingLiquorCategories.py

from mrjob.job import MRJob
import csv

class MRTopSellingLiquorCategories(MRJob):

    def mapper(self, _, line):
        # Use csv reader to handle tab-separated fields
        fields = list(csv.reader([line], delimiter='\t'))[0]

        try:
            category_name = fields[10]
            bottles_sold = int(fields[19])
            yield category_name, bottles_sold
        except (IndexError, ValueError):
            pass  # Skip header or malformed lines

    def reducer(self, category_name, bottles_iter):
        yield category_name, sum(bottles_iter)

if __name__ == '__main__':
    MRTopSellingLiquorCategories.run()


Writing MRTopSellingLiquorCategories.py


In [23]:
!python MRTopSellingLiquorCategories.py /content/liquor_sales.txt --quiet > top_categories_output.txt
!cat top_categories_output.txt | head -20


"Tequila"	2157
"Triple Sec"	263
"Tropical Fruit Schnapps"	24
"Vodka 80 Proof"	10791
"Vodka Flavored"	3090
"Watermelon Schnapps"	148
"Whiskey Liqueur"	2224
"White Creme De Cacao"	36
"White Rum"	24
"100 Proof Vodka"	709
"American Alcohol"	174
"American Amaretto"	320
"American Cocktails"	1730
"American Cordials & Liqueur"	12
"American Dry Gins"	1801
"American Flavored Vodka"	22
"American Grape Brandies"	2192
"American Schnapps"	14
"American Sloe Gins"	47
"American Vodkas"	72

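The job output lists total bottles per category but is not ranked, so the top sellers still have to be picked out. A minimal post-processing sketch follows, assuming mrjob's default output format (JSON-encoded key, a tab, JSON-encoded value). `parse_mrjob_line` and `top_n` are hypothetical helpers, and the sample lines are a subset of the output above.

```python
import json


def parse_mrjob_line(line):
    """Split one output line into (key, value); both sides are JSON-encoded."""
    raw_key, raw_value = line.rstrip("\n").split("\t", 1)
    return json.loads(raw_key), json.loads(raw_value)


def top_n(lines, n):
    """Return the n (category, bottles) pairs with the largest bottle counts."""
    pairs = [parse_mrjob_line(line) for line in lines if line.strip()]
    return sorted(pairs, key=lambda pair: pair[1], reverse=True)[:n]


sample_output = [
    '"Tequila"\t2157',
    '"Vodka 80 Proof"\t10791',
    '"Vodka Flavored"\t3090',
    '"Whiskey Liqueur"\t2224',
    '"White Rum"\t24',
]
top_three = top_n(sample_output, 3)
print(top_three)
```

For the full result, the lines would be read from `top_categories_output.txt` instead of `sample_output`.
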

#### 3.3 County-Level Sales Analysis
<font color = red>[10 marks]</font> <br>

In [14]:
%%file MRCountyLevelSalesAnalysis.py

from mrjob.job import MRJob
import csv

class MRCountyLevelSalesAnalysis(MRJob):

    def mapper(self, _, line):
        fields = list(csv.reader([line], delimiter='\t'))[0]
        try:
            county = fields[8]
            sale_dollars = float(fields[20])
            yield county, sale_dollars
        except (IndexError, ValueError):
            pass  # Skip header or malformed rows

    def reducer(self, county, sales_iter):
        yield county, round(sum(sales_iter), 2)

if __name__ == '__main__':
    MRCountyLevelSalesAnalysis.run()


Writing MRCountyLevelSalesAnalysis.py


In [24]:
!python MRCountyLevelSalesAnalysis.py /content/liquor_sales.txt --quiet > county_sales_output.txt
!cat county_sales_output.txt | head -20


"Poweshiek"	5874.89
"Ringgold"	396.93
"SCOTT"	118.68
"SIOUX"	73.22
"STORY"	807.0
"Sac"	2461.06
"Scott"	47299.09
"Shelby"	1733.02
"Sioux"	2968.62
"Story"	23986.87
"Tama"	2182.61
"Taylor"	313.01
"Union"	3123.26
"Van Buren"	488.88
"WAPELLO"	94.56
"Wapello"	8395.99
"Warren"	6242.89
"Washington"	3501.63
"Wayne"	782.54
"Webster"	10018.31

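The output above contains the same county under two spellings (for example `SCOTT` and `Scott`), so each such county is split across two totals. One hedged way to combine them after the fact is sketched below; `merge_case_variants` is a hypothetical helper, and the figures are copied from the output above.

```python
from collections import defaultdict


def merge_case_variants(totals):
    """Sum totals whose county names differ only by letter case or outer spaces."""
    merged = defaultdict(float)
    for county, amount in totals.items():
        merged[county.strip().title()] += amount
    return {county: round(amount, 2) for county, amount in merged.items()}


county_totals = {
    "SCOTT": 118.68,
    "Scott": 47299.09,
    "STORY": 807.0,
    "Story": 23986.87,
    "Van Buren": 488.88,
}
merged = merge_case_variants(county_totals)
print(merged)
```

Applying the same normalisation to the `county` column during data cleaning, or inside the mapper, would avoid the split in the first place.
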

#### 3.4 Store Performance Analysis
<font color = red>[10 marks]</font> <br>

In [16]:
%%file MRStorePerformanceAnalysis.py

from mrjob.job import MRJob
import csv

class MRStorePerformanceAnalysis(MRJob):

    def mapper(self, _, line):
        fields = list(csv.reader([line], delimiter='\t'))[0]
        try:
            store_number = fields[2]
            sale_dollars = float(fields[20])
            bottles_sold = int(fields[19])
            yield store_number, (sale_dollars, bottles_sold)
        except (IndexError, ValueError):
            pass  # Skip headers or invalid rows

    def reducer(self, store_number, values):
        total_sales = 0
        total_bottles = 0
        for sale, bottles in values:
            total_sales += sale
            total_bottles += bottles
        yield store_number, {'Total_Sales': round(total_sales, 2), 'Total_Bottles': total_bottles}

if __name__ == '__main__':
    MRStorePerformanceAnalysis.run()


Writing MRStorePerformanceAnalysis.py


In [25]:
!python MRStorePerformanceAnalysis.py /content/liquor_sales.txt --quiet > store_perf_output.txt
!cat store_perf_output.txt | head -20


"4200"	{"Total_Sales": 284.95, "Total_Bottles": 27}
"4201"	{"Total_Sales": 347.67, "Total_Bottles": 33}
"4202"	{"Total_Sales": 660.34, "Total_Bottles": 53}
"4203"	{"Total_Sales": 737.84, "Total_Bottles": 62}
"4204"	{"Total_Sales": 287.68, "Total_Bottles": 16}
"4205"	{"Total_Sales": 899.22, "Total_Bottles": 54}
"4209"	{"Total_Sales": 1733.64, "Total_Bottles": 168}
"4214"	{"Total_Sales": 1386.33, "Total_Bottles": 109}
"4219"	{"Total_Sales": 661.34, "Total_Bottles": 51}
"4221"	{"Total_Sales": 226.56, "Total_Bottles": 24}
"4222"	{"Total_Sales": 334.56, "Total_Bottles": 48}
"4223"	{"Total_Sales": 323.19, "Total_Bottles": 32}
"4225"	{"Total_Sales": 133.86, "Total_Bottles": 9}
"4226"	{"Total_Sales": 9.76, "Total_Bottles": 2}
"4228"	{"Total_Sales": 529.56, "Total_Bottles": 60}
"4229"	{"Total_Sales": 234.16, "Total_Bottles": 20}
"4230"	{"Total_Sales": 70.57, "Total_Bottles": 7}
"4231"	{"Total_Sales": 492.96, "Total_Bottles": 36}
"4232"	{"Total_Sales": 327.78, "Total_Bottles": 13}
"4233"	{"Total

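Total sales and total bottles can be combined into a per-store metric such as average revenue per bottle. The following is a sketch of that post-processing step, assuming the JSON output format shown above; `revenue_per_bottle` is a hypothetical helper, and the two sample lines are copied from the output.

```python
import json


def revenue_per_bottle(line):
    """Return (store_number, average sale dollars per bottle) for one output line."""
    raw_key, raw_value = line.rstrip("\n").split("\t", 1)
    store = json.loads(raw_key)
    stats = json.loads(raw_value)
    bottles = stats["Total_Bottles"]
    # Guard against stores with zero recorded bottles.
    average = round(stats["Total_Sales"] / bottles, 2) if bottles else None
    return store, average


sample_output = [
    '"4200"\t{"Total_Sales": 284.95, "Total_Bottles": 27}',
    '"4226"\t{"Total_Sales": 9.76, "Total_Bottles": 2}',
]
averages = dict(revenue_per_bottle(line) for line in sample_output)
print(averages)
```
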
#### 3.5 Trends in Liquor Sales Over Time
<font color = red>[10 marks]</font> <br>

In [18]:
%%file MRLiquorSalesTrends.py

from mrjob.job import MRJob
import csv

class MRLiquorSalesTrends(MRJob):

    def mapper(self, _, line):
        fields = list(csv.reader([line], delimiter='\t'))[0]
        try:
            year = fields[23]
            month = fields[24]
            sale_dollars = float(fields[20])
            key = f"{year}-{month.zfill(2)}"
            yield key, sale_dollars
        except (IndexError, ValueError):
            pass  # Skip header or corrupt rows

    def reducer(self, month_year, sales):
        total_sales = round(sum(sales), 2)
        yield month_year, total_sales

if __name__ == '__main__':
    MRLiquorSalesTrends.run()


Writing MRLiquorSalesTrends.py


In [26]:
!python MRLiquorSalesTrends.py /content/liquor_sales.txt --quiet > sales_trend_output.txt
!cat sales_trend_output.txt | head -20


"2015-02"	14850.74
"2015-03"	14310.34
"2015-04"	17517.74
"2015-05"	21360.29
"2015-06"	18215.97
"2015-07"	15603.01
"2015-08"	16567.84
"2015-09"	16334.0
"2015-10"	14642.6
"2015-11"	15221.52
"2016-07"	4928.0
"2017-10"	3961.58
"2012-01"	13671.66
"2012-02"	13403.23
"2012-03"	14064.6
"2012-04"	12211.45
"2012-05"	19938.2
"2012-06"	14587.39
"2012-07"	17178.14
"2012-08"	16453.99

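The monthly totals above are not listed in chronological order, which makes the trend hard to read. Because the keys have the form `YYYY-MM`, sorting them as plain strings puts them in date order. A minimal sketch, assuming the default mrjob output format; `sort_trend` is a hypothetical helper, and the sample lines are taken from the output above.

```python
import json


def sort_trend(lines):
    """Parse 'key<TAB>value' lines and return (month, total) pairs in date order."""
    pairs = []
    for line in lines:
        if not line.strip():
            continue
        raw_key, raw_value = line.rstrip("\n").split("\t", 1)
        pairs.append((json.loads(raw_key), json.loads(raw_value)))
    # 'YYYY-MM' keys sort chronologically when compared as strings.
    return sorted(pairs)


sample_output = [
    '"2015-02"\t14850.74',
    '"2016-07"\t4928.0',
    '"2012-01"\t13671.66',
    '"2012-05"\t19938.2',
]
trend = sort_trend(sample_output)
print(trend)
```
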

#### 3.6 Vendor Performance
<font color = red>[10 marks]</font> <br>

In [20]:
%%file MRVendorPerformance.py

from mrjob.job import MRJob
import csv

class MRVendorPerformance(MRJob):

    def mapper(self, _, line):
        fields = list(csv.reader([line], delimiter='\t'))[0]
        try:
            vendor_name = fields[12]
            sale_dollars = float(fields[20])
            yield vendor_name, sale_dollars
        except (IndexError, ValueError):
            pass  # Skip header or malformed rows

    def reducer(self, vendor_name, sales):
        total_sales = round(sum(sales), 2)
        yield vendor_name, total_sales

if __name__ == '__main__':
    MRVendorPerformance.run()


Writing MRVendorPerformance.py


In [27]:
!python MRVendorPerformance.py /content/liquor_sales.txt --quiet > vendor_performance_output.txt
!cat vendor_performance_output.txt | head -20


"Sazerac Company  Inc"	284.46
"Sazerac North America"	26396.28
"Serralles Usa"	775.5
"Shaw Ross International Importers Ll"	426.18
"Sidney Frank Importing Co."	9511.9
"Stoli Group"	3059.36
"The Patron Spirits Company"	3509.09
"Trinchero Family Estates"	121.04
"Western Spirits Beverage Co. Llc"	1289.61
"Wildman And Sons, F."	77.77
"William Grant And Sons, Inc."	6097.37
"Wilson Daniels Ltd."	8885.25
"Bacardi U.S.A., Inc."	42983.14
"Bacardi Usa Inc"	111.5
"Black Rock Spirits"	134.97
"Broadbent Distillery"	177.48
"Brown Forman Corp."	45.0
"Brown-Forman Corporation"	42048.4
"Campari(Skyy)"	12965.72
"Castle Brands"	76.04

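
Some vendors appear under more than one spelling in the output above (for example `Bacardi U.S.A., Inc.` and `Bacardi Usa Inc`), which splits their revenue. The sketch below shows one simple, assumption-laden way to merge such variants by comparing names with punctuation and case removed. `vendor_key` and `merge_vendors` are hypothetical helpers, and the rule will not catch differences in wording such as `Corp.` versus `Corporation`.

```python
import re
from collections import defaultdict


def vendor_key(name):
    """Lowercase, drop punctuation, and collapse whitespace to build a comparison key."""
    cleaned = re.sub(r"[^a-z0-9 ]", "", name.lower())
    return " ".join(cleaned.split())


def merge_vendors(totals):
    """Sum revenue for vendor names that share the same comparison key."""
    merged = defaultdict(float)
    for name, amount in totals.items():
        merged[vendor_key(name)] += amount
    return {key: round(amount, 2) for key, amount in merged.items()}


vendor_totals = {
    "Bacardi U.S.A., Inc.": 42983.14,
    "Bacardi Usa Inc": 111.5,
    "Sazerac Company  Inc": 284.46,
}
merged_vendors = merge_vendors(vendor_totals)
print(merged_vendors)
```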