In [1]:
# -----------------------> initialize libraries
import os
import json
import pandas as pd
from urllib.request import urlretrieve
from zipfile import ZipFile
import ijson
import numpy as np

In [2]:
# -----------------------> Download FDA recall ZIP file
url = "https://download.open.fda.gov/device/recall/device-recall-0001-of-0001.json.zip"
zip_filename = "device-recall-0001-of-0001.json.zip"
json_filename = "device-recall-0001-of-0001.json"

# The existence of `zip_filename` is the "download finished" marker, so the
# archive is fetched under a temporary name and renamed into place only after
# urlretrieve returns. An interrupted download can then never be mistaken for
# a complete archive on the next run.
if not os.path.exists(zip_filename):
    print("Downloading FDA recall zip...")
    partial_zip = zip_filename + ".part"
    try:
        urlretrieve(url, partial_zip)
        os.replace(partial_zip, zip_filename)
    finally:
        # Still present only if the download or the rename failed.
        if os.path.exists(partial_zip):
            os.remove(partial_zip)
else:
    print("ZIP already downloaded.")

# -----------------------> Extract JSON
# Extraction is skipped when the JSON member is already on disk.
if not os.path.exists(json_filename):
    print("Extracting JSON...")
    with ZipFile(zip_filename, 'r') as z:
        z.extract(json_filename)
else:
    print("JSON already extracted.")

ZIP already downloaded.
JSON already extracted.


In [None]:
# -----------------------> Stream JSON to avoid MemoryError
# Ordered (keyword, rating) pairs: the first keyword contained in the
# lowercased recall status decides the rating.
STATUS_RATINGS = (
    ('terminated', 1000),
    ('completed', 800),
    ('ongoing', 500),
    ('pending', 300),
)
DEFAULT_RATING = 600  # unknown/missing


def _text_field(entry, key, flatten=False):
    """Return entry[key] as a stripped string.

    A missing key or an explicit JSON null yields '' instead of raising on
    the string methods. With flatten=True, newlines are replaced by spaces
    before stripping.
    """
    value = entry.get(key)
    if value is None:
        return ''
    text = str(value)
    if flatten:
        text = text.replace('\n', ' ')
    return text.strip()


def _rate_status(recall_status):
    """Return the rating of the first STATUS_RATINGS keyword found in the status."""
    for keyword, rating in STATUS_RATINGS:
        if keyword in recall_status:
            return rating
    return DEFAULT_RATING


records = []
# Binary mode: ijson parses bytes and handles the UTF-8 decoding itself; its
# documentation asks for binary file objects rather than text-mode ones.
with open(json_filename, 'rb') as f:
    # ijson yields one `results` element at a time, so the whole document is
    # never held in memory.
    for entry in ijson.items(f, 'results.item'):
        recall_status = _text_field(entry, 'recall_status').lower()
        records.append({
            # An empty or missing firm name is reported as "Unknown".
            'manufacturer': _text_field(entry, 'recalling_firm') or "Unknown",
            'product_description': _text_field(entry, 'product_description', flatten=True),
            'recall_status': recall_status,
            'reason_for_recall': _text_field(entry, 'reason_for_recall', flatten=True),
            'event_date_initiated': entry.get('event_date_initiated', ''),
            'recall_number': _text_field(entry, 'recall_number'),
            'rating': _rate_status(recall_status),
        })

print(f"Parsed {len(records)} recalls")

# -----------------------> Convert to DataFrame
df = pd.DataFrame(records)

In [None]:
# -----------------------> Save Raw Recalls Sheet
output_file = "recalls_with_ratings_summary.xlsx"

# Given a path, to_excel opens the workbook with the requested engine, writes
# the single sheet and closes the file again.
df.to_excel(output_file, sheet_name="Raw_Recalls", index=False, engine='openpyxl')

print(f"Saved raw recalls to {output_file}")

In [None]:
# -----------------------> Aggregate per manufacturer
# One row per manufacturer: the number of recall_number entries and the mean
# of the per-recall rating.
recalls_by_manufacturer = df.groupby('manufacturer')
manufacturer_summary = recalls_by_manufacturer.agg(
    total_recalls=pd.NamedAgg(column='recall_number', aggfunc='count'),
    avg_rating=pd.NamedAgg(column='rating', aggfunc='mean'),
)
# Turn the group key back into a regular column.
manufacturer_summary = manufacturer_summary.reset_index()
# Largest companies (by recall count) first.
manufacturer_summary = manufacturer_summary.sort_values('total_recalls', ascending=False)

In [None]:
# -----------------------> Compute ELO-style score
# Centre each manufacturer's average recall rating on the mean across all
# manufacturers and scale that offset around a 1000-point baseline.
# The offset is added, not subtracted: `rating` is highest for terminated
# recalls and the percentile classification labels the highest `elo` values
# "Excellent", so a better-than-average rating has to raise the score.
avg_rating = manufacturer_summary['avg_rating'].mean()
manufacturer_summary['elo'] = 1000 + 0.04 * (manufacturer_summary['avg_rating'] - avg_rating)

In [None]:
# -----------------------> Classify companies based on ELO percentiles
# Percentile rank (a fraction up to 1.0) of every manufacturer's ELO score.
elo_scores = manufacturer_summary['elo']
percentiles = elo_scores.rank(pct=True)

def classify(p):
    """Map a percentile rank `p` to its rating-class label.

    The bands are closed at the bottom: >= 0.8 "Excellent", >= 0.6 "Great",
    >= 0.4 "Average", >= 0.2 "Poor"; any other value yields "Avoid".
    """
    # Lower bounds are checked from the top band down; the first match wins.
    bands = (
        (0.8, "Excellent"),
        (0.6, "Great"),
        (0.4, "Average"),
        (0.2, "Poor"),
    )
    for lower_bound, label in bands:
        if p >= lower_bound:
            return label
    return "Avoid"

# Label every manufacturer by running its percentile rank through classify().
manufacturer_summary['rating_class'] = percentiles.map(classify)

In [None]:
# -----------------------> Save Manufacturer Summary Sheet
# Append to the workbook when it already exists, replacing any earlier
# "Manufacturer_Summary" sheet so this cell can be re-run; the default
# append behaviour raises when the sheet is already present. When the
# workbook does not exist yet, create it instead of failing in append mode.
# `if_sheet_exists` is only accepted in append mode and needs pandas >= 1.3.
if os.path.exists(output_file):
    writer_options = {'mode': 'a', 'if_sheet_exists': 'replace'}
else:
    writer_options = {'mode': 'w'}

with pd.ExcelWriter(output_file, engine='openpyxl', **writer_options) as writer:
    manufacturer_summary.to_excel(writer, sheet_name="Manufacturer_Summary", index=False)

print(f"Saved manufacturer summary to {output_file}")