In [None]:
# A simple PySpark demo: a Monte Carlo estimate of Pi, followed by Spark SQL queries on the KDD Cup 1999 dataset.

In [1]:
import sys
from random import random
from operator import add

from pyspark.sql import SparkSession

In [2]:
# Start (or reuse) the SparkSession used for the Monte Carlo Pi estimate.
spark = (
    SparkSession.builder
    .appName("PythonPi")
    .getOrCreate()
)

In [3]:
# Number of partitions the sample range is split into by parallelize() below.
partitions = 2

In [4]:
# Total number of random points sampled for the Pi estimate (one hundred million).
n = 10 ** 8

In [6]:
def f(_):
    """Draw one random point in the square [-1, 1) x [-1, 1).

    The argument is ignored; it is the RDD element being mapped over.
    Returns 1 if the point lies inside or on the unit circle, else 0, so
    summing the results counts the Monte Carlo "hits".
    """
    px = 2 * random() - 1
    py = 2 * random() - 1
    return int(px ** 2 + py ** 2 <= 1)

In [7]:
# Map every sample index to a 0/1 hit flag and add the flags up.
count = (
    spark.sparkContext
    .parallelize(range(1, n + 1), partitions)
    .map(f)
    .reduce(add)
)

In [8]:
# The unit circle covers pi/4 of the square, so 4 * (hits / samples) approximates Pi.
pi_estimate = 4.0 * count / n
print("Pi is roughly %f" % pi_estimate)

Pi is roughly 3.141605


In [9]:
# Shut down the session used for the Pi estimate; the next part of the
# notebook builds its own SparkSession.
spark.stop()

In [1]:
!wget http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz

--2020-02-06 21:30:32--  http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz
Resolving kdd.ics.uci.edu... 128.195.1.86
Connecting to kdd.ics.uci.edu|128.195.1.86|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2144903 (2.0M) [application/x-gzip]
Saving to: 'kddcup.data_10_percent.gz'


2020-02-06 21:30:36 (787 KB/s) - 'kddcup.data_10_percent.gz' saved [2144903/2144903]



In [4]:
# Local path of the archive fetched by the wget cell; sc.textFile() reads it
# as-is (still gzip-compressed) further down.
data_file = "./kddcup.data_10_percent.gz"


In [5]:
import sys

from pyspark.sql import SparkSession

In [6]:
# Start (or reuse) a SparkSession for the KDD Cup analysis.
spark = (
    SparkSession.builder
    .appName("wordcount")
    .getOrCreate()
)

# The RDD-based cells below use the session's SparkContext under the name `sc`.
sc = spark.sparkContext

In [7]:
# Read the data file as an RDD of text lines and mark it for caching, since
# several actions further down are computed from it.
raw_data = sc.textFile(data_file)
raw_data.cache();

In [8]:
from pyspark.sql import SQLContext
# SQLContext is a legacy entry point (deprecated in Spark 3.0 in favour of
# SparkSession). The only methods used on `sqlContext` in this notebook are
# createDataFrame() and sql(), which the SparkSession already provides, so
# reuse the session under the old name.
sqlContext = spark

In [9]:
from pyspark.sql import Row

def _to_interaction_row(fields):
    """Build a Row from the first six fields of one split CSV record.

    duration, src_bytes and dst_bytes are converted with int(); protocol_type,
    service and flag are kept as strings.
    """
    return Row(
        duration=int(fields[0]),
        protocol_type=fields[1],
        service=fields[2],
        flag=fields[3],
        src_bytes=int(fields[4]),
        dst_bytes=int(fields[5]),
    )


# Split each comma-separated line into fields, then turn the fields into Rows.
csv_data = raw_data.map(lambda line: line.split(","))
row_data = csv_data.map(_to_interaction_row)

In [10]:
# Build a DataFrame from the parsed Row RDD and expose it to SQL as the view
# "interactions". createOrReplaceTempView() replaces registerTempTable(),
# which has been deprecated since Spark 2.0 and does the same thing.
interactions_df = sqlContext.createDataFrame(row_data)
interactions_df.createOrReplaceTempView("interactions")

In [11]:
# Select TCP interactions that lasted more than 1000 seconds (the KDD Cup 99
# `duration` feature is measured in seconds) and received no bytes from the
# destination.
tcp_interactions = sqlContext.sql("""
    SELECT duration, dst_bytes FROM interactions WHERE protocol_type = 'tcp' AND duration > 1000 AND dst_bytes = 0
""")
tcp_interactions.show()

+--------+---------+
|duration|dst_bytes|
+--------+---------+
|    5057|        0|
|    5059|        0|
|    5051|        0|
|    5056|        0|
|    5051|        0|
|    5039|        0|
|    5062|        0|
|    5041|        0|
|    5056|        0|
|    5064|        0|
|    5043|        0|
|    5061|        0|
|    5049|        0|
|    5061|        0|
|    5048|        0|
|    5047|        0|
|    5044|        0|
|    5063|        0|
|    5068|        0|
|    5062|        0|
+--------+---------+
only showing top 20 rows



In [19]:
# Output duration together with dst_bytes.
# NOTE: since Spark 2.0 a DataFrame has no .map(); go through its .rdd to use the RDD API.
#tcp_interactions_out = tcp_interactions.rdd.map(lambda p: "Duration: {}, Dest. bytes: {}".format(p.duration, p.dst_bytes))
#for ti_out in tcp_interactions_out.collect():
#    print(ti_out)

In [17]:
# Print the schema inferred from the Row objects built earlier: the int()
# fields show up as long, the rest as string.
interactions_df.printSchema()

root
 |-- dst_bytes: long (nullable = true)
 |-- duration: long (nullable = true)
 |-- flag: string (nullable = true)
 |-- protocol_type: string (nullable = true)
 |-- service: string (nullable = true)
 |-- src_bytes: long (nullable = true)



In [18]:
from time import time

# Time a simple aggregation: number of interactions per protocol type.
# perf_counter() is a monotonic, high-resolution clock meant for measuring
# elapsed intervals, unlike time(), which follows the adjustable wall clock.
from time import perf_counter

t0 = perf_counter()
interactions_df.select("protocol_type", "duration", "dst_bytes").groupBy("protocol_type").count().show()
tt = perf_counter() - t0

print("Query performed in {} seconds".format(round(tt, 3)))

+-------------+------+
|protocol_type| count|
+-------------+------+
|          tcp|190065|
|          udp| 20354|
|         icmp|283602|
+-------------+------+

Query performed in 4.64 seconds
