### Analyse search terms on the e-commerce web server


##### In this assignment you will download the search term data set for the e-commerce web server and run analytic queries on it.


In [1]:
# Install spark

In [1]:
!pip install pyspark
!pip install findspark

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m1.7 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.0/199.0 KB[0m [31m33.5 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=fcb6987e688793b1264a62b15657845b73356c9c6f71f2a3c3aecb2541234791
  Stored in directory: /home/jupyterlab/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1
Collecting finds

In [None]:
# Start session

In [2]:
import findspark
findspark.init()

In [7]:
import pandas as pd
import matplotlib.pyplot as plt
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, Normalizer, StandardScaler
from pyspark.ml.stat import Correlation
from pyspark.ml.regression import LinearRegression

In [4]:
# Creating a spark context class
sc = SparkContext()

# Creating a spark session
spark = SparkSession \
    .builder \
    .appName("Big Data Analytics with SparkML Ops Model").getOrCreate()

22/04/30 08:14:41 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [5]:
spark

In [1]:
# Download The search term dataset from the below url
# https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0321EN-SkillsNetwork/Bigdata%20and%20Spark/searchterms.csv

In [8]:
searchterms = pd.read_csv('https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0321EN-SkillsNetwork/Bigdata%20and%20Spark/searchterms.csv')

In [None]:
# Load the csv into a spark dataframe

In [9]:
sdf = spark.createDataFrame(searchterms)

In [None]:
# Print the number of rows and columns
# Take a screenshot of the code and name it as shape.jpg)

In [13]:
sdf.show()

# extracting number of rows from the Dataframe
row = sdf.count()

# extracting number of columns from the Dataframe
col = len(sdf.columns)

# printing
print(f'Dimension of the Dataframe is: {(row,col)}')
print(f'Number of Rows are: {row}')
print(f'Number of Columns are: {col}')

                                                                                

+---+-----+----+--------------+
|day|month|year|    searchterm|
+---+-----+----+--------------+
| 12|   11|2021| mobile 6 inch|
| 12|   11|2021| mobile latest|
| 12|   11|2021|   tablet wifi|
| 12|   11|2021|laptop 14 inch|
| 12|   11|2021|     mobile 5g|
| 12|   11|2021| mobile 6 inch|
| 12|   11|2021|        laptop|
| 12|   11|2021|        laptop|
| 12|   11|2021|     mobile 5g|
| 12|   11|2021|   tablet wifi|
| 12|   11|2021|     mobile 5g|
| 12|   11|2021| gaming laptop|
| 12|   11|2021|     mobile 5g|
| 12|   11|2021|     mobile 5g|
| 12|   11|2021| mobile 6 inch|
| 12|   11|2021| mobile latest|
| 12|   11|2021| mobile 6 inch|
| 12|   11|2021|   tablet wifi|
| 12|   11|2021|     mobile 5g|
| 12|   11|2021|        laptop|
+---+-----+----+--------------+
only showing top 20 rows



[Stage 1:>                                                          (0 + 8) / 8]

Dimension of the Dataframe is: (10000, 4)
Number of Rows are: 10000
Number of Columns are: 4


                                                                                

In [None]:
# Print the top 5 rows
# Take a screenshot of the code and name it as top5rows.jpg)

In [21]:
sdf.head(5)

[Row(day=12, month=11, year=2021, searchterm='mobile 6 inch'),
 Row(day=12, month=11, year=2021, searchterm='mobile latest'),
 Row(day=12, month=11, year=2021, searchterm='tablet wifi'),
 Row(day=12, month=11, year=2021, searchterm='laptop 14 inch'),
 Row(day=12, month=11, year=2021, searchterm='mobile 5g')]

In [None]:
# Find out the datatype of the column searchterm?
# Take a screenshot of the code and name it as datatype.jpg)

In [22]:
sdf.dtypes

[('day', 'bigint'),
 ('month', 'bigint'),
 ('year', 'bigint'),
 ('searchterm', 'string')]

In [None]:
# How many times was the term `gaming laptop` searched?
# Take a screenshot of the code and name it as gaminglaptop.jpg)

In [26]:
value = sdf.select('searchterm').where(sdf.searchterm == 'gaming laptop').count()
print(value)
print("How many times was the term `gaming laptop` was searched is ", value)

499
How many times was the term `gaming laptop` was searched is  499


In [None]:
# Print the top 5 most frequently used search terms?
# Take a screenshot of the code and name it as top5terms.jpg)

In [32]:
from pyspark.sql import functions as F, Window

result = sdf.select(*[
    F.struct(
        F.count(c).over(Window.partitionBy(c)).alias("cnt"),
        F.col(c).alias("val")
    ).alias(c) for c in sdf.columns
]).agg(*[
    F.slice(
        F.expr(f"transform(sort_array(collect_set({c}), false), x -> x.val)"),
        1, 5
    ).alias(c) for c in sdf.columns
])

result.show()



+--------------------+--------+------+--------------------+
|                 day|   month|  year|          searchterm|
+--------------------+--------+------+--------------------+
|[23, 22, 19, 18, 15]|[12, 11]|[2021]|[mobile 6 inch, m...|
+--------------------+--------+------+--------------------+



                                                                                

In [None]:
# The pretrained sales forecasting model is available at  the below url
# https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0321EN-SkillsNetwork/Bigdata%20and%20Spark/model.tar.gz

In [43]:
import urllib.request

url = 'https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0321EN-SkillsNetwork/Bigdata%20and%20Spark/model.tar.gz'

download_file = 'model.tar.gz'

urllib.request.urlretrieve(url, download_file)

('model.tar.gz', <http.client.HTTPMessage at 0x7fbf81b36310>)

In [45]:
import tarfile

tar_file = 'model.tar.gz'

if tar_file.endswith("tar.gz"):
    tar = tarfile.open(tar_file, "r:gz")
    tar.extractall('./extracted_folder')
    tar.close()

In [3]:
# Load the sales forecast model.
# Take a screenshot of the code and name it as loadmodel.jpg)

In [46]:
from pyspark.ml.regression import LinearRegressionModel

In [47]:
model = LinearRegressionModel.load('sales_prediction.model')

                                                                                

In [4]:
# Using the sales forecast model, predict the sales for the year of 2023.
# Take a screenshot of the code and name it as forecast.jpg

In [49]:
def predict(year):
    assembler = VectorAssembler(inputCols=["year"],outputCol="features")
    data = [[year,0]]
    columns = ["year", "searchterm"]
    _ = spark.createDataFrame(data, columns)
    __ = assembler.transform(_).select('features','searchterm')
    predictions = model.transform(__)
    predictions.select('prediction').show()

In [52]:
predict(2023)

+------------------+
|        prediction|
+------------------+
|175.16564294006457|
+------------------+

