In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, StructType, BooleanType, ArrayType
from pyspark.sql.functions import *
import pyspark
from pyspark.sql.functions import struct
from pyspark.sql.functions import monotonically_increasing_id
import pyspark.sql.functions as f
from pyspark.sql.functions import udf
from pyspark.sql.functions import col
import re
import ipaddress #import ip_network, ip_address
from ipaddress import ip_network, ip_address
import time
from pyspark.sql import Row
import pandas as pd
#from netaddr import *
#import geoip2.database
#import ipaddr
#Starten ohne Spark-Cluster
spark = SparkSession.builder.getOrCreate()

In [2]:
# Verbindung zu Minio
spark.sparkContext.setSystemProperty("com.amazonaws.services.s3.enableV4", "true")
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.access.key", "minio")
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "minio123")
spark.sparkContext._jsc.hadoopConfiguration().set("com.amazonaws.services.s3.enableV4", "true")
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "http://minio1:9000")

In [3]:
schema = StructType([
    StructField("Top level domain", StringType(), True),
    StructField("MX-Record from the name server", StringType(), True),
    StructField("A-Record of the specific domain", StringType(), True),
    StructField("Timestamp", StringType(), True)])

df = spark.read.csv("s3a://bucket/real_new.csv",header=False,sep=";",schema=schema)

# Datenbeschreibung

## real_domains

### Allgemein

In [4]:
df.show(5)

+----------------+------------------------------+-------------------------------+-------------------+
|Top level domain|MX-Record from the name server|A-Record of the specific domain|          Timestamp|
+----------------+------------------------------+-------------------------------+-------------------+
|         0--1.de|          "[""mail.0--1.de"...|           "[""46.38.249.145...|2020-12-13 15:36:05|
|         0--2.de|          "[""mxf993.netcup...|           "[""212.227.212.1...|2020-12-13 15:36:05|
|    0-0-0-0-0.de|          "[""smtp-02.tld.t...|           "[""80.150.6.143""]"|2020-12-13 15:36:05|
|      0-0-0-1.de|          "[""smtp-02.tld.t...|           "[""80.150.6.143""]"|2020-12-13 15:36:05|
|          0-0.de|                            []|           "[""185.53.178.13...|2020-12-13 15:36:05|
+----------------+------------------------------+-------------------------------+-------------------+
only showing top 5 rows



* Top level domain: Namensserver; das, was Personen sich merken können
* Ein MX Resource Record weist einem Namen einen Mailserver zu. Er stellt eine Besonderheit dar, da er sich auf einen speziellen Dienst im Internet, nämlich die E-Mailzustellung mittels SMTP, bezieht. Alle anderen Dienste nutzen CNAME, A und AAAA Resource Records für die Namensauflösung.
* Ein A Resource Record weist einem Namen eine IPv4-Adresse zu.

In [5]:
df= df.withColumn("id", monotonically_increasing_id())

In [6]:
df=df.select("Top level domain","MX-Record from the name server","A-Record of the specific domain")

* die Spalte Timestamp wird nicht benötigt

In [7]:
df=df.withColumnRenamed("MX-Record from the name server", "MX-Record")

In [8]:
df=df.withColumnRenamed("A-Record of the specific domain", "A-Record")

In [9]:
type(df)

pyspark.sql.dataframe.DataFrame

In [10]:
df.count()

4860885

In [11]:
df.dropDuplicates().count()

4860885

* es gibt keine Duplikate in dem Datensatz

In [12]:
#df.filter(df["MX-Record"].contains(']')).count()

* es gibt 3 Zeilen, wo die Aufzeichnung von MX Records abgebrochen wurde

In [13]:
#df=df.filter(df["MX-Record"].contains(']'))

* diese Zeilen werden entfernt, da sie sich nicht reparieren lassen

In [14]:
df.printSchema()

root
 |-- Top level domain: string (nullable = true)
 |-- MX-Record: string (nullable = true)
 |-- A-Record: string (nullable = true)



### Top Level Domain

In [15]:
df.filter(df["Top level domain"]=="null").count()

0

* keine Null Values

In [16]:
dfTCount=df.groupBy("Top level domain").count()
dfTCount.orderBy(col("count").desc()).show(5)

+----------------+-----+
|Top level domain|count|
+----------------+-----+
|         0001.de|    1|
|         0068.de|    1|
|  01705852694.de|    1|
|         01pc.de|    1|
|        02045.de|    1|
+----------------+-----+
only showing top 5 rows



* es gibt keine doppelte TLD

### MX Records

In [17]:
df.filter(df["MX-Record"]=="null").count()

0

* keine Null Values

In [18]:
dfMxCount=df.groupBy("MX-Record").count()
dfMxCount.orderBy(col("count").desc()).show(5,False)

+-----------------------------------------------------+------+
|MX-Record                                            |count |
+-----------------------------------------------------+------+
|[]                                                   |633040|
|"[""smtpin.rzone.de""]"                              |509127|
|"[""localhost""]"                                    |303839|
|"[""mx01.kundenserver.de"",""mx00.kundenserver.de""]"|176491|
|"[""mx00.kundenserver.de"",""mx01.kundenserver.de""]"|176182|
+-----------------------------------------------------+------+
only showing top 5 rows



### A Records

In [10]:
df.filter(df["A-Record"]=="null").count()

10732

* es gibt 10.732 Null Values
* diese können mit [] ersetzt werde

In [9]:
df=df.na.replace({"null": "[]"})

In [10]:
dfACount=df.groupBy("A-Record").count()
dfACount.orderBy(col("count").desc()).show(5, False)

+----------------------+------+
|A-Record              |count |
+----------------------+------+
|"[""91.195.241.137""]"|303204|
|[]                    |172019|
|"[""80.150.6.143""]"  |150156|
|"[""23.236.62.147""]" |59034 |
|"[""52.58.78.16""]"   |49200 |
+----------------------+------+
only showing top 5 rows



## asn ip4 Blocks

### Allgemein

In [12]:
schema_blocks = StructType([
    StructField("network", StringType(), True),
    StructField("autonomous_system_number", IntegerType(), True),
    StructField("autonomous_system_organization", StringType(), True)
])

df_asn_blocks_ipv4 = spark.read.csv("s3a://bucket/GeoLite2-ASN-Blocks-IPv4.csv", header=True, schema=schema_blocks).cache()

In [13]:
df_asn_blocks_ipv4.show(5)

+------------+------------------------+------------------------------+
|     network|autonomous_system_number|autonomous_system_organization|
+------------+------------------------+------------------------------+
|  1.0.0.0/24|                   13335|                 CLOUDFLARENET|
|  1.0.4.0/22|                   38803|          Wirefreebroadband...|
| 1.0.64.0/18|                   18144|          Energia Communica...|
|1.0.128.0/17|                   23969|          TOT Public Compan...|
|  1.1.1.0/24|                   13335|                 CLOUDFLARENET|
+------------+------------------------+------------------------------+
only showing top 5 rows



In [16]:
df_asn_blocks_ipv4.count()

461902

In [15]:
df_asn_blocks_ipv4.dropDuplicates().count()

461902

* es gibt keine Duplikate in dem Datensatz

In [16]:
df_asn_blocks_ipv4.printSchema()

root
 |-- network: string (nullable = true)
 |-- autonomous_system_number: integer (nullable = true)
 |-- autonomous_system_organization: string (nullable = true)



### Anlayse der Spalten

In [27]:
df_asn_blocks_ipv4.filter(df_asn_blocks_ipv4["network"]=="null").count()

0

* keine Null Values

In [28]:
df_asn_blocks_ipv4.filter(df_asn_blocks_ipv4["autonomous_system_number"]=="null").count()

0

* keine Null Values

In [29]:
df_asn_blocks_ipv4.filter(df_asn_blocks_ipv4["autonomous_system_organization"]=="null").count()

0

* keine Null Values

## GeoLite2-Country-Locations-en

In [30]:
schema_country = StructType([
    StructField("geoname_id", IntegerType(), True),
    StructField("locale_code", StringType(), True),
    StructField("continent_code", StringType(), True),
    StructField("continent_name", StringType(), True),
    StructField("country_iso_code", StringType(), True),
    StructField("country_name", StringType(), True),
    StructField("is_in_european_union", StringType(), True)
])

df_country = spark.read.csv("s3a://bucket/GeoLite2-Country-Locations-en.csv", header=True, schema=schema_country).cache()

In [31]:
df_country.show(5)

+----------+-----------+--------------+--------------+----------------+------------+--------------------+
|geoname_id|locale_code|continent_code|continent_name|country_iso_code|country_name|is_in_european_union|
+----------+-----------+--------------+--------------+----------------+------------+--------------------+
|     49518|         en|            AF|        Africa|              RW|      Rwanda|                   0|
|     51537|         en|            AF|        Africa|              SO|     Somalia|                   0|
|     69543|         en|            AS|          Asia|              YE|       Yemen|                   0|
|     99237|         en|            AS|          Asia|              IQ|        Iraq|                   0|
|    102358|         en|            AS|          Asia|              SA|Saudi Arabia|                   0|
+----------+-----------+--------------+--------------+----------------+------------+--------------------+
only showing top 5 rows



In [32]:
df_country.printSchema()

root
 |-- geoname_id: integer (nullable = true)
 |-- locale_code: string (nullable = true)
 |-- continent_code: string (nullable = true)
 |-- continent_name: string (nullable = true)
 |-- country_iso_code: string (nullable = true)
 |-- country_name: string (nullable = true)
 |-- is_in_european_union: string (nullable = true)



In [33]:
df_country.count()

252

In [34]:
df_country.dropDuplicates().count()

252

* es gibt keine Duplikate in dem Datensatz

In [35]:
df_country.filter(df_country["country_name"]=="null").count()

0

* keine Null Values

## GeoLite2-City-Locations-en

In [36]:
schema_city = StructType([
    StructField("geoname_id", IntegerType(), True),
    StructField("locale_code", StringType(), True),
    StructField("continent_code", StringType(), True),
    StructField("continent_name", StringType(), True),
    StructField("country_iso_code", StringType(), True),
    StructField("country_name", StringType(), True),
    StructField("subdivision_1_iso_code", StringType(), True),
    StructField("subdivision_1_name", StringType(), True),
    StructField("subdivision_2_iso_code", StringType(), True),
    StructField("subdivision_2_name", StringType(), True),
    StructField("city_name", StringType(), True),
    StructField("metro_code", StringType(), True),
    StructField("time_zone", StringType(), True),
    StructField("is_in_european_union", StringType(), True)
])

df_city = spark.read.csv("s3a://bucket/GeoLite2-City-Locations-en.csv", header=True, schema = schema_city).cache()

In [37]:
df_city.show(5)

+----------+-----------+--------------+--------------+----------------+------------+----------------------+------------------+----------------------+------------------+---------+----------+----------------+--------------------+
|geoname_id|locale_code|continent_code|continent_name|country_iso_code|country_name|subdivision_1_iso_code|subdivision_1_name|subdivision_2_iso_code|subdivision_2_name|city_name|metro_code|       time_zone|is_in_european_union|
+----------+-----------+--------------+--------------+----------------+------------+----------------------+------------------+----------------------+------------------+---------+----------+----------------+--------------------+
|     49518|         en|            AF|        Africa|              RW|      Rwanda|                  null|              null|                  null|              null|     null|      null|   Africa/Kigali|                   0|
|     49747|         en|            AF|        Africa|              SO|     Somalia|    

In [38]:
df_city.printSchema()

root
 |-- geoname_id: integer (nullable = true)
 |-- locale_code: string (nullable = true)
 |-- continent_code: string (nullable = true)
 |-- continent_name: string (nullable = true)
 |-- country_iso_code: string (nullable = true)
 |-- country_name: string (nullable = true)
 |-- subdivision_1_iso_code: string (nullable = true)
 |-- subdivision_1_name: string (nullable = true)
 |-- subdivision_2_iso_code: string (nullable = true)
 |-- subdivision_2_name: string (nullable = true)
 |-- city_name: string (nullable = true)
 |-- metro_code: string (nullable = true)
 |-- time_zone: string (nullable = true)
 |-- is_in_european_union: string (nullable = true)



In [39]:
df_city.count()

121754

In [40]:
df_city.dropDuplicates().count()

121754

* es gibt keine Duplikate in dem Datensatz

In [41]:
df_city.filter(df_city["city_name"]=="null").count()

0

* Keine Null Values

# Data Cleaning

## Entfernen von Sonderzeichen im Datensatz real Domains

In [14]:
commaRep = udf(lambda x: re.sub('"','', x))
commaRep2 = udf(lambda x: re.sub(']','', x))
commaRep3 = udf(lambda x: re.sub('\[','', x))
commaRep5 = udf(lambda x: re.sub('\.','', x))

In [15]:
df_MXClean=df.withColumn('MX-Record',commaRep('MX-Record'))
df_MXClean=df_MXClean.withColumn('MX-Record',commaRep2('MX-Record'))
df_MXClean=df_MXClean.withColumn('MX-Record',commaRep3('MX-Record'))
df_MXClean.show(3, False)

+----------------+-----------------------------------------------+-----------------------+
|Top level domain|MX-Record                                      |A-Record               |
+----------------+-----------------------------------------------+-----------------------+
|0--1.de         |mail.0--1.de,mxf993.netcup.net                 |"[""46.38.249.145""]"  |
|0--2.de         |mxf993.netcup.net,mail.0--2.de                 |"[""212.227.212.163""]"|
|0-0-0-0-0.de    |smtp-02.tld.t-online.de,smtp-01.tld.t-online.de|"[""80.150.6.143""]"   |
+----------------+-----------------------------------------------+-----------------------+
only showing top 3 rows



In [16]:
df_AClean=df.withColumn('A-Record',commaRep('A-Record'))
df_AClean=df_AClean.withColumn('A-Record',commaRep2('A-Record'))
df_AClean=df_AClean.withColumn('A-Record',commaRep3('A-Record'))
df_AClean.show(3, False)

+----------------+-----------------------------------------------------------+---------------+
|Top level domain|MX-Record                                                  |A-Record       |
+----------------+-----------------------------------------------------------+---------------+
|0--1.de         |"[""mail.0--1.de"",""mxf993.netcup.net""]"                 |46.38.249.145  |
|0--2.de         |"[""mxf993.netcup.net"",""mail.0--2.de""]"                 |212.227.212.163|
|0-0-0-0-0.de    |"[""smtp-02.tld.t-online.de"",""smtp-01.tld.t-online.de""]"|80.150.6.143   |
+----------------+-----------------------------------------------------------+---------------+
only showing top 3 rows



 ## Splitten der Spalten im Datensatz real_domains

In [17]:
df_MXSplit=df_MXClean.select(
        "Top level domain", "A-Record",
        f.split("MX-Record", ",").alias("MX-Record"),
        f.posexplode(f.split("MX-Record", ",")).alias("pos_MX", "val_MX")
    )
df_MXSplit.show(5)

+----------------+--------------------+--------------------+------+--------------------+
|Top level domain|            A-Record|           MX-Record|pos_MX|              val_MX|
+----------------+--------------------+--------------------+------+--------------------+
|         0--1.de|"[""46.38.249.145...|[mail.0--1.de, mx...|     0|        mail.0--1.de|
|         0--1.de|"[""46.38.249.145...|[mail.0--1.de, mx...|     1|   mxf993.netcup.net|
|         0--2.de|"[""212.227.212.1...|[mxf993.netcup.ne...|     0|   mxf993.netcup.net|
|         0--2.de|"[""212.227.212.1...|[mxf993.netcup.ne...|     1|        mail.0--2.de|
|    0-0-0-0-0.de|"[""80.150.6.143""]"|[smtp-02.tld.t-on...|     0|smtp-02.tld.t-onl...|
+----------------+--------------------+--------------------+------+--------------------+
only showing top 5 rows



In [18]:
df_ASplit=df_AClean.select(
        "Top level domain", "MX-Record",
        f.split("A-Record", ", ").alias("A-Record"),
        f.posexplode(f.split("A-Record", ",")).alias("pos_A", "val_A")
    )
df_ASplit.show(20)

+-------------------+--------------------+-----------------+-----+---------------+
|   Top level domain|           MX-Record|         A-Record|pos_A|          val_A|
+-------------------+--------------------+-----------------+-----+---------------+
|            0--1.de|"[""mail.0--1.de"...|  [46.38.249.145]|    0|  46.38.249.145|
|            0--2.de|"[""mxf993.netcup...|[212.227.212.163]|    0|212.227.212.163|
|       0-0-0-0-0.de|"[""smtp-02.tld.t...|   [80.150.6.143]|    0|   80.150.6.143|
|         0-0-0-1.de|"[""smtp-02.tld.t...|   [80.150.6.143]|    0|   80.150.6.143|
|             0-0.de|                  []|  [185.53.178.13]|    0|  185.53.178.13|
|            0-01.de|"[""mail.0-01.de""]"| [193.34.145.200]|    0| 193.34.145.200|
|             0-1.de|   "[""localhost""]"| [91.195.241.137]|    0| 91.195.241.137|
|            0-10.de|                  []|               []|    0|               |
|          0-1000.de|   "[""localhost""]"| [91.195.241.137]|    0| 91.195.241.137|
|   

In [19]:
df_ASplit.count()

5540060

In [20]:
df_ASplit=df_ASplit.filter(df_ASplit.val_A!="")

In [21]:
df_ASplit.count()

5368041

In [23]:
df_ASplit.printSchema()

root
 |-- Top level domain: string (nullable = true)
 |-- MX-Record: string (nullable = true)
 |-- A-Record: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- pos_A: integer (nullable = false)
 |-- val_A: string (nullable = true)



## Entfernen von Sonderzeichen im Datensatz asn ip4 Blocks

In [22]:
split_col = f.split(df_asn_blocks_ipv4['network'], '/')
df_asn_blocks_ipv4 = df_asn_blocks_ipv4.withColumn('network_adress', split_col.getItem(0))
df_asn_blocks_ipv4 = df_asn_blocks_ipv4.withColumn('subnetzmaske', split_col.getItem(1))

df_asn_blocks_ipv4.show()

+------------+------------------------+------------------------------+--------------+------------+
|     network|autonomous_system_number|autonomous_system_organization|network_adress|subnetzmaske|
+------------+------------------------+------------------------------+--------------+------------+
|  1.0.0.0/24|                   13335|                 CLOUDFLARENET|       1.0.0.0|          24|
|  1.0.4.0/22|                   38803|          Wirefreebroadband...|       1.0.4.0|          22|
| 1.0.64.0/18|                   18144|          Energia Communica...|      1.0.64.0|          18|
|1.0.128.0/17|                   23969|          TOT Public Compan...|     1.0.128.0|          17|
|  1.1.1.0/24|                   13335|                 CLOUDFLARENET|       1.1.1.0|          24|
|  1.1.8.0/24|                   58543|                     Guangdong|       1.1.8.0|          24|
| 1.1.64.0/19|                    2519|          ARTERIA Networks ...|      1.1.64.0|          19|
| 1.1.96.0

In [22]:
df_asn_blocks_ipv4.count()

461902

## Unbenötigte Spalten im Datensatz asn ip4 country entfernen 

In [26]:
df_country = df_country.drop("locale_code", "continent_code", "continent_name", "country_iso_code", "is_in_european_union")

NameError: name 'df_country' is not defined

In [27]:
df_country.show()

NameError: name 'df_country' is not defined

## Unbenötigte Spalten im Datensatz asn ip4 city entfernen

In [51]:
df_city = df_city.drop("locale_code", "continent_code", "continent_name", "country_iso_code", "country_name", 
                             "subdivision_1_iso_code", "subdivision_1_name", "subdivision_2_iso_code", "subdivision_2_name",
                             "metro_code", "time_zone", "is_in_european_union")

In [52]:
df_city.show()

+----------+------------+
|geoname_id|   city_name|
+----------+------------+
|     49518|        null|
|     49747|       Oddur|
|     51537|        null|
|     53654|   Mogadishu|
|     54225|       Merca|
|     55671|     Kismayo|
|     56335|      Giohar|
|     57289|    Hargeisa|
|     57723|    Gurmeyso|
|     58933|     Garoowe|
|     58994|Garbahaarrey|
|     59611|   Gaalkacyo|
|     60928| Dusa Marreb|
|     62691|     Erigavo|
|     63795|       Burao|
|     63949|     Bu'aale|
|     64013|      Bosaso|
|     64460|  Beledweyne|
|     64536|      Baidoa|
|     69426|    Zinjibar|
+----------+------------+
only showing top 20 rows



# Analyse

## Joining von Tabelle real_new und asn_blocks_ipv4

In [27]:
def ip_range(ip, network_range):
    return ipaddress.IPv4Address(str(ip)) in ipaddress.ip_network(str(network_range))

pred = udf(lambda ip, network_range:ipaddress.IPv4Address(str(ip)) in ipaddress.ip_network(str(network_range)), BooleanType())

In [25]:
dfJoinATest=df_ASplit.limit(50)

In [26]:
dfJoinATest.count()

50

In [28]:
dfJoinA=dfJoinATest.join(df_asn_blocks_ipv4).where(pred(dfJoinATest.val_A, df_asn_blocks_ipv4.network))

In [52]:
dfJoinA=df_ASplit.join(df_asn_blocks_ipv4).where(pred(df_ASplit.val_A, df_asn_blocks_ipv4.network))

In [29]:
dfJoinA.show()

+--------------------+--------------------+----------------+-----+--------------+---------------+------------------------+------------------------------+--------------+------------+
|    Top level domain|           MX-Record|        A-Record|pos_A|         val_A|        network|autonomous_system_number|autonomous_system_organization|network_adress|subnetzmaske|
+--------------------+--------------------+----------------+-----+--------------+---------------+------------------------+------------------------------+--------------+------------+
|             0--1.de|"[""mail.0--1.de"...| [46.38.249.145]|    0| 46.38.249.145| 46.38.224.0/19|                  197540|                   netcup GmbH|   46.38.224.0|          19|
|  0-auf-3k-system.de|"[""mxf9a3.netcup...| [46.38.249.161]|    0| 46.38.249.161| 46.38.224.0/19|                  197540|                   netcup GmbH|   46.38.224.0|          19|
|0-heizenergiehaus.de|"[""mx1.jimdo.com...| [52.213.24.106]|    0| 52.213.24.106|  52.208.

In [30]:
dfJoinA.count()

49

## Joining von Tabelle df_country und df_city

In [None]:
df_gejoint2 = df_country.join(df_city, df_country["geoname_id"] == df_city["geoname_id"])

In [None]:
df_gejoint2.show()

#### Joining von all Tabellen

In [None]:
df_gejoint_all = df_networkClean2\
            .join(df_AClean2, df_networkClean2["network2"] == df_AClean2["A-Record2"])\
            .join(df_country, df_networkClean2["autonomous_system_number"] == df_country["geoname_id"])\
            .join(df_city, df_networkClean2["autonomous_system_number"] == df_city["geoname_id"])

In [None]:
df_gejoint_all.show()