
# Introduction to Apache Spark / PySpark

As of May 2021, these examples are based on [Spark 3.1.1](https://spark.apache.org/docs/3.1.1/)  

Reference/API Links

*   [Apache Spark Quick Start](https://spark.apache.org/docs/3.1.1/quick-start.html)
*   [PySpark v3.1.1 API](https://spark.apache.org/docs/3.1.1/api/python/reference/index.html)
*   [RDD Programming Guide](https://spark.apache.org/docs/3.1.1/rdd-programming-guide.html)
*   [Spark SQL Programming Guide](https://spark.apache.org/docs/3.1.1/sql-programming-guide.html)

In [191]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

openjdk-8-jdk-headless is already the newest version (8u292-b10-0ubuntu1~18.04).
The following package was automatically installed and is no longer required:
  libnvidia-common-460
Use 'apt autoremove' to remove it.
0 upgraded, 0 newly installed, 0 to remove and 34 not upgraded.



# Imports / Starter Example




In [192]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql import types as sparktypes
from pyspark.sql.functions import col

sc = SparkContext.getOrCreate() 
spark = SparkSession(sc)

In [193]:
# Create a Resilient Distributed Dataset (RDD) from a sequence of integers,
# then apply filter() and map() transformations followed by a reduce() action

# Function to be used in the filter() transformation: keep only values below 20
def filterSmall(x):
    return x < 20

# Function to be used in the map() transformation
def mapSquare(x):
    return x*x

# Function to be used in the reduce() action
def reduceSim(x,y):
    return x+y
  
rdd = sc.parallelize(range(100))         ## create an RDD of 100 numbers from 0 to 99

out1 = rdd.filter(filterSmall).map(mapSquare)  ## perform filter and map transformations
out2 = out1.reduce(reduceSim)                  ## perform reduce action

print(out1.collect())           ## print first output (squares of all numbers less than 20)
print(out2)                 ## print second output (sum of all numbers from first output)


[0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361]
2470


In [194]:
# download a sample access log for use in demos below
# !rm -f apache.access.log
# !wget -q https://raw.githubusercontent.com/databricks/reference-apps/master/logs_analyzer/data/apache.access.log

# Apache HTTP Log Example - Resilient Distributed Dataset (RDD)

A SparkContext instance can be used to create RDDs from various data/files/resources (text files, CSV, Hadoop data files, etc.)

In [195]:
# find the top 10 clients, map/reduce style using RDD transformations
access_log_rdd = (sc.textFile("apache.access.log")
                  .map(lambda line: ( line.split(" ")[0], 1 ))  # field 0 = client address
                  .reduceByKey(lambda x, y: x + y)
                  .sortBy(lambda t: -t[1])) 

print ("Total count of client hostnames:")
print(access_log_rdd.count())

print ("Top 10 client hostnames:")
print(access_log_rdd.take(10))


Total count of client hostnames:
169
Top 10 client hostnames:
[('64.242.88.10', 452), ('10.0.0.153', 188), ('cr020r01-3.sac.overture.com', 44), ('h24-71-236-129.ca.shawcable.net', 36), ('h24-70-69-74.ca.shawcable.net', 32), ('market-mail.panduit.com', 29), ('ts04-ip92.hevanet.com', 28), ('ip68-228-43-49.tc.ph.cox.net', 22), ('proxy0.haifa.ac.il', 19), ('207.195.59.160', 15)]
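
To make the semantics of the pipeline above concrete, here is a minimal plain-Python sketch of the same three steps (map, reduceByKey, sortBy) on an in-memory list, with no Spark involved. The sample lines and the `top_clients` helper are hypothetical and exist only for illustration; Spark performs the equivalent work in parallel across partitions.

```python
from collections import Counter

# Hypothetical sample lines in the same layout as apache.access.log
sample_lines = [
    '64.242.88.10 - - [07/Mar/2004:16:05:49 -0800] "GET /a HTTP/1.1" 200 10',
    '10.0.0.153 - - [07/Mar/2004:16:06:51 -0800] "GET /b HTTP/1.1" 200 20',
    '64.242.88.10 - - [07/Mar/2004:16:10:02 -0800] "GET /c HTTP/1.1" 200 30',
]

def top_clients(lines, n):
    """Count requests per client and return the n most frequent clients."""
    # map step: one (client, 1) pair per line; field 0 is the client address
    pairs = ((line.split(" ")[0], 1) for line in lines)
    # reduceByKey step: add up the 1s for each distinct client
    counts = Counter()
    for client, one in pairs:
        counts[client] += one
    # sortBy + take step: order by descending count and keep the first n
    return sorted(counts.items(), key=lambda t: -t[1])[:n]

print(top_clients(sample_lines, 10))
```

With the three sample lines this prints `[('64.242.88.10', 2), ('10.0.0.153', 1)]`.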


# Apache HTTP Log Example - DataFrame

A DataFrame is equivalent to a relational table in Spark SQL, and can be created from a variety of input formats (CSV, JSON, relational database, etc.) using the SparkSession.

In [196]:
access_log_df = spark.read.text("apache.access.log")

access_log_df.show(truncate=False)
access_log_df.printSchema()

+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                                                      |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
|64.242.88.10 - - [07/Mar/2004:16:05:49 -0800] "GET /twiki/bin/edit/Main/Double_bounce_sender?topicparent=Main.ConfigurationVariables HTTP/1.1" 401 12846   |
|64.242.88.10 - - [07/Mar/2004:16:06:51 -0800] "GET /twiki/bin/rdiff/TWiki/NewUserTemplate?rev1=1.3&rev2=1.2 HTTP/1.1" 200 4523                             |
|64.242.88.10 - - [07/Mar/2004:16:10:02 -0800] "GET /mailman/listinfo/hsdivision HTTP/1.1" 200 6291                                                         |
|64.242.88.10 - - [07/Mar/2004:16:11:58 -0800] "GET 

In [197]:
access_log_df = spark.read.options(delimiter=" ").csv("apache.access.log")

access_log_df.show(truncate=False)
access_log_df.printSchema()

+------------+---+---+---------------------+------+-------------------------------------------------------------------------------------------------+---+-----+
|_c0         |_c1|_c2|_c3                  |_c4   |_c5                                                                                              |_c6|_c7  |
+------------+---+---+---------------------+------+-------------------------------------------------------------------------------------------------+---+-----+
|64.242.88.10|-  |-  |[07/Mar/2004:16:05:49|-0800]|GET /twiki/bin/edit/Main/Double_bounce_sender?topicparent=Main.ConfigurationVariables HTTP/1.1   |401|12846|
|64.242.88.10|-  |-  |[07/Mar/2004:16:06:51|-0800]|GET /twiki/bin/rdiff/TWiki/NewUserTemplate?rev1=1.3&rev2=1.2 HTTP/1.1                            |200|4523 |
|64.242.88.10|-  |-  |[07/Mar/2004:16:10:02|-0800]|GET /mailman/listinfo/hsdivision HTTP/1.1                                                        |200|6291 |
|64.242.88.10|-  |-  |[07/Mar/2004:16:11
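
The column layout above follows from how a space-delimited CSV parser treats quotes: the double-quoted request stays in one field, while the bracketed timestamp is split at its inner space. As a rough illustration, the sketch below uses Python's standard `csv` module as a stand-in. Spark's CSV reader is a separate implementation, so this only mirrors the behavior seen in the output above and is not a description of Spark internals.

```python
import csv

# One log line taken from the output shown earlier in this notebook
line = ('64.242.88.10 - - [07/Mar/2004:16:06:51 -0800] '
        '"GET /twiki/bin/rdiff/TWiki/NewUserTemplate?rev1=1.3&rev2=1.2 HTTP/1.1" 200 4523')

# Split on single spaces; the default quote character (") keeps the request together
fields = next(csv.reader([line], delimiter=" "))

# Label the fields the way Spark names unnamed CSV columns (_c0, _c1, ...)
for i, field in enumerate(fields):
    print(f"_c{i}: {field}")
```

The line splits into eight fields, which is why the status code ends up in `_c6` and the response size in `_c7`.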

In [198]:
access_log_df = spark.read.options(delimiter=" ").csv("apache.access.log")

named_df = access_log_df.select(col('_c0').alias('host'),
                                col('_c3').alias('timestamp'),
                                col('_c5').alias('path'),
                                col('_c6').cast('integer').alias('status'),
                                col('_c7').cast('integer').alias('content_size'))
named_df.show(truncate=False)
named_df.printSchema()

+------------+---------------------+-------------------------------------------------------------------------------------------------+------+------------+
|host        |timestamp            |path                                                                                             |status|content_size|
+------------+---------------------+-------------------------------------------------------------------------------------------------+------+------------+
|64.242.88.10|[07/Mar/2004:16:05:49|GET /twiki/bin/edit/Main/Double_bounce_sender?topicparent=Main.ConfigurationVariables HTTP/1.1   |401   |12846       |
|64.242.88.10|[07/Mar/2004:16:06:51|GET /twiki/bin/rdiff/TWiki/NewUserTemplate?rev1=1.3&rev2=1.2 HTTP/1.1                            |200   |4523        |
|64.242.88.10|[07/Mar/2004:16:10:02|GET /mailman/listinfo/hsdivision HTTP/1.1                                                        |200   |6291        |
|64.242.88.10|[07/Mar/2004:16:11:58|GET /twiki/bin/view/TWiki/WikiSynt

In [199]:
named_df.createOrReplaceTempView("log")
sql_df = spark.sql("EXPLAIN FORMATTED SELECT * FROM log WHERE status = 404")

sql_df.show(truncate=False)
sql_df.printSchema()

+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|plan                                                                                                                                                                                                                                              

## What About Datasets?

Added in Spark 1.6, a **Dataset** is a distributed collection of data that combines the benefits of RDDs (strong typing, the ability to use powerful lambda functions) with the benefits of Spark SQL’s optimized execution engine. A Dataset can be constructed from JVM objects and then manipulated using functional transformations (`map`, `flatMap`, `filter`, etc.). The Dataset API is available in Scala and Java. Python does not support the Dataset API, but because of Python’s dynamic nature, many of its benefits are already available (for example, you can access a field of a row by name: `row.columnName`). The case for R is similar.

A **DataFrame** is a Dataset organized into named columns. ([source](https://spark.apache.org/docs/3.1.1/sql-programming-guide.html)) 

# Reporting Tasks (from Lab 1)


1. Most popular URL paths (top 15)
2. Request count for each HTTP response code, sorted by response code
3. Request count for each calendar month and year, sorted chronologically
4. Total bytes sent to the client with a specified hostname or IPv4 address (you may hard code an address)
5. Based on a given URL (hard coded), compute a request count for each client (hostname or IPv4) who accessed that URL, sorted by request count, highest to lowest
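
Each of these tasks needs only a few fields from every log line: the client, the timestamp, the URL path, the status code, and the response size. A minimal sketch of extracting them with a regular expression follows. The pattern is an assumption based on the log lines shown earlier, `LOG_PATTERN` and `parse_line` are hypothetical names that the lab code does not use, and treating a `-` size as 0 is also an assumption.

```python
import re

# Assumed pattern for lines such as:
# 64.242.88.10 - - [07/Mar/2004:16:05:49 -0800] "GET /path HTTP/1.1" 401 12846
LOG_PATTERN = re.compile(
    r'^(?P<host>\S+) \S+ \S+ \[(?P<timestamp>[^\]]+)\] '
    r'"(?P<method>\S+) (?P<path>\S+) (?P<protocol>[^"]*)" '
    r'(?P<status>\d{3}) (?P<size>\d+|-)$'
)

def parse_line(line):
    """Return a dict of named fields, or None if the line does not match."""
    match = LOG_PATTERN.match(line)
    if match is None:
        return None
    record = match.groupdict()
    record["status"] = int(record["status"])
    # A '-' size means no body was sent; count it as 0 bytes (assumption)
    record["size"] = 0 if record["size"] == "-" else int(record["size"])
    return record

sample = ('64.242.88.10 - - [07/Mar/2004:16:05:49 -0800] '
          '"GET /twiki/bin/edit/Main/Double_bounce_sender?topicparent=Main.ConfigurationVariables HTTP/1.1" '
          '401 12846')
print(parse_line(sample))
```

Returning `None` for lines that do not match lets a caller filter out malformed records before aggregating.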


# (A) RDD Implementations

Perform reporting tasks 1-5 using RDD transformations

[RDD APIs PySpark v3.1.1](https://spark.apache.org/docs/3.1.1/api/python/reference/pyspark.html#rdd-apis)

In [200]:
# RDD implementation
# (1) Most popular URL paths (top 15)

# count requests per URL path, map/reduce style using RDD transformations
access_log_rdd = (sc.textFile("apache.access.log")
                  .map(lambda line: ( line.split(" ")[6], 1 ))  # field 6 = requested URL path
                  .reduceByKey(lambda x, y: x + y)
                  .sortBy(lambda t: -t[1])) 

print(access_log_rdd.take(15))


[('/twiki/bin/view/Main/WebHome', 40), ('/twiki/pub/TWiki/TWikiLogos/twikiRobot46x50.gif', 32), ('/', 31), ('/favicon.ico', 28), ('/robots.txt', 27), ('/razor.html', 23), ('/twiki/bin/view/Main/SpamAssassinTaggingOnly', 18), ('/twiki/bin/view/Main/SpamAssassinAndPostFix', 17), ('/cgi-bin/mailgraph2.cgi', 16), ('/cgi-bin/mailgraph.cgi/mailgraph_0.png', 16), ('/cgi-bin/mailgraph.cgi/mailgraph_1_err.png', 16), ('/cgi-bin/mailgraph.cgi/mailgraph_1.png', 16), ('/cgi-bin/mailgraph.cgi/mailgraph_0_err.png', 16), ('/cgi-bin/mailgraph.cgi/mailgraph_2.png', 16), ('/cgi-bin/mailgraph.cgi/mailgraph_2_err.png', 16)]


In [201]:
# RDD implementation
# (2) Request count for each HTTP response code, sorted by response code

access_log_rdd = (sc.textFile("apache.access.log")
                  .map(lambda line: ( line.split(" ")[8], 1 ))  # field 8 = HTTP status code
                  .reduceByKey(lambda x, y: x + y)
                  .sortBy(lambda t: t[0])) 

print(access_log_rdd.collect())


[('200', 1272), ('302', 6), ('401', 123), ('404', 5)]


In [202]:
# RDD implementation
# (3) Request count for each calendar month and year, sorted chronologically

monthsToInt = {"Jan": 1, "Feb": 2, "Mar": 3, "Apr": 4, "May": 5, "Jun":6, "Jul": 7, "Aug":8, "Sep": 9, "Oct": 10, "Nov":11, "Dec":12}
intToMonth = {1:"Jan", 2:"Feb", 3:"Mar", 4:"Apr", 5:"May", 6:"Jun", 7:"Jul", 8:"Aug", 9:"Sep", 10:"Oct", 11:"Nov", 12:"Dec"}

def makeKey(line):
  tokens = line.split(" ")
  month = tokens[3][4:7]
  year = tokens[3][8:12]
  # zero-pad the month so that string keys sort chronologically ("200403" < "200410")
  return year + str(monthsToInt[month]).zfill(2)

def unpackKey(key):
  year = key[:4]
  month = key[4:]
  return intToMonth[int(month)] + "/" + year

access_log_rdd = (sc.textFile("apache.access.log")
                  .map(lambda line: (makeKey(line), 1))
                  .reduceByKey(lambda x, y: x + y)
                  .sortBy(lambda t: t[0])
                  .map(lambda pair : (unpackKey(pair[0]), pair[1])))

print(access_log_rdd.collect())

[('Dec/1999', 1), ('Feb/2003', 1), ('Jan/2004', 1), ('Mar/2004', 1402), ('Jun/2016', 1)]
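
An alternative to encoding the year and month in a string key is a `(year, month)` tuple. Python compares tuples element by element, so sorting such keys is chronological by construction. The sketch below is plain Python on a few hypothetical timestamps; `month_key` is an illustrative helper and not part of the lab code.

```python
from collections import Counter
from datetime import datetime

def month_key(timestamp_field):
    """Return (year, month) from a field such as '[07/Mar/2004:16:05:49'."""
    parsed = datetime.strptime(timestamp_field.lstrip("["), "%d/%b/%Y:%H:%M:%S")
    return (parsed.year, parsed.month)

# Hypothetical timestamp fields in the layout of field 3 of the access log
stamps = [
    "[07/Mar/2004:16:05:49",
    "[12/Oct/2004:01:02:03",
    "[01/Dec/1999:00:00:00",
    "[08/Mar/2004:10:00:00",
]

# Count requests per (year, month), then sort the tuple keys chronologically
counts = Counter(month_key(s) for s in stamps)
for (year, month), n in sorted(counts.items()):
    print(f"{month:02d}/{year}", n)
```

The same tuple key could be used as the key of the `(key, 1)` pairs in an RDD pipeline, since `sortBy` orders by whatever value the key function returns.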


In [203]:
# RDD implementation
# (4) Total bytes sent to the client with a specified hostname or IPv4 address (you may hard code an address)

HOSTNAME = "dsl-80-43-113-44.access.uk.tiscali.com"
def filterIP(ip):
  return ip == HOSTNAME

access_log_rdd = (sc.textFile("apache.access.log")
                  .map(lambda line: ( line.split(" ")[0], int(line.split(" ")[9])))
                  .filter(lambda pair: (filterIP(pair[0])))
                  .reduceByKey(lambda x, y: x + y))

print(access_log_rdd.collect())

[('dsl-80-43-113-44.access.uk.tiscali.com', 7125)]
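
The pipeline above converts the size field with `int(...)` for every line. Access logs in this format can carry `-` in that field when no response body is sent; whether this particular log contains such lines is not shown here, so the guard below is a precaution under that assumption. It is a plain-Python sketch with hypothetical sample lines and helper names (`to_bytes`, `total_bytes`).

```python
def to_bytes(size_field):
    """Convert the size field to an int, treating non-numeric values such as '-' as 0."""
    return int(size_field) if size_field.isdigit() else 0

def total_bytes(lines, hostname):
    """Sum the size field (index 9) over lines whose client (index 0) matches hostname."""
    total = 0
    for line in lines:
        tokens = line.split(" ")
        if tokens[0] == hostname:
            total += to_bytes(tokens[9])
    return total

# Hypothetical sample lines; the second one has no response body
sample_lines = [
    'example.host - - [07/Mar/2004:16:05:49 -0800] "GET /a HTTP/1.1" 200 100',
    'example.host - - [07/Mar/2004:16:06:00 -0800] "GET /b HTTP/1.1" 304 -',
    'other.host - - [07/Mar/2004:16:07:00 -0800] "GET /a HTTP/1.1" 200 999',
]

print(total_bytes(sample_lines, "example.host"))
```

Filtering on the hostname before converting the size also avoids parsing lines that are discarded anyway.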


In [204]:
# RDD implementation
# (5) Based on a given URL (hard coded), compute a request count for each client (hostname or IPv4) who accessed that URL, sorted by request count, highest to lowest

URL = "/cgi-bin/mailgraph.cgi/mailgraph_0.png"
def filterUrl(url):
  return url == URL

access_log_rdd = (sc.textFile("apache.access.log")
                  .filter(lambda line: (filterUrl(line.split(" ")[6])))
                  .map(lambda line: ( line.split(" ")[0], 1))
                  .reduceByKey(lambda x, y: x + y)
                  .sortBy(lambda t: -t[1]))

print(access_log_rdd.collect())

[('10.0.0.153', 12), ('h24-70-69-74.ca.shawcable.net', 2), ('ts04-ip92.hevanet.com', 1), ('3_343_lt_someone', 1)]


# (B) DataFrame Implementations

Perform reporting tasks 1-5 using Spark's DataFrame API

[DataFrame API PySpark v3.1.1](https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.DataFrame.html#pyspark.sql.DataFrame)

In [205]:
# DataFrame implementation
# (1) Most popular URL paths (top 15)
access_log_df = spark.read.options(delimiter=" ").csv("apache.access.log")
named_df = access_log_df.select(col('_c0').alias('host'),
                                col('_c3').alias('timestamp'),
                                col('_c5').alias('path'),
                                col('_c6').cast('integer').alias('status'),
                                col('_c7').cast('integer').alias('content_size'))
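# note: 'path' holds the full request line (method, URL, protocol), so counts are per request line
top15url = named_df.groupBy('path').count().orderBy('count', ascending=False).head(15)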
print(top15url)

[Row(path='GET /twiki/bin/view/Main/WebHome HTTP/1.1', count=31), Row(path='GET / HTTP/1.1', count=26), Row(path='GET /robots.txt HTTP/1.0', count=25), Row(path='GET /twiki/pub/TWiki/TWikiLogos/twikiRobot46x50.gif HTTP/1.1', count=25), Row(path='GET /favicon.ico HTTP/1.1', count=24), Row(path='GET /twiki/bin/view/Main/SpamAssassinTaggingOnly HTTP/1.1', count=16), Row(path='GET /cgi-bin/mailgraph.cgi/mailgraph_0.png HTTP/1.1', count=16), Row(path='GET /cgi-bin/mailgraph.cgi/mailgraph_2.png HTTP/1.1', count=16), Row(path='GET /razor.html HTTP/1.1', count=16), Row(path='GET /cgi-bin/mailgraph.cgi/mailgraph_0_err.png HTTP/1.1', count=16), Row(path='GET /cgi-bin/mailgraph.cgi/mailgraph_1.png HTTP/1.1', count=16), Row(path='GET /cgi-bin/mailgraph.cgi/mailgraph_1_err.png HTTP/1.1', count=16), Row(path='GET /cgi-bin/mailgraph.cgi/mailgraph_3_err.png HTTP/1.1', count=16), Row(path='GET /cgi-bin/mailgraph2.cgi HTTP/1.1', count=16), Row(path='GET /cgi-bin/mailgraph.cgi/mailgraph_3.png HTTP/1.1', 

In [206]:
# DataFrame implementation
# (2) Request count for each HTTP response code, sorted by response code
access_log_df = spark.read.options(delimiter=" ").csv("apache.access.log")
named_df = access_log_df.select(col('_c0').alias('host'),
                                col('_c3').alias('timestamp'),
                                col('_c5').alias('path'),
                                col('_c6').cast('integer').alias('status'),
                                col('_c7').cast('integer').alias('content_size'))
httpResponseCounts = named_df.groupBy('status').count().orderBy('status').collect()
print(httpResponseCounts)

[Row(status=200, count=1272), Row(status=302, count=6), Row(status=401, count=123), Row(status=404, count=5)]


In [207]:
# DataFrame implementation
# (3) Request count for each calendar month and year, sorted chronologically

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

monthsToInt = {"Jan": 1, "Feb": 2, "Mar": 3, "Apr": 4, "May": 5, "Jun":6, "Jul": 7, "Aug":8, "Sep": 9, "Oct": 10, "Nov":11, "Dec":12}
intToMonth = {1:"Jan", 2:"Feb", 3:"Mar", 4:"Apr", 5:"May", 6:"Jun", 7:"Jul", 8:"Aug", 9:"Sep", 10:"Oct", 11:"Nov", 12:"Dec"}

def makeKey(line):
  month = line[4:7]
  year = line[8:12]
  # zero-pad the month so that string keys sort chronologically ("200403" < "200410")
  return year + str(monthsToInt[month]).zfill(2)

def unpackKey(key):
  year = key[:4]
  month = key[4:]
  return intToMonth[int(month)] + "/" + year

udf_makeKey = udf(makeKey, StringType())
udf_unpackKey = udf(unpackKey, StringType())

access_log_df = spark.read.options(delimiter=" ").csv("apache.access.log")
named_df = access_log_df.select(col('_c0').alias('host'),
                                col('_c3').alias('timestamp'),
                                col('_c5').alias('path'),
                                col('_c6').cast('integer').alias('status'),
                                col('_c7').cast('integer').alias('content_size'))

# aggregate first, then sort: an ordering applied before groupBy() is not guaranteed to survive the aggregation
dateRequestCountHashed = named_df.withColumn('timeHash', udf_makeKey(named_df.timestamp)).groupBy('timeHash').count().orderBy('timeHash')
dateRequestCount = dateRequestCountHashed.withColumn('Date', udf_unpackKey(dateRequestCountHashed.timeHash)).select('Date', 'count')
dateRequestCount.show(truncate=False)
dateRequestCount.printSchema()

+--------+-----+
|Date    |count|
+--------+-----+
|Dec/1999|1    |
|Feb/2003|1    |
|Jan/2004|1    |
|Mar/2004|1402 |
|Jun/2016|1    |
+--------+-----+

root
 |-- Date: string (nullable = true)
 |-- count: long (nullable = false)



In [208]:
# DataFrame implementation
# (4) Total bytes sent to the client with a specified hostname or IPv4 address (you may hard code an address)
access_log_df = spark.read.options(delimiter=" ").csv("apache.access.log")
named_df = access_log_df.select(col('_c0').alias('host'),
                                col('_c3').alias('timestamp'),
                                col('_c5').alias('path'),
                                col('_c6').cast('integer').alias('status'),
                                col('_c7').cast('integer').alias('content_size'))
HOSTNAME = "dsl-80-43-113-44.access.uk.tiscali.com"
totalBytes = named_df.filter(named_df['host'] == HOSTNAME).select('content_size').groupBy().sum().collect()
print(totalBytes)

[Row(sum(content_size)=7125)]


In [209]:
# DataFrame implementation
# (5) Based on a given URL (hard coded), compute a request count for each client (hostname or IPv4) who accessed that URL, sorted by request count, highest to lowest

access_log_df = spark.read.options(delimiter=" ").csv("apache.access.log")
named_df = access_log_df.select(col('_c0').alias('host'),
                                col('_c3').alias('timestamp'),
                                col('_c5').alias('path'),
                                col('_c6').cast('integer').alias('status'),
                                col('_c7').cast('integer').alias('content_size'))
URL = "/cgi-bin/mailgraph.cgi/mailgraph_0.png"
clientRequestCounts = named_df.filter(named_df['path'].contains(URL)).groupBy('host').count().orderBy('count', ascending=False).collect()
print(clientRequestCounts)

[Row(host='10.0.0.153', count=12), Row(host='h24-70-69-74.ca.shawcable.net', count=2), Row(host='3_343_lt_someone', count=1), Row(host='ts04-ip92.hevanet.com', count=1)]


# (C) Spark SQL Implementations

Perform reporting tasks 1-5 using Spark SQL

[Spark SQL API PySpark v3.1.1](https://spark.apache.org/docs/3.1.1/api/python/reference/pyspark.sql.html)

Specifically, [pyspark.sql.SparkSession.sql](https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.SparkSession.sql.html#pyspark.sql.SparkSession.sql) returns a DataFrame representing the result of the given SQL query.

In [210]:
# Spark SQL implementation 
# (1) Most popular URL paths (top 15)
access_log_df = spark.read.options(delimiter=" ").csv("apache.access.log")
named_df = access_log_df.select(col('_c0').alias('host'),
                                col('_c3').alias('timestamp'),
                                col('_c5').alias('path'),
                                col('_c6').cast('integer').alias('status'),
                                col('_c7').cast('integer').alias('content_size'))
named_df.createOrReplaceTempView("log")
sql_df = spark.sql("SELECT path AS path, count(path) FROM log GROUP BY path ORDER BY count(path) DESC LIMIT 15;")


sql_df.show(truncate=False)
sql_df.printSchema()

+------------------------------------------------------------+-----------+
|path                                                        |count(path)|
+------------------------------------------------------------+-----------+
|GET /twiki/bin/view/Main/WebHome HTTP/1.1                   |31         |
|GET / HTTP/1.1                                              |26         |
|GET /twiki/pub/TWiki/TWikiLogos/twikiRobot46x50.gif HTTP/1.1|25         |
|GET /robots.txt HTTP/1.0                                    |25         |
|GET /favicon.ico HTTP/1.1                                   |24         |
|GET /cgi-bin/mailgraph2.cgi HTTP/1.1                        |16         |
|GET /cgi-bin/mailgraph.cgi/mailgraph_1.png HTTP/1.1         |16         |
|GET /razor.html HTTP/1.1                                    |16         |
|GET /cgi-bin/mailgraph.cgi/mailgraph_0_err.png HTTP/1.1     |16         |
|GET /cgi-bin/mailgraph.cgi/mailgraph_3_err.png HTTP/1.1     |16         |
|GET /cgi-bin/mailgraph.c

In [211]:
# Spark SQL implementation 
# (2) Request count for each HTTP response code, sorted by response code

access_log_df = spark.read.options(delimiter=" ").csv("apache.access.log")
named_df = access_log_df.select(col('_c0').alias('host'),
                                col('_c3').alias('timestamp'),
                                col('_c5').alias('path'),
                                col('_c6').cast('integer').alias('status'),
                                col('_c7').cast('integer').alias('content_size'))
named_df.createOrReplaceTempView("log")
sql_df = spark.sql("SELECT status AS status, count(status) FROM log GROUP BY status ORDER BY status;")
sql_df.show(truncate=False)
sql_df.printSchema()

+------+-------------+
|status|count(status)|
+------+-------------+
|200   |1272         |
|302   |6            |
|401   |123          |
|404   |5            |
+------+-------------+

root
 |-- status: integer (nullable = true)
 |-- count(status): long (nullable = false)



In [212]:
# Spark SQL implementation 
# (3) Request count for each calendar month and year, sorted chronologically

monthsToInt = {"Jan": 1, "Feb": 2, "Mar": 3, "Apr": 4, "May": 5, "Jun":6, "Jul": 7, "Aug":8, "Sep": 9, "Oct": 10, "Nov":11, "Dec":12}
intToMonth = {1:"Jan", 2:"Feb", 3:"Mar", 4:"Apr", 5:"May", 6:"Jun", 7:"Jul", 8:"Aug", 9:"Sep", 10:"Oct", 11:"Nov", 12:"Dec"}

def makeKey(line):
  month = line[4:7]
  year = line[8:12]
  # zero-pad the month so that string keys sort chronologically ("200403" < "200410")
  return year + str(monthsToInt[month]).zfill(2)

def unpackKey(key):
  year = key[:4]
  month = key[4:]
  return intToMonth[int(month)] + "/" + year

udf_makeKey = udf(makeKey, StringType())
udf_unpackKey = udf(unpackKey, StringType())

spark.udf.register("makeKey", makeKey)
spark.udf.register("unpackKey", unpackKey)


access_log_df = spark.read.options(delimiter=" ").csv("apache.access.log")
named_df = access_log_df.select(col('_c0').alias('host'),
                                col('_c3').alias('timestamp'),
                                col('_c5').alias('path'),
                                col('_c6').cast('integer').alias('status'),
                                col('_c7').cast('integer').alias('content_size'))
named_df.createOrReplaceTempView("log")
# compute one key per request; grouping by timestamp here would collapse requests that share a timestamp
sql_df1 = spark.sql("SELECT makeKey(timestamp) AS hash FROM log")
sql_df1.createOrReplaceTempView("table1")
sql_df = spark.sql("SELECT hash, unpackKey(hash) as date, count(hash) as count FROM table1 group by date, hash order by hash")
sql_df.createOrReplaceTempView("table2")
dateRequest = spark.sql("SELECT date, count from table2")
dateRequest.show(truncate=False)
dateRequest.printSchema()

+--------+-----+
|date    |count|
+--------+-----+
|Dec/1999|1    |
|Feb/2003|1    |
|Jan/2004|1    |
|Mar/2004|1402 |
|Jun/2016|1    |
+--------+-----+

root
 |-- date: string (nullable = true)
 |-- count: long (nullable = false)



In [213]:
# Spark SQL implementation 
# (4) Total bytes sent to the client with a specified hostname or IPv4 address (you may hard code an address)

access_log_df = spark.read.options(delimiter=" ").csv("apache.access.log")
named_df = access_log_df.select(col('_c0').alias('host'),
                                col('_c3').alias('timestamp'),
                                col('_c5').alias('path'),
                                col('_c6').cast('integer').alias('status'),
                                col('_c7').cast('integer').alias('content_size'))
named_df.createOrReplaceTempView("log")
sql_df = spark.sql("SELECT host AS host, SUM(content_size) FROM log WHERE host='dsl-80-43-113-44.access.uk.tiscali.com' GROUP BY host;")
sql_df.show(truncate=False)
sql_df.printSchema()

+--------------------------------------+-----------------+
|host                                  |sum(content_size)|
+--------------------------------------+-----------------+
|dsl-80-43-113-44.access.uk.tiscali.com|7125             |
+--------------------------------------+-----------------+

root
 |-- host: string (nullable = true)
 |-- sum(content_size): long (nullable = true)



In [214]:
# Spark SQL implementation 
# (5) Based on a given URL (hard coded), compute a request count for each client (hostname or IPv4) who accessed that URL, sorted by request count, highest to lowest

access_log_df = spark.read.options(delimiter=" ").csv("apache.access.log")
named_df = access_log_df.select(col('_c0').alias('host'),
                                col('_c3').alias('timestamp'),
                                col('_c5').alias('path'),
                                col('_c6').cast('integer').alias('status'),
                                col('_c7').cast('integer').alias('content_size'))
named_df.createOrReplaceTempView("log")

sql_df = spark.sql("SELECT path, host, count(host) FROM log WHERE path = 'GET /cgi-bin/mailgraph.cgi/mailgraph_0.png HTTP/1.1' group by host, path order by count(host) desc;")
sql_df.show(truncate=False)
sql_df.printSchema()

+---------------------------------------------------+-----------------------------+-----------+
|path                                               |host                         |count(host)|
+---------------------------------------------------+-----------------------------+-----------+
|GET /cgi-bin/mailgraph.cgi/mailgraph_0.png HTTP/1.1|10.0.0.153                   |12         |
|GET /cgi-bin/mailgraph.cgi/mailgraph_0.png HTTP/1.1|h24-70-69-74.ca.shawcable.net|2          |
|GET /cgi-bin/mailgraph.cgi/mailgraph_0.png HTTP/1.1|3_343_lt_someone             |1          |
|GET /cgi-bin/mailgraph.cgi/mailgraph_0.png HTTP/1.1|ts04-ip92.hevanet.com        |1          |
+---------------------------------------------------+-----------------------------+-----------+

root
 |-- path: string (nullable = true)
 |-- host: string (nullable = true)
 |-- count(host): long (nullable = false)

