## Real-Time Twitter Data Ingestion Using Delta Live Tables (DLT) Streaming Pipeline

### Flow Diagram of the Project

![Project flow diagram](/Volumes/data_gov/data_gov_test/image/tweitter_fow.jpg)

### Integrate with the Twitter API to retrieve data and ingest it into DBFS (Databricks File System)

In [0]:
pip install tweepy

In [0]:
pip install requests


### Extracting the Twitter timeline data from your own account

In [0]:
import requests
import json
import os
import time

# === CONFIGURATION ===
# The bearer token is a credential and must not be hard-coded in the notebook.
# It is read from the TWITTER_BEARER_TOKEN environment variable; when the
# variable is unset the token is an empty string and the API calls will be
# rejected as unauthorized.
# NOTE(review): a token was previously embedded in this cell; treat it as
# leaked and revoke/regenerate it in the Twitter developer portal.
BEARER_TOKEN = os.environ.get("TWITTER_BEARER_TOKEN", "")
# Account whose timeline is extracted.
USERNAME = 'madimgiri'
# Destination file for the extracted tweets.
DBFS_PATH = '/Volumes/data_gov/data_gov_test/twitter/my_tweets.json'

# === Twitter API Headers ===
# HTTP headers passed to the Twitter API requests made below: the bearer-token
# authorization header plus a fixed User-Agent string.
headers = {}
headers['Authorization'] = 'Bearer {}'.format(BEARER_TOKEN)
headers['User-Agent'] = 'v2UserTimelinePython'

# === Get user ID from username ===
def get_user_id(username):
    """Return the Twitter user ID for ``username``.

    Calls the v2 ``users/by/username`` endpoint using the module-level
    ``headers``.

    Args:
        username: Twitter handle to look up (without the leading ``@``).

    Returns:
        The value of ``data.id`` from the JSON response.

    Raises:
        Exception: If the HTTP status is not 200, or if the response body
            carries no ``data.id`` entry.
    """
    url = f'https://api.twitter.com/2/users/by/username/{username}'
    # A timeout keeps a stalled connection from hanging the notebook cell.
    response = requests.get(url, headers=headers, timeout=30)
    if response.status_code != 200:
        raise Exception(f"Failed to get user ID: {response.status_code} {response.text}")

    # A 200 response is not guaranteed to contain `data` (e.g. the API may
    # report an unknown username through an `errors` array instead), so check
    # explicitly rather than failing with a bare KeyError.
    user = response.json().get('data')
    if not user or 'id' not in user:
        raise Exception(f"Failed to get user ID: no user data in response: {response.text}")
    return user['id']

# === Retry wrapper for rate-limited requests (fixed wait between attempts) ===
def safe_request(url, headers, params, retries=3, wait=60):
    """GET ``url`` and retry while the server answers 429 (rate limited).

    Args:
        url: Endpoint to request.
        headers: HTTP headers to send.
        params: Query-string parameters to send.
        retries: Maximum number of attempts.
        wait: Seconds to sleep between attempts after a 429.

    Returns:
        The response object of the first attempt that returns HTTP 200.

    Raises:
        Exception: Immediately on any status other than 200 or 429, or once
            all attempts have been rate limited.
    """
    for attempt in range(retries):
        # A timeout keeps a stalled connection from hanging the notebook cell.
        response = requests.get(url, headers=headers, params=params, timeout=30)
        if response.status_code == 200:
            return response
        if response.status_code != 429:
            raise Exception(f"HTTP Error {response.status_code}: {response.text}")

        # Only wait when another attempt will follow; sleeping after the final
        # attempt would just delay the failure below.
        if attempt + 1 < retries:
            print(f"⚠️ Rate limited. Waiting {wait} seconds (attempt {attempt + 1}/{retries})...")
            time.sleep(wait)
    raise Exception("❌ Failed after retries due to rate limiting.")

# === Fetch up to 800 tweets (max 8 pages) ===
def get_user_timeline(user_id, max_pages=8):
    """Collect tweets from a user's timeline, following pagination.

    Requests pages of up to 100 tweets (fields ``created_at``, ``id``,
    ``text``) through ``safe_request`` and stops after ``max_pages`` pages or
    as soon as the response carries no ``meta.next_token``.

    Args:
        user_id: Twitter user ID whose tweets are fetched.
        max_pages: Upper bound on the number of pages requested.

    Returns:
        A list with the ``data`` entries of every page fetched, in order.
    """
    endpoint = f'https://api.twitter.com/2/users/{user_id}/tweets'
    query = {'max_results': 100, 'tweet.fields': 'created_at,id,text'}

    collected = []
    page_token = None

    for _page in range(max_pages):
        # From the second page on, ask for the page the previous response
        # pointed to.
        if page_token:
            query['pagination_token'] = page_token

        payload = safe_request(endpoint, headers, query).json()
        collected.extend(payload.get('data', []))

        page_token = payload.get('meta', {}).get('next_token')
        if not page_token:
            # No further pages available.
            break

        # Short pause between consecutive page requests.
        time.sleep(1)

    return collected

# === Save to local path (not FileStore, which is disabled) ===
def save_to_local(tweets, file_path=DBFS_PATH):
    """Write ``tweets`` to ``file_path`` as pretty-printed UTF-8 JSON.

    Paths starting with ``/dbfs`` are redirected under ``/tmp``; any other
    path is used unchanged. Missing parent directories are created.

    Args:
        tweets: JSON-serializable object to write (the list of tweets).
        file_path: Destination file; defaults to ``DBFS_PATH``.
    """
    local_path = f'/tmp{file_path}' if file_path.startswith('/dbfs') else file_path

    # A bare file name has an empty dirname, and os.makedirs('') raises
    # FileNotFoundError, so only create the directory when there is one.
    parent_dir = os.path.dirname(local_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    with open(local_path, 'w', encoding='utf-8') as f:
        # ensure_ascii=False keeps non-ASCII tweet text readable in the file.
        json.dump(tweets, f, ensure_ascii=False, indent=2)

# === Main Script ===
# Runs the extraction end to end: resolve the user ID for USERNAME, fetch the
# timeline, and write it to the default path used by save_to_local.
if __name__ == '__main__':
    try:
        user_id = get_user_id(USERNAME)
        tweets = get_user_timeline(user_id)
        save_to_local(tweets)
        print(f"✅ Saved {len(tweets)} tweets to local path: {DBFS_PATH}")
    except Exception as e:
        # Any failure is reported by printing only; the exception is not
        # re-raised, so this cell finishes without raising even when the
        # extraction failed.
        print(f"❌ Error: {e}")


In [0]:
# Read the saved tweets file back into a Spark DataFrame.
# The file is written as one pretty-printed JSON array spanning many lines, so
# the reader needs multiLine=True; the default expects one JSON record per line.
# `.show()` returns None, so it must not be chained into the assignment;
# otherwise `df` is None and display(df) has nothing to render.
df = (
    spark.read.format("json")
    .option("multiLine", True)
    .load("/Volumes/data_gov/data_gov_test/twitter/my_tweets.json")
)
display(df)

In [0]:
%fs ls dbfs:/FileStore/
