In [204]:
import os
from sqlalchemy import create_engine

# Connection string: taken from the DATABASE_URL environment variable when it
# is set, so credentials do not have to live in the notebook. The fallback is
# the database this notebook was originally written against.
DATABASE_URL = os.environ.get(
    "DATABASE_URL",
    "postgresql+psycopg2://aerostream:aerostream@postgres:5432/aerostream",
)

engine = create_engine(
    DATABASE_URL,
    # echo=True,           # uncomment to log every emitted SQL statement
    # pool_pre_ping=True,  # uncomment to validate pooled connections before use
)

# Show which database is targeted without writing the password into the
# saved notebook output.
print(engine.url.render_as_string(hide_password=True))


postgresql+psycopg2://aerostream:aerostream@postgres:5432/aerostream


In [205]:
from sqlalchemy import Column, Integer, String, Float, Text, DateTime, CheckConstraint
from sqlalchemy.sql import func
from sqlalchemy.orm import declarative_base, sessionmaker

# Declarative base class; every model deriving from it is registered on Base.metadata.
Base = declarative_base()

class Tweet(Base):
    """ORM model for one row of the "tweets" table.

    A row holds the tweet fields read from the batch payload together with
    the sentiment label stored in ``prediction``. A table-level check
    constraint keeps ``airline_sentiment_confidence`` within [0, 1].
    """

    __tablename__ = "tweets"

    # Integer primary key (also explicitly indexed).
    id = Column(Integer, primary_key=True, index=True)

    # Required float; restricted to 0..1 by the "confidence_range" constraint below.
    airline_sentiment_confidence = Column(Float, nullable=False)

    # Required airline name, at most 50 characters.
    airline = Column(String(50), nullable=False)

    # Optional reason text, at most 100 characters; NULL is allowed.
    negativereason = Column(String(100), nullable=True)

    # Required timezone-aware timestamp of the tweet itself.
    tweet_created = Column(DateTime(timezone=True), nullable=False)

    # Required tweet body.
    text = Column(Text, nullable=False)
    
    # Required label, at most 10 characters.
    prediction = Column(String(10),nullable=False)
    
    # Filled by the database with now() when the row is inserted without a value.
    created_at = Column(DateTime(timezone=True), server_default=func.now())

    __table_args__ = (
        # Reject confidence values outside the closed interval [0, 1].
        CheckConstraint(
            "airline_sentiment_confidence >= 0 AND airline_sentiment_confidence <= 1",
            name="confidence_range",
        ),
    )

# Session factory bound to the engine; call SessionLocal() to open a new session.
SessionLocal = sessionmaker(bind=engine)
    
# Emit CREATE TABLE for the models registered on Base.metadata (here: "tweets").
# create_all checks for existing tables first by default, so re-running is safe.
Base.metadata.create_all(bind=engine)

In [206]:
import requests

def get_tweets():
    """Fetch one batch of tweets from the FastAPI service.

    Sends ``GET http://fastapi:8000/batch`` with a 20 second timeout.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: if the service answers with a 4xx/5xx status.
    """
    batch_response = requests.get("http://fastapi:8000/batch", timeout=20)
    batch_response.raise_for_status()
    payload = batch_response.json()
    return payload

In [207]:




def predict(text):
    """Ask the FastAPI service to classify a piece of text.

    Sends ``POST http://fastapi:8000/predict`` with the JSON body
    ``{"text": text}`` and a 30 second timeout.

    Args:
        text: The text to classify.

    Returns:
        The decoded JSON body of the response; callers in this notebook
        read its ``"label"`` key.

    Raises:
        requests.HTTPError: if the service answers with a 4xx/5xx status.
    """
    request_body = {"text": text}
    prediction_response = requests.post(
        "http://fastapi:8000/predict",
        json=request_body,
        timeout=30,
    )
    prediction_response.raise_for_status()
    return prediction_response.json()


# Smoke test: send a dummy sentence to the prediction service and show the label it returns.
test = predict("test test")

print(test["label"])




neutral


In [208]:
from datetime import datetime
from sqlalchemy.orm import session


def load_tweets():
    """Fetch a batch of tweets, classify each one and store them in one transaction.

    Every tweet returned by ``get_tweets()`` is sent to ``predict()`` and
    added to the session as a ``Tweet`` row; the whole batch is committed
    once at the end.

    Returns:
        ``{"resp": "ok"}`` once the batch has been committed.

    Raises:
        requests.HTTPError: if the batch or prediction endpoint returns an
            error status.
        KeyError: if a tweet or a prediction lacks one of the expected keys.
        ValueError: if ``tweet_created`` is not a valid ISO-8601 string.
    """
    tweets = get_tweets()

    # The context manager closes the session on every exit path. If anything
    # raises before commit(), closing discards the pending rows, so a batch is
    # stored entirely or not at all and no connection is left checked out.
    with SessionLocal() as db:
        for t in tweets:
            pred = predict(t["text"])
            db.add(
                Tweet(
                    airline_sentiment_confidence=t["airline_sentiment_confidence"],
                    airline=t["airline"],
                    negativereason=t["negativereason"],
                    tweet_created=datetime.fromisoformat(t["tweet_created"]),
                    text=t["text"],
                    prediction=pred["label"],
                )
            )
        db.commit()

    return {"resp": "ok"}

In [209]:
# Run the pipeline once. Each execution of this cell inserts another batch of rows;
# load_tweets() does no de-duplication.
load_tweets()

{'resp': 'ok'}

# Aggregations

In [210]:
from sqlalchemy import func, cast, Float



<h1 style="color:orange;"> 1. Tweet volume per airline</h1>



In [211]:
# Number of stored tweets per airline.
# The session is used as a context manager so it is always closed, even if
# the query raises. A bare `db.close` without parentheses closes nothing.
with SessionLocal() as db:
    resp = (
        db.query(Tweet.airline, func.count(Tweet.airline))
        .group_by(Tweet.airline)
        .all()
    )

# .all() has already loaded every row, so iterating after the session is
# closed is safe.
for companie, volume in resp:
    print(companie, volume)


United 1
Delta 2
Virgin America 6
American 1
US Airways 4
Southwest 6


<bound method Session.close of <sqlalchemy.orm.session.Session object at 0x75870554bad0>>


<h1 style="color:orange;"> 2. Prediction counts per airline</h1>



In [212]:
# Number of tweets per (airline, predicted label), sorted by airline then label.
# The context manager guarantees the session is closed even when the query
# fails; a trailing db.close() would be skipped in that case.
with SessionLocal() as db:
    res2 = (
        db.query(
            Tweet.airline,
            Tweet.prediction,
            func.count(Tweet.prediction).label("count"),
        )
        .group_by(Tweet.airline, Tweet.prediction)
        .order_by(Tweet.airline, Tweet.prediction)
        .all()
    )

for airline, prediction, count in res2:
    print(airline, prediction, count)

American negative 1
Delta negative 1
Delta positive 1
Southwest negative 3
Southwest positive 3
US Airways negative 3
US Airways positive 1
United positive 1
Virgin America negative 4
Virgin America positive 2



<h1 style="color:orange;"> 3. Satisfaction rate per airline</h1>



In [213]:
# Satisfaction rate per airline: share of tweets predicted "positive", in percent.
# The filtered count is cast to Float so the division is not an integer division.
taux_satisfaction = (
    cast(
        func.count(Tweet.prediction).filter(Tweet.prediction == "positive"),
        Float,
    )
    / func.count(Tweet.prediction)
    * 100
).label("taux_satisfaction")

# The context manager guarantees the session is closed even when the query
# fails; a trailing db.close() would be skipped in that case.
with SessionLocal() as db:
    res3 = (
        db.query(Tweet.airline, taux_satisfaction)
        .group_by(Tweet.airline)
        .order_by(taux_satisfaction.desc())
        .all()
    )

for companie, satisfaction in res3:
    print(companie, satisfaction)


United 100.0
Delta 50.0
Southwest 50.0
Virgin America 33.33333333333333
US Airways 25.0
American 0.0


In [214]:
# Three most frequent negative reasons.
count = func.count(Tweet.negativereason).label("count")

# The session is used as a context manager so it is always closed; previously
# this cell never closed it.
with SessionLocal() as db:
    res4 = (
        db.query(Tweet.negativereason, count)
        # Rows without a reason form a NULL group whose count is always 0;
        # exclude it so it cannot appear in the top 3 when fewer than three
        # distinct reasons exist.
        .filter(Tweet.negativereason.is_not(None))
        .group_by(Tweet.negativereason)
        .order_by(count.desc())
        .limit(3)
        .all()
    )

# Note: the loop rebinds `count` from the SQL expression to each row's integer.
for negativereason, count in res4:
    print(negativereason, count)
    

Lost Luggage 5
Cancelled Flight 3
Flight Booking Problems 2
