# AMPCamp 6
http://ampcamp.berkeley.edu/6

### Preparation
This assumes that you're running the Bitnami Hadoop VM version 3.3.0 (October 2020).

Before we can start the AMPCamp 6 tutorial, we have to prepare our VM for PySpark.

First, we need to install some additional dependencies:
```shell
sudo apt install python3-requests python3-notebook jupyter jupyter-core 
```

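Next, point PySpark at Python 3, add the Spark binaries to the `PATH`, and open the firewall for Jupyter (port 8888) and the Spark web UI (port 4040). The `export` lines only affect the current shell session: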
```shell
export PYSPARK_PYTHON=python3
export PATH="$PATH:$HOME/stack/hadoop/spark/bin"

sudo ufw allow 8888/tcp
sudo ufw allow 4040/tcp
```

Now, to start a PySpark shell session:
```shell
pyspark
```

Or, to start [Jupyter](https://jupyter.org) with support for PySpark (copy the URL that appears in the terminal into a browser on your host machine):
```shell
PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS="notebook --ip=$(hostname -I)" pyspark
```

We continue by downloading the data used in the tutorial. Run the code from the following cells in your PySpark shell or notebook.

In [1]:
import gzip
import itertools
import requests
import os
import pyspark
import shutil
import subprocess

In [2]:
# ./data/pagecounts

base_url = 'https://archive.org/download/wikipedia_visitor_stats_200905'
filenames = ['pagecounts-20090505-000000.gz', 'pagecounts-20090507-000000.gz']

os.makedirs('data', exist_ok=True)

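for filename in filenames:
    # Download the gzipped hourly dump into the working directory.
    with requests.get(f'{base_url}/{filename}', stream=True) as response:
        response.raise_for_status()
        with open(filename, 'wb') as file:
            shutil.copyfileobj(response.raw, file)
    # Decompress it in 100 KiB chunks.
    filename_noext = os.path.splitext(filename)[0]
    with gzip.open(filename, 'rb') as src, open(filename_noext, 'wb') as dest:
        shutil.copyfileobj(src, dest, 100 * 1024)
    # Prefix every line with the timestamp taken from the filename
    # (e.g. '20090505-000000') and append it to data/pagecounts.
    # The target is opened in append mode, so re-running this cell
    # duplicates the data; delete data/pagecounts first if you re-run it.
    timestamp = filename_noext[11:].encode('UTF-8')
    with open(filename_noext, 'rb') as src, open('data/pagecounts', 'ab') as dest:
        for line in src:
            dest.write(timestamp + b' ' + line)
    # Remove the temporary files.
    os.remove(filename)
    os.remove(filename_noext)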
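
The timestamp-prefix step in the cell above can be tried in isolation. The following is a minimal sketch that works on in-memory buffers instead of files; the helper name `prefix_lines` is hypothetical, and the two sample lines are taken from the tutorial output shown further down, with the timestamp stripped.

```python
import io

def prefix_lines(src, dest, filename_noext):
    """Copy lines from src to dest, prefixing each with the file's timestamp."""
    # 'pagecounts-' is 11 characters long, so [11:] keeps '20090505-000000'.
    timestamp = filename_noext[11:].encode('UTF-8')
    for line in src:
        dest.write(timestamp + b' ' + line)

# Two lines as they appear in the decompressed dump (no timestamp yet).
raw = io.BytesIO(b'aa.b Special:Statistics 1 840\naa Main_Page 2 9980\n')
out = io.BytesIO()
prefix_lines(raw, out, 'pagecounts-20090505-000000')
print(out.getvalue().decode('UTF-8'), end='')
```

Each output line now starts with `20090505-000000`, which is what later lets the tutorial group page views by day.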

In [3]:
# ./data/wiki.parquet

base_url = 'https://github.com/databricks/spark-training/raw/master/data/wiki_parquet'
filenames = [
    '_SUCCESS', '._metadata', 'part-r-1.parquet',
    'part-r-2.parquet', 'part-r-3.parquet', 'part-r-4.parquet',
    'part-r-5.parquet', 'part-r-6.parquet', 'part-r-7.parquet',
    'part-r-8.parquet', 'part-r-9.parquet', 'part-r-10.parquet'
]

os.makedirs('data/wiki.parquet', exist_ok=True)

for filename in filenames:
    # Download each file of the Parquet dataset into data/wiki.parquet.
    with requests.get(f'{base_url}/{filename}', stream=True) as response:
        response.raise_for_status()
        with open(f'data/wiki.parquet/{filename}', 'wb') as file:
            shutil.copyfileobj(response.raw, file)
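
A failed download can leave an error page on disk under a `.parquet` name. As a quick sanity check, the sketch below tests whether a file starts and ends with the `PAR1` magic bytes that delimit a Parquet file. The helper name `looks_like_parquet` is hypothetical, and the two demo files are throwaway stand-ins, not real Parquet data; passing the check does not prove that a file is valid, only that it is not obviously something else.

```python
import os
import tempfile

PARQUET_MAGIC = b'PAR1'

def looks_like_parquet(path):
    """Return True if the file starts and ends with the Parquet magic bytes."""
    if os.path.getsize(path) < 2 * len(PARQUET_MAGIC):
        return False
    with open(path, 'rb') as file:
        head = file.read(4)
        file.seek(-4, os.SEEK_END)
        tail = file.read(4)
    return head == PARQUET_MAGIC and tail == PARQUET_MAGIC

# Demonstration on two throwaway files (hypothetical contents).
with tempfile.TemporaryDirectory() as tmp:
    good = os.path.join(tmp, 'good.parquet')
    bad = os.path.join(tmp, 'bad.parquet')
    with open(good, 'wb') as file:
        file.write(PARQUET_MAGIC + b'\x00' * 16 + PARQUET_MAGIC)
    with open(bad, 'wb') as file:
        file.write(b'<html>Not Found</html>')
    good_ok = looks_like_parquet(good)
    bad_ok = looks_like_parquet(bad)

print(good_ok, bad_ok)  # True False
```

To use it on the downloaded data, call `looks_like_parquet` on each `part-r-*.parquet` file under `data/wiki.parquet`.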

As a last step, make sure the `sc` and `sqlContext` PySpark objects exist in your PySpark shell or notebook:

In [4]:
sc = pyspark.SparkContext.getOrCreate()
sqlContext = pyspark.sql.SparkSession.builder.getOrCreate()

Now your environment is ready to follow the AMPCamp 6 tutorial.

## Data Exploration Using Spark
http://ampcamp.berkeley.edu/6/exercises/data-exploration-using-spark.html

In [5]:
sc

In [6]:
pagecounts = sc.textFile("data/pagecounts")
pagecounts

data/pagecounts MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

In [7]:
for x in pagecounts.take(10):
    print(x)

20090505-000000 aa.b ?71G4Bo1cAdWyg 1 14463
20090505-000000 aa.b Special:Statistics 1 840
20090505-000000 aa.b Special:Whatlinkshere/MediaWiki:Returnto 1 1019
20090505-000000 aa.b Wikibooks:About 1 15719
20090505-000000 aa ?14mFX1ildVnBc 1 13205
20090505-000000 aa ?53A%2FuYP3FfnKM 1 13207
20090505-000000 aa ?93HqrnFc%2EiqRU 1 13199
20090505-000000 aa ?95iZ%2Fjuimv31g 1 13201
20090505-000000 aa File:Wikinews-logo.svg 1 8357
20090505-000000 aa Main_Page 2 9980


In [8]:
pagecounts.count()

8020573

In [9]:
enPages = pagecounts.filter(lambda x: x.split(" ")[1] == "en").cache()
enPages.count()

3537328
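
The filter above relies on the layout of each line: five space-separated fields, with the project code in position 1. The sketch below spells that layout out in plain Python, without Spark. The field names are my own labels, not part of the tutorial; as far as I know, the last two columns of the upstream pagecounts format are the request count and the response size in bytes.

```python
from typing import NamedTuple

class PageCount(NamedTuple):
    timestamp: str   # added by the preparation step, e.g. '20090505-000000'
    project: str     # e.g. 'en', 'aa', 'aa.b'
    title: str
    requests: int
    size_bytes: int

def parse_line(line):
    """Split one pagecounts line into its five fields."""
    timestamp, project, title, requests, size_bytes = line.split(" ")
    return PageCount(timestamp, project, title, int(requests), int(size_bytes))

# Two lines taken from the take(10) output above.
sample = [
    "20090505-000000 aa.b Wikibooks:About 1 15719",
    "20090505-000000 aa Main_Page 2 9980",
]
records = [parse_line(line) for line in sample]

# Same test as the filter above, applied to the 'aa' project instead of 'en'.
aa_titles = [r.title for r in records if r.project == "aa"]
print(aa_titles)  # ['Main_Page']
```

Because the comparison is an exact match, a project code with a suffix such as `aa.b` is not counted as `aa`; the `== "en"` filter behaves the same way.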

In [10]:
enTuples = enPages.map(lambda x: x.split(" "))
enKeyValuePairs = enTuples.map(lambda x: (x[0][:8], int(x[3])))

In [11]:
enKeyValuePairs.reduceByKey(lambda x, y: x + y, 1).collect()

[('20090505', 8822061), ('20090507', 8751659)]
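
To see what `reduceByKey` computes here, the following is a plain-Python sketch of the same map and reduce steps on three lines. `reduce_by_key` is a hypothetical stand-in for the Spark operation, not its implementation, and the third sample line is invented so that a second date appears. In the Spark call, the second argument (`1`) is the number of partitions for the result and has no counterpart in this sketch.

```python
def reduce_by_key(pairs, func):
    """Combine the values of equal keys with func, like RDD.reduceByKey."""
    totals = {}
    for key, value in pairs:
        totals[key] = func(totals[key], value) if key in totals else value
    return list(totals.items())

lines = [
    "20090505-000000 aa Main_Page 2 9980",
    "20090505-000000 aa File:Wikinews-logo.svg 1 8357",
    "20090507-000000 aa Main_Page 3 9980",  # hypothetical line for a second day
]

# Same mapping as enKeyValuePairs: (date, number of requests).
tuples = [line.split(" ") for line in lines]
key_value_pairs = [(x[0][:8], int(x[3])) for x in tuples]

totals_per_day = reduce_by_key(key_value_pairs, lambda x, y: x + y)
print(totals_per_day)  # [('20090505', 3), ('20090507', 3)]
```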

In [12]:
enPages.map(lambda x: x.split(" ")).map(lambda x: (x[0][:8], int(x[3]))).reduceByKey(lambda x, y: x + y, 1).collect()

[('20090505', 8822061), ('20090507', 8751659)]

In [13]:
reduced = enPages.map(lambda x: x.split(" ")).map(lambda x: (x[2], int(x[3]))).reduceByKey(lambda x, y: x + y, 40)
filtered = reduced.filter(lambda x: x[1] > 200000).map(lambda x: (x[1], x[0]))

filtered.collect()

[(479251, 'Special:Search'), (1133206, '404_error/'), (482021, 'Main_Page')]
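
The pairs are swapped to `(count, title)` so that the count becomes the key. One thing this makes easy is ranking: tuples compare element by element, so sorting the collected result orders it by view count. A small sketch using the three rows returned above:

```python
# The result of filtered.collect() shown above.
filtered_result = [
    (479251, 'Special:Search'),
    (1133206, '404_error/'),
    (482021, 'Main_Page'),
]

# Tuples sort by their first element, i.e. by view count.
ranked = sorted(filtered_result, reverse=True)

for views, title in ranked:
    print(f'{views:>9,}  {title}')
```

On the cluster side, `filtered.sortByKey(ascending=False).collect()` should give the same ordering without sorting on the driver.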

## Data Exploration Using Spark SQL
http://ampcamp.berkeley.edu/6/exercises/data-exploration-using-spark-sql.html

In [14]:
sqlContext

In [15]:
wikiData = sqlContext.read.parquet("data/wiki.parquet")

In [16]:
wikiData.count()

39365

In [17]:
# registerTempTable() is deprecated since Spark 2.0; this is its replacement.
wikiData.createOrReplaceTempView("wikiData")

In [18]:
result = sqlContext.sql("SELECT COUNT(*) AS pageCount FROM wikiData").collect()

In [19]:
result[0].pageCount

39365

In [20]:
sqlContext.sql("SELECT username, COUNT(*) AS cnt FROM wikiData WHERE username <> '' GROUP BY username ORDER BY cnt DESC LIMIT 10").collect()

[Row(username='Waacstats', cnt=2003),
 Row(username='Cydebot', cnt=949),
 Row(username='BattyBot', cnt=939),
 Row(username='Yobot', cnt=890),
 Row(username='Addbot', cnt=853),
 Row(username='Monkbot', cnt=668),
 Row(username='ChrisGualtieri', cnt=438),
 Row(username='RjwilmsiBot', cnt=387),
 Row(username='OccultZone', cnt=377),
 Row(username='ClueBot NG', cnt=353)]
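
The logic of this query (drop empty usernames, count rows per user, keep the ten largest counts) can be tried without Spark. The sketch below runs the same SQL statement against an in-memory SQLite table using Python's standard `sqlite3` module. The table holds only a `username` column and the rows are made up for illustration; they are not taken from the wiki dataset.

```python
import sqlite3

# Hypothetical rows: one tuple per edit, holding only the username.
rows = [
    ('ExampleBot',), ('ExampleBot',), ('ExampleBot',),
    ('alice',), ('alice',),
    ('',), ('',), ('',), ('',),   # anonymous edits, excluded by the WHERE clause
    ('bob',),
]

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE wikiData (username TEXT)')
conn.executemany('INSERT INTO wikiData VALUES (?)', rows)

top = conn.execute(
    "SELECT username, COUNT(*) AS cnt FROM wikiData "
    "WHERE username <> '' GROUP BY username ORDER BY cnt DESC LIMIT 10"
).fetchall()
conn.close()

print(top)  # [('ExampleBot', 3), ('alice', 2), ('bob', 1)]
```

Although the empty username occurs most often in this toy table, it does not show up in the result, which is the point of the `WHERE username <> ''` clause.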