In [1]:
# %load utils.py
from datetime import datetime

import ipaddress as ip

log_ts = None

def reset_delta():
    global log_ts
    
    ts = datetime.now()
    log_ts = [ts, ts]
    

# Logging utils
def log(msg, delta=True):
    global log_ts
    
    curr_ts = datetime.now()
    
    # init logging
    if log_ts is None:
        reset_delta()
        out(log_ts, "Logging initialized.", delta=False)
    
    log_ts[1] = curr_ts
    out(log_ts, msg, delta=delta)
       
    # set new logging timestamps
    log_ts = [log_ts[1], datetime.now()]
    
    
def out(ts, msg, delta=True):
    fmt = None
    args = None
    
    if delta:
        fmt = '[%s | +%s s] %s'
        args = (str(ts[1]), (ts[1] - ts[0]).total_seconds(), msg)
    else:
        fmt = '[%s] %s'
        args = (str(ts[1]), msg)
        
    print(fmt % args)
    

def str2ip(s):
    return int(ip.IPv4Address(s))

def ip2str(i):
    return str(ip.IPv4Address(i))

In [2]:
import findspark
findspark.init('/opt/spark')

from pyspark import SparkConf
from pyspark.sql import SparkSession

log('Finished imports.')

[2021-05-28 11:24:00.544335] Logging initialized.
[2021-05-28 11:24:00.544329 | +-6e-06 s] Finished imports.


In [3]:
# Some static variables

CA = 'campus'
CL = 'cloud'
RE = 'residential'

CA_CL = CA + CL
CA_RE = CA + RE
CL_RE = CL + RE
ALL = CA + CL + RE


In [4]:
# Init variables

SPARK_URI = "spark://10.10.10.80:7077"
SPARK_APP = "First steps research project"

"""
spark = SparkSession.builder \
    .master(SPARK_URI) \
    .appName(SPARK_APP) \
    .config(conf=SparkConf()) \
    .getOrCreate()
"""

spark = SparkSession.builder \
    .master(SPARK_URI) \
    .appName(SPARK_APP) \
    .config("spark.driver.host", "10.10.10.80") \
    .config("spark.jars", "postgresql-42.2.20.jar") \
    .getOrCreate()


log(f"Created Session: {str(spark)}")


[2021-05-28 11:24:10.926942 | +10.382613 s] Created Session: <pyspark.sql.session.SparkSession object at 0x7fe616efd0b8>


In [5]:
PG_URL="jdbc:postgresql://localhost:5432/honeypot"
PG_USER="max"
PG_PASS="K8yrQKMD151"

options = {
    "url" : PG_URL,
    "user" : PG_USER,
    "password" : PG_PASS,
    "driver" : "org.postgresql.Driver"
}

dfLog = spark.read.format("jdbc").options(
    dbtable="log",
    **options
    #url=PG_URL,
    #dbtable="log",
    #user=PG_USER,
    #password=PG_PASS,
    #driver="org.postgresql.Driver"
).load()

dfLog.printSchema()

dfDataplane = spark.read.format("jdbc").options(
    dbtable="dataplane",
    **options
).load()

dfDataplane.printSchema()

dfIp = spark.read.format("jdbc").options(
    dbtable="ip",
    **options
).load()

dfIp.printSchema()

#recs = dfLog.select('*').where("username='ubuntu'").collect()
#recs = dfLog.select('id', 'origin', 'origin_id', 'timestamp', 'category', 'ip', 'username').where("username='ubuntu'").collect()

#log(f"Lenght of records: {len(recs)}")

log("Loaded dataframes for log and dataplane.")

root
 |-- id: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- origin_id: integer (nullable = true)
 |-- sync_ts: timestamp (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- category: string (nullable = true)
 |-- ip: string (nullable = true)
 |-- username: string (nullable = true)
 |-- raw: string (nullable = true)

root
 |-- id: integer (nullable = true)
 |-- sync_ts: timestamp (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- asn: integer (nullable = true)
 |-- asname: string (nullable = true)
 |-- category: string (nullable = true)
 |-- ip: string (nullable = true)

root
 |-- id: integer (nullable = true)
 |-- ip: string (nullable = true)
 |-- city: string (nullable = true)
 |-- region: string (nullable = true)
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- org: string (nullable = true)
 |-- postal: string (nullable = true)
 |-- timezone: string (nullable = true)

[2021-05-28 11:24:17.41811

In [6]:

df_ip = {
    CA: dfLog.select("ip").where("origin='campus'").distinct(),
    CL: dfLog.select("ip").where("origin='cloud'").distinct(),
    RE: dfLog.select("ip").where("origin='residential'").distinct(),
}

df_ip[CA_CL] = df_ip[CA].intersect(df_ip[CL])
df_ip[CA_RE] = df_ip[CA].intersect(df_ip[RE])
df_ip[CL_RE] = df_ip[CL].intersect(df_ip[RE])
df_ip[ALL] = df_ip[CA_CL].intersect(df_ip[RE])

log(f"CAMPUS:                        {df_ip[CA].count()}")
log(f"CLOUD:                         {df_ip[CL].count()}")
log(f"RESIDENTIAL:                   {df_ip[RE].count()}")
log(f"CAMPUS + CLOUD:                {df_ip[CA_CL].count()}")
log(f"CAMPUS + RESIDENTIAL:          {df_ip[CA_RE].count()}")
log(f"CLOUD + RESIDENTIAL:           {df_ip[CL_RE].count()}")
log(f"CAMPUS + CLOUD + RESIDENTIAL:  {df_ip[ALL].count()}")




[2021-05-28 11:24:53.963663 | +36.545552 s] CAMPUS:                        2355
[2021-05-28 11:25:08.229606 | +14.265943 s] CLOUD:                         3396
[2021-05-28 11:25:16.925119 | +8.695513 s] RESIDENTIAL:                   1194
[2021-05-28 11:25:48.006859 | +31.08174 s] CAMPUS + CLOUD:                505
[2021-05-28 11:26:10.297593 | +22.290734 s] CAMPUS + RESIDENTIAL:          161
[2021-05-28 11:26:30.252701 | +19.955108 s] CLOUD + RESIDENTIAL:           241
[2021-05-28 11:26:57.257614 | +27.004913 s] CAMPUS + CLOUD + RESIDENTIAL:  124


In [7]:
for r in df_ip[ALL].collect():
    print(r.ip)

116.110.68.228
209.141.56.73
164.52.24.179
116.106.16.84
88.166.170.133
209.141.62.52
221.131.165.56
107.131.14.238
167.71.0.98
167.71.15.34
185.213.155.169
45.129.56.200
222.187.238.136
141.98.10.193
221.181.185.159
165.227.229.167
171.226.5.243
185.36.81.184
5.2.69.42
194.165.16.109
185.220.102.247
62.210.125.141
192.42.116.14
195.133.40.139
116.98.164.231
5.188.206.102
45.153.160.134
162.247.74.201
209.141.54.56
222.186.42.137
31.210.21.37
205.185.114.251
222.186.180.130
189.113.131.44
209.127.17.242
5.188.206.54
193.27.228.233
185.220.102.4
77.247.181.165
221.181.185.19
5.2.69.50
222.168.30.19
5.188.206.100
116.110.71.155
185.191.124.143
5.188.206.98
185.220.102.250
18.27.197.252
195.133.40.214
91.138.27.127
195.133.40.141
205.185.114.91
221.181.185.143
222.186.42.213
167.71.9.148
199.19.225.14
205.185.119.198
116.98.168.101
205.185.120.95
195.154.56.235
222.187.239.109
185.220.102.252
138.68.185.214
193.169.254.234
209.141.59.243
185.216.32.130
171.232.253.241
200.216.31.148
116.1