## This is a testing grounds for spark job mapping

In [3]:
from pyspark.sql.types import *
from pyspark.sql.functions import udf
from pyspark.sql import SQLContext, Row
from pyspark import SparkContext
import redis 

filenames = "s3a://open-research-corpus/sample-S2-records.gz" # path to the example file from S3 file 

In [53]:
# Integer type output
square_udf_int = udf(lambda z: square(z), IntegerType())
get_id_udf_string = udf(lambda z: get_id(z), StringType())

In [51]:
# Using spark 2.1 
sc = spark.sparkContext

# Load a text file and convert each line to a Row.
lines = sc.textFile(filenames)
#parts = lines.map(lambda l: l.split(","))
#parts = lines.map(lambda l: l)
parts = lines.map(lambda l:l.split())
# Each line is converted to a tuple.
people = parts.map(lambda p: (p[0], p[1], p[2], p[3], p[4], p[5], p[6]))

# The schema is encoded in a string.
schemaString = "id papername citations year abstract tags author"

fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

# Apply the schema to the RDD.
schemaPeople = spark.createDataFrame(people, schema)

# Creates a temporary view using the DataFrame
schemaPeople.createOrReplaceTempView("people")

# Creates a temporary view using the DataFrame
schemaPeople.createOrReplaceTempView("people")

# SQL can be run over DataFrames that have been registered as a table.
results = spark.sql("SELECT * FROM people LIMIT 1")

results.show()

+--------------------+---------+--------------------+--------------------+--------+----+-----------------+
|                  id|papername|           citations|                year|abstract|tags|           author|
+--------------------+---------+--------------------+--------------------+--------+----+-----------------+
|{"entities":["Epi...|  ovarian|cancer","Excision...|admission","Malig...|neoplasm|  of|ovary","Morbidity|
+--------------------+---------+--------------------+--------------------+--------+----+-----------------+



In [70]:
type(schemaPeople)
schemaPeople.printSchema()
schemaPeople.withColumn('id', (get_id_udf_string(schemaPeople.id)))

root
 |-- id: string (nullable = true)
 |-- papername: string (nullable = true)
 |-- citations: string (nullable = true)
 |-- year: string (nullable = true)
 |-- abstract: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- author: string (nullable = true)



In [85]:
def get_id(line):
    parenthesis = "\"" # string literal for "
    paper_id_tag = "\"id\"" # find the first occurence of "id"
    id_label_start = line.find(paper_id_tag) # this is the index that the id label starts
    id_tag_start = id_label_start + 6 # this is the index that the id tag starts. Always be 6.
    id_tag_end = line.find(parenthesis, id_tag_start)  #this is the index that the id tag ends
    id_tag = line[id_tag_start:id_tag_end] # id tag string 
    return id_tag

In [86]:
# create a user defined function for get_id, which is compatable with a spark dataframe 
get_id_udf = udf(lambda line: get_id(line), StringType())

In [87]:
# read in the raw data file 
raw_data = spark.read.text(filenames)
#print(type(raw_data))

In [101]:
#grabbed_id = udf(lambda body: get_id(body), StringType())
#partially_cleaned_data = raw_data.withColumn("id", grabbed_id("id"))
#partially_cleaned_data = raw_data.withColumn("id", decode_line("paper_id"))
print("Schema for raw data")
print("-------------------")
raw_data.createOrReplaceTempView("raw")
raw_data.printSchema()
results = spark.sql("SELECT value FROM raw LIMIT 5")
print("First 5 entries for raw data")
print("----------------------------")
results.show()
add_ids = raw_data.withColumn("id", get_id_udf(raw_data.value))
print("Schema for raw data")
print("-------------------")
add_ids.createOrReplaceTempView("raw_and_ids")
add_ids.printSchema()
results = spark.sql("SELECT * FROM raw_and_ids LIMIT 5")
print("First 5 entries for add_ids data")
print("----------------------------")
results.show()
#results = spark.sql("SELECT id FROM partiallycleaned LIMIT 10")
#ids = spark.sql("SELECT id FROM partiallycleaned LIMIT 10")
#abstracts = spark.sql("SELECT paperAbstract FROM partiallycleaned LIMIT 10")
#raw_data.printSchema()
#years.show()
#ids.show()
#abstracts.show()
#results.show()

Schema for raw data
-------------------
root
 |-- value: string (nullable = true)

First 5 entries for raw data
----------------------------
+--------------------+
|               value|
+--------------------+
|{"entities":["Epi...|
|{"entities":["Lip...|
|{"entities":["Spa...|
|{"entities":["ACT...|
|{"entities":[],"j...|
+--------------------+

Schema for raw data
-------------------
root
 |-- value: string (nullable = true)
 |-- id: string (nullable = true)

First 5 entries for add_ids data
----------------------------
+--------------------+--------------------+
|               value|                  id|
+--------------------+--------------------+
|{"entities":["Epi...|4cbba8127c8747a3b...|
|{"entities":["Lip...|4c61478345166be0d...|
|{"entities":["Spa...|34ca6d85db744543d...|
|{"entities":["ACT...|3316b8b97c1e17ac9...|
|{"entities":[],"j...|58ff17c7d8ca00673...|
+--------------------+--------------------+



In [43]:
grabbed_id = udf(lambda body: get_id(body), StringType())
#partially_cleaned_data = raw_data.withColumn("id", grabbed_id("id"))
#print(type(partially_cleaned_data))
partially_cleaned_data.createOrReplaceTempView("partiallycleaned")
results = spark.sql("SELECT id FROM partiallycleaned LIMIT 10")
#partially_cleaned_data.printSchema()
results.show()

+-----------+
|         id|
+-----------+
|[B@7c01c4f5|
|[B@4f687f60|
|[B@5f334bbc|
|[B@40eeb25e|
|[B@32baf0da|
|[B@3ea51a92|
|[B@7b8ec851|
|[B@4ec7e93c|
|[B@2b1eec1e|
|[B@3df91265|
+-----------+



In [None]:
def process_papers(line):
    id_label_start = line.find(paper_id_tag) # this is the index that the id label starts
    id_tag_start = id_label_start + 6 # this is the index that the id tag starts. Always be 6.
    id_tag_end = line.find(parenthesis, id_tag_start)  #this is the index that the id tag ends
    id_tag = line[id_tag_start:id_tag_end] # id tag string 
    
    abstract_label_start = line.find(paper_abstract_tag) # index that the abstrat label starts 
    abstract_tag_start = abstract_label_start + 17 # the start of the abstract tag 
    abstract_tag_end = line.find(parenthesis, abstract_tag_start) # the end of the abstract tag
    abstract_tag = line[abstract_tag_start:abstract_tag_end] # abstract tag string
    
    return

In [None]:
def store_to_redis(rdd):
    return

In [None]:
filenames = "s3a://open-research-corpus/sample-S2-records.gz" # path to the example file from S3 file 
df_txt = sc.textFile(filenames) # reads and stores the data file 
df = df.WithColumn("hello")
df.printSchema()

In [None]:
square_udf_int = udf(lambda z: square(z), IntegerType())

In [None]:
'''
This file pulls 1GB .gz file of data from an S3 bucket
'''
rdb = redis.Redis(host="10.0.0.6", port=6379) # set up the redis database connection 
filenames = "s3a://open-research-corpus/sample-S2-records.gz" # path to the example file from S3 file 
# uncomment sc when you are running a spark-submit job on the terminal 
#sc = SparkContext(appName = "Pull Open Research Corpus") # setup the Spark Context 
data = sc.textFile(filenames) # reads and stores the data file 

# setting up important variables 
parenthesis = "\"" # string literal for "
bracket = r"]" # look for "]"
paper_id_tag = "\"id\"" # find the first occurence of "id"
paper_year_tag = "\"year\"" # find the first occurence of "year"
paper_citation_tag = "\"inCitations\"" # find the occurence of "inCitations"
paper_abstract_tag = "\"paperAbstract\""

num_in_db = 0
for line in data.take(len_of_data):
        #line = line.encode('utf8') # encodes the unicode to ascii 
        # look for the labels "id", "year", "inCitations", and "paperAbstract"
        id_label_start = line.find(paper_id_tag) # this is the index that the id label starts
        year_label_start = line.find(paper_year_tag) # index that the year label starts 
        citation_label_start = line.find(paper_citation_tag) # index that the citation label starts 
        abstract_label_start = line.find(paper_abstract_tag) # index that the abstrat label starts 

        # look for the tag of each label 
        id_tag_start = id_label_start + 6 # this is the index that the id tag starts. Always be 6.
        year_tag_start = year_label_start + 7 # index that the year tag starts 
        citation_tag_start = citation_label_start + 15 # index that the citation tag starts
        abstract_tag_start = abstract_label_start + 17 # the start of the abstract tag 

        # look for the last index of each tag 
        id_tag_end = line.find(parenthesis, id_tag_start)  #this is the index that the id tag ends
        year_tag_end = line.find(parenthesis, year_tag_start) - 1 # this is the index that the year tag ends
        citation_tag_end = line.find(bracket, citation_tag_start)  # this is the index that the citation tag ends 
        abstract_tag_end = line.find(parenthesis, abstract_tag_start) # the end of the abstract tag
    
        # extract the tag 
        id_tag = line[id_tag_start:id_tag_end] # id tag string 
        if year_label_start == -1: # it didn't find the string year
                year_tag = None
        else:
                year_tag = line[year_tag_start:year_tag_end] # year tag string
        if citation_tag_start == citation_tag_end: # if there are no citations:
                num_citations = 0
        else:
                citation_list = line[citation_tag_start:citation_tag_end].split(",") # make it a list, count number of entries
                num_citations = len(citation_list) # number of citations
        abstract_tag = line[abstract_tag_start:abstract_tag_end] # abstract tag string
        
        temp_tup = (year_tag, num_citations, abstract_tag) # temporarily store the (year, number of citation, abstract)
        #q_json = json.dumps({"id": q.id, "title": q.title, "min_hash": q.min_hash, "lsh_hash": q.lsh_hash, "timestamp": q.creation_date})
        #print(type(id_tag))
        #rdb.lpush(id_tag, pickle.dumps(temp_tup))
                

In [None]:
def store_to_redis(rdd): 
    '''
    takes a rdd and stores relevant information to redis database
    '''
    papers = rdd.collect() # an iterable list 
    rdb = redis.Redis(host="10.0.0.6", port=6379) # set up the redis database connection 
    # setting the keywords that we search for
    parenthesis = "\"" # string literal for "
    bracket = r"]" # look for "]"
    paper_id_tag = "\"id\"" # find the first occurence of "id"
    paper_year_tag = "\"year\"" # find the first occurence of "year"
    paper_citation_tag = "\"inCitations\"" # find the occurence of "inCitations"
    paper_abstract_tag = "\"paperAbstract\""
    
    for paper in papers: 
        # look for the labels "id", "year", "inCitations", and "paperAbstract"
        id_label_start = line.find(paper_id_tag) # this is the index that the id label starts
        year_label_start = line.find(paper_year_tag) # index that the year label starts 
        citation_label_start = line.find(paper_citation_tag) # index that the citation label starts 
        abstract_label_start = line.find(paper_abstract_tag) # index that the abstrat label starts 

        # look for the tag of each label 
        id_tag_start = id_label_start + 6 # this is the index that the id tag starts. Always be 6.
        year_tag_start = year_label_start + 7 # index that the year tag starts 
        citation_tag_start = citation_label_start + 15 # index that the citation tag starts
        abstract_tag_start = abstract_label_start + 17 # the start of the abstract tag 

        # look for the last index of each tag 
        id_tag_end = line.find(parenthesis, id_tag_start)  #this is the index that the id tag ends
        year_tag_end = line.find(parenthesis, year_tag_start) - 1 # this is the index that the year tag ends
        citation_tag_end = line.find(bracket, citation_tag_start)  # this is the index that the citation tag ends 
        abstract_tag_end = line.find(parenthesis, abstract_tag_start) # the end of the abstract tag

        #extract the tag
        #id_tag = line[id_tag_start:id_tag_end] # id tag string 
        #year_tag = line[year_tag_start:year_tag_end] # year tag string
        #citation_list = line[citation_tag_start:citation_tag_end].split(",") # make it a list, count number of entries
        #num_citations = len(citation_list) # number of citations 
        #abstract_tag = line[abstract_tag_start:abstract_tag_end] # abstract tag string

        # computation 
        id_tag = line[id_tag_start:id_tag_end] # id tag string 
        if year_label_start == -1: # it didn't find the string year
            year_tag = None
        else:
            year_tag = line[year_tag_start:year_tag_end] # year tag string
        if citation_tag_start == citation_tag_end: # if there are no citations:
            num_citations = 0
        else:
            citation_list = line[citation_tag_start:citation_tag_end].split(",") # make it a list, count number of entries
            num_citations = len(citation_list) # number of citations
        abstract_tag = line[abstract_tag_start:abstract_tag_end] # abstract tag string
        # this serializes the data using json.dumps so that it can be put into redis 
        #info = {"year":year_tag, "citations":num_citations, "abstract":abstract_tag}
        #json_representation = json.dumps(info)
        #rdb.set(id_tag, json_representation) # push to redis database 
        rdb.set(id_tag, abstract_tag)
    

In [None]:
import redis 
import json
from pyspark import SparkContext 
'''
This file pulls 1GB .gz file of data from an S3 bucket
'''
rdb = redis.Redis(host="10.0.0.6", port=6379) # set up the redis database connection 
filenames = "s3a://open-research-corpus/sample-S2-records.gz" # path to the example file from S3 file 

# uncomment sc when you are running a spark-submit job on the terminal 
#sc = SparkContext(appName = "Pull Open Research Corpus") # setup the Spark Context 
file = sc.textFile(filenames) # reads and stores the data file 
rdd = file.flatMap(lambda line: line.split("\n")).map(lambda )
#rdd.map(store_to_redis(rdd))


#store_to_redis(rdd)

In [None]:
rdb = redis.Redis(host="10.0.0.6", port=6379) # set up the redis database connection 
rdb.get("06f3d20b2c9191b4f03761a61312a1c71345a003")
#unpacked_dict = json.loads(rdb.get('185e6a412b05973b280bdad525907eb1d12a0a37').decode('utf-8'))

In [None]:
file = sc.textFile("hdfs://<public_dns>:9000/user/test.txt")
counts = file.flatMap(lambda line: line.split(" "))\
           .map(lambda word: (word, 1))\
           .reduceByKey(lambda a, b: a + b)
res = counts.collect()
for val in res:
    print(val)