In [49]:
import glob
import concurrent.futures

from dataclasses import dataclass
from typing import Optional
from pathlib import Path

import sys
sys.path.insert(0, '..')

from rrdp_tools.rrdp import parse_snapshot_or_delta, UnexpectedDocumentException, ValidationException

import pandas as pd
import dataclasses
import requests
import pathlib
import urllib
import re

In [2]:
files = glob.glob("/srv/timemachine/bulk/**/*/*.xml")

In [51]:
FILE_HASH_RE = re.compile(r'[0-9]+\.xml')

In [53]:
files_without_hash = [f for f in  files if FILE_HASH_RE.match(Path(f).name)]
display(len(files_without_hash))
display(len(files))

40508

185045

In [4]:
@dataclass
class FileResult:
    file: str
    status: str
    serial: Optional[int] = None
    session: Optional[str] = None
    num_entries: int = 0
    first_uri: Optional[str] = None


def process_delta(file):
    try:
        parsed = parse_snapshot_or_delta(file)
        return FileResult(file, "parsed", parsed.serial, parsed.session_id, len(parsed.content), parsed.content[0].uri)
    except UnexpectedDocumentException:
        return FileResult(file, "unexpected_document")
    except ValidationException:
        return FileResult(file, "invalid_xml_schema")


def extract_host(url: Optional[str]) -> str:
    try:
        parsed = urllib.parse.urlparse(url)
        return parsed.hostname
    except:
        return None

Process all the deltas in the archive to see how many cases of >1 file for a serial are present. This will only work with the deltas that have a hash in their filename.

As another efect this will give some statistics on delta content size.

In [54]:
result = []
target_files = list(set(files) - set(files_without_hash))
display(f"Number of target files: {len(target_files)} out of {len(files)}")

with concurrent.futures.ProcessPoolExecutor() as executor:
    result = list(executor.map(process_delta, target_files))

df = pd.DataFrame(result)
df['uri'] = df.first_uri.apply(extract_host)
df['dir'] = df.file.apply(lambda x: x.split("bulk/")[1].split("/")[0])


'Number of target files: 144537 out of 185045'

In [74]:
df

Unnamed: 0,file,status,serial,session,num_entries,first_uri,uri,dir
0,/srv/timemachine/bulk/files-paas/1c33ba5d-4e16...,parsed,71090.0,1c33ba5d-4e16-448d-9a22-b12599ef1cba,2,rsync://rsync.paas.rpki.ripe.net/repository/7b...,rsync.paas.rpki.ripe.net,files-paas
1,/srv/timemachine/bulk/files-lacnic/61a52eef-1b...,parsed,68.0,61a52eef-1ba9-4a96-8995-bc9437c61355,162,rsync://repository.lacnic.net/rpki/lacnic/241b...,repository.lacnic.net,files-lacnic
2,/srv/timemachine/bulk/files-apnic/2451f731-01a...,parsed,90407.0,2451f731-01a7-4188-801b-6d6ee66fca1d,12,rsync://rpki.apnic.net/member_repository/A916E...,rpki.apnic.net,files-apnic
3,/srv/timemachine/bulk/files-paas/1c33ba5d-4e16...,parsed,64262.0,1c33ba5d-4e16-448d-9a22-b12599ef1cba,2,rsync://rsync.paas.rpki.ripe.net/repository/fc...,rsync.paas.rpki.ripe.net,files-paas
4,/srv/timemachine/bulk/files-lacnic/f4df615d-df...,parsed,5.0,f4df615d-dff2-480b-93ed-e0fecda07c36,820,rsync://repository.lacnic.net/rpki/lacnic/cdbf...,repository.lacnic.net,files-lacnic
...,...,...,...,...,...,...,...,...
144532,/srv/timemachine/bulk/files-nicbr/aa4bfebe-4ea...,parsed,327198.0,aa4bfebe-4ea2-44f5-9da7-9492753e8e98,6,rsync://rpki-repo.registro.br/repo/9SGb9oSxuzE...,rpki-repo.registro.br,files-nicbr
144533,/srv/timemachine/bulk/files-nicbr/aa4bfebe-4ea...,parsed,293055.0,aa4bfebe-4ea2-44f5-9da7-9492753e8e98,6,rsync://rpki-repo.registro.br/repo/G95uSNd8BkH...,rpki-repo.registro.br,files-nicbr
144534,/srv/timemachine/bulk/files-paas/1c33ba5d-4e16...,parsed,66494.0,1c33ba5d-4e16-448d-9a22-b12599ef1cba,2,rsync://rsync.paas.rpki.ripe.net/repository/83...,rsync.paas.rpki.ripe.net,files-paas
144535,/srv/timemachine/bulk/files-nicbr/aa4bfebe-4ea...,parsed,330617.0,aa4bfebe-4ea2-44f5-9da7-9492753e8e98,7,rsync://rpki-repo.registro.br/repo/nicbr_repo/...,rpki-repo.registro.br,files-nicbr


In [83]:
df_cnt = df.groupby(['dir', 'uri', 'session', 'serial']).count()

In [89]:
df_doubles = df_cnt.reset_index()[df_cnt.reset_index().file > 1]
display(df_doubles)

Unnamed: 0,dir,uri,session,serial,file,status,num_entries,first_uri


In [85]:
df_doubles.groupby(['uri', 'session']).count()

Unnamed: 0_level_0,Unnamed: 1_level_0,dir,serial,file,status,num_entries,first_uri
uri,session,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1


In [86]:
display(df.dtypes)
display(df_doubles.dtypes)

file            object
status          object
serial         float64
session         object
num_entries      int64
first_uri       object
uri             object
dir             object
dtype: object

dir             object
uri             object
session         object
serial         float64
file             int64
status           int64
num_entries      int64
first_uri        int64
dtype: object

In [87]:
df_duplicate_deltas = df_doubles.merge(df, on=['dir', 'uri', 'session', 'serial'], how='left')
display(df_duplicate_deltas)

Unnamed: 0,file_x,status_x,num_entries_x,first_uri_x,file_y,status_y,serial,session,num_entries_y,first_uri_y,uri,dir


In [88]:
display(df_duplicate_deltas[df_duplicate_deltas.uri == 'rsync.paas.rpki.prepdev.ripe.net'].file_y)

Series([], Name: file_y, dtype: object)

In [71]:
df_duplicate_deltas.sort_values(['session', 'serial']).to_csv("duplicate-hash.csv")