<a href="https://colab.research.google.com/github/smduarte/spbd-2425/blob/main/docs/labs/lab5/SPBD_Labs_spark2_exercise_solution.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Python Spark Dataframes Exercises


In [None]:
#@title Install Pyspark
!pip install --quiet pyspark

In [None]:
#@title Download "Os Maias"
!wget -q -O os_maias.txt https://www.dropbox.com/s/n24v0z7y79np319/os_maias.txt?dl=0

## 1. Sorted Word Frequency

1.1) Create a [Spark DataFrames](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/index.html) program that counts the number of occurrences of each word in the novel "Os Maias", sorted by frequency (most frequent words first).


In [None]:
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

spark = SparkSession.builder.master('local[*]').appName('words').getOrCreate()
sc = spark.sparkContext

try :
  # Read the novel as an RDD of lines, keeping only lines longer than one character.
  lines = sc.textFile('os_maias.txt') \
            .filter( lambda line : len(line) > 1 )

  # Pair every line with the list of tokens obtained by splitting on single spaces.
  structured_lines = lines.map( lambda line : Row( line = line, listOfWords = line.split(' ') ) )

  wordsOfLine = spark.createDataFrame( structured_lines )

  # One row per token. split(' ') yields an empty string for every repeated or
  # leading space; those are not words, so they are dropped before counting.
  # The remaining tokens are counted and sorted, most frequent first.
  wordCounts = wordsOfLine.select(explode("listOfWords").alias('words')) \
      .filter(col('words') != '') \
      .groupBy('words').count() \
      .orderBy('count', ascending=False)

  wordCounts.show(5)
except Exception as err:
  # On failure, report the error and release the Spark context.
  print(err)
  sc.stop()

1.2) Create a Spark DataFrames program that computes the 10 most used words in the novel "Os Maias".

In [None]:
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

spark = SparkSession.builder.master('local[*]').appName('words').getOrCreate()
sc = spark.sparkContext

try :
  # Read the novel as an RDD of lines, keeping only lines longer than one character.
  lines = sc.textFile('os_maias.txt') \
            .filter( lambda line : len(line) > 1 )

  # Pair every line with the list of tokens obtained by splitting on single spaces.
  structured_lines = lines.map( lambda line : Row( line = line, listOfWords = line.split(' ') ) )

  wordsOfLine = spark.createDataFrame( structured_lines )

  # One row per token. Empty strings (produced by split(' ') for repeated or
  # leading spaces) are discarded so they cannot show up among the top words.
  # Then count per word, sort by descending count and keep the first 10.
  topWords = wordsOfLine.select(explode("listOfWords").alias('words')) \
      .filter(col('words') != '') \
      .groupBy('words').count() \
      .orderBy('count', ascending=False) \
      .limit(10)

  topWords.show()
except Exception as err:
  # On failure, report the error and release the Spark context.
  print(err)
  sc.stop()

## 2. Weblog Analysis

Consider a set of log files captured during a DDoS (*Distributed Denial of Service*) attack, containing information about the web accesses made to the server during the attack.

The log files contain text lines like the one shown below; fields are separated by a single space (the code in this notebook reads them with `sep=' '`):

date |IP_source | status_code | operation | URL | execution time |
-|-|-|-|-|-
timestamp  | string | int | string | string| float |
2016-12-06T08:58:35.318+0000|37.139.9.11|404|GET|/codemove/TTCENCUFMH3C|0.026

In [None]:
#@title Download the dataset
!wget -q -O web.log https://www.dropbox.com/s/0r8902uj9yum7dg/web.log?dl=0
!head -1 web.log

!echo "date ipSource retValue op url time" > weblog_with_header.log
!cat web.log >> weblog_with_header.log
!head -2 weblog_with_header.log

2.1. Count the number of unique IP addresses involved in the DDOS attack.


In [None]:
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('weblog')
    .getOrCreate()
)
sc = spark.sparkContext

try:
    # Load the space-separated log through the CSV reader; column names come
    # from the header line and column types are inferred from the data.
    # (The same DataFrame could be built by hand from an RDD -- textFile,
    # split(' '), map to Row, createDataFrame -- but the reader is shorter.)
    csvReader = (
        spark.read
        .option('sep', ' ')
        .option('header', True)
        .option('inferSchema', True)
    )
    logRows = csvReader.csv('weblog_with_header.log')

    logRows.printSchema()

    # Distinct source addresses, then how many of them there are.
    distinctIps = logRows.select('ipSource').distinct()
    print(distinctIps.count())
except Exception as err:
    print(err)


2.2. For each interval of 10 seconds, provide the following information: [number of requests, average execution time, maximum time, minimum time]

In [None]:
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
import time

def toInterval( date ):
  """Return the start of the 10-second interval that contains `date`.

  `date` is a datetime; its POSIX timestamp is truncated to a multiple of
  10 seconds and formatted in UTC as 'YYYY-MM-DD HH:MM:SS'.
  Returns None when `date` is None, so a NULL timestamp passed in by a Spark
  UDF yields a NULL interval instead of raising and aborting the job.
  """
  if date is None:
    return None
  seconds = int(date.timestamp() / 10) * 10
  return time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(seconds))

spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('weblog')
    .getOrCreate()
)
sc = spark.sparkContext

try:
    logRows = spark.read.csv(
        'weblog_with_header.log',
        sep=' ',
        header=True,
        inferSchema=True,
    )

    # Wrap toInterval as a string-returning UDF that maps a timestamp to the
    # label of its 10-second interval.
    interval = udf(toInterval, StringType())

    # Keep only the interval label and the execution time of every request.
    bucketed = logRows.select(interval('date').alias('interval'), 'time')

    # Per interval: request count, then average, minimum and maximum time.
    perInterval = bucketed.groupBy('interval').agg(
        count('*').alias('count'),
        avg('time'),
        min('time'),
        max('time'),
    )

    perInterval.orderBy('interval').show(10)
except Exception as err:
    print(err)

2.3. Create an inverted index that, for each interval of 10 seconds, has a list of (unique) IPs executing accesses (to each URL).

In [None]:
from os import truncate
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

def toInterval( date ):
  """Return the start of the 10-second interval that contains `date`.

  `date` is a datetime; its POSIX timestamp is truncated to a multiple of
  10 seconds and formatted in UTC as 'YYYY-MM-DD HH:MM:SS'.
  Returns None when `date` is None, so a NULL timestamp passed in by a Spark
  UDF yields a NULL interval instead of raising and aborting the job.
  """
  # Imported here because this cell's import list does not include `time`;
  # without it the function only works if another cell imported it first.
  import time

  if date is None:
    return None
  seconds = int(date.timestamp() / 10) * 10
  return time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(seconds))

spark = SparkSession.builder.master('local[*]') \
    .appName('weblog').getOrCreate()
sc = spark.sparkContext
try :
    logRows = spark.read.csv('weblog_with_header.log',
                             sep =' ', header=True, inferSchema=True)

    # The return type is given explicitly: a UDF created without one produces a
    # string column, and sorting '#ips' would then be lexicographic
    # ("9" > "10") rather than numeric.
    countIps = udf( lambda l : len(l), IntegerType() )
    interval = udf(lambda date: toInterval(date), StringType())

    # Label every request with its 10-second interval.
    intervals = logRows.select(interval('date').alias("interval"), 'ipSource', "url")

    # Inverted index: for each (interval, url), the set of distinct source IPs
    # and how many there are, listed from the latest interval backwards.
    stats = intervals.groupBy('interval', 'url').agg( collect_set('ipSource').alias('ips')) \
        .select( "*", countIps('ips').alias('#ips')) \
        .orderBy('interval', 'url', '#ips', ascending=False)

    stats.show(10, truncate = False)
except Exception as err:
    print(err)
finally:
    # Release the Spark context whether or not the query succeeded.
    sc.stop()

2.4. Create an inverted index that, for each interval of 15 seconds, has a list of (unique) IPs executing accesses (to each URL).

In [None]:
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

from dateutil.parser import parse

spark = SparkSession.builder.master('local[*]') \
    .appName('weblog').getOrCreate()
sc = spark.sparkContext
try :
    logRows = spark.read.csv('weblog_with_header.log',
                             sep =' ', header=True, inferSchema=True)

    # use window() to define the interval
    intervals = logRows.select('date', 'ipSource', 'url') \
        .select('*', window('date', '15 seconds').alias('interval'))

    # Inverted index: for each (interval, url), the set of distinct source IPs.
    # The built-in size() gives the set's length as an integer column, so the
    # descending sort on '#ips' is numeric and this cell does not depend on a
    # helper defined in another cell.
    stats = intervals.groupBy('interval', 'url').agg( collect_set('ipSource').alias('ips')) \
        .select( '*', size('ips').alias('#ips')) \
        .orderBy('#ips', ascending=False)

    stats.show(10, truncate=False)
except Exception as err:
    print(err)
finally:
    # Release the Spark context whether or not the query succeeded.
    sc.stop()