In [None]:
import os
import requests
from requests_oauthlib import OAuth1
import pandas as pd
import sqlite3
import matplotlib.pyplot as plt
import urllib, json, time, re

from IPython.display import display
# Show every column when a DataFrame is rendered instead of eliding the middle ones.
pd.options.display.max_columns = None

In [None]:
# =====================
# 1. Setup
# =====================
# Make sure to set your X API Bearer Token in your environment before running

CONSUMER_KEY = os.getenv("CONSUMER_KEY")
CONSUMER_SECRET = os.getenv("CONSUMER_SECRET")
OAUTH_TOKEN = os.getenv("X_OAUTH_TOKEN")
OAUTH_SECRET = os.getenv("X_OAUTH_SECRET")
BASE_URL = "https://api.x.com/graphql/KI9jCXUx3Ymt-hDKLOZb9Q/SearchTimeline"

if not OAUTH_TOKEN:
    raise ValueError("Please set your X_OAUTH_TOKEN environment variable.")    
if not OAUTH_SECRET:
    raise ValueError("Please set your X_OAUTH_SECRET environment variable.")    

hashtags = ["#BahaSaLuneta", "#TrillionPesoMarch"]
adv_search = "until:2025-09-23 since:2025-09-20"
count = 50

# Static HTTP headers passed as `headers=` to the requests.get calls in this
# notebook. No Authorization header is set here; the SearchTimeline request
# supplies its credentials through the `auth=` argument instead.
headers = {
    "connection": "keep-alive",
    "content-type": "application/json",
    "x-twitter-active-user": "yes",
    "authority": "api.x.com",
    "accept-encoding": "gzip",
    "accept-language": "en-US,en;q=0.9",
    "accept": "*/*",
    "DNT": "1"
}

# Feature flags for the SearchTimeline GraphQL request. The whole dict is
# JSON-encoded and sent as the `features` query parameter of every page request.
# NOTE(review): the flag names are opaque to this notebook - presumably the
# endpoint expects this exact set; verify against the current web client
# before adding, removing or flipping any entry.
features = {
    "android_graphql_skip_api_media_color_palette": False,
    "blue_business_profile_image_shape_enabled": False,
    "creator_subscriptions_subscription_count_enabled": False,
    "creator_subscriptions_tweet_preview_api_enabled": True,
    "freedom_of_speech_not_reach_fetch_enabled": False,
    "graphql_is_translatable_rweb_tweet_is_translatable_enabled": False,
    "hidden_profile_likes_enabled": False,
    "highlights_tweets_tab_ui_enabled": False,
    "interactive_text_enabled": False,
    "longform_notetweets_consumption_enabled": True,
    "longform_notetweets_inline_media_enabled": False,
    "longform_notetweets_richtext_consumption_enabled": True,
    "longform_notetweets_rich_text_read_enabled": False,
    "responsive_web_edit_tweet_api_enabled": False,
    "responsive_web_enhance_cards_enabled": False,
    "responsive_web_graphql_exclude_directive_enabled": True,
    "responsive_web_graphql_skip_user_profile_image_extensions_enabled": False,
    "responsive_web_graphql_timeline_navigation_enabled": False,
    "responsive_web_media_download_video_enabled": False,
    "responsive_web_text_conversations_enabled": False,
    "responsive_web_twitter_article_tweet_consumption_enabled": False,
    "responsive_web_twitter_blue_verified_badge_is_enabled": True,
    "rweb_lists_timeline_redesign_enabled": True,
    "spaces_2022_h2_clipping": True,
    "spaces_2022_h2_spaces_communities": True,
    "standardized_nudges_misinfo": False,
    "subscriptions_verification_info_enabled": True,
    "subscriptions_verification_info_reason_enabled": True,
    "subscriptions_verification_info_verified_since_enabled": True,
    "super_follow_badge_privacy_enabled": False,
    "super_follow_exclusive_tweet_notifications_enabled": False,
    "super_follow_tweet_api_enabled": False,
    "super_follow_user_api_enabled": False,
    "tweet_awards_web_tipping_enabled": False,
    "tweet_with_visibility_results_prefer_gql_limited_actions_policy_enabled": False,
    "tweetypie_unmention_optimization_enabled": False,
    "unified_cards_ad_metadata_container_dynamic_card_content_query_enabled": False,
    "verified_phone_label_enabled": False,
    "vibe_api_enabled": False,
    "view_counts_everywhere_api_enabled": False,
    "premium_content_api_read_enabled": False,
    "communities_web_enable_tweet_community_results_fetch": False,
    "responsive_web_jetfuel_frame": False,
    "responsive_web_grok_analyze_button_fetch_trends_enabled": False,
    "responsive_web_grok_image_annotation_enabled": False,
    "rweb_tipjar_consumption_enabled": False,
    "profile_label_improvements_pcf_label_in_post_enabled": False,
    "creator_subscriptions_quote_tweet_preview_enabled": False,
    "c9s_tweet_anatomy_moderator_badge_enabled": False,
    "responsive_web_grok_analyze_post_followups_enabled": False,
    "rweb_video_timestamps_enabled": False,
    "responsive_web_grok_share_attachment_enabled": False,
    "articles_preview_enabled": False,
    "immersive_video_status_linkable_timestamps": False,
    "articles_api_enabled": False,
    "responsive_web_grok_analysis_button_from_backend": False
}

# OAuth 1.0a signer built from the four credentials read from the environment;
# it is handed to requests via `auth=` so each outgoing request gets signed.
_oauth1_credentials = {
    "client_key": CONSUMER_KEY,
    "client_secret": CONSUMER_SECRET,
    "resource_owner_key": OAUTH_TOKEN,
    "resource_owner_secret": OAUTH_SECRET,
}
auth = OAuth1(**_oauth1_credentials)

# Timeline "instructions" collected from every page of every hashtag search;
# the parsing cell below walks this flat list.
all_responses = []

MAX_PAGES = 5                  # pages requested per hashtag, at most
MAX_ATTEMPTS = 3               # tries per page before giving up on a hashtag
RETRY_DELAY_SECONDS = 10       # pause between failed attempts
REQUEST_TIMEOUT_SECONDS = 30   # keeps a stalled connection from hanging the kernel


def _fetch_instructions(variables):
    """Request one SearchTimeline page and return its list of instructions.

    A non-200 status or a transport error is retried up to MAX_ATTEMPTS times
    with RETRY_DELAY_SECONDS between tries. Returns None when every attempt
    fails or when the JSON body does not have the expected shape, so the
    caller can stop paginating instead of looping forever.
    """
    # requests URL-encodes a dict of params itself, so no manual urlencode
    # (and no reliance on urllib.parse having been imported implicitly).
    params = {
        "variables": json.dumps(variables),
        "features": json.dumps(features),
    }
    for attempt in range(1, MAX_ATTEMPTS + 1):
        try:
            resp = requests.get(
                BASE_URL,
                headers=headers,
                params=params,
                auth=auth,
                timeout=REQUEST_TIMEOUT_SECONDS,
            )
        except requests.RequestException as exc:
            problem = f"request failed ({exc})"
        else:
            if resp.status_code == 200:
                try:
                    instructions = resp.json()["data"]["search_by_raw_query"]["search_timeline"]["timeline"]["instructions"]
                except (KeyError, TypeError, ValueError) as exc:
                    print("Unexpected SearchTimeline response:", repr(exc))
                    return None
                if not isinstance(instructions, list):
                    print("Unexpected SearchTimeline response: instructions is not a list")
                    return None
                return instructions
            problem = f"HTTP {resp.status_code}"

        print(f"Attempt {attempt}/{MAX_ATTEMPTS} failed: {problem}")
        if attempt < MAX_ATTEMPTS:
            time.sleep(RETRY_DELAY_SECONDS)
    return None


def _find_bottom_cursor(instructions):
    """Return the value of the first "Bottom" cursor entry, or None if absent."""
    for instr in instructions:
        for entry in instr.get("entries", []):
            content = entry.get("content", {})
            if content.get("cursorType") == "Bottom":
                return content.get("value")
    return None


for hashtag in hashtags:
    variables = {
        "rawQuery": f"{hashtag} {adv_search}",
        "count": count,
        "product": "Top",       # "Top" or "Latest"
        "withDownvotePerspective": False,
        "withReactionsMetadata": False,
        "withReactionsPerspective": False
    }

    # Pagination state belongs to one hashtag: a cursor left over from the
    # previous hashtag's search must never be sent with this one.
    cursor = None
    pages_fetched = 0

    while pages_fetched < MAX_PAGES:
        if cursor:
            variables["cursor"] = cursor

        instructions = _fetch_instructions(variables)
        if instructions is None:
            break

        all_responses.extend(instructions)
        pages_fetched += 1

        # No "Bottom" cursor means there is nothing further to page through.
        cursor = _find_bottom_cursor(instructions)
        if not cursor:
            break

    print(f"Retrieved {pages_fetched} page(s) for {hashtag}.")

In [None]:
# Hashtags each tweet is labelled with. Kept local to this cell so re-running
# it does not depend on what another cell last assigned to `hashtags`.
TRACKED_HASHTAGS = ("#BahaSaLuneta", "#TrillionPesoMarch")
TWEET_COLUMNS = ["query", "tweet_id", "user", "created_at", "retweets", "replies", "likes", "quotes"]

# Case-insensitive so a tweet that spells a tag with different casing is still
# labelled rather than ending up with no hashtag at all.
_hashtag_pattern = re.compile(
    "|".join(re.escape(tag) for tag in TRACKED_HASHTAGS),
    re.IGNORECASE,
)
_canonical_hashtag = {tag.lower(): tag for tag in TRACKED_HASHTAGS}


def _matched_hashtags(text):
    """Return the tracked hashtags found in `text`.

    Matches are mapped to their canonical spelling and de-duplicated in
    first-seen order, so a tag repeated inside one tweet yields one label.
    A missing (None) text yields an empty list.
    """
    found = (_canonical_hashtag[match.lower()] for match in _hashtag_pattern.findall(text or ""))
    return list(dict.fromkeys(found))


tweets = []
seen_tweet_ids = set()
for entry in all_responses:
    # Instructions without an "entries" list carry no tweets.
    for item in entry.get("entries", []):
        item_content = item.get("content", {}).get("itemContent") or {}
        tweet = item_content.get("tweet_results", {}).get("result", {})
        legacy = tweet.get("legacy", {})
        if not legacy:
            continue

        # The same tweet can come back from both hashtag searches (or on more
        # than one page); keep it once so its engagement is not double-counted.
        tweet_id = tweet.get("rest_id")
        if tweet_id is not None:
            if tweet_id in seen_tweet_ids:
                continue
            seen_tweet_ids.add(tweet_id)

        tweets.append({
            "query": _matched_hashtags(legacy.get("full_text")),
            "tweet_id": tweet_id,
            "user": legacy.get("user_id_str"),
            "created_at": legacy.get("created_at"),
            "retweets": legacy.get("retweet_count"),
            "replies": legacy.get("reply_count"),
            "likes": legacy.get("favorite_count"),
            "quotes": legacy.get("quote_count", 0)
        })

# --- Convert to Pandas DataFrame ---
# Explicit columns keep the schema stable even when no tweets were collected.
df = pd.DataFrame(tweets, columns=TWEET_COLUMNS)

In [None]:
# Preview the parsed tweets; at this point `query` still holds a list of matched hashtags per row.
display(df)

In [None]:
# Turn the list-valued `query` column into one row per (tweet, hashtag) pair,
# then renumber the rows so the index runs 0..n-1 again.
df = (
    df.explode("query")
      .reset_index(drop=True)
)

display(df)

In [None]:
# =====================
# 2. Fetch Data from X API
# =====================
def fetch_hashtag_data(hashtag, max_results=50):
    url = "https://api.x.com/2/tweets/search/recent"
    params = {
        "query": hashtag,
        "max_results": max_results,
        "tweet.fields": "author_id,created_at,public_metrics"
    }
    r = requests.get(url, headers=headers, params=params)
    r.raise_for_status()
    return r.json()

# Side fetch through the v2 recent-search endpoint. Its rows have a different
# schema from the GraphQL frame `df` that every later cell works on (`query`,
# `likes`, `retweets`, ... and the legacy `created_at` format), so the result
# is kept under its own names instead of overwriting `hashtags` and `df`.
v2_hashtags = ["#BahaSaLuneta"]
frames = []
for tag in v2_hashtags:
    try:
        payload = fetch_hashtag_data(tag)
    except requests.RequestException as exc:
        # No later cell reads this table, so a failed side fetch should not
        # abort the rest of the notebook.
        print(f"Skipping {tag}: {exc}")
        continue
    tag_frame = pd.DataFrame(payload.get("data", []))
    if not tag_frame.empty:
        tag_frame["hashtag"] = tag
        frames.append(tag_frame)

if frames:
    df_v2 = pd.concat(frames, ignore_index=True)
else:
    print("No data retrieved from X API. Check hashtags or API permissions.")
    df_v2 = pd.DataFrame()

In [None]:
# =====================
# 3. Wrangle Engagement Data
# =====================
# Convert created_at to datetime
df['created_at'] = pd.to_datetime(df['created_at'], format="%a %b %d %H:%M:%S %z %Y")

# Create a created_date field
df['created_date'] = df['created_at'].dt.date

display(df)

In [None]:
# Engagement is the plain sum of the four interaction counters.
df["engagement"] = (
    df["likes"]
    .add(df["retweets"])
    .add(df["replies"])
    .add(df["quotes"])
)

# Mean engagement for each (hashtag, date) combination.
mean_engagement_by_tag_and_date = df.groupby(["query", "created_date"])["engagement"].mean()

print("Average engagement per hashtag per date:")
display(mean_engagement_by_tag_and_date)

In [None]:
# =====================
# 4. SQL Queries
# =====================
conn = sqlite3.connect(":memory:")
df.to_sql("tweets", conn, index=False, if_exists="replace")

query = """
SELECT query,
       created_date,
       COUNT(*) as num_posts,
       AVG(engagement) as avg_engagement
FROM tweets
GROUP BY query, created_date
ORDER BY num_posts DESC
"""

# Create a new df from sqlite query
sql_df = pd.read_sql_query(query, conn)
print("\nSQL Aggregation Result:")
print(sql_df)

In [None]:
# =====================
# 5. Visualization
# =====================
avg_engagement = df.groupby(["created_date", "query"])["engagement"].mean()
avg_engagement.plot(kind="bar", title=f"Average Daily Engagement", rot=0)
plt.xticks(rotation=90)
plt.ylabel("Engagement (likes+retweets+replies+quotes)")
plt.show()

In [None]:
# Do for the sql_df
# sql_df has one row per (hashtag, date), ordered by post count. Plotting it
# directly would join rows of different hashtags into a single line in that
# arbitrary order, so it is first reshaped to a date-indexed table with one
# column (and therefore one line) per hashtag.
daily_avg_by_tag = (
    sql_df.dropna(subset=["query"])
          .pivot(index="created_date", columns="query", values="avg_engagement")
          .sort_index()
)
# Markers keep a hashtag visible even when it only has a single day of data.
daily_avg_by_tag.plot(kind="line", marker="o", title="Average Daily Engagement", rot=0)
plt.ylabel("Engagement (likes+retweets+replies+quotes)")
plt.show()

In [None]:
# Reshape to dates (rows) x hashtags (columns) and draw one line per hashtag.
# `fig` is kept so the export cell can save this figure to an image file.
sql_df_pvt = sql_df.pivot(
    index="created_date",
    columns="query",
    values="avg_engagement",
)
ax = sql_df_pvt.plot.line(figsize=(5, 3))
fig = ax.get_figure()

In [None]:
output_file = "report.xlsx"
img_path = "plot.png"

# Export a copy with `created_at` rendered as text and leave `df` untouched:
# converting the column in place would turn it into strings for every cell
# that is (re-)run afterwards, e.g. the datetime parsing in the wrangling step.
# NOTE(review): the string conversion is presumably there because Excel cannot
# store timezone-aware datetimes - confirm before removing it.
export_df = df.assign(created_at=df["created_at"].astype("str"))

# Save plot as image before opening the workbook, so the file exists when the
# workbook is written.
fig.savefig(img_path)

with pd.ExcelWriter(output_file, engine="xlsxwriter") as writer:
    # Save DataFrame
    export_df.to_excel(writer, sheet_name="Data", index=False)

    # Access the underlying workbook and add a second sheet for the chart
    workbook = writer.book
    worksheet = workbook.add_worksheet("Plot")
    writer.sheets["Plot"] = worksheet

    # Insert plot image into Excel
    worksheet.insert_image("B2", img_path)

plt.close(fig)
print(f"✅ Saved DataFrame and plot to {output_file}")