## Task 2. Heuristic user segmentation

In this task, first parse the user logs. Then distinguish the segments and count the *unique* uids in each segment. Sort the output by count.

You may find more useful methods in the following sources:

* Book "Learning Spark: Lightning-Fast Big Data Analysis" by Holden Karau.

* [SparkStreaming Documentation](https://spark.apache.org/docs/latest/api/python/pyspark.streaming.html#pyspark-streaming-module)

* [HyperLogLog documentation](https://pypi.org/project/hyperloglog/), [HyperLogLog theory](https://databricks.com/blog/2016/05/19/approximate-algorithms-in-apache-spark-hyperloglog-and-quantiles.html)

* [ua-parser documentation](https://pypi.org/project/ua-parser/0.7.0/)

In [1]:
import os
from time import sleep
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext

# You will also need these libraries
from ua_parser import user_agent_parser
from hyperloglog import HyperLogLog

In [2]:
# Here is an example of `user_agent_parser`

from ua_parser import user_agent_parser
ua = 'Mozilla/5.0 (iPad; CPU OS 7_0_4 like Mac OS X) AppleWebKit/537.51.1 (KHTML, like Gecko) Version/7.0 Mobile/11B554a Safari/9537.53'
user_agent_parser.Parse(ua)

{'device': {'brand': 'Apple', 'family': 'iPad', 'model': 'iPad'},
 'os': {'family': 'iOS',
  'major': '7',
  'minor': '0',
  'patch': '4',
  'patch_minor': None},
 'string': 'Mozilla/5.0 (iPad; CPU OS 7_0_4 like Mac OS X) AppleWebKit/537.51.1 (KHTML, like Gecko) Version/7.0 Mobile/11B554a Safari/9537.53',
 'user_agent': {'family': 'Mobile Safari',
  'major': '7',
  'minor': '0',
  'patch': None}}
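
The parsed result is a plain nested dictionary, so segment rules reduce to lookups on the `family` fields. Below is a minimal sketch, assuming the three rules used later in this notebook (device family `iphone`, browser family `firefox`, OS family `windows`). The helper name `segments_of` is illustrative and not part of `ua_parser`, and the dictionaries are hand-written stand-ins that keep only the fields the rules read.

```python
def segments_of(parsed):
    """Return the segment labels matched by one parsed user agent.

    `parsed` is a dict shaped like the output of user_agent_parser.Parse.
    The three rules are assumptions taken from the filter used in this notebook.
    """
    device = (parsed["device"]["family"] or "").lower()
    browser = (parsed["user_agent"]["family"] or "").lower()
    os_family = (parsed["os"]["family"] or "").lower()

    segments = []
    if device == "iphone":
        segments.append("iphone")
    if browser == "firefox":
        segments.append("firefox")
    if os_family == "windows":
        segments.append("windows")
    return segments


# The iPad example above, reduced to the fields the rules read.
ipad = {
    "device": {"family": "iPad"},
    "os": {"family": "iOS"},
    "user_agent": {"family": "Mobile Safari"},
}

# A hand-written (hypothetical) record, not real parser output.
firefox_on_windows = {
    "device": {"family": "Other"},
    "os": {"family": "Windows"},
    "user_agent": {"family": "Firefox"},
}

print(segments_of(ipad))
print(segments_of(firefox_on_windows))
```

One user agent can fall into several segments at once, which is why the helper returns a list rather than a single label.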

**NB.** Please don't change the cell below. It emulates batches arriving in real time. Do read through the code, though: it will help when you work with real SparkStreaming applications.

In [3]:
sc = SparkContext(master='local[4]')

DATA_PATH = "/data/course4/uid_ua_100k_splitted_by_5k"
batches = [sc.textFile(os.path.join(DATA_PATH, path)) for path in os.listdir(DATA_PATH)]

BATCH_TIMEOUT = 5 # Timeout between batch generation
ssc = StreamingContext(sc, BATCH_TIMEOUT)
dstream = ssc.queueStream(rdds=batches)

Two flags are used in this task:
* `finished` indicates that the current RDD is empty.
* `printed` indicates that the result has been printed and the SparkStreaming context can be stopped.
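
The interplay of the two flags can be tried without Spark. The sketch below is only an emulation: plain Python lists stand in for RDDs, and `on_batch`, `totals` and `outputs` are hypothetical names introduced for the illustration.

```python
finished = False
printed = False
totals = {}    # running counts, standing in for the streaming state
outputs = []   # everything that was "printed", kept for inspection


def on_batch(batch):
    """Stand-in for the two foreachRDD callbacks; `batch` is a plain list."""
    global finished, printed
    if not batch:
        # An empty batch means the queue of input batches is exhausted.
        finished = True
    for key in batch:
        totals[key] = totals.get(key, 0) + 1
    if finished and not printed:
        result = sorted(totals.items())
        print(result)
        outputs.append(result)
        printed = True


# Two data batches followed by two empty ones.
for batch in [["a", "b"], ["a"], [], []]:
    on_batch(batch)
```

The second empty batch prints nothing, because `printed` is already set. That is the reason for keeping two flags instead of one.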

In [4]:
finished = False
printed = False

def set_ending_flag(rdd):
    global finished
    if rdd.isEmpty():
        finished = True

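def print_only_at_the_end(rdd):
    global printed
    rdd.count()  # force evaluation of this batch even when nothing is printed
    if finished and not printed:

        # Type your code for sorting and printing the resulting RDD
        # Sort explicitly by count (ascending) instead of relying on the
        # order returned by collect() and on exactly three segments existing.
        for segment, count in sorted(rdd.collect(), key=lambda kv: kv[1]):
            print(segment + '\t' + str(count))

        printed = True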

# If we have received an empty RDD, the stream is finished.
# So print the result and stop the context.

dstream.foreachRDD(set_ending_flag)

In [5]:
def aggregator(values, old):
    return (old or 0) + sum(values)
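
`updateStateByKey` calls the update function once per key with two arguments: the list of new values for that key in the current batch, and the previous state (`None` if the key has no state yet). The sketch below emulates that contract in plain Python. The loop and the sample batches are made up for the illustration, and `aggregator` is repeated so that the block runs on its own.

```python
def aggregator(values, old):
    return (old or 0) + sum(values)


state = {}
batches = [
    [("firefox", 2), ("windows", 5)],
    [("windows", 1)],
    [],  # an empty batch leaves the state unchanged
]

for batch in batches:
    # Group the new values of this batch by key.
    new_values = {}
    for key, value in batch:
        new_values.setdefault(key, []).append(value)
    # The update function runs for every key seen in the batch or in the state.
    for key in set(state) | set(new_values):
        state[key] = aggregator(new_values.get(key, []), state.get(key))

print(state)
```

The `(old or 0)` part handles the first appearance of a key, and `sum([])` being `0` handles batches in which a known key does not occur.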

In [6]:
hll = sc.broadcast(HyperLogLog(0.01))

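def filter_duplicate_user(line, broadcast_hll):
    # Keep the line only if adding its uid changes the HyperLogLog estimate,
    # i.e. the uid looks new. The check is approximate.
    # Note: broadcast values are meant to be read-only; changes made here
    # stay local to the worker process that made them.
    user_hash = line.split("\t")[0]
    count = len(broadcast_hll.value)
    broadcast_hll.value.add(user_hash)
    return count != len(broadcast_hll.value)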
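
To get a feeling for what a HyperLogLog does internally, here is a toy version written from the textbook description of the algorithm. `TinyHLL` is a hypothetical class for illustration only; it is not the `hyperloglog` package and makes no claim to match its internals or its exact estimates.

```python
import hashlib
import math


class TinyHLL:
    """Toy HyperLogLog with 2**p registers (illustration only)."""

    def __init__(self, p=12):
        self.p = p
        self.m = 1 << p
        self.registers = [0] * self.m

    def add(self, value):
        # 64-bit hash taken from the first 8 bytes of SHA-1.
        digest = hashlib.sha1(value.encode("utf-8")).digest()
        h = int.from_bytes(digest[:8], "big")
        width = 64 - self.p
        index = h >> width                  # first p bits pick a register
        rest = h & ((1 << width) - 1)       # remaining bits
        # Rank: 1-based position of the leftmost 1-bit in `rest`.
        rank = width - rest.bit_length() + 1
        if rank > self.registers[index]:
            self.registers[index] = rank

    def estimate(self):
        alpha = 0.7213 / (1 + 1.079 / self.m)   # valid for m >= 128
        raw = alpha * self.m ** 2 / sum(2.0 ** -r for r in self.registers)
        zeros = self.registers.count(0)
        if raw <= 2.5 * self.m and zeros:
            # Small-range correction (linear counting).
            return self.m * math.log(self.m / zeros)
        return raw


sketch = TinyHLL(p=12)
for i in range(10000):
    sketch.add("user%d" % i)
print(round(sketch.estimate()))  # close to 10000, not exactly 10000
```

Adding a uid that was already seen never changes a register. A new uid changes the estimate only when it raises a register, so a filter built on "did the estimate change?" can also drop some genuinely new users.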

In [7]:

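def preprocess(line):
    # Parse the line and return the lower-cased device, browser and OS families.
    parsed_line = user_agent_parser.Parse(line)

    device = parsed_line['device']['family'].lower()
    browser = parsed_line['user_agent']['family'].lower()
    os_family = parsed_line['os']['family'].lower()  # avoids shadowing the `os` module
    return [device, browser, os_family]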



In [8]:
def filter_devices(word):
    return word in ("iphone", "firefox", "windows")

In [9]:
# Type your code for data processing and aggregation here


dstream.filter(lambda line: filter_duplicate_user(line, hll))\
    .flatMap(preprocess)\
    .filter(filter_devices)\
    .map(lambda word: (word, 1))\
    .reduceByKey(lambda a, b: a + b)\
    .updateStateByKey(aggregator)\
    .foreachRDD(print_only_at_the_end)
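
On a small sample it can help to cross-check the streaming result against an exact count. The sketch below keeps one Python set of uids per segment. It assumes that each line has the form `uid<TAB>user agent`, as the `split("\t")` in the cell above suggests. `exact_segment_counts` and `toy_segments` are hypothetical helpers, and the sample lines are made up.

```python
from collections import defaultdict


def exact_segment_counts(lines, segment_fn):
    """Count unique uids per segment exactly, largest segment first."""
    users = defaultdict(set)
    for line in lines:
        uid, _, ua = line.partition("\t")
        for segment in segment_fn(ua):
            users[segment].add(uid)
    counts = [(segment, len(uids)) for segment, uids in users.items()]
    # Sort by count (descending), then by name to make ties deterministic.
    return sorted(counts, key=lambda kv: (-kv[1], kv[0]))


def toy_segments(ua):
    # Stand-in for real user-agent parsing: plain substring matching.
    return [w for w in ("iphone", "firefox", "windows") if w in ua.lower()]


sample = [
    "u1\tFirefox on Windows",
    "u1\tFirefox on Windows",   # same user again, counted once
    "u2\tSafari on iPhone",
    "u3\tFirefox on Linux",
]
result = exact_segment_counts(sample, toy_segments)
for segment, count in result:
    print(segment + "\t" + str(count))
```

Unlike the streaming pipeline, which filters repeated uids before segmenting, this reference counts a uid once per segment it belongs to. The two can therefore differ for users who appear with several user agents.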
    

**NB.** Please don't change the cell below. It stops the SparkStreaming context and the Spark context once the stream has finished.

In [10]:
ssc.checkpoint('./checkpoint')  # checkpoint for storing current state        
ssc.start()
while not printed:
    pass
ssc.stop()  # once the result is printed, stop the SparkStreaming context
sc.stop()  # stop the Spark context so the code can be rerun without restarting the kernel

iphone	1728
firefox	5622
windows	37295


Here is part of the output on the sample dataset:
```
...
seg_unknown 22377
seg_firefox 8237
...
```
Of course, your numbers may differ slightly (an error of about 1% is acceptable).
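
The 1% tolerance lines up with the `0.01` passed to `HyperLogLog` earlier in the notebook. As a rough sketch, the standard error of a HyperLogLog estimate with m registers is about 1.04 / sqrt(m), so a target error translates into a register count as follows. `hll_precision` is a hypothetical helper written for this illustration.

```python
import math


def hll_precision(error_rate):
    """Smallest p such that 1.04 / sqrt(2**p) <= error_rate."""
    return math.ceil(math.log2((1.04 / error_rate) ** 2))


p = hll_precision(0.01)
m = 2 ** p
achieved = 1.04 / math.sqrt(m)
print(p, m, achieved)
```

For a 1% target this gives p = 14, that is 16384 registers and an expected standard error of roughly 0.8%. Individual runs can still land a little outside that figure, since it is a standard deviation rather than a hard bound.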