# Install jupyter

```
# jupyter runs the notebook; requests and lxml are imported by its first cell.
pip install jupyter requests lxml
```



# Jupyter Spark environment
```
# Make the pyspark launcher use Jupyter as its driver Python, started in "notebook" mode.
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook"
# The master URL is quoted so shells that treat [] as a glob pattern (e.g. zsh)
# pass it through literally instead of failing with "no matches found".
pyspark --master "local[2]" --packages graphframes:graphframes:0.3.0-spark1.6-s_2.10
```

# Get file list

In [1]:
import requests
import lxml.html as lh

# Base URL of the GDELT events download directory; it is also the prefix
# used for every individual file URL built from file_list entries.
gdelt_base_url = 'http://data.gdeltproject.org/events/'

# Fetch the HTML index page. The timeout keeps the cell from hanging forever
# on a stalled connection, and raise_for_status() fails loudly on an HTTP
# error instead of silently parsing an error page into an empty link list.
page = requests.get(gdelt_base_url + 'index.html', timeout=60)
page.raise_for_status()
doc = lh.fromstring(page.content)

# Every href found under a <ul><li><a> structure in the index page.
link_list = doc.xpath("//*/ul/li/a/@href")

# Keep only links whose first four characters are digits.
# NOTE(review): presumably these are the date-named export archives - confirm.
# The method form x[0:4].isdigit() works for both str and unicode values,
# whereas str.isdigit(x) raises TypeError on Python 2 unicode objects.
file_list = [x for x in link_list if x[0:4].isdigit()]

In [2]:
import datetime
datetime.datetime.now().strftime("%Y%m%d")
from datetime import datetime, timedelta

# Select the most recent daily export archives that appear in file_list,
# walking backwards one calendar day at a time starting from today.
WANTED_FILES = 6          # how many archives to collect
MAX_LOOKBACK_DAYS = 365   # safety bound so the search always terminates

# Set membership is O(1); file_list itself is left untouched.
available_files = set(file_list)

today = datetime.today()
tmpfiles = []
for days_back in range(MAX_LOOKBACK_DAYS):
    if len(tmpfiles) == WANTED_FILES:
        break
    day = today - timedelta(days=days_back)
    date_string = day.strftime("%Y%m%d") + ".export.CSV.zip"
    if date_string in available_files:
        tmpfiles.append(date_string)
# If fewer than WANTED_FILES archives exist within the lookback window
# (e.g. file_list is empty), tmpfiles is simply shorter; the unbounded
# loop this replaces would have spun forever in that situation.

In [3]:
import os.path
import urllib
import zipfile

# Make sure the local download directory exists.
if not os.path.isdir('data'):
    os.mkdir('data')

# For every selected archive: download it unless it is already on disk,
# read each member of the zip, and turn its text into an RDD of rows where
# each row is the list of tab-separated fields of one line.
parsed_rdds = []
for download_file in tmpfiles:
    zipfilename = './data/' + download_file
    # Archives already present from an earlier run are reused as-is.
    if not os.path.isfile(zipfilename):
        urllib.urlretrieve(url=gdelt_base_url + download_file,
                           filename=zipfilename)
    # The with-block closes the archive even if reading a member fails.
    with zipfile.ZipFile(file=zipfilename, mode='r') as zf:
        for info in zf.infolist():
            raw_text = zf.read(info.filename)
            # The whole member is held in driver memory before being
            # distributed; the trailing empty line produced by split('\n')
            # becomes a one-element row.
            parsed_rdds.append(
                sc.parallelize(raw_text.split('\n'))
                  .map(lambda line: line.split('\t')))

# Combine all per-file RDDs with a single union; rdd stays None when
# nothing was loaded, as before.
rdd = sc.union(parsed_rdds) if parsed_rdds else None


In [4]:
import graphframes
import random
data = rdd.filter(lambda line: len(line) == 58)\
        .filter(lambda line: line[7] != "" and line[7] != None)\
        .filter(lambda line: line[17] != "" and line[17] != None)\
        .filter(lambda line: line[28] == "19")\
        .map(lambda line: (line[7], line[17]))
data.cache()
print "data cnt: ", data.count()
keys = data.flatMap(lambda x: (x[0], x[1])).distinct()
keylist = keys.collect()
sqlContext = SQLContext(sc)
vertices = keys.map(lambda x: (keylist.index(x), x)).toDF(["id", "name"])
edge = data.map(lambda x: (keylist.index(x[0]), keylist.index(x[1]), x[0] + ":" + x[1])).toDF(["src", "dst", "relationship"])
g = graphframes.GraphFrame(vertices, edge)


data cnt:  16981


In [5]:
# Run PageRank on the graph for a fixed number of iterations.
# NOTE(review): the reset probability is far below the 0.15 used in the
# GraphFrames user-guide example - confirm this is intentional.
RESET_PROBABILITY = 0.0001
MAX_ITERATIONS = 20
results = g.pageRank(
    resetProbability=RESET_PROBABILITY,
    maxIter=MAX_ITERATIONS,
)



In [6]:
# Order the vertices by descending PageRank score and display the
# name/score columns (show() prints the first 20 rows by default).
ranked_vertices = results.vertices.sort('pagerank', ascending=False)
ranked_vertices.select("name", "pagerank").show()

+----+--------------------+
|name|            pagerank|
+----+--------------------+
| USA| 0.08072696742055645|
| SYR|  0.0249218889561444|
| IRQ|0.014433021876873058|
| AFG|0.013552605443693825|
| GBR|0.013178425313217201|
| RUS|0.009542773663141414|
| IRN|0.009400180132529587|
| DEU|0.008040352567475727|
| ISR|0.006522195331280399|
| SAU|0.006143162503799965|
| FRA|0.005864131303488251|
| TUR|0.005578346865722095|
| NGA|0.005468691767628191|
| PAK|0.005364283130664706|
| CAN|0.005204803732281306|
| AUS|0.004945816524759933|
| CHN|0.004424257705490986|
| YEM|0.004033055595166144|
| JPN|0.003910145159530437|
| PSE|0.003617808402184263|
+----+--------------------+
only showing top 20 rows



# Reference
GraphFrames documentation (PageRank section of the user guide): [http://graphframes.github.io/user-guide.html#pagerank](http://graphframes.github.io/user-guide.html#pagerank)

GraphFrames package on Spark Packages: [https://spark-packages.org/package/graphframes/graphframes](https://spark-packages.org/package/graphframes/graphframes)