In [None]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m2.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285398 sha256=d4274b3bd44a42d67f9567d824e02201df56bb97f62314df9ed5d2226924a0f3
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1
The following additional packages will be installed:
  libxtst6 openjdk-8-jre-headless
Suggested packages:
  openjdk-8-demo openjdk-8-source libnss-mdns fonts-dejavu-extra
  fonts-ipafont-gothic fonts-ipafont-mincho fonts-wqy-microhei
  fonts-wqy-zenhei fonts-indic
The follow


# Imports / Starter Example




In [None]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql import types as sparktypes
from pyspark.sql.functions import col

# Obtain the session through the builder so this cell is safe to re-run:
# getOrCreate() reuses an already-running session/context instead of raising
# "Cannot run multiple SparkContexts at once", which a bare SparkContext() does.
spark = SparkSession.builder.getOrCreate()
# The RDD cells below still need the low-level context; take it from the session.
sc = spark.sparkContext

In [None]:
# download a sample access log for use in demos below
!rm -f apache.access.log
!wget -q https://raw.githubusercontent.com/databricks/reference-apps/master/logs_analyzer/data/apache.access.log

# (A) RDD Implementations

Perform reporting tasks 1-5 using RDD transformations

[RDD APIs PySpark v3.3.0](https://spark.apache.org/docs/3.3.0/api/python/reference/pyspark.html#rdd-apis)

In [None]:
# RDD implementation
# (1) Most popular URL paths (top 15)
# Key each log line by its 7th space-delimited field (the request path),
# sum a count of 1 per line for every key, then order by descending total.
log_lines = sc.textFile("apache.access.log")
path_hits = log_lines.map(lambda entry: (entry.split(" ")[6], 1))
hits_per_path = path_hits.reduceByKey(lambda left, right: left + right)
access_log_rdd = hits_per_path.sortBy(lambda pair: -pair[1])

print ("Top 15 URL Paths:")
for path, hits in access_log_rdd.take(15):
  print(f"{path}, {hits}")

Top 15 URL Paths:
/twiki/bin/view/Main/WebHome, 40
/twiki/pub/TWiki/TWikiLogos/twikiRobot46x50.gif, 32
/, 31
/favicon.ico, 28
/robots.txt, 27
/razor.html, 23
/twiki/bin/view/Main/SpamAssassinTaggingOnly, 18
/twiki/bin/view/Main/SpamAssassinAndPostFix, 17
/cgi-bin/mailgraph2.cgi, 16
/cgi-bin/mailgraph.cgi/mailgraph_0.png, 16
/cgi-bin/mailgraph.cgi/mailgraph_1_err.png, 16
/cgi-bin/mailgraph.cgi/mailgraph_1.png, 16
/cgi-bin/mailgraph.cgi/mailgraph_0_err.png, 16
/cgi-bin/mailgraph.cgi/mailgraph_2.png, 16
/cgi-bin/mailgraph.cgi/mailgraph_2_err.png, 16


In [None]:
# RDD implementation
# (2) Request count for each HTTP response code, sorted by response code
# The 9th space-delimited field of a log line is used as the response code;
# codes are kept as strings and ordered as strings.
log_lines = sc.textFile("apache.access.log")
status_hits = log_lines.map(lambda entry: (entry.split(" ")[8], 1))
hits_per_status = status_hits.reduceByKey(lambda left, right: left + right)
access_log_rdd = hits_per_status.sortBy(lambda pair: pair[0])

print ("Count for each HTTP response Code (sorted ascending order by response code):")
for status, hits in access_log_rdd.collect():
  print(f"{status}, {hits}")

Count for each HTTP response Code (sorted ascending order by response code):
200, 1272
302, 6
401, 123
404, 5


In [None]:
# RDD implementation
# (3) Request count for each calendar month and year, sorted chronologically

# Calendar position of each month abbreviation found in the log timestamps.
# Needed because sorting the raw "Mon/YYYY" strings is alphabetical
# ("Apr/2004" < "Mar/2004", "Jan/2005" < "Mar/2004"), not chronological.
MONTH_NUMBERS = {"Jan": 1, "Feb": 2, "Mar": 3, "Apr": 4, "May": 5, "Jun": 6,
                 "Jul": 7, "Aug": 8, "Sep": 9, "Oct": 10, "Nov": 11, "Dec": 12}

def month_year_sort_key(month_year):
    """Turn a "Mon/YYYY" key such as "Mar/2004" into a sortable (year, month) tuple.

    Raises KeyError if the month abbreviation is not one of the twelve
    English three-letter names, and ValueError if the year is not numeric.
    """
    month_name, year = month_year.split("/")
    return (int(year), MONTH_NUMBERS[month_name])

access_log_rdd = (sc.textFile("apache.access.log")
                  # The 4th field looks like "[07/Mar/2004:16:05:49"; characters 4..11 are "Mar/2004"
                  .map(lambda line: (line.split(" ")[3][4:12], 1))
                  .reduceByKey(lambda count1, count2: count1 + count2)
                  # Order by (year, month number) so the result is truly chronological
                  .sortBy(lambda t: month_year_sort_key(t[0])))

print("Count for each calendar month and year, sorted chronologically:")
for line in access_log_rdd.collect():
  print(line[0].replace("/", " "), ", ", line[1], sep="")

Count for each calendar month and year, sorted chronologically:
Mar 2004, 1406


In [None]:
# RDD implementation
# (4) Total bytes sent to the client with a specified hostname or IPv4 address (you may hard code an address)
def selectClient(line, client="145.253.208.9"):
    """Return True when the request on ``line`` was made by ``client``.

    The client (hostname or IPv4 address) is the first space-delimited field
    of an access-log line. It is compared for exact equality, so a longer
    address that merely starts with the target (e.g. "145.253.208.90"), or the
    target appearing inside the requested URL or referrer, does not match.

    Args:
        line: one raw line of the access log.
        client: hostname or IPv4 address to select; defaults to the address
            this report was written for.

    Returns:
        bool: True if the line's first field equals ``client``.
    """
    # maxsplit=1: only the first field is needed, no point splitting the rest.
    return line.split(" ", 1)[0] == client

def bytesSent(line):
    """Return the response size recorded on an access-log line, in bytes.

    The size is the 10th space-delimited field. Common Log Format writes "-"
    instead of 0 when no bytes were sent, so any non-numeric value counts as 0
    rather than crashing the job with a ValueError.
    """
    size_field = line.split(" ")[9]
    return int(size_field) if size_field.isdigit() else 0

access_log_rdd = (sc.textFile("apache.access.log")
                  .filter(selectClient)
                  # (client, bytes) pairs; the client is the first field of the line
                  .map(lambda line: (line.split(" ", 1)[0], bytesSent(line)))
                  .reduceByKey(lambda count1, count2: count1 + count2))

print("Total bytes sent to the client with the specified hostname or IPv4 address:")
for line in access_log_rdd.collect():
  print(line[0], ", ", line[1], sep="")

Total bytes sent to the client with the specified hostname or IPv4 address:
145.253.208.9, 26098


In [None]:
# RDD implementation
# (5) Based on a given URL (hard coded), compute a request count for each client (hostname or IPv4) who accessed that URL, sorted by request count, highest to lowest
def selectURL(line, url="/robots.txt"):
    """Return True when the request on ``line`` asked for exactly ``url``.

    The requested path is the 7th space-delimited field of an access-log line
    (the token after the HTTP method). It is compared for exact equality, so a
    longer path that merely contains the target, or the target appearing in
    another field such as a referrer, does not match.

    Args:
        line: one raw line of the access log.
        url: request path to select; defaults to the path this report was
            written for.

    Returns:
        bool: True if the line has a request-path field equal to ``url``;
        False otherwise, including for lines too short to have that field.
    """
    fields = line.split(" ")
    return len(fields) > 6 and fields[6] == url

# Keep only the lines accepted by selectURL, count them per client (the first
# field of the line), and order the clients from most to fewest requests.
matching_lines = sc.textFile("apache.access.log").filter(selectURL)
client_hits = matching_lines.map(lambda entry: (entry.split(" ")[0], 1))
hits_per_client = client_hits.reduceByKey(lambda left, right: left + right)
access_log_rdd = hits_per_client.sortBy(lambda pair: pair[1], ascending=False)

print("Request Count for each client of a given URL")
for client, hits in access_log_rdd.collect():
  print(f"{client}, {hits}")

Request Count for each client of a given URL
lj1024.inktomisearch.com, 8
lj1048.inktomisearch.com, 5
lj1036.inktomisearch.com, 5
mmscrm07-2.sac.overture.com, 3
64.242.88.10, 2
lj1052.inktomisearch.com, 1
cr020r01-3.sac.overture.com, 1
crawl24-public.alexa.com, 1
lj1007.inktomisearch.com, 1


# (B) DataFrame Implementations

Perform reporting tasks 1-5 using Spark's DataFrame API

[DataFrame API PySpark v3.1.1](https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.DataFrame.html#pyspark.sql.DataFrame)

Please Note: Spark SQL (`createOrReplaceTempView(...)`, `spark.sql("SELECT * ...")`) **is not** permitted for these exercises. You must use the Spark DataFrame API. You may, however, use the SQL functions defined in [`pyspark.sql.functions`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html)

In [None]:
# Read the access log as space-delimited CSV. No header is supplied, so the
# columns are referenced below by their positional names (_c0, _c1, ...).
access_log_df = spark.read.option("delimiter", " ").csv("apache.access.log")

# Project the five columns the reports use under descriptive names;
# status and content_size are cast from string to integer.
named_df = access_log_df.select(
    access_log_df["_c0"].alias("host"),
    access_log_df["_c3"].alias("timestamp"),
    access_log_df["_c5"].alias("http_request_line"),
    access_log_df["_c6"].cast("integer").alias("status"),
    access_log_df["_c7"].cast("integer").alias("content_size"),
)

In [None]:
# DataFrame implementation
# (1) Most popular URL paths (top 15)
from pyspark.sql.functions import regexp_replace

# Reduce the request line ("<METHOD> <path> HTTP/x.y") to just the path:
# drop the leading method token (any upper-case method, not only GET, so that
# POST/HEAD requests are grouped by path too) and the trailing protocol version.
without_method = regexp_replace(named_df["http_request_line"], r"^[A-Z]+ ", "")
url_path = regexp_replace(without_method, r" HTTP/\d\.\d$", "")

# truncate=False prints the full path; the default cuts cells at 20 characters,
# which makes different URLs sharing a prefix look identical in the report.
(named_df.select(url_path.alias("http_request"))
  .groupby("http_request")
  .count()
  .orderBy("count", ascending=False)
  .limit(15)
  .show(15, truncate=False))

+--------------------+-----+
|        http_request|count|
+--------------------+-----+
|/twiki/bin/view/M...|   40|
|/twiki/pub/TWiki/...|   32|
|                   /|   31|
|        /favicon.ico|   28|
|         /robots.txt|   27|
|         /razor.html|   23|
|/twiki/bin/view/M...|   18|
|/twiki/bin/view/M...|   17|
|/cgi-bin/mailgrap...|   16|
|/cgi-bin/mailgrap...|   16|
|/cgi-bin/mailgrap...|   16|
|/cgi-bin/mailgrap...|   16|
|/cgi-bin/mailgrap...|   16|
|/cgi-bin/mailgrap...|   16|
|/cgi-bin/mailgrap...|   16|
+--------------------+-----+



In [None]:
# DataFrame implementation
# (2) Request count for each HTTP response code, sorted by response code
# Count rows per status value and list them in ascending status order.
status_counts = named_df.groupBy("status").count()
status_counts.orderBy("status").show()

+------+-----+
|status|count|
+------+-----+
|   200| 1272|
|   302|    6|
|   401|  123|
|   404|    5|
+------+-----+



In [None]:
# DataFrame implementation
# (3) Request count for each calendar month and year, sorted chronologically
from pyspark.sql.functions import year, month, to_date

# "timestamp" is treated as a string column: the 20 characters starting at
# position 2 are parsed with the pattern dd/MMM/yyyy:HH:mm:ss
# (presumably position 1 holds a leading "[" — confirm against the raw log).
request_date = to_date(named_df["timestamp"].substr(2, 20), "dd/MMM/yyyy:HH:mm:ss")

# Group on the numeric year and month of the parsed date so that ordering by
# those two columns is chronological.
result_df = (
    named_df.withColumn("date", request_date)
    .groupBy(year("date").alias("year"), month("date").alias("month"))
    .count()
    .orderBy("year", "month")
)

result_df.show()

+----+-----+-----+
|year|month|count|
+----+-----+-----+
|2004|    3| 1406|
+----+-----+-----+



In [None]:
# DataFrame implementation
# (4) Total bytes sent to the client with a specified hostname or IPv4 address (you may hard code an address)
from pyspark.sql.functions import col

# Keep only the rows of the chosen client, then total content_size for it.
client_rows = named_df.filter(col("host") == "145.253.208.9")
(
    client_rows
    .groupBy("host")
    .sum("content_size")
    .show()
)

+-------------+-----------------+
|         host|sum(content_size)|
+-------------+-----------------+
|145.253.208.9|            26098|
+-------------+-----------------+



In [None]:
# DataFrame implementation
# (5) Based on a given URL (hard coded), compute a request count for each client (hostname or IPv4) who accessed that URL, sorted by request count, highest to lowest
from pyspark.sql.functions import col, regexp_extract

TARGET_URL = "/robots.txt"

# Second whitespace-delimited token of the request line ("<METHOD> <path> ...").
# regexp_extract yields "" when the line does not have that shape, so malformed
# request lines simply never match.
requested_path = regexp_extract(col("http_request_line"), r"^\S+ (\S+)", 1)

# Exact comparison on the path: a substring test would also count requests for
# longer paths or query strings that merely contain the target URL.
# truncate=False prints complete host names instead of cutting them at 20 characters.
(named_df.where(requested_path == TARGET_URL)
  .groupby("host")
  .count()
  .orderBy(col("count").desc())
  .show(truncate=False))

+--------------------+-----+
|                host|count|
+--------------------+-----+
|lj1024.inktomisea...|    8|
|lj1048.inktomisea...|    5|
|lj1036.inktomisea...|    5|
|mmscrm07-2.sac.ov...|    3|
|        64.242.88.10|    2|
|cr020r01-3.sac.ov...|    1|
|crawl24-public.al...|    1|
|lj1052.inktomisea...|    1|
|lj1007.inktomisea...|    1|
+--------------------+-----+

