# Data collection

TODO: Insert comments in this part

## Get the list of candidates

In [None]:
#%%
import fitz
import re
import pandas as pd
from os import path
from glob import glob

# Flags passed to page.get_text() in extract_text_from_doc:
# concatenate words that were hyphenated (split with a hyphen) in the PDF.
TEXT_FLAGS = fitz.TEXT_DEHYPHENATE


def extract_text_from_doc(doc):
    """Return the cleaned text of every page in a document.

    Each page's text is split on newlines, every fragment is stripped of
    surrounding whitespace, empty fragments are dropped, and the remaining
    fragments are joined with single spaces.

    Args:
        doc (fitz.Document): Document whose pages are iterated.

    Returns:
        List[str]: One cleaned string per page, in page order.
    """
    cleaned_pages = []
    for page in doc:
        raw_text = page.get_text("text", flags=TEXT_FLAGS)
        fragments = [fragment.strip() for fragment in raw_text.split("\n")]
        # Skip fragments that were empty or whitespace-only.
        cleaned_pages.append(" ".join(fragment for fragment in fragments if fragment))
    return cleaned_pages


def find_ext(dr, ext):
    """Return the paths of all files in a directory with a given extension.

    Args:
        dr (str): Directory to search (not recursive).
        ext (str): File extension without the leading dot, e.g. "pdf".

    Returns:
        List[str]: Matching paths, sorted so that runs are reproducible
            (glob itself returns files in arbitrary, OS-dependent order).
    """
    return sorted(glob(path.join(dr, "*.{}".format(ext))))


# All candidate-list PDFs to be parsed.
file_paths = find_ext("candidates/", "pdf")


# Boilerplate phrases removed before digits are stripped from the text.
_NOISE_BEFORE_DIGITS = (
    "(Prioriteret sideordnet opstilling anmeldt i følgende opstillingskredse: Alle) (Valg på personlige stemmer anmeldt)",
    "Alle",
)

# Table header removed once the digits are gone.
_COLUMN_HEADER = "Kandidaternes navne på stemmesedlen     Opstillet i opstillingskreds nr."

# Party headings as they appear in the text. Each one is replaced by the
# marker "PARTI," so the text can later be split into one section per party.
# The order of replacement is kept exactly as it has always been applied.
_PARTY_HEADINGS = (
    "A. Socialdemokratiet ",
    "V. Venstre, Danmarks Liberale Parti",
    "Å. Alternativet",
    "Ø. Enhedslisten - De Rød-Grønne",
    "Æ. Danmarksdemokraterne - Inger Støjberg",
    "Q. Frie Grønne, Danmarks Nye Venstrefløjsparti",
    "M. Moderaterne",
    "I. Liberal Alliance",
    "C. Det Konservative Folkeparti",
    "F. SF - Socialistisk Folkeparti",
    "K. KD - Kristendemokraterne",
    "O. Dansk Folkeparti",
    "B. Radikale Venstre",
    "D. Nye Borgerlige",
    "Uden for partierne",
)

# Boilerplate removed after the party headings have been replaced.
_NOISE_AFTER_HEADINGS = (
    "(Prioriteret sideordnet opstilling anmeldt i følgende opstillingskredse: )",
    "(Partiliste anmeldt) .",
)

# Footer text removed after whitespace runs have been turned into commas.
_FOOTERS = (
    "Internal - KMD A/S . ",
    "Internal - KMD A/S ",
)

# Party labels assigned, by position, to the sections found in the text.
_PARTY_LETTERS = (
    "A",
    "B",
    "C",
    "D",
    "F",
    "I",
    "K",
    "M",
    "O",
    "Q",
    "V",
    "Æ",
    "Ø",
    "Å",
    "UDEN",
)


def get_data(file_path):
    """Extract candidates and their party labels from a candidate-list PDF.

    The text of every page except the first is cleaned of boilerplate and
    digits, each party heading is replaced by a "PARTI," marker, and the text
    is split on that marker into one comma-separated candidate list per party.

    Args:
        file_path (str): File path to pdf.

    Returns:
        party_df (pd.DataFrame): Dataframe with the columns "Candidate" and
            "Party", one row per candidate.
    """
    # Close the PDF as soon as its text has been read.
    with fitz.open(file_path) as doc:
        pages = extract_text_from_doc(doc)  # Extract text from all pages

    # The first page is discarded (presumably a cover page -- TODO confirm).
    # Slicing instead of pop(0) also copes with a document without pages.
    text = "\n".join(pages[1:])

    ## CLEAN DOCUMENT ##
    for noise in _NOISE_BEFORE_DIGITS:
        text = text.replace(noise, "")
    text = "".join(char for char in text if not char.isdigit())
    text = text.replace(_COLUMN_HEADER, "")
    for heading in _PARTY_HEADINGS:
        text = text.replace(heading, "PARTI,")
    for noise in _NOISE_AFTER_HEADINGS:
        text = text.replace(noise, "")
    # Runs of two or more whitespace characters separate the entries.
    text = re.sub(r"\s\s+", ",", text)
    text = text.replace(",,", ",")
    for footer in _FOOTERS:
        text = text.replace(footer, "")

    ## SPLIT INTO ONE CANDIDATE LIST PER PARTY ##
    parties = []
    # Everything before the first marker is preamble, hence the [1:].
    for section in text.split("PARTI")[1:]:
        entries = [entry.strip() for entry in section.split(",")]
        # Drop empty entries and a leftover ". " prefix on the rest.
        parties.append(
            [
                entry[2:] if entry.startswith(". ") else entry
                for entry in entries
                if entry
            ]
        )

    # NOTE(review): sections are matched to party letters purely by position,
    # so this assumes the PDF lists the parties in the order of _PARTY_LETTERS
    # with none missing in between -- verify against the input files.
    candidate_dict = {"Candidate": [], "Party": []}
    for candidates, party in zip(parties, _PARTY_LETTERS):
        for candidate in candidates:
            candidate_dict["Candidate"].append(candidate)
            candidate_dict["Party"].append(party)

    party_df = pd.DataFrame.from_dict(candidate_dict)

    return party_df


# Make dataframe for all pdf files.
# DataFrame.append was removed in pandas 2.0 (and copied the whole frame on
# every call), so the per-file frames are collected and concatenated once.
party_frames = [get_data(file_path) for file_path in file_paths]
if party_frames:
    final_df = pd.concat(party_frames, ignore_index=True)
else:
    # No PDFs found: keep an empty frame with the expected columns.
    final_df = pd.DataFrame.from_dict({"Candidate": [], "Party": []})
# %%
final_df = final_df.drop_duplicates()
final_df.to_csv("all_candidates.csv", index=False)

## Scrape Twitter users

In [None]:
from __future__ import annotations

import csv
import os
import time
from selenium import webdriver
from lxml import html

from pelutils import log, LogLevels

# Twitter user-search URL; {0} is filled with the name to search for.
SEARCH_URL = "https://twitter.com/search?q={0}&src=unknown&f=user"
# Folder appended to PATH (presumably where the browser driver lives -- TODO confirm).
DRIVER_FOLDER = "."
# os.pathsep is ":" on POSIX and ";" on Windows, so this works on both.
os.environ["PATH"] += os.pathsep + DRIVER_FOLDER

def search_user(name: str, driver: webdriver) -> tuple[str, str, str] | None:
    """Look a name up in Twitter's user search and return the first hit.

    Args:
        name: Name to search for.
        driver: Selenium driver used to load the search page.

    Returns:
        A (display name, handle, bio) tuple for the first search result, with
        an empty bio when the result has none, or None when the search page
        shows no results. Callers that unpack the return value directly get a
        TypeError in the no-result case.
    """
    log("Getting twitter info for %s" % name)
    driver.get(SEARCH_URL.format(name))
    time.sleep(7)  # Wait for JS to load :-P
    tree = html.fromstring(driver.page_source)

    # Only the first result is ever returned, so only the first result card
    # is looked up.
    first_card = tree.xpath(
        "/html/body/div[1]/div/div/div[2]/main/div/div/div/div[1]/div/div[3]"
        "/div/section/div/div/div[1]/div/div/div/div/div[2]"
    )
    if not first_card:
        return None
    res = first_card[0]

    try:
        name_div, bio_div = list(res)
    except ValueError:
        # No bio (probably, at least in one case)
        name_div, bio_div = res[0], None

    name_div = list(list(name_div)[0])[0]
    display_div, handle_div = list(name_div)
    name = display_div[0][0][0][0][0].text
    handle = handle_div[0][0][0][0][0].text

    bio = ""
    if bio_div is None:
        log("Got data %s with no bio" % handle, with_info=False)
    else:
        for bio_elem in bio_div:
            try:
                # Links split up the text
                piece = bio_elem[0][0].text
            except IndexError:
                piece = bio_elem.text
            # An element without text has .text == None; treat it as empty
            # instead of failing the whole lookup.
            bio += piece or ""
        log("Got data for %s with bio" % handle, with_info=False)
    return name, handle, bio


# Browser and logging setup for the scraping run. This has to live at module
# level: the loop below passes `driver` into search_user.
options = webdriver.FirefoxOptions()
options.headless = True
driver = webdriver.Firefox(options=options)
log.configure("scrape.log", print_level=LogLevels.DEBUG)

data = list()

# Look every candidate up on Twitter and write the hits to a new CSV.
# The files are opened as UTF-8 (candidate names contain non-ASCII letters)
# and with newline="" as the csv module requires.
try:
    with open("data/all_candidates.csv", encoding="utf-8", newline="") as fp_cand, open(
        "data/candidates_full.csv", "w", encoding="utf-8", newline=""
    ) as fp_full:
        reader = csv.reader(fp_cand, delimiter=",")
        writer = csv.writer(fp_full, delimiter=",", quoting=csv.QUOTE_MINIMAL)
        fails = list()
        writer.writerow(("Name", "Handle", "Party", "Bio"))
        next(reader)  # Skip header
        for row in reader:
            name, party_letter = row
            try:
                # search_user returns None when nothing is found, which makes
                # the unpacking raise TypeError.
                name, handle, bio = search_user(name, driver)
            except TypeError:
                log.error("Failed to get data for %s" % name)
                fails.append(name)
                # Skip the row: writing it would reuse the handle and bio of
                # the previous candidate (or fail if there is none yet).
                continue
            writer.writerow((name, handle, party_letter, bio))
            fp_full.flush()  # Force write to file during run
        log("Failed to get info for the following", *fails)
finally:
    # Always shut the browser down, also when the run is interrupted.
    driver.quit()

## Get tweets from Twitter's API

In [None]:
import sys, os
from datetime import datetime
import time
import json
from typing import List, Tuple

import pandas as pd
import numpy as np
import tweepy

# Default location of the secrets file: one directory above the running script.
STD_PATH = os.path.join(os.path.dirname(sys.argv[0]), "..", "secrets.json")


def get_client(secret_path: str = STD_PATH):
    """Create a tweepy client from the bearer token in a JSON secrets file.

    Args:
        secret_path: Path to a JSON file with a "bearer" key.

    Returns:
        A tweepy.Client that waits when the rate limit is reached.

    Raises:
        FileNotFoundError: If the secrets file does not exist.
    """
    try:
        # JSON is UTF-8; do not depend on the platform's default encoding.
        with open(secret_path, "r", encoding="utf-8") as f:
            secrets = json.load(f)
    except FileNotFoundError as err:
        raise FileNotFoundError(
            "HEY! You don't have any secrets! You must get them secretly from Søren"
        ) from err

    return tweepy.Client(secrets["bearer"], wait_on_rate_limit=True)


def get_ids(
    client: tweepy.Client,
    usernames: List[str],
    *,
    batch_size: int = 75,
    sleep: float = 5,
) -> list:
    """Look up the user id for every username, in batches.

    Args:
        client: Client whose get_users(usernames=...) is called per batch.
        usernames: Usernames without the leading "@".
        batch_size: Number of usernames sent per request.
        sleep: Seconds to wait after each request.

    Returns:
        A list with exactly one entry per username, in the same order: the
        user id, or None when the user was not returned or the request for
        its batch failed.
    """
    ids = list()
    num_batches = int(np.ceil(len(usernames) / batch_size))
    for i in range(num_batches):
        print(f"{i}/{num_batches-1}")
        batch = usernames[i * batch_size : (i + 1) * batch_size]
        try:
            # .data is None when no user in the batch was found.
            users = client.get_users(usernames=batch).data or []
            id_by_username = {u["username"].lower(): u["id"] for u in users}
        except Exception as e:
            print(f"Failed! with {e}")
            id_by_username = {}
        # Match by username rather than by position: users that do not exist
        # are left out of the response, which would shift every later id.
        ids.extend(id_by_username.get(str(username).lower()) for username in batch)
        time.sleep(sleep)
    return ids


def get_user_tweets(client: tweepy.Client, user_id: str):
    """Fetch a user's tweets between START and END, following pagination.

    Args:
        client: Client whose get_users_tweets is called once per page.
        user_id: Id of the user whose tweets are fetched.

    Returns:
        A list of (tweet id, tweet text) tuples; empty when there are none.
    """
    END = datetime.fromisoformat("2022-11-02")
    START = datetime.fromisoformat("2022-10-04")

    tweets = list()
    next_token = None
    while True:
        # The time window is sent with every request, including the first,
        # so that all pages are filtered the same way.
        params = dict(max_results=100, start_time=START, end_time=END)
        if next_token:
            params["pagination_token"] = next_token
        res = client.get_users_tweets(user_id, **params)
        # data is None (or empty) when the page holds no tweets.
        if not res.data:
            break
        tweets.extend(res.data)
        next_token = res.meta.get("next_token")
        if not next_token:
            break
    return [(t.id, t.text) for t in tweets]


def get_all_user_tweets(client: tweepy.Client, user_ids: List[str]) -> pd.DataFrame:
    """Fetch the tweets of every user and collect them in one table.

    Progress is printed per user. Users whose fetch raises are printed,
    remembered and reported at the end; the run continues with the next user.

    Args:
        client: Client handed on to get_user_tweets.
        user_ids: Ids of the users to fetch tweets for.

    Returns:
        A DataFrame with the columns "tweetID", "userID" and "tweet", one row
        per tweet.
    """
    SLEEP = 2

    columns = {"tweetID": [], "userID": [], "tweet": []}
    needs_retry = []
    for position, uid in enumerate(user_ids):
        print(f"{position}/{len(user_ids)-1}")
        try:
            for tweet_id, tweet_text in get_user_tweets(client, uid):
                columns["tweetID"].append(tweet_id)
                columns["tweet"].append(tweet_text)
                columns["userID"].append(uid)
        except Exception as e:
            print(e, uid)
            needs_retry.append(uid)
        # Pause between users.
        time.sleep(SLEEP)
    print("pls retry:", needs_retry)
    return pd.DataFrame(columns)


# Resolve every handle to a user id and store the result.
client = get_client()
df = pd.read_csv("data/candidates_full.csv")
# Keep the ids as nullable integers: with a missing id a plain column becomes
# float64, which cannot represent 64-bit ids exactly.
df["id"] = pd.array(
    get_ids(client, [h.replace("@", "") for h in df.Handle]), dtype="Int64"
)
df.to_csv("data/candidates_with_id.csv")

# Fetch the tweets of every candidate whose id is known.
df = pd.read_csv("data/candidates_with_id.csv", dtype={"id": "Int64"})
known_ids = [int(uid) for uid in df["id"].dropna()]
df_tweet = get_all_user_tweets(client, known_ids)
df_tweet.to_csv("data/tweets.csv")