In [0]:
%sh curl --remote-name-all 'https://files.training.databricks.com/assessments/cse-take-home/{covertype,kafka,treecover,u.data,u.item}.csv'

In [0]:
dbutils.fs.cp("file:/databricks/driver/covertype.csv", "dbfs:/FileStore/tmp/covertype.csv")
dbutils.fs.cp("file:/databricks/driver/kafka.csv", "dbfs:/FileStore/tmp/kafka.csv")
dbutils.fs.cp("file:/databricks/driver/treecover.csv", "dbfs:/FileStore/tmp/treecover.csv")
dbutils.fs.cp("file:/databricks/driver/u.data.csv", "dbfs:/FileStore/tmp/u.data.csv")
dbutils.fs.cp("file:/databricks/driver/u.item.csv", "dbfs:/FileStore/tmp/u.item.csv")

Part 1: Reading and Parsing Data
    
    Question 1: Code Challenge - Load a CSV

    Load the CSV file at dbfs:/FileStore/tmp/nl/treecover.csv into a DataFrame.
    Use Apache Spark to read in the data, assigned to the variable treeCoverDF.
    Your method to get the CSV file into Databricks isn't graded. We are only concerned with how you use Spark to parse and load the actual data.
    Please use the inferSchema option.

MY CODE

In [0]:
treeCoverDF = spark.read.csv('/FileStore/tmp/treecover.csv', inferSchema=True, header=True)
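With inferSchema enabled, Spark makes an extra pass over the file to choose a type for each column. The plain-Python sketch below only illustrates the idea. The helpers `infer_type` and `infer_schema` are hypothetical: they try int, then double, then fall back to string, and they treat an all-empty column as string. Spark's own inference is richer (it also considers long, decimal, timestamp and boolean types), so treat this as an approximation rather than Spark's algorithm.

```python
import csv
import io


def infer_type(values):
    """Return 'int', 'double' or 'string' for a list of CSV field strings.

    Hypothetical helper: a column keeps the narrowest of these types that
    every non-empty value parses as.
    """
    def parses(cast, value):
        try:
            cast(value)
            return True
        except ValueError:
            return False

    non_empty = [v for v in values if v != ""]
    if not non_empty:
        return "string"  # nothing to go on, so fall back to string
    if all(parses(int, v) for v in non_empty):
        return "int"
    if all(parses(float, v) for v in non_empty):
        return "double"
    return "string"


def infer_schema(csv_text):
    """Map each header name to an inferred type (header=True semantics)."""
    rows = list(csv.reader(io.StringIO(csv_text)))
    header, data = rows[0], rows[1:]
    columns = zip(*data)  # transpose rows into columns
    return {name: infer_type(list(col)) for name, col in zip(header, columns)}


# Hypothetical sample, not taken from treecover.csv
sample = "Elevation,Cover_Type,Label\n2596,5,a\n2590.5,2,b\n"
print(infer_schema(sample))
```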

Question 2: Code Challenge - Print the Schema
Use Apache Spark to display the schema of the treeCoverDF DataFrame.

MY CODE

In [0]:
treeCoverDF.printSchema()

Question 3: Code Challenge - Rows & Columns

Use Apache Spark to display the number of rows and columns in the DataFrame.

MY CODE

In [0]:
row_count = treeCoverDF.count()
print("The number of rows:", row_count)
column_count = len(treeCoverDF.columns)
print("The number of columns:", column_count)

Part 2: Analysis

Question 4: Code Challenge - Summary Statistics for a Feature
Use Apache Spark to answer these questions about the treeCoverDF DataFrame:

    What is the range - minimum and maximum - of values for the feature elevation?
    What are the mean and standard deviation of the feature elevation?

MY CODE

In [0]:
# using the agg() function to get the min, max, mean and standard deviation

min_elevation = treeCoverDF.agg({'Elevation':'min'})
min_elevation.show()
max_elevation = treeCoverDF.agg({'Elevation':'max'})
max_elevation.show()
mean_elevation = treeCoverDF.agg({'Elevation':'mean'})
mean_elevation.show()
stddev_elevation = treeCoverDF.agg({'Elevation':'stddev'})
stddev_elevation.show()



In [0]:
# using the pyspark.sql.functions module (imported as F so min/max don't shadow the Python built-ins)
# get the range (min and max), mean and standard deviation of the Elevation column
from pyspark.sql import functions as F
treeCoverDF.select(F.max('Elevation'), F.min('Elevation')).show()
treeCoverDF.select(F.mean('Elevation'), F.stddev('Elevation')).show()

In [0]:
# using describe(), restricted to the Elevation column (count, mean, stddev, min, max)
treeCoverDF.describe('Elevation').show()
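Spark's `stddev` aggregate (and the `stddev` row of `describe()`) is the sample standard deviation, the same as `stddev_samp`, which divides by n - 1; `stddev_pop` divides by n. The sketch below uses Python's `statistics` module on a handful of hypothetical elevation values (not from treecover.csv) to show the two definitions side by side.

```python
import statistics

# Hypothetical elevation values in meters, not taken from treecover.csv
elevations = [2596, 2590, 2804, 2785, 2595]

minimum, maximum = min(elevations), max(elevations)
mean_value = statistics.mean(elevations)
sample_sd = statistics.stdev(elevations)       # divisor n - 1, like Spark's stddev / stddev_samp
population_sd = statistics.pstdev(elevations)  # divisor n, like Spark's stddev_pop

print("range:", minimum, "-", maximum)
print("mean:", mean_value)
print("sample sd:", round(sample_sd, 3), "population sd:", round(population_sd, 3))
```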

Question 5: Code Challenge - Record Count using Spark

    How many entries in the dataset have an elevation greater than or equal to 2749.32 meters AND a Cover_Type of 1 or 2?

In [0]:
 
cover_list = [1, 2]
count_elevation = treeCoverDF.filter((treeCoverDF.Elevation >= 2749.32) & (treeCoverDF.Cover_Type.isin(cover_list))).count()
print("Entries with elevation >= 2749.32 m and Cover_Type 1 or 2:", count_elevation)


Question 6: Code Challenge - Compute a Percentage

Use Apache Spark to answer the following question:

    What percentage of entries with Cover_Type 1 or 2 have an elevation at or above 2749.32 meters?

In [0]:
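# The question is conditional on Cover_Type 1 or 2, so divide by that subset, not by all rows
cover_count = treeCoverDF.filter(treeCoverDF.Cover_Type.isin(cover_list)).count()
percentage = count_elevation * 100 / cover_count
print("Percentage of Cover_Type 1 or 2 entries with elevation >= 2749.32 m:", percentage)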
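Question 6 is conditional: the denominator is the number of Cover_Type 1 or 2 entries, and the numerator is the Question 5 count. A plain-Python sketch on six hypothetical rows (not from treecover.csv) makes the difference concrete: 3 of the 4 matching rows reach the threshold (75%), whereas dividing by all six rows would give 50%.

```python
# Hypothetical (elevation, cover_type) rows, not taken from treecover.csv
rows = [
    (2800.0, 1), (2700.0, 1), (3100.0, 2),
    (2749.32, 2), (2500.0, 3), (3000.0, 7),
]

THRESHOLD = 2749.32
COVER_TYPES = {1, 2}

# Denominator: only the rows with Cover_Type 1 or 2 (the condition in the question)
in_cover = [(elev, cover) for elev, cover in rows if cover in COVER_TYPES]
# Numerator: those rows that also reach the elevation threshold (the Question 5 count)
high = [row for row in in_cover if row[0] >= THRESHOLD]

percentage = len(high) * 100 / len(in_cover)
print(f"{len(high)} of {len(in_cover)} rows -> {percentage:.2f}%")
```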

Question 7: Code Challenge - Visualize Feature Distribution

Use any visualization tool available in the Databricks Runtime to generate the following visualization:

    a bar chart that helps visualize the distribution of different Wilderness Areas in our dataset

In [0]:
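# Count rows per wilderness area so the bar chart shows one bar per area
display(treeCoverDF.groupBy('Wilderness_Area').count().orderBy('Wilderness_Area'))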

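A distribution bar chart needs one bar per Wilderness_Area value with the number of rows in each, which is what `groupBy('Wilderness_Area').count()` computes. The plain-Python analogue below uses `collections.Counter` on hypothetical area codes (not read from treecover.csv) and prints a crude text bar per area.

```python
from collections import Counter

# Hypothetical Wilderness_Area codes, not taken from treecover.csv
wilderness_areas = [1, 1, 3, 4, 3, 1, 2, 3, 3, 4]

# Equivalent of groupBy('Wilderness_Area').count(): one (value, count) pair per area,
# sorted by area so the bars appear in a stable order
distribution = sorted(Counter(wilderness_areas).items())
for area, n in distribution:
    print(f"area {area}: {'#' * n} ({n})")
```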


Question 8: Code Challenge - Visualize Average Elevation by Cover Type

Use any visualization tool available in the Databricks Runtime to generate the following visualization:

    a bar chart showing the average elevation of each cover type with string labels for cover type

NOTE: you will need to match the integer values in the column treeCoverDF.Cover_Type to the string values in dbfs:/FileStore/tmp/nl/covertype.csv to retrieve the Cover Type Labels. It is recommended to use an Apache Spark join.

In [0]:
cover_type = spark.read.csv('/FileStore/tmp/covertype.csv', inferSchema=True, header=True)
cover_type.show()

In [0]:
tree_join = treeCoverDF.join(cover_type, treeCoverDF.Cover_Type == cover_type.cover_type_key, "inner")
# registerTempTable is deprecated; createOrReplaceTempView is the current API
tree_join.createOrReplaceTempView("Tree_Cover")
tree_join.show()
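Conceptually, the inner join on `Cover_Type == cover_type_key` is a lookup from each tree row into the small label table. The sketch below shows a simple build-and-probe hash join in plain Python; the rows and the label mapping are hypothetical (not read from covertype.csv), and it assumes cover_type_key is unique. For a lookup table this small, Spark will typically choose a broadcast hash join, governed by `spark.sql.autoBroadcastJoinThreshold`.

```python
# Hypothetical rows: (Elevation, Cover_Type) and (cover_type_key, cover_type_label)
trees = [(2596, 5), (2804, 2), (3100, 1), (2500, 9)]
labels = [(1, "Spruce/Fir"), (2, "Lodgepole Pine"), (5, "Aspen")]

# Build phase: index the small lookup table by its join key (assumes unique keys)
label_by_key = {key: label for key, label in labels}

# Probe phase: keep only tree rows whose Cover_Type has a match (inner join semantics)
joined = [
    (elevation, cover, label_by_key[cover])
    for elevation, cover in trees
    if cover in label_by_key
]
for row in joined:
    print(row)
```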

In [0]:
#average elevation of each cover type with string labels for cover type
avg_elevation_cover_type = spark.sql("Select cover_type_label,avg(Elevation) from Tree_Cover group by cover_type_label")
avg_elevation_cover_type.show()
display(avg_elevation_cover_type)

cover_type_label,avg(Elevation)
Cottonwood/Willow,2223.42037037037
Lodgepole Pine,2922.5402777777776
Douglas-fir,2423.276851851852
Krummholz,3362.769907407408
Aspen,2786.801388888889
Spruce/Fir,3128.025925925926
Ponderosa Pine,2398.4231481481484


Part 3: Data Ingestion, Cleansing, and Transformations

Instructions
This is a multi-step data pipeline question in which you need to achieve a few objectives to build a successful job.
Data Sets
u.data.csv

    The full u data set, 100000 ratings by 943 users on 1682 items.
    Each user has rated at least 20 movies.
    Users and items are numbered consecutively from 1.
    The data is randomly ordered.
    This is a tab separated file consisting of four columns:
        user id
        movie id
        rating
        date (Unix seconds since 1/1/1970)

Desired schema

    user_id INTEGER
    movie_id INTEGER
    rating INTEGER
    date DATE
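The date column stores Unix epoch seconds. The hypothetical helper below converts such a value to a yyyy-MM-dd string with the standard library, interpreting it in UTC. Spark's timestamp cast and `to_date` use the session time zone (`spark.sql.session.timeZone`), so dates near midnight may differ by a day from this UTC sketch. The last example also shows why IntegerType is enough for this data: a signed 32-bit value only reaches 2038-01-19.

```python
from datetime import datetime, timezone


def unix_seconds_to_date(seconds):
    """Convert Unix epoch seconds to a 'yyyy-MM-dd' string, interpreted in UTC (hypothetical helper)."""
    return datetime.fromtimestamp(seconds, tz=timezone.utc).strftime("%Y-%m-%d")


print(unix_seconds_to_date(0))          # 1970-01-01 (epoch start)
print(unix_seconds_to_date(86399))      # 1970-01-01 (last second of the first day)
print(unix_seconds_to_date(86400))      # 1970-01-02 (first second of the next day)
print(unix_seconds_to_date(2**31 - 1))  # 2038-01-19 (largest signed 32-bit value)
```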
u.item.csv

    This is a | separated file consisting of six columns:
        movie id
        movie title
        release date
        video release date
        IMDb URL
        genre
    movie ids in this file match movie ids in u.data.

Desired schema

    movie_id INTEGER
    movie_title STRING
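Only the first two of the six pipe-separated fields are kept. The stdlib sketch below parses a hypothetical u.item-style line (the title and URL are made up) with `csv.reader` and a `|` delimiter, then keeps `movie_id` as an int and `movie_title` as a string.

```python
import csv
import io

# A hypothetical u.item-style record with six pipe-separated fields:
# movie id | movie title | release date | video release date | IMDb URL | genre
raw = "1|Example Movie (1995)|01-Jan-1995||http://example.com/movie|0\n"

reader = csv.reader(io.StringIO(raw), delimiter="|")
# Keep movie_id (as int) and movie_title; drop the other four fields
items = [(int(fields[0]), fields[1]) for fields in reader]
print(items)
```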

Question 9: Code Challenge - Load DataFrames

Use Apache Spark to perform the following:

    define the correct schemas for each Data Set to be imported as described above
    note:
        for u.data.csv, date must be stored using DateType with the format yyyy-MM-dd
        you may need to ingest timestamp data using IntegerType
        be sure to drop unnecessary columns for u.item.csv
    import the two files as DataFrames named uDataDF and uItemDF using the schemas you defined and these paths:
        dbfs:/FileStore/tmp/u.data.csv
        dbfs:/FileStore/tmp/u.item.csv
    order the uDataDF DataFrame by the date column

NOTE: Please display the DataFrames, uDataDF and uItemDF after loading.

In [0]:
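#uData
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, TimestampType
from pyspark.sql.functions import to_date

# date arrives as Unix epoch seconds, so read it as a number and convert it below
schema_udata = StructType([
    StructField("user_id", IntegerType(), True),
    StructField("movie_id", IntegerType(), True),
    StructField("rating", IntegerType(), True),
    StructField("date", LongType(), True)])
uDataDF = (spark.read.format('csv')
           .option("header", True)
           .option("delimiter", "\t")
           .schema(schema_udata)
           .load('/FileStore/tmp/u.data.csv'))
# Cast epoch seconds to a timestamp, then to a DateType (yyyy-MM-dd) in the session time zone
uDataDF = uDataDF.withColumn("date", to_date(uDataDF["date"].cast(TimestampType())))
# Reassign the sorted result so uDataDF itself is ordered by date
uDataDF = uDataDF.orderBy("date")
uDataDF.printSchema()
uDataDF.show()


# uItem: declare all six pipe-separated columns, then keep only movie_id and movie_title
schema_uitem = StructType([
    StructField("movie_id", IntegerType(), True),
    StructField("movie_title", StringType(), True),
    StructField("release_date", StringType(), True),
    StructField("video_release_date", StringType(), True),
    StructField("imdb_url", StringType(), True),
    StructField("genre", StringType(), True)])
uItemDF = (spark.read.format('csv')
           .option("header", True)
           .option("delimiter", "|")
           .schema(schema_uitem)
           .load('/FileStore/tmp/u.item.csv')
           .select("movie_id", "movie_title"))
uItemDF.printSchema()
uItemDF.show()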

Question 10: Code Challenge - Perform a Join

Use Apache Spark to do the following:

    join uDataDF and uItemDF on movie_id as a new DataFrame called uMovieDF
    note: make sure you do not create duplicate movie_id columns

NOTE: Please display the DataFrame uMovieDF.

In [0]:
# Joining with an expression (uDataDF.movie_id == uItemDF.movie_id) would keep two movie_id columns;
# passing the key as a list keeps a single movie_id column
uMovieDF = uDataDF.join(uItemDF, ['movie_id'])
uMovieDF.show()

Question 11: Code Challenge - Perform an Aggregation

Use Apache Spark to do the following:

    create an aggregate DataFrame, aggDF by
        extracting the year from the date (of the review)
        getting the average rating of each film per year as a column named average_rating
        ordering descending by year and average rating
    write the resulting dataframe to a table named "movie_by_year_average_rating" in the Default database
    note: use mode(overwrite)

In [0]:
from pyspark.sql.functions import year, avg, desc

# Extract the year of the review from the date column
year_movieDF = uMovieDF.withColumn("year", year(uMovieDF.date))
year_movieDF.show()

# Average rating of each film per review year; grouping by movie_id as well keeps films that share a title apart
aggDF = (year_movieDF
         .groupBy("movie_id", "movie_title", "year")
         .agg(avg("rating").alias("average_rating"))
         .orderBy(desc("year"), desc("average_rating")))
aggDF.show()

# mode("overwrite") replaces the table on re-runs; the table lives in the default database
aggDF.write.mode("overwrite").saveAsTable("default.movie_by_year_average_rating")


In [0]:
%sql
show tables;

database,tableName,isTemporary
default,movie_by_year_average_rating,False
,tree_cover,True
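The aggregation groups by film and review year, averages the ratings, and sorts by year and then by average, both descending. The plain-Python sketch below reproduces that logic on five hypothetical ratings (not from u.data.csv); including movie_id in the grouping key is an assumption that keeps films sharing a title apart.

```python
from collections import defaultdict
from datetime import date

# Hypothetical joined rows: (movie_id, movie_title, rating, review date)
ratings = [
    (1, "Movie A", 4, date(1997, 12, 4)),
    (1, "Movie A", 2, date(1997, 11, 2)),
    (2, "Movie B", 5, date(1998, 1, 15)),
    (2, "Movie B", 1, date(1997, 9, 30)),
    (1, "Movie A", 4, date(1998, 3, 1)),
]

# Group by (film, review year), accumulating [sum of ratings, number of ratings]
totals = defaultdict(lambda: [0, 0])
for movie_id, title, rating, review_date in ratings:
    key = (movie_id, title, review_date.year)
    totals[key][0] += rating
    totals[key][1] += 1

agg = [(mid, title, year, s / n) for (mid, title, year), (s, n) in totals.items()]
# Order descending by year, then by average rating
agg.sort(key=lambda row: (row[2], row[3]), reverse=True)
for row in agg:
    print(row)
```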


Part 4: Fun with JSON

JSON values are typically passed by message brokers such as Kafka or Kinesis in a string encoding. When consumed by a Spark Structured Streaming application, this JSON must be converted into a nested object before it can be used.
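In plain Python, the string-to-object step is `json.loads`; in a Spark Structured Streaming job, the corresponding step is usually `from_json` with an explicit schema. The sketch below parses a shortened message modeled on the sample data and reads its nested fields.

```python
import json

# One message as it might arrive from a broker: a plain string (shortened sample)
message = '{"user":100, "ips" : ["191.168.192.101", "191.168.192.103"]}'

record = json.loads(message)  # str -> dict containing a nested list
print(type(message).__name__, "->", type(record).__name__)
print(record["user"], record["ips"][0])
```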

Below is a list of JSON strings that represents how data might be passed from a message broker.

Note: Make sure to run the cell below to retrieve the sample data.

In [0]:
%python


sampleJson = [
 ('{"user":100, "ips" : ["191.168.192.101", "191.168.192.103", "191.168.192.96", "191.168.192.99"]}',), 
 ('{"user":101, "ips" : ["191.168.192.102", "191.168.192.105", "191.168.192.103", "191.168.192.107"]}',), 
 ('{"user":102, "ips" : ["191.168.192.105", "191.168.192.101", "191.168.192.105", "191.168.192.107"]}',), 
 ('{"user":103, "ips" : ["191.168.192.96", "191.168.192.100", "191.168.192.107", "191.168.192.101"]}',), 
 ('{"user":104, "ips" : ["191.168.192.99", "191.168.192.99", "191.168.192.102", "191.168.192.99"]}',), 
 ('{"user":105, "ips" : ["191.168.192.99", "191.168.192.99", "191.168.192.100", "191.168.192.96"]}',), 
]
type(sampleJson)

In [0]:
import json
# json.loads parses a JSON string safely; eval() would execute arbitrary code
type(json.loads(sampleJson[0][0]))

Question 12: Code Challenge - Count the IPs

Use any coding techniques known to you to parse this list of JSON strings to answer the following question:

    how many occurrences of each IP address are in this list?

Desired Output

Your results should be this:
ip               count
191.168.192.96   3
191.168.192.99   6
191.168.192.100  2
191.168.192.101  3

In [0]:
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import ArrayType, IntegerType, StringType, StructField, StructType

# Parse the raw JSON strings in sampleJson directly, as a streaming job would with from_json
json_schema = StructType([
    StructField("user", IntegerType(), True),
    StructField("ips", ArrayType(StringType()), True)])
raw_df = spark.createDataFrame(sampleJson, ["value"])
df = raw_df.select(from_json(col("value"), json_schema).alias("data")).select("data.*")
df.show(truncate=False)

In [0]:
from pyspark.sql.functions import explode
# explode() emits one row per element of the ips array, so any number of IPs per user is handled
df_ips = df.select("user", explode("ips").alias("ip"))
df_ips.show()

In [0]:
from pyspark.sql.functions import split
# Count occurrences per IP; all sample IPs share the 191.168.192 prefix, so sorting on the
# last octet as an integer gives numeric order (96, 99, 100, ...)
df_ips.groupBy("ip").count().orderBy(split(col("ip"), r"\.").getItem(3).cast("int")).show()
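As a cross-check on the Spark result, the same counts can be computed without Spark: parse each string with `json.loads`, flatten every `ips` array (the role `explode` plays in Spark), and tally with `collections.Counter`. The six messages below are copied from the sampleJson cell; the result should contain 3 for 191.168.192.96, 6 for 191.168.192.99, 2 for 191.168.192.100 and 3 for 191.168.192.101, matching the desired output.

```python
import json
from collections import Counter

# The six sample messages from the sampleJson cell, as raw strings
messages = [
    '{"user":100, "ips" : ["191.168.192.101", "191.168.192.103", "191.168.192.96", "191.168.192.99"]}',
    '{"user":101, "ips" : ["191.168.192.102", "191.168.192.105", "191.168.192.103", "191.168.192.107"]}',
    '{"user":102, "ips" : ["191.168.192.105", "191.168.192.101", "191.168.192.105", "191.168.192.107"]}',
    '{"user":103, "ips" : ["191.168.192.96", "191.168.192.100", "191.168.192.107", "191.168.192.101"]}',
    '{"user":104, "ips" : ["191.168.192.99", "191.168.192.99", "191.168.192.102", "191.168.192.99"]}',
    '{"user":105, "ips" : ["191.168.192.99", "191.168.192.99", "191.168.192.100", "191.168.192.96"]}',
]

# Parse each message, flatten its ips array, and count occurrences per IP
ip_counts = Counter(ip for m in messages for ip in json.loads(m)["ips"])

# Sort numerically by all four octets so .96 and .99 come before .100
ordered = sorted(ip_counts.items(), key=lambda kv: tuple(int(p) for p in kv[0].split(".")))
for ip, n in ordered:
    print(ip, n)
```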