<a href="https://colab.research.google.com/github/shamilkv-623/Assignments-in-course/blob/main/Election_Data_Scraping_and_Analysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Data scraping

In [1]:
import requests
import json
import pandas as pd
import numpy as np
from tqdm import tqdm
from urllib.parse import urljoin


In [2]:
def get_data_and_next_page(response_text:str):
    start_idx = response_text.find('const data =') + len('const data =')
    end_idx = response_text.rfind(';', start_idx, response_text.find('const next ='))
    data_json = response_text[start_idx:end_idx].strip()
    data = json.loads(data_json)

    start_idx = response_text.find('const next =') + len('const next =')
    end_idx = response_text.find(';', start_idx)
    next_page = response_text[start_idx:end_idx].strip().strip('"')

    return data, next_page

def navigate_next(base_url, next_page):
    if next_page != "null":
        return urljoin(base_url, next_page)
    else:
        return None

def scrape_data(base_url, state_data):
    election_data = {"ID": [], "ST_NAME": [], "YEAR": [], "AC": [],
                 "CANDIDATE": [], "SEX": [], "AGE": [], "CATEGORY": [],
                 "PARTY": [], "VOTES": []}
    for entry in state_data:
        next = entry['link']
        url = urljoin(base_url, next)
        while True:
            response = requests.get(url)
            data, next_page = get_data_and_next_page(response.text)
            for row in tqdm(data, desc=f"Scraping data for year {entry['YEAR']}"):
                for key in election_data.keys():
                    election_data[key].append(str(row.get(key, list(row.values())[0])).strip())
            url = navigate_next(base_url, next_page)
            if not url:
                break
    return election_data

def save(ST_NAME, AC_NAME, election_data):
    df = pd.DataFrame(election_data)
    df.replace('', np.nan, inplace=True)
    df = df.loc[df['VOTES'] != "None"]
    df['AC'] = df['AC'].str.replace(r'^\d+\s+', '', regex=True)
    df['AC'] = df['AC'].str.replace(r'[^a-zA-Z\s\.]', '', regex=True)
    df['CANDIDATE'] = df['CANDIDATE'].str.replace(r'^\d+\s+', '', regex=True)
    df['CANDIDATE'] = df['CANDIDATE'].str.replace(r'[^a-zA-Z\s\.]', '', regex=True)
    df = df.applymap(lambda x: x.strip() if isinstance(x, str) else x)
    df = df.drop_duplicates()
    df = df.drop_duplicates(subset=['ST_NAME', 'YEAR', 'AC', 'CANDIDATE', 'SEX', 'AGE', 'CATEGORY', 'PARTY', 'VOTES'])
    df.to_csv(f"{ST_NAME}.csv", index=False)
    condition = (df["AC"] == AC_NAME)
    df = df[condition]
    df.to_csv(f"{ST_NAME}_{AC_NAME}.csv", index=False)

def find_state_data(start_url, ST_NAME):
    url = start_url
    state_data = []
    n_pages = 10
    for i in tqdm(range(n_pages), desc="Crawling all pages"):
        if url:
            response = requests.get(url)
            data, next_page = get_data_and_next_page(response.text)
            state_data.extend([entry for entry in data if entry['ST_NAME'] == ST_NAME])
            url = navigate_next(start_url, next_page)
    return state_data



In [None]:
ST_NAME = "HIMACHAL PRADESH"
AC_NAME = "RAMPUR"
# saves full state csv and also constituency csv
# scraped data undergoes priliminary cleaning as well
data = main_scraper(ST_NAME, AC_NAME)

In [None]:
def main_scraper(ST_NAME, AC_NAME):
    start_url = "https://22f3001919.github.io/tds_project_1/"
    state_data = find_state_data(start_url, ST_NAME)

    if not state_data:
        print(f"No data found for state: {ST_NAME}")
        return

    election_data = scrape_data(start_url, state_data)
    save(ST_NAME, AC_NAME, election_data)
    return election_data


For the given AC_NAME, What percentage of elections did female candidates win, when there was at least one female candidate in that election

In [None]:
ac_df = pd.read_csv(f"{ST_NAME}_{AC_NAME}.csv")
elections_with_female = ac_df.groupby('YEAR').filter(lambda x: (x['SEX'] == 'F').any())
winners = elections_with_female.loc[elections_with_female.groupby('YEAR')['VOTES'].idxmax()]
total_elections_with_female = winners['YEAR'].nunique()
female_wins = winners[winners['SEX'] == 'F']['YEAR'].nunique()
if total_elections_with_female == 0:
    percentage_female_wins = 0
else:
  percentage_female_wins = (female_wins / total_elections_with_female) * 100
percentage_female_wins

For the given AC_NAME, Which election year had the most female candidates contesting an election?

In [None]:
ac_df = pd.read_csv(f"{ST_NAME}_{AC_NAME}.csv")
elections_with_female_candidates = ac_df[ac_df["SEX"] == 'F']
print("List of years sorted in decreasing order of number of female candidates:")
elections_with_female_candidates['YEAR'].value_counts().sort_values(ascending=False)

For the given AC_NAME, If the second and the third candidate in an election combined their vote, how many elections would they win?

In [None]:
ac_df = pd.read_csv(f"{ST_NAME}_{AC_NAME}.csv")
year_counts = ac_df["YEAR"].value_counts()
filtered_years = year_counts[year_counts >= 3].index
filtered_df = ac_df[ac_df['YEAR'].isin(filtered_years)].sort_values(by=['YEAR', 'VOTES'], ascending=[True, False])
second_third_candidates = filtered_df.groupby('YEAR').head(3).groupby('YEAR').tail(2)
second_third_combined = second_third_candidates.groupby('YEAR')["VOTES"].sum()
winner_count = filtered_df.groupby('YEAR').head(1).groupby('YEAR')["VOTES"].sum()
result = (winner_count - second_third_combined) < 0
result.sum()

What is the latest year in which the winner won more than 50 percent of the vote

In [None]:
ac_df = pd.read_csv(f"{ST_NAME}_{AC_NAME}.csv")
winners = ac_df.loc[ac_df.groupby('YEAR')['VOTES'].idxmax()].copy()
total_votes =  ac_df.groupby('YEAR')['VOTES'].sum().values
winners["VOTE_SHARE"] = 100 * ac_df.loc[ac_df.groupby('YEAR')['VOTES'].idxmax()]["VOTES"].values / total_votes
winners_above_50 = winners[winners["VOTE_SHARE"] > 50]
winners_above_50["YEAR"].max()

For the given AC_NAME, What is the average vote share of the winners across all elections? Give your answer to 2 decimal places

In [None]:
ac_df = pd.read_csv(f"{ST_NAME}_{AC_NAME}.csv")
winners = ac_df.loc[ac_df.groupby('YEAR')['VOTES'].idxmax()].copy()
total_votes = ac_df.groupby('YEAR')['VOTES'].sum()
winners['VOTE_SHARE'] = (100 * winners['VOTES'].values/total_votes).values
winners['VOTE_SHARE'].values.mean()

For the given AC_NAME, In which year was the difference between the first and last candidates votes the biggest

In [None]:
ac_df = pd.read_csv(f"{ST_NAME}_{AC_NAME}.csv")
df_sorted = ac_df.sort_values(by=['YEAR', 'VOTES'], ascending=[True, False])
grouped = df_sorted.groupby('YEAR')
def vote_difference(group):
    return group.iloc[0]['VOTES'] - group.iloc[-1]['VOTES']

differences = grouped.apply(vote_difference)
print("List of years sorted in decreasing order of difference between first and last candidate:")
differences.sort_values(ascending=False)

What is the highest margin percentage by which the winning candidate has defeated the runner up in any election? Give your answer to 2 decimal places.

In [None]:
ac_df = pd.read_csv(f"{ST_NAME}_{AC_NAME}.csv")
winners = ac_df.loc[ac_df.groupby('YEAR')['VOTES'].idxmax()].copy()
runner_ups = ac_df.drop(winners.index)
runner_ups = ac_df.loc[runner_ups.groupby('YEAR')['VOTES'].idxmax()]
total_votes = ac_df.groupby('YEAR')['VOTES'].sum().values
winners["VOTE_SHARE"] = 100 * ac_df.loc[ac_df.groupby('YEAR')['VOTES'].idxmax()]["VOTES"].values / total_votes
runner_ups["VOTE_SHARE"] = 100 * ac_df.loc[runner_ups.groupby('YEAR')['VOTES'].idxmax()]["VOTES"].values / total_votes
margins = winners["VOTE_SHARE"].values - runner_ups["VOTE_SHARE"].values
margins.max()

For a given constituency, how many constituencies within the same state are less than 20 kms

In [None]:
state_df = pd.read_csv(f"{ST_NAME}.csv")
def get_coordinates(state:str, constituency:str):
    try:
        geolocator = Nominatim(user_agent="Mozilla/5.0")
        location = geolocator.geocode(f"{constituency.lower()}, {state.lower()}, India")
        # print(location)
        if location:
            return location.latitude, location.longitude
        else:
            return None, None
    except:
        return None, None

def calculate_distance(given_coords, lat, lon):
    try:
        if lat and lon:
            return geodesic(given_coords, (lat, lon)).kilometers
        else:
            return None
    except:
        return None
constituency_list = {place:() for place in state_df["AC"].unique()}
estimated_time = 2.8 * len(constituency_list.keys())
print("Getting latitude and longitude for all constituencies..")
print(f"Estimated Time:{estimated_time} seconds")

for key in constituency_list.keys():
    sleep(2)
    constituency_list[key] = get_coordinates(ST_NAME, key)
print("Done")
temp_df = state_df.copy()
temp_df['LAT'], temp_df['LON'] = zip(*state_df.apply(lambda row: constituency_list[row["AC"]], axis=1))
given_coords = temp_df.loc[state_df['AC'] == AC_NAME, ['LAT', 'LON']].values[0]
temp_df['DISTANCE'] = temp_df.apply(lambda row: calculate_distance(given_coords, row['LAT'], row['LON']), axis=1)
nearby_constituencies = temp_df[(temp_df['DISTANCE'] < 20) & (temp_df['AC'] != AC_NAME)]
print("Count of constituencies within 20 kms:")
nearby_constituencies['AC'].unique().shape

Find the Pearson correlation coefficient between the votes won by female candidates and male candidates. Only include constituencies and election years that had at least 1 female candidate. Each row in the correlation data table should represent an election.

In [None]:
state_df = pd.read_csv(f"{ST_NAME}.csv")
df_filtered = state_df.groupby(['YEAR', 'AC']).filter(lambda x: (x['SEX'] == 'F').any())
df_female = df_filtered[df_filtered['SEX'] == 'F'][['YEAR', 'VOTES']].rename(columns={'VOTES': 'FEMALE_VOTES'})
female_votes_by_year = df_female.groupby('YEAR')['FEMALE_VOTES'].sum()
df_male = df_filtered[df_filtered['SEX'] == 'M'][['YEAR', 'VOTES']].rename(columns={'VOTES': 'MALE_VOTES'})
male_votes_by_year = df_male.groupby('YEAR')["MALE_VOTES"].sum()
male_votes_by_year.corr(female_votes_by_year)

For a given state, identify the election year which has the most outliers on the basis of candidate votes. Use the interquartile range (IQR) rule to detect outliers

In [None]:
state_df = pd.read_csv(f"{ST_NAME}.csv")
grouped = state_df.groupby('YEAR')
Q1 = grouped['VOTES'].transform(lambda x: x.quantile(0.25))
Q3 = grouped['VOTES'].transform(lambda x: x.quantile(0.75))
IQR = Q3 - Q1
lb = Q1 - 1.5 * IQR
ub = Q3 + 1.5 * IQR
condition = (state_df['VOTES'] < lb) | (state_df['VOTES'] > ub)
print("List of years sorted in descending order of number of outliers:")
state_df[condition]["YEAR"].value_counts()