In [1]:
import gzip

from pathlib import Path

import pandas as pd
import altair as alt

import netaddr

In [2]:
from os import access, R_OK
from os.path import isfile
from io import StringIO
import geohash2
import logging
from geolite2 import geolite2
import re
from typing import List, Dict, Union
from hashlib import sha256
from base64 import b64encode

import datetime

Event = Dict[str,Union[str, Dict]]

LOG = logging.getLogger('rsyncstats')
LOG.setLevel(logging.INFO)
# create console handler and set level to debug
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)

# create formatter
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

# add formatter to ch
ch.setFormatter(formatter)

# add ch to logger
LOG.addHandler(ch)

In [3]:
"""
Derived from the rsyncstats package [0], whose regex did not work with our rsync logs.

[0]: https://gitlab.com/resif/rsyncstats/-/tree/master
"""
def iterable_log(data: str):
    """
    Generator to iterate over lines in file or in string.
    Very nice.
    """
    if isfile(data) and access(data, R_OK):
        with open(data,'r') as loglines:
            for logline in loglines:
                yield logline.strip()
    else:
       # Consider data is the log lines to analyse
       for logline in data.split("\n"):
           yield logline

GLOBAL_PATTERN = re.compile(r'(?P<timestamp>20[0-9][0-9]/[0-9/]+ [012][0-9]:[0-5][0-9]:[0-5][0-9]) \[(?P<pid>[0-9]+)\] (?P<logtype>(rsync (to|on)|sent)) ((?P<sentbytes>[0-9]+) bytes\s+received (?P<receivedbytes>[0-9]+) bytes\s+total size (?P<totalbytes>[0-9]+)|(?P<module>[-\w_]+)(?P<directory>\/\S*) from (?P<hostname>\S+) \((?P<clientip>\S+)\))')
        
        
def parse_log(lines: str) -> List[Event]:
    """
    Read a rsync log file and parses information.
    Returns a list of events (dictionary)
    """
    fail_count = 0
    georeader = geolite2.reader()
    events = []
    events_buffer = {} # dict of events started but not ended. Key is the PID
    linecount = 0

    for log in lines:
        linecount +=1
        event = GLOBAL_PATTERN.search(log)
        if event == None:
            fail_count += 1
            LOG.debug("Ignoring log at %d : %s"%(linecount, log))
            if LOG.isEnabledFor(logging.DEBUG) and fail_count > 10:
                LOG.error("Aborting due to failed lines.")
                return
            continue
        event_data = event.groupdict()
        # store time as epoch
        event_data['timestamp'] = datetime.datetime.strptime(re.sub(' +', ' ', event_data['timestamp']), '%Y/%m/%d %H:%M:%S').strftime('%Y-%m-%d %H:%M:%S')
        # 2 possible logs are captured by the pattern : connection log and transfer log.
        if event_data['logtype'] == 'rsync to' or event_data['logtype'] == 'rsync on':
            location = georeader.get(event_data['clientip'])
            # hash location and get the city name
            if location != None and 'location' in location:
                event_data['geohash'] = geohash2.encode(location['location']['latitude'], location['location']['longitude'])
                try:
                    event_data['city'] = location['city']['names']['en']
                except KeyError:
                    event_data['city'] = ''
            else:
                event_data['geohash'] = 'u0h0fpnzj9ft'
                event_data['city'] = 'Grenoble'
            # hash hostname
            event_data['hosthash'] = b64encode(sha256(event_data['hostname'].encode()).digest())[:12].decode('utf-8') # overcomplicated oneliner to hash the hostname
            LOG.debug("Storing event in buffer (pid %s)"%(event_data['pid']))
            event_data = {k:event_data[k] for k in event_data if event_data[k] != None}
            events_buffer[event_data['pid']] = event_data
            LOG.debug(event_data)
        elif event_data['logtype'] == 'sent':
            event_data['endtime'] = event_data['timestamp']
            # get the data from the events_buffer and merge with what we have
            try:
                previous_data = events_buffer.pop(event_data['pid'])
                events.append({ **event_data, **previous_data })
            except KeyError as e:
                LOG.debug("Event will not be accounted : "+str(event_data))
    return(events)



In [4]:
rsync_logs = list(Path('../logs/rsync').glob("*/rsync*"))

In [5]:
# Do not print logs in a public workbook
#rsync_logs[0].open().readlines()[0:10]

In [6]:
def blind_mask(ip) -> str:
    addr = netaddr.IPAddress(ip)
    if addr.version == 4:
        return addr & netaddr.IPAddress('255.255.255.0')
    if addr.version == 6:
        return addr & netaddr.IPAddress('FFFF:FFFF:FFFF:FFFF::')

def parse_all_logs():
    rsync_logs = list(Path('../logs/rsync').glob("rsync*"))
    
    dfs = []
    for file in rsync_logs:
        LOG.info("Reading %s", file)
        if file.name.endswith('.gz'):
            with gzip.open(file, 'rt') as f:
                lines = parse_log(f.readlines())
        else:
            with file.open('rt') as f:
                lines = parse_log(f.readlines())
            
        df = pd.DataFrame(lines)
        df['file'] = file.name
        dfs.append(df)
        
    df = pd.concat(dfs)
    df.timestamp = pd.to_datetime(df.timestamp)
    df.totalbytes = df.totalbytes.astype(int)
    df.sentbytes = df.sentbytes.astype(int)
    df.receivedbytes = df.receivedbytes.astype(int)
    
    df['date'] = df.timestamp.dt.date
    df['masked_clientip'] = df.clientip.map(blind_mask)
    return df

In [7]:
df = parse_all_logs()

2021-05-12 15:32:38,608 - rsyncstats - INFO - Reading ../logs/rsync/rsyncd.log.7.gz


In [8]:
df.shape

(52908, 17)

In [9]:
by_5_minutes = df[['module', 'timestamp', 'totalbytes', 'file']].groupby(['file','module', pd.Grouper(key="timestamp", freq="5min")]).count().reset_index().set_index(['timestamp'])
by_5_minutes = by_5_minutes.between_time('6:00', '18:00').reset_index()

In [10]:
alt.Chart(by_5_minutes).transform_joinaggregate(
    order='count(*)',
    groupby=['module']
).mark_line().encode(
    x=alt.X(
        'timestamp:T',
        axis=alt.Axis(title='Time')
    ),
    y=alt.Y('totalbytes:Q',
            axis=alt.Axis(title='Number of clients')
    ),
    color=alt.Color(
        'module:N',
        legend=alt.Legend(title='rsync module')
    ),
    row='file:N'
).properties(
    title='Number of connecting rsync clients over time'
)

#         overall_area = alt.Chart(period_data).transform_joinaggregate(
#             order='sum(c-ip)',
#             groupby=['rp_software']
#         ).mark_area().encode(
#             x=alt.X('datetime:T', axis=alt.Axis(title='Time')),
#             y=alt.Y("c-ip:Q",
#               scale=alt.Scale(domain=(0, 1.1*df_summed['c-ip'].max())),
#               axis=alt.Axis(title='Number of clients')
#             ),    
#             color=alt.Color('rp_software:N',
#               scale=alt.Scale(domain=color_range),
#               legend=alt.Legend(title='Relying Party implementation')
#             ),
#             order=alt.Order('order:Q', sort='descending')
#         ).properties(
#             title=f'Number of unique IPs by RP software ({prefix}{period_title})'
#         )

# Note that this assumes logfiles are available for the dates in the code!

Furthermore: You likely want to group by a longer period than 1h.

In [11]:
by_ip = df[['timestamp', 'module', 'clientip']].drop_duplicates().groupby([pd.Grouper(key="timestamp", freq="1H"), 'module']).count().rename(columns={'clientip': 'unique_ips'})

by_mask = df[['timestamp', 'module', 'masked_clientip']].drop_duplicates().groupby([pd.Grouper(key="timestamp", freq="1H"), 'module']).count().rename(columns={'masked_clientip': 'per_24_or_64'})

ips_by_date = pd.concat([by_ip, by_mask], axis=1).reset_index()

In [12]:
def stats_by_week(df):
    LOG.info("This may be broken because it's mid-refactor between date and timestamp")
    first_week = ((df.timestamp >= datetime.datetime(year=2021, month=3, day=26)) & (df.timestamp < datetime.datetime(year=2021, month=4, day=2)))
    last_week = ((df.timestamp >= datetime.datetime(year=2021, month=4, day=2)) & (df.timestamp < datetime.datetime(year=2021, month=4, day=9)))
    
    display(df[first_week].groupby(['module']).mean())
    display(df[last_week].groupby(['module']).mean())

In [13]:
stats_by_week(ips_by_date)

2021-05-12 15:32:58,307 - rsyncstats - INFO - This may be broken because it's mid-refactor between date and timestamp


Unnamed: 0_level_0,unique_ips,per_24_or_64
module,Unnamed: 1_level_1,Unnamed: 2_level_1


Unnamed: 0_level_0,unique_ips,per_24_or_64
module,Unnamed: 1_level_1,Unnamed: 2_level_1


Because of the structure of our repository we can split out clients connecting over rsync to retrieve the trust anchor from those connecting to the main repository.

We do see a change on the 2nd of April so I'm providing data both for the week before and after this date.

In the week leading up to the 2nd of april, on average per dag we see:
  * 192 unique IPs (from 182 /24's/64's) creating 8636 connections to /repository
  * 911 unique IPs (from 721 /24's/64's) creating 81855 connections to /ta
In the week starting on the 2nd of april on average per day we see:
  * 598 unique IPs (from 582 /24's/64's) creating 17594 connections to /repository
  * 1301 unique IPs (from 1114 /24's/64's) creating 89675 connections to /ta
  
We see ~1086 unique IPs accessing the TA certificate over HTTPS per day.

In [14]:
stats_by_week(df[['timestamp', 'module', 'clientip']].groupby([pd.Grouper(key='timestamp', freq='1d'), 'module']).count().reset_index())

2021-05-12 15:32:58,369 - rsyncstats - INFO - This may be broken because it's mid-refactor between date and timestamp


Unnamed: 0_level_0,clientip
module,Unnamed: 1_level_1


Unnamed: 0_level_0,clientip
module,Unnamed: 1_level_1


In [15]:
df.totalbytes.astype(int).sum()

1075693423678

In [16]:
traffic_by_day = df[['timestamp', 'receivedbytes', 'sentbytes', 'totalbytes']].groupby([pd.Grouper(key="timestamp", freq="1H")]).sum().reset_index()
#traffic_by_day = traffic_by_day[traffic_by_day.timestamp <= datetime.datetime(year=2021, month=4, day=13)]

In [17]:
traffic_by_day

Unnamed: 0,timestamp,receivedbytes,sentbytes,totalbytes
0,2021-05-05 03:00:00,52216592,3580178072,25112338250
1,2021-05-05 04:00:00,70188834,4950026547,43663783339
2,2021-05-05 05:00:00,86375488,5617980832,43675446252
3,2021-05-05 06:00:00,100732378,6289180834,44462386201
4,2021-05-05 07:00:00,108864813,6731189202,43227341643
5,2021-05-05 08:00:00,101913718,6133358500,41887338739
6,2021-05-05 09:00:00,132488729,7705188688,46002098268
7,2021-05-05 10:00:00,98290767,5966936213,44458956152
8,2021-05-05 11:00:00,77231371,5451907745,45800438247
9,2021-05-05 12:00:00,79530931,5472386352,44458207220


In [18]:
traffic_by_day_long = pd.melt(traffic_by_day, id_vars='timestamp', value_vars=['receivedbytes', 'sentbytes', 'totalbytes'], var_name='traffic')

alt.Chart(traffic_by_day_long).mark_line().encode(
    x='timestamp:T',
    y='value:Q',
    color='traffic:N'
)

# Start of some code that works with riswhois
## TODO: extract into library!

In [19]:
import bz2
import json
import io
import ipaddress
import logging

import altair as alt
import pandas as pd
import pytricia
import requests

from typing import Generator, NamedTuple, Optional, Set

from pandas.api.types import CategoricalDtype
from pandas.core.series import Series

LOG = logging.getLogger(__name__)
LOG.setLevel(logging.DEBUG)

In [20]:
class RouteOriginAuthorization(NamedTuple):
    asn: int
    prefix: str
    max_length: int

    prefix_length: Optional[int] = None
        
class ExpandedRisEntry(NamedTuple):
    origin: str
    prefix: str
    seen_by_peers: int
    prefix_length: int
    roa_validity: str

In [21]:
# https://www.ris.ripe.net/dumps/riswhoisdump.IPv4.gz
# https://www.ris.ripe.net/dumps/riswhoisdump.IPv6.gz
ris_v4_gz = requests.get("https://www.ris.ripe.net/dumps/riswhoisdump.IPv4.gz").content
ris_v6_gz = requests.get("https://www.ris.ripe.net/dumps/riswhoisdump.IPv6.gz").content

def read_ris_dump(url: str) -> pd.DataFrame:
    # Get file, accept that there are comment lines in there
    df = pd.read_csv(url,
                     compression='gzip',
                     sep="\t",
                     names=["origin", "prefix", "seen_by_peers"]
    )
    
    if df.origin.str.startswith('{').any():
        LOG.error("RIS dump contains row(s) with AS_SET! These will never be RPKI valid (https://tools.ietf.org/html/rfc6907#section-7.1.8)")
    # select the rows that do not have the '%' prefix
    df = df[~df.origin.str.startswith('%')].copy()
    
    # separate prefix length
    df['prefix_length'] = df.prefix.map(lambda p: ipaddress.ip_network(p).prefixlen)
    
    return df

In [22]:
ris_v4 = read_ris_dump("https://www.ris.ripe.net/dumps/riswhoisdump.IPv4.gz")
ris_v6 = read_ris_dump("https://www.ris.ripe.net/dumps/riswhoisdump.IPv6.gz")

RIS dump contains row(s) with AS_SET! These will never be RPKI valid (https://tools.ietf.org/html/rfc6907#section-7.1.8)
RIS dump contains row(s) with AS_SET! These will never be RPKI valid (https://tools.ietf.org/html/rfc6907#section-7.1.8)


In [23]:
class RisWhoisLookup:
    trie: pytricia.PyTricia
        
    def __init__(self, data: pd.DataFrame, visibility_threshold : int = 10) -> None:
        af = data.prefix.apply(lambda p: ipaddress.ip_network(p).version)
        assert af.nunique() == 1
        length = 128 if af.unique()[0] == 6 else 32
        
        self.trie = pytricia.PyTricia(length)
        data[data.seen_by_peers >= visibility_threshold].apply(self.__build_trie, axis=1)
        
    def __build_trie(self, row: Series) -> None:
        if not self.trie.has_key(row.prefix):
            # Add entry
            self.trie[row.prefix] = set()
            
        self.trie[row.prefix].add(
            ExpandedRisEntry(row.origin, row.prefix, row.seen_by_peers, row.prefix_length, row.roa_validity)
        )
       
    def lookup(self, prefix) -> Generator[ExpandedRisEntry, None, None]:
        key = self.trie.get_key(prefix)
        while key is not None:
            yield from self.trie[key]
            key = self.trie.parent(key)
            
    def __getitem__(self, prefix) -> Set[ExpandedRisEntry]:
        return set(self.lookup(prefix))

In [24]:
# And build patricia trie
ris_v4['roa_validity'] = ''
ris_v6['roa_validity'] = ''

ris_v4_lookup = RisWhoisLookup(ris_v4)
ris_v6_lookup = RisWhoisLookup(ris_v6)

In [25]:
def lookup_afi(ip_str) -> Generator[ExpandedRisEntry, None, None]:
    ip = netaddr.IPAddress(ip_str)
    if ip.version == 4:
        return ris_v4_lookup.lookup(ip_str)
    else:
        return ris_v6_lookup.lookup(ip_str)
    

def origin_as(ip_str) -> str:
    try:
        return next(iter(sorted(lookup_afi(ip_str)))).origin
    except:
        return -1
    
def origin_prefix(ip_str) -> str:
    try:
        return next(iter(sorted(lookup_afi(ip_str)))).prefix
    except:
        return -1

In [26]:
df['origin_as'] = df.clientip.map(origin_as)
df['origin_prefix'] = df.clientip.map(origin_prefix)


In [27]:
# First group by hour, then actually count the number of different origin as|prefixes
by_origin_as = df[['timestamp', 'module', 'origin_as']]\
    .groupby([pd.Grouper(key="timestamp", freq="1H"), 'module', 'origin_as']).count()\
    .reset_index()\
    .drop_duplicates()\
    .groupby([pd.Grouper(key="timestamp", freq="1H"), 'module'])\
    .count()\
    .rename(columns={'origin_as': 'origin_as'})

by_origin_prefix = df[['timestamp', 'module', 'origin_prefix']]\
    .groupby([pd.Grouper(key="timestamp", freq="1H"), 'module', 'origin_prefix']).count()\
    .reset_index()\
    .drop_duplicates()\
    .groupby([pd.Grouper(key="timestamp", freq="1H"), 'module'])\
    .count()\
    .rename(columns={'origin_prefix': 'origin_prefix'})

ips_by_date = pd.concat([by_ip, by_mask, by_origin_as, by_origin_prefix], axis=1).reset_index()

In [28]:
ips_by_date_long = pd.melt(ips_by_date,
                           id_vars=['timestamp', 'module'],
                           value_vars=['unique_ips', 'per_24_or_64', 'origin_as', 'origin_prefix'],
                           var_name='type')

alt.Chart(ips_by_date_long).mark_line().encode(
    x='timestamp:T',
    y='value:Q',
    color='type:N',
    row='module'
)

In [29]:
stats_by_week(ips_by_date_long)

Unnamed: 0_level_0,value
module,Unnamed: 1_level_1


Unnamed: 0_level_0,value
module,Unnamed: 1_level_1
