## Data collection from various sources

In [None]:
import requests
import json
import time
from bs4 import BeautifulSoup
from datetime import datetime

# Stack Exchange API key; sent as the "key" parameter by fetch_questions() and fetch_answer().
# NOTE(review): the key is hard-coded in the notebook - consider loading it from
# the environment before sharing this file.
API_KEY = "rl_zbkvT7h5FY8576ZfSWCLhfXRW"
# Stack Overflow tags to query; build_dataset() issues one questions request per tag.
TOPICS = ["git", "bash", "tar", "gzip", "grep", "venv", "docker", "ssh", "curl", "awk"]
# Page size passed to the questions endpoint. Questions without an accepted
# answer are skipped afterwards, so fewer pairs may be kept per topic.
MAX_QUESTIONS_PER_TOPIC = 5
# Path of the JSON file written by build_dataset().
OUTPUT_FILE = "command_line_qa_dataset.json"

def html_to_text(html):
    """Flatten an HTML fragment to text, wrapping <pre> content in ``` fences.

    Every ``<pre>`` element has its content replaced by its own text surrounded
    by a Markdown-style code fence; the whole document is then reduced to text
    and stripped of leading/trailing whitespace.
    """
    document = BeautifulSoup(html, 'html.parser')

    for code_block in document.find_all('pre'):
        # Replace the element's children with a single fenced string.
        fenced = "\n```\n" + code_block.get_text() + "\n```\n"
        code_block.string = fenced

    plain = document.get_text()
    return plain.strip()

def fetch_questions(topic):
    """Fetch top-voted questions for a tag together with their accepted answers.

    Requests one page of ``MAX_QUESTIONS_PER_TOPIC`` questions tagged ``topic``
    from the Stack Exchange API (sorted by votes, descending), skips questions
    that have no accepted answer, and retrieves each accepted answer through
    ``fetch_answer``.

    Args:
        topic: Stack Overflow tag to query.

    Returns:
        A list of Q&A dicts with keys ``topic``, ``question`` and ``answer``.
        Any exception is reported on stdout and the pairs collected so far are
        returned, so the list may be shorter than requested or empty.
    """
    questions = []

    try:
        response = requests.get(
            "https://api.stackexchange.com/2.3/questions",
            params={
                "order": "desc",
                "sort": "votes",
                "tagged": topic,
                "site": "stackoverflow",
                "filter": "withbody",
                "pagesize": MAX_QUESTIONS_PER_TOPIC,
                "key": API_KEY
            },
            # Without a timeout a stalled connection would block the run forever.
            timeout=10
        )
        response.raise_for_status()
        items = response.json().get("items", [])

        for item in items:
            # Only questions with an accepted answer yield a usable Q&A pair.
            if "accepted_answer_id" not in item:
                continue

            answer = fetch_answer(item['accepted_answer_id'])
            if answer:
                questions.append({
                    "topic": topic,
                    "question": {
                        "id": item["question_id"],
                        "title": item["title"],
                        "body": html_to_text(item["body"]),
                        "url": item["link"],
                        "votes": item["score"],
                        "tags": item["tags"],
                        "created_at": datetime.fromtimestamp(item["creation_date"]).isoformat()
                    },
                    "answer": {
                        "id": answer["answer_id"],
                        "body": html_to_text(answer["body"]),
                        "votes": answer["score"],
                        "accepted": answer["is_accepted"],
                        "created_at": datetime.fromtimestamp(answer["creation_date"]).isoformat()
                    }
                })
                time.sleep(1)  # Respect API rate limits

    except Exception as e:
        # Best effort: report the failure and keep whatever was collected.
        print(f"⚠️ Error fetching {topic}: {str(e)}")

    return questions

def fetch_answer(answer_id):
    """Fetch a single answer (including its body) from the Stack Exchange API.

    Args:
        answer_id: Id of the answer to retrieve.

    Returns:
        The first item of the API response as a dict, or ``None`` when the
        request fails or the response carries no items.
    """
    try:
        response = requests.get(
            f"https://api.stackexchange.com/2.3/answers/{answer_id}",
            params={
                "site": "stackoverflow",
                "filter": "withbody",
                "key": API_KEY
            },
            # Bound the wait so one stalled request cannot hang the whole run.
            timeout=5
        )
        response.raise_for_status()
        return response.json()["items"][0]
    except Exception as e:
        # ``except Exception`` (not a bare ``except:``) lets KeyboardInterrupt
        # and SystemExit propagate; the failure is reported instead of hidden.
        print(f"⚠️ Error fetching answer {answer_id}: {str(e)}")
        return None

def build_dataset():
    """Collect Q&A pairs for every tag in TOPICS and write them to OUTPUT_FILE.

    The output JSON has a ``metadata`` section (source, creation time, topics,
    page size) and a ``qna_pairs`` list filled by ``fetch_questions``.
    """
    pairs = []
    metadata = {
        "source": "Stack Overflow API",
        "created_at": datetime.now().isoformat(),
        "topics": TOPICS,
        "questions_per_topic": MAX_QUESTIONS_PER_TOPIC
    }
    dataset = {"metadata": metadata, "qna_pairs": pairs}

    print("🚀 Building command-line Q&A dataset...\n")

    for tag in TOPICS:
        print(f"🔍 Fetching {MAX_QUESTIONS_PER_TOPIC} questions about {tag}...")
        found = fetch_questions(tag)
        pairs.extend(found)
        print(f"✅ Got {len(found)} questions with accepted answers")
        # Pause between topics on top of the per-answer delay.
        time.sleep(2)

    # Persist the whole dataset in one write.
    with open(OUTPUT_FILE, 'w') as out:
        json.dump(dataset, out, indent=2)

    print(f"\n🎉 Dataset saved to {OUTPUT_FILE}")
    print(f"Total Q&A pairs collected: {len(pairs)}")

# Run the full collection when the cell/script is executed directly.
if __name__ == "__main__":
    build_dataset()

🚀 Building command-line Q&A dataset...

🔍 Fetching 5 questions about git...
✅ Got 5 questions with accepted answers
🔍 Fetching 5 questions about bash...
✅ Got 5 questions with accepted answers
🔍 Fetching 5 questions about tar...
✅ Got 5 questions with accepted answers
🔍 Fetching 5 questions about gzip...
✅ Got 5 questions with accepted answers
🔍 Fetching 5 questions about grep...
✅ Got 5 questions with accepted answers
🔍 Fetching 5 questions about venv...
✅ Got 2 questions with accepted answers
🔍 Fetching 5 questions about docker...
✅ Got 5 questions with accepted answers
🔍 Fetching 5 questions about ssh...
✅ Got 4 questions with accepted answers
🔍 Fetching 5 questions about curl...
✅ Got 5 questions with accepted answers
🔍 Fetching 5 questions about awk...
✅ Got 4 questions with accepted answers

🎉 Dataset saved to command_line_qa_dataset.json
Total Q&A pairs collected: 45


In [None]:
import requests
import json
import time
from bs4 import BeautifulSoup
from datetime import datetime

# Stack Exchange API key; sent as the "key" parameter by fetch_questions() and fetch_answer().
# NOTE(review): the key is hard-coded in the notebook - consider loading it from
# the environment before sharing this file.
API_KEY = "rl_zbkvT7h5FY8576ZfSWCLhfXRW"
# Path of the JSON file written by build_dataset().
OUTPUT_FILE = "command_line_qa_dataset.json"

# Custom Stack Exchange filter id.
# NOTE(review): not referenced by the code visible in this cell - both request
# functions pass the built-in "withbody" filter instead.
DEFAULT_FILTER = "!-MBr1_IzpA7I9nsmVLyoWwR*5QqS7qyDqR"

# Stack Overflow tags to query; build_dataset() calls fetch_questions() once per tag.
COMMAND_LINE_TOPICS = [
    "git", "bash", "terminal", "shell",
    "awk", "sed", "grep", "find",
    "ssh", "curl", "wget", "rsync",
    "tar", "gzip", "zip", "unzip",
    "docker", "kubectl", "vagrant",
    "vim", "emacs", "nano",
    "cron", "systemd", "htop",
    "python", "pip", "conda", "venv"
]

def html_to_text(html):
    """Return the text of an HTML fragment with <pre> blocks fenced by ```.

    Each ``<pre>`` element's content is swapped for its own text wrapped in a
    Markdown-style fence before the document is flattened and stripped.
    """
    parsed = BeautifulSoup(html, 'html.parser')

    for block in parsed.find_all('pre'):
        code = block.get_text()
        block.string = "".join(("\n```\n", code, "\n```\n"))

    text = parsed.get_text()
    return text.strip()

def fetch_questions(topic, max_questions=50):
    """Page through top-voted questions for a tag until enough pairs are found.

    Pages of 50 questions are requested in descending vote order. For each
    question that has an accepted answer, the answer is fetched with
    ``fetch_answer`` and combined via ``create_qa_pair``. Paging stops when
    ``max_questions`` pairs are collected, the API reports no more pages, or an
    exception occurs (which is printed).

    Returns:
        The list of Q&A pairs collected so far.
    """
    collected = []
    page_number = 1

    print(f"\n📚 Fetching questions about {topic}...")

    while len(collected) < max_questions:
        try:
            query = {
                "order": "desc",
                "sort": "votes",
                "tagged": topic,
                "site": "stackoverflow",
                "filter": "withbody",
                "pagesize": 50,
                "page": page_number,
                "key": API_KEY
            }
            reply = requests.get(
                "https://api.stackexchange.com/2.3/questions",
                params=query,
                timeout=10
            )
            reply.raise_for_status()
            payload = reply.json()

            for entry in payload.get("items", []):
                # Questions without an accepted answer cannot form a pair.
                if "accepted_answer_id" not in entry:
                    continue
                accepted = fetch_answer(entry['accepted_answer_id'])
                if not accepted:
                    continue
                collected.append(create_qa_pair(entry, accepted, topic))
                if len(collected) >= max_questions:
                    break

            # Stop once the API signals that this was the last page.
            if not payload.get("has_more", False):
                break

            page_number += 1
            time.sleep(1)  # Respect API rate limits

            print(f"  ✅ Collected {len(collected)}/{max_questions} questions", end='\r')

        except Exception as err:
            print(f"\n⚠️ Error fetching {topic}: {str(err)}")
            time.sleep(5)
            break

    return collected

def fetch_answer(answer_id):
    """Retrieve one answer (with body) from the Stack Exchange API.

    Returns the first item of the response, or ``None`` after printing a
    warning when anything goes wrong.
    """
    try:
        endpoint = f"https://api.stackexchange.com/2.3/answers/{answer_id}"
        query = {
            "site": "stackoverflow",
            "filter": "withbody",
            "key": API_KEY
        }
        reply = requests.get(endpoint, params=query, timeout=5)
        reply.raise_for_status()
        items = reply.json()["items"]
        return items[0]
    except Exception as err:
        print(f"⚠️ Error fetching answer {answer_id}: {str(err)}")
        return None

def create_qa_pair(question, answer, topic):
    """Combine an API question item and answer item into one Q&A record.

    HTML bodies are converted with ``html_to_text`` and the ``creation_date``
    timestamps are rendered as ISO-8601 strings via ``datetime.fromtimestamp``.
    """
    question_part = {
        "id": question["question_id"],
        "title": question["title"],
        "body": html_to_text(question["body"]),
        "url": question["link"],
        "votes": question["score"],
        "tags": question["tags"],
        "created_at": datetime.fromtimestamp(question["creation_date"]).isoformat()
    }
    answer_part = {
        "id": answer["answer_id"],
        "body": html_to_text(answer["body"]),
        "votes": answer["score"],
        "accepted": answer["is_accepted"],
        "created_at": datetime.fromtimestamp(answer["creation_date"]).isoformat()
    }
    return {"topic": topic, "question": question_part, "answer": answer_part}

def build_dataset():
    """Collect Q&A pairs for every tag in COMMAND_LINE_TOPICS and save them.

    Writes a JSON document to OUTPUT_FILE with a ``metadata`` section and a
    ``qna_pairs`` list filled by ``fetch_questions``.
    """
    pairs = []
    dataset = {
        "metadata": {
            "source": "Stack Overflow API",
            "created_at": datetime.now().isoformat(),
            "topics": COMMAND_LINE_TOPICS
        },
        "qna_pairs": pairs
    }

    print("🚀 Building reliable command-line Q&A dataset")
    print(f"📂 Will save to {OUTPUT_FILE}\n")

    for tag in COMMAND_LINE_TOPICS:
        found = fetch_questions(tag)
        pairs.extend(found)
        print(f"✅ {tag}: Collected {len(found)} Q&A pairs")
        # Pause between topics on top of the per-page delay.
        time.sleep(2)

    # Persist the whole dataset in one write.
    with open(OUTPUT_FILE, 'w') as out:
        json.dump(dataset, out, indent=2)

    print(f"\n🎉 Dataset saved to {OUTPUT_FILE}")
    print(f"Total Q&A pairs collected: {len(pairs)}")

# Run the full collection when the cell/script is executed directly.
if __name__ == "__main__":
    build_dataset()

🚀 Building reliable command-line Q&A dataset
📂 Will save to command_line_qa_dataset.json


📚 Fetching questions about git...
✅ git: Collected 50 Q&A pairs

📚 Fetching questions about bash...
✅ bash: Collected 50 Q&A pairs

📚 Fetching questions about terminal...
✅ terminal: Collected 50 Q&A pairs

📚 Fetching questions about shell...
✅ shell: Collected 50 Q&A pairs

📚 Fetching questions about awk...
✅ awk: Collected 43 Q&A pairs

📚 Fetching questions about sed...
✅ sed: Collected 50 Q&A pairs

📚 Fetching questions about grep...
✅ grep: Collected 50 Q&A pairs

📚 Fetching questions about find...
✅ find: Collected 50 Q&A pairs

📚 Fetching questions about ssh...
✅ ssh: Collected 50 Q&A pairs

📚 Fetching questions about curl...
✅ curl: Collected 50 Q&A pairs

📚 Fetching questions about wget...
✅ wget: Collected 50 Q&A pairs

📚 Fetching questions about rsync...
✅ rsync: Collected 50 Q&A pairs

📚 Fetching questions about tar...
✅ tar: Collected 50 Q&A pairs

📚 Fetching questions about gzip...
✅

In [None]:
import json
from datetime import datetime
import time
import requests
import re

# Configuration - USE YOUR OWN TOKEN!
# Placeholder value; it is sent as the Bearer token by fetch_discussions_graphql().
GITHUB_TOKEN = "your_github_token_here"  # Replace with your token
# Repository in "owner/name" form whose discussions are fetched.
REPO_NAME = "git-guides/git-guide"  # Corrected repository name
# Path of the JSON file written by save_dataset().
OUTPUT_FILE = "git_guides_qa.json"

def fetch_discussions_graphql(repo_name):
    """Fetch answered discussions of a repository via the GitHub GraphQL API.

    Authenticates with the module-level ``GITHUB_TOKEN``. A first request
    verifies that the repository resolves and has discussions enabled; the
    discussions are then paged through 100 at a time.

    Args:
        repo_name: Repository in ``owner/name`` form.

    Returns:
        A list of dicts with keys ``question``, ``question_body``, ``answer``,
        ``url``, ``created_at`` and ``upvotes`` - one per discussion that has
        an answer. An empty list is returned when the initial check fails or
        discussions are disabled; if a later page fails, the discussions
        collected so far are returned.

    Raises:
        ValueError: If ``repo_name`` does not contain exactly one ``/``.
    """
    owner, repo = repo_name.split('/')
    # Owner and name are passed as GraphQL variables instead of being
    # interpolated into the query text, so unusual characters in the name
    # cannot break or alter the query.
    query = """
    query($owner: String!, $name: String!, $cursor: String) {
      repository(owner: $owner, name: $name) {
        hasDiscussionsEnabled
        discussions(first: 100, after: $cursor) {
          pageInfo {
            hasNextPage
            endCursor
          }
          nodes {
            title
            body
            url
            createdAt
            upvoteCount
            answer {
              body
            }
            comments(first: 15) {
              nodes {
                body
                isAnswer
              }
            }
          }
        }
      }
    }
    """
    repo_variables = {"owner": owner, "name": repo}
    # Bound every request so a stalled connection cannot hang the run.
    request_timeout = 30

    headers = {
        "Authorization": f"Bearer {GITHUB_TOKEN}",
        "Content-Type": "application/json"
    }

    # First check if discussions are enabled
    try:
        check_response = requests.post(
            "https://api.github.com/graphql",
            json={"query": query, "variables": repo_variables},
            headers=headers,
            timeout=request_timeout
        )
        check_response.raise_for_status()
        check_data = check_response.json()

        if 'errors' in check_data:
            print(f"⚠️ GraphQL errors: {check_data['errors'][0]['message']}")
            return []

        if not check_data['data']['repository']['hasDiscussionsEnabled']:
            print(f"⚠️ Discussions not enabled in {repo_name}")
            return []
    except Exception as e:
        print(f"⚠️ Error checking discussions status: {str(e)}")
        return []

    # Now fetch discussions
    discussions = []
    has_next_page = True
    cursor = None

    print(f"🔍 Fetching discussions from {repo_name}...")

    while has_next_page:
        variables = dict(repo_variables)
        if cursor:
            variables["cursor"] = cursor
        try:
            response = requests.post(
                "https://api.github.com/graphql",
                json={"query": query, "variables": variables},
                headers=headers,
                timeout=request_timeout
            )
            response.raise_for_status()
            data = response.json()

            if 'errors' in data:
                print(f"⚠️ GraphQL errors: {data['errors'][0]['message']}")
                break

            discussion_data = data['data']['repository']['discussions']
            nodes = discussion_data['nodes']

            for node in nodes:
                # Extract answer (either direct answer or from comments)
                answer_body = None
                if node.get('answer'):
                    answer_body = node['answer']['body']
                else:
                    for comment in node['comments']['nodes']:
                        if comment['isAnswer']:
                            answer_body = comment['body']
                            break

                # Discussions without any answer are dropped.
                if answer_body:
                    discussions.append({
                        "question": node['title'],
                        "question_body": node['body'],
                        "answer": answer_body,
                        "url": node['url'],
                        "created_at": node['createdAt'],
                        "upvotes": node['upvoteCount']
                    })

            # Handle pagination
            page_info = discussion_data['pageInfo']
            has_next_page = page_info['hasNextPage']
            cursor = page_info['endCursor']

            print(f"📥 Collected {len(discussions)} discussions so far...")
            time.sleep(0.5)

        except Exception as e:
            # Best effort: keep what was collected before the failure.
            print(f"⚠️ Error: {str(e)}")
            break

    return discussions

def clean_markdown(text):
    """Strip HTML tags and tidy blank lines in a Markdown string.

    Falsy input (``None`` or ``""``) yields ``""``. Otherwise ``<...>`` tags are
    removed, runs of three or more newlines collapse to two, a blank line
    directly after a ``` fence is dropped, and the result is stripped.
    """
    if not text:
        return ""
    without_tags = re.sub(r'<[^>]+>', '', text)
    collapsed = re.sub(r'\n{3,}', '\n\n', without_tags)
    tightened = collapsed.replace("```\n\n", "```\n")
    return tightened.strip()

def save_dataset(data):
    """Write the collected Q&A list plus metadata to OUTPUT_FILE as JSON.

    The metadata records the source, REPO_NAME, the current time and the
    number of entries in ``data``.
    """
    total = len(data)
    metadata = {
        "source": "github",
        "repository": REPO_NAME,
        "created_at": datetime.now().isoformat(),
        "discussion_count": total
    }

    with open(OUTPUT_FILE, 'w') as out:
        json.dump({"metadata": metadata, "qnas": data}, out, indent=2)

    print(f"\n✅ Dataset saved to {OUTPUT_FILE}")
    print(f"Total Q&A pairs collected: {total}")

if __name__ == "__main__":
    # Pull every answered discussion for the configured repository.
    discussions = fetch_discussions_graphql(REPO_NAME)

    # Normalise the three text fields of each record in place.
    for qa in discussions:
        for field in ('question', 'question_body', 'answer'):
            qa[field] = clean_markdown(qa[field])

    # Write the cleaned records to disk.
    save_dataset(discussions)

🚀 Collecting GitHub Discussions...

📚 Processing github/docs...
⚠️ github/docs: Discussions not enabled
⚠️ github/docs: No discussions found or not enabled

📚 Processing facebook/react...
⚠️ facebook/react: Discussions not enabled
⚠️ facebook/react: No discussions found or not enabled

📚 Processing docker/for-linux...
⚠️ docker/for-linux: Discussions not enabled
⚠️ docker/for-linux: No discussions found or not enabled

📚 Processing python/cpython...
⚠️ python/cpython: Discussions not enabled
⚠️ python/cpython: No discussions found or not enabled

📚 Processing pypa/pipx...
✅ pypa/pipx: 11 answered discussions

📚 Processing pypa/virtualenv...
✅ pypa/virtualenv: 7 answered discussions

📚 Processing git/git...
⚠️ git/git: Discussions not enabled
⚠️ git/git: No discussions found or not enabled

🎉 Dataset saved to github_qa_dataset.json
Total Discussions Collected: 18


In [None]:
import json
from datetime import datetime
import time
import requests
import re

# Configuration - USE YOUR OWN TOKEN!
# NOTE(review): the assignment below is commented out, yet
# fetch_discussions_graphql() in this cell still reads GITHUB_TOKEN; it must
# already be defined in the session (e.g. by an earlier cell) or the call fails.
# GITHUB_TOKEN = "your_github_token_here"  # Replace with your token
# Repository in "owner/name" form whose discussions are fetched.
# NOTE(review): the output recorded below this cell shows the API could not
# resolve this repository name.
REPO_NAME = "github/git-guide"  # Corrected repository name
# Path of the JSON file written by save_dataset().
OUTPUT_FILE = "git_guides_qa.json"

def fetch_discussions_graphql(repo_name):
    """Fetch answered discussions of a repository via the GitHub GraphQL API.

    Authenticates with the module-level ``GITHUB_TOKEN``. A first request
    verifies that the repository resolves and has discussions enabled; the
    discussions are then paged through 100 at a time.

    Args:
        repo_name: Repository in ``owner/name`` form.

    Returns:
        A list of dicts with keys ``question``, ``question_body``, ``answer``,
        ``url``, ``created_at`` and ``upvotes`` - one per discussion that has
        an answer. An empty list is returned when the initial check fails or
        discussions are disabled; if a later page fails, the discussions
        collected so far are returned.

    Raises:
        ValueError: If ``repo_name`` does not contain exactly one ``/``.
    """
    owner, repo = repo_name.split('/')
    # Owner and name are passed as GraphQL variables instead of being
    # interpolated into the query text, so unusual characters in the name
    # cannot break or alter the query.
    query = """
    query($owner: String!, $name: String!, $cursor: String) {
      repository(owner: $owner, name: $name) {
        hasDiscussionsEnabled
        discussions(first: 100, after: $cursor) {
          pageInfo {
            hasNextPage
            endCursor
          }
          nodes {
            title
            body
            url
            createdAt
            upvoteCount
            answer {
              body
            }
            comments(first: 15) {
              nodes {
                body
                isAnswer
              }
            }
          }
        }
      }
    }
    """
    repo_variables = {"owner": owner, "name": repo}
    # Bound every request so a stalled connection cannot hang the run.
    request_timeout = 30

    headers = {
        "Authorization": f"Bearer {GITHUB_TOKEN}",
        "Content-Type": "application/json"
    }

    # First check if discussions are enabled
    try:
        check_response = requests.post(
            "https://api.github.com/graphql",
            json={"query": query, "variables": repo_variables},
            headers=headers,
            timeout=request_timeout
        )
        check_response.raise_for_status()
        check_data = check_response.json()

        if 'errors' in check_data:
            print(f"⚠️ GraphQL errors: {check_data['errors'][0]['message']}")
            return []

        if not check_data['data']['repository']['hasDiscussionsEnabled']:
            print(f"⚠️ Discussions not enabled in {repo_name}")
            return []
    except Exception as e:
        print(f"⚠️ Error checking discussions status: {str(e)}")
        return []

    # Now fetch discussions
    discussions = []
    has_next_page = True
    cursor = None

    print(f"🔍 Fetching discussions from {repo_name}...")

    while has_next_page:
        variables = dict(repo_variables)
        if cursor:
            variables["cursor"] = cursor
        try:
            response = requests.post(
                "https://api.github.com/graphql",
                json={"query": query, "variables": variables},
                headers=headers,
                timeout=request_timeout
            )
            response.raise_for_status()
            data = response.json()

            if 'errors' in data:
                print(f"⚠️ GraphQL errors: {data['errors'][0]['message']}")
                break

            discussion_data = data['data']['repository']['discussions']
            nodes = discussion_data['nodes']

            for node in nodes:
                # Extract answer (either direct answer or from comments)
                answer_body = None
                if node.get('answer'):
                    answer_body = node['answer']['body']
                else:
                    for comment in node['comments']['nodes']:
                        if comment['isAnswer']:
                            answer_body = comment['body']
                            break

                # Discussions without any answer are dropped.
                if answer_body:
                    discussions.append({
                        "question": node['title'],
                        "question_body": node['body'],
                        "answer": answer_body,
                        "url": node['url'],
                        "created_at": node['createdAt'],
                        "upvotes": node['upvoteCount']
                    })

            # Handle pagination
            page_info = discussion_data['pageInfo']
            has_next_page = page_info['hasNextPage']
            cursor = page_info['endCursor']

            print(f"📥 Collected {len(discussions)} discussions so far...")
            time.sleep(0.5)

        except Exception as e:
            # Best effort: keep what was collected before the failure.
            print(f"⚠️ Error: {str(e)}")
            break

    return discussions

def clean_markdown(text):
    """Remove HTML tags and normalise blank lines in a Markdown string.

    Returns ``""`` for falsy input. Otherwise applies, in order: tag removal,
    collapsing of three-or-more newlines to two, removal of the blank line
    directly after a ``` fence, and a final strip.
    """
    if not text:
        return ""
    # (pattern, replacement) pairs applied in sequence.
    substitutions = (
        (r'<[^>]+>', ''),
        (r'\n{3,}', '\n\n'),
    )
    for pattern, replacement in substitutions:
        text = re.sub(pattern, replacement, text)
    return text.replace("```\n\n", "```\n").strip()

def save_dataset(data):
    """Dump the Q&A list together with run metadata to OUTPUT_FILE.

    Metadata holds the source name, REPO_NAME, the current timestamp and the
    number of records in ``data``.
    """
    count = len(data)
    payload = {
        "metadata": {
            "source": "github",
            "repository": REPO_NAME,
            "created_at": datetime.now().isoformat(),
            "discussion_count": count
        },
        "qnas": data
    }

    with open(OUTPUT_FILE, 'w') as target:
        json.dump(payload, target, indent=2)

    print(f"\n✅ Dataset saved to {OUTPUT_FILE}")
    print(f"Total Q&A pairs collected: {count}")

if __name__ == "__main__":
    # Pull every answered discussion for the configured repository.
    discussions = fetch_discussions_graphql(REPO_NAME)

    # Normalise the three text fields of each record in place.
    for qa in discussions:
        for field in ('question', 'question_body', 'answer'):
            qa[field] = clean_markdown(qa[field])

    # Write the cleaned records to disk.
    save_dataset(discussions)

⚠️ GraphQL errors: Could not resolve to a Repository with the name 'github/git-guide'.

✅ Dataset saved to git_guides_qa.json
Total Q&A pairs collected: 0


## Data Preprocessing

In [None]:
import json
import os

# Instruction/output records built from the Stack Overflow-style dataset files.
formatted_data = []

for file in ["file1.json", "file2.json"]:
    with open(file, "r", encoding="utf-8") as f:
        raw_data = json.load(f)

    qna_pairs = raw_data["qna_pairs"]

    for pair in qna_pairs:
        # Only the question title is used as the instruction; the question
        # body is not included in the output records.
        title = pair["question"].get("title", "").strip()
        answer = pair["answer"]["body"].strip()

        formatted_data.append({
            "instruction": title,
            "output": answer
        })

# Save as JSONL (one JSON object per line)
os.makedirs("data", exist_ok=True)
with open("data/command_qa.jsonl", "w", encoding="utf-8") as f:
    for item in formatted_data:
        f.write(json.dumps(item, ensure_ascii=False) + "\n")

# The message reflects what is actually written: the title alone.
print("✅ Data saved using title as instruction: data/command_qa.jsonl")


✅ Data saved using title + body as instruction: data/command_qa.jsonl


In [None]:
import json
import re

def extract_commands(text):
    """Return the contents of every ``` fenced block tagged bash, sh or untagged."""
    fence = re.compile(r'```(?:bash|sh)?\n(.*?)\n```', re.DOTALL)
    return [match.group(1) for match in fence.finditer(text)]

# Deduplicated question -> first-command records.
processed_data = []
seen_questions = set()

# Process Stack Overflow-style files
for file in ["file1.json", "file2.json"]:
    # Use a context manager so the file handle is closed after loading.
    with open(file) as f:
        data = json.load(f)
    for pair in data["qna_pairs"]:
        # Extract commands from answer; answers without a code block are skipped
        commands = extract_commands(pair["answer"]["body"])
        if not commands:
            continue

        # Create clean Q&A pair
        question = f"{pair['question']['title']} {pair['question']['body']}".strip()
        answer = commands[0].strip()  # Take first valid command

        # Remove shell prompts and line numbers
        answer = re.sub(r'^\$\s*|\s*#\s*\d+', '', answer, flags=re.MULTILINE)

        # Keep only the first occurrence of each question text.
        if question not in seen_questions:
            seen_questions.add(question)
            processed_data.append({"question": question, "answer": answer})

# # Process GitHub-style file
# github_data = json.load(open("file3.json"))
# for qna in github_data["qnas"]:
#     commands = extract_commands(qna["answer"])
#     if commands:
#         answer = commands[0].strip()
#         answer = re.sub(r'^\$\s*', '', answer, flags=re.MULTILINE)

#         if qna["question"] not in seen_questions:
#             seen_questions.add(qna["question"])
#             processed_data.append({"question": qna["question"], "answer": answer})

# Save processed data
with open("processed_data.json", "w") as f:
    json.dump(processed_data, f, indent=2)

In [None]:
# Cell expression: the number of records currently held in processed_data.
len(processed_data)

973

In [None]:
import json
import re

def extract_commands(text):
    """Return the body of each ``` fenced block that is tagged bash, sh or untagged."""
    pattern = r'```(?:bash|sh)?\n(.*?)\n```'
    bodies = []
    for found in re.finditer(pattern, text, re.DOTALL):
        bodies.append(found.group(1))
    return bodies

def clean_text_block(text):
    """Strip every line of ``text`` and collapse runs of blank lines to one.

    The input is stripped first, each line is stripped individually, any run
    of consecutive blank lines is reduced to a single blank line, and the
    joined result is stripped again.
    """
    kept = []
    for raw_line in text.strip().splitlines():
        trimmed = raw_line.strip()
        # Skip a blank line when the previously kept line is already blank.
        if not trimmed and kept and kept[-1] == '':
            continue
        kept.append(trimmed)
    return '\n'.join(kept).strip()

# Deduplicated question -> (commands + explanation) records.
processed_data = []
seen_questions = set()

for file in ["file1.json", "file2.json"]:
    with open(file, "r") as handle:
        data = json.load(handle)

    for pair in data.get("qna_pairs", []):
        question = f"{pair['question']['title']} {pair['question']['body']}".strip()

        # Each distinct question text is emitted at most once.
        if question in seen_questions:
            continue

        raw_answer = pair.get("answer", {}).get("body", "")

        # Scrub every fenced command block: leading "$" prompts, comment
        # lines, then leading numbers, applied in that order.
        cleaned_commands = []
        for block in extract_commands(raw_answer):
            for pattern in (r'^\s*\$\s*', r'^\s*#.*$', r'^\s*\d+\s+'):
                block = re.sub(pattern, '', block, flags=re.MULTILINE)
            tidy = clean_text_block(block)
            if tidy:
                cleaned_commands.append(tidy)

        # The explanation is the answer text with those fenced blocks removed.
        prose = clean_text_block(
            re.sub(r'```(?:bash|sh)?\n.*?\n```', '', raw_answer, flags=re.DOTALL)
        )

        # Commands first, explanation second; empty parts are left out.
        candidate_parts = ['\n'.join(cleaned_commands), prose]
        full_answer = '\n\n'.join(part for part in candidate_parts if part).strip()

        if not full_answer:
            continue
        processed_data.append({"question": question, "answer": full_answer})
        seen_questions.add(question)

# Save clean output
with open("processed_data.json", "w") as out:
    json.dump(processed_data, out, indent=2)


In [None]:
# Cell expression: the number of records currently held in processed_data.
len(processed_data)

1297