## Instructions

- Please answer all questions
- You can use any language you wish (e.g. Python, Scala, SQL...)
- Several Markdown cells require completion. Please edit the Markdown cells to include your answer.
- Your final notebook should run without errors when you click "Run All"

## Part 1: Reading and Parsing Data

### Question 1:  Code Challenge - Load a CSV

- Load the CSV file at `treecover.csv` into a DataFrame.
- Use Apache Spark to read in the data, assigned to the variable `treeCoverDF`.
- Please use the `inferSchema` option.

In [19]:
import findspark
import os
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import *


spark = SparkSession.builder \
        .master("local") \
        .appName("Hands-on-Assignment-PySpark") \
        .config("spark.sql.warehouse.dir", "target/spark-warehouse") \
        .enableHiveSupport() \
        .getOrCreate()

In [2]:
treeCoverDF = spark.read.csv('treecover.csv', inferSchema=True, header=True)
treeCoverDF.show(2)

+---+---------+------+-----+--------------------------------+------------------------------+-------------------------------+----------------------------------+----------+---------+---------------+---------+
| Id|Elevation|Aspect|Slope|Horizontal_Distance_To_Hydrology|Vertical_Distance_To_Hydrology|Horizontal_Distance_To_Roadways|Horizontal_Distance_To_Fire_Points|Cover_Type|Soil_Type|Wilderness_Area|Hillshade|
+---+---------+------+-----+--------------------------------+------------------------------+-------------------------------+----------------------------------+----------+---------+---------------+---------+
|  1|     2596|    51|    3|                             258|                             0|                            510|                              6279|         5|       29|              1|      9am|
|  2|     2590|    56|    2|                             212|                            -6|                            390|                              6225|         5|  

In [4]:
treeCoverDF.columns

['Id',
 'Elevation',
 'Aspect',
 'Slope',
 'Horizontal_Distance_To_Hydrology',
 'Vertical_Distance_To_Hydrology',
 'Horizontal_Distance_To_Roadways',
 'Horizontal_Distance_To_Fire_Points',
 'Cover_Type',
 'Soil_Type',
 'Wilderness_Area',
 'Hillshade']

### Question 2:  Code Challenge - Print the Schema

Use Apache Spark to display the Schema of the `treeCoverDF` Dataframe.

In [5]:
treeCoverDF.printSchema()

root
 |-- Id: integer (nullable = true)
 |-- Elevation: integer (nullable = true)
 |-- Aspect: integer (nullable = true)
 |-- Slope: integer (nullable = true)
 |-- Horizontal_Distance_To_Hydrology: integer (nullable = true)
 |-- Vertical_Distance_To_Hydrology: integer (nullable = true)
 |-- Horizontal_Distance_To_Roadways: integer (nullable = true)
 |-- Horizontal_Distance_To_Fire_Points: integer (nullable = true)
 |-- Cover_Type: integer (nullable = true)
 |-- Soil_Type: integer (nullable = true)
 |-- Wilderness_Area: integer (nullable = true)
 |-- Hillshade: string (nullable = true)



### Question 3:  Code Challenge - Rows & Columns

Use Apache Spark to display the number of rows and columns in the DataFrame.

In [10]:
print("No. of rows    = {}".format(treeCoverDF.count()))
print("No. of Columns = {}".format(len(treeCoverDF.columns)))

No. of rows    = 15120
No. of Columns = 12


## Part 2: Analysis

### Question 4:  Code Challenge - Summary Statistics for a Feature

Use Apache Spark to answer these questions about the `treeCoverDF` DataFrame:
- What is the range - minimum and maximum - of values for the feature `elevation`?
- What are the mean and standard deviation of the feature `elevation`?

In [21]:
# way 1: describe() returns every statistic as a string, so cast each to float

summary_table = treeCoverDF.select('Elevation').describe()
#summary_table.show()
# Map each statistic name (count, mean, stddev, min, max) to its numeric value
stats = {row['summary']: float(row['Elevation']) for row in summary_table.collect()}
print("Min elevation :", stats['min'])
print("Max elevation :", stats['max'])
print("Mean elevation :", stats['mean'])
print("Standard Deviation of elevation :", stats['stddev'])

Min elevation : 1863.0
Max elevation : 3849.0
Mean elevation : 2749.3225529100528
Standard Deviation of elevation : 417.67818734804985


In [22]:
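# way 2: aggregate functions

# min, max, mean and stddev below are the Spark functions from this wildcard import;
# they shadow Python's built-in min and max.
from pyspark.sql.functions import *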
treeCoverDF.agg(min(col("elevation")), max(col("elevation"))).show()

treeCoverDF.agg(mean(col("elevation")), stddev(col("elevation"))).show()

+--------------+--------------+
|min(elevation)|max(elevation)|
+--------------+--------------+
|          1863|          3849|
+--------------+--------------+

+------------------+----------------------+
|    avg(elevation)|stddev_samp(elevation)|
+------------------+----------------------+
|2749.3225529100528|    417.67818734804985|
+------------------+----------------------+



### Answer #4:

- Min `elevation`: `1863`
- Max `elevation`: `3849`
- Mean `elevation`: `2749.32`
- Standard Deviation of `elevation`: `417.68`

### Question 5:  Code Challenge - Record Count

Use Apache Spark to answer the following question:
- How many entries in the dataset have an `elevation` greater than or equal to 2749.32 meters **AND** a `Cover_Type` of 1 or 2?

In [28]:
treeCoverDF.filter((col('elevation') >= 2749.32) & (col('Cover_Type').isin([1,2]))).count()

3883

In [26]:
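# AND is evaluated before OR, so the OR must be parenthesized; without the parentheses
# every cover_type = 2 row is counted regardless of elevation.
treeCoverDF.filter('elevation >= 2749.32 and (cover_type = 1 or cover_type = 2)').count()

3883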
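
A filter string needs care when it mixes `and` with `or`: Spark SQL, like Python, evaluates AND before OR, so `a and b or c` means `(a and b) or c`. The plain-Python sketch below uses made-up rows (not the tree cover data) to show how leaving out the parentheses inflates the count.

```python
# Hypothetical (elevation, cover_type) rows, for illustration only
rows = [(3000.0, 1), (2000.0, 1), (3000.0, 2), (2000.0, 2), (3000.0, 3)]
THRESHOLD = 2749.32

# Unparenthesized: read as (elevation >= T and cover_type == 1) or cover_type == 2
loose = sum(1 for elev, ct in rows if elev >= THRESHOLD and ct == 1 or ct == 2)

# Parenthesized: elevation >= T and (cover_type == 1 or cover_type == 2)
strict = sum(1 for elev, ct in rows if elev >= THRESHOLD and (ct == 1 or ct == 2))

print(loose, strict)
```

The low-elevation row with cover type 2 is counted only by the unparenthesized version.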

### Question 6: Code Challenge - Compute a Percentage

Use Apache Spark to answer the following question:
- What percentage of entries with `Cover_Type` 1 or 2 have an `elevation` at or above 2749.32 meters?

In [39]:
filt_count = treeCoverDF.select('Elevation').filter(((col('Cover_Type') == 1) | (col('Cover_Type') == 2)) & (col('Elevation') >= 2749.32)).count()
filt_count

3883

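In [40]:
# AND is evaluated before OR, so the OR must be parenthesized; without the parentheses
# every cover_type = 2 row is counted regardless of elevation.
filtercount = treeCoverDF.filter('elevation >= 2749.32 and (cover_type = 1 or cover_type = 2)').count()
filtercount

3883

In [41]:
totalcount = treeCoverDF.count()
# These ratios use all rows as the denominator. For the share within Cover_Type 1 or 2,
# divide by treeCoverDF.filter(col('Cover_Type').isin([1, 2])).count() instead.
print((filt_count/totalcount)*100)

print((filtercount/totalcount)*100)

25.681216931216934
25.681216931216934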
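
The question asks for a percentage within a subset, so the denominator is the number of rows with `Cover_Type` 1 or 2, not the size of the whole dataset. A minimal plain-Python sketch of the difference, using made-up rows rather than the tree cover data:

```python
# Hypothetical (elevation, cover_type) rows, for illustration only
rows = [(3000.0, 1), (2000.0, 1), (3000.0, 2), (2800.0, 2), (3000.0, 3), (2000.0, 4)]
THRESHOLD = 2749.32

# Denominator: rows whose cover type is 1 or 2
in_types = [r for r in rows if r[1] in (1, 2)]
# Numerator: of those rows, the ones at or above the threshold
high = [r for r in in_types if r[0] >= THRESHOLD]

pct_within_types = 100.0 * len(high) / len(in_types)
pct_of_all_rows = 100.0 * len(high) / len(rows)
print(pct_within_types, pct_of_all_rows)
```

Here three of the four type-1-or-2 rows qualify (75%), while the same three rows are only 50% of all six rows.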


### Question 7: Code Challenge - Visualize Feature Distribution


- a bar chart that helps visualize the distribution of different Wilderness Areas in our dataset

In [77]:
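# Count rows per Wilderness_Area; these counts are the bar heights of the distribution.
# display() renders a chart in Databricks; in a local Jupyter kernel it only prints the DataFrame's repr.
display(treeCoverDF.groupBy('Wilderness_Area').count())

DataFrame[Wilderness_Area: int, count: bigint]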

### Question 8: Code Challenge - Visualize Average Elevation by Cover Type 

- a bar chart showing the average elevation of each cover type with string labels for cover type

**NOTE: you will need to match the integer values in the column `treeCoverDF.Cover_Type` to the string values in `dbfs:/FileStore/tmp/nl/covertype.csv` to retrieve the Cover Type Labels. It is recommended to use an Apache Spark join.**

In [75]:
# way 1

treeCoverType = spark.read.load("covertype.csv",
                     format="csv", sep=",", inferSchema="true", header="true")

treeCoverType.show()   

newdf = treeCoverDF.join(treeCoverType, treeCoverDF.Cover_Type == treeCoverType.cover_type_key)

# Average elevation per cover-type label: the values the bar chart should plot
avgElevationDF = newdf.groupBy('cover_type_label').agg(avg('elevation').alias('average_elevation'))

display(avgElevationDF)

+--------------+-----------------+
|cover_type_key| cover_type_label|
+--------------+-----------------+
|             1|       Spruce/Fir|
|             2|   Lodgepole Pine|
|             3|   Ponderosa Pine|
|             4|Cottonwood/Willow|
|             5|            Aspen|
|             6|      Douglas-fir|
|             7|        Krummholz|
+--------------+-----------------+



DataFrame[cover_type_label: string, average_elevation: double]

In [76]:
# way 2

df_coverType = spark.read.csv('covertype.csv', header=True)

print(df_coverType.count())

df_coverType.show()
df_coverType.printSchema()

from pyspark.sql.types import IntegerType

df_coverType = df_coverType.withColumn('cover_type_intkey', df_coverType['cover_type_key'].cast(IntegerType()))

df_coverType.printSchema()

df_coverType.count()

7
+--------------+-----------------+
|cover_type_key| cover_type_label|
+--------------+-----------------+
|             1|       Spruce/Fir|
|             2|   Lodgepole Pine|
|             3|   Ponderosa Pine|
|             4|Cottonwood/Willow|
|             5|            Aspen|
|             6|      Douglas-fir|
|             7|        Krummholz|
+--------------+-----------------+

root
 |-- cover_type_key: string (nullable = true)
 |-- cover_type_label: string (nullable = true)

root
 |-- cover_type_key: string (nullable = true)
 |-- cover_type_label: string (nullable = true)
 |-- cover_type_intkey: integer (nullable = true)



7

## Part 3: Data Ingestion, Cleansing, and Transformations

## Instructions 

This is a multi-step, data pipeline question in which you need to achieve a few objectives to build a successful job.

### Data Sets

#### `u.data.csv`

- The full u data set, 100000 ratings by 943 users on 1682 items. 
- Each user has rated at least 20 movies.  
- Users and items are numbered consecutively from 1. 
- The data is randomly ordered. 
- This is a tab separated file consisting of four columns: 
   - user id 
   - movie id 
   - rating 
   - date (unix seconds since 1/1/1970 UTC)

#### Desired schema

- `user_id INTEGER`
- `movie_id INTEGER`
- `rating INTEGER`
- `date DATE`
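
The `date` column arrives as unix seconds but must end up as a calendar date. As a rough plain-Python sketch of that conversion (the helper name `unix_seconds_to_date` and the sample timestamp are assumptions made for illustration, not part of the assignment):

```python
from datetime import date, datetime, timezone


def unix_seconds_to_date(seconds: int) -> date:
    """Convert unix seconds to the corresponding calendar date in UTC."""
    return datetime.fromtimestamp(seconds, tz=timezone.utc).date()


sample = 881250949  # illustrative timestamp, assumed for this example
converted = unix_seconds_to_date(sample)
print(converted.isoformat())  # 1997-12-04
```

Spark's `from_unixtime` formats timestamps in the session time zone, so values close to midnight may land on a different day than in this UTC-based sketch.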

#### `u.item.csv`

- This is a `|` separated file consisting of six columns:
   - movie id
   - movie title
   - release date
   - video release date
   - IMDb URL
   - genre
- movie ids in this file match movie ids in `u.data`.

#### Desired schema

- `movie_id INTEGER`
- `movie_title STRING`

### Question 9:  Code Challenge - Load DataFrames

Use Apache Spark to perform the following:
1. define the correct schemas for each Data Set to be imported as described above  
   **note:** 
      - for `u.data.csv`, `date` *must* be stored using `DateType` with the format `yyyy-MM-dd`
      - you may need to ingest `timestamp` data using `IntegerType`
      - be sure to drop unnecessary columns for `u.item.csv`
1. import the two files as DataFrames named `uDataDF` and `uItemDF` using the schemas you defined and these paths:
   - `dbfs:/FileStore/tmp/u.data.csv`
   - `dbfs:/FileStore/tmp/u.item.csv`
1. order the `uDataDF` DataFrame by the `date` column

**NOTE:** Please display the DataFrames, `uDataDF` and `uItemDF` after loading.

#### `uDataDF`

In [46]:
# way 1

from pyspark.sql.functions import date_format
from pyspark.sql.functions import from_unixtime
from pyspark.sql.functions import desc

uDataDF = spark.read.load("u.data.csv",
                     format="csv", sep="\t", inferSchema="true", header="false")

uDataDF.printSchema()
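# Rename the default _c0.._c3 columns, then derive the date from the unix timestamp.
# Note: date_format returns a string, so `newdate` is StringType here, not DateType.
uDataDF = (uDataDF
           .withColumnRenamed("_c0", "user_id")
           .withColumnRenamed("_c1", "movie_id")
           .withColumnRenamed("_c2", "rating")
           .withColumnRenamed("_c3", "date")
           .withColumn("tsDate", from_unixtime("date"))
           .withColumn("newdate", date_format('tsDate', "yyyy-MM-dd"))
           .drop('tsDate', 'date'))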

uDataDF.sort(desc("newdate")).show(3)

uDataDF.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- _c1: integer (nullable = true)
 |-- _c2: integer (nullable = true)
 |-- _c3: integer (nullable = true)

+-------+--------+------+----------+
|user_id|movie_id|rating|   newdate|
+-------+--------+------+----------+
|    189|     732|     2|1998-04-23|
|    653|     272|     4|1998-04-23|
|    189|     381|     3|1998-04-23|
+-------+--------+------+----------+
only showing top 3 rows

root
 |-- user_id: integer (nullable = true)
 |-- movie_id: integer (nullable = true)
 |-- rating: integer (nullable = true)
 |-- newdate: string (nullable = true)



In [48]:
# way 2

from pyspark.sql.types import StructType,StructField, StringType, IntegerType

from pyspark.sql.functions import *

schema = StructType([
    StructField("user_id", IntegerType(), True),
    StructField("movie_id", IntegerType(), True),
    StructField("rating", IntegerType(), True),
    StructField("date", IntegerType(), True),
])

# An explicit schema is supplied, so inferSchema is not needed
uDataDF = spark.read.csv('u.data.csv', sep='\t', schema=schema)

#uDataDF.show(3)

#uDataDF.printSchema()


uDataDF = uDataDF.select('user_id', 'movie_id', 'rating', to_date(from_unixtime(col('date'), 'yyyy-MM-dd')).alias('Converted_date'))

#uDataDF.printSchema()

#uDataDF.show(4)

# createOrReplaceTempView replaces the deprecated registerTempTable
uDataDF.createOrReplaceTempView('uDataTB')

spark.sql("select * from uDataTB order by Converted_date").show(3)

+-------+--------+------+--------------+
|user_id|movie_id|rating|Converted_date|
+-------+--------+------+--------------+
|    259|     117|     4|    1997-09-20|
|    119|     222|     5|    1997-09-20|
|    259|     255|     4|    1997-09-20|
+-------+--------+------+--------------+
only showing top 3 rows



#### `uItemDF`

In [58]:
# way 1

uItemDF = spark.read.load("u.item.csv",
                     format="csv", sep="|", inferSchema="true", header="false")

# Keep the first six columns and name them in one step
uItemDF = (uItemDF
           .select("_c0", "_c1", "_c2", "_c3", "_c4", "_c5")
           .toDF("movie_id", "movie_title", "release_date",
                 "video_release_date", "imdb_url", "genre"))

uItemDF.show(3)

+--------+-----------------+------------+------------------+--------------------+-----+
|movie_id|      movie_title|release_date|video_release_date|            imdb_url|genre|
+--------+-----------------+------------+------------------+--------------------+-----+
|       1| Toy Story (1995)| 01-Jan-1995|              null|http://us.imdb.co...|    0|
|       2| GoldenEye (1995)| 01-Jan-1995|              null|http://us.imdb.co...|    0|
|       3|Four Rooms (1995)| 01-Jan-1995|              null|http://us.imdb.co...|    0|
+--------+-----------------+------------+------------------+--------------------+-----+
only showing top 3 rows



In [53]:
#way 2

from pyspark.sql.types import StructType,StructField, StringType, IntegerType

schema = StructType([
    StructField("movie_id", IntegerType(), True),
    StructField("movie_title", StringType(), True),
])

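# An explicit schema is supplied, so inferSchema is not needed.
# Only the first two fields are declared, so the remaining columns are not loaded.
uItemDF = spark.read.csv('u.item.csv', sep='|', schema=schema)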

uItemDF.show(3)

#uItemDF.printSchema()


+--------+-----------------+
|movie_id|      movie_title|
+--------+-----------------+
|       1| Toy Story (1995)|
|       2| GoldenEye (1995)|
|       3|Four Rooms (1995)|
+--------+-----------------+
only showing top 3 rows



### Question 10:  Code Challenge - Perform a Join

Use Apache Spark to do the following:
- join `uDataDF` and `uItemDF` on `movie_id` as a new DataFrame called `uMovieDF`  
   **note:** make sure you do not create duplicate `movie_id` columns
   
**NOTE:** Please display the DataFrame `uMovieDF`.

In [56]:
uMovieDF = uDataDF.join(uItemDF,['movie_id'], "inner")
uMovieDF.show(3)

+--------+-------+------+--------------+--------------------+
|movie_id|user_id|rating|Converted_date|         movie_title|
+--------+-------+------+--------------+--------------------+
|     242|    196|     3|    1997-12-04|        Kolya (1996)|
|     302|    186|     3|    1998-04-05|L.A. Confidential...|
|     377|     22|     1|    1997-11-07| Heavyweights (1994)|
+--------+-------+------+--------------+--------------------+
only showing top 3 rows



### Question 11:  Code Challenge - Perform an Aggregation

Use Apache Spark to do the following:
1. create an aggregate DataFrame, `aggDF` by
  1. extracting the year from the `date` (of the review)
  1. getting the average rating of each film per year as a column named `average_rating`
  1. ordering descending by year and average rating
1. write the resulting dataframe to a table named "movie_by_year_average_rating" in the Default database  
   **note:** use `mode("overwrite")` 

#### Desired Schema
The schema of your resulting DataFrame should be:
- `year INTEGER`
- `movie_title STRING`
- `average_rating DOUBLE`

**NOTE:** Please display the DataFrame `aggDF`.
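
The aggregation steps above amount to grouping by (year, title), averaging the ratings in each group, and sorting in descending order. A minimal plain-Python sketch of that logic, using made-up reviews rather than the MovieLens data (all names here are illustrative):

```python
from collections import defaultdict
from datetime import date

# Hypothetical (movie_title, rating, review_date) rows, for illustration only
reviews = [
    ("Film A", 4, date(1998, 1, 5)),
    ("Film A", 5, date(1998, 2, 1)),
    ("Film A", 3, date(1997, 11, 20)),
    ("Film B", 2, date(1998, 3, 9)),
]

# Collect the ratings of each (year, title) group
ratings = defaultdict(list)
for title, rating, review_date in reviews:
    ratings[(review_date.year, title)].append(rating)

# One row per group: (year, movie_title, average_rating)
agg = [(year, title, sum(vals) / len(vals)) for (year, title), vals in ratings.items()]

# Order by year descending, then average rating descending
agg.sort(key=lambda row: (row[0], row[2]), reverse=True)

for row in agg:
    print(row)
```

In Spark the same shape is usually expressed with `groupBy(...).agg(avg(...))` followed by `orderBy`.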

In [59]:
# way 1

from pyspark.sql.functions import year, avg, desc

# uDataDF was last assigned by "way 2" of Question 9, so the date column
# is `Converted_date`, not `newdate`.
aggDF = (uMovieDF
         .withColumn("year", year("Converted_date"))
         .groupBy("year", "movie_title")
         .agg(avg("rating").alias("average_rating"))
         .orderBy(desc("year"), desc("average_rating")))

aggDF.show()


In [60]:
# way 2

df = uMovieDF.withColumn('year', year('Converted_date'))

# createOrReplaceTempView replaces the deprecated registerTempTable
df.createOrReplaceTempView('Movie')

final_df = spark.sql('select movie_title, year, avg(rating) as average_rating from movie group by movie_title,year order by year desc, average_rating desc')

final_df.filter(col('movie_title') == '39 Steps, The (1935)').show()

# Persist the result as a table in the default database.
# DataFrame.write is a property, not a method, so it takes no parentheses.
final_df.write.mode("overwrite").saveAsTable("movie_by_year_average_rating")


+--------------------+----+-----------------+
|         movie_title|year|   average_rating|
+--------------------+----+-----------------+
|39 Steps, The (1935)|1998|              4.2|
|39 Steps, The (1935)|1997|3.896551724137931|
+--------------------+----+-----------------+



## Part 4: Fun with JSON

JSON values are typically passed by message brokers such as Kafka or Kinesis in a string encoding. When consumed by a Spark Structured Streaming application, this JSON must be converted into a nested object in order to be used.

Below is a list of JSON strings that represents how data might be passed from a message broker.

**Note:** Make sure to run the cell below to retrieve the sample data.

In [69]:
# Defined in a regular cell: a %%python cell magic would run this code in a
# separate process, and sampleJson would not be visible to later cells.
sampleJson = [
    {
        "user": 100,
        "ips": ["191.168.192.101", "191.168.192.103", "191.168.192.96", "191.168.192.99"]
    },
    {
        "user": 101,
        "ips": ["191.168.192.102", "191.168.192.105", "191.168.192.103", "191.168.192.107"]
    },
    {
        "user": 102,
        "ips": ["191.168.192.105", "191.168.192.101", "191.168.192.105", "191.168.192.107"]
    },
    {
        "user": 103,
        "ips": ["191.168.192.96", "191.168.192.100", "191.168.192.107", "191.168.192.101"]
    },
    {
        "user": 104,
        "ips": ["191.168.192.99", "191.168.192.99", "191.168.192.102", "191.168.192.99"]
    },
    {
        "user": 105,
        "ips": ["191.168.192.99", "191.168.192.99", "191.168.192.100", "191.168.192.96"]
    }
]

### Question 12:  Code Challenge - Count the IPs

Use any coding techniques known to you to parse this list of JSON strings to answer the following question:
- how many occurrences of each IP address are in this list?

#### Desired Output
Your results should be this:


| ip | count |
|:-:|:-:|
| `191.168.192.96` | `3` |
| `191.168.192.99` | `6` |
| `191.168.192.100` | `2` |
| `191.168.192.101` | `3` |
| `191.168.192.102` | `2` |
| `191.168.192.103` | `2` |
| `191.168.192.105` | `3` |
| `191.168.192.107` | `3` |

**NOTE:** The order of your results is not important.

In [71]:
import json

sc = spark.sparkContext

# sampleJson holds Python dicts, so serialize each one to a JSON string before Spark parses it
jsondf = spark.read.json(sc.parallelize([json.dumps(record) for record in sampleJson]))
jsondf.select(explode(col("ips")).alias("ip")).groupBy("ip").count().show()
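
Since the question allows any technique, the tally can also be sketched without Spark. The example below assumes the sample records arrive as JSON strings, as a broker would deliver them, parses each with `json.loads`, and counts the addresses with `collections.Counter`. The variable names are illustrative.

```python
import json
from collections import Counter

# The sample records from above, encoded as JSON strings
messages = [
    '{"user": 100, "ips": ["191.168.192.101", "191.168.192.103", "191.168.192.96", "191.168.192.99"]}',
    '{"user": 101, "ips": ["191.168.192.102", "191.168.192.105", "191.168.192.103", "191.168.192.107"]}',
    '{"user": 102, "ips": ["191.168.192.105", "191.168.192.101", "191.168.192.105", "191.168.192.107"]}',
    '{"user": 103, "ips": ["191.168.192.96", "191.168.192.100", "191.168.192.107", "191.168.192.101"]}',
    '{"user": 104, "ips": ["191.168.192.99", "191.168.192.99", "191.168.192.102", "191.168.192.99"]}',
    '{"user": 105, "ips": ["191.168.192.99", "191.168.192.99", "191.168.192.100", "191.168.192.96"]}',
]

# Parse each string into a dict, then count every address in its "ips" list
ip_counts = Counter(ip for message in messages for ip in json.loads(message)["ips"])

for ip, n in sorted(ip_counts.items()):
    print(ip, n)
```

The counts match the desired output table, for example 6 occurrences of `191.168.192.99`.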