# RDD / DataFrame Joins - Lab 5

Reference/API Links


*   [Apache Spark Quick Start](https://spark.apache.org/docs/3.3.0/quick-start.html)
*   [PySpark v3.3.0 API](https://spark.apache.org/docs/3.3.0/api/python/reference/index.html)
*   [RDD Programming Guide](https://spark.apache.org/docs/3.3.0/rdd-programming-guide.html)
*   [Spark SQL Programming Guide](https://spark.apache.org/docs/3.3.0/sql-programming-guide.html)

In [1]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

openjdk-8-jdk-headless is already the newest version (8u372-ga~us1-0ubuntu1~22.04).
0 upgraded, 0 newly installed, 0 to remove and 8 not upgraded.


# Imports

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Create (or reuse) a SparkSession; its SparkContext drives the RDD examples below
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

In [3]:
# download the sample access log and hostname_country.csv used in the code below
!rm -f apache.access.log
!wget -q https://raw.githubusercontent.com/databricks/reference-apps/master/logs_analyzer/data/apache.access.log

!rm -f hostname_country.csv
!wget -q https://cpslo-my.sharepoint.com/:x:/g/personal/amigler_calpoly_edu/EcJv6-ZtZR5PpTEcKuDcYmsBq38YIwrNL6d6JtVztjrkhA?rtime=gKcu8Dc82kg\&download=1 -O hostname_country.csv

# Apache HTTP Log - Resilient Distributed Dataset (RDD)

A SparkContext instance can be used to create RDDs from a variety of data sources (text files, CSV files, Hadoop data files, etc.).

In [4]:
# find the top 10 clients, map/reduce style using RDD transformations
access_log_rdd = (sc.textFile("apache.access.log")
                  .map(lambda line: ( line.split(" ")[0], 1 ))  # field 0 = client address
                  .reduceByKey(lambda x, y: x + y)
                  .sortBy(lambda t: -t[1]))

print ("Total count of client hostnames:")
print(access_log_rdd.count())

print ("Top 10 client hostnames:")
print(access_log_rdd.take(10))


Total count of client hostnames:
169
Top 10 client hostnames:
[('64.242.88.10', 452), ('10.0.0.153', 188), ('cr020r01-3.sac.overture.com', 44), ('h24-71-236-129.ca.shawcable.net', 36), ('h24-70-69-74.ca.shawcable.net', 32), ('market-mail.panduit.com', 29), ('ts04-ip92.hevanet.com', 28), ('ip68-228-43-49.tc.ph.cox.net', 22), ('proxy0.haifa.ac.il', 19), ('207.195.59.160', 15)]
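The `map` -> `reduceByKey` -> `sortBy` pipeline above can be modeled in plain Python to make the shape of each step visible. This sketch uses a few hypothetical log lines and a dictionary in place of Spark's per-key shuffle; it illustrates the semantics only, not how Spark distributes the work.

```python
from collections import defaultdict

# Hypothetical access-log lines; only field 0 (the client host) matters here
lines = [
    '64.242.88.10 - - [07/Mar/2004:16:05:49 -0800] "GET /a HTTP/1.1" 200 10',
    '10.0.0.153 - - [07/Mar/2004:16:06:00 -0800] "GET /b HTTP/1.1" 200 20',
    '64.242.88.10 - - [07/Mar/2004:16:07:12 -0800] "GET /c HTTP/1.1" 404 30',
]

# map: one (host, 1) pair per request
pairs = [(line.split(" ")[0], 1) for line in lines]

# reduceByKey(lambda x, y: x + y): sum the values that share a key
totals = defaultdict(int)
for host, one in pairs:
    totals[host] += one

# sortBy(lambda t: -t[1]): order the (host, count) pairs by descending count
top_hosts = sorted(totals.items(), key=lambda t: -t[1])
print(top_hosts)  # [('64.242.88.10', 2), ('10.0.0.153', 1)]
```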


# Apache HTTP Log - DataFrame

A DataFrame is equivalent to a relational table in Spark SQL and can be created from a variety of input formats (CSV, JSON, relational databases, etc.) using the SparkSession.

In [5]:
access_log_df = spark.read.options(delimiter=" ").csv("apache.access.log")

named_df = access_log_df.select(col('_c0').alias('host'),
                                col('_c3').alias('timestamp'),
                                col('_c5').alias('path'),
                                col('_c6').cast('integer').alias('status'),
                                col('_c7').cast('integer').alias('content_size'))



named_df.show(truncate=False)
named_df.printSchema()

+------------+---------------------+-------------------------------------------------------------------------------------------------+------+------------+
|host        |timestamp            |path                                                                                             |status|content_size|
+------------+---------------------+-------------------------------------------------------------------------------------------------+------+------------+
|64.242.88.10|[07/Mar/2004:16:05:49|GET /twiki/bin/edit/Main/Double_bounce_sender?topicparent=Main.ConfigurationVariables HTTP/1.1   |401   |12846       |
|64.242.88.10|[07/Mar/2004:16:06:51|GET /twiki/bin/rdiff/TWiki/NewUserTemplate?rev1=1.3&rev2=1.2 HTTP/1.1                            |200   |4523        |
|64.242.88.10|[07/Mar/2004:16:10:02|GET /mailman/listinfo/hsdivision HTTP/1.1                                                        |200   |6291        |
|64.242.88.10|[07/Mar/2004:16:11:58|GET /twiki/bin/view/TWiki/WikiSynt
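Reading the log as CSV with a space delimiter works because Spark's CSV reader treats double-quoted text as a single field by default, so the whole request line lands in one column. Python's `csv` module splits the same way, which makes the `_c0` ... `_c7` mapping easy to inspect. The line below is modeled on the third row shown above; its `-0800` timezone column is an assumption.

```python
import csv
import io

# A log line in Apache Common Log Format (the "-0800" offset is assumed)
line = ('64.242.88.10 - - [07/Mar/2004:16:10:02 -0800] '
        '"GET /mailman/listinfo/hsdivision HTTP/1.1" 200 6291')

# Space-delimited, with double-quoted fields kept whole (mirrors delimiter=" " plus the default quote char)
fields = next(csv.reader(io.StringIO(line), delimiter=" ", quotechar='"'))

# Name the fields the way Spark names headerless CSV columns
columns = {f"_c{i}": value for i, value in enumerate(fields)}
for name, value in columns.items():
    print(name, value)
```

Note that `_c3` keeps the leading `[` and the timezone spills into `_c4`, which is why the `timestamp` column above starts with a bracket.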

# Part 1: Reporting Tasks from Prior Lab

1.   Given an Apache HTTP access log, along with a CSV file that includes country information for hostnames (sample file: [hostname_country.csv](https://cpslo-my.sharepoint.com/:x:/g/personal/amigler_calpoly_edu/EcJv6-ZtZR5PpTEcKuDcYmsBq38YIwrNL6d6JtVztjrkhA?e=xTmRdh)), produce a report that shows the total request count for each country, sorted by request count (highest to lowest).
2.   Using the same two input files (access.log and [hostname_country.csv](https://cpslo-my.sharepoint.com/:x:/g/personal/amigler_calpoly_edu/EcJv6-ZtZR5PpTEcKuDcYmsBq38YIwrNL6d6JtVztjrkhA?e=xTmRdh)), produce a report that lists, for each country, the count of each URL visited. Sort by country (alphabetically, A-Z), then by count (highest to lowest). For example:

```
  Argentina      /home                115
  Argentina      /another/page.html   105
  ...
  United States  /robots.txt          185
  United States  /another/page.html   120
  Uruguay        /home                310
  Uruguay        /another/page.html   120
```
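The required ordering for report 2 (country A-Z, then count high to low) can be expressed as one composite sort key that negates the numeric part, so a single ascending sort handles both directions. A minimal plain-Python sketch on the example rows above, shuffled:

```python
# (country, url, count) rows in arbitrary order
rows = [
    ("Uruguay", "/another/page.html", 120),
    ("Argentina", "/another/page.html", 105),
    ("Uruguay", "/home", 310),
    ("Argentina", "/home", 115),
]

# Ascending by country, then descending by count (negating the number flips its order)
report = sorted(rows, key=lambda r: (r[0], -r[2]))
for country, url, count in report:
    print(f"{country}\t{url}\t{count}")
```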

# (A) RDD Implementations

Perform reporting tasks 1 and 2 using RDD transformations.

[RDD APIs PySpark v3.3.0](https://spark.apache.org/docs/3.3.0/api/python/reference/pyspark.html#rdd-apis)

In [6]:
# RDD implementation
# (1) Given an Apache HTTP access log, along with a CSV file that includes country information for
# hostnames (sample file: [hostname_country.csv](https://cpslo-my.sharepoint.com/:x:/g/personal/amigler_calpoly_edu/EcJv6-ZtZR5PpTEcKuDcYmsBq38YIwrNL6d6JtVztjrkhA?e=xTmRdh)),
# produce a report that shows the total request count for each country, sorted by request count (highest to lowest)

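# (hostname, request_count) for every client in the access log
access_log_rdd = (sc.textFile("apache.access.log")
                  .map(lambda line: (line.split(" ")[0], 1))  # field 0 = client hostname
                  .reduceByKey(lambda x, y: x + y))  # no sort needed before the join

# (hostname, country) pairs from the lookup file
hostnames_rdd = (sc.textFile("hostname_country.csv")
                 .map(lambda line: line.split(","))
                 .map(lambda fields: (fields[0], fields[1])))

# inner join on hostname -> (hostname, (request_count, country))
joined_rdd = access_log_rdd.join(hostnames_rdd)

# re-key by country, sum the per-host counts, and sort by total (highest first)
counts_rdd = (joined_rdd
              .map(lambda x: (x[1][1], x[1][0]))
              .reduceByKey(lambda x, y: x + y)
              .sortBy(lambda t: -t[1]))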

for country, count in counts_rdd.collect():
  print(f"{country}\t{count}")



Unknown Location	812
Intranet	198
Canada	127
United States	87
Robot	80
Israel	19
Australia	17
France	16
Wherever You Want to Ship	13
Germany	13
United Kingdom	6
Denmark	4
Korea	4
Spain	4
Netherlands	2
Finland	1
Mexico	1
Los Alamos	1
Taiwan	1
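Pair-RDD `join` returns `(key, (left_value, right_value))` and, as an inner join, drops keys that appear on only one side. That shape is why the country is read from index `[1][1]` and the count from index `[1][0]` after the join. A plain-Python model with hypothetical hostnames (not Spark's implementation, which shuffles both sides by key):

```python
from collections import defaultdict


def pair_join(left, right):
    """Model of RDD.join: inner join of two lists of (key, value) pairs.

    Returns (key, (left_value, right_value)) for every matching pair of
    values; keys present on only one side are dropped.
    """
    right_by_key = defaultdict(list)
    for k, v in right:
        right_by_key[k].append(v)
    return [(k, (lv, rv)) for k, lv in left for rv in right_by_key.get(k, [])]


# Hypothetical inputs shaped like (hostname, count) and (hostname, country)
host_counts = [("a.example.com", 3), ("b.example.com", 2), ("c.example.com", 1)]
host_country = [("a.example.com", "Canada"), ("b.example.com", "Canada")]

joined = pair_join(host_counts, host_country)  # c.example.com has no match and is dropped
# (host, (count, country)) -> (country, count)
by_country = [(country, count) for _, (count, country) in joined]
print(joined)
print(by_country)
```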


In [7]:
# RDD implementation
# (2) Using the same two input files (access.log and hostname_country.csv), produce a report that lists,
# for each country, the count of each URL visited. Sort by country (alphabetically, A-Z) then count (from highest to lowest)
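# count requests per (hostname, url), then re-key -> (hostname, (url, count))
access_log_rdd = (sc.textFile("apache.access.log")
                  .map(lambda line: line.split(" "))
                  .map(lambda fields: ((fields[0], fields[6]), 1))  # field 6 = requested URL
                  .reduceByKey(lambda x, y: x + y)
                  .map(lambda x: (x[0][0], (x[0][1], x[1]))))

hostnames_rdd = (sc.textFile("hostname_country.csv")
                 .map(lambda line: line.split(","))
                 .map(lambda fields: (fields[0], fields[1])))

# join on hostname -> (hostname, ((url, count), country)); several hosts can share a
# country, so sum again on (country, url) before re-keying to (country, (url, count))
counts_rdd = (access_log_rdd.join(hostnames_rdd)
              .map(lambda x: ((x[1][1], x[1][0][0]), x[1][0][1]))
              .reduceByKey(lambda x, y: x + y)
              .map(lambda x: (x[0][0], (x[0][1], x[1]))))

# country A-Z, then count from highest to lowest
sorted_counts_rdd = counts_rdd.sortBy(lambda x: (x[0], -x[1][1]))

for country, (url, count) in sorted_counts_rdd.collect():
  print(f"{country}\t{url}\t{count}")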



Australia	/mailman/listinfo	2
Australia	/twiki/bin/view/Main/SpamAssassinAndPostFix	2
Australia	/twiki/bin/view/Main/SpamAssassinDeleting	2
Australia	/mailman/admin	1
Australia	/icons/gnu-head-tiny.jpg	1
Australia	/mailman	1
Australia	/icons/mailman.jpg	1
Australia	/icons/PythonPowered.png	1
Australia	/mailman/admin/webct	1
Australia	/mailman/listinfo/administration	1
Australia	/mailman/listinfo/cnc_notice	1
Australia	/twiki/bin/view/Main/SpamAssassinTaggingOnly	1
Australia	/twiki/pub/TWiki/TWikiLogos/twikiRobot46x50.gif	1
Australia	/twiki/bin/view/Main/WebHome	1
Canada	/mailman/admin/ppwc	9
Canada	/mailman/listinfo/ppwc	7
Canada	/mailman/listinfo	4
Canada	/mailman/admindb/ppwc	3
Canada	/mailman/admin/ppwc/gateway	2
Canada	/cgi-bin/mailgraph2.cgi	2
Canada	/cgi-bin/mailgraph.cgi/mailgraph_0.png	2
Canada	/cgi-bin/mailgraph.cgi/mailgraph_1.png	2
Canada	/cgi-bin/mailgraph.cgi/mailgraph_1_err.png	2
Canada	/cgi-bin/mailgraph.cgi/mailgraph_0_err.png	2
Canada	/cgi-bin/mailgraph.cgi/mailgraph_2
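Because the log is first counted per (host, URL), two hosts from the same country that request the same URL produce two separate rows after the join unless their counts are summed again on a (country, URL) key. A plain-Python illustration with hypothetical rows:

```python
from collections import Counter

# Hypothetical rows after the join: (country, url, count), one row per (host, url) pair
per_host_rows = [
    ("Canada", "/home", 3),  # from one host
    ("Canada", "/home", 2),  # from another host in the same country, same URL
    ("Canada", "/faq", 1),
]

# Summing on the (country, url) key merges rows that came from different hosts
per_country = Counter()
for country, url, count in per_host_rows:
    per_country[(country, url)] += count

print(sorted(per_country.items()))  # [(('Canada', '/faq'), 1), (('Canada', '/home'), 5)]
```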

# (B) DataFrame Implementations

Perform reporting tasks 1 and 2 using Spark's DataFrame API. Note that you should *not* use the Spark SQL abstraction.

In [8]:
# DataFrame implementation
# (1) Given an Apache HTTP access log, along with a CSV file that includes country information for
# hostnames (sample file: [hostname_country.csv](https://cpslo-my.sharepoint.com/:x:/g/personal/amigler_calpoly_edu/EcJv6-ZtZR5PpTEcKuDcYmsBq38YIwrNL6d6JtVztjrkhA?e=xTmRdh)),
# produce a report that shows the total request count for each country, sorted by request count (highest to lowest)

access_log_df = spark.read.options(delimiter=" ").csv("apache.access.log")
access_log_df = access_log_df.select(col('_c0').alias('hostname'),
                                     col('_c3').alias('timestamp'),
                                     col('_c5').alias('path'),
                                     col('_c6').cast('integer').alias('status'),
                                     col('_c7').cast('integer').alias('content_size'))

hostname_country_df = spark.read.csv("hostname_country.csv")
hostname_country_df = hostname_country_df.select(col('_c0').alias('hostname'),
                                                 col('_c1').alias('country'))

# joining on the column name keeps a single 'hostname' column in the result
joined_df = access_log_df.join(hostname_country_df, on='hostname', how='left_outer')

count_df = joined_df.groupBy('country').count().orderBy('count', ascending=False)

count_df.show(truncate=False)

+-------------------------+-----+
|country                  |count|
+-------------------------+-----+
|Unknown Location         |812  |
|Intranet                 |198  |
|Canada                   |127  |
|United States            |87   |
|Robot                    |80   |
|Israel                   |19   |
|Australia                |17   |
|France                   |16   |
|Germany                  |13   |
|Wherever You Want to Ship|13   |
|United Kingdom           |6    |
|Spain                    |4    |
|Denmark                  |4    |
|Korea                    |4    |
|Netherlands              |2    |
|Taiwan                   |1    |
|Finland                  |1    |
|Mexico                   |1    |
|Los Alamos               |1    |
+-------------------------+-----+
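The DataFrame version uses a left outer join, so a host missing from `hostname_country.csv` would still be counted under a null country, whereas the inner `join` in the RDD version drops it. The two reports agree here, which suggests every host in this log has a country entry. A plain-Python model of left-outer semantics, with hypothetical hostnames:

```python
def left_outer_join(left, right):
    """Model of a left outer join on (key, value) pairs: every left row is kept,
    and None stands in for a missing right-hand value (Spark shows it as null)."""
    right_by_key = {}
    for k, v in right:
        right_by_key.setdefault(k, []).append(v)
    return [(k, (lv, rv))
            for k, lv in left
            for rv in right_by_key.get(k, [None])]


# Hypothetical hosts; "c.example.com" has no entry in the country lookup
requests = [("a.example.com", "/home"), ("c.example.com", "/faq")]
countries = [("a.example.com", "Canada")]

result = left_outer_join(requests, countries)
print(result)  # [('a.example.com', ('/home', 'Canada')), ('c.example.com', ('/faq', None))]
```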



In [18]:
# DataFrame implementation
# (2) Using the same two input files (access.log and hostname_country.csv), produce a report that lists,
# for each country, the count of each URL visited. Sort by country (alphabetically, A-Z) then count (from highest to lowest)

access_log_df = spark.read.options(delimiter=" ").csv("apache.access.log")

# note: '_c5' (aliased 'path') holds the full request line, e.g. 'GET /mailman/listinfo HTTP/1.1'
access_log_df = access_log_df.select(col('_c0').alias('hostname'),
                                     col('_c3').alias('timestamp'),
                                     col('_c5').alias('path'),
                                     col('_c6').cast('integer').alias('status'),
                                     col('_c7').cast('integer').alias('content_size'))

hostname_country_df = spark.read.csv("hostname_country.csv")
hostname_country_df = hostname_country_df.select(col('_c0').alias('hostname'),
                                                 col('_c1').alias('country'))

joined_df = access_log_df.join(hostname_country_df, on='hostname', how='left_outer')

# country A-Z, then count from highest to lowest
count_df = (joined_df.groupBy('country', 'path').count()
            .orderBy(['country', 'count'], ascending=[True, False]))

# show every row instead of the default 20
count_df.show(count_df.count(), truncate=False)

+-------------------------+------------------------------------------------------------------------------------------------------------------------+-----+
|country                  |path                                                                                                                    |count|
+-------------------------+------------------------------------------------------------------------------------------------------------------------+-----+
|Australia                |GET /mailman/listinfo HTTP/1.1                                                                                          |2    |
|Australia                |GET /twiki/bin/view/Main/SpamAssassinAndPostFix HTTP/1.1                                                                |2    |
|Australia                |GET /twiki/bin/view/Main/SpamAssassinDeleting HTTP/1.1                                                                  |2    |
|Australia                |GET /icons/gnu-head-tiny.jpg HTTP/1.1      
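In the DataFrame reports, the `path` column holds the whole quoted request line (method, URL, and protocol), while the RDD version takes space-separated field 6, which is the URL alone. Grouping by the bare URL instead needs the second space-separated token of the request line; in the DataFrame API, `pyspark.sql.functions.split(col('path'), ' ').getItem(1)` selects that token. The plain-Python helper below (a hypothetical name, not part of the lab) shows the same extraction:

```python
def url_from_request(request_line):
    """Return the URL from an HTTP request line such as 'GET /index.html HTTP/1.1'.

    Falls back to the whole string when it lacks the usual
    'METHOD URL PROTOCOL' shape (e.g. a malformed request).
    """
    parts = request_line.split(" ")
    return parts[1] if len(parts) >= 2 else request_line


print(url_from_request("GET /mailman/listinfo HTTP/1.1"))  # /mailman/listinfo
```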

# Part 2: Reporting Tasks Based on Student / Course Data

1.   Find the names of the students who have taken at least one of the courses with the greatest difficulty.
2.   Find the average difficulty of the courses taken by each student. Report an average difficulty of 0 for a student who hasn't taken any classes (hint: use a left or right outer join).

In [10]:
# Data for exercises below

# course information: course code, difficulty (1-3)
courses = [("CSC365", 1),
           ("CSC369", 1),
           ("CSC430", 3),
           ("CSC469", 2)]
courses_df = spark.createDataFrame(courses, ["Course", "Difficulty"])
courses_rdd = courses_df.rdd.map(tuple)

# student information: unique student ID, name, email
students = [(1,"Nikita","Trevett","ntrevett0@abc.net.au"),
            (2,"Jacquenette","Tolson","jtolson1@ucoz.com"),
            (3,"Helsa","St Ledger","hstledger2@elpais.com"),
            (4,"Eli","Golland","egolland3@wordpress.com"),
            (5,"Amitie","Mytton","amytton4@mit.edu"),
            (6,"Adan","Holtum","aholtum1i@amazon.co.uk")]
students_df = spark.createDataFrame(students, ["ID", "FirstName", "LastName", "Email"])
students_rdd = students_df.rdd.map(tuple)


# student grades: student ID, course code, grade earned
student_grades = [(1, "CSC365", "A"),
                  (1, "CSC369", "A-"),
                  (1, "CSC469", "B"),
                  (2, "CSC369", "B"),
                  (3, "CSC365", "A"),
                  (3, "CSC430", "B")]
student_grades_df = spark.createDataFrame(student_grades, ["StudentId", "Course", "Grade"])
student_grades_rdd = student_grades_df.rdd.map(tuple)

courses_df.show(truncate=False)
students_df.show(truncate=False)
student_grades_df.show(truncate=False)

+------+----------+
|Course|Difficulty|
+------+----------+
|CSC365|1         |
|CSC369|1         |
|CSC430|3         |
|CSC469|2         |
+------+----------+

+---+-----------+---------+-----------------------+
|ID |FirstName  |LastName |Email                  |
+---+-----------+---------+-----------------------+
|1  |Nikita     |Trevett  |ntrevett0@abc.net.au   |
|2  |Jacquenette|Tolson   |jtolson1@ucoz.com      |
|3  |Helsa      |St Ledger|hstledger2@elpais.com  |
|4  |Eli        |Golland  |egolland3@wordpress.com|
|5  |Amitie     |Mytton   |amytton4@mit.edu       |
|6  |Adan       |Holtum   |aholtum1i@amazon.co.uk |
+---+-----------+---------+-----------------------+

+---------+------+-----+
|StudentId|Course|Grade|
+---------+------+-----+
|1        |CSC365|A    |
|1        |CSC369|A-   |
|1        |CSC469|B    |
|2        |CSC369|B    |
|3        |CSC365|A    |
|3        |CSC430|B    |
+---------+------+-----+
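Because the data is tiny, a plain-Python reference for both tasks is handy for checking the Spark results that follow. This sketch copies the lists from the data cell above (keeping only student IDs and first names) under new names so it does not overwrite the notebook variables, and it mirrors the RDD version's one-decimal rounding:

```python
course_list = [("CSC365", 1), ("CSC369", 1), ("CSC430", 3), ("CSC469", 2)]
student_list = [(1, "Nikita"), (2, "Jacquenette"), (3, "Helsa"),
                (4, "Eli"), (5, "Amitie"), (6, "Adan")]
grade_list = [(1, "CSC365", "A"), (1, "CSC369", "A-"), (1, "CSC469", "B"),
              (2, "CSC369", "B"), (3, "CSC365", "A"), (3, "CSC430", "B")]

difficulty = dict(course_list)

# Task 1: students enrolled in at least one course of maximum difficulty
max_difficulty = max(difficulty.values())
hardest = {c for c, d in course_list if d == max_difficulty}
takers = {sid for sid, course, _ in grade_list if course in hardest}
task1 = sorted(name for sid, name in student_list if sid in takers)

# Task 2: average difficulty per student, 0.0 when a student took no courses
task2 = {}
for sid, name in student_list:
    ds = [difficulty[c] for s, c, _ in grade_list if s == sid]
    task2[name] = round(sum(ds) / len(ds), 1) if ds else 0.0

print(task1)  # ['Helsa']
print(task2)
```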



In [11]:
s = students_df.withColumnRenamed("ID", "StudentId")
s.join(student_grades_df, "StudentId").show()

+---------+-----------+---------+--------------------+------+-----+
|StudentId|  FirstName| LastName|               Email|Course|Grade|
+---------+-----------+---------+--------------------+------+-----+
|        1|     Nikita|  Trevett|ntrevett0@abc.net.au|CSC365|    A|
|        1|     Nikita|  Trevett|ntrevett0@abc.net.au|CSC369|   A-|
|        1|     Nikita|  Trevett|ntrevett0@abc.net.au|CSC469|    B|
|        2|Jacquenette|   Tolson|   jtolson1@ucoz.com|CSC369|    B|
|        3|      Helsa|St Ledger|hstledger2@elpais...|CSC365|    A|
|        3|      Helsa|St Ledger|hstledger2@elpais...|CSC430|    B|
+---------+-----------+---------+--------------------+------+-----+



# (A) RDD Implementations

In [12]:
# RDD implementation
# (1) Find the names of the students that have taken at least one of the courses with the greatest difficulty.
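max_difficulty = courses_rdd.map(lambda x: x[1]).max()

# (course, None) for each course at the maximum difficulty
filtered_courses_rdd = (courses_rdd
                        .filter(lambda x: x[1] == max_difficulty)
                        .map(lambda x: (x[0], None)))

# pair-RDD joins expect (key, value) pairs, so project the wider rows explicitly:
# (student_id, course) and (student_id, first_name)
enrollments_rdd = student_grades_rdd.map(lambda x: (x[0], x[1]))
names_rdd = students_rdd.map(lambda x: (x[0], x[1]))

filtered_students_rdd = (enrollments_rdd.join(names_rdd)      # (student_id, (course, first_name))
                         .map(lambda x: (x[1][0], x[1][1]))  # (course, first_name)
                         .join(filtered_courses_rdd)         # (course, (first_name, None))
                         .map(lambda x: x[1][0])
                         .distinct())
for student in filtered_students_rdd.collect():
  print(student)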

Helsa


In [13]:
# RDD implementation
# (2) Find the average course difficulty of the classes that are taken by each student. Print average course difficulty of 0
#     if a student hasn't taken any classes (hint, use left outer or right outer join).
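# explicit (key, value) pairs: (student_id, course) and (student_id, first_name)
enrollments_rdd = student_grades_rdd.map(lambda x: (x[0], x[1]))
names_rdd = students_rdd.map(lambda x: (x[0], x[1]))

students_and_difficulties_rdd = (enrollments_rdd
                                  .rightOuterJoin(names_rdd)          # (id, (course or None, name))
                                  .map(lambda x: (x[1][0], x[1][1]))  # (course or None, name)
                                  .leftOuterJoin(courses_rdd)         # (course or None, (name, difficulty or None))
                                  # (name, (difficulty, 1)), or (name, (0, 0)) when no course was taken
                                  .map(lambda x: (x[1][0], (x[1][1], 1)) if x[0] else (x[1][0], (0, 0)))
                                  .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))  # (name, (sum, count))
                                  .map(lambda x: (x[0], round(x[1][0] / x[1][1], 1) if x[1][1] else 0.0))
                                  .sortBy(lambda x: x[1]))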
for student, avg in students_and_difficulties_rdd.collect():
  print(f"{student}\t{avg}")


Eli	0.0
Adan	0.0
Amitie	0.0
Jacquenette	1.0
Nikita	1.3
Helsa	2.0
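Averages cannot be merged directly (the mean of partial means is generally not the overall mean), so each value carries a `(sum, count)` pair that `reduceByKey` adds component-wise, with a single division at the end. A plain-Python model of that pattern with hypothetical pairs, where a dict stands in for Spark's grouping by key:

```python
from functools import reduce

# Hypothetical (name, (difficulty, 1)) pairs; (0, 0) marks a student with no courses
pairs = [("Nikita", (1, 1)), ("Nikita", (1, 1)), ("Nikita", (2, 1)), ("Eli", (0, 0))]


def add_pairs(a, b):
    """The reduce function: add sums and counts component-wise."""
    return (a[0] + b[0], a[1] + b[1])


grouped = {}
for name, value in pairs:
    grouped.setdefault(name, []).append(value)

averages = {}
for name, values in grouped.items():
    total, n = reduce(add_pairs, values)
    # guard on the count so a student with no courses gets 0.0 instead of ZeroDivisionError
    averages[name] = round(total / n, 1) if n else 0.0

print(averages)  # {'Nikita': 1.3, 'Eli': 0.0}
```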


# (B) DataFrame Implementations

Spark SQL is not permitted for these exercises.

In [14]:
# DataFrame implementation
# (1) Find the names of the students that have taken at least one of the courses with the greatest difficulty.
max_difficulty = courses_df.agg({"Difficulty": "max"}).collect()[0][0]
filtered_courses_df = courses_df.filter(courses_df.Difficulty == max_difficulty)
joined_df = (student_grades_df
             .join(students_df.withColumnRenamed("ID", "StudentId"), "StudentId")  # attach names
             .join(filtered_courses_df, "Course")  # keep only hardest-course enrollments
             .select("FirstName")
             .distinct())
joined_df.show()

+---------+
|FirstName|
+---------+
|    Helsa|
+---------+
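The `join` followed by `distinct()` above finds students with at least one matching course. A semi join expresses "has at least one match" directly: it keeps each left-hand row at most once and adds no right-hand columns; `DataFrame.join` accepts it as `how="left_semi"`. A plain-Python model with hypothetical data, where student 3 is assumed to be enrolled in two of the hardest courses:

```python
def left_semi_join(left, right_keys, key):
    """Model of a left semi join: keep each left row whose key appears on the
    right, at most once, without adding any right-hand columns."""
    keys = set(right_keys)
    return [row for row in left if key(row) in keys]


# Hypothetical students and the IDs enrolled in a hardest course (3 appears twice)
student_rows = [(1, "Nikita"), (2, "Jacquenette"), (3, "Helsa")]
hardest_course_takers = [3, 3]

semi = left_semi_join(student_rows, hardest_course_takers, key=lambda r: r[0])
print(semi)  # [(3, 'Helsa')]
```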



In [15]:
# DataFrame implementation
# (2) Find the average course difficulty of the classes that are taken by each student. Print average course difficulty of 0
#     if a student hasn't taken any classes (hint, use left outer or right outer join).
from pyspark.sql import functions as F

# right outer join keeps students with no grades (their Course is null)
gradebook_df = student_grades_df.join(students_df.withColumnRenamed("ID", "StudentId"),
                                      "StudentId", "right_outer")
# left outer join attaches each course's difficulty (null when Course is null)
gradebook_with_diff_df = gradebook_df.join(courses_df, "Course", "left_outer")
result_df = (gradebook_with_diff_df
             .groupBy("FirstName")
             .agg(F.avg("Difficulty").alias("Difficulty"))  # avg skips nulls; all-null groups give null
             .fillna(0)                                      # no classes taken -> 0
             .withColumn("Difficulty", F.round("Difficulty", 1)))  # one decimal place
result_df.show()

+-----------+----------+
|  FirstName|Difficulty|
+-----------+----------+
|     Amitie|       0.0|
|       Adan|       0.0|
|        Eli|       0.0|
|Jacquenette|       1.0|
|     Nikita|       1.3|
|      Helsa|       2.0|
+-----------+----------+
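Rounding and truncating to one decimal place agree on this data (1.333... becomes 1.3 either way) but diverge in general, and rounding itself has two common tie rules: Python's built-in `round` rounds half to even, while Spark SQL's `round` function uses HALF_UP (its `bround` function uses HALF_EVEN). A plain-Python comparison on a few sample averages, where 1.25 is a hypothetical average (e.g. of difficulties 1, 1, 1, 2):

```python
from decimal import Decimal, ROUND_HALF_UP


def round_half_even(x):
    """Python's built-in round: ties go to the even digit."""
    return round(x, 1)


def round_half_up(x):
    """Ties go away from zero, matching the HALF_UP rule of Spark SQL's round."""
    return float(Decimal(str(x)).quantize(Decimal("0.1"), rounding=ROUND_HALF_UP))


def truncate(x):
    """Drop digits after the first decimal place, like (x * 10).cast('integer') / 10."""
    return int(x * 10) / 10


for avg in (4 / 3, 5 / 3, 1.25):
    print(avg, round_half_even(avg), round_half_up(avg), truncate(avg))
```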

