In [1]:
from datetime import datetime

In [2]:
import requests, json, re
import warnings

In [3]:
class CacheRequests:
    def __init__(self):
        self._store = {'all_queries_keys': [], 'all_queries_values': []}
        self._api_limit_reached = False
    
    def get(self, url, params=None, **kwargs):
        key = self._key(url, params, kwargs)
        if key not in self._store:
            self._get(url, params, **kwargs)
        return self._store[key]
    
    @staticmethod
    def _key(url, params, kwargs):
        return url + json.dumps(params) + json.dumps(kwargs)
    
    def _get(self, url, params, **kwargs):
        print('Call request!')
        key = self._key(url, params, kwargs)
        if self._api_limit_reached:
            raise RuntimeError("API limit reached, no content!")
        self._store[key] = requests.get(url, params, **kwargs)
        self._store['all_queries_keys'].append(key)
        self._store['all_queries_values'].append(self._store[key])
        data = json.loads(r._store[key].content)
        if 'message' in data and data['message'].startswith('API rate limit exceeded for'):
            self._api_limit_reached = True
            if key in self._store['all_queries_keys']:
                key_index = list(reversed(self._store['all_queries_keys'])).index(key)
                self._store[key] = self._store['all_queries_values'][key_index]
                warnings.warnings('API limit reaced, restored previous value!')
            else:
                self._store[key] = {}
    
    def refresh_get(self, url, params=None, **kwargs):
        self._get(url, params, **kwargs)
        return self._store[key]
    

In [4]:
r = CacheRequests()

In [5]:
class RepoOverview:
    _available_repos = ['pyiron_atomistics', 'pyiron_base', 'pyiron_contrib', 'pyiron_gui']
    
    def __init__(self, repo=None):
        self._repos = {repo: {} for repo in self._available_repos}
        self.repo = repo
        self.debug = False
    
    @property
    def _dependency_updates(self):
        if 'dependency_updates' not in self._repo_dict:
            self._repo_dict['dependency_updates'] = []
        return self._repo_dict['dependency_updates']
    
    @property
    def _infrastructure_prs(self):
        if 'infrastructure_prs' not in self._repo_dict:
            self._repo_dict['infrastructure_prs'] = []
        return self._repo_dict['infrastructure_prs']
    
    @property
    def _pr_dict(self):
        if 'pr_dict' not in self._repo_dict:
            self._repo_dict['pr_dict'] = {}
        return self._repo_dict['pr_dict']
    
    @property
    def tags(self):
        if 'tags' not in self._repo_dict:
            self.get_tags()
        return [tag for tag in self._repo_dict['tags'] if tag['name'].startswith(self._repo)]
    
    def get_tags(self, refresh=False):
        if refresh:
            self._repo_dict['tags'] = json.loads(r.refresh_get("https://api.github.com/repos/pyiron/" + self._repo + "/tags", params={'per_page':100}).content)
        else:
            self._repo_dict['tags'] = json.loads(r.get("https://api.github.com/repos/pyiron/" + self._repo + "/tags", params={'per_page':100}).content)
    
    @_pr_dict.setter
    def _pr_dict(self, value):
        self._repo_dict['pr_dict'] = value
    
    @property
    def repo(self):
        return self._repo
    
    @repo.setter
    def repo(self, value):
        if value in self._available_repos:
            self._repo = value
        else:
            raise ValueError(f"Unknown repository {value}, expected one of {self._available_repos}.")
    
    @property
    def _repo_dict(self):
        return self._repos[self._repo]
    
    def _link_body_pr_to_urls(self, body:str):
        working_body = re.sub("#([0-9]*)", '[#\\1](https://github.com/pyiron/' + self.repo + '/pull/\\1)', body )
        return working_body.strip()
    
    def get_releases(self, num_pages=1):
        all_releases = []
        page=0
        while page < num_pages:
            page+=1
            data = r.get("https://api.github.com/repos/pyiron/" + self.repo + "/releases", params={"page":page, 'per_page':100})
            releases = json.loads(data.content)
            all_releases += releases
            if len(releases) == 0:
                break
        self._repo_dict['realeases'] = all_releases
    
    @property
    def full_release_history(self):
        result = ""
        if 'realeases' not in self._repo_dict:
            self.get_releases()
        for release in self._repo_dict['realeases']:
            result += "# " + release['tag_name'] + "\n"
            result += self._link_body_pr_to_urls(release['body']) + "\n\n"
        return result
    
    @staticmethod
    def _github_to_datetime(time_str: str):
        #print(f"github_to_datetime: {time_str} -> {datetime.fromisoformat(time_str[:-1])}")
        return datetime.fromisoformat(time_str[:-1])
    
    def get_merged_prs(self, refresh=False):
        commit_tag0 = json.loads(r.get(self.tags[0]['commit']['url']).content)
        tag_date = self._github_to_datetime(commit_tag0['commit']['committer']['date'])

        page=0
        all_pulls = []
        while True:
            page+=1
            if refresh:
                data = r.refresh_get("https://api.github.com/repos/pyiron/" + self._repo + "/pulls", params={"page":page, 'state': 'closed', 'per_page':100})
            else:
                data = r.get("https://api.github.com/repos/pyiron/" + self._repo + "/pulls", params={"page":page, 'state': 'closed', 'per_page':100})
            pulls = json.loads(data.content)
            if self.debug:
                print(f"received {len(pulls)} PRs:")
            for pr in pulls:
                pr_merged_at = pr['merged_at']
                if pr_merged_at is None:
                    if self.debug:
                        print("not merged, only closed")
                    continue
                if self._github_to_datetime(pr_merged_at) > tag_date:
                    if self.debug:
                        print("Pr newer than last tag!")
                    if pr['base']['ref'] == 'master':
                        if self.debug:
                            print("PR to master - added")
                        all_pulls.append(pr)                   
            if len(pulls) == 0:
                break
        if self.debug:
            print(all_pulls)
        self._pr_dict = {pr['number']:pr for pr in all_pulls}
        self._get_pure_dependency_updates()

        
    def _get_pure_dependency_updates(self):
        for pr_number, pr in self._pr_dict.items():
            if len(pr['labels']) == 1 and pr['labels'][0]['name'] == 'dependencies':
                self._dependency_updates.append(pr_number)
                
    @property
    def dependency_updates(self):
        return self._dependency_updates
    
    @property
    def infrastructure_prs(self):
        return self._infrastructure_prs
    
    @property
    def labels(self):
        return ['dependencies', 'format_black', 'enhancement', 'bug']
    
    @property
    def merged_prs(self):
        return self._pr_dict.copy()
    
    def _format_changelog_entry(self, pr_number):
        pr_title = self._pr_dict[pr_number]['title']
        return f"- {pr_title} ({self.pr_link(pr_number)})\n"
    
    def pr_link(self, pr_number):
        return f"[#{pr_number}](https://github.com/pyiron/{self._repo}/pull/{pr_number})"
    
    def _bumped_patch_version(self):
        latest_tag = self.tags[0]['name']
        bumped_patch_version = str(int(latest_tag.split('.')[-1]) + 1)
        
        return ".".join(latest_tag.split('.')[:-1] + [bumped_patch_version])
    
    def changelog(self, include_title=True, version=None):
        if version is None:
            version = self._bumped_patch_version()
        if len(self._pr_dict) == 0:
            self.get_merged_prs()
        
        if include_title and version.startswith(self._repo):
            result = f"# {version}\n\n"
        elif include_title:
            result = f"# {self._repo}-{version}\n\n"
        else:
            result = ""
        for pr_number in self._pr_dict:
            if pr_number in self._dependency_updates:
                continue
            if pr_number in self._infrastructure_prs:
                continue
            else:
                result += self._format_changelog_entry(pr_number)
        if len(self._dependency_updates) > 0:
            result += "- Dependency updates: "
            result += ", ".join([self.pr_link(pr_number) for pr_number in self._dependency_updates])
            result += '\n'
        if len(self._infrastructure_prs) > 0:
            result += "- GitHub infrastructure: "
            result += ", ".join([self.pr_link(pr_number) for pr_number in self._infrastructure_prs])
            result += '\n'
        
        return result


In [6]:
repo_overview = RepoOverview('pyiron_base')

In [6]:
repo_overview = RepoOverview('pyiron_atomistics')

In [7]:
print(repo_overview.changelog())

Call request!
Call request!
Call request!
Call request!
Call request!
Call request!
Call request!
Call request!
# pyiron_atomistics-0.2.44

- Update to pyiron_base-0.5.11 ([#589](https://github.com/pyiron/pyiron_atomistics/pull/589))
- Murnnaghan Convergence check ([#584](https://github.com/pyiron/pyiron_atomistics/pull/584))
- Cosmetics for Phonopy.plot_band_structure() ([#571](https://github.com/pyiron/pyiron_atomistics/pull/571))
- Handling empty indices ([#552](https://github.com/pyiron/pyiron_atomistics/pull/552))

