Notebook for listhost signup: first sign in to the listhost site, then interact with its pages to subscribe to lists.

In [22]:
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException
import logging
import time
import json
from typing import Optional, Dict
from urllib.parse import urlparse

In [20]:
# Read the sign-in credentials from the JSON file kept one directory above the notebook.
with open('../login.json', 'r') as file:
    login = json.loads(file.read())

# Expose the two fields the later cells pass to the login helpers.
email = login['email']
password = login['password']

In [2]:
# Send INFO-and-above records to listhost_login.log with a timestamped layout.
logging.basicConfig(
    filename='listhost_login.log',
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

In [16]:
class ListhostLogin:
    """Drive a Chrome session through the lists.uchicago.edu sign-in form.

    Creating an instance launches a Chrome browser immediately; call
    ``close()`` when finished so the browser is shut down.
    """

    def __init__(self, email: str, password: str):
        """Store the credentials and start the browser.

        Args:
            email: Address typed into the ``email_login`` field.
            password: Password typed into the ``passwd`` field.
        """
        self.email = email
        self.password = password
        self.driver = webdriver.Chrome()
        # Shared explicit wait: each ``self.wait.until(...)`` gives up after 10 seconds.
        self.wait = WebDriverWait(self.driver, 10)

    def _account_locator(self):
        """Return the locator of the element whose presence means "signed in".

        Both ``login`` and ``is_logged_in`` look for an ``li`` whose
        ``aria-label`` contains the account email, so the locator is built in
        one place to keep the two checks in agreement.
        """
        return (By.CSS_SELECTOR, f'li[aria-label*="{self.email}"]')

    def login(self) -> bool:
        """Handle the complete login process.

        Returns:
            True when the signed-in marker appears after submitting the form,
            False when a wait times out or any other error occurs (the error
            is logged, not raised).
        """
        try:
            # 1. Navigate to the main page
            logging.info("Navigating to main page...")
            self.driver.get("https://lists.uchicago.edu/web")
            time.sleep(2)  # Wait for page to fully load

            # 2. Click the initial login button
            logging.info("Clicking initial login button...")
            login_button = self.wait.until(
                EC.element_to_be_clickable((By.NAME, "action_login"))
            )
            login_button.click()
            time.sleep(1.5)  # Wait for transition

            # The email field appearing is the signal that the form page loaded.
            email_field = self.wait.until(
                EC.presence_of_element_located((By.ID, "email_login"))
            )

            # 3. Fill in credentials
            logging.info("Filling credentials...")
            email_field.send_keys(self.email)
            time.sleep(0.5)  # Small wait between fields

            password_field = self.driver.find_element(By.ID, "passwd")
            password_field.send_keys(self.password)
            time.sleep(0.5)  # Wait before clicking submit

            # 4. Click the login submit button
            logging.info("Submitting login form...")
            submit_button = self.driver.find_element(
                By.CSS_SELECTOR,
                "button[class='heavyWork'][type='submit'][name='action_login']"
            )
            submit_button.click()
            time.sleep(2)  # Wait for login processing

            # 5. Wait for the signed-in marker to confirm the login worked
            self.wait.until(
                EC.presence_of_element_located(self._account_locator())
            )

            logging.info("Successfully logged in!")
            time.sleep(1)  # Final wait to ensure everything is loaded
            return True

        except TimeoutException as e:
            logging.error(f"Timeout while waiting for element: {str(e)}")
            return False
        except Exception as e:
            logging.error(f"Error during login process: {str(e)}")
            return False

    def is_logged_in(self) -> bool:
        """Check if we're currently logged in.

        Returns:
            True when the signed-in marker is present on the current page,
            False when it is missing or the lookup fails for any reason.
        """
        try:
            email_element = self.driver.find_element(*self._account_locator())
        except Exception:
            # Any lookup failure is reported as "not logged in". ``Exception``
            # rather than a bare ``except`` so that KeyboardInterrupt and
            # SystemExit still propagate and the notebook can be interrupted.
            return False
        return email_element is not None

    def close(self):
        """Close the browser"""
        self.driver.quit()


In [12]:

# Usage example
def test_login(email: str, password: str):
    login_handler = ListhostLogin(email, password)
    try:
        success = login_handler.login()
        if success:
            print("Login successful!")
            # Here we could continue with list subscription logic
        else:
            print("Login failed!")
    finally:
        login_handler.close()

In [None]:
# Smoke-test the sign-in flow with the credentials loaded earlier in the notebook.
test_login(email=email, password=password)

Next, build the full scraper on top of the login helper.


In [25]:
class ListhostSubscriber(ListhostLogin):
    """Subscribe the signed-in account to RSO listhosts in small batches.

    Every attempt is recorded in a JSON log (``subscription_log.json``) keyed
    by listhost name, so a listhost that was already attempted is not tried
    again. RSOs read from a data file are sorted into three in-memory lists:
    ``subscribed_lists``, ``failed_lists`` and ``pending_lists``.
    """

    # Attempt-log statuses that mean the account is on the list.
    _SUBSCRIBED_STATUSES = ('success', 'already_subscribed')

    def __init__(self, email: str, password: str, max_subscriptions: int = 5):
        """Load the attempt log, then start the browser.

        Args:
            email: Account email, passed to ``ListhostLogin``.
            password: Account password, passed to ``ListhostLogin``.
            max_subscriptions: Upper bound on the size of one batch.

        Raises:
            json.JSONDecodeError: If the attempt log exists but is not valid JSON.
        """
        self.max_subscriptions = max_subscriptions
        self.subscription_log_path = 'subscription_log.json'
        # Read the log BEFORE the parent constructor launches Chrome: if the
        # log is unreadable the constructor fails without leaving a browser
        # process behind that nobody holds a reference to.
        self.subscription_attempts = self.load_subscription_log()

        # Track RSO subscription statuses
        self.subscribed_lists = []
        self.failed_lists = []
        self.pending_lists = []

        super().__init__(email, password)

        # Set up logging.
        # NOTE: logging.basicConfig does nothing when the root logger already
        # has handlers, so if logging was configured earlier in the session
        # these two handlers are not installed.
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('listhost_subscriber.log'),
                logging.StreamHandler()  # This will print to console as well
            ]
        )

    @staticmethod
    def _listhost_email(rso: dict) -> Optional[str]:
        """Return the 'RSO Listhost' value of an RSO record, or None.

        Tolerates ``additional_info`` being absent or explicitly null.
        """
        return (rso.get('additional_info') or {}).get('RSO Listhost')

    def _record_attempt(self, listhost_name: str, status: str,
                        error: Optional[str] = None) -> None:
        """Store the outcome of an attempt and write the log to disk at once."""
        entry = {'timestamp': time.time(), 'status': status}
        if error is not None:
            entry['error'] = error
        self.subscription_attempts[listhost_name] = entry
        self.save_subscription_log()

    def load_rso_data(self, rso_data_path: str) -> None:
        """Load RSO data and categorize listhosts based on previous attempts.

        Args:
            rso_data_path: Path to a JSON file holding a list of RSO records.
                Each record is read for ``name`` and
                ``additional_info['RSO Listhost']``.

        Raises:
            FileNotFoundError: If the file does not exist.
            json.JSONDecodeError: If the file is not valid JSON.
            Exception: Any other error is logged and re-raised.
        """
        logging.info(f"Loading RSO data from: {rso_data_path}")

        try:
            with open(rso_data_path, 'r') as f:
                rsos = json.load(f)

            logging.info(f"Successfully loaded {len(rsos)} RSOs from file")

            # Reset tracking lists
            self.subscribed_lists = []
            self.failed_lists = []
            self.pending_lists = []

            valid_listhost_count = 0
            invalid_listhost_count = 0

            for rso in rsos:
                rso_name = rso.get('name', 'Unknown RSO')
                logging.info(f"\nProcessing RSO: {rso_name}")

                # Extract listhost email
                listhost_email = self._listhost_email(rso)
                if not listhost_email:
                    logging.warning(f"No listhost email found for: {rso_name}")
                    invalid_listhost_count += 1
                    continue

                logging.info(f"Found listhost email: {listhost_email}")

                # Extract listhost name
                listhost_name = self.extract_listhost_name(listhost_email)
                if not listhost_name:
                    logging.warning(f"Could not extract valid listhost name from: {listhost_email}")
                    invalid_listhost_count += 1
                    continue

                valid_listhost_count += 1

                # Create RSO info dictionary
                rso_info = {
                    'name': rso_name,
                    'listhost': listhost_name,
                    'email': listhost_email
                }

                # Categorize based on previous attempts
                if listhost_name in self.subscription_attempts:
                    status = self.subscription_attempts[listhost_name]['status']
                    logging.info(f"Previous attempt found for {listhost_name} - Status: {status}")

                    if status in self._SUBSCRIBED_STATUSES:
                        self.subscribed_lists.append(rso_info)
                    else:
                        self.failed_lists.append({**rso_info, 'status': status})
                else:
                    logging.info(f"No previous attempts for {listhost_name} - Adding to pending")
                    self.pending_lists.append(rso_info)

            logging.info("\nRSO Data Loading Summary:")
            logging.info(f"Total RSOs processed: {len(rsos)}")
            logging.info(f"Valid listhosts found: {valid_listhost_count}")
            logging.info(f"Invalid/missing listhosts: {invalid_listhost_count}")
            logging.info(f"Currently subscribed: {len(self.subscribed_lists)}")
            logging.info(f"Previously failed: {len(self.failed_lists)}")
            logging.info(f"Pending subscription: {len(self.pending_lists)}")

        except FileNotFoundError:
            logging.error(f"RSO data file not found: {rso_data_path}")
            raise
        except json.JSONDecodeError:
            logging.error(f"Invalid JSON format in file: {rso_data_path}")
            raise
        except Exception as e:
            logging.error(f"Unexpected error loading RSO data: {str(e)}")
            raise

    def get_subscription_status(self) -> dict:
        """Return current subscription status counts"""
        return {
            'subscribed': len(self.subscribed_lists),
            'failed': len(self.failed_lists),
            'pending': len(self.pending_lists)
        }

    def process_pending_batch(self):
        """Attempt the first ``max_subscriptions`` entries of ``pending_lists``.

        Each attempted entry leaves ``pending_lists`` and is filed the same
        way ``load_rso_data`` would file it on the next run: under
        ``subscribed_lists`` when the account is on the list (newly subscribed
        or already subscribed), otherwise under ``failed_lists`` together with
        the recorded ``status``.
        """
        subscriptions_processed = 0

        # The slice is a copy, so removing from pending_lists below is safe.
        for rso in self.pending_lists[:self.max_subscriptions]:
            listhost_name = rso['listhost']
            newly_subscribed = self.subscribe_to_listhost(listhost_name)
            self.pending_lists.remove(rso)

            if newly_subscribed:
                subscriptions_processed += 1
                self.subscribed_lists.append(rso)
                continue

            # A False return covers both real failures and "already on the
            # list"; the attempt log tells the two apart.
            status = self.subscription_attempts.get(listhost_name, {}).get('status', 'failed')
            if status in self._SUBSCRIBED_STATUSES:
                self.subscribed_lists.append(rso)
            else:
                self.failed_lists.append({**rso, 'status': status})

        logging.info(f"Batch complete. Processed {subscriptions_processed} subscriptions")

    def load_subscription_log(self) -> Dict:
        """Load the subscription attempt log, or start empty if none exists.

        Returns:
            The parsed log, or ``{}`` when the log file is missing.

        Raises:
            json.JSONDecodeError: If the file exists but is not valid JSON. It
                is not replaced with an empty log, because that would discard
                the record of earlier attempts.
        """
        try:
            with open(self.subscription_log_path, 'r') as f:
                return json.load(f)
        except FileNotFoundError:
            return {}

    def save_subscription_log(self):
        """Save current subscription attempts"""
        with open(self.subscription_log_path, 'w') as f:
            json.dump(self.subscription_attempts, f, indent=2)

    def extract_listhost_name(self, email: str) -> Optional[str]:
        """Extract listhost name from email address.

        Returns:
            The text before the first ``@``, stripped of whitespace, or None
            when the value is empty, does not contain ``@lists.uchicago.edu``,
            or has nothing before the ``@``.
        """
        logging.info(f"Attempting to extract listhost name from: {email}")

        if not email:
            logging.warning("Empty email provided")
            return None

        if '@lists.uchicago.edu' not in email:
            logging.warning(f"Invalid listhost email format: {email}")
            return None

        listhost_name = email.split('@')[0].strip()
        if not listhost_name:
            # e.g. "@lists.uchicago.edu": report "no name" as None, as the
            # return annotation promises, instead of an empty string.
            logging.warning(f"Invalid listhost email format: {email}")
            return None

        logging.info(f"Extracted listhost name: {listhost_name}")
        return listhost_name

    def is_already_subscribed(self, listhost_name: str) -> bool:
        """Check if already subscribed to listhost.

        Loads the list's info page and looks for its sign-off link.

        Returns:
            True when the link is found; False when it is not found or when
            the check itself fails (the failure is logged).
        """
        try:
            # Navigate to info page
            self.driver.get(f"https://lists.uchicago.edu/web/info/{listhost_name}")
            time.sleep(5)  # Wait for page load

            # Look for unsubscribe link
            unsubscribe_links = self.driver.find_elements(
                By.XPATH,
                f"//a[@href='/web/signoff/{listhost_name}?previous_action=info']"
            )
            return len(unsubscribe_links) > 0

        except Exception as e:
            logging.error(f"Error checking subscription status for {listhost_name}: {str(e)}")
            return False

    def subscribe_to_listhost(self, listhost_name: str) -> bool:
        """Attempt to subscribe to a single listhost.

        The outcome of every attempt is written to the attempt log
        immediately, including the "already subscribed" case.

        Returns:
            True only when this call created a subscription that was then
            verified. False when the listhost was attempted before, when the
            account is already subscribed, or when the attempt failed.
        """
        if listhost_name in self.subscription_attempts:
            logging.info(f"Already attempted {listhost_name}, skipping")
            return False

        try:
            # Check if already subscribed
            if self.is_already_subscribed(listhost_name):
                logging.info(f"Already subscribed to {listhost_name}")
                # Persist right away so the result survives even if no later
                # attempt in this session writes the log.
                self._record_attempt(listhost_name, 'already_subscribed')
                return False

            # Click subscribe link
            subscribe_link = self.wait.until(
                EC.element_to_be_clickable(
                    (By.XPATH, f"//a[@href='/web/subscribe/{listhost_name}?previous_action=info']")
                )
            )
            subscribe_link.click()
            time.sleep(5)

            # Click "I subscribe" button
            subscribe_button = self.wait.until(
                EC.element_to_be_clickable(
                    (By.XPATH, f"//input[@value='I subscribe to list {listhost_name}']")
                )
            )
            subscribe_button.click()
            time.sleep(5)

            # Click confirm button
            confirm_button = self.wait.until(
                EC.element_to_be_clickable(
                    (By.ID, "response_action_confirm")
                )
            )
            confirm_button.click()
            time.sleep(5)

            # Verify subscription was successful
            success = self.is_already_subscribed(listhost_name)
            self._record_attempt(listhost_name, 'success' if success else 'failed')
            return success

        except Exception as e:
            logging.error(f"Error subscribing to {listhost_name}: {str(e)}")
            self._record_attempt(listhost_name, 'error', error=str(e))
            return False

    def process_rso_batch(self, rso_data_path: str):
        """Subscribe straight from an RSO data file.

        Walks the records in file order and stops once ``max_subscriptions``
        new subscriptions have succeeded. Records without a usable listhost
        address are skipped.
        """
        # Load RSO data
        with open(rso_data_path, 'r') as f:
            rsos = json.load(f)

        subscriptions_processed = 0

        for rso in rsos:
            if subscriptions_processed >= self.max_subscriptions:
                logging.info("Reached maximum subscriptions for this batch")
                break

            listhost_email = self._listhost_email(rso)
            if not listhost_email:
                continue

            listhost_name = self.extract_listhost_name(listhost_email)
            if not listhost_name:
                continue

            if self.subscribe_to_listhost(listhost_name):
                subscriptions_processed += 1

        logging.info(f"Batch complete. Processed {subscriptions_processed} subscriptions")

In [24]:
def run_subscription_batch(email: str, password: str, rso_data_path: str):
    """Sign in, load the RSO file and process one batch of pending lists.

    Errors raised after the subscriber is created are logged rather than
    propagated, and the browser is closed in every case.
    """
    logging.info("Starting subscription batch process")

    subscriber = ListhostSubscriber(email, password)
    try:
        logging.info("Attempting to login...")
        if not subscriber.login():
            # Nothing else can be done without a session; cleanup still runs below.
            logging.error("Failed to login")
            return

        logging.info("Login successful! Loading RSO data...")

        # Sort the RSOs into subscribed / failed / pending.
        subscriber.load_rso_data(rso_data_path)

        # Report the counts before touching anything.
        before = subscriber.get_subscription_status()
        logging.info(f"Initial status: {before}")

        # Work through one batch of pending lists.
        logging.info("Starting to process pending subscriptions...")
        subscriber.process_pending_batch()

        # Report the counts again so the effect of the batch is visible.
        after = subscriber.get_subscription_status()
        logging.info(f"Final status: {after}")
    except Exception as err:
        logging.error(f"Error during subscription process: {str(err)}")
    finally:
        logging.info("Closing browser...")
        subscriber.close()
        logging.info("Process complete")

In [28]:
# Entry point: read the credentials file, then run one subscription batch.
logging.info("Starting script")

try:
    with open('../login.json', 'r') as file:
        login = json.loads(file.read())
        logging.info("Successfully loaded login credentials")

    run_subscription_batch(
        email=login['email'],
        password=login['password'],
        rso_data_path='../rso_data_detailed.json',
    )
except Exception as err:
    # Any failure (missing file, bad JSON, missing key) is logged, not raised.
    logging.error(f"Script failed: {str(err)}")