In [None]:
import asyncio
import csv
import getpass
import os

import aiohttp

# Destination CSV passed to save_to_csv() at the end of this cell.
OUTPUT_FILE = "mil_spec_under_10.csv"

async def fetch_filtered_listings(api_key, pages=1, max_retries=5):
    """Download buy-now listings from the CSFloat listings endpoint, page by page.

    Args:
        api_key: Value sent verbatim in the ``Authorization`` header.
        pages: Maximum number of pages to request (50 listings per page).
        max_retries: How many times a single page is retried after an
            HTTP 429 response before giving up.

    Returns:
        A list with the entries found under the ``"data"`` key of every
        successfully fetched page, in request order. Fetching stops early
        when a page comes back empty.

    Raises:
        Exception: On any status other than 200/429, or when a page is still
            rate limited after ``max_retries`` retries.
    """
    headers = {
        "Authorization": api_key,
        "Accept": "application/json",
        "User-Agent": "CSFloat-TradeUp-Script"
    }

    listings = []

    async with aiohttp.ClientSession(headers=headers) as session:
        for page in range(pages):
            url = (
                "https://csfloat.com/api/v1/listings"
                f"?limit=50&page={page}&sort_by=lowest_price"
                "&type=buynow&rarity=3&max_price=1000"
                # Optional extra filter, currently disabled:
                #"&category=1&def_index=1,2,3,4,7,8,9,10,11,13,14,16,17,19,23,24,25,26,27,28,29,30,32,33,34,35,36,38,39,40,60,61,63,64"
            )

            batch = None
            # Retry the SAME page on 429; a plain `continue` in the page loop
            # would move on to the next page and silently drop this one.
            for attempt in range(max_retries + 1):
                async with session.get(url) as resp:
                    status = resp.status
                    print(f"Page {page} status: {status}")
                    if status == 200:
                        data = await resp.json()
                        batch = data.get("data", [])
                    elif status != 429:
                        text = await resp.text()
                        raise Exception(f"Error {status}: {text}")

                if status == 200:
                    break

                # Still rate limited: back off only after the response has
                # been released, so the connection is not held while waiting.
                if attempt == max_retries:
                    raise Exception(
                        f"Error 429: page {page} still rate limited "
                        f"after {max_retries} retries"
                    )
                print("Rate limited. Waiting 30s...")
                await asyncio.sleep(30)

            if not batch:
                break
            listings.extend(batch)
            # Fixed pause between pages to stay under the rate limit.
            await asyncio.sleep(5)

    return listings

def save_to_csv(listings, filename):
    """Write skin listings to a CSV file and print a saved/skipped summary.

    Args:
        listings: Iterable of listing dicts. Each is expected to carry a
            ``"price"`` and an ``"item"`` dict with ``"type"``,
            ``"market_hash_name"``, ``"collection"`` and ``"rarity"``.
        filename: Path of the CSV file to create (overwritten if present).

    Listings whose item type is not ``"skin"`` are ignored and not counted.
    Skin listings missing the name, collection, rarity or price are counted
    as skipped and left out of the file.
    """
    with open(filename, mode="w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(["market_hash_name", "collection", "rarity", "price"])
        saved, skipped = 0, 0

        for listing in listings:
            # Resolve the nested item BEFORE inspecting it; reading `item`
            # ahead of this assignment raised UnboundLocalError on the first
            # row. `or {}` also covers an explicit null "item".
            item = listing.get("item") or {}

            if item.get("type") != "skin":
                continue

            name = item.get("market_hash_name")
            coll = item.get("collection")
            rarity = item.get("rarity")
            price = listing.get("price")
            # rarity/price are compared against None so a legitimate 0 is kept.
            if name and coll and rarity is not None and price is not None:
                writer.writerow([name, coll, rarity, price])
                saved += 1
            else:
                skipped += 1

    print(f"✅ Saved {saved} listings to {filename}")
    print(f"⚠️ Skipped {skipped} listings due to missing fields")

API_KEY = "Ch_7H2yKeV9_28fWemiPawkaL1Nic9N3"
listings = await fetch_filtered_listings(API_KEY)
save_to_csv(listings, OUTPUT_FILE)




Page 0 status: 403


Exception: Error 403: {"code":1,"message":"You need to be logged in to search listings"}


In [2]:
# analyze_tradeups_from_csv.py
import csv
from collections import defaultdict

# CSV read by load_listings_from_csv() at the bottom of this cell.
# NOTE(review): the fetch cell writes to "mil_spec_under_10.csv", not this
# file — confirm which file this analysis is meant to consume.
INPUT_FILE = "csfloat_listings.csv"

# Rarity tier names in the order next_rarity() walks them: each entry's
# successor is the tier a trade-up of that rarity produces.
# NOTE(review): confirm the "rarity" column of the input CSV uses these exact
# names; any other value makes next_rarity() return None.
RARITY_ORDER = [
    "consumer",
    "industrial",
    "mil-spec",
    "restricted",
    "classified",
    "covert",
]

def next_rarity(rarity):
    """Return the tier that follows `rarity` in RARITY_ORDER.

    Returns None when `rarity` is not a known tier or is already the last one.
    """
    if rarity not in RARITY_ORDER:
        return None
    following = RARITY_ORDER.index(rarity) + 1
    if following >= len(RARITY_ORDER):
        return None
    return RARITY_ORDER[following]

def load_listings_from_csv(filename):
    """Load listings from a CSV file with a header row.

    Args:
        filename: Path to a UTF-8 CSV containing the columns
            ``market_hash_name``, ``collection``, ``rarity`` and ``price``.

    Returns:
        A list of dicts with those four keys; ``price`` is converted to int
        (stored in cents), the other values stay strings.

    Rows that lack a required column or whose price is not an integer are
    left out. The number of such rows is printed so that a bad file does not
    silently produce an empty analysis.
    """
    listings = []
    skipped = 0
    with open(filename, newline="", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        for row in reader:
            try:
                listings.append({
                    "market_hash_name": row["market_hash_name"],
                    "collection": row["collection"],
                    "rarity": row["rarity"],
                    "price": int(row["price"])  # stored in cents
                })
            except (KeyError, TypeError, ValueError):
                # KeyError: column missing from the header.
                # TypeError: short row, DictReader filled the price with None.
                # ValueError: price is not an integer literal.
                skipped += 1
    if skipped:
        print(f"Skipped {skipped} malformed rows in {filename}")
    return listings

def group_by_collection_and_rarity(listings):
    """Bucket listings as ``result[collection][rarity] -> list of listings``.

    Both levels are defaultdicts, and listings keep their input order within
    each bucket.
    """
    def _new_rarity_buckets():
        return defaultdict(list)

    by_collection = defaultdict(_new_rarity_buckets)
    for entry in listings:
        collection_key = entry["collection"]
        rarity_key = entry["rarity"]
        by_collection[collection_key][rarity_key].append(entry)
    return by_collection

def analyze_tradeups(grouped_data):
    """Estimate cost, expected value and ROI of trade-ups per collection.

    Args:
        grouped_data: Mapping ``collection -> rarity -> list of listings``,
            where each listing has ``"price"`` (in cents) and
            ``"market_hash_name"``.

    Returns:
        A list of dicts sorted by ``"roi"`` descending, with the keys
        ``collection``, ``rarity``, ``next_rarity``, ``total_cost``,
        ``expected_value``, ``roi``, ``output_count`` and ``output_skins``.
        Monetary values are converted from cents to currency units.

    A (collection, rarity) pair is evaluated only when it has at least ten
    listings and the same collection has listings in the next rarity tier.
    Pairs whose ten cheapest inputs cost nothing are left out, because ROI
    is undefined there (previously a ZeroDivisionError).
    """
    inputs_needed = 10  # number of input listings consumed by one trade-up

    tradeups = []
    for collection, rarities in grouped_data.items():
        for rarity in rarities:
            candidates = rarities[rarity]
            if len(candidates) < inputs_needed:
                continue
            next_rar = next_rarity(rarity)
            if not next_rar or next_rar not in rarities:
                continue

            outputs = rarities[next_rar]
            if not outputs:
                # Key present but empty (possible with hand-built input);
                # there is nothing to average over.
                continue

            fillers = sorted(candidates, key=lambda x: x["price"])[:inputs_needed]
            total_cost = sum(f["price"] for f in fillers) / 100
            if total_cost <= 0:
                continue

            # NOTE(review): every LISTING in the next tier is weighted
            # equally, so a skin with many listings dominates the average.
            # Confirm whether outcomes should be weighted per distinct skin.
            prob = 1 / len(outputs)
            ev = sum(prob * (o["price"] / 100) for o in outputs)
            roi = ev / total_cost

            tradeups.append({
                "collection": collection,
                "rarity": rarity,
                "next_rarity": next_rar,
                "total_cost": total_cost,
                "expected_value": ev,
                "roi": roi,
                "output_count": len(outputs),
                "output_skins": [o["market_hash_name"] for o in outputs]
            })
    return sorted(tradeups, key=lambda x: x["roi"], reverse=True)

# Load the CSV, bucket it by collection and rarity, then rank the trade-ups.
listings = load_listings_from_csv(INPUT_FILE)
grouped = group_by_collection_and_rarity(listings)
tradeups = analyze_tradeups(grouped)

# Show the ten highest-ROI candidates. The list is sorted by ROI, not
# filtered, so an entry with ROI below 1.00 loses money on average.
print("\nTop Profitable Trade-Ups:")
if not tradeups:
    # Say so explicitly instead of leaving a bare header in the output.
    print(f"No trade-up candidates found in {INPUT_FILE} ({len(listings)} listings loaded).")
for i, t in enumerate(tradeups[:10], 1):
    print(f"\n#{i} | Collection: {t['collection']}, Rarity: {t['rarity']} → {t['next_rarity']}")
    print(f"Cost: ${t['total_cost']:.2f}, EV: ${t['expected_value']:.2f}, ROI: {t['roi']:.2f}, Outputs: {t['output_count']}")
    for out in t['output_skins']:
        print(f"  - {out}")





Top Profitable Trade-Ups:


Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 23.0.1 -> 25.1.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [6]:
# Diagnostic cell: print the version of the installed aiodns package.
# NOTE(review): presumably run to check the DNS resolver that aiohttp can use —
# confirm, and drop this cell once the environment is verified.
import aiodns
print(aiodns.__version__)


3.5.0
