In [23]:
from datetime import datetime
import urllib.request
import time
from subprocess import call
import sys
import os

# Seconds get_resume() sleeps before requesting each resumption-token page,
# i.e. the pacing between successive requests to the OAI-PMH endpoint.
retry_time = 45


# save_path = ""


In [24]:
def get_raw_data(data_path, hdfs_path):
    """
    Download the first page of arXiv's OAI-PMH ``ListRecords`` listing
    (``oai_dc`` metadata), store it locally and on HDFS, then hand the page's
    resumption token to ``get_resume`` to fetch the remaining pages.

    :param data_path: local directory the page is written to (as ``papers_0.xml``)
    :param hdfs_path: HDFS directory the page is copied to
    """
    print(data_path)
    # time.clock() was removed in Python 3.8; perf_counter() measures wall time.
    start = time.perf_counter()
    query = "http://export.arxiv.org/oai2?verb=ListRecords&metadataPrefix=oai_dc"
    print("request: %s" % (query))
    request = urllib.request.Request(query)
    with urllib.request.urlopen(request) as resp:
        response = resp.read().decode('utf-8')

    filen = os.path.join(data_path, 'papers_0.xml')
    # Encode explicitly: the platform default encoding may not be able to
    # represent every character of the UTF-8 payload.
    with open(filen, 'w', encoding='utf-8') as rawfile:
        rawfile.write(response)

    save_to_hdfs(filen, hdfs_path)

    end = time.perf_counter()
    print("takes: %f s" % (end - start))

    pos_start = response.rfind('<resumptionToken')
    pos_end = response.rfind('</resumptionToken')
    if pos_end > 0 and pos_end > pos_start:
        pos = response.rfind('>', pos_start, pos_end)
        resume_token = response[pos + 1:pos_end].strip()
        # An empty token means this was the last page (OAI-PMH flow control).
        if resume_token:
            print("request_resume: %s" % (resume_token))
            get_resume(resume_token, data_path, hdfs_path)



In [25]:
def get_resume(token, data_path, hdfs_path, max_pages=None):
    """
    Fetch the remaining pages of an arXiv OAI-PMH ``ListRecords`` listing by
    following resumption tokens, storing each page locally and on HDFS.

    Pages are fetched in a loop rather than by recursion, so a long harvest
    cannot exhaust Python's recursion limit. A failed download is retried
    with the same token after an extra 30 s pause, without limit.

    :param token: resumption token taken from the previous page
    :param data_path: local directory pages are written to
                      (as ``papers_<token>.xml`` with ``|`` replaced by ``_``)
    :param hdfs_path: HDFS directory pages are copied to
    :param max_pages: stop after this many successfully saved pages;
                      ``None`` (default) follows the chain to its end
    """
    pages = 0
    while token and (max_pages is None or pages < max_pages):
        # Pace successive requests to the OAI-PMH endpoint.
        time.sleep(retry_time)
        start = time.perf_counter()
        query = "http://export.arxiv.org/oai2?verb=ListRecords&resumptionToken=%s" % (token)
        try:
            request = urllib.request.Request(query)
            with urllib.request.urlopen(request) as resp:
                response = resp.read().decode('utf-8')
        except Exception as err:
            print(err)
            print("retry resume_token: %s" % (token))
            time.sleep(30)
            continue

        # Create the file only after a successful download, so a failed
        # request never leaves an empty page file behind.
        filen = os.path.join(data_path, 'papers_%s.xml' % (token.replace('|', '_')))
        with open(filen, 'w', encoding='utf-8') as rawfile:
            rawfile.write(response)
        save_to_hdfs(filen, hdfs_path)
        pages += 1

        end = time.perf_counter()
        print("takes: %f s" % (end - start))

        # Next token, if any; an empty token marks the last page.
        token = None
        pos_start = response.rfind('<resumptionToken')
        pos_end = response.rfind('</resumptionToken')
        if pos_end > 0 and pos_end > pos_start:
            pos = response.rfind('>', pos_start, pos_end)
            token = response[pos + 1:pos_end].strip()
            if token:
                print("request_resume: %s" % (token))


In [26]:
def save_to_hdfs(filename, hdfs_path):
    """
    Copy a local file into an HDFS directory, replacing any existing copy.

    :param filename: path of the local file to upload
    :param hdfs_path: HDFS directory to upload into; the file keeps its base name
    :return: True if ``hdfs dfs -put`` succeeded, False otherwise
    """
    import posixpath  # HDFS paths are always '/'-separated

    # os.path.join(hdfs_path, filename) would discard hdfs_path whenever
    # filename is absolute; only the base name belongs under hdfs_path.
    target = posixpath.join(hdfs_path, os.path.basename(filename))
    try:
        # Argument lists instead of shell strings: file names embed
        # server-supplied resumption tokens and must never reach a shell.
        if call(['hdfs', 'dfs', '-test', '-e', target]) == 0:
            call(['hdfs', 'dfs', '-rm', target])
            print("file %s exists, delete old one" % (target))
        # call() reports failure through the exit status, not an exception.
        if call(['hdfs', 'dfs', '-put', filename, target]) != 0:
            print("save %s to hdfs failed" % (filename))
            return False
        print("send %s to hdfs" % (filename))
        return True
    except Exception as err:
        # Best-effort upload, e.g. the `hdfs` executable is not on PATH.
        print("save %s to hdfs failed: %s" % (filename, err))
        return False

In [27]:
# Local staging directory for downloaded OAI-PMH pages, and the HDFS directory
# they are copied to. Override through environment variables to run the
# harvest elsewhere without editing the notebook.
data_path = os.environ.get('ARXIV_DATA_PATH', '/home/ananth/airflow/arxiv/')
hdfs_path = os.environ.get('ARXIV_HDFS_PATH', '/user/ananth/arxiv/')

In [None]:
# Full harvest: get_resume sleeps `retry_time` seconds before every page,
# so this cell runs for a long time.
get_raw_data(data_path=data_path, hdfs_path=hdfs_path)

/home/ananth/airflow/arxiv/
request: http://export.arxiv.org/oai2?verb=ListRecords&metadataPrefix=oai_dc
>>>>>>> /user/ananth/arxiv/
send /home/ananth/airflow/arxiv/papers_0.xml to hdfs
takes: 0.058562 s
request_resume: 3403458|1001
>>>>>>> /user/ananth/arxiv/
send /home/ananth/airflow/arxiv/papers_3403458_1001.xml to hdfs
takes: 0.060318 s
request_resume: 3403458|2001
>>>>>>> /user/ananth/arxiv/
send /home/ananth/airflow/arxiv/papers_3403458_2001.xml to hdfs
takes: 0.059626 s
request_resume: 3403458|3001
>>>>>>> /user/ananth/arxiv/
send /home/ananth/airflow/arxiv/papers_3403458_3001.xml to hdfs
takes: 0.058961 s
request_resume: 3403458|4001
>>>>>>> /user/ananth/arxiv/
send /home/ananth/airflow/arxiv/papers_3403458_4001.xml to hdfs
takes: 0.060755 s
request_resume: 3403458|5001
>>>>>>> /user/ananth/arxiv/
send /home/ananth/airflow/arxiv/papers_3403458_5001.xml to hdfs
takes: 0.059453 s
request_resume: 3403458|6001
>>>>>>> /user/ananth/arxiv/
send /home/ananth/airflow/arxiv/papers_34034

>>>>>>> /user/ananth/arxiv/
send /home/ananth/airflow/arxiv/papers_3403458_58001.xml to hdfs
takes: 0.058809 s
request_resume: 3403458|59001
>>>>>>> /user/ananth/arxiv/
send /home/ananth/airflow/arxiv/papers_3403458_59001.xml to hdfs
takes: 0.065387 s
request_resume: 3403458|60001
>>>>>>> /user/ananth/arxiv/
send /home/ananth/airflow/arxiv/papers_3403458_60001.xml to hdfs
takes: 0.062098 s
request_resume: 3403458|61001
>>>>>>> /user/ananth/arxiv/
send /home/ananth/airflow/arxiv/papers_3403458_61001.xml to hdfs
takes: 0.066919 s
request_resume: 3403458|62001
>>>>>>> /user/ananth/arxiv/
send /home/ananth/airflow/arxiv/papers_3403458_62001.xml to hdfs
takes: 0.066069 s
request_resume: 3403458|63001
>>>>>>> /user/ananth/arxiv/
send /home/ananth/airflow/arxiv/papers_3403458_63001.xml to hdfs
takes: 0.061721 s
request_resume: 3403458|64001
>>>>>>> /user/ananth/arxiv/
send /home/ananth/airflow/arxiv/papers_3403458_64001.xml to hdfs
takes: 0.065542 s
request_resume: 3403458|65001
>>>>>>> /user

>>>>>>> /user/ananth/arxiv/
send /home/ananth/airflow/arxiv/papers_3403458_116001.xml to hdfs
takes: 0.070691 s
request_resume: 3403458|117001
>>>>>>> /user/ananth/arxiv/
send /home/ananth/airflow/arxiv/papers_3403458_117001.xml to hdfs
takes: 0.069223 s
request_resume: 3403458|118001
>>>>>>> /user/ananth/arxiv/
send /home/ananth/airflow/arxiv/papers_3403458_118001.xml to hdfs
takes: 0.069346 s
request_resume: 3403458|119001
>>>>>>> /user/ananth/arxiv/
send /home/ananth/airflow/arxiv/papers_3403458_119001.xml to hdfs
takes: 0.070861 s
request_resume: 3403458|120001
>>>>>>> /user/ananth/arxiv/
send /home/ananth/airflow/arxiv/papers_3403458_120001.xml to hdfs
takes: 0.070404 s
request_resume: 3403458|121001
>>>>>>> /user/ananth/arxiv/
send /home/ananth/airflow/arxiv/papers_3403458_121001.xml to hdfs
takes: 0.064818 s
request_resume: 3403458|122001
>>>>>>> /user/ananth/arxiv/
send /home/ananth/airflow/arxiv/papers_3403458_122001.xml to hdfs
takes: 0.080008 s
request_resume: 3403458|123001

>>>>>>> /user/ananth/arxiv/
send /home/ananth/airflow/arxiv/papers_3403458_174001.xml to hdfs
takes: 0.075252 s
request_resume: 3403458|175001
>>>>>>> /user/ananth/arxiv/
send /home/ananth/airflow/arxiv/papers_3403458_175001.xml to hdfs
takes: 0.073615 s
request_resume: 3403458|176001
>>>>>>> /user/ananth/arxiv/
send /home/ananth/airflow/arxiv/papers_3403458_176001.xml to hdfs
takes: 0.078704 s
request_resume: 3403458|177001
>>>>>>> /user/ananth/arxiv/
send /home/ananth/airflow/arxiv/papers_3403458_177001.xml to hdfs
takes: 0.075333 s
request_resume: 3403458|178001
>>>>>>> /user/ananth/arxiv/
send /home/ananth/airflow/arxiv/papers_3403458_178001.xml to hdfs
takes: 0.077856 s
request_resume: 3403458|179001
>>>>>>> /user/ananth/arxiv/
send /home/ananth/airflow/arxiv/papers_3403458_179001.xml to hdfs
takes: 0.083220 s
request_resume: 3403458|180001
>>>>>>> /user/ananth/arxiv/
send /home/ananth/airflow/arxiv/papers_3403458_180001.xml to hdfs
takes: 0.079801 s
request_resume: 3403458|181001

>>>>>>> /user/ananth/arxiv/
send /home/ananth/airflow/arxiv/papers_3403458_232001.xml to hdfs
takes: 0.075768 s
request_resume: 3403458|233001
>>>>>>> /user/ananth/arxiv/
send /home/ananth/airflow/arxiv/papers_3403458_233001.xml to hdfs
takes: 0.075802 s
request_resume: 3403458|234001


In [1]:
import os
import xml.etree.ElementTree as ET

import findspark

# findspark must point Python at the cluster's Spark install before pyspark is imported.
findspark.init('/opt/cloudera/parcels/SPARK2-2.3.0.cloudera3-1.cdh5.13.3.p0.458809/lib/spark2/')

from pyspark.sql import SparkSession

# Input directory holding the papers_*.xml pages, and the parquet output directory.
xmlpath = '/user/ananth/arxiv'
parpath = '/user/ananth/arxiv/parquet/'

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext



In [10]:
def process(file):
    """
    Parse one arXiv OAI-PMH ``ListRecords`` XML page into paper dictionaries.

    :param file: ``(path, content)`` pair as produced by ``SparkContext.binaryFiles``;
                 ``content`` is the raw XML bytes of one page
    :return: list with one dict per record, keys ``id``, ``datastamp``, ``title``,
             ``subjects``, ``authors`` and ``abstract``. Records without metadata
             (e.g. deleted ones) are skipped. The list is empty if the page cannot
             be parsed or has no ``ListRecords`` element.
    """
    prefix = "{http://www.openarchives.org/OAI/2.0/}"
    prefix2 = "{http://purl.org/dc/elements/1.1/}"

    def text_of(parent, tag):
        # Text of the first `tag` child of `parent`, or None when it is absent.
        node = parent.find(tag)
        return None if node is None else node.text

    try:
        # Parse the raw bytes so the page's own encoding declaration is honoured.
        root = ET.fromstring(file[1])
    except ET.ParseError:
        # Empty or truncated page: contribute no rows instead of failing the job.
        return []
    records = root.find(prefix + "ListRecords")
    if records is None:
        # OAI-PMH error responses carry an <error> element instead.
        return []

    data = []
    for record in records.findall(prefix + "record"):
        header = record.find(prefix + "header")
        metadata = record.find(prefix + "metadata")
        if header is None or metadata is None or len(metadata) == 0:
            # Skip just this record; one odd record must not drop the rest of the page.
            continue
        dc = metadata[0]
        subjects = [subject.text for subject in header.findall(prefix + "setSpec")] or ["none"]
        authors = [author.text for author in dc.findall(prefix2 + "creator")] or ["none"]
        data.append({
            "id": text_of(header, prefix + "identifier"),
            # Column name kept as "datastamp" so the DataFrame schema is unchanged.
            "datastamp": text_of(header, prefix + "datestamp"),
            "title": text_of(dc, prefix2 + "title"),
            "subjects": subjects,
            "authors": authors,
            "abstract": text_of(dc, prefix2 + "description"),
        })
    return data

In [11]:
# Each XML page arrives as a (path, bytes) pair; `process` expands a page into
# one dict per paper, and toDF() infers the DataFrame schema from those dicts.
filep = os.path.join(xmlpath, "papers_*.xml")
xmlfiles = sc.binaryFiles(filep)
xmldf = (
    xmlfiles
    .flatMap(process)
    .toDF()
)



In [12]:
# count() is a Spark action: it forces every XML page to be read and parsed
# and returns the number of paper rows.
xmldf.count()

568316

In [13]:
# `parpath` comes from the Spark setup cell. Overwrite mode keeps this cell
# re-runnable: the default save mode raises once the output directory exists.
xmldf.write.parquet(parpath, mode='overwrite')

In [3]:
# parpath = '/user/ananth/arxiv/parquet/arxiv.parquet'
# xmldf.write.parquet(parpath)