# Introduction to Apache Spark / PySpark

These examples are based on [Spark 3.3.x](https://spark.apache.org/docs/3.3.0/)  

Reference/API Links


*   [Apache Spark Quick Start](https://spark.apache.org/docs/3.3.0/quick-start.html)
*   [PySpark v3.3.0 API](https://spark.apache.org/docs/3.3.0/api/python/reference/index.html)
*   [RDD Programming Guide](https://spark.apache.org/docs/3.3.0/rdd-programming-guide.html)
*   [Spark SQL Programming Guide](https://spark.apache.org/docs/3.3.0/sql-programming-guide.html)

In [1]:
!pip install "pyspark==3.3.*"   # match the Spark 3.3.x docs referenced above; newer releases may not support Java 8
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

openjdk-8-jdk-headless is already the newest version (8u372-ga~us1-0ubuntu1~22.04).
0 upgraded, 0 newly installed, 0 to remove and 8 not upgraded.



# Imports / Starter Example




In [2]:
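from pyspark.sql import SparkSession
from pyspark.sql import types as sparktypes
# Wildcard import used for brevity; note that it shadows Python built-ins
# such as sum(), filter(), max() and min() with their Spark SQL counterparts
from pyspark.sql.functions import *


# SparkSession is the entry point for the DataFrame API; its SparkContext is used for RDDs
spark = SparkSession.builder.appName("apache-log-demo").getOrCreate()
sc = spark.sparkContext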

In [3]:
# Create a Resilient Distributed Dataset (RDD) from a sequence of integers and
# apply filter(), map() and reduce() operations to it

# Function used in the filter() transformation: keep values >= 20
def filterSmall(x):
    return x >= 20

# Function to be used in the map() transformation
def mapSquare(x):
    return x*x

# Function to be used in the reduce() action
def reduceSum(x,y):
    return x+y

rdd = sc.parallelize(range(100))                 ## create an RDD of the 100 integers 0 to 99

out1 = rdd.filter(filterSmall).map(mapSquare)   ## transformations (lazy): keep x >= 20, then square
out2 = out1.reduce(reduceSum)                   ## action: sum the squares

print(out1.collect())   ## first output: the squares of all numbers from 20 to 99
print(out2)             ## second output: the sum of all numbers in the first output


[400, 441, 484, 529, 576, 625, 676, 729, 784, 841, 900, 961, 1024, 1089, 1156, 1225, 1296, 1369, 1444, 1521, 1600, 1681, 1764, 1849, 1936, 2025, 2116, 2209, 2304, 2401, 2500, 2601, 2704, 2809, 2916, 3025, 3136, 3249, 3364, 3481, 3600, 3721, 3844, 3969, 4096, 4225, 4356, 4489, 4624, 4761, 4900, 5041, 5184, 5329, 5476, 5625, 5776, 5929, 6084, 6241, 6400, 6561, 6724, 6889, 7056, 7225, 7396, 7569, 7744, 7921, 8100, 8281, 8464, 8649, 8836, 9025, 9216, 9409, 9604, 9801]
325880
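As a sanity check, the same filter, map and reduce steps can be reproduced on a local list in plain Python. This is a single-process sketch for comparison only: unlike the RDD version it runs eagerly rather than lazily across partitions. List comprehensions are used instead of the built-in `filter()` because the notebook's `from pyspark.sql.functions import *` replaces `filter` with Spark's array function of the same name.

```python
from functools import reduce

# Plain-Python version of the RDD pipeline above (eager, single process)
kept = [x for x in range(100) if x >= 20]      # like rdd.filter(filterSmall)
squares = [x * x for x in kept]                # like .map(mapSquare)
total = reduce(lambda x, y: x + y, squares)    # like .reduce(reduceSum)

print(squares[:3], squares[-1])  # [400, 441, 484] 9801
print(total)                     # 325880
```

`RDD.reduce` combines the elements of each partition separately and then merges the partial results, so the function passed to it should be commutative and associative; addition satisfies both.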


In [4]:
# download a sample access log for use in demos below
!rm -f apache.access.log
!wget -q https://raw.githubusercontent.com/nathanip101/csc369-lab2/main/input_access_log/apache.access.log

# Apache HTTP Log Example - Resilient Distributed Dataset (RDD)

A SparkContext instance can be used to create RDDs from various data sources and files (text files, CSV, Hadoop input formats, etc.). `sc.textFile()` creates an RDD of strings with one element per line of the file.
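The RDD examples in this notebook pick fields out of each line with `line.split(" ")[i]`. The sketch below splits one line of the sample log (the first row shown in the DataFrame output further down) to show which index holds which field. It assumes every line has exactly this space-separated layout; a field containing extra spaces would shift the indices.

```python
# One line from apache.access.log (Apache Common Log Format)
line = ('64.242.88.10 - - [02/Dec/2004:16:05:49 -0800] '
        '"GET /twiki/bin/edit/Main/Double_bounce_sender?topicparent=Main.ConfigurationVariables HTTP/1.1" '
        '401 12846')

fields = line.split(" ")
host = fields[0]             # client hostname or IPv4 address
timestamp = fields[3]        # '[02/Dec/2004:16:05:49' (the time zone is in fields[4])
month_year = timestamp[4:12] # 'Dec/2004'
method = fields[5]           # '"GET' (still carries the opening quote)
path = fields[6]             # requested URL path
status = fields[8]           # HTTP response code, as a string
size = fields[9]             # response size in bytes, as a string
```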

In [5]:
# find the top 10 clients, map/reduce style using RDD transformations
access_log_rdd = (sc.textFile("apache.access.log")
                  .map(lambda line: ( line.split(" ")[0], 1 ))  # field 0 = client address
                  .reduceByKey(lambda count1, count2: count1 + count2)
                  .sortBy(lambda t: -t[1]))

# count() is the number of distinct clients, since reduceByKey left one pair per client
print("Total count of client hostnames:")
print(access_log_rdd.count())

print("Top 10 client hostnames:")
print(access_log_rdd.take(10))


Total count of client hostnames:
169
Top 10 client hostnames:
[('64.242.88.10', 452), ('10.0.0.153', 188), ('cr020r01-3.sac.overture.com', 44), ('h24-71-236-129.ca.shawcable.net', 36), ('h24-70-69-74.ca.shawcable.net', 32), ('market-mail.panduit.com', 29), ('ts04-ip92.hevanet.com', 28), ('ip68-228-43-49.tc.ph.cox.net', 22), ('proxy0.haifa.ac.il', 19), ('207.195.59.160', 15)]
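What `map(lambda line: (client, 1))` followed by `reduceByKey` and `sortBy` computes can be pictured with `collections.Counter` on a local list. This is only a single-machine analogy: Spark combines counts per key within each partition and then merges them across partitions, while `Counter` tallies everything in one pass. The three sample lines are taken from the DataFrame output below.

```python
from collections import Counter

lines = [
    '64.242.88.10 - - [02/Dec/2004:16:05:49 -0800] "GET /twiki/bin/edit/Main/Double_bounce_sender?topicparent=Main.ConfigurationVariables HTTP/1.1" 401 12846',
    '64.242.88.10 - - [03/Sep/2004:16:06:51 -0800] "GET /twiki/bin/rdiff/TWiki/NewUserTemplate?rev1=1.3&rev2=1.2 HTTP/1.1" 200 4523',
    '64.242.88.10 - - [07/Mar/2004:16:10:02 -0800] "GET /mailman/listinfo/hsdivision HTTP/1.1" 200 6291',
]

# Counterpart of .map(lambda line: (line.split(" ")[0], 1)).reduceByKey(...)
client_counts = Counter(line.split(" ")[0] for line in lines)

# Counterpart of .sortBy(lambda t: -t[1]).take(10)
top_clients = client_counts.most_common(10)
print(top_clients)  # [('64.242.88.10', 3)]
```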


# Apache HTTP Log Example - DataFrame

A DataFrame is equivalent to a relational table in Spark SQL and can be created from a variety of input formats (CSV, JSON, relational databases, etc.) using the SparkSession.

In [6]:
access_log_df = spark.read.text("apache.access.log")

access_log_df.show(truncate=False)
access_log_df.printSchema()

+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                                                      |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
|64.242.88.10 - - [02/Dec/2004:16:05:49 -0800] "GET /twiki/bin/edit/Main/Double_bounce_sender?topicparent=Main.ConfigurationVariables HTTP/1.1" 401 12846   |
|64.242.88.10 - - [03/Sep/2004:16:06:51 -0800] "GET /twiki/bin/rdiff/TWiki/NewUserTemplate?rev1=1.3&rev2=1.2 HTTP/1.1" 200 4523                             |
|64.242.88.10 - - [07/Mar/2004:16:10:02 -0800] "GET /mailman/listinfo/hsdivision HTTP/1.1" 200 6291                                                         |
|64.242.88.10 - - [07/Mar/2004:16:11:58 -0800] "GET 

In [7]:
access_log_df = spark.read.options(delimiter=" ").csv("apache.access.log")

access_log_df.show(truncate=False)
access_log_df.printSchema()

+------------+---+---+---------------------+------+-------------------------------------------------------------------------------------------------+---+-----+
|_c0         |_c1|_c2|_c3                  |_c4   |_c5                                                                                              |_c6|_c7  |
+------------+---+---+---------------------+------+-------------------------------------------------------------------------------------------------+---+-----+
|64.242.88.10|-  |-  |[02/Dec/2004:16:05:49|-0800]|GET /twiki/bin/edit/Main/Double_bounce_sender?topicparent=Main.ConfigurationVariables HTTP/1.1   |401|12846|
|64.242.88.10|-  |-  |[03/Sep/2004:16:06:51|-0800]|GET /twiki/bin/rdiff/TWiki/NewUserTemplate?rev1=1.3&rev2=1.2 HTTP/1.1                            |200|4523 |
|64.242.88.10|-  |-  |[07/Mar/2004:16:10:02|-0800]|GET /mailman/listinfo/hsdivision HTTP/1.1                                                        |200|6291 |
|64.242.88.10|-  |-  |[07/Mar/2004:16:11
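Because the CSV reader's default quote character is `"`, the quoted request line stays together in `_c5`, but the bracketed timestamp is still split across `_c3` and `_c4`. A regular expression for the Common Log Format avoids relying on space positions. The pattern below is an illustrative sketch that assumes each line follows the layout `host ident user [time] "request" status bytes`; in a DataFrame, the same pattern could be applied column by column with `regexp_extract` from `pyspark.sql.functions`.

```python
import re

# host ident authuser [timestamp] "method path protocol" status bytes
CLF_PATTERN = re.compile(
    r'^(?P<host>\S+) \S+ \S+ \[(?P<timestamp>[^\]]+)\] '
    r'"(?P<method>\S+) (?P<path>\S+) (?P<protocol>[^"]+)" '
    r'(?P<status>\d{3}) (?P<size>\S+)$'
)

def parse_clf(line):
    """Return a dict of named fields, or None if the line does not match."""
    m = CLF_PATTERN.match(line)
    if m is None:
        return None
    record = m.groupdict()
    record["status"] = int(record["status"])  # size stays a string: it may be '-'
    return record

line = ('64.242.88.10 - - [07/Mar/2004:16:10:02 -0800] '
        '"GET /mailman/listinfo/hsdivision HTTP/1.1" 200 6291')
record = parse_clf(line)
print(record)
```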

In [8]:
access_log_df = spark.read.options(delimiter=" ").csv("apache.access.log")

named_df = access_log_df.select(col('_c0').alias('host'),
                                col('_c3').alias('timestamp'),
                                col('_c5').alias('path'),
                                col('_c6').cast('integer').alias('status'),
                                col('_c7').cast('integer').alias('content_size'))



named_df.show(truncate=False)
named_df.printSchema()

+------------+---------------------+-------------------------------------------------------------------------------------------------+------+------------+
|host        |timestamp            |path                                                                                             |status|content_size|
+------------+---------------------+-------------------------------------------------------------------------------------------------+------+------------+
|64.242.88.10|[02/Dec/2004:16:05:49|GET /twiki/bin/edit/Main/Double_bounce_sender?topicparent=Main.ConfigurationVariables HTTP/1.1   |401   |12846       |
|64.242.88.10|[03/Sep/2004:16:06:51|GET /twiki/bin/rdiff/TWiki/NewUserTemplate?rev1=1.3&rev2=1.2 HTTP/1.1                            |200   |4523        |
|64.242.88.10|[07/Mar/2004:16:10:02|GET /mailman/listinfo/hsdivision HTTP/1.1                                                        |200   |6291        |
|64.242.88.10|[07/Mar/2004:16:11:58|GET /twiki/bin/view/TWiki/WikiSynt

In [9]:
named_df.createOrReplaceTempView("log")
sql_df = spark.sql("SELECT * FROM log WHERE status = 404")

sql_df.show(truncate=False)
sql_df.printSchema()

+-----------------------------+---------------------+--------------------------------------------------------------------------+------+------------+
|host                         |timestamp            |path                                                                      |status|content_size|
+-----------------------------+---------------------+--------------------------------------------------------------------------+------+------------+
|h24-70-56-49.ca.shawcable.net|[07/Mar/2004:21:16:17|GET /twiki/view/Main/WebHome HTTP/1.1                                     |404   |300         |
|61.9.4.61                    |[08/Mar/2004:07:27:36|GET /_vti_bin/owssvr.dll?UL=1&ACT=4&BUILD=2614&STRMVER=4&CAPREQ=0 HTTP/1.0|404   |284         |
|61.9.4.61                    |[08/Mar/2004:07:27:37|GET /MSOffice/cltreq.asp?UL=1&ACT=4&BUILD=2614&STRMVER=4&CAPREQ=0 HTTP/1.0|404   |284         |
|1513.cps.virtua.com.br       |[11/Mar/2004:02:27:39|GET /pipermail/cipg/2003-november.txt HTTP/1.1       

## What About Datasets?

Added in Spark 1.6, a **Dataset** is a distributed collection of data that combines the benefits of RDDs (strong typing, the ability to use powerful lambda functions) with the benefits of Spark SQL’s optimized execution engine. A Dataset can be constructed from JVM objects and then manipulated using functional transformations (`map`, `flatMap`, `filter`, etc.). The Dataset API is available in Scala and Java only; Python does not support it. Because Python is dynamically typed, however, many of the Dataset API's benefits are already available (for example, you can access a row's field by name with `row.columnName`). The same is true for R.

A **DataFrame** is a Dataset organized into named columns; in the Scala API, `DataFrame` is simply a type alias for `Dataset[Row]`. ([source](https://spark.apache.org/docs/3.3.0/sql-programming-guide.html))

# Reporting Tasks (from Lab 2)


1. Most popular URL paths (top 15)
2. Request count for each HTTP response code, sorted by response code
3. Request count for each calendar month and year, sorted chronologically
4. Total bytes sent to the client with a specified hostname or IPv4 address (you may hard code an address)
5. Based on a given URL (hard coded), compute a request count for each client (hostname or IPv4) who accessed that URL, sorted by request count, highest to lowest


# (A) RDD Implementations

Perform reporting tasks 1-5 using RDD transformations

[RDD APIs PySpark v3.3.0](https://spark.apache.org/docs/3.3.0/api/python/reference/pyspark.html#rdd-apis)

In [10]:
# RDD implementation
# (1) Most popular URL paths (top 15)

access_log_rdd = (sc.textFile("apache.access.log")
                  .map(lambda line: ( line.split(" ")[6], 1 ))
                  .reduceByKey(lambda x, y: x + y)
                  .sortBy(lambda x: x[1], ascending=False))
for url, access_count in access_log_rdd.take(15):
  print(f"{url}\t{access_count}")

/twiki/bin/view/Main/WebHome	40
/twiki/pub/TWiki/TWikiLogos/twikiRobot46x50.gif	32
/	31
/favicon.ico	28
/robots.txt	27
/razor.html	23
/twiki/bin/view/Main/SpamAssassinTaggingOnly	18
/twiki/bin/view/Main/SpamAssassinAndPostFix	17
/cgi-bin/mailgraph2.cgi	16
/cgi-bin/mailgraph.cgi/mailgraph_0.png	16
/cgi-bin/mailgraph.cgi/mailgraph_1_err.png	16
/cgi-bin/mailgraph.cgi/mailgraph_1.png	16
/cgi-bin/mailgraph.cgi/mailgraph_0_err.png	16
/cgi-bin/mailgraph.cgi/mailgraph_2.png	16
/cgi-bin/mailgraph.cgi/mailgraph_2_err.png	16
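Many paths are tied at 16 requests, and sorting on the count alone does not guarantee any order among tied keys, which is why this list and the DataFrame version of task 1 show different `mailgraph` entries in the last places. Sorting on a compound key (count descending, then path ascending) makes the top 15 reproducible: `.sortBy(lambda x: (-x[1], x[0]))` for the RDD, or `.orderBy(desc("count"), asc("url"))` for the DataFrame. A local sketch of that key, using a few (path, count) pairs from the output above:

```python
# (path, count) pairs as produced by reduceByKey, in arbitrary order
url_counts = [
    ("/cgi-bin/mailgraph.cgi/mailgraph_2.png", 16),
    ("/robots.txt", 27),
    ("/cgi-bin/mailgraph.cgi/mailgraph_0.png", 16),
    ("/", 31),
]

# Count descending, then path ascending to break ties deterministically
ranked = sorted(url_counts, key=lambda x: (-x[1], x[0]))
for url, access_count in ranked[:3]:
    print(f"{url}\t{access_count}")
```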


In [11]:
# RDD implementation
# (2) Request count for each HTTP response code, sorted by response code

access_log_rdd = (sc.textFile("apache.access.log")
                  .map(lambda line: ( line.split(" ")[8], 1 ))
                  .reduceByKey(lambda x, y: x + y)
                  .sortByKey())
for code, code_count in access_log_rdd.collect():
  print(f"{code}\t{code_count}")

200	1272
302	6
401	123
404	5


In [12]:
# RDD implementation
# (3) Request count for each calendar month and year, sorted chronologically
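import calendar

# Map month abbreviations to month numbers: {'Jan': 1, 'Feb': 2, ..., 'Dec': 12}
month_dict = {name: num for num, name in enumerate(calendar.month_abbr[1:], 1)}
access_log_rdd = (sc.textFile("apache.access.log")
                  .map(lambda line: (line.split(" ")[3][4:12], 1))   # '[02/Dec/2004:16:05:49' -> 'Dec/2004'
                  .reduceByKey(lambda x, y: x + y)
                  .sortBy(lambda x: (int(x[0][4:8]), month_dict[x[0][0:3]])))  # sort by (year, month number)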


# Display the results
for month_year, request_count in access_log_rdd.collect():
    print(f"{month_year}\t{request_count}")


Mar/2004	1404
Sep/2004	1
Dec/2004	1
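The `month_dict` lookup works, but the standard library can also parse the `Mon/YYYY` key directly: `datetime.strptime(key, "%b/%Y")` returns a datetime whose natural ordering is chronological, so it can serve as the sort key on its own (for example `.sortBy(lambda kv: month_year_key(kv[0]))`). `month_year_key` is a hypothetical helper name, and the sketch assumes an English or C locale, since `%b` (like `calendar.month_abbr`) is locale-dependent.

```python
from datetime import datetime

def month_year_key(month_year):
    """Turn a 'Mon/YYYY' string such as 'Dec/2004' into a sortable datetime."""
    return datetime.strptime(month_year, "%b/%Y")

# (month/year, count) pairs in arbitrary order, as reduceByKey might return them
counts = [("Dec/2004", 1), ("Mar/2004", 1404), ("Sep/2004", 1)]
chronological = sorted(counts, key=lambda kv: month_year_key(kv[0]))
print(chronological)  # [('Mar/2004', 1404), ('Sep/2004', 1), ('Dec/2004', 1)]
```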


In [13]:
# RDD implementation
# (4) Total bytes sent to the client with a specified hostname or IPv4 address (you may hard code an address)
address = "10.0.0.153"

total_bytes = (sc.textFile("apache.access.log")
               .filter(lambda line: line.split(" ")[0] == address)   # field 0 = client address
               .map(lambda line: int(line.split(" ")[9]))             # field 9 = response size in bytes
               .reduce(lambda x, y: x + y))                           # reduce() is an action and returns a plain int
print(f"{address}\t{total_bytes}")

10.0.0.153	1200145
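Converting the size field with `int` assumes every matching line has a numeric size. In the Common Log Format, Apache writes `-` instead of `0` when no response body was sent (for example, on a 304 Not Modified response), so on other logs that conversion can raise `ValueError`. A small helper such as the hypothetical `bytes_or_zero` below treats `-` as zero:

```python
from functools import reduce

def bytes_or_zero(field):
    """Parse a CLF size field; '-' means no response body was sent."""
    return 0 if field == "-" else int(field)

# Sample size fields (field 9 of line.split(" ")); the '-' entry is hypothetical
sizes = ["12846", "-", "4523"]
# functools.reduce rather than sum(): the notebook's wildcard import shadows sum
total = reduce(lambda x, y: x + y, (bytes_or_zero(s) for s in sizes), 0)
print(total)  # 17369

# In the RDD pipeline it would be used as, e.g.:
# .map(lambda line: bytes_or_zero(line.split(" ")[9])).reduce(lambda x, y: x + y)
```

In the DataFrame version, `cast("integer")` turns `-` into null (with ANSI mode off, the default in Spark 3.3), and the `sum` aggregate skips nulls, so no helper is needed there.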


In [14]:
# RDD implementation
# (5) Based on a given URL (hard coded), compute a request count for each client (hostname or IPv4) who accessed that URL, sorted by request count, highest to lowest
url = "/robots.txt"

access_log_rdd = (sc.textFile("apache.access.log")
                  .filter(lambda line: line.split(" ")[6] == url)
                  .map(lambda line: ( line.split(" ")[0], 1 ))
                  .reduceByKey(lambda x, y: x + y)
                  .sortBy(lambda x: x[1], ascending=False))

for client, request_count in access_log_rdd.collect():
  print(f"{client}\t{request_count}")

lj1024.inktomisearch.com	8
lj1048.inktomisearch.com	5
lj1036.inktomisearch.com	5
mmscrm07-2.sac.overture.com	3
64.242.88.10	2
lj1052.inktomisearch.com	1
cr020r01-3.sac.overture.com	1
crawl24-public.alexa.com	1
lj1007.inktomisearch.com	1


# (B) DataFrame Implementations

Perform reporting tasks 1-5 using Spark's DataFrame API

[DataFrame API PySpark v.3.1.1](https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.DataFrame.html#pyspark.sql.DataFrame)

Please note: Spark SQL (`createOrReplaceTempView(...)`, `spark.sql("SELECT * ...")`) **is not** permitted for these exercises. You must use the Spark DataFrame API. You may, however, use the SQL functions defined in [`pyspark.sql.functions`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html).

In [15]:
# DataFrame implementation
# (1) Most popular URL paths (top 15)

access_log_df = spark.read.text("apache.access.log")
url_df = access_log_df.select(split(col("value"), " ").getItem(6).alias("url"))
count_df = url_df.groupBy("url").count().orderBy(desc("count"))
top_15 = count_df.limit(15)
top_15.show(truncate=False)

+-----------------------------------------------+-----+
|url                                            |count|
+-----------------------------------------------+-----+
|/twiki/bin/view/Main/WebHome                   |40   |
|/twiki/pub/TWiki/TWikiLogos/twikiRobot46x50.gif|32   |
|/                                              |31   |
|/favicon.ico                                   |28   |
|/robots.txt                                    |27   |
|/razor.html                                    |23   |
|/twiki/bin/view/Main/SpamAssassinTaggingOnly   |18   |
|/twiki/bin/view/Main/SpamAssassinAndPostFix    |17   |
|/cgi-bin/mailgraph.cgi/mailgraph_3.png         |16   |
|/cgi-bin/mailgraph.cgi/mailgraph_0_err.png     |16   |
|/cgi-bin/mailgraph.cgi/mailgraph_2.png         |16   |
|/cgi-bin/mailgraph2.cgi                        |16   |
|/cgi-bin/mailgraph.cgi/mailgraph_1_err.png     |16   |
|/cgi-bin/mailgraph.cgi/mailgraph_0.png         |16   |
|/cgi-bin/mailgraph.cgi/mailgraph_1.png         

In [16]:
# DataFrame implementation
# (2) Request count for each HTTP response code, sorted by response code

access_log_df = spark.read.text("apache.access.log")
# Cast to integer so response codes sort numerically rather than as strings
response_code_df = access_log_df.select(split(col("value"), " ").getItem(8).cast("integer").alias("response_code"))
count_df = response_code_df.groupBy("response_code").count().orderBy(asc("response_code"))
count_df.show(truncate=False)

+-------------+-----+
|response_code|count|
+-------------+-----+
|200          |1272 |
|302          |6    |
|401          |123  |
|404          |5    |
+-------------+-----+



In [17]:
# DataFrame implementation
# (3) Request count for each calendar month and year, sorted chronologically
import calendar
from itertools import chain

# Map month abbreviations to month numbers: {'Jan': 1, 'Feb': 2, ..., 'Dec': 12}
month_dict = {name: num for num, name in enumerate(calendar.month_abbr[1:], 1)}
# Spark map literal built from month_dict, used to look up each row's month number
month_map = create_map([lit(x) for x in chain(*month_dict.items())])

access_log_df = spark.read.text("apache.access.log")
# Column.substr(startPos, length) is 1-based: '[02/Dec/2004:16:05:49' -> 'Dec/2004'
timestamp_df = access_log_df.select(split(col("value"), " ").getItem(3).substr(5, 8).alias("timestamp"))
timestamp_df = timestamp_df.withColumn("month", substring("timestamp", 1, 3))   # 'Dec'
timestamp_df = timestamp_df.withColumn("year", substring("timestamp", 5, 4))    # '2004'
request_counts = timestamp_df.groupBy("year", "month").agg(count("*").alias("request_count"))
request_counts = request_counts.withColumn("month_num", month_map[col("month")])
sorted_request_counts = request_counts.orderBy("year", "month_num").drop("month_num")
sorted_request_counts.show()



+----+-----+-------------+
|year|month|request_count|
+----+-----+-------------+
|2004|  Mar|         1404|
|2004|  Sep|            1|
|2004|  Dec|            1|
+----+-----+-------------+



In [18]:
# DataFrame implementation
# (4) Total bytes sent to the client with a specified hostname or IPv4 address (you may hard code an address)

address = "10.0.0.153"

access_log_df = spark.read.text("apache.access.log")
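# Compare the client field exactly; value.contains() would also match the address elsewhere in the line
filtered_df = access_log_df.filter(split(col("value"), " ").getItem(0) == address)
bytes_df = filtered_df.select(split(col("value"), " ").getItem(9).cast("integer").alias("bytes_sent"))
total_bytes = bytes_df.select(sum("bytes_sent")).first()[0]   # sum here is pyspark.sql.functions.sum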
print(f"{address}\t{total_bytes}")

10.0.0.153	1200145
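A filter such as `value.contains(address)` matches the address anywhere in the line, so it can also pick up lines from a different client whose address merely contains the string (for example `110.0.0.153`), or lines that mention the address in the request path. Comparing the first space-separated field, as the RDD version of task 4 does, avoids this; the same reasoning applies to matching the URL in task 5. The difference in plain Python, on hypothetical lines:

```python
address = "10.0.0.153"

# Hypothetical log lines: only the first one is from 10.0.0.153
lines = [
    '10.0.0.153 - - [12/Mar/2004:12:23:41 -0800] "GET / HTTP/1.1" 200 3169',
    '110.0.0.153 - - [12/Mar/2004:12:23:42 -0800] "GET / HTTP/1.1" 200 3169',
]

substring_matches = [ln for ln in lines if address in ln]            # like value.contains(address)
exact_matches = [ln for ln in lines if ln.split(" ")[0] == address]  # compare field 0 only

print(len(substring_matches), len(exact_matches))  # 2 1
```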


In [19]:
# DataFrame implementation
# (5) Based on a given URL (hard coded), compute a request count for each client (hostname or IPv4) who accessed that URL, sorted by request count, highest to lowest

url = "/robots.txt"

access_log_df = spark.read.text("apache.access.log")
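# Compare the path field (index 6) exactly; value.contains() would also match the URL elsewhere in the line
filtered_df = access_log_df.filter(split(col("value"), " ").getItem(6) == url)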
client_df = filtered_df.select(split(col("value"), " ").getItem(0).alias("client"))
request_count_df = client_df.groupBy("client").agg(count("*").alias("request_count")).orderBy(desc("request_count"))
request_count_df.show(truncate=False)


+---------------------------+-------------+
|client                     |request_count|
+---------------------------+-------------+
|lj1024.inktomisearch.com   |8            |
|lj1048.inktomisearch.com   |5            |
|lj1036.inktomisearch.com   |5            |
|mmscrm07-2.sac.overture.com|3            |
|64.242.88.10               |2            |
|cr020r01-3.sac.overture.com|1            |
|crawl24-public.alexa.com   |1            |
|lj1052.inktomisearch.com   |1            |
|lj1007.inktomisearch.com   |1            |
+---------------------------+-------------+

