# Project: Sequential Pattern Mining with Web log

In this project we will use Spark and the PrefixSpan algorithm to mine a web log from an online store.


Author: <font color="blue">Mingjia "Jacky" Guan</font>

E-mail: <font color="blue">mguan@stu.feitian.edu</font>

Date: <font color="blue">04/05/2024</font>

## About Data and Cleaning

**Dataset**: https://dataverse.harvard.edu/dataset.xhtml?persistentId=doi:10.7910/DVN/3QBYB5

- Alternative download link: https://cis331.guihang.org/data/access.log.zip

- **Download and unzip** the file `access.log.zip`, then move the file `access.log` into the directory that you map into the Docker container:

Say I move the file into my home directory, `/Users/zhengqu`. Then, in the Mac terminal or Windows command prompt, I `cd` to my home directory (on a Mac, just typing `cd` and pressing Enter does it).

```bash
pwd
```

<font size=-1 color=grey>/Users/zhengqu</font>

Let's assume I put `access.log` in my home dir as `/Users/zhengqu/access.log` and my spark docker container/instance was created by the following command, as I demonstrated in the video after the spark lab lecture:

```bash
docker run -p 10000:8888 -d -P --name notebook -it  -v "$PWD:/home/jovyan/mydir" jupyter/all-spark-notebook
```

whereby `"$PWD"` is actually `/Users/zhengqu` in my case, and it is mapped into docker container `notebook` as `/home/jovyan/mydir`. So anything I put in this folder `"/Users/zhengqu"` on my host laptop is available in my spark docker container `notebook` in this folder: `/home/jovyan/mydir`. 



### Access the data file from docker container

- First, start the Spark Docker container (you can skip this if the container has not been stopped since you last ran it).

Use this command to show docker container running status:

```bash
docker ps -a
```

- To start the container:

```bash
docker start notebook
```

- Next, connect to your container with bash:

```bash
# connect to docker container with bash
docker exec -it notebook /bin/bash
```

- Now you are inside the spark docker container (imagine it as a virtual linux machine).

----
**Note: commands below are run from docker container linux system**


- Let's "cd" to the mapped folder:

```bash
# change to mapped dir:
cd /home/jovyan/mydir
```


- Let's check the file by running: 

```bash
 ls -l access.log
```
<font size=-2 color=grey>`-rw-r--r--@ 1 jovyan  staff 3502440823 Jan 26  2019 access.log`</font>

- Let's check how many lines it has by running (this takes a while):

```bash
wc -l access.log
```
<font size=-2 color=grey>`10365152 access.log`</font>

- Let's print the last 100 lines of the file:

```bash
tail -n 100 access.log
```



<font size=-2 color=#524d43>

```
151.239.2.90 - - [26/Jan/2019:20:29:09 +0330] "GET /static/bundle-bundle_site_head.css HTTP/1.1" 499 0 "https://www.zanbil.ir/search/%D8%B3%D8%B1%D8%B1%D8%B3%DB%8C%D8%AF/p0" "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:64.0) Gecko/20100101 Firefox/64.0" "-"
151.239.2.90 - - [26/Jan/2019:20:29:09 +0330] "GET /image/%7B%7BbasketItem.id%7D%7D?type=productModel&wh=50x50 HTTP/1.1" 200 5 "https://www.zanbil.ir/search/%D8%B3%D8%B1%D8%B1%D8%B3%DB%8C%D8%AF/p0" "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:64.0) Gecko/20100101 Firefox/64.0" "-"
5.114.175.169 - - [26/Jan/2019:20:29:09 +0330] "GET /image/56602/productModel/200x200 HTTP/1.1" 200 6188 "https://www.zanbil.ir/m/filter/b2%2Cp65%2Ct31" "Mozilla/5.0 (iPhone; CPU iPhone OS 11_4_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/11.0 Mobile/15E148 Safari/604.1" "-"
5.114.175.169 - - [26/Jan/2019:20:29:09 +0330] "GET /image/62175/productModel/200x200 HTTP/1.1" 200 8140 "https://www.zanbil.ir/m/filter/b2%2Cp65%2Ct31" "Mozilla/5.0 (iPhone; CPU iPhone OS 11_4_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/11.0 Mobile/15E148 Safari/604.1" "-"
5.114.175.169 - - [26/Jan/2019:20:29:09 +0330] "GET /image/62064/productModel/200x200 HTTP/1.1" 200 6840 "https://www.zanbil.ir/m/filter/b2%2Cp65%2Ct31" "Mozilla/5.0 (iPhone; CPU iPhone OS 11_4_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/11.0 Mobile/15E148 Safari/604.1" "-"
151.239.2.90 - - [26/Jan/2019:20:29:09 +0330] "GET /static/images/search-category-arrow.png HTTP/1.1" 200 217 "https://znbl.ir/static/bundle-bundle_site_head.css" "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:64.0) Gecko/20100101 Firefox/64.0" "-"
[truncated]
```
</font>

---

## Understand the Web Log Format

You will want to read [this article](https://www.sumologic.com/blog/apache-access-log/) to understand the web log format. Each website may use a slightly different format. In general, each line starts with the user's IP address, followed by the timestamp (inside `[ ]` in the lines above) and the user's request (in simplified terms, sending data is called `POST`, e.g., submitting a login password; retrieving data is called `GET`). The request URL does not include the base domain name. So

```
GET /image/62175/productModel/200x200
```

means retrieving https://www.zanbil.ir/image/62175/productModel/200x200
(the base domain name of this web log is www.zanbil.ir).

The three-digit number after `HTTP/1.1` is the response status: 200 means success; 3xx means redirection (for example, 302 sends the browser on to another URL, and 304 Not Modified tells the browser to reuse the copy saved in its cache); 4xx means a client (user browser) error (404 Not Found being the infamous one); 5xx means a server error. <font color="orange">So you only care about 200 and 3xx statuses, and want to filter out lines with any other status.</font>
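As a quick illustration, the status rule can be written as a plain-Python predicate. This is a minimal sketch, assuming the status has already been parsed into an `int`; the name `keep_status` is hypothetical. For example, the `499` in the first sample line above would be dropped.

```python
def keep_status(status: int) -> bool:
    """Keep 200 (success) and any 3xx (redirect / not modified); drop everything else."""
    return status == 200 or 300 <= status <= 399


# Statuses that appear in the sample lines above, plus a couple of common errors
for code in (200, 302, 304, 404, 499, 500):
    print(code, "keep" if keep_status(code) else "drop")
```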

You will see each IP requesting many URLs at the same timestamp. That's because when you open a page, the images, CSS and JavaScript files within that page are retrieved as well, and each one leaves a separate web log entry.


<font color="orange">Remember that before mining further, you will filter out all lines requesting CSS or JS files (you will need to figure out the URL patterns for these yourself), since they are normally useless for our purpose.</font>
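Here is a minimal sketch of one possible CSS/JS rule, assuming such requests can be recognized by a path ending in `.css` or `.js` (optionally followed by a query string). The pattern and the name `is_static_asset` are assumptions, not the required answer, so check them against the log yourself. In Spark, a similar regular expression could be applied to the URL column with `Column.rlike`.

```python
import re

# Assumed pattern: the path ends in .css or .js, either at the end of the URL
# or right before a "?" query string (so "/data.json" is NOT matched)
STATIC_ASSET = re.compile(r"\.(?:css|js)(?:\?|$)", re.IGNORECASE)


def is_static_asset(url: str) -> bool:
    """Return True if the request URL looks like a CSS or JavaScript file."""
    return STATIC_ASSET.search(url) is not None


# URLs taken from the sample log lines above
for u in ("/static/bundle-bundle_site_head.css",
          "/image/56602/productModel/200x200",
          "/basket/add/62424?mobile=1&addedValues="):
    print(u, is_static_asset(u))
```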

You will also notice log entries like this one:

<font size=-2 color=#888>
```
37.129.59.160 - - [26/Jan/2019:20:29:12 +0330] "GET /basket/add/62424?mobile=1&addedValues= HTTP/1.1" 302 0 "https://www-zanbil-ir.cdn.ampproject.org/v/s/www.zanbil.ir/m/product/32148/%DA%AF%D9%88%D8%B4%DB%8C-%D8%AA%D9%84%D9%81%D9%86-%D8%A8%DB%8C-%D8%B3%DB%8C%D9%85-%D9%BE%D8%A7%D9%86%D8%A7%D8%B3%D9%88%D9%86%DB%8C%DA%A9-%D9%85%D8%AF%D9%84-Panasonic-Cordless-Telephone-KX-TGC412?amp_js_v=0.1&usqp=mq331AQECAEoAQ%3D%3D" "Mozilla/5.0 (Linux; Android 6.0.1; D6633 Build/23.5.A.1.291) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 Mobile Safari/537.36" "-"
```
</font>

- Note: <font color="orange">this URL `/basket/add/62424` represents the user's action of adding a product to the basket!</font>


---

### Preprocess Tasks 

To simplify our task, we will only consider users who have added at least one product to the basket. The preprocessing steps are:

0. read the access.log into spark`*`
1. filter out all unnecessary lines as highlighted in orange before.
2. Get a list of IPs (let's call it uniqueIPs) who performed the action of adding product(s) to the basket 
3. filter out all records whose IP does not belong to the IP list in step 2.

The remaining records are what we will mine with prefixspan.

`*` For step 0, for reading text file into spark, follow examples on this page (click python version): https://spark.apache.org/docs/latest/mllib-frequent-pattern-mining.html
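The three filtering steps can be prototyped without Spark on a few hand-made records before running them on the full log. This is a sketch only: the records (with IPs from the `192.0.2.0/24` documentation range) are invented, and the CSS/JS regex is an assumed pattern. Here the set lookup in step 3 plays the role that an inner join plays on the Spark data frame.

```python
import re

# Invented sample records: (ip, timestamp, url, status)
records = [
    ("192.0.2.1", "26/Jan/2019:20:29:09", "/product/32148", 200),
    ("192.0.2.1", "26/Jan/2019:20:29:10", "/static/bundle-bundle_site_head.css", 200),
    ("192.0.2.1", "26/Jan/2019:20:29:12", "/basket/add/62424?mobile=1&addedValues=", 302),
    ("192.0.2.2", "26/Jan/2019:20:29:12", "/image/267/productModel/150x150", 200),
    ("192.0.2.3", "26/Jan/2019:20:29:13", "/basket/add/100", 404),
]

# Assumed pattern for stylesheet / script requests
STATIC_ASSET = re.compile(r"\.(?:css|js)(?:\?|$)", re.IGNORECASE)


def is_relevant(record):
    """Step 1 rule: keep 200/3xx responses that are not CSS or JS files."""
    _, _, url, status = record
    good_status = status == 200 or 300 <= status <= 399
    return good_status and STATIC_ASSET.search(url) is None


# Step 1: filter out unnecessary lines
clean = [r for r in records if is_relevant(r)]

# Step 2: IPs that added at least one product to the basket
basket_ips = {ip for ip, _, url, _ in clean if url.startswith("/basket/add/")}

# Step 3: keep only records from those IPs
kept = [r for r in clean if r[0] in basket_ips]

for r in kept:
    print(r)
```

With Spark, the same logic becomes a `filter` on the data frame, a `select("ip").distinct()` for step 2, and a join for step 3.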


### Example on a notebook running from docker container

- Run this from docker container's Linux bash command line to show jupyter notebook tokens

```bash
jupyter notebook list
```

<font size=-2 color=grey>

```
Currently running servers:
http://0.0.0.0:8888/?token=01e38a2234531e653d320df569dbc94173ebe89460cd57a :: /home/jovyan
```
</font>


In your laptop's browser, open this URL:
`http://127.0.0.1:10000/?token=01e38a2234531e653d320df569dbc94173ebe89460cd57a`

Note that you replace 8888 with 10000. Then navigate to your mapped directory, which is `mydir` if you followed the commands above, create a new notebook from the "File" menu, and save it (File > Save Notebook). You will also see the new notebook in your laptop folder, in my case `/Users/zhengqu`. This notebook runs in the Docker container, so do not open the same notebook in the host laptop's (Windows/Mac) Jupyter at the same time! This creates conflicts.

#### Let's see pyspark example from https://spark.apache.org/docs/latest/mllib-frequent-pattern-mining.html


```python
from pyspark.mllib.fpm import FPGrowth
from pyspark import SparkContext 
sc = SparkContext('local[*]')

# Note data is an object of RDD: pyspark.rdd.PipelinedRDD
data = sc.textFile("data/mllib/sample_fpgrowth.txt") 
transactions = data.map(lambda line: line.strip().split(' '))
model = FPGrowth.train(transactions, minSupport=0.2, numPartitions=10)
result = model.freqItemsets().collect() ## << note .collect() 
for fi in result:
    print(fi)
```

Please run this example. Note:
- You need to download `sample_fpgrowth.txt` from
https://github.com/apache/spark/raw/master/data/mllib/sample_fpgrowth.txt
and specify the file path correctly in `sc.textFile()`. You can use `!wget URL-to-data` in the Jupyter notebook, then `!ls path/to/downloaded/file` to verify the file.
- Spark collections (data frames, or RDDs such as `PipelinedRDD`) cannot be iterated over in Python until you apply `.collect()`, as in the example above, or `.take(n)`, as in the class demo.

## <font color=red>Your code to run the example of sample_fpgrowth.txt with FPGrowth here</font>

```python
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
```

```python
!wget "https://github.com/apache/spark/raw/master/data/mllib/sample_fpgrowth.txt"
```

```
--2024-04-08 03:43:50--  https://github.com/apache/spark/raw/master/data/mllib/sample_fpgrowth.txt
Resolving github.com (github.com)... 140.82.114.3
Connecting to github.com (github.com)|140.82.114.3|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/apache/spark/master/data/mllib/sample_fpgrowth.txt [following]
--2024-04-08 03:43:50--  https://raw.githubusercontent.com/apache/spark/master/data/mllib/sample_fpgrowth.txt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.109.133, 185.199.108.133, 185.199.111.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.109.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 68 [text/plain]
Saving to: ‘sample_fpgrowth.txt.5’


2024-04-08 03:43:50 (2.51 MB/s) - ‘sample_fpgrowth.txt.5’ saved [68/68]
```



```python
from pyspark.mllib.fpm import FPGrowth

data = sc.textFile("sample_fpgrowth.txt")
transactions = data.map(lambda line: line.strip().split(' '))
model = FPGrowth.train(transactions, minSupport=0.2, numPartitions=10)
result = model.freqItemsets().collect()
for fi in result:
    print(fi)
```

```
FreqItemset(items=['z'], freq=5)
FreqItemset(items=['x'], freq=4)
FreqItemset(items=['x', 'z'], freq=3)
FreqItemset(items=['y'], freq=3)
FreqItemset(items=['y', 'x'], freq=3)
FreqItemset(items=['y', 'x', 'z'], freq=3)
FreqItemset(items=['y', 'z'], freq=3)
FreqItemset(items=['r'], freq=3)
FreqItemset(items=['r', 'x'], freq=2)
FreqItemset(items=['r', 'z'], freq=2)
FreqItemset(items=['s'], freq=3)
FreqItemset(items=['s', 'y'], freq=2)
FreqItemset(items=['s', 'y', 'x'], freq=2)
FreqItemset(items=['s', 'y', 'x', 'z'], freq=2)
FreqItemset(items=['s', 'y', 'z'], freq=2)
FreqItemset(items=['s', 'x'], freq=3)
FreqItemset(items=['s', 'x', 'z'], freq=2)
FreqItemset(items=['s', 'z'], freq=2)
FreqItemset(items=['t'], freq=3)
FreqItemset(items=['t', 'y'], freq=3)
FreqItemset(items=['t', 'y', 'x'], freq=3)
FreqItemset(items=['t', 'y', 'x', 'z'], freq=3)
FreqItemset(items=['t', 'y', 'z'], freq=3)
FreqItemset(items=['t', 's'], freq=2)
FreqItemset(items=['t', 's', 'y'], freq=2)
FreqItemset(items=['t', '
```

Note:

- data is an object of RDD: 
```python
type(data)
```
> <font size=-2>pyspark.rdd.PipelinedRDD</font>
- `sc.textFile()` takes the path to your input file. Use jupyter command `!ls path/to/access.log` to verify your file path. Or switch to docker container command line, using something like `ls /home/jovyan/mydir/access.log` to verify.
- `data.map()` takes a function to be applied to each line of the input text file. If you prefer not to use a lambda function, you can define a named function like:
```python
def myParser (line): 
   return line.strip().split(' ')
```

And then you can use `data.map` like:
```python
transactions = data.map(myParser)
```

Please try it in the example. Also reference notebook "pyspark-ex.ipynb" used in my video demo for docker/spark.


```python
type(data)
```

```
pyspark.rdd.RDD
```

```python
!pwd
```

```
/home/jovyan/mydir/workspaces/CIS331/sequential_pattern_mining
```


## <font color=red>Your code to run the example of sample_fpgrowth.txt with myParser here</font>

```python
from pyspark.mllib.fpm import FPGrowth

def myParser(line):
    return line.strip().split(" ")

data = sc.textFile("sample_fpgrowth.txt")
transactions = data.map(myParser)
model = FPGrowth.train(transactions, minSupport=0.2, numPartitions=10)
result = model.freqItemsets().collect()
for fi in result:
    print(fi)
```

```
FreqItemset(items=['z'], freq=5)
FreqItemset(items=['x'], freq=4)
FreqItemset(items=['x', 'z'], freq=3)
FreqItemset(items=['y'], freq=3)
FreqItemset(items=['y', 'x'], freq=3)
FreqItemset(items=['y', 'x', 'z'], freq=3)
FreqItemset(items=['y', 'z'], freq=3)
FreqItemset(items=['r'], freq=3)
FreqItemset(items=['r', 'x'], freq=2)
FreqItemset(items=['r', 'z'], freq=2)
FreqItemset(items=['s'], freq=3)
FreqItemset(items=['s', 'y'], freq=2)
FreqItemset(items=['s', 'y', 'x'], freq=2)
FreqItemset(items=['s', 'y', 'x', 'z'], freq=2)
FreqItemset(items=['s', 'y', 'z'], freq=2)
FreqItemset(items=['s', 'x'], freq=3)
FreqItemset(items=['s', 'x', 'z'], freq=2)
FreqItemset(items=['s', 'z'], freq=2)
FreqItemset(items=['t'], freq=3)
FreqItemset(items=['t', 'y'], freq=3)
FreqItemset(items=['t', 'y', 'x'], freq=3)
FreqItemset(items=['t', 'y', 'x', 'z'], freq=3)
FreqItemset(items=['t', 'y', 'z'], freq=3)
FreqItemset(items=['t', 's'], freq=2)
FreqItemset(items=['t', 's', 'y'], freq=2)
FreqItemset(items=['t', '
```

## Read and parse data in spark

In this project, your function for `data.map()` (let's still call it `myParser`) should take a line of text and return a tuple of

`IP, TIMESTAMP, REQUEST_URL, STATUS`

- `IP` can be a string, 
- `TIMESTAMP` should be a datetime object, refer to [this tutorial](https://stackabuse.com/converting-strings-to-datetime-in-python/) for converting string to datetime object. (note: timezone doesn't matter in this project because all records are in the same timezone, which is server time, so you can ignore it.)
- `REQUEST_URL` is a string
- `STATUS` is an integer: 200, 302, etc.
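The specification above can be met with a regular expression plus `datetime.strptime`. This is a minimal sketch, assuming every useful line follows the layout of the sample lines shown earlier; the name `parse_log_line` and the regex are illustrative, not the required solution. Lines that do not match return `None`, so in Spark you would drop them after mapping, e.g. `sc.textFile("access.log").map(parse_log_line).filter(lambda r: r is not None)`. `%b` matches English month abbreviations such as `Jan` under the default C locale.

```python
import re
from datetime import datetime

# Assumed line layout (the "combined" log format seen in the sample lines above):
# IP - - [timestamp] "METHOD URL PROTOCOL" STATUS SIZE "referrer" "user-agent" ...
LOG_LINE = re.compile(
    r'^(?P<ip>\S+) \S+ \S+ \[(?P<ts>[^\]]+)\] '
    r'"(?P<method>\S+) (?P<url>\S+)[^"]*" (?P<status>\d{3}) '
)


def parse_log_line(line):
    """Return (IP, TIMESTAMP, REQUEST_URL, STATUS), or None if the line does not match."""
    match = LOG_LINE.match(line)
    if match is None:
        return None
    # "26/Jan/2019:20:29:12 +0330" -> drop the "+0330" offset, which is the
    # same for every record, and parse the rest into a naive datetime
    ts = datetime.strptime(match.group("ts").split()[0], "%d/%b/%Y:%H:%M:%S")
    return match.group("ip"), ts, match.group("url"), int(match.group("status"))


sample = ('37.129.59.160 - - [26/Jan/2019:20:29:12 +0330] '
          '"GET /basket/add/62424?mobile=1&addedValues= HTTP/1.1" 302 0 "-" "Mozilla/5.0" "-"')
print(parse_log_line(sample))
# -> ('37.129.59.160', datetime.datetime(2019, 1, 26, 20, 29, 12),
#     '/basket/add/62424?mobile=1&addedValues=', 302)
```

When an RDD of such tuples is converted with `toDF`, Spark infers the `datetime` field as a timestamp column and the `int` field as a long column, so no string conversion is needed later.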


It's a good habit to test your function `myParser` first, on a smaller dataset and without Spark, before proceeding. You can create a 20-line file for this by running the following in the Docker container's Linux command line:
```bash
tail -n 20 access.log > small20.log
```

This will create a file "small20.log"

You can view it by running the following command in the Docker container's Linux command line:
```bash
cat small20.log
```

Now you can test your function on this file (note that it sits inside `"/home/jovyan/mydir"`) with the following Python code in the Jupyter notebook:

```python
# Note this test doesn't involve spark
lines = open("/home/jovyan/mydir/small20.log").readlines()
for l in map(myParser, lines):
   print(l)
```




## <font color=red>Your code for defining the function `myParser` and testing it on small20.log without Spark (you can use the code above) here</font>

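```python
def myParser(line):
    # Split on whitespace: x[0] is the IP, x[3:5] the bracketed timestamp and
    # timezone (joined back together), x[6] the request URL, x[8] the status.
    # All four fields are returned as strings.
    x = line.split()
    return x[0], "".join(x[3:5]), x[6], x[8]
```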

After your test, apply your function `myParser` to the Spark RDD and print out 15 rows (use `data.take(15)`) from your large Spark data (based on the large access.log file). Each row will be:

IP, TIMESTAMP, REQUEST_URL, STATUS


## <font color=red>Your code to test `myParser` on access.log read as a Spark RDD and print out 15 rows here</font>

```python
data = sc.textFile("small20.log").take(15)

for line in data:
    print(myParser(line))
```

```
('91.99.55.165', '[26/Jan/2019:20:29:12+0330]', '/m/filter/p2597%2Cb231?page=1', '200')
('151.242.106.253', '[26/Jan/2019:20:29:12+0330]', '/image/30181?name=fsr14-55.jpg&wh=max', '200')
('192.15.51.231', '[26/Jan/2019:20:29:13+0330]', '/image/267/productModel/150x150', '200')
('13.66.139.0', '[26/Jan/2019:20:29:13+0330]', '/product/29746/%D9%85%D8%A7%D8%B4%DB%8C%D9%86-%D8%A7%D8%B5%D9%84%D8%A7%D8%AD-%D8%A8%D8%AF%D9%86-%D9%BE%D8%B1%D9%86%D8%B3%D9%84%DB%8C-%D9%85%D8%AF%D9%84-Princely-Body-Groomer-PR461AT', '200')
('45.79.177.249', '[26/Jan/2019:20:29:13+0330]', '/m/browse/evaporative-air-cooler/%DA%A9%D9%88%D9%84%D8%B1-%D8%A2%D8%A8%DB%8C', '200')
('180.94.84.225', '[26/Jan/2019:20:29:13+0330]', '/image/get?path=/Image/ofa4bc2n.jpg', '200')
('5.134.161.178', '[26/Jan/2019:20:29:13+0330]', '/image/216/brand', '200')
('5.134.161.178', '[26/Jan/2019:20:29:13+0330]', '/image/217/brand', '200')
('5.134.161.178', '[26/Jan/2019:20:29:13+0330]', '/image/105/brand', '200')
('192.15.51.231', '[26/J
```

## Filtering the spark data

Now you want to filter your RDD. Step 0 below is already done; the remaining steps are 1, 2 and 3.

0. read the access.log into spark (**completed**)
1. filter out all unnecessary lines as highlighted before.
2. Get a list of IPs (let's call it uniqueIPs) who performed the action of adding product(s) to the basket 
3. filter out all records whose IP does not belong to the IP list in step 2.

```python
from pyspark.sql import SparkSession
from pyspark import SparkContext
import pyspark.sql.functions as F

access_log_raw = sc.textFile("access.log")

access_log_rdd = access_log_raw.map(myParser)

spark = SparkSession(sc)

al_df = access_log_rdd.toDF(schema = ["ip", "timestamp", "url", "exit_code"])

al_df.printSchema()

al_df.show()
```

```
root
 |-- ip: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- url: string (nullable = true)
 |-- exit_code: string (nullable = true)

+-------------+--------------------+--------------------+---------+
|           ip|           timestamp|                 url|exit_code|
+-------------+--------------------+--------------------+---------+
| 54.36.149.41|[22/Jan/2019:03:5...|/filter/27|13%20%...|      200|
|  31.56.96.51|[22/Jan/2019:03:5...|/image/60844/prod...|      200|
|  31.56.96.51|[22/Jan/2019:03:5...|/image/61474/prod...|      200|
|40.77.167.129|[22/Jan/2019:03:5...|/image/14925/prod...|      200|
|  91.99.72.15|[22/Jan/2019:03:5...|/product/31893/62...|      200|
|40.77.167.129|[22/Jan/2019:03:5...|/image/23488/prod...|      200|
|40.77.167.129|[22/Jan/2019:03:5...|/image/45437/prod...|      200|
|40.77.167.129|[22/Jan/2019:03:5...|/image/576/articl...|      200|
|66.249.66.194|[22/Jan/2019:03:5...|/filter/b41,b665,...|      200|
|40.77.167.129|[22/Jan/2019
```

```python
print(al_df.count())
```

```
10365152
```


#### Converting RDD to dataframe

In order to do filtering operations, we want to convert the RDD into spark dataframe. Let's run and study following examples. 

**Note**: in case you get error with `ValueError: Cannot run multiple SparkContexts at once` you need to stop previous `SparkContext` by running
```python
sc.stop()
```

Or comment out the line `sc = SparkContext('local[*]')`.

**Important** Run and **study** the code example below carefully, you might use every line of it, with modification, for the project.

```python
from pyspark.sql import SparkSession
from pyspark import SparkContext
import pyspark.sql.functions as F


sc = SparkContext('local[*]') # comment this in case you already have SparkContext running



dataList = [("James", "Cameron","Sales","NY",90000,34,10000),
    ("Michael", "Johnson","Sales","NY",86000,56,20000),
    ("Robert", "Stromberg","Sales","CA",81000,30,23000),
    ("Maria", "McDonald","Finance","CA",90000,24,23000),
    ("Raman","Pearson","Finance","CA",99000,40,24000),
    ("Scott","Kindel","Finance","NY",83000,36,19000),
    ("Jen","Lewis","Finance","NY",79000,53,15000),
    ("Jeff", "Scott","Marketing","CA",80000,25,18000),
    ("Kumar", "Jobs","Marketing","NY",91000,50,21000)
  ]
rdd=sc.parallelize(dataList) # Note RDD is created from list, similar to creating it from a text file

# *** Now we convert rdd to dataframe
spark = SparkSession(sc) # this is necessary for rdd to be converted to data frame

# converting RDD to DataFrame
df = rdd.toDF(schema = ["first_name", "last_name","department","state","salary","age","bonus"])

# Show schema
df.printSchema()

# Show data frame
df.show(truncate=False)

# Note we create two new columns from grouped data:
# they are renamed as dep_salary, dep_individual_salary
df.groupBy("department").agg(F.sum('salary').alias("dep_salary"), 
          F.collect_list('salary').alias("dep_individual_salary")).show()
```


```python
from pyspark.sql import SparkSession
from pyspark import SparkContext
import pyspark.sql.functions as F


#sc = SparkContext('local[*]') # comment this in case you already have SparkContext running


dataList = [("James", "Cameron","Sales","NY",90000,34,10000),
    ("Michael", "Johnson","Sales","NY",86000,56,20000),
    ("Robert", "Stromberg","Sales","CA",81000,30,23000),
    ("Maria", "McDonald","Finance","CA",90000,24,23000),
    ("Raman","Pearson","Finance","CA",99000,40,24000),
    ("Scott","Kindel","Finance","NY",83000,36,19000),
    ("Jen","Lewis","Finance","NY",79000,53,15000),
    ("Jeff", "Scott","Marketing","CA",80000,25,18000),
    ("Kumar", "Jobs","Marketing","NY",91000,50,21000)
  ]

rdd=sc.parallelize(dataList) # Note RDD is created from list, similar to creating it from a text file

# *** Now we convert rdd to dataframe
spark = SparkSession(sc) # this is necessary for rdd to be converted to data frame

# converting RDD to DataFrame
df = rdd.toDF(schema = ["first_name", "last_name","department","state","salary","age","bonus"])

# Show schema
df.printSchema()

# Show data frame
df.show(truncate=False)

# Note we create two new columns from grouped data:
# they are renamed as dep_salary, dep_individual_salary
df.groupBy("department").agg(F.sum('salary').alias("dep_salary"), 
          F.collect_list('salary').alias("dep_individual_salary")).show()
```

```
root
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- state: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- age: long (nullable = true)
 |-- bonus: long (nullable = true)

+----------+---------+----------+-----+------+---+-----+
|first_name|last_name|department|state|salary|age|bonus|
+----------+---------+----------+-----+------+---+-----+
|James     |Cameron  |Sales     |NY   |90000 |34 |10000|
|Michael   |Johnson  |Sales     |NY   |86000 |56 |20000|
|Robert    |Stromberg|Sales     |CA   |81000 |30 |23000|
|Maria     |McDonald |Finance   |CA   |90000 |24 |23000|
|Raman     |Pearson  |Finance   |CA   |99000 |40 |24000|
|Scott     |Kindel   |Finance   |NY   |83000 |36 |19000|
|Jen       |Lewis    |Finance   |NY   |79000 |53 |15000|
|Jeff      |Scott    |Marketing |CA   |80000 |25 |18000|
|Kumar     |Jobs     |Marketing |NY   |91000 |50 |21000|
+----------+---------+----------+-----+-----
```

## Actual steps for filtering the spark data

Now you write your code to 

1. Convert spark RDD to Data Frame, then filter out all unnecessary lines as highlighted in <font color=orange>orange</font> in the above section `Understand the Web Log Format`.
2. Get a list of IPs (let's call it `uniqueIPs`) who performed the action of adding product(s) to the basket 
3. filter out all records whose IP does not belong to the IP list in step 2.
4. Note that steps 2 and 3 should not be performed literally; I use this wording only to make them easy to understand. This is explained further in the next section (see the **Last note**).



## Important resources for wrangling spark data, should you encounter problems

- When testing your code you will go through many trial-and-error cycles; you may speed up your coding by creating a relatively small data file (say, 10,000 to 30,000 lines) to test your code on until it succeeds.
- Please bookmark [this tutorial](https://sparkbyexamples.com/) and study relevant examples (browse the menu on the left) if you have trouble with certain functions.
- You may want to explore `where` or `filter`, `groupBy`, `agg`, `withColumn`, `Date and Timestamp Functions`, `udf` (user defined function)
- **Note**: the examples generally omit import statements. If you get errors saying certain objects are not defined, they may be resolved by
```python
from pyspark.sql.functions import *
from pyspark.sql.types import *
```
Or search Google for something like `pyspark import udf`.
- One useful function for a data frame column is `.isin()`, which takes a list and checks the column's elements against that list; see https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.Column.isin.html#pyspark.sql.Column.isin
- To convert a Spark data frame column into a Python list, one way is to call `toPandas()` first, after which pandas functions work. For example, to get the list of first names from the example `df` above:
```python
h = df.select("first_name").toPandas()
huge_list = list( h["first_name"].values)
huge_list
```

- **Last note**: Filtering a spark data frame with a list could be extremely slow if you deal with huge list and big data frame. Standard approach is to use two dataframes and apply [inner join](https://sparkbyexamples.com/pyspark/pyspark-join-explained-with-examples/) for this scenario. So please use inner join or simply use sql syntax. If you stick with `.isin(huge_list)` you get 2 points deduction.
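To see why a join is preferred over `.isin(huge_list)`, here is a toy hash join in plain Python: one side is indexed in a dictionary (build phase), and each row of the other side then needs a single lookup (probe phase) instead of a scan of a long list. It is a conceptual sketch with a hypothetical function name; Spark picks its own physical join strategy (such as broadcast hash join or sort-merge join) and runs it in parallel. As an aside, Spark's `DataFrame.join` also accepts `how="left_semi"`, which keeps only the left side's columns and so avoids a duplicated IP column.

```python
from collections import defaultdict


def hash_inner_join(left, right, left_key, right_key):
    """Inner-join two lists of tuples on the given key positions."""
    # Build phase: index the (usually smaller) right side by its join key
    index = defaultdict(list)
    for rrow in right:
        index[rrow[right_key]].append(rrow)
    # Probe phase: one dictionary lookup per left row; unmatched rows are dropped
    return [lrow + rrow for lrow in left for rrow in index.get(lrow[left_key], [])]


# Invented data: parsed records (ip, url) and the basket IPs as one-column rows
records = [
    ("192.0.2.1", "/product/32148"),
    ("192.0.2.2", "/image/267/productModel/150x150"),
    ("192.0.2.1", "/basket/add/62424"),
]
basket_ips = [("192.0.2.1",)]

for row in hash_inner_join(records, basket_ips, 0, 0):
    print(row)
```

Like the Spark inner join, each output row carries the key from both sides, which is why the joined data frame ends up with both `ip` and `ip_address` columns.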

## <font color=red>Your code for filtering the Spark data here</font>

```python
# Strip the brackets from "[26/Jan/2019:20:29:12+0330]" and parse it into a Spark timestamp
time_converted = al_df.withColumn("timestamp", F.to_timestamp(F.regexp_replace("timestamp", r"\[|\]", ""), "dd/MMM/yyyy:HH:mm:ssZ"))

# Calendar date of each request, used later to split sequences by day
time_converted = time_converted.withColumn("DATE", F.to_date("timestamp"))

# Keeps a row if its status starts with "2", OR starts with "3",
# OR its URL does not contain "/static/css"
exit_code_filtered = time_converted.filter(
    (time_converted["exit_code"].startswith("2")) | 
    (time_converted["exit_code"].startswith("3")) |
    (~time_converted["url"].contains("/static/css"))
)

# Distinct IPs with at least one "basket/add" request; renamed to avoid a name clash in the join
basket_add_ip = exit_code_filtered.filter(exit_code_filtered["url"].contains("basket/add"))\
                                  .groupBy("ip").count().select("ip").withColumnRenamed("ip", "ip_address")

print("Entries for /basket/add ip:", basket_add_ip.count())
print("Entries for df with exit code filtered", exit_code_filtered.count())
```

```
Entries for /basket/add ip: 2310
Entries for df with exit code filtered 10364748
```


```python
basket_add_ip.show()
```

```
+---------------+
|     ip_address|
+---------------+
|  5.117.242.204|
| 83.122.120.209|
|   89.196.66.66|
|  2.181.243.101|
|178.252.144.130|
|    5.125.13.45|
|  5.114.160.181|
|    2.191.96.86|
|  5.237.115.244|
| 188.166.113.28|
|  5.114.183.125|
|   5.211.134.24|
|  204.18.175.72|
|   185.30.7.254|
| 91.208.165.222|
|178.252.166.116|
|   5.119.221.47|
|  5.121.110.137|
| 204.18.119.253|
|  5.235.227.104|
+---------------+
only showing top 20 rows
```



```python
inner_join_df = basket_add_ip.join(exit_code_filtered, basket_add_ip["ip_address"] == exit_code_filtered["ip"], "inner").distinct()
```

```python
inner_join_df.show()
print("Entries:", inner_join_df.count())
```

```
+------------+------------+-------------------+--------------------+---------+----------+
|  ip_address|          ip|          timestamp|                 url|exit_code|      DATE|
+------------+------------+-------------------+--------------------+---------+----------+
|81.90.144.56|81.90.144.56|2019-01-22 04:11:48|/browse/home-appl...|      200|2019-01-22|
|81.90.144.56|81.90.144.56|2019-01-22 04:11:50|/image/%7B%7Bbask...|      200|2019-01-22|
|81.90.144.56|81.90.144.56|2019-01-22 04:11:50|/static/bundle-bu...|      200|2019-01-22|
|81.90.144.56|81.90.144.56|2019-01-22 04:11:50|/static/bundle-bu...|      200|2019-01-22|
|81.90.144.56|81.90.144.56|2019-01-22 04:11:51|/image/55/product...|      200|2019-01-22|
|81.90.144.56|81.90.144.56|2019-01-22 04:11:51|/image/8/productT...|      200|2019-01-22|
|81.90.144.56|81.90.144.56|2019-01-22 04:11:51|/image/18342/prod...|      200|2019-01-22|
|81.90.144.56|81.90.144.56|2019-01-22 04:11:51|/static/images/lo...|      200|2019-01-22|
|81.90.144
```

## Prepare Sequential Data for prefixspan 

Once all the data is in Spark and unnecessary records are filtered out, we want to prepare the sequential data for PrefixSpan.

In this project we define a sequence of events as:
 
> A series of URLs requested by one particular user (a user is identified by the IP address), within one day.

For example, if on 26/Jan/2019 a user visited `/A`, then `/B`, then `/C` and `/D` at the same timestamp, this forms a sequence represented as a list of lists:

```python
[ ["/A"], ["/B"], ["/C", "/D"] ]
```

So now with your spark data frame (assuming all irrelevant rows filtered out), let's still call it `df`, you want to:


1. Group the dataframe by `(IP, TIMESTAMP)` so actions from same IP and at the same timestamp are grouped together
  - now each group contains one or a set of urls which we define as one event of the sequence
  - you want to apply an aggregation (`agg`) function to create a **list of URLs** for each group; let's call this new aggregated column `EVENTS` and this aggregated data frame `DF2` (refer to the resources listed above and to the example code in the `Converting RDD to dataframe` section)
2. From `DF2`, create a new column called `DATE` that converts `TIMESTAMP` into a date, so that timestamps from the same day have the same value in this column (refer to the resources listed above).
3. Since the `DATE` column rounds each timestamp down to its date, grouping the data frame by `(IP, DATE)` gives you, in each group, the list of `EVENTS` from one user on one day. This is exactly the sequence we defined. So after grouping, just create another aggregated column `SEQUENCE` that consists of the list of `EVENTS` in the group (refer to the example code in the `Converting RDD to dataframe` section).
4. Finally you want to print out 30 rows from column `SEQUENCE` to verify you did it right.
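The grouping steps above can be prototyped in plain Python to check the expected shape before writing the Spark version. This sketch assumes records already parsed into `(ip, timestamp, url)` with `timestamp` as a `datetime`; `build_sequences` is a hypothetical name. In Spark, step 1 corresponds to `groupBy("ip", "timestamp").agg(F.collect_list("url"))` and step 3 to a second `groupBy` on `("ip", "DATE")`. One caveat: Spark documents `collect_list` as non-deterministic in order after a shuffle, so chronological order of events is not guaranteed unless you sort explicitly (e.g. by collecting `(timestamp, events)` structs and applying `F.sort_array`).

```python
from collections import defaultdict
from datetime import datetime


def build_sequences(records):
    """Group (ip, timestamp, url) records into one sequence per (ip, date).

    Each sequence is a list of events; each event is the list of URLs the IP
    requested at one timestamp, and events are in chronological order.
    """
    # Step 1: group by (IP, TIMESTAMP); each group becomes one event (itemset)
    events = defaultdict(list)
    for ip, ts, url in records:
        if url not in events[(ip, ts)]:  # an itemset holds each URL once
            events[(ip, ts)].append(url)

    # Steps 2-3: group events by (IP, DATE); sorting the (ip, ts) keys puts
    # each IP's events in timestamp order
    sequences = defaultdict(list)
    for ip, ts in sorted(events):
        sequences[(ip, ts.date())].append(events[(ip, ts)])
    return dict(sequences)


# Invented visits reproducing the /A, /B, {/C, /D} example above (deliberately out of order)
visits = [
    ("192.0.2.1", datetime(2019, 1, 26, 10, 0, 5), "/B"),
    ("192.0.2.1", datetime(2019, 1, 26, 10, 0, 0), "/A"),
    ("192.0.2.1", datetime(2019, 1, 26, 10, 0, 9), "/C"),
    ("192.0.2.1", datetime(2019, 1, 26, 10, 0, 9), "/D"),
    ("192.0.2.1", datetime(2019, 1, 27, 9, 30, 0), "/A"),
]

for key, sequence in build_sequences(visits).items():
    print(key, sequence)
```

The visit on 27/Jan lands in a separate sequence, because a sequence is limited to one user on one day.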

## <font color=red>Your code for `Prepare Sequential Data for prefixspan` here</font>

```python
import pyspark.sql.functions as F

agg_df = inner_join_df.groupBy("ip", "DATE").agg(F.collect_list("url").alias("SEQUENCE"))

agg_df.show()
```

```
+---------------+----------+--------------------+
|             ip|      DATE|            SEQUENCE|
+---------------+----------+--------------------+
| 10.139.192.130|2019-01-26|[/filter/b43,p63,...|
|  10.233.251.23|2019-01-24|[/filter/b256,p44...|
|   10.87.76.147|2019-01-23|[/browse/air-cond...|
|   10.87.76.147|2019-01-24|[/image/33968?nam...|
| 102.165.34.250|2019-01-22|[/image/33980?nam...|
| 102.165.34.250|2019-01-23|[/filter/b2,p3, /...|
| 102.165.34.250|2019-01-24|[/filter/b20,b5,p...|
| 102.165.34.250|2019-01-25|[/browse/Screen-v...|
|  109.109.42.42|2019-01-22|[/m/browse/applia...|
| 109.162.131.74|2019-01-22|[/browse/dishwash...|
|  113.203.54.71|2019-01-25|[/amp-helper-fram...|
|128.199.117.156|2019-01-22|[/image/47987/pro...|
|128.199.117.156|2019-01-24|[/amp-helper-fram...|
|128.199.117.156|2019-01-25|[/image/32172?nam...|
| 128.65.171.197|2019-01-26|[/filter/b36,p5, ...|
| 138.68.132.207|2019-01-23|[/product/34286/6...|
|   149.28.172.2|2019-01-22|[/amp-helper-fram...|
```


```python
agg_df.select("SEQUENCE").show(30)
```

```
+--------------------+
|            SEQUENCE|
+--------------------+
|[/filter/b43,p63,...|
|[/filter/b256,p44...|
|[/browse/air-cond...|
|[/image/33968?nam...|
|[/image/33980?nam...|
|[/filter/b2,p3, /...|
|[/filter/b20,b5,p...|
|[/browse/Screen-v...|
|[/m/browse/applia...|
|[/browse/dishwash...|
|[/amp-helper-fram...|
|[/image/47987/pro...|
|[/amp-helper-fram...|
|[/image/32172?nam...|
|[/filter/b36,p5, ...|
|[/product/34286/6...|
|[/amp-helper-fram...|
|[/m/product/29335...|
|[/product/33953/6...|
|[/basket/checkout...|
|[/settings/logo, ...|
|[/m/filter/p65?pa...|
|[/filter/b1,p41, ...|
|[/filter/p41?page...|
|[/go/CYRD, /go/CY...|
|[/image/30260?nam...|
|[/browse/microwav...|
|[/image/33379?nam...|
|[/amp-helper-fram...|
|[/amp-helper-fram...|
+--------------------+
only showing top 30 rows
```



## Show time

Now you want to use Spark's PrefixSpan implementation (`pyspark.ml.fpm.PrefixSpan`) to mine the sequence data. This is just a simple call:
https://spark.apache.org/docs/latest/api/python//reference/api/pyspark.ml.fpm.PrefixSpan.html

- Note that the default sequence column in the input data frame is `sequence`; you can change it to match your data frame:
```python
prefixSpan.setSequenceCol('SEQUENCE')
```
- You want to experiment with `MinSupport` and `MaxPatternLength` to get meaningful patterns. Start with a higher `MinSupport` value (0.5 is a really high value, but you can start with it) and a lower `MaxPatternLength` (5 is a good start) to speed up your test runs.

- You want to write a summary about your observed patterns in the output.
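To build intuition for `MinSupport`, here is a small pure-Python sketch of what the support of a sequential pattern means: the fraction of sequences that contain the pattern as an ordered (not necessarily contiguous) subsequence, where each pattern itemset must be a subset of some later itemset in the sequence. Spark documents `minSupport` as a fraction of the dataset size; the `min_count` helper turns it into a minimum sequence count with `ceil`, which is my understanding of the Spark source rather than a documented guarantee. All helper names and the toy sequences are hypothetical, and Spark itself uses prefix projection, not this brute-force scan.

```python
import math

def contains_pattern(sequence, pattern):
    """Return True if `pattern` occurs in `sequence` as an ordered subsequence.

    Both arguments are lists of itemsets (lists of strings). Each pattern
    itemset must be a subset of a distinct, strictly later sequence itemset.
    """
    pos = 0
    for itemset in pattern:
        needed = set(itemset)
        # Greedily advance to the first remaining itemset that covers `needed`
        while pos < len(sequence) and not needed.issubset(sequence[pos]):
            pos += 1
        if pos == len(sequence):
            return False
        pos += 1  # the next pattern itemset must match strictly later
    return True

def support(sequences, pattern):
    """Fraction of sequences containing the pattern (each sequence counts once)."""
    hits = sum(1 for seq in sequences if contains_pattern(seq, pattern))
    return hits / len(sequences)

def min_count(min_support, n_sequences):
    """Minimum number of supporting sequences for a given minSupport."""
    return math.ceil(min_support * n_sequences)

# Toy data in the nested shape PrefixSpan expects: array<array<string>>
sequences = [
    [["/"], ["/browse/tv"], ["/product/1"]],
    [["/"], ["/product/1"]],
    [["/browse/tv"], ["/"]],
    [["/basket"]],
]
print(support(sequences, [["/"], ["/product/1"]]))  # 0.5
print(min_count(0.6, len(sequences)))                # 3
```

With `MinSupport` 0.6 and four sequences, a pattern needs at least three supporting sequences, so the pattern above (support 0.5) would not be reported. This is why lowering `MinSupport` quickly increases the number of patterns and the run time.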

## <font color=red>Your code for doing prefixspan here </font>

In [18]:
agg_df.printSchema()

root
 |-- ip: string (nullable = true)
 |-- DATE: date (nullable = true)
 |-- SEQUENCE: array (nullable = false)
 |    |-- element: string (containsNull = false)



In [19]:
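# PrefixSpan expects each sequence as an array of itemsets (array<array<string>>),
# so wrap every URL in its own single-element array
df = agg_df.withColumn("SEQUENCE", F.expr("transform(SEQUENCE, x -> array(x))"))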
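The `transform` call above only changes the nesting of the data. PrefixSpan treats each inner array as an itemset (items that occur together), and as far as I know it rejects a flat `array<string>` sequence column with a type error. The plain-Python sketch below, on a made-up three-URL sequence, shows the same reshaping:

```python
# Flat list of URLs for one (ip, DATE) group, as produced by collect_list
urls = ["/settings/logo", "/static/images/amp/blog.png", "/settings/logo"]

# Same reshaping as transform(SEQUENCE, x -> array(x)): every URL becomes a
# one-element itemset, giving the nested list-of-lists shape PrefixSpan needs
itemsets = [[url] for url in urls]
print(itemsets)
# [['/settings/logo'], ['/static/images/amp/blog.png'], ['/settings/logo']]

# An itemset with several items would mean "these occurred together";
# one item per itemset keeps the URLs as an ordered series of single events
```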

In [20]:
from pyspark.ml.fpm import PrefixSpan

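# Initial settings: sequences are in the SEQUENCE column; keep patterns that
# appear in at least 60% of sequences, with at most 10 items per pattern
prefix_span = PrefixSpan()
prefix_span.setSequenceCol('SEQUENCE')
prefix_span.setMinSupport(0.6)
prefix_span.setMaxPatternLength(10)

# Run PrefixSpan on the reshaped df (not agg_df); the result has columns sequence and freq
result = prefix_span.findFrequentSequentialPatterns(df).sort("sequence")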
result.show()

+--------------------+----+
|            sequence|freq|
+--------------------+----+
|  [[/settings/logo]]|2909|
|[[/settings/logo]...|2731|
|[[/settings/logo]...|2531|
|[[/settings/logo]...|2301|
|[[/settings/logo]...|2096|
|[[/settings/logo]...|1904|
|[[/settings/logo]...|2048|
|[[/settings/logo]...|1904|
|[[/settings/logo]...|2036|
|[[/settings/logo]...|1899|
|[[/settings/logo]...|1902|
|[[/settings/logo]...|2038|
|[[/settings/logo]...|1898|
|[[/static/css/fon...|2212|
|[[/static/css/fon...|2129|
|[[/static/css/fon...|1983|
|[[/static/css/fon...|2016|
|[[/static/css/fon...|1885|
|[[/static/css/fon...|2019|
|[[/static/css/fon...|1892|
+--------------------+----+
only showing top 20 rows



## <font color=red>Your summary and conclusions here </font>

At a support level of 0.6 (a pattern must appear in at least 60% of the user-day sequences) and a maximum pattern length of 10 items, PrefixSpan produced 62 frequent sequential patterns, which we can analyze further. Let's see what they are:

In [21]:
import pandas as pd
result.toPandas()

| | sequence | freq |
|---|---|---|
| 0 | `[[/settings/logo]]` | 2909 |
| 1 | `[[/settings/logo], [/settings/logo]]` | 2731 |
| 2 | `[[/settings/logo], [/settings/logo], [/setting...` | 2531 |
| 3 | `[[/settings/logo], [/settings/logo], [/setting...` | 2301 |
| 4 | `[[/settings/logo], [/settings/logo], [/setting...` | 2096 |
| ... | ... | ... |
| 57 | `[[/static/images/guarantees/warranty.png], [/s...` | 2110 |
| 58 | `[[/static/images/guarantees/warranty.png], [/s...` | 1962 |
| 59 | `[[/static/images/guarantees/warranty.png], [/s...` | 1920 |
| 60 | `[[/static/images/guarantees/warranty.png], [/s...` | 1925 |


In [22]:
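# Raise pandas display limits so toPandas() output is not truncated
# (these options do not affect how Spark's result.head() list is printed)
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)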

In [25]:
result.head(100)

[Row(sequence=[['/settings/logo']], freq=2909),
 Row(sequence=[['/settings/logo'], ['/settings/logo']], freq=2731),
 Row(sequence=[['/settings/logo'], ['/settings/logo'], ['/settings/logo']], freq=2531),
 Row(sequence=[['/settings/logo'], ['/settings/logo'], ['/settings/logo'], ['/settings/logo']], freq=2301),
 Row(sequence=[['/settings/logo'], ['/settings/logo'], ['/settings/logo'], ['/settings/logo'], ['/settings/logo']], freq=2096),
 Row(sequence=[['/settings/logo'], ['/settings/logo'], ['/settings/logo'], ['/settings/logo'], ['/settings/logo'], ['/settings/logo']], freq=1904),
 Row(sequence=[['/settings/logo'], ['/static/images/amp/blog.png']], freq=2048),
 Row(sequence=[['/settings/logo'], ['/static/images/amp/blog.png'], ['/settings/logo']], freq=1904),
 Row(sequence=[['/settings/logo'], ['/static/images/amp/instagram.png']], freq=2036),
 Row(sequence=[['/settings/logo'], ['/static/images/amp/instagram.png'], ['/settings/logo']], freq=1899),
 Row(sequence=[['/settings/logo'], ['/

### Pattern evaluation: at this stage you should try your best to filter out irrelevant "rules" and present as much valuable information from your mining as possible.

Many of the resulting rules appear to be image requests rather than actions related to purchasing or adding items to the basket. This suggests that our way of refining the data may still not be good enough to find meaningful results about the purchasing side of this website.
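One way to act on this is to drop asset requests (images, CSS, the logo endpoint) from each sequence before mining, so that patterns are formed from page views. The sketch below is plain Python with a hypothetical prefix list based on URL prefixes visible in the outputs above and made-up sample URLs; in Spark, the same predicate could be applied with the SQL `filter` higher-order function inside `F.expr`, much like the `transform` call used earlier. Which prefixes count as noise is a judgment call.

```python
# Hypothetical URL prefixes treated as non-page "noise"; adjust to taste
NOISE_PREFIXES = ("/static/", "/image/", "/settings/logo")

def is_page_view(url):
    """Return True for URLs that look like page views rather than assets."""
    return not url.startswith(NOISE_PREFIXES)

def clean_sequence(urls):
    """Keep only page-view URLs, preserving their original order."""
    return [u for u in urls if is_page_view(u)]

# Made-up sequence for illustration
raw = [
    "/settings/logo",
    "/browse/tv",
    "/static/images/amp/blog.png",
    "/product/1",
    "/image/33968",
    "/basket/checkout",
]
print(clean_sequence(raw))
# ['/browse/tv', '/product/1', '/basket/checkout']
```

After cleaning, some sequences may become empty or very short, so `MinSupport` would likely need to be lowered to surface browsing and checkout patterns.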

However, based on the requests, we can infer that the website's blog, Instagram, and Telegram links, as well as the company logo itself, are attracting a lot of clicks. At the same time, visitors seem very interested in the guarantees section of the website, including its best-price and good-shopping guarantees as well as its warranty. Most of the frequent items from these sections:

1. Social Media (Instagram/Telegram/Blog)
2. Guarantees (Best Price, Good Shopping, Warranty)

are generating a lot of cross traffic: if a visitor clicks on one image from the above categories, they are likely to click on a few more. These are most likely redirection links for the website, which means that if the website is trying to gain traffic for these links through images, it is doing a good job.

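At the same time, the homepage logo also has a high association rate: it recurs many times within the same daily sequence (for example, six occurrences in 1904 sequences), possibly indicating that people return to the homepage quite a few times.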
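These observations can be made more quantitative with the confidence of a sequential rule X → Y, computed as freq(X followed by Y) / freq(X). The frequencies below are copied from the `result.head(100)` output above; the helper name is hypothetical. Because PrefixSpan's `freq` counts sequences (user-days), this measures the share of user-days containing X that also contain Y later, not a per-click probability.

```python
def rule_confidence(freq_antecedent, freq_pattern):
    """Confidence of X -> Y given freq(X) and freq(X followed by Y)."""
    return freq_pattern / freq_antecedent

# Frequencies taken from the PrefixSpan output above
freq_logo = 2909            # [[/settings/logo]]
freq_logo_logo = 2731       # [[/settings/logo], [/settings/logo]]
freq_logo_blog = 2048       # [[/settings/logo], [/static/images/amp/blog.png]]
freq_logo_instagram = 2036  # [[/settings/logo], [/static/images/amp/instagram.png]]

for name, f in [("logo -> logo", freq_logo_logo),
                ("logo -> blog.png", freq_logo_blog),
                ("logo -> instagram.png", freq_logo_instagram)]:
    print(f"{name}: {rule_confidence(freq_logo, f):.3f}")
# logo -> logo: 0.939
# logo -> blog.png: 0.704
# logo -> instagram.png: 0.700
```

Roughly 94% of user-days that load the logo load it again later, and about 70% go on to load the blog or Instagram image, which is consistent with the cross-traffic reading above.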

# DELIVER (individually)



Submit on canvas:

* This notebook with all results shown
* If you discussed the project with classmates and got inspiration from them, write down their names in the next section. No points are deducted, but you have to be honest.



<font size="+2" color="#003300">I hereby declare that, except for the code provided by the course instructors, all of my code, report, and figures were produced by myself.</font>