In [1]:
# ===========================
# 0. Install dependencies
# ===========================
!pip install google-cloud-translate==3.15.3 transformers torch


Collecting google-cloud-translate==3.15.3
  Downloading google_cloud_translate-3.15.3-py2.py3-none-any.whl.metadata (5.3 kB)
Collecting protobuf!=3.20.0,!=3.20.1,!=4.21.0,!=4.21.1,!=4.21.2,!=4.21.3,!=4.21.4,!=4.21.5,<5.0.0dev,>=3.19.5 (from google-cloud-translate==3.15.3)
  Downloading protobuf-4.25.8-cp37-abi3-manylinux2014_x86_64.whl.metadata (541 bytes)
INFO: pip is looking at multiple versions of grpcio-status to determine which version is compatible with other requirements. This could take a while.
Collecting grpcio-status<2.0.0,>=1.33.2 (from google-api-core[grpc]!=2.0.*,!=2.1.*,!=2.10.*,!=2.2.*,!=2.3.*,!=2.4.*,!=2.5.*,!=2.6.*,!=2.7.*,!=2.8.*,!=2.9.*,<3.0.0dev,>=1.34.1->google-cloud-translate==3.15.3)
  Downloading grpcio_status-1.75.0-py3-none-any.whl.metadata (1.1 kB)
  Downloading grpcio_status-1.74.0-py3-none-any.whl.metadata (1.1 kB)
  Downloading grpcio_status-1.73.1-py3-none-any.whl.metadata (1.1 kB)
  Downloading grpcio_status-1.73.0-py3-none-any.whl.metadata (1.1 kB)
  D

final pipeline starts

In [29]:
!pip install streamlit
!pip install pyngrok



In [30]:
import os
import requests
from PIL import Image, ExifTags
from PIL.ExifTags import TAGS, GPSTAGS
from google.cloud import translate_v2 as translate
import tweepy
from datetime import datetime, timedelta
import pytz
from transformers import pipeline
import torch

# Google Translate setup
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/content/sihproject-471916-514170694ad5.json"
translate_client = translate.Client()

# Tweepy setup
bearer_token = "AAAAAAAAAAAAAAAAAAAAAKl%2B4AEAAAAA1RIimtdT1mw73boVJogtxxVNQJU%3DgbN1hecgD6OmRFBqBxooFqAja1dy1bPlGT4d7FXyVoCLUDgzaw"
client = tweepy.Client(bearer_token=bearer_token)

# Hazard keywords
hazard_keywords = ["flood", "tsunami", "storm", "earthquake", "cyclone", "landslide", "fire", "oil spill", "shipwreck"]

# Zero-shot classifier setup
classifier = pipeline(
    "zero-shot-classification",
    model="facebook/bart-large-mnli",
    device=0 if torch.cuda.is_available() else -1
)

# ---------------- EXIF + GPS Functions ----------------
def get_exif_data(image_path):
    image = Image.open(image_path)
    exif_data = {}
    info = image._getexif()
    if not info:
        return None
    for tag, value in info.items():
        decoded = TAGS.get(tag, tag)
        if decoded == "GPSInfo":
            gps_data = {}
            for t in value:
                gps_decoded = GPSTAGS.get(t, t)
                gps_data[gps_decoded] = value[t]
            exif_data[decoded] = gps_data
        else:
            exif_data[decoded] = value
    return exif_data

def to_float(rational):
    try:
        return float(rational)
    except TypeError:
        return rational.numerator / rational.denominator

def convert_to_degrees(value):
    d, m, s = value
    return to_float(d) + to_float(m)/60 + to_float(s)/3600

def get_lat_lon(exif_data):
    if not exif_data or "GPSInfo" not in exif_data:
        return None, None
    gps_info = exif_data["GPSInfo"]
    lat = convert_to_degrees(gps_info["GPSLatitude"])
    if gps_info["GPSLatitudeRef"] != "N":
        lat = -lat
    lon = convert_to_degrees(gps_info["GPSLongitude"])
    if gps_info["GPSLongitudeRef"] != "E":
        lon = -lon
    return lat, lon

def reverse_geocode(lat, lon):
    url = f"https://nominatim.openstreetmap.org/reverse?lat={lat}&lon={lon}&format=json"
    response = requests.get(url, headers={"User-Agent": "my-app"}).json()
    address = response.get("address", {})
    city = address.get("city") or address.get("town") or address.get("village") or None
    full_address = response.get("display_name", "Unknown")
    return city, full_address

# ---------------- Hazard & Tweets ----------------
def classify_hazard(description, target_language="en"):
    result = translate_client.translate(description, target_language=target_language)
    translated_text = result["translatedText"].lower()
    scores = {}
    for hazard in hazard_keywords:
        scores[hazard] = 1.0 if hazard in translated_text else 0.0
    best_hazard = max(scores, key=scores.get)
    return best_hazard, scores

def detect_city_from_text(text):
    result = translate_client.translate(text, target_language="en")
    translated_text = result["translatedText"].lower()
    words = translated_text.split()
    city = words[0]  # fallback, can improve with full geocoding
    return city

def fetch_tweets(city, hazard, max_results=10):
    query = f'("{city}" OR #{city}) ("{hazard}" OR #{hazard}) -is:retweet'
    print("Twitter Query:", query)

    now = datetime.utcnow()
    start_time = (now - timedelta(hours=1)).isoformat("T") + "Z"
    end_time   = (now - timedelta(seconds=15)).isoformat("T") + "Z"

    local_tz = pytz.timezone("Asia/Kolkata")

    tweets = client.search_recent_tweets(
        query=query,
        max_results=max_results,
        tweet_fields=["created_at", "text", "geo"],
        expansions=["author_id"],
        user_fields=["username"],
        start_time=start_time,
        end_time=end_time
    )

    user_map = {}
    if tweets.includes and "users" in tweets.includes:
        for u in tweets.includes["users"]:
            user_map[u["id"]] = u["username"]

    tweets_texts = []
    if tweets.data:
        for t in tweets.data:
            local_time = t.created_at.astimezone(local_tz)
            formatted_time = local_time.strftime("%Y-%m-%d %H:%M:%S %Z")
            username = user_map.get(t.author_id, "Unknown")
            print(f"[{formatted_time}] @{username} → {t.text}\n")
            tweets_texts.append(t.text)
    else:
        print("No tweets found for this location + hazard in the last 1 hour.")

    return tweets_texts

# ---------------- Classification ----------------
def translate_tweets_to_english(tweets_list):
    translated_tweets = []
    for tweet in tweets_list:
        result = translate_client.translate(tweet, target_language="en")
        translated_text = result["translatedText"]
        translated_tweets.append(translated_text)
    return translated_tweets

def classify_tweets_disaster(translated_tweets):
    aggregate_scores = {hazard: 0.0 for hazard in hazard_keywords}
    for tweet in translated_tweets:
        res = classifier(tweet, hazard_keywords, multi_label=False)
        top_label = res['labels'][0]
        top_score = res['scores'][0]
        aggregate_scores[top_label] += top_score
    num_tweets = len(translated_tweets)
    if num_tweets == 0:
        return None, None, None
    for hazard in aggregate_scores:
        aggregate_scores[hazard] = (aggregate_scores[hazard] / num_tweets) * 100
    final_disaster = max(aggregate_scores, key=aggregate_scores.get)
    intensity = aggregate_scores[final_disaster]
    return final_disaster, intensity, aggregate_scores

# ---------------- Pipeline ----------------
def analyze_disaster_from_tweets(tweets_texts):
    if not tweets_texts:
        print("No tweets to analyze.")
        return None, None, None
    print("Translating tweets to English...")
    translated_tweets = translate_tweets_to_english(tweets_texts)
    print("Classifying tweets for disaster type...")
    final_disaster, intensity, scores = classify_tweets_disaster(translated_tweets)
    print("\n=== Disaster Analysis from Tweets ===")
    print("Final Disaster Type:", final_disaster)
    print(f"Disaster Intensity (Confidence): {intensity:.2f}%")
    print("Detailed Scores per Hazard:", scores)
    return final_disaster, intensity, scores

def run_pipeline(image_path=None, description=None, direct_text=None):
    if direct_text:
        print("\nRunning Direct Text Search Mode...")
        hazard, _ = classify_hazard(direct_text)
        city = detect_city_from_text(direct_text)
        print("Detected City:", city)
        print("Predicted Hazard:", hazard)
        tweets_texts = fetch_tweets(city, hazard)
        return analyze_disaster_from_tweets(tweets_texts)

    elif image_path and description:
        print("\nRunning Image + Description Mode...")
        exif_data = get_exif_data(image_path)
        lat, lon = get_lat_lon(exif_data)
        if lat and lon:
            city, full_address = reverse_geocode(lat, lon)
            print("Detected City:", city)
            print("Full Address:", full_address)
        else:
            city = detect_city_from_text(description)
            print("Detected City from text:", city)
        hazard, _ = classify_hazard(description)
        print("Predicted Hazard:", hazard)
        tweets_texts = fetch_tweets(city, hazard)
        return analyze_disaster_from_tweets(tweets_texts)

    else:
        print("Provide either image+description or direct_text input.")
        return None, None, None

# ---------------- Example Run ----------------
run_pipeline(direct_text="पंजाब बाढ़")
# run_pipeline(image_path="/content/drive/MyDrive/IMG20250904195552.jpg", description="बाढ़ आ गई है और पानी बहुत बढ़ रहा है")


Device set to use cuda:0



Running Direct Text Search Mode...
Detected City: punjab
Predicted Hazard: flood
Twitter Query: ("punjab" OR #punjab) ("flood" OR #flood) -is:retweet


  now = datetime.utcnow()


[2025-09-20 01:09:28 IST] @KaptaanWelfare → South Punjab ke flood se affected logon ki madad karna humari zimmedari hai. Din raat khana, pani aur zaroori cheezein pohchane ki koshish kar rhy hain. Aap ki choti si support kisi ki zindagi bacha sakti hai. ❤️🙏 
#kaptaanwelfaresociety
#floodrelief #southpunjab #welfarework https://t.co/OCrPwX7wYE

[2025-09-20 00:27:10 IST] @NaurangD86736 → बाढ़ पीड़ित गाँव में लगातार मदद की जा रही है | अन्नपूर्णा मुहिम | #flood #punjab https://t.co/WhhDP4nmRb

[2025-09-20 00:20:21 IST] @Arvind881161 → @Her_Harpreet @Toxicity_______ @narendramodi @HarshdeepKaur Congress and aap never brought msp, never solved a single problem for Punjab, aap govt did not help during flood...but still you guys find excuses to hate Modi and BJP.

[2025-09-20 00:18:00 IST] @KafiranaBlogs → भागवंत मान स्लैम्स सेंटर की of 1,600 करोड़ की सहायता - 

 https://t.co/Bmwdk4hUjj 

#news #india https://t.co/z2SinEoopW

[2025-09-20 00:17:31 IST] @BaazzOnHunt → Punjabis are fighting to cu

('flood',
 49.032918214797974,
 {'flood': 49.032918214797974,
  'tsunami': 0.0,
  'storm': 6.501468420028686,
  'earthquake': 0.0,
  'cyclone': 0.0,
  'landslide': 0.0,
  'fire': 7.8387224674224845,
  'oil spill': 0.0,
  'shipwreck': 0.0})

final code working fine with streamlit web app UI (functioning)

In [60]:
%%writefile app.py
import os
import requests
import pandas as pd
import streamlit as st
from PIL import Image, ExifTags
from PIL.ExifTags import TAGS, GPSTAGS
from google.cloud import translate_v2 as translate
import tweepy
from datetime import datetime, timedelta
import pytz
from transformers import pipeline
import torch

# ---------------- SETUP ----------------
st.set_page_config(page_title="Disaster Detection App", layout="wide")

# Google Translate setup
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "sihproject-471916-514170694ad5.json"
translate_client = translate.Client()

# Tweepy setup
bearer_token = "AAAAAAAAAAAAAAAAAAAAAIyi4AEAAAAAvT6sMfLiUl%2FPzDe%2BiCV4NkvNzbQ%3DfoX0qsgli8JaAQLp8dcYFi6yliDVqZf2TRURigrKxhSvgnXsPB"
client = tweepy.Client(bearer_token=bearer_token)

# Hazard keywords
hazard_keywords = ["flood", "tsunami", "storm", "earthquake", "cyclone", "landslide", "fire", "oil spill", "shipwreck"]

# Zero-shot classifier setup
classifier = pipeline(
    "zero-shot-classification",
    model="facebook/bart-large-mnli",
    device=0 if torch.cuda.is_available() else -1
)

# ---------------- EXIF + GPS Functions ----------------
def get_exif_data(image_path):
    image = Image.open(image_path)
    exif_data = {}
    info = image._getexif()
    if not info:
        return None
    for tag, value in info.items():
        decoded = TAGS.get(tag, tag)
        if decoded == "GPSInfo":
            gps_data = {}
            for t in value:
                gps_decoded = GPSTAGS.get(t, t)
                gps_data[gps_decoded] = value[t]
            exif_data[decoded] = gps_data
        else:
            exif_data[decoded] = value
    return exif_data

def to_float(rational):
    try:
        return float(rational)
    except TypeError:
        return rational.numerator / rational.denominator

def convert_to_degrees(value):
    d, m, s = value
    return to_float(d) + to_float(m)/60 + to_float(s)/3600

def get_lat_lon(exif_data):
    if not exif_data or "GPSInfo" not in exif_data:
        return None, None
    gps_info = exif_data["GPSInfo"]
    lat = convert_to_degrees(gps_info["GPSLatitude"])
    if gps_info["GPSLatitudeRef"] != "N":
        lat = -lat
    lon = convert_to_degrees(gps_info["GPSLongitude"])
    if gps_info["GPSLongitudeRef"] != "E":
        lon = -lon
    return lat, lon

def reverse_geocode(lat, lon):
    url = f"https://nominatim.openstreetmap.org/reverse?lat={lat}&lon={lon}&format=json"
    response = requests.get(url, headers={"User-Agent": "my-app"}).json()
    address = response.get("address", {})
    city = address.get("city") or address.get("town") or address.get("village") or None
    full_address = response.get("display_name", "Unknown")
    return city, full_address

# ---------------- Hazard & Tweets ----------------
def classify_hazard(description, target_language="en"):
    result = translate_client.translate(description, target_language=target_language)
    translated_text = result["translatedText"].lower()
    scores = {}
    for hazard in hazard_keywords:
        scores[hazard] = 1.0 if hazard in translated_text else 0.0
    best_hazard = max(scores, key=scores.get)
    return best_hazard, scores

def detect_city_from_text(text):
    result = translate_client.translate(text, target_language="en")
    translated_text = result["translatedText"].lower()
    words = translated_text.split()
    return words[0] if words else "Unknown"

def fetch_tweets(city, hazard, max_results=10):
    query = f'("{city}" OR #{city}) ("{hazard}" OR #{hazard}) -is:retweet'

    now = datetime.utcnow()
    start_time = (now - timedelta(hours=1)).isoformat("T") + "Z"
    end_time   = (now - timedelta(seconds=15)).isoformat("T") + "Z"

    local_tz = pytz.timezone("Asia/Kolkata")

    tweets = client.search_recent_tweets(
        query=query,
        max_results=max_results,
        tweet_fields=["created_at", "text", "geo"],
        expansions=["author_id"],
        user_fields=["username"],
        start_time=start_time,
        end_time=end_time
    )

    user_map = {}
    if tweets.includes and "users" in tweets.includes:
        for u in tweets.includes["users"]:
            user_map[u["id"]] = u["username"]

    tweets_data = []
    if tweets.data:
        for t in tweets.data:
            local_time = t.created_at.astimezone(local_tz)
            formatted_time = local_time.strftime("%Y-%m-%d %H:%M:%S %Z")
            username = user_map.get(t.author_id, "Unknown")
            tweets_data.append({"time": formatted_time, "user": username, "text": t.text})
    return tweets_data

def translate_tweets_to_english(tweets_list):
    translated = []
    for tweet in tweets_list:
        result = translate_client.translate(tweet["text"], target_language="en")
        translated.append(result["translatedText"])
    return translated

def classify_tweets_disaster(translated_tweets):
    aggregate_scores = {hazard: 0.0 for hazard in hazard_keywords}
    for tweet in translated_tweets:
        res = classifier(tweet, hazard_keywords, multi_label=False)
        top_label = res['labels'][0]
        top_score = res['scores'][0]
        aggregate_scores[top_label] += top_score
    num_tweets = len(translated_tweets)
    if num_tweets == 0:
        return None, None, None
    for hazard in aggregate_scores:
        aggregate_scores[hazard] = (aggregate_scores[hazard] / num_tweets) * 100
    final_disaster = max(aggregate_scores, key=aggregate_scores.get)
    intensity = aggregate_scores[final_disaster]
    return final_disaster, intensity, aggregate_scores

# ---------------- Streamlit UI ----------------
st.title("🌍 DisasterPulse")

mode = st.radio("Choose Input Mode", ["Image + Description", "Direct Text"])

tweets_data = []
final_disaster, intensity, scores, city, full_address = None, None, None, None, None

if mode == "Direct Text":
    direct_text = st.text_area("Enter disaster-related text (any language):")
    if st.button("Analyze"):
        hazard, _ = classify_hazard(direct_text)
        city = detect_city_from_text(direct_text)
        st.write(f"**Detected City:** {city}")
        st.write(f"**Predicted Hazard:** {hazard}")
        tweets_data = fetch_tweets(city, hazard)
        translated = translate_tweets_to_english(tweets_data)
        final_disaster, intensity, scores = classify_tweets_disaster(translated)

elif mode == "Image + Description":
    uploaded_img = st.file_uploader("Upload an Image", type=["jpg", "jpeg", "png"])
    description = st.text_area("Enter disaster description (any language):")
    if uploaded_img and description and st.button("Analyze"):
        img_path = "temp_img.jpg"
        with open(img_path, "wb") as f:
            f.write(uploaded_img.read())

        exif_data = get_exif_data(img_path)
        lat, lon = get_lat_lon(exif_data)
        if lat and lon:
            city, full_address = reverse_geocode(lat, lon)
            st.write(f"**Detected City:** {city}")
            st.write(f"**Full Address:** {full_address}")
        else:
            city = detect_city_from_text(description)
            st.write(f"**Detected City from text:** {city}")

        hazard, _ = classify_hazard(description)
        st.write(f"**Predicted Hazard:** {hazard}")

        tweets_data = fetch_tweets(city, hazard)
        translated = translate_tweets_to_english(tweets_data)
        final_disaster, intensity, scores = classify_tweets_disaster(translated)

# ---------------- Show Results ----------------
if final_disaster:
    st.subheader("📊 Disaster Analysis Result")
    st.write(f"**Final Disaster Type:** {final_disaster}")
    st.write(f"**Confidence (Intensity):** {intensity:.2f}%")
    st.write("### Detailed Hazard Scores:")
    st.json(scores)

    if tweets_data:
        st.subheader("📝 Fetched Tweets")
        df = pd.DataFrame(tweets_data)
        st.dataframe(df)

        # Download option
        csv = df.to_csv(index=False).encode("utf-8")
        st.download_button("Download Tweets as CSV", data=csv, file_name="tweets.csv", mime="text/csv")
    else:
        st.warning("No tweets found for this query.")


Overwriting app.py


In [61]:
!ngrok authtoken 30BSvhKS0UakM7ZXSv2i1dIo0wf_2F2vBDU86aSMsAJCachbM

Authtoken saved to configuration file: /root/.config/ngrok/ngrok.yml


In [62]:
import os
import threading

def run_streamlit():
    os.system("streamlit run app.py --server.port 8501 --server.headless true")

thread = threading.Thread(target=run_streamlit)
thread.start()


In [63]:
from pyngrok import ngrok
import time

# Kill previous tunnels (avoid conflicts)
ngrok.kill()

time.sleep(2)

public_url = ngrok.connect(8501)
print("🚀 Your Streamlit app is live here:", public_url)


🚀 Your Streamlit app is live here: NgrokTunnel: "https://adff13f19039.ngrok-free.app" -> "http://localhost:8501"
