1. Получение данных о вакансиях с https://api.hh.ru/vacancies

In [None]:
import requests
import json
import time
import csv
import pandas as pd

def csv_escape(s):
    """Return *s* as text with every double quote doubled.

    Strings have each ``"`` replaced by ``""``. Any other value is converted
    with ``str()`` and returned as is.
    """
    if not isinstance(s, str):
        return str(s)
    # Splitting on the quote and re-joining with two quotes doubles each one.
    return '""'.join(s.split('"'))
    
def get_page(page=0, keywords=('Java', 'TypeScript', 'JavaScript', 'Python', 'язык C', 'C#', 'C++', 'Go', 'PHP', 'Swift', 'SQL', 'Ruby', 'Kotlin', 'Dart', 'Rust'), timeout=30):
    """Fetch one page of vacancy search results from the hh.ru API.

    Args:
        page: Index of the result page to request.
        keywords: Iterable of search terms; they are joined with `` OR ``
            and sent as the ``text`` query parameter.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The response body decoded to ``str``, or ``None`` when the request
        fails (the error is printed).
    """
    params = {
        'text': ' OR '.join(keywords),
        'employment': "full",
        'only_with_salary': "true",
        'area': 113,
        'page': page,
        'per_page': 100,
    }
    try:
        # The context manager releases the connection on every path, and the
        # timeout keeps a stalled server from blocking the script forever.
        with requests.get('https://api.hh.ru/vacancies', params=params, timeout=timeout) as req:
            req.raise_for_status()
            return req.content.decode()
    except requests.exceptions.RequestException as e:
        print(f"Ошибка при запросе к API: {e}")
        return None
        
def flatten_dict(d, parent_key='', sep='_'):
    """Flatten a nested dict into a single-level dict of string values.

    Args:
        d: Mapping to flatten; nested dicts are expanded recursively.
        parent_key: Prefix prepended to every key (used by the recursion).
        sep: Separator placed between a parent key and a child key.

    Returns:
        A dict whose keys are the joined key paths. Lists are stored as JSON
        text, ``None`` becomes an empty string, and every other value is
        converted with ``str()``.
    """
    flat = {}
    for k, v in d.items():
        new_key = parent_key + sep + k if parent_key else k
        if isinstance(v, dict):
            flat.update(flatten_dict(v, new_key, sep=sep))
        elif isinstance(v, list):
            flat[new_key] = json.dumps(v)
        elif v is None:
            flat[new_key] = ''
        else:
            # Quotes are left untouched on purpose: the CSV writer escapes
            # them when the file is written, so doubling them here would be
            # applied twice and corrupt the text read back from the file.
            flat[new_key] = str(v)
    return flat

# Collect vacancies page by page (at most 20 pages are requested).
js_objs = []
for page in range(20):
    page_data = get_page(page)
    if not page_data:
        # A failed request is reported and the next page is still tried.
        print(f"Запрос на страницу {page} не удался")
    else:
        js_obj = json.loads(page_data)
        items = js_obj.get('items', [])
        if not items:
            print(f"Вакансий не найдено на странице {page}")
            break
        js_objs.extend(items)
        pages = js_obj.get('pages', 0)
        # Stop once the reported page count says this was the last page.
        if pages <= page + 1:
            break
    # Short pause before the next request.
    time.sleep(0.25)

print(f"Найдено вакансий: {len(js_objs)}")

# One flat row per vacancy, then dump everything to CSV with all fields quoted.
flattened_data = list(map(flatten_dict, js_objs))

df = pd.DataFrame(flattened_data)

df.to_csv('vacancies.csv', encoding='utf-8', index=False, quoting=csv.QUOTE_ALL)

print('Данные сохранены в vacancies.csv')

2. Получение данных о репозиториях с https://api.github.com/search/repositories

In [None]:
import asyncio
import aiohttp
import os
import time
import csv
from tqdm import tqdm
import pandas as pd

# The GitHub token is read from the environment; this section refuses to run
# without it.
GITHUB_TOKEN = os.environ.get("GITHUB_TOKEN")
if not GITHUB_TOKEN:
    print("Ошибка: Не установлен токен GitHub.")
    # `exit()` is a convenience injected by the `site` module for interactive
    # sessions; raising SystemExit is the dependable way to stop a program.
    raise SystemExit(1)

# Headers sent with every GitHub API request made in this section.
headers = {
    'Authorization': f'token {GITHUB_TOKEN}',
    'Accept': 'application/vnd.github.v3+json'
}

# Languages to search for, one search query per entry.
keywords = ['Java', 'TypeScript', 'JavaScript', 'Python', 'C', 'C#', 'C++', 'Go', 'PHP', 'Swift', 'SQL', 'Ruby', 'Kotlin', 'Dart', 'Rust']
search_url = 'https://api.github.com/search/repositories'
# NOTE(review): not referenced by the functions visible in this section.
repo_url = "https://api.github.com/repositories"

async def fetch_repo_data(session, query, page=1, per_page=100, max_retries=5):
    """Fetch one page of GitHub repository search results, sorted by stars.

    Args:
        session: Open HTTP session whose ``get`` is used as an async context
            manager (an ``aiohttp.ClientSession`` in this script).
        query: Search expression sent as the ``q`` parameter.
        page: Result page number sent to the API.
        per_page: Number of results requested per page.
        max_retries: How many times a rate-limited request (HTTP 403 or 429)
            is retried after a 60-second pause before giving up.

    Returns:
        The list under ``items`` in the response, or an empty list when the
        key is absent, the request fails, or the retries are exhausted.
    """
    params = {
        'q': query,
        'sort': 'stars',
        'per_page': per_page,
        'page': page
    }
    retries_left = max_retries
    while True:
        async with session.get(search_url, headers=headers, params=params) as response:
            status = response.status
            if status == 200:
                data = await response.json()
                return data['items'] if 'items' in data else []
        # The response is released at this point, so no connection is held
        # open while sleeping, and the loop cannot recurse without bound.
        if status in (403, 429) and retries_left > 0:
            retries_left -= 1
            print("Превышен лимит запросов к GitHub API. Ожидание...")
            await asyncio.sleep(60)
            continue
        print(f"Ошибка при запросе: {status}, {query}")
        return []

async def get_languages(session, repo_url):
    """Request ``<repo_url>/languages`` and return the decoded JSON body.

    Any status other than 200 yields an empty dict.
    """
    languages_endpoint = repo_url + "/languages"
    async with session.get(languages_endpoint, headers=headers) as response:
        if response.status != 200:
            return {}
        return await response.json()

def csv_escape(s):
    """Double every ``"`` in a string; stringify any non-string value."""
    return s.replace('"', '""') if isinstance(s, str) else str(s)

def flatten_dict(d, parent_key='', sep='_'):
    """Flatten a nested dict into a single-level dict of string values.

    Args:
        d: Mapping to flatten; nested dicts are expanded recursively.
        parent_key: Prefix prepended to every key (used by the recursion).
        sep: Separator placed between a parent key and a child key.

    Returns:
        A dict whose keys are the joined key paths. Lists are stored as JSON
        text, ``None`` becomes an empty string, and every other value is
        converted with ``str()``.
    """
    # Imported here so this section works even when no earlier section has
    # already imported `json`.
    import json

    flat = {}
    for k, v in d.items():
        new_key = parent_key + sep + k if parent_key else k
        if isinstance(v, dict):
            flat.update(flatten_dict(v, new_key, sep=sep))
        elif isinstance(v, list):
            flat[new_key] = json.dumps(v)
        elif v is None:
            flat[new_key] = ''
        else:
            # Quotes are left untouched on purpose: the CSV writer escapes
            # them when the file is written, so doubling them here would be
            # applied twice and corrupt the text read back from the file.
            flat[new_key] = str(v)
    return flat


async def main():
    """Search repositories for every language and attach language shares.

    One search per entry in ``keywords`` is run concurrently. Then, for each
    repository found, its languages are fetched one at a time and stored under
    ``percentage`` as each language's share of the reported total, rounded to
    two decimals (an empty dict when the total is not positive).

    Returns:
        A list with one flattened dict per repository.
    """
    async with aiohttp.ClientSession() as session:
        searches = (fetch_repo_data(session, f'language:{lang}') for lang in keywords)
        found_per_language = await asyncio.gather(*searches)
        repo_data = [repo for found in found_per_language for repo in found]

        for repo in tqdm(repo_data, desc="Обработка репозиториев"):
            language_sizes = await get_languages(session, repo['url'])
            total_size = sum(language_sizes.values())
            shares = {}
            if total_size > 0:
                shares = {
                    lang: round((size / total_size) * 100, 2)
                    for lang, size in language_sizes.items()
                }
            repo['percentage'] = shares
        return [flatten_dict(repo) for repo in repo_data]

# Run the collection and write the result to CSV with every field quoted.
# NOTE(review): the top-level ``await`` only works where the environment
# allows it (e.g. IPython/Jupyter); a plain script would need
# ``asyncio.run(main())`` instead — confirm how this is executed.
try:
    repo_data = await main()
    df = pd.DataFrame(repo_data)
    df.to_csv('github_repos.csv', index=False, encoding='utf-8', quoting=csv.QUOTE_ALL)
    print('Данные сохранены в github_repos.csv')
except Exception as e:
    # Any failure in the steps above is reported instead of being raised.
    print(f"Произошла ошибка: {e}")

3. Получение данных о количестве репозиториев с https://api.github.com/search/repositories

In [None]:
import requests
import csv
import time
import random

# Imported here so this section does not depend on an earlier one having
# already imported `os`.
import os

# Languages whose repository counts are collected, one search per entry.
keywords = [
    'Java', 'TypeScript', 'JavaScript', 'Python', 'C', 'C#', 'C++',
    'Go', 'PHP', 'Swift', 'SQL', 'Ruby', 'Kotlin', 'Dart', 'Rust', 'Jupyter Notebook',
    'TSQL', 'PLpgSQL', 'PLSQL'
]

GITHUB_TOKEN = os.environ.get("GITHUB_TOKEN")
headers = {'Accept': 'application/vnd.github.v3+json'}
if GITHUB_TOKEN:
    # Only authenticate when a token exists; otherwise the literal header
    # "token None" would be sent with every request.
    headers['Authorization'] = f'token {GITHUB_TOKEN}'

search_url = 'https://api.github.com/search/repositories'
repo_url_base = "https://api.github.com/repos"

# Filled by the search loop: one dict per language.
results = []

def get_repo_details(repo_url, headers, timeout=30):
    """Fetch popularity counters for a single repository.

    Args:
        repo_url: Full URL of the repository resource to request.
        headers: HTTP headers sent with the request.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        A dict with ``stars``, ``subscribers`` and ``forks`` (each 0 when the
        field is absent from the response), or an empty dict on any failure.
    """
    try:
        response = requests.get(repo_url, headers=headers, timeout=timeout)
        response.raise_for_status()
        data = response.json()
        return {
            'stars': data.get('stargazers_count', 0),
            'subscribers': data.get('subscribers_count', 0),
            'forks': data.get('forks_count', 0)
        }
    except Exception as e:
        # Deliberately best-effort: HTTP errors, timeouts and malformed
        # payloads are all reported the same way and yield an empty result.
        print(f"Error fetching repo details: {e}")
        return {}




# For every language: read the total repository count from the search API and
# collect details for the repositories returned on the first page.
for language in keywords:
    # The search syntax treats whitespace as a separator between terms, so a
    # multi-word name such as "Jupyter Notebook" has to be quoted to stay
    # inside the `language:` qualifier.
    language_term = f'"{language}"' if ' ' in language else language
    params = {
        'q': f'language:{language_term}',
        'per_page': 10
    }

    try:
        response = requests.get(search_url, headers=headers, params=params, timeout=30)
        response.raise_for_status()
        data = response.json()
        total_count = data.get('total_count', 0)
        repo_details = []

        # A missing or empty `items` list simply yields no details instead
        # of discarding the language's total count.
        for item in data.get('items') or []:
            repo_url = f"{repo_url_base}/{item['owner']['login']}/{item['name']}"
            repo_details.append(get_repo_details(repo_url, headers))
            # Randomised pause between per-repository requests.
            time.sleep(random.uniform(0.5, 1))

        print(f"Язык: {language}, Количество репозиториев: {total_count}")
        results.append({
            'Language': language,
            'RepositoryCount': total_count,
            'RepoDetails': repo_details
        })

    except requests.exceptions.HTTPError as http_err:
        print(f"HTTP ошибка для языка {language}: {http_err}")
    except Exception as err:
        print(f"Ошибка для языка {language}: {err}")
    


import json

# Write one CSV row per language; the nested details list is stored as JSON.
csv_file = 'language_popularity_details.csv'
with open(csv_file, 'w', newline='', encoding='utf-8') as csvfile:
    fieldnames = ['Language', 'RepositoryCount', 'RepoDetails']
    writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
    writer.writeheader()
    for row in results:
        # Serialize on a copy so `results` keeps the real list; mutating the
        # row in place would double-encode it if this code ran a second time.
        writer.writerow({**row, 'RepoDetails': json.dumps(row['RepoDetails'])})

print(f"\nДанные успешно сохранены в файл {csv_file}")

4. Получение данных об обсуждениях на форуме с https://api.stackexchange.com/2.2/search/advanced

In [None]:
import requests
import json
import time
import csv
import pandas as pd


# Imported here so this section does not depend on an earlier one having
# already imported `os`.
import os

# The key can be supplied through the environment so it does not have to be
# written into the file; the original placeholder remains the fallback.
STACK_EXCHANGE_API_KEY = os.environ.get("STACK_EXCHANGE_API_KEY", "MY_STACK_EXCHAGE_API_KEY")

# Tags to collect posts for, one paginated search per entry.
keywords = ['java', 'typescript', 'javascript', 'python', 'c', 'c#', 'c++', 'go', 'php', 'swift', 'sql', 'ruby', 'kotlin', 'dart', 'rust']
site = "stackoverflow"

def get_posts(page=1, pageSize=100, keyword=None, timeout=30):
    """Fetch one page of posts tagged with *keyword*, newest activity first.

    Args:
        page: Page number sent to the API.
        pageSize: Number of posts requested per page.
        keyword: Tag to search for; ``None`` short-circuits without a request.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        A ``(items, has_more)`` tuple taken from the response. On any request
        or decoding error the error is printed and ``([], False)`` is
        returned.
    """
    if keyword is None:
        return [], False

    url = "https://api.stackexchange.com/2.2/search/advanced"
    # Every value goes through `params` so it is URL-encoded. Interpolating
    # the tag into the URL turned "c#" into "c" (the "#" started a fragment)
    # and sent the "+" signs of "c++" as spaces.
    params = {
        'order': 'desc',
        'sort': 'activity',
        'site': site,
        'pagesize': pageSize,
        'page': page,
        'tagged': keyword,
        'key': STACK_EXCHANGE_API_KEY,
    }
    try:
        response = requests.get(url, params=params, timeout=timeout)
        response.raise_for_status()
        data = response.json()
        return data.get('items', []), data.get('has_more', False)
    except requests.exceptions.RequestException as e:
        print(f"Ошибка при запросе к API для {keyword}: {e}")
        return [], False
    except json.JSONDecodeError as e:
        print(f"Ошибка декодирования JSON для {keyword}: {e}. Пропуск страницы.")
        return [], False
        
def flatten_dict(d, parent_key='', sep='_'):
    """Flatten a nested dict into a single-level dict of string values.

    Args:
        d: Mapping to flatten; nested dicts are expanded recursively.
        parent_key: Prefix prepended to every key (used by the recursion).
        sep: Separator placed between a parent key and a child key.

    Returns:
        A dict whose keys are the joined key paths. Lists are stored as JSON
        text, ``None`` becomes an empty string, and every other value is
        converted with ``str()``.
    """
    flat = {}
    for k, v in d.items():
        new_key = parent_key + sep + k if parent_key else k
        if isinstance(v, dict):
            flat.update(flatten_dict(v, new_key, sep=sep))
        elif isinstance(v, list):
            flat[new_key] = json.dumps(v)
        elif v is None:
            flat[new_key] = ''
        else:
            # Double quotes are preserved: the CSV writer escapes them when
            # the file is written, so stripping them only loses data.
            flat[new_key] = str(v)
    return flat



def process_and_save(all_posts):
    """Flatten every post and write the rows to ``stack_overflow_posts.csv``.

    Posts whose flattening raises ``TypeError`` or ``KeyError`` are reported
    and skipped. When no rows remain, nothing is written. A failure while
    writing the CSV is printed rather than raised.
    """
    rows = []
    for post in all_posts:
        try:
            row = flatten_dict(post)
        except (TypeError, KeyError) as e:
            print(f"Ошибка при обработке поста: {e}, пост пропущен.")
            continue
        rows.append(row)

    if not rows:
        print("Нет данных для сохранения.")
        return

    df = pd.DataFrame(rows)
    try:
        df.to_csv('stack_overflow_posts.csv', index=False, encoding='utf-8', quoting=csv.QUOTE_ALL)
        print('Данные сохранены в stack_overflow_posts.csv')
    except Exception as e:
        print(f"Ошибка при записи в CSV: {e}")


if __name__ == "__main__":
    all_posts = []

    for keyword in keywords:
        page = 1
        has_more = True
        # Request consecutive pages until the API reports that nothing is
        # left; get_posts also reports False after a failed request, which
        # ends the loop for this keyword.
        while has_more:
            posts, has_more = get_posts(page=page, keyword=keyword)
            all_posts.extend(posts)
            page += 1
            # Pause between consecutive requests.
            time.sleep(1)

    process_and_save(all_posts)