### Welcome to the Colab Spark Tutorial.

We will be using Spark a few times in this course, and the _colab_ environment provides the compute (for 12 hours at a time) we need, along with this wonderful web-based notebook.

Today we will configure PySpark and compare Spark SQL queries with their equivalents in the DataFrame API.

Source material includes [[1](https://opensource.com/article/19/3/apache-spark-and-dataframes-tutorial)]

Sections:

 1. Configuring your _colab_
 2. Using PySpark


First, we need to configure the _colab_ instance.

In [None]:
!lsb_release -a

In [None]:
!apt-get update

In [None]:
# Install java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null


In [None]:
# Get Spark. Older releases are served from archive.apache.org rather than dlcdn.apache.org.
VERSION = '3.2.2'
!wget -q https://archive.apache.org/dist/spark/spark-$VERSION/spark-$VERSION-bin-hadoop3.2.tgz

In [None]:
# decompress spark
!tar xf spark-$VERSION-bin-hadoop3.2.tgz

# install python package to help with system paths
!pip install -q findspark

In [None]:
# Let Colab know where the java and spark folders are

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/spark-{VERSION}-bin-hadoop3.2"
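Because the `apt-get` output is sent to `/dev/null`, a failed install or download can go unnoticed until `findspark.init()` raises a less obvious error. A minimal sketch of a sanity check follows; `missing_dirs` is a hypothetical helper, not part of Colab or findspark.

```python
import os

def missing_dirs(env_vars=("JAVA_HOME", "SPARK_HOME")):
    """Return the env vars that are unset or point at a non-existent directory."""
    missing = []
    for name in env_vars:
        path = os.environ.get(name)
        if not path or not os.path.isdir(path):
            missing.append(name)
    return missing

# An empty list means both folders were found.
print(missing_dirs())
```

If either name is printed, re-run the corresponding install or download cell before continuing.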

In [None]:
# add pyspark to sys.path using findspark
import findspark
findspark.init()

# get a spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

Let's download some network-intrusion data: the 10% subset of the [KDD Cup 1999](http://kdd.ics.uci.edu/databases/kddcup99/kddcup99.html) dataset, together with the file that names its columns.

In [None]:
! wget http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz
! wget http://kdd.ics.uci.edu/databases/kddcup99/kddcup.names

In [None]:
!gunzip kddcup.data_10_percent.gz

In [None]:
import pandas as pd
df = pd.read_csv('kddcup.data_10_percent', header=None)

In [None]:
df[2].value_counts()
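With `header=None`, pandas labels the columns by position, so `df[2]` above is the `service` field. The names can be recovered from the `kddcup.names` file downloaded earlier. The sketch below assumes that the first line of that file lists the class labels and that every following line has the form `name: type.`; `parse_names` is a hypothetical helper.

```python
def parse_names(text):
    """Return feature names from kddcup.names-style text, plus a final 'label'."""
    lines = [ln.strip() for ln in text.splitlines() if ln.strip()]
    # Assumed layout: line 1 = class labels, remaining lines = "name: type."
    names = [ln.split(":", 1)[0].strip() for ln in lines[1:] if ":" in ln]
    return names + ["label"]

# A shortened, made-up sample in the assumed format
sample = """back,normal,smurf.
duration: continuous.
protocol_type: symbolic.
service: symbolic.
"""
print(parse_names(sample))
```

Under the same assumption, passing `names=parse_names(open('kddcup.names').read())` to `pd.read_csv` would let you write `df['service'].value_counts()` instead of indexing by position.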

In [None]:
raw_rdd = spark.sparkContext.textFile('kddcup.data_10_percent').cache()
raw_rdd.take(5)

In [None]:
csv_rdd = raw_rdd.map(lambda row: row.split(","))
print(csv_rdd.take(2))
print(type(csv_rdd))

Parse each split line into a `Row` with named, typed fields

In [None]:
from pyspark.sql import Row

parsed_rdd = csv_rdd.map(lambda r: Row(
    duration=int(r[0]),
    protocol_type=r[1],
    service=r[2],
    flag=r[3],
    src_bytes=int(r[4]),
    dst_bytes=int(r[5]),
    wrong_fragment=int(r[7]),
    urgent=int(r[8]),
    hot=int(r[9]),
    num_failed_logins=int(r[10]),
    num_compromised=int(r[12]),
    su_attempted=int(r[14]),
    num_root=int(r[15]),
    num_file_creations=int(r[16]),
    label=r[-1]
    )
)
parsed_rdd.take(5)

Convert the RDD to a dataframe

In [None]:
df = spark.createDataFrame(parsed_rdd)
df.show()
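The cells above build the DataFrame by splitting each line by hand and wrapping it in a `Row`. As an alternative, Spark's own CSV reader can load the file and the columns can be cast and renamed with SQL expressions. This is a sketch only: `COLUMNS`, `cast_exprs` and `load_kdd` are hypothetical names, the positions are copied from the `Row(...)` mapping above, and the label is assumed to be the last of 42 fields (Spark's default column `_c41`).

```python
# position in the CSV line -> (column name, Spark SQL type)
COLUMNS = {
    0: ("duration", "INT"),
    1: ("protocol_type", "STRING"),
    2: ("service", "STRING"),
    3: ("flag", "STRING"),
    4: ("src_bytes", "INT"),
    5: ("dst_bytes", "INT"),
    7: ("wrong_fragment", "INT"),
    8: ("urgent", "INT"),
    9: ("hot", "INT"),
    10: ("num_failed_logins", "INT"),
    12: ("num_compromised", "INT"),
    14: ("su_attempted", "INT"),
    15: ("num_root", "INT"),
    16: ("num_file_creations", "INT"),
    41: ("label", "STRING"),  # assumption: 42 fields per line, label last
}

def cast_exprs(columns=COLUMNS):
    """Build one 'CAST(_cN AS TYPE) AS name' expression per selected column."""
    return [f"CAST(_c{i} AS {dtype}) AS {name}"
            for i, (name, dtype) in sorted(columns.items())]

def load_kdd(spark, path="kddcup.data_10_percent"):
    """Read the header-less CSV (columns _c0, _c1, ...) and keep the typed columns."""
    return spark.read.csv(path).selectExpr(*cast_exprs())
```

Calling `load_kdd(spark).printSchema()` should then show the same column names as the `Row`-based version, without going through an RDD.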

In [None]:
from pyspark.sql import functions as f

In [None]:
# Register a temporary view to query against (registerTempTable is deprecated).
df.createOrReplaceTempView('data')

# Comparing SQL to API 

---
# 0. Select columns

In [None]:
select = spark.sql("""SELECT protocol_type, service
                      FROM data""")

In [None]:
select.show(10)

In [None]:
select_spark = df.select('protocol_type', 'service')

In [None]:
select_spark.show(10)

#### OR using a list also works

In [None]:
select_spark = df.select(['protocol_type', 'service'])
select_spark.show(10)

---
# 1. select as alias

In [None]:
alias = spark.sql("""SELECT protocol_type,
                            label as flag
                     FROM data
                  """)

In [None]:
alias.show()

In [None]:
alias_spark = df.select('protocol_type', 'label').withColumnRenamed('label', 'flag')

In [None]:
alias_spark.show()

#### OR using dataframe column-objects with .alias

In [None]:
alias_spark = df.select(df.protocol_type, df.label.alias('flag'))
alias_spark.show(10)

# 2. group by, count, order by

In [None]:
protocols = spark.sql("""
      SELECT protocol_type, count(*) as freq
      FROM data
      GROUP BY protocol_type
      ORDER BY 2 DESC
                           """)
protocols.show()

In [None]:
df.groupBy('protocol_type').count().orderBy('count', ascending=False).show()

In [None]:
df.count()

---
# 3. group by, count, order by (using agg)

In [None]:
labels = spark.sql("""
  SELECT label, count(*) as freq
  FROM data
  GROUP BY label
  ORDER BY 2 DESC
""")

In [None]:
labels.show()

In [None]:
labels_spark = df.groupBy('label')\
                 .agg(f.count(f.lit(1)).alias('freq'))\
                 .orderBy('freq', ascending=False)

In [None]:
labels_spark.show()
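Comparing the two outputs by eye works for a handful of rows. For a programmatic check, one option is to collect both results and compare them as multisets. `same_rows` is a hypothetical helper, and because it calls `collect()` it is only suitable for small, already aggregated results like these.

```python
from collections import Counter

def same_rows(left, right):
    """True if two DataFrames have the same columns and rows, ignoring row order."""
    if left.columns != right.columns:
        return False
    # Row objects are tuples, so they can be counted directly after conversion.
    return Counter(map(tuple, left.collect())) == Counter(map(tuple, right.collect()))
```

For example, `same_rows(labels, labels_spark)` is expected to return `True`. Results that contain floating-point averages may differ in the last digits unless both sides apply the same rounding.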

---
# 4. case, group by, count, order by

In [None]:
attack_protocol = spark.sql("""
                           SELECT
                             protocol_type,
                             CASE label
                               WHEN 'normal.' THEN 'no attack'
                               ELSE 'attack'
                             END AS state,
                             COUNT(*) as freq
                           FROM data
                           GROUP BY protocol_type, state
                           ORDER BY 3 DESC
                           """)

In [None]:
attack_protocol.show()

In [None]:
att_prot_spark = df.withColumn('state', f.when(df.label=='normal.', 'no attack').otherwise('attack'))\
                  .groupBy('protocol_type', 'state')\
                  .agg(f.count(f.lit(1)).alias('freq'))\
                  .orderBy('freq', ascending=False)


In [None]:
att_prot_spark.show()
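There is also a middle ground between the two styles: `selectExpr` accepts SQL expression strings, so the `CASE` clause can be reused verbatim inside an API chain. A minimal sketch, where `STATE_EXPR` and `add_state` are hypothetical names:

```python
# The same CASE expression as in the SQL query, kept as a reusable string
STATE_EXPR = "CASE label WHEN 'normal.' THEN 'no attack' ELSE 'attack' END AS state"

def add_state(df):
    """Return df with an extra 'state' column computed by the CASE expression."""
    return df.selectExpr("*", STATE_EXPR)
```

With this helper, `add_state(df).groupBy('protocol_type', 'state').count()` should produce the same groups as the `f.when(...).otherwise(...)` version.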

---
# 5. group by, aggregations

In [None]:
attack_stats = spark.sql("""
                          SELECT
                            protocol_type,
                            CASE label
                              WHEN 'normal.' THEN 'no attack'
                              ELSE 'attack'
                            END AS state,
                            COUNT(*) as total_freq,
                            ROUND(AVG(src_bytes), 2) as mean_src_bytes,
                            ROUND(AVG(dst_bytes), 2) as mean_dst_bytes,
                            ROUND(AVG(duration), 2) as mean_duration,
                            SUM(num_failed_logins) as total_failed_logins,
                            SUM(num_compromised) as total_compromised,
                            SUM(num_file_creations) as total_file_creations,
                            SUM(su_attempted) as total_root_attempts,
                            SUM(num_root) as total_root_acceses
                          FROM data
                          GROUP BY protocol_type, state
                          ORDER BY 3 DESC
                          """)

In [None]:
attack_stats.show()

In [None]:
attack_stats_spark = df.withColumn('state', f.when(df.label == 'normal.', 'no attack').otherwise('attack'))\
    .groupBy('protocol_type', 'state')\
    .agg(f.count(f.lit(1)).alias('total_freq'),
         # round the means to 2 decimals to match the SQL query
         f.round(f.avg('src_bytes'), 2).alias('mean_src_bytes'),
         f.round(f.avg('dst_bytes'), 2).alias('mean_dst_bytes'),
         f.round(f.avg('duration'), 2).alias('mean_duration'),
         f.sum('num_failed_logins').alias('total_failed_logins'),
         f.sum('num_compromised').alias('total_compromised'),
         f.sum('num_file_creations').alias('total_file_creations'),
         f.sum('su_attempted').alias('total_root_attempts'),
         f.sum('num_root').alias('total_root_acceses'),
         )\
    .orderBy('total_freq', ascending=False)

In [None]:
attack_stats_spark.show()

---
# 6. filter, group by 

In [None]:
tcp_attack_stats = spark.sql("""
                              SELECT
                                service,
                                label as attack_type,
                                COUNT(*) as total_freq,
                                ROUND(AVG(duration), 2) as mean_duration,
                                SUM(num_failed_logins) as total_failed_logins,
                                SUM(num_file_creations) as total_file_creations,
                                SUM(su_attempted) as total_root_attempts,
                                SUM(num_root) as total_root_acceses
                              FROM data
                              WHERE protocol_type = 'tcp'
                              AND label != 'normal.'
                              GROUP BY service, attack_type
                              ORDER BY total_freq DESC
                              """)

In [None]:
tcp_attack_stats.show()

In [None]:
tcp_attack_stats_spark = df.filter((df.protocol_type == "tcp") & (df.label != "normal."))\
    .groupBy('service', df.label.alias('attack_type'))\
    .agg(f.count(f.lit(1)).alias('total_freq'),
         # round the mean to 2 decimals to match the SQL query
         f.round(f.avg('duration'), 2).alias('mean_duration'),
         f.sum('num_failed_logins').alias('total_failed_logins'),
         f.sum('num_file_creations').alias('total_file_creations'),
         f.sum('su_attempted').alias('total_root_attempts'),
         f.sum('num_root').alias('total_root_acceses'))\
    .orderBy('total_freq', ascending=False)

In [None]:
tcp_attack_stats_spark.show()

---
# 7. sub-queries

In [None]:
tcp_attack_stats = spark.sql("""
                              SELECT
                                t.service,
                                t.attack_type,
                                t.total_freq
                              FROM
                              (SELECT
                                service,
                                label as attack_type,
                                COUNT(*) as total_freq,
                                ROUND(AVG(duration), 2) as mean_duration,
                                SUM(num_failed_logins) as total_failed_logins,
                                SUM(num_file_creations) as total_file_creations,
                                SUM(su_attempted) as total_root_attempts,
                                SUM(num_root) as total_root_acceses
                              FROM data
                              WHERE protocol_type = 'tcp'
                              AND label != 'normal.'
                              GROUP BY service, attack_type
                              ORDER BY total_freq DESC) as t
                                WHERE t.mean_duration > 0
                              """)

In [None]:
tcp_attack_stats.show()

In [None]:
tcp_attack_stats_spark = df.filter((df.protocol_type == "tcp") & (df.label != "normal."))\
    .groupBy('service', df.label.alias('attack_type'))\
    .agg(f.count(f.lit(1)).alias('total_freq'),
         # round before filtering so the > 0 test matches the SQL sub-query
         f.round(f.avg('duration'), 2).alias('mean_duration'),
         f.sum('num_failed_logins').alias('total_failed_logins'),
         f.sum('num_file_creations').alias('total_file_creations'),
         f.sum('su_attempted').alias('total_root_attempts'),
         f.sum('num_root').alias('total_root_acceses'))\
    .orderBy('total_freq', ascending=False)\
    .filter(f.col('mean_duration') > 0)\
    .select('service', 'attack_type', 'total_freq')

In [None]:
tcp_attack_stats_spark.show()