# RDD / DataFrame Joins - Lab 5

Reference/API Links


*   [Apache Spark Quick Start](https://spark.apache.org/docs/3.3.0/quick-start.html)
*   [PySpark v3.3.0 API](https://spark.apache.org/docs/3.3.0/api/python/reference/index.html)
*    [RDD Programming Guide](https://spark.apache.org/docs/3.3.0/rdd-programming-guide.html)
*    [Spark SQL Programming Guide](https://spark.apache.org/docs/3.3.0/sql-programming-guide.html)









In [1]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
openjdk-8-jdk-headless is already the newest version (8u342-b07-0ubuntu1~18.04).
The following package was automatically installed and is no longer required:
  libnvidia-common-460
Use 'apt autoremove' to remove it.
0 upgraded, 0 newly installed, 0 to remove and 20 not upgraded.


# Imports 




In [2]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql import types as sparktypes
from pyspark.sql.functions import col, split, trim
import pyspark.sql.functions as pyspar

sc = SparkContext() 
spark = SparkSession(sc)

In [3]:
# download sample access log / hostnames_country.csv for use in code below
!rm -f apache.access.log
!wget -q https://raw.githubusercontent.com/databricks/reference-apps/master/logs_analyzer/data/apache.access.log

!rm -f hostname_country.csv
!wget -q https://cpslo-my.sharepoint.com/:x:/g/personal/amigler_calpoly_edu/EcJv6-ZtZR5PpTEcKuDcYmsBq38YIwrNL6d6JtVztjrkhA?rtime=gKcu8Dc82kg\&download=1 -O hostname_country.csv

# Apache HTTP Log - Resilient Distributed Dataset (RDD)

A SparkContext instance can be used to create RDDs from various data/files/resources (text files, CSV, Hadoop data files, etc.)

In [4]:
# find the top 10 clients, map/reduce style using RDD transformations
access_log_rdd = (sc.textFile("apache.access.log")
                  .map(lambda line: ( line.split(" ")[0], 1 ))  # field 0 = client address
                  .reduceByKey(lambda x, y: x + y)
                  .sortBy(lambda t: -t[1])) 

print ("Total count of client hostnames:")
print(access_log_rdd.count())

print ("Top 10 client hostnames:")
print(access_log_rdd.take(10))


Total count of client hostnames:
169
Top 10 client hostnames:
[('64.242.88.10', 452), ('10.0.0.153', 188), ('cr020r01-3.sac.overture.com', 44), ('h24-71-236-129.ca.shawcable.net', 36), ('h24-70-69-74.ca.shawcable.net', 32), ('market-mail.panduit.com', 29), ('ts04-ip92.hevanet.com', 28), ('ip68-228-43-49.tc.ph.cox.net', 22), ('proxy0.haifa.ac.il', 19), ('207.195.59.160', 15)]


# Apache HTTP Log - DataFrame

A DataFrame is equivalent to a relational table in Spark SQL, and can be created from on a variety of input formats (CSV, JSON, relational database, etc.) using the SparkSession.

In [5]:
access_log_df = spark.read.options(delimiter=" ").csv("apache.access.log")

log_df = access_log_df.select(col('_c0').alias('host'),
                                col('_c3').alias('timestamp'),
                                col('_c5').alias('path'),
                                col('_c6').cast('integer').alias('status'),
                                col('_c7').cast('integer').alias('content_size'))

log_df.show(truncate=False)
log_df.printSchema()

+------------+---------------------+-------------------------------------------------------------------------------------------------+------+------------+
|host        |timestamp            |path                                                                                             |status|content_size|
+------------+---------------------+-------------------------------------------------------------------------------------------------+------+------------+
|64.242.88.10|[07/Mar/2004:16:05:49|GET /twiki/bin/edit/Main/Double_bounce_sender?topicparent=Main.ConfigurationVariables HTTP/1.1   |401   |12846       |
|64.242.88.10|[07/Mar/2004:16:06:51|GET /twiki/bin/rdiff/TWiki/NewUserTemplate?rev1=1.3&rev2=1.2 HTTP/1.1                            |200   |4523        |
|64.242.88.10|[07/Mar/2004:16:10:02|GET /mailman/listinfo/hsdivision HTTP/1.1                                                        |200   |6291        |
|64.242.88.10|[07/Mar/2004:16:11:58|GET /twiki/bin/view/TWiki/WikiSynt

# Part 1: Reporting Tasks from Prior Lab

1.   Given an Apache HTTP access log, along with a CSV file that includes country information for hostnames (sample file: [hostname_country.csv](https://cpslo-my.sharepoint.com/:x:/g/personal/amigler_calpoly_edu/EcJv6-ZtZR5PpTEcKuDcYmsBq38YIwrNL6d6JtVztjrkhA?e=xTmRdh)), produce a report that shows the total request count for each country, sorted by request count (highest to lowest)
2.   Using the same two input files (access.log and [hostname_country.csv](https://cpslo-my.sharepoint.com/:x:/g/personal/amigler_calpoly_edu/EcJv6-ZtZR5PpTEcKuDcYmsBq38YIwrNL6d6JtVztjrkhA?e=xTmRdh)), produce a report that lists, for each country, the count of each URL visited. Sort by country (alphabetically, A-Z) then count (from highest to lowest), For example:

```
  Argentina  /home   115
  Argentina /another/page.html  105
  ...
  United States  /robots.txt   185
  United States /another/page.html  120
  Uruguay  /home  310
  Uruguay  /another/page.html  120
```







# (A) RDD Implementations

Perform reporting tasks 1 and 2 using RDD transformations

[RDD APIs PySpark v3.3.0](https://spark.apache.org/docs/3.3.0/api/python/reference/pyspark.html#rdd-apis)

In [6]:
# RDD implementation
# (1) Given an Apache HTTP access log, along with a CSV file that includes country information for 
# hostnames (sample file: [hostname_country.csv](https://cpslo-my.sharepoint.com/:x:/g/personal/amigler_calpoly_edu/EcJv6-ZtZR5PpTEcKuDcYmsBq38YIwrNL6d6JtVztjrkhA?e=xTmRdh)), 
# produce a report that shows the total request count for each country, sorted by request count (highest to lowest)

def logfile_to_hostname_count(line: str) -> tuple:
  hostname = line.split(' ')[0]
  return (hostname, 1)

def csvfile_to_hostname_country(line: str) -> tuple:
  hostname = line.split(',')[0]
  country = line.split(',')[1]
  return (hostname, country)

def join_rdd_a1(val: tuple) -> tuple:
  count = val[1][0]
  country = val[1][1]
  return (country, count)

access_log_rdd_a1 = (sc.textFile("apache.access.log")
                    .map(logfile_to_hostname_count))

hostname_country_a1 = (sc.textFile("hostname_country.csv")
                    .map(csvfile_to_hostname_country))

output_a1 = (access_log_rdd_a1.join(hostname_country_a1)
                    .map(join_rdd_a1)
                    .reduceByKey(lambda x, y: x + y)
                    .sortBy(lambda t: -t[1])
                    .collect() ) 

display(output_a1)

[('Unknown Location', 812),
 ('Intranet', 198),
 ('Canada', 127),
 ('United States', 87),
 ('Robot', 80),
 ('Israel', 19),
 ('Australia', 17),
 ('France', 16),
 ('Wherever You Want to Ship', 13),
 ('Germany', 13),
 ('United Kingdom', 6),
 ('Denmark', 4),
 ('Korea', 4),
 ('Spain', 4),
 ('Netherlands', 2),
 ('Finland', 1),
 ('Mexico', 1),
 ('Los Alamos', 1),
 ('Taiwan', 1)]

In [7]:
# RDD implementation
# (2) Using the same two input files (access.log and hostname_country.csv), produce a report that lists, 
# for each country, the count of each URL visited. Sort by country (alphabetically, A-Z) then count (from highest to lowest)

def logfile_to_hostname_url(line: str) -> tuple:
  hostname = line.split(' ')[0]
  url = line.split(' ')[6]
  return (hostname, url)

def csvfile_to_hostname_country(line: str) -> tuple:
  hostname = line.split(',')[0]
  country = line.split(',')[1]
  return (hostname, country)

def join_rdd_a2(val: tuple) -> tuple:
  country = val[1][1]
  url = val[1][0]
  return (country + ',' + url, 1)

def tuple_country_url_one(val: tuple) -> tuple:
  country = val[0].split(',')[0]
  url = val[0].split(',')[1]
  one = val[1]
  return (country, url, one)



access_log_rdd_a2 = (sc.textFile("apache.access.log")
                    .map(logfile_to_hostname_url))

hostname_country_a2 = (sc.textFile("hostname_country.csv")
                    .map(csvfile_to_hostname_country))

output_a2 = (access_log_rdd_a2.join(hostname_country_a2)
                    .map(join_rdd_a2)
                    .reduceByKey(lambda country_url, one: country_url + one)
                    .map(tuple_country_url_one)
                    .sortBy(lambda t: (t[0], -t[2])))

display(output_a2.collect())

[('Australia', '/twiki/bin/view/Main/SpamAssassinDeleting', 2),
 ('Australia', '/mailman/listinfo', 2),
 ('Australia', '/twiki/bin/view/Main/SpamAssassinAndPostFix', 2),
 ('Australia', '/mailman/admin/webct', 1),
 ('Australia', '/mailman', 1),
 ('Australia', '/twiki/pub/TWiki/TWikiLogos/twikiRobot46x50.gif', 1),
 ('Australia', '/twiki/bin/view/Main/WebHome', 1),
 ('Australia', '/icons/mailman.jpg', 1),
 ('Australia', '/icons/PythonPowered.png', 1),
 ('Australia', '/mailman/listinfo/cnc_notice', 1),
 ('Australia', '/twiki/bin/view/Main/SpamAssassinTaggingOnly', 1),
 ('Australia', '/mailman/admin', 1),
 ('Australia', '/icons/gnu-head-tiny.jpg', 1),
 ('Australia', '/mailman/listinfo/administration', 1),
 ('Canada', '/mailman/listinfo/ppwc', 9),
 ('Canada', '/mailman/admin/ppwc', 9),
 ('Canada', '/mailman/listinfo', 6),
 ('Canada', '/icons/mailman.jpg', 5),
 ('Canada', '/ie.htm', 5),
 ('Canada', '/images/msgops.JPG', 5),
 ('Canada', '/icons/PythonPowered.png', 5),
 ('Canada', '/icons/gnu-h

# (B) DataFrame Implementations

Perform reporting tasks 1 and 2 using Spark's DataFrame API. Note that you should *not* use the Spark SQL abstraction.

In [8]:
# DataFrame implementation
# (1) Given an Apache HTTP access log, along with a CSV file that includes country information for 
# hostnames (sample file: [hostname_country.csv](https://cpslo-my.sharepoint.com/:x:/g/personal/amigler_calpoly_edu/EcJv6-ZtZR5PpTEcKuDcYmsBq38YIwrNL6d6JtVztjrkhA?e=xTmRdh)), 
# produce a report that shows the total request count for each country, sorted by request count (highest to lowest)
read_file = spark.read.options(delimiter=',').csv('hostname_country.csv')

csv_df = read_file.select(col('_c0').cast('string').alias('hostname'),
                          col('_c1').cast('string').alias('country'))

csv_log_df = csv_df.join(log_df, csv_df.hostname ==  log_df.host, 'inner').drop(col('hostname'))

csv_log_df = csv_log_df.withColumn('path', split(csv_log_df.path, ' ')[1]).cache()

output_b1 = csv_log_df.groupBy(col('country')).count().sort(col('count').desc()).show()

+--------------------+-----+
|             country|count|
+--------------------+-----+
|    Unknown Location|  812|
|            Intranet|  198|
|              Canada|  127|
|       United States|   87|
|               Robot|   80|
|              Israel|   19|
|           Australia|   17|
|              France|   16|
|Wherever You Want...|   13|
|             Germany|   13|
|      United Kingdom|    6|
|               Spain|    4|
|             Denmark|    4|
|               Korea|    4|
|         Netherlands|    2|
|              Taiwan|    1|
|             Finland|    1|
|              Mexico|    1|
|          Los Alamos|    1|
+--------------------+-----+



In [9]:
# DataFrame implementation
# (2) Using the same two input files (access.log and hostname_country.csv), produce a report that lists, 
# for each country, the count of each URL visited. Sort by country (alphabetically, A-Z) then count (from highest to lowest)

query_b2 = csv_log_df.groupBy('country', 'path').count()

query_b2.sort(col('country'), col('count').desc()).show()

+---------+--------------------+-----+
|  country|                path|count|
+---------+--------------------+-----+
|Australia|/twiki/bin/view/M...|    2|
|Australia|/twiki/bin/view/M...|    2|
|Australia|   /mailman/listinfo|    2|
|Australia|/twiki/bin/view/M...|    1|
|Australia|/mailman/admin/webct|    1|
|Australia|/icons/gnu-head-t...|    1|
|Australia|      /mailman/admin|    1|
|Australia|/twiki/pub/TWiki/...|    1|
|Australia|/mailman/listinfo...|    1|
|Australia|/twiki/bin/view/M...|    1|
|Australia|/icons/PythonPowe...|    1|
|Australia|/mailman/listinfo...|    1|
|Australia|  /icons/mailman.jpg|    1|
|Australia|            /mailman|    1|
|   Canada| /mailman/admin/ppwc|    9|
|   Canada|/mailman/listinfo...|    9|
|   Canada|   /mailman/listinfo|    6|
|   Canada|             /ie.htm|    5|
|   Canada|/images/image004.jpg|    5|
|   Canada|/images/image005.jpg|    5|
+---------+--------------------+-----+
only showing top 20 rows



# Part 2: Reporting Tasks Based on Student / Course Data



1.   Find the names of the students that have taken at least one of the courses with the greatest difficulty. 
2.   Find the average course difficulty of the classes that are taken by each student. Print average course difficulty of 0 if a student hasn't taken any classes (hint, use left outer or right outer join).



In [10]:
# Data for exercises below

# course information: course code, difficulty (1-3)
courses = [("CSC365", 1),
           ("CSC369", 1),
           ("CSC430", 3),
           ("CSC469", 2)]
courses_df = spark.createDataFrame(courses, ["Course", "Difficulty"])
courses_rdd = courses_df.rdd.map(tuple)

# student information: unique student ID, name, email
students = [(1,"Nikita","Trevett","ntrevett0@abc.net.au"),
            (2,"Jacquenette","Tolson","jtolson1@ucoz.com"),
            (3,"Helsa","St Ledger","hstledger2@elpais.com"),
            (4,"Eli","Golland","egolland3@wordpress.com"),
            (5,"Amitie","Mytton","amytton4@mit.edu"),
            (6,"Adan","Holtum","aholtum1i@amazon.co.uk")]
students_df = spark.createDataFrame(students, ["ID", "FirstName", "LastName", "Email"])
students_rdd = students_df.rdd.map(tuple)


# student grades: student ID, course code, grade earned
student_grades = [(1, "CSC365", "A"),
                  (1, "CSC369", "A-"),
                  (1, "CSC469", "B"),
                  (2, "CSC369", "B"),
                  (3, "CSC365", "A"),
                  (2, "CSC430", "A"),
                  (3, "CSC430", "B")]
student_grades_df = spark.createDataFrame(student_grades, ["StudentId", "Course", "Grade"])
student_grades_rdd = student_grades_df.rdd.map(tuple)


print('courses_df')
courses_df.show(truncate=False)
print('students_df')
students_df.show(truncate=False)
print('student_grades_df')
student_grades_df.show(truncate=False)

courses_df
+------+----------+
|Course|Difficulty|
+------+----------+
|CSC365|1         |
|CSC369|1         |
|CSC430|3         |
|CSC469|2         |
+------+----------+

students_df
+---+-----------+---------+-----------------------+
|ID |FirstName  |LastName |Email                  |
+---+-----------+---------+-----------------------+
|1  |Nikita     |Trevett  |ntrevett0@abc.net.au   |
|2  |Jacquenette|Tolson   |jtolson1@ucoz.com      |
|3  |Helsa      |St Ledger|hstledger2@elpais.com  |
|4  |Eli        |Golland  |egolland3@wordpress.com|
|5  |Amitie     |Mytton   |amytton4@mit.edu       |
|6  |Adan       |Holtum   |aholtum1i@amazon.co.uk |
+---+-----------+---------+-----------------------+

student_grades_df
+---------+------+-----+
|StudentId|Course|Grade|
+---------+------+-----+
|1        |CSC365|A    |
|1        |CSC369|A-   |
|1        |CSC469|B    |
|2        |CSC369|B    |
|3        |CSC365|A    |
|2        |CSC430|A    |
|3        |CSC430|B    |
+---------+------+-----+



# (A) RDD Implementations

In [11]:
# RDD implementation
# (1) Find the names of the students that have taken at least one of the courses with the greatest difficulty. 

def map_student_studentgrade(data: tuple) -> tuple:
  course = data[1][1]
  name = data[1][0]
  return (course, name)

def map_student_studentgrade_course(data: tuple) -> tuple:
  course = data[0]
  name = data[1][0]
  diff = data[1][1]
  return (diff, course, name)

def filter_highest_diff(data: tuple) -> tuple:
  diff = data[0]
  name = data[2]
  if diff == highest_diff:
    return True

student_studentgrade_rdd = (students_rdd.join(student_grades_rdd)
                        .map(map_student_studentgrade))

student_studentgrade_course_rdd = (student_studentgrade_rdd.join(courses_rdd)
                        .map(map_student_studentgrade_course))

highest_diff = (student_studentgrade_course_rdd
                        .max(lambda x: x[0]))[0]

output_part2_a1 = (student_studentgrade_course_rdd
                        .filter(filter_highest_diff)
                        .map(lambda x: x[2]))

display(output_part2_a1.collect())

['Jacquenette', 'Helsa']

In [12]:
# RDD implementation
# (2) Find the average course difficulty of the classes that are taken by each student. Print average course difficulty of 0
#     if a student hasn't taken any classes (hint, use left outer or right outer join).

def map_studentid_diff_course(data: tuple) -> tuple:
  studentid = data[1][0].split(' ')[1]
  diff = data[1][1]
  course = data[0]
  return  (studentid, diff)

def map_studentid_name_to_aver_diff(data: tuple) -> tuple:
  name = data[1][1]
  studentid = data[0]
  aver_diff = data[1][0]

  if aver_diff == None:
    aver_diff = 0

  return (studentid, name, aver_diff)

# join courses_rdd to student_grades_rdd
temp_rdd = student_grades_rdd.map(lambda x: (str(x[1]), str(x[2]) + ' ' + str(x[0]))) # (course, (grade, studentId))
temp_rdd = temp_rdd.join(courses_rdd).map(map_studentid_diff_course) # (studentid, diff)

studentid_averdiff = list()
for group, values in temp_rdd.groupByKey().collect():
  sum = 0
  for val in values:
    sum += val
  studentid = int(group)
  ave_diff = sum / len(values)
  studentid_averdiff.append((studentid, ave_diff))

studentid_averdiff_rdd = sc.parallelize(studentid_averdiff)

output_part2_a2 = (studentid_averdiff_rdd
                  .rightOuterJoin(students_rdd)
                  .map(map_studentid_name_to_aver_diff)
                  .sortBy(lambda x: x[2]))


display(output_part2_a2.collect())

[(4, 'Eli', 0),
 (5, 'Amitie', 0),
 (6, 'Adan', 0),
 (1, 'Nikita', 1.3333333333333333),
 (2, 'Jacquenette', 2.0),
 (3, 'Helsa', 2.0)]

# (B) DataFrame Implementations

Please note: Spark SQL is not permitted for these exercises.

In [13]:
# DataFrame implementation
# (1) Find the names of the students that have taken at least one of the courses with the greatest difficulty. 
csv_log_df = csv_df.join(log_df, csv_df.hostname ==  log_df.host, 'inner').drop(col('hostname'))

course_studentgrade_df = courses_df.join(student_grades_df, courses_df.Course == student_grades_df.Course, 'inner')

all_tables_join = (course_studentgrade_df
                  .join(students_df, course_studentgrade_df.StudentId == students_df.ID, 'inner')
                  .fillna(0)
                  .persist())

highest_diff = all_tables_join.agg({"Difficulty": "max"}).collect()[0][0]

output_part2_b1 = all_tables_join.filter(all_tables_join.Difficulty == highest_diff).select('ID', 'FirstName', 'LastName').show()

+---+-----------+---------+
| ID|  FirstName| LastName|
+---+-----------+---------+
|  3|      Helsa|St Ledger|
|  2|Jacquenette|   Tolson|
+---+-----------+---------+



In [14]:
# DataFrame implementation
# (2) Find the average course difficulty of the classes that are taken by each student. Print average course difficulty of 0
#     if a student hasn't taken any classes (hint, use left outer or right outer join).

studentid_aveg_diff_table = all_tables_join.groupBy('StudentId').mean('Difficulty')

output_part2_b2 = (students_df
            .join(studentid_aveg_diff_table, studentid_aveg_diff_table.StudentId == students_df.ID, 'outer')
            .select('ID', 'FirstName', 'LastName', 'avg(Difficulty)')
            .na.fill(value=0,subset=['avg(Difficulty)'])
            .show())

+---+-----------+---------+------------------+
| ID|  FirstName| LastName|   avg(Difficulty)|
+---+-----------+---------+------------------+
|  1|     Nikita|  Trevett|1.3333333333333333|
|  2|Jacquenette|   Tolson|               2.0|
|  3|      Helsa|St Ledger|               2.0|
|  4|        Eli|  Golland|               0.0|
|  5|     Amitie|   Mytton|               0.0|
|  6|       Adan|   Holtum|               0.0|
+---+-----------+---------+------------------+

