In [1]:
import os
import requests
import threading
from urllib.parse import urljoin
from bs4 import BeautifulSoup
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

class GitHubDownloader:
    def __init__(self, repo_url, max_workers=5):
        self.repo_url = repo_url
        self.base_url = repo_url.replace('github.com', 'raw.githubusercontent.com').replace('/tree/', '/') + '/'
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })
        self.download_dir = "github_downloads"
        self.max_workers = max_workers
        self.lock = threading.Lock()
        self.downloaded_files = 0
        
        # Create download directory
        os.makedirs(self.download_dir, exist_ok=True)
    
    def get_file_urls(self):
        """Get all raw file URLs from the GitHub repository"""
        print("Discovering files in repository...")
        
        file_urls = []
        
        def crawl_directory(url, path=""):
            try:
                response = self.session.get(url)
                response.raise_for_status()
                soup = BeautifulSoup(response.text, 'html.parser')
                
                # Find all file and directory links
                for link in soup.find_all('a', {'class': 'js-navigation-open'}):
                    href = link.get('href')
                    name = link.text.strip()
                    
                    if href and '/tree/' in href and name not in ['.', '..']:
                        # It's a directory
                        dir_url = urljoin('https://github.com', href)
                        crawl_directory(dir_url, os.path.join(path, name))
                    elif href and '/blob/' in href:
                        # It's a file
                        raw_url = href.replace('/blob/', '/')  # Convert to raw URL
                        raw_url = urljoin('https://raw.githubusercontent.com', raw_url)
                        file_path = os.path.join(path, name)
                        file_urls.append((raw_url, file_path))
                        
            except Exception as e:
                print(f"Error crawling {url}: {e}")
        
        # Start crawling from the main repository URL
        crawl_directory(self.repo_url)
        return file_urls
    
    def download_file(self, url, file_path):
        """Download a single file"""
        try:
            # Create directory structure if needed
            full_path = os.path.join(self.download_dir, file_path)
            os.makedirs(os.path.dirname(full_path), exist_ok=True)
            
            # Download the file
            response = self.session.get(url, stream=True, timeout=30)
            response.raise_for_status()
            
            # Save the file
            with open(full_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
            
            # Update progress safely
            with self.lock:
                self.downloaded_files += 1
                print(f"✓ Downloaded ({self.downloaded_files}): {file_path}")
            
            return True
            
        except requests.exceptions.RequestException as e:
            print(f"✗ Failed to download {file_path}: {e}")
            return False
        except Exception as e:
            print(f"✗ Error with {file_path}: {e}")
            return False
    
    def download_all_files(self):
        """Download all files using multithreading"""
        print(f"Starting download from: {self.repo_url}")
        print(f"Files will be saved to: {self.download_dir}")
        print("-" * 50)
        
        # Get all file URLs
        file_urls = self.get_file_urls()
        
        if not file_urls:
            print("No files found to download!")
            return
        
        print(f"Found {len(file_urls)} files to download")
        print("Starting download with multithreading...")
        print("-" * 50)
        
        start_time = time.time()
        
        # Use ThreadPoolExecutor for efficient multithreading
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit all download tasks
            future_to_file = {
                executor.submit(self.download_file, url, file_path): (url, file_path)
                for url, file_path in file_urls
            }
            
            # Process completed downloads
            successful = 0
            failed = 0
            
            for future in as_completed(future_to_file):
                url, file_path = future_to_file[future]
                try:
                    if future.result():
                        successful += 1
                    else:
                        failed += 1
                except Exception as e:
                    print(f"✗ Exception for {file_path}: {e}")
                    failed += 1
        
        end_time = time.time()
        
        # Print summary
        print("-" * 50)
        print("Download Summary:")
        print(f"Total files: {len(file_urls)}")
        print(f"Successful: {successful}")
        print(f"Failed: {failed}")
        print(f"Time taken: {end_time - start_time:.2f} seconds")
        print(f"Download location: {os.path.abspath(self.download_dir)}")

# Alternative simpler version without directory crawling
class SimpleGitHubDownloader:
    def __init__(self, repo_url, max_workers=5):
        self.repo_url = repo_url
        self.base_api_url = f"https://api.github.com/repos/{repo_url.split('github.com/')[1]}/contents"
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Accept': 'application/vnd.github.v3+json'
        })
        self.download_dir = "github_downloads"
        self.max_workers = max_workers
        self.lock = threading.Lock()
        self.downloaded_files = 0
        
        os.makedirs(self.download_dir, exist_ok=True)
    
    def get_files_via_api(self):
        """Get files using GitHub API (simpler but might have rate limits)"""
        print("Fetching repository contents via GitHub API...")
        
        try:
            response = self.session.get(self.base_api_url)
            response.raise_for_status()
            contents = response.json()
            
            file_urls = []
            
            for item in contents:
                if item['type'] == 'file':
                    file_urls.append((item['download_url'], item['path']))
                elif item['type'] == 'dir':
                    # For simplicity, we'll just get files from root directory
                    pass
            
            return file_urls
            
        except Exception as e:
            print(f"Error using GitHub API: {e}")
            return []
    
    def download_all_files_simple(self):
        """Simplified download using GitHub API"""
        file_urls = self.get_files_via_api()
        
        if not file_urls:
            print("No files found via API. Trying alternative method...")
            return
        
        print(f"Found {len(file_urls)} files to download")
        
        start_time = time.time()
        
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit download tasks
            futures = [executor.submit(self.download_file, url, path) 
                      for url, path in file_urls]
            
            successful = sum(1 for future in futures if future.result())
        
        end_time = time.time()
        
        print(f"\nDownloaded {successful}/{len(file_urls)} files in {end_time - start_time:.2f} seconds")

def main():
    # Repository URL
    repo_url = "https://github.com/khushisampath/My-Python-journey-with-Sharmila-ma-am"
    
    # Create downloader instance
    downloader = GitHubDownloader(repo_url, max_workers=8)
    
    try:
        # Download all files
        downloader.download_all_files()
        
    except KeyboardInterrupt:
        print("\nDownload interrupted by user!")
    except Exception as e:
        print(f"Unexpected error: {e}")

if __name__ == "__main__":
    main()

Starting download from: https://github.com/khushisampath/My-Python-journey-with-Sharmila-ma-am
Files will be saved to: github_downloads
--------------------------------------------------
Discovering files in repository...
No files found to download!


In [3]:
!pip install requests beautifulsoup4



In [4]:
python github_downloader.py

SyntaxError: invalid syntax (2442328538.py, line 1)

In [5]:
import os
import requests
import threading
from urllib.parse import urljoin
from bs4 import BeautifulSoup
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

class GitHubDirectoryDownloader:
    def __init__(self, directory_url, max_workers=8):
        self.directory_url = directory_url
        # Convert to raw content URL
        self.raw_base_url = directory_url.replace('github.com', 'raw.githubusercontent.com').replace('/blob/', '/')
        # For web scraping
        self.github_base_url = directory_url.replace('/blob/', '/tree/')
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })
        self.download_dir = "github_python_journey"
        self.max_workers = max_workers
        self.lock = threading.Lock()
        self.downloaded_files = 0
        self.total_files = 0
        
        # Create download directory
        os.makedirs(self.download_dir, exist_ok=True)
    
    def get_files_from_directory(self):
        """Get all file URLs from the specific GitHub directory"""
        print(f"Discovering files in directory: {self.github_base_url}")
        
        file_urls = []
        
        try:
            response = self.session.get(self.github_base_url)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'html.parser')
            
            # Find all file links in the directory
            file_links = soup.find_all('a', {
                'class': 'js-navigation-open', 
                'href': True
            })
            
            for link in file_links:
                href = link.get('href')
                filename = link.text.strip()
                
                # Skip directories and special files
                if (href and '/blob/' in href and 
                    not filename.startswith('.') and 
                    filename not in ['', '..', '.']):
                    
                    # Convert to raw GitHub URL
                    raw_url = href.replace('/blob/', '/')
                    raw_url = urljoin('https://raw.githubusercontent.com', raw_url)
                    
                    file_urls.append((raw_url, filename))
                    print(f"Found: {filename}")
            
            return file_urls
            
        except Exception as e:
            print(f"Error scanning directory: {e}")
            return []
    
    def download_file(self, url, filename):
        """Download a single file with progress tracking"""
        try:
            full_path = os.path.join(self.download_dir, filename)
            
            # Download the file
            response = self.session.get(url, stream=True, timeout=30)
            response.raise_for_status()
            
            # Get file size for progress tracking
            file_size = int(response.headers.get('content-length', 0))
            
            # Save the file
            with open(full_path, 'wb') as f:
                if file_size == 0:
                    f.write(response.content)
                else:
                    downloaded = 0
                    for chunk in response.iter_content(chunk_size=8192):
                        if chunk:
                            f.write(chunk)
                            downloaded += len(chunk)
            
            # Update progress safely
            with self.lock:
                self.downloaded_files += 1
                print(f"✓ [{self.downloaded_files}/{self.total_files}] Downloaded: {filename}")
            
            return True
            
        except requests.exceptions.RequestException as e:
            print(f"✗ Failed to download {filename}: {e}")
            return False
        except Exception as e:
            print(f"✗ Error with {filename}: {e}")
            return False
    
    def download_all_files(self):
        """Download all files from the directory using multithreading"""
        print(f"Starting download from: {self.directory_url}")
        print(f"Files will be saved to: {os.path.abspath(self.download_dir)}")
        print("-" * 60)
        
        # Get all file URLs from the directory
        file_urls = self.get_files_from_directory()
        
        if not file_urls:
            print("No files found in the directory!")
            return
        
        self.total_files = len(file_urls)
        print(f"\nFound {self.total_files} files to download")
        print("Starting multithreaded download...")
        print("-" * 60)
        
        start_time = time.time()
        
        # Use ThreadPoolExecutor for efficient multithreading
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit all download tasks
            future_to_file = {
                executor.submit(self.download_file, url, filename): (url, filename)
                for url, filename in file_urls
            }
            
            # Process completed downloads
            successful = 0
            failed = 0
            
            for future in as_completed(future_to_file):
                url, filename = future_to_file[future]
                try:
                    if future.result():
                        successful += 1
                    else:
                        failed += 1
                except Exception as e:
                    print(f"✗ Exception for {filename}: {e}")
                    failed += 1
        
        end_time = time.time()
        
        # Print summary
        print("-" * 60)
        print("DOWNLOAD SUMMARY:")
        print(f"Directory: {self.directory_url}")
        print(f"Total files available: {self.total_files}")
        print(f"Successfully downloaded: {successful}")
        print(f"Failed downloads: {failed}")
        print(f"Time taken: {end_time - start_time:.2f} seconds")
        print(f"Download location: {os.path.abspath(self.download_dir)}")
        print("-" * 60)

def main():
    # Specific directory URL you provided
    directory_url = "https://github.com/khushisampath/My-Python-journey-with-Sharmila-ma-am/blob/main/"
    
    # Create downloader instance with more workers for faster download
    downloader = GitHubDirectoryDownloader(directory_url, max_workers=10)
    
    try:
        # Download all files from the directory
        downloader.download_all_files()
        
    except KeyboardInterrupt:
        print("\nDownload interrupted by user!")
    except Exception as e:
        print(f"Unexpected error: {e}")

if __name__ == "__main__":
    main()

Starting download from: https://github.com/khushisampath/My-Python-journey-with-Sharmila-ma-am/blob/main/
Files will be saved to: C:\Users\femal\My python journey\Sharmila\github_python_journey
------------------------------------------------------------
Discovering files in directory: https://github.com/khushisampath/My-Python-journey-with-Sharmila-ma-am/tree/main/
No files found in the directory!


In [6]:
# %% [markdown]
# # GitHub Directory Downloader
# 
# This notebook downloads all files from a specific GitHub directory using multithreading for faster downloads.
# 

# %%
# Install required packages (run this cell first)
!pip install requests beautifulsoup4

# %%
import os
import requests
import threading
from urllib.parse import urljoin
from bs4 import BeautifulSoup
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from IPython.display import display, HTML, clear_output
import ipywidgets as widgets

# %%
class GitHubDirectoryDownloader:
    def __init__(self, directory_url, max_workers=8):
        self.directory_url = directory_url
        # Convert to raw content URL
        self.raw_base_url = directory_url.replace('github.com', 'raw.githubusercontent.com').replace('/blob/', '/')
        # For web scraping
        self.github_base_url = directory_url.replace('/blob/', '/tree/')
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })
        self.download_dir = "github_python_journey"
        self.max_workers = max_workers
        self.lock = threading.Lock()
        self.downloaded_files = 0
        self.total_files = 0
        
        # Create download directory
        os.makedirs(self.download_dir, exist_ok=True)
    
    def get_files_from_directory(self):
        """Get all file URLs from the specific GitHub directory"""
        display(HTML(f"<b>Discovering files in directory:</b> {self.github_base_url}"))
        
        file_urls = []
        
        try:
            response = self.session.get(self.github_base_url)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'html.parser')
            
            # Find all file links in the directory
            file_links = soup.find_all('a', {
                'class': 'js-navigation-open', 
                'href': True
            })
            
            for link in file_links:
                href = link.get('href')
                filename = link.text.strip()
                
                # Skip directories and special files
                if (href and '/blob/' in href and 
                    not filename.startswith('.') and 
                    filename not in ['', '..', '.']):
                    
                    # Convert to raw GitHub URL
                    raw_url = href.replace('/blob/', '/')
                    raw_url = urljoin('https://raw.githubusercontent.com', raw_url)
                    
                    file_urls.append((raw_url, filename))
                    display(HTML(f"🔍 Found: <code>{filename}</code>"))
            
            return file_urls
            
        except Exception as e:
            display(HTML(f"<span style='color: red;'>Error scanning directory: {e}</span>"))
            return []
    
    def download_file(self, url, filename):
        """Download a single file with progress tracking"""
        try:
            full_path = os.path.join(self.download_dir, filename)
            
            # Download the file
            response = self.session.get(url, stream=True, timeout=30)
            response.raise_for_status()
            
            # Get file size for progress tracking
            file_size = int(response.headers.get('content-length', 0))
            
            # Save the file
            with open(full_path, 'wb') as f:
                if file_size == 0:
                    f.write(response.content)
                else:
                    downloaded = 0
                    for chunk in response.iter_content(chunk_size=8192):
                        if chunk:
                            f.write(chunk)
                            downloaded += len(chunk)
            
            # Update progress safely
            with self.lock:
                self.downloaded_files += 1
                progress = f"[{self.downloaded_files}/{self.total_files}]"
                display(HTML(f"✅ <span style='color: green;'>{progress}</span> Downloaded: <code>{filename}</code>"))
            
            return True
            
        except requests.exceptions.RequestException as e:
            display(HTML(f"❌ <span style='color: red;'>Failed to download {filename}: {e}</span>"))
            return False
        except Exception as e:
            display(HTML(f"❌ <span style='color: red;'>Error with {filename}: {e}</span>"))
            return False
    
    def download_all_files(self):
        """Download all files from the directory using multithreading"""
        display(HTML(f"<h3>🚀 Starting Download</h3>"))
        display(HTML(f"<b>Source:</b> {self.directory_url}"))
        display(HTML(f"<b>Destination:</b> {os.path.abspath(self.download_dir)}"))
        display(HTML("<hr>"))
        
        # Get all file URLs from the directory
        file_urls = self.get_files_from_directory()
        
        if not file_urls:
            display(HTML("<span style='color: orange;'>No files found in the directory!</span>"))
            return
        
        self.total_files = len(file_urls)
        display(HTML(f"<b>📊 Found {self.total_files} files to download</b>"))
        display(HTML("<b>⏳ Starting multithreaded download...</b>"))
        display(HTML("<hr>"))
        
        start_time = time.time()
        
        # Use ThreadPoolExecutor for efficient multithreading
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit all download tasks
            future_to_file = {
                executor.submit(self.download_file, url, filename): (url, filename)
                for url, filename in file_urls
            }
            
            # Process completed downloads
            successful = 0
            failed = 0
            
            for future in as_completed(future_to_file):
                url, filename = future_to_file[future]
                try:
                    if future.result():
                        successful += 1
                    else:
                        failed += 1
                except Exception as e:
                    display(HTML(f"❌ <span style='color: red;'>Exception for {filename}: {e}</span>"))
                    failed += 1
        
        end_time = time.time()
        
        # Print summary
        display(HTML("<hr>"))
        display(HTML("<h3>📋 DOWNLOAD SUMMARY</h3>"))
        display(HTML(f"<b>Directory:</b> {self.directory_url}"))
        display(HTML(f"<b>Total files available:</b> {self.total_files}"))
        display(HTML(f"<b style='color: green;'>Successfully downloaded:</b> {successful}"))
        display(HTML(f"<b style='color: red;'>Failed downloads:</b> {failed}"))
        display(HTML(f"<b>Time taken:</b> {end_time - start_time:.2f} seconds"))
        display(HTML(f"<b>Download location:</b> {os.path.abspath(self.download_dir)}"))
        
        # Show downloaded files
        if successful > 0:
            display(HTML("<h4>📁 Downloaded Files:</h4>"))
            downloaded_files = os.listdir(self.download_dir)
            for file in downloaded_files:
                file_path = os.path.join(self.download_dir, file)
                file_size = os.path.getsize(file_path)
                display(HTML(f"• <code>{file}</code> ({file_size} bytes)"))

# %%
# Create interactive widgets for the downloader
def create_downloader_ui():
    """Create an interactive UI for the downloader"""
    
    # Widgets
    url_input = widgets.Text(
        value='https://github.com/khushisampath/My-Python-journey-with-Sharmila-ma-am/blob/main/',
        placeholder='Enter GitHub directory URL',
        description='GitHub URL:',
        layout=widgets.Layout(width='80%')
    )
    
    workers_slider = widgets.IntSlider(
        value=8,
        min=1,
        max=20,
        step=1,
        description='Threads:',
        continuous_update=False
    )
    
    download_button = widgets.Button(
        description='🚀 Start Download',
        button_style='success',
        tooltip='Click to start downloading'
    )
    
    output = widgets.Output()
    
    def on_download_click(b):
        with output:
            clear_output()
            try:
                downloader = GitHubDirectoryDownloader(
                    url_input.value, 
                    max_workers=workers_slider.value
                )
                downloader.download_all_files()
            except Exception as e:
                display(HTML(f"<span style='color: red;'>Error: {e}</span>"))
    
    download_button.on_click(on_download_click)
    
    # Display UI
    display(HTML("<h1>📥 GitHub Directory Downloader</h1>"))
    display(HTML("<p>Download all files from a GitHub directory using multithreading</p>"))
    
    display(widgets.VBox([
        url_input,
        workers_slider,
        download_button,
        output
    ]))

# %%
# Run the interactive UI
create_downloader_ui()

# %%
# Alternative: Direct download without UI (uncomment to use)
# directory_url = "https://github.com/khushisampath/My-Python-journey-with-Sharmila-ma-am/blob/main/"
# downloader = GitHubDirectoryDownloader(directory_url, max_workers=10)
# downloader.download_all_files()

# %%
# Check downloaded files
def show_downloaded_files():
    download_dir = "github_python_journey"
    if os.path.exists(download_dir):
        files = os.listdir(download_dir)
        if files:
            display(HTML("<h3>📂 Currently Downloaded Files:</h3>"))
            for file in files:
                file_path = os.path.join(download_dir, file)
                file_size = os.path.getsize(file_path)
                display(HTML(f"• <code>{file}</code> ({file_size} bytes)"))
        else:
            display(HTML("<span style='color: orange;'>No files downloaded yet.</span>"))
    else:
        display(HTML("<span style='color: orange;'>Download directory doesn't exist yet.</span>"))

# %%
# Show downloaded files button
show_files_button = widgets.Button(
    description='📁 Show Downloaded Files',
    button_style='info'
)

def on_show_files_click(b):
    with output:
        clear_output()
        show_downloaded_files()

show_files_button.on_click(on_show_files_click)
display(show_files_button)

# %%
# Cleanup function (optional)
def cleanup_downloads():
    import shutil
    download_dir = "github_python_journey"
    if os.path.exists(download_dir):
        shutil.rmtree(download_dir)
        display(HTML("<span style='color: green;'>✅ Download directory cleaned up!</span>"))
    else:
        display(HTML("<span style='color: orange;'>No download directory to clean.</span>"))

# %%
# Cleanup button
cleanup_button = widgets.Button(
    description='🗑️ Cleanup Downloads',
    button_style='warning',
    tooltip='Delete all downloaded files'
)

def on_cleanup_click(b):
    with output:
        clear_output()
        cleanup_downloads()

cleanup_button.on_click(on_cleanup_click)
display(cleanup_button)

# %%
# Create a separate output for cleanup/show files
output = widgets.Output()
display(output)



VBox(children=(Text(value='https://github.com/khushisampath/My-Python-journey-with-Sharmila-ma-am/blob/main/',…

Button(button_style='info', description='📁 Show Downloaded Files', style=ButtonStyle())



Output()