In [None]:
# !pip install html2text

In [None]:
import requests
from bs4 import BeautifulSoup
import html2text
import re
import time
from urllib.parse import urljoin, urlparse
import json
from typing import Dict, List, Tuple, Optional, Set
from collections import deque

## **Grabbing Reliable Parkinson's Content From the Web**

| Organization Name                                               | Country/Region            | Website                                          |
|------------------------------------------------------------------|---------------------------|--------------------------------------------------|
| Parkinson’s Foundation                                           | USA (Global reach)        | [parkinson.org](https://www.parkinson.org/)     |
| Michael J. Fox Foundation for Parkinson’s Research              | USA (Global reach)        | [michaeljfox.org](https://www.michaeljfox.org/) |
| American Parkinson Disease Association (APDA)                   | USA                       | [apdaparkinson.org](https://www.apdaparkinson.org/) |
| Parkinson Canada                                                | Canada                    | [parkinson.ca](https://www.parkinson.ca/)       |
| European Parkinson’s Disease Association (EPDA)                 | Europe (Pan-European)     | [parkinsonseurope.org](https://parkinsonseurope.org/) |
| Parkinson’s UK                                                  | United Kingdom            | [parkinsons.org.uk](https://www.parkinsons.org.uk/) |
| Davis Phinney Foundation                                        | USA                       | [davisphinneyfoundation.org](https://davisphinneyfoundation.org/) |
| PMD Alliance                                                    | USA                       | [pmdalliance.org](https://www.pmdalliance.org/) |
| ParkinsonNet                                                    | Netherlands               | [parkinsonnet.com](https://www.parkinsonnet.com/) |



Crawl each organization's main page and the pages it links to, up to a limit of 50 pages per site and 3 levels deep. Only pages within the same domain are crawled; URLs of video and podcast resources found on those pages are collected as well.

In [None]:
import requests
from bs4 import BeautifulSoup
import html2text
import re
import time
from urllib.parse import urljoin, urlparse
import json
from typing import Dict, List, Tuple, Optional, Set
from collections import deque

class ParkinsonsContentScraper:
    """Breadth-first crawler for a fixed list of Parkinson's organization websites.

    For every crawled page the scraper stores a markdown rendering of the page,
    basic metadata (title, description, keywords) and any links that look like
    video or podcast resources. Crawling stays on the organization's own site
    and is bounded by a page budget and a maximum link depth.
    """

    def __init__(self, max_pages_per_org: int = 50, max_depth: int = 3,
                 request_delay: float = 1.0, request_timeout: float = 10):
        """
        Create the HTTP session and the crawl configuration.

        Args:
            max_pages_per_org: Maximum number of URLs visited per organization.
            max_depth: Maximum link depth to follow (0 = start page only).
            request_delay: Seconds to wait after each page request.
            request_timeout: Timeout in seconds for each HTTP request.
        """
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        })

        # Crawling limits
        self.max_pages_per_org = max_pages_per_org
        self.max_depth = max_depth
        self.request_delay = request_delay
        self.request_timeout = request_timeout

        # Parkinson's organization URLs
        self.organizations = {
            "Parkinson's Foundation": "https://www.parkinson.org/",
            "Michael J. Fox Foundation": "https://www.michaeljfox.org/",
            "American Parkinson Disease Association": "https://www.apdaparkinson.org/",
            "Parkinson Canada": "https://www.parkinson.ca/",
            "European Parkinson's Disease Association": "https://parkinsonseurope.org/",
            "Parkinson's UK": "https://www.parkinsons.org.uk/",
            "Davis Phinney Foundation": "https://davisphinneyfoundation.org/",
            "PMD Alliance": "https://www.pmdalliance.org/",
            "ParkinsonNet": "https://www.parkinsonnet.com/"
        }

        # Video/podcast indicators (matched as substrings of the URL or link text)
        self.media_indicators = {
            'video': [
                'youtube.com', 'vimeo.com', 'video', 'watch', 'webinar',
                'presentation', 'lecture', 'talk', 'interview'
            ],
            'podcast': [
                'podcast', 'audio', 'listen', 'episode', 'spotify.com',
                'apple.com/podcasts', 'soundcloud.com', 'anchor.fm'
            ]
        }

        # URLs to exclude from crawling (applied with re.match, case-insensitive)
        self.excluded_patterns = [
            r'.*\.(pdf|doc|docx|xls|xlsx|ppt|pptx|zip|rar|tar|gz)$',
            r'.*/(login|register|signin|signup|cart|checkout|donate|payment)',
            r'.*\?.*utm_',  # UTM tracking parameters
            r'.*#.*',  # Anchor links
            r'.*/search.*',
            r'.*/tag/.*',
            r'.*/category/.*',
            r'.*/author/.*',
            r'.*/wp-admin/.*',
            r'.*/wp-content/.*',
            r'.*\.php\?.*',
            r'mailto:.*',
            r'tel:.*',
            r'javascript:.*'
        ]

    @staticmethod
    def _normalize_url(url: str) -> str:
        """Return scheme://netloc/path without query, fragment or one trailing slash."""
        parsed = urlparse(url)
        clean_url = f"{parsed.scheme}://{parsed.netloc}{parsed.path}"
        if clean_url.endswith('/') and len(clean_url) > 1:
            clean_url = clean_url[:-1]
        return clean_url

    @staticmethod
    def _strip_www(host: str) -> str:
        """Drop a leading 'www.' so 'www.example.org' and 'example.org' compare equal."""
        return host[4:] if host.startswith('www.') else host

    def _fetch_page(self, url: str) -> Optional[Tuple["BeautifulSoup", str, Dict]]:
        """
        Download and parse a page once, returning the soup, markdown text and metadata.

        Returns None (after printing the reason) on HTTP errors, non-HTML
        responses, or any failure while downloading/parsing.
        """
        try:
            response = self.session.get(url, timeout=self.request_timeout)
            if response.status_code == 500:
                print(f"Server error for {url}")
                return None
            if response.status_code != 200:
                print(f"HTTP {response.status_code} for {url}")
                return None

            # Feeds/sitemaps served as XML are not page content; parsing them
            # with the HTML parser only triggers BeautifulSoup warnings.
            content_type = response.headers.get('Content-Type', '')
            if content_type and 'html' not in content_type.lower():
                print(f"Skipping non-HTML content ({content_type}) for {url}")
                return None

            soup = BeautifulSoup(response.content, 'html.parser')

            # Remove js and css code so it does not leak into the text
            for tag in soup(["script", "style"]):
                tag.extract()

            # Extract text in markdown format
            converter = html2text.HTML2Text()
            converter.images_to_alt = True
            converter.body_width = 0
            converter.single_line_break = True
            text = converter.handle(str(soup))

            return soup, text, self._build_metadata(soup, url)

        except Exception as e:
            # Best-effort boundary: one bad page must not stop the crawl.
            print(f"Error processing {url}: {str(e)}")
            return None

    @staticmethod
    def _build_metadata(soup: "BeautifulSoup", url: str) -> Dict:
        """Collect title, description and keywords, with string fallbacks for each."""
        title_tag = soup.title
        page_title = title_tag.get_text(strip=True) if title_tag is not None else ""
        if not page_title:
            # No usable <title>: derive a title from the URL path instead
            page_title = urlparse(url).path[1:].replace("/", "-")

        meta_description = soup.find("meta", attrs={"name": "description"})
        meta_keywords = soup.find("meta", attrs={"name": "keywords"})

        # A <meta> tag without a content attribute yields None; fall back.
        description = (meta_description.get("content") if meta_description else None) or page_title
        keywords = (meta_keywords.get("content") if meta_keywords else None) or ""

        return {
            'title': page_title,
            'url': url,
            'description': description,
            'keywords': keywords
        }

    def get_page_content(self, url: str) -> Optional[Tuple[str, Dict]]:
        """
        Retrieve text content and metadata from a given URL.

        Args:
            url (str): The URL to fetch content from.

        Returns:
            tuple: A tuple containing the text content (str) and metadata (dict),
            or None when the page could not be fetched or parsed.
        """
        page = self._fetch_page(url)
        if page is None:
            return None
        _soup, text, metadata = page
        return text, metadata

    def extract_media_links(self, soup: "BeautifulSoup", base_url: str) -> List[Dict]:
        """
        Extract video and podcast links with descriptions from a webpage.

        Args:
            soup: BeautifulSoup object of the webpage
            base_url: Base URL for resolving relative links

        Returns:
            List of dictionaries containing media information
        """
        media_links = []

        for link in soup.find_all('a', href=True):
            href = link.get('href')
            if not href:
                continue

            # Convert relative URLs to absolute; skip hrefs that cannot be parsed
            try:
                full_url = urljoin(base_url, href)
                scheme = urlparse(full_url).scheme
            except ValueError:
                continue

            # mailto:, tel:, javascript: etc. can never be media resources
            if scheme not in ('http', 'https'):
                continue

            link_text = link.get_text(strip=True)
            media_type = self.classify_media_link(full_url, link_text)
            if not media_type:
                continue

            media_links.append({
                'type': media_type,
                'url': full_url,
                'title': link_text,
                'description': self.extract_media_description(link, link_text),
                'source_page': base_url
            })

        # Also look for embedded videos and audio players
        media_links.extend(self.extract_embedded_media(soup, base_url))

        return media_links

    def classify_media_link(self, url: str, text: str) -> Optional[str]:
        """
        Classify whether a link is a video or podcast based on URL and text.

        Video indicators are checked before podcast indicators, so a link
        matching both is reported as a video.

        Args:
            url: The URL to classify
            text: The link text

        Returns:
            'video', 'podcast', or None
        """
        url_lower = (url or "").lower()
        text_lower = (text or "").lower()

        for media_type in ('video', 'podcast'):
            for indicator in self.media_indicators[media_type]:
                if indicator in url_lower or indicator in text_lower:
                    return media_type

        return None

    def extract_media_description(self, link_element, link_text: str) -> str:
        """
        Extract description for media content from surrounding HTML context.

        The description joins, in order: the link text, the link's title
        attribute, the text of up to three ancestors (each under 500
        characters) and the text of the next sibling (under 200 characters).
        Identical segments are included only once.

        Args:
            link_element: BeautifulSoup link element
            link_text: Text content of the link

        Returns:
            Description string
        """
        description_parts = [link_text]

        title_attr = link_element.get('title')
        if title_attr:
            description_parts.append(title_attr)

        # Look for surrounding text in up to 3 levels of parent elements
        ancestor = link_element.parent
        for _ in range(3):
            if ancestor is None:
                break
            ancestor_text = ancestor.get_text(strip=True)
            if ancestor_text and ancestor_text != link_text and len(ancestor_text) < 500:
                description_parts.append(ancestor_text)
            ancestor = ancestor.parent

        # Look for an adjacent element with a short description
        next_sibling = link_element.find_next_sibling()
        if next_sibling is not None:
            sibling_text = next_sibling.get_text(strip=True)
            if sibling_text and len(sibling_text) < 200:
                description_parts.append(sibling_text)

        # Nested wrappers often carry the same text; keep first occurrences only
        unique_parts = dict.fromkeys(part for part in description_parts if part)
        return " | ".join(unique_parts)

    def extract_embedded_media(self, soup: "BeautifulSoup", base_url: str) -> List[Dict]:
        """
        Extract embedded videos and audio content.

        Args:
            soup: BeautifulSoup object
            base_url: Base URL

        Returns:
            List of embedded media information
        """
        embedded_media = []

        # (iframe src pattern, provider label) for the supported video hosts
        providers = (
            (re.compile(r'youtube\.com|youtu\.be'), 'YouTube'),
            (re.compile(r'vimeo\.com'), 'Vimeo'),
        )
        for src_pattern, provider in providers:
            for embed in soup.find_all('iframe', src=src_pattern):
                title = embed.get('title') or f"{provider} Video"
                try:
                    # Embeds are frequently protocol-relative ("//www.youtube.com/...")
                    embed_url = urljoin(base_url, embed.get('src'))
                except ValueError:
                    continue
                embedded_media.append({
                    'type': 'video',
                    'url': embed_url,
                    'title': title,
                    'description': f"Embedded {provider} video: {title}",
                    'source_page': base_url
                })

        # Audio elements: src may sit on <audio> itself or on a nested <source>
        for audio in soup.find_all('audio'):
            candidates = [audio.get('src')]
            candidates.extend(source.get('src') for source in audio.find_all('source'))
            src = next((candidate for candidate in candidates if candidate), None)
            if not src:
                continue
            try:
                audio_url = urljoin(base_url, src)
            except ValueError:
                continue
            title = audio.get('title') or "Audio Content"
            embedded_media.append({
                'type': 'podcast',
                'url': audio_url,
                'title': title,
                'description': f"Audio content: {title}",
                'source_page': base_url
            })

        return embedded_media

    def is_valid_url(self, url: str, base_domain: str) -> bool:
        """
        Check if a URL should be crawled based on domain and exclusion patterns.

        A URL is accepted when it uses http(s), its host equals the
        organization's host or is a subdomain of it (a leading "www." is
        ignored on both sides), and it matches none of the exclusion patterns.

        Args:
            url: URL to check
            base_domain: Base domain of the organization

        Returns:
            True if URL should be crawled
        """
        try:
            parsed_url = urlparse(url)
            host = parsed_url.hostname
            # Parse base_domain as a netloc so ports/userinfo are stripped
            site = urlparse(f"//{base_domain}").hostname
        except ValueError:
            return False

        if parsed_url.scheme not in ('http', 'https') or not host or not site:
            return False

        host = self._strip_www(host)
        site = self._strip_www(site)

        # Compare on label boundaries; a substring test would accept
        # hosts such as "www.parkinson.org.evil.com".
        if host != site and not host.endswith('.' + site):
            return False

        return not any(
            re.match(pattern, url, re.IGNORECASE)
            for pattern in self.excluded_patterns
        )

    def extract_internal_links(self, soup: "BeautifulSoup", base_url: str, base_domain: str) -> Set[str]:
        """
        Extract internal links from a webpage.

        Args:
            soup: BeautifulSoup object
            base_url: Base URL for resolving relative links
            base_domain: Base domain to filter links

        Returns:
            Set of valid internal URLs (query, fragment and trailing slash removed)
        """
        internal_links = set()

        for link in soup.find_all('a', href=True):
            href = link.get('href')
            if not href:
                continue

            try:
                clean_url = self._normalize_url(urljoin(base_url, href))
            except ValueError:
                # Malformed href: skip it rather than lose the page's other links
                continue

            if self.is_valid_url(clean_url, base_domain):
                internal_links.add(clean_url)

        return internal_links

    def crawl_organization_pages(self, org_name: str, base_url: str) -> Dict:
        """
        Crawl all pages from an organization's website using breadth-first search.

        Each page is downloaded once; its text, media links and outgoing
        internal links are all taken from that single response.

        Args:
            org_name: Name of the organization
            base_url: Base URL of the organization

        Returns:
            Dictionary containing all scraped pages and media
        """
        print(f"Crawling {org_name}...")

        base_domain = urlparse(base_url).netloc

        visited_urls = set()
        to_visit = deque([(base_url, 0)])  # (url, depth)
        # Every URL ever queued. The normalized form of the start URL is
        # included so the home page is not crawled a second time when other
        # pages link back to it without the trailing slash.
        queued_urls = {base_url}
        try:
            queued_urls.add(self._normalize_url(base_url))
        except ValueError:
            pass
        scraped_pages = {}
        all_media = []

        while to_visit and len(visited_urls) < self.max_pages_per_org:
            current_url, depth = to_visit.popleft()

            # Skip if already visited or max depth reached
            if current_url in visited_urls or depth > self.max_depth:
                continue

            print(f"  Crawling (depth {depth}): {current_url}")
            visited_urls.add(current_url)

            page = self._fetch_page(current_url)
            if page is not None:
                soup, text, metadata = page

                scraped_pages[current_url] = {
                    'text': text,
                    'metadata': metadata,
                    'depth': depth,
                    'crawled_at': time.strftime('%Y-%m-%d %H:%M:%S')
                }

                try:
                    all_media.extend(self.extract_media_links(soup, current_url))

                    # Queue internal links for further crawling
                    if depth < self.max_depth:
                        for link in self.extract_internal_links(soup, current_url, base_domain):
                            if link not in queued_urls:
                                queued_urls.add(link)
                                to_visit.append((link, depth + 1))
                except Exception as e:
                    print(f"    Error processing {current_url}: {str(e)}")

            # Be respectful with requests, including after failed ones
            time.sleep(self.request_delay)

        # Remove duplicate media, keeping the first occurrence of each URL
        unique_media = []
        seen_urls = set()
        for media in all_media:
            if media['url'] not in seen_urls:
                unique_media.append(media)
                seen_urls.add(media['url'])

        return {
            'organization': org_name,
            'base_url': base_url,
            'total_pages_crawled': len(scraped_pages),
            'pages_content': scraped_pages,
            'media_content': {
                'videos': [m for m in unique_media if m['type'] == 'video'],
                'podcasts': [m for m in unique_media if m['type'] == 'podcast'],
                'total_media_found': len(unique_media)
            },
            'crawl_summary': {
                'pages_visited': sorted(visited_urls),
                'total_pages': len(visited_urls),
                'max_depth_reached': max(
                    (data['depth'] for data in scraped_pages.values()), default=0
                )
            }
        }

    def scrape_organization(self, org_name: str, base_url: str) -> Dict:
        """
        Scrape content from a single organization (legacy method - use crawl_organization_pages for full crawling).

        Args:
            org_name: Name of the organization
            base_url: Base URL of the organization

        Returns:
            Dictionary containing scraped content and media
        """
        return self.crawl_organization_pages(org_name, base_url)

    def scrape_all_organizations(self) -> Dict:
        """
        Scrape content from all Parkinson's organizations with full page crawling.

        A failure in one organization is recorded as an 'error' entry and does
        not stop the remaining crawls.

        Returns:
            Dictionary containing all scraped content, keyed by organization name
        """
        results = {}

        for org_name, url in self.organizations.items():
            try:
                results[org_name] = self.crawl_organization_pages(org_name, url)
                # Be respectful with requests between organizations
                time.sleep(3)
            except Exception as e:
                print(f"Error scraping {org_name}: {str(e)}")
                results[org_name] = {'organization': org_name, 'error': str(e)}

        return results

    def get_content_summary(self, results: Dict) -> Dict:
        """
        Generate a summary of all scraped content.

        Args:
            results: Results from scrape_all_organizations()

        Returns:
            Summary statistics: overall totals plus one entry per organization
        """
        summary = {
            'total_organizations': len(results),
            'successful_crawls': 0,
            'failed_crawls': 0,
            'total_pages_scraped': 0,
            'total_videos_found': 0,
            'total_podcasts_found': 0,
            'organizations_summary': {}
        }

        for org_name, data in results.items():
            if 'error' in data:
                summary['failed_crawls'] += 1
                summary['organizations_summary'][org_name] = {'status': 'failed', 'error': data['error']}
                continue

            pages = data['total_pages_crawled']
            videos = len(data['media_content']['videos'])
            podcasts = len(data['media_content']['podcasts'])

            summary['successful_crawls'] += 1
            summary['total_pages_scraped'] += pages
            summary['total_videos_found'] += videos
            summary['total_podcasts_found'] += podcasts

            summary['organizations_summary'][org_name] = {
                'status': 'success',
                'pages_crawled': pages,
                'videos_found': videos,
                'podcasts_found': podcasts,
                'max_depth_reached': data['crawl_summary']['max_depth_reached']
            }

        return summary

    def save_results(self, results: Dict, filename: str = 'parkinsons_content.json'):
        """
        Save scraped results to a JSON file.

        Args:
            results: Scraped content dictionary
            filename: Output filename
        """
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(results, f, indent=2, ensure_ascii=False)

        print(f"Results saved to {filename}")


# Usage example
if __name__ == "__main__":
    # Driver: crawl every configured organization, persist the raw results and
    # a summary as JSON, then print a short report.
    # max_pages_per_org caps the pages crawled per organization;
    # max_depth caps the link depth (0 = main page only).
    scraper = ParkinsonsContentScraper(max_pages_per_org=100, max_depth=3)

    # Option 1: Scrape all organizations
    print("Starting full crawl of all organizations...")
    all_results = scraper.scrape_all_organizations()
    scraper.save_results(all_results, 'parkinsons_full_crawl.json')

    # Derive the summary from the full results and store it alongside them
    summary = scraper.get_content_summary(all_results)
    scraper.save_results(summary, 'parkinsons_crawl_summary.json')

    # Overall totals, emitted as one multi-line message
    print("\n".join([
        "\n=== CRAWLING SUMMARY ===",
        f"Total Organizations: {summary['total_organizations']}",
        f"Successful Crawls: {summary['successful_crawls']}",
        f"Failed Crawls: {summary['failed_crawls']}",
        f"Total Pages Scraped: {summary['total_pages_scraped']}",
        f"Total Videos Found: {summary['total_videos_found']}",
        f"Total Podcasts Found: {summary['total_podcasts_found']}",
    ]))

    # One line per organization: counts on success, the error text otherwise
    print("\n=== ORGANIZATION DETAILS ===")
    for org_name, org_data in summary['organizations_summary'].items():
        if org_data['status'] != 'success':
            print(f"{org_name}: ERROR - {org_data['error']}")
            continue
        print(f"{org_name}: {org_data['pages_crawled']} pages, {org_data['videos_found']} videos, {org_data['podcasts_found']} podcasts (depth: {org_data['max_depth_reached']})")

    # Option 2: Scrape a single organization for testing
    # single_org_results = scraper.crawl_organization_pages("Parkinson's Foundation", "https://www.parkinson.org/")
    # scraper.save_results(single_org_results, 'single_org_test.json')

Starting full crawl of all organizations...
Crawling Parkinson's Foundation...
  Crawling (depth 0): https://www.parkinson.org/
  Crawling (depth 1): https://www.parkinson.org/understanding-parkinsons/movement-symptoms
  Crawling (depth 1): https://www.parkinson.org/advancing-research/advocate-research
  Crawling (depth 1): https://www.parkinson.org/living-with-parkinsons/stories
  Crawling (depth 1): https://www.parkinson.org/understanding-parkinsons/10-early-signs
  Crawling (depth 1): https://www.parkinson.org/resources-support/carepartners
  Crawling (depth 1): https://www.parkinson.org/about-us/careers
  Crawling (depth 1): https://www.parkinson.org/advancing-research/our-research/pdgeneration
  Crawling (depth 1): https://www.parkinson.org/living-with-parkinsons/finding-care
  Crawling (depth 1): https://www.parkinson.org/how-to-help/tribute
  Crawling (depth 1): https://www.parkinson.org/living-with-parkinsons/management
  Crawling (depth 1): https://www.parkinson.org/resources-


Assuming this really is an XML document, what you're doing might work, but you should know that using an XML parser will be more reliable. To parse this document as XML, make sure you have the Python package 'lxml' installed, and pass the keyword argument `features="xml"` into the BeautifulSoup constructor.




  soup = BeautifulSoup(response.content, 'html.parser')

Assuming this really is an XML document, what you're doing might work, but you should know that using an XML parser will be more reliable. To parse this document as XML, make sure you have the Python package 'lxml' installed, and pass the keyword argument `features="xml"` into the BeautifulSoup constructor.




  soup = BeautifulSoup(response.content, 'html.parser')


  Crawling (depth 1): https://www.michaeljfox.org/michaels-story
  Crawling (depth 1): https://www.michaeljfox.org/mjff-feed
  Crawling (depth 1): https://www.michaeljfox.org/team-fox-endurance
  Crawling (depth 1): https://www.michaeljfox.org/medications-treatments
  Crawling (depth 1): https://www.michaeljfox.org/causes
  Crawling (depth 1): https://www.michaeljfox.org/books-resources
  Crawling (depth 1): https://www.michaeljfox.org/publications
  Crawling (depth 1): https://www.michaeljfox.org/biospecimens
  Crawling (depth 1): https://www.michaeljfox.org/updates-washington
  Crawling (depth 1): https://www.michaeljfox.org/advocacy-resources
  Crawling (depth 1): https://www.michaeljfox.org/asap
  Crawling (depth 1): https://www.michaeljfox.org/key-research-initiatives
  Crawling (depth 1): https://www.michaeljfox.org/our-commitment-research-integrity
  Crawling (depth 1): https://www.michaeljfox.org/ppmi
  Crawling (depth 1): https://www.michaeljfox.org/what-we-fund
  Crawling (de

In [None]:
from google.colab import files

# Send both crawl output files to the browser as downloads (Colab only)
for _output_file in ('parkinsons_full_crawl.json', 'parkinsons_crawl_summary.json'):
    files.download(_output_file)

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>