In [None]:
def get_random_timelines(api, num_timelines=30):
    """
    Gets and prints a number of timelines for randomly chosen users

    Args:
        api: An instance of TweetUserAPI
        num_timelines: The number of timelines to get
    Returns:
        None; each timeline is printed as a list of
        (user_id, tweet_text) pairs
    """

    # Fetch the candidate ids once; every draw below samples from them
    user_ids = api.get_user_ids()

    # random.choice raises IndexError on an empty sequence, so stop early.
    # len() is used instead of truthiness because it is the same check
    # random.choice itself relies on, whatever sequence type is returned.
    if len(user_ids) == 0:
        print("No user ids available; no timelines fetched")
        return

    # Run get_timeline num_timelines times with a random user_id
    for _ in range(num_timelines):
        user_id = random.choice(user_ids)
        timeline = api.get_timeline(user_id)

        # Keep only the fields worth printing from each tweet
        unpacked_timeline = [(tweet.user_id, tweet.tweet_text) for tweet in timeline]
        print(f"Timeline for user_id {user_id}: {unpacked_timeline}")


def main():
    """
    Creates a TweetUserAPI client and prints timelines for random users
    """
    # Authenticate; credentials are read from the environment (TWEET_USER /
    # TWEET_PASSWORD, as loaded from .env) and fall back to the previous
    # hard-coded values when the variables are unset
    api = TweetUserAPI(
        os.getenv("TWEET_USER", "tweetuser"),
        os.getenv("TWEET_PASSWORD", "password"),
        "Tweets",
    )

    # Get random timelines
    get_random_timelines(api)


# Driver Code
if __name__ == "__main__":
    main()

In [None]:
def api_tracker(api, call_function, num_iterations=30, per_second=True):
    """
    Tracks how many times an API-using function is invoked and how fast

    Args:
        api: An instance of TweetUserAPI; passed to call_function on each call
        call_function: A callable taking the api as its only argument; every
            invocation is counted as one API call
        num_iterations: The number of times to invoke call_function
        per_second: If truthy, also print the calls-per-second rate
    Returns:
        None; the totals are printed
    """

    # Initialize API call counter
    api_calls = 0

    # perf_counter is monotonic and high resolution, which suits measuring
    # a short interval better than the wall clock
    start_time = time.perf_counter()

    # Invoke the tracked function and count each invocation
    for _ in range(num_iterations):
        call_function(api)
        api_calls += 1

    elapsed_time = time.perf_counter() - start_time

    print(f"Total API calls: {api_calls}")

    # Skip the rate when no measurable time passed to avoid dividing by zero
    if per_second and elapsed_time > 0:
        api_calls_per_second = api_calls / elapsed_time
        print(f"API calls per second: {api_calls_per_second}")


def main():
    """
    Creates a TweetUserAPI client and measures random timeline fetches
    """
    # Authenticate; credentials are read from the environment (TWEET_USER /
    # TWEET_PASSWORD, as loaded from .env) and fall back to the previous
    # hard-coded values when the variables are unset
    api = TweetUserAPI(
        os.getenv("TWEET_USER", "tweetuser"),
        os.getenv("TWEET_PASSWORD", "password"),
        "Tweets",
    )

    # Fetch the ids once so the tracked loop only issues get_timeline calls
    user_ids = api.get_user_ids()

    def fetch_random_timeline(tweet_api):
        """Fetches the timeline of one randomly chosen user"""
        return tweet_api.get_timeline(random.choice(user_ids))

    # Track API calls while getting random timelines; api_tracker requires
    # the function to invoke as its second argument
    api_tracker(api, fetch_random_timeline)


# Driver Code
if __name__ == "__main__":
    main()

In [1]:
import csv
import os
import pymysql
from tweet_mysql import TweetUserAPI
from tweet_objects import Tweet, User
import pandas as pd
import dotenv
import random
from datetime import datetime
from dotenv import load_dotenv, dotenv_values
from pprint import pprint
import time

In [None]:
# Switch the working directory to the project's tweet_py folder.
# NOTE(review): absolute, machine-specific path — adjust before running elsewhere.
os.chdir('/Users/jeffreypan/Documents/DS 4300/ds-4300-assignment1/tweet_py')

In [6]:
# Load the key/value pairs of a .env file into the process environment so
# os.getenv can read them (the recorded run of this cell returned True).
load_dotenv()

True

In [7]:
# Show the parsed contents of .env as the cell output.
# NOTE(review): this echoes the credentials into the saved notebook output —
# consider clearing the output before sharing the notebook.
dotenv_values(".env")

OrderedDict([('TWEET_USER', 'tweetuser'), ('TWEET_PASSWORD', 'password')])

In [9]:
# Open the MySQL connection to the Tweets database, taking the user and
# password (created in the sql file) from the environment variables
# TWEET_USER and TWEET_PASSWORD
connection = pymysql.connect(
    host="localhost",
    user=os.getenv("TWEET_USER"),
    password=os.getenv("TWEET_PASSWORD"),
    db="Tweets",
)


In [10]:
# Smoke test: preview the first 10 rows of the Tweets table
sql = "SELECT *  FROM Tweets LIMIT 10;"

# The context manager closes the cursor even if the query fails
with connection.cursor() as cursor:
    cursor.execute(sql)
    result = cursor.fetchall()  # execute() only runs the query; fetchall() retrieves the rows
    # cursor.description has one entry per result column; item 0 is its name
    columns = [col[0] for col in cursor.description]

# Convert the result to a DataFrame
df = pd.DataFrame(result, columns=columns)

df.head()

Unnamed: 0,tweet_id,user_id,tweet_ts,tweet_text


In [None]:
# Smoke test: count the tweet_id values stored in the tweets table
# NOTE(review): other cells spell the table "Tweets"; MySQL table names can
# be case-sensitive depending on the server — confirm which spelling is right
sql = "SELECT COUNT(tweet_id) FROM tweets;"

# The context manager closes the cursor even if the query fails
with connection.cursor() as cursor:
    cursor.execute(sql)
    result = cursor.fetchall()  # execute() only runs the query; fetchall() retrieves the rows
    # cursor.description has one entry per result column; item 0 is its name
    columns = [col[0] for col in cursor.description]

# Convert the result to a DataFrame
df = pd.DataFrame(result, columns=columns)

df.head()

In [None]:
# Smoke test: load the Follows table and preview its first rows
sql = "SELECT * FROM Follows;"

# The context manager closes the cursor even if the query fails
with connection.cursor() as cursor:
    cursor.execute(sql)
    result = cursor.fetchall()  # execute() only runs the query; fetchall() retrieves the rows
    # cursor.description has one entry per result column; item 0 is its name
    columns = [col[0] for col in cursor.description]

# Convert the result to a DataFrame
df = pd.DataFrame(result, columns=columns)

df.head()

In [None]:
# Establish a database connection; the credentials are read from the
# environment (TWEET_USER / TWEET_PASSWORD, as loaded from .env) and fall
# back to the previous hard-coded values when the variables are unset
connection = pymysql.connect(
    host="localhost",
    user=os.getenv("TWEET_USER", "tweetuser"),
    password=os.getenv("TWEET_PASSWORD", "password"),
    db="Tweets",
)


def read_tweet_csv(api, csv_data):
    """
    Posts every row of already-parsed CSV data as a tweet

    Args:
        api: An object exposing post_tweet (main passes a TweetUserAPI)
        csv_data: An iterable of rows indexable by "USER_ID" and
            "TWEET_TEXT" (main passes a csv.DictReader)
    Returns:
        None; each row is posted through the api and then printed
    """

    for record in csv_data:
        author_id = int(record["USER_ID"])
        text = record["TWEET_TEXT"]

        # The timestamp is the moment of posting, not a value from the CSV
        new_tweet = Tweet(author_id, text, datetime.now())
        api.post_tweet(new_tweet)

        print(record)



def main(csv_file):
    """
    Loads the tweets of a CSV file into the database one at a time

    Args:
        csv_file: Path of a CSV file readable by csv.DictReader
    """
    # Open the CSV file once and hand the reader to read_tweet_csv, so the
    # file is not reopened per call. The with block closes the file when
    # loading ends; newline="" is what the csv module expects for its files.
    with open(csv_file, newline="") as f:
        csv_data = csv.DictReader(f)

        # Authenticate; credentials are read from the environment
        # (TWEET_USER / TWEET_PASSWORD, as loaded from .env) and fall back
        # to the previous hard-coded values when the variables are unset
        api = TweetUserAPI(
            os.getenv("TWEET_USER", "tweetuser"),
            os.getenv("TWEET_PASSWORD", "password"),
            "Tweets",
        )

        # Load tweets data into sql database one at a time
        read_tweet_csv(api, csv_data)


# Driver Code
if __name__ == "__main__":
    main(
        csv_file="/Users/jeffreypan/Documents/DS 4300/ds-4300-assignment1/hw1_data/tweets_sample.csv"
    )  # set filename to tweets to initialize tweets table


The number of API calls per second is a measure of how many times your program is able to send requests to the API within one second. It's a common way to measure the rate of requests in applications that interact with APIs.

This metric is important because most APIs have a limit on the number of requests you can make in a certain period of time, often referred to as rate limiting. If your program makes requests too quickly and exceeds this limit, the API might respond with an error, or your access to the API might be temporarily or permanently blocked.

By monitoring the number of API calls per second, you can ensure that your program stays within the API's rate limits. If necessary, you can adjust your program to make requests more slowly to avoid exceeding these limits.

In [None]:
import time
import csv

# Establish a database connection; the credentials are read from the
# environment (TWEET_USER / TWEET_PASSWORD, as loaded from .env) and fall
# back to the previous hard-coded values when the variables are unset
connection = pymysql.connect(
    host="localhost",
    user=os.getenv("TWEET_USER", "tweetuser"),
    password=os.getenv("TWEET_PASSWORD", "password"),
    db="Tweets",
)

def read_tweet_csv(api, csv_file):
    """
    Posts every row of a CSV file as a tweet and prints the posting rate

    Args:
        api: An object exposing post_tweet (main passes a TweetUserAPI)
        csv_file: Path of a CSV file with USER_ID and TWEET_TEXT columns
    Returns:
        None; the number of post_tweet calls per second is printed
    """

    api_calls = 0

    # perf_counter is monotonic and high resolution, which suits measuring
    # an interval better than the wall clock
    start_time = time.perf_counter()

    # newline="" is what the csv module expects for files it reads
    with open(csv_file, "r", newline="") as f:
        for row in csv.DictReader(f):
            # The timestamp is the moment of posting, not a CSV value
            one_tweet = Tweet(
                int(row["USER_ID"]),
                row["TWEET_TEXT"],
                datetime.now(),
            )
            api.post_tweet(one_tweet)

            # add api calls
            api_calls += 1

    elapsed_time = time.perf_counter() - start_time

    # Skip the rate when no measurable time passed to avoid dividing by zero
    if elapsed_time > 0:
        print(f"API calls per second: {api_calls / elapsed_time}")

def main(csv_file):
    """
    Loads the tweets of a CSV file into the database one at a time

    Args:
        csv_file: Path of the CSV file handed to read_tweet_csv
    """
    # Authenticate; credentials are read from the environment (TWEET_USER /
    # TWEET_PASSWORD, as loaded from .env) and fall back to the previous
    # hard-coded values when the variables are unset
    api = TweetUserAPI(
        os.getenv("TWEET_USER", "tweetuser"),
        os.getenv("TWEET_PASSWORD", "password"),
        "Tweets",
    )

    # Load tweets data into sql database one at a time
    read_tweet_csv(api, csv_file)


# Driver Code
if __name__ == "__main__":
    main(
        csv_file="/Users/jeffreypan/Documents/DS 4300/ds-4300-assignment1/hw1_data/tweet.csv"
    )  # set filename to tweets to initialize tweets table

In [None]:
# List the distinct user ids that appear in the Tweets table
sql = "SELECT DISTINCT user_id FROM Tweets;"

# The context manager closes the cursor even if the query fails
with connection.cursor() as cursor:
    cursor.execute(sql)
    result = cursor.fetchall()
    # cursor.description has one entry per result column; item 0 is its name
    columns = [col[0] for col in cursor.description]

# Convert the result to a DataFrame
df = pd.DataFrame(result, columns=columns)
df.head()