### Coding Challenge #1:

In this coding challenge, you will work through mini-challenges that help you become familiar with the nuances of Spark programming.



**Question 1**: The dataset contains information about airports around the world. The first few columns are **1)** Airport ID, **2)** Name of the Airport, **3)** City of the Airport, **4)** Country of the Airport, etc. The task is to retrieve the **a)** Name of the Airport and **b)** City of the Airport for a country of your choice (e.g. Iceland, Canada or United States) and write the details to a 'select_airports.txt' file.

Hint: **1)** Initialize a SparkContext, **2)** Specify how many cores you would like to dedicate to the application, **3)** Use a `filter` transformation followed by a `map` transformation to retrieve the **a)** Name of the Airport and **b)** City of the Airport for a country of your choice.

**Dataset**: https://www.dropbox.com/s/zup13k470xj7hhh/airports.txt?raw=1 - Download the file, save it to a local folder, and then use the `textFile` method of `SparkContext` to read it in.

Reference: https://www.tutorialkart.com/apache-spark/read-input-text-file-to-rdd-example/

Reference on Regular Expressions: http://www.pythonforbeginners.com/regex/regular-expressions-in-python

In [0]:
# https://mikestaszel.com/2018/03/07/apache-spark-on-google-colaboratory/
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.3.1/spark-2.3.1-bin-hadoop2.7.tgz
!tar xf spark-2.3.1-bin-hadoop2.7.tgz
!pip install -q findspark

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.3.1-bin-hadoop2.7"

In [0]:
import findspark
findspark.init()
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
# local[*] runs Spark locally with one worker thread per logical core; e.g. local[2] would use two
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = SparkContext.getOrCreate()

In [4]:
sc.getConf().getAll()

[('spark.driver.host', '40f8688769ba'),
 ('spark.driver.port', '35554'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.app.id', 'local-1529618797216'),
 ('spark.master', 'local[*]'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.app.name', 'pyspark-shell')]

In [0]:
from pyspark import SparkFiles

sc.addFile('https://raw.githubusercontent.com/saranyamandava/ML-Sprint-Challenges/master/Datasets/airports.txt')
airports = sc.textFile(SparkFiles.get('airports.txt'))


In [6]:
#airports = sc.textFile("airports.txt")
print (airports)

/tmp/spark-253a3359-e08e-46f8-a37b-2a962a012ac2/userFiles-37438292-b09d-4233-b4e1-6050e4dda8f3/airports.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0


In [0]:
# Split a line on commas that are not inside double quotes,
# then return the airport name (column 1) and city (column 2)
import re

def split_on_comma(line: str):
    splits = re.compile(''',(?=(?:[^"]*"[^"]*")*[^"]*$)''').split(line)
    return "{}, {}\t".format(splits[1], splits[2])
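To see why `split_on_comma` uses that lookahead pattern, here is a minimal pure-Python check on a hypothetical airports-style line (not taken from airports.txt) whose quoted name contains a comma. It also shows Python's standard `csv` module as an alternative parser; note that `csv` strips the surrounding quotes, so a country filter built on it would compare against `Iceland` rather than `"Iceland"`.

```python
import csv
import re

# Same pattern as in split_on_comma: a comma is a field separator only when an
# even number of double quotes follows it up to the end of the line, i.e. when
# it is not inside a quoted field.
FIELD_SPLITTER = re.compile(r',(?=(?:[^"]*"[^"]*")*[^"]*$)')

# Hypothetical airports-style line; the quoted name contains a comma on purpose.
sample = '1,"Reykjavik, Domestic","Reykjavik","Iceland","RKV"'

regex_fields = FIELD_SPLITTER.split(sample)
csv_fields = next(csv.reader([sample]))

print(regex_fields)  # quotes are kept, so the country field is '"Iceland"'
print(csv_fields)    # csv strips the quotes, so the country field is 'Iceland'
```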

In [0]:
def filter_on_country(country):
    # Keep only lines whose fourth column (index 3) is the quoted country name
    return airports.filter(
        lambda line:
        re.compile(''',(?=(?:[^"]*"[^"]*")*[^"]*$)''')
        .split(line)[3] == "\"%s\"" % country)

#airports_with_city_names = airports_in_iceland.map(split_on_comma)
#print(airports_with_city_names.collect())

In [0]:
def airports_in_country(country_name, count):
    print("Airports in %s:" % country_name)
    return filter_on_country(country_name).map(split_on_comma).take(count)

In [10]:
print (airports_in_country("Iceland", 10))

Airports in Iceland:
['"Akureyri", "Akureyri"\t', '"Egilsstadir", "Egilsstadir"\t', '"Hornafjordur", "Hofn"\t', '"Husavik", "Husavik"\t', '"Isafjordur", "Isafjordur"\t', '"Keflavik International Airport", "Keflavik"\t', '"Patreksfjordur", "Patreksfjordur"\t', '"Reykjavik", "Reykjavik"\t', '"Siglufjordur", "Siglufjordur"\t', '"Vestmannaeyjar", "Vestmannaeyjar"\t']
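The question also asks for the results to be written to 'select_airports.txt', which the cells above stop short of. Calling `saveAsTextFile` on an RDD writes a directory of `part-*` files rather than a single file, so for a result this small one simple option is to bring the records back to the driver and write them with plain Python. A minimal sketch; `write_airports` is a hypothetical helper, not part of the original notebook.

```python
from pathlib import Path

def write_airports(rows, path="select_airports.txt"):
    """Hypothetical helper: write one "name, city" record per line to a text file."""
    # split_on_comma ends every record with a tab; drop it so each line is clean.
    records = [row.rstrip("\t") for row in rows]
    Path(path).write_text("\n".join(records) + "\n", encoding="utf-8")
    return len(records)
```

In the notebook this could be called as `write_airports(airports_in_country("Iceland", 100))`, choosing a `count` large enough to cover every match, since `take(count)` caps the number of records returned.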


**Question 2**: For this question, you will use pair RDDs. Pair RDDs expose operations that act on each key of a key-value pair in parallel, regrouping or aggregating data across the network. The **ask** is to use a pair RDD to count the number of times each word occurs in a body of text (i.e. the WordCount.txt file).

**Dataset**: https://www.dropbox.com/s/unau4wxfsidrddq/WordCount.txt?raw=1 - Download the file, save it to a local folder, and then use the `textFile` method of `SparkContext` to read it in.

Reference(s):

https://programmathics.com/big-data/apache-spark/apache-spark-resilient-distributed-dataset-rdd-programming-part-1/apache-spark-resilient-distributed-dataset-rdd-programming-action-operations/

https://programmathics.com/big-data/apache-spark/apache-spark-working-with-key-value-pairs-in-rdd/apache-spark-pyspark-actions-on-pair-rdd/




In [11]:
from google.colab import files
uploaded = files.upload()

for fn in uploaded.keys():
    print('Uploaded file "{name}" with length {length} bytes'.format(name=fn, length=len(uploaded[fn])))

Saving WordCount.txt to WordCount.txt
Uploaded file "WordCount.txt" with length 4250 bytes


In [0]:
!kill -9 -1  # Kill the VM - with a vengeance! Use with caution...

In [12]:
#data = sc.textFile("WordCount.csv")

lines = sc.textFile("WordCount.txt")
print(lines)

# Tokenize, and set up pair RDD with base 1 count
words = lines.flatMap(lambda line: line.split(' '))
pairRDD = words.map(lambda word: (word, 1))

# Reduce by key to accumulate counts for each word
word_counts = pairRDD.reduceByKey(lambda x, y: x + y)

# Let's try doing a bunch at once - ugly, but functional
# lines.flatMap(lambda line: line.split(' ')).map(lambda word: (word, 1)).reduceByKey(lambda x, y: x + y).first()

# Also, there's a built-in, but it's good to understand the above
# words.countByValue()['The']

print(word_counts.take(5))

word_counts.coalesce(1)  # Returns a new single-partition RDD; word_counts itself is unchanged




WordCount.txt MapPartitionsRDD[4] at textFile at NativeMethodAccessorImpl.java:0
[('The', 10), ('of', 33), ('New', 20), ('begins', 1), ('around', 4)]


CoalescedRDD[11] at coalesce at NativeMethodAccessorImpl.java:0
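`reduceByKey` merges all values that share a key using the supplied function (here, addition), so each word ends up with its total count. The pure-Python sketch below mimics that on a hypothetical two-line input (`reduce_by_key` is an illustrative stand-in, not a Spark API; Spark additionally combines values within each partition before shuffling). It also shows a side effect of tokenizing with `line.split(' ')`: consecutive spaces produce empty-string "words" that get counted too, whereas `line.split()` drops them. Counting is case-sensitive, so `The` and `the` are separate keys, as in the Spark version.

```python
from collections import Counter
from operator import add

def reduce_by_key(pairs, func):
    """Single-machine stand-in for RDD.reduceByKey: fold all values of a key with func."""
    result = {}
    for key, value in pairs:
        result[key] = func(result[key], value) if key in result else value
    return result

# Hypothetical input; the first line deliberately contains a double space.
lines = ["The  cat saw the cat", "The end"]

# Tokenize as in the notebook: split on a single space character.
tokens_space = [word for line in lines for word in line.split(' ')]
# str.split() with no argument splits on runs of whitespace and drops empty strings.
tokens_ws = [word for line in lines for word in line.split()]

counts_space = reduce_by_key(((word, 1) for word in tokens_space), add)
counts_ws = reduce_by_key(((word, 1) for word in tokens_ws), add)

print(counts_space)                           # includes an empty-string key
print(counts_ws == dict(Counter(tokens_ws)))  # same totals as Counter
```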

**Question 3:** This question provides an opportunity to work with the "groupByKey" operator. When called on a dataset of **(K, V)** pairs, "groupByKey" returns a dataset of (K, Iterable<V>) pairs. For example, you could use "groupByKey" to list all the airports within a country. The **ask** here is to use "groupByKey" to list all locations by status.

**Dataset**: https://www.dropbox.com/s/4z3i0b05p7p22b2/RealEstate.csv?raw=1
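As a hedged illustration of the (K, V) to (K, Iterable<V>) shape, here is a plain-Python stand-in for `groupByKey` applied to the four data rows of the RealEstate.csv preview shown further down. `group_by_key` is a hypothetical helper; unlike this sketch, Spark does not guarantee the order of the values within each group.

```python
from collections import defaultdict

# Data rows copied from the RealEstate.csv preview (header removed).
rows = [
    "132842,Arroyo Grande,795000.00,3,3,2371,335.30,Short Sale",
    "134364,Paso Robles,399000.00,4,3,2818,141.59,Short Sale",
    "135141,Paso Robles,545000.00,4,3,3032,179.75,Short Sale",
    "135712,Morro Bay,909000.00,4,4,3540,256.78,Short Sale",
]

def group_by_key(pairs):
    """Single-machine stand-in for RDD.groupByKey: collect every value per key."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)

# (Status, Location) pairs: column 7 is Status, column 1 is Location.
pairs = [(fields[7], fields[1]) for fields in (row.split(",") for row in rows)]
print(group_by_key(pairs))
```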

In [0]:

sc.addFile('https://raw.githubusercontent.com/saranyamandava/ML-Sprint-Challenges/master/Datasets/RealEstate.csv')
lines = sc.textFile(SparkFiles.get('RealEstate.csv'))

In [16]:
lines.take(5)

['MLS,Location,Price,Bedrooms,Bathrooms,Size,Price SQ Ft,Status',
 '132842,Arroyo Grande,795000.00,3,3,2371,335.30,Short Sale',
 '134364,Paso Robles,399000.00,4,3,2818,141.59,Short Sale',
 '135141,Paso Robles,545000.00,4,3,3032,179.75,Short Sale',
 '135712,Morro Bay,909000.00,4,4,3540,256.78,Short Sale']

In [19]:
header = lines.first()
cleaned_lines = lines.filter(lambda row: row != header)

# Split each row once, then build (Status, Location) pairs
StatusLocationPairRdd = (cleaned_lines
                         .map(lambda line: line.split(","))
                         .map(lambda fields: (fields[7], fields[1])))

LocationByStatus = StatusLocationPairRdd.groupByKey()

#Save grouping by Status
#LocationByStatus.coalesce(1).saveAsTextFile("py_out/status.txt")

for status, location in LocationByStatus.collectAsMap().items():
    print("{}: {}".format(status, list(location)))
  


Short Sale: ['Paso Robles', 'Paso Robles', 'Morro Bay', 'Santa Maria-Orcutt', 'Oceano', 'Santa Maria-Orcutt', 'Santa Maria-Orcutt', 'Morro Bay', 'Atascadero', 'Santa Maria-Orcutt', 'Santa Maria-Orcutt', 'Arroyo Grande', 'Santa Maria-Orcutt', 'Santa Maria-Orcutt', 'Paso Robles', 'Los Alamos', 'San Miguel', 'Paso Robles', 'San Luis Obispo', 'Morro Bay', 'Cayucos', 'Santa Maria-Orcutt', 'Santa Maria-Orcutt', 'Santa Maria-Orcutt', 'Santa Maria-Orcutt', 'Pismo Beach', 'Santa Maria-Orcutt', 'Santa Maria-Orcutt', 'Santa Maria-Orcutt', 'Santa Maria-Orcutt', 'Santa Maria-Orcutt', 'Santa Maria-Orcutt', 'Atascadero', 'Nipomo', 'Guadalupe', 'Santa Maria-Orcutt', 'Pismo Beach', 'Santa Maria-Orcutt', 'Morro Bay', 'Santa Maria-Orcutt', 'Santa Maria-Orcutt', 'Nipomo', 'Los Osos', 'Arroyo Grande', 'Templeton', 'Templeton', 'Grover Beach', 'Santa Maria-Orcutt', 'Santa Maria-Orcutt', 'Santa Maria-Orcutt', 'Santa Maria-Orcutt', 'Santa Maria-Orcutt', 'Santa Maria-Orcutt', 'Cambria', 'Nipomo', 'Nipomo', 'Pa

**Question 4**: In this question, we will step through selected operations in Spark SQL. The dataset is the same one used in Question 3.

**a)** Read in the file and output the schema of the file

**b)** Print only the following columns from the table: "location", "price", "bedrooms", "bathrooms", "status"
   
**c)** Print records where the location is Morro Bay

**d)** Print the count of locations

**e)** Print records with price less than 500000

**f)** Print the records by price in descending order

**g)** Group the records by location and aggregate by average price

**h)** Group by location, aggregate the average price and sort by average price
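Tasks g) and h) amount to a per-location sum and count, followed by a sort on the resulting average. As a hedged cross-check of what `groupBy(...).avg(...)` followed by an ascending `orderBy` should produce, here is the same computation in plain Python on the four preview rows from Question 3; `average_price_by_location` is a hypothetical helper.

```python
from collections import defaultdict

# (Location, Price) pairs from the four RealEstate.csv preview rows.
listings = [
    ("Arroyo Grande", 795000.0),
    ("Paso Robles", 399000.0),
    ("Paso Robles", 545000.0),
    ("Morro Bay", 909000.0),
]

def average_price_by_location(pairs):
    """Group prices by location and return (location, average) sorted ascending by average."""
    totals = defaultdict(lambda: [0.0, 0])  # location -> [sum of prices, count]
    for location, price in pairs:
        totals[location][0] += price
        totals[location][1] += 1
    averages = [(loc, s / n) for loc, (s, n) in totals.items()]
    return sorted(averages, key=lambda item: item[1])

for location, avg in average_price_by_location(listings):
    print(f"{location}: {avg:.1f}")
```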


**Reference (Spark SQL)**: https://spark.apache.org/docs/1.2.1/sql-programming-guide.html

In [6]:
sc = SparkContext.getOrCreate()
sc.getConf().getAll()

[('spark.driver.host', '40f8688769ba'),
 ('spark.driver.port', '35554'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.app.id', 'local-1529618797216'),
 ('spark.master', 'local[*]'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.app.name', 'pyspark-shell')]

In [0]:
from pyspark.sql import SQLContext
# SQLContext is the Spark 1.x entry point; the cells below use the SparkSession (spark) instead
sqlContext = SQLContext(sc)

In [13]:
!wget -nc https://raw.githubusercontent.com/saranyamandava/ML-Sprint-Challenges/master/Datasets/RealEstate.csv -O RealEstate.csv

File `RealEstate.csv' already there; not retrieving.


In [12]:
df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("RealEstate.csv")
df.printSchema()

root
 |-- MLS: integer (nullable = true)
 |-- Location: string (nullable = true)
 |-- Price: double (nullable = true)
 |-- Bedrooms: integer (nullable = true)
 |-- Bathrooms: integer (nullable = true)
 |-- Size: integer (nullable = true)
 |-- Price SQ Ft: double (nullable = true)
 |-- Status: string (nullable = true)



In [0]:
df.createOrReplaceTempView("listings")

In [15]:
sqlDF = spark.sql("SELECT Location, Price, Bedrooms, Bathrooms, Status FROM listings")
sqlDF.show()

+------------------+--------+--------+---------+----------+
|          Location|   Price|Bedrooms|Bathrooms|    Status|
+------------------+--------+--------+---------+----------+
|     Arroyo Grande|795000.0|       3|        3|Short Sale|
|       Paso Robles|399000.0|       4|        3|Short Sale|
|       Paso Robles|545000.0|       4|        3|Short Sale|
|         Morro Bay|909000.0|       4|        4|Short Sale|
|Santa Maria-Orcutt|109900.0|       3|        1|Short Sale|
|            Oceano|324900.0|       3|        3|Short Sale|
|Santa Maria-Orcutt|192900.0|       4|        2|Short Sale|
|Santa Maria-Orcutt|215000.0|       3|        2|Short Sale|
|         Morro Bay|999000.0|       4|        3|Short Sale|
|        Atascadero|319000.0|       3|        2|Short Sale|
|Santa Maria-Orcutt|350000.0|       3|        2|Short Sale|
|Santa Maria-Orcutt|249000.0|       3|        2|Short Sale|
|     Arroyo Grande|299000.0|       2|        2|Short Sale|
|Santa Maria-Orcutt|235900.0|       3|  

In [16]:
location = "Morro Bay"
sqlDF = spark.sql("SELECT * FROM listings WHERE Location == \"%s\"" % location)
sqlDF.show()

+------+---------+---------+--------+---------+----+-----------+-----------+
|   MLS| Location|    Price|Bedrooms|Bathrooms|Size|Price SQ Ft|     Status|
+------+---------+---------+--------+---------+----+-----------+-----------+
|135712|Morro Bay| 909000.0|       4|        4|3540|     256.78| Short Sale|
|137159|Morro Bay| 999000.0|       4|        3|3360|     297.32| Short Sale|
|140077|Morro Bay|1100000.0|       4|        3|4168|     263.92| Short Sale|
|142528|Morro Bay| 415000.0|       3|        3|1350|     307.41| Short Sale|
|143534|Morro Bay| 789000.0|       3|        3|2100|     375.71|Foreclosure|
|144314|Morro Bay| 899000.0|       3|        3|2430|     369.96|Foreclosure|
|144316|Morro Bay|1045000.0|       3|        3|2100|     497.62|Foreclosure|
|144318|Morro Bay| 774000.0|       2|        2|1550|     499.35|Foreclosure|
|149230|Morro Bay| 359000.0|       3|        2|1008|     356.15| Short Sale|
|150262|Morro Bay| 395000.0|       4|        2|1736|     227.53| Short Sale|

In [17]:
print(sqlDF.count())

13


In [18]:
price = 500000
sqlDF = spark.sql("SELECT * FROM listings WHERE Price < %d" % price)
print(sqlDF.count())
sqlDF.show()

629
+------+------------------+--------+--------+---------+----+-----------+----------+
|   MLS|          Location|   Price|Bedrooms|Bathrooms|Size|Price SQ Ft|    Status|
+------+------------------+--------+--------+---------+----+-----------+----------+
|134364|       Paso Robles|399000.0|       4|        3|2818|     141.59|Short Sale|
|136282|Santa Maria-Orcutt|109900.0|       3|        1|1249|      87.99|Short Sale|
|136431|            Oceano|324900.0|       3|        3|1800|      180.5|Short Sale|
|137036|Santa Maria-Orcutt|192900.0|       4|        2|1603|     120.34|Short Sale|
|137090|Santa Maria-Orcutt|215000.0|       3|        2|1450|     148.28|Short Sale|
|137570|        Atascadero|319000.0|       3|        2|1323|     241.12|Short Sale|
|138053|Santa Maria-Orcutt|350000.0|       3|        2|1750|      200.0|Short Sale|
|138730|Santa Maria-Orcutt|249000.0|       3|        2|1400|     177.86|Short Sale|
|139291|     Arroyo Grande|299000.0|       2|        2|1257|     237.87|

In [19]:
df.groupBy("Location").count().show()

+-------------------+-----+
|           Location|count|
+-------------------+-----+
|        Pismo Beach|   12|
|          King City|    3|
|         New Cuyama|    1|
|             Nipomo|    3|
|             Oceano|   10|
|             Nipomo|   34|
|          Templeton|   11|
|      Arroyo Grande|   12|
|        Bakersfield|    1|
|          Guadalupe|    4|
|            Creston|    1|
|       Grover Beach|   20|
|         Atascadero|   10|
|          Morro Bay|    4|
| Santa Maria-Orcutt|   13|
|        Avila Beach|    3|
|             Lompoc|    1|
|        Paso Robles|   85|
|            Creston|    1|
|            Cayucos|    2|
+-------------------+-----+
only showing top 20 rows
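The counts above list `Nipomo` and `Creston` twice, and the `Location` column is one character wider than `Santa Maria-Orcutt` (it appears as ` Santa Maria-Orcutt`). That pattern is consistent with some values carrying leading or trailing whitespace, which is hard to spot because `show()` right-aligns cells; this is an inference from the output, not something the notebook checks. It may also explain why `Morro Bay` appears here with 4 rows while the exact-match query above returned 13. If so, trimming the column before grouping, for example `df.withColumn("Location", F.trim(df["Location"]))` with `F` being `pyspark.sql.functions`, would merge those groups. The plain-Python sketch below shows the effect on hypothetical values.

```python
from collections import Counter

# Hypothetical Location values: the same city with and without stray spaces.
locations = ["Nipomo", " Nipomo", "Nipomo ", "Creston", " Creston"]

raw_counts = Counter(locations)                            # whitespace variants are distinct keys
trimmed_counts = Counter(loc.strip() for loc in locations)  # variants merge after trimming

print(len(raw_counts), len(trimmed_counts))
print(trimmed_counts)
```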



In [20]:
grouped_agg = df.groupBy("Location").avg()
grouped_agg.show()

+-------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|           Location|          avg(MLS)|        avg(Price)|     avg(Bedrooms)|    avg(Bathrooms)|         avg(Size)|  avg(Price SQ Ft)|
+-------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|        Pismo Beach|148322.08333333334| 772374.5833333334|2.5833333333333335|2.5833333333333335|1810.4166666666667|462.28416666666664|
|          King City|149289.66666666666|          131190.0|3.3333333333333335|               2.0|2032.3333333333333| 71.51333333333334|
|         New Cuyama|          154233.0|           40900.0|               3.0|               1.0|            1201.0|             34.05|
|             Nipomo|          154403.0| 454166.6666666667|3.3333333333333335|2.6666666666666665|1931.6666666666667|187.92333333333332|
|             Oceano|          150528.4|        

In [21]:
grouped_agg.orderBy(grouped_agg['avg(Price)']).show()

+-------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|           Location|          avg(MLS)|        avg(Price)|     avg(Bedrooms)|    avg(Bathrooms)|         avg(Size)|  avg(Price SQ Ft)|
+-------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|         New Cuyama|          154233.0|           40900.0|               3.0|               1.0|            1201.0|             34.05|
|    Santa Margarita|          153839.0|           59900.0|               1.0|               1.0|             628.0|             95.38|
|        Bakersfield|          154028.0|           91500.0|               3.0|               2.0|            1313.0|             69.69|
|          Guadalupe|          147908.5|          117250.0|               2.5|               1.5|             973.5|           120.175|
|          King City|149289.66666666666|        

**Alternative approach (DataFrame API):**

In [22]:
# Selecting multiple columns, formatting as table
df.select('Location', 'Price', 'Status').show(5)

# Filter to a specific location
df.filter(df['Location'] == 'Nipomo').show(5)

# Counts of properties grouped by location
df.groupBy('Location').count().show(5)

# Get all properties with price < 500000
df.filter(df['Price'] < 500000).show(5)

# Order by price descending
df.orderBy(df['Price'], ascending=False).show(5)

# Group by location and aggregate by average price
df.groupBy('Location').avg('Price').show()

# Group, average, and sort
df.groupBy('Location').avg('Price').orderBy('avg(Price)').show()


+------------------+--------+----------+
|          Location|   Price|    Status|
+------------------+--------+----------+
|     Arroyo Grande|795000.0|Short Sale|
|       Paso Robles|399000.0|Short Sale|
|       Paso Robles|545000.0|Short Sale|
|         Morro Bay|909000.0|Short Sale|
|Santa Maria-Orcutt|109900.0|Short Sale|
+------------------+--------+----------+
only showing top 5 rows

+------+--------+--------+--------+---------+----+-----------+----------+
|   MLS|Location|   Price|Bedrooms|Bathrooms|Size|Price SQ Ft|    Status|
+------+--------+--------+--------+---------+----+-----------+----------+
|141847|  Nipomo|340000.0|       3|        1|1110|     306.31|Short Sale|
|142818|  Nipomo|525000.0|       3|        3|2365|     221.99|Short Sale|
|143540|  Nipomo|498000.0|       4|        3|2341|     212.73|Short Sale|
|143884|  Nipomo|430000.0|       4|        2|2168|     198.34|Short Sale|
|144815|  Nipomo|320000.0|       4|        2|1929|     165.89|Short Sale|
+------+------