Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions config.ini.example

This file was deleted.

18 changes: 18 additions & 0 deletions config.toml.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
[pulse]
userid = "<username>"
host = "pulse.mozilla.org"
port = 5671
exchange = "exchange/<username>/test"
routing_key = ""
queue = "queue/<username>/test"
password = "<pulse-password>"

[clones]
directory = "../git_hg_sync_repos/"

[mappings.test]
git_repository = "/home/fbessou/dev/MOZI/fake-forge/git/test"

[mappings.test.rules.esr1]
branch_pattern = "test-esr1"
mercurial_repository = "/home/fbessou/dev/MOZI/fake-forge/hg/test-esr1"
27 changes: 14 additions & 13 deletions git_hg_sync/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from kombu import Connection, Exchange, Queue
from mozlog import commandline

from git_hg_sync import config
from git_hg_sync.config import Config
from git_hg_sync.pulse_worker import PulseWorker
from git_hg_sync.repo_synchronizer import RepoSynchronyzer

Expand All @@ -19,37 +19,38 @@ def get_parser():

def get_connection(config):
return Connection(
hostname=config["host"],
port=config["port"],
userid=config["userid"],
password=config["password"],
hostname=config.host,
port=config.port,
userid=config.userid,
password=config.password,
heartbeat=10,
ssl=True,
)


def get_queue(config):
exchange = Exchange(config["exchange"], type="topic")
exchange = Exchange(config.exchange, type="topic")
return Queue(
name=config["queue"],
name=config.queue,
exchange=exchange,
routing_key=config["routing_key"],
routing_key=config.routing_key,
exclusive=False,
)


def main():
def main() -> None:
parser = get_parser()
commandline.add_logging_group(parser)
args = parser.parse_args()
logger = commandline.setup_logging("service", args, {"raw": sys.stdout})
pulse_config = config.get_pulse_config(HERE.parent / "config.ini")["pulse"]
config = Config.from_file(HERE.parent / "config.toml")
pulse_config = config.pulse
connection = get_connection(pulse_config)

repos_config = config.get_repos_config(HERE.parent / "repos.json")

queue = get_queue(pulse_config)
repo_synchronyzer = RepoSynchronyzer(repos_config=repos_config)
repo_synchronyzer = RepoSynchronyzer(
clones_directory=config.clones.directory, mappings=config.mappings
)
with connection as conn:
logger.info(f"connected to {conn.host}")
worker = PulseWorker(conn, queue, repo_synchronyzer=repo_synchronyzer)
Expand Down
63 changes: 51 additions & 12 deletions git_hg_sync/config.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,55 @@
import configparser
import json
import pathlib
import tomllib
from collections import Counter

import pydantic

def get_pulse_config(config_file_path):
assert config_file_path.exists(), f"config file {config_file_path} doesn't exists"
config = configparser.ConfigParser()
config.read(config_file_path)
return config

class PulseConfig(pydantic.BaseModel):
userid: str
host: str
port: int
exchange: str
routing_key: str
queue: str
password: str

def get_repos_config(repo_file_path):
assert repo_file_path.exists(), f"config file {repo_file_path} doesn't exists"
with open(repo_file_path) as f:
repos = json.load(f)
return repos

class MappingRule(pydantic.BaseModel):
branch_pattern: str
mercurial_repository: str


class MappingConfig(pydantic.BaseModel):
git_repository: str
rules: dict[str, MappingRule]


class ClonesConfig(pydantic.BaseModel):
directory: pathlib.Path


class Config(pydantic.BaseModel):
pulse: PulseConfig
clones: ClonesConfig
mappings: dict[str, MappingConfig]

@pydantic.field_validator("mappings")
@staticmethod
def no_duplicate_git_repositories(
mappings: dict[str, MappingConfig]
) -> dict[str, MappingConfig]:
counter = Counter([mapping.git_repository for mapping in mappings.values()])
for git_repo, count in counter.items():
if count > 1:
raise ValueError(
f"Found {count} different mappings for the same git repository."
)
return mappings

@staticmethod
def from_file(file_path: pathlib.Path) -> "Config":
assert file_path.exists(), f"config file {file_path} doesn't exists"
with open(file_path, "rb") as config_file:
config = tomllib.load(config_file)
return Config(**config)
94 changes: 67 additions & 27 deletions git_hg_sync/repo_synchronizer.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
from dataclasses import dataclass
from pathlib import Path
from typing import Literal

from git import Repo
from mozlog import get_proxy_logger

from git_hg_sync.config import MappingConfig

logger = get_proxy_logger("sync_repo")


@dataclass
class Push:
repo_url: str
heads: list[str]
commits: list[str]
branches: dict[
str, str
] # Mapping between branch names (key) and corresponding commit sha (value)
time: int
pushid: int
user: str
Expand All @@ -31,45 +35,81 @@ class Tag:

class RepoSynchronyzer:

def __init__(self, repos_config):
self._repos_config = repos_config
def __init__(self, clones_directory: Path, mappings: dict[str, MappingConfig]):
self._clones_directory = clones_directory
self._mappings = mappings

def get_remote(self, repo, remote_url):
def get_remote(self, repo, remote_name: Literal["git", "hg"], remote_url: str):
"""
get the repo if it exists, create it otherwise
the repo name is the last part of the url
"""
remote_name = remote_url.split("/")[-1]
for rem in repo.remotes:
if rem.name == remote_name:
return repo.remote(remote_name)
break
remote = repo.remote(remote_name)
remote.set_url(remote_url, allow_unsafe_protocols=True)
return remote
else:
return repo.create_remote(remote_name, remote_url)
return repo.create_remote(
remote_name, remote_url, allow_unsafe_protocols=True
)

def handle_commits(self, entity, clone_dir, remote_url, remote_target):
logger.info(f"Handle entity {entity.pushid}")
def handle_commits(
self, push_message: Push, clone_dir: Path, mapping: MappingConfig
):
logger.info(f"Handle entity {push_message.pushid}")
assert Path(clone_dir).exists(), f"clone {clone_dir} doesn't exists"
repo = Repo(clone_dir)
remote = self.get_remote(repo, remote_url)
remote = self.get_remote(repo, "git", mapping.git_repository)
# fetch new commits
remote.fetch()
match entity:
case Push():
for head in entity.heads:
remote.pull(head)
case _:
pass # TODO
# push on good repo/branch
remote = repo.remote(remote_target)
remote.push()
logger.info(f"Done for entity {entity.pushid}")
for branch_name, commit in push_message.branches.items():
remote.fetch(commit)
if branch_name in repo.branches:
branch = repo.branches[branch_name]
branch.commit = commit
else:
branch = repo.create_head(branch_name, commit)
breakpoint()
for rule_name, rule in mapping.rules.items():
if rule.branch_pattern == branch_name:
remote = self.get_remote(
repo, "hg", "hg::" + rule.mercurial_repository
)
remote.push(branch.name)

# match push_message:
# case Push():
# for head in push_message.heads:
# remote.pull(head)
# case _:
# pass # TODO
## push on good repo/branch
# remote = self.get_remote(repo, "hg", "hg::" + mapping.rules.mercurial_repository)
# remote.push(push_message.heads)
# logger.info(f"Done for entity {push_message.pushid}")

def sync(self, entity: Push | Tag) -> None:
repo_config = self._repos_config.get(entity.repo_url)
if not repo_config:
logger.warning(f"repo {entity.repo_url} is not supported yet")
if isinstance(entity, Tag):
logger.warning("Tag message not handled not implemented yet")
return

matching_mappings = [
(mapping_name, mapping)
for mapping_name, mapping in self._mappings.items()
if mapping.git_repository == entity.repo_url
]
if not matching_mappings:
logger.warning(f"No mapping found for git repository {entity.repo_url} ")
return

if len(matching_mappings) > 1:
logger.warning(f"No mapping found for git repository {entity.repo_url} ")
return

mapping_name, mapping = matching_mappings[0]
clone_directory = self._clones_directory / mapping_name
self.handle_commits(
entity, repo_config["clone"], entity.repo_url, repo_config["target"]
entity,
clone_directory,
mapping,
)
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ name = "git-hg-sync"
readme = "README.md"
requires-python = ">=3.10"
version = "0.1"
dependencies = ['kombu', 'mozillapulse', 'GitPython', 'mozlog']
dependencies = ['kombu', 'mozillapulse', 'GitPython', 'mozlog', "pydantic"]

[tool.ruff]
line-length = 100
6 changes: 0 additions & 6 deletions repos.json

This file was deleted.

59 changes: 39 additions & 20 deletions tests/pulse_utils.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from datetime import datetime
from pathlib import Path
import sys

import kombu

from git_hg_sync import config
from git_hg_sync.config import Config

HERE = Path(__file__).parent

Expand All @@ -14,13 +15,13 @@ def send_pulse_message(pulse_config, payload):
The Pulse message will be constructed from the specified payload
and sent to the requested exchange.
"""
userid = pulse_config["userid"]
password = pulse_config["password"]
routing_key = pulse_config["routing_key"]
host = pulse_config["host"]
port = pulse_config["port"]
exchange = pulse_config["exchange"]
queue = pulse_config["queue"]
userid = pulse_config.userid
password = pulse_config.password
routing_key = pulse_config.routing_key
host = pulse_config.host
port = pulse_config.port
exchange = pulse_config.exchange
queue = pulse_config.queue
print(f"connecting to pulse at {host}:{port} as {userid}")

connection = kombu.Connection(
Expand Down Expand Up @@ -64,15 +65,33 @@ def send_pulse_message(pulse_config, payload):


if __name__ == "__main__":
payload = {
"type": "tag",
"repo_url": "https://github.com/djangoliv/chatzilla.git",
"tag": "truc",
"commit": "88949ac3ad633e92cf52354d91857074e264ad12",
"time": 0,
"pushid": 0,
"user": "user",
"push_json_url": "push_json_url",
}
pulse_conf = config.get_pulse_config(HERE.parent / "config.ini")["pulse"]
send_pulse_message(pulse_conf, payload)
config = Config.from_file(HERE.parent / "config.toml")
config.mappings
message_type = sys.argv[1]
mapping = config.mappings[sys.argv[2]]
match message_type:
case "push":
payload = {
"type": "push",
"repo_url": mapping.git_repository,
"branches": {sys.argv[3]: sys.argv[4]},
"time": 0,
"pushid": 0,
"user": "user",
"push_json_url": "push_json_url",
}
case "tag":
payload = {
"type": "tag",
"repo_url": "/home/fbessou/dev/MOZI/fake-forge/git/chatzilla",
"tag": "tag",
"commit": "88949ac3ad633e92cf52354d91857074e264ad12",
"time": 0,
"pushid": 0,
"user": "user",
"push_json_url": "push_json_url",
}
case _:
raise NotImplementedError()
print(payload)
send_pulse_message(config.pulse, payload)
3 changes: 1 addition & 2 deletions tests/test_pulse_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ def raw_push_entity():
return {
"type": "push",
"repo_url": "repo_url",
"heads": ["head"],
"commits": ["commit"],
"branches": {"mybranch": "acommitsha"},
"time": 0,
"pushid": 0,
"user": "user",
Expand Down
Loading