In [40]:
import datetime
import os

import praw
import pymongo
import requests

def stackoverflow_scraper(tag, keyword, db_name, user_agent, uri, fromDate, toDate):
    """
    Fetch Stack Overflow questions carrying a tag and save them, together with
    the profile of each question's author, to a MongoDB database.

    A single page (at most 100 questions) is requested from the Stack Exchange
    ``search`` endpoint. Questions are written to the ``StackOverflowPosts``
    collection and author profiles to ``StackOverflowUsers``; question ids and
    user ids that are already stored are skipped.

    Parameters:
        tag(String) = tag name
        keyword(String) = optional text that must appear in the question title
                          (None or "" disables the title filter)
        db_name(String) = what you want the database called
        user_agent(String) = value sent in the User-Agent header of each request
        uri(String) = The link to the MongoDB database
        fromDate(Datetime or int) = earliest creation date to scrape; a naive
                          datetime is read as UTC, a number as Unix seconds,
                          None removes the bound
        toDate(Datetime or int) = latest creation date to scrape (same rules)

    Raises:
        requests.HTTPError: if the search request is answered with an error status.
    """
    # Set the base URL for the API
    base_url = "https://api.stackexchange.com/2.3/"

    # Setting the parameters for the request. The API takes dates as Unix
    # epoch seconds, so datetimes are converted before being sent.
    params = {
        "site": "stackoverflow",
        "tagged": tag,
        "filter": "withbody",
        "pagesize": 100,
        "fromdate": _to_epoch_seconds(fromDate),
        "todate": _to_epoch_seconds(toDate),
        "answers": "true",
    }
    if keyword:
        # The search endpoint can only match the keyword against titles.
        params["intitle"] = keyword

    # Set the headers for the request
    headers = {"User-Agent": user_agent}

    # Make the request to the API and fail loudly on an error status instead
    # of tripping over a missing "items" key further down.
    response = requests.get(f"{base_url}search", params=params, headers=headers)
    response.raise_for_status()
    data = response.json()

    client = pymongo.MongoClient(uri)
    try:
        db = client[db_name]
        collection = db["StackOverflowPosts"]
        collection_users = db["StackOverflowUsers"]

        # Loop through the questions and save them to the database
        for question in data.get("items", []):
            # If there is a duplicate question_id then skip over.
            if collection.find_one({"question_id": question["question_id"]}):
                continue

            # Timestamp formatting (creation_date is Unix seconds, stored as ISO text)
            timestamp = datetime.datetime.utcfromtimestamp(question["creation_date"])
            formatted_timestamp = timestamp.isoformat()

            # Question dictionary to be added to the database
            question_dict = {
                "question_id": question["question_id"],
                "title": question["title"],
                "body": question["body"],
                "score": question["score"],
                "tags": question["tags"],
                "view_count": question["view_count"],
                "answer_count": question["answer_count"],
                "timestamp": formatted_timestamp,
            }
            collection.insert_one(question_dict)

            # Some questions carry no owner id (for instance when the account
            # no longer exists); there is no profile to fetch for those.
            user_id = question.get("owner", {}).get("user_id")
            if user_id is None:
                continue

            # Users live in their own collection, so the duplicate check must
            # look there; doing it before the request saves API quota.
            if collection_users.find_one({"user_id": user_id}):
                continue

            user_response = requests.get(
                f"{base_url}users/{user_id}",
                params={"site": "stackoverflow"},
                headers=headers,
            )
            user_items = user_response.json().get("items") or []
            if not user_items:
                # No profile came back (error payload or unknown user).
                continue
            profile = user_items[0]

            # User dictionary to be added to the database
            user_dict = {
                "username": profile.get("display_name", "N/A"),
                "user_id": user_id,
                "location": profile.get("location", "N/A"),
                "question_count": user_questions(user_id),
                "answer_count": user_answer(user_id),
                "reputation": profile.get("reputation", "N/A"),
                "badges": profile.get("badge_counts", "N/A"),
                "experience": profile.get("creation_date", "N/A"),
                "privileges": profile.get("privileges"),
                "account_id": profile.get("account_id", "N/A"),
                "is_employee": profile.get("is_employee", "N/A"),
                "last_access_date": profile.get("last_access_date", "N/A"),
                "reputation_change_year": profile.get("reputation_change_year", "N/A"),
                "reputation_change_quarter": profile.get("reputation_change_quarter", "N/A"),
                "reputation_change_month": profile.get("reputation_change_month", "N/A"),
                "reputation_change_week": profile.get("reputation_change_week", "N/A"),
                "reputation_change_day": profile.get("reputation_change_day", "N/A"),
                "user_type": profile.get("user_type", "N/A"),
                "link": profile.get("link", "N/A"),
                "profile_image": profile.get("profile_image", "N/A"),
            }

            collection_users.insert_one(user_dict)
    finally:
        # Release the connection pool even if a request or insert fails.
        client.close()


def _to_epoch_seconds(moment):
    """Convert a datetime (naive values are read as UTC) or a number to whole Unix seconds; None stays None."""
    if moment is None:
        # requests leaves parameters whose value is None out of the query string.
        return None
    if isinstance(moment, datetime.datetime):
        if moment.tzinfo is None:
            moment = moment.replace(tzinfo=datetime.timezone.utc)
        return int(moment.timestamp())
    return int(moment)

def user_answer(user_id):
    """
    Look up how many answers a Stack Overflow user has posted.

    Args:
        user_id (String or int): user_id of the user whose answers are counted

    Returns:
        int: # of answers, read from the "total" field of the API response

    Raises:
        KeyError: if the response has no "total" field (e.g. an API error payload)
    """
    url = f"https://api.stackexchange.com/2.3/users/{user_id}/answers"

    # The built-in "total" filter makes the API return only the match count.
    params = {
        "order": "desc",
        "sort": "activity",
        "site": "stackoverflow",
        "filter": "total",
    }

    # API request
    response = requests.get(url, params=params)

    # data = the json response
    data = response.json()

    # Return the answer count of a user
    return data["total"]

def user_questions(user_id):
    """
    Look up how many questions a Stack Overflow user has asked.

    Args:
        user_id (String or int): user_id of the user whose questions are counted

    Returns:
        int: # of questions, read from the "total" field of the API response

    Raises:
        KeyError: if the response has no "total" field (e.g. an API error payload)
    """
    url = f"https://api.stackexchange.com/2.3/users/{user_id}/questions"

    # The built-in "total" filter makes the API return only the match count.
    params = {
        "order": "desc",
        "sort": "activity",
        "site": "stackoverflow",
        "filter": "total",
    }

    # API request
    response = requests.get(url, params=params)

    # data = the json response
    data = response.json()

    # Return the question count for the given user
    return data["total"]

In [6]:
# Search settings for the scraper run.
tag = "python"
keyword = None

# Storage settings. The connection string is read from the MONGODB_URI
# environment variable so that credentials are never saved with the notebook;
# a local, unauthenticated server is used when the variable is not set.
db_name = "StackOverFlowData"
uri = os.environ.get("MONGODB_URI", "mongodb://localhost:27017/")
# Handed to the scraper as its user_agent argument.
username = "Da16King"
# Creation-date window of the questions to collect.
fromDate = datetime.datetime(2020,1,1)
toDate = datetime.datetime(2023,4,4)

# Uncomment to run the scrape.
#stackoverflow_scraper(tag,keyword, db_name , username, uri, fromDate, toDate)

In [41]:
# Smoke-check both count helpers against one Stack Overflow user id:
# the answer count is printed first, then the question count.
for count_for_user in (user_answer, user_questions):
    print(count_for_user('4942017'))

3
11
