### Coding Challenge #1:

In this coding challenge, you will work through mini-challenges that help you become familiar with the nuances of Spark programming.



**Question 1**: The dataset contains information about airports around the world. The first few columns are **1)** the airport ID, **2)** the name of the airport, **3)** the city of the airport and **4)** the country of the airport, followed by further columns. The ask is to retrieve **a)** the name and **b)** the city of each airport in a country of your choice (e.g. Iceland, Canada or the United States) and to output the details to a 'select_airports.txt' file.

Hint: **1)** Initialize a SparkContext, **2)** specify how many cores you would like to dedicate to the application and **3)** chain the "filter" and "map" transformations to retrieve **a)** the name and **b)** the city of each airport in the chosen country.

**Dataset**: https://www.dropbox.com/s/zup13k470xj7hhh/airports.txt?raw=1 - Download the file, save it to a local folder and then use the textFile method of SparkContext to read it in.

Reference: https://www.tutorialkart.com/apache-spark/read-input-text-file-to-rdd-example/

Reference on Regular Expressions: http://www.pythonforbeginners.com/regex/regular-expressions-in-python
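The notebook below splits each line with a quote-aware regular expression. A sketch of an alternative, assuming the column order described above, uses Python's standard `csv` module, which already understands quoted fields and commas inside quotes. The helper names `parse_airport` and `name_and_city` are hypothetical, not part of PySpark.

```python
import csv


def parse_airport(line):
    """Parse one line of airports.txt into a list of fields, honouring quotes."""
    return next(csv.reader([line]))


def name_and_city(fields):
    """Return (airport name, city): columns 2 and 3 in the question's numbering."""
    return fields[1], fields[2]


line = '1,"Goroka","Goroka","Papua New Guinea","GKA","AYGA",-6.081689,145.391881,5282,10,"U","Pacific/Port_Moresby"'
fields = parse_airport(line)
print(fields[3])              # Papua New Guinea
print(name_and_city(fields))  # ('Goroka', 'Goroka')
```

Plugged into the filter-map pattern from the hint, this would read roughly `airports.map(parse_airport).filter(lambda f: f[3] == 'Canada').map(name_and_city)`. The comparison then uses `'Canada'` without the surrounding double quotes, because `csv` strips them.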

In [1]:
import re
import os
from pyspark import SparkContext, SparkConf, SparkFiles

In [2]:
conf = SparkConf().setAppName("CC1").setMaster("local[2]")
sc = SparkContext(conf=conf)

In [3]:
sc.addFile('https://uc557ef0cc08d9b1c56ab1be8f81.dl.dropboxusercontent.com/cd/0/inline/AJMIZP6Jl2SdI8_ghUmnxrI69-83FMxiVIgMdCJfNs-Gips36eDBfS33GAD6C5rEPlPXBAmCqtGVutJu__nOT27iS2dxcptfXxQ-1qAS-VhIwpq1ptnEY-RqQ2gKcPNuvsfqh2SgtTYl9LGNodhDtxuPcttXh7YRNbg8Ui6gTEaWNJHxM1mmlLttKbU01GHe_T8/file')

In [4]:
airports = sc.textFile(SparkFiles.get('file'))

In [5]:
airports.take(5)

['1,"Goroka","Goroka","Papua New Guinea","GKA","AYGA",-6.081689,145.391881,5282,10,"U","Pacific/Port_Moresby"',
 '2,"Madang","Madang","Papua New Guinea","MAG","AYMD",-5.207083,145.7887,20,10,"U","Pacific/Port_Moresby"',
 '3,"Mount Hagen","Mount Hagen","Papua New Guinea","HGU","AYMH",-5.826789,144.295861,5388,10,"U","Pacific/Port_Moresby"',
 '4,"Nadzab","Nadzab","Papua New Guinea","LAE","AYNZ",-6.569828,146.726242,239,10,"U","Pacific/Port_Moresby"',
 '5,"Port Moresby Jacksons Intl","Port Moresby","Papua New Guinea","POM","AYPY",-9.443383,147.22005,146,10,"U","Pacific/Port_Moresby"']

In [6]:
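# https://stackoverflow.com/questions/26810010/need-explanation-on-this-regex?rq=1
# Matches a comma only when an even number of double quotes follows it,
# i.e. a comma that separates fields rather than one inside a quoted value.
CSV_COMMA = re.compile(r',(?=(?:[^"]*"[^"]*")*[^"]*$)')

def split_on_comma(line: str) -> str:
    """Return 'name, city' (columns 2 and 3 of airports.txt) for one line."""
    splits = CSV_COMMA.split(line)
    return "{}, {}".format(splits[1], splits[2])

In [7]:
# Keep rows whose country column (index 3, quotes included) is "Canada".
canada_airports = airports.filter(lambda line: CSV_COMMA.split(line)[3] == '"Canada"')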

In [8]:
canada_airports.take(5)

['21,"Sault Ste Marie","Sault Sainte Marie","Canada","YAM","CYAM",46.485001,-84.509445,630,-5,"A","America/Toronto"',
 '22,"Winnipeg St Andrews","Winnipeg","Canada","YAV","CYAV",50.056389,-97.0325,760,-6,"A","America/Winnipeg"',
 '23,"Shearwater","Halifax","Canada","YAW","CYAW",44.639721,-63.499444,167,-4,"A","America/Halifax"',
 '24,"St Anthony","St. Anthony","Canada","YAY","CYAY",51.391944,-56.083056,108,-3.5,"A","America/St_Johns"',
 '25,"Tofino","Tofino","Canada","YAZ","CYAZ",49.082222,-125.7725,80,-8,"A","America/Vancouver"']
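To see why the lookahead in the filter above matters, the short check below compares a plain `str.split(',')` with the quote-aware pattern on a field that contains a comma. The sample line is made up for illustration and is not taken from airports.txt.

```python
import re

# Matches a comma only if an even number of double quotes follows it up to the
# end of the line, i.e. the comma is not inside a quoted field.
CSV_COMMA = re.compile(r',(?=(?:[^"]*"[^"]*")*[^"]*$)')

line = '99,"Foo, Bar","Baz","Canada"'  # hypothetical record with a quoted comma
print(line.split(','))        # ['99', '"Foo', ' Bar"', '"Baz"', '"Canada"']
print(CSV_COMMA.split(line))  # ['99', '"Foo, Bar"', '"Baz"', '"Canada"']
```

With the naive split the country lands at index 4 instead of 3, so a Canada filter on index 3 would silently miss such rows. The lookahead rescans the rest of the line for every comma, which is acceptable for short records like these.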

In [9]:
canada_airports_names_cities = canada_airports.map(split_on_comma)

In [10]:
canada_airports_names_cities.take(5)

['"Sault Ste Marie", "Sault Sainte Marie"',
 '"Winnipeg St Andrews", "Winnipeg"',
 '"Shearwater", "Halifax"',
 '"St Anthony", "St. Anthony"',
 '"Tofino", "Tofino"']
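The question also asks for the results in a `select_airports.txt` file, which the cells above do not produce. One option, assuming the filtered result is small enough to collect to the driver, is to write the collected strings with plain Python; the helper name `write_lines` is hypothetical. `RDD.saveAsTextFile` is the distributed alternative, but it writes a directory of `part-*` files rather than a single file.

```python
import os
import tempfile


def write_lines(path, lines):
    """Write one record per line to a single local text file."""
    with open(path, "w", encoding="utf-8") as handle:
        for line in lines:
            handle.write(line + "\n")


# In the notebook this would be:
# write_lines("select_airports.txt", canada_airports_names_cities.collect())
sample = ['"Sault Ste Marie", "Sault Sainte Marie"', '"Tofino", "Tofino"']
path = os.path.join(tempfile.mkdtemp(), "select_airports.txt")
write_lines(path, sample)
with open(path, encoding="utf-8") as handle:
    print(handle.read().splitlines() == sample)  # True
```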

In [11]:
os.remove(SparkFiles.get('file'))


**Question 2**: For this question, you will leverage pair RDDs. Pair RDDs hold key-value records and expose operations that act on each key in parallel, regrouping or aggregating the data across the network. The **ask** is to use a pair RDD to count the number of times each word occurs in a body of text (i.e. the WordCount.txt file).

**Dataset**: https://www.dropbox.com/s/unau4wxfsidrddq/WordCount.txt?raw=1 - Download the file, save it to a local folder and then use the textFile method of SparkContext to read it in.

Reference(s):

https://programmathics.com/big-data/apache-spark/apache-spark-resilient-distributed-dataset-rdd-programming-part-1/apache-spark-resilient-distributed-dataset-rdd-programming-action-operations/

https://programmathics.com/big-data/apache-spark/apache-spark-working-with-key-value-pairs-in-rdd/apache-spark-pyspark-actions-on-pair-rdd/
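To make the pair-RDD idea concrete, the sketch below shows the usual word-count pipeline (`flatMap`, then `map` to `(word, 1)`, then `reduceByKey`) next to a plain-Python function that mimics what `reduceByKey` computes for each key. The local function is only a mental model for checking the logic without a SparkContext, not how Spark executes the shuffle; both function names are hypothetical.

```python
from operator import add


def word_count_pairs(lines_rdd):
    """Pair-RDD word count: emit (word, 1) per token, then sum the 1s per key.

    `lines_rdd` is assumed to be an RDD of strings, e.g. from sc.textFile(...).
    """
    return (lines_rdd
            .flatMap(lambda line: line.split())  # one record per token
            .map(lambda word: (word, 1))         # turn each token into a key-value pair
            .reduceByKey(add))                   # merge the values of each key


def local_reduce_by_key(pairs, func):
    """Plain-Python stand-in for reduceByKey: fold the values of each key with func."""
    result = {}
    for key, value in pairs:
        result[key] = func(result[key], value) if key in result else value
    return result


# Local check of the same logic on a tiny input.
lines = ["New York New", "York city"]
pairs = [(word, 1) for line in lines for word in line.split()]
print(local_reduce_by_key(pairs, add))  # {'New': 2, 'York': 2, 'city': 1}
```

Unlike `countByValue`, which is an action that returns a dictionary to the driver, `reduceByKey` is a transformation, so the counts stay distributed until an action such as `collect` or `takeOrdered` runs.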




In [12]:
sc.addFile('https://uc96933343f8cf158207d9d5f37d.dl.dropboxusercontent.com/cd/0/inline/AJMzZSb-Yrd5lJemNl0DdAV6YlJzJUNn3xIpkEGbfhdeohdGYiliaspHUbTUpd8tkbTkignXOfyKFchYRb5nCbhIFKbjXmycJi61z1hT-j5AD6w5lm_o32MW1u3WZ6I1ppOYxp2K32Vy8xuXuyaZbi9GKn3NwqlLDycK-sJgqBDcuQPswEbsATjFP54VZkwACAk/file')

In [13]:
history = sc.textFile(SparkFiles.get('file'))

In [14]:
history.first()

"The history of New York begins around 10,000 BC, when the first Native Americans arrived. By 1100 AD, New York's main native cultures, the Iroquoian and Algonquian, had developed. European discovery of New York was led by the French in 1524 and the first land claim came in 1609 by the Dutch. As part of New Netherland, the colony was important in the fur trade and eventually became an agricultural resource thanks to the patroon system. In 1626 the Dutch bought the island of Manhattan from Native Americans.[1] In 1664, England renamed the colony New York, after the Duke of York (later James II & VII.) New York City gained prominence in the 18th century as a major trading port in the Thirteen Colonies."

In [15]:
history = history.flatMap(lambda x: x.split(' '))

In [16]:
print(history.take(10))

['The', 'history', 'of', 'New', 'York', 'begins', 'around', '10,000', 'BC,', 'when']
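Splitting on a single space keeps punctuation attached to words, so the counts below treat `'BC,'`, `'arrived.'`, `'The'` and `'the'` as different keys. A possible normalisation, assuming case and surrounding punctuation should be ignored, is a small tokenizer passed to `flatMap` in place of the lambda; `tokenize` is a hypothetical helper, not part of PySpark.

```python
import re
import string

CITATION = re.compile(r"\[\d+\]")  # Wikipedia-style markers such as "[1]"


def tokenize(line):
    """Lower-case words with surrounding punctuation and citation markers removed."""
    line = CITATION.sub("", line)
    words = (word.strip(string.punctuation).lower() for word in line.split())
    return [word for word in words if word]  # drop tokens that were only punctuation


print(tokenize("The history of New York begins around 10,000 BC, when"))
# ['the', 'history', 'of', 'new', 'york', 'begins', 'around', '10,000', 'bc', 'when']
```

Applied to the lines RDD returned by `sc.textFile`, `flatMap(tokenize)` would replace the split used above. Calling `split()` with no argument also avoids the empty-string tokens that `split(' ')` produces for consecutive spaces.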


In [17]:
type(history)

pyspark.rdd.PipelinedRDD

In [18]:
history.countByValue()

defaultdict(int,
            {'The': 10,
             'history': 1,
             'of': 33,
             'New': 20,
             'York': 17,
             'begins': 1,
             'around': 4,
             '10,000': 1,
             'BC,': 1,
             'when': 1,
             'the': 71,
             'first': 5,
             'Native': 2,
             'Americans': 1,
             'arrived.': 1,
             'By': 1,
             '1100': 1,
             'AD,': 1,
             "York's": 2,
             'main': 2,
             'native': 1,
             'cultures,': 1,
             'Iroquoian': 1,
             'and': 21,
             'Algonquian,': 1,
             'had': 1,
             'developed.': 1,
             'European': 2,
             'discovery': 1,
             'was': 8,
             'led': 2,
             'by': 2,
             'French': 1,
             'in': 21,
             '1524': 1,
             'land': 1,
             'claim': 1,
             'came': 2,
             '1609': 

In [19]:
os.remove(SparkFiles.get('file'))

**Question 3:** This question provides an opportunity to work with the "groupByKey" operator. When called on a dataset of **(key, value)** pairs, "groupByKey" returns a dataset of (K, Iterable<V>) pairs. For example, you can use "groupByKey" to list all the airports within a country. The **ask** here is to leverage the "groupByKey" operator to get a listing of all locations by status.

**Dataset**: https://www.dropbox.com/s/4z3i0b05p7p22b2/RealEstate.csv?raw=1

In [20]:
sc.addFile('https://uc7a338832108755c134c082fcb1.dl.dropboxusercontent.com/cd/0/inline/AJNX8cCB_IMQJ6YZfsTrgVbgS0FT-w_rQRqeV_Dsu2W93GuzxLJ7b7pU6_KDekOWoYkm5FWFD7GAeMxJVoD0Nwvqb_eWeDSfSWvNM7G5_CHWc2fxx_gmiEljmxRIE89ytQcCScf8AcUWmlOjjCg-jFfJZKrwU3peKWrQdQrfwHINhoET-oiQ3A_yWcpCd-5Xodc/file')

In [21]:
real_estate = sc.textFile(SparkFiles.get('file'))

In [22]:
real_estate.take(5)

['MLS,Location,Price,Bedrooms,Bathrooms,Size,Price SQ Ft,Status',
 '132842,Arroyo Grande,795000.00,3,3,2371,335.30,Short Sale',
 '134364,Paso Robles,399000.00,4,3,2818,141.59,Short Sale',
 '135141,Paso Robles,545000.00,4,3,3032,179.75,Short Sale',
 '135712,Morro Bay,909000.00,4,4,3540,256.78,Short Sale']

In [23]:
real_estate = real_estate.map(lambda x: x.split(','))

In [24]:
print(real_estate.take(2))

[['MLS', 'Location', 'Price', 'Bedrooms', 'Bathrooms', 'Size', 'Price SQ Ft', 'Status'], ['132842', 'Arroyo Grande', '795000.00', '3', '3', '2371', '335.30', 'Short Sale']]


In [25]:
# Key each record by its last field (Status); slicing avoids mutating the list.
real_estate = real_estate.map(lambda x: (x[-1], x[:-1]))

In [26]:
print(real_estate.take(3))

[('Status', ['MLS', 'Location', 'Price', 'Bedrooms', 'Bathrooms', 'Size', 'Price SQ Ft']), ('Short Sale', ['132842', 'Arroyo Grande', '795000.00', '3', '3', '2371', '335.30']), ('Short Sale', ['134364', 'Paso Robles', '399000.00', '4', '3', '2818', '141.59'])]


In [27]:
real_estate = real_estate.groupByKey()

In [28]:
real_estate.collect()

[('Short Sale', <pyspark.resultiterable.ResultIterable at 0x22f35e196d8>),
 ('Status', <pyspark.resultiterable.ResultIterable at 0x22f35e9feb8>),
 ('Foreclosure', <pyspark.resultiterable.ResultIterable at 0x22f35e9fef0>),
 ('Regular', <pyspark.resultiterable.ResultIterable at 0x22f35eb74a8>)]
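Two details stand out in this output: the header line became its own `'Status'` group because nothing filtered it out, and each group is a lazy `ResultIterable` rather than a list. The sketch below reproduces the pairing and grouping in plain Python so the logic can be checked without a cluster; `to_status_pair` and `local_group_by_key` are hypothetical helpers.

```python
from collections import defaultdict

HEADER = "MLS,Location,Price,Bedrooms,Bathrooms,Size,Price SQ Ft,Status"


def to_status_pair(line):
    """Turn one CSV line into (status, other fields) without mutating anything."""
    fields = line.split(",")
    return fields[-1], fields[:-1]


def local_group_by_key(pairs):
    """Plain-Python model of groupByKey: key -> list of all its values."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)


lines = [
    HEADER,
    "132842,Arroyo Grande,795000.00,3,3,2371,335.30,Short Sale",
    "134364,Paso Robles,399000.00,4,3,2818,141.59,Short Sale",
]
data = [line for line in lines if line != HEADER]  # drop the header row first
grouped = local_group_by_key(to_status_pair(line) for line in data)
print(sorted(grouped))              # ['Short Sale']
print(len(grouped["Short Sale"]))   # 2
```

In Spark the corresponding steps on the raw `sc.textFile` RDD would be `header = rdd.first()` and `rdd.filter(lambda line: line != header)` before the `map`, then `grouped.mapValues(list)` to turn each `ResultIterable` into a list before collecting. To pull out a single status without grouping everything, `lookup('Foreclosure')` on the ungrouped pair RDD returns the list of values for that key.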

In [29]:
foreclosures = real_estate.filter(lambda item: item[0]=='Foreclosure').map(lambda item: item[1])

In [30]:
foreclosures

PythonRDD[17] at RDD at PythonRDD.scala:49

In [31]:
foreclosures.collect()

[<pyspark.resultiterable.ResultIterable at 0x22f35e4c3c8>]

In [32]:
list(foreclosures.first())[:5]

[['143436', 'Templeton', '1399000.00', '4', '3', '6500', '215.23'],
 ['143534', 'Morro Bay', '789000.00', '3', '3', '2100', '375.71'],
 ['144314', 'Morro Bay', '899000.00', '3', '3', '2430', '369.96'],
 ['144316', 'Morro Bay', '1045000.00', '3', '3', '2100', '497.62'],
 ['144318', 'Morro Bay', '774000.00', '2', '2', '1550', '499.35']]

**Question 4**: In this question, we will step through selected operations in SparkSQL. The dataset is the same one used in Question 3.

**a)** Read in the file and output the schema of the file

**b)** Print only the following columns from the table: "location", "price", "bedrooms", "bathrooms", "status"

**c)** Print records where the location is Morro Bay

**d)** Print the number of records per location

**e)** Print records with price less than 500000

**f)** Print the records sorted by price in descending order

**g)** Group the records by location and aggregate by average price

**h)** Group by location, aggregate the average price and sort by average price


**Reference (Spark SQL)**: https://spark.apache.org/docs/1.2.1/sql-programming-guide.html

In [33]:
from pyspark.sql import SQLContext

In [34]:
sqlContext = SQLContext(sc)

**a)** Read in the file and output the schema of the file

In [35]:
df = sqlContext.read.format("csv").option("header", "true")\
                                  .option("inferSchema", "true")\
                                  .load(SparkFiles.get('file'))

In [36]:
df.printSchema()

root
 |-- MLS: integer (nullable = true)
 |-- Location: string (nullable = true)
 |-- Price: double (nullable = true)
 |-- Bedrooms: integer (nullable = true)
 |-- Bathrooms: integer (nullable = true)
 |-- Size: integer (nullable = true)
 |-- Price SQ Ft: double (nullable = true)
 |-- Status: string (nullable = true)



**b)** Print only the following columns from the table: "location", "price", "bedrooms", "bathrooms", "status"

In [37]:
df.select("location", "price", "bedrooms", "bathrooms", "status").show()

+------------------+--------+--------+---------+----------+
|          location|   price|bedrooms|bathrooms|    status|
+------------------+--------+--------+---------+----------+
|     Arroyo Grande|795000.0|       3|        3|Short Sale|
|       Paso Robles|399000.0|       4|        3|Short Sale|
|       Paso Robles|545000.0|       4|        3|Short Sale|
|         Morro Bay|909000.0|       4|        4|Short Sale|
|Santa Maria-Orcutt|109900.0|       3|        1|Short Sale|
|            Oceano|324900.0|       3|        3|Short Sale|
|Santa Maria-Orcutt|192900.0|       4|        2|Short Sale|
|Santa Maria-Orcutt|215000.0|       3|        2|Short Sale|
|         Morro Bay|999000.0|       4|        3|Short Sale|
|        Atascadero|319000.0|       3|        2|Short Sale|
|Santa Maria-Orcutt|350000.0|       3|        2|Short Sale|
|Santa Maria-Orcutt|249000.0|       3|        2|Short Sale|
|     Arroyo Grande|299000.0|       2|        2|Short Sale|
|Santa Maria-Orcutt|235900.0|       3|  

**c)** Print records where the location is Morro Bay

In [38]:
df.filter(df['location']=='Morro Bay').show()

+------+---------+---------+--------+---------+----+-----------+-----------+
|   MLS| Location|    Price|Bedrooms|Bathrooms|Size|Price SQ Ft|     Status|
+------+---------+---------+--------+---------+----+-----------+-----------+
|135712|Morro Bay| 909000.0|       4|        4|3540|     256.78| Short Sale|
|137159|Morro Bay| 999000.0|       4|        3|3360|     297.32| Short Sale|
|140077|Morro Bay|1100000.0|       4|        3|4168|     263.92| Short Sale|
|142528|Morro Bay| 415000.0|       3|        3|1350|     307.41| Short Sale|
|143534|Morro Bay| 789000.0|       3|        3|2100|     375.71|Foreclosure|
|144314|Morro Bay| 899000.0|       3|        3|2430|     369.96|Foreclosure|
|144316|Morro Bay|1045000.0|       3|        3|2100|     497.62|Foreclosure|
|144318|Morro Bay| 774000.0|       2|        2|1550|     499.35|Foreclosure|
|149230|Morro Bay| 359000.0|       3|        2|1008|     356.15| Short Sale|
|150262|Morro Bay| 395000.0|       4|        2|1736|     227.53| Short Sale|

**d)** Print the number of records per location

In [39]:
df.groupBy('location').count().show()

+-------------------+-----+
|           location|count|
+-------------------+-----+
|        Pismo Beach|   12|
|          King City|    3|
|         New Cuyama|    1|
|             Nipomo|    3|
|             Oceano|   10|
|             Nipomo|   34|
|          Templeton|   11|
|      Arroyo Grande|   12|
|        Bakersfield|    1|
|          Guadalupe|    4|
|            Creston|    1|
|       Grover Beach|   20|
|         Atascadero|   10|
|          Morro Bay|    4|
| Santa Maria-Orcutt|   13|
|        Avila Beach|    3|
|             Lompoc|    1|
|        Paso Robles|   85|
|            Creston|    1|
|            Cayucos|    2|
+-------------------+-----+
only showing top 20 rows

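Some locations, such as Nipomo and Creston, appear twice in this output. That suggests the raw `Location` strings differ in characters that `show()` does not make visible, for example leading or trailing spaces; this is an assumption that has not been checked against the file. A plain-Python sketch of the effect, using the hypothetical helper `count_by_location` on made-up values:

```python
from collections import Counter


def count_by_location(locations, normalize=True):
    """Count records per location, optionally stripping surrounding whitespace."""
    if normalize:
        locations = (loc.strip() for loc in locations)
    return Counter(locations)


raw = ["Nipomo", " Nipomo", "Creston", "Creston "]  # made-up values with stray spaces
print(count_by_location(raw, normalize=False))  # four separate keys
print(count_by_location(raw))                   # Nipomo: 2, Creston: 2
```

If the assumption holds, `df.withColumn('Location', trim(df['Location']))`, with `trim` imported from `pyspark.sql.functions`, would normalise the column before the `groupBy`. Collecting `df.select('Location').distinct()` is a quick way to inspect the exact strings.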


**e)** Print records with price less than 500000

In [40]:
df.filter(df['Price']<500000).show()

+------+------------------+--------+--------+---------+----+-----------+----------+
|   MLS|          Location|   Price|Bedrooms|Bathrooms|Size|Price SQ Ft|    Status|
+------+------------------+--------+--------+---------+----+-----------+----------+
|134364|       Paso Robles|399000.0|       4|        3|2818|     141.59|Short Sale|
|136282|Santa Maria-Orcutt|109900.0|       3|        1|1249|      87.99|Short Sale|
|136431|            Oceano|324900.0|       3|        3|1800|      180.5|Short Sale|
|137036|Santa Maria-Orcutt|192900.0|       4|        2|1603|     120.34|Short Sale|
|137090|Santa Maria-Orcutt|215000.0|       3|        2|1450|     148.28|Short Sale|
|137570|        Atascadero|319000.0|       3|        2|1323|     241.12|Short Sale|
|138053|Santa Maria-Orcutt|350000.0|       3|        2|1750|      200.0|Short Sale|
|138730|Santa Maria-Orcutt|249000.0|       3|        2|1400|     177.86|Short Sale|
|139291|     Arroyo Grande|299000.0|       2|        2|1257|     237.87|Shor

**f)** Print the records sorted by price in descending order

In [41]:
df.sort('Price', ascending=False).show()

+------+----------------+---------+--------+---------+----+-----------+-----------+
|   MLS|        Location|    Price|Bedrooms|Bathrooms|Size|Price SQ Ft|     Status|
+------+----------------+---------+--------+---------+----+-----------+-----------+
|154526|   Arroyo Grande|5499000.0|       4|        5|5060|    1086.76|    Regular|
|154491|         Cambria|2995000.0|       5|        4|3684|     812.98|    Regular|
|154463| San Luis Obispo|2369000.0|       5|        6|4174|     567.56|    Regular|
|154434|         Cambria|2000000.0|       4|        4|3576|     559.28|    Regular|
|152768|     Avila Beach|1999000.0|       4|        5|5307|     376.67| Short Sale|
|150439|   Arroyo Grande|1900000.0|       4|        5|5411|     351.14| Short Sale|
|151419|     Pismo Beach|1799000.0|       4|        4|3609|     498.48|Foreclosure|
|150949|          Nipomo|1700000.0|       3|        5|4463|     380.91| Short Sale|
|154533|         Bradley|1600000.0|       3|        3|2640|     606.06|    R

**g)** Group the records by location and aggregate by average price

In [42]:
avg_price_by_loc = df.groupBy('Location').mean('Price')
avg_price_by_loc.show()

+-------------------+------------------+
|           Location|        avg(Price)|
+-------------------+------------------+
|        Pismo Beach| 772374.5833333334|
|          King City|          131190.0|
|         New Cuyama|           40900.0|
|             Nipomo| 454166.6666666667|
|             Oceano|          392640.0|
|             Nipomo| 430629.4117647059|
|          Templeton| 705890.9090909091|
|      Arroyo Grande|1013958.3333333334|
|        Bakersfield|           91500.0|
|          Guadalupe|          117250.0|
|            Creston|          309900.0|
|       Grover Beach|          365615.0|
|         Atascadero|          477950.0|
|          Morro Bay|          659700.0|
| Santa Maria-Orcutt|332546.07692307694|
|        Avila Beach|1205666.6666666667|
|             Lompoc|          149900.0|
|        Paso Robles|334280.22352941177|
|            Creston|          549000.0|
|            Cayucos|          610000.0|
+-------------------+------------------+
only showing top 20 rows

**h)** Group by location, aggregate the average price and sort by average price

In [43]:
avg_price_by_loc.sort('avg(Price)', ascending=False).show()

+----------------+------------------+
|        Location|        avg(Price)|
+----------------+------------------+
|         Bradley|         1600000.0|
|         Cayucos|         1500000.0|
| San Luis Obispo|1444666.6666666667|
|     Avila Beach|1205666.6666666667|
|          Oceano|         1195000.0|
|     Out Of Area|         1195000.0|
|         Cambria|1076333.3333333333|
|   Arroyo Grande|1013958.3333333334|
|      Santa Ynez|          881800.0|
|     Pismo Beach| 772374.5833333334|
|       Templeton| 705890.9090909091|
|       Morro Bay| 689223.0769230769|
|    Grover Beach|          684000.0|
|       Morro Bay|          659700.0|
|     Pismo Beach|          636037.5|
|         Cayucos|          610000.0|
|      San Miguel|          595000.0|
|         Creston|          549000.0|
|   Arroyo Grande| 537023.2142857143|
|        Los Osos|          531000.0|
+----------------+------------------+
only showing top 20 rows
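The same group, average and sort logic can be cross-checked in plain Python on a small sample. The sketch below uses the four data rows shown in Question 3 and `csv.DictReader` from the standard library; `average_price_by_location` is a hypothetical helper meant only for checking small samples, not a replacement for the DataFrame code.

```python
import csv
import io
from collections import defaultdict

SAMPLE = """MLS,Location,Price,Bedrooms,Bathrooms,Size,Price SQ Ft,Status
132842,Arroyo Grande,795000.00,3,3,2371,335.30,Short Sale
134364,Paso Robles,399000.00,4,3,2818,141.59,Short Sale
135141,Paso Robles,545000.00,4,3,3032,179.75,Short Sale
135712,Morro Bay,909000.00,4,4,3540,256.78,Short Sale
"""


def average_price_by_location(rows):
    """Return [(location, average price)] sorted by average price, highest first."""
    totals = defaultdict(lambda: [0.0, 0])  # location -> [sum of prices, count]
    for row in rows:
        entry = totals[row["Location"].strip()]
        entry[0] += float(row["Price"])
        entry[1] += 1
    averages = [(loc, total / count) for loc, (total, count) in totals.items()]
    return sorted(averages, key=lambda pair: pair[1], reverse=True)


rows = csv.DictReader(io.StringIO(SAMPLE))
print(average_price_by_location(rows))
# [('Morro Bay', 909000.0), ('Arroyo Grande', 795000.0), ('Paso Robles', 472000.0)]
```

In Spark SQL, after `df.createOrReplaceTempView('real_estate')` (Spark 2.0 or later), an equivalent query would be `SELECT Location, AVG(Price) AS avg_price FROM real_estate GROUP BY Location ORDER BY avg_price DESC`, run through `sqlContext.sql(...)`. Aliasing the aggregate also avoids sorting on the generated column name `avg(Price)`.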

