In [1]:
!pip install fastapi pydantic



In [2]:
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import string
import random
app = FastAPI(title="URL Shortener Service")
# In-memory database
url_db = {}
class URLCreate(BaseModel):
    long_url: str
def generate_short_code(length=6):
    chars = string.ascii_letters + string.digits
    return ''.join(random.choice(chars) for _ in range(length))
@app.post("/shorten")
def shorten_url(req: URLCreate):
    short_code = generate_short_code()
    while short_code in url_db:
        short_code = generate_short_code()
    url_db[short_code] = req.long_url
    return {"short_url": f"http://short.ly/{short_code}"}
@app.get("/{short_code}")
def redirect(short_code: str):
    if short_code not in url_db:
        raise HTTPException(status_code=404, detail="URL not found")
    return {"long_url": url_db[short_code]}

In [3]:
from fastapi.testclient import TestClient
client = TestClient(app)
res = client.post("/shorten", json={"long_url": "https://www.amazon.com"})
print(res.json())
short_code = res.json()["short_url"].split("/")[-1]
res = client.get(f"/{short_code}")
print(res.json())

{'short_url': 'http://short.ly/QZg24d'}
{'long_url': 'https://www.amazon.com'}


In [4]:
!pip install sqlalchemy



In [5]:
from sqlalchemy import Column, String, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
DATABASE_URL = "sqlite:///./urls.db"
engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
class URL(Base):
    __tablename__ = "urls"
    short_code = Column(String, primary_key=True, index=True)
    long_url = Column(String, nullable=False)
Base.metadata.create_all(bind=engine)

  Base = declarative_base()


In [6]:
from fastapi import Depends
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
@app.post("/shorten")
def shorten_url(req: URLCreate, db=Depends(get_db)):
    short_code = generate_short_code()
    while db.query(URL).filter(URL.short_code == short_code).first():
        short_code = generate_short_code()
    db_url = URL(short_code=short_code, long_url=req.long_url)
    db.add(db_url)
    db.commit()
    return {"short_url": f"http://short.ly/{short_code}"}
@app.get("/{short_code}")
def redirect(short_code: str, db=Depends(get_db)):
    db_url = db.query(URL).filter(URL.short_code == short_code).first()
    if not db_url:
        raise HTTPException(status_code=404, detail="URL not found")
    return {"long_url": db_url.long_url}

In [7]:
client = TestClient(app)
res = client.post("/shorten", json={"long_url": "https://www.amazon.com"})
print(res.json())
short_code = res.json()["short_url"].split("/")[-1]
res = client.get(f"/{short_code}")
print(res.json())

{'short_url': 'http://short.ly/3bbFVP'}
{'long_url': 'https://www.amazon.com'}


In [8]:
from collections import OrderedDict
import time
class LRUCache:
    def __init__(self, capacity=1000, ttl=300):
        self.cache = OrderedDict()
        self.capacity = capacity
        self.ttl = ttl
    def get(self, key):
        if key not in self.cache:
            return None
        value, timestamp = self.cache[key]
        if time.time() - timestamp > self.ttl:
            del self.cache[key]
            return None
        self.cache.move_to_end(key)
        return value
    def put(self, key, value):
        self.cache[key] = (value, time.time())
        self.cache.move_to_end(key)
        if len(self.cache) > self.capacity:
            self.cache.popitem(last=False)
cache = LRUCache(capacity=1000, ttl=300)

In [9]:
from collections import defaultdict
RATE_LIMIT = 5  # requests
WINDOW = 60     # seconds
client_requests = defaultdict(list)
def rate_limited(client_ip):
    now = time.time()
    timestamps = client_requests[client_ip]
    client_requests[client_ip] = [t for t in timestamps if now - t < WINDOW]
    if len(client_requests[client_ip]) >= RATE_LIMIT:
        return True
    client_requests[client_ip].append(now)
    return False

In [10]:
from fastapi import Request
@app.post("/shorten")
def shorten_url(req: URLCreate, request: Request, db=Depends(get_db)):
    client_ip = request.client.host if request.client else "unknown"
    if rate_limited(client_ip):
        raise HTTPException(status_code=429, detail="Too many requests")
    short_code = generate_short_code()
    while db.query(URL).filter(URL.short_code == short_code).first():
        short_code = generate_short_code()
    db_url = URL(short_code=short_code, long_url=req.long_url)
    db.add(db_url)
    db.commit()
    cache.put(short_code, req.long_url)
    return {"short_url": f"http://short.ly/{short_code}"}
@app.get("/{short_code}")
def redirect(short_code: str, request: Request, db=Depends(get_db)):
    client_ip = request.client.host if request.client else "unknown"
    if rate_limited(client_ip):
        raise HTTPException(status_code=429, detail="Too many requests")
    cached = cache.get(short_code)
    if cached:
        return {"long_url": cached, "source": "cache"}
    db_url = db.query(URL).filter(URL.short_code == short_code).first()
    if not db_url:
        raise HTTPException(status_code=404, detail="URL not found")
    cache.put(short_code, db_url.long_url)
    return {"long_url": db_url.long_url, "source": "db"}

In [11]:
client = TestClient(app)
res = client.post("/shorten", json={"long_url": "https://www.amazon.com"})
print(res.json())
code = res.json()["short_url"].split("/")[-1]
print(client.get(f"/{code}").json())  # DB
print(client.get(f"/{code}").json())  # Cache
# Rate limit test
for i in range(7):
    r = client.get(f"/{code}")
    print(i, r.status_code)

{'short_url': 'http://short.ly/wii4J3'}
{'long_url': 'https://www.amazon.com'}
{'long_url': 'https://www.amazon.com'}
0 200
1 200
2 200
3 200
4 200
5 200
6 200
