In [1]:
import datetime
import json
import pprint
import textwrap
import time

import requests
# Livy REST endpoint, the body used to request an interactive PySpark session,
# and the JSON content-type header sent with every request.
host = 'http://localhost:8999'
data = dict(kind='pyspark', driverCores=1, executorCores=8)
headers = {'Content-Type': 'application/json'}

In [2]:
# Ask Livy to start an interactive PySpark session; it starts asynchronously.
r = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)
# Surface HTTP failures here rather than as a KeyError on 'location' in the next cell.
r.raise_for_status()
r.json()

{'id': 45,
 'appId': None,
 'owner': None,
 'proxyUser': None,
 'state': 'starting',
 'kind': 'pyspark',
 'appInfo': {'driverLogUrl': None, 'sparkUiUrl': None},
 'log': ['stdout: ', '\nstderr: ', '\nYARN Diagnostics: ']}

In [3]:
# Poll the newly created Livy session until it is ready to accept statements.
location = r.headers['location']
session_url = host + location
r = requests.get(session_url, headers=headers)
current_state = r.json()['state']
while current_state != 'idle':
    # A session in one of these states will never become idle; stop instead of looping forever.
    if current_state in ('shutting_down', 'error', 'dead', 'killed', 'success'):
        raise RuntimeError('Livy session ended in state {!r}: {}'.format(
            current_state, r.json().get('log')))
    time.sleep(1)  # sessions take seconds to start; don't hammer the REST API
    r = requests.get(session_url, headers=headers)
    current_state = r.json()['state']
r.json()['state']

'idle'

## Build Index

In [4]:
def build_index(index_folder):
    """Submit a Livy statement that builds a TF-IDF index for one corpus folder.

    The generated PySpark code reads every file under
    ``/user/root/bbcsport/<index_folder>``. It collapses newlines, lower-cases
    the text, strips punctuation except ``-`` and splits on single spaces. It
    then writes ``(word, file, idf, tf_idf)`` rows to ``<index_folder>.orc``,
    overwriting any previous output.

    NOTE(review): the HDFS prefix stripped from file names is hard-coded to the
    HDP sandbox host; adjust ``hdfs_precursor`` for other clusters.

    Parameters
    ----------
    index_folder : str
        Sub-folder of ``/user/root/bbcsport`` to index (e.g. ``"rugby"``).

    Returns
    -------
    requests.Response
        Response of the statement POST; its ``location`` header identifies the
        statement to poll.

    Raises
    ------
    requests.HTTPError
        If Livy rejects the statement submission.
    """
    statements_url = host + location + '/statements'
    source_path = "/user/root/bbcsport/" + index_folder
    output_path = index_folder + '.orc'
    # repr() embeds the paths as valid Python literals in the generated code.
    # r'...' keeps \w and \s from being treated as (invalid) string escapes.
    # mode('overwrite') makes the notebook re-runnable: the default save mode
    # fails when the ORC output already exists.
    code = textwrap.dedent("""
        from pyspark.sql.types import *
        from pyspark.sql.functions import *
        import re
        spark.conf.set("spark.sql.shuffle.partitions", 8)
        text_files = sc.wholeTextFiles(""" + repr(source_path) + """)
        file_count = text_files.count()
        hdfs_precursor = "hdfs://sandbox-hdp.hortonworks.com:8020/user/root/bbcsport/"
        files = text_files.map(lambda file: (file[0].replace(hdfs_precursor, ""), file[1]))
        lines = files.map(lambda lines: (lines[0], re.sub('\\n+', '\\n', lines[1]).replace('\\n', ' ')))
        lines = lines.map(lambda line: (line[0], re.sub(r'[^\\w\\s-]', '', line[1].lower().strip())))
        words = lines.flatMapValues(lambda word: word.split(" "))
        words_df = words.toDF(["file", "word"])
        inverted_index = words_df.groupBy("word").agg(collect_list("file").alias("files"))
        file_words_count = words_df.groupBy("file").agg(count("word").alias("word_count"))
        words_count = words_df.groupBy("word", "file").count()
        tf = words_count.join(file_words_count, file_words_count.file == words_count.file, 'left').withColumn("tf", col("count") / col("word_count")).select(words_count.word, words_count.file, "tf")
        doc_freq = words_df.groupBy("word").agg(countDistinct("file").alias("df"))
        idf = doc_freq.groupBy("word", "df").agg(log(file_count / column("df")).alias("idf"))
        tfidf = tf.join(idf, tf.word == idf.word, 'left').withColumn("tf_idf", col("tf") * col("idf")).select(tf.word,tf.file,idf.idf,"tf_idf")
        tfidf.write.mode('overwrite').orc(""" + repr(output_path) + """)
        """)
    r = requests.post(statements_url, data=json.dumps({'code': code}), headers=headers)
    r.raise_for_status()
    return r

In [5]:
# Build the rugby index and report the wall-clock time of the whole statement.
index_folder = "rugby"
# Start the clock before submitting so the measurement covers the full round trip.
a = datetime.datetime.now()
r = build_index(index_folder)
r.raise_for_status()
statement_url = host + r.headers['location']
r = requests.get(statement_url, headers=headers)
current_state = r.json()['state']
while current_state != 'available':
    # These states never become 'available'; bail out instead of spinning forever.
    if current_state in ('error', 'cancelling', 'cancelled'):
        raise RuntimeError('Livy statement ended in state {!r}'.format(current_state))
    time.sleep(0.1)  # poll politely instead of hammering the Livy REST API
    r = requests.get(statement_url, headers=headers)
    current_state = r.json()['state']
b = datetime.datetime.now()
statement_output = r.json()['output']
# Exceptions raised by the remote code are reported through output['status'].
if statement_output['status'] != 'ok':
    raise RuntimeError('{}: {}'.format(statement_output.get('ename'), statement_output.get('evalue')))
# timedelta has no strftime(); str() renders it as H:MM:SS.ffffff.
print("Time taken: " + str(b-a))

Time taken: 0:00:19.468279


In [6]:
# Build the football index and report the wall-clock time of the whole statement.
index_folder = "football"
# Start the clock before submitting so the measurement covers the full round trip.
a = datetime.datetime.now()
r = build_index(index_folder)
r.raise_for_status()
statement_url = host + r.headers['location']
r = requests.get(statement_url, headers=headers)
current_state = r.json()['state']
while current_state != 'available':
    # These states never become 'available'; bail out instead of spinning forever.
    if current_state in ('error', 'cancelling', 'cancelled'):
        raise RuntimeError('Livy statement ended in state {!r}'.format(current_state))
    time.sleep(0.1)  # poll politely instead of hammering the Livy REST API
    r = requests.get(statement_url, headers=headers)
    current_state = r.json()['state']
b = datetime.datetime.now()
statement_output = r.json()['output']
# Exceptions raised by the remote code are reported through output['status'].
if statement_output['status'] != 'ok':
    raise RuntimeError('{}: {}'.format(statement_output.get('ename'), statement_output.get('evalue')))
# timedelta has no strftime(); str() renders it as H:MM:SS.ffffff.
print("Time taken: " + str(b-a))

Time taken: 0:00:13.328639


In [7]:
# Build the cricket index and report the wall-clock time of the whole statement.
index_folder = "cricket"
# Start the clock before submitting so the measurement covers the full round trip.
a = datetime.datetime.now()
r = build_index(index_folder)
r.raise_for_status()
statement_url = host + r.headers['location']
r = requests.get(statement_url, headers=headers)
current_state = r.json()['state']
while current_state != 'available':
    # These states never become 'available'; bail out instead of spinning forever.
    if current_state in ('error', 'cancelling', 'cancelled'):
        raise RuntimeError('Livy statement ended in state {!r}'.format(current_state))
    time.sleep(0.1)  # poll politely instead of hammering the Livy REST API
    r = requests.get(statement_url, headers=headers)
    current_state = r.json()['state']
b = datetime.datetime.now()
statement_output = r.json()['output']
# Exceptions raised by the remote code are reported through output['status'].
if statement_output['status'] != 'ok':
    raise RuntimeError('{}: {}'.format(statement_output.get('ename'), statement_output.get('evalue')))
print("Time taken: " + str(b-a))

Time taken: 0:00:06.131539


In [8]:
# Build the athletics index and report the wall-clock time of the whole statement.
index_folder = "athletics"
# Start the clock before submitting so the measurement covers the full round trip.
a = datetime.datetime.now()
r = build_index(index_folder)
r.raise_for_status()
statement_url = host + r.headers['location']
r = requests.get(statement_url, headers=headers)
current_state = r.json()['state']
while current_state != 'available':
    # These states never become 'available'; bail out instead of spinning forever.
    if current_state in ('error', 'cancelling', 'cancelled'):
        raise RuntimeError('Livy statement ended in state {!r}'.format(current_state))
    time.sleep(0.1)  # poll politely instead of hammering the Livy REST API
    r = requests.get(statement_url, headers=headers)
    current_state = r.json()['state']
b = datetime.datetime.now()
statement_output = r.json()['output']
# Exceptions raised by the remote code are reported through output['status'].
if statement_output['status'] != 'ok':
    raise RuntimeError('{}: {}'.format(statement_output.get('ename'), statement_output.get('evalue')))
print("Time taken: " + str(b-a))

Time taken: 0:00:05.158427


In [9]:
# Build the tennis index and report the wall-clock time of the whole statement.
index_folder = "tennis"
# Start the clock before submitting so the measurement covers the full round trip.
a = datetime.datetime.now()
r = build_index(index_folder)
r.raise_for_status()
statement_url = host + r.headers['location']
r = requests.get(statement_url, headers=headers)
current_state = r.json()['state']
while current_state != 'available':
    # These states never become 'available'; bail out instead of spinning forever.
    if current_state in ('error', 'cancelling', 'cancelled'):
        raise RuntimeError('Livy statement ended in state {!r}'.format(current_state))
    time.sleep(0.1)  # poll politely instead of hammering the Livy REST API
    r = requests.get(statement_url, headers=headers)
    current_state = r.json()['state']
b = datetime.datetime.now()
statement_output = r.json()['output']
# Exceptions raised by the remote code are reported through output['status'].
if statement_output['status'] != 'ok':
    raise RuntimeError('{}: {}'.format(statement_output.get('ename'), statement_output.get('evalue')))
print("Time taken: " + str(b-a))

Time taken: 0:00:08.511450


## Load Index 

In [10]:
def load_index(index_to_use):
    """Submit a Livy statement that loads a saved TF-IDF index as ``tfidf``.

    The remote code reads ``<index_to_use>.orc`` into the session-level
    variable ``tfidf`` and marks it for caching with ``persist()``. Later
    statements (see ``query_call``) rely on that variable and on the pyspark
    star-imports issued here.

    Parameters
    ----------
    index_to_use : str
        Name of a previously built index (e.g. ``"rugby"``).

    Returns
    -------
    requests.Response
        Response of the statement POST; its ``location`` header identifies the
        statement to poll.

    Raises
    ------
    requests.HTTPError
        If Livy rejects the statement submission.
    """
    statements_url = host + location + '/statements'
    # repr() embeds the path as a valid Python literal in the generated code.
    code = textwrap.dedent("""
        from pyspark.sql.types import *
        from pyspark.sql.functions import *
        spark.conf.set("spark.sql.shuffle.partitions", 8)
        tfidf = spark.read.orc(""" + repr(index_to_use + '.orc') + """)
        tfidf.persist()
        """)
    r = requests.post(statements_url, data=json.dumps({'code': code}), headers=headers)
    r.raise_for_status()
    return r

In [11]:
# Load the rugby index into the Livy session and report how long the statement took.
index_to_use = "rugby"
# Start the clock before submitting so the measurement covers the full round trip.
a = datetime.datetime.now()
r = load_index(index_to_use)
r.raise_for_status()
statement_url = host + r.headers['location']
r = requests.get(statement_url, headers=headers)
current_state = r.json()['state']
while current_state != 'available':
    # These states never become 'available'; bail out instead of spinning forever.
    if current_state in ('error', 'cancelling', 'cancelled'):
        raise RuntimeError('Livy statement ended in state {!r}'.format(current_state))
    time.sleep(0.1)  # poll politely instead of hammering the Livy REST API
    r = requests.get(statement_url, headers=headers)
    current_state = r.json()['state']
b = datetime.datetime.now()
statement_output = r.json()['output']
# Exceptions raised by the remote code are reported through output['status'].
if statement_output['status'] != 'ok':
    raise RuntimeError('{}: {}'.format(statement_output.get('ename'), statement_output.get('evalue')))
print("Time taken: " + str(b-a))

Time taken: 0:00:00


## Query

In [12]:
def query_call(query, N):
    """Submit a Livy statement that ranks indexed files against a text query.

    The remote code splits the lower-cased query on single spaces and
    broadcast-joins the word counts against the session variable ``tfidf``
    (created by ``load_index``). Each file's score is the sum of its
    ``tf_idf`` values scaled by the fraction of query words matched. It shows
    the top ``N`` rows with ``results.show(N)``. It relies on the pyspark
    star-imports issued by earlier statements in the same session.

    Parameters
    ----------
    query : str
        Search text. For backward compatibility it may be wrapped in matching
        single or double quotes (e.g. ``"'over penalties'"``); those outer
        quotes are stripped before the text is embedded.
    N : int
        Number of top-scoring files to show.

    Returns
    -------
    requests.Response
        Response of the statement POST; its ``location`` header identifies the
        statement to poll.

    Raises
    ------
    requests.HTTPError
        If Livy rejects the statement submission.
    """
    statements_url = host + location + '/statements'
    query_text = query
    # Older callers pass a pre-quoted Python literal; accept that form as well as plain text.
    if len(query_text) >= 2 and query_text[0] == query_text[-1] and query_text[0] in "'\"":
        query_text = query_text[1:-1]
    # repr() yields a valid Python literal, so quotes or backslashes inside the
    # query cannot break (or inject into) the generated code.
    code = textwrap.dedent("""
        spark.conf.set("spark.sql.shuffle.partitions", 8)
        query_string = """ + repr(query_text) + """
        N = """ + str(int(N)) + """
        query_words = query_string.lower().split(" ")
        total_words = len(query_words)
        query_df = spark.createDataFrame(query_words,StringType())
        query_df = query_df.groupBy("value").count().select(col("value").alias("word"),col("count").alias("tf"))
        query_idf = query_df.join(broadcast(tfidf), tfidf.word == query_df.word,'left').select(tfidf.file,query_df.word,query_df.tf,tfidf.idf,tfidf.tf_idf)
        results = query_idf.groupBy("file").agg((sum("tf_idf")*(count("word")/total_words)).alias("score")).orderBy(desc("score"))
        results.show(N)
        """)
    r = requests.post(statements_url, data=json.dumps({'code': code}), headers=headers)
    r.raise_for_status()
    return r

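## Initial Call to Trigger Load

`load_index` itself returns quickly because Spark defers reading and caching the ORC data until an action runs. This first query is that action, so its time includes loading the index and should not be compared with the timed queries below.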

In [13]:
# Run a warm-up query (forces the lazily loaded index to be read and cached) and print its result.
query = "'over penalties test twice'"
N = 1
# Start the clock before submitting so the measurement covers the full round trip.
a = datetime.datetime.now()
r = query_call(query,N)
r.raise_for_status()
statement_url = host + r.headers['location']
r = requests.get(statement_url, headers=headers)
current_state = r.json()['state']
while current_state != 'available':
    # These states never become 'available'; bail out instead of spinning forever.
    if current_state in ('error', 'cancelling', 'cancelled'):
        raise RuntimeError('Livy statement ended in state {!r}'.format(current_state))
    time.sleep(0.1)  # poll politely instead of hammering the Livy REST API
    r = requests.get(statement_url, headers=headers)
    current_state = r.json()['state']
b = datetime.datetime.now()
statement_output = r.json()['output']
# Exceptions raised by the remote code are reported through output['status'].
if statement_output['status'] != 'ok':
    raise RuntimeError('{}: {}'.format(statement_output.get('ename'), statement_output.get('evalue')))
result = statement_output['data']['text/plain']
print(result)

+-------------+--------------------+
|         file|               score|
+-------------+--------------------+
|rugby/001.txt|0.012391428847086721|
+-------------+--------------------+
only showing top 1 row


In [14]:
print("Time taken:", b - a)  # print() inserts the single separating space

Time taken: 0:00:02.023340


## Query 1, N = 1

In [15]:
# Query 1, top-1 result.
query = "'England claim Dubai Sevens glory'"
N = 1
# Start the clock before submitting so the measurement covers the full round trip.
a = datetime.datetime.now()
r = query_call(query,N)
r.raise_for_status()
statement_url = host + r.headers['location']
r = requests.get(statement_url, headers=headers)
current_state = r.json()['state']
while current_state != 'available':
    # These states never become 'available'; bail out instead of spinning forever.
    if current_state in ('error', 'cancelling', 'cancelled'):
        raise RuntimeError('Livy statement ended in state {!r}'.format(current_state))
    time.sleep(0.1)  # poll politely instead of hammering the Livy REST API
    r = requests.get(statement_url, headers=headers)
    current_state = r.json()['state']
b = datetime.datetime.now()
statement_output = r.json()['output']
# Exceptions raised by the remote code are reported through output['status'].
if statement_output['status'] != 'ok':
    raise RuntimeError('{}: {}'.format(statement_output.get('ename'), statement_output.get('evalue')))
result = statement_output['data']['text/plain']
print(result)

+-------------+-------------------+
|         file|              score|
+-------------+-------------------+
|rugby/098.txt|0.12769736588535455|
+-------------+-------------------+
only showing top 1 row


In [16]:
print("Time taken:", b - a)  # print() inserts the single separating space

Time taken: 0:00:00


## Query 1, N = 3

In [17]:
# Query 1, top-3 results.
query = "'England claim Dubai Sevens glory'"
N = 3
# Start the clock before submitting so the measurement covers the full round trip.
a = datetime.datetime.now()
r = query_call(query,N)
r.raise_for_status()
statement_url = host + r.headers['location']
r = requests.get(statement_url, headers=headers)
current_state = r.json()['state']
while current_state != 'available':
    # These states never become 'available'; bail out instead of spinning forever.
    if current_state in ('error', 'cancelling', 'cancelled'):
        raise RuntimeError('Livy statement ended in state {!r}'.format(current_state))
    time.sleep(0.1)  # poll politely instead of hammering the Livy REST API
    r = requests.get(statement_url, headers=headers)
    current_state = r.json()['state']
b = datetime.datetime.now()
statement_output = r.json()['output']
# Exceptions raised by the remote code are reported through output['status'].
if statement_output['status'] != 'ok':
    raise RuntimeError('{}: {}'.format(statement_output.get('ename'), statement_output.get('evalue')))
result = statement_output['data']['text/plain']
print(result)

+-------------+-------------------+
|         file|              score|
+-------------+-------------------+
|rugby/098.txt|0.12769736588535455|
|rugby/127.txt|0.06749421879927163|
|rugby/086.txt|0.05140068228743627|
+-------------+-------------------+
only showing top 3 rows


In [18]:
print("Time taken:", b - a)  # print() inserts the single separating space

Time taken: 0:00:00


## Query 1, N = 5

In [19]:
# Query 1, top-5 results.
query = "'England claim Dubai Sevens glory'"
N = 5
# Start the clock before submitting so the measurement covers the full round trip.
a = datetime.datetime.now()
r = query_call(query,N)
r.raise_for_status()
statement_url = host + r.headers['location']
r = requests.get(statement_url, headers=headers)
current_state = r.json()['state']
while current_state != 'available':
    # These states never become 'available'; bail out instead of spinning forever.
    if current_state in ('error', 'cancelling', 'cancelled'):
        raise RuntimeError('Livy statement ended in state {!r}'.format(current_state))
    time.sleep(0.1)  # poll politely instead of hammering the Livy REST API
    r = requests.get(statement_url, headers=headers)
    current_state = r.json()['state']
b = datetime.datetime.now()
statement_output = r.json()['output']
# Exceptions raised by the remote code are reported through output['status'].
if statement_output['status'] != 'ok':
    raise RuntimeError('{}: {}'.format(statement_output.get('ename'), statement_output.get('evalue')))
result = statement_output['data']['text/plain']
print(result)

+-------------+--------------------+
|         file|               score|
+-------------+--------------------+
|rugby/098.txt| 0.12769736588535455|
|rugby/127.txt| 0.06749421879927163|
|rugby/086.txt| 0.05140068228743627|
|rugby/092.txt|0.012255797343391425|
|rugby/060.txt|0.007656956682990293|
+-------------+--------------------+
only showing top 5 rows


In [20]:
print("Time taken:", b - a)  # print() inserts the single separating space

Time taken: 0:00:00


## Query 2, N = 1

In [21]:
# Query 2, top-1 result.
query = "'Yachvili slotted over four penalties'"
N = 1
# Start the clock before submitting so the measurement covers the full round trip.
a = datetime.datetime.now()
r = query_call(query,N)
r.raise_for_status()
statement_url = host + r.headers['location']
r = requests.get(statement_url, headers=headers)
current_state = r.json()['state']
while current_state != 'available':
    # These states never become 'available'; bail out instead of spinning forever.
    if current_state in ('error', 'cancelling', 'cancelled'):
        raise RuntimeError('Livy statement ended in state {!r}'.format(current_state))
    time.sleep(0.1)  # poll politely instead of hammering the Livy REST API
    r = requests.get(statement_url, headers=headers)
    current_state = r.json()['state']
b = datetime.datetime.now()
statement_output = r.json()['output']
# Exceptions raised by the remote code are reported through output['status'].
if statement_output['status'] != 'ok':
    raise RuntimeError('{}: {}'.format(statement_output.get('ename'), statement_output.get('evalue')))
result = statement_output['data']['text/plain']
print(result)

+-------------+--------------------+
|         file|               score|
+-------------+--------------------+
|rugby/001.txt|0.043268433667157026|
+-------------+--------------------+
only showing top 1 row


In [22]:
print("Time taken:", b - a)  # print() inserts the single separating space

Time taken: 0:00:00


## Query 2, N = 3

In [23]:
# Query 2, top-3 results.
query = "'Yachvili slotted over four penalties'"
N = 3
# Start the clock before submitting so the measurement covers the full round trip.
a = datetime.datetime.now()
r = query_call(query,N)
r.raise_for_status()
statement_url = host + r.headers['location']
r = requests.get(statement_url, headers=headers)
current_state = r.json()['state']
while current_state != 'available':
    # These states never become 'available'; bail out instead of spinning forever.
    if current_state in ('error', 'cancelling', 'cancelled'):
        raise RuntimeError('Livy statement ended in state {!r}'.format(current_state))
    time.sleep(0.1)  # poll politely instead of hammering the Livy REST API
    r = requests.get(statement_url, headers=headers)
    current_state = r.json()['state']
b = datetime.datetime.now()
statement_output = r.json()['output']
# Exceptions raised by the remote code are reported through output['status'].
if statement_output['status'] != 'ok':
    raise RuntimeError('{}: {}'.format(statement_output.get('ename'), statement_output.get('evalue')))
result = statement_output['data']['text/plain']
print(result)

+-------------+--------------------+
|         file|               score|
+-------------+--------------------+
|rugby/001.txt|0.043268433667157026|
|rugby/003.txt| 0.02242535737241099|
|rugby/134.txt|0.020408392192625024|
+-------------+--------------------+
only showing top 3 rows


In [24]:
print("Time taken:", b - a)  # print() inserts the single separating space

Time taken: 0:00:00


## Query 2, N = 5

In [25]:
# Query 2, top-5 results.
query = "'Yachvili slotted over four penalties'"
N = 5
# Start the clock before submitting so the measurement covers the full round trip.
a = datetime.datetime.now()
r = query_call(query,N)
r.raise_for_status()
statement_url = host + r.headers['location']
r = requests.get(statement_url, headers=headers)
current_state = r.json()['state']
while current_state != 'available':
    # These states never become 'available'; bail out instead of spinning forever.
    if current_state in ('error', 'cancelling', 'cancelled'):
        raise RuntimeError('Livy statement ended in state {!r}'.format(current_state))
    time.sleep(0.1)  # poll politely instead of hammering the Livy REST API
    r = requests.get(statement_url, headers=headers)
    current_state = r.json()['state']
b = datetime.datetime.now()
statement_output = r.json()['output']
# Exceptions raised by the remote code are reported through output['status'].
if statement_output['status'] != 'ok':
    raise RuntimeError('{}: {}'.format(statement_output.get('ename'), statement_output.get('evalue')))
result = statement_output['data']['text/plain']
print(result)

+-------------+--------------------+
|         file|               score|
+-------------+--------------------+
|rugby/001.txt|0.043268433667157026|
|rugby/003.txt| 0.02242535737241099|
|rugby/134.txt|0.020408392192625024|
|rugby/141.txt| 0.01865745104894276|
|rugby/097.txt|0.016309428405249717|
+-------------+--------------------+
only showing top 5 rows


In [26]:
print("Time taken:", b - a)  # print() inserts the single separating space

Time taken: 0:00:00


## End Session

In [27]:
# Tear down the Livy session; the Response repr is shown as the cell output.
requests.request('DELETE', session_url, headers=headers)

<Response [200]>